diff --git a/README.md b/README.md index 185f4453163575fc4b76e8ccb00aac53c74b25ab..1edcd6f4e6fe6ce2037d2efe2e53d8092b2ef177 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ To Run: * `--refMoVersion` mouse reference version ***(optional, default = 38.p6.vM22)*** * `--refHuVersion` human reference version ***(optional, default = 38.p12.v31)*** * `--refERCCVersion` human reference version ***(optional, default = 92)*** - * `--upload` option to not upload output back to the data-hub ***(optional, default = true)*** + * `--upload` option to not upload output back to the data-hub ***(optional, default = false)*** * **true** = upload outputs to the data-hub * **false** = do *NOT* upload outputs to the data-hub * `-profile` config profile to use ***(optional)***: @@ -69,7 +69,7 @@ To Run: * eg: `--speciesForce 'Mus musculus'` * Tracking parameters ([Tracking Site](http://bicf.pipeline.tracker.s3-website-us-east-1.amazonaws.com/)): * `--ci` boolean (default = false) - * `--dev` boolean (default = false) + * `--dev` boolean (default = true) FULL EXAMPLE: ------------- diff --git a/workflow/conf/aws.config b/workflow/conf/aws.config index 3e3cbb65a60f726cb936d0dacaefe3a1a07662e4..75d1c71a2eca321e01dd7ff70ae5e0ec6d069962 100644 --- a/workflow/conf/aws.config +++ b/workflow/conf/aws.config @@ -16,91 +16,99 @@ process { cpus = 1 memory = '1 GB' - withName: trackStart { + withName:trackStart { cpus = 1 memory = '1 GB' } - withName: getBag { + withName:getBag { cpus = 1 memory = '1 GB' } - withName: getData { + withName:getData { cpus = 1 memory = '1 GB' } - withName: parseMetadata { + withName:parseMetadata { cpus = 15 memory = '1 GB' } - withName: trimData { + withName:trimData { cpus = 20 memory = '2 GB' } - withName: getRefInfer { + withName:getRefInfer { cpus = 1 memory = '1 GB' } - withName: downsampleData { + withName:downsampleData { cpus = 1 memory = '1 GB' } - withName: alignSampleData { + withName:alignSampleData { cpus = 50 memory = '5 GB' } - withName: inferMetadata { + withName:inferMetadata { cpus = 5 memory = '1 GB' } - withName: getRef { + withName:checkMetadata { cpus = 1 memory = '1 GB' } - withName: alignData { + withName:getRef { + cpus = 1 + memory = '1 GB' + } + withName:alignData { cpus = 50 memory = '10 GB' } - withName: dedupData { + withName:dedupData { cpus = 5 memory = '20 GB' } - withName: countData { + withName:countData { cpus = 2 memory = '5 GB' } - withName: makeBigWig { + withName:makeBigWig { cpus = 15 memory = '5 GB' } - withName: fastqc { + withName:fastqc { cpus = 1 memory = '1 GB' } - withName: dataQC { + withName:dataQC { cpus = 15 memory = '2 GB' } - withName: aggrQC { + withName:aggrQC { cpus = 2 memory = '1 GB' } - withName: uploadInputBag { + withName:uploadInputBag { + cpus = 1 + memory = '1 GB' + } + withName:uploadExecutionRun { cpus = 1 memory = '1 GB' } - withName: uploadExecutionRun { + withName:uploadQC { cpus = 1 memory = '1 GB' } - withName: uploadQC { + withName:uploadProcessedFile { cpus = 1 memory = '1 GB' } - withName: uploadProcessedFile { + withName:uploadOutputBag { cpus = 1 memory = '1 GB' } - withName: uploadOutputBag { + withName:finalizeExecutionRun { cpus = 1 memory = '1 GB' } diff --git a/workflow/conf/biohpc.config b/workflow/conf/biohpc.config index ca9f0e4f935099a50f6ef241dfe42f37e6e382b9..44808d6e3f4001374dd29b967430674ffb2df095 100755 --- a/workflow/conf/biohpc.config +++ b/workflow/conf/biohpc.config @@ -7,70 +7,76 @@ process { queue = 'super' clusterOptions = '--hold' - withName: trackStart { + withName:trackStart { 
executor = 'local' } - withName: getBag { + withName:getBag { executor = 'local' } - withName: getData { + withName:getData { queue = 'super' } - withName: parseMetadata { + withName:parseMetadata { executor = 'local' } - withName: trimData { + withName:trimData { queue = 'super' } - withName: getRefInfer { + withName:getRefInfer { queue = 'super' } - withName: downsampleData { + withName:downsampleData { executor = 'local' } - withName: alignSampleData { + withName:alignSampleData { queue = 'super' } - withName: inferMetadata { + withName:inferMetadata { queue = 'super' } - withName: getRef { + withName:checkMetadata { + executor = 'local' + } + withName:getRef { queue = 'super' } - withName: alignData { + withName:alignData { queue = '256GB,256GBv1' } - withName: dedupData { + withName:dedupData { queue = 'super' } - withName: countData { + withName:countData { queue = 'super' } - withName: makeBigWig { + withName:makeBigWig { queue = 'super' } - withName: fastqc { + withName:fastqc { queue = 'super' } - withName: dataQC { + withName:dataQC { queue = 'super' } - withName: aggrQC { + withName:aggrQC { + executor = 'local' + } + withName:uploadInputBag { executor = 'local' } - withName: uploadInputBag { + withName:uploadExecutionRun { executor = 'local' } - withName: uploadExecutionRun { + withName:uploadQC { executor = 'local' } - withName: uploadQC { + withName:uploadProcessedFile { executor = 'local' } - withName: uploadProcessedFile { + withName:uploadOutputBag { executor = 'local' } - withName: uploadOutputBag { + withName:finalizeExecutionRun { executor = 'local' } } diff --git a/workflow/nextflow.config b/workflow/nextflow.config index 33b16b42074e47dcce027c2ac391304b95c91ff3..075d6dcb9107a357194fa1696c3c6ba07bf60d93 100644 --- a/workflow/nextflow.config +++ b/workflow/nextflow.config @@ -25,46 +25,49 @@ process { withName:getData { container = 'gudmaprbk/deriva1.3:1.0.0' } - withName: parseMetadata { + withName:parseMetadata { container = 'gudmaprbk/python3:1.0.0' } - withName: trimData { + withName:trimData { container = 'gudmaprbk/trimgalore0.6.5:1.0.0' } - withName: getRefInfer { + withName:getRefInfer { container = 'gudmaprbk/deriva1.3:1.0.0' } - withName: downsampleData { + withName:downsampleData { container = 'gudmaprbk/seqtk1.3:1.0.0' } - withName: alignSampleData { + withName:alignSampleData { container = 'gudmaprbk/hisat2.2.1:1.0.0' } - withName: inferMetadata { + withName:inferMetadata { container = 'gudmaprbk/rseqc4.0.0:1.0.0' } - withName: getRef { + withName:checkMetadata { + container = 'gudmaprbk/gudmap-rbk_base:1.0.0' + } + withName:getRef { container = 'gudmaprbk/deriva1.3:1.0.0' } - withName: alignData { + withName:alignData { container = 'gudmaprbk/hisat2.2.1:1.0.0' } - withName: dedupData { + withName:dedupData { container = 'gudmaprbk/picard2.23.9:1.0.0' } - withName: countData { + withName:countData { container = 'gudmaprbk/subread2.0.1:1.0.0' } - withName: makeBigWig { + withName:makeBigWig { container = 'gudmaprbk/deeptools3.5.0:1.0.0' } - withName: fastqc { + withName:fastqc { container = 'gudmaprbk/fastqc0.11.9:1.0.0' } - withName: dataQC { + withName:dataQC { container = 'gudmaprbk/rseqc4.0.0:1.0.0' } - withName: aggrQC { + withName:aggrQC { container = 'gudmaprbk/multiqc1.9:1.0.0' } withName:uploadInputBag { @@ -82,6 +85,9 @@ process { withName:uploadOutputBag { container = 'gudmaprbk/deriva1.3:1.0.0' } + withName:finalizeExecutionRun { + container = 'gudmaprbk/deriva1.3:1.0.0' + } } trace { @@ -110,6 +116,6 @@ manifest { homePage = 
'https://git.biohpc.swmed.edu/gudmap_rbk/rna-seq' description = 'This pipeline was created to be a standard mRNA-sequencing analysis pipeline which integrates with the GUDMAP and RBK consortium data-hub.' mainScript = 'rna-seq.nf' - version = 'v0.0.4_indev' + version = 'v0.1.0' nextflowVersion = '>=19.09.0' } diff --git a/workflow/rna-seq.nf b/workflow/rna-seq.nf index 3a97735fe88059a99de264e03d798dc5e2410032..5c548b28b8c92e5a3cb4317621ee7cfb448211c0 100644 --- a/workflow/rna-seq.nf +++ b/workflow/rna-seq.nf @@ -14,8 +14,8 @@ params.bdbag = "${baseDir}/../test_data/auth/cookies.txt" //params.repRID = "16-1ZX4" params.repRID = "Q-Y5F6" params.source = "dev" -params.refMoVersion = "38.p6.vM22" -params.refHuVersion = "38.p12.v31" +params.refMoVersion = "38.p6.vM25" +params.refHuVersion = "38.p13.v36" params.refERCCVersion = "92" params.outDir = "${baseDir}/../output" params.upload = false @@ -46,6 +46,7 @@ deriva.into { deriva_uploadQC deriva_uploadProcessedFile deriva_uploadOutputBag + deriva_finalizeExecutionRun } bdbag = Channel .fromPath(params.bdbag) @@ -62,7 +63,7 @@ fastqsForce = params.fastqsForce speciesForce = params.speciesForce email = params.email -// Define fixed files and +// Define fixed files and variables replicateExportConfig = Channel.fromPath("${baseDir}/conf/Replicate_For_Input_Bag.json") executionRunExportConfig = Channel.fromPath("${baseDir}/conf/Execution_Run_For_Output_Bag.json") if (params.source == "dev") { @@ -73,11 +74,9 @@ if (params.source == "dev") { source = "www.gudmap.org" } if (params.refSource == "biohpc") { - referenceBase = "/project/BICF/BICF_Core/shared/gudmap/references" -//} else if (params.refSource == "aws") { -// referenceBase = "s3://bicf-references" + referenceBase = "/project/BICF/BICF_Core/shared/gudmap/references/new" } else if (params.refSource == "datahub") { - referenceBase = "dev.gudmap.org" + referenceBase = "www.gudmap.org" } referenceInfer = Channel.fromList(["ERCC","GRCh","GRCm"]) multiqcConfig = Channel.fromPath("${baseDir}/conf/multiqc_config.yaml") @@ -95,7 +94,8 @@ script_calculateTPM = Channel.fromPath("${baseDir}/scripts/calculateTPM.R") script_convertGeneSymbols = Channel.fromPath("${baseDir}/scripts/convertGeneSymbols.R") script_tinHist = Channel.fromPath("${baseDir}/scripts/tin_hist.py") script_uploadInputBag = Channel.fromPath("${baseDir}/scripts/upload_input_bag.py") -script_uploadExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py") +script_uploadExecutionRun_uploadExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py") +script_uploadExecutionRun_finalizeExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py") script_uploadQC = Channel.fromPath("${baseDir}/scripts/upload_qc.py") script_uploadOutputBag = Channel.fromPath("${baseDir}/scripts/upload_output_bag.py") script_deleteEntry_uploadQC = Channel.fromPath("${baseDir}/scripts/delete_entry.py") @@ -105,7 +105,7 @@ script_deleteEntry_uploadProcessedFile = Channel.fromPath("${baseDir}/scripts/de * trackStart: track start of pipeline */ process trackStart { - container 'docker://bicf/bicfbase:2.1.0' + container 'docker://gudmaprbk/gudmap-rbk_base:1.0.0' script: """ hostname @@ -139,6 +139,7 @@ Human Reference Version: ${params.refHuVersion} ERCC Reference Version : ${params.refERCCVersion} Reference source : ${params.refSource} Output Directory : ${params.outDir} +Upload : ${upload} ------------------------------------ Nextflow Version : ${workflow.nextflow.version} Pipeline Version : 
${workflow.manifest.version} @@ -348,12 +349,32 @@ metadata_fl.splitCsv(sep: ",", header: false).separate( ) // Replicate metadata for multiple process inputs +endsMeta.into { + endsMeta_checkMetadata + endsMeta_aggrQC + endsMeta_finalizeExecutionRun +} endsManual.into { endsManual_trimData endsManual_downsampleData endsManual_alignSampleData endsManual_aggrQC } +strandedMeta.into { + strandedMeta_checkMetadata + strandedMeta_aggrQC + strandedMeta_finalizeExecutionRun +} +spikeMeta.into { + spikeMeta_checkMetadata + spikeMeta_aggrQC + spikeMeta_finalizeExecutionRun +} +speciesMeta.into { + speciesMeta_checkMetadata + speciesMeta_aggrQC + speciesMeta_finalizeExecutionRun +} studyRID.into { studyRID_aggrQC studyRID_uploadInputBag @@ -365,7 +386,6 @@ expRID.into { expRID_uploadProcessedFile } - /* * trimData: trims any adapter or non-host sequences from the data */ @@ -464,23 +484,14 @@ process getRefInfer { echo -e "LOG: ERROR - References could not be set!\nReference found: ${referenceBase}" >> ${repRID}.${refName}.getRefInfer.log exit 1 fi - mkdir ${refName} # retreive appropriate reference appropriate location echo -e "LOG: fetching ${refName} reference files from ${referenceBase}" >> ${repRID}.${refName}.getRefInfer.log - if [ ${referenceBase} == "/project/BICF/BICF_Core/shared/gudmap/references" ] + if [ ${referenceBase} == "/project/BICF/BICF_Core/shared/gudmap/references/new" ] then - ln -s "\${references}"/hisat2 - ln -s "\${references}"/bed ${refName}/bed - ln -s "\${references}"/genome.fna - ln -s "\${references}"/genome.gtf - #elif [ ${referenceBase} == "s3://bicf-references" ] - #then - # aws s3 cp "\${references}"/hisat2 ./hisat2 --recursive - # aws s3 cp "\${references}"/bed ./${refName}/bed --recursive - # aws s3 cp "\${references}"/genome.fna ./ - # aws s3 cp "\${references}"/genome.gtf ./ - elif [ ${referenceBase} == "dev.gudmap.org" ] + unzip \${references}.zip + mv \$(basename \${references})/data/* . + elif [ "${params.refSource}" == "datahub" ] then GRCv=\$(echo \${references} | grep -o ${refName}.* | cut -d '.' -f1) GRCp=\$(echo \${references} | grep -o ${refName}.* | cut -d '.' -f2) @@ -502,19 +513,14 @@ process getRefInfer { unzip \$(basename \${refURL}) mv \${fName}/data/* . fi - echo -e "LOG: fetched" >> ${repRID}.${refName}.getRefInfer.log - - # make blank bed folder for ERCC - echo -e "LOG: making dummy bed folder for ERCC" >> ${repRID}.${refName}.getRefInfer.log - if [ "${refName}" == "ERCC" ] - then - rm -rf ${refName}/bed - mkdir ${refName}/bed - touch ${refName}/bed/temp - elif [ ${referenceBase} == "dev.gudmap.org" ] + mv ./annotation/genome.gtf . + mv ./sequence/genome.fna . 
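+ # keep genome.gtf and genome.fna in the working directory; the species bed is moved under ${refName}/ below (skipped for ERCC, which ships no bed)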
+ mkdir ${refName} + if [ "${refName}" != "ERCC" ] then - mv bed ${refName}/ + mv ./annotation/genome.bed ./${refName} fi + echo -e "LOG: fetched" >> ${repRID}.${refName}.getRefInfer.log """ } @@ -650,12 +656,12 @@ process inferMetadata { then species="Homo sapiens" bam="GRCh.sampled.sorted.bam" - bed="./GRCh/bed/genome.bed" + bed="./GRCh/genome.bed" elif [ 1 -eq \$(echo \$(expr \${align_mo} ">=" 40)) ] && [ 1 -eq \$(echo \$(expr \${align_hu} "<" 40)) ] then species="Mus musculus" bam="GRCm.sampled.sorted.bam" - bed="./GRCm/bed/genome.bed" + bed="./GRCm/genome.bed" else echo -e "LOG: ERROR - inference of species returns an ambiguous result: hu=\${align_hu} mo=\${align_mo}" >> ${repRID}.inferMetadata.log if [ "${speciesForce}" == "" ] @@ -670,11 +676,11 @@ process inferMetadata { if [ "${speciesForce}" == "Homo sapiens" ] then bam="GRCh.sampled.sorted.bam" - bed="./GRCh/bed/genome.bed" + bed="./GRCh/genome.bed" elif [ "${speciesForce}" == "Mus musculus" ] then bam="GRCm.sampled.sorted.bam" - bed="./GRCm/bed/genome.bed" + bed="./GRCm/genome.bed" fi fi echo -e "LOG: inference of species results in: \${species}" >> ${repRID}.inferMetadata.log @@ -741,30 +747,301 @@ inferMetadata_fl.splitCsv(sep: ",", header: false).separate( ) // Replicate metadata for multiple process inputs endsInfer.into { + endsInfer_checkMetadata endsInfer_alignData endsInfer_countData endsInfer_dataQC endsInfer_aggrQC endsInfer_uploadQC + endsInfer_finalizeExecutionRun } strandedInfer.into { + strandedInfer_checkMetadata strandedInfer_alignData strandedInfer_countData strandedInfer_aggrQC strandedInfer_uploadQC + strandedInfer_finalizeExecutionRun } spikeInfer.into{ + spikeInfer_checkMetadata spikeInfer_getRef spikeInfer_aggrQC spikeInfer_uploadExecutionRun + spikeInfer_finalizeExecutionRun } speciesInfer.into { + speciesInfer_checkMetadata speciesInfer_getRef speciesInfer_aggrQC speciesInfer_uploadExecutionRun speciesInfer_uploadProcessedFile + speciesInfer_finalizeExecutionRun +} + +/* + * checkMetadata: checks the submitted metadata against inferred +*/ +process checkMetadata { + tag "${repRID}" + + input: + val endsMeta from endsMeta_checkMetadata + val strandedMeta from strandedMeta_checkMetadata + val spikeMeta from spikeMeta_checkMetadata + val speciesMeta from speciesMeta_checkMetadata + val endsInfer from endsInfer_checkMetadata + val strandedInfer from strandedInfer_checkMetadata + val spikeInfer from spikeInfer_checkMetadata + val speciesInfer from speciesInfer_checkMetadata + + output: + path ("check.csv") into checkMetadata_fl + path ("outputBagRID.csv") optional true into outputBagRID_fl_dummy + + script: + """ + hostname > ${repRID}.checkMetadata.log + ulimit -a >> ${repRID}.checkMetadata.log + + pipelineError=false + # check if submitted metadata matches inferred + if [ "${endsMeta}" != "${endsInfer}" ] + then + pipelineError=true + pipelineError_ends=true + echo -e "LOG: ends do not match: Submitted=${endsMeta}; Infered=${endsInfer}" >> ${repRID}.checkMetadata.log + else + pipelineError_ends=false + echo -e "LOG: ends matches: Submitted=${endsMeta}; Infered=${endsInfer}" >> ${repRID}.checkMetadata.log + fi + if [ "${strandedMeta}" != "${strandedInfer}" ] + then + if [ "${strandedMeta}" == "unstranded" ] + then + pipelineError=true + pipelineError_stranded=true + echo -e "LOG: stranded does not match: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log + elif [ "${strandedInfer}" != "forward" ] && [ "${strandedInfer}" != "reverse" ] + then + pipelineError=true + 
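# inferred strandedness is neither forward nor reverse while the submitted metadata indicates a stranded protocol, so flag the mismatch + 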
pipelineError_stranded=true + echo -e "LOG: stranded does not match: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log + else + pipelineError_stranded=false + echo -e "LOG: stranded matches: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log + fi + else + pipelineError_stranded=false + echo -e "LOG: stranded matches: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log + fi + if [ "${spikeMeta}" != "${spikeInfer}" ] + then + pipelineError=true + pipelineError_spike=true + echo -e "LOG: spike does not match: Submitted=${spikeMeta}; Infered=${spikeInfer}" >> ${repRID}.checkMetadata.log + else + pipelineError_spike=false + echo -e "LOG: stranded matches: Submitted=${spikeMeta}; Infered=${spikeInfer}" >> ${repRID}.checkMetadata.log + fi + if [ "${speciesMeta}" != "${speciesInfer}" ] + then + pipelineError=true + pipelineError_species=true + echo -e "LOG: species does not match: Submitted=${speciesMeta}; Infered=${speciesInfer}" >> ${repRID}.checkMetadata.log + else + pipelineError_species=false + echo -e "LOG: species matches: Submitted=${speciesMeta}; Infered=${speciesInfer}" >> ${repRID}.checkMetadata.log + fi + + # create dummy output bag rid if failure + if [ \${pipelineError} == true ] + then + echo "fail" 1>> outputBagRID.csv + fi + + # write checks to file + echo "\${pipelineError},\${pipelineError_ends},\${pipelineError_stranded},\${pipelineError_spike},\${pipelineError_species}" 1>> check.csv + """ +} + +// Split errors into separate channels +pipelineError = Channel.create() +pipelineError_ends = Channel.create() +pipelineError_stranded = Channel.create() +pipelineError_spike = Channel.create() +pipelineError_species = Channel.create() +checkMetadata_fl.splitCsv(sep: ",", header: false).separate( + pipelineError, + pipelineError_ends, + pipelineError_stranded, + pipelineError_spike, + pipelineError_species +) + +// Replicate errors for multiple process inputs +pipelineError.into { + pipelineError_getRef + pipelineError_alignData + pipelineError_dedupData + pipelineError_makeBigWig + pipelineError_countData + pipelineError_fastqc + pipelineError_dataQC + pipelineError_aggrQC + pipelineError_uploadQC + pipelineError_uploadProcessedFile + pipelineError_uploadOutputBag + pipelineError_finalizeExecutionRun } +/* + * uploadInputBag: uploads the input bag +*/ +process uploadInputBag { + tag "${repRID}" + + input: + path script_uploadInputBag + path credential, stageAs: "credential.json" from deriva_uploadInputBag + path inputBag from inputBag_uploadInputBag + val studyRID from studyRID_uploadInputBag + + output: + path ("inputBagRID.csv") into inputBagRID_fl + + when: + upload + + script: + """ + hostname > ${repRID}.uploadInputBag.log + ulimit -a >> ${repRID}.uploadInputBag.log + + yr=\$(date +'%Y') + mn=\$(date +'%m') + dy=\$(date +'%d') + + file=\$(basename -a ${inputBag}) + md5=\$(md5sum ./\${file} | awk '{ print \$1 }') + echo LOG: ${repRID} input bag md5 sum - \${md5} >> ${repRID}.uploadInputBag.log + size=\$(wc -c < ./\${file}) + echo LOG: ${repRID} input bag size - \${size} bytes >> ${repRID}.uploadInputBag.log + + exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Input_Bag/File_MD5=\${md5}) + if [ "\${exist}" == "[]" ] + then + cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"') + cookie=\${cookie:11:-1} + + loc=\$(deriva-hatrac-cli --host ${source} put ./\${file} 
/hatrac/resources/rnaseq/pipeline/input_bag/study/${studyRID}/replicate/${repRID}/\${file} --parents) + inputBag_rid=\$(python3 ${script_uploadInputBag} -f \${file} -l \${loc} -s \${md5} -b \${size} -o ${source} -c \${cookie}) + echo LOG: input bag RID uploaded - \${inputBag_rid} >> ${repRID}.uploadInputBag.log + rid=\${inputBag_rid} + else + exist=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT') + exist=\${exist:7:-6} + echo LOG: input bag RID already exists - \${exist} >> ${repRID}.uploadInputBag.log + rid=\${exist} + fi + + echo \${rid} > inputBagRID.csv + """ +} + +// Extract input bag RID into channel +inputBagRID = Channel.create() +inputBagRID_fl.splitCsv(sep: ",", header: false).separate( + inputBagRID +) + +// Replicate input bag RID for multiple process inputs +inputBagRID.into { + inputBagRID_uploadExecutionRun + inputBagRID_finalizeExecutionRun +} + +/* + * uploadExecutionRun: uploads the execution run +*/ +process uploadExecutionRun { + tag "${repRID}" + + input: + path script_uploadExecutionRun_uploadExecutionRun + path credential, stageAs: "credential.json" from deriva_uploadExecutionRun + val spike from spikeInfer_uploadExecutionRun + val species from speciesInfer_uploadExecutionRun + val inputBagRID from inputBagRID_uploadExecutionRun + + output: + path ("executionRunRID.csv") into executionRunRID_fl + + when: + upload + + script: + """ + hostname > ${repRID}.uploadExecutionRun.log + ulimit -a >> ${repRID}.uploadExecutionRun.log + + echo LOG: searching for workflow RID - BICF mRNA ${workflow.manifest.version} >> ${repRID}.uploadExecutionRun.log + workflow=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Workflow/Name=BICF%20mRNA%20Replicate/Version=${workflow.manifest.version}) + workflow=\$(echo \${workflow} | grep -o '\\"RID\\":\\".*\\",\\"RCT') + workflow=\${workflow:7:-6} + echo LOG: workflow RID extracted - \${workflow} >> ${repRID}.uploadExecutionRun.log + + if [ "${species}" == "Homo sapiens" ] + then + genomeName=\$(echo GRCh${refHuVersion}) + elif [ "${species}" == "Mus musculus" ] + then + genomeName=\$(echo GRCm${refMoVersion}) + fi + if [ "${spike}" == "yes" ] + then + genomeName=\$(echo \${genomeName}-S) + fi + echo LOG: searching for genome name - \${genomeName} >> ${repRID}.uploadExecutionRun.log + genome=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Reference_Genome/Name=\${genomeName}) + genome=\$(echo \${genome} | grep -o '\\"RID\\":\\".*\\",\\"RCT') + genome=\${genome:7:-6} + echo LOG: genome RID extracted - \${genome} >> ${repRID}.uploadExecutionRun.log + + cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"') + cookie=\${cookie:11:-1} + + exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/Workflow=\${workflow}/Replicate=${repRID}/Input_Bag=${inputBagRID}) + echo \${exist} >> ${repRID}.uploadExecutionRun.log + if [ "\${exist}" == "[]" ] + then + executionRun_rid=\$(python3 ${script_uploadExecutionRun_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s In-progress -d 'Run in process' -o ${source} -c \${cookie} -u F) + echo LOG: execution run RID uploaded - \${executionRun_rid} >> ${repRID}.uploadExecutionRun.log + else + rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT') + rid=\${rid:7:-6} + echo \${rid} >> ${repRID}.uploadExecutionRun.log + executionRun_rid=\$(python3 ${script_uploadExecutionRun_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s In-progress -d 'Run in 
process' -o ${source} -c \${cookie} -u \${rid}) + echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.uploadExecutionRun.log + fi + + echo \${executionRun_rid} > executionRunRID.csv + """ +} + +// Extract execution run RID into channel +executionRunRID = Channel.create() +executionRunRID_fl.splitCsv(sep: ",", header: false).separate( + executionRunRID +) + +// Replicate execution run RID for multiple process inputs +executionRunRID.into { + executionRunRID_uploadQC + executionRunRID_uploadProcessedFile + executionRunRID_uploadOutputBag + executionRunRID_finalizeExecutionRun +} /* * getRef: downloads appropriate reference @@ -777,9 +1054,13 @@ process getRef { path credential, stageAs: "credential.json" from deriva_getRef val spike from spikeInfer_getRef val species from speciesInfer_getRef + val pipelineError_getRef output: - tuple path ("hisat2", type: 'dir'), path ("bed", type: 'dir'), path ("*.fna"), path ("*.gtf"), path ("geneID.tsv"), path ("Entrez.tsv") into reference + tuple path ("hisat2", type: 'dir'), path ("*.bed"), path ("*.fna"), path ("*.gtf"), path ("geneID.tsv"), path ("Entrez.tsv") into reference + + when: + pipelineError_getRef == "false" script: """ @@ -807,34 +1088,21 @@ process getRef { fi if [ "${spike}" == "yes" ] then - references=\$(echo \${reference}-S/) + references=\$(echo \${reference}-S) elif [ "${spike}" == "no" ] then - reference=\$(echo \${references}/) + reference=\$(echo \${references}) fi echo -e "LOG: species set to \${references}" >> ${repRID}.getRef.log # retreive appropriate reference appropriate location echo -e "LOG: fetching ${species} reference files from ${referenceBase}" >> ${repRID}.getRef.log - if [ ${referenceBase} == "/project/BICF/BICF_Core/shared/gudmap/references" ] + if [ ${referenceBase} == "/project/BICF/BICF_Core/shared/gudmap/references/new" ] then echo -e "LOG: grabbing reference files from local (BioHPC)" >> ${repRID}.getRef.log - ln -s "\${references}"/hisat2 - ln -s "\${references}"/bed - ln -s "\${references}"/genome.fna - ln -s "\${references}"/genome.gtf - ln -s "\${references}"/geneID.tsv - ln -s "\${references}"/Entrez.tsv - #elif [ ${referenceBase} == "s3://bicf-references" ] - #then - # echo -e "LOG: grabbing reference files from S3" >> ${repRID}.getRef.log - # aws s3 cp "\${references}"/hisat2 ./hisat2 --recursive - # aws s3 cp "\${references}"/bed ./bed --recursive - # aws s3 cp "\${references}"/genome.fna ./ - # aws s3 cp "\${references}"/genome.gtf ./ - # aws s3 cp "\${references}"/geneID.tsv ./ - # aws s3 cp "\${references}"/Entrez.tsv ./ - elif [ ${referenceBase} == "dev.gudmap.org" ] + unzip \${reference}.zip + mv \$(basename \${reference})/data/* . + elif [ "${params.refSource}" == "datahub" ] then echo -e "LOG: grabbing reference files from datahub" >> ${repRID}.getRef.log GRCv=\$(echo \${references} | grep -o \${refName}.* | cut -d '.' -f1) GRCp=\$(echo \${references} | grep -o \${refName}.* | cut -d '.' -f2) @@ -853,6 +1121,12 @@ process getRef { mv \${fName}/data/* . fi echo -e "LOG: fetched" >> ${repRID}.getRef.log + + mv ./annotation/genome.gtf . + mv ./sequence/genome.fna . + mv ./annotation/genome.bed . + mv ./metadata/Entrez.tsv . + mv ./metadata/geneID.tsv . 
""" } @@ -874,11 +1148,15 @@ process alignData { path reference_alignData val ends from endsInfer_alignData val stranded from strandedInfer_alignData + val pipelineError_alignData output: tuple path ("${repRID}.sorted.bam"), path ("${repRID}.sorted.bam.bai") into rawBam path ("*.alignSummary.txt") into alignQC + when: + pipelineError_alignData == "false" + script: """ hostname > ${repRID}.align.log @@ -941,12 +1219,16 @@ process dedupData { input: tuple path (bam), path (bai) from rawBam_dedupData + val pipelineError_dedupData output: tuple path ("${repRID}_sorted.deduped.bam"), path ("${repRID}_sorted.deduped.bam.bai") into dedupBam tuple path ("${repRID}_sorted.deduped.*.bam"), path ("${repRID}_sorted.deduped.*.bam.bai") into dedupChrBam path ("*.deduped.Metrics.txt") into dedupQC + when: + pipelineError_dedupData == 'false' + script: """ hostname > ${repRID}.dedup.log @@ -990,10 +1272,14 @@ process makeBigWig { input: tuple path (bam), path (bai) from dedupBam_makeBigWig + val pipelineError_makeBigWig output: path ("${repRID}_sorted.deduped.bw") into bigwig + when: + pipelineError_makeBigWig == 'false' + script: """ hostname > ${repRID}.makeBigWig.log @@ -1020,12 +1306,16 @@ process countData { path ref from reference_countData val ends from endsInfer_countData val stranded from strandedInfer_countData + val pipelineError_countData output: path ("*_tpmTable.csv") into counts path ("*_countData.summary") into countsQC path ("assignedReads.csv") into assignedReadsInfer_fl + when: + pipelineError_countData == 'false' + script: """ hostname > ${repRID}.countData.log @@ -1091,11 +1381,15 @@ process fastqc { input: path (fastq) from fastqs_fastqc + val pipelineError_fastqc output: path ("*_fastqc.zip") into fastqc path ("rawReads.csv") into rawReadsInfer_fl + when: + pipelineError_fastqc == 'false' + script: """ hostname > ${repRID}.fastqc.log @@ -1134,12 +1428,16 @@ process dataQC { tuple path (bam), path (bai) from dedupBam_dataQC tuple path (chrBam), path (chrBai) from dedupChrBam val ends from endsInfer_dataQC + val pipelineError_dataQC output: path "${repRID}_tin.hist.tsv" into tinHist path "${repRID}_tin.med.csv" into tinMedInfer_fl path "${repRID}_insertSize.inner_distance_freq.txt" into innerDistance + when: + pipelineError_dataQC == 'false' + script: """ hostname > ${repRID}.dataQC.log @@ -1147,8 +1445,8 @@ process dataQC { # calcualte TIN values per feature on each chromosome echo -e "geneID\tchrom\ttx_start\ttx_end\tTIN" > ${repRID}_sorted.deduped.tin.xls - for i in `cat ./bed/genome.bed | cut -f1 | sort | uniq`; do - echo "echo \"LOG: running tin.py on \${i}\" >> ${repRID}.dataQC.log; tin.py -i ${repRID}_sorted.deduped.\${i}.bam -r ./bed/genome.bed; cat ${repRID}_sorted.deduped.\${i}.tin.xls | tr -s \"\\w\" \"\\t\" | grep -P \\\"\\\\t\${i}\\\\t\\\";"; + for i in `cat ./genome.bed | cut -f1 | grep -o chr.* | sort | uniq`; do + echo "echo \"LOG: running tin.py on \${i}\" >> ${repRID}.dataQC.log; tin.py -i ${repRID}_sorted.deduped.\${i}.bam -r ./genome.bed; cat ${repRID}_sorted.deduped.\${i}.tin.xls | tr -s \"\\w\" \"\\t\" | grep -P \\\"\\\\t\${i}\\\\t\\\";"; done | parallel -j `nproc` -k 1>> ${repRID}_sorted.deduped.tin.xls # bin TIN values @@ -1160,7 +1458,7 @@ process dataQC { if [ "${ends}" == "pe" ] then echo -e "LOG: calculating inner distances for ${ends}" >> ${repRID}.dataQC.log - inner_distance.py -i "${bam}" -o ${repRID}_insertSize -r ./bed/genome.bed + inner_distance.py -i "${bam}" -o ${repRID}_insertSize -r ./genome.bed echo -e "LOG: calculated" >> ${repRID}.dataQC.log 
elif [ "${ends}" == "se" ] then @@ -1199,10 +1497,10 @@ process aggrQC { path alignSampleQCs from alignSampleQC_aggrQC.collect() path inferExperiment val endsManual from endsManual_aggrQC - val endsM from endsMeta - val strandedM from strandedMeta - val spikeM from spikeMeta - val speciesM from speciesMeta + val endsM from endsMeta_aggrQC + val strandedM from strandedMeta_aggrQC + val spikeM from spikeMeta_aggrQC + val speciesM from speciesMeta_aggrQC val endsI from endsInfer_aggrQC val strandedI from strandedInfer_aggrQC val spikeI from spikeInfer_aggrQC @@ -1214,11 +1512,15 @@ process aggrQC { val tinMedI from tinMedInfer val studyRID from studyRID_aggrQC val expRID from expRID_aggrQC + val pipelineError_aggrQC output: path "${repRID}.multiqc.html" into multiqc path "${repRID}.multiqc_data.json" into multiqcJSON + when: + pipelineError_aggrQC == 'false' + script: """ hostname > ${repRID}.aggrQC.log @@ -1285,147 +1587,6 @@ process aggrQC { """ } -/* - * uploadInputBag: uploads the input bag -*/ -process uploadInputBag { - tag "${repRID}" - - input: - path script_uploadInputBag - path credential, stageAs: "credential.json" from deriva_uploadInputBag - path inputBag from inputBag_uploadInputBag - val studyRID from studyRID_uploadInputBag - - output: - path ("inputBagRID.csv") into inputBagRID_fl - - when: - upload - - script: - """ - hostname > ${repRID}.uploadInputBag.log - ulimit -a >> ${repRID}.uploadInputBag.log - - yr=\$(date +'%Y') - mn=\$(date +'%m') - dy=\$(date +'%d') - - file=\$(basename -a ${inputBag}) - md5=\$(md5sum ./\${file} | awk '{ print \$1 }') - echo LOG: ${repRID} input bag md5 sum - \${md5} >> ${repRID}.uploadInputBag.log - size=\$(wc -c < ./\${file}) - echo LOG: ${repRID} input bag size - \${size} bytes >> ${repRID}.uploadInputBag.log - - exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Input_Bag/File_MD5=\${md5}) - if [ "\${exist}" == "[]" ] - then - cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"') - cookie=\${cookie:11:-1} - - loc=\$(deriva-hatrac-cli --host ${source} put ./\${file} /hatrac/resources/rnaseq/pipeline/input_bag/study/${studyRID}/replicate/${repRID}/\${file} --parents) - inputBag_rid=\$(python3 ${script_uploadInputBag} -f \${file} -l \${loc} -s \${md5} -b \${size} -o ${source} -c \${cookie}) - echo LOG: input bag RID uploaded - \${inputBag_rid} >> ${repRID}.uploadInputBag.log - rid=\${inputBag_rid} - else - exist=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT') - exist=\${exist:7:-6} - echo LOG: input bag RID already exists - \${exist} >> ${repRID}.uploadInputBag.log - rid=\${exist} - fi - - echo \${rid} > inputBagRID.csv - """ -} - -// Extract input bag RID into channel -inputBagRID = Channel.create() -inputBagRID_fl.splitCsv(sep: ",", header: false).separate( - inputBagRID -) - -/* - * uploadExecutionRun: uploads the execution run -*/ -process uploadExecutionRun { - tag "${repRID}" - - input: - path script_uploadExecutionRun - path credential, stageAs: "credential.json" from deriva_uploadExecutionRun - val spike from spikeInfer_uploadExecutionRun - val species from speciesInfer_uploadExecutionRun - val inputBagRID - - output: - path ("executionRunRID.csv") into executionRunRID_fl - - when: - upload - - script: - """ - hostname > ${repRID}.uploadExecutionRun.log - ulimit -a >> ${repRID}.uploadExecutionRun.log - - echo LOG: searching for workflow RID - BICF mRNA ${workflow.manifest.version} >> ${repRID}.uploadExecutionRun.log - workflow=\$(curl -s 
https://${source}/ermrest/catalog/2/entity/RNASeq:Workflow/Name=BICF%20mRNA%20Replicate/Version=${workflow.manifest.version}) - workflow=\$(echo \${workflow} | grep -o '\\"RID\\":\\".*\\",\\"RCT') - workflow=\${workflow:7:-6} - echo LOG: workflow RID extracted - \${workflow} >> ${repRID}.uploadExecutionRun.log - - if [ "${species}" == "Homo sapiens" ] - then - genomeName=\$(echo GRCh${refHuVersion}) - elif [ "${species}" == "Mus musculus" ] - then - genomeName=\$(echo GRCm${refMoVersion}) - fi - if [ "${spike}" == "yes" ] - then - genomeName=\$(echo \${genomeName}-S) - fi - echo LOG: searching for genome name - \${genomeName} >> ${repRID}.uploadExecutionRun.log - genome=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Reference_Genome/Name=\${genomeName}_indev) - genome=\$(echo \${genome} | grep -o '\\"RID\\":\\".*\\",\\"RCT') - genome=\${genome:7:-6} - echo LOG: genome RID extracted - \${genome} >> ${repRID}.uploadExecutionRun.log - - cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"') - cookie=\${cookie:11:-1} - - exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/Workflow=\${workflow}/Replicate=${repRID}/Input_Bag=${inputBagRID}) - echo \${exist} >> ${repRID}.uploadExecutionRun.log - if [ "\${exist}" == "[]" ] - then - executionRun_rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s In-progress -d 'Run in process' -o ${source} -c \${cookie} -u F) - echo LOG: execution run RID uploaded - \${executionRun_rid} >> ${repRID}.uploadExecutionRun.log - else - rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT') - rid=\${rid:7:-6} - echo \${rid} >> ${repRID}.uploadExecutionRun.log - executionRun_rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s In-progress -d 'Run in process' -o ${source} -c \${cookie} -u \${rid}) - echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.uploadExecutionRun.log - fi - - echo \${executionRun_rid} > executionRunRID.csv - """ -} - -// Extract execution run RID into channel -executionRunRID = Channel.create() -executionRunRID_fl.splitCsv(sep: ",", header: false).separate( - executionRunRID -) - -// -executionRunRID.into { - executionRunRID_uploadQC - executionRunRID_uploadProcessedFile - executionRunRID_uploadOutputBag -} - /* * uploadQC: uploads the mRNA QC */ @@ -1442,13 +1603,14 @@ process uploadQC { val length from readLengthInfer_uploadQC val rawCount from rawReadsInfer_uploadQC val finalCount from assignedReadsInfer_uploadQC - - + val pipelineError_uploadQC + output: path ("qcRID.csv") into qcRID_fl when: upload + pipelineError_uploadQC == 'false' script: """ @@ -1511,12 +1673,14 @@ process uploadProcessedFile { val studyRID from studyRID_uploadProcessedFile val expRID from expRID_uploadProcessedFile val executionRunRID from executionRunRID_uploadProcessedFile + val pipelineError_uploadProcessedFile output: path ("${repRID}_Output_Bag.zip") into outputBag when: upload + pipelineError_uploadProcessedFile == 'false' script: """ @@ -1595,12 +1759,14 @@ process uploadOutputBag { path outputBag val studyRID from studyRID_uploadOutputBag val executionRunRID from executionRunRID_uploadOutputBag + val pipelineError_uploadOutputBag output: path ("outputBagRID.csv") into outputBagRID_fl when: upload + pipelineError_uploadOutputBag == 'false' script: """ @@ -1616,11 +1782,11 @@ process uploadOutputBag { echo LOG: ${repRID} output bag md5 sum - 
\${md5} >> ${repRID}.uploadOutputBag.log size=\$(wc -c < ./\${file}) echo LOG: ${repRID} output bag size - \${size} bytes >> ${repRID}.uploadOutputBag.log - + exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Output_Bag/File_MD5=\${md5}) if [ "\${exist}" == "[]" ] then - cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"') + cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"') cookie=\${cookie:11:-1} loc=\$(deriva-hatrac-cli --host ${source} put ./\${file} /hatrac/resources/rnaseq/pipeline/output_bag/study/${studyRID}/replicate/${repRID}/\${file} --parents) @@ -1639,10 +1805,82 @@ process uploadOutputBag { } // Extract output bag RID into channel -outputBagRID = Channel.create() +outputBagRID = Channel.value() +outputBagRID_temp = Channel.create() outputBagRID_fl.splitCsv(sep: ",", header: false).separate( - outputBagRID + outputBagRID_temp ) +outputBagRID = outputBagRID_temp + +/* + * finalizeExecutionRun: finalizes the execution run +*/ +process finalizeExecutionRun { + tag "${repRID}" + + input: + path script_uploadExecutionRun_finalizeExecutionRun + path credential, stageAs: "credential.json" from deriva_finalizeExecutionRun + val executionRunRID from executionRunRID_finalizeExecutionRun + val inputBagRID from inputBagRID_finalizeExecutionRun + path outputBagRID + val endsMeta from endsMeta_finalizeExecutionRun + val strandedMeta from strandedMeta_finalizeExecutionRun + val spikeMeta from spikeMeta_finalizeExecutionRun + val speciesMeta from speciesMeta_finalizeExecutionRun + val endsInfer from endsInfer_finalizeExecutionRun + val strandedInfer from strandedInfer_finalizeExecutionRun + val spikeInfer from spikeInfer_finalizeExecutionRun + val speciesInfer from speciesInfer_finalizeExecutionRun + val pipelineError from pipelineError_finalizeExecutionRun + val pipelineError_ends + val pipelineError_stranded + val pipelineError_spike + val pipelineError_species + + when: + upload + + script: + """ + hostname > ${repRID}.finalizeExecutionRun.log + ulimit -a >> ${repRID}.finalizeExecutionRun.log + + executionRun=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/RID=${executionRunRID}) + workflow=\$(echo \${executionRun} | grep -o '\\"Workflow\\":.*\\"Reference' | grep -oP '(?<=\\"Workflow\\":\\").*(?=\\",\\"Reference)') + genome=\$(echo \${executionRun} | grep -o '\\"Reference_Genome\\":.*\\"Input_Bag' | grep -oP '(?<=\\"Reference_Genome\\":\\").*(?=\\",\\"Input_Bag)') + + cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"') + cookie=\${cookie:11:-1} + + if [ ${pipelineError} == false ] + then + rid=\$(python3 ${script_uploadExecutionRun_finalizeExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Success -d 'Run Successful' -o ${source} -c \${cookie} -u ${executionRunRID}) + echo LOG: execution run RID marked as successful - \${rid} >> ${repRID}.finalizeExecutionRun.log + else + pipelineError_details=\$(echo "**Submitted metadata does not match infered:** ") + if ${pipelineError_ends} + then + pipelineError_details=\$(echo \${pipelineError_details}"[ Submitted ends = *${endsMeta}* **|** Infered ends = *${endsInfer}* ] ") + fi + if ${pipelineError_stranded} + then + pipelineError_details=\$(echo \${pipelineError_details}"[ Submitted strandedness = *${strandedMeta}* **|** Infered strandedness = *${strandedInfer}* ] ") + fi + if ${pipelineError_spike} + then + pipelineError_details=\$(echo 
\${pipelineError_details}"[ Submitted spike-in = *${spikeMeta}* **|** Infered spike-in = *${spikeInfer}* ] ") + fi + if ${pipelineError_species} + then + pipelineError_details=\$(echo \${pipelineError_details}"[ Submitted species = *${speciesMeta}* **|** Infered species = *${speciesInfer}* ] ") + fi + printf "\${pipelineError_details}" + rid=\$(python3 ${script_uploadExecutionRun_finalizeExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${pipelineError_details}" -o ${source} -c \${cookie} -u ${executionRunRID}) + echo LOG: execution run RID marked as error - \${rid} >> ${repRID}.finalizeExecutionRun.log + fi + """ +} workflow.onError = {