Skip to content
Snippets Groups Projects
check_design.py 6.42 KiB
Newer Older
#!/usr/bin/env python3

Venkat Malladi's avatar
Venkat Malladi committed
#
# * --------------------------------------------------------------------------
# * Licensed under MIT (https://git.biohpc.swmed.edu/BICF/Astrocyte/chipseq_analysis/LICENSE.md)
# * --------------------------------------------------------------------------
#

'''Check if design file is correctly formatted and matches files list.'''

import argparse
import logging
import pandas as pd

EPILOG = '''
For more details:
        %(prog)s --help
'''

# SETTINGS

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
logger.propagate = False
logger.setLevel(logging.INFO)


def get_args():
    '''Define arguments.'''
    parser = argparse.ArgumentParser(
        description=__doc__, epilog=EPILOG,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument('-d', '--design',
                        help="The design file to run QC (tsv format).",
                        required=True)

    parser.add_argument('-f', '--fastq',
                        help="File with list of fastq files (tsv format).",
                        required=True)

    parser.add_argument('-p', '--paired',
                        help="True/False if paired-end or single end.",
                        default=False,
                        action='store_true')

    args = parser.parse_args()
    return args


def check_design_headers(design, paired):
    '''Check if design file conforms to sequencing type.'''

    # Default headers
    design_template = [
        'sample_id',
        'biosample',
        'factor',
        'treatment',
        'replicate',
        'control_id',
        'fastq_read1']

    design_headers = list(design.columns.values)

    if paired:  # paired-end data
        design_template.extend(['fastq_read2'])

    # Check if headers
    logger.info("Running header check.")

    missing_headers = set(design_template) - set(design_headers)

    if len(missing_headers) > 0:
        logger.error('Missing column headers: %s', list(missing_headers))
        raise Exception("Missing column headers: %s" % list(missing_headers))


def check_samples(design):
    '''Check if design file has the correct sample name mapping.'''

    logger.info("Running sample check.")

    samples = design.groupby('sample_id') \
                            .apply(list)

    malformated_samples = []
    chars = set('-.')
    for sample in samples.index.values:
Venkat Malladi's avatar
Venkat Malladi committed
        if(any(char.isspace() for char in sample) | any((char in chars) for char in sample)):
            malformated_samples.append(sample)

    if len(malformated_samples) > 0:
        logger.error('Malformed samples from design file: %s', list(malformated_samples))
        raise Exception("Malformed samples from design file: %s" %
                        list(malformated_samples))


def check_experiments(design):
    '''Check if design file has the correct experiment name mapping.'''

    logger.info("Running experiment check.")

    experiments = design.groupby('experiment_id') \
                            .apply(list)

    malformated_experiments = []
    chars = set('-.')
    for experiment in experiments.index.values:
Venkat Malladi's avatar
Venkat Malladi committed
        if(any(char.isspace() for char in experiment) | any((char in chars) for char in experiment)):
            malformated_experiments.append(experiment)

    if len(malformated_experiments) > 0:
        logger.error('Malformed experiment from design file: %s', list(malformated_experiments))
        raise Exception("Malformed experiment from design file: %s" %
                        list(malformated_experiments))


def check_controls(design):
    '''Check if design file has the correct control mapping.'''

    logger.info("Running control check.")

    missing_controls = set(design['control_id']) - set(design['sample_id'])

    if len(missing_controls) > 0:
        logger.error('Missing control experiments: %s', list(missing_controls))
        raise Exception("Missing control experiments: %s" %
                        list(missing_controls))
def check_replicates(design):
    '''Check if design file has unique replicate numbersfor an experiment.'''

    logger.info("Running replicate check.")

    experiment_replicates = design.groupby('experiment_id')['replicate'] \
                            .apply(list)

    duplicated_replicates = []
    for experiment in experiment_replicates.index.values:
        replicates = experiment_replicates[experiment]
        unique_replicates = set(replicates)
        if len(replicates) != len(unique_replicates):
            duplicated_replicates.append(experiment)
    if len(duplicated_replicates) > 0:
        logger.error('Duplicate replicates in experiments: %s', list(duplicated_replicates))
        raise Exception("Duplicate replicates in experiments: %s" %
                        list(duplicated_replicates))


def check_files(design, fastq, paired):
    '''Check if design file has the files found.'''

    logger.info("Running file check.")

    if paired:  # paired-end data
        files = list(design['fastq_read1']) + list(design['fastq_read2'])
    else:  # single-end data
        files = design['fastq_read1']

    files_found = fastq['name']

    missing_files = set(files) - set(files_found)

    if len(missing_files) > 0:
        logger.error('Missing files from design file: %s', list(missing_files))
        raise Exception("Missing files from design file: %s" %
                        list(missing_files))
    else:
        file_dict = fastq.set_index('name').T.to_dict()

        design['fastq_read1'] = design['fastq_read1'] \
                                .apply(lambda x: file_dict[x]['path'])
        if paired:  # paired-end data
            design['fastq_read2'] = design['fastq_read2'] \
                                    .apply(lambda x: file_dict[x]['path'])
    return design


def main():
    args = get_args()
    design = args.design
    fastq = args.fastq
    paired = args.paired

    # Create a file handler
    handler = logging.FileHandler('design.log')
    logger.addHandler(handler)

Venkat Malladi's avatar
Venkat Malladi committed
    design_df = pd.read_csv(design, sep='\t')
    fastq_df = pd.read_csv(fastq, sep='\t', names=['name', 'path'])

    # Check design file
    check_design_headers(design_df, paired)
    check_controls(design_df)
    check_replicates(design_df)
Venkat Malladi's avatar
Venkat Malladi committed
    new_design_df = check_files(design_df, fastq_df, paired)

    # Write out new design file
    new_design_df.to_csv('design.tsv', header=True, sep='\t', index=False)


if __name__ == '__main__':
    main()