diff --git a/nextflow.config b/nextflow.config index 5406cc25d44d971657e58d78a45b33534bd69a6e..fbfb2f154ec529822550560fe2fac158219212e0 100644 --- a/nextflow.config +++ b/nextflow.config @@ -98,6 +98,9 @@ process { withName:failPreExecutionRun { container = 'gudmaprbk/deriva1.4:1.0.1' } + withName:failPreExecutionRun_seqwho { + container = 'gudmaprbk/deriva1.4:1.0.1' + } withName:failExecutionRun { container = 'gudmaprbk/deriva1.4:1.0.1' } diff --git a/nextflowConf/aws.config b/nextflowConf/aws.config index a5133aaab9628885c0bcae79617724020855850c..1b1f81c4502177eef7bf48768ccc00517de75fe9 100644 --- a/nextflowConf/aws.config +++ b/nextflowConf/aws.config @@ -111,6 +111,10 @@ process { cpus = 1 memory = '1 GB' } + withName:failPreExecutionRun_seqwho { + cpus = 1 + memory = '1 GB' + } withName:failExecutionRun { cpus = 1 memory = '1 GB' diff --git a/nextflowConf/biohpc.config b/nextflowConf/biohpc.config index dff28cb4ae54ee54ad63204ec8bd88e2441eb71b..a628aa43a5d9fe7473f2a628c73640ce7cf37c1e 100755 --- a/nextflowConf/biohpc.config +++ b/nextflowConf/biohpc.config @@ -91,6 +91,9 @@ process { withName:failPreExecutionRun { executor = 'local' } + withName:failPreExecutionRun_seqwho { + executor = 'local' + } withName:failExecutionRun { executor = 'local' } diff --git a/nextflowConf/dnanexus.config b/nextflowConf/dnanexus.config index d7c10297d2548536ba7deb7c9501b5f1f8de0836..b493b32bb85c8c490815a72ca8e01a7df2a52ce3 100755 --- a/nextflowConf/dnanexus.config +++ b/nextflowConf/dnanexus.config @@ -138,6 +138,11 @@ process { cpus = 1 memory = '1 GB' } + withName:failPreExecutionRun_seqwho { + executor = 'dnanexus' + cpus = 1 + memory = '1 GB' + } withName:failExecutionRun { executor = 'dnanexus' cpus = 1 diff --git a/rna-seq.nf b/rna-seq.nf index 9f6c9afc27352b71e968424245d937554c0dae19..0adcc63f7b7a1a540b2be56a2e94b86376e617bf 100644 --- a/rna-seq.nf +++ b/rna-seq.nf @@ -51,6 +51,7 @@ deriva.into { deriva_uploadOutputBag deriva_finalizeExecutionRun deriva_failPreExecutionRun + deriva_failPreExecutionRun_seqwho deriva_failExecutionRun } bdbag = Channel @@ -105,6 +106,7 @@ script_uploadInputBag = Channel.fromPath("${baseDir}/workflow/scripts/upload_inp script_uploadExecutionRun_uploadExecutionRun = Channel.fromPath("${baseDir}/workflow/scripts/upload_execution_run.py") script_uploadExecutionRun_finalizeExecutionRun = Channel.fromPath("${baseDir}/workflow/scripts/upload_execution_run.py") script_uploadExecutionRun_failPreExecutionRun = Channel.fromPath("${baseDir}/workflow/scripts/upload_execution_run.py") +script_uploadExecutionRun_failPreExecutionRun_seqwho = Channel.fromPath("${baseDir}/workflow/scripts/upload_execution_run.py") script_uploadExecutionRun_failExecutionRun = Channel.fromPath("${baseDir}/workflow/scripts/upload_execution_run.py") script_uploadQC = Channel.fromPath("${baseDir}/workflow/scripts/upload_qc.py") script_uploadQC_fail = Channel.fromPath("${baseDir}/workflow/scripts/upload_qc.py") @@ -521,7 +523,9 @@ strandedMeta.into { spikeMeta.into { spikeMeta_checkMetadata spikeMeta_aggrQC + spikeMeta_uploadExecutionRun spikeMeta_failPreExecutionRun + spikeMeta_failPreExecutionRun_seqwho spikeMeta_failExecutionRun } speciesMeta.into { @@ -529,6 +533,7 @@ speciesMeta.into { speciesMeta_checkMetadata speciesMeta_aggrQC speciesMeta_failPreExecutionRun + speciesMeta_failPreExecutionRun_seqwho speciesMeta_failExecutionRun } studyRID.into { @@ -953,7 +958,7 @@ speciesInfer.into { speciesInfer_aggrQC speciesInfer_uploadExecutionRun speciesInfer_uploadProcessedFile - speciesInfer_failPreExecutionRun + speciesInfer_failPreExecutionRun_seqwho speciesInfer_failExecutionRun } @@ -1297,7 +1302,6 @@ spikeInfer.into { spikeInfer_getRef spikeInfer_checkMetadata spikeInfer_aggrQC - spikeInfer_uploadExecutionRun spikeInfer_failExecutionRun } @@ -2257,6 +2261,7 @@ inputBagRID.into { inputBagRID_uploadExecutionRun inputBagRID_finalizeExecutionRun inputBagRID_failPreExecutionRun + inputBagRID_failPreExecutionRun_seqwho inputBagRID_failExecutionRun } @@ -2269,7 +2274,7 @@ process uploadExecutionRun { input: path script_uploadExecutionRun_uploadExecutionRun path credential, stageAs: "credential.json" from deriva_uploadExecutionRun - val spike from spikeInfer_uploadExecutionRun + val spike from spikeMeta_uploadExecutionRun val species from speciesInfer_uploadExecutionRun val inputBagRID from inputBagRID_uploadExecutionRun val fastqCountError from fastqCountError_uploadExecutionRun @@ -2681,11 +2686,17 @@ process finalizeExecutionRun { // Combine errors error_meta = fastqCountError_uploadQC_fail.ifEmpty(false).combine(fastqReadError_uploadQC_fail.ifEmpty(false).combine(fastqFileError_uploadQC_fail.ifEmpty(false).combine(seqtypeError_uploadQC_fail.ifEmpty(false).combine(speciesErrorSeqwho_uploadQC_fail.ifEmpty(false).combine(speciesError_uploadQC_fail.ifEmpty(false).combine(pipelineError_uploadQC_fail.ifEmpty(false))))))) -error_meta. into { +error_meta.into { error_failPreExecutionRun + error_failPreExecutionRun_seqwho error_uploadQC_fail } errorDetails = fastqCountError_details.ifEmpty("").combine(fastqReadError_details.ifEmpty("").combine(fastqFileError_details.ifEmpty("").combine(seqtypeError_details.ifEmpty("").combine(speciesErrorSeqwho_details.ifEmpty(""))))) +errorDetails.into { + errorDetails_failPreExecutionRun + errorDetails_failPreExecutionRun_seqwho +} + /* * failPreExecutionRun: fail the execution run prematurely for fastq errors @@ -2698,17 +2709,16 @@ process failPreExecutionRun { path credential, stageAs: "credential.json" from deriva_failPreExecutionRun val spike from spikeMeta_failPreExecutionRun val speciesMeta from speciesMeta_failPreExecutionRun - val speciesInfer from speciesInfer_failPreExecutionRun val inputBagRID from inputBagRID_failPreExecutionRun tuple val (fastqCountError), val (fastqReadError), val (fastqFileError), val (seqtypeError), val (speciesErrorSeqwho), val (speciesError), val (pipelineError) from error_failPreExecutionRun - tuple val (fastqCountError_details), val (fastqReadError_details), val (fastqFileError_details), val (seqtypeError_details), val (speciesErrorSeqwho_details) from errorDetails + tuple val (fastqCountError_details), val (fastqReadError_details), val (fastqFileError_details), val (seqtypeError_details), val (speciesErrorSeqwho_details) from errorDetails_failPreExecutionRun output: path ("executionRunRID.csv") into executionRunRID_preFail_fl when: upload - fastqCountError == "true" || fastqReadError == "true" || fastqFileError == "true" || seqtypeError == "true" || speciesError == "true" + fastqCountError == "true" || fastqReadError == "true" || fastqFileError == "true" script: """ @@ -2725,7 +2735,99 @@ process failPreExecutionRun { elif [ ${fastqFileError} == true ] then errorDetails=\$(echo "\${errorDetails}${fastqFileError_details}\\n") - elif [ ${seqtypeError} == true ] + fi + + echo LOG: searching for workflow RID - BICF mRNA ${workflow.manifest.version} >> ${repRID}.failPreExecutionRun.log + workflow=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Workflow/Name=BICF%20mRNA%20Replicate/Version=${workflow.manifest.version}) + workflow=\$(echo \${workflow} | grep -o '\\"RID\\":\\".*\\",\\"RCT') + workflow=\${workflow:7:-6} + echo LOG: workflow RID extracted - \${workflow} >> ${repRID}.failPreExecutionRun.log + + if [ "${speciesMeta}" == "Homo sapiens" ] + then + genomeName=\$(echo GRCh${refHuVersion}) + elif [ "${speciesMeta}" == "Mus musculus" ] + then + genomeName=\$(echo GRCm${refMoVersion}) + fi + if [ "${spike}" == "true" ] + then + genomeName=\$(echo \${genomeName}-S) + fi + echo LOG: searching for genome name - \${genomeName} >> ${repRID}.failPreExecutionRun.log + genome=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Reference_Genome/Name=\${genomeName}) + genome=\$(echo \${genome} | grep -o '\\"RID\\":\\".*\\",\\"RCT') + genome=\${genome:7:-6} + echo LOG: genome RID extracted - \${genome} >> ${repRID}.failPreExecutionRun.log + + cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"') + cookie=\${cookie:11:-1} + + exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/Workflow=\${workflow}/Replicate=${repRID}/Input_Bag=${inputBagRID}) + echo \${exist} >> ${repRID}.failPreExecutionRun.log + if [ "\${exist}" == "[]" ] + then + rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u F) + echo LOG: execution run RID uploaded - \${rid} >> ${repRID}.failPreExecutionRun.log + else + rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT') + rid=\${rid:7:-6} + echo \${rid} >> ${repRID}.failPreExecutionRun.log + executionRun_rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid}) + echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.failPreExecutionRun.log + fi + + echo "\${rid}" > executionRunRID.csv + + if [ ${params.track} == true ] + then + dt=`date +%FT%T.%3N%:z` + curl -H 'Content-Type: application/json' -X PUT -d \ + '{ \ + "ID": "${workflow.sessionId}", \ + "ExecutionRunRID": "'\${rid}'", \ + "Failure": "'\${dt}'" \ + }' \ + "https://9ouc12dkwb.execute-api.us-east-2.amazonaws.com/prod/db/track" + fi + """ +} +// Extract execution run RID into channel +executionRunRID_preFail = Channel.create() +executionRunRID_preFail_fl.splitCsv(sep: ",", header: false).separate( + executionRunRID_preFail +) + +/* + * failPreExecutionRun_seqwho: fail the execution run prematurely for seqwho errors + */ +process failPreExecutionRun_seqwho { + tag "${repRID}" + + input: + path script_uploadExecutionRun from script_uploadExecutionRun_failPreExecutionRun_seqwho + path credential, stageAs: "credential.json" from deriva_failPreExecutionRun_seqwho + val spike from spikeMeta_failPreExecutionRun_seqwho + val speciesMeta from speciesMeta_failPreExecutionRun_seqwho + val speciesInfer from speciesInfer_failPreExecutionRun_seqwho + val inputBagRID from inputBagRID_failPreExecutionRun_seqwho + tuple val (fastqCountError), val (fastqReadError), val (fastqFileError), val (seqtypeError), val (speciesErrorSeqwho), val (speciesError), val (pipelineError) from error_failPreExecutionRun_seqwho + tuple val (fastqCountError_details), val (fastqReadError_details), val (fastqFileError_details), val (seqtypeError_details), val (speciesErrorSeqwho_details) from errorDetails_failPreExecutionRun_seqwho + + output: + path ("executionRunRID.csv") into executionRunRID_preFailseqwho_fl + + when: + upload + seqtypeError == "true" || speciesError == "true" + + script: + """ + hostname > ${repRID}.failPreExecutionRun.log + ulimit -a >> ${repRID}.failPreExecutionRun.log + + errorDetails="" + if [ ${seqtypeError} == true ] then errorDetails=\$(echo "\${errorDetails}${seqtypeError_details}\\n") elif [ ${speciesError} == true ] @@ -2792,12 +2894,13 @@ process failPreExecutionRun { """ } // Extract execution run RID into channel -executionRunRID_preFail = Channel.create() -executionRunRID_preFail_fl.splitCsv(sep: ",", header: false).separate( - executionRunRID_preFail +executionRunRID_preFailseqwho = Channel.create() +executionRunRID_preFailseqwho_fl.splitCsv(sep: ",", header: false).separate( + executionRunRID_preFailseqwho ) -failExecutionRunRID = executionRunRID_fail.ifEmpty('').mix(executionRunRID_preFail.ifEmpty('')).filter { it != "" } + +failExecutionRunRID = executionRunRID_fail.ifEmpty('').mix(executionRunRID_preFail.ifEmpty('').mix(executionRunRID_preFailseqwho.ifEmpty(''))).filter { it != "" } /* * failExecutionRun: fail the execution run