diff --git a/workflow/rna-seq.nf b/workflow/rna-seq.nf
index 7b5853661356aed5de3d237fd15a750c1d892447..2c6d10929b5ef531c72473ed3dc3fb23ca735f7e 100644
--- a/workflow/rna-seq.nf
+++ b/workflow/rna-seq.nf
@@ -47,6 +47,7 @@ deriva.into {
   deriva_uploadProcessedFile
   deriva_uploadOutputBag
   deriva_finalizeExecutionRun
+  deriva_failPreExecutionRun
   deriva_failExecutionRun
 }
 bdbag = Channel
@@ -97,6 +98,7 @@ script_tinHist = Channel.fromPath("${baseDir}/scripts/tin_hist.py")
 script_uploadInputBag = Channel.fromPath("${baseDir}/scripts/upload_input_bag.py")
 script_uploadExecutionRun_uploadExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_finalizeExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
+script_uploadExecutionRun_failPreExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_failExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadQC = Channel.fromPath("${baseDir}/scripts/upload_qc.py")
 script_uploadOutputBag = Channel.fromPath("${baseDir}/scripts/upload_output_bag.py")
@@ -271,6 +273,7 @@ if (fastqsForce != "") {
   }
 } else {
-  fastqs.set {
+  fastqs.into {
+    fastqs_parseMetadata
     fastqs_trimData
   }
 }
@@ -286,11 +289,11 @@ process parseMetadata {
     path file from fileMeta
     path experimentSettings, stageAs: "ExperimentSettings.csv" from experimentSettingsMeta
     path experiment from experimentMeta
-    val fastqCount
+    path (fastq) from fastqs_parseMetadata
 
   output:
     path "design.csv" into metadata_fl
-    path "fastqCountError.csv" into fastqCountError_fl
+    path "fastqError.csv" into fastqError_fl
 
   script:
     """
@@ -367,11 +370,24 @@ process parseMetadata {
       fastqCountError_details="Number of fastqs detected does not match submitted endness"
     fi
 
+    # check read counts match for fastqs
+    fastqReadError=false
+    fastqReadError_details=""
+    if [ "\${endsMeta}" == "pe" ]
+    then
+      r1Count=\$(echo \$(zcat ${fastq[0]} | wc -l)/4 | bc)
+      r2Count=\$(echo \$(zcat ${fastq[1]} | wc -l)/4 | bc)
+      if [ \${r1Count} -ne \${r2Count} ]
+      then
+        fastqReadError=true
+        fastqReadError_details="Number of reads do not match for R1 or R2, there may be a truncation or mismatch of fastq files"
+      fi
+    fi
     # save design file
     echo -e "\${endsMeta},\${endsRaw},\${endsManual},\${stranded},\${spike},\${species},\${readLength},\${exp},\${study}" > design.csv
 
-    # save fastq count error file
-    echo -e "\${fastqCountError},\${fastqCountError_details}" > fastqCountError.csv
+    # save fastq error file
+    echo -e "\${fastqCountError},\${fastqCountError_details},\${fastqReadError},\${fastqReadError_details}" > fastqError.csv
     """
 }
 
@@ -417,11 +432,13 @@ strandedMeta.into {
 spikeMeta.into {
   spikeMeta_checkMetadata
   spikeMeta_aggrQC
+  spikeMeta_failPreExecutionRun
   spikeMeta_failExecutionRun
 }
 speciesMeta.into {
   speciesMeta_checkMetadata
   speciesMeta_aggrQC
+  speciesMeta_failPreExecutionRun
   speciesMeta_failExecutionRun
 }
 studyRID.into {
@@ -438,13 +455,24 @@ expRID.into {
 // Split fastq count error into separate channel
 fastqCountError = Channel.create()
 fastqCountError_details = Channel.create()
-fastqCountError_fl.splitCsv(sep: ",", header: false).separate(
+fastqReadError = Channel.create()
+fastqReadError_details = Channel.create()
+fastqError_fl.splitCsv(sep: ",", header: false).separate(
   fastqCountError,
-  fastqCountError_details
+  fastqCountError_details,
+  fastqReadError,
+  fastqReadError_details
 )
 
 // Replicate errors for multiple process inputs
 fastqCountError.into {
+  fastqCountError_trimData
+  fastqCountError_getRefInfer
+  fastqCountError_downsampleData
+  fastqCountError_alignSampleData
+  fastqCountError_inferMetadata
+  fastqCountError_checkMetadata
+  fastqCountError_uploadExecutionRun
   fastqCountError_getRef
   fastqCountError_alignData
   fastqCountError_dedupData
@@ -456,7 +484,28 @@ fastqCountError.into {
   fastqCountError_uploadQC
   fastqCountError_uploadProcessedFile
   fastqCountError_uploadOutputBag
-  fastqCountError_failExecutionRun
+  
fastqCountError_failPreExecutionRun +} +fastqReadError.into { + fastqReadError_trimData + fastqReadError_getRefInfer + fastqReadError_downsampleData + fastqReadError_alignSampleData + fastqReadError_inferMetadata + fastqReadError_checkMetadata + fastqReadError_uploadExecutionRun + fastqReadError_getRef + fastqReadError_alignData + fastqReadError_dedupData + fastqReadError_makeBigWig + fastqReadError_countData + fastqReadError_fastqc + fastqReadError_dataQC + fastqReadError_aggrQC + fastqReadError_uploadQC + fastqReadError_uploadProcessedFile + fastqReadError_uploadOutputBag + fastqReadError_failPreExecutionRun } /* @@ -468,6 +517,8 @@ process trimData { input: path (fastq) from fastqs_trimData val ends from endsManual_trimData + val fastqCountError_trimData + val fastqReadError_trimData output: path ("*.fq.gz") into fastqsTrim @@ -475,6 +526,10 @@ process trimData { path ("*_trimming_report.txt") into trimQC path ("readLength.csv") into readLengthInfer_fl + when: + fastqCountError_trimData == "false" + fastqReadError_trimData == "false" + script: """ hostname > ${repRID}.trimData.log @@ -527,11 +582,17 @@ process getRefInfer { input: tuple val (refName), path (credential, stageAs: "credential.json"), path (script_refDataInfer) from getRefInferInput + val fastqCountError_getRefInfer + val fastqReadError_getRefInfer output: tuple val (refName), path ("hisat2", type: 'dir'), path ("*.fna"), path ("*.gtf") into refInfer path ("${refName}", type: 'dir') into bedInfer + when: + fastqCountError_getRefInfer == "false" + fastqReadError_getRefInfer == "false" + script: """ hostname > ${repRID}.${refName}.getRefInfer.log @@ -606,11 +667,17 @@ process downsampleData { input: path fastq from fastqsTrim_downsampleData val ends from endsManual_downsampleData + val fastqCountError_downsampleData + val fastqReadError_downsampleData output: path ("sampled.1.fq") into fastqs1Sample path ("sampled.2.fq") into fastqs2Sample + when: + fastqCountError_downsampleData == "false" + 
fastqReadError_downsampleData == "false" + script: """ hostname > ${repRID}.downsampleData.log @@ -643,12 +710,18 @@ process alignSampleData { input: tuple val (ends), val (ref), path (hisat2), path (fna), path (gtf), path (fastq1), path (fastq2) from inferInput + val fastqCountError_alignSampleData + val fastqReadError_alignSampleData output: path ("${ref}.sampled.sorted.bam") into sampleBam path ("${ref}.sampled.sorted.bam.bai") into sampleBai path ("${ref}.alignSampleSummary.txt") into alignSampleQC + when: + fastqCountError_alignSampleData == "false" + fastqReadError_alignSampleData == "false" + script: """ hostname > ${repRID}.${ref}.alignSampleData.log @@ -694,11 +767,17 @@ process inferMetadata { path bam from sampleBam.collect() path bai from sampleBai.collect() path alignSummary from alignSampleQC_inferMetadata.collect() + val fastqCountError_inferMetadata + val fastqReadError_inferMetadata output: path "infer.csv" into inferMetadata_fl path "${repRID}.infer_experiment.txt" into inferExperiment + when: + fastqCountError_inferMetadata == "false" + fastqReadError_inferMetadata == "false" + script: """ hostname > ${repRID}.inferMetadata.log @@ -867,11 +946,17 @@ process checkMetadata { val strandedInfer from strandedInfer_checkMetadata val spikeInfer from spikeInfer_checkMetadata val speciesInfer from speciesInfer_checkMetadata + val fastqCountError_checkMetadata + val fastqReadError_checkMetadata output: path ("check.csv") into checkMetadata_fl path ("outputBagRID.csv") optional true into outputBagRID_fl_dummy + when: + fastqCountError_checkMetadata == "false" + fastqReadError_checkMetadata == "false" + script: """ hostname > ${repRID}.checkMetadata.log @@ -1049,12 +1134,16 @@ process uploadExecutionRun { val spike from spikeInfer_uploadExecutionRun val species from speciesInfer_uploadExecutionRun val inputBagRID from inputBagRID_uploadExecutionRun + val fastqCountError_uploadExecutionRun + val fastqReadError_uploadExecutionRun output: path 
("executionRunRID.csv") into executionRunRID_fl when: upload + fastqCountError_uploadExecutionRun == "false" + fastqReadError_uploadExecutionRun == "false" script: """ @@ -1132,6 +1221,7 @@ process getRef { val spike from spikeInfer_getRef val species from speciesInfer_getRef val fastqCountError_getRef + val fastqReadError_getRef val pipelineError_getRef output: @@ -1139,6 +1229,7 @@ process getRef { when: fastqCountError_getRef == "false" + fastqReadError_getRef == "false" pipelineError_getRef == "false" script: @@ -1228,6 +1319,7 @@ process alignData { val ends from endsInfer_alignData val stranded from strandedInfer_alignData val fastqCountError_alignData + val fastqReadError_alignData val pipelineError_alignData output: @@ -1236,6 +1328,7 @@ process alignData { when: fastqCountError_alignData == "false" + fastqReadError_alignData == "false" pipelineError_alignData == "false" script: @@ -1301,6 +1394,7 @@ process dedupData { input: tuple path (bam), path (bai) from rawBam_dedupData val fastqCountError_dedupData + val fastqReadError_dedupData val pipelineError_dedupData output: @@ -1310,6 +1404,7 @@ process dedupData { when: fastqCountError_dedupData == 'false' + fastqReadError_dedupData == 'false' pipelineError_dedupData == 'false' script: @@ -1356,6 +1451,7 @@ process makeBigWig { input: tuple path (bam), path (bai) from dedupBam_makeBigWig val fastqCountError_makeBigWig + val fastqReadError_makeBigWig val pipelineError_makeBigWig output: @@ -1363,6 +1459,7 @@ process makeBigWig { when: fastqCountError_makeBigWig == 'false' + fastqReadError_makeBigWig == 'false' pipelineError_makeBigWig == 'false' script: @@ -1392,6 +1489,7 @@ process countData { val ends from endsInfer_countData val stranded from strandedInfer_countData val fastqCountError_countData + val fastqReadError_countData val pipelineError_countData output: @@ -1401,6 +1499,7 @@ process countData { when: fastqCountError_countData == 'false' + fastqReadError_countData == 'false' pipelineError_countData 
== 'false' script: @@ -1469,6 +1568,7 @@ process fastqc { input: path (fastq) from fastqs_fastqc val fastqCountError_fastqc + val fastqReadError_fastqc val pipelineError_fastqc output: @@ -1477,6 +1577,7 @@ process fastqc { when: fastqCountError_fastqc == 'false' + fastqReadError_fastqc == 'false' pipelineError_fastqc == 'false' script: @@ -1518,6 +1619,7 @@ process dataQC { tuple path (chrBam), path (chrBai) from dedupChrBam val ends from endsInfer_dataQC val fastqCountError_dataQC + val fastqReadError_dataQC val pipelineError_dataQC output: @@ -1527,6 +1629,7 @@ process dataQC { when: fastqCountError_dataQC == 'false' + fastqReadError_dataQC == 'false' pipelineError_dataQC == 'false' script: @@ -1604,6 +1707,7 @@ process aggrQC { val studyRID from studyRID_aggrQC val expRID from expRID_aggrQC val fastqCountError_aggrQC + val fastqReadError_aggrQC val pipelineError_aggrQC output: @@ -1612,6 +1716,7 @@ process aggrQC { when: fastqCountError_aggrQC == 'false' + fastqReadError_aggrQC == 'false' pipelineError_aggrQC == 'false' script: @@ -1697,6 +1802,7 @@ process uploadQC { val rawCount from rawReadsInfer_uploadQC val finalCount from assignedReadsInfer_uploadQC val fastqCountError_uploadQC + val fastqReadError_uploadQC val pipelineError_uploadQC output: @@ -1705,6 +1811,7 @@ process uploadQC { when: upload fastqCountError_uploadQC == 'false' + fastqReadError_uploadQC == 'false' pipelineError_uploadQC == 'false' script: @@ -1769,6 +1876,7 @@ process uploadProcessedFile { val expRID from expRID_uploadProcessedFile val executionRunRID from executionRunRID_uploadProcessedFile val fastqCountError_uploadProcessedFile + val fastqReadError_uploadProcessedFile val pipelineError_uploadProcessedFile output: @@ -1777,6 +1885,7 @@ process uploadProcessedFile { when: upload fastqCountError_uploadProcessedFile == 'false' + fastqReadError_uploadProcessedFile == 'false' pipelineError_uploadProcessedFile == 'false' script: @@ -1857,6 +1966,7 @@ process uploadOutputBag { val studyRID 
from studyRID_uploadOutputBag
     val executionRunRID from executionRunRID_uploadOutputBag
     val fastqCountError_uploadOutputBag
+    val fastqReadError_uploadOutputBag
     val pipelineError_uploadOutputBag
 
   output:
@@ -1865,6 +1975,7 @@ process uploadOutputBag {
   when:
     upload
     fastqCountError_uploadOutputBag == 'false'
+    fastqReadError_uploadOutputBag == 'false'
     pipelineError_uploadOutputBag == 'false'
 
   script:
@@ -1942,6 +2053,89 @@ process finalizeExecutionRun {
     """
 }
 
+/*
+ * failPreExecutionRun: fail the execution run prematurely
+*/
+process failPreExecutionRun {
+  tag "${repRID}"
+
+  input:
+    path script_uploadExecutionRun_failPreExecutionRun
+    path credential, stageAs: "credential.json" from deriva_failPreExecutionRun
+    val spike from spikeMeta_failPreExecutionRun
+    val species from speciesMeta_failPreExecutionRun
+    val inputBagRID from inputBagRID_failExecutionRun
+    val fastqCountError from fastqCountError_failPreExecutionRun
+    val fastqCountError_details
+    val fastqReadError from fastqReadError_failPreExecutionRun
+    val fastqReadError_details
+  when:
+    upload
+    fastqCountError == 'true' || fastqReadError == 'true'
+
+  script:
+    """
+    hostname > ${repRID}.failPreExecutionRun.log
+    ulimit -a >> ${repRID}.failPreExecutionRun.log
+
+    # NOTE: no Execution_Run entry exists yet at this point; the workflow and
+    # genome RIDs are looked up below and the Execution_Run entry itself is
+    # created or updated by the upload script at the end of this process
+
+    cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
+    cookie=\${cookie:11:-1}
+
+    errorDetails=""
+    if [ ${fastqCountError} == true ]
+    then
+      errorDetails=\$(echo ${fastqCountError_details}"\\n")
+    elif [ ${fastqReadError} == true ]
+    then
+      errorDetails=\$(echo ${fastqReadError_details}"\\n")
+    fi
+
+    echo LOG: searching for workflow RID - BICF mRNA ${workflow.manifest.version} >> ${repRID}.failPreExecutionRun.log
+    workflow=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Workflow/Name=BICF%20mRNA%20Replicate/Version=${workflow.manifest.version})
+    workflow=\$(echo \${workflow} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
+    workflow=\${workflow:7:-6}
+    echo LOG: workflow RID extracted - \${workflow} >> ${repRID}.failPreExecutionRun.log
+
+    if [ "${species}" == "Homo sapiens" ]
+    then
+      genomeName=\$(echo GRCh${refHuVersion})
+    elif [ "${species}" == "Mus musculus" ]
+    then
+      genomeName=\$(echo GRCm${refMoVersion})
+    fi
+    if [ "${spike}" == "yes" ]
+    then
+      genomeName=\$(echo \${genomeName}-S)
+    fi
+    echo LOG: searching for genome name - \${genomeName} >> ${repRID}.failPreExecutionRun.log
+    genome=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Reference_Genome/Name=\${genomeName})
+    genome=\$(echo \${genome} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
+    genome=\${genome:7:-6}
+    echo LOG: genome RID extracted - \${genome} >> ${repRID}.failPreExecutionRun.log
+
+    cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
+    cookie=\${cookie:11:-1}
+
+    exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/Workflow=\${workflow}/Replicate=${repRID}/Input_Bag=${inputBagRID})
+    echo \${exist} >> ${repRID}.failPreExecutionRun.log
+    if [ "\${exist}" == "[]" ]
+    then
+      rid=\$(python3 ${script_uploadExecutionRun_failPreExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u F)
+      echo LOG: execution run RID uploaded - \${rid} >> ${repRID}.failPreExecutionRun.log
+    else
+      rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
+      rid=\${rid:7:-6}
+      echo \${rid} >> ${repRID}.failPreExecutionRun.log
+      executionRun_rid=\$(python3 ${script_uploadExecutionRun_failPreExecutionRun} -r ${repRID} -w
\${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid})
+      echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.failPreExecutionRun.log
+    fi
+    """
+}
+
 /*
  * failExecutionRun: fail the execution run
 */
@@ -1962,8 +2156,6 @@ process failExecutionRun {
     val strandedInfer from strandedInfer_failExecutionRun
     val spikeInfer from spikeInfer_failExecutionRun
     val speciesInfer from speciesInfer_failExecutionRun
-    val fastqCountError from fastqCountError_failExecutionRun
-    val fastqCountError_details
     val pipelineError from pipelineError_failExecutionRun
     val pipelineError_ends
     val pipelineError_stranded
@@ -1972,12 +2164,12 @@ process failExecutionRun {
 
   when:
     upload
-    fastqCountError == 'true' || pipelineError == 'true'
+    pipelineError == 'true'
 
   script:
     """
-    hostname > ${repRID}.finalizeExecutionRun.log
-    ulimit -a >> ${repRID}.finalizeExecutionRun.log
+    hostname > ${repRID}.failExecutionRun.log
+    ulimit -a >> ${repRID}.failExecutionRun.log
 
     executionRun=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/RID=${executionRunRID})
     workflow=\$(echo \${executionRun} | grep -o '\\"Workflow\\":.*\\"Reference' | grep -oP '(?<=\\"Workflow\\":\\").*(?=\\",\\"Reference)')
@@ -1986,16 +2178,12 @@ process failExecutionRun {
 
     cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
    cookie=\${cookie:11:-1}
 
-    if [ ${fastqCountError} == false ] && [ ${pipelineError} == false ]
+    errorDetails=""
+    if [ ${pipelineError} == false ]
     then
       rid=\$(python3 ${script_uploadExecutionRun_failExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Success -d 'Run Successful' -o ${source} -c \${cookie} -u ${executionRunRID})
-      echo LOG: execution run RID marked as successful - \${rid} >> ${repRID}.finalizeExecutionRun.log
-    elif [ ${fastqCountError} == true ]
-    then
-      rid=\$(python3 ${script_uploadExecutionRun_failExecutionRun} -r ${repRID} -w
\${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "${fastqCountError_details}" -o ${source} -c \${cookie} -u ${executionRunRID}) - echo LOG: execution run RID marked as error - \${rid} >> ${repRID}.finalizeExecutionRun.log - elif [ ${pipelineError} == true ] - then + echo LOG: execution run RID marked as successful - \${rid} >> ${repRID}.failExecutionRun.log + else pipelineError_details=\$(echo "**Submitted metadata does not match inferred:**\\n") pipelineError_details=\$(echo \${pipelineError_details}"|Metadata|Submitted value|Inferred value|\\n") pipelineError_details=\$(echo \${pipelineError_details}"|:-:|-:|-:|\\n") @@ -2026,7 +2214,7 @@ process failExecutionRun { fi pipelineError_details=\${pipelineError_details::-2} rid=\$(python3 ${script_uploadExecutionRun_failExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${pipelineError_details}" -o ${source} -c \${cookie} -u ${executionRunRID}) - echo LOG: execution run RID marked as error - \${rid} >> ${repRID}.finalizeExecutionRun.log + echo LOG: execution run RID marked as error - \${rid} >> ${repRID}.failExecutionRun.log fi """ }