diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f9a5b7b8b017f73d5f0f86efb4684f1ff2ba930e..c2b63ce58bde32bc80c7bfbd5ee2792bad99fdb4 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -109,7 +109,7 @@ parseMetadata:
     - study=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p studyRID)
     - endsRaw=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p endsMeta)
     - endsMeta="uk"
-    - endsManual=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p endsManual)
+    - endsManual="se"
     - stranded=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p stranded)
     - spike=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p spike)
     - species=$(singularity run 'docker://gudmaprbk/python3:1.0.0' python3 ./workflow/scripts/parse_meta.py -r Replicate_RID -m "./test_data/meta/metaTest.csv" -p species)
@@ -750,6 +750,36 @@ failMismatchR1R2:
     when:
       - always

+failUnexpectedMeta:
+  stage: integration
+  only: [merge_requests]
+  except:
+    variables:
+      - $CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /master/
+  script:
+    - hostname
+    - ulimit -a
+    - nextflow -q run ./workflow/rna-seq.nf --deriva ./test_data/auth/credential.json --bdbag ./test_data/auth/cookies.txt --repRID 14-3R4R --source staging --upload true -with-dag dag.png --dev false --ci true
+  retry:
+    max: 0
+    when:
+      - always
+
+failFileStructure:
+  stage: integration
+  only: [merge_requests]
+  except:
+    variables:
+      - $CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /master/
+  script:
+    - hostname
+    - ulimit -a
+    - nextflow -q run ./workflow/rna-seq.nf --deriva ./test_data/auth/credential.json --bdbag ./test_data/auth/cookies.txt --repRID Q-Y5HT --source staging --upload true -with-dag dag.png --dev false --ci true
+  retry:
+    max: 0
+    when:
+      - always
+
 override_inputBag:
   stage: integration
   only: [merge_requests]
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d291ce8ff00500d597cff6db858ce18e0664aa19..3382228f8fdee34882c81df36e2773641a82a604 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,27 @@
+# v1.0.3
+**User Facing**
+
+**Background**
+* Add memory limit (75%) per thread for samtools sort (#108)
+* Remove parsing restrictions for submitted stranded/spike/species (#105, #106)
+* Pass unidentified ends through instead of overwriting them as unknown
+* Move fastqc process before trim to catch fastq errors (#107)
+* Only use fastqs that match the *[_.]R[1-2].fastq.gz naming convention (#107)
+* Add error output when no valid fastqs are detected
+* Update input bag export config to only fetch fastqs that match the *[_.]R[1-2].fastq.gz naming convention
+* Remove the multiple-fastq check from parse metadata (redundant and no longer valid)
+* Handle blank submitted endness gracefully
+* Derive manual endness from the fastq count in getData instead of parsing File.csv from the inputBag
+* Detect malformed fastqs (#107)
+* Restrict sampled alignment process to use >32GB nodes on BioHPC (#108)
+* Use nproc - 1 threads for alignment processes (#108)
+
+*Known Bugs*
+* Override params (inputBag, fastq, species) aren't checked for integrity
+* Authentication files and tokens must be active (active auth client) for the duration of the pipeline run (until long-lived token utilization is implemented)
+
+<hr>
+
 # v1.0.2
 **User Facing**

diff --git a/README.md b/README.md
index 4639e51ee03b52d261274d95a9a3c72a7fd9a434..90878a2234e5fe326cefa12c915d5c75fab81bb2 100644
--- a/README.md
+++ b/README.md
@@ -93,10 +93,12 @@ Error reported back to the data-hub are (they aren't thrown on the command line
 |Error|Description|
 |:-|:-:|
 |**Too many fastqs detected (>2)**|Data-hub standards and that of this pipeline is for one read-1 fastq and if paired-end, one read\-2 fastq. As a result, the maximum number of fastq's per replicate cannot be more than 2.|
+|**No valid fastqs detected (may not match {_.}R{12}.fastq.gz convention)**|Files may be missing, or named in a way that doesn't match the data-hub convention.|
 |**Number of fastqs detected does not match submitted endness**|Single-end sequenced replicates can only have one fastq, while paired\-end can only have two (see above).|
-|**Number of reads do not match for R1 and R2**|For paired\-end sequenced studies the number of reads in read\-1 fastq must match that of read\-2. This error is usually indicative of uploading of currupted, trunkated, or wrong fastq files|
+|**Number of reads do not match for R1 and R2**|For paired\-end sequenced studies the number of reads in the read\-1 fastq must match that of read\-2. This error usually indicates upload of corrupted, truncated, or wrong fastq files.|
+|**There is an error with the structure of the fastq**|The fastqs fail a check of their structure. This error usually indicates upload of corrupted, truncated, or wrong fastq files.|
 |**Inference of species returns an ambiguous result**|Species of the replicate is done by aligning a random subset of 1 million reads from the data to both the human and mouse reference genomes. If there isn't a clear difference between the alignment rates (`>=40%` of one species, but `<40%` of the other), then this error is detected.|
-|**Submitted metadata does not match inferred**|All required metadata for analysis of the data is internally inferred by the pipeline, if any of those do not match the submitted metadata, this error is detected to notify of a potential error.|
+|**Submitted metadata does not match inferred**|All required metadata for analysis of the data is internally inferred by the pipeline; if any of those do not match the submitted metadata, this error is detected to notify of a potential error. The mismatched metadata will be listed.|
 <hr>

 [**CHANGELOG**](CHANGELOG.md)
diff --git a/docs/dag.png b/docs/dag.png
index 74bf1dbbf26f30d9cee216682aa795393db24ae8..7eb7c79f097a7fcac368a58ef63cfda42b154f41 100755
Binary files a/docs/dag.png and b/docs/dag.png differ
diff --git a/workflow/conf/Replicate_For_Input_Bag.json b/workflow/conf/Replicate_For_Input_Bag.json
index 278d0bf4d9d9f5074d7e3c4ef948287eb97ed767..842cf62fbb5237481a62173ff88be71fb22d04a4 100644
--- a/workflow/conf/Replicate_For_Input_Bag.json
+++ b/workflow/conf/Replicate_For_Input_Bag.json
@@ -89,7 +89,7 @@
         "processor": "fetch",
         "processor_params": {
           "output_path": "assets/Study/{Study_RID}/Experiment/{Experiment_RID}/Replicate/{Replicate_RID}",
-          "query_path": "/attribute/M:=RNASeq:Replicate/RID={rid}/(RID)=(RNASeq:File:Replicate_RID)/File_Type=FastQ/url:=URI,length:=File_size,filename:=File_Name,md5:=MD5,Study_RID,Experiment_RID,Replicate_RID?limit=none"
+          "query_path": "/attribute/M:=RNASeq:Replicate/RID={rid}/(RID)=(RNASeq:File:Replicate_RID)/File_Type=FastQ/File_Name::ciregexp::%5B_.%5DR%5B12%5D%5C.fastq%5C.gz/url:=URI,length:=File_size,filename:=File_Name,md5:=MD5,Study_RID,Experiment_RID,Replicate_RID?limit=none"
         }
       }
     ]
diff --git a/workflow/conf/aws.config b/workflow/conf/aws.config
index eec8a7a829280d29006d9f0dc9f9b9932d9a9867..d87669599a7e70add448f0cc7f0636dd8bef499b 100644
--- a/workflow/conf/aws.config
+++ b/workflow/conf/aws.config
@@ -116,6 +116,10 @@ process {
     cpus = 1
     memory = '1 GB'
   }
+  withName:failPreExecutionRun_fastqFile {
+    cpus = 1
+    memory = '1 GB'
+  }
   withName:failPreExecutionRun_species {
     cpus = 1
diff --git a/workflow/conf/biohpc.config b/workflow/conf/biohpc.config
index 33917dbe810969bc6de2482dba00df16b06547ce..cc058dfef74bc49adf3932554994678934ac7a44 100755
--- a/workflow/conf/biohpc.config
+++ b/workflow/conf/biohpc.config
@@ -32,7 +32,7 @@ process {
     executor = 'local'
   }
   withName:alignSampleData {
-    queue = 'super'
+    queue = '128GB,256GB,256GBv1,384GB'
   }
   withName:inferMetadata {
     queue = 'super'
@@ -85,6 +85,9 @@ process {
   withName:failPreExecutionRun_fastq {
     executor = 'local'
   }
+  withName:failPreExecutionRun_fastqFile {
+    executor = 'local'
+  }
   withName:failPreExecutionRun_species {
     executor = 'local'
   }
diff --git a/workflow/nextflow.config b/workflow/nextflow.config
index 66b847beffb6499b9ba16678887ed0488c2d12cb..29c8515a852fb3356335b89a71f4ddf4f2c7a78b 100644
--- a/workflow/nextflow.config
+++ b/workflow/nextflow.config
@@ -91,6 +91,9 @@ process {
   withName:failPreExecutionRun_fastq {
     container = 'gudmaprbk/deriva1.4:1.0.0'
   }
+  withName:failPreExecutionRun_fastqFile {
+    container = 'gudmaprbk/deriva1.4:1.0.0'
+  }
   withName:failPreExecutionRun_species {
     container = 'gudmaprbk/deriva1.4:1.0.0'
   }
@@ -125,6 +128,6 @@ manifest {
   homePage = 'https://git.biohpc.swmed.edu/gudmap_rbk/rna-seq'
   description = 'This pipeline was created to be a standard mRNA-sequencing analysis pipeline which integrates with the GUDMAP and RBK consortium data-hub.'
   mainScript = 'rna-seq.nf'
-  version = 'v1.0.2'
+  version = 'v1.0.3'
   nextflowVersion = '>=19.09.0'
 }
diff --git a/workflow/rna-seq.nf b/workflow/rna-seq.nf
index 14d7c498c268f4ae10ee10b74b02b84d683cd074..2113e55a5931b0b5fe07bfb993f96b3b9f3c3b30 100644
--- a/workflow/rna-seq.nf
+++ b/workflow/rna-seq.nf
@@ -48,6 +48,7 @@ deriva.into {
   deriva_uploadOutputBag
   deriva_finalizeExecutionRun
   deriva_failPreExecutionRun_fastq
+  deriva_failPreExecutionRun_fastqFile
   deriva_failPreExecutionRun_species
   deriva_failExecutionRun
 }
@@ -100,6 +101,7 @@ script_uploadInputBag = Channel.fromPath("${baseDir}/scripts/upload_input_bag.py")
 script_uploadExecutionRun_uploadExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_finalizeExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_failPreExecutionRun_fastq = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
+script_uploadExecutionRun_failPreExecutionRun_fastqFile = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_failPreExecutionRun_species = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadExecutionRun_failExecutionRun = Channel.fromPath("${baseDir}/scripts/upload_execution_run.py")
 script_uploadQC = Channel.fromPath("${baseDir}/scripts/upload_qc.py")
@@ -267,6 +269,10 @@ process getData {
     echo -e "LOG: fetched" >> ${repRID}.getData.log

     fastqCount=\$(ls *.fastq.gz | wc -l)
+    if [ "\${fastqCount}" == "0" ]
+    then
+      touch dummy.R1.fastq.gz
+    fi
     echo "\${fastqCount}" > fastqCount.csv
     """
 }
@@ -284,12 +290,12 @@ if (fastqsForce != "") {
     .ifEmpty { exit 1, "override inputBag file not found: ${fastqsForce}" }
     .collect().into {
       fastqs_parseMetadata
-      fastqs_trimData
+      fastqs_fastqc
     }
 } else {
-  fastqs.into {
+  fastqs.collect().into {
     fastqs_parseMetadata
-    fastqs_trimData
+    fastqs_fastqc
   }
 }

@@ -304,7 +310,7 @@ process parseMetadata {
     path file from fileMeta
     path experimentSettings, stageAs: "ExperimentSettings.csv" from experimentSettingsMeta
     path experiment from experimentMeta
-    path (fastq) from fastqs_parseMetadata
+    path (fastq) from fastqs_parseMetadata.collect()
     val fastqCount

   output:
@@ -337,17 +343,20 @@
     elif [ "\${endsRaw}" == "Paired End" ]
     then
       endsMeta="pe"
-    else
-      endsMeta="unknown"
-    fi
-    if [ "\${endsRaw}" == "" ]
+    elif [ "\${endsRaw}" == "nan" ]
     then
       endsRaw="_No value_"
+      endsMeta="NA"
     fi

     # manually get endness
-    endsManual=\$(python3 ${script_parseMeta} -r ${repRID} -m "${file}" -p endsManual)
-    echo -e "LOG: endedness manually detected: \${endsManual}" >> ${repRID}.parseMetadata.log
+    if [ "${fastqCount}" == "1" ]
+    then
+      endsManual="se"
+    else
+      endsManual="pe"
+    fi
+    echo -e "LOG: endedness manually detected: \${endsManual}" >> ${repRID}.parseMetadata.log

     # get strandedness metadata
     stranded=\$(python3 ${script_parseMeta} -r ${repRID} -m "${experimentSettings}" -p stranded)
@@ -376,6 +385,10 @@
     then
       fastqCountError=true
       fastqCountError_details="**Too many fastqs detected (>2)**"
+    elif [ "${fastqCount}" -eq "0" ]
+    then
+      fastqCountError=true
+      fastqCountError_details="**No valid fastqs detected \\(may not match {_.}R{12}.fastq.gz convention\\)**"
     elif [ "\${endsMeta}" == "se" ] && [ "${fastqCount}" -ne "1" ]
     then
       fastqCountError=true
@@ -451,6 +464,7 @@ spikeMeta.into {
   spikeMeta_checkMetadata
   spikeMeta_aggrQC
   spikeMeta_failPreExecutionRun_fastq
+  spikeMeta_failPreExecutionRun_fastqFile
   spikeMeta_failPreExecutionRun_species
   spikeMeta_failExecutionRun
 }
@@ -458,6 +472,7 @@ speciesMeta.into {
   speciesMeta_checkMetadata
   speciesMeta_aggrQC
   speciesMeta_failPreExecutionRun_fastq
+  speciesMeta_failPreExecutionRun_fastqFile
   speciesMeta_failPreExecutionRun_species
   speciesMeta_failExecutionRun
 }
@@ -486,6 +501,7 @@ fastqError_fl.splitCsv(sep: ",", header: false).separate(

 // Replicate errors for multiple process inputs
 fastqCountError.into {
+  fastqCountError_fastqc
   fastqCountError_trimData
   fastqCountError_getRefInfer
   fastqCountError_downsampleData
@@ -498,7 +514,6 @@
   fastqCountError_dedupData
   fastqCountError_makeBigWig
   fastqCountError_countData
-  fastqCountError_fastqc
   fastqCountError_dataQC
   fastqCountError_aggrQC
   fastqCountError_uploadQC
@@ -507,6 +522,7 @@
   fastqCountError_failPreExecutionRun_fastq
 }
 fastqReadError.into {
+  fastqReadError_fastqc
   fastqReadError_trimData
   fastqReadError_getRefInfer
   fastqReadError_downsampleData
@@ -519,7 +535,6 @@
   fastqReadError_dedupData
   fastqReadError_makeBigWig
   fastqReadError_countData
-  fastqReadError_fastqc
   fastqReadError_dataQC
   fastqReadError_aggrQC
   fastqReadError_uploadQC
@@ -528,6 +543,98 @@
   fastqReadError_failPreExecutionRun_fastq
 }

+/*
+ *fastqc: run fastqc on untrimmed fastqs
+*/
+process fastqc {
+  tag "${repRID}"
+
+  input:
+    path (fastq) from fastqs_fastqc.collect()
+    val fastqCountError_fastqc
+    val fastqReadError_fastqc
+
+  output:
+    path ("*.R{1,2}.fastq.gz", includeInputs:true) into fastqs_trimData
+    path ("*_fastqc.zip") into fastqc
+    path ("rawReads.csv") into rawReadsInfer_fl
+    path "fastqFileError.csv" into fastqFileError_fl
+
+  when:
+    fastqCountError_fastqc == 'false' && fastqReadError_fastqc == 'false'
+
+  script:
+    """
+    hostname > ${repRID}.fastqc.log
+    ulimit -a >> ${repRID}.fastqc.log
+
+    # run fastqc
+    echo -e "LOG: running fastqc on raw fastqs" >> ${repRID}.fastqc.log
+    fastqc *.fastq.gz -o . &> fastqc.out || true
+    fastqcErrorOut=\$(cat fastqc.out | grep -c 'Failed to process file') || fastqcErrorOut=0
+    fastqFileError=false
+    fastqFileError_details=""
+    if [ "\${fastqcErrorOut}" -ne "0" ]
+    then
+      fastqFileError=true
+      fastqFileError_details="**There is an error with the structure of the fastq**"
+      echo -e "LOG: There is an error with the structure of the fastq" >> ${repRID}.fastqc.log
+      touch dummy_fastqc.zip
+    else
+      echo -e "LOG: The structure of the fastq is correct" >> ${repRID}.fastqc.log
+    fi
+
+    # count raw reads
+    zcat *.R1.fastq.gz | echo \$((`wc -l`/4)) > rawReads.csv
+
+    # save fastq error file
+    echo "\${fastqFileError},\${fastqFileError_details}" > fastqFileError.csv
+    """
+}
+
+// Extract number of raw reads metadata into channel
+rawReadsInfer = Channel.create()
+rawReadsInfer_fl.splitCsv(sep: ",", header: false).separate(
+  rawReadsInfer
+)
+
+// Replicate inferred raw reads for multiple process inputs
+rawReadsInfer.into {
+  rawReadsInfer_aggrQC
+  rawReadsInfer_uploadQC
+}
+
+// Split fastq file error into separate channel
+fastqFileError = Channel.create()
+fastqFileError_details = Channel.create()
+fastqFileError_fl.splitCsv(sep: ",", header: false).separate(
+  fastqFileError,
+  fastqFileError_details
+)
+
+// Replicate errors for multiple process inputs
+fastqFileError.into {
+  fastqFileError_fastqc
+  fastqFileError_trimData
+  fastqFileError_getRefInfer
+  fastqFileError_downsampleData
+  fastqFileError_alignSampleData
+  fastqFileError_inferMetadata
+  fastqFileError_checkMetadata
+  fastqFileError_uploadExecutionRun
+  fastqFileError_getRef
+  fastqFileError_alignData
+  fastqFileError_dedupData
+  fastqFileError_makeBigWig
+  fastqFileError_countData
+  fastqFileError_dataQC
+  fastqFileError_aggrQC
+  fastqFileError_uploadQC
+  fastqFileError_uploadProcessedFile
+  fastqFileError_uploadOutputBag
+  fastqFileError_failPreExecutionRun_fastqFile
+}
+
 /*
  * trimData: trims any adapter or non-host sequences from the data
 */
@@ -539,16 +646,17 @@ process trimData {
     val ends from endsManual_trimData
     val fastqCountError_trimData
     val fastqReadError_trimData
+    val fastqFileError_trimData

   output:
     path ("*.fq.gz") into fastqsTrim
-    path ("*.fastq.gz", includeInputs:true) into fastqs_fastqc
     path ("*_trimming_report.txt") into trimQC
     path ("readLength.csv") into readLengthInfer_fl

   when:
     fastqCountError_trimData == "false"
     fastqReadError_trimData == "false"
+    fastqFileError_trimData == "false"

   script:
     """
@@ -592,7 +700,7 @@ fastqsTrim.into {
 }

 // Combine inputs of getRefInfer
-getRefInferInput = referenceInfer.combine(deriva_getRefInfer.combine(script_refDataInfer.combine(fastqCountError_getRefInfer.combine(fastqReadError_getRefInfer))))
+getRefInferInput = referenceInfer.combine(deriva_getRefInfer.combine(script_refDataInfer.combine(fastqCountError_getRefInfer.combine(fastqReadError_getRefInfer.combine(fastqFileError_getRefInfer)))))

 /*
  * getRefInfer: downloads appropriate reference for metadata inference
@@ -601,7 +709,7 @@
 */
 process getRefInfer {
   tag "${refName}"

   input:
-    tuple val (refName), path (credential, stageAs: "credential.json"), path (script_refDataInfer), val (fastqCountError), val (fastqReadError) from getRefInferInput
+    tuple val (refName), path (credential, stageAs: "credential.json"), path (script_refDataInfer), val (fastqCountError), val (fastqReadError), val (fastqFileError) from getRefInferInput

   output:
     tuple val (refName), path ("hisat2", type: 'dir'), path ("*.fna"), path ("*.gtf") into refInfer
@@ -610,6 +718,7 @@
   when:
     fastqCountError == "false"
     fastqReadError == "false"
+    fastqFileError == "false"

   script:
     """
@@ -687,6 +796,7 @@ process downsampleData {
     val ends from endsManual_downsampleData
     val fastqCountError_downsampleData
     val fastqReadError_downsampleData
+    val fastqFileError_downsampleData

   output:
     path ("sampled.1.fq") into fastqs1Sample
@@ -695,6 +805,7 @@
   when:
     fastqCountError_downsampleData == "false"
     fastqReadError_downsampleData == "false"
+    fastqFileError_downsampleData == "false"

   script:
     """
@@ -718,7 +829,7 @@
 }

 // Replicate the downsampled fastqs and attach to the references
-inferInput = endsManual_alignSampleData.combine(refInfer.combine(fastqs1Sample.collect().combine(fastqs2Sample.collect().combine(fastqCountError_alignSampleData.combine(fastqReadError_alignSampleData)))))
+inferInput = endsManual_alignSampleData.combine(refInfer.combine(fastqs1Sample.collect().combine(fastqs2Sample.collect().combine(fastqCountError_alignSampleData.combine(fastqReadError_alignSampleData.combine(fastqFileError_alignSampleData))))))

 /*
  * alignSampleData: aligns the downsampled reads to a reference database
 */
@@ -727,7 +838,7 @@ process alignSampleData {
   tag "${ref}"

   input:
-    tuple val (ends), val (ref), path (hisat2), path (fna), path (gtf), path (fastq1), path (fastq2), val (fastqCountError), val (fastqReadError) from inferInput
+    tuple val (ends), val (ref), path (hisat2), path (fna), path (gtf), path (fastq1), path (fastq2), val (fastqCountError), val (fastqReadError), val (fastqFileError) from inferInput

   output:
     path ("${ref}.sampled.sorted.bam") into sampleBam
@@ -737,6 +848,7 @@
   when:
     fastqCountError == "false"
     fastqReadError == "false"
+    fastqFileError == "false"

   script:
     """
@@ -761,7 +873,10 @@

     # sort the bam file using Samtools
     echo -e "LOG: sorting the bam file" >> ${repRID}.${ref}.alignSampleData.log
-    samtools sort -@ `nproc` -O BAM -o ${ref}.sampled.sorted.bam ${ref}.sampled.bam
+    proc=\$(expr `nproc` - 1)
+    mem=\$(vmstat -s -S K | grep 'total memory' | grep -o '[0-9]*')
+    mem=\$(expr \${mem} / \${proc} \\* 85 / 100)
+    samtools sort -@ \${proc} -m \${mem}K -O BAM -o ${ref}.sampled.sorted.bam ${ref}.sampled.bam

     # index the sorted bam using Samtools
     echo -e "LOG: indexing sorted bam file" >> ${repRID}.${ref}.alignSampleData.log
@@ -785,6 +900,7 @@ process inferMetadata {
     path alignSummary from alignSampleQC_inferMetadata.collect()
     val fastqCountError_inferMetadata
     val fastqReadError_inferMetadata
+    val fastqFileError_inferMetadata

   output:
     path "infer.csv" into inferMetadata_fl
@@ -794,6 +910,7 @@
   when:
     fastqCountError_inferMetadata == "false"
     fastqReadError_inferMetadata == "false"
+    fastqFileError_inferMetadata == "false"

   script:
     """
@@ -1011,6 +1128,7 @@ process checkMetadata {
     val speciesInfer from speciesInfer_checkMetadata
     val fastqCountError_checkMetadata
     val fastqReadError_checkMetadata
+    val fastqFileError_checkMetadata
     val speciesError_checkMetadata

   output:
@@ -1020,6 +1138,7 @@
   when:
     fastqCountError_checkMetadata == "false"
     fastqReadError_checkMetadata == "false"
+    fastqFileError_checkMetadata == "false"
     speciesError_checkMetadata == "false"

   script:
@@ -1185,6 +1304,7 @@ inputBagRID.into {
   inputBagRID_uploadExecutionRun
   inputBagRID_finalizeExecutionRun
   inputBagRID_failPreExecutionRun_fastq
+  inputBagRID_failPreExecutionRun_fastqFile
   inputBagRID_failPreExecutionRun_species
   inputBagRID_failExecutionRun
 }
@@ -1203,6 +1323,7 @@ process uploadExecutionRun {
     val inputBagRID from inputBagRID_uploadExecutionRun
     val fastqCountError_uploadExecutionRun
     val fastqReadError_uploadExecutionRun
+    val fastqFileError_uploadExecutionRun
     val speciesError_uploadExecutionRun

   output:
@@ -1212,6 +1333,7 @@
     upload
     fastqCountError_uploadExecutionRun == "false"
     fastqReadError_uploadExecutionRun == "false"
+    fastqFileError_uploadExecutionRun == "false"
     speciesError_uploadExecutionRun == "false"

   script:
@@ -1298,6 +1420,7 @@ process getRef {
     val species from speciesInfer_getRef
     val fastqCountError_getRef
     val fastqReadError_getRef
+    val fastqFileError_getRef
     val speciesError_getRef
     val pipelineError_getRef
@@ -1307,6 +1430,7 @@
   when:
     fastqCountError_getRef == "false"
     fastqReadError_getRef == "false"
+    fastqFileError_getRef == "false"
     speciesError_getRef == "false"
     pipelineError_getRef == "false"
@@ -1398,6 +1522,7 @@ process alignData {
     val stranded from strandedInfer_alignData
     val fastqCountError_alignData
     val fastqReadError_alignData
+    val fastqFileError_alignData
     val speciesError_alignData
     val pipelineError_alignData
@@ -1408,6 +1533,7 @@
   when:
     fastqCountError_alignData == "false"
     fastqReadError_alignData == "false"
+    fastqFileError_alignData == "false"
     speciesError_alignData == "false"
     pipelineError_alignData == "false"
@@ -1451,7 +1577,10 @@

     # sort the bam file using Samtools
     echo -e "LOG: sorting the bam file" >> ${repRID}.align.log
-    samtools sort -@ `nproc` -O BAM -o ${repRID}.sorted.bam ${repRID}.bam
+    proc=\$(expr `nproc` - 1)
+    mem=\$(vmstat -s -S K | grep 'total memory' | grep -o '[0-9]*')
+    mem=\$(expr \${mem} / \${proc} \\* 75 / 100)
+    samtools sort -@ \${proc} -m \${mem}K -O BAM -o ${repRID}.sorted.bam ${repRID}.bam

     # index the sorted bam using Samtools
     echo -e "LOG: indexing sorted bam file" >> ${repRID}.align.log
@@ -1475,6 +1604,7 @@ process dedupData {
     tuple path (bam), path (bai) from rawBam_dedupData
     val fastqCountError_dedupData
     val fastqReadError_dedupData
+    val fastqFileError_dedupData
     val speciesError_dedupData
     val pipelineError_dedupData
@@ -1486,6 +1616,7 @@
   when:
     fastqCountError_dedupData == 'false'
     fastqReadError_dedupData == 'false'
+    fastqFileError_dedupData == 'false'
     speciesError_dedupData == 'false'
     pipelineError_dedupData == 'false'
@@ -1534,6 +1665,7 @@ process makeBigWig {
     tuple path (bam), path (bai) from dedupBam_makeBigWig
     val fastqCountError_makeBigWig
     val fastqReadError_makeBigWig
+    val fastqFileError_makeBigWig
     val speciesError_makeBigWig
     val pipelineError_makeBigWig
@@ -1543,6 +1675,7 @@
   when:
     fastqCountError_makeBigWig == 'false'
     fastqReadError_makeBigWig == 'false'
+    fastqFileError_makeBigWig == 'false'
     speciesError_makeBigWig == 'false'
     pipelineError_makeBigWig == 'false'
@@ -1574,6 +1707,7 @@ process countData {
     val stranded from strandedInfer_countData
     val fastqCountError_countData
     val fastqReadError_countData
+    val fastqFileError_countData
     val speciesError_countData
     val pipelineError_countData
@@ -1585,6 +1719,7 @@
   when:
     fastqCountError_countData == 'false'
     fastqReadError_countData == 'false'
+    fastqFileError_countData == 'false'
     speciesError_countData == 'false'
     pipelineError_countData == 'false'
@@ -1645,55 +1780,6 @@ assignedReadsInfer.into {
   assignedReadsInfer_uploadQC
 }

-/*
- *fastqc: run fastqc on untrimmed fastq's
-*/
-process fastqc {
-  tag "${repRID}"
-
-  input:
-    path (fastq) from fastqs_fastqc
-    val fastqCountError_fastqc
-    val fastqReadError_fastqc
-    val speciesError_fastqc
-    val pipelineError_fastqc
-
-  output:
-    path ("*_fastqc.zip") into fastqc
-    path ("rawReads.csv") into rawReadsInfer_fl
-
-  when:
-    fastqCountError_fastqc == 'false'
-    fastqReadError_fastqc == 'false'
-    speciesError_fastqc == 'false'
-    pipelineError_fastqc == 'false'
-
-  script:
-    """
-    hostname > ${repRID}.fastqc.log
-    ulimit -a >> ${repRID}.fastqc.log
-
-    # run fastqc
-    echo -e "LOG: running fastq on raw fastqs" >> ${repRID}.fastqc.log
-    fastqc *.fastq.gz -o .
-
-    # count raw reads
-    zcat *.R1.fastq.gz | echo \$((`wc -l`/4)) > rawReads.csv
-    """
-}
-
-// Extract number of raw reads metadata into channel
-rawReadsInfer = Channel.create()
-rawReadsInfer_fl.splitCsv(sep: ",", header: false).separate(
-  rawReadsInfer
-)
-
-// Replicate inferred raw reads for multiple process inputs
-rawReadsInfer.into {
-  rawReadsInfer_aggrQC
-  rawReadsInfer_uploadQC
-}
-
 /*
  *dataQC: calculate transcript integrity numbers (TIN) and bin as well as calculate innerdistance of PE replicates
 */
@@ -1708,6 +1794,7 @@ process dataQC {
     val ends from endsInfer_dataQC
     val fastqCountError_dataQC
     val fastqReadError_dataQC
+    val fastqFileError_dataQC
     val speciesError_dataQC
     val pipelineError_dataQC
@@ -1719,6 +1806,7 @@
   when:
     fastqCountError_dataQC == 'false'
     fastqReadError_dataQC == 'false'
+    fastqFileError_dataQC == 'false'
     speciesError_dataQC == 'false'
     pipelineError_dataQC == 'false'
@@ -1804,6 +1892,7 @@ process aggrQC {
     val expRID from expRID_aggrQC
     val fastqCountError_aggrQC
     val fastqReadError_aggrQC
+    val fastqFileError_aggrQC
     val speciesError_aggrQC
     val pipelineError_aggrQC
@@ -1814,6 +1903,7 @@
   when:
     fastqCountError_aggrQC == 'false'
     fastqReadError_aggrQC == 'false'
+    fastqFileError_aggrQC == 'false'
     speciesError_aggrQC == 'false'
     pipelineError_aggrQC == 'false'
@@ -1905,6 +1995,7 @@ process uploadQC {
     val tinMed from tinMedInfer_uploadQC
     val fastqCountError_uploadQC
     val fastqReadError_uploadQC
+    val fastqFileError_uploadQC
     val speciesError_uploadQC
     val pipelineError_uploadQC
@@ -1915,6 +2006,7 @@
     upload
     fastqCountError_uploadQC == 'false'
     fastqReadError_uploadQC == 'false'
+    fastqFileError_uploadQC == 'false'
     speciesError_uploadQC == 'false'
     pipelineError_uploadQC == 'false'
@@ -1981,6 +2073,7 @@ process uploadProcessedFile {
     val executionRunRID from executionRunRID_uploadProcessedFile
     val fastqCountError_uploadProcessedFile
     val fastqReadError_uploadProcessedFile
+    val fastqFileError_uploadProcessedFile
     val speciesError_uploadProcessedFile
     val pipelineError_uploadProcessedFile
@@ -1991,6 +2084,7 @@
     upload
     fastqCountError_uploadProcessedFile == 'false'
     fastqReadError_uploadProcessedFile == 'false'
+    fastqFileError_uploadProcessedFile == 'false'
     speciesError_uploadProcessedFile == 'false'
     pipelineError_uploadProcessedFile == 'false'
@@ -2073,6 +2167,7 @@ process uploadOutputBag {
     val executionRunRID from executionRunRID_uploadOutputBag
     val fastqCountError_uploadOutputBag
     val fastqReadError_uploadOutputBag
+    val fastqFileError_uploadOutputBag
     val speciesError_uploadOutputBag
     val pipelineError_uploadOutputBag
@@ -2083,6 +2178,7 @@
     upload
     fastqCountError_uploadOutputBag == 'false'
     fastqReadError_uploadOutputBag == 'false'
+    fastqFileError_uploadOutputBag == 'false'
     speciesError_uploadOutputBag == 'false'
     pipelineError_uploadOutputBag == 'false'
@@ -2255,6 +2351,86 @@ process failPreExecutionRun_fastq {
     """
 }

+/*
+ * failPreExecutionRun_fastqFile: fail the execution run prematurely for fastqFile errors
+*/
+process failPreExecutionRun_fastqFile {
+  tag "${repRID}"
+
+  input:
+    path script_uploadExecutionRun from script_uploadExecutionRun_failPreExecutionRun_fastqFile
+    path credential, stageAs: "credential.json" from deriva_failPreExecutionRun_fastqFile
+    val spike from spikeMeta_failPreExecutionRun_fastqFile
+    val species from speciesMeta_failPreExecutionRun_fastqFile
+    val inputBagRID from inputBagRID_failPreExecutionRun_fastqFile
+    val fastqFileError from fastqFileError_failPreExecutionRun_fastqFile
+    val fastqFileError_details
+
+  when:
+    upload
+    fastqFileError == 'true'
+
+  script:
+    """
+    hostname > ${repRID}.failPreExecutionRun_fastqfile.log
+    ulimit -a >> ${repRID}.failPreExecutionRun_fastqfile.log
+
+    errorDetails=""
+    if [ ${fastqFileError} == true ]
+    then
+      errorDetails=\$(echo \${errorDetails}${fastqFileError_details}"\\n")
+    fi
+
+    echo LOG: searching for workflow RID - BICF mRNA ${workflow.manifest.version} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    workflow=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Workflow/Name=BICF%20mRNA%20Replicate/Version=${workflow.manifest.version})
+    workflow=\$(echo \${workflow} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
+    workflow=\${workflow:7:-6}
+    echo LOG: workflow RID extracted - \${workflow} >> ${repRID}.failPreExecutionRun_fastqfile.log
+
+    if [ "${species}" == "Homo sapiens" ]
+    then
+      genomeName=\$(echo GRCh${refHuVersion})
+    elif [ "${species}" == "Mus musculus" ]
+    then
+      genomeName=\$(echo GRCm${refMoVersion})
+    fi
+    if [ "${spike}" == "yes" ]
+    then
+      genomeName=\$(echo \${genomeName}-S)
+    fi
+    echo LOG: searching for genome name - \${genomeName} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    genome=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Reference_Genome/Name=\${genomeName})
+    genome=\$(echo \${genome} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
+    genome=\${genome:7:-6}
+    echo LOG: genome RID extracted - \${genome} >> ${repRID}.failPreExecutionRun_fastqfile.log
+
+    cookie=\$(cat credential.json | grep -A 1 '\\"${source}\\": {' | grep -o '\\"cookie\\": \\".*\\"')
+    cookie=\${cookie:11:-1}
+
+    exist=\$(curl -s https://${source}/ermrest/catalog/2/entity/RNASeq:Execution_Run/Workflow=\${workflow}/Replicate=${repRID}/Input_Bag=${inputBagRID})
+    echo \${exist} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    if [ "\${exist}" == "[]" ]
+    then
+      rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u F)
+      echo LOG: execution run RID uploaded - \${rid} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    else
+      rid=\$(echo \${exist} | grep -o '\\"RID\\":\\".*\\",\\"RCT')
+      rid=\${rid:7:-6}
+      echo \${rid} >> ${repRID}.failPreExecutionRun_fastqfile.log
+      executionRun_rid=\$(python3 ${script_uploadExecutionRun} -r ${repRID} -w \${workflow} -g \${genome} -i ${inputBagRID} -s Error -d "\${errorDetails}" -o ${source} -c \${cookie} -u \${rid})
+      echo LOG: execution run RID updated - \${executionRun_rid} >> ${repRID}.failPreExecutionRun_fastqfile.log
+    fi
+
+    dt=`date +%FT%T.%3N%:z`
+    curl -H 'Content-Type: application/json' -X PUT -d \
+      '{ \
+        "ID": "${workflow.sessionId}", \
+        "ExecutionRunRID": "'\${rid}'", \
+        "Failure": "'\${dt}'" \
+      }' \
+      "https://9ouc12dkwb.execute-api.us-east-2.amazonaws.com/prod/db/track"
+    """
+}

 /*
  * failPreExecutionRun_species: fail the execution run prematurely for species error
diff --git a/workflow/scripts/bdbag_fetch.sh b/workflow/scripts/bdbag_fetch.sh
index c34dc756d0cc5a47382fb9f96267e378c19ae79a..45ee14a7da409e011494921bafa204b44e96f795 100644
--- a/workflow/scripts/bdbag_fetch.sh
+++ b/workflow/scripts/bdbag_fetch.sh
@@ -18,7 +18,7 @@ if [ "${validate}" != "is valid" ]
 then
   exit 1
 fi
-for i in $(find */ -name "*R*.fastq.gz")
+for i in $(find */ -name "*[_.]R[1-2].fastq.gz")
 do
   path=${2}.$(echo ${i##*/} | grep -o "R[1,2].fastq.gz")
   cp ${i} ./${path}
diff --git a/workflow/scripts/parse_meta.py b/workflow/scripts/parse_meta.py
index 12cc7c7233b94509e5c3a7307e8ef7985a94a958..fdbc86c12a2fb6832217ec0f08263d0102c9e566 100644
--- a/workflow/scripts/parse_meta.py
+++ b/workflow/scripts/parse_meta.py
@@ -35,15 +35,11 @@ def main():
     else:
         rep = metaFile["Replicate_RID"].unique()[0]
         print(rep)
-        if (len(metaFile[metaFile["File_Type"] == "FastQ"]) > 2):
-            print("There are more then 2 fastq's in the metadata: " +
-                  " ".join(metaFile[metaFile["File_Type"] == "FastQ"].RID))
-            exit(1)

     # Check experiment RID metadata from 'Experiment.csv'
     if (args.parameter == "expRID"):
         if (len(metaFile.Experiment_RID.unique()) > 1):
-            print("There are multiple experoment RID's in the metadata: " +
+            print("There are multiple experiment RID's in the metadata: " +
                   " ".join(metaFile.Experiment_RID.unique()))
             exit(1)
         else:
@@ -65,14 +61,6 @@ def main():
             endsMeta = metaFile.Paired_End.unique()[0]
         print(endsMeta)

-    # Manually get endness count from 'File.csv'
-    if (args.parameter == "endsManual"):
-        if (len(metaFile[metaFile["File_Type"] == "FastQ"]) == 1):
-            endsManual = "se"
-        elif (len(metaFile[metaFile["File_Type"] == "FastQ"]) == 2):
-            endsManual = "pe"
-        print(endsManual)
-
     # Get strandedness metadata from 'Experiment Settings.csv'
     if (args.parameter == "stranded"):
         if (metaFile.Has_Strand_Specific_Information.unique() == "yes"):
@@ -80,9 +68,7 @@
         elif (metaFile.Has_Strand_Specific_Information.unique() == "no"):
             stranded = "unstranded"
         else:
-            print("Stranded metadata not match expected options: " +
-                  metaFile.Has_Strand_Specific_Information.unique())
-            exit(1)
+            stranded = metaFile.Has_Strand_Specific_Information.unique()[0]
         print(stranded)

     # Get spike-in metadata from 'Experiment Settings.csv'
@@ -92,9 +78,7 @@
         elif (metaFile.Used_Spike_Ins.unique() == "no"):
             spike = "no"
         else:
-            print("Spike-ins metadata not match expected options: " +
-                  metaFile.Used_Spike_Ins.unique())
-            exit(1)
+            spike = metaFile.Used_Spike_Ins.unique()[0]
         print(spike)

     # Get species metadata from 'Experiment.csv'
@@ -104,9 +88,7 @@
         elif (metaFile.Species.unique() == "Homo sapiens"):
             species = "Homo sapiens"
         else:
-            print("Species metadata not match expected options: " +
-                  metaFile.Species.unique())
-            exit(1)
+            species = metaFile.Species.unique()[0]
         print(species)

     # Get read length metadata from 'Experiment Settings.csv'
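
The per-thread memory cap wrapped around `samtools sort` in this patch (75% in `alignData`, 85% in `alignSampleData`) can be read in isolation as the sketch below — a minimal illustration of the same arithmetic, assuming procps `vmstat` and GNU `nproc`; the input/output file names are placeholders, not pipeline files:

```bash
#!/bin/bash
# Illustrative only: mirrors the expr arithmetic added to alignData above.
proc=$(expr $(nproc) - 1)                                      # leave one core free
mem=$(vmstat -s -S K | grep 'total memory' | grep -o '[0-9]*') # total RAM in KiB
mem=$(expr ${mem} / ${proc} \* 75 / 100)                       # 75% of each thread's share
# samtools sort accepts a K suffix for -m; with ${proc} sort threads each
# buffering at most ${mem} KiB, combined usage stays near 75% of RAM.
samtools sort -@ ${proc} -m ${mem}K -O BAM -o sorted.bam input.bam
```

Dividing total memory by the thread count *before* taking the percentage is what keeps the combined buffers of all `-@` workers under the cap, which appears to be the point of #108.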
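The `*[_.]R[1-2].fastq.gz` convention that this patch enforces in three places (the input-bag export `query_path`, `bdbag_fetch.sh`, and the new `fastqCountError` message) can be sanity-checked with a small shell loop; the sample file names here are hypothetical:

```bash
#!/bin/bash
# Which names survive the fastq naming filter? (sample names are made up)
for f in rep1_R1.fastq.gz rep1.R2.fastq.gz rep1-R1.fastq.gz rep1_R1_001.fastq.gz
do
  case "${f}" in
    *[_.]R[12].fastq.gz) echo "keep: ${f}" ;;  # matched: fetched and renamed
    *)                   echo "skip: ${f}" ;;  # ignored by the pipeline
  esac
done
# keep: rep1_R1.fastq.gz
# keep: rep1.R2.fastq.gz
# skip: rep1-R1.fastq.gz     (separator is '-', not '_' or '.')
# skip: rep1_R1_001.fastq.gz (R1 is not immediately before .fastq.gz)
```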