#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Apr 9 15:58:40 2021
@author: Paolo Cozzi <paolo.cozzi@ibba.cnr.it>
Try to model data operations on plink files
"""
import re
import csv
import logging
from pathlib import Path
from typing import Union
from collections import namedtuple
from dataclasses import dataclass
from tqdm import tqdm
from mongoengine.errors import DoesNotExist, MultipleObjectsReturned
from mongoengine.queryset import Q
from plinkio import plinkfile
from .snpchimp import clean_chrom
from .smarterdb import (
VariantSheep, SampleSheep, Breed, Dataset, SmarterDBException, SEX,
VariantGoat, SampleGoat, Location, get_sample_type)
from .utils import TqdmToLogger, skip_comments, text_or_gzip_open
from .illumina import read_snpList, read_illuminaRow
from .affymetrix import read_affymetrixRow
# Get an instance of a logger
logger = logging.getLogger(__name__)
# a generic class to deal with assemblies
AssemblyConf = namedtuple('AssemblyConf', ['version', 'imported_from'])
class CodingException(Exception):
pass
class PlinkIOException(Exception):
pass
class IlluminaReportException(Exception):
pass
class AffyReportException(Exception):
pass
@dataclass
class MapRecord():
chrom: str
name: str
cm: float
position: int
def __post_init__(self):
        # type hints are only annotations. So, enforce position and cm types:
if isinstance(self.position, str):
if self.position.isnumeric():
self.position = int(self.position)
if isinstance(self.cm, str):
if self.cm.isnumeric():
self.cm = float(self.cm)
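# A minimal sketch of the coercion above (values are hypothetical): building a
# MapRecord from the string fields of a .map line converts numeric strings,
# while non-numeric ones are left untouched:
#
#   record = MapRecord("1", "snp_1", "0", "12345")
#   # record.position == 12345 (int), record.cm == 0.0 (float)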
class SmarterMixin():
"""Common features of a Smarter related dataset file"""
_species = None
mapdata = list()
src_locations = list()
dst_locations = list()
filtered = set()
variants_name = list()
VariantSpecies = None
SampleSpecies = None
chip_name = None
    # this needs to be set to the proper read genotype method
read_genotype_method = None
@property
def species(self):
return self._species
@species.setter
def species(self, species):
if not species:
            # avoid checking a None value
return
# determine the SampleClass
elif species == 'Sheep':
self.VariantSpecies = VariantSheep
self.SampleSpecies = SampleSheep
elif species == 'Goat':
self.VariantSpecies = VariantGoat
self.SampleSpecies = SampleGoat
else:
raise NotImplementedError(
f"Species '{species}' not yet implemented"
)
self._species = species
    def search_breed(self, fid, dataset, *args, **kwargs):
        """Get a breed relying on aliases and the dataset"""
# this is a $elemMatch query
breed = Breed.objects(
aliases__match={'fid': fid, 'dataset': dataset}).get()
logger.debug(f"Found breed {breed}")
return breed
    def search_country(self, dataset: Dataset, breed: Breed):
# this will be the default value
country = dataset.country
# search for country in my aliases
for alias in breed.aliases:
if alias.dataset != dataset:
continue
if alias.country:
# override country if defined
country = alias.country
return country
    def update_mapfile(self, outputfile: str):
# helper function to get default value for cM
def get_cM(record):
"""Returns distance in cM or '0' (default for map file)"""
if hasattr(record, 'cm'):
return record.cm
return '0'
with open(outputfile, 'w') as handle:
writer = csv.writer(handle, delimiter=' ', lineterminator="\n")
counter = 0
for idx, record in enumerate(self.mapdata):
if idx in self.filtered:
logger.warning(f"Skipping {record}: not in database")
continue
# get a location relying on indexes
location = self.dst_locations[idx]
# get the tracked variant name relying on indexes
variant_name = self.variants_name[idx]
# a new record in mapfile
writer.writerow([
clean_chrom(location.chrom),
variant_name,
get_cM(record),
location.position
])
counter += 1
logger.info(f"Wrote {counter} SNPs in mapfile")
def _deal_with_relationship(self, line: list, dataset: Dataset):
# deal with special items
sex = None
father_id = None
mother_id = None
# test with sex column
if int(line[4]) in [1, 2]:
sex = SEX(int(line[4]))
# test with father id
if str(line[2]) != '0':
qs = self.SampleSpecies.objects(
original_id=line[2], dataset=dataset)
if qs.count() == 1:
father_id = qs.get()
# test with mother id
if str(line[3]) != '0':
qs = self.SampleSpecies.objects(
original_id=line[3], dataset=dataset)
if qs.count() == 1:
mother_id = qs.get()
return sex, father_id, mother_id
def _create_sample(
self,
line: list,
dataset: Dataset,
breed: Breed,
sex: SEX,
father_id: Union[SampleSheep, SampleGoat],
mother_id: Union[SampleSheep, SampleGoat]) -> Union[
SampleSheep, SampleGoat]:
"""
Helper method to create a new sample from a PED line
Parameters
----------
line : list
A PED line.
dataset : Dataset
The Dataset instance this sample belong to.
breed : Breed
The Breed instance of this sample.
sex : SEX
The sex of this sample (if any).
father_id : Union[SampleSheep, SampleGoat]
The father of this sample (if any).
mother_id : Union[SampleSheep, SampleGoat]
The mother of this sample (if any).
Returns
-------
Union[SampleSheep, SampleGoat]
The created sample instance.
"""
# do I have a multi country dataset? try to determine the country
country = self.search_country(dataset, breed)
# test if foreground or background dataset
type_ = get_sample_type(dataset)
# insert sample into database
logger.info(f"Registering sample '{line[1]}' in database")
sample = self.SampleSpecies(
original_id=line[1],
country=country,
breed=breed.name,
breed_code=breed.code,
dataset=dataset,
type_=type_,
chip_name=self.chip_name,
sex=sex,
father_id=father_id,
mother_id=mother_id
)
sample.save()
logger.debug(f"Increment '{breed}' n_individuals counter")
breed.n_individuals += 1
breed.save()
return sample
    def get_or_create_sample(
self,
line: list,
dataset: Dataset,
breed: Breed,
sample_field: str = "original_id",
create_sample: bool = False) -> Union[SampleSheep, SampleGoat]:
"""
        Get a sample from the database or create a new one (if the
        create_sample flag is set)
Parameters
----------
line : list
A ped line as a list.
dataset : Dataset
The dataset object this sample belongs to.
breed : Breed
The Breed object of such sample.
sample_field : str, optional
Search sample name within this field. The default is "original_id".
create_sample : bool, optional
Create a sample if not found in database. The default is False.
Raises
------
SmarterDBException
Raised if more than one sample is retrieved.
Returns
-------
sample : Union[SampleSheep, SampleGoat]
A SampleSheep or SampleGoat object for a Sample object found or
            created. None if no sample is found and create_sample is False.
"""
# search for sample in database
qs = self.SampleSpecies.objects(
dataset=dataset,
breed_code=breed.code,
**{sample_field: line[1]}
)
sex, father_id, mother_id = self._deal_with_relationship(
line, dataset)
# the sample I want to return
sample = None
if qs.count() == 1:
logger.debug(f"Sample '{line[1]}' found in database")
sample = qs.get()
# update records if necessary
if sample.father_id != father_id or sample.mother_id != mother_id:
logger.warning(f"Update relationships for sample '{line[1]}'")
sample.father_id = father_id
sample.mother_id = mother_id
sample.save()
elif qs.count() == 0:
if not create_sample:
logger.warning(f"Sample '{line[1]}' not found in database")
else:
sample = self._create_sample(
line, dataset, breed, sex, father_id, mother_id)
else:
raise SmarterDBException(
f"Got {qs.count()} results for '{line[1]}'")
return sample
# helper function
    def skip_index(self, idx):
        """Skip a certain SNP relying on its position"""
# skip this variant (even in ped)
self.filtered.add(idx)
        # need to add an empty value in locations (or my indexes
        # won't work properly). The same applies to variant names
self.src_locations.append(None)
self.dst_locations.append(None)
self.variants_name.append(None)
    def make_query_args(
self, src_assembly: AssemblyConf, dst_assembly: AssemblyConf):
"""Generate args to select variants from database"""
query = []
# construct the query arguments to search into database
if dst_assembly:
query = [Q(locations__match=src_assembly._asdict()) &
Q(locations__match=dst_assembly._asdict())]
else:
query = [Q(locations__match=src_assembly._asdict())]
return query
    def make_query_kwargs(
self, search_field: str, record: MapRecord, chip_name: str):
"""Generate kwargs to select variants from database"""
if search_field == 'probeset_id':
# construct a custom query to find the selected SNP
return {
"probesets__match": {
'chip_name': chip_name,
'probeset_id': record.name
}
}
else:
return {
search_field: record.name,
"chip_name": chip_name
}
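    # A short sketch of the kwargs generated above (chip names are purely
    # illustrative): searching by 'name' uses plain filters, while
    # 'probeset_id' needs an $elemMatch on the probesets field:
    #
    #   make_query_kwargs("name", record, "IlluminaOvineSNP50")
    #   # -> {'name': record.name, 'chip_name': 'IlluminaOvineSNP50'}
    #   make_query_kwargs("probeset_id", record, "AffymetrixAxiomOviCan")
    #   # -> {'probesets__match': {'chip_name': ..., 'probeset_id': record.name}}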
    def fetch_coordinates(
self,
src_assembly: AssemblyConf,
dst_assembly: AssemblyConf = None,
search_field: str = "name",
chip_name: str = None,
*args, **kwargs):
"""Search for variants in smarter database
Args:
src_assembly (AssemblyConf): the source data assembly version
dst_assembly (AssemblyConf): the destination data assembly version
search_field (str): search variant by field (def. "name")
chip_name (str): limit search to this chip_name
"""
        # reset meta information
self.src_locations = list()
self.dst_locations = list()
self.filtered = set()
self.variants_name = list()
# get the query arguments relying on assemblies
query = self.make_query_args(src_assembly, dst_assembly)
tqdm_out = TqdmToLogger(logger, level=logging.INFO)
for idx, record in enumerate(tqdm(
self.mapdata, file=tqdm_out, mininterval=1)):
try:
# additional arguments used in query
additional_arguments = self.make_query_kwargs(
search_field, record, chip_name)
# remove empty additional arguments if any
variant = self.VariantSpecies.objects(
*query,
**{k: v for k, v in additional_arguments.items() if v}
).get()
except DoesNotExist as e:
logger.debug(
f"Couldn't find '{record.name}' with '{query}'"
f"using '{additional_arguments}': {e}")
self.skip_index(idx)
# don't check location for missing SNP
continue
except MultipleObjectsReturned as e:
logger.warning(
f"Got multiple {record.name} with '{query}'"
f"using '{additional_arguments}': {e}")
self.skip_index(idx)
# don't check location for missing SNP
continue
# get the proper locations and track it
src_location = variant.get_location(**src_assembly._asdict())
self.src_locations.append(src_location)
if dst_assembly:
dst_location = variant.get_location(**dst_assembly._asdict())
self.dst_locations.append(dst_location)
# track variant.name read from database (useful when searching
# using probeset_id)
self.variants_name.append(variant.name)
# for simplicity
if not dst_assembly:
self.dst_locations = self.src_locations
logger.debug(
f"collected {len(self.dst_locations)} with '{query}' "
f"using '{additional_arguments}'")
    def fetch_coordinates_by_positions(
self,
src_assembly: AssemblyConf,
dst_assembly: AssemblyConf = None):
"""
        Search for variants in the smarter database relying on positions
Parameters
----------
src_assembly : AssemblyConf
the source data assembly version.
dst_assembly : AssemblyConf, optional
the destination data assembly version. The default is None.
Returns
-------
None.
"""
        # reset meta information
self.src_locations = list()
self.dst_locations = list()
self.filtered = set()
self.variants_name = list()
tqdm_out = TqdmToLogger(logger, level=logging.INFO)
for idx, record in enumerate(tqdm(
self.mapdata, file=tqdm_out, mininterval=1)):
location = src_assembly._asdict()
location["chrom"] = record.chrom
location["position"] = record.position
# construct the final query
# construct the query arguments to search into database
if dst_assembly:
query = [Q(locations__match=location) &
Q(locations__match=dst_assembly._asdict())]
else:
query = [Q(locations__match=location)]
try:
variant = self.VariantSpecies.objects(
*query
).get()
except DoesNotExist as e:
logger.debug(
f"Couldn't find '{record.chrom}:{record.position}' in "
f"'{src_assembly}' assembly: {e}")
self.skip_index(idx)
# don't check location for missing SNP
continue
except MultipleObjectsReturned as e:
                # technically, I could return the first item I found in
# my database. Skip, for the moment
logger.debug(
f"Got multiple records for position '{record.chrom}:"
f"{record.position}' in '{src_assembly}'"
f" assembly: {e}")
self.skip_index(idx)
# don't check location for missing SNP
continue
# get the proper locations and track it
src_location = variant.get_location(**src_assembly._asdict())
self.src_locations.append(src_location)
if dst_assembly:
dst_location = variant.get_location(**dst_assembly._asdict())
self.dst_locations.append(dst_location)
# track variant.name read from database (useful when searching
# using probeset_id)
self.variants_name.append(variant.name)
# for simplicity
if not dst_assembly:
self.dst_locations = self.src_locations
logger.debug(
f"collected {len(self.dst_locations)} using positions "
f"in '{src_assembly}' assembly")
def _to_top(
self, index: int, genotype: list, coding: str,
location: Location) -> list:
"""
        Check genotype against coding and return illumina_top alleles
Parameters
----------
index: int
The i-th SNP received
genotype : list
The genotype as a list (ex: ['T', 'C'])
coding : str
The coding input type ('top', 'forward', ...)
location : Location
A smarterdb location used to check input genotype and coding and
            to return the corresponding illumina top genotype (ex ['A', 'G'])
Raises
------
CodingException
            Raised when the input genotype doesn't have a match in the smarter database
with the provided coding
NotImplementedError
A coding format not yet supported (implemented)
Returns
-------
list
The illumina top genotype as a list (ex ['A', 'G'])
"""
        # for simplicity
a1, a2 = genotype
# the returned value
top_genotype = []
# define a dictionary with useful data
config = {
'forward': {
'check': 'is_forward',
'convert': 'forward2top',
'message': (
f"Error for SNP {index}: '{self.mapdata[index].name}': "
f"{a1}/{a2} <> {location.illumina_forward}"
),
'exception': (
f"SNP '{self.mapdata[index].name}' is "
"not in illumina forward format"
)
},
'ab': {
'check': 'is_ab',
'convert': 'ab2top',
'message': (
f"Error for SNP {index}: '{self.mapdata[index].name}': "
f"{a1}/{a2} <> A/B"
),
'exception': (
f"SNP '{self.mapdata[index].name}' is "
"not in illumina ab format"
)
},
'affymetrix': {
'check': 'is_affymetrix',
'convert': 'affy2top',
'message': (
f"Error for SNP {index}: '{self.mapdata[index].name}': "
f"{a1}/{a2} <> {location.affymetrix_ab}"
),
'exception': (
f"SNP '{self.mapdata[index].name}' is "
"not in affymetrix format"
)
},
'illumina': {
'check': 'is_illumina',
'convert': 'illumina2top',
'message': (
f"Error for SNP {index}: '{self.mapdata[index].name}': "
f"{a1}/{a2} <> {location.illumina}"
),
'exception': (
f"SNP '{self.mapdata[index].name}' is "
"not in illumina format"
)
}
}
if coding == 'top':
if not location.is_top(genotype):
logger.debug(
f"Error for SNP {index}: '{self.mapdata[index].name}': "
f"{a1}/{a2} <> {location.illumina_top}"
)
raise CodingException(
f"SNP '{self.mapdata[index].name}' is "
"not in illumina top format")
# allele coding is the same received as input
top_genotype = genotype
elif coding in config:
# get the proper method to check genotype
check = getattr(location, config[coding]['check'])
if not check(genotype):
logger.debug(config[coding]['message'])
raise CodingException(config[coding]['exception'])
# get the proper method to convert genotype
convert = getattr(location, config[coding]['convert'])
# change the allele coding
top_genotype = convert(genotype)
else:
raise NotImplementedError(f"Coding '{coding}' not supported")
return top_genotype
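    # A minimal sketch of the conversion above (genotype and coding are
    # illustrative): with coding='forward' the alleles are first validated
    # with location.is_forward() and then converted with
    # location.forward2top():
    #
    #   top_genotype = self._to_top(0, ['A', 'G'], 'forward', location)
    #   # raises CodingException if ['A', 'G'] is not the forward genotype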
def _process_genotypes(self, line: list, coding: str, ignore_errors=False):
new_line = line.copy()
        # ok, now it is time to update genotypes
for i in range(len(self.mapdata)):
            # replacing the i-th genotype. Skip the first 6 columns
a1 = new_line[6+i*2]
a2 = new_line[6+i*2+1]
genotype = [a1, a2]
# xor condition: https://stackoverflow.com/a/433161/4385116
if (a1 in ["0", "-"]) != (a2 in ["0", "-"]):
logger.warning(
f"Found half-missing SNP in {new_line[1]}: {i*2}: "
f"[{a1}/{a2}]. Forcing SNP to be MISSING")
new_line[6+i*2], new_line[6+i*2+1] = ["0", "0"]
continue
# is this snp filtered out
if i in self.filtered:
logger.debug(
f"Skipping {self.mapdata[i].name}:[{a1}/{a2}] "
"not in database!"
)
continue
# get the proper position
location = self.src_locations[i]
# check and return illumina top genotype
try:
top_genotype = self._to_top(i, genotype, coding, location)
# replace alleles in ped line with top genotype
new_line[6+i*2], new_line[6+i*2+1] = top_genotype
except CodingException as e:
if ignore_errors:
# ok, replace with a missing genotype
logger.debug(
f"Ignoring code check in {new_line[1]}: {i*2}: "
f"[{a1}/{a2}]. Forcing SNP to be MISSING")
new_line[6+i*2], new_line[6+i*2+1] = ["0", "0"]
continue
else:
# this is the normal case
raise e
return new_line
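    # For reference, the index math above assumes the standard ped layout: 6
    # leading columns (FID, IID, father, mother, sex, phenotype) followed by
    # two allele columns per SNP, so the i-th SNP sits at columns 6+i*2 and
    # 6+i*2+1.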
def _check_file_sizes(self, line):
# check genotypes size 2*mapdata (diploidy) + 6 extra columns:
if len(line) != len(self.mapdata)*2 + 6:
logger.critical(
f"SNPs sizes don't match in '{self.mapfile}' "
f"and '{self.pedfile}'")
logger.critical("Please check file contents")
raise PlinkIOException(".ped line size doens't match .map size")
def _process_relationship(self, line, sample):
# create a copy of the original object
new_line = line.copy()
        # add father or mother to ped line (if I can)
if str(line[2]) != '0':
if sample.father_id:
new_line[2] = sample.father_id.smarter_id
else:
logger.warning(
f"Cannot resolve relationship for father {line[2]}")
new_line[2] = '0'
if str(line[3]) != '0':
if sample.mother_id:
new_line[3] = sample.mother_id.smarter_id
else:
logger.warning(
f"Cannot resolve relationship for mother {line[3]}")
new_line[3] = '0'
return new_line
def _process_pedline(
self,
line: list,
dataset: Dataset,
coding: str,
create_sample: bool = False,
sample_field: str = "original_id",
ignore_coding_errors: bool = False):
self._check_file_sizes(line)
logger.debug(f"Processing {line[:10]+ ['...']}")
        # check for breed in database relying on fid.
try:
breed = self.search_breed(fid=line[0], dataset=dataset)
except DoesNotExist:
            # it's possible that the breed exists but not with the desired
            # dataset (maybe I want to skip it)
logger.debug(
f"Couldn't find breed_code '{line[0]}' for '{dataset}'")
return None
# check for sample in database
sample = self.get_or_create_sample(
line, dataset, breed, sample_field, create_sample)
        # if I couldn't find a registered sample, I can
        # skip this record
if not sample:
logger.debug(
f"Couldn't find sample '{line[1]}' for '{dataset}'")
return None
# a new line obj
new_line = line.copy()
# updating ped line with smarter ids
new_line[0] = breed.code
new_line[1] = sample.smarter_id
# replace relationship if possible
new_line = self._process_relationship(new_line, sample)
# check and fix genotypes if necessary
new_line = self._process_genotypes(
new_line, coding, ignore_coding_errors)
        # update ped line with sex according to db information
if sample.sex and int(new_line[4]) != sample.sex.value:
logger.warning(
f"Update sex for sample '{sample} {new_line[4]} -> "
f"{sample.sex.value}'")
new_line[4] = str(sample.sex.value)
# need to remove filtered snps from ped line
for index in sorted(self.filtered, reverse=True):
# index is snp position. Need to delete two fields
del new_line[6+index*2+1]
del new_line[6+index*2]
return new_line
    def update_pedfile(
self,
outputfile: str,
dataset: Dataset,
coding: str,
create_samples: bool = False,
sample_field: str = "original_id",
ignore_coding_errors: bool = False,
*args,
**kwargs):
"""
Write a new pedfile relying on illumina_top genotypes and coordinates
stored in smarter database
Args:
outputfile (str): write ped to this path (overwrite if exists)
dataset (Dataset): the dataset we are converting
coding (str): the source coding (could be 'top', 'ab', 'forward')
            create_samples (bool): create samples if they don't exist (useful to
create samples directly from ped file)
sample_field (str): search samples using this attribute (def.
'original_id')
ignore_coding_errors (bool): ignore coding related errors (no
more exceptions when genotypes don't match)
"""
if ignore_coding_errors:
logger.warning(
"Coding check is disabled! wrong genotypes will not "
"throw errors!")
with open(outputfile, "w") as target:
writer = csv.writer(
target, delimiter=' ', lineterminator="\n")
processed = 0
for line in self.read_genotype_method(
dataset=dataset,
sample_field=sample_field,
*args, **kwargs):
                # convert the ped line into the desired format
new_line = self._process_pedline(
line,
dataset,
coding,
create_samples,
sample_field,
ignore_coding_errors)
if new_line:
# write updated line into updated ped file
logger.debug(
f"Writing: {new_line[:10]+ ['...']} "
f"({int((len(new_line)-6)/2)} SNPs)")
writer.writerow(new_line)
processed += 1
else:
logger.warning(
f"Skipping: {line[:10]+ ['...']} "
f"({int((len(line)-6)/2)} SNPs)"
)
logger.info(f"Processed {processed} individuals")
# output file block
# input file block
class FakePedMixin():
    """Class which overrides SmarterMixin when creating a PED file from a
    non-plink file format. In this case the FID is already correct and I don't
    need to look for dataset aliases"""
    def search_breed(self, fid, *args, **kwargs):
"""Get breed relying on provided FID and species class attribute"""
breed = Breed.objects(code=fid, species=self.species).get()
logger.debug(f"Found breed {breed}")
return breed
    def search_fid(
self,
sample_name: str,
dataset: Dataset,
sample_field: str = "original_id"
) -> str:
"""
Determine FID from smarter SampleSpecies breed
Parameters
----------
sample_name : str
The sample name.
dataset : Dataset
The dataset where the sample comes from.
sample_field : str
            The field used to search the sample name.
Returns
-------
fid : str
The FID used in the generated .ped file
"""
logger.debug(
f"Searching fid for sample using {sample_field}: '{sample_name}, "
"{dataset}'")
# determine fid from sample, if not received as argument
try:
sample = self.SampleSpecies.objects.get(
dataset=dataset,
**{sample_field: sample_name}
)
except DoesNotExist as e:
logger.debug(e)
raise SmarterDBException(
f"Couldn't find sample '{sample_name}' in file "
f"'{dataset.file}' using field'{sample_field}'"
)
fid = sample.breed_code
logger.debug(
f"Found breed '{fid}' for '{sample_name}'")
return fid
class TextPlinkIO(SmarterMixin):
mapfile = None
pedfile = None
    def __init__(
self,
prefix: str = None,
mapfile: str = None,
pedfile: str = None,
species: str = None,
chip_name: str = None):
# need to be set in order to write a genotype
self.read_genotype_method = self.read_pedfile
if prefix:
self.mapfile = prefix + ".map"
self.pedfile = prefix + ".ped"
elif mapfile or pedfile:
self.mapfile = mapfile
self.pedfile = pedfile
self.species = species
self.chip_name = chip_name
    def read_mapfile(self):
        """Read map data and track information in memory. Useful to process
        data files"""
self.mapdata = []
with open(self.mapfile) as handle:
            # affy files use both " " and "\t" as field separators
for line in handle:
record = re.split('[ \t]+', line.strip())
# affy data may have comments in files
if not record[0].startswith("#"):
self.mapdata.append(MapRecord(*record))
    def read_pedfile(self, *args, **kwargs):
        """Open pedfile for reading. Returns an iterator"""
with open(self.pedfile) as handle:
            # affy files use both " " and "\t" as field separators
for record in handle:
# affy data may have comments in files
if record.startswith("#"):
logger.info(f"Skipping {record}")
continue
line = re.split('[ \t]+', record.strip())
yield line
    def get_samples(self) -> list:
"""
Get samples from genotype files
Returns
-------
list
The sample list.
"""
sample_list = []
for line in self.read_pedfile():
sample_list.append(line[1])
return sample_list
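# A possible end-to-end sketch for a text plink fileset (paths, species, chip
# name, assemblies and the dataset document are assumptions for illustration
# only, not values mandated by this module):
#
#   tpio = TextPlinkIO(prefix="data/dataset", species="Sheep",
#                      chip_name="IlluminaOvineSNP50")
#   tpio.read_mapfile()
#   tpio.fetch_coordinates(src_assembly=src, dst_assembly=dst)
#   tpio.update_mapfile("output.map")
#   tpio.update_pedfile("output.ped", dataset, coding="top",
#                       create_samples=True)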
class AffyPlinkIO(FakePedMixin, TextPlinkIO):
"""a new class for affymetrix plink files, which are slightly different
from plink text files"""
    def read_pedfile(
self,
breed: str = None,
dataset: Dataset = None,
*args, **kwargs):
"""
        Open pedfile for reading. Returns an iterator.
        Parameters
        ----------
breed : str, optional
A breed to be assigned to all samples, or use the sample breed
stored in database if not provided. The default is None.
dataset : Dataset, optional
A dataset in which search for sample breed identifier
Yields
------
line : list
A ped line read as a list.
"""
with open(self.pedfile) as handle:
            # affy files use both " " and "\t" as field separators
for record in handle:
# affy data may have comments in files
if record.startswith("#"):
logger.info(f"Skipping {record}")
continue
line = re.split('[ \t]+', record.strip())
if not breed:
fid = self.search_fid(line[0], dataset)
else:
fid = breed
                # affy ped lacks some plink columns. Add such values to the line
line.insert(0, fid) # FID
line.insert(2, '0') # father
line.insert(3, '0') # mother
line.insert(4, '0') # SEX
line.insert(5, '-9') # phenotype
yield line
    def get_samples(self) -> list:
"""
Get samples from genotype files
Returns
-------
list
The sample list.
"""
sample_list = []
with open(self.pedfile) as handle:
            # affy files use both " " and "\t" as field separators
for record in handle:
# affy data may have comments in files
if record.startswith("#"):
logger.info(f"Skipping {record}")
continue
line = re.split('[ \t]+', record.strip())
sample_list.append(line[0])
return sample_list
class BinaryPlinkIO(SmarterMixin):
plink_file = None
_prefix = None
    def __init__(
self,
prefix: str = None,
species: str = None,
chip_name: str = None):
# need to be set in order to write a genotype
self.read_genotype_method = self.read_pedfile
self.prefix = prefix
self.species = species
self.chip_name = chip_name
@property
def prefix(self):
return self._prefix
@prefix.setter
def prefix(self, prefix: str):
if prefix:
self._prefix = prefix
self.plink_file = plinkfile.open(self._prefix)
    def read_mapfile(self):
        """Read map data and track information in memory. Useful to process
        data files"""
self.mapdata = list()
for locus in self.plink_file.get_loci():
record = MapRecord(
chrom=locus.chromosome,
name=locus.name,
position=locus.bp_position,
cm=locus.position
)
self.mapdata.append(record)
    def read_pedfile(self, *args, **kwargs):
        """Open pedfile for reading. Returns an iterator"""
sample_list = self.plink_file.get_samples()
locus_list = self.plink_file.get_loci()
snp_arrays = list(self.plink_file)
def format_sex(value):
if value in [1, 2]:
return str(value)
else:
return "0"
def convert(genotype, locus):
            # in plink binary format, allele2 is the REF allele and allele1 the ALT
if genotype == 0:
return locus.allele1, locus.allele1
elif genotype == 1:
return locus.allele2, locus.allele1
elif genotype == 2:
return locus.allele2, locus.allele2
elif genotype == 3:
return "0", "0"
else:
raise CodingException("Genotype %s Not supported" % genotype)
# determine genotype length
size = 6 + 2*len(self.mapdata)
for sample_idx, sample in enumerate(sample_list):
# this will be the returned row
line = ["0"] * size
# set values. I need to set a breed code in order to get a
# proper ped line
line[0:6] = [
sample.fid,
sample.iid,
sample.father_iid,
sample.mother_iid,
format_sex(sample.sex),
str(int(sample.phenotype))
]
for idx, locus in enumerate(locus_list):
genotype = snp_arrays[idx][sample_idx]
line[6+idx*2], line[6+idx*2+1] = convert(genotype, locus)
yield line
    def get_samples(self) -> list:
"""
Get samples from genotype files
Returns
-------
list
The sample list.
"""
return [sample.iid for sample in self.plink_file.get_samples()]
class IlluminaReportIO(FakePedMixin, SmarterMixin):
snpfile = None
report = None
    def __init__(
self,
snpfile: str = None,
report: str = None,
species: str = None,
chip_name: str = None):
# need to be set in order to write a genotype
self.read_genotype_method = self.read_reportfile
self.snpfile = snpfile
self.report = report
self.species = species
self.chip_name = chip_name
    def read_snpfile(self):
        """Read snp data and track information in memory. Useful to process
        data files"""
self.mapdata = list(read_snpList(self.snpfile))
# this will be called when calling read_genotype_method()
    def read_reportfile(
self,
breed: str = None,
dataset: Dataset = None,
*args, **kwargs):
"""
Open and read an illumina report file. Returns iterator
Parameters
----------
breed : str, optional
A breed to be assigned to all samples, or use the sample breed
stored in database if not provided. The default is None.
dataset : Dataset, optional
A dataset in which search for sample breed identifier
Raises
------
IlluminaReportException
            Raised when the SNP index doesn't match the snpfile.
Yields
------
line : list
A ped line read as a list.
"""
# determine genotype length
size = 6 + 2*len(self.mapdata)
# track sample
last_sample = None
# need to have snp indexes
indexes = [record.name for record in self.mapdata]
# this will be the returned row
line = list()
# this is the snp position index
idx = 0
        # try to return something like a ped row
for row in read_illuminaRow(self.report):
if row.sample_id != last_sample:
logger.debug(f"Reading sample {row.sample_id}")
# this is not returned if I'm processing the first sample
if last_sample:
yield line
# initialize an empty array
line = ["0"] * size
if not breed:
fid = self.search_fid(row.sample_id, dataset)
else:
fid = breed
# set values. I need to set a breed code in order to get a
# proper ped line
line[0], line[1], line[5] = fid, row.sample_id, "-9"
# track last sample
last_sample = row.sample_id
# reset index
idx = 0
# check snp name consistency
if indexes[idx] != row.snp_name:
raise IlluminaReportException(
f"snp positions doens't match "
f"{indexes[idx]}<>{row.snp_name}"
)
# update line relying on records
line[6+idx*2], line[6+idx*2+1] = row.allele1_ab, row.allele2_ab
# updating indexes
idx += 1
# after completing rows, I need to return last one
yield line
    def get_samples(self) -> list:
"""
Get samples from genotype files
Returns
-------
list
The sample list.
"""
sample_list = []
last_sample = None
for row in read_illuminaRow(self.report):
if row.sample_id != last_sample:
sample_list.append(row.sample_id)
last_sample = row.sample_id
return sample_list
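# A possible usage sketch for Illumina report files (paths, chip name, breed
# code, assembly and the dataset document are assumptions for illustration
# only):
#
#   rio = IlluminaReportIO(snpfile="chip.snplist", report="report.txt",
#                          species="Sheep", chip_name="IlluminaOvineSNP50")
#   rio.read_snpfile()
#   rio.fetch_coordinates(src_assembly=src)
#   rio.update_pedfile("output.ped", dataset, coding="ab", breed="TEX")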
class AffyReportIO(FakePedMixin, SmarterMixin):
    """In this type of file there are both genotypes and information. Moreover,
    genotypes are *transposed*, tracking a SNP for all samples in a single
    line"""
report = None
peddata = []
delimiter = "\t"
warn_missing_cols = True
header = []
n_samples = None
    def __init__(
self,
report: str = None,
species: str = None,
chip_name: str = None):
# need to be set in order to write a genotype
self.read_genotype_method = self.read_peddata
self.report = report
self.species = species
self.chip_name = chip_name
def __get_header(self):
# sample names are sanitized through read_affymetrixRow: so read the
# first header of the report file to determine the original sample
# names
with text_or_gzip_open(self.report) as handle:
position, skipped = skip_comments(handle)
# go back to header section
handle.seek(position)
# now read csv file
reader = csv.reader(handle, delimiter=self.delimiter)
# get header
self.header = next(reader)
def __warn_missing_column(self, row, columns=[]):
if self.warn_missing_cols:
for col in columns:
if not hasattr(row, col):
logger.warning(
f"Missing '{col}' column in reportfile!")
def __process_probeset(self, row, snp_idx):
# track SNP in mapdata
try:
self.mapdata.append(MapRecord(
row.chr_id, row.probeset_id, 0, row.start))
except AttributeError as exc:
logger.debug(exc)
self.__warn_missing_column(row, ["chr_id", "start"])
# maybe there are missing columns, try to define a MapRecord
# with at least probeset id
self.mapdata.append(MapRecord(
0, row.probeset_id, 0, 0))
# track A/B genotype
try:
self.genotypes.append(f"{row.allele_a}/{row.allele_b}")
except AttributeError as exc:
logger.debug(exc)
self.__warn_missing_column(row, ["allele_a", "allele_b"])
# track a missing genotype
self.genotypes.append("-/-")
# track genotypes in the proper column (skip the first 6 columns)
for i in range(self.n_samples):
call = row[i+1]
# mind to missing values
if call == "NoCall":
logger.debug(
f"Skipping SNP {snp_idx}: "
f"'{self.mapdata[snp_idx].name}' for sample "
f"'{self.header[i+1]}' ({call})")
continue
genotype = list(call)
self.peddata[i][6+snp_idx*2] = genotype[0]
self.peddata[i][6+snp_idx*2+1] = genotype[1]
    def read_reportfile(self, n_samples: int = None, *args, **kwargs):
        """
        Read the reportfile once and generate mapdata and peddata, with
        genotype information by sample.
Parameters
----------
n_samples : int, optional
            Limit to N samples. Useful when the number of samples differs
            from the one reported in the file. The default is None (read the
            number of samples from the reportfile).
Returns
-------
None.
"""
self.mapdata = []
self.peddata = []
if n_samples:
logger.warning(f"Limiting import to first {n_samples} samples")
# update samples number with received argument
self.n_samples = n_samples
# I want to track also the AB genotypes, to check them while fetching
# coordinates
self.genotypes = []
        # this information is required to define the pedfile
n_snps = None
size = None
initialized = False
        # an index to track SNPs across peddata
snp_idx = 0
        # warn the user only once
self.warn_missing_cols = True
        # try to return something like a ped row and derive map data at the
        # same time
for row in read_affymetrixRow(self.report, delimiter=self.delimiter):
# first determine how many SNPs and samples I have
if not initialized:
if not self.n_samples:
self.n_samples = row.n_samples
n_snps = row.n_snps
# read the original header from report file
self.__get_header()
# ok create a ped data object with the required dimensions
size = 6 + 2 * n_snps
self.peddata = [["0"] * size for i in range(self.n_samples)]
# track sample names in row. First column is probeset id
# read them from the original header row
for i in range(self.n_samples):
self.peddata[i][1] = self.header[i+1]
# change flag value
initialized = True
# deal with a single probeset_id
self.__process_probeset(row, snp_idx)
# update SNP column
snp_idx += 1
# no more reporting warnings after first row
self.warn_missing_cols = False
        # check the number of SNPs after processing the reportfile
if snp_idx != n_snps:
logger.warning(
f"Got a different number of SNPs {snp_idx}<>{n_snps}.")
if snp_idx < n_snps:
logger.warning("Dropping unused SNPs")
# determining the proper size
size = 6 + 2 * snp_idx
for i, line in enumerate(self.peddata):
self.peddata[i] = line[:size]
else:
raise AffyReportException(
f"Got a different number of SNPs {snp_idx}<>{n_snps}")
    def fetch_coordinates(
self,
src_assembly: AssemblyConf,
dst_assembly: AssemblyConf = None,
search_field: str = "name",
chip_name: str = None,
skip_check: bool = False):
"""Search for variants in smarter database. Check if the provided
A/B information is equal to the database content
Args:
src_assembly (AssemblyConf): the source data assembly version
dst_assembly (AssemblyConf): the destination data assembly version
search_field (str): search variant by field (def. "name")
chip_name (str): limit search to this chip_name
            skip_check (bool): skip the coordinate check
"""
# call base method
super().fetch_coordinates(
src_assembly, dst_assembly, search_field, chip_name)
# now check that genotypes read from report are equal to src_assembly
for idx, genotype in enumerate(self.genotypes):
if idx in self.filtered:
                # this genotype is discarded
continue
if (not skip_check and
genotype != self.src_locations[idx].affymetrix_ab):
raise AffyReportException(
"Genotypes differ from reportfile and src_assembly")
    def read_peddata(
self,
breed: str = None,
dataset: Dataset = None,
sample_field: str = "original_id",
*args, **kwargs):
"""
        Yields over genotype records.
Parameters
----------
breed : str, optional
A breed to be assigned to all samples, or use the sample breed
stored in database if not provided. The default is None.
dataset : Dataset, optional
A dataset in which search for sample breed identifier
sample_field : str, optional
Search samples using this field. The default is "original_id".
Yields
------
line : list
A ped line read as a list.
"""
for line in self.peddata:
# instantiate a new object in order to be modified
line = line.copy()
logger.debug(f"Prepare {line[:10]+ ['...']} to add FID")
if not breed:
try:
fid = self.search_fid(
sample_name=line[1],
dataset=dataset,
sample_field=sample_field)
except SmarterDBException as e:
logger.debug(e)
logger.warning(
f"Ignoring sample '{line[1]}': not in database")
continue
else:
fid = breed
# set values. I need to set a breed code in order to get a
# proper ped line
line[0], line[5] = fid, "-9"
yield line
    def get_samples(self) -> list:
"""
Get samples from genotype files
Returns
-------
list
The sample list.
"""
sample_list = []
for line in self.peddata:
sample_list.append(line[1])
return sample_list
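# A possible usage sketch for Affymetrix report files (paths, chip name, breed
# code, assembly and the dataset document are assumptions for illustration
# only); note that variants are typically searched by probeset_id here:
#
#   ario = AffyReportIO(report="affy_report.txt", species="Goat",
#                       chip_name="AffymetrixAxiomGoat")
#   ario.read_reportfile()
#   ario.fetch_coordinates(src_assembly=src, search_field="probeset_id",
#                          chip_name=ario.chip_name)
#   ario.update_pedfile("output.ped", dataset, coding="affymetrix",
#                       breed="CRE")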
def plink_binary_exists(prefix: Path):
"Test if plink binary files exists"
for ext in [".bed", ".bim", ".fam"]:
# HINT Path.with_suffix will replace the extension
test = prefix.parent / (prefix.name + ext)
if not test.exists():
return False
    # if I arrive here, all plink binary output files exist
return True
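# Usage sketch (the prefix path is hypothetical): check whether a converted
# binary fileset is already present before running plink again, e.g.
#
#   if not plink_binary_exists(Path("results/OARV3/dataset")):
#       ...  # run the conversion step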