diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f9a5b7b8b017f73d5f0f86efb4684f1ff2ba930e..c2b63ce58bde32bc80c7bfbd5ee2792bad99fdb4 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -109,7 +109,7 @@ parseMetadata:
   - study=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p studyRID)
   - endsRaw=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p endsMeta)
   - endsMeta="uk"
-  - endsManual=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p endsManual)
+  - endsManual="se"
   - stranded=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p stranded)
   - spike=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p spike)
   - species=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p species)
@@ -750,6 +750,36 @@ failMismatchR1R2:
     when:
       - always

+failUnexpectedMeta:
+  stage: integration
+  only: [merge_requests]
+  except:
+    variables:
+      - $CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /master/
+  script:
+    - hostname
+    - ulimit -a
+    - nextflow -q run ./workflow/rna-seq.nf --deriva ./test_data/auth/credential.json --bdbag ./test_data/auth/cookies.txt --repRID 14-3R4R --source staging --upload true -with-dag dag.png --dev false --ci true
+  retry:
+    max: 0
+    when:
+      - always
+
+failFileStructure:
+  stage: integration
+  only: [merge_requests]
+  except:
+    variables:
+      - $CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /master/
+  script:
+    - hostname
+    - ulimit -a
+    - nextflow -q run ./workflow/rna-seq.nf --deriva ./test_data/auth/credential.json --bdbag ./test_data/auth/cookies.txt --repRID Q-Y5HT --source staging --upload true -with-dag dag.png --dev false --ci true
+  retry:
+    max: 0
+    when:
+      - always
+
 override_inputBag:
   stage: integration
   only: [merge_requests]
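Note: each variable exported in the parseMetadata job above comes from one parse_meta.py call per attribute against the replicate's metadata CSV. A minimal sketch of that pattern outside the singularity wrapper (the fixture path and attribute names are the ones used above; the loop itself is illustrative):

    # pull one attribute at a time from the replicate metadata CSV
    meta="./test_data/meta/metaTest.csv"
    for param in stranded spike species
    do
      value=$(python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "${meta}" -p ${param})
      echo "${param}=${value}"
    done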
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d291ce8ff00500d597cff6db858ce18e0664aa19..b4dfcdf3266b106d22d254bad61ec9a2bb611d2e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,27 @@
+# v1.0.3 (in development)
+**User Facing**
+
+**Background**
+* Add memory limit per thread for samtools sort (75% for alignment, 85% for sampled alignment) (#108)
+* Remove parsing restrictions for submitted stranded/spike/species (#105, #106)
+* Pass unidentified ends through instead of overwriting them as unknown
+* Move fastqc process before trim to catch fastq errors (#107)
+* Only use fastq's that match *[_.]R[1-2].fastq.gz naming convention (#107)
+* Add error output for no fastq's
+* Update input bag export config to only fetch fastq's that match *[_.]R[1-2].fastq.gz naming convention
+* Remove multiple-fastq check from parse metadata (redundant and no longer valid)
+* Handle blank submitted endedness better
+* Don't use File.csv from the inputBag to parse manual endedness; use the count from getData
+* Detect malformed fastq's (#107)
+* Restrict sampled alignment process to use >32GB nodes on BioHPC (#108)
+* Use nproc**-1** for alignment processes (#108)
+
+*Known Bugs*
+* Override params (inputBag, fastq, species) aren't checked for integrity
+* Authentication files and tokens must be active (active auth client) for the duration of the pipeline run (until long-lived token utilization is included)
+
+<hr>
+
 # v1.0.2
 **User Facing**
diff --git a/docs/dag.png b/docs/dag.png
index 74bf1dbbf26f30d9cee216682aa795393db24ae8..7eb7c79f097a7fcac368a58ef63cfda42b154f41 100755
Binary files a/docs/dag.png and b/docs/dag.png differ
diff --git a/workflow/conf/Replicate_For_Input_Bag.json b/workflow/conf/Replicate_For_Input_Bag.json
index 278d0bf4d9d9f5074d7e3c4ef948287eb97ed767..842cf62fbb5237481a62173ff88be71fb22d04a4 100644
--- a/workflow/conf/Replicate_For_Input_Bag.json
+++ b/workflow/conf/Replicate_For_Input_Bag.json
@@ -89,7 +89,7 @@
         "processor": "fetch",
         "processor_params": {
           "output_path": "assets/Study/{Study_RID}/Experiment/{Experiment_RID}/Replicate/{Replicate_RID}",
-          "query_path": "/attribute/M:=RNASeq:Replicate/RID={rid}/(RID)=(RNASeq:File:Replicate_RID)/File_Type=FastQ/url:=URI,length:=File_size,filename:=File_Name,md5:=MD5,Study_RID,Experiment_RID,Replicate_RID?limit=none"
+          "query_path": "/attribute/M:=RNASeq:Replicate/RID={rid}/(RID)=(RNASeq:File:Replicate_RID)/File_Type=FastQ/File_Name::ciregexp::%5B_.%5DR%5B12%5D%5C.fastq%5C.gz/url:=URI,length:=File_size,filename:=File_Name,md5:=MD5,Study_RID,Experiment_RID,Replicate_RID?limit=none"
         }
       }
     ]
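Note: the percent-encoded ciregexp filter added to query_path above decodes to the case-insensitive pattern [_.]R[12]\.fastq\.gz, so the input bag export now fetches only files named like *_R1.fastq.gz or *.R2.fastq.gz. A quick sanity check of which names that pattern admits (the sample names are illustrative):

    pattern='[_.]R[12]\.fastq\.gz'
    for name in sample_R1.fastq.gz sample.R2.fastq.gz sampleR1.fastq.gz sample_R3.fastq.gz
    do
      echo "${name}: $(echo "${name}" | grep -qiE "${pattern}" && echo match || echo no-match)"
    done

The first two names match; the last two are dropped, which is the behavior the naming-convention changelog entries describe.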
diff --git a/workflow/conf/aws.config b/workflow/conf/aws.config
index eec8a7a829280d29006d9f0dc9f9b9932d9a9867..d87669599a7e70add448f0cc7f0636dd8bef499b 100644
--- a/workflow/conf/aws.config
+++ b/workflow/conf/aws.config
@@ -116,6 +116,10 @@ process {
     cpus = 1
     memory = '1 GB'
   }
+  withName:failPreExecutionRun_fastqFile {
+    cpus = 1
+    memory = '1 GB'
+  }
   withName:failPreExecutionRun_species {
     cpus = 1
diff --git a/workflow/conf/biohpc.config b/workflow/conf/biohpc.config
index 33917dbe810969bc6de2482dba00df16b06547ce..cc058dfef74bc49adf3932554994678934ac7a44 100755
--- a/workflow/conf/biohpc.config
+++ b/workflow/conf/biohpc.config
@@ -32,7 +32,7 @@ process {
     executor = 'local'
   }
   withName:alignSampleData {
-    queue = 'super'
+    queue = '128GB,256GB,256GBv1,384GB'
   }
   withName:inferMetadata {
     queue = 'super'
@@ -85,6 +85,9 @@ process {
   withName:failPreExecutionRun_fastq {
     executor = 'local'
   }
+  withName:failPreExecutionRun_fastqFile {
+    executor = 'local'
+  }
   withName:failPreExecutionRun_species {
     executor = 'local'
   }
diff --git a/workflow/nextflow.config b/workflow/nextflow.config
index 66b847beffb6499b9ba16678887ed0488c2d12cb..29c8515a852fb3356335b89a71f4ddf4f2c7a78b 100644
--- a/workflow/nextflow.config
+++ b/workflow/nextflow.config
@@ -91,6 +91,9 @@ process {
   withName:failPreExecutionRun_fastq {
     container = 'gudmaprbk/deriva1.4:1.0.0'
   }
+  withName:failPreExecutionRun_fastqFile {
+    container = 'gudmaprbk/deriva1.4:1.0.0'
+  }
   withName:failPreExecutionRun_species {
     container = 'gudmaprbk/deriva1.4:1.0.0'
   }
@@ -125,6 +128,6 @@ manifest {
   homePage = 'https://git.biohpc.swmed.edu/gudmap_rbk/rna-seq'
   description = 'This pipeline was created to be a standard mRNA-sequencing analysis pipeline which integrates with the GUDMAP and RBK consortium data-hub.'
   mainScript = 'rna-seq.nf'
-  version = 'v1.0.2'
+  version = 'v1.0.3'
   nextflowVersion = '>=19.09.0'
 }
diff --git a/workflow/rna-seq.nf b/workflow/rna-seq.nf
index fb7158f9272d8731a98b6f7a4eb74bd03fe8b11b..e5962054809e06611db398955e62eab378ed41e4 100644
--- a/workflow/rna-seq.nf
+++ b/workflow/rna-seq.nf
@@ -48,6 +48,7 @@ deriva.into {
   deriva_uploadOutputBag
   deriva_finalizeExecutionRun
   deriva_failPreExecutionRun_fastq
+  deriva_failPreExecutionRun_fastqFile
   deriva_failPreExecutionRun_species
   deriva_failExecutionRun
 }
@@ -100,6 +101,7 @@ script_uploadInputBag = Channel.fromPath("${baseDir}/scripts/upload_input_bag.py")
 script_uploadExecutionRun_uploadExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_finalizeExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_failPreExecutionRun_fastq = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
+script_uploadExecutionRun_failPreExecutionRun_fastqFile = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_failPreExecutionRun_species = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_failExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadQC = Channel.fromPath("${baseDir}/scripts/upload_qc.py")
@@ -267,6 +269,10 @@ process getData {
     echo -e "LOG: fetched" >> ${repRID}.getData.log

     fastqCount=\$(ls *.fastq.gz | wc -l)
+    if [ "\${fastqCount}" == "0" ]
+    then
+      touch dummy.R1.fastq.gz
+    fi
     echo "\${fastqCount}" > fastqCount.csv
     """
 }
@@ -284,12 +290,12 @@ if (fastqsForce != "") {
     .ifEmpty { exit 1, "override inputBag file not found: ${fastqsForce}" }
     .collect().into {
       fastqs_parseMetadata
-      fastqs_trimData
+      fastqs_fastqc
     }
 } else {
-  fastqs.into {
+  fastqs.collect().into {
     fastqs_parseMetadata
-    fastqs_trimData
+    fastqs_fastqc
   }
 }
@@ -304,7 +310,7 @@ process parseMetadata {
     path file from fileMeta
     path experimentSettings, stageAs: "ExperimentSettings.csv" from experimentSettingsMeta
     path experiment from experimentMeta
-    path (fastq) from fastqs_parseMetadata
+    path (fastq) from fastqs_parseMetadata.collect()
     val fastqCount

   output:
@@ -337,17 +343,20 @@ process parseMetadata {
     elif [ "\${endsRaw}" == "Paired End" ]
     then
       endsMeta="pe"
-    else
-      endsMeta="unknown"
-    fi
-    if [ "\${endsRaw}" == "" ]
+    elif [ "\${endsRaw}" == "nan" ]
     then
       endsRaw="_No value_"
+      endsMeta="NA"
     fi

     # manually get endedness
-    endsManual=\$(python3 ${script_parseMeta} -r ${repRID} -m "${file}" -p endsManual)
-    echo -e "LOG: endedness manually detected: \${endsManual}" >> ${repRID}.parseMetadata.log
+    if [ "${fastqCount}" == "1" ]
+    then
+      endsManual="se"
+    else
+      endsManual="pe"
+    fi
+    echo -e "LOG: endedness manually detected: \${endsManual}" >> ${repRID}.parseMetadata.log

     # get strandedness metadata
     stranded=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experimentSettings}" -p stranded)
@@ -376,6 +385,10 @@ process parseMetadata {
     then
       fastqCountError=true
       fastqCountError_details="**Too many fastqs detected (>2)**"
+    elif [ "${fastqCount}" -eq "0" ]
+    then
+      fastqCountError=true
+      fastqCountError_details="**No valid fastqs detected \\(may not match .R{12}.fastq.gz convention\\)**"
     elif [ "\${endsMeta}" == "se" ] && [ "${fastqCount}" -ne "1" ]
     then
       fastqCountError=true
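Note: endedness is now called from the count of fastq's that survived the naming filter rather than from File.csv. The same decision as a standalone sketch (directory contents are illustrative):

    # one mate file means single end, two mean paired end
    fastqCount=$(ls *[_.]R[1-2].fastq.gz 2> /dev/null | wc -l)
    if [ "${fastqCount}" -eq "1" ]
    then
      endsManual="se"
    elif [ "${fastqCount}" -eq "2" ]
    then
      endsManual="pe"
    else
      echo "fastq count error: ${fastqCount} fastq's detected" >&2
    fi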
@@ -451,6 +464,7 @@ spikeMeta.into {
   spikeMeta_checkMetadata
   spikeMeta_aggrQC
   spikeMeta_failPreExecutionRun_fastq
+  spikeMeta_failPreExecutionRun_fastqFile
   spikeMeta_failPreExecutionRun_species
   spikeMeta_failExecutionRun
 }
@@ -458,6 +472,7 @@ speciesMeta.into {
   speciesMeta_checkMetadata
   speciesMeta_aggrQC
   speciesMeta_failPreExecutionRun_fastq
+  speciesMeta_failPreExecutionRun_fastqFile
   speciesMeta_failPreExecutionRun_species
   speciesMeta_failExecutionRun
 }
@@ -486,6 +501,7 @@ fastqError_fl.splitCsv(sep: ",", header: false).separate(

 // Replicate errors for multiple process inputs
 fastqCountError.into {
+  fastqCountError_fastqc
   fastqCountError_trimData
   fastqCountError_getRefInfer
   fastqCountError_downsampleData
@@ -498,7 +514,6 @@
   fastqCountError_dedupData
   fastqCountError_makeBigWig
   fastqCountError_countData
-  fastqCountError_fastqc
   fastqCountError_dataQC
   fastqCountError_aggrQC
   fastqCountError_uploadQC
@@ -507,6 +522,7 @@
   fastqCountError_failPreExecutionRun_fastq
 }
 fastqReadError.into {
+  fastqReadError_fastqc
   fastqReadError_trimData
   fastqReadError_getRefInfer
   fastqReadError_downsampleData
@@ -519,7 +535,6 @@
   fastqReadError_dedupData
   fastqReadError_makeBigWig
   fastqReadError_countData
-  fastqReadError_fastqc
   fastqReadError_dataQC
   fastqReadError_aggrQC
   fastqReadError_uploadQC
@@ -528,6 +543,98 @@
   fastqReadError_failPreExecutionRun_fastq
 }

+/*
+ *fastqc: run fastqc on untrimmed fastq's
+*/
+process fastqc {
+  tag "${repRID}"
+
+  input:
+    path (fastq) from fastqs_fastqc.collect()
+    val fastqCountError_fastqc
+    val fastqReadError_fastqc
+
+  output:
+    path ("*.R{1,2}.fastq.gz", includeInputs:true) into fastqs_trimData
+    path ("*_fastqc.zip") into fastqc
+    path ("rawReads.csv") into rawReadsInfer_fl
+    path "fastqFileError.csv" into fastqFileError_fl
+
+  when:
+    fastqCountError_fastqc == 'false' && fastqReadError_fastqc == 'false'
+
+  script:
+    """
+    hostname > ${repRID}.fastqc.log
+    ulimit -a >> ${repRID}.fastqc.log
+
+    # run fastqc
+    echo -e "LOG: running fastqc on raw fastqs" >> ${repRID}.fastqc.log
+    fastqc *.fastq.gz -o . &> fastqc.out || true
+    fastqcErrorOut=\$(cat fastqc.out | grep -c 'Failed to process file') || fastqcErrorOut=0
+    fastqFileError=false
+    fastqFileError_details=""
+    if [ "\${fastqcErrorOut}" -ne "0" ]
+    then
+      fastqFileError=true
+      fastqFileError_details="**There is an error with the structure of the fastq**"
+      echo -e "LOG: There is an error with the structure of the fastq" >> ${repRID}.fastqc.log
+      touch dummy_fastqc.zip
+    else
+      echo -e "LOG: The structure of the fastq is correct" >> ${repRID}.fastqc.log
+    fi
+
+    # count raw reads
+    zcat *.R1.fastq.gz | echo \$((`wc -l`/4)) > rawReads.csv
+
+    # save fastq error file
+    echo "\${fastqFileError},\${fastqFileError_details}" > fastqFileError.csv
+    """
+}
+
+// Extract number of raw reads metadata into channel
+rawReadsInfer = Channel.create()
+rawReadsInfer_fl.splitCsv(sep: ",", header: false).separate(
+  rawReadsInfer
+)
+
+// Replicate inferred raw reads for multiple process inputs
+rawReadsInfer.into {
+  rawReadsInfer_aggrQC
+  rawReadsInfer_uploadQC
+}
+
+// Split fastq file error into separate channel
+fastqFileError = Channel.create()
+fastqFileError_details = Channel.create()
+fastqFileError_fl.splitCsv(sep: ",", header: false).separate(
+  fastqFileError,
+  fastqFileError_details
+)
+
+// Replicate errors for multiple process inputs
+fastqFileError.into {
+  fastqFileError_fastqc
+  fastqFileError_trimData
+  fastqFileError_getRefInfer
+  fastqFileError_downsampleData
+  fastqFileError_alignSampleData
+  fastqFileError_inferMetadata
+  fastqFileError_checkMetadata
+  fastqFileError_uploadExecutionRun
+  fastqFileError_getRef
+  fastqFileError_alignData
+  fastqFileError_dedupData
+  fastqFileError_makeBigWig
+  fastqFileError_countData
+  fastqFileError_dataQC
+  fastqFileError_aggrQC
+  fastqFileError_uploadQC
+  fastqFileError_uploadProcessedFile
+  fastqFileError_uploadOutputBag
+  fastqFileError_failPreExecutionRun_fastqFile
+}
+
 /*
  * trimData: trims any adapter or non-host sequences from the data
 */
@@ -539,16 +646,17 @@ process trimData {
     val ends from endsManual_trimData
     val fastqCountError_trimData
     val fastqReadError_trimData
+    val fastqFileError_trimData

   output:
     path ("*.fq.gz") into fastqsTrim
-    path ("*.fastq.gz", includeInputs:true) into fastqs_fastqc
     path ("*_trimming_report.txt") into trimQC
     path ("readLength.csv") into readLengthInfer_fl

   when:
     fastqCountError_trimData == "false"
     fastqReadError_trimData == "false"
+    fastqFileError_trimData == "false"

   script:
     """
@@ -592,7 +700,7 @@ fastqsTrim.into {
 }

 // Combine inputs of getRefInfer
-getRefInferInput = referenceInfer.combine(deriva_getRefInfer.combine(script_refDataInfer.combine(fastqCountError_getRefInfer.combine(fastqReadError_getRefInfer))))
+getRefInferInput = referenceInfer.combine(deriva_getRefInfer.combine(script_refDataInfer.combine(fastqCountError_getRefInfer.combine(fastqReadError_getRefInfer.combine(fastqFileError_getRefInfer)))))

 /*
  * getRefInfer: downloads appropriate reference for metadata inference
 */
@@ -601,7 +709,7 @@ process getRefInfer {
   tag "${refName}"

   input:
-    tuple val (refName), path (credential, stageAs: "credential.json"), path (script_refDataInfer), val (fastqCountError), val (fastqReadError) from getRefInferInput
+    tuple val (refName), path (credential, stageAs: "credential.json"), path (script_refDataInfer), val (fastqCountError), val (fastqReadError), val (fastqFileError) from getRefInferInput

   output:
     tuple val (refName), path ("hisat2", type: 'dir'), path ("*.fna"), path ("*.gtf") into refInfer
@@ -610,6 +718,7 @@ process getRefInfer {
   when:
     fastqCountError == "false"
     fastqReadError == "false"
+    fastqFileError == "false"

   script:
     """
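Note: moving fastqc ahead of trimming lets it double as an integrity gate: FastQC prints 'Failed to process file' for a malformed fastq, and the process greps its captured output for that marker rather than trusting the exit code. A standalone sketch of the same check plus the four-lines-per-record read count (file names are illustrative):

    fastqc *.fastq.gz -o . &> fastqc.out || true
    errorCount=$(grep -c 'Failed to process file' fastqc.out) || errorCount=0
    if [ "${errorCount}" -ne "0" ]
    then
      echo "malformed fastq detected" >&2
    else
      # a fastq record is always 4 lines, so reads = lines / 4
      echo "R1 raw reads: $(( $(zcat *.R1.fastq.gz | wc -l) / 4 ))"
    fi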
"false" fastqReadError == "false" + fastqFileError == "false" script: """ @@ -687,6 +796,7 @@ process downsampleData { val ends from endsManual_downsampleData val fastqCountError_downsampleData val fastqReadError_downsampleData + val fastqFileError_downsampleData output: path ("sampled.1.fq") into fastqs1Sample @@ -695,6 +805,7 @@ process downsampleData { when: fastqCountError_downsampleData == "false" fastqReadError_downsampleData == "false" + fastqFileError_downsampleData == "false" script: """ @@ -718,7 +829,7 @@ process downsampleData { } // Replicate the dowsampled fastq's and attatched to the references -inferInput = endsManual_alignSampleData.combine(refInfer.combine(fastqs1Sample.collect().combine(fastqs2Sample.collect().combine(fastqCountError_alignSampleData.combine(fastqReadError_alignSampleData))))) +inferInput = endsManual_alignSampleData.combine(refInfer.combine(fastqs1Sample.collect().combine(fastqs2Sample.collect().combine(fastqCountError_alignSampleData.combine(fastqReadError_alignSampleData.combine(fastqFileError_alignSampleData)))))) /* * alignSampleData: aligns the downsampled reads to a reference database @@ -727,7 +838,7 @@ process alignSampleData { tag "${ref}" input: - tuple val (ends), val (ref), path (hisat2), path (fna), path (gtf), path (fastq1), path (fastq2), val (fastqCountError), val (fastqReadError) from inferInput + tuple val (ends), val (ref), path (hisat2), path (fna), path (gtf), path (fastq1), path (fastq2), val (fastqCountError), val (fastqReadError), val (fastqFileError) from inferInput output: path ("${ref}.sampled.sorted.bam") into sampleBam @@ -737,6 +848,7 @@ process alignSampleData { when: fastqCountError == "false" fastqReadError == "false" + fastqFileError == "false" script: """ @@ -761,7 +873,10 @@ process alignSampleData { # sort the bam file using Samtools echo -e "LOG: sorting the bam file" >> ${repRID}.${ref}.alignSampleData.log - samtools sort -@ `nproc` -O BAM -o ${ref}.sampled.sorted.bam ${ref}.sampled.bam + proc=\$(expr `nproc` - 1) + mem=\$(vmstat -s -S K | grep 'total memory' | grep -o '[0-9]*') + mem=\$(expr \${mem} / \${proc} \\* 85 / 100) + samtools sort -@ \${proc} -m \${mem}K -O BAM -o ${ref}.sampled.sorted.bam ${ref}.sampled.bam # index the sorted bam using Samtools echo -e "LOG: indexing sorted bam file" >> ${repRID}.${ref}.alignSampleData.log @@ -785,6 +900,7 @@ process inferMetadata { path alignSummary from alignSampleQC_inferMetadata.collect() val fastqCountError_inferMetadata val fastqReadError_inferMetadata + val fastqFileError_inferMetadata output: path "infer.csv" into inferMetadata_fl @@ -794,6 +910,7 @@ process inferMetadata { when: fastqCountError_inferMetadata == "false" fastqReadError_inferMetadata == "false" + fastqFileError_inferMetadata == "false" script: """ @@ -1011,6 +1128,7 @@ process checkMetadata { val speciesInfer from speciesInfer_checkMetadata val fastqCountError_checkMetadata val fastqReadError_checkMetadata + val fastqFileError_checkMetadata val speciesError_checkMetadata output: @@ -1020,6 +1138,7 @@ process checkMetadata { when: fastqCountError_checkMetadata == "false" fastqReadError_checkMetadata == "false" + fastqFileError_checkMetadata == "false" speciesError_checkMetadata == "false" script: @@ -1185,6 +1304,7 @@ inputBagRID.into { inputBagRID_uploadExecutionRun inputBagRID_finalizeExecutionRun inputBagRID_failPreExecutionRun_fastq + inputBagRID_failPreExecutionRun_fastqFile inputBagRID_failPreExecutionRun_species inputBagRID_failExecutionRun } @@ -1203,6 +1323,7 @@ process uploadExecutionRun { 
@@ -785,6 +900,7 @@ process inferMetadata {
     path alignSummary from alignSampleQC_inferMetadata.collect()
     val fastqCountError_inferMetadata
     val fastqReadError_inferMetadata
+    val fastqFileError_inferMetadata

   output:
     path "infer.csv" into inferMetadata_fl
@@ -794,6 +910,7 @@ process inferMetadata {
   when:
     fastqCountError_inferMetadata == "false"
     fastqReadError_inferMetadata == "false"
+    fastqFileError_inferMetadata == "false"

   script:
     """
@@ -1011,6 +1128,7 @@ process checkMetadata {
     val speciesInfer from speciesInfer_checkMetadata
     val fastqCountError_checkMetadata
     val fastqReadError_checkMetadata
+    val fastqFileError_checkMetadata
     val speciesError_checkMetadata

   output:
@@ -1020,6 +1138,7 @@ process checkMetadata {
   when:
     fastqCountError_checkMetadata == "false"
     fastqReadError_checkMetadata == "false"
+    fastqFileError_checkMetadata == "false"
     speciesError_checkMetadata == "false"

   script:
@@ -1185,6 +1304,7 @@ inputBagRID.into {
   inputBagRID_uploadExecutionRun
   inputBagRID_finalizeExecutionRun
   inputBagRID_failPreExecutionRun_fastq
+  inputBagRID_failPreExecutionRun_fastqFile
   inputBagRID_failPreExecutionRun_species
   inputBagRID_failExecutionRun
 }
@@ -1203,6 +1323,7 @@ process uploadExecutionRun {
     val inputBagRID from inputBagRID_uploadExecutionRun
     val fastqCountError_uploadExecutionRun
     val fastqReadError_uploadExecutionRun
+    val fastqFileError_uploadExecutionRun
     val speciesError_uploadExecutionRun

   output:
@@ -1212,6 +1333,7 @@ process uploadExecutionRun {
     upload
     fastqCountError_uploadExecutionRun == "false"
     fastqReadError_uploadExecutionRun == "false"
+    fastqFileError_uploadExecutionRun == "false"
     speciesError_uploadExecutionRun == "false"

   script:
@@ -1298,6 +1420,7 @@ process getRef {
     val species from speciesInfer_getRef
     val fastqCountError_getRef
     val fastqReadError_getRef
+    val fastqFileError_getRef
     val speciesError_getRef
     val pipelineError_getRef

@@ -1307,6 +1430,7 @@ process getRef {
   when:
     fastqCountError_getRef == "false"
     fastqReadError_getRef == "false"
+    fastqFileError_getRef == "false"
     speciesError_getRef == "false"
     pipelineError_getRef == "false"

@@ -1398,6 +1522,7 @@ process alignData {
     val stranded from strandedInfer_alignData
     val fastqCountError_alignData
     val fastqReadError_alignData
+    val fastqFileError_alignData
     val speciesError_alignData
     val pipelineError_alignData

@@ -1408,6 +1533,7 @@ process alignData {
   when:
     fastqCountError_alignData == "false"
     fastqReadError_alignData == "false"
+    fastqFileError_alignData == "false"
     speciesError_alignData == "false"
     pipelineError_alignData == "false"

@@ -1451,7 +1577,10 @@ process alignData {

     # sort the bam file using Samtools
     echo -e "LOG: sorting the bam file" >> ${repRID}.align.log
-    samtools sort -@ `nproc` -O BAM -o ${repRID}.sorted.bam ${repRID}.bam
+    proc=\$(expr `nproc` - 1)
+    mem=\$(vmstat -s -S K | grep 'total memory' | grep -o '[0-9]*')
+    mem=\$(expr \${mem} / \${proc} \\* 75 / 100)
+    samtools sort -@ \${proc} -m \${mem}K -O BAM -o ${repRID}.sorted.bam ${repRID}.bam

     # index the sorted bam using Samtools
     echo -e "LOG: indexing sorted bam file" >> ${repRID}.align.log
@@ -1475,6 +1604,7 @@ process dedupData {
     tuple path (bam), path (bai) from rawBam_dedupData
     val fastqCountError_dedupData
     val fastqReadError_dedupData
+    val fastqFileError_dedupData
     val speciesError_dedupData
     val pipelineError_dedupData

@@ -1486,6 +1616,7 @@ process dedupData {
   when:
     fastqCountError_dedupData == 'false'
     fastqReadError_dedupData == 'false'
+    fastqFileError_dedupData == 'false'
     speciesError_dedupData == 'false'
     pipelineError_dedupData == 'false'

@@ -1534,6 +1665,7 @@ process makeBigWig {
     tuple path (bam), path (bai) from dedupBam_makeBigWig
     val fastqCountError_makeBigWig
     val fastqReadError_makeBigWig
+    val fastqFileError_makeBigWig
     val speciesError_makeBigWig
     val pipelineError_makeBigWig

@@ -1543,6 +1675,7 @@ process makeBigWig {
   when:
     fastqCountError_makeBigWig == 'false'
     fastqReadError_makeBigWig == 'false'
+    fastqFileError_makeBigWig == 'false'
     speciesError_makeBigWig == 'false'
     pipelineError_makeBigWig == 'false'

@@ -1574,6 +1707,7 @@ process countData {
     val stranded from strandedInfer_countData
     val fastqCountError_countData
     val fastqReadError_countData
+    val fastqFileError_countData
     val speciesError_countData
     val pipelineError_countData

@@ -1585,6 +1719,7 @@ process countData {
   when:
     fastqCountError_countData == 'false'
     fastqReadError_countData == 'false'
+    fastqFileError_countData == 'false'
     speciesError_countData == 'false'
     pipelineError_countData == 'false'

@@ -1645,55 +1780,6 @@ assignedReadsInfer.into {
   assignedReadsInfer_uploadQC
 }

-/*
- *fastqc: run fastqc on untrimmed fastq's
-*/
-process fastqc {
-  tag "${repRID}"
-
-  input:
-    path (fastq) from fastqs_fastqc
-    val fastqCountError_fastqc
-    val fastqReadError_fastqc
-    val speciesError_fastqc
-    val pipelineError_fastqc
-
-  output:
-    path ("*_fastqc.zip") into fastqc
-    path ("rawReads.csv") into rawReadsInfer_fl
-
-  when:
-    fastqCountError_fastqc == 'false'
-    fastqReadError_fastqc == 'false'
-    speciesError_fastqc == 'false'
-    pipelineError_fastqc == 'false'
-
-  script:
-    """
-    hostname > ${repRID}.fastqc.log
-    ulimit -a >> ${repRID}.fastqc.log
-
-    # run fastqc
-    echo -e "LOG: running fastq on raw fastqs" >> ${repRID}.fastqc.log
-    fastqc *.fastq.gz -o .
-
-    # count raw reads
-    zcat *.R1.fastq.gz | echo \$((`wc -l`/4)) > rawReads.csv
-    """
-}
-
-// Extract number of raw reads metadata into channel
-rawReadsInfer = Channel.create()
-rawReadsInfer_fl.splitCsv(sep: ",", header: false).separate(
-  rawReadsInfer
-)
-
-// Replicate inferred raw reads for multiple process inputs
-rawReadsInfer.into {
-  rawReadsInfer_aggrQC
-  rawReadsInfer_uploadQC
-}
-
 /*
  *dataQC: calculate transcript integrity numbers (TIN) and bin as well as calculate innerdistance of PE replicates
 */
@@ -1708,6 +1794,7 @@ process dataQC {
     val ends from endsInfer_dataQC
     val fastqCountError_dataQC
     val fastqReadError_dataQC
+    val fastqFileError_dataQC
     val speciesError_dataQC
     val pipelineError_dataQC

@@ -1719,6 +1806,7 @@ process dataQC {
   when:
     fastqCountError_dataQC == 'false'
     fastqReadError_dataQC == 'false'
+    fastqFileError_dataQC == 'false'
     speciesError_dataQC == 'false'
     pipelineError_dataQC == 'false'

@@ -1804,6 +1892,7 @@ process aggrQC {
     val expRID from expRID_aggrQC
     val fastqCountError_aggrQC
     val fastqReadError_aggrQC
+    val fastqFileError_aggrQC
     val speciesError_aggrQC
     val pipelineError_aggrQC

@@ -1814,6 +1903,7 @@ process aggrQC {
   when:
     fastqCountError_aggrQC == 'false'
     fastqReadError_aggrQC == 'false'
+    fastqFileError_aggrQC == 'false'
     speciesError_aggrQC == 'false'
     pipelineError_aggrQC == 'false'

@@ -1906,6 +1996,7 @@ process uploadQC {
     val tinMed from tinMedInfer_uploadQC
     val fastqCountError_uploadQC
     val fastqReadError_uploadQC
+    val fastqFileError_uploadQC
     val speciesError_uploadQC
     val pipelineError_uploadQC

@@ -1916,6 +2007,7 @@ process uploadQC {
     upload
     fastqCountError_uploadQC == 'false'
     fastqReadError_uploadQC == 'false'
+    fastqFileError_uploadQC == 'false'
     speciesError_uploadQC == 'false'
     pipelineError_uploadQC == 'false'

@@ -1982,6 +2074,7 @@ process uploadProcessedFile {
     val executionRunRID from executionRunRID_uploadProcessedFile
     val fastqCountError_uploadProcessedFile
     val fastqReadError_uploadProcessedFile
+    val fastqFileError_uploadProcessedFile
     val speciesError_uploadProcessedFile
     val pipelineError_uploadProcessedFile

@@ -1992,6 +2085,7 @@ process uploadProcessedFile {
     upload
     fastqCountError_uploadProcessedFile == 'false'
     fastqReadError_uploadProcessedFile == 'false'
+    fastqFileError_uploadProcessedFile == 'false'
     speciesError_uploadProcessedFile == 'false'
     pipelineError_uploadProcessedFile == 'false'

@@ -2074,6 +2168,7 @@ process uploadOutputBag {
     val executionRunRID from executionRunRID_uploadOutputBag
     val fastqCountError_uploadOutputBag
     val fastqReadError_uploadOutputBag
+    val fastqFileError_uploadOutputBag
     val speciesError_uploadOutputBag
     val pipelineError_uploadOutputBag

@@ -2084,6 +2179,7 @@ process uploadOutputBag {
     upload
     fastqCountError_uploadOutputBag == 'false'
     fastqReadError_uploadOutputBag == 'false'
+    fastqFileError_uploadOutputBag == 'false'
     speciesError_uploadOutputBag == 'false'
     pipelineError_uploadOutputBag == 'false'

@@ -2256,6 +2352,86 @@ process failPreExecutionRun_fastq {
     """
 }

+/*
+ * failPreExecutionRun_fastqFile: fail the execution run prematurely for fastqFile errors
+*/
+process failPreExecutionRun_fastqFile {
+  tag "${repRID}"
+
+  input:
+    path script_uploadExecutionRun from script_uploadExecutionRun_failPreExecutionRun_fastqFile
+    path credential, stageAs: "credential.json" from deriva_failPreExecutionRun_fastqFile
+    val spike from spikeMeta_failPreExecutionRun_fastqFile
+    val species from speciesMeta_failPreExecutionRun_fastqFile
+    val inputBagRID from inputBagRID_failPreExecutionRun_fastqFile
+    val fastqFileError from fastqFileError_failPreExecutionRun_fastqFile
+    val fastqFileError_details
+
+  when:
+    upload
+    fastqFileError == 'true'
+
+  script:
+    """
+    hostname > ${repRID}.failPreExecutionRun_fastqfile.log
+    ulimit -a >> ${repRID}.failPreExecutionRun_fastqfile.log
+
+    errorDetails=""
+    if [ ${fastqFileError} == true ]
+    then
+      errorDetails=\$(echo \${errorDetails}${fastqFileError_details}"\\n")
+    fi
+
+    echo LOG: searching for workflow RID - BICF mRNA ${workflow.manifest.version} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    workflow=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Workflow/Name=BICF%20mRNA%20Replicate/Version=${workflow.manifest.version})
+    workflow=\$(echo \${workflow} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
+    workflow=\${workflow:7:-6}
+    echo LOG: workflow RID extracted - \${workflow} >> ${repRID}.failPreExecutionRun_fastqfile.log
+
+    if [ "${species}" == "Homo sapiens" ]
+    then
+      genomeName=\$(echo GRCh${refHuVersion})
+    elif [ "${species}" == "Mus musculus" ]
+    then
+      genomeName=\$(echo GRCm${refMoVersion})
+    fi
+    if [ "${spike}" == "yes" ]
+    then
+      genomeName=\$(echo \${genomeName}-S)
+    fi
+    echo LOG: searching for genome name - \${genomeName} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    genome=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Reference_Genome/Name=\${genomeName})
+    genome=\$(echo \${genome} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
+    genome=\${genome:7:-6}
+    echo LOG: genome RID extracted - \${genome} >> ${repRID}.failPreExecutionRun_fastqfile.log
+
+    cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
+    cookie=\${cookie:11:-1}
+
+    exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/Workflow=\${workflow}/Replicate=${repRID}/Input_Bag=${inputBagRID})
+    echo \${exist} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    if [ "\${exist}" == "[]" ]
+    then
+      rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u F)
+      echo LOG: execution run RID uploaded - \${rid} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    else
+      rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
+      rid=\${rid:7:-6}
+      echo \${rid} >> ${repRID}.failPreExecutionRun_fastqfile.log
+      executionRun_rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid})
+      echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    fi
+
+    dt=`date +%FT%T.%3N%:z`
+    curl -H 'Content-Type: application/json' -X PUT -d \
+      '{ \
+        "ID": "${workflow.sessionId}", \
+        "ExecutionRunRID": "'\${rid}'", \
+        "Failure": "'\${dt}'" \
+      }' \
+      "https://9ouc12dkwb.execute-api.us-east-2.amazonaws.com/prod/db/track"
+    """
+}

 /*
  * failPreExecutionRun_species: fail the execution run prematurely for species error
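Note: the grep/substring idiom used above to pull a RID out of an ERMrest JSON response is easy to misread, so here it is in isolation (the response value is illustrative; jq would be a more robust choice, and the grep form presumably avoids an extra dependency in the container):

    # ERMrest returns a JSON array like [{"RID":"17-ABCD","RCT":...}]
    response='[{"RID":"17-ABCD","RCT":"2021-01-01"}]'
    rid=$(echo ${response} | grep -o '"RID":".*","RCT')
    rid=${rid:7:-6}   # drop leading '"RID":"' (7 chars) and trailing '","RCT' (6 chars)
    echo ${rid}       # 17-ABCD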
diff --git a/workflow/scripts/bdbag_fetch.sh b/workflow/scripts/bdbag_fetch.sh
index c34dc756d0cc5a47382fb9f96267e378c19ae79a..45ee14a7da409e011494921bafa204b44e96f795 100644
--- a/workflow/scripts/bdbag_fetch.sh
+++ b/workflow/scripts/bdbag_fetch.sh
@@ -18,7 +18,7 @@ if [ "${validate}" != "is valid" ]
 then
   exit 1
 fi
-for i in $(find */ -name "*R*.fastq.gz")
+for i in $(find */ -name "*[_.]R[1-2].fastq.gz")
 do
   path=${2}.$(echo ${i##*/} | grep -o "R[1,2].fastq.gz")
   cp ${i} ./${path}
diff --git a/workflow/scripts/parse_meta.py b/workflow/scripts/parse_meta.py
index 12cc7c7233b94509e5c3a7307e8ef7985a94a958..fdbc86c12a2fb6832217ec0f08263d0102c9e566 100644
--- a/workflow/scripts/parse_meta.py
+++ b/workflow/scripts/parse_meta.py
@@ -35,15 +35,11 @@ def main():
         else:
             rep = metaFile["Replicate_RID"].unique()[0]
             print(rep)
-        if (len(metaFile[metaFile["File_Type"] == "FastQ"]) > 2):
-            print("There are more then 2 fastq's in the metadata: " +
-                  " ".join(metaFile[metaFile["File_Type"] == "FastQ"].RID))
-            exit(1)

     # Check experiment RID metadata from 'Experiment.csv'
     if (args.parameter == "expRID"):
         if (len(metaFile.Experiment_RID.unique()) > 1):
-            print("There are multiple experoment RID's in the metadata: " +
+            print("There are multiple experiment RID's in the metadata: " +
                   " ".join(metaFile.Experiment_RID.unique()))
             exit(1)
         else:
@@ -65,14 +61,6 @@ def main():
             endsMeta = metaFile.Paired_End.unique()[0]
         print(endsMeta)

-    # Manually get endness count from 'File.csv'
-    if (args.parameter == "endsManual"):
-        if (len(metaFile[metaFile["File_Type"] == "FastQ"]) == 1):
-            endsManual = "se"
-        elif (len(metaFile[metaFile["File_Type"] == "FastQ"]) == 2):
-            endsManual = "pe"
-        print(endsManual)
-
     # Get strandedness metadata from 'Experiment Settings.csv'
     if (args.parameter == "stranded"):
         if (metaFile.Has_Strand_Specific_Information.unique() == "yes"):
@@ -80,9 +68,7 @@ def main():
         elif (metaFile.Has_Strand_Specific_Information.unique() == "no"):
             stranded = "unstranded"
         else:
-            print("Stranded metadata not match expected options: " +
-                  metaFile.Has_Strand_Specific_Information.unique())
-            exit(1)
+            stranded = metaFile.Has_Strand_Specific_Information.unique()[0]
         print(stranded)

     # Get spike-in metadata from 'Experiment Settings.csv'
@@ -92,9 +78,7 @@ def main():
         elif (metaFile.Used_Spike_Ins.unique() == "no"):
             spike = "no"
         else:
-            print("Spike-ins metadata not match expected options: " +
-                  metaFile.Used_Spike_Ins.unique())
-            exit(1)
+            spike = metaFile.Used_Spike_Ins.unique()[0]
         print(spike)

     # Get species metadata from 'Experiment.csv'
@@ -104,9 +88,7 @@ def main():
         elif (metaFile.Species.unique() == "Homo sapiens"):
             species = "Homo sapiens"
         else:
-            print("Species metadata not match expected options: " +
-                  metaFile.Species.unique())
-            exit(1)
+            species = metaFile.Species.unique()[0]
         print(species)

     # Get read length metadata from 'Experiment Settings.csv'
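Note: the tightened find pattern above only admits mates named like *_R1.fastq.gz or *.R2.fastq.gz, and the loop then renames each to <repRID>.R1.fastq.gz / <repRID>.R2.fastq.gz. The behavior in isolation (the prefix and layout are illustrative; [1-2] is used here where the script's [1,2] also, harmlessly, matches a literal comma):

    # copy e.g. Study/.../sample_R1.fastq.gz to ./<prefix>.R1.fastq.gz
    prefix="Q-Y5F6"
    for i in $(find */ -name "*[_.]R[1-2].fastq.gz")
    do
      path=${prefix}.$(echo ${i##*/} | grep -o "R[1-2].fastq.gz")
      cp ${i} ./${path}
    done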