Commit 57834f48 authored by Gervaise Henry

Add finalize execution run and add new references using bag

parent beecb815
2 merge requests: !58 Develop, !56 Resolve "Detect error in inferMetadata for tracking"
@@ -40,7 +40,7 @@ To Run:
* `--refMoVersion` mouse reference version ***(optional, default = 38.p6.vM22)***
* `--refHuVersion` human reference version ***(optional, default = 38.p12.v31)***
* `--refERCCVersion` ERCC reference version ***(optional, default = 92)***
* `--upload` option to upload outputs back to the data-hub ***(optional, default = false)***
  * **true** = upload outputs to the data-hub
  * **false** = do *NOT* upload outputs to the data-hub
* `-profile` config profile to use ***(optional)***:
@@ -69,7 +69,7 @@ To Run:
  * eg: `--speciesForce 'Mus musculus'`
* Tracking parameters ([Tracking Site](http://bicf.pipeline.tracker.s3-website-us-east-1.amazonaws.com/)):
  * `--ci` boolean (default = false)
  * `--dev` boolean (default = true)

FULL EXAMPLE:
-------------
......
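A minimal invocation sketch using the options documented above (illustrative only, not the collapsed FULL EXAMPLE; the replicate RID and reference version are the test defaults from the pipeline, and the `biohpc` profile name is an assumption):

```bash
nextflow run rna-seq.nf \
  --repRID Q-Y5F6 \
  --source dev \
  --refHuVersion 38.p13.v36 \
  --upload true \
  --ci false \
  --dev true \
  -profile biohpc   # assumes a profile with this name is defined in the repo's nextflow.config
```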
@@ -16,91 +16,99 @@ process {
  cpus = 1
  memory = '1 GB'

  withName:trackStart {
    cpus = 1
    memory = '1 GB'
  }
  withName:getBag {
    cpus = 1
    memory = '1 GB'
  }
  withName:getData {
    cpus = 1
    memory = '1 GB'
  }
  withName:parseMetadata {
    cpus = 15
    memory = '1 GB'
  }
  withName:trimData {
    cpus = 20
    memory = '2 GB'
  }
  withName:getRefInfer {
    cpus = 1
    memory = '1 GB'
  }
  withName:downsampleData {
    cpus = 1
    memory = '1 GB'
  }
  withName:alignSampleData {
    cpus = 50
    memory = '5 GB'
  }
  withName:inferMetadata {
    cpus = 5
    memory = '1 GB'
  }
  withName:checkMetadata {
    cpus = 1
    memory = '1 GB'
  }
  withName:getRef {
    cpus = 1
    memory = '1 GB'
  }
  withName:alignData {
    cpus = 50
    memory = '10 GB'
  }
  withName:dedupData {
    cpus = 5
    memory = '20 GB'
  }
  withName:countData {
    cpus = 2
    memory = '5 GB'
  }
  withName:makeBigWig {
    cpus = 15
    memory = '5 GB'
  }
  withName:fastqc {
    cpus = 1
    memory = '1 GB'
  }
  withName:dataQC {
    cpus = 15
    memory = '2 GB'
  }
  withName:aggrQC {
    cpus = 2
    memory = '1 GB'
  }
  withName:uploadInputBag {
    cpus = 1
    memory = '1 GB'
  }
  withName:uploadExecutionRun {
    cpus = 1
    memory = '1 GB'
  }
  withName:uploadQC {
    cpus = 1
    memory = '1 GB'
  }
  withName:uploadProcessedFile {
    cpus = 1
    memory = '1 GB'
  }
  withName:uploadOutputBag {
    cpus = 1
    memory = '1 GB'
  }
  withName:finalizeExecutionRun {
    cpus = 1
    memory = '1 GB'
  }
......
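// Note: the top-level cpus/memory values above are process-wide defaults; each withName:
// block overrides them for the named process only (standard Nextflow config selector behavior).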
@@ -7,70 +7,76 @@ process {
  queue = 'super'
  clusterOptions = '--hold'

  withName:trackStart {
    executor = 'local'
  }
  withName:getBag {
    executor = 'local'
  }
  withName:getData {
    queue = 'super'
  }
  withName:parseMetadata {
    executor = 'local'
  }
  withName:trimData {
    queue = 'super'
  }
  withName:getRefInfer {
    queue = 'super'
  }
  withName:downsampleData {
    executor = 'local'
  }
  withName:alignSampleData {
    queue = 'super'
  }
  withName:inferMetadata {
    queue = 'super'
  }
  withName:checkMetadata {
    executor = 'local'
  }
  withName:getRef {
    queue = 'super'
  }
  withName:alignData {
    queue = '256GB,256GBv1'
  }
  withName:dedupData {
    queue = 'super'
  }
  withName:countData {
    queue = 'super'
  }
  withName:makeBigWig {
    queue = 'super'
  }
  withName:fastqc {
    queue = 'super'
  }
  withName:dataQC {
    queue = 'super'
  }
  withName:aggrQC {
    executor = 'local'
  }
  withName:uploadInputBag {
    executor = 'local'
  }
  withName:uploadExecutionRun {
    executor = 'local'
  }
  withName:uploadQC {
    executor = 'local'
  }
  withName:uploadProcessedFile {
    executor = 'local'
  }
  withName:uploadOutputBag {
    executor = 'local'
  }
  withName:finalizeExecutionRun {
    executor = 'local'
  }
}
......
@@ -25,46 +25,49 @@ process {
  withName:getData {
    container = 'gudmaprbk/deriva1.3:1.0.0'
  }
  withName:parseMetadata {
    container = 'gudmaprbk/python3:1.0.0'
  }
  withName:trimData {
    container = 'gudmaprbk/trimgalore0.6.5:1.0.0'
  }
  withName:getRefInfer {
    container = 'gudmaprbk/deriva1.3:1.0.0'
  }
  withName:downsampleData {
    container = 'gudmaprbk/seqtk1.3:1.0.0'
  }
  withName:alignSampleData {
    container = 'gudmaprbk/hisat2.2.1:1.0.0'
  }
  withName:inferMetadata {
    container = 'gudmaprbk/rseqc4.0.0:1.0.0'
  }
  withName:checkMetadata {
    container = 'gudmaprbk/gudmap-rbk_base:1.0.0'
  }
  withName:getRef {
    container = 'gudmaprbk/deriva1.3:1.0.0'
  }
  withName:alignData {
    container = 'gudmaprbk/hisat2.2.1:1.0.0'
  }
  withName:dedupData {
    container = 'gudmaprbk/picard2.23.9:1.0.0'
  }
  withName:countData {
    container = 'gudmaprbk/subread2.0.1:1.0.0'
  }
  withName:makeBigWig {
    container = 'gudmaprbk/deeptools3.5.0:1.0.0'
  }
  withName:fastqc {
    container = 'gudmaprbk/fastqc0.11.9:1.0.0'
  }
  withName:dataQC {
    container = 'gudmaprbk/rseqc4.0.0:1.0.0'
  }
  withName:aggrQC {
    container = 'gudmaprbk/multiqc1.9:1.0.0'
  }
  withName:uploadInputBag {
@@ -82,6 +85,9 @@ process {
  withName:uploadOutputBag {
    container = 'gudmaprbk/deriva1.3:1.0.0'
  }
  withName:finalizeExecutionRun {
    container = 'gudmaprbk/deriva1.3:1.0.0'
  }
}
trace {
@@ -110,6 +116,6 @@ manifest {
  homePage = 'https://git.biohpc.swmed.edu/gudmap_rbk/rna-seq'
  description = 'This pipeline was created to be a standard mRNA-sequencing analysis pipeline which integrates with the GUDMAP and RBK consortium data-hub.'
  mainScript = 'rna-seq.nf'
  version = 'v0.1.0'
  nextflowVersion = '>=19.09.0'
}
@@ -14,8 +14,8 @@ params.bdbag = "${baseDir}/../test_data/auth/cookies.txt"
//params.repRID = "16-1ZX4"
params.repRID = "Q-Y5F6"
params.source = "dev"
params.refMoVersion = "38.p6.vM25"
params.refHuVersion = "38.p13.v36"
params.refERCCVersion = "92"
params.outDir = "${baseDir}/../output"
params.upload = false
@@ -46,6 +46,7 @@ deriva.into {
  deriva_uploadQC
  deriva_uploadProcessedFile
  deriva_uploadOutputBag
  deriva_finalizeExecutionRun
}
bdbag = Channel
  .fromPath(params.bdbag)
@@ -62,7 +63,7 @@ fastqsForce = params.fastqsForce
speciesForce = params.speciesForce
email = params.email

// Define fixed files and variables
replicateExportConfig = Channel.fromPath("${baseDir}/conf/Replicate_For_Input_Bag.json")
executionRunExportConfig = Channel.fromPath("${baseDir}/conf/Execution_Run_For_Output_Bag.json")
if (params.source == "dev") {
@@ -73,11 +74,9 @@ if (params.source == "dev") {
  source = "www.gudmap.org"
}
if (params.refSource == "biohpc") {
  referenceBase = "/project/BICF/BICF_Core/shared/gudmap/references/new"
} else if (params.refSource == "datahub") {
  referenceBase = "www.gudmap.org"
}
referenceInfer = Channel.fromList(["ERCC","GRCh","GRCm"])
multiqcConfig = Channel.fromPath("${baseDir}/conf/multiqc_config.yaml")
@@ -95,7 +94,8 @@ script_calculateTPM = Channel.fromPath("${baseDir}/scripts/calculateTPM.R")
script_convertGeneSymbols = Channel.fromPath("${baseDir}/scripts/convertGeneSymbols.R")
script_tinHist = Channel.fromPath("${baseDir}/scripts/tin_hist.py")
script_uploadInputBag = Channel.fromPath("${baseDir}/scripts/upload_input_bag.py")
script_uploadExecutionRun_uploadExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
script_uploadExecutionRun_finalizeExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
script_uploadQC = Channel.fromPath("${baseDir}/scripts/upload_qc.py")
script_uploadOutputBag = Channel.fromPath("${baseDir}/scripts/upload_output_bag.py")
script_deleteEntry_uploadQC = Channel.fromPath("${baseDir}/scripts/delete_entry.py")
@@ -105,7 +105,7 @@ script_deleteEntry_uploadProcessedFile = Channel.fromPath("${baseDir}/scripts/de
 * trackStart: track start of pipeline
 */
process trackStart {
  container 'docker://gudmaprbk/gudmap-rbk_base:1.0.0'
  script:
    """
    hostname
@@ -139,6 +139,7 @@ Human Reference Version: ${params.refHuVersion}
ERCC Reference Version : ${params.refERCCVersion}
Reference source : ${params.refSource}
Output Directory : ${params.outDir}
Upload : ${upload}
------------------------------------
Nextflow Version : ${workflow.nextflow.version}
Pipeline Version : ${workflow.manifest.version}
@@ -348,12 +349,32 @@ metadata_fl.splitCsv(sep: ",", header: false).separate(
)

// Replicate metadata for multiple process inputs
endsMeta.into {
  endsMeta_checkMetadata
  endsMeta_aggrQC
  endsMeta_finalizeExecutionRun
}
endsManual.into {
  endsManual_trimData
  endsManual_downsampleData
  endsManual_alignSampleData
  endsManual_aggrQC
}
strandedMeta.into {
  strandedMeta_checkMetadata
  strandedMeta_aggrQC
  strandedMeta_finalizeExecutionRun
}
spikeMeta.into {
  spikeMeta_checkMetadata
  spikeMeta_aggrQC
  spikeMeta_finalizeExecutionRun
}
speciesMeta.into {
  speciesMeta_checkMetadata
  speciesMeta_aggrQC
  speciesMeta_finalizeExecutionRun
}
studyRID.into {
  studyRID_aggrQC
  studyRID_uploadInputBag
@@ -365,7 +386,6 @@ expRID.into {
  expRID_uploadProcessedFile
}
/*
 * trimData: trims any adapter or non-host sequences from the data
 */
@@ -464,23 +484,14 @@ process getRefInfer {
      echo -e "LOG: ERROR - References could not be set!\nReference found: ${referenceBase}" >> ${repRID}.${refName}.getRefInfer.log
      exit 1
    fi

    # retrieve appropriate reference from appropriate location
    echo -e "LOG: fetching ${refName} reference files from ${referenceBase}" >> ${repRID}.${refName}.getRefInfer.log
    if [ ${referenceBase} == "/project/BICF/BICF_Core/shared/gudmap/references/new" ]
    then
      unzip \${references}.zip
      mv \$(basename \${references})/data/* .
    elif [ "${params.refSource}" == "datahub" ]
    then
      GRCv=\$(echo \${references} | grep -o ${refName}.* | cut -d '.' -f1)
      GRCp=\$(echo \${references} | grep -o ${refName}.* | cut -d '.' -f2)
@@ -502,19 +513,14 @@ process getRefInfer {
      unzip \$(basename \${refURL})
      mv \${fName}/data/* .
    fi
    mv ./annotation/genome.gtf .
    mv ./sequence/genome.fna .
    mkdir ${refName}
    if [ "${refName}" != "ERCC" ]
    then
      mv ./annotation/genome.bed ./${refName}
    fi
    echo -e "LOG: fetched" >> ${repRID}.${refName}.getRefInfer.log
    """
}
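// Note (illustration, not pipeline code): in the datahub branch above, assuming the reference
// name is refName plus the version string (e.g. "GRCh" + "38.p13.v36" -> "GRCh38.p13.v36"),
// the grep/cut pair slices it into GRCv="GRCh38" ('.'-field 1) and GRCp="p13" ('.'-field 2).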
@@ -650,12 +656,12 @@ process inferMetadata {
    then
      species="Homo sapiens"
      bam="GRCh.sampled.sorted.bam"
      bed="./GRCh/genome.bed"
    elif [ 1 -eq \$(echo \$(expr \${align_mo} ">=" 40)) ] && [ 1 -eq \$(echo \$(expr \${align_hu} "<" 40)) ]
    then
      species="Mus musculus"
      bam="GRCm.sampled.sorted.bam"
      bed="./GRCm/genome.bed"
    else
      echo -e "LOG: ERROR - inference of species returns an ambiguous result: hu=\${align_hu} mo=\${align_mo}" >> ${repRID}.inferMetadata.log
      if [ "${speciesForce}" == "" ]
@@ -670,11 +676,11 @@ process inferMetadata {
      if [ "${speciesForce}" == "Homo sapiens" ]
      then
        bam="GRCh.sampled.sorted.bam"
        bed="./GRCh/genome.bed"
      elif [ "${speciesForce}" == "Mus musculus" ]
      then
        bam="GRCm.sampled.sorted.bam"
        bed="./GRCm/genome.bed"
      fi
    fi
    echo -e "LOG: inference of species results in: \${species}" >> ${repRID}.inferMetadata.log
@@ -741,30 +747,301 @@ inferMetadata_fl.splitCsv(sep: ",", header: false).separate(
// Replicate metadata for multiple process inputs
endsInfer.into {
  endsInfer_checkMetadata
  endsInfer_alignData
  endsInfer_countData
  endsInfer_dataQC
  endsInfer_aggrQC
  endsInfer_uploadQC
  endsInfer_finalizeExecutionRun
}
strandedInfer.into {
  strandedInfer_checkMetadata
  strandedInfer_alignData
  strandedInfer_countData
  strandedInfer_aggrQC
  strandedInfer_uploadQC
  strandedInfer_finalizeExecutionRun
}
spikeInfer.into {
  spikeInfer_checkMetadata
  spikeInfer_getRef
  spikeInfer_aggrQC
  spikeInfer_uploadExecutionRun
  spikeInfer_finalizeExecutionRun
}
speciesInfer.into {
  speciesInfer_checkMetadata
  speciesInfer_getRef
  speciesInfer_aggrQC
  speciesInfer_uploadExecutionRun
  speciesInfer_uploadProcessedFile
  speciesInfer_finalizeExecutionRun
}
/*
* checkMetadata: checks the submitted metadata against the inferred metadata
*/
process checkMetadata {
tag "${repRID}"
input:
val endsMeta from endsMeta_checkMetadata
val strandedMeta from strandedMeta_checkMetadata
val spikeMeta from spikeMeta_checkMetadata
val speciesMeta from speciesMeta_checkMetadata
val endsInfer from endsInfer_checkMetadata
val strandedInfer from strandedInfer_checkMetadata
val spikeInfer from spikeInfer_checkMetadata
val speciesInfer from speciesInfer_checkMetadata
output:
path ("check.csv") into checkMetadata_fl
path ("outputBagRID.csv") optional true into outputBagRID_fl_dummy
script:
"""
hostname > ${repRID}.checkMetadata.log
ulimit -a >> ${repRID}.checkMetadata.log
pipelineError=false
# check if submitted metadata matches inferred
if [ "${endsMeta}" != "${endsInfer}" ]
then
pipelineError=true
pipelineError_ends=true
echo -e "LOG: ends do not match: Submitted=${endsMeta}; Infered=${endsInfer}" >> ${repRID}.checkMetadata.log
else
pipelineError_ends=false
echo -e "LOG: ends matches: Submitted=${endsMeta}; Infered=${endsInfer}" >> ${repRID}.checkMetadata.log
fi
if [ "${strandedMeta}" != "${strandedInfer}" ]
then
if [ "${strandedMeta}" == "unstranded" ]
then
pipelineError=true
pipelineError_stranded=true
echo -e "LOG: stranded does not match: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log
elif [ "${strandedInfer}" != "forward"] || [ "${strandedInfer}" != "reverse" ]
then
pipelineError=true
pipelineError_stranded=true
echo -e "LOG: stranded does not match: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log
else
pipelineError_stranded=false
echo -e "LOG: stranded matches: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log
fi
else
pipelineError_stranded=false
echo -e "LOG: stranded matches: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log
fi
if [ "${spikeMeta}" != "${spikeInfer}" ]
then
pipelineError=true
pipelineError_spike=true
echo -e "LOG: spike does not match: Submitted=${spikeMeta}; Infered=${spikeInfer}" >> ${repRID}.checkMetadata.log
else
pipelineError_spike=false
echo -e "LOG: stranded matches: Submitted=${spikeMeta}; Infered=${spikeInfer}" >> ${repRID}.checkMetadata.log
fi
if [ "${speciesMeta}" != "${speciesInfer}" ]
then
pipelineError=true
pipelineError_species=true
echo -e "LOG: species does not match: Submitted=${speciesMeta}; Infered=${speciesInfer}" >> ${repRID}.checkMetadata.log
else
pipelineError_species=false
echo -e "LOG: species matches: Submitted=${speciesMeta}; Infered=${speciesInfer}" >> ${repRID}.checkMetadata.log
fi
# create dummy output bag rid if failure
if [ \${pipelineError} == true ]
then
echo "fail" 1>> outputBagRID.csv
fi
# write checks to file
echo "\${pipelineError},\${pipelineError_ends},\${pipelineError_stranded},\${pipelineError_spike},\${pipelineError_species}" 1>> check.csv
"""
}
// Split errors into separate channels
pipelineError = Channel.create()
pipelineError_ends = Channel.create()
pipelineError_stranded = Channel.create()
pipelineError_spike = Channel.create()
pipelineError_species = Channel.create()
checkMetadata_fl.splitCsv(sep: ",", header: false).separate(
pipelineError,
pipelineError_ends,
pipelineError_stranded,
pipelineError_spike,
pipelineError_species
)
// Replicate errors for multiple process inputs
pipelineError.into {
pipelineError_getRef
pipelineError_alignData
pipelineError_dedupData
pipelineError_makeBigWig
pipelineError_countData
pipelineError_fastqc
pipelineError_dataQC
pipelineError_aggrQC
pipelineError_uploadQC
pipelineError_uploadProcessedFile
pipelineError_uploadOutputBag
pipelineError_finalizeExecutionRun
} }
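// Note: the pipelineError values above arrive from check.csv as the strings "true"/"false"
// (splitCsv yields text, not booleans), which is why the downstream `when:` guards compare
// against the literal string "false" rather than a boolean.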
/*
* uploadInputBag: uploads the input bag
*/
process uploadInputBag {
tag "${repRID}"
input:
path script_uploadInputBag
path credential, stageAs: "credential.json" from deriva_uploadInputBag
path inputBag from inputBag_uploadInputBag
val studyRID from studyRID_uploadInputBag
output:
path ("inputBagRID.csv") into inputBagRID_fl
when:
upload
script:
"""
hostname > ${repRID}.uploadInputBag.log
ulimit -a >> ${repRID}.uploadInputBag.log
yr=\$(date +'%Y')
mn=\$(date +'%m')
dy=\$(date +'%d')
file=\$(basename -a ${inputBag})
md5=\$(md5sum ./\${file} | awk '{ print \$1 }')
echo LOG: ${repRID} input bag md5 sum - \${md5} >> ${repRID}.uploadInputBag.log
size=\$(wc -c < ./\${file})
echo LOG: ${repRID} input bag size - \${size} bytes >> ${repRID}.uploadInputBag.log
exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Input_Bag/File_MD5=\${md5})
if [ "\${exist}" == "[]" ]
then
cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
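# the grep above leaves '"cookie": "<token>"'; the substring below strips the 11-character '"cookie": "' prefix and the trailing quote, leaving the bare token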
cookie=\${cookie:11:-1}
loc=\$(deriva-hatrac-cli --host ${source} put ./\${file} /hatrac/resources/rnaseq/pipeline/input_bag/study/${studyRID}/replicate/${repRID}/\${file} --parents)
inputBag_rid=\$(python3 ${script_uploadInputBag} -f \${file} -l \${loc} -s \${md5} -b \${size} -o ${source} -c \${cookie})
echo LOG: input bag RID uploaded - \${inputBag_rid} >> ${repRID}.uploadInputBag.log
rid=\${inputBag_rid}
else
exist=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
exist=\${exist:7:-6}
echo LOG: input bag RID already exists - \${exist} >> ${repRID}.uploadInputBag.log
rid=\${exist}
fi
echo \${rid} > inputBagRID.csv
"""
}
// Extract input bag RID into channel
inputBagRID = Channel.create()
inputBagRID_fl.splitCsv(sep: ",", header: false).separate(
inputBagRID
)
// Replicate input bag RID for multiple process inputs
inputBagRID.into {
inputBagRID_uploadExecutionRun
inputBagRID_finalizeExecutionRun
}
/*
* uploadExecutionRun: uploads the execution run
*/
process uploadExecutionRun {
tag "${repRID}"
input:
path script_uploadExecutionRun_uploadExecutionRun
path credential, stageAs: "credential.json" from deriva_uploadExecutionRun
val spike from spikeInfer_uploadExecutionRun
val species from speciesInfer_uploadExecutionRun
val inputBagRID from inputBagRID_uploadExecutionRun
output:
path ("executionRunRID.csv") into executionRunRID_fl
when:
upload
script:
"""
hostname > ${repRID}.uploadExecutionRun.log
ulimit -a >> ${repRID}.uploadExecutionRun.log
echo LOG: searching for workflow RID - BICF mRNA ${workflow.manifest.version} >> ${repRID}.uploadExecutionRun.log
workflow=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Workflow/Name=BICF%20mRNA%20Replicate/Version=${workflow.manifest.version})
workflow=\$(echo \${workflow} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
workflow=\${workflow:7:-6}
echo LOG: workflow RID extracted - \${workflow} >> ${repRID}.uploadExecutionRun.log
if [ "${species}" == "Homo sapiens" ]
then
genomeName=\$(echo GRCh${refHuVersion})
elif [ "${species}" == "Mus musculus" ]
then
genomeName=\$(echo GRCm${refMoVersion})
fi
if [ "${spike}" == "yes" ]
then
genomeName=\$(echo \${genomeName}-S)
fi
echo LOG: searching for genome name - \${genomeName} >> ${repRID}.uploadExecutionRun.log
genome=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Reference_Genome/Name=\${genomeName})
genome=\$(echo \${genome} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
genome=\${genome:7:-6}
echo LOG: genome RID extracted - \${genome} >> ${repRID}.uploadExecutionRun.log
cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
cookie=\${cookie:11:-1}
exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/Workflow=\${workflow}/Replicate=${repRID}/Input_Bag=${inputBagRID})
echo \${exist} >> ${repRID}.uploadExecutionRun.log
if [ "\${exist}" == "[]" ]
then
executionRun_rid=\$(python3 ${script_uploadExecutionRun_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s In-progress -d 'Run in process' -o ${source} -c \${cookie} -u F)
echo LOG: execution run RID uploaded - \${executionRun_rid} >> ${repRID}.uploadExecutionRun.log
else
rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
rid=\${rid:7:-6}
echo \${rid} >> ${repRID}.uploadExecutionRun.log
executionRun_rid=\$(python3 ${script_uploadExecutionRun_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s In-progress -d 'Run in process' -o ${source} -c \${cookie} -u \${rid})
echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.uploadExecutionRun.log
fi
echo \${executionRun_rid} > executionRunRID.csv
"""
}
// Extract execution run RID into channel
executionRunRID = Channel.create()
executionRunRID_fl.splitCsv(sep: ",", header: false).separate(
executionRunRID
)
// Replicate execution run RID for multiple process inputs
executionRunRID.into {
executionRunRID_uploadQC
executionRunRID_uploadProcessedFile
executionRunRID_uploadOutputBag
executionRunRID_finalizeExecutionRun
}
/*
 * getRef: downloads appropriate reference
@@ -777,9 +1054,13 @@ process getRef {
    path credential, stageAs: "credential.json" from deriva_getRef
    val spike from spikeInfer_getRef
    val species from speciesInfer_getRef
    val pipelineError_getRef

  output:
    tuple path ("hisat2", type: 'dir'), path ("*.bed"), path ("*.fna"), path ("*.gtf"), path ("geneID.tsv"), path ("Entrez.tsv") into reference

  when:
    pipelineError_getRef == "false"

  script:
    """
@@ -807,34 +1088,21 @@ process getRef {
    fi
    if [ "${spike}" == "yes" ]
    then
      references=\$(echo \${reference}-S)
    elif [ "${spike}" == "no" ]
    then
      reference=\$(echo \${references})
    fi
    echo -e "LOG: species set to \${references}" >> ${repRID}.getRef.log

    # retrieve appropriate reference from appropriate location
    echo -e "LOG: fetching ${species} reference files from ${referenceBase}" >> ${repRID}.getRef.log
    if [ ${referenceBase} == "/project/BICF/BICF_Core/shared/gudmap/references/new" ]
    then
      echo -e "LOG: grabbing reference files from local (BioHPC)" >> ${repRID}.getRef.log
      unzip \${reference}.zip
      mv \$(basename \${reference})/data/* .
    elif [ "${params.refSource}" == "datahub" ]
    then
      echo -e "LOG: grabbing reference files from datahub" >> ${repRID}.getRef.log
      GRCv=\$(echo \${references} | grep -o \${refName}.* | cut -d '.' -f1)
@@ -853,6 +1121,12 @@ process getRef {
      mv \${fName}/data/* .
    fi
    echo -e "LOG: fetched" >> ${repRID}.getRef.log

    mv ./annotation/genome.gtf .
    mv ./sequence/genome.fna .
    mv ./annotation/genome.bed .
    mv ./metadata/Entrez.tsv .
    mv ./metadata/geneID.tsv .
    """
}
@@ -874,11 +1148,15 @@ process alignData {
    path reference_alignData
    val ends from endsInfer_alignData
    val stranded from strandedInfer_alignData
    val pipelineError_alignData

  output:
    tuple path ("${repRID}.sorted.bam"), path ("${repRID}.sorted.bam.bai") into rawBam
    path ("*.alignSummary.txt") into alignQC

  when:
    pipelineError_alignData == "false"

  script:
    """
    hostname > ${repRID}.align.log
@@ -941,12 +1219,16 @@ process dedupData {
  input:
    tuple path (bam), path (bai) from rawBam_dedupData
    val pipelineError_dedupData

  output:
    tuple path ("${repRID}_sorted.deduped.bam"), path ("${repRID}_sorted.deduped.bam.bai") into dedupBam
    tuple path ("${repRID}_sorted.deduped.*.bam"), path ("${repRID}_sorted.deduped.*.bam.bai") into dedupChrBam
    path ("*.deduped.Metrics.txt") into dedupQC

  when:
    pipelineError_dedupData == 'false'

  script:
    """
    hostname > ${repRID}.dedup.log
@@ -990,10 +1272,14 @@ process makeBigWig {
  input:
    tuple path (bam), path (bai) from dedupBam_makeBigWig
    val pipelineError_makeBigWig

  output:
    path ("${repRID}_sorted.deduped.bw") into bigwig

  when:
    pipelineError_makeBigWig == 'false'

  script:
    """
    hostname > ${repRID}.makeBigWig.log
@@ -1020,12 +1306,16 @@ process countData {
    path ref from reference_countData
    val ends from endsInfer_countData
    val stranded from strandedInfer_countData
    val pipelineError_countData

  output:
    path ("*_tpmTable.csv") into counts
    path ("*_countData.summary") into countsQC
    path ("assignedReads.csv") into assignedReadsInfer_fl

  when:
    pipelineError_countData == 'false'

  script:
    """
    hostname > ${repRID}.countData.log
@@ -1091,11 +1381,15 @@ process fastqc {
  input:
    path (fastq) from fastqs_fastqc
    val pipelineError_fastqc

  output:
    path ("*_fastqc.zip") into fastqc
    path ("rawReads.csv") into rawReadsInfer_fl

  when:
    pipelineError_fastqc == 'false'

  script:
    """
    hostname > ${repRID}.fastqc.log
@@ -1134,12 +1428,16 @@ process dataQC {
    tuple path (bam), path (bai) from dedupBam_dataQC
    tuple path (chrBam), path (chrBai) from dedupChrBam
    val ends from endsInfer_dataQC
    val pipelineError_dataQC

  output:
    path "${repRID}_tin.hist.tsv" into tinHist
    path "${repRID}_tin.med.csv" into tinMedInfer_fl
    path "${repRID}_insertSize.inner_distance_freq.txt" into innerDistance

  when:
    pipelineError_dataQC == 'false'

  script:
    """
    hostname > ${repRID}.dataQC.log
@@ -1147,8 +1445,8 @@ process dataQC {
    # calculate TIN values per feature on each chromosome
    echo -e "geneID\tchrom\ttx_start\ttx_end\tTIN" > ${repRID}_sorted.deduped.tin.xls
    for i in `cat ./genome.bed | cut -f1 | grep -o chr.* | sort | uniq`; do
      echo "echo \"LOG: running tin.py on \${i}\" >> ${repRID}.dataQC.log; tin.py -i ${repRID}_sorted.deduped.\${i}.bam -r ./genome.bed; cat ${repRID}_sorted.deduped.\${i}.tin.xls | tr -s \"\\w\" \"\\t\" | grep -P \\\"\\\\t\${i}\\\\t\\\";";
    done | parallel -j `nproc` -k 1>> ${repRID}_sorted.deduped.tin.xls

    # bin TIN values
@@ -1160,7 +1458,7 @@ process dataQC {
    if [ "${ends}" == "pe" ]
    then
      echo -e "LOG: calculating inner distances for ${ends}" >> ${repRID}.dataQC.log
      inner_distance.py -i "${bam}" -o ${repRID}_insertSize -r ./genome.bed
      echo -e "LOG: calculated" >> ${repRID}.dataQC.log
    elif [ "${ends}" == "se" ]
    then
@@ -1199,10 +1497,10 @@ process aggrQC {
    path alignSampleQCs from alignSampleQC_aggrQC.collect()
    path inferExperiment
    val endsManual from endsManual_aggrQC
    val endsM from endsMeta_aggrQC
    val strandedM from strandedMeta_aggrQC
    val spikeM from spikeMeta_aggrQC
    val speciesM from speciesMeta_aggrQC
    val endsI from endsInfer_aggrQC
    val strandedI from strandedInfer_aggrQC
    val spikeI from spikeInfer_aggrQC
@@ -1214,11 +1512,15 @@ process aggrQC {
    val tinMedI from tinMedInfer
    val studyRID from studyRID_aggrQC
    val expRID from expRID_aggrQC
    val pipelineError_aggrQC

  output:
    path "${repRID}.multiqc.html" into multiqc
    path "${repRID}.multiqc_data.json" into multiqcJSON

  when:
    pipelineError_aggrQC == 'false'

  script:
    """
    hostname > ${repRID}.aggrQC.log
@@ -1285,147 +1587,6 @@ process aggrQC {
    """
}
/*
 * uploadQC: uploads the mRNA QC
 */
@@ -1442,13 +1603,14 @@ process uploadQC {
    val length from readLengthInfer_uploadQC
    val rawCount from rawReadsInfer_uploadQC
    val finalCount from assignedReadsInfer_uploadQC
    val pipelineError_uploadQC

  output:
    path ("qcRID.csv") into qcRID_fl

  when:
    upload
    pipelineError_uploadQC == 'false'

  script:
    """
@@ -1511,12 +1673,14 @@ process uploadProcessedFile {
    val studyRID from studyRID_uploadProcessedFile
    val expRID from expRID_uploadProcessedFile
    val executionRunRID from executionRunRID_uploadProcessedFile
    val pipelineError_uploadProcessedFile

  output:
    path ("${repRID}_Output_Bag.zip") into outputBag

  when:
    upload
    pipelineError_uploadProcessedFile == 'false'

  script:
    """
@@ -1595,12 +1759,14 @@ process uploadOutputBag {
    path outputBag
    val studyRID from studyRID_uploadOutputBag
    val executionRunRID from executionRunRID_uploadOutputBag
    val pipelineError_uploadOutputBag

  output:
    path ("outputBagRID.csv") into outputBagRID_fl

  when:
    upload
    pipelineError_uploadOutputBag == 'false'

  script:
    """
@@ -1616,11 +1782,11 @@ process uploadOutputBag {
    echo LOG: ${repRID} output bag md5 sum - \${md5} >> ${repRID}.uploadOutputBag.log
    size=\$(wc -c < ./\${file})
    echo LOG: ${repRID} output bag size - \${size} bytes >> ${repRID}.uploadOutputBag.log
    exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Output_Bag/File_MD5=\${md5})
    if [ "\${exist}" == "[]" ]
    then
      cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
      cookie=\${cookie:11:-1}
      loc=\$(deriva-hatrac-cli --host ${source} put ./\${file} /hatrac/resources/rnaseq/pipeline/output_bag/study/${studyRID}/replicate/${repRID}/\${file} --parents)
@@ -1639,10 +1805,82 @@ process uploadOutputBag {
}

// Extract output bag RID into channel
outputBagRID = Channel.value()
outputBagRID_temp = Channel.create()
outputBagRID_fl.splitCsv(sep: ",", header: false).separate(
  outputBagRID_temp
)
outputBagRID = outputBagRID_temp
/*
* finalizeExecutionRun: finalizes the execution run
*/
process finalizeExecutionRun {
tag "${repRID}"
input:
path script_uploadExecutionRun_finalizeExecutionRun
path credential, stageAs: "credential.json" from deriva_finalizeExecutionRun
val executionRunRID from executionRunRID_finalizeExecutionRun
val inputBagRID from inputBagRID_finalizeExecutionRun
path outputBagRID
val endsMeta from endsMeta_finalizeExecutionRun
val strandedMeta from strandedMeta_finalizeExecutionRun
val spikeMeta from spikeMeta_finalizeExecutionRun
val speciesMeta from speciesMeta_finalizeExecutionRun
val endsInfer from endsInfer_finalizeExecutionRun
val strandedInfer from strandedInfer_finalizeExecutionRun
val spikeInfer from spikeInfer_finalizeExecutionRun
val speciesInfer from speciesInfer_finalizeExecutionRun
val pipelineError from pipelineError_finalizeExecutionRun
val pipelineError_ends
val pipelineError_stranded
val pipelineError_spike
val pipelineError_species
when:
upload
script:
"""
hostname > ${repRID}.finalizeExecutionRun.log
ulimit -a >> ${repRID}.finalizeExecutionRun.log
executionRun=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/RID=${executionRunRID})
workflow=\$(echo \${executionRun} | grep -o '\\"Workflow\\":.*\\"Reference' | grep -oP '(?<=\\"Workflow\\":\\").*(?=\\",\\"Reference)')
genome=\$(echo \${executionRun} | grep -o '\\"Reference_Genome\\":.*\\"Input_Bag' | grep -oP '(?<=\\"Reference_Genome\\":\\").*(?=\\",\\"Input_Bag)')
cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
cookie=\${cookie:11:-1}
if [ ${pipelineError} == false ]
then
rid=\$(python3 ${script_uploadExecutionRun_finalizeExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Success -d 'Run Successful' -o ${source} -c \${cookie} -u ${executionRunRID})
echo LOG: execution run RID marked as successful - \${rid} >> ${repRID}.finalizeExecutionRun.log
else
pipelineError_details=\$(echo "**Submitted metadata does not match infered:** ")
if ${pipelineError_ends}
then
pipelineError_details=\$(echo \${pipelineError_details}"[ Submitted ends = *${endsMeta}* **|** Infered ends = *${endsInfer}* ] ")
fi
if ${pipelineError_stranded}
then
pipelineError_details=\$(echo \${pipelineError_details}"[ Submitted strandedness = *${strandedMeta}* **|** Infered strandedness = *${strandedInfer}* ] ")
fi
if ${pipelineError_spike}
then
pipelineError_details=\$(echo \${pipelineError_details}"[ Submitted spike-in = *${spikeMeta}* **|** Infered spike-in = *${spikeInfer}* ] ")
fi
if ${pipelineError_species}
then
pipelineError_details=\$(echo \${pipelineError_details}"[ Submitted species = *${speciesMeta}* **|** Infered species = *${speciesInfer}* ] ")
fi
printf "\${pipelineError_details}"
rid=\$(python3 ${script_uploadExecutionRun_finalizeExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${pipelineError_details}" -o ${source} -c \${cookie} -u ${executionRunRID})
echo LOG: execution run RID marked as error - \${rid} >> ${repRID}.finalizeExecutionRun.log
fi
"""
}
workflow.onError = {
......