Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • BICF/Astrocyte/chipseq_analysis
  • s190984/chipseq_analysis
  • astrocyte/workflows/bicf/chipseq_analysis
  • s219741/chipseq-analysis-containerized
Show changes
Showing
with 1207 additions and 0 deletions
#!/usr/bin/env python3
#
# * --------------------------------------------------------------------------
# * Licensed under MIT (https://git.biohpc.swmed.edu/BICF/Astrocyte/chipseq_analysis/LICENSE.md)
# * --------------------------------------------------------------------------
#
'''Compute cross-correlation analysis.'''
import os
import argparse
import shutil
import subprocess
import logging
from multiprocessing import cpu_count
import utils
EPILOG = '''
For more details:
%(prog)s --help
'''
# SETTINGS
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
logger.propagate = False
logger.setLevel(logging.INFO)
# the order of this list is important.
# strip_extensions strips from the right inward, so
# the expected right-most extensions should appear first (like .gz)
# Modified from J. Seth Strattan
STRIP_EXTENSIONS = ['.gz', '.tagAlign', '.bedse', '.bedpe']
def get_args():
'''Define arguments.'''
parser = argparse.ArgumentParser(
description=__doc__, epilog=EPILOG,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('-t', '--tag',
help="The tagAlign file to qc on.",
required=True)
parser.add_argument('-p', '--paired',
help="True/False if paired-end or single end.",
default=False,
action='store_true')
args = parser.parse_args()
return args
# Functions
def check_tools():
'''Checks for required componenets on user system'''
logger.info('Checking for required libraries and components on this system')
r_path = shutil.which("R")
if r_path:
logger.info('Found R: %s', r_path)
# Get Version
r_version_command = "R --version"
r_version = subprocess.check_output(r_version_command, shell=True)
# Write to file
r_file = open("version_r.txt", "wb")
r_file.write(r_version)
r_file.close()
else:
logger.error('Missing R')
raise Exception('Missing R')
phantompeak_path = shutil.which("run_spp.R")
if phantompeak_path:
logger.info('Found phantompeak: %s', phantompeak_path)
# Get Version
spp_version_command = "R -e \"packageVersion('spp')\""
spp_version = subprocess.check_output(spp_version_command, shell=True)
# Write to file
spp_file = open("version_spp.txt", "wb")
spp_file.write(spp_version)
spp_file.close()
else:
logger.error('Missing phantompeak')
raise Exception('Missing phantompeak')
def xcor(tag, paired):
'''Use spp to calculate cross-correlation stats.'''
tag_basename = os.path.basename(utils.strip_extensions(tag, STRIP_EXTENSIONS))
uncompressed_tag_filename = tag_basename
# Subsample tagAlign file
number_reads = 20000000
subsampled_tag_filename = \
tag_basename + ".%d.tagAlign.gz" % (number_reads/1000000)
tag_extended = 'cat.tagAlign.gz'
out, err = utils.run_pipe([
"zcat %s %s %s" %
(tag, tag, tag)
], outfile=tag_extended)
steps = [
'zcat %s' % (tag),
'grep -v "chrM"',
'shuf -n %d --random-source=%s' % (number_reads, tag_extended)]
if paired:
steps.extend([r"""awk 'BEGIN{OFS="\t"}{$4="N";$5="1000";print $0}'"""])
steps.extend(['gzip -nc'])
out, err = utils.run_pipe(steps, outfile=subsampled_tag_filename)
# Calculate Cross-correlation QC scores
cc_scores_filename = tag_basename + ".cc.qc"
cc_plot_filename = tag_basename + ".cc.plot.pdf"
# CC_SCORE FILE format
# Filename <tab>
# numReads <tab>
# estFragLen <tab>
# corr_estFragLen <tab>
# PhantomPeak <tab>
# corr_phantomPeak <tab>
# argmin_corr <tab>
# min_corr <tab>
# phantomPeakCoef <tab>
# relPhantomPeakCoef <tab>
# QualityTag
run_spp_command = shutil.which("run_spp.R")
out, err = utils.run_pipe([
"Rscript %s -c=%s -p=%d -filtchr=chrM -savp=%s -out=%s" %
(run_spp_command, subsampled_tag_filename, cpu_count(),
cc_plot_filename, cc_scores_filename)
])
return cc_scores_filename
def main():
args = get_args()
paired = args.paired
tag = args.tag
# Create a file handler
handler = logging.FileHandler('xcor.log')
logger.addHandler(handler)
# Check if tools are present
check_tools()
# Calculate Cross-correlation
xcor(tag, paired)
if __name__ == '__main__':
main()
#!/opt/bats/libexec/bats-core/ bats
profile_script="./workflow/scripts/plot_profile.sh"
@test "Test deeptools present" {
source ${profile_script}
run check_tools
}
@test "Test deeptools computeMatrix" {
source ${profile_script}
run compute_matrix test_data/ENCSR238SGC_pooled.fc_signal.bw /project/shared/bicf_workflow_ref/mouse/GRCm38/gencode.vM20.annotation.gtf
FILE=computeMatrix.gz
if [[ -s "$FILE" ]]; then
echo "$FILE exists and not empty"
fi
}
@test "Test deeptools plotProfile" {
source ${profile_script}
run plot_profile computeMatrix.gz
FILE=plotProfile.png
if [[ -s "$FILE" ]]; then
echo "$FILE exists and not empty"
fi
}
#!/usr/bin/env python3
import pytest
import pandas as pd
from io import StringIO
import os
import utils
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/peakAnnotation/'
@pytest.mark.singleend
def test_pie_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC.chipseeker_pie.pdf'))
@pytest.mark.singleend
def test_upsetplot_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC.chipseeker_upsetplot.pdf'))
@pytest.mark.singleend
def test_annotation_singleend():
annotation_file = test_output_path + 'ENCSR238SGC.chipseeker_annotation.tsv'
assert os.path.exists(annotation_file)
assert utils.count_lines(annotation_file) >= 149284
df = pd.read_csv(annotation_file, sep = "\t", header = 0)
print(df.head())
#assert df['symbol'].notna().all()
assert not(df['symbol'].isnull().values.any())
@pytest.mark.pairedend
def test_pie_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA.chipseeker_pie.pdf'))
@pytest.mark.pairedend
def test_upsetplot_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA.chipseeker_upsetplot.pdf'))
@pytest.mark.pairedend
def test_annotation_pairedend():
annotation_file = test_output_path + 'ENCSR729LGA.chipseeker_annotation.tsv'
assert os.path.exists(annotation_file)
assert utils.count_lines(annotation_file) >= 25367
df = pd.read_csv(annotation_file, sep = "\t", header = 0)
print(df.head())
#assert df['symbol'].notna().all()
assert not(df['symbol'].isnull().values.any())
#!/usr/bin/env python3
import pytest
import os
import utils
from io import StringIO
import call_peaks_macs
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/callPeaksMACS/'
test_data_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../../test_data/'
@pytest.mark.unit
def test_fragment_length():
experiment = "experiment.tagAlign.gz"
control = "control.tagAlign.gz"
prefix = 'test'
genome_size = 'hs'
chrom_sizes = 'genomefile.txt'
cross_qc = os.path.join(test_data_path, 'test_cross.qc')
with pytest.raises(Exception) as excinfo:
call_peaks_macs.call_peaks_macs(experiment, cross_qc, control, prefix, genome_size, chrom_sizes)
assert str(excinfo.value) == "Error in cross-correlation analysis: ['0', '20', '33']"
@pytest.mark.singleend
def test_fc_signal_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC/1/', 'ENCLB144FDT.fc_signal.bw'))
@pytest.mark.singleend
def test_pvalue_signal_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC/1/', 'ENCLB144FDT.pvalue_signal.bw'))
@pytest.mark.singleend
def test_peaks_xls_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC/1/', 'ENCLB144FDT_peaks.xls'))
@pytest.mark.singleend
def test_peaks_bed_singleend():
peak_file = test_output_path + 'ENCSR238SGC/1/' + 'ENCLB144FDT.narrowPeak'
assert utils.count_lines(peak_file) == 226738
@pytest.mark.pairedend
def test_fc_signal_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA/2/', 'ENCLB568IYX.fc_signal.bw'))
@pytest.mark.pairedend
def test_pvalue_signal_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA/2/', 'ENCLB568IYX.pvalue_signal.bw'))
@pytest.mark.pairedend
def test_peaks_xls_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA/2/', 'ENCLB568IYX_peaks.xls'))
@pytest.mark.pairedend
def test_peaks_bed_pairedend():
peak_file = test_output_path + 'ENCSR729LGA/2/' + 'ENCLB568IYX.narrowPeak'
assert utils.count_lines(peak_file) == 112631
#!/usr/bin/env python3
import pytest
import pandas as pd
from io import StringIO
import check_design
DESIGN_STRING = """sample_id\texperiment_id\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id\tfastq_read1
A_1\tA\tLiver\tH3K27ac\tNone\t1\tB_1\tA_1.fastq.gz
A_2\tA\tLiver\tH3K27ac\tNone\t2\tB_2\tA_2.fastq.gz
B_1\tB\tLiver\tInput\tNone\t1\tB_1\tB_1.fastq.gz
B_2\tB\tLiver\tInput\tNone\t2\tB_2\tB_2.fastq.gz
"""
FASTQ_STRING = """
A_1.fastq.gz\t/path/to/file/A_1.fastq.gz
A_2.fastq.gz\t/path/to/file/A_2.fastq.gz
B_1.fastq.gz\t/path/to/file/B_1.fastq.gz
B_2.fastq.gz\t/path/to/file/B_2.fastq.gz
"""
@pytest.fixture
def design():
design_file = StringIO(DESIGN_STRING)
design_df = pd.read_csv(design_file, sep="\t")
return design_df
@pytest.fixture
def fastq_files():
fastq_file = StringIO(FASTQ_STRING)
fastq_df = pd.read_csv(fastq_file, sep='\t', names=['name', 'path'])
return fastq_df
@pytest.fixture
def design_1(design):
design_df = design.drop('fastq_read1', axis=1)
return design_df
@pytest.fixture
def design_2(design):
# Drop Control B_1
design_df = design.drop(design.index[2])
return design_df
@pytest.fixture
def design_3(design):
# Drop A_2 and B_2 and append as fastq_read2
design_df = design.drop(design.index[[1, 3]])
design_df['fastq_read2'] = design.loc[[1, 3], 'fastq_read1'].values
return design_df
@pytest.fixture
def design_4(design):
# Update replicate 2 for experiment B to be 1
design.loc[design['sample_id'] == 'B_2', 'replicate'] = 1
return design
@pytest.fixture
def design_5(design):
# Update sample_id to have -, spaces or periods
design.loc[design['sample_id'] == 'A_1', 'sample_id'] = 'A 1'
design.loc[design['sample_id'] == 'A_2', 'sample_id'] = 'A.2'
design.loc[design['sample_id'] == 'B_1', 'sample_id'] = 'B-1'
return design
@pytest.fixture
def design_6(design):
# Update experiment_id to have -, spaces or periods
design.loc[design['sample_id'] == 'A_1', 'experiment_id'] = 'A ChIP'
design.loc[design['sample_id'] == 'A_2', 'experiment_id'] = 'A.ChIP'
design.loc[design['sample_id'] == 'B_1', 'experiment_id'] = 'B-ChIP'
return design
@pytest.fixture
def fastq_files_1(fastq_files):
# Drop B_2.fastq.gz
fastq_df = fastq_files.drop(fastq_files.index[3])
return fastq_df
@pytest.mark.unit
def test_check_headers_singleend(design_1):
paired = False
with pytest.raises(Exception) as excinfo:
check_design.check_design_headers(design_1, paired)
assert str(excinfo.value) == "Missing column headers: ['fastq_read1']"
@pytest.mark.unit
def test_check_headers_pairedend(design):
paired = True
with pytest.raises(Exception) as excinfo:
check_design.check_design_headers(design, paired)
assert str(excinfo.value) == "Missing column headers: ['fastq_read2']"
@pytest.mark.unit
def test_check_controls(design_2):
with pytest.raises(Exception) as excinfo:
check_design.check_controls(design_2)
assert str(excinfo.value) == "Missing control experiments: ['B_1']"
@pytest.mark.unit
def test_check_files_missing_files(design, fastq_files_1):
paired = False
with pytest.raises(Exception) as excinfo:
new_design = check_design.check_files(design, fastq_files_1, paired)
assert str(excinfo.value) == "Missing files from design file: ['B_2.fastq.gz']"
@pytest.mark.unit
def test_check_files_output_singleend(design, fastq_files):
paired = False
new_design = check_design.check_files(design, fastq_files, paired)
assert new_design.loc[0, 'fastq_read1'] == "/path/to/file/A_1.fastq.gz"
@pytest.mark.unit
def test_check_files_output_pairedend(design_3, fastq_files):
paired = True
new_design = check_design.check_files(design_3, fastq_files, paired)
assert new_design.loc[0, 'fastq_read2'] == "/path/to/file/A_2.fastq.gz"
@pytest.mark.unit
def test_check_replicates(design_4):
paired = False
with pytest.raises(Exception) as excinfo:
new_design = check_design.check_replicates(design_4)
assert str(excinfo.value) == "Duplicate replicates in experiments: ['B']"
@pytest.mark.unit
def test_check_samples(design_5):
paired = False
with pytest.raises(Exception) as excinfo:
new_design = check_design.check_samples(design_5)
assert str(excinfo.value) == "Malformed samples from design file: ['A 1', 'A.2', 'B-1']"
@pytest.mark.unit
def test_check_experiments(design_6):
paired = False
with pytest.raises(Exception) as excinfo:
new_design = check_design.check_experiments(design_6)
assert str(excinfo.value) == "Malformed experiment from design file: ['A ChIP', 'A.ChIP', 'B-ChIP']"
#!/usr/bin/env python3
import pytest
import os
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/convertReads/'
@pytest.mark.singleend
def test_tag_reads_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.tagAlign.gz'))
@pytest.mark.singleend
def test_bed_reads_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.bedse.gz'))
@pytest.mark.pairedend
def test_tag_reads_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.tagAlign.gz'))
@pytest.mark.pairedend
def test_bed_reads_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.bedpe.gz'))
#!/usr/bin/env python3
import pytest
import pandas as pd
from io import StringIO
import os
import utils
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/diffPeaks/'
@pytest.mark.singleskip_true
def test_diff_peaks_singleend_single_rep():
assert os.path.isdir(test_output_path) == False
@pytest.mark.pairedendskip_true
def test_diff_peaks_pairedend_single_rep():
assert os.path.isdir(test_output_path) == False
@pytest.mark.singlediff
def test_heatmap_singleend_multiple_rep():
assert os.path.exists(os.path.join(test_output_path, 'heatmap.pdf'))
@pytest.mark.singlediff
def test_pca_singleend_multiple_rep():
assert os.path.exists(os.path.join(test_output_path, 'pca.pdf'))
@pytest.mark.singlediff
def test_normcount_singleend_multiple_rep():
assert os.path.exists(os.path.join(test_output_path, 'normcount_peaksets.txt'))
@pytest.mark.singlediff
def test_diffbind_singleend_multiple_rep():
if os.path.isfile(os.path.join(test_output_path, 'ENCSR272GNQ_vs_ENCSR238SGC_diffbind.bed')):
assert os.path.exists(os.path.join(test_output_path, 'ENCSR272GNQ_vs_ENCSR238SGC_diffbind.bed'))
diffbind_file = test_output_path + 'ENCSR272GNQ_vs_ENCSR238SGC_diffbind.csv'
elif os.path.isfile(os.path.join(test_output_path, 'ENCSR238SGC_vs_ENCSR272GNQ_diffbind.bed')):
assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC_vs_ENCSR272GNQ_diffbind.bed'))
diffbind_file = test_output_path + 'ENCSR238SGC_vs_ENCSR272GNQ_diffbind.csv'
assert os.path.exists(diffbind_file)
assert utils.count_lines(diffbind_file) >= 197471
@pytest.mark.paireddiff
def test_heatmap_pairedend_single_rep():
assert os.path.exists(os.path.join(test_output_path, 'heatmap.pdf'))
@pytest.mark.paireddiff
def test_pca_pairedend_single_rep():
assert os.path.exists(os.path.join(test_output_path, 'pca.pdf'))
@pytest.mark.paireddiff
def test_normcount_pairedend_single_rep():
assert os.path.exists(os.path.join(test_output_path, 'normcount_peaksets.txt'))
@pytest.mark.paireddiff
def test_diffbind_pairedend_single_rep():
if os.path.isfile(os.path.join(test_output_path, 'ENCSR757EMK_vs_ENCSR729LGA_diffbind.bed')):
assert os.path.exists(os.path.join(test_output_path, 'ENCSR757EMK_vs_ENCSR729LGA_diffbind.bed'))
diffbind_file = test_output_path + 'ENCSR757EMK_vs_ENCSR729LGA_diffbind.csv'
elif os.path.isfile(os.path.join(test_output_path, 'ENCSR729LGA_vs_ENCSR757EMK_diffbind.bed')):
assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA_vs_ENCSR757EMK_diffbind.bed'))
diffbind_file = test_output_path + 'ENCSR729LGA_vs_ENCSR757EMK_diffbind.csv'
assert os.path.exists(diffbind_file)
assert utils.count_lines(diffbind_file) >= 65124
#!/usr/bin/env python3
import pytest
import pandas as pd
from io import StringIO
import experiment_design
import os
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/design/'
DESIGN_STRING = """sample_id\ttag_align\txcor\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id
A_1\tA_1.tagAlign.gz\tA\tLiver\tH3K27ac\tNone\t1\tB_1
A_2\tA_2.tagAlign.gz\tA\tLiver\tH3K27ac\tNone\t2\tB_2
B_1\tB_1.tagAlign.gz\tB\tLiver\tInput\tNone\t1\tB_1
B_2\tB_2.tagAlign.gz\tB\tLiver\tInput\tNone\t2\tB_2
"""
@pytest.fixture
def design_tag():
design_file = StringIO(DESIGN_STRING)
design_df = pd.read_csv(design_file, sep="\t")
return design_df
@pytest.mark.unit
def test_check_update_controls_tag(design_tag):
new_design = experiment_design.update_controls(design_tag)
assert new_design.loc[0, 'control_tag_align'] == "B_1.tagAlign.gz"
@pytest.mark.singleend
def test_experiment_design_singleend():
design_file = os.path.join(test_output_path, 'ENCSR238SGC.tsv')
assert os.path.exists(design_file)
design_df = pd.read_csv(design_file, sep="\t")
assert design_df.shape[0] == 2
@pytest.mark.pairedend
def test_experiment_design_pairedend():
design_file = os.path.join(test_output_path, 'ENCSR729LGA.tsv')
assert os.path.exists(design_file)
design_df = pd.read_csv(design_file, sep="\t")
assert design_df.shape[0] == 2
#!/usr/bin/env python3
import pytest
import os
import pandas as pd
from io import StringIO
import experiment_qc
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/experimentQC/'
DESIGN_STRING = """sample_id\texperiment_id\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id\tbam_reads
A_1\tA\tLiver\tH3K27ac\tNone\t1\tB_1\tA_1.bam
A_2\tA\tLiver\tH3K27ac\tNone\t2\tB_2\tA_2.bam
B_1\tB\tLiver\tInput\tNone\t1\tB_1\tB_1.bam
B_2\tB\tLiver\tInput\tNone\t2\tB_2\tB_2.bam
"""
@pytest.fixture
def design_bam():
design_file = StringIO(DESIGN_STRING)
design_df = pd.read_csv(design_file, sep="\t")
return design_df
@pytest.mark.unit
def test_check_update_controls(design_bam):
new_design = experiment_qc.update_controls(design_bam)
assert new_design.loc[0, 'control_reads'] == "B_1.bam"
@pytest.mark.singleend
def test_coverage_singleend():
assert os.path.exists(os.path.join(test_output_path, 'sample_mbs.npz'))
assert os.path.exists(os.path.join(test_output_path, 'coverage.pdf'))
@pytest.mark.singleend
def test_spearman_singleend():
assert os.path.exists(os.path.join(test_output_path, 'heatmap_SpearmanCorr.pdf'))
@pytest.mark.singleend
def test_pearson_singleend():
assert os.path.exists(os.path.join(test_output_path, 'heatmap_PearsonCorr.pdf'))
@pytest.mark.singleend
def test_fingerprint_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB144FDT_fingerprint.pdf'))
assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI_fingerprint.pdf'))
@pytest.mark.pairdend
def test_coverage_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'sample_mbs.npz'))
assert os.path.exists(os.path.join(test_output_path, 'coverage.pdf'))
@pytest.mark.pairdend
def test_spearman_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'heatmap_SpearmanCorr.pdf'))
@pytest.mark.pairdend
def test_pearson_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'heatmap_PearsonCorr.pdf'))
@pytest.mark.pairdend
def test_fingerprint_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX_fingerprint.pdf'))
assert os.path.exists(os.path.join(test_output_path, 'ENCLB637LZP_fingerprint.pdf'))
#!/usr/bin/env python3
import pytest
import os
import utils
import yaml
from bs4 import BeautifulSoup
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/multiqcReport/'
@pytest.mark.singleend
def test_software_references():
assert os.path.exists(os.path.join(test_output_path, 'software_references_mqc.yaml'))
@pytest.mark.singleend
def test_software_references_output():
software_references = os.path.join(test_output_path, 'software_references_mqc.yaml')
with open(software_references, 'r') as stream:
data_loaded = yaml.load(stream)
assert len(data_loaded['data'].split('<ul>')) == 19
@pytest.mark.singleend
def test_software_references_html():
multiqc_report = os.path.join(test_output_path, 'multiqc_report.html')
html_file = open(multiqc_report, 'r')
source_code = html_file.read()
multiqc_html = BeautifulSoup(source_code, 'html.parser')
references = multiqc_html.find(id="mqc-module-section-Software_References")
assert references is not None
assert len(references.find_all('ul')) == 18
#!/usr/bin/env python3
import pytest
import os
import utils
import yaml
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/multiqcReport/'
@pytest.mark.singleend
def test_software_versions():
assert os.path.exists(os.path.join(test_output_path, 'software_versions_mqc.yaml'))
@pytest.mark.singleend
def test_software_versions_output():
software_versions = os.path.join(test_output_path, 'software_versions_mqc.yaml')
with open(software_versions, 'r') as stream:
data_loaded = yaml.load(stream)
assert len(data_loaded['data'].split('<dt>')) == 18
assert 'Not Run' not in data_loaded['data'].split('<dt>')[17]
#!/usr/bin/env python3
import pytest
import os
import pandas as pd
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/filterReads/'
@pytest.mark.singleend
def test_dedup_files_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.dedup.bam'))
assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.dedup.bam.bai'))
assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.dedup.qc'))
@pytest.mark.singleend
def test_map_qc_singleend():
filtered_reads_report = test_output_path + 'ENCLB831RUI/ENCLB831RUI.dedup.flagstat.qc'
samtools_report = open(filtered_reads_report).readlines()
assert '64962570 + 0 in total' in samtools_report[0]
assert '64962570 + 0 mapped (100.00%:N/A)' in samtools_report[4]
@pytest.mark.singleend
def test_library_complexity_singleend():
library_complexity = test_output_path + 'ENCLB831RUI/ENCLB831RUI.pbc.qc'
df_library_complexity = pd.read_csv(library_complexity, sep='\t')
assert df_library_complexity["NRF"].iloc[0] == 0.926192
assert df_library_complexity["PBC1"].iloc[0] == 0.926775
assert df_library_complexity["PBC2"].iloc[0] == 13.706885
@pytest.mark.pairedend
def test_dedup_files_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.dedup.bam'))
assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.dedup.bam.bai'))
assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.dedup.qc'))
@pytest.mark.pairedend
def test_map_qc_pairedend():
filtered_reads_report = test_output_path + 'ENCLB568IYX/ENCLB568IYX.dedup.flagstat.qc'
samtools_report = open(filtered_reads_report).readlines()
assert '47388510 + 0 in total' in samtools_report[0]
assert '47388510 + 0 mapped (100.00%:N/A)' in samtools_report[4]
@pytest.mark.pairedend
def test_library_complexity_pairedend():
library_complexity = test_output_path + 'ENCLB568IYX/ENCLB568IYX.pbc.qc'
df_library_complexity = pd.read_csv(library_complexity, sep='\t')
assert df_library_complexity["NRF"].iloc[0] == 0.947064
assert round(df_library_complexity["PBC1"].iloc[0],6) == 0.946723
assert round(df_library_complexity["PBC2"].iloc[0],6) == 18.642645
#!/usr/bin/env python3
import pytest
import os
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/alignReads/'
@pytest.mark.singleend
def test_map_reads_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB831RUI/ENCLB831RUI.bam'))
aligned_reads_report = test_output_path + 'ENCLB831RUI/ENCLB831RUI.flagstat.qc'
samtools_report = open(aligned_reads_report).readlines()
assert '80795025 + 0 in total' in samtools_report[0]
assert '80050072 + 0 mapped (99.08% : N/A)' in samtools_report[4]
@pytest.mark.pairedend
def test_map_reads_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB678IDC/ENCLB678IDC.bam'))
aligned_reads_report = test_output_path + 'ENCLB678IDC/ENCLB678IDC.flagstat.qc'
samtools_report = open(aligned_reads_report).readlines()
assert '72660890 + 0 in total' in samtools_report[0]
assert '72053925 + 0 mapped (99.16% : N/A)' in samtools_report[4]
assert '71501126 + 0 properly paired (98.40% : N/A)' in samtools_report[8]
#!/usr/bin/env python3
import pytest
import pandas as pd
from io import StringIO
import os
import utils
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/motifSearch/'
@pytest.mark.singleend
def test_limited_peaks_singleend():
peak_file_ENCSR238SGC = test_output_path + 'ENCSR238SGC.600.narrowPeak'
assert os.path.exists(peak_file_ENCSR238SGC)
assert utils.count_lines(peak_file_ENCSR238SGC) == 600
@pytest.mark.singleend
def test_motif_search_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC_memechip', 'ENCSR238SGC.fa'))
assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC_memechip', 'index.html'))
assert os.path.getsize(os.path.join(test_output_path, 'ENCSR238SGC_memechip', 'ENCSR238SGC.fa')) > 0
@pytest.mark.singleskip_true
def test_motif_search_singleend():
assert os.path.isdir(test_output_path) == False
@pytest.mark.pairedend
def test_limited_peaks_pairedend():
peak_file_ENCSR729LGA= test_output_path + 'ENCSR729LGA.600.narrowPeak'
assert os.path.exists(peak_file_ENCSR729LGA)
assert utils.count_lines(peak_file_ENCSR729LGA) == 600
@pytest.mark.pairedend
def test_motif_search_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA_memechip', 'ENCSR729LGA.fa'))
assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA_memechip', 'index.html'))
assert os.path.getsize(os.path.join(test_output_path, 'ENCSR729LGA_memechip', 'ENCSR729LGA.fa')) > 0
#!/usr/bin/env python3
import pytest
import pandas as pd
from io import StringIO
import os
import utils
import overlap_peaks
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/consensusPeaks/'
DESIGN_STRING = """sample_id\tbam_reads\tbam_index\texperiment_id\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id
A_1\tA_1.bam\tA_1.bai\tA\tLiver\tH3K27ac\tNone\t1\tB_1
A_2\tA_2.bam\tA_2.bai\tA\tLiver\tH3K27ac\tNone\t2\tB_2
B_1\tB_1.bam\tB_1.bai\tB\tLiver\tH3K27ac\tNone\t1\tB_1
B_2\tB_2.bam\tB_2.bai\tB\tLiver\tH3K27ac\tNone\t2\tB_2
"""
@pytest.fixture
def design_diff():
design_file = StringIO(DESIGN_STRING)
design_df = pd.read_csv(design_file, sep="\t")
return design_df
@pytest.mark.unit
def test_check_update_design(design_diff):
new_design = overlap_peaks.update_design(design_diff)
assert new_design.shape[0] == 2
assert new_design.loc[0, 'control_bam_reads'] == "B_1.bam"
assert new_design.loc[0, 'peak_caller'] == "bed"
@pytest.mark.singleend
def test_overlap_peaks_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC.rejected.narrowPeak'))
peak_file = test_output_path + 'ENCSR238SGC.replicated.narrowPeak'
assert utils.count_lines(peak_file) >= 149291
@pytest.mark.pairedend
def test_overlap_peaks_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR729LGA.rejected.narrowPeak'))
peak_file = test_output_path + 'ENCSR729LGA.replicated.narrowPeak'
assert utils.count_lines(peak_file) >= 25657
@pytest.mark.singlecontrol
def test_overlap_peaks_singlecontrol():
assert os.path.exists(os.path.join(test_output_path, 'ENCSR000DXB.rejected.narrowPeak'))
peak_file = test_output_path + 'ENCSR000DXB.replicated.narrowPeak'
assert utils.count_lines(peak_file) >= 35097
#!/usr/bin/env python3
import pytest
import os
import utils
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/experimentQC/'
@pytest.mark.singleend
def test_plot_singleend():
assert os.path.exists(os.path.join(test_output_path, 'plotProfile.png'))
assert os.path.getsize(os.path.join(test_output_path, 'plotProfile.png')) > 0
@pytest.mark.pairedend
def test_plot_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'plotProfile.png'))
assert os.path.getsize(os.path.join(test_output_path, 'plotProfile.png')) > 0
#!/usr/bin/env python3
import pytest
import pandas as pd
from io import StringIO
import os
import pool_and_psuedoreplicate
import shutil
test_design_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../../test_data/'
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/design/'
DESIGN_STRING = """sample_id\tse_tag_align\ttag_align\txcor\texperiment_id\tbiosample\tfactor\ttreatment\treplicate\tcontrol_id\tcontrol_tag_align
A_1\tA_1.tagAlign.gz\tA_1.bedse.gz\tA_1.cc.qc\tA\tLiver\tH3K27ac\tNone\t1\tB_1\tB_1.bedse.gz
A_2\tA_2.tagAlign.gz\tA_2.bedse.gz\tA_2.cc.qc\tA\tLiver\tH3K27ac\tNone\t2\tB_2\tB_2.bedse.gz
"""
@pytest.fixture
def design_experiment():
design_file = StringIO(DESIGN_STRING)
design_df = pd.read_csv(design_file, sep="\t")
return design_df
@pytest.fixture
def design_experiment_2(design_experiment):
# Drop Replicate A_2
design_df = design_experiment.drop(design_experiment.index[1])
return design_df
@pytest.fixture
def design_experiment_3(design_experiment):
# Drop Replicate A_2
design_df = design_experiment.drop(design_experiment.index[1])
# Update to be paired as first
design_df.loc[0, 'control_tag_align'] = 'B_1.bedpe.gz'
design_df.loc[0, 'tag_align'] = 'A_1.bedpe.gz'
return design_df
@pytest.mark.unit
def test_check_replicates(design_experiment):
no_reps = pool_and_psuedoreplicate.check_replicates(design_experiment)
assert no_reps == 2
@pytest.mark.unit
def test_check_replicates_single(design_experiment_2):
no_reps = pool_and_psuedoreplicate.check_replicates(design_experiment_2)
assert no_reps == 1
@pytest.mark.unit
def test_check_controls(design_experiment):
no_controls = pool_and_psuedoreplicate.check_controls(design_experiment)
assert no_controls == 2
@pytest.mark.unit
def test_check_controls_single(design_experiment_3):
no_controls = pool_and_psuedoreplicate.check_controls(design_experiment_3)
assert no_controls == 1
@pytest.mark.unit
def test_single_rep(design_experiment_2):
cwd = os.getcwd()
shutil.copy(test_design_path + 'A_1.bedse.gz', cwd)
shutil.copy(test_design_path + 'B_1.bedse.gz', cwd)
shutil.copy(test_design_path + 'A_1.tagAlign.gz', cwd)
shutil.copy(test_design_path + 'B_1.tagAlign.gz', cwd)
single_rep = pool_and_psuedoreplicate.generate_design('false', 1.2, design_experiment_2, cwd, 1, 1)
assert single_rep.shape[0] == 4
assert len(single_rep['control_tag_align'].unique()) == 2
assert 'pool_control.tagAlign.gz' in single_rep['control_tag_align'].unique()[1]
@pytest.mark.unit
def test_single_control(design_experiment_3):
cwd = os.getcwd()
shutil.copy(test_design_path + 'A_1.bedpe.gz', cwd)
shutil.copy(test_design_path + 'B_1.bedpe.gz', cwd)
shutil.copy(test_design_path + 'A_1.tagAlign.gz', cwd)
single_control = pool_and_psuedoreplicate.generate_design('true', 1.2, design_experiment_3, cwd, 1, 1)
assert 'pool_control.tagAlign.gz' in single_control['control_tag_align'].unique()[0]
@pytest.mark.singleend
def test_pool_and_psuedoreplicate_singleend():
design_file = os.path.join(test_output_path, 'ENCSR238SGC_ppr.tsv')
assert os.path.exists(design_file)
design_df = pd.read_csv(design_file, sep="\t")
assert design_df.shape[0] == 5
@pytest.mark.pairedend
def test_experiment_design_pairedend():
design_file = os.path.join(test_output_path, 'ENCSR729LGA_ppr.tsv')
assert os.path.exists(design_file)
design_df = pd.read_csv(design_file, sep="\t")
assert design_df.shape[0] == 5
#!/usr/bin/env python3
import pytest
import os
test_data_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../../test_data/'
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/trimReads/'
@pytest.mark.singleend
def test_trim_reads_singleend():
raw_fastq = test_data_path + 'ENCFF833BLU.fastq.gz'
trimmed_fastq = test_output_path + 'ENCLB144FDT/ENCLB144FDT_R1_trimmed.fq.gz'
assert os.path.getsize(raw_fastq) != os.path.getsize(trimmed_fastq)
assert os.path.getsize(trimmed_fastq) == 2512853101
@pytest.mark.singleend
def test_trim_report_singleend():
trimmed_fastq_report = test_output_path + \
'ENCLB144FDT/ENCLB144FDT_R1.fastq.gz_trimming_report.txt'
assert 'Trimming mode: single-end' in open(trimmed_fastq_report).readlines()[4]
@pytest.mark.pairedend
def test_trim_reads_pairedend():
raw_fastq = test_data_path + 'ENCFF582IOZ.fastq.gz'
trimmed_fastq = test_output_path + 'ENCLB637LZP/ENCLB637LZP_R2_val_2.fq.gz'
assert os.path.getsize(raw_fastq) != os.path.getsize(trimmed_fastq)
assert os.path.getsize(trimmed_fastq) == 2229312710
@pytest.mark.pairedend
def test_trim_report_pairedend():
trimmed_fastq_report = test_output_path + \
'ENCLB637LZP/ENCLB637LZP_R2.fastq.gz_trimming_report.txt'
assert 'Trimming mode: paired-end' in open(trimmed_fastq_report).readlines()[4]
#!/usr/bin/env python3
import pytest
import utils
STRIP_EXTENSIONS = ['.gz', '.fq', '.fastq', '.fa', '.fasta']
@pytest.fixture
def steps():
steps = []
return steps
@pytest.fixture
def steps_1(steps):
design_file = "test_data/design_ENCSR238SGC_SE.txt"
step = [
"grep H3K4me1 %s " % (design_file)]
return step
@pytest.fixture
def steps_2(steps_1):
steps_1.extend([
"cut -f8"
])
return steps_1
@pytest.mark.unit
def test_run_one_step(steps_1, capsys):
check_output = 'ENCLB144FDT\tENCSR238SGC\tlimb\tH3K4me1\tNone\t1\tENCLB304SBJ\tENCFF833BLU.fastq.gz'.encode('UTF-8')
out, err = utils.run_pipe(steps_1)
output, errors = capsys.readouterr()
assert "first step shlex to stdout" in output
assert check_output in out
@pytest.mark.unit
def test_run_two_step(steps_2, capsys):
check_output = 'ENCFF833BLU.fastq.gz\nENCFF646LXU.fastq.gz'.encode('UTF-8')
out, err = utils.run_pipe(steps_2)
output, errors = capsys.readouterr()
assert "intermediate step 2 shlex to stdout" in output
assert check_output in out
@pytest.mark.unit
def test_run_last_step_file(steps_2, capsys, tmpdir):
check_output = 'ENCFF833BLU.fastq.gz\nENCFF646LXU.fastq.gz'
tmp_outfile = tmpdir.join('output.txt')
out, err = utils.run_pipe(steps_2, tmp_outfile.strpath)
output, errors = capsys.readouterr()
assert "last step shlex" in output
assert check_output in tmp_outfile.read()
@pytest.mark.unit
def test_strip_extensions():
filename = utils.strip_extensions('ENCFF833BLU.fastq.gz', STRIP_EXTENSIONS)
assert filename == 'ENCFF833BLU'
@pytest.mark.unit
def test_strip_extensions_not_valid():
filename = utils.strip_extensions('ENCFF833BLU.not.valid', STRIP_EXTENSIONS)
assert filename == 'ENCFF833BLU.not.valid'
@pytest.mark.unit
def test_strip_extensions_missing_basename():
filename = utils.strip_extensions('.fastq.gz', STRIP_EXTENSIONS)
assert filename == '.fastq'
#!/usr/bin/env python3
import pytest
import os
import pandas as pd
test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
'/../output/crossReads/'
@pytest.mark.singleend
def test_cross_plot_singleend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB144FDT/ENCLB144FDT.cc.plot.pdf'))
@pytest.mark.singleend
def test_cross_qc_singleend():
qc_file = os.path.join(test_output_path,"ENCLB144FDT/ENCLB144FDT.cc.qc")
df_xcor = pd.read_csv(qc_file, sep="\t", header=None)
assert df_xcor[2].iloc[0] == '185,195,205'
assert df_xcor[8].iloc[0] == 1.02454
assert df_xcor[9].iloc[0] == 0.8098014
@pytest.mark.pairedend
def test_cross_qc_pairedend():
assert os.path.exists(os.path.join(test_output_path, 'ENCLB568IYX/ENCLB568IYX.cc.plot.pdf'))
@pytest.mark.pairedend
def test_cross_plot_pairedend():
qc_file = os.path.join(test_output_path,"ENCLB568IYX/ENCLB568IYX.cc.qc")
df_xcor = pd.read_csv(qc_file, sep="\t", header=None)
assert df_xcor[2].iloc[0] == '215,225,455'
assert round(df_xcor[8].iloc[0],6) == 1.056201
assert df_xcor[9].iloc[0] == 3.599357