#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 15 14:13:51 2021
@author: Paolo Cozzi <paolo.cozzi@ibba.cnr.it>
"""
import io
import re
import gzip
import logging
import pathlib
import collections
# Get an instance of a logger
logger = logging.getLogger(__name__)
def sanitize(
        word: str,
        chars=('.', ",", "-", "/", "#"),
        check_mongoengine=True) -> str:
    """Sanitize a word by removing unwanted characters and lowercase it.

    Every character listed in ``chars`` is replaced by a space, then each run
    of whitespace is collapsed into a single ``_`` and the result is
    lowercased. A single leading ``_`` is dropped and, when requested, a
    trailing ``_`` is appended to the words ``size`` and ``type``.

    Args:
        word (str): the word to sanitize
        chars (list): a list of characters to remove. Characters are matched
            literally, so regex metacharacters such as ``^`` or ``]`` are
            safe. An empty sequence disables this step
        check_mongoengine (bool): true to add '_' after a mongoengine reserved
            word

    Returns:
        str: the sanitized word
    """

    tmp = word

    if chars:
        # escape every character: an unescaped '^', '\\' or '-' would change
        # the meaning of the character class instead of being matched
        pattern = "".join(re.escape(char) for char in chars)

        # remove unwanted characters from word by putting spaces
        tmp = re.sub(r'[%s]' % (pattern), ' ', tmp)

    # remove spaces from column name and lowercase all
    sanitized = re.sub(r"\s+", "_", tmp).lower()

    # remove starting sanitized char (can't be used with namedtuple)
    if sanitized.startswith("_"):
        sanitized = sanitized[1:]

    # check reserved words only after the leading '_' is gone, otherwise an
    # input like '.size' would be returned as the bare reserved word 'size'
    if check_mongoengine and sanitized in ('size', 'type'):
        sanitized += "_"

    return sanitized
def camelCase(string: str) -> str:
    """Convert a string into camel case.

    Runs of ``_``, ``-`` and ``.`` are treated as word separators. Each word
    is title-cased (so any inner capital letter is lowercased), the words are
    joined together and the first character of the result is lowercased.

    Args:
        string (str): the string to convert

    Returns:
        str: the camel case version of the string. An empty string is
        returned when the input is empty or made only of separators
    """

    joined = re.sub(r"(_|-|\.)+", " ", string).title().replace(" ", "")

    # nothing left to lowercase: indexing joined[0] would raise IndexError
    if not joined:
        return joined

    return joined[0].lower() + joined[1:]
class TqdmToLogger(io.StringIO):
    """
    Output stream for TQDM which will output to logger module instead of
    the StdOut.

    Only the most recently written chunk is kept: ``write`` overwrites the
    stored text and ``flush`` sends it to the logger. Nothing is written to
    the underlying ``io.StringIO`` buffer.
    """

    # logger receiving the messages (set per instance in __init__)
    logger = None
    # logging level used by flush (set per instance in __init__)
    level = None
    # last chunk received by write, stripped of surrounding whitespace
    buf = ''

    def __init__(self, logger, level=None):
        """Bind the stream to a logger.

        Args:
            logger (logging.Logger): the logger which will receive messages
            level (int): the logging level to use. Any falsy value (``None``
                or ``0``) falls back to ``logging.INFO``
        """
        super().__init__()
        self.logger = logger
        self.level = level or logging.INFO

    def write(self, buf):
        """Remember ``buf`` (stripped of leading/trailing CR, LF, tab and
        space characters) as the next message to log.

        Args:
            buf (str): the text to store

        Returns:
            int: the length of ``buf``, as text streams report the number of
            characters written
        """
        self.buf = buf.strip('\r\n\t ')
        return len(buf)

    def flush(self):
        """Log the last stored text with the configured level."""
        self.logger.log(self.level, self.buf)
def get_project_dir() -> pathlib.Path:
    """Return smarter project dir (which is three levels up from the
    module in which this function is stored).

    The location is derived from this module's ``__file__``: the directory
    holding the module is level one, and the returned path is two
    directories above it.

    Returns:
        pathlib.Path: the smarter project base dir (a ``PosixPath`` or a
        ``WindowsPath`` depending on the platform)
    """

    module_path = pathlib.Path(__file__)
    return module_path.parents[2]
def get_raw_dir() -> pathlib.Path:
    """Return smarter data raw dir.

    This is the ``data/raw`` folder below the path returned by
    ``get_project_dir()``. The folder is not created nor checked for
    existence.

    Returns:
        pathlib.Path: the smarter data raw directory (a ``PosixPath`` or a
        ``WindowsPath`` depending on the platform)
    """

    return get_project_dir().joinpath("data", "raw")
def get_interim_dir() -> pathlib.Path:
    """Return smarter data temporary dir.

    This is the ``data/interim`` folder below the path returned by
    ``get_project_dir()``. The folder is not created nor checked for
    existence.

    Returns:
        pathlib.Path: the smarter data temporary dir (a ``PosixPath`` or a
        ``WindowsPath`` depending on the platform)
    """

    return get_project_dir().joinpath("data", "interim")
def get_processed_dir() -> pathlib.Path:
    """Return smarter data processed dir (final processed data).

    This is the ``data/processed`` folder below the path returned by
    ``get_project_dir()``. The folder is not created nor checked for
    existence.

    Returns:
        pathlib.Path: the smarter data final processed dir (a ``PosixPath``
        or a ``WindowsPath`` depending on the platform)
    """

    return get_project_dir().joinpath("data", "processed")
def text_or_gzip_open(path: str, mode: str = None) -> io.TextIOWrapper:
    """Open a file which can be compressed or not. Returns file handle.

    The choice is made only on the file name: a path whose last suffix is
    ``.gz`` is opened with ``gzip.open``, anything else with the builtin
    ``open``.

    Args:
        path (str): the file to open
        mode (str): the mode forwarded to the opener. When missing or empty
            it defaults to ``'rt'`` for gzip files and ``'r'`` otherwise.
            An explicit value is passed through unchanged

    Returns:
        io.TextIOWrapper: the opened file handle (with the default modes)
    """

    compressed = pathlib.Path(path).suffix == '.gz'

    # plain file: nothing special to do
    if not compressed:
        return open(path, mode=mode or 'r')

    logger.debug(f"Gzip detected for {path}")
    return gzip.open(path, mode=mode or 'rt')
def find_duplicates(header: list) -> list:
    """Find duplicate columns in list. Returns the indexes to remove, which
    are all the positions following the first occurrence of a column.

    Indexes are grouped by column, following the order in which each
    duplicated column first appears in ``header``; inside a group they are
    in ascending order.

    Args:
        header (list): a list like the header read from a CSV file

    Returns:
        list: a list of index (numeric)
    """

    # collect every position of each column, keeping first-seen order
    positions = collections.defaultdict(list)

    for index, column in enumerate(header):
        positions[column].append(index)

    # keep the first occurrence of each column, track all the others
    return [
        index
        for indexes in positions.values()
        for index in indexes[1:]
    ]
class UnknownCountry:
    """Placeholder object to deal with an unknown country.

    Every instance carries the same fixed values:

    * ``name``: ``"Unknown"``
    * ``alpha_2``: ``"UN"``
    * ``alpha_3``: ``"UNK"``
    * ``numeric``: ``None``
    """

    def __init__(self):
        """Set the placeholder values (no arguments are accepted)."""
        # human readable label
        self.name = "Unknown"

        # two and three letters placeholder codes
        self.alpha_2 = "UN"
        self.alpha_3 = "UNK"

        # there is no numeric code for the placeholder
        self.numeric = None