diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9eb41cc45cd4314c16a8b1c4fc75293ad5129a57..c95ee90ea37c9a7709b76790a6669cbc8236e83e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
 **User Facing**
 * Endness metadata "Single Read" changed to "Single End" in data-hub, pipeline updated to handle (#110) ("Single Read" still acceptable for backwards compatibility)
 * Strandedness metadata "yes"/"no" changed to boolean "t"/"f" in data-hub, pipeline updated to handle (#70) ("yes"/"no" still acceptable for backwards compatibility)
+* Upload empty mRNA_QC entry if data error (#111)
 
 **Background**
 * Add memory limit (75%) per thread for samtools sort (#108)
diff --git a/workflow/conf/aws.config b/workflow/conf/aws.config
index d87669599a7e70add448f0cc7f0636dd8bef499b..9e862d3eba6c97f4fb65d2095a90cd01a854b456 100644
--- a/workflow/conf/aws.config
+++ b/workflow/conf/aws.config
@@ -129,4 +129,8 @@ process {
     cpus = 1
     memory = '1 GB'
   }
+  withName:uploadQC_fail {
+    cpus = 1
+    memory = '1 GB'
+  }
 }
diff --git a/workflow/conf/biohpc.config b/workflow/conf/biohpc.config
index cc058dfef74bc49adf3932554994678934ac7a44..8b80b80c2d51e7e1b0da6bd986292d621bc2e8b2 100755
--- a/workflow/conf/biohpc.config
+++ b/workflow/conf/biohpc.config
@@ -94,6 +94,9 @@ process {
   withName:failExecutionRun {
     executor = 'local'
   }
+  withName:uploadQC_fail {
+    executor = 'local'
+  }
 }
 
 singularity {
diff --git a/workflow/nextflow.config b/workflow/nextflow.config
index 29c8515a852fb3356335b89a71f4ddf4f2c7a78b..459f723c21cef8fc5f16c1ecddaa5ade4eb1d31a 100644
--- a/workflow/nextflow.config
+++ b/workflow/nextflow.config
@@ -100,6 +100,9 @@ process {
   withName:failExecutionRun {
     container = 'gudmaprbk/deriva1.4:1.0.0'
   }
+  withName:uploadQC_fail {
+    container = 'gudmaprbk/deriva1.4:1.0.0'
+  }
 }
 
 trace {
diff --git a/workflow/rna-seq.nf b/workflow/rna-seq.nf
index cc02c3650b2ab441b0dc3c379cb4432f1ab831b1..ec9048d8e2b3654076f9fa90d41cb2a870bc9e8f 100644
--- a/workflow/rna-seq.nf
+++ b/workflow/rna-seq.nf
@@ -44,6 +44,7 @@ deriva.into {
   deriva_uploadInputBag
   deriva_uploadExecutionRun
   deriva_uploadQC
+  deriva_uploadQC_fail
   deriva_uploadProcessedFile
   deriva_uploadOutputBag
   deriva_finalizeExecutionRun
@@ -105,8 +106,10 @@
 script_uploadExecutionRun_failPreExecutionRun_fastqFile = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_failPreExecutionRun_species = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_failExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadQC = Channel.fromPath("${baseDir}/scripts/upload_qc.py")
+script_uploadQC_fail = Channel.fromPath("${baseDir}/scripts/upload_qc.py")
 script_uploadOutputBag = Channel.fromPath("${baseDir}/scripts/upload_output_bag.py")
 script_deleteEntry_uploadQC = Channel.fromPath("${baseDir}/scripts/delete_entry.py")
+script_deleteEntry_uploadQC_fail = Channel.fromPath("${baseDir}/scripts/delete_entry.py")
 script_deleteEntry_uploadProcessedFile = Channel.fromPath("${baseDir}/scripts/delete_entry.py")
@@ -343,7 +346,7 @@ process parseMetadata {
     elif [ "\${endsRaw}" == "Paired End" ]
     then
       endsMeta="pe"
-    elseif [ "\${endsRaw}" == "Single Read" ]
+    elif [ "\${endsRaw}" == "Single Read" ]
     # "Single Read" depreciated as of Jan 2021, this option is present for backwards compatibility
     then
       endsMeta="se"
@@ -367,7 +370,7 @@
     echo -e "LOG: strandedness metadata parsed: \${stranded}" >> ${repRID}.parseMetadata.log
 
     # get spike-in metadata
-    v=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experimentSettings}" -p spike)
+    spike=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experimentSettings}" -p spike)
     echo -e "LOG: spike-in metadata parsed: \${spike}" >> ${repRID}.parseMetadata.log
     if [ "\${spike}" == "f" ]
     then
@@ -375,11 +378,11 @@ spike="false"
     elif [ "\${spike}" == "t" ]
     then
       spike="true"
-    elseif [ "\${spike}" == "no" ]
+    elif [ "\${spike}" == "no" ]
     # "yes"/"no" depreciated as of Jan 2021, this option is present for backwards compatibility
     then
       spike="false"
-    elseif [ "\${spike}" == "yes" ]
+    elif [ "\${spike}" == "yes" ]
     # "yes"/"no" depreciated as of Jan 2021, this option is present for backwards compatibility
     then
       spike="true"
@@ -540,6 +543,7 @@ fastqCountError.into {
   fastqCountError_dataQC
   fastqCountError_aggrQC
   fastqCountError_uploadQC
+  fastqCountError_uploadQC_fail
   fastqCountError_uploadProcessedFile
   fastqCountError_uploadOutputBag
   fastqCountError_failPreExecutionRun_fastq
@@ -561,6 +565,7 @@ fastqReadError.into {
   fastqReadError_dataQC
   fastqReadError_aggrQC
   fastqReadError_uploadQC
+  fastqReadError_uploadQC_fail
   fastqReadError_uploadProcessedFile
   fastqReadError_uploadOutputBag
   fastqReadError_failPreExecutionRun_fastq
@@ -653,6 +658,7 @@ fastqFileError.into {
   fastqFileError_dataQC
   fastqFileError_aggrQC
   fastqFileError_uploadQC
+  fastqFileError_uploadQC_fail
   fastqFileError_uploadProcessedFile
   fastqFileError_uploadOutputBag
   fastqFileError_failPreExecutionRun_fastqFile
@@ -1129,6 +1135,7 @@ speciesError.into {
   speciesError_dataQC
   speciesError_aggrQC
   speciesError_uploadQC
+  speciesError_uploadQC_fail
   speciesError_uploadProcessedFile
   speciesError_uploadOutputBag
   speciesError_failPreExecutionRun_species
@@ -1257,6 +1264,7 @@ pipelineError.into {
   pipelineError_dataQC
   pipelineError_aggrQC
   pipelineError_uploadQC
+  pipelineError_uploadQC_fail
   pipelineError_uploadProcessedFile
   pipelineError_uploadOutputBag
   pipelineError_failExecutionRun
@@ -1428,6 +1436,7 @@ executionRunRID.into {
   executionRunRID_uploadOutputBag
   executionRunRID_finalizeExecutionRun
   executionRunRID_failExecutionRun
+  executionRunRID_fail
 }
 
 /*
@@ -2300,6 +2309,9 @@ process failPreExecutionRun_fastq {
     val fastqReadError from fastqReadError_failPreExecutionRun_fastq
    val fastqReadError_details
 
+  output:
+    path ("executionRunRID.csv") into executionRunRID_fastqFail_fl
+
   when:
     upload
     fastqCountError == 'true' || fastqReadError == 'true'
@@ -2331,7 +2343,7 @@
     then
       genomeName=\$(echo GRCm${refMoVersion})
     fi
-    if [ "${spike}" == "yes" ]
+    if [ "${spike}" == "true" ]
     then
       genomeName=\$(echo \${genomeName}-S)
     fi
@@ -2354,10 +2366,12 @@
       rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
       rid=\${rid:7:-6}
       echo \${rid} >> ${repRID}.failPreExecutionRun_fastq.log
-      executionRun_rid==\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid})
+      executionRun_rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid})
       echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.failPreExecutionRun_fastq.log
     fi
 
+    echo "\${rid}" > executionRunRID.csv
+
     dt=`date +%FT%T.%3N%:z`
     curl -H 'Content-Type: application/json' -X PUT -d \
       '{ \
@@ -2384,6 +2398,9 @@ process failPreExecutionRun_fastqFile {
     val fastqFileError from fastqFileError_failPreExecutionRun_fastqFile
     val fastqFileError_details
 
+  output:
+ path ("executionRunRID.csv") into executionRunRID_fastqFileFail_fl + when: upload fastqFileError == 'true' @@ -2435,10 +2452,12 @@ process failPreExecutionRun_fastqFile { rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT') rid=\${rid:7:-6} echo \${rid} >> ${repRID}.failPreExecutionRun_fastqfile.log - executionRun_rid==\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid}) + executionRun_rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid}) echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.failPreExecutionRun_fastqfile.log fi + echo "\${rid}" > executionRunRID.csv + dt=`date +%FT%T.%3N%:z` curl -H 'Content-Type: application/json' -X PUT -d \ '{ \ @@ -2465,6 +2484,9 @@ process failPreExecutionRun_species { val speciesError from speciesError_failPreExecutionRun_species val speciesError_details + output: + path ("executionRunRID.csv") into executionRunRID_speciesFail_fl + when: upload speciesError == 'true' @@ -2516,10 +2538,12 @@ process failPreExecutionRun_species { rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT') rid=\${rid:7:-6} echo \${rid} >> ${repRID}.failPreExecutionRun_species.log - executionRun_rid==\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid}) + executionRun_rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid}) echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.failPreExecutionRun_species.log fi + echo "\${rid}" > executionRunRID.csv + dt=`date +%FT%T.%3N%:z` curl -H 'Content-Type: application/json' -X PUT -d \ '{ \ @@ -2531,6 +2555,22 @@ process failPreExecutionRun_species { """ } +// Extract execution run RID into channel +executionRunRID_fastqFail = Channel.create() +executionRunRID_fastqFail_fl.splitCsv(sep: ",", header: false).separate( + executionRunRID_fastqFail +) +executionRunRID_fastqFileFail = Channel.create() +executionRunRID_fastqFileFail_fl.splitCsv(sep: ",", header: false).separate( + executionRunRID_fastqFileFail +) +executionRunRID_speciesFail = Channel.create() +executionRunRID_speciesFail_fl.splitCsv(sep: ",", header: false).separate( + executionRunRID_speciesFail +) + +failExecutionRunRID = executionRunRID_fail.ifEmpty('').mix(executionRunRID_fastqFail.ifEmpty(''),executionRunRID_fastqFileFail.ifEmpty(''),executionRunRID_speciesFail.ifEmpty('')).filter { it != "" } + /* * failExecutionRun: fail the execution run */ @@ -2623,6 +2663,53 @@ process failExecutionRun { """ } +// Combine errors +error_uploadQC_fail = fastqCountError_uploadQC_fail.ifEmpty(false).combine(fastqReadError_uploadQC_fail.ifEmpty(false).combine(fastqFileError_uploadQC_fail.ifEmpty(false).combine(speciesError_uploadQC_fail.ifEmpty(false).combine(pipelineError_uploadQC_fail.ifEmpty(false))))) + +/* + * uploadQC_fail: uploads the mRNA QC on failed execution run +*/ +process uploadQC_fail { + tag "${repRID}" + + input: + path script_deleteEntry_uploadQC_fail + path script_uploadQC_fail + path credential, stageAs: "credential.json" from deriva_uploadQC_fail + val executionRunRID from failExecutionRunRID + tuple val (fastqCountError), val 
(fastqReadError_uploadQC), val (fastqFileError_uploadQC), val (speciesError_uploadQC), val (pipelineError_uploadQC) from error_uploadQC_fail + + when: + upload + fastqCountError_uploadQC == 'true' || fastqReadError_uploadQC == 'true' || fastqFileError_uploadQC == 'true' || speciesError_uploadQC == 'true' || pipelineError_uploadQC == 'true' + + script: + """ + hostname > ${repRID}.uploadQC.log + ulimit -a >> ${repRID}.uploadQC.log + + cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"') + cookie=\${cookie:11:-1} + + exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:mRNA_QC/Replicate=${repRID}) + if [ "\${exist}" != "[]" ] + then + rids=\$(echo \${exist} | grep -o '\\"RID\\":\\".\\{7\\}' | sed 's/^.\\{7\\}//') + for rid in \${rids} + do + python3 ${script_deleteEntry_uploadQC_fail} -r \${rid} -t mRNA_QC -o ${source} -c \${cookie} + echo LOG: old mRNA QC RID deleted - \${rid} >> ${repRID}.uploadQC.log + done + echo LOG: all old mRNA QC RIDs deleted >> ${repRID}.uploadQC.log + fi + + qc_rid=\$(python3 ${script_uploadQC_fail} -r ${repRID} -e ${executionRunRID} -o ${source} -c \${cookie} -u E) + echo LOG: mRNA QC RID uploaded - \${qc_rid} >> ${repRID}.uploadQC.log + + echo "\${qc_rid}" > qcRID.csv + """ +} + workflow.onError = { subject = "$workflow.manifest.name FAILED: $params.repRID" diff --git a/workflow/scripts/parse_meta.py b/workflow/scripts/parse_meta.py index 0b15f07d15bde582e0bd4de2d4be3d8fa12b9265..52f0f18200525f776fe73fcf2f4cd5be8db35045 100644 --- a/workflow/scripts/parse_meta.py +++ b/workflow/scripts/parse_meta.py @@ -63,7 +63,7 @@ def main(): # Get strandedness metadata from 'Experiment Settings.csv' if (args.parameter == "stranded"): - stranded = metaFile.Stranded.unique()[0] + stranded = metaFile.Strandedness.unique()[0] print(stranded) # Get spike-in metadata from 'Experiment Settings.csv' diff --git a/workflow/scripts/upload_qc.py b/workflow/scripts/upload_qc.py index b842a7a36cc47fa4f599ab086a5c1b3dbece437a..29fac063d9812ad05877d3e8f8f0d865d52eca14 100644 --- a/workflow/scripts/upload_qc.py +++ b/workflow/scripts/upload_qc.py @@ -7,12 +7,12 @@ def get_args(): parser = argparse.ArgumentParser() parser.add_argument('-r', '--repRID', help="replicate RID", required=True) parser.add_argument('-e', '--executionRunRID', help="exection run RID", required=True) - parser.add_argument('-p', '--ends', help="single/paired ends", required=True) - parser.add_argument('-s', '--stranded', help="stranded?", required=True) - parser.add_argument('-l', '--length', help="median read length", required=True) - parser.add_argument('-w', '--rawCount', help="raw count", required=True) - parser.add_argument('-f', '--assignedCount', help="final assigned count", required=True) - parser.add_argument('-t', '--tin', help="median TIN", required=True) + parser.add_argument('-p', '--ends', help="single/paired ends", required=False) + parser.add_argument('-s', '--stranded', help="stranded?", required=False) + parser.add_argument('-l', '--length', help="median read length", required=False) + parser.add_argument('-w', '--rawCount', help="raw count", required=False) + parser.add_argument('-f', '--assignedCount', help="final assigned count", required=False) + parser.add_argument('-t', '--tin', help="median TIN", required=False) parser.add_argument('-n', '--notes', help="notes", default="", required=False) parser.add_argument('-o', '--host', help="datahub host", required=True) parser.add_argument('-c', '--cookie', help="cookie token", required=True) @@ 
-39,6 +39,13 @@ def main(hostname, catalog_number, credential): } entities = run_table.insert([run_data]) rid = entities[0]["RID"] + elif args.update == "E": + run_data = { + "Execution_Run": args.executionRunRID, + "Replicate": args.repRID + } + entities = run_table.insert([run_data]) + rid = entities[0]["RID"] else: run_data = { "RID": args.update,