Newer
Older
// ######## #### ###### ########
// ## ## ## ## ## ##
// ## ## ## ## ##
// ######## ## ## ######
// ## ## ## ## ##
// ## ## ## ## ## ##
// ######## #### ###### ##
params.deriva = "${baseDir}/../test_data/auth/credential.json"
params.bdbag = "${baseDir}/../test_data/auth/cookies.txt"
params.refMoVersion = "38.p6.vM25"
params.refHuVersion = "38.p13.v36"
// Define tracking input variables
params.ci = false
params.dev = true
deriva = Channel
.fromPath(params.deriva)
.ifEmpty { exit 1, "deriva credential file not found: ${params.deriva}" }
deriva.into {
deriva_getBag
deriva_getRefInfer
deriva_getRef
deriva_uploadProcessedFile
deriva_finalizeExecutionRun
bdbag = Channel
.fromPath(params.bdbag)
.ifEmpty { exit 1, "deriva cookie file for bdbag not found: ${params.bdbag}" }
refMoVersion = params.refMoVersion
logsDir = "${outDir}/Logs"
// Define fixed files and variables
replicateExportConfig = Channel.fromPath("${baseDir}/conf/Replicate_For_Input_Bag.json")
executionRunExportConfig = Channel.fromPath("${baseDir}/conf/Execution_Run_For_Output_Bag.json")
if (params.source == "dev") {
source = "dev.gudmap.org"
} else if (params.source == "staging") {
source = "staging.gudmap.org"
} else if (params.source == "production") {
source = "www.gudmap.org"
}
referenceBase = "/project/BICF/BICF_Core/shared/gudmap/references/new"
referenceBase = "www.gudmap.org"
referenceInfer = Channel.fromList(["ERCC","GRCh","GRCm"])
multiqcConfig = Channel.fromPath("${baseDir}/conf/multiqc_config.yaml")
bicfLogo = Channel.fromPath("${baseDir}/../docs/bicf_logo.png")
softwareReferences = Channel.fromPath("${baseDir}/../docs/software_references_mqc.yaml")
softwareVersions = Channel.fromPath("${baseDir}/../docs/software_versions_mqc.yaml")
script_bdbagFetch = Channel.fromPath("${baseDir}/scripts/bdbag_fetch.sh")
script_parseMeta = Channel.fromPath("${baseDir}/scripts/parse_meta.py")
script_inferMeta = Channel.fromPath("${baseDir}/scripts/infer_meta.sh")
script_refDataInfer = Channel.fromPath("${baseDir}/scripts/extract_ref_data.py")
script_refData = Channel.fromPath("${baseDir}/scripts/extract_ref_data.py")
script_calculateTPM = Channel.fromPath("${baseDir}/scripts/calculateTPM.R")
script_convertGeneSymbols = Channel.fromPath("${baseDir}/scripts/convertGeneSymbols.R")
script_tinHist = Channel.fromPath("${baseDir}/scripts/tin_hist.py")
script_uploadInputBag = Channel.fromPath("${baseDir}/scripts/upload_input_bag.py")
script_uploadExecutionRun_uploadExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
script_uploadExecutionRun_finalizeExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
script_uploadExecutionRun_failExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
script_uploadQC = Channel.fromPath("${baseDir}/scripts/upload_qc.py")
script_uploadOutputBag = Channel.fromPath("${baseDir}/scripts/upload_output_bag.py")
script_deleteEntry_uploadQC = Channel.fromPath("${baseDir}/scripts/delete_entry.py")
script_deleteEntry_uploadProcessedFile = Channel.fromPath("${baseDir}/scripts/delete_entry.py")
/*
* trackStart: track start of pipeline
*/
process trackStart {
container 'docker://gudmaprbk/gudmap-rbk_base:1.0.0'
curl -H 'Content-Type: application/json' -X PUT -d \
'{ \
"sessionId": "${workflow.sessionId}", \
"pipeline": "gudmap.rbk_rnaseq", \
"start": "${workflow.start}", \
"astrocyte": false, \
"status": "started", \
"nextflowVersion": "${workflow.nextflow.version}", \
"pipelineVersion": "${workflow.manifest.version}", \
"dev": ${params.dev} \
}' \
"https://xku43pcwnf.execute-api.us-east-1.amazonaws.com/ProdDeploy/pipeline-tracking"
log.info """\
====================================
BICF RNA-seq Pipeline for GUDMAP/RBK
====================================
Replicate RID : ${params.repRID}
Source Server : ${params.source}
Human Reference Version: ${params.refHuVersion}
ERCC Reference Version : ${params.refERCCVersion}
Upload : ${upload}
Nextflow Version : ${workflow.nextflow.version}
Pipeline Version : ${workflow.manifest.version}
Session ID : ${workflow.sessionId}
CI : ${params.ci}
Development : ${params.dev}
publishDir "${outDir}/inputBag", mode: 'copy', pattern: "Replicate_*.zip"
path credential, stageAs: "credential.json" from deriva_getBag
hostname > ${repRID}.getBag.log
ulimit -a >> ${repRID}.getBag.log
# link credential file for authentication
echo -e "LOG: linking deriva credentials" >> ${repRID}.getBag.log
ln -sf `readlink -e credential.json` ~/.deriva/credential.json
echo -e "LOG: fetching bag for ${repRID} in GUDMAP" >> ${repRID}.getBag.log
deriva-download-cli ${source} --catalog 2 ${replicateExportConfig} . rid=${repRID}
name=\$(ls *.zip)
name=\$(basename \${name} | cut -d "." -f1)
yr=\$(date +'%Y')
mn=\$(date +'%m')
dy=\$(date +'%d')
mv \${name}.zip \${name}_\${yr}\${mn}\${dy}.zip
// Set inputBag to downloaded or forced input
if (inputBagForce != "") {
inputBag = Channel
.fromPath(inputBagForce)
.ifEmpty { exit 1, "override inputBag file not found: ${inputBagForce}" }
inputBag.into {
inputBag_getData
inputBag_uploadInputBag
}
* getData: fetch replicate files from consortium with downloaded bdbag.zip
*/
process getData {
path cookies, stageAs: "deriva-cookies.txt" from bdbag
path ("*.R{1,2}.fastq.gz") into fastqs
path ("**/File.csv") into fileMeta
path ("**/Experiment Settings.csv") into experimentSettingsMeta
path ("**/Experiment.csv") into experimentMeta
path "fastqCount.csv" into fastqCount_fl
hostname > ${repRID}.getData.log
ulimit -a >> ${repRID}.getData.log
echo -e "LOG: linking deriva cookie" >> ${repRID}.getData.log
ln -sf `readlink -e deriva-cookies.txt` ~/.bdbag/deriva-cookies.txt
echo -e "LOG: bag replicate name \${replicate}" >> ${repRID}.getData.log
# unzip bag
echo -e "LOG: unzipping replicate bag" >> ${repRID}.getData.log
echo -e "LOG: fetching replicate bdbag" >> ${repRID}.getData.log
sh ${script_bdbagFetch} \${replicate::-13} ${repRID}
fastqCount=\$(ls *.fastq.gz | wc -l)
echo -e \${fastqCount} > fastqCount.csv
// Split fastq count into channel
fastqCount = Channel.create()
fastqCount_fl.splitCsv(sep: ",", header: false).separate(
fastqCount
)
// Set raw fastq to downloaded or forced input and replicate them for multiple process inputs
if (fastqsForce != "") {
Channel
.fromPath(fastqsForce)
.ifEmpty { exit 1, "override inputBag file not found: ${fastqsForce}" }

Venkat Malladi
committed
.collect().set {

Venkat Malladi
committed
fastqs.set {
/*
* parseMetadata: parses metadata to extract experiment parameters
*/
process parseMetadata {
path file from fileMeta
path experimentSettings, stageAs: "ExperimentSettings.csv" from experimentSettingsMeta
path experiment from experimentMeta
path "fastqCountError.csv" into fastqCountError_fl
hostname > ${repRID}.parseMetadata.log
ulimit -a >> ${repRID}.parseMetadata.log
rep=\$(python3 ${script_parseMeta} -r ${repRID} -m "${file}" -p repRID)
echo -e "LOG: replicate RID metadata parsed: \${rep}" >> ${repRID}.parseMetadata.log
exp=\$(python3 ${script_parseMeta} -r ${repRID} -m "${file}" -p expRID)
echo -e "LOG: experiment RID metadata parsed: \${exp}" >> ${repRID}.parseMetadata.log
study=\$(python3 ${script_parseMeta} -r ${repRID} -m "${file}" -p studyRID)
echo -e "LOG: study RID metadata parsed: \${study}" >> ${repRID}.parseMetadata.log
endsMeta=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experimentSettings}" -p endsMeta)
echo -e "LOG: endedness metadata parsed: \${endsMeta}" >> ${repRID}.parseMetadata.log
endsManual=\$(python3 ${script_parseMeta} -r ${repRID} -m "${file}" -p endsManual)
echo -e "LOG: endedness manually detected: \${endsManual}" >> ${repRID}.parseMetadata.log
stranded=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experimentSettings}" -p stranded)
echo -e "LOG: strandedness metadata parsed: \${stranded}" >> ${repRID}.parseMetadata.log
spike=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experimentSettings}" -p spike)
echo -e "LOG: spike-in metadata parsed: \${spike}" >> ${repRID}.parseMetadata.log
species=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experiment}" -p species)
echo -e "LOG: species metadata parsed: \${species}" >> ${repRID}.parseMetadata.log
readLength=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experimentSettings}" -p readLength)
if [ "\${readLength}" = "nan"]
echo -e "LOG: read length metadata parsed: \${readLength}" >> ${repRID}.parseMetadata.log
# check not incorrect number of fastqs
fastqCountError=false
fastqCountError_details=""
if [ "${fastqCount}" -gt "2" ]
then
fastqCountError=true
fastqCountError_details="Too many fastqs detected (>2)"
elif [ "\${endsMeta}"" == "Single Read" ] && [ "${fastqCount}" -ne "1" ]
then
fastqCountError=true
fastqCountError_details="Number of fastqs detected does not match submitted endness"
elif [ "\${endsMeta}"" == "Paired End" ] && [ "${fastqCount}" -ne "2" ]
then
fastqCountError=true
fastqCountError_details="Number of fastqs detected does not match submitted endness"
fi
# save design file
echo -e "\${endsMeta},\${endsManual},\${stranded},\${spike},\${species},\${readLength},\${exp},\${study}" > design.csv
# save fastq count error file
echo -e "\${fastqCountError},\${fastqCountError_details}" > fastqCountError.csv
// Split metadata into separate channels
endsMeta = Channel.create()
endsManual = Channel.create()
strandedMeta = Channel.create()
spikeMeta = Channel.create()
speciesMeta = Channel.create()
readLengthMeta = Channel.create()
expRID = Channel.create()
studyRID = Channel.create()
metadata_fl.splitCsv(sep: ",", header: false).separate(
strandedMeta,
spikeMeta,
// Replicate metadata for multiple process inputs
endsMeta.into {
endsMeta_checkMetadata
endsMeta_aggrQC
endsMeta_failExecutionRun
strandedMeta.into {
strandedMeta_checkMetadata
strandedMeta_aggrQC
strandedMeta_failExecutionRun
}
spikeMeta.into {
spikeMeta_checkMetadata
spikeMeta_aggrQC
spikeMeta_failExecutionRun
}
speciesMeta.into {
speciesMeta_checkMetadata
speciesMeta_aggrQC
speciesMeta_failExecutionRun
studyRID_uploadInputBag
studyRID_uploadProcessedFile
studyRID_uploadOutputBag
expRID_uploadProcessedFile
// Split fastq count error into separate channel
fastqCountError = Channel.create()
fastqCountError_details = Channel.create()
fastqCountError_fl.splitCsv(sep: ",", header: false).separate(
fastqCountError,
fastqCountError_details
)
// Replicate errors for multiple process inputs
fastqCountError.into {
fastqCountError_getRef
fastqCountError_alignData
fastqCountError_dedupData
fastqCountError_makeBigWig
fastqCountError_countData
fastqCountError_fastqc
fastqCountError_dataQC
fastqCountError_aggrQC
fastqCountError_uploadQC
fastqCountError_uploadProcessedFile
fastqCountError_uploadOutputBag
fastqCountError_failExecutionRun
/*
* trimData: trims any adapter or non-host sequences from the data
*/
process trimData {
tag "${repRID}"
input:
path (fastq) from fastqs_trimData
output:
path ("*.fq.gz") into fastqsTrim
path ("*.fastq.gz", includeInputs:true) into fastqs_fastqc
path ("*_trimming_report.txt") into trimQC
hostname > ${repRID}.trimData.log
ulimit -a >> ${repRID}.trimData.log
# trim fastq's using trim_galore and extract median read length
echo -e "LOG: trimming ${ends}" >> ${repRID}.trimData.log
if [ "${ends}" == "se" ]
then
trim_galore --gzip -q 25 --length 35 --basename ${repRID} ${fastq[0]}
readLength=\$(zcat *_trimmed.fq.gz | awk '{if(NR%4==2) print length(\$1)}' | sort -n | awk '{a[NR]=\$0}END{print(NR%2==1)?a[int(NR/2)+1]:(a[NR/2]+a[NR/2+1])/2}')
elif [ "${ends}" == "pe" ]
then
trim_galore --gzip -q 25 --length 35 --paired --basename ${repRID} ${fastq[0]} ${fastq[1]}
readLength=\$(zcat *_1.fq.gz | awk '{if(NR%4==2) print length(\$1)}' | sort -n | awk '{a[NR]=\$0}END{print(NR%2==1)?a[int(NR/2)+1]:(a[NR/2]+a[NR/2+1])/2}')
echo -e "LOG: average trimmed read length: \${readLength}" >> ${repRID}.trimData.log
# save read length file
echo -e "\${readLength}" > readLength.csv
// Extract calculated read length metadata into channel
readLengthInfer = Channel.create()
readLengthInfer_fl.splitCsv(sep: ",", header: false).separate(
readLengthInfer
// Replicate infered read length for multiple process inputs
readLengthInfer.into {
readLengthInfer_aggrQC
readLengthInfer_uploadQC
}
// Replicate trimmed fastq's for multiple process inputs
fastqsTrim.into {
fastqsTrim_alignData
fastqsTrim_downsampleData
// Combine inputs of getRefInfer
getRefInferInput = referenceInfer.combine(deriva_getRefInfer.combine(script_refDataInfer))
* getRefInfer: dowloads appropriate reference for metadata inference
tuple val (refName), path (credential, stageAs: "credential.json"), path (script_refDataInfer) from getRefInferInput
tuple val (refName), path ("hisat2", type: 'dir'), path ("*.fna"), path ("*.gtf") into refInfer
path ("${refName}", type: 'dir') into bedInfer
hostname > ${repRID}.${refName}.getRefInfer.log
ulimit -a >> ${repRID}.${refName}.getRefInfer.log
# link credential file for authentication
echo -e "LOG: linking deriva credentials" >> ${repRID}.getRefInfer.log
mkdir -p ~/.deriva
ln -sf `readlink -e credential.json` ~/.deriva/credential.json
echo -e "LOG: linked" >> ${repRID}.getRefInfer.log
if [ "${refName}" == "ERCC" ]
then
references=\$(echo ${referenceBase}/ERCC${refERCCVersion})
elif [ "${refName}" == "GRCm" ]
then
references=\$(echo ${referenceBase}/GRCm${refMoVersion})
elif [ '${refName}' == "GRCh" ]
then
references=\$(echo ${referenceBase}/GRCh${refHuVersion})
else
echo -e "LOG: ERROR - References could not be set!\nReference found: ${referenceBase}" >> ${repRID}.${refName}.getRefInfer.log
# retreive appropriate reference appropriate location
echo -e "LOG: fetching ${refName} reference files from ${referenceBase}" >> ${repRID}.${refName}.getRefInfer.log
if [ ${referenceBase} == "/project/BICF/BICF_Core/shared/gudmap/references/new" ]
unzip \${references}.zip
mv \$(basename \${references})/data/* .
elif [ params.refSource == "datahub" ]
GRCv=\$(echo \${references} | grep -o ${refName}.* | cut -d '.' -f1)
GRCp=\$(echo \${references} | grep -o ${refName}.* | cut -d '.' -f2)
GENCODE=\$(echo \${references} | grep -o ${refName}.* | cut -d '.' -f3)
if [ "${refName}" != "ERCC" ]
then
query=\$(echo 'https://${referenceBase}/ermrest/catalog/2/entity/RNASeq:Reference_Genome/Reference_Version='\${GRCv}'.'\${GRCp}'/Annotation_Version=GENCODE%20'\${GENCODE})
else
query=\$(echo 'https://${referenceBase}/ermrest/catalog/2/entity/RNASeq:Reference_Genome/Reference_Version=${refName}${refERCCVersion}/Annotation_Version=${refName}${refERCCVersion}')
fi
curl --request GET \${query} > refQuery.json
refURL=\$(python ${script_refDataInfer} --returnParam URL)
fName=\$(python ${script_refDataInfer} --returnParam fName)
fName=\${fName%.*}
if [ "\${loc}" = "/hatrac/*" ]; then echo "LOG: Reference not present in hatrac"; exit 1; fi
filename=\$(echo \$(basename \${refURL}) | grep -oP '.*(?=:)')
deriva-hatrac-cli --host ${referenceBase} get \${refURL}
unzip \$(basename \${refURL})
mv ./annotation/genome.gtf .
mv ./sequence/genome.fna .
mkdir ${refName}
if [ "${refName}" != "ERCC" ]
mv ./annotation/genome.bed ./${refName}
echo -e "LOG: fetched" >> ${repRID}.${refName}.getRefInfer.log
"""
}
/*
* downsampleData: downsample fastq's for metadata inference
*/
process downsampleData {
tag "${repRID}"
input:
path fastq from fastqsTrim_downsampleData
output:
path ("sampled.1.fq") into fastqs1Sample
path ("sampled.2.fq") into fastqs2Sample
hostname > ${repRID}.downsampleData.log
ulimit -a >> ${repRID}.downsampleData.log
if [ "${ends}" == "se" ]
then
echo -e "LOG: downsampling SE trimmed fastq" >> ${repRID}.downsampleData.log
seqtk sample -s100 *trimmed.fq.gz 100000 1> sampled.1.fq
elif [ "${ends}" == "pe" ]
then
echo -e "LOG: downsampling R1 of PE trimmed fastq" >> ${repRID}.downsampleData.log
seqtk sample -s100 *1.fq.gz 1000000 1> sampled.1.fq
echo -e "LOG: downsampling R2 of PE trimmed fastq" >> ${repRID}.downsampleData.log
seqtk sample -s100 *2.fq.gz 1000000 1> sampled.2.fq
echo -e "LOG: downsampled" >> ${repRID}.downsampleData.log
// Replicate the dowsampled fastq's and attatched to the references
inferInput = endsManual_alignSampleData.combine(refInfer.combine(fastqs1Sample.collect().combine(fastqs2Sample.collect())))
/*
* alignSampleData: aligns the downsampled reads to a reference database
*/
process alignSampleData {
tag "${ref}"
input:
tuple val (ends), val (ref), path (hisat2), path (fna), path (gtf), path (fastq1), path (fastq2) from inferInput
output:
path ("${ref}.sampled.sorted.bam") into sampleBam
path ("${ref}.sampled.sorted.bam.bai") into sampleBai
path ("${ref}.alignSampleSummary.txt") into alignSampleQC
script:
"""
hostname > ${repRID}.${ref}.alignSampleData.log
ulimit -a >> ${repRID}.${ref}.alignSampleData.log
# align the reads with Hisat2
echo -e "LOG: aligning ${ends}" >> ${repRID}.${ref}.alignSampleData.log
if [ "${ends}" == "se" ]
then
hisat2 -p `nproc` --add-chrname -S ${ref}.sampled.sam -x hisat2/genome -U ${fastq1} --summary-file ${ref}.alignSampleSummary.txt --new-summary
elif [ "${ends}" == "pe" ]
then
hisat2 -p `nproc` --add-chrname -S ${ref}.sampled.sam -x hisat2/genome --no-mixed --no-discordant -1 ${fastq1} -2 ${fastq2} --summary-file ${ref}.alignSampleSummary.txt --new-summary
echo -e "LOG: aliged" >> ${repRID}.${ref}.alignSampleData.log
# convert the output sam file to a sorted bam file using Samtools
echo -e "LOG: converting from sam to bam" >> ${repRID}.${ref}.alignSampleData.log
samtools view -1 -@ `nproc` -F 4 -F 8 -F 256 -o ${ref}.sampled.bam ${ref}.sampled.sam
echo -e "LOG: sorting the bam file" >> ${repRID}.${ref}.alignSampleData.log
samtools sort -@ `nproc` -O BAM -o ${ref}.sampled.sorted.bam ${ref}.sampled.bam
echo -e "LOG: indexing sorted bam file" >> ${repRID}.${ref}.alignSampleData.log
samtools index -@ `nproc` -b ${ref}.sampled.sorted.bam ${ref}.sampled.sorted.bam.bai
alignSampleQC.into {
alignSampleQC_inferMetadata
alignSampleQC_aggrQC
}
process inferMetadata {
tag "${repRID}"
input:
path script_inferMeta
path beds from bedInfer.collect()
path bam from sampleBam.collect()
path bai from sampleBai.collect()
path alignSummary from alignSampleQC_inferMetadata.collect()
path "${repRID}.infer_experiment.txt" into inferExperiment
hostname > ${repRID}.inferMetadata.log
ulimit -a >> ${repRID}.inferMetadata.log
# collect alignment rates (round down to integers)
align_ercc=\$(echo \$(grep "Overall alignment rate" ERCC.alignSampleSummary.txt | cut -f2 -d ':' | cut -f2 -d ' ' | tr -d '%'))
echo -e "LOG: alignment rate to ERCC: \${align_ercc}" >> ${repRID}.inferMetadata.log
align_hu=\$(echo \$(grep "Overall alignment rate" GRCh.alignSampleSummary.txt | cut -f2 -d ':' | cut -f2 -d ' ' | tr -d '%'))
echo -e "LOG: alignment rate to GRCh: \${align_hu}" >> ${repRID}.inferMetadata.log
align_mo=\$(echo \$(grep "Overall alignment rate" GRCm.alignSampleSummary.txt | cut -f2 -d ':' | cut -f2 -d ' ' | tr -d '%'))
echo -e "LOG: alignment rate to GRCm: \${align_mo}" >> ${repRID}.inferMetadata.log
if [ 1 -eq \$(echo \$(expr \${align_ercc} ">=" 10)) ]
then
spike="yes"
else
spike="no"
fi
echo -e "LOG: inference of strandedness results is: \${spike}" >> ${repRID}.inferMetadata.log
if [ 1 -eq \$(echo \$(expr \${align_hu} ">=" 40)) ] && [ 1 -eq \$(echo \$(expr \${align_mo} "<" 40)) ]
then
species="Homo sapiens"
bam="GRCh.sampled.sorted.bam"
bed="./GRCh/genome.bed"
elif [ 1 -eq \$(echo \$(expr \${align_mo} ">=" 40)) ] && [ 1 -eq \$(echo \$(expr \${align_hu} "<" 40)) ]
then
species="Mus musculus"
bam="GRCm.sampled.sorted.bam"
bed="./GRCm/genome.bed"
echo -e "LOG: ERROR - inference of species returns an ambiguous result: hu=\${align_hu} mo=\${align_mo}" >> ${repRID}.inferMetadata.log
if [ "${speciesForce}" == "" ]
then
exit 1
fi
fi
if [ "${speciesForce}" != "" ]
then
echo -e "LOG: species overridden to: ${speciesForce}"
species="${speciesForce}"
if [ "${speciesForce}" == "Homo sapiens" ]
then
bam="GRCh.sampled.sorted.bam"
bed="./GRCh/genome.bed"
elif [ "${speciesForce}" == "Mus musculus" ]
then
bam="GRCm.sampled.sorted.bam"
bed="./GRCm/genome.bed"
echo -e "LOG: inference of species results in: \${species}" >> ${repRID}.inferMetadata.log
# infer experimental setting from dedup bam
echo -e "LOG: infer experimental setting from dedup bam" >> ${repRID}.inferMetadata.log
infer_experiment.py -r "\${bed}" -i "\${bam}" 1>> ${repRID}.infer_experiment.txt
ended=`bash ${script_inferMeta} endness ${repRID}.infer_experiment.txt`
fail=`bash ${script_inferMeta} fail ${repRID}.infer_experiment.txt`
percentF=`bash ${script_inferMeta} pef ${repRID}.infer_experiment.txt`
percentR=`bash ${script_inferMeta} per ${repRID}.infer_experiment.txt`
elif [ \${ended} == "SingleEnd" ]
then
ends="se"
percentF=`bash ${script_inferMeta} sef ${repRID}.infer_experiment.txt`
percentR=`bash ${script_inferMeta} ser ${repRID}.infer_experiment.txt`
echo -e "LOG: percentage reads in the same direction as gene: \${percentF}" >> ${repRID}.inferMetadata.log
echo -e "LOG: percentage reads in the opposite direction as gene: \${percentR}" >> ${repRID}.inferMetadata.log
if [ 1 -eq \$(echo \$(expr \${percentF#*.} ">" 2500)) ] && [ 1 -eq \$(echo \$(expr \${percentR#*.} "<" 2500)) ]
then
stranded="forward"
elif [ 1 -eq \$(echo \$(expr \${percentR#*.} ">" 2500)) ] && [ 1 -eq \$(echo \$(expr \${percentF#*.} "<" 2500)) ]
then
stranded="reverse"
else
stranded="unstranded"
fi
echo -e "LOG: stradedness set to: \${stranded}" >> ${repRID}.inferMetadata.log
# write infered metadata to file
echo "\${ends},\${stranded},\${spike},\${species},\${align_ercc},\${align_hu},\${align_mo},\${percentF},\${percentR},\${fail}" 1>> infer.csv
"""
}
// Split metadata into separate channels
endsInfer = Channel.create()
strandedInfer = Channel.create()
spikeInfer = Channel.create()
speciesInfer = Channel.create()
align_erccInfer = Channel.create()
align_huInfer = Channel.create()
align_moInfer = Channel.create()
percentFInfer = Channel.create()
percentRInfer = Channel.create()
failInfer = Channel.create()
inferMetadata_fl.splitCsv(sep: ",", header: false).separate(
endsInfer,
strandedInfer,
spikeInfer,
speciesInfer,
align_erccInfer,
align_huInfer,
align_moInfer,
percentFInfer,
percentRInfer,
failInfer
)
// Replicate metadata for multiple process inputs
endsInfer.into {
endsInfer_checkMetadata
endsInfer_alignData
endsInfer_countData
endsInfer_failExecutionRun
}
strandedInfer.into {
strandedInfer_checkMetadata
strandedInfer_alignData
strandedInfer_countData
strandedInfer_failExecutionRun
spikeInfer_checkMetadata
spikeInfer_failExecutionRun
speciesInfer_checkMetadata
speciesInfer_uploadProcessedFile
speciesInfer_failExecutionRun
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
}
/*
* checkMetadata: checks the submitted metada against infered
*/
process checkMetadata {
tag "${repRID}"
input:
val endsMeta from endsMeta_checkMetadata
val strandedMeta from strandedMeta_checkMetadata
val spikeMeta from spikeMeta_checkMetadata
val speciesMeta from speciesMeta_checkMetadata
val endsInfer from endsInfer_checkMetadata
val strandedInfer from strandedInfer_checkMetadata
val spikeInfer from spikeInfer_checkMetadata
val speciesInfer from speciesInfer_checkMetadata
output:
path ("check.csv") into checkMetadata_fl
path ("outputBagRID.csv") optional true into outputBagRID_fl_dummy
script:
"""
hostname > ${repRID}.checkMetadata.log
ulimit -a >> ${repRID}.checkMetadata.log
pipelineError=false
# check if submitted metadata matches infered
if [ "${endsMeta}" != "${endsInfer}" ]
then
pipelineError=true
pipelineError_ends=true
echo -e "LOG: ends do not match: Submitted=${endsMeta}; Infered=${endsInfer}" >> ${repRID}.checkMetadata.log
else
pipelineError_ends=false
echo -e "LOG: ends matches: Submitted=${endsMeta}; Infered=${endsInfer}" >> ${repRID}.checkMetadata.log
fi
if [ "${strandedMeta}" != "${strandedInfer}" ]
then
if [ "${strandedMeta}" == "unstranded" ]
then
pipelineError=true
pipelineError_stranded=true
echo -e "LOG: stranded does not match: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log
elif [ "${strandedInfer}" != "forward"] || [ "${strandedInfer}" != "reverse" ]
then
pipelineError=true
pipelineError_stranded=true
echo -e "LOG: stranded does not match: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log
else
pipelineError_stranded=false
echo -e "LOG: stranded matches: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log
fi
else
pipelineError_stranded=false
echo -e "LOG: stranded matches: Submitted=${strandedMeta}; Infered=${strandedInfer}" >> ${repRID}.checkMetadata.log
fi
if [ "${spikeMeta}" != "${spikeInfer}" ]
then
pipelineError=true
pipelineError_spike=true
echo -e "LOG: spike does not match: Submitted=${spikeMeta}; Infered=${spikeInfer}" >> ${repRID}.checkMetadata.log
else
pipelineError_spike=false
echo -e "LOG: stranded matches: Submitted=${spikeMeta}; Infered=${spikeInfer}" >> ${repRID}.checkMetadata.log
fi
if [ "${speciesMeta}" != "${speciesInfer}" ]
then
pipelineError=true
pipelineError_species=true
echo -e "LOG: species does not match: Submitted=${speciesMeta}; Infered=${speciesInfer}" >> ${repRID}.checkMetadata.log
else
pipelineError_species=false
echo -e "LOG: species matches: Submitted=${speciesMeta}; Infered=${speciesInfer}" >> ${repRID}.checkMetadata.log
fi
# create dummy output bag rid if failure
if [ \${pipelineError} == true ]
then
echo "fail" 1>> outputBagRID.csv
fi
# write checks to file
echo "\${pipelineError},\${pipelineError_ends},\${pipelineError_stranded},\${pipelineError_spike},\${pipelineError_species}" 1>> check.csv
"""
}
// Split errors into separate channels
pipelineError = Channel.create()
pipelineError_ends = Channel.create()
pipelineError_stranded = Channel.create()
pipelineError_spike = Channel.create()
pipelineError_species = Channel.create()
checkMetadata_fl.splitCsv(sep: ",", header: false).separate(
pipelineError,
pipelineError_ends,
pipelineError_stranded,
pipelineError_spike,
pipelineError_species
)
// Replicate errors for multiple process inputs
pipelineError.into {
pipelineError_getRef
pipelineError_alignData
pipelineError_dedupData
pipelineError_makeBigWig
pipelineError_countData
pipelineError_fastqc
pipelineError_dataQC
pipelineError_aggrQC
pipelineError_uploadQC
pipelineError_uploadProcessedFile
pipelineError_uploadOutputBag
pipelineError_failExecutionRun
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* uploadInputBag: uploads the input bag
*/
process uploadInputBag {
tag "${repRID}"
input:
path script_uploadInputBag
path credential, stageAs: "credential.json" from deriva_uploadInputBag
path inputBag from inputBag_uploadInputBag
val studyRID from studyRID_uploadInputBag
output:
path ("inputBagRID.csv") into inputBagRID_fl
when:
upload
script:
"""
hostname > ${repRID}.uploadInputBag.log
ulimit -a >> ${repRID}.uploadInputBag.log
yr=\$(date +'%Y')
mn=\$(date +'%m')
dy=\$(date +'%d')
file=\$(basename -a ${inputBag})
md5=\$(md5sum ./\${file} | awk '{ print \$1 }')
echo LOG: ${repRID} input bag md5 sum - \${md5} >> ${repRID}.uploadInputBag.log
size=\$(wc -c < ./\${file})
echo LOG: ${repRID} input bag size - \${size} bytes >> ${repRID}.uploadInputBag.log
exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Input_Bag/File_MD5=\${md5})
if [ "\${exist}" == "[]" ]
then
cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
cookie=\${cookie:11:-1}
loc=\$(deriva-hatrac-cli --host ${source} put ./\${file} /hatrac/resources/rnaseq/pipeline/input_bag/study/${studyRID}/replicate/${repRID}/\${file} --parents)
inputBag_rid=\$(python3 ${script_uploadInputBag} -f \${file} -l \${loc} -s \${md5} -b \${size} -o ${source} -c \${cookie})
echo LOG: input bag RID uploaded - \${inputBag_rid} >> ${repRID}.uploadInputBag.log
rid=\${inputBag_rid}
else
exist=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT')