# Source code for src.data.common

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Apr 30 15:13:32 2021

@author: Paolo Cozzi <paolo.cozzi@ibba.cnr.it>

Common stuff for smarter scripts
"""

import logging

from typing import Union
from pathlib import Path
from collections import namedtuple

from mongoengine.queryset import QuerySet

import pandas as pd

from src.features.smarterdb import (
    Dataset, VariantGoat, VariantSheep, SampleSheep, SampleGoat, Location,
    Probeset, SmarterDBException, SEX)

# Get an instance of a logger
logger = logging.getLogger(__name__)

# defining here supported assemblies and version info
# version: the assembly name; imported_from: the source of SNP coordinates
AssemblyConf = namedtuple('AssemblyConf', ['version', 'imported_from'])

# supported genome assemblies keyed by the short name used by scripts
WORKING_ASSEMBLIES = {
    'OAR3': AssemblyConf('Oar_v3.1', 'SNPchiMp v.3'),
    'OAR4': AssemblyConf('Oar_v4.0', 'SNPchiMp v.3'),
    'ARS1': AssemblyConf('ARS1', 'manifest'),
    'CHI1': AssemblyConf('CHI1.0', 'SNPchiMp v.3')
}

# per-species command line options passed to plink
PLINK_SPECIES_OPT = {
    # got this one from documentation + allow-no-sex, since I have
    # phenotypes with ambiguous sex
    'Sheep': ['--chr-set', '26', 'no-xy', 'no-mt', '--allow-no-sex'],
    # this lets plink model 29 chromosomes, X, Y, MT and contigs
    'Goat': ['--chr-set', '29', '--allow-extra-chr']
}


def fetch_and_check_dataset(
        archive: str,
        contents: list[str]) -> tuple[Dataset, list[Path]]:
    """Common operations on dataset: fetch a dataset by file (submitted
    archive), check that working dir exists and required file contents is
    in dataset. Test and get full path of required files

    Args:
        archive (str): the dataset archive (file)
        contents (list): a list of files which need to be defined in dataset

    Returns:
        Dataset: a dataset instance
        list[Path]: a list of Path of required files

    Raises:
        FileNotFoundError: if the dataset working directory or any of the
            required files is missing
    """

    # get the dataset object
    dataset = Dataset.objects(file=archive).get()

    logger.debug(f"Found {dataset}")

    # check for working directory
    working_dir = dataset.working_dir

    if not working_dir.exists():
        raise FileNotFoundError(
            f"Couldn't find dataset directory '{working_dir}'")

    # check files are in dataset; collect every missing item before
    # raising, so the error reports them all at once
    not_found = []
    contents_path = []

    for item in contents:
        if item not in dataset.contents:
            logger.error(f"Couldn't find '{item}' in dataset: '{dataset}'")
            not_found.append(item)
            continue

        item_path = working_dir / item

        if not item_path.exists():
            logger.error(
                f"Couldn't find '{item_path}' in dir: '{working_dir}'")
            not_found.append(item)
            continue

        contents_path.append(item_path)

    if len(not_found) > 0:
        raise FileNotFoundError(
            f"Couldn't find '{not_found}'")

    return dataset, contents_path
def deal_with_datasets(
        src_dataset: str,
        dst_dataset: str,
        datafile: str) -> tuple[Dataset, Dataset, Path]:
    """Check source and destination dataset with its content

    Args:
        src_dataset (str): the source dataset archive (file)
        dst_dataset (str): the destination dataset archive; falls back to
            the source dataset when empty
        datafile (str): the data file required inside the source dataset

    Returns:
        Dataset: the source dataset instance
        Dataset: the destination dataset instance
        Path: the full path of datafile inside the source dataset
    """

    # custom method to check a dataset and ensure that needed stuff exists
    src_dataset, [datapath] = fetch_and_check_dataset(
        archive=src_dataset,
        contents=[datafile]
    )

    if not dst_dataset:
        # destination is the same of origin, if not provided
        dst_dataset = src_dataset

    else:
        # this will be the dataset used to define samples
        dst_dataset, _ = fetch_and_check_dataset(
            archive=dst_dataset,
            contents=[]
        )

    return src_dataset, dst_dataset, datapath
def get_variant_species(species: str) -> Union[VariantSheep, VariantGoat]:
    """Get a species name in input. It return the proper VariantSpecies
    class

    Args:
        species (str): the species name

    Returns:
        Union[VariantSheep, VariantGoat]: a VariantSpecies class
    """

    # normalize the received name (eg. 'sheep' -> 'Sheep')
    species = species.capitalize()

    if species == 'Sheep':
        return VariantSheep

    if species == 'Goat':
        return VariantGoat

    raise NotImplementedError(f"'{species}' import not yet implemented")
def get_sample_species(species: str) -> Union[SampleSheep, SampleGoat]:
    """Get a species name in input. It return the proper SampleSpecies
    class

    Args:
        species (str): the species name

    Returns:
        Union[SampleSheep, SampleGoat]: a SampleSpecies class
    """

    # normalize the received name (eg. 'goat' -> 'Goat')
    species = species.capitalize()

    # mind dataset species
    if species == 'Sheep':
        return SampleSheep

    if species == 'Goat':
        return SampleGoat

    raise NotImplementedError(
        f"'{species}' import not yet implemented")
def pandas_open(datapath: Path, **kwargs) -> pd.DataFrame:
    """Open an excel or csv file with pandas and returns a dataframe

    Args:
        datapath (Path): the path of the file
        kwargs (dict): additional pandas options

    Returns:
        pd.DataFrame: file content as a pandas dataframe
    """

    suffix = datapath.suffix

    if suffix in ('.xls', '.xlsx'):
        with open(datapath, "rb") as handle:
            return pd.read_excel(handle, **kwargs)

    if suffix == '.csv':
        with open(datapath, "r") as handle:
            # set separator to None force pandas to use csv.Sniffer
            return pd.read_csv(handle, sep=None, engine='python', **kwargs)

    raise Exception(f"'{suffix}' file type not managed")
def new_variant(
        variant: Union[VariantSheep, VariantGoat],
        location: Location):
    """Persist a brand new variant with the given location attached."""

    # set the illumina_top attribute relying on the first location
    variant.illumina_top = location.illumina_top
    variant.locations.append(location)

    logger.debug(f"adding {variant} to database")

    variant.save()
def update_variant(
        qs: QuerySet,
        variant: Union[VariantSheep, VariantGoat],
        location: Location) -> bool:
    """Update an existing variant (if necessary)"""

    record = qs.get()
    logger.debug(f"found {record} in database")

    # guard: the snp I want to update must have the same illumina_top
    # allele as the new location, otherwise no update is done
    if record.illumina_top != location.illumina_top:
        logger.error(
            f"illumina_top alleles between variant and new location don't "
            f"match: {record.illumina_top} <> {location.illumina_top}")
        logger.warning(f"ignoring {variant}")
        return False

    modified = False

    # merge chip names, sequence, rs_id and affymetrix data from the
    # incoming variant into the stored record, in this exact order
    for updater in (
            update_chip_name, update_sequence, update_rs_id,
            update_affymetrix_record):
        record, changed = updater(variant, record)
        modified = modified or changed

    # I chose to not update other values, I suppose they be the same.
    # However check for locations
    record, changed = update_location(location, record)
    modified = modified or changed

    if modified:
        record.save()

    return modified
def update_chip_name(
        variant: Union[VariantSheep, VariantGoat],
        record: Union[VariantSheep, VariantGoat]
        ) -> tuple[Union[VariantSheep, VariantGoat], bool]:
    """Add to ``record.chip_name`` every chip name tracked by ``variant``
    and not yet tracked by ``record``.

    Args:
        variant: the incoming variant
        record: the variant read from the database

    Returns:
        tuple: the (possibly modified) record and a flag which is True
        when the record was modified
    """

    updated = False

    # get new items as a difference of two sets
    new_chips = set(variant.chip_name) - set(record.chip_name)

    if new_chips:
        # this will append the resulting set as a list
        record.chip_name += list(new_chips)
        updated = True

    return record, updated
def update_sequence(
        variant: Union[VariantSheep, VariantGoat],
        record: Union[VariantSheep, VariantGoat]
        ) -> tuple[Union[VariantSheep, VariantGoat], bool]:
    """Merge ``variant.sequence`` into ``record.sequence`` when they
    differ.

    Args:
        variant: the incoming variant
        record: the variant read from the database

    Returns:
        tuple: the (possibly modified) record and a flag which is True
        when the record was modified
    """

    updated = False

    # only touch the record when the incoming variant carries a sequence
    # which differs from the stored one
    if variant.sequence and variant.sequence != record.sequence:
        record.sequence.update(variant.sequence)
        updated = True

    return record, updated
def update_probesets(
        variant_attr: list[Probeset],
        record_attr: list[Probeset]
        ) -> bool:
    """Update probeset relying on object references"""

    updated = False

    logger.debug(f"Got variant: {variant_attr} and record: {record_attr}")

    # the incoming variant carries its probeset as the first list item
    variant_probeset = variant_attr[0]

    # search the stored probesets for the one defined for the same chip
    record_probeset = next(
        (probeset for probeset in record_attr
         if probeset.chip_name == variant_probeset.chip_name),
        None)

    if record_probeset is None:
        # I don't have this probeset_id for this chip
        record_attr.append(variant_probeset)
        updated = True

    else:
        # get new items as a difference of two sets
        new_probeset_ids = set(variant_probeset.probeset_id) - set(
            record_probeset.probeset_id)

        if new_probeset_ids:
            # this will append the resulting set as a list
            record_probeset.probeset_id += list(new_probeset_ids)
            updated = True

    return updated
def update_affymetrix_record(
        variant: Union[VariantSheep, VariantGoat],
        record: Union[VariantSheep, VariantGoat]
        ) -> tuple[Union[VariantSheep, VariantGoat], bool]:
    """Merge the affymetrix specific attributes ('probesets',
    'affy_snp_id' and 'cust_id') of ``variant`` into ``record``.

    Args:
        variant: the incoming variant
        record: the variant read from the database

    Returns:
        tuple: the (possibly modified) record and a flag which is True
        when the record was modified

    Raises:
        SmarterDBException: when 'affy_snp_id' is already defined in the
            record with a different value
    """

    updated = False

    for key in ['probesets', 'affy_snp_id', 'cust_id']:
        variant_attr = getattr(variant, key)
        record_attr = getattr(record, key)

        if key == 'probesets' and variant_attr:
            if record_attr:
                # this will make an update relying on object references
                # NOTE(review): 'probesets' is the first key processed, so
                # this plain assignment cannot discard an earlier True
                updated = update_probesets(variant_attr, record_attr)

            else:
                # this is when I add a probeset for an Illumina SNP
                setattr(record, key, variant_attr)
                updated = True

        elif key == 'cust_id':
            # only update cust_id if different from illumina name and
            # if necessary
            if variant_attr and variant_attr not in [
                    record.name, record_attr]:
                if record_attr:
                    # overwriting an existing cust_id: warn about it
                    logger.warning(
                        f"Updating {key}: '{variant_attr}' with "
                        f"'{record_attr}'")

                setattr(record, key, variant_attr)
                updated = True

        elif key == 'affy_snp_id':
            if variant_attr and variant_attr != record_attr:
                # an affy_snp_id can be set only once: a conflicting
                # value is an error, not an update
                if record_attr:
                    raise SmarterDBException(
                        f"Error with {key}: '{variant_attr}' and "
                        f"'{record_attr}': 'affy_snp_id' already defined!")

                setattr(record, key, variant_attr)
                updated = True

    return record, updated
def update_location(
        location: Location,
        variant: Union[VariantSheep, VariantGoat],
        force_update: bool = False
        ) -> tuple[Union[VariantSheep, VariantGoat], bool]:
    """
    Check provided Location with variant Locations: append new object
    or update a Location object if more recent than the data stored in
    database

    Parameters
    ----------
    location : Location
        The location to test against the database.
    variant : Union[VariantSheep, VariantGoat]
        The variant to test.
    force_update : bool, optional
        Force location update. The default is False.

    Returns
    -------
    tuple[Union[VariantSheep, VariantGoat], bool]
        The updated VariantSpecie object and a boolean value which is
        True if the location was updated
    """

    updated = False

    if location.date:
        # make location.date offset-naive
        # https://stackoverflow.com/a/796019
        location.date = location.date.replace(tzinfo=None)

    # get the old location as index
    try:
        index = variant.get_location_index(
            version=location.version,
            imported_from=location.imported_from)

        if force_update:
            # update location unconditionally
            logger.warning(
                f"Force update for '{variant}' location")
            variant.locations[index] = location
            updated = True
            return variant, updated

        # ok get the old location and check with the new one
        old_location = variant.locations[index]

        if old_location == location:
            logger.debug("Locations match")

        # upgrade locations relying on dates
        else:
            logger.warning(
                f"Locations differ for '{variant.name}': {location} <> "
                f"{old_location}"
            )

            # check if values are defined
            if old_location.date and location.date:
                if old_location.date < location.date:
                    # update location
                    logger.warning(
                        f"Replacing location for '{variant}' since is newer")
                    variant.locations[index] = location
                    updated = True

                else:
                    logger.debug(
                        f"New location is not more recent than the old "
                        f"({location.date.date()}"
                        f" <> {old_location.date.date()}), ignoring location")

            else:
                logger.debug("Skip location comparison: dates not set")

    except SmarterDBException as exc:
        # if an index does not exist, then insert the new location
        # without warnings
        logger.debug(exc)

        # if I'm importing Affymetrix data, I could have a variant but not
        # a location to check. So add a location to variant
        logger.debug(f"Append location {location} to variant {variant}")
        variant.locations.append(location)
        updated = True

    return variant, updated
def update_rs_id(
        variant: Union[VariantSheep, VariantGoat],
        record: Union[VariantSheep, VariantGoat]
        ) -> tuple[Union[VariantSheep, VariantGoat], bool]:
    """Merge ``variant.rs_id`` into ``record.rs_id``: set it when the
    record has none, append the first rs_id when it is novel, ignore it
    otherwise.

    Args:
        variant: the incoming variant
        record: the variant read from the database

    Returns:
        tuple: the (possibly modified) record and a flag which is True
        when the record was modified
    """

    updated = False

    if variant.rs_id:
        if not record.rs_id:
            # record has no rs_id yet: take the whole list
            logger.debug(f"Setting '{variant.rs_id}' to '{record}'")
            record.rs_id = variant.rs_id
            updated = True

        elif variant.rs_id[0] not in record.rs_id:
            # track the novel rs_id alongside the stored ones
            logger.debug(f"Appending '{variant.rs_id[0]}' to '{record}'")
            record.rs_id.append(variant.rs_id[0])
            updated = True

        else:
            logger.debug(f"Ignoring '{variant.rs_id[0]}: ({record})'")

    return record, updated
def deal_with_sex_and_alias(
        sex_column: str, alias_column: str, row: pd.Series):
    """
    Deal with sex and alias parameters

    Parameters
    ----------
    sex_column : str
        The sex column label.
    alias_column : str
        The alias column label.
    row : Series
        A row of metadata file.

    Returns
    -------
    sex : SEX
        A SEX instance.
    alias : str
        The alias read from metadata table could be None.
    """

    # Have I sex? search for a sex column if provided
    sex = None

    if sex_column:
        sex = str(row.get(sex_column)).strip()
        sex = SEX.from_string(sex)

        # drop sex column if unknown
        if sex == SEX.UNKNOWN:
            sex = None

    alias = None

    if alias_column:
        value = row.get(alias_column)

        # pd.notna is sufficient here: pd.notnull is an alias of pd.notna,
        # so the original double test was redundant
        if pd.notna(value):
            alias = value

    return sex, alias