diff --git a/workflow/rna-seq.nf b/workflow/rna-seq.nf
index e695f5a4e4efb25a57fbeda60ae548a62d9d6780..df26b7a652e430d8856ff84a744eecb4e0fe93cb 100644
--- a/workflow/rna-seq.nf
+++ b/workflow/rna-seq.nf
@@ -151,7 +151,7 @@ Development : ${params.dev}
 """
 
 /*
- * splitData: split bdbag files by replicate so fetch can occure in parallel, and rename files to replicate rid
+ * getBag: download input bag
  */
 process getBag {
   tag "${repRID}"
@@ -206,7 +206,7 @@ inputBag.into {
 }
 
 /*
- * getData: fetch study files from consortium with downloaded bdbag.zip
+ * getData: fetch replicate files from consortium with downloaded bdbag.zip
  */
 process getData {
   tag "${repRID}"
@@ -221,6 +221,7 @@ process getData {
     path ("**/File.csv") into fileMeta
     path ("**/Experiment Settings.csv") into experimentSettingsMeta
     path ("**/Experiment.csv") into experimentMeta
+    path "fastqCount.csv" into fastqCount_fl
 
   script:
     """
@@ -246,9 +247,18 @@ process getData {
     echo -e "LOG: fetching replicate bdbag" >> ${repRID}.getData.log
     sh ${script_bdbagFetch} \${replicate::-13} ${repRID}
     echo -e "LOG: fetched" >> ${repRID}.getData.log
+
+    fastqCount=\$(ls *.fastq.gz | wc -l)
+    echo -e \${fastqCount} > fastqCount.csv
     """
 }
 
+// Split fastq count into channel
+fastqCount = Channel.create()
+fastqCount_fl.splitCsv(sep: ",", header: false).separate(
+  fastqCount
+)
+
 // Set raw fastq to downloaded or forced input and replicate them for multiple process inputs
 if (fastqsForce != "") {
   Channel
@@ -274,9 +284,11 @@ process parseMetadata {
     path file from fileMeta
     path experimentSettings, stageAs: "ExperimentSettings.csv" from experimentSettingsMeta
     path experiment from experimentMeta
+    val fastqCount
 
   output:
     path "design.csv" into metadata_fl
+    path "fastqCountError.csv" into fastqCountError_fl
 
   script:
     """
@@ -323,8 +335,28 @@ process parseMetadata {
     fi
     echo -e "LOG: read length metadata parsed: \${readLength}" >> ${repRID}.parseMetadata.log
 
+    # check for an incorrect number of fastqs
+    fastqCountError=false
+    fastqCountError_details=""
+    if [ ${fastqCount} -gt 2 ]
+    then
+      fastqCountError=true
+      fastqCountError_details="Too many fastqs detected (>2)"
+    elif [ "\${endsMeta}" == "Single Read" ] && [ ${fastqCount} -ne 1 ]
+    then
+      fastqCountError=true
+      fastqCountError_details="Number of fastqs detected does not match submitted endness"
+    elif [ "\${endsMeta}" == "Paired End" ] && [ ${fastqCount} -ne 2 ]
+    then
+      fastqCountError=true
+      fastqCountError_details="Number of fastqs detected does not match submitted endness"
+    fi
+
     # save design file
     echo -e "\${endsMeta},\${endsManual},\${stranded},\${spike},\${species},\${readLength},\${exp},\${study}" > design.csv
+
+    # save fastq count error file
+    echo -e "\${fastqCountError},\${fastqCountError_details}" > fastqCountError.csv
     """
 }
 
@@ -386,6 +418,30 @@ expRID.into {
   expRID_uploadProcessedFile
 }
 
+// Split fastq count error into separate channels
+fastqCountError = Channel.create()
+fastqCountError_details = Channel.create()
+fastqCountError_fl.splitCsv(sep: ",", header: false).separate(
+  fastqCountError,
+  fastqCountError_details
+)
+
+// Replicate errors for multiple process inputs
+fastqCountError.into {
+  fastqCountError_getRef
+  fastqCountError_alignData
+  fastqCountError_dedupData
+  fastqCountError_makeBigWig
+  fastqCountError_countData
+  fastqCountError_fastqc
+  fastqCountError_dataQC
+  fastqCountError_aggrQC
+  fastqCountError_uploadQC
+  fastqCountError_uploadProcessedFile
+  fastqCountError_uploadOutputBag
+  fastqCountError_finalizeExecutionRun
+}
+
 /*
  * trimData: trims any adapter or non-host sequences from the data
  */
@@ -879,7 +935,7 @@ checkMetadata_fl.splitCsv(sep: ",", header: false).separate(
   pipelineError_species
 )
 
-// Replicate errors for multiple process inputs 
+// Replicate errors for multiple process inputs
 pipelineError.into {
   pipelineError_getRef
   pipelineError_alignData
@@ -1054,12 +1110,14 @@ process getRef {
     path credential, stageAs: "credential.json" from deriva_getRef
     val spike from spikeInfer_getRef
     val species from speciesInfer_getRef
+    val fastqCountError_getRef
     val pipelineError_getRef
 
   output:
     tuple path ("hisat2", type: 'dir'), path ("*.bed"), path ("*.fna"), path ("*.gtf"), path ("geneID.tsv"), path ("Entrez.tsv") into reference
 
   when:
+    fastqCountError_getRef == "false"
     pipelineError_getRef == "false"
 
   script:
@@ -1148,6 +1206,7 @@ process alignData {
     path reference_alignData
     val ends from endsInfer_alignData
     val stranded from strandedInfer_alignData
+    val fastqCountError_alignData
     val pipelineError_alignData
 
   output:
@@ -1155,6 +1214,7 @@ process alignData {
     path ("*.alignSummary.txt") into alignQC
 
   when:
+    fastqCountError_alignData == "false"
     pipelineError_alignData == "false"
 
   script:
@@ -1219,6 +1279,7 @@ process dedupData {
 
   input:
     tuple path (bam), path (bai) from rawBam_dedupData
+    val fastqCountError_dedupData
     val pipelineError_dedupData
 
   output:
@@ -1227,6 +1288,7 @@ process dedupData {
     path ("*.deduped.Metrics.txt") into dedupQC
 
   when:
+    fastqCountError_dedupData == 'false'
     pipelineError_dedupData == 'false'
 
   script:
@@ -1272,12 +1334,14 @@ process makeBigWig {
 
   input:
     tuple path (bam), path (bai) from dedupBam_makeBigWig
+    val fastqCountError_makeBigWig
     val pipelineError_makeBigWig
 
   output:
     path ("${repRID}_sorted.deduped.bw") into bigwig
 
   when:
+    fastqCountError_makeBigWig == 'false'
     pipelineError_makeBigWig == 'false'
 
   script:
@@ -1306,6 +1370,7 @@ process countData {
     path ref from reference_countData
     val ends from endsInfer_countData
     val stranded from strandedInfer_countData
+    val fastqCountError_countData
     val pipelineError_countData
 
   output:
@@ -1314,6 +1379,7 @@ process countData {
     path ("assignedReads.csv") into assignedReadsInfer_fl
 
   when:
+    fastqCountError_countData == 'false'
     pipelineError_countData == 'false'
 
   script:
@@ -1381,6 +1447,7 @@ process fastqc {
 
   input:
     path (fastq) from fastqs_fastqc
+    val fastqCountError_fastqc
     val pipelineError_fastqc
 
   output:
@@ -1388,6 +1455,7 @@ process fastqc {
     path ("rawReads.csv") into rawReadsInfer_fl
 
   when:
+    fastqCountError_fastqc == 'false'
     pipelineError_fastqc == 'false'
 
   script:
@@ -1428,6 +1496,7 @@ process dataQC {
     tuple path (bam), path (bai) from dedupBam_dataQC
     tuple path (chrBam), path (chrBai) from dedupChrBam
     val ends from endsInfer_dataQC
+    val fastqCountError_dataQC
     val pipelineError_dataQC
 
   output:
@@ -1436,6 +1505,7 @@ process dataQC {
     path "${repRID}_insertSize.inner_distance_freq.txt" into innerDistance
 
   when:
+    fastqCountError_dataQC == 'false'
     pipelineError_dataQC == 'false'
 
   script:
@@ -1512,6 +1582,7 @@ process aggrQC {
     val tinMedI from tinMedInfer
     val studyRID from studyRID_aggrQC
     val expRID from expRID_aggrQC
+    val fastqCountError_aggrQC
     val pipelineError_aggrQC
 
   output:
@@ -1519,6 +1590,7 @@ process aggrQC {
     path "${repRID}.multiqc_data.json" into multiqcJSON
 
   when:
+    fastqCountError_aggrQC == 'false'
    pipelineError_aggrQC == 'false'
 
   script:
@@ -1603,6 +1675,7 @@ process uploadQC {
     val length from readLengthInfer_uploadQC
     val rawCount from rawReadsInfer_uploadQC
     val finalCount from assignedReadsInfer_uploadQC
+    val fastqCountError_uploadQC
     val pipelineError_uploadQC
 
   output:
@@ -1610,6 +1683,7 @@ process uploadQC {
 
   when:
     upload
+    fastqCountError_uploadQC == 'false'
     pipelineError_uploadQC == 'false'
 
   script:
@@ -1673,6 +1747,7 @@ process uploadProcessedFile {
     val studyRID from studyRID_uploadProcessedFile
     val expRID from expRID_uploadProcessedFile
     val executionRunRID from executionRunRID_uploadProcessedFile
+    val fastqCountError_uploadProcessedFile
     val pipelineError_uploadProcessedFile
 
   output:
@@ -1680,6 +1755,7 @@ process uploadProcessedFile {
 
   when:
     upload
+    fastqCountError_uploadProcessedFile == 'false'
     pipelineError_uploadProcessedFile == 'false'
 
   script:
@@ -1759,6 +1835,7 @@ process uploadOutputBag {
     path outputBag
     val studyRID from studyRID_uploadOutputBag
     val executionRunRID from executionRunRID_uploadOutputBag
+    val fastqCountError_uploadOutputBag
     val pipelineError_uploadOutputBag
 
   output:
@@ -1766,6 +1843,7 @@ process uploadOutputBag {
 
   when:
     upload
+    fastqCountError_uploadOutputBag == 'false'
     pipelineError_uploadOutputBag == 'false'
 
   script:
@@ -1805,12 +1883,11 @@ process uploadOutputBag {
 }
 
 // Extract output bag RID into channel
-outputBagRID = Channel.value()
-outputBagRID_temp = Channel.create()
+outputBagRID = Channel.create()
 outputBagRID_fl.splitCsv(sep: ",", header: false).separate(
-  outputBagRID_temp
+  outputBagRID
 )
-outputBagRID = outputBagRID_temp
+outputBagRID = outputBagRID.ifEmpty(false)
 
 /*
  * finalizeExecutionRun: finalizes the execution run
@@ -1832,6 +1909,8 @@ process finalizeExecutionRun {
     val strandedInfer from strandedInfer_finalizeExecutionRun
     val spikeInfer from spikeInfer_finalizeExecutionRun
     val speciesInfer from speciesInfer_finalizeExecutionRun
+    val fastqCountError from fastqCountError_finalizeExecutionRun
+    val fastqCountError_details
     val pipelineError from pipelineError_finalizeExecutionRun
     val pipelineError_ends
     val pipelineError_stranded
@@ -1853,11 +1932,16 @@ process finalizeExecutionRun {
     cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
     cookie=\${cookie:11:-1}
 
-    if [ ${pipelineError} == false ]
+    if [ ${fastqCountError} == false ] && [ ${pipelineError} == false ]
     then
       rid=\$(python3 ${script_uploadExecutionRun_finalizeExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Success -d 'Run Successful' -o ${source} -c \${cookie} -u ${executionRunRID})
       echo LOG: execution run RID marked as successful - \${rid} >> ${repRID}.finalizeExecutionRun.log
-    else
+    elif [ ${fastqCountError} == true ]
+    then
+      rid=\$(python3 ${script_uploadExecutionRun_finalizeExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "${fastqCountError_details}" -o ${source} -c \${cookie} -u ${executionRunRID})
+      echo LOG: execution run RID marked as error - \${rid} >> ${repRID}.finalizeExecutionRun.log
+    elif [ ${pipelineError} == true ]
+    then
       pipelineError_details=\$(echo "**Submitted metadata does not match infered:** ")
       if ${pipelineError_ends}
      then
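
Note on the pattern above: every hunk repeats the same DSL1 idiom — a process writes a one-line CSV, the CSV is parsed into value channels with splitCsv/separate, the flag is fanned out with into {}, and each downstream process is gated through its when: block. A minimal standalone sketch of that idiom follows; the process and channel names here are illustrative stand-ins, not names from the pipeline:

// One-line CSV ("false," or "true,<details>") split into two value channels,
// mirroring how fastqCountError_fl is handled in the diff.
errFlag = Channel.create()
errDetails = Channel.create()
Channel.from("false,")
  .splitCsv(sep: ",", header: false)
  .separate(errFlag, errDetails)

process demo {
  input:
    val flag from errFlag

  when:
    flag == "false"   // skip the process entirely when an upstream error was flagged

  script:
    """
    echo "no upstream error detected"
    """
}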