diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index cd792d6d1065cdaef00a8785cbb78a30f48409cc..6ea53a7f0dc2a8265a0a93ba8cfd4ef0dd9afc1b 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -38,11 +38,23 @@ parseMetadata:
 trimData:
   stage: unit
   script:
-  - if [ `nproc` -gt 8 ]; then ncore=8; else ncore=`nproc`; fi
-  - singularity run 'docker://bicf/trimgalore:1.1' trim_galore --gzip -q 25 --illumina --length 35 --basename 16-1ZX4 -j ${ncore} ./test_data/fastq/16-1ZX4.R1.fastq.gz
-  - singularity run 'docker://bicf/trimgalore:1.1' trim_galore --gzip -q 25 --illumina --length 35 --paired --basename Q-Y5JA -j ${ncore} ./test_data/fastq/Q-Y5JA.R1.fastq.gz ./test_data/fastq/Q-Y5JA.R2.fastq.gz
+  - singularity run 'docker://bicf/trimgalore:1.1' trim_galore --gzip -q 25 --illumina --length 35 --basename 16-1ZX4 -j `nproc` ./test_data/fastq/16-1ZX4.R1.fastq.gz
+  - singularity run 'docker://bicf/trimgalore:1.1' trim_galore --gzip -q 25 --illumina --length 35 --paired --basename Q-Y5JA -j `nproc` ./test_data/fastq/Q-Y5JA.R1.fastq.gz ./test_data/fastq/Q-Y5JA.R2.fastq.gz
   - pytest -m trimData
 
+alignReads:
+  stage: unit
+  script:
+  - singularity run 'docker://bicf/gudmaprbkaligner:2.0.0' hisat2 -p `nproc` --add-chrname --un-gz Q-Y5JA.unal.gz -S Q-Y5JA.sam -x /project/BICF/BICF_Core/shared/gudmap/references/GRCh38.p12/hisat2/genome --rna-strandness FR --no-mixed --no-discordant -1 ./test_data/fastq/Q-Y5JA_R1_val_1.fq.gz -2 ./test_data/fastq/Q-Y5JA_R2_val_2.fq.gz
+  - singularity run 'docker://bicf/gudmaprbkaligner:2.0.0' samtools view -1 -@ `nproc` -F 4 -F 8 -F 256 -o Q-Y5JA.bam Q-Y5JA.sam
+  - singularity run 'docker://bicf/gudmaprbkaligner:2.0.0' samtools sort -@ `nproc` -O BAM -o Q-Y5JA.sorted.bam Q-Y5JA.bam
+  - singularity run 'docker://bicf/gudmaprbkaligner:2.0.0' samtools index -@ `nproc` -b Q-Y5JA.sorted.bam Q-Y5JA.sorted.bai
+  - singularity run 'docker://bicf/gudmaprbkaligner:2.0.0' hisat2 -p `nproc` --add-chrname --un-gz 16-1ZX4.unal.gz -S 16-1ZX4.sam -x /project/BICF/BICF_Core/shared/gudmap/references/GRCh38.p12/hisat2/genome --rna-strandness FR -U ./test_data/fastq/16-1ZX4_trimmed.fq.gz
+  - singularity run 'docker://bicf/gudmaprbkaligner:2.0.0' samtools view -1 -@ `nproc` -F 4 -F 8 -F 256 -o 16-1ZX4.bam 16-1ZX4.sam
+  - singularity run 'docker://bicf/gudmaprbkaligner:2.0.0' samtools sort -@ `nproc` -O BAM -o 16-1ZX4.sorted.bam 16-1ZX4.bam
+  - singularity run 'docker://bicf/gudmaprbkaligner:2.0.0' samtools index -@ `nproc` -b 16-1ZX4.sorted.bam 16-1ZX4.sorted.bai
+  - pytest -m alignData
+
 integration_se:
   stage: integration
   script:
@@ -51,4 +63,4 @@ integration_se:
 integration_pe:
   stage: integration
   script:
-  - nextflow run ./workflow/rna-seq.nf --deriva ./test_data/auth/credential.json --bdbag ./test_data/auth/cookies.txt --repRID Q-Y5JA
\ No newline at end of file
+  - nextflow run ./workflow/rna-seq.nf --deriva ./test_data/auth/credential.json --bdbag ./test_data/auth/cookies.txt --repRID Q-Y5JA
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2dec9e320556503056f41874817d032ad618f3af..9345688f2a0ae3ee68fb31d2947a772882c51926 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 # v0.0.1 (in development)
 **User Facing**
+* Raw alignment files
 
 **Background**
 * Implementation of CI
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000000000000000000000000000000000000..c8d98f29942ac24e83b741a0e102714b44230eab
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+python_paths = workflow/scripts
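Review note: `python_paths` is not a built-in pytest option; it comes from the `pytest-pythonpath` plugin, which therefore has to be installed in the CI image for the tests to `import utils`. A minimal plugin-free sketch of the same effect, assuming a `conftest.py` at the repository root (hypothetical, not part of this commit):

```python
# conftest.py -- plugin-free equivalent of pytest.ini's python_paths:
# prepend workflow/scripts to sys.path so tests can `import utils`.
import os
import sys

sys.path.insert(0, os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'workflow', 'scripts'))
```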
diff --git a/workflow/conf/biohpc.config b/workflow/conf/biohpc.config
index 36d5b332611f5d234a692b545819ad44db8c381e..e2b1335e2a2f9a923931ad3f293cf364f689e69f 100755
--- a/workflow/conf/biohpc.config
+++ b/workflow/conf/biohpc.config
@@ -9,11 +9,14 @@ process {
   withName:getData {
     executor = 'local'
   }
+  withName:parseMetadata {
+    executor = 'local'
+  }
   withName:trimData {
     queue = '256GB,256GBv1,384GB'
   }
-  withName:parseMetadata {
-    executor = 'local'
+  withName:alignReads {
+    queue = '256GB,256GBv1,384GB'
   }
 }
 
@@ -26,4 +29,4 @@ env {
   http_proxy = 'http://proxy.swmed.edu:3128'
   https_proxy = 'http://proxy.swmed.edu:3128'
   all_proxy = 'http://proxy.swmed.edu:3128'
-}
\ No newline at end of file
+}
diff --git a/workflow/nextflow.config b/workflow/nextflow.config
index 7782046e10ab0c49d186141867e5964e87ea52cb..acb710d45a2036fbfb83a0b5abb7a80ce64ce3ca 100644
--- a/workflow/nextflow.config
+++ b/workflow/nextflow.config
@@ -17,11 +17,14 @@ process {
   withName:getData {
     container = 'bicf/gudmaprbkfilexfer:1.3'
   }
+  withName:parseMetadata {
+    container = 'bicf/python3:1.3'
+  }
   withName:trimData {
     container = 'bicf/trimgalore:1.1'
   }
-  withName:parseMetadata {
-    container = 'bicf/python3:1.3'
+  withName:alignReads {
+    container = 'bicf/gudmaprbkaligner:2.0.0'
   }
 }
 
@@ -52,4 +55,4 @@ manifest {
   mainScript = 'rna-seq.nf'
   version = 'v0.0.1_indev'
   nextflowVersion = '>=19.09.0'
-}
\ No newline at end of file
+}
diff --git a/workflow/rna-seq.nf b/workflow/rna-seq.nf
index 5a8b614af62d339d32fa6b0bd92f8f00df77d673..ff348297ea88cefa54d494cc1097f090f896fb41 100755
--- a/workflow/rna-seq.nf
+++ b/workflow/rna-seq.nf
@@ -5,8 +5,8 @@ params.deriva = "${baseDir}/../test_data/auth/credential.json"
 params.bdbag = "${baseDir}/../test_data/auth/cookies.txt"
 //params.repRID = "16-1ZX4"
 params.repRID = "Q-Y5JA"
-
 params.outDir = "${baseDir}/../output"
+params.reference = "/project/BICF/BICF_Core/shared/gudmap/references"
 
 // Parse input variables
 deriva = Channel
@@ -16,8 +16,8 @@ bdbag = Channel
   .fromPath(params.bdbag)
   .ifEmpty { exit 1, "deriva cookie file for bdbag not found: ${params.bdbag}" }
 repRID = params.repRID
-
 outDir = params.outDir
+referenceBase = file("${params.reference}")
 logsDir = "${outDir}/Logs"
 
 // Define fixed files
@@ -28,7 +28,7 @@
 script_bdbagFetch = Channel.fromPath("${baseDir}/scripts/bdbagFetch.sh")
 script_parseMeta = Channel.fromPath("${baseDir}/scripts/parseMeta.py")
 
 /*
- * getData: get bagit file from consortium
+ * splitData: split bdbag files by replicate so fetch can occur in parallel, and rename files to replicate rid
 */
 process getBag {
@@ -146,14 +146,56 @@ process parseMetadata {
     species=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experimentMeta}" -p species)
     echo "LOG: species metadata parsed: \${species}" >>${repRID}.parseMetadata.err
 
+    # Set the correct reference directory
+    if [ "\${spike}" == 'yes' ]; then
+      if [ "\${species}" == 'Homo sapiens' ]; then
+        referenceDir="/project/BICF/BICF_Core/shared/gudmap/references/GRCh38.p12-S/hisat2/genome";
+        echo "LOG: Reference set to Homo sapiens with spike-in." >>${repRID}.parseMetadata.err;
+      elif [ "\${species}" == 'Mus musculus' ]; then
+        referenceDir="/project/BICF/BICF_Core/shared/gudmap/references/GRCm38.P6-S/hisat2/genome";
+        echo "LOG: Reference set to Mus musculus with spike-in." >>${repRID}.parseMetadata.err;
+      fi;
+    else
+      if [ "\${species}" == 'Homo sapiens' ]; then
+        referenceDir="/project/BICF/BICF_Core/shared/gudmap/references/GRCh38.p12/hisat2/genome";
+        echo "LOG: Reference set to Homo sapiens without spike-in." >>${repRID}.parseMetadata.err;
+      elif [ "\${species}" == 'Mus musculus' ]; then
+        referenceDir="/project/BICF/BICF_Core/shared/gudmap/references/GRCm38.P6/hisat2/genome";
+        echo "LOG: Reference set to Mus musculus without spike-in." >>${repRID}.parseMetadata.err;
+      fi;
+    fi;
+
     # Save design file
-    echo "\${rep},\${endsMeta},\${endsManual},\${stranded},\${spike},\${species}" > design.csv
+    echo "\${rep},\${endsMeta},\${endsManual},\${stranded},\${spike},\${species},\${referenceDir}" > design.csv
     """
 }
 
-metadata.splitCsv(sep: ',', header: false).into {
-  metadata_trimData
-  metadata_qc
+// Split metadata into separate channels
+rep = Channel.create()
+endsMeta = Channel.create()
+endsManual = Channel.create()
+stranded = Channel.create()
+spike = Channel.create()
+species = Channel.create()
+referenceDir = Channel.create()
+metadata.splitCsv(sep: ',', header: false).separate(
+  rep,
+  endsMeta,
+  endsManual,
+  stranded,
+  spike,
+  species,
+  referenceDir
+)
+endsManual.into {
+  endsManual_trimData
+  endsManual_alignReads
+}
+stranded.into {
+  stranded_alignReads
+}
+referenceDir.into {
+  referenceDir_alignReads
 }
 
 /*
@@ -161,11 +203,11 @@ metadata.splitCsv(sep: ',', header: false).into {
 */
 process trimData {
   tag "${repRID}"
-  publishDir "${logsDir}", mode: 'copy', pattern: "\${repRID}.trimData.*"
+  publishDir "${logsDir}", mode: 'copy', pattern: "${repRID}.trimData.*"
 
   input:
+    val endsManual_trimData
     file(fastq) from fastqs
-    tuple val(rep), val(endsMeta), val(endsManual), val(stranded), val(spike), val(species) from metadata_trimData
 
   output:
     path ("*.fq.gz") into fastqs_trimmed
@@ -178,11 +220,41 @@
     ulimit -a >>${repRID}.trimData.err
 
     # trim fastqs
-    if [ '${endsManual}' == 'se' ]
+    if [ '${endsManual_trimData}' == 'se' ]
     then
      trim_galore --gzip -q 25 --illumina --length 35 --basename ${repRID} -j `nproc` ${fastq[0]} 1>>${repRID}.trimData.log 2>>${repRID}.trimData.err;
     else
      trim_galore --gzip -q 25 --illumina --length 35 --paired --basename ${repRID} -j `nproc` ${fastq[0]} ${fastq[1]} 1>>${repRID}.trimData.log 2>>${repRID}.trimData.err;
     fi
     """
-}
\ No newline at end of file
+}
+
+/*
+ * alignReads: aligns the reads to a reference genome
+*/
+process alignReads {
+  tag "${repRID}"
+  publishDir "${outDir}/aligned", mode: "copy", pattern: "${repRID}.{unal,sorted}.{gz,bam,bai}"
+  publishDir "${logsDir}", mode: 'copy', pattern: "${repRID}.align.{out,err}"
+
+  input:
+    val endsManual_alignReads
+    val stranded_alignReads
+    path fqs from fastqs_trimmed
+    val referenceDir_alignReads
+
+  output:
+    set repRID, file ("${repRID}.unal.gz"), file ("${repRID}.sorted.bam"), file ("${repRID}.sorted.bai")
+    set repRID, file ("${repRID}.align.out"), file ("${repRID}.align.err")
+
+  script:
+    """
+    if [ "${endsManual_alignReads}" == 'pe' ]; then
+      hisat2 -p `nproc` --add-chrname --un-gz ${repRID}.unal.gz -S ${repRID}.sam -x ${referenceDir_alignReads} ${stranded_alignReads} --no-mixed --no-discordant -1 ${fqs[0]} -2 ${fqs[1]} 1>${repRID}.align.out 2>${repRID}.align.err;
+    else hisat2 -p `nproc` --add-chrname --un-gz ${repRID}.unal.gz -S ${repRID}.sam -x ${referenceDir_alignReads} ${stranded_alignReads} -U ${fqs[0]} 1>${repRID}.align.out 2>${repRID}.align.err;
+    fi;
+    samtools view -1 -@ `nproc` -F 4 -F 8 -F 256 -o ${repRID}.bam ${repRID}.sam 1>>${repRID}.align.out 2>>${repRID}.align.err;
+    samtools sort -@ `nproc` -O BAM -o ${repRID}.sorted.bam ${repRID}.bam 1>>${repRID}.align.out 2>>${repRID}.align.err;
+    samtools index -@ `nproc` -b ${repRID}.sorted.bam ${repRID}.sorted.bai 1>>${repRID}.align.out 2>>${repRID}.align.err;
+    """
+}
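Review note: the nested `if`/`elif` in parseMetadata above repeats the reference base path four times even though this commit also introduces `params.reference`/`referenceBase`, and it silently leaves `referenceDir` unset for any species other than Homo sapiens or Mus musculus, which would produce a truncated `design.csv` row. A table-driven sketch of the same mapping (hypothetical helper, not part of this commit) makes the convention (a `-S` suffix for spike-in genomes) and the failure case explicit:

```python
# Hypothetical refactor of the referenceDir selection in parseMetadata;
# paths and the -S spike-in suffix are taken from the shell block above.
REFERENCE_BASE = "/project/BICF/BICF_Core/shared/gudmap/references"
GENOMES = {"Homo sapiens": "GRCh38.p12", "Mus musculus": "GRCm38.P6"}

def reference_dir(species, spike):
    """Return the hisat2 index prefix for a species/spike-in combination."""
    genome = GENOMES.get(species)
    if genome is None:
        raise ValueError("unexpected species: %s" % species)
    suffix = "-S" if spike == "yes" else ""
    return "%s/%s%s/hisat2/genome" % (REFERENCE_BASE, genome, suffix)
```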
diff --git a/workflow/scripts/parseMeta.py b/workflow/scripts/parseMeta.py
index 43ca2392078171ec9a1f42f7f9a83d13d0f0383b..4797610f870f8e8f22024e0198dc0255c0829073 100644
--- a/workflow/scripts/parseMeta.py
+++ b/workflow/scripts/parseMeta.py
@@ -17,6 +17,7 @@ def get_args():
 def main():
     args = get_args()
     metaFile = pd.read_csv(args.metaFile,sep=",",header=0)
+    endsManual = ""
 
     # Check replicate RID metadata from 'File.csv'
     if (args.parameter == "repRID"):
@@ -54,9 +55,12 @@ def main():
     # Get strandedness metadata from 'Experiment Settings.csv'
     if (args.parameter == "stranded"):
         if (metaFile.Has_Strand_Specific_Information.unique() == "yes"):
-            stranded = "stranded"
+            if endsManual == "se":
+                stranded = "--rna-strandness F"
+            else:
+                stranded = "--rna-strandness FR"
         elif (metaFile.Has_Strand_Specific_Information.unique() == "no"):
-            stranded = "unstranded"
+            stranded = ""
         else:
             print("Stranded metadata does not match expected options: " + metaFile.Has_Strand_Specific_Information.unique())
             exit(1)
@@ -85,4 +89,4 @@ def main():
         print(species)
 
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    main()
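Review note: the `stranded` branch now emits hisat2 `--rna-strandness` arguments directly (`F` for single-end, `FR` for paired-end, and an empty string for unstranded data, which passes no flag at all). One caveat: `endsManual` is initialized to an empty string at the top of `main()` and nothing visible in this branch assigns it, so the single-end `F` case is only reachable if code elsewhere in the script sets `endsManual` first. The intended mapping, written as a pure function for clarity (a sketch, not part of this commit):

```python
def strandedness_flag(has_strand_info, ends_manual):
    """Map metadata to a hisat2 --rna-strandness argument.

    has_strand_info: 'yes' or 'no', from Experiment Settings.csv
    ends_manual: 'se' for single-end; anything else is treated as paired-end
    """
    if has_strand_info == "yes":
        return "--rna-strandness F" if ends_manual == "se" else "--rna-strandness FR"
    if has_strand_info == "no":
        return ""  # unstranded: hisat2 gets no strandness flag
    raise ValueError("unexpected strandedness metadata: %r" % has_strand_info)
```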
diff --git a/workflow/scripts/utils.py b/workflow/scripts/utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..5e4478e3b1cef96e9b4c05033f75122f87aad06e
--- /dev/null
+++ b/workflow/scripts/utils.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+
+#
+# * --------------------------------------------------------------------------
+# * Licensed under MIT (https://git.biohpc.swmed.edu/BICF/Astrocyte/chipseq_analysis/LICENSE.md)
+# * --------------------------------------------------------------------------
+#
+
+'''General utilities.'''
+
+
+import shlex
+import logging
+import subprocess
+import sys
+import os
+
+
+logger = logging.getLogger(__name__)
+logger.addHandler(logging.NullHandler())
+logger.propagate = True
+
+
+def run_pipe(steps, outfile=None):
+    from subprocess import Popen, PIPE
+    p = None
+    p_next = None
+    first_step_n = 1
+    last_step_n = len(steps)
+    for n, step in enumerate(steps, start=first_step_n):
+        logger.debug("step %d: %s" % (n, step))
+        if n == first_step_n:
+            if n == last_step_n and outfile:  # one-step pipeline with outfile
+                with open(outfile, 'w') as fh:
+                    print("one step shlex: %s to file: %s" % (shlex.split(step), outfile))
+                    p = Popen(shlex.split(step), stdout=fh)
+                break
+            print("first step shlex to stdout: %s" % (shlex.split(step)))
+            p = Popen(shlex.split(step), stdout=PIPE)
+        elif n == last_step_n and outfile:  # only treat the last step specially if you're sending stdout to a file
+            with open(outfile, 'w') as fh:
+                print("last step shlex: %s to file: %s" % (shlex.split(step), outfile))
+                p_last = Popen(shlex.split(step), stdin=p.stdout, stdout=fh)
+                p.stdout.close()
+                p = p_last
+        else:  # handles intermediate steps and, in the case of a pipe to stdout, the last step
+            print("intermediate step %d shlex to stdout: %s" % (n, shlex.split(step)))
+            p_next = Popen(shlex.split(step), stdin=p.stdout, stdout=PIPE)
+            p.stdout.close()
+            p = p_next
+    out, err = p.communicate()
+    return out, err
+
+
+def block_on(command):
+    process = subprocess.Popen(shlex.split(command), stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+    for line in iter(process.stdout.readline, b''):
+        sys.stdout.write(line.decode('utf-8'))
+    process.communicate()
+    return process.returncode
+
+
+def strip_extensions(filename, extensions):
+    '''Strips extensions to get basename of file.'''
+
+    basename = filename
+    for extension in extensions:
+        basename = basename.rpartition(extension)[0] or basename
+
+    return basename
+
+
+def count_lines(filename):
+    import mimetypes
+    compressed_mimetypes = [
+        "compress",
+        "bzip2",
+        "gzip"
+    ]
+    mime_type = mimetypes.guess_type(filename)[1]
+    if mime_type in compressed_mimetypes:
+        catcommand = 'gzip -dc'
+    else:
+        catcommand = 'cat'
+    out, err = run_pipe([
+        '%s %s' % (catcommand, filename),
+        'wc -l'
+    ])
+    return int(out)
+
+
+def rescale_scores(filename, scores_col, new_min=10, new_max=1000):
+    sorted_fn = 'sorted-%s' % (filename)
+    rescaled_fn = 'rescaled-%s' % (filename)
+
+    out, err = run_pipe([
+        'sort -k %dgr,%dgr %s' % (scores_col, scores_col, filename),
+        r"""awk 'BEGIN{FS="\t";OFS="\t"}{if (NF != 0) print $0}'"""],
+        sorted_fn)
+
+    out, err = run_pipe([
+        'head -n 1 %s' % (sorted_fn),
+        'cut -f %s' % (scores_col)])
+    max_score = float(out.strip())
+    logger.info("rescale_scores: max_score = %s", max_score)
+
+    out, err = run_pipe([
+        'tail -n 1 %s' % (sorted_fn),
+        'cut -f %s' % (scores_col)])
+    min_score = float(out.strip())
+    logger.info("rescale_scores: min_score = %s", min_score)
+
+    a = min_score
+    b = max_score
+    x = new_min
+    y = new_max
+
+    if min_score == max_score:  # give all peaks new_min
+        rescale_formula = "x"
+    else:  # n is the unscaled score from scores_col
+        rescale_formula = "((n-a)*(y-x)/(b-a))+x"
+
+    out, err = run_pipe(
+        [
+            'cat %s' % (sorted_fn),
+            r"""awk 'BEGIN{OFS="\t"}{n=$%d;a=%d;b=%d;x=%d;y=%d}"""
+            % (scores_col, a, b, x, y) +
+            r"""{$%d=int(%s) ; print $0}'"""
+            % (scores_col, rescale_formula)
+        ],
+        rescaled_fn)
+
+    os.remove(sorted_fn)
+
+    return rescaled_fn
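Review note: `run_pipe` is the workhorse of this module. It chains commands with `Popen` pipes and `shlex.split`, so no step ever goes through `shell=True`, and `count_lines` builds on it by picking `gzip -dc` or `cat` based on the guessed mimetype. A short usage sketch (file names hypothetical):

```python
import utils

# count_lines decompresses .gz transparently; FASTQ stores 4 lines per read.
lines = utils.count_lines('Q-Y5JA.unal.gz')
reads = lines // 4

# The same pipeline spelled out explicitly, with no shell involved:
out, err = utils.run_pipe(['gzip -dc Q-Y5JA.unal.gz', 'wc -l'])
assert int(out) == lines
```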
diff --git a/workflow/tests/test_alignReads.py b/workflow/tests/test_alignReads.py
new file mode 100644
index 0000000000000000000000000000000000000000..79770bb42245df6da0ce7fa1081d78ec5aed736f
--- /dev/null
+++ b/workflow/tests/test_alignReads.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python3
+
+import pytest
+import pandas as pd
+import os
+import utils
+
+data_output_path = os.path.dirname(os.path.abspath(__file__)) + \
+    '/../../'
+logs_output_path = os.path.dirname(os.path.abspath(__file__)) + \
+    '/../../'
+
+
+@pytest.mark.alignData
+def test_alignData_se():
+    assert os.path.exists(os.path.join(data_output_path, '16-1ZX4.unal.gz'))
+    assert utils.count_lines(os.path.join(data_output_path, '16-1ZX4.unal.gz')) == 3070528
+    assert os.path.exists(os.path.join(data_output_path, '16-1ZX4.sorted.bam'))
+    assert os.path.exists(os.path.join(data_output_path, '16-1ZX4.sorted.bai'))
+
+
+@pytest.mark.alignData
+def test_alignData_pe():
+    assert os.path.exists(os.path.join(data_output_path, 'Q-Y5JA.unal.gz'))
+    assert utils.count_lines(os.path.join(data_output_path, 'Q-Y5JA.unal.gz')) == 0
+    assert os.path.exists(os.path.join(data_output_path, 'Q-Y5JA.sorted.bam'))
+    assert os.path.exists(os.path.join(data_output_path, 'Q-Y5JA.sorted.bai'))
+
+
+@pytest.mark.alignLogs
+def test_alignLogs_se():
+    assert os.path.exists(os.path.join(logs_output_path, '16-1ZX4.align.err'))
+    assert utils.count_lines(os.path.join(logs_output_path, '16-1ZX4.align.err')) == 7
+    assert '34497376 reads; of these:' in open(os.path.join(logs_output_path, '16-1ZX4.align.err')).readlines()[0]
+    assert os.path.exists(os.path.join(logs_output_path, '16-1ZX4.align.out'))
+    assert utils.count_lines(os.path.join(logs_output_path, '16-1ZX4.align.out')) == 0
+
+
+@pytest.mark.alignLogs
+def test_alignLogs_pe():
+    assert os.path.exists(os.path.join(logs_output_path, 'Q-Y5JA.align.err'))
+    assert utils.count_lines(os.path.join(logs_output_path, 'Q-Y5JA.align.err')) == 7
+    assert '15824858 reads; of these:' in open(os.path.join(logs_output_path, 'Q-Y5JA.align.err')).readlines()[0]
+    assert os.path.exists(os.path.join(logs_output_path, 'Q-Y5JA.align.out'))
+    assert utils.count_lines(os.path.join(logs_output_path, 'Q-Y5JA.align.out')) == 0
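Review note: the line counts asserted here encode read counts, since hisat2's `--un-gz` output is FASTQ at four lines per read. The single-end test therefore pins 16-1ZX4 at 767,632 unaligned reads, while the paired-end test expects an empty unaligned-read file for Q-Y5JA. A quick sanity check of that arithmetic (a sketch, not part of the test suite):

```python
# FASTQ records are 4 lines each, so unal.gz line counts map to read counts.
UNAL_LINES_SE = 3070528  # asserted above for 16-1ZX4.unal.gz

assert UNAL_LINES_SE % 4 == 0
print(UNAL_LINES_SE // 4)  # 767632 unaligned single-end reads
```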