diff --git a/workflow/conf/biohpc.config b/workflow/conf/biohpc.config index da1209423786bce5c7cb745e043a97136226bcd0..c43f8e0d85733142342c9c6ca44886f6d448f6ad 100644 --- a/workflow/conf/biohpc.config +++ b/workflow/conf/biohpc.config @@ -35,6 +35,11 @@ process { module = ['python/3.6.1-2-anaconda'] executor = 'local' } + $poolAndPsuedoReads { + module = ['python/3.6.1-2-anaconda'] + executor = 'local' + cpus = 32 + } } params { diff --git a/workflow/main.nf b/workflow/main.nf index e04236a1f4913c40aed881805666ba32b8d61e41..fa205e6a4cf4bd967a03541a3ee345ccfb7461b5 100644 --- a/workflow/main.nf +++ b/workflow/main.nf @@ -10,6 +10,7 @@ params.designFile = "$baseDir/../test_data/design_ENCSR238SGC_SE.txt" params.genome = 'GRCm38' params.genomes = [] params.bwaIndex = params.genome ? params.genomes[ params.genome ].bwa ?: false : false +params.cutoffRatio = 1.2 // Check inputs if( params.bwaIndex ){ @@ -30,6 +31,7 @@ readsList = Channel // Define regular variables pairedEnd = params.pairedEnd designFile = params.designFile +cutoffRatio = params.cutoffRatio process checkDesignFile { @@ -276,3 +278,32 @@ process defineExpDesignFiles { """ } + + +// Make Experiment design files to be read in for downstream analysis +process poolAndPsuedoReads { + + publishDir "$baseDir/output/design", mode: 'copy' + + input: + + file experimentObjs + + output: + + file '*.tsv' into experimentPoolObjs + + script: + + if (pairedEnd) { + """ + python3 $baseDir/scripts/pool_and_psuedoreplicate.py -t $experimentObjs -p -c cutoffRatio + """ + } + else { + """ + python3 $baseDir/scripts/pool_and_psuedoreplicate.py -t $experimentObjs -c cutoffRatio + """ + } + +} diff --git a/workflow/scripts/pool_and_psuedoreplicate.py b/workflow/scripts/pool_and_psuedoreplicate.py new file mode 100644 index 0000000000000000000000000000000000000000..cb604ace3fc0e5da362bec334fbdf39a14379074 --- /dev/null +++ b/workflow/scripts/pool_and_psuedoreplicate.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 + +'''Generate pooled and pseudoreplicate from data.''' + +import argparse +import utils +import logging +import pandas as pd +import numpy as np +import os + +EPILOG = ''' +For more details: + %(prog)s --help +''' + +# SETTINGS + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) +logger.propagate = False +logger.setLevel(logging.INFO) + + +def get_args(): + '''Define arguments.''' + + parser = argparse.ArgumentParser( + description=__doc__, epilog=EPILOG, + formatter_class=argparse.RawDescriptionHelpFormatter) + + parser.add_argument('-d', '--design', + help="The design file to make experiemnts (tsv format).", + required=True) + + parser.add_argument('-p', '--paired', + help="True/False if paired-end or single end.", + default=False, + action='store_true') + + parser.add_argument('-c', '--cutoff', + help="Cutoff ratio used for choosing controls.", + default=1.2) + + args = parser.parse_args() + return args + + +def check_replicates(design): + '''Check the number of replicates for the experiment.''' + + no_rep = design.shape[0] + + return no_rep + + +def check_controls(design): + '''Check the number of controls for the experiment.''' + + no_controls = len(design.control_tag_align.unique()) + + return no_controls + + +def get_read_count_ratio(design): + '''Get the ratio of read counts for unique controls.''' + + controls = design.control_tag_align.unique() + control_dict = {} + for con in controls: + no_reads = utils.count_lines(con) + control_dict[con] = no_reads + + control_matrix = {c: control_dict for c in controls} + control_df = pd.DataFrame.from_dict(control_matrix) + + control_ratio = control_df.divide(list(control_dict.values()), axis=0) + return control_ratio + + +def pool(tag_files, outfile, paired): + '''Pool files together.''' + + if paired: + file_extension = 'bedpe.gz' + else: + file_extension = 'bedse.gz' + + pooled_filename = outfile + file_extension + + # Merge files + out, err = utils.run_pipe([ + 'gzip -dc %s' % (' '.join(tag_files)), + 'gzip -cn'], + outfile=pooled_filename) + + return pooled_filename + + +def self_psuedoreplication(tag_file, prefix, reps, paired): + '''Make n number of self-psuedoreplicates equivlent to reps.''' + + # Get total number of reads + no_lines = utils.count_lines(tag_file) + + # Number of lines to split into + lines_per_rep = round(no_lines/reps) + + # Make an array of number of psuedoreplicatesfile names + pseudoreplicate_dict = {r: prefix + '.pr' + str(r) + '.bedse.tagAlign.gz' + for r in list(range(0, reps))} + + # Shuffle and split file into equal parts + # by using the input to seed shuf we ensure multiple runs with the same + # input will produce the same output + # Produces two files named splits_prefix0n, n=1,2 + + splits_prefix = 'temp_split' + + out, err = utils.run_pipe([ + 'gzip -dc %s' % (tag_file), + 'shuf --random-source=%s' % (tag_file), + 'split -d -l %d - %s' % (lines_per_rep, splits_prefix)]) + + # Convert read pairs to reads into standard tagAlign file + + for i, index in enumerate(list(range(0, reps))): + steps = ['cat %s' % (splits_prefix + index)] + if paired: + steps.extend([r"""awk 'BEGIN{OFS="\t"}{printf "%s\t%s\t%s\tN\t1000\t%s\n%s\t%s\t%s\tN\t1000\t%s\n",$1,$2,$3,$9,$4,$5,$6,$10}'"""]) + steps.extend(['gzip -cn']) + out, err = utils.run_pipe(steps, outfile=pseudoreplicate_dict[i]) + os.remove(splits_prefix + index) + + return pseudoreplicate_dict + + +def main(): + args = get_args() + paired = args.paired + design = args.design + cutoff_ratio = args.cutoff_ratio + + # Create a file handler + handler = logging.FileHandler('experiment_generation.log') + logger.addHandler(handler) + + # Read files as dataframes + design_df = pd.read_csv(design, sep='\t') + + # Get current directory to build paths + cwd = os.getwd() + + # Check Number of replicates and replicates + no_reps = check_replicates(design_df) + no_unique_controls = check_controls(design_df) + + if no_reps == 1: + logger.info("No other replicate specified " + "so processing as an unreplicated experiment.") + replicated = False + + else: + logger.info("Multiple replicates specified" + "so processing as a replicated experiment.") + replicated = True + + if no_unique_controls == 1 and replicated: + logger.info("Only a single control was specified" + "so using same control for replicates, pool and psuedoreplicates.") + single_control = True + else: + logger.info("Will merge only unique controls for pooled") + single_control = False + + # Pool the controls for checking + if not single_control: + control_df = get_read_count_ratio(design_df) + control_files = design_df.control_tag_align.unique() + pool_control = pool(control_files, "pool_control", paired) + else: + pool_control = design_df.control_tag_align.unique()[0] + + # Psuedoreplicate and update design accordingly + if not replicated: + + # Duplicate rows and update for pool and psuedoreplicates + experiment_id = design_df.at[0, 'experiment_id'] + replicate = design_df.at[0, 'replicate'] + design_new_df = design_df.loc[np.repeat(design_df.index, 4)].reset_index() + design_new_df.set_value(1, 'sample_id', experiment_id + '_pr') + design_new_df.set_value(1, 'replicate', '1_pr') + design_new_df.set_value(2, 'sample_id', experiment_id + '_pr') + design_new_df.set_value(2, 'replicate', '2_pr') + design_new_df.set_value(3, 'sample_id', experiment_id + '_pooled') + design_new_df.set_value(3, 'sample_id', experiment_id + '_pooled') + + # Make 2 self psuedoreplicates + self_pseudoreplicates_dict = {} + for rep, tag_file in zip(design_df['replicate'], design_df['tag_align']): + replicate_prefix = experiment_id + '_' + rep + self_pseudoreplicates_dict = \ + self_psuedoreplication(tag_file, replicate_prefix, 2, paired) + + # Update design to include new self pseudo replicates + for rep, pseudorep_file in self_pseudoreplicates_dict.items(): + path_to_file = cwd + '/' + pseudorep_file + replicate = rep + 1 + design_new_df.loc[replicate, 'tag_align'] = path_to_file + + else: + # Make pool of replicates + replicate_files = design_df.tag_align.unique() + experiment_id = design_df.at[0, 'experiment_id'] + pool_experiment = pool(replicate_files, experiment_id + "_pooled", paired) + + # Make self psuedoreplicates equivalent to number of replicates + pseudoreplicates_dict = {} + for rep, tag_file in zip(design_df['replicate'], design_df['tag_align']): + replicate_prefix = experiment_id + '_' + rep + pr_dict = self_psuedoreplication(tag_file, replicate_prefix, no_reps, paired) + pseudoreplicates_dict[rep] = pr_dict + + # Merge self psuedoreplication for each true replicate + pseudoreplicates_df = pd.DataFrame.from_dict(pseudoreplicates_dict) + pool_pseudoreplicates_dict = {} + for index, row in pseudoreplicates_df.iterrows(): + replicate_id = index + 1 + pr_filename = experiment_id + ".pr" + replicate_id + '.bedse.gz' + pool_replicate = pool(row, pr_filename, False) + pool_pseudoreplicates_dict[replicate_id] = pool_replicate + + design_new_df = design_df + # Check controls against cutoff_ratio + # if so replace with pool_control + path_to_pool_control = cwd + '/' + pool_control + if control_df[control_df > cutoff_ratio].values.any(): + logger.info("Number of reads in controls differ by " + + " > factor of %f. Using pooled controls." % (cutoff_ratio)) + design_new_df['control_tag_align'] = path_to_pool_control + else: + for index, row in design_new_df.iterrows(): + exp_no_reads = utils.count_lines(row['tag_align']) + con_no_reads = utils.count_lines(row['control_tag_align']) + if con_no_reads < exp_no_reads: + logger.info("Fewer reads in control than experiment." + + "Using pooled controls for replicate %s." + % row['replicate']) + design_new_df.loc[index, 'control_tag_align'] = \ + path_to_pool_control + + # Add in pseudo replicates + tmp_metadata = design_new_df.loc[0].copy() + tmp_metadata['control_tag_align'] = path_to_pool_control + for rep, pseudorep_file in pool_pseudoreplicates_dict.items(): + tmp_metadata['sample_id'] = experiment_id + '_pr' + rep + tmp_metadata['replicate'] = rep + '_pr' + tmp_metadata['xcor'] = 'Calculate' + path_to_file = cwd + '/' + pseudorep_file + tmp_metadata['tag_align'] = path_to_file + design_new_df = design_new_df.append(tmp_metadata) + + # Add in pool experiment + tmp_metadata['sample_id'] = experiment_id + '_pooled' + tmp_metadata['replicate'] = 'pooled' + tmp_metadata['xcor'] = 'Calculate' + path_to_file = cwd + '/' + pool_experiment + tmp_metadata['tag_align'] = path_to_file + design_new_df = design_new_df.append(tmp_metadata) + + # Write out new dataframe + design_new_df.to_csv(experiment_id + '_ppr.tsv', + header=True, sep='\t', index=False) + + +if __name__ == '__main__': + main() diff --git a/workflow/scripts/utils.py b/workflow/scripts/utils.py index 5162646d56b5281a88cd7195dba784865cffe7f8..11c4170a00153f4c2b20dce03706e529231cb294 100644 --- a/workflow/scripts/utils.py +++ b/workflow/scripts/utils.py @@ -53,3 +53,22 @@ def strip_extensions(filename, extensions): basename = basename.rpartition(extension)[0] or basename return basename + + +def count_lines(filename): + from magic import from_file + compressed_mimetypes = [ + "application/x-compress", + "application/x-bzip2", + "application/x-gzip" + ] + mime_type = from_file(fname, mime=True) + if mime_type in compressed_mimetypes: + catcommand = 'gzip -dc' + else: + catcommand = 'cat' + out, err = run_pipe([ + '%s %s' % (catcommand, fname), + 'wc -l' + ]) + return int(out) diff --git a/workflow/tests/test_experiment_design.py b/workflow/tests/test_experiment_design.py index 508dbb12f7588a41622aff7eca1d58a4bc31a1ac..bfbd3c01148fb2e31e40f0ddcf25a4f208b92caa 100644 --- a/workflow/tests/test_experiment_design.py +++ b/workflow/tests/test_experiment_design.py @@ -30,7 +30,10 @@ def test_check_update_controls_tag(design_tag): def test_experiment_design_single_end(): - assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC.tsv')) + design_file = os.path.join(test_output_path, 'ENCSR238SGC.tsv') + assert os.path.exists(design_file) + design_df = pd.read_csv(design_file, sep="\t") + assert design_df.shape[0] == 2 def test_experiment_design_paired_end(): diff --git a/workflow/tests/test_pool_and_psuedoreplicate.py b/workflow/tests/test_pool_and_psuedoreplicate.py new file mode 100644 index 0000000000000000000000000000000000000000..fa09952ec15d9d33ef5a53d2c3d027ec4c885fea --- /dev/null +++ b/workflow/tests/test_pool_and_psuedoreplicate.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 + +import pytest +import pandas as pd +from io import StringIO +import pool_and_psuedoreplicate + +test_output_path = os.path.dirname(os.path.abspath(__file__)) + \ + '/../output/design/' + +DESIGN_STRING = """sample_id\ttag_align\txcor\tbiosample\tfactor\ttreatment\treplicate\tcontrol_tag_align +A_1\tA_1.bedse.gz\tA_1.cc.qc\tLiver\tH3K27ac\tNone\t1\tB_1.bedse.gz +A_2\tA_2.bedse.gz\tA_2.cc.qc\tLiver\tH3K27ac\tNone\t2\tB_2.bedse.gz +""" + + +@pytest.fixture +def design_experiment(): + design_file = StringIO(DESIGN_STRING) + design_df = pd.read_csv(design_file, sep="\t") + return design_df + + +@pytest.fixture +def design_experiment_2(design_experiment): + # Drop Replicate A_2 + design_df = design_experiment.drop(design_experiment.index[1]) + return design_df + + +@pytest.fixture +def design_experiment_3(design_experiment): + # Update second control to be same as first + design_experiment.loc[1, 'control_tag_align'] = 'B_1.bedse.gz' + return design_experiment + + +def test_check_replicates(design_experiment): + no_reps = pool_and_psuedoreplicate.check_replicates(design_experiment) + assert no_reps == 2 + + +def test_check_replicates_single(design_experiment_2): + no_reps = pool_and_psuedoreplicate.check_replicates(design_experiment_2) + assert no_reps == 1 + + +def test_check_controls(design_experiment): + no_controls = pool_and_psuedoreplicate.check_controls(design_experiment) + assert no_controls == 2 + + +def test_check_controls_single(design_experiment_3): + no_controls = pool_and_psuedoreplicate.check_controls(design_experiment_3) + assert no_controls == 1 + + +def test_pool_and_psuedoreplicate_single_end(): + design_file = os.path.join(test_output_path, 'ENCSR238SGC_ppr.tsv') + assert os.path.exists(design_file) + design_df = pd.read_csv(design_file, sep="\t") + assert design_df.shape[0] == 5 + + +def test_experiment_design_paired_end(): + # Do the same thing for paired end data + pass