Showing with 1390 additions and 0 deletions
#!/usr/bin/env python3
#
# * --------------------------------------------------------------------------
# * Licensed under MIT (https://git.biohpc.swmed.edu/BICF/Astrocyte/chipseq_analysis/LICENSE.md)
# * --------------------------------------------------------------------------
#
'''Trim low quality reads and remove sequences less than 35 base pairs.'''

import subprocess
import argparse
import shutil
import os
import logging

EPILOG = '''
For more details:
%(prog)s --help
'''

# SETTINGS
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
logger.propagate = False
logger.setLevel(logging.INFO)


def get_args():
    '''Define arguments.'''
    parser = argparse.ArgumentParser(
        description=__doc__, epilog=EPILOG,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument('-f', '--fastq',
                        help="The fastq file to run trimming on.",
                        nargs='+',
                        required=True)

    parser.add_argument('-s', '--sample',
                        help="The name of the sample.",
                        required=True)

    parser.add_argument('-p', '--paired',
                        help="True/False if paired-end or single end.",
                        default=False,
                        action='store_true')

    args = parser.parse_args()
    return args


def check_tools():
    '''Checks for required components on user system.'''

    logger.info('Checking for required libraries and components on this system')

    trimgalore_path = shutil.which("trim_galore")
    if trimgalore_path:
        logger.info('Found trimgalore: %s', trimgalore_path)

        # Get Version
        trim_version_command = "trim_galore --version"
        trimgalore_version = subprocess.check_output(trim_version_command, shell=True)

        # Write to file
        trimgalore_file = open("version_trimgalore.txt", "wb")
        trimgalore_file.write(trimgalore_version)
        trimgalore_file.close()
    else:
        logger.error('Missing trimgalore')
        raise Exception('Missing trimgalore')

    cutadapt_path = shutil.which("cutadapt")
    if cutadapt_path:
        logger.info('Found cutadapt: %s', cutadapt_path)

        # Get Version
        cutadapt_version_command = "cutadapt --version"
        cutadapt_version = subprocess.check_output(cutadapt_version_command, shell=True)

        # Write to file
        cutadapt_file = open("version_cutadapt.txt", "wb")
        cutadapt_file.write(b"Version %s" % (cutadapt_version))
        cutadapt_file.close()
    else:
        logger.error('Missing cutadapt')
        raise Exception('Missing cutadapt')


def rename_reads(fastq, sample, paired):
    '''Rename fastq files by sample name.'''

    # Get current directory to build paths
    cwd = os.getcwd()

    renamed_fastq = []

    if paired:  # paired-end data
        # Set file names
        renamed_fastq.append(cwd + '/' + sample + '_R1.fastq.gz')
        renamed_fastq.append(cwd + '/' + sample + '_R2.fastq.gz')

        # Create symbolic links
        os.symlink(fastq[0], renamed_fastq[0])
        os.symlink(fastq[1], renamed_fastq[1])
    else:
        # Set file names
        renamed_fastq.append(cwd + '/' + sample + '_R1.fastq.gz')

        # Create symbolic links
        os.symlink(fastq[0], renamed_fastq[0])

    return renamed_fastq


def trim_reads(fastq, paired):
    '''Run trim_galore on 1 or 2 files.'''

    if paired:  # paired-end data
        trim_params = '--paired -q 25 --illumina --gzip --length 35'
        trim_command = "trim_galore %s %s %s " \
            % (trim_params, fastq[0], fastq[1])
    else:
        trim_params = '-q 25 --illumina --gzip --length 35'
        trim_command = "trim_galore %s %s " \
            % (trim_params, fastq[0])

    logger.info("Running trim_galore with %s", trim_command)
    trim = subprocess.Popen(trim_command, shell=True)
    out, err = trim.communicate()


def main():
    args = get_args()
    fastq = args.fastq
    sample = args.sample
    paired = args.paired

    # Create a file handler
    handler = logging.FileHandler('trim.log')
    logger.addHandler(handler)

    # Check if tools are present
    check_tools()

    # Rename fastq files by sample
    fastq_rename = rename_reads(fastq, sample, paired)

    # Run trim_reads
    trim_reads(fastq_rename, paired)


if __name__ == '__main__':
    main()
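The trimming script above is normally driven through its argparse interface; the snippet below is only an illustrative sketch of exercising the same helpers directly from Python. The module name trim_reads and the fastq paths/sample name are assumptions for the example, not taken from the diff.

# Illustrative only: exercise the helpers above without the CLI.
# The module name and file paths are hypothetical placeholders.
import trim_reads  # assumes the script above is saved as trim_reads.py

fastqs = ['/path/to/sample1_R1.fastq.gz', '/path/to/sample1_R2.fastq.gz']
trim_reads.check_tools()  # verify trim_galore and cutadapt are installed
renamed = trim_reads.rename_reads(fastqs, 'sample1', paired=True)  # symlink as sample1_R1/_R2
trim_reads.trim_reads(renamed, paired=True)  # runs trim_galore -q 25 --illumina --length 35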
#!/usr/bin/env python3
#
# * --------------------------------------------------------------------------
# * Licensed under MIT (https://git.biohpc.swmed.edu/BICF/Astrocyte/chipseq_analysis/LICENSE.md)
# * --------------------------------------------------------------------------
#
'''General utilities.'''

import shlex
import logging
import subprocess
import sys
import os

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
logger.propagate = True


def run_pipe(steps, outfile=None):
    from subprocess import Popen, PIPE
    p = None
    p_next = None
    first_step_n = 1
    last_step_n = len(steps)
    for n, step in enumerate(steps, start=first_step_n):
        logger.debug("step %d: %s" % (n, step))
        if n == first_step_n:
            if n == last_step_n and outfile:  # one-step pipeline with outfile
                with open(outfile, 'w') as fh:
                    print("one step shlex: %s to file: %s" % (shlex.split(step), outfile))
                    p = Popen(shlex.split(step), stdout=fh)
                break
            print("first step shlex to stdout: %s" % (shlex.split(step)))
            p = Popen(shlex.split(step), stdout=PIPE)
        elif n == last_step_n and outfile:  # only treat the last step specially if you're sending stdout to a file
            with open(outfile, 'w') as fh:
                print("last step shlex: %s to file: %s" % (shlex.split(step), outfile))
                p_last = Popen(shlex.split(step), stdin=p.stdout, stdout=fh)
                p.stdout.close()
                p = p_last
        else:  # handles intermediate steps and, in the case of a pipe to stdout, the last step
            print("intermediate step %d shlex to stdout: %s" % (n, shlex.split(step)))
            p_next = Popen(shlex.split(step), stdin=p.stdout, stdout=PIPE)
            p.stdout.close()
            p = p_next
    out, err = p.communicate()
    return out, err


def block_on(command):
    process = subprocess.Popen(shlex.split(command), stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
    for line in iter(process.stdout.readline, b''):
        sys.stdout.write(line.decode('utf-8'))
    process.communicate()
    return process.returncode


def strip_extensions(filename, extensions):
    '''Strips extensions to get basename of file.'''

    basename = filename
    for extension in extensions:
        basename = basename.rpartition(extension)[0] or basename

    return basename


def count_lines(filename):
    import mimetypes
    compressed_mimetypes = [
        "compress",
        "bzip2",
        "gzip"
    ]
    mime_type = mimetypes.guess_type(filename)[1]
    if mime_type in compressed_mimetypes:
        catcommand = 'gzip -dc'
    else:
        catcommand = 'cat'
    out, err = run_pipe([
        '%s %s' % (catcommand, filename),
        'wc -l'
    ])
    return int(out)


def rescale_scores(filename, scores_col, new_min=10, new_max=1000):
    sorted_fn = 'sorted-%s' % (filename)
    rescaled_fn = 'rescaled-%s' % (filename)

    out, err = run_pipe([
        'sort -k %dgr,%dgr %s' % (scores_col, scores_col, filename),
        r"""awk 'BEGIN{FS="\t";OFS="\t"}{if (NF != 0) print $0}'"""],
        sorted_fn)

    out, err = run_pipe([
        'head -n 1 %s' % (sorted_fn),
        'cut -f %s' % (scores_col)])
    max_score = float(out.strip())
    logger.info("rescale_scores: max_score = %s", max_score)

    out, err = run_pipe([
        'tail -n 1 %s' % (sorted_fn),
        'cut -f %s' % (scores_col)])
    min_score = float(out.strip())
    logger.info("rescale_scores: min_score = %s", min_score)

    a = min_score
    b = max_score
    x = new_min
    y = new_max

    if min_score == max_score:  # give all peaks new_min
        rescale_formula = "x"
    else:  # n is the unscaled score from scores_col
        rescale_formula = "((n-a)*(y-x)/(b-a))+x"

    out, err = run_pipe(
        [
            'cat %s' % (sorted_fn),
            r"""awk 'BEGIN{OFS="\t"}{n=$%d;a=%d;b=%d;x=%d;y=%d}"""
            % (scores_col, a, b, x, y) +
            r"""{$%d=int(%s) ; print $0}'"""
            % (scores_col, rescale_formula)
        ],
        rescaled_fn)

    os.remove(sorted_fn)

    return rescaled_fn
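As a quick orientation to the helpers above, the sketch below shows how run_pipe chains shell steps and how count_lines transparently handles gzipped input. The file names are hypothetical placeholders, and the module is assumed to be importable as utils, as the other scripts in this diff do.

# Illustrative only: how the helpers above compose shell steps.
# 'example.bed.gz' and the output name are hypothetical placeholders.
import utils

# count_lines pipes 'gzip -dc example.bed.gz' into 'wc -l' for gzipped input.
n_lines = utils.count_lines('example.bed.gz')

# run_pipe chains the steps and writes the last step's stdout to outfile.
out, err = utils.run_pipe([
    'zcat example.bed.gz',
    'grep -v "chrM"',
    'sort -k1,1 -k2,2n'
], outfile='example.sorted.bed')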
#!/usr/bin/env python3
#
# * --------------------------------------------------------------------------
# * Licensed under MIT (https://git.biohpc.swmed.edu/BICF/Astrocyte/chipseq_analysis/LICENSE.md)
# * --------------------------------------------------------------------------
#
'''Compute cross-correlation analysis.'''

import os
import argparse
import shutil
import subprocess
import logging
from multiprocessing import cpu_count
import utils

EPILOG = '''
For more details:
%(prog)s --help
'''

# SETTINGS
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
logger.propagate = False
logger.setLevel(logging.INFO)

# the order of this list is important.
# strip_extensions strips from the right inward, so
# the expected right-most extensions should appear first (like .gz)
# Modified from J. Seth Strattan
STRIP_EXTENSIONS = ['.gz', '.tagAlign', '.bedse', '.bedpe']


def get_args():
    '''Define arguments.'''
    parser = argparse.ArgumentParser(
        description=__doc__, epilog=EPILOG,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument('-t', '--tag',
                        help="The tagAlign file to qc on.",
                        required=True)

    parser.add_argument('-p', '--paired',
                        help="True/False if paired-end or single end.",
                        default=False,
                        action='store_true')

    args = parser.parse_args()
    return args


# Functions
def check_tools():
    '''Checks for required components on user system.'''

    logger.info('Checking for required libraries and components on this system')

    r_path = shutil.which("R")
    if r_path:
        logger.info('Found R: %s', r_path)

        # Get Version
        r_version_command = "R --version"
        r_version = subprocess.check_output(r_version_command, shell=True)

        # Write to file
        r_file = open("version_r.txt", "wb")
        r_file.write(r_version)
        r_file.close()
    else:
        logger.error('Missing R')
        raise Exception('Missing R')

    phantompeak_path = shutil.which("run_spp.R")
    if phantompeak_path:
        logger.info('Found phantompeak: %s', phantompeak_path)

        # Get Version
        spp_version_command = "R -e \"packageVersion('spp')\""
        spp_version = subprocess.check_output(spp_version_command, shell=True)

        # Write to file
        spp_file = open("version_spp.txt", "wb")
        spp_file.write(spp_version)
        spp_file.close()
    else:
        logger.error('Missing phantompeak')
        raise Exception('Missing phantompeak')


def xcor(tag, paired):
    '''Use spp to calculate cross-correlation stats.'''

    tag_basename = os.path.basename(utils.strip_extensions(tag, STRIP_EXTENSIONS))

    uncompressed_tag_filename = tag_basename

    # Subsample tagAlign file
    number_reads = 20000000
    subsampled_tag_filename = \
        tag_basename + ".%d.tagAlign.gz" % (number_reads/1000000)

    tag_extended = 'cat.tagAlign.gz'
    out, err = utils.run_pipe([
        "zcat %s %s %s" %
        (tag, tag, tag)
    ], outfile=tag_extended)

    steps = [
        'zcat %s' % (tag),
        'grep -v "chrM"',
        'shuf -n %d --random-source=%s' % (number_reads, tag_extended)]

    if paired:
        steps.extend([r"""awk 'BEGIN{OFS="\t"}{$4="N";$5="1000";print $0}'"""])

    steps.extend(['gzip -nc'])

    out, err = utils.run_pipe(steps, outfile=subsampled_tag_filename)

    # Calculate Cross-correlation QC scores
    cc_scores_filename = tag_basename + ".cc.qc"
    cc_plot_filename = tag_basename + ".cc.plot.pdf"

    # CC_SCORE FILE format
    # Filename <tab>
    # numReads <tab>
    # estFragLen <tab>
    # corr_estFragLen <tab>
    # PhantomPeak <tab>
    # corr_phantomPeak <tab>
    # argmin_corr <tab>
    # min_corr <tab>
    # phantomPeakCoef <tab>
    # relPhantomPeakCoef <tab>
    # QualityTag

    run_spp_command = shutil.which("run_spp.R")
    out, err = utils.run_pipe([
        "Rscript %s -c=%s -p=%d -filtchr=chrM -savp=%s -out=%s" %
        (run_spp_command, subsampled_tag_filename, cpu_count(),
         cc_plot_filename, cc_scores_filename)
    ])

    return cc_scores_filename


def main():
    args = get_args()
    paired = args.paired
    tag = args.tag

    # Create a file handler
    handler = logging.FileHandler('xcor.log')
    logger.addHandler(handler)

    # Check if tools are present
    check_tools()

    # Calculate Cross-correlation
    xcor(tag, paired)


if __name__ == '__main__':
    main()
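A hedged sketch of driving the cross-correlation step without the CLI wrapper: it assumes the script above is importable as xcor, that utils is on the Python path and run_spp.R on the system PATH, and it uses a placeholder tagAlign file name.

# Illustrative only: call the xcor() helper directly instead of via argparse.
# 'ENCLB831RUI.tagAlign.gz' is a placeholder path; run_spp.R must be on PATH.
import xcor  # assumes the script above is saved as xcor.py

cc_scores = xcor.xcor('ENCLB831RUI.tagAlign.gz', paired=False)
print(cc_scores)  # e.g. 'ENCLB831RUI.cc.qc'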
#!/opt/bats/libexec/bats-core/bats

profile_script="./workflow/scripts/plot_profile.sh"

@test "Test deeptools present" {
  source ${profile_script}
  run check_tools
}

@test "Test deeptools computeMatrix" {
  source ${profile_script}
  run compute_matrix test_data/ENCSR238SGC_pooled.fc_signal.bw /project/shared/bicf_workflow_ref/mouse/GRCm38/gencode.vM20.annotation.gtf
  FILE=computeMatrix.gz
  if [[ -s "$FILE" ]]; then
    echo "$FILE exists and not empty"
  fi
}

@test "Test deeptools plotProfile" {
  source ${profile_script}
  run plot_profile computeMatrix.gz
  FILE=plotProfile.png
  if [[ -s "$FILE" ]]; then
    echo "$FILE exists and not empty"
  fi
}
#!/usr/bin/env python3

import pytest
import pandas as pd
from io import StringIO
import os
import utils

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/peakAnnotation/'


@pytest.mark.singleend
def test_pie_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC.chipseeker_pie.pdf'))


@pytest.mark.singleend
def test_upsetplot_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC.chipseeker_upsetplot.pdf'))


@pytest.mark.singleend
def test_annotation_singleend():
    annotation_file = test_output_path + 'ENCSR238SGC.chipseeker_annotation.tsv'
    assert os.path.exists(annotation_file)
    assert utils.count_lines(annotation_file) >= 149284
    df = pd.read_csv(annotation_file, sep="\t", header=0)
    print(df.head())
    # assert df['symbol'].notna().all()
    assert not df['symbol'].isnull().values.any()


@pytest.mark.pairedend
def test_pie_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA.chipseeker_pie.pdf'))


@pytest.mark.pairedend
def test_upsetplot_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA.chipseeker_upsetplot.pdf'))


@pytest.mark.pairedend
def test_annotation_pairedend():
    annotation_file = test_output_path + 'ENCSR729LGA.chipseeker_annotation.tsv'
    assert os.path.exists(annotation_file)
    assert utils.count_lines(annotation_file) >= 25367
    df = pd.read_csv(annotation_file, sep="\t", header=0)
    print(df.head())
    # assert df['symbol'].notna().all()
    assert not df['symbol'].isnull().values.any()
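The test modules in this diff select cases through pytest markers (unit, singleend, pairedend, and the skip variants). A minimal sketch of invoking one marker set programmatically follows; the test directory path and the marker registration in the project's pytest configuration are assumptions, not taken from the diff.

# Illustrative only: run just the single-end checks from this suite.
# 'workflow/tests' is an assumed location for these test modules.
import sys

import pytest

if __name__ == '__main__':
    sys.exit(pytest.main(['-m', 'singleend', '-v', 'workflow/tests']))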
#!/usr/bin/env python3

import pytest
import os
import utils
from io import StringIO
import call_peaks_macs

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/callPeaksMACS/'
test_data_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../../test_data/'


@pytest.mark.unit
def test_fragment_length():
    experiment = "experiment.tagAlign.gz"
    control = "control.tagAlign.gz"
    prefix = 'test'
    genome_size = 'hs'
    chrom_sizes = 'genomefile.txt'
    cross_qc = os.path.join(test_data_path, 'test_cross.qc')
    with pytest.raises(Exception) as excinfo:
        call_peaks_macs.call_peaks_macs(experiment, cross_qc, control, prefix, genome_size, chrom_sizes)
    assert str(excinfo.value) == "Error in cross-correlation analysis: ['0', '20', '33']"


@pytest.mark.singleend
def test_fc_signal_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC/1/', 'ENCLB144FDT.fc_signal.bw'))


@pytest.mark.singleend
def test_pvalue_signal_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC/1/', 'ENCLB144FDT.pvalue_signal.bw'))


@pytest.mark.singleend
def test_peaks_xls_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC/1/', 'ENCLB144FDT_peaks.xls'))


@pytest.mark.singleend
def test_peaks_bed_singleend():
    peak_file = test_output_path + 'ENCSR238SGC/1/' + 'ENCLB144FDT.narrowPeak'
    assert utils.count_lines(peak_file) == 226738


@pytest.mark.pairedend
def test_fc_signal_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA/2/', 'ENCLB568IYX.fc_signal.bw'))


@pytest.mark.pairedend
def test_pvalue_signal_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA/2/', 'ENCLB568IYX.pvalue_signal.bw'))


@pytest.mark.pairedend
def test_peaks_xls_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA/2/', 'ENCLB568IYX_peaks.xls'))


@pytest.mark.pairedend
def test_peaks_bed_pairedend():
    peak_file = test_output_path + 'ENCSR729LGA/2/' + 'ENCLB568IYX.narrowPeak'
    assert utils.count_lines(peak_file) == 112631
#!/usr/bin/env python3

import pytest
import pandas as pd
from io import StringIO
import check_design

DESIGN_STRING = """sample_id\texperiment_id\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id\tfastq_read1
A_1\tA\tLiver\tH3K27ac\tNone\t1\tB_1\tA_1.fastq.gz
A_2\tA\tLiver\tH3K27ac\tNone\t2\tB_2\tA_2.fastq.gz
B_1\tB\tLiver\tInput\tNone\t1\tB_1\tB_1.fastq.gz
B_2\tB\tLiver\tInput\tNone\t2\tB_2\tB_2.fastq.gz
"""

FASTQ_STRING = """
A_1.fastq.gz\t/path/to/file/A_1.fastq.gz
A_2.fastq.gz\t/path/to/file/A_2.fastq.gz
B_1.fastq.gz\t/path/to/file/B_1.fastq.gz
B_2.fastq.gz\t/path/to/file/B_2.fastq.gz
"""


@pytest.fixture
def design():
    design_file = StringIO(DESIGN_STRING)
    design_df = pd.read_csv(design_file, sep="\t")
    return design_df


@pytest.fixture
def fastq_files():
    fastq_file = StringIO(FASTQ_STRING)
    fastq_df = pd.read_csv(fastq_file, sep='\t', names=['name', 'path'])
    return fastq_df


@pytest.fixture
def design_1(design):
    design_df = design.drop('fastq_read1', axis=1)
    return design_df


@pytest.fixture
def design_2(design):
    # Drop Control B_1
    design_df = design.drop(design.index[2])
    return design_df


@pytest.fixture
def design_3(design):
    # Drop A_2 and B_2 and append as fastq_read2
    design_df = design.drop(design.index[[1, 3]])
    design_df['fastq_read2'] = design.loc[[1, 3], 'fastq_read1'].values
    return design_df


@pytest.fixture
def design_4(design):
    # Update replicate 2 for experiment B to be 1
    design.loc[design['sample_id'] == 'B_2', 'replicate'] = 1
    return design


@pytest.fixture
def design_5(design):
    # Update sample_id to have -, spaces or periods
    design.loc[design['sample_id'] == 'A_1', 'sample_id'] = 'A 1'
    design.loc[design['sample_id'] == 'A_2', 'sample_id'] = 'A.2'
    design.loc[design['sample_id'] == 'B_1', 'sample_id'] = 'B-1'
    return design


@pytest.fixture
def design_6(design):
    # Update experiment_id to have -, spaces or periods
    design.loc[design['sample_id'] == 'A_1', 'experiment_id'] = 'A ChIP'
    design.loc[design['sample_id'] == 'A_2', 'experiment_id'] = 'A.ChIP'
    design.loc[design['sample_id'] == 'B_1', 'experiment_id'] = 'B-ChIP'
    return design


@pytest.fixture
def fastq_files_1(fastq_files):
    # Drop B_2.fastq.gz
    fastq_df = fastq_files.drop(fastq_files.index[3])
    return fastq_df


@pytest.mark.unit
def test_check_headers_singleend(design_1):
    paired = False
    with pytest.raises(Exception) as excinfo:
        check_design.check_design_headers(design_1, paired)
    assert str(excinfo.value) == "Missing column headers: ['fastq_read1']"


@pytest.mark.unit
def test_check_headers_pairedend(design):
    paired = True
    with pytest.raises(Exception) as excinfo:
        check_design.check_design_headers(design, paired)
    assert str(excinfo.value) == "Missing column headers: ['fastq_read2']"


@pytest.mark.unit
def test_check_controls(design_2):
    with pytest.raises(Exception) as excinfo:
        check_design.check_controls(design_2)
    assert str(excinfo.value) == "Missing control experiments: ['B_1']"


@pytest.mark.unit
def test_check_files_missing_files(design, fastq_files_1):
    paired = False
    with pytest.raises(Exception) as excinfo:
        new_design = check_design.check_files(design, fastq_files_1, paired)
    assert str(excinfo.value) == "Missing files from design file: ['B_2.fastq.gz']"


@pytest.mark.unit
def test_check_files_output_singleend(design, fastq_files):
    paired = False
    new_design = check_design.check_files(design, fastq_files, paired)
    assert new_design.loc[0, 'fastq_read1'] == "/path/to/file/A_1.fastq.gz"


@pytest.mark.unit
def test_check_files_output_pairedend(design_3, fastq_files):
    paired = True
    new_design = check_design.check_files(design_3, fastq_files, paired)
    assert new_design.loc[0, 'fastq_read2'] == "/path/to/file/A_2.fastq.gz"


@pytest.mark.unit
def test_check_replicates(design_4):
    paired = False
    with pytest.raises(Exception) as excinfo:
        new_design = check_design.check_replicates(design_4)
    assert str(excinfo.value) == "Duplicate replicates in experiments: ['B']"


@pytest.mark.unit
def test_check_samples(design_5):
    paired = False
    with pytest.raises(Exception) as excinfo:
        new_design = check_design.check_samples(design_5)
    assert str(excinfo.value) == "Malformed samples from design file: ['A 1', 'A.2', 'B-1']"


@pytest.mark.unit
def test_check_experiments(design_6):
    paired = False
    with pytest.raises(Exception) as excinfo:
        new_design = check_design.check_experiments(design_6)
    assert str(excinfo.value) == "Malformed experiment from design file: ['A ChIP', 'A.ChIP', 'B-ChIP']"
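The unit tests above pin down the exact error messages check_design is expected to raise. Purely to illustrate that contract, here is a minimal stand-in for one validator, written to mirror the assertion in test_check_replicates; it is not the pipeline's actual implementation.

# Illustrative only: a hypothetical check that reproduces the message format
# asserted in test_check_replicates (duplicate replicate numbers per experiment).
import pandas as pd


def check_replicates_sketch(design):
    '''Raise if any experiment lists the same replicate number more than once.'''
    counts = design.groupby(['experiment_id', 'replicate']).size()
    duplicated = counts[counts > 1].index.get_level_values('experiment_id')
    experiments = sorted(set(duplicated))
    if experiments:
        raise Exception("Duplicate replicates in experiments: %s" % experiments)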
#!/usr/bin/env python3

import pytest
import os

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/convertReads/'


@pytest.mark.singleend
def test_tag_reads_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.tagAlign.gz'))


@pytest.mark.singleend
def test_bed_reads_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.bedse.gz'))


@pytest.mark.pairedend
def test_tag_reads_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.tagAlign.gz'))


@pytest.mark.pairedend
def test_bed_reads_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.bedpe.gz'))
#!/usr/bin/env python3

import pytest
import pandas as pd
from io import StringIO
import os
import utils

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/diffPeaks/'


@pytest.mark.singleskip_true
def test_diff_peaks_singleend_single_rep():
    assert os.path.isdir(test_output_path) == False


@pytest.mark.pairedendskip_true
def test_diff_peaks_pairedend_single_rep():
    assert os.path.isdir(test_output_path) == False


@pytest.mark.singlediff
def test_heatmap_singleend_multiple_rep():
    assert os.path.exists(os.path.join(test_output_path, 'heatmap.pdf'))


@pytest.mark.singlediff
def test_pca_singleend_multiple_rep():
    assert os.path.exists(os.path.join(test_output_path, 'pca.pdf'))


@pytest.mark.singlediff
def test_normcount_singleend_multiple_rep():
    assert os.path.exists(os.path.join(test_output_path, 'normcount_peaksets.txt'))


@pytest.mark.singlediff
def test_diffbind_singleend_multiple_rep():
    if os.path.isfile(os.path.join(test_output_path, 'ENCSR272GNQ_vs_ENCSR238SGC_diffbind.bed')):
        assert os.path.exists(os.path.join(test_output_path, 'ENCSR272GNQ_vs_ENCSR238SGC_diffbind.bed'))
        diffbind_file = test_output_path + 'ENCSR272GNQ_vs_ENCSR238SGC_diffbind.csv'
    elif os.path.isfile(os.path.join(test_output_path, 'ENCSR238SGC_vs_ENCSR272GNQ_diffbind.bed')):
        assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC_vs_ENCSR272GNQ_diffbind.bed'))
        diffbind_file = test_output_path + 'ENCSR238SGC_vs_ENCSR272GNQ_diffbind.csv'
    assert os.path.exists(diffbind_file)
    assert utils.count_lines(diffbind_file) >= 197471


@pytest.mark.paireddiff
def test_heatmap_pairedend_single_rep():
    assert os.path.exists(os.path.join(test_output_path, 'heatmap.pdf'))


@pytest.mark.paireddiff
def test_pca_pairedend_single_rep():
    assert os.path.exists(os.path.join(test_output_path, 'pca.pdf'))


@pytest.mark.paireddiff
def test_normcount_pairedend_single_rep():
    assert os.path.exists(os.path.join(test_output_path, 'normcount_peaksets.txt'))


@pytest.mark.paireddiff
def test_diffbind_pairedend_single_rep():
    if os.path.isfile(os.path.join(test_output_path, 'ENCSR757EMK_vs_ENCSR729LGA_diffbind.bed')):
        assert os.path.exists(os.path.join(test_output_path, 'ENCSR757EMK_vs_ENCSR729LGA_diffbind.bed'))
        diffbind_file = test_output_path + 'ENCSR757EMK_vs_ENCSR729LGA_diffbind.csv'
    elif os.path.isfile(os.path.join(test_output_path, 'ENCSR729LGA_vs_ENCSR757EMK_diffbind.bed')):
        assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA_vs_ENCSR757EMK_diffbind.bed'))
        diffbind_file = test_output_path + 'ENCSR729LGA_vs_ENCSR757EMK_diffbind.csv'
    assert os.path.exists(diffbind_file)
    assert utils.count_lines(diffbind_file) >= 65124
#!/usr/bin/env python3

import pytest
import pandas as pd
from io import StringIO
import experiment_design
import os

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/design/'

DESIGN_STRING = """sample_id\ttag_align\txcor\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id
A_1\tA_1.tagAlign.gz\tA\tLiver\tH3K27ac\tNone\t1\tB_1
A_2\tA_2.tagAlign.gz\tA\tLiver\tH3K27ac\tNone\t2\tB_2
B_1\tB_1.tagAlign.gz\tB\tLiver\tInput\tNone\t1\tB_1
B_2\tB_2.tagAlign.gz\tB\tLiver\tInput\tNone\t2\tB_2
"""


@pytest.fixture
def design_tag():
    design_file = StringIO(DESIGN_STRING)
    design_df = pd.read_csv(design_file, sep="\t")
    return design_df


@pytest.mark.unit
def test_check_update_controls_tag(design_tag):
    new_design = experiment_design.update_controls(design_tag)
    assert new_design.loc[0, 'control_tag_align'] == "B_1.tagAlign.gz"


@pytest.mark.singleend
def test_experiment_design_singleend():
    design_file = os.path.join(test_output_path, 'ENCSR238SGC.tsv')
    assert os.path.exists(design_file)
    design_df = pd.read_csv(design_file, sep="\t")
    assert design_df.shape[0] == 2


@pytest.mark.pairedend
def test_experiment_design_pairedend():
    design_file = os.path.join(test_output_path, 'ENCSR729LGA.tsv')
    assert os.path.exists(design_file)
    design_df = pd.read_csv(design_file, sep="\t")
    assert design_df.shape[0] == 2
#!/usr/bin/env python3

import pytest
import os
import pandas as pd
from io import StringIO
import experiment_qc

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/experimentQC/'

DESIGN_STRING = """sample_id\texperiment_id\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id\tbam_reads
A_1\tA\tLiver\tH3K27ac\tNone\t1\tB_1\tA_1.bam
A_2\tA\tLiver\tH3K27ac\tNone\t2\tB_2\tA_2.bam
B_1\tB\tLiver\tInput\tNone\t1\tB_1\tB_1.bam
B_2\tB\tLiver\tInput\tNone\t2\tB_2\tB_2.bam
"""


@pytest.fixture
def design_bam():
    design_file = StringIO(DESIGN_STRING)
    design_df = pd.read_csv(design_file, sep="\t")
    return design_df


@pytest.mark.unit
def test_check_update_controls(design_bam):
    new_design = experiment_qc.update_controls(design_bam)
    assert new_design.loc[0, 'control_reads'] == "B_1.bam"


@pytest.mark.singleend
def test_coverage_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'sample_mbs.npz'))
    assert os.path.exists(os.path.join(test_output_path, 'coverage.pdf'))


@pytest.mark.singleend
def test_spearman_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'heatmap_SpearmanCorr.pdf'))


@pytest.mark.singleend
def test_pearson_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'heatmap_PearsonCorr.pdf'))


@pytest.mark.singleend
def test_fingerprint_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB144FDT_fingerprint.pdf'))
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI_fingerprint.pdf'))


@pytest.mark.pairdend
def test_coverage_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'sample_mbs.npz'))
    assert os.path.exists(os.path.join(test_output_path, 'coverage.pdf'))


@pytest.mark.pairdend
def test_spearman_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'heatmap_SpearmanCorr.pdf'))


@pytest.mark.pairdend
def test_pearson_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'heatmap_PearsonCorr.pdf'))


@pytest.mark.pairdend
def test_fingerprint_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX_fingerprint.pdf'))
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB637LZP_fingerprint.pdf'))
#!/usr/bin/env python3

import pytest
import os
import utils
import yaml
from bs4 import BeautifulSoup

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/multiqcReport/'


@pytest.mark.singleend
def test_software_references():
    assert os.path.exists(os.path.join(test_output_path, 'software_references_mqc.yaml'))


@pytest.mark.singleend
def test_software_references_output():
    software_references = os.path.join(test_output_path, 'software_references_mqc.yaml')
    with open(software_references, 'r') as stream:
        data_loaded = yaml.load(stream)
    assert len(data_loaded['data'].split('<ul>')) == 19


@pytest.mark.singleend
def test_software_references_html():
    multiqc_report = os.path.join(test_output_path, 'multiqc_report.html')
    html_file = open(multiqc_report, 'r')
    source_code = html_file.read()
    multiqc_html = BeautifulSoup(source_code, 'html.parser')
    references = multiqc_html.find(id="mqc-module-section-Software_References")
    assert references is not None
    assert len(references.find_all('ul')) == 18
#!/usr/bin/env python3

import pytest
import os
import utils
import yaml

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/multiqcReport/'


@pytest.mark.singleend
def test_software_versions():
    assert os.path.exists(os.path.join(test_output_path, 'software_versions_mqc.yaml'))


@pytest.mark.singleend
def test_software_versions_output():
    software_versions = os.path.join(test_output_path, 'software_versions_mqc.yaml')
    with open(software_versions, 'r') as stream:
        data_loaded = yaml.load(stream)
    assert len(data_loaded['data'].split('<dt>')) == 18
    assert 'Not Run' not in data_loaded['data'].split('<dt>')[17]
#!/usr/bin/env python3

import pytest
import os
import pandas as pd

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/filterReads/'


@pytest.mark.singleend
def test_dedup_files_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.dedup.bam'))
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.dedup.bam.bai'))
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.dedup.qc'))


@pytest.mark.singleend
def test_map_qc_singleend():
    filtered_reads_report = test_output_path + 'ENCLB831RUI/ENCLB831RUI.dedup.flagstat.qc'
    samtools_report = open(filtered_reads_report).readlines()
    assert '64962570 + 0 in total' in samtools_report[0]
    assert '64962570 + 0 mapped (100.00%:N/A)' in samtools_report[4]


@pytest.mark.singleend
def test_library_complexity_singleend():
    library_complexity = test_output_path + 'ENCLB831RUI/ENCLB831RUI.pbc.qc'
    df_library_complexity = pd.read_csv(library_complexity, sep='\t')
    assert df_library_complexity["NRF"].iloc[0] == 0.926192
    assert df_library_complexity["PBC1"].iloc[0] == 0.926775
    assert df_library_complexity["PBC2"].iloc[0] == 13.706885


@pytest.mark.pairedend
def test_dedup_files_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.dedup.bam'))
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.dedup.bam.bai'))
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.dedup.qc'))


@pytest.mark.pairedend
def test_map_qc_pairedend():
    filtered_reads_report = test_output_path + 'ENCLB568IYX/ENCLB568IYX.dedup.flagstat.qc'
    samtools_report = open(filtered_reads_report).readlines()
    assert '47388510 + 0 in total' in samtools_report[0]
    assert '47388510 + 0 mapped (100.00%:N/A)' in samtools_report[4]


@pytest.mark.pairedend
def test_library_complexity_pairedend():
    library_complexity = test_output_path + 'ENCLB568IYX/ENCLB568IYX.pbc.qc'
    df_library_complexity = pd.read_csv(library_complexity, sep='\t')
    assert df_library_complexity["NRF"].iloc[0] == 0.947064
    assert round(df_library_complexity["PBC1"].iloc[0], 6) == 0.946723
    assert round(df_library_complexity["PBC2"].iloc[0], 6) == 18.642645
#!/usr/bin/env python3

import pytest
import os

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/alignReads/'


@pytest.mark.singleend
def test_map_reads_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.bam'))
    aligned_reads_report = test_output_path + 'ENCLB831RUI/ENCLB831RUI.flagstat.qc'
    samtools_report = open(aligned_reads_report).readlines()
    assert '80795025 + 0 in total' in samtools_report[0]
    assert '80050072 + 0 mapped (99.08% : N/A)' in samtools_report[4]


@pytest.mark.pairedend
def test_map_reads_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCLB678IDC/ENCLB678IDC.bam'))
    aligned_reads_report = test_output_path + 'ENCLB678IDC/ENCLB678IDC.flagstat.qc'
    samtools_report = open(aligned_reads_report).readlines()
    assert '72660890 + 0 in total' in samtools_report[0]
    assert '72053925 + 0 mapped (99.16% : N/A)' in samtools_report[4]
    assert '71501126 + 0 properly paired (98.40% : N/A)' in samtools_report[8]
#!/usr/bin/env python3

import pytest
import pandas as pd
from io import StringIO
import os
import utils

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/motifSearch/'


@pytest.mark.singleend
def test_limited_peaks_singleend():
    peak_file_ENCSR238SGC = test_output_path + 'ENCSR238SGC.600.narrowPeak'
    assert os.path.exists(peak_file_ENCSR238SGC)
    assert utils.count_lines(peak_file_ENCSR238SGC) == 600


@pytest.mark.singleend
def test_motif_search_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC_memechip', 'ENCSR238SGC.fa'))
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC_memechip', 'index.html'))
    assert os.path.getsize(os.path.join(test_output_path, 'ENCSR238SGC_memechip', 'ENCSR238SGC.fa')) > 0


@pytest.mark.singleskip_true
def test_motif_search_singleend_skipped():
    assert os.path.isdir(test_output_path) == False


@pytest.mark.pairedend
def test_limited_peaks_pairedend():
    peak_file_ENCSR729LGA = test_output_path + 'ENCSR729LGA.600.narrowPeak'
    assert os.path.exists(peak_file_ENCSR729LGA)
    assert utils.count_lines(peak_file_ENCSR729LGA) == 600


@pytest.mark.pairedend
def test_motif_search_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA_memechip', 'ENCSR729LGA.fa'))
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA_memechip', 'index.html'))
    assert os.path.getsize(os.path.join(test_output_path, 'ENCSR729LGA_memechip', 'ENCSR729LGA.fa')) > 0
#!/usr/bin/env python3

import pytest
import pandas as pd
from io import StringIO
import os
import utils
import overlap_peaks

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/consensusPeaks/'

DESIGN_STRING = """sample_id\tbam_reads\tbam_index\texperiment_id\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id
A_1\tA_1.bam\tA_1.bai\tA\tLiver\tH3K27ac\tNone\t1\tB_1
A_2\tA_2.bam\tA_2.bai\tA\tLiver\tH3K27ac\tNone\t2\tB_2
B_1\tB_1.bam\tB_1.bai\tB\tLiver\tH3K27ac\tNone\t1\tB_1
B_2\tB_2.bam\tB_2.bai\tB\tLiver\tH3K27ac\tNone\t2\tB_2
"""


@pytest.fixture
def design_diff():
    design_file = StringIO(DESIGN_STRING)
    design_df = pd.read_csv(design_file, sep="\t")
    return design_df


@pytest.mark.unit
def test_check_update_design(design_diff):
    new_design = overlap_peaks.update_design(design_diff)
    assert new_design.shape[0] == 2
    assert new_design.loc[0, 'control_bam_reads'] == "B_1.bam"
    assert new_design.loc[0, 'peak_caller'] == "bed"


@pytest.mark.singleend
def test_overlap_peaks_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC.rejected.narrowPeak'))
    peak_file = test_output_path + 'ENCSR238SGC.replicated.narrowPeak'
    assert utils.count_lines(peak_file) >= 149291


@pytest.mark.pairedend
def test_overlap_peaks_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA.rejected.narrowPeak'))
    peak_file = test_output_path + 'ENCSR729LGA.replicated.narrowPeak'
    assert utils.count_lines(peak_file) >= 25657


@pytest.mark.singlecontrol
def test_overlap_peaks_singlecontrol():
    assert os.path.exists(os.path.join(test_output_path, 'ENCSR000DXB.rejected.narrowPeak'))
    peak_file = test_output_path + 'ENCSR000DXB.replicated.narrowPeak'
    assert utils.count_lines(peak_file) >= 35097
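test_check_update_design above fixes what overlap_peaks.update_design has to return: one row per experiment, the matching control BAM resolved into a control_bam_reads column, and a peak_caller column set to 'bed'. The sketch below is only a hypothetical reading of that contract, not the pipeline's code.

# Illustrative only: collapse a per-sample design to one row per experiment,
# mirroring the columns asserted in test_check_update_design.
import pandas as pd


def update_design_sketch(design):
    '''Attach each experiment's control BAM and a peak_caller label.'''
    control_bams = design.set_index('sample_id')['bam_reads']
    new_design = design.drop_duplicates(subset='experiment_id', keep='first').copy()
    new_design['control_bam_reads'] = new_design['control_id'].map(control_bams)
    new_design['peak_caller'] = 'bed'
    return new_design.reset_index(drop=True)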
#!/usr/bin/env python3

import pytest
import os
import utils

test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/experimentQC/'


@pytest.mark.singleend
def test_plot_singleend():
    assert os.path.exists(os.path.join(test_output_path, 'plotProfile.png'))
    assert os.path.getsize(os.path.join(test_output_path, 'plotProfile.png')) > 0


@pytest.mark.pairedend
def test_plot_pairedend():
    assert os.path.exists(os.path.join(test_output_path, 'plotProfile.png'))
    assert os.path.getsize(os.path.join(test_output_path, 'plotProfile.png')) > 0
#!/usr/bin/env python3

import pytest
import pandas as pd
from io import StringIO
import os
import pool_and_psuedoreplicate
import shutil

test_design_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../../test_data/'
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/design/'

DESIGN_STRING = """sample_id\tse_tag_align\ttag_align\txcor\texperiment_id\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id\tcontrol_tag_align
A_1\tA_1.tagAlign.gz\tA_1.bedse.gz\tA_1.cc.qc\tA\tLiver\tH3K27ac\tNone\t1\tB_1\tB_1.bedse.gz
A_2\tA_2.tagAlign.gz\tA_2.bedse.gz\tA_2.cc.qc\tA\tLiver\tH3K27ac\tNone\t2\tB_2\tB_2.bedse.gz
"""


@pytest.fixture
def design_experiment():
    design_file = StringIO(DESIGN_STRING)
    design_df = pd.read_csv(design_file, sep="\t")
    return design_df


@pytest.fixture
def design_experiment_2(design_experiment):
    # Drop Replicate A_2
    design_df = design_experiment.drop(design_experiment.index[1])
    return design_df


@pytest.fixture
def design_experiment_3(design_experiment):
    # Drop Replicate A_2
    design_df = design_experiment.drop(design_experiment.index[1])
    # Update to be paired as first
    design_df.loc[0, 'control_tag_align'] = 'B_1.bedpe.gz'
    design_df.loc[0, 'tag_align'] = 'A_1.bedpe.gz'
    return design_df


@pytest.mark.unit
def test_check_replicates(design_experiment):
    no_reps = pool_and_psuedoreplicate.check_replicates(design_experiment)
    assert no_reps == 2


@pytest.mark.unit
def test_check_replicates_single(design_experiment_2):
    no_reps = pool_and_psuedoreplicate.check_replicates(design_experiment_2)
    assert no_reps == 1


@pytest.mark.unit
def test_check_controls(design_experiment):
    no_controls = pool_and_psuedoreplicate.check_controls(design_experiment)
    assert no_controls == 2


@pytest.mark.unit
def test_check_controls_single(design_experiment_3):
    no_controls = pool_and_psuedoreplicate.check_controls(design_experiment_3)
    assert no_controls == 1


@pytest.mark.unit
def test_single_rep(design_experiment_2):
    cwd = os.getcwd()
    shutil.copy(test_design_path + 'A_1.bedse.gz', cwd)
    shutil.copy(test_design_path + 'B_1.bedse.gz', cwd)
    shutil.copy(test_design_path + 'A_1.tagAlign.gz', cwd)
    shutil.copy(test_design_path + 'B_1.tagAlign.gz', cwd)
    single_rep = pool_and_psuedoreplicate.generate_design('false', 1.2, design_experiment_2, cwd, 1, 1)
    assert single_rep.shape[0] == 4
    assert len(single_rep['control_tag_align'].unique()) == 2
    assert 'pool_control.tagAlign.gz' in single_rep['control_tag_align'].unique()[1]


@pytest.mark.unit
def test_single_control(design_experiment_3):
    cwd = os.getcwd()
    shutil.copy(test_design_path + 'A_1.bedpe.gz', cwd)
    shutil.copy(test_design_path + 'B_1.bedpe.gz', cwd)
    shutil.copy(test_design_path + 'A_1.tagAlign.gz', cwd)
    single_control = pool_and_psuedoreplicate.generate_design('true', 1.2, design_experiment_3, cwd, 1, 1)
    assert 'pool_control.tagAlign.gz' in single_control['control_tag_align'].unique()[0]


@pytest.mark.singleend
def test_pool_and_psuedoreplicate_singleend():
    design_file = os.path.join(test_output_path, 'ENCSR238SGC_ppr.tsv')
    assert os.path.exists(design_file)
    design_df = pd.read_csv(design_file, sep="\t")
    assert design_df.shape[0] == 5


@pytest.mark.pairedend
def test_experiment_design_pairedend():
    design_file = os.path.join(test_output_path, 'ENCSR729LGA_ppr.tsv')
    assert os.path.exists(design_file)
    design_df = pd.read_csv(design_file, sep="\t")
    assert design_df.shape[0] == 5
#!/usr/bin/env python3

import pytest
import os

test_data_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../../test_data/'
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
    '/../output/trimReads/'


@pytest.mark.singleend
def test_trim_reads_singleend():
    raw_fastq = test_data_path + 'ENCFF833BLU.fastq.gz'
    trimmed_fastq = test_output_path + 'ENCLB144FDT/ENCLB144FDT_R1_trimmed.fq.gz'
    assert os.path.getsize(raw_fastq) != os.path.getsize(trimmed_fastq)
    assert os.path.getsize(trimmed_fastq) == 2512853101


@pytest.mark.singleend
def test_trim_report_singleend():
    trimmed_fastq_report = test_output_path + \
        'ENCLB144FDT/ENCLB144FDT_R1.fastq.gz_trimming_report.txt'
    assert 'Trimming mode: single-end' in open(trimmed_fastq_report).readlines()[4]


@pytest.mark.pairedend
def test_trim_reads_pairedend():
    raw_fastq = test_data_path + 'ENCFF582IOZ.fastq.gz'
    trimmed_fastq = test_output_path + 'ENCLB637LZP/ENCLB637LZP_R2_val_2.fq.gz'
    assert os.path.getsize(raw_fastq) != os.path.getsize(trimmed_fastq)
    assert os.path.getsize(trimmed_fastq) == 2229312710


@pytest.mark.pairedend
def test_trim_report_pairedend():
    trimmed_fastq_report = test_output_path + \
        'ENCLB637LZP/ENCLB637LZP_R2.fastq.gz_trimming_report.txt'
    assert 'Trimming mode: paired-end' in open(trimmed_fastq_report).readlines()[4]