#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Mar 3 10:54:15 2021
@author: Paolo Cozzi <paolo.cozzi@ibba.cnr.it>
"""
import re
import logging
from lxml import etree as ET
from src.features.utils import text_or_gzip_open
# Get an instance of a logger
logger = logging.getLogger(__name__)
class DBSNP:
    """Collect the data of one dbSNP ``Rs`` XML element into a plain dict.

    The instance starts from the attributes of the ``Rs`` element and is
    then filled in by walking its descendants: every descendant whose
    namespace-free tag path is listed in :attr:`config` is handed to the
    method named there, every other descendant is ignored.
    """

    # Maps the namespace-free tag path of an element (root first) to the
    # name of the method that stores its data in ``self.snp``.
    config = {
        ('ExchangeSet', 'Rs', 'Create'): "record_create",
        ('ExchangeSet', 'Rs', 'Update'): "record_update",
        ('ExchangeSet', 'Rs', 'Sequence'): "record_exemplar",
        ('ExchangeSet', 'Rs', 'Sequence', 'Observed'): "record_observed",
        ('ExchangeSet', 'Rs', 'Ss'): "record_ss",
        ('ExchangeSet', 'Rs', 'Ss', 'Sequence', 'Seq5'): "ss_seq5",
        ('ExchangeSet', 'Rs', 'Ss', 'Sequence', 'Observed'): "ss_observed",
        ('ExchangeSet', 'Rs', 'Ss', 'Sequence', 'Seq3'): "ss_seq3",
        ('ExchangeSet', 'Rs', 'Assembly'): "record_assembly",
        ('ExchangeSet', 'Rs', 'Assembly', 'Component'): 'ass_component',
        ('ExchangeSet', 'Rs', 'Assembly', 'Component', 'MapLoc'): 'ass_maploc',
        ('ExchangeSet', 'Rs', 'Assembly', 'SnpStat'): 'ass_snpstat',
    }

    def __init__(self, path, elem):
        """Start a new SNP record from the attributes of ``elem``.

        Args:
            path: tag path of ``elem``. Not used by the constructor; kept
                so existing callers keep working.
            elem: the element whose attributes seed the record.
        """
        # Lazy %-style arguments: the message is only built when DEBUG is on
        logger.debug("Creating a new SNP: %s", elem.attrib)
        self.snp = dict(elem.attrib)

    @classmethod
    def clean_tag(cls, tag: str):
        """Return ``tag`` without its ``{namespace}`` prefix."""
        return re.sub(r"\{.*\}", "", tag)

    @staticmethod
    def _text(elem):
        """Return the stripped text of ``elem``, or ``""`` if it has none."""
        # elem.text is None for an empty element such as <Observed/>
        return (elem.text or "").strip()

    def to_dict(self):
        """Return the dict holding everything collected so far."""
        return self.snp

    def recurse_children(self, path: list, elem: ET.Element):
        """Walk the descendants of ``elem`` depth-first and process each one.

        ``path`` is extended in place with the tag of the child being
        visited and restored before moving on, so it is unchanged when
        this method returns normally.

        Args:
            path: tag path leading to ``elem`` (mutated during the walk).
            elem: the element whose children are visited.
        """
        for item in elem:
            if not isinstance(item.tag, str):
                # Comments and processing instructions have a non-string
                # tag; they carry no SNP data and cannot be cleaned.
                continue

            path.append(self.clean_tag(item.tag))
            # Parent first, then its children: handlers of nested elements
            # rely on the entry created by the handler of their parent.
            self.process_element(path, item)
            self.recurse_children(path, item)
            path.pop()

    def process_element(self, path, elem):
        """Dispatch ``elem`` to the handler configured for ``path``.

        Args:
            path: tag path of ``elem`` (any sequence of tag names).
            elem: the element to process.
        """
        path = tuple(path)
        handler_name = self.config.get(path)

        if handler_name is None:
            logger.debug(
                "Ignoring '%s': %s, %s",
                path, elem.attrib, str(elem.text).strip())
            return

        getattr(self, handler_name)(elem)

    def record_create(self, elem):
        """Store the attributes of ``elem`` under ``snp["create"]``."""
        logger.debug("Update snp: %s", elem.attrib)
        self.snp["create"] = dict(elem.attrib)

    def record_update(self, elem):
        """Store the attributes of ``elem`` under ``snp["update"]``."""
        logger.debug("Update snp: %s", elem.attrib)
        self.snp["update"] = dict(elem.attrib)

    def record_exemplar(self, elem):
        """Store the attributes of ``elem`` under ``snp["exemplar"]``."""
        logger.debug("Create exemplar: %s", elem.attrib)
        self.snp["exemplar"] = dict(elem.attrib)

    def record_observed(self, elem):
        """Store the text of ``elem`` under ``snp["exemplar"]["observed"]``."""
        observed = self._text(elem)
        logger.debug("Update exemplar: %s", observed)
        self.snp["exemplar"]["observed"] = observed

    def record_ss(self, elem):
        """Append the attributes of ``elem`` to the ``snp["ss"]`` list.

        The list is created when the first ``Ss`` element is seen.
        """
        if 'ss' in self.snp:
            logger.debug("Update snp. Append ss: %s", elem.attrib)
            self.snp["ss"].append(dict(elem.attrib))
        else:
            logger.debug("Update snp. Create ss: %s", elem.attrib)
            self.snp["ss"] = [dict(elem.attrib)]

    def _update_last_ss(self, key, elem):
        """Store the text of ``elem`` under ``key`` in the latest ss entry."""
        last_ss = self.snp["ss"][-1]
        # .get(): a missing ssId must not abort parsing just for a log line
        logger.debug("Update %s for ss %s", key, last_ss.get("ssId"))
        last_ss[key] = self._text(elem)

    def ss_seq5(self, elem):
        """Store the text of ``elem`` as ``seq5`` of the latest ss entry."""
        self._update_last_ss("seq5", elem)

    def ss_observed(self, elem):
        """Store the text of ``elem`` as ``observed`` of the latest ss entry."""
        self._update_last_ss("observed", elem)

    def ss_seq3(self, elem):
        """Store the text of ``elem`` as ``seq3`` of the latest ss entry."""
        self._update_last_ss("seq3", elem)

    def record_assembly(self, elem):
        """Store the attributes of ``elem`` under ``snp["assembly"]``.

        A later ``Assembly`` element replaces the previous entry.
        """
        logger.debug("Adding assembly record: %s", elem.attrib)
        self.snp["assembly"] = dict(elem.attrib)

    def ass_component(self, elem):
        """Store the attributes of ``elem`` as the assembly ``component``."""
        logger.debug("Update assembly record: %s", elem.attrib)
        self.snp["assembly"]["component"] = dict(elem.attrib)

    def ass_maploc(self, elem):
        """Store the attributes of ``elem`` as the component ``maploc``."""
        logger.debug("Update assembly record: %s", elem.attrib)
        self.snp["assembly"]["component"]["maploc"] = dict(elem.attrib)

    def ass_snpstat(self, elem):
        """Store the attributes of ``elem`` as the assembly ``snpstat``."""
        logger.debug("Update assembly record: %s", elem.attrib)
        self.snp["assembly"]["snpstat"] = dict(elem.attrib)
def process_rs_elem(elem: ET.Element):
    """Convert one ``Rs`` element into a plain dict.

    A :class:`DBSNP` record is created from ``elem`` and filled in by
    walking the descendants of ``elem``.

    Args:
        elem: the ``Rs`` element to convert.

    Returns:
        The dict built by the :class:`DBSNP` record.
    """
    # the tag path always starts at the document root, followed by the
    # namespace-free tag of the element itself
    rs_path = ['ExchangeSet', DBSNP.clean_tag(elem.tag)]

    record = DBSNP(rs_path, elem)

    # visit every descendant; the record picks the ones it knows about
    record.recurse_children(rs_path, elem)

    return record.to_dict()
def read_dbSNP(path: str):
    """Lazily yield one dict per ``Rs`` element found in a dbSNP XML file.

    The file is parsed incrementally and each ``Rs`` element is released
    once its dict has been consumed, so the whole document is never held
    in memory.

    Args:
        path: location of the XML file, opened in binary mode through
            ``text_or_gzip_open``.

    Yields:
        The dict returned by :func:`process_rs_elem` for each element
        whose namespace-free tag is ``rs`` (case-insensitive).
    """
    with text_or_gzip_open(path, mode="rb") as handle:
        for event, elem in ET.iterparse(handle, events=("end", )):
            tag = re.sub(r"\{.*\}", "", elem.tag)

            if tag.lower() != "rs":
                # lazy %-style arguments: formatted only when DEBUG is on
                logger.debug(
                    "Ignoring tag: %s, attrib: %s, text: %s",
                    elem.tag, elem.attrib, str(elem.text).strip())
                continue

            logger.debug(
                "Found tag: %s, attrib: %s, text: %s",
                elem.tag, elem.attrib, str(elem.text).strip())

            yield process_rs_elem(elem)

            # release memory after processing elem
            elem.clear()

            # clear() empties the element but leaves it attached to its
            # parent, so processed records would still pile up under the
            # root. Drop everything that precedes this element as well.
            # NOTE(review): assumes 'rs' elements are direct children of
            # the root, as process_rs_elem's fixed path implies — confirm.
            parent = elem.getparent()
            if parent is not None:
                while elem.getprevious() is not None:
                    del parent[0]
def search_chip_snps(snp, handle="AGR_BS"):
    """Tell whether any ss entry of ``snp`` was submitted by ``handle``.

    Args:
        snp: a SNP dict; its optional ``"ss"`` key holds a list of dicts.
        handle: the value to look for in the ``"handle"`` key of each
            ss entry.

    Returns:
        ``True`` if at least one ss entry has the requested handle,
        ``False`` otherwise (including when ``snp`` has no ss entries or
        an entry has no ``"handle"`` key).
    """
    # "ss" is absent when the record has no Ss children at all
    submissions = snp.get('ss') or ()

    return any(
        'handle' in ss and ss['handle'] == handle
        for ss in submissions
    )