From 32e90024fb8610d632f0a723451beb358419b273 Mon Sep 17 00:00:00 2001
From: Venkat Malladi <venkat.malladi@utsouthwestern.edu>
Date: Fri, 27 Oct 2017 11:30:12 -0500
Subject: [PATCH] Add in pooling and psuedoreplicate scripts and tests.

---
 workflow/conf/biohpc.config                   |   5 +
 workflow/main.nf                              |  31 ++
 workflow/scripts/pool_and_psuedoreplicate.py  | 277 ++++++++++++++++++
 workflow/scripts/utils.py                     |  19 ++
 workflow/tests/test_experiment_design.py      |   5 +-
 .../tests/test_pool_and_psuedoreplicate.py    |  67 +++++
 6 files changed, 403 insertions(+), 1 deletion(-)
 create mode 100644 workflow/scripts/pool_and_psuedoreplicate.py
 create mode 100644 workflow/tests/test_pool_and_psuedoreplicate.py

diff --git a/workflow/conf/biohpc.config b/workflow/conf/biohpc.config
index da12094..c43f8e0 100644
--- a/workflow/conf/biohpc.config
+++ b/workflow/conf/biohpc.config
@@ -35,6 +35,11 @@ process {
     module = ['python/3.6.1-2-anaconda']
     executor = 'local'
   }
+  $poolAndPsuedoReads {
+    module = ['python/3.6.1-2-anaconda']
+    executor = 'local'
+    cpus = 32
+  }
 }
 
 params {
diff --git a/workflow/main.nf b/workflow/main.nf
index e04236a..fa205e6 100644
--- a/workflow/main.nf
+++ b/workflow/main.nf
@@ -10,6 +10,7 @@ params.designFile = "$baseDir/../test_data/design_ENCSR238SGC_SE.txt"
 params.genome = 'GRCm38'
 params.genomes = []
 params.bwaIndex = params.genome ? params.genomes[ params.genome ].bwa ?: false : false
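+// If control tag files differ in read depth by more than this ratio, the pooled control is used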
+params.cutoffRatio = 1.2
 
 // Check inputs
 if( params.bwaIndex ){
@@ -30,6 +31,7 @@ readsList = Channel
 // Define regular variables
 pairedEnd = params.pairedEnd
 designFile = params.designFile
+cutoffRatio = params.cutoffRatio
 
 process checkDesignFile {
 
@@ -276,3 +278,32 @@ process defineExpDesignFiles {
   """
 
 }
+
+
+// Pool tag files and create self-pseudoreplicates for downstream analysis
+process poolAndPsuedoReads {
+
+  publishDir "$baseDir/output/design", mode: 'copy'
+
+  input:
+
+  file experimentObjs
+
+  output:
+
+  file '*.tsv' into experimentPoolObjs
+
+  script:
+
+  if (pairedEnd) {
+    """
+    python3 $baseDir/scripts/pool_and_psuedoreplicate.py -d $experimentObjs -p -c $cutoffRatio
+    """
+  }
+  else {
+    """
+    python3 $baseDir/scripts/pool_and_psuedoreplicate.py -d $experimentObjs -c $cutoffRatio
+    """
+  }
+
+}
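+
+// For reference, the pooling script can also be run by hand on a design file,
+// e.g. (file names are illustrative):
+//   python3 workflow/scripts/pool_and_psuedoreplicate.py -d ENCSR238SGC.tsv -c 1.2
+// Add -p when the tag files are paired-end BEDPE.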
diff --git a/workflow/scripts/pool_and_psuedoreplicate.py b/workflow/scripts/pool_and_psuedoreplicate.py
new file mode 100644
index 0000000..cb604ac
--- /dev/null
+++ b/workflow/scripts/pool_and_psuedoreplicate.py
@@ -0,0 +1,277 @@
+#!/usr/bin/env python3
+
+'''Generate pooled and pseudoreplicated tag files from an experiment design.'''
+
+import argparse
+import utils
+import logging
+import pandas as pd
+import numpy as np
+import os
+
+EPILOG = '''
+For more details:
+        %(prog)s --help
+'''
+
+# SETTINGS
+
+logger = logging.getLogger(__name__)
+logger.addHandler(logging.NullHandler())
+logger.propagate = False
+logger.setLevel(logging.INFO)
+
+
+def get_args():
+    '''Define arguments.'''
+
+    parser = argparse.ArgumentParser(
+        description=__doc__, epilog=EPILOG,
+        formatter_class=argparse.RawDescriptionHelpFormatter)
+
+    parser.add_argument('-d', '--design',
+                        help="The design file to make experiemnts (tsv format).",
+                        required=True)
+
+    parser.add_argument('-p', '--paired',
+                        help="True/False if paired-end or single end.",
+                        default=False,
+                        action='store_true')
+
+    parser.add_argument('-c', '--cutoff',
+                        help="Cutoff ratio used for choosing controls.",
+                        type=float,
+                        default=1.2)
+
+    args = parser.parse_args()
+    return args
+
+
+def check_replicates(design):
+    '''Check the number of replicates for the experiment.'''
+
+    no_rep = design.shape[0]
+
+    return no_rep
+
+
+def check_controls(design):
+    '''Check the number of controls for the experiment.'''
+
+    no_controls = len(design.control_tag_align.unique())
+
+    return no_controls
+
+
+def get_read_count_ratio(design):
+    '''Get the ratio of read counts for unique controls.'''
+
+    controls = design.control_tag_align.unique()
+    control_dict = {}
+    for con in controls:
+        no_reads = utils.count_lines(con)
+        control_dict[con] = no_reads
+
+    # Pairwise ratio matrix: cell [row, col] = reads(row) / reads(col)
+    control_matrix = {c: control_dict for c in controls}
+    control_df = pd.DataFrame.from_dict(control_matrix)
+
+    control_ratio = control_df.divide(pd.Series(control_dict), axis=1)
+    return control_ratio
+
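+# Illustrative example (hypothetical counts): controls B_1.bedse.gz with 100
+# reads and B_2.bedse.gz with 150 reads give the ratio matrix
+#              B_1    B_2
+#       B_1   1.00   0.67
+#       B_2   1.50   1.00
+# and 1.50 > 1.2 would trigger the use of pooled controls in main().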
+
+def pool(tag_files, outfile, paired):
+    '''Pool files together.'''
+
+    if paired:
+        file_extension = '.bedpe.gz'
+    else:
+        file_extension = '.bedse.gz'
+
+    pooled_filename = outfile + file_extension
+
+    # Merge files
+    out, err = utils.run_pipe([
+        'gzip -dc %s' % (' '.join(tag_files)),
+        'gzip -cn'],
+        outfile=pooled_filename)
+
+    return pooled_filename
+
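+# Illustrative usage (hypothetical names): pool(['A_1.bedse.gz', 'A_2.bedse.gz'],
+# 'ENCSRXXX_pooled', False) concatenates the gzipped tag files and writes
+# 'ENCSRXXX_pooled.bedse.gz'.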
+
+def self_psuedoreplication(tag_file, prefix, reps, paired):
+    '''Make a number of self-pseudoreplicates equivalent to reps.'''
+
+    # Get total number of reads
+    no_lines = utils.count_lines(tag_file)
+
+    # Number of lines to split into
+    lines_per_rep = round(no_lines/reps)
+
+    # Make a dictionary of pseudoreplicate file names keyed by index
+    pseudoreplicate_dict = {r: prefix + '.pr' + str(r) + '.bedse.tagAlign.gz'
+                            for r in list(range(0, reps))}
+
+    # Shuffle and split file into equal parts
+    # by using the input to seed shuf we ensure multiple runs with the same
+    # input will produce the same output
+    # Produces reps files named <splits_prefix>00, <splits_prefix>01, ...
+
+    splits_prefix = 'temp_split'
+
+    out, err = utils.run_pipe([
+        'gzip -dc %s' % (tag_file),
+        'shuf --random-source=%s' % (tag_file),
+        'split -d -l %d - %s' % (lines_per_rep, splits_prefix)])
+
+    # Convert read pairs into individual reads in standard tagAlign format
+
+    for index in range(reps):
+        split_file = '%s%02d' % (splits_prefix, index)
+        steps = ['cat %s' % (split_file)]
+        if paired:
+            steps.extend([r"""awk 'BEGIN{OFS="\t"}{printf "%s\t%s\t%s\tN\t1000\t%s\n%s\t%s\t%s\tN\t1000\t%s\n",$1,$2,$3,$9,$4,$5,$6,$10}'"""])
+        steps.extend(['gzip -cn'])
+        out, err = utils.run_pipe(steps, outfile=pseudoreplicate_dict[index])
+        os.remove(split_file)
+
+    return pseudoreplicate_dict
+
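+# Illustrative usage (hypothetical inputs): splitting 'A_1.bedse.gz' with prefix
+# 'ENCSRXXX_1' into 2 pseudoreplicates returns
+#   {0: 'ENCSRXXX_1.pr0.bedse.tagAlign.gz', 1: 'ENCSRXXX_1.pr1.bedse.tagAlign.gz'}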
+
+def main():
+    args = get_args()
+    paired = args.paired
+    design = args.design
+    cutoff_ratio = args.cutoff
+
+    # Create a file handler
+    handler = logging.FileHandler('experiment_generation.log')
+    logger.addHandler(handler)
+
+    # Read files as dataframes
+    design_df = pd.read_csv(design, sep='\t')
+
+    # Get current directory to build paths
+    cwd = os.getcwd()
+
+    # Check the number of replicates and controls
+    no_reps = check_replicates(design_df)
+    no_unique_controls = check_controls(design_df)
+
+    if no_reps == 1:
+        logger.info("No other replicate specified "
+                    "so processing as an unreplicated experiment.")
+        replicated = False
+
+    else:
+        logger.info("Multiple replicates specified"
+                    "so processing as a replicated experiment.")
+        replicated = True
+
+    if no_unique_controls == 1 and replicated:
+        logger.info("Only a single control was specified"
+                    "so using same control for replicates, pool and psuedoreplicates.")
+        single_control = True
+    else:
+        logger.info("Will merge only unique controls for pooled")
+        single_control = False
+
+    # If there are multiple unique controls, pool them and compute their read-count ratios
+    if not single_control:
+        control_df = get_read_count_ratio(design_df)
+        control_files = design_df.control_tag_align.unique()
+        pool_control = pool(control_files, "pool_control", paired)
+    else:
+        pool_control = design_df.control_tag_align.unique()[0]
+
+    # Psuedoreplicate and update design accordingly
+    if not replicated:
+
+        # Duplicate rows and update for pool and pseudoreplicates
+        experiment_id = design_df.at[0, 'experiment_id']
+        replicate = design_df.at[0, 'replicate']
+        design_new_df = design_df.loc[np.repeat(design_df.index, 4)].reset_index(drop=True)
+        design_new_df.loc[1, 'sample_id'] = experiment_id + '_pr'
+        design_new_df.loc[1, 'replicate'] = '1_pr'
+        design_new_df.loc[2, 'sample_id'] = experiment_id + '_pr'
+        design_new_df.loc[2, 'replicate'] = '2_pr'
+        design_new_df.loc[3, 'sample_id'] = experiment_id + '_pooled'
+        design_new_df.loc[3, 'replicate'] = 'pooled'
+
+        # Make two self-pseudoreplicates
+        self_pseudoreplicates_dict = {}
+        for rep, tag_file in zip(design_df['replicate'], design_df['tag_align']):
+            replicate_prefix = experiment_id + '_' + str(rep)
+            self_pseudoreplicates_dict = \
+                self_psuedoreplication(tag_file, replicate_prefix, 2, paired)
+
+        # Update design to include new self pseudo replicates
+        for rep, pseudorep_file in self_pseudoreplicates_dict.items():
+            path_to_file = cwd + '/' + pseudorep_file
+            replicate = rep + 1
+            design_new_df.loc[replicate, 'tag_align'] = path_to_file
+
+    else:
+        # Make pool of replicates
+        replicate_files = design_df.tag_align.unique()
+        experiment_id = design_df.at[0, 'experiment_id']
+        pool_experiment = pool(replicate_files, experiment_id + "_pooled", paired)
+
+        # Make self-pseudoreplicates equivalent to the number of replicates
+        pseudoreplicates_dict = {}
+        for rep, tag_file in zip(design_df['replicate'], design_df['tag_align']):
+            replicate_prefix = experiment_id + '_' + str(rep)
+            pr_dict = self_psuedoreplication(tag_file, replicate_prefix, no_reps, paired)
+            pseudoreplicates_dict[rep] = pr_dict
+
+        # Merge the self-pseudoreplicates of each true replicate into pooled pseudoreplicates
+        pseudoreplicates_df = pd.DataFrame.from_dict(pseudoreplicates_dict)
+        pool_pseudoreplicates_dict = {}
+        for index, row in pseudoreplicates_df.iterrows():
+            replicate_id = index + 1
+            pr_filename = experiment_id + '.pr' + str(replicate_id)
+            pool_replicate = pool(row, pr_filename, False)
+            pool_pseudoreplicates_dict[replicate_id] = pool_replicate
+
+        design_new_df = design_df
+        # Check controls against cutoff_ratio
+        # if so replace with pool_control
+        path_to_pool_control = cwd + '/' + pool_control
+        if not single_control and (control_df > cutoff_ratio).values.any():
+            logger.info("Number of reads in controls differ by "
+                        "> factor of %f. Using pooled controls." % (cutoff_ratio))
+            design_new_df['control_tag_align'] = path_to_pool_control
+        else:
+            for index, row in design_new_df.iterrows():
+                exp_no_reads = utils.count_lines(row['tag_align'])
+                con_no_reads = utils.count_lines(row['control_tag_align'])
+                if con_no_reads < exp_no_reads:
+                    logger.info("Fewer reads in control than experiment." +
+                                "Using pooled controls for replicate %s."
+                                % row['replicate'])
+                    design_new_df.loc[index, 'control_tag_align'] = \
+                                                            path_to_pool_control
+
+        # Add in pseudo replicates
+        tmp_metadata = design_new_df.loc[0].copy()
+        tmp_metadata['control_tag_align'] = path_to_pool_control
+        for rep, pseudorep_file in pool_pseudoreplicates_dict.items():
+            tmp_metadata['sample_id'] = experiment_id + '_pr' + str(rep)
+            tmp_metadata['replicate'] = str(rep) + '_pr'
+            tmp_metadata['xcor'] = 'Calculate'
+            path_to_file = cwd + '/' + pseudorep_file
+            tmp_metadata['tag_align'] = path_to_file
+            design_new_df = design_new_df.append(tmp_metadata)
+
+        # Add in pool experiment
+        tmp_metadata['sample_id'] = experiment_id + '_pooled'
+        tmp_metadata['replicate'] = 'pooled'
+        tmp_metadata['xcor'] = 'Calculate'
+        path_to_file = cwd + '/' + pool_experiment
+        tmp_metadata['tag_align'] = path_to_file
+        design_new_df = design_new_df.append(tmp_metadata)
+
+    # Write out new dataframe
+    design_new_df.to_csv(experiment_id + '_ppr.tsv',
+                        header=True, sep='\t', index=False)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/workflow/scripts/utils.py b/workflow/scripts/utils.py
index 5162646..11c4170 100644
--- a/workflow/scripts/utils.py
+++ b/workflow/scripts/utils.py
@@ -53,3 +53,22 @@ def strip_extensions(filename, extensions):
         basename = basename.rpartition(extension)[0] or basename
 
     return basename
+
+
+def count_lines(filename):
+    '''Count lines in a file, transparently decompressing gzip/bzip2/compress input.'''
+    from magic import from_file
+    compressed_mimetypes = [
+        "application/x-compress",
+        "application/x-bzip2",
+        "application/x-gzip"
+        ]
+    mime_type = from_file(filename, mime=True)
+    if mime_type in compressed_mimetypes:
+        catcommand = 'gzip -dc'
+    else:
+        catcommand = 'cat'
+    out, err = run_pipe([
+        '%s %s' % (catcommand, filename),
+        'wc -l'
+        ])
+    return int(out)
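+
+
+# Illustrative usage (hypothetical file): count_lines('A_1.bedse.gz') pipes the
+# decompressed stream through `wc -l` and returns the line count as an int.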
diff --git a/workflow/tests/test_experiment_design.py b/workflow/tests/test_experiment_design.py
index 508dbb1..bfbd3c0 100644
--- a/workflow/tests/test_experiment_design.py
+++ b/workflow/tests/test_experiment_design.py
@@ -30,7 +30,10 @@ def test_check_update_controls_tag(design_tag):
 
 
 def test_experiment_design_single_end():
-    assert os.path.exists(os.path.join(test_output_path, 'ENCSR238SGC.tsv'))
+    design_file = os.path.join(test_output_path, 'ENCSR238SGC.tsv')
+    assert os.path.exists(design_file)
+    design_df = pd.read_csv(design_file, sep="\t")
+    assert design_df.shape[0] == 2
 
 
 def test_experiment_design_paired_end():
diff --git a/workflow/tests/test_pool_and_psuedoreplicate.py b/workflow/tests/test_pool_and_psuedoreplicate.py
new file mode 100644
index 0000000..fa09952
--- /dev/null
+++ b/workflow/tests/test_pool_and_psuedoreplicate.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python3
+
+import os
+import pytest
+import pandas as pd
+from io import StringIO
+import pool_and_psuedoreplicate
+
+test_output_path = os.path.dirname(os.path.abspath(__file__)) + \
+                '/../output/design/'
+
+DESIGN_STRING = """sample_id\ttag_align\txcor\tbiosample\tfactor\ttreatment\treplicate\tcontrol_tag_align
+A_1\tA_1.bedse.gz\tA_1.cc.qc\tLiver\tH3K27ac\tNone\t1\tB_1.bedse.gz
+A_2\tA_2.bedse.gz\tA_2.cc.qc\tLiver\tH3K27ac\tNone\t2\tB_2.bedse.gz
+"""
+
+
+@pytest.fixture
+def design_experiment():
+    design_file = StringIO(DESIGN_STRING)
+    design_df = pd.read_csv(design_file, sep="\t")
+    return design_df
+
+
+@pytest.fixture
+def design_experiment_2(design_experiment):
+    # Drop Replicate A_2
+    design_df = design_experiment.drop(design_experiment.index[1])
+    return design_df
+
+
+@pytest.fixture
+def design_experiment_3(design_experiment):
+    # Update second control to be same as first
+    design_experiment.loc[1, 'control_tag_align'] = 'B_1.bedse.gz'
+    return design_experiment
+
+
+def test_check_replicates(design_experiment):
+    no_reps = pool_and_psuedoreplicate.check_replicates(design_experiment)
+    assert no_reps == 2
+
+
+def test_check_replicates_single(design_experiment_2):
+    no_reps = pool_and_psuedoreplicate.check_replicates(design_experiment_2)
+    assert no_reps == 1
+
+
+def test_check_controls(design_experiment):
+    no_controls = pool_and_psuedoreplicate.check_controls(design_experiment)
+    assert no_controls == 2
+
+
+def test_check_controls_single(design_experiment_3):
+    no_controls = pool_and_psuedoreplicate.check_controls(design_experiment_3)
+    assert no_controls == 1
+
+
+def test_pool_and_psuedoreplicate_single_end():
+    design_file = os.path.join(test_output_path, 'ENCSR238SGC_ppr.tsv')
+    assert os.path.exists(design_file)
+    design_df = pd.read_csv(design_file, sep="\t")
+    assert design_df.shape[0] == 5
+
+
+def test_pool_and_psuedoreplicate_paired_end():
+    # Do the same checks for paired-end data
+    pass
-- 
GitLab