Commit 168aaa3f authored by yy1533

🐾 hide intermediate files; support temp() to mask files

parent 9e4ced0c
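For context, the mechanism this commit adopts is Snakemake's temp() wrapper: an output marked temp() is deleted as soon as every rule that consumes it has finished, and the deletion can be suppressed via the notemp option of snakemake.snakemake(), exposed below as --notemp/--nt. A minimal sketch with hypothetical rule and file names, not taken from this repo:

    # sorted.bam is removed automatically once rule index, its only
    # consumer, has finished. Running with notemp=True keeps it.
    rule sort:
        input: 'sample.bam'
        output: temp('sorted.bam')   # mark as an intermediate file
        shell: 'samtools sort {input} -o {output}'

    rule index:
        input: 'sorted.bam'
        output: 'sorted.bam.bai'
        shell: 'samtools index {input}'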
@@ -27,6 +27,7 @@ Select optional parameters from snakemake.snakemake():
* -j
* --cluster
* -n
* --notemp (--nt)
Refs:
- https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
@@ -64,6 +65,11 @@ def get_argument_parser():
action="store_true", default=False,
help="Read has to be mapped to the opposite strand as the feature")
parser.add_argument(
"--celseq2-to-st", "--st",
action="store_true", default=False,
help="Rotate the UMI-count matrix to a shape of spots by genes.")
parser.add_argument(
"--cores", "--jobs", "-j",
action="store",
@@ -98,7 +104,10 @@ def get_argument_parser():
"--ignore-incomplete", "--ii",
action="store_true",
help="Do not check for incomplete output files.")
parser.add_argument(
"--notemp", "--nt",
action="store_true",
help="Ignore temp() declarations.")
parser.add_argument(
"--version", "-v",
action="version",
@@ -120,7 +129,8 @@ def main():
configfile=args.config_file,
config={'output_dir': args.output_dir,
'experiment_table': args.experiment_table,
'stranded': stranded},
'stranded': stranded,
'run_celseq2_to_st': args.celseq2_to_st},
printshellcmds=True,
printreason=True,
@@ -139,7 +149,8 @@ def main():
nodes=args.cores,
force_incomplete=args.rerun_incomplete,
ignore_incomplete=args.ignore_incomplete)
ignore_incomplete=args.ignore_incomplete,
notemp=args.notemp)
sys.exit(0 if success else 1)
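The new flag maps directly onto the notemp keyword of the snakemake.snakemake() API referenced above. A standalone sketch of that call path, with the Snakefile path and config keys illustrative only:

    import sys
    import snakemake

    # Drive a workflow from Python. notemp=True keeps temp()-marked
    # intermediate files on disk, equivalent to the --notemp CLI flag.
    success = snakemake.snakemake(
        'Snakefile',                   # hypothetical workflow path
        config={'output_dir': 'out'},  # hypothetical config
        notemp=True,
    )
    sys.exit(0 if success else 1)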
......
@@ -37,7 +37,8 @@ def celseq2stpipeline(celseq2_fpath, spatial_map, out,
genes = map(lambda x: x.replace(' ', '_'), expr_valid.index.values)
colnames = expr_valid.columns.values
fhout.write('{}\t{}\n'.format('', '\t'.join(genes))) # header
# fhout.write('{}\t{}\n'.format('', '\t'.join(genes))) # header
fhout.write('{}\t{}\t{}\n'.format('Row', 'Col', '\t'.join(genes))) # header
for colname in colnames:
tmp = colname.replace('.', '-')
@@ -46,9 +47,13 @@
spot_xy = dict_spatial_seq2xy.get(spot_seq, None)
if not spot_xy:
continue
spot_xy = 'x'.join(map(str, spot_xy))
fhout.write('{}\t{}\n'.format(
spot_xy,
# spot_xy = 'x'.join(map(str, spot_xy))
# fhout.write('{}\t{}\n'.format(
# spot_xy,
# '\t'.join(map(str, spot_expr))))
spot_r, spot_c = spot_xy
fhout.write('{}\t{}\t{}\n'.format(
spot_r, spot_c,
'\t'.join(map(str, spot_expr))))
fhout.close()
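The reshaped table now carries the spot coordinate as two leading columns, Row and Col, instead of the old single 'x'-joined spot ID, so downstream tools can consume it directly. A sketch of reading the new layout, with a hypothetical file path:

    import pandas as pd

    # Header is 'Row\tCol\t<gene>...', one spot per line.
    st = pd.read_csv('ST.tsv', sep='\t')        # hypothetical path
    coords = st[['Row', 'Col']]                 # spot coordinates
    expr = st.drop(columns=['Row', 'Col'])      # spots x genes UMI counts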
......
@@ -70,6 +70,7 @@ STRANDED = config.get('stranded', 'yes')
num_threads = config.get('num_threads', 16) # 5
verbose = config.get('verbose', True) # True
RUN_CELSEQ2_TO_ST = config.get('run_celseq2_to_st', False)
#######################
## Pipeline reserved ##
@@ -91,15 +92,16 @@ if R1 is None or R2 is None:
exit(1)
SUBDIR_INPUT = 'input'
SUBDIR_FASTQ = 'small_fq'
SUBDIR_ALIGN = 'small_sam'
SUBDIR_UMI_CNT = 'small_umi_count'
SUBDIR_UMI_SET = 'small_umi_set'
SUBDIR_FASTQ = '.small_fq'
SUBDIR_ALIGN = '.small_sam'
SUBDIR_UMI_CNT = '.small_umi_count'
SUBDIR_UMI_SET = '.small_umi_set'
SUBDIR_EXPR = 'expr'
SUBDIR_LOG = 'small_log'
SUBDIR_ST = 'ST'
SUBDIR_LOG = '.small_log'
SUBDIR_QSUB = 'qsub_log'
SUBDIR_DIAG = 'small_diagnose'
SUBDIR_ANNO = 'annotation'
SUBDIR_DIAG = '.small_diagnose'
SUBDIR_ANNO = '.annotation'
SUBDIR_REPORT = 'report'
SUBDIRS = [SUBDIR_INPUT,
@@ -121,86 +123,31 @@ SUBDIRS = [SUBDIR_INPUT,
workdir: DIR_PROJ
rule all:
input:
'_done_annotation',
'_done_UMI',
'_done_report',
output:
touch('_DONE'),
run:
if glob.glob('celseq2_job*.sh*'):
mkfolder(SUBDIR_QSUB)
shell('mv -f celseq2_job*.sh* {}'.format(SUBDIR_QSUB))
'''
rule celseq2:
message: 'Finished Entire Pipeline.'
input:
# Annotation
anno = join_path(DIR_PROJ, SUBDIR_ANNO,
base_name(GFF) + '.pickle'),
# anno = rules.cook_annotation.output.anno,
# Expression Matrix per experiment/sample/plate
# csv = rules.summarize_umi_matrix_per_experiment.output.csv,
# hdf = rules.summarize_umi_matrix_per_experiment.output.hdf,
csv = expand(join_path(DIR_PROJ, SUBDIR_EXPR, '{expid}', 'expr.csv'),
expid=list(set(sample_list))),
hdf = expand(join_path(DIR_PROJ, SUBDIR_EXPR, '{expid}', 'expr.h5'),
expid=list(set(sample_list))),
# Expression Matrix per item/pair-of-reads/lane
# csv_item = rules.summarize_umi_matrix_per_item.output.csv_item,
# hdf_item = rules.summarize_umi_matrix_per_item.output.hdf_item,
csv_item = expand(join_path(DIR_PROJ, SUBDIR_EXPR,
'{expid}', '{itemid}', 'expr.csv'), zip,
expid=sample_list, itemid=item_names),
hdf_item = expand(join_path(DIR_PROJ, SUBDIR_EXPR,
'{expid}', '{itemid}', 'expr.h5'), zip,
expid=sample_list, itemid=item_names),
# Diagnose of demultiplexing
# demultiplexing = expand(join_path(DIR_PROJ, SUBDIR_DIAG,
# '{itemid}', 'demultiplexing.log'),
# itemid=item_names),
# # Diagnose of alignment
alignment = expand(join_path(DIR_PROJ, SUBDIR_DIAG,
'{itemid}', 'alignment_diagnose.csv'),
itemid=item_names),
output:
touch('_done_celseq2')
run:
if ALIGNER == 'star':
shell('rm {}'.format('_done_star_genome_loaded'))
print('Free memory loaded by STAR', flush=True)
cmd = 'STAR '
cmd += '--genomeLoad Remove '
cmd += '--genomeDir {STAR_INDEX_DIR} '
shell(cmd)
print_logger('Expression UMI matrix is saved at {}'.format(input.csv))
'''
# rule setup_dir:
# input: SAMPLE_TABLE_FPATH
# output:
# touch('_done_setupdir'),
# dir1 = SUBDIRS,
# dir2 = expand(join_path('{subdir}', '{itemid}'),
# subdir=[SUBDIR_INPUT,
# SUBDIR_FASTQ, SUBDIR_ALIGN, SUBDIR_DIAG,
# SUBDIR_UMI_CNT, SUBDIR_UMI_SET, SUBDIR_LOG],
# itemid=item_names),
# dir3 = expand(join_path(DIR_PROJ, SUBDIR_EXPR, '{expid}', '{itemid}'),
# zip, expid=sample_list, itemid=item_names),
# message: 'Setting up project directory.'
# run:
# for d in output.dir1:
# mkfolder(d)
# for d in output.dir2:
# mkfolder(d)
# for d in output.dir3:
# mkfolder(d)
if RUN_CELSEQ2_TO_ST:
rule all:
input:
# '_done_annotation',
'_done_UMI',
'_done_report',
'_done_ST',
output:
touch('_DONE'),
run:
if glob.glob('celseq2_job*.sh*'):
mkfolder(SUBDIR_QSUB)
shell('mv -f celseq2_job*.sh* {}'.format(SUBDIR_QSUB))
else:
rule all:
input:
# '_done_annotation',
'_done_UMI',
'_done_report',
output:
touch('_DONE'),
run:
if glob.glob('celseq2_job*.sh*'):
mkfolder(SUBDIR_QSUB)
shell('mv -f celseq2_job*.sh* {}'.format(SUBDIR_QSUB))
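Because a Snakefile is executed as ordinary Python, the top-level target rule can be chosen at parse time, which is how the two rule all variants above coexist. A generic sketch of the pattern, with hypothetical flag and target names:

    DO_EXTRA = config.get('run_extra_step', False)   # hypothetical flag

    if DO_EXTRA:
        rule all:
            input: '_done_core', '_done_extra'
    else:
        rule all:
            input: '_done_core'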
rule COUNT_MATRIX:
@@ -225,6 +172,37 @@ rule COUNT_MATRIX:
shell(cmd)
print_logger('UMI-count matrix is saved at {}'.format(input.csv))
if RUN_CELSEQ2_TO_ST:
rule CELSEQ2_TO_ST:
input:
tsv = expand(join_path(DIR_PROJ, SUBDIR_ST, '{expid}', 'ST.tsv'),
expid=list(set(sample_list))),
message: 'Convert to ST format.'
output:
touch('_done_ST')
rule _celseq2_to_st:
input:
hdf = join_path(DIR_PROJ, SUBDIR_EXPR, '{expid}', 'expr.h5'),
flag = '_done_UMI',
output:
tsv = join_path(DIR_PROJ, SUBDIR_ST, '{expid}', 'ST.tsv'),
message: 'UMI-count matrix for ST: {output.tsv} '
params:
exclude_empty_spots = False,
exclude_nondetected_genes = False,
run:
cmd = 'celseq2-to-st {input.hdf} '
cmd += ' {} '.format(BC_INDEX_FPATH)
cmd += ' {output.tsv} '
if params.exclude_empty_spots:
cmd += ' --exclude-empty-spots '
if params.exclude_nondetected_genes:
cmd += ' --exclude-nondetected-genes '
shell(cmd)
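The pair above is the usual Snakemake aggregate/worker split: _celseq2_to_st converts a single experiment, while CELSEQ2_TO_ST expand()s over every experiment ID to request all conversions behind one flag file. A generic sketch with hypothetical names:

    SAMPLES = ['exp1', 'exp2']          # hypothetical experiment IDs

    rule convert_all:
        input: expand('st/{sample}/ST.tsv', sample=SAMPLES)
        output: touch('_done_ST')

    rule convert_one:
        input: 'expr/{sample}/expr.h5'
        output: 'st/{sample}/ST.tsv'
        # barcodes.tsv stands in for the pipeline's BC_INDEX_FPATH
        shell: 'celseq2-to-st {input} barcodes.tsv {output}'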
rule REPORT_ALIGNMENT_LOG:
input:
report = expand(join_path(DIR_PROJ, SUBDIR_REPORT, '{itemid}',
@@ -389,9 +367,11 @@ rule COOK_ANNOTATION:
input:
SAMPLE_TABLE_FPATH,
output:
anno_pkl = join_path(DIR_PROJ, SUBDIR_ANNO, base_name(GFF) + '.pickle'),
anno_csv = join_path(DIR_PROJ, SUBDIR_ANNO, base_name(GFF) + '.csv'),
flag = '_done_annotation',
anno_pkl = temp(join_path(DIR_PROJ, SUBDIR_ANNO,
base_name(GFF) + '.pickle')),
anno_csv = temp(join_path(DIR_PROJ, SUBDIR_ANNO,
base_name(GFF) + '.csv')),
# flag = '_done_annotation',
params:
gene_type = GENE_BIOTYPE
priority: 100
@@ -421,20 +401,16 @@ rule COOK_ANNOTATION:
with open(output.anno_pkl, 'wb') as fh:
pickle.dump( (features, exported_genes), fh)
shell('touch {output.flag}')
rule count_umi:
input:
# gff = join_path(DIR_PROJ, SUBDIR_ANNO,
# base_name(GFF) + '.pickle'),
# flag = '_done_annotation',
gff = rules.COOK_ANNOTATION.output.anno_pkl,
sam = join_path(DIR_PROJ, SUBDIR_ALIGN, '{itemid}', '{bc}.sam'),
output:
umicnt = join_path(DIR_PROJ, SUBDIR_UMI_CNT, '{itemid}', '{bc}.pkl'),
umiset = join_path(DIR_PROJ, SUBDIR_UMI_SET, '{itemid}', '{bc}.pkl'),
aln_diag = join_path(DIR_PROJ, SUBDIR_DIAG, '{itemid}', '{bc}.pkl'),
umicnt = temp(join_path(DIR_PROJ, SUBDIR_UMI_CNT,
'{itemid}', '{bc}.pkl')),
umiset = temp(join_path(DIR_PROJ, SUBDIR_UMI_SET,
'{itemid}', '{bc}.pkl')),
message: 'Counting {input.sam}'
run:
features_f, _ = pickle.load(open(input.gff, 'rb'))
@@ -446,14 +422,11 @@ rule count_umi:
dumpto=None)
pickle.dump(umi_cnt, open(output.umicnt, 'wb'))
pickle.dump(umi_set, open(output.umiset, 'wb'))
pickle.dump(aln_cnt, open(output.aln_diag, 'wb'))
# - regular umi-count using *_umicnt.pkl -> umi_count_matrix replicates/lanes per plate
rule summarize_umi_matrix_per_item:
input:
# gff = join_path(DIR_PROJ, SUBDIR_ANNO,
# base_name(GFF) + '.pickle'),
gff = rules.COOK_ANNOTATION.output.anno_pkl,
umicnt = dynamic(join_path(DIR_PROJ, SUBDIR_UMI_CNT,
'{itemid}', '{bc}.pkl')),
@@ -465,7 +438,6 @@ rule summarize_umi_matrix_per_item:
hdf_item = expand(join_path(DIR_PROJ, SUBDIR_EXPR,
'{expid}', '{itemid}', 'expr.h5'), zip,
expid=sample_list, itemid=item_names),
flag = '_done_umimatrix_per_item',
run:
_, export_genes = pickle.load(open(input.gff, 'rb'))
@@ -490,8 +462,6 @@ rule summarize_umi_matrix_per_item:
expr_df.to_hdf(join_path(DIR_PROJ, SUBDIR_EXPR,
exp_id, item_id, 'expr.h5'), 'table')
shell('touch {output.flag} ')
# - merge umi-count using *_umiset.pkl -> correct umi count per experiment/plate
rule summarize_umi_matrix_per_experiment:
@@ -507,7 +477,7 @@ rule summarize_umi_matrix_per_experiment:
expid=list(set(sample_list))),
hdf = expand(join_path(DIR_PROJ, SUBDIR_EXPR, '{expid}', 'expr.h5'),
expid=list(set(sample_list))),
flag = '_done_umimatrix_per_experiment',
# flag = '_done_umimatrix_per_experiment',
run:
_, export_genes = pickle.load(open(input.gff, 'rb'))
@@ -545,7 +515,7 @@ rule summarize_umi_matrix_per_experiment:
expr_df.to_hdf(join_path(DIR_PROJ, SUBDIR_EXPR,
exp_id, 'expr.h5'), 'table')
shell('touch {output.flag} ')
# shell('touch {output.flag} ')
rule report_alignment_log:
@@ -576,33 +546,6 @@ rule report_alignment_log:
aligner_name=ALIGNER,
savetocsv=report_item)
# rule summarize_alignment_diagnose:
# input:
# aln_diag = dynamic(join_path(DIR_PROJ, SUBDIR_DIAG,
# '{itemid}', '{bc}.pkl')),
# output:
# # Diagnose of alignment
# alignment = expand(join_path(DIR_PROJ, SUBDIR_DIAG,
# '{itemid}', 'alignment_diagnose.csv'),
# itemid=item_names),
# run:
# # item_id -> dict(cell_bc -> Counter(align))
# item_aln_mat = defaultdict(dict)
# for f in input.aln_diag:
# bc_name = base_name(f) # BC-1-xxx
# item_id = base_name(dir_name(f)) # item-1
# item_aln_mat[item_id][bc_name] = pickle.load(open(f, 'rb'))
# for item_id, aln_dict in item_aln_mat.items():
# for bc, cnt in aln_dict.items():
# aln_dict[bc] = pd.Series([cnt[x] for x in aln_diagnose_item],
# index=aln_diagnose_item)
# aln_df = pd.DataFrame(aln_dict, index=aln_diagnose_item).fillna(0)
# aln_df.to_csv(join_path(DIR_PROJ, SUBDIR_DIAG,
# item_id, 'alignment_diagnose.csv'))
rule cleanall:
message: "Remove all files under {DIR_PROJ}"
......