# Source code for pyaerocom.tstype

"""
Helper class for handling temporal resolutions (``ts_type`` frequency codes)
in the pyaerocom library.
"""
import logging
import re

import numpy as np

from pyaerocom.exceptions import TemporalResolutionError
from pyaerocom.time_config import (
    PANDAS_FREQ_TO_TS_TYPE,
    TS_TYPE_TO_NUMPY_FREQ,
    TS_TYPE_TO_PANDAS_FREQ,
    TS_TYPE_TO_SI,
    TS_TYPES,
)

logger = logging.getLogger(__name__)


class TsType:
    """Temporal resolution (frequency) code such as ``daily`` or ``3hourly``.

    A frequency is stored as a base string (one of :attr:`VALID`) plus an
    integer multiplication factor (:attr:`mulfac`).

    Comparison operators order instances by resolution, not by period
    length: ``a < b`` is True if one period of ``a`` lasts *longer* than one
    period of ``b`` (see :func:`__lt__`). Equality compares the frequency
    strings (:attr:`val`). Strings on the other side of a comparison are
    converted to :class:`TsType` first.

    Parameters
    ----------
    val : str
        frequency string. Either a base frequency from :attr:`VALID` or a
        pandas frequency string contained in :attr:`FROM_PANDAS`, optionally
        prefixed with an integer multiplication factor (e.g. ``3daily``).

    Raises
    ------
    TemporalResolutionError
        if the input is not a non-empty string, cannot be associated with a
        valid frequency, or if the multiplication factor exceeds the maximum
        allowed for the base frequency (:attr:`TS_MAX_VALS`).
    """

    # lookup tables imported from pyaerocom.time_config
    VALID = TS_TYPES
    # all valid base frequencies except the last entry of VALID
    VALID_ITER = VALID[:-1]
    FROM_PANDAS = PANDAS_FREQ_TO_TS_TYPE
    TO_PANDAS = TS_TYPE_TO_PANDAS_FREQ
    TO_NUMPY = TS_TYPE_TO_NUMPY_FREQ
    TO_SI = TS_TYPE_TO_SI

    # maximum allowed multiplication factor for each base frequency
    TS_MAX_VALS = {
        "minutely": 360,  # up to 6hourly
        "hourly": 168,  # up to weekly
        "daily": 180,  # up to 6 monthly
        "weekly": 104,  # up to ~2yearly
        "monthly": 120,  # up to 10yearly
    }

    # "monthly": "days" below is because each month does not have the same number of days
    # netcdf does time calculation for you given starting day and days past (CF convention)
    TSTR_TO_CF = {"hourly": "hours", "daily": "days", "monthly": "days"}

    # tolerance (percent of period length) used in :attr:`tol_secs`
    TOL_SECS_PERCENT = 5

    def __init__(self, val):
        self._mulfac = 1
        self._val = None

        # the setter validates the input and assigns _val and _mulfac
        self.val = val

    @property
    def mulfac(self):
        """Multiplication factor of frequency"""
        return self._mulfac

    @mulfac.setter
    def mulfac(self, value):
        try:
            value = int(value)
        except Exception as err:
            raise ValueError("mulfac needs to be int or convertible to int") from err
        max_allowed = self.TS_MAX_VALS.get(self.base)
        if max_allowed is not None and value > max_allowed:
            raise ValueError(
                f"Multiplication factor exceeds maximum allowed, which is {max_allowed}"
            )
        self._mulfac = value

    @property
    def base(self):
        """Base string (without multiplication factor, cf :attr:`mulfac`)"""
        return self._val

    @property
    def val(self):
        """Value of frequency (string type), e.g. 3daily"""
        if self._mulfac != 1:
            return f"{self._mulfac}{self._val}"
        return self._val

    @val.setter
    def val(self, val):
        # Reject None, non-strings and empty strings up front, so that invalid
        # input always surfaces as TemporalResolutionError (which is what
        # :func:`valid` catches) and not as TypeError / IndexError.
        if not isinstance(val, str) or not val:
            raise TemporalResolutionError(
                "Invalid input, please provide valid frequency string..."
            )
        mulfac = 1
        # Split off a leading integer factor. Only the leading digits are
        # removed; the remainder has to be a valid frequency on its own.
        prefix = re.match(r"\d+", val)
        if prefix is not None:
            mulfac = int(prefix.group())
            val = val[prefix.end() :]
        if val not in self.VALID:
            # not a base frequency, try to interpret as pandas frequency
            try:
                val = self._from_pandas(val)
            except TemporalResolutionError as err:
                raise TemporalResolutionError(
                    f"Invalid input for ts_type {val}. Choose from {self.VALID}"
                ) from err
        if mulfac != 1 and val in self.TS_MAX_VALS and mulfac > self.TS_MAX_VALS[val]:
            raise TemporalResolutionError(
                f"Invalid input for ts_type {val}. Multiplication factor "
                f"{mulfac} exceeds maximum allowed for {val}, which is "
                f"{self.TS_MAX_VALS[val]}"
            )
        self._val = val
        self._mulfac = mulfac

    @property
    def datetime64_str(self):
        """Convert ts_type str to datetime64 unit string"""
        return f"datetime64[{self.to_numpy_freq()}]"

    @property
    def timedelta64_str(self):
        """Convert ts_type str to timedelta64 unit string"""
        return f"timedelta64[{self.to_numpy_freq()}]"

    @property
    def cf_base_unit(self):
        """Convert ts_type str to CF convention time unit

        Raises
        ------
        NotImplementedError
            if the base frequency is not available in :attr:`TSTR_TO_CF`
        """
        if self.base not in self.TSTR_TO_CF:
            raise NotImplementedError(f"Cannot convert {self.base} to CF str")
        return self.TSTR_TO_CF[self.base]

    @property
    def num_secs(self):
        """Number of seconds in one period

        Note
        ----
        Be aware that for monthly frequency the number of seconds is not well
        defined!
        """
        # imported here and not at module level, as in the original code
        from cf_units import Unit

        cf = self.to_si()
        total_secs = 1 / Unit("s").convert(1, cf)
        return total_secs

    @property
    def tol_secs(self):
        """Tolerance in seconds for current TsType

        Computed as :attr:`TOL_SECS_PERCENT` percent of :attr:`num_secs`,
        rounded up to the next integer.
        """
        frac = self.TOL_SECS_PERCENT / 100
        return int(np.ceil(frac * self.num_secs))

    def to_timedelta64(self):
        """
        Convert frequency to timedelta64 object

        Can be used, e.g. as tolerance when reindexing pandas Series

        Returns
        -------
        timedelta64

        """
        return np.timedelta64(1, self.to_numpy_freq())

    @property
    def next_higher(self):
        """Next higher resolution code

        If the multiplication factor is larger than 1, the plain base
        frequency is returned (e.g. daily for 3daily), else the entry
        preceding the current base in :attr:`VALID_ITER`.

        Raises
        ------
        IndexError
            if the current base is the first entry of :attr:`VALID_ITER`
        """
        if self.mulfac > 1:
            return TsType(self._val)

        idx = self.VALID_ITER.index(self._val)
        if idx == 0:
            raise IndexError(f"No higher resolution available than {self}")
        return TsType(self.VALID_ITER[idx - 1])

    @property
    def next_lower(self):
        """Next lower resolution code

        This will go to the next lower base resolution, that is if current is
        3daily, it will return weekly, however, if current exceeds next lower
        base, it will iterate that base, that is, if current is 8daily, next
        lower will be 2weekly (and not 9daily).

        Raises
        ------
        TemporalResolutionError
            if no multiple of the next base frequency exceeds the current
            period length
        """
        idx = self.VALID_ITER.index(self._val)
        if idx == len(self.VALID_ITER) - 1:
            # already at the last base frequency: increase the factor instead
            tst = TsType(self.base)
            tst.mulfac = self.mulfac + 1
            return tst
        tst = TsType(self.VALID_ITER[idx + 1])
        if self.mulfac == 1 or self.num_secs < tst.num_secs:
            return tst
        # current period is at least as long as one period of the next base:
        # search for the smallest multiple of that base that is longer
        maxmul = self.TS_MAX_VALS.get(tst.base, 10)
        numsecs = self.num_secs
        for mulfac in range(1, maxmul + 1):
            tst.mulfac = mulfac
            if numsecs < tst.num_secs:
                return tst
        raise TemporalResolutionError(f"Failed to determine next lower resolution for {self}")

    @staticmethod
    def valid(val):
        """Check if input can be converted into a :class:`TsType`

        Parameters
        ----------
        val : str
            frequency string to be checked

        Returns
        -------
        bool
            True if ``TsType(val)`` can be created, else False
        """
        try:
            TsType(val)
        except TemporalResolutionError:
            return False
        return True

    def to_numpy_freq(self):
        """Convert ts_type to numpy frequency string

        The multiplication factor is always included (also if it is 1).

        Raises
        ------
        TemporalResolutionError
            if the base frequency is not available in :attr:`TO_NUMPY`
        """
        if self._val not in self.TO_NUMPY:
            raise TemporalResolutionError(f"numpy frequency not available for {self._val}")
        freq = self.TO_NUMPY[self._val]
        return f"{self.mulfac}{freq}"

    def to_pandas_freq(self):
        """Convert ts_type to pandas frequency string

        Raises
        ------
        TemporalResolutionError
            if the base frequency is not available in :attr:`TO_PANDAS`
        """
        if self._val not in self.TO_PANDAS:
            raise TemporalResolutionError(f"pandas frequency not available for {self._val}")
        freq = self.TO_PANDAS[self._val]
        if self._mulfac == 1:
            return freq
        return f"{self._mulfac}{freq}"

    def to_si(self):
        """Convert to SI conform string (e.g. used for unit conversion)

        Raises
        ------
        ValueError
            if the base frequency is not available in :attr:`TO_SI`
        """
        base = self.base
        if base not in self.TO_SI:
            raise ValueError(f"Cannot convert ts_type={self} to SI unit string...")
        si = self.TO_SI[base]
        return si if self.mulfac == 1 else f"({self.mulfac}{si})"

    def get_min_num_obs(self, to_ts_type: "TsType", min_num_obs: dict) -> int:
        """Infer minimum number of observations for resampling to other frequency

        Parameters
        ----------
        to_ts_type : TsType
            target frequency
        min_num_obs : dict
            nested dictionary; keys are target frequency strings, values are
            dictionaries mapping source frequency strings to the required
            number of observations. If only the base of a frequency is
            available, the value is scaled with the multiplication factors.

        Raises
        ------
        TemporalResolutionError
            if `to_ts_type` has a higher resolution than this frequency
        ValueError
            if no value can be inferred from `min_num_obs`

        Returns
        -------
        int
            minimum number of observations (0 if `to_ts_type` equals this
            frequency)
        """
        selfstr = self.val
        if to_ts_type >= self:  # should occur rarely
            if to_ts_type == self:
                return 0
            # comparison is inverted (cf. __lt__): >= means that the target
            # has a shorter period, i.e. a higher resolution than self
            raise TemporalResolutionError(
                f"input ts_type {to_ts_type} is higher resolution than current {self}"
            )

        elif str(to_ts_type) in min_num_obs:
            # output frequency is specified in min_num_obs (note: this may
            # also be 3daily, etc, i.e., not restricted to base frequencies)
            mno = min_num_obs[str(to_ts_type)]
            if selfstr in mno:
                return int(mno[selfstr])
            elif self.mulfac != 1 and self.base in mno:
                min_num_base = mno[self.base]
                return int(np.round(min_num_base / self.mulfac))

        elif to_ts_type.base in min_num_obs:
            # only the base of the output frequency is specified: scale with
            # the multiplication factor of the output frequency
            mno = min_num_obs[to_ts_type.base]
            if selfstr in mno:
                val = mno[selfstr]
                return int(np.round(to_ts_type.mulfac * val))
            elif self.mulfac != 1 and self.base in mno:
                min_num_base = mno[self.base]
                val = min_num_base / self.mulfac * to_ts_type.mulfac
                return int(np.round(val))

        raise ValueError(
            f"could not infer min_num_obs value from input dict {min_num_obs} "
            f"for conversion from {self} to {to_ts_type}"
        )

    def check_match_total_seconds(self, total_seconds):
        """
        Check if this object matches with input interval length in seconds

        Parameters
        ----------
        total_seconds : int or float
            interval length in units of seconds (e.g. 86400 for daily)

        Returns
        -------
        bool

        """
        try:
            numsecs = self.num_secs
            tolsecs = self.tol_secs
        except ValueError:  # native / undefined
            return False
        low, high = numsecs - tolsecs, numsecs + tolsecs
        if np.logical_and(total_seconds >= low, total_seconds <= high):
            return True
        return False

    @staticmethod
    def _try_infer_from_total_seconds(base, total_seconds):
        """
        Infer multiplication factor required to match input interval length

        Not to be used directly, is used in :func:`from_total_seconds`.

        Parameters
        ----------
        base : str
            base frequency
        total_seconds : int or float
            interval length

        Raises
        ------
        TemporalResolutionError
            if TsType cannot be inferred

        Returns
        -------
        TsType
            inferred frequency

        """
        maxnum = TsType.TS_MAX_VALS.get(base, 2)
        candidates = []
        dts = []
        tstype = TsType(base)
        for mulfac in range(1, maxnum):
            tstype.mulfac = mulfac
            if tstype.check_match_total_seconds(total_seconds):
                dt = total_seconds - tstype.num_secs
                dts.append(dt)
                candidates.append(TsType(f"{mulfac}{base}"))
                if dt == 0 or dt < 0:
                    # current candidate has larger number of seconds than
                    # input
                    break
        if len(candidates) > 0:
            # pick the candidate that is closest to the input interval
            return candidates[np.argmin(np.abs(dts))]

        raise TemporalResolutionError(
            f"Period {total_seconds}s could not be associated with any "
            f"allowed multiplication factor of base frequency {base}"
        )

    @staticmethod
    def from_total_seconds(total_seconds):
        """
        Try to infer TsType based on interval length

        Parameters
        ----------
        total_seconds : int or float
            total number of seconds

        Raises
        ------
        TemporalResolutionError
            If no TsType can be inferred for input number of seconds

        Returns
        -------
        TsType

        """
        candidates = []
        candidates_diff = []
        for tst in TsType.VALID_ITER:
            tstype = TsType(tst)
            if tstype.check_match_total_seconds(total_seconds):
                return tstype
            # base frequencies shorter than the input are candidates for a
            # match with multiplication factor
            diff = total_seconds - tstype.num_secs
            if diff > 0:
                candidates.append(tst)
                candidates_diff.append(diff)
        if len(candidates) > 0:
            # sort by the candidate that has the lowest dt
            candidates_sorted = [c for _, c in sorted(zip(candidates_diff, candidates))]
            for base_tst in candidates_sorted:
                try:
                    return TsType._try_infer_from_total_seconds(base_tst, total_seconds)
                except TemporalResolutionError as e:
                    logger.info(e)
                    continue

        raise TemporalResolutionError(
            f"failed to infer ts_type based on input dt={total_seconds} s"
        )

    def _from_pandas(self, val):
        """Convert pandas frequency string to base frequency string

        Raises
        ------
        TemporalResolutionError
            if input is not available in :attr:`FROM_PANDAS`
        """
        if val not in self.FROM_PANDAS:
            raise TemporalResolutionError(f"Invalid input: {val}, need pandas frequency string")
        return self.FROM_PANDAS[val]

    def __eq__(self, other):
        """Check if frequency strings are equal (strings are converted first)"""
        if isinstance(other, str):
            other = TsType(other)
        if not isinstance(other, TsType):
            # let Python handle comparison with unrelated types
            return NotImplemented
        return other.val == self.val

    def __lt__(self, other):
        """Check if this frequency has a lower resolution than other"""
        if isinstance(other, str):
            other = TsType(other)
        if not isinstance(other, TsType):
            return NotImplemented
        nss, nso = self.num_secs, other.num_secs
        # inverted comparison, i.e. if other has less seconds if has higher
        # resolution
        return nss > nso

    def __le__(self, other):
        # use operators (not the dunder methods) so that NotImplemented is
        # resolved by Python and never evaluated as a boolean
        return bool(self == other or self < other)

    def __gt__(self, other):
        return not (self <= other)

    def __ge__(self, other):
        return not (self < other)

    def __call__(self):
        return self.val

    def __str__(self):
        return self.val

    def __repr__(self):
        return self.val