Source code for pyaerocom.time_resampler

"""
Module containing time resampling functionality
"""
import logging

import pandas as pd
import xarray as xarr

from pyaerocom.exceptions import TemporalResolutionError
from pyaerocom.helpers import isnumeric, resample_time_dataarray, resample_timeseries
from pyaerocom.tstype import TsType

logger = logging.getLogger(__name__)


[docs] class TimeResampler: """Object that can be use to resample timeseries data It supports hierarchical resampling of :class:`xarray.DataArray` objects and :class:`pandas.Series` objects. Hierarchical means, that resampling constraints can be applied for each level, that is, if hourly data is to be resampled to monthly, it may be specified to first required minimum number of hours per day, and minimum days per month, to create the output data. """ AGGRS_UNIT_PRESERVE = ("mean", "median", "std", "max", "min") DEFAULT_HOW = "mean" def __init__(self, input_data=None): self.last_setup = None # the following attribute is updated whenever a resampling operation is # performed and it will check if any of the specified resampling # aggregators invalidates unit preservation (e.g. using how=add for # for accumulating precipitation...). See also attr. AGGRS_UNIT_PRESERVE self._last_units_preserved = None self._input_data = None if input_data is not None: self.input_data = input_data @property def last_units_preserved(self): """Boolean indicating if last resampling operation preserves units""" if self._last_units_preserved is None: raise AttributeError("Please call resample first...") return self._last_units_preserved @property def input_data(self): """Input data object that is to be resampled""" return self._input_data @input_data.setter def input_data(self, val): if not isinstance(val, (pd.Series, xarr.DataArray)): raise ValueError("Invalid input: need Series or DataArray") self._input_data = val @property def fun(self): """Resamplig method (depends on input data type)""" if isinstance(self.input_data, pd.Series): return resample_timeseries return resample_time_dataarray def _get_resample_how(self, fr, to, how): if not isinstance(how, (str, dict)): val = self.DEFAULT_HOW elif isinstance(how, dict): if to.val in how and fr.val in how[to.val]: val = how[to.val][fr.val] else: val = self.DEFAULT_HOW else: val = how return val def _get_idx_entry(self, fr, to, min_num_obs, how): min_num = fr.get_min_num_obs(to, min_num_obs) _how = self._get_resample_how(fr, to, how) return (to.val, min_num, _how) def _gen_idx(self, from_ts_type, to_ts_type, min_num_obs, how): """Generate hierarchical resampling index Return ------ list list (can be considered the iterator) of 3-element tuples for each\ resampling step, containing - frequency to which the current is converted - minimum number of not-NaN values required for that step - aggregator to be used (e.g. mean, median, ...) """ if isnumeric(min_num_obs): if not isinstance(how, str): raise ValueError( f"Error initialising resampling constraints. " f"min_num_obs is numeric ({min_num_obs}) and input how is {how} " f"(would need to be string, e.g. mean)" ) return [(to_ts_type.val, int(min_num_obs), how)] if not isinstance(min_num_obs, dict): raise ValueError( f"Invalid input for min_num_obs, need dictionary or integer, got {min_num_obs}" ) base_freqs = TsType.VALID start_base = base_freqs.index(from_ts_type.base) stop_base = base_freqs.index(to_ts_type.base) last_from = from_ts_type idx = [] # loop from next base freq to end base freq, note that min_num_obs as # well as input freqs may have multiplication factors, which may # require min_num_obs values to be updated accordingly for i in range(start_base + 1, stop_base + 1): to_base = TsType(base_freqs[i]) try: entry = self._get_idx_entry(last_from, to_base, min_num_obs, how) idx.append(entry) last_from = to_base except (TemporalResolutionError, ValueError): continue if len(idx) == 0 or not idx[-1][0] == to_ts_type.val: try: last_entry = self._get_idx_entry(last_from, to_ts_type, min_num_obs, how) except (TemporalResolutionError, ValueError): _how = self._get_resample_how(last_from, to_ts_type, how) last_entry = (to_ts_type.val, 0, _how) idx.append(last_entry) return idx
[docs] def resample( self, to_ts_type, input_data=None, from_ts_type=None, how=None, min_num_obs=None, **kwargs ): """Resample input data Parameters ---------- to_ts_type : str or TsType output resolution input_data : pandas.Series or xarray.DataArray data to be resampled from_ts_type : str or TsType, optional current temporal resolution of data how : str string specifying how the data is to be aggregated, default is mean min_num_obs : dict or int, optinal integer or nested dictionary specifying minimum number of observations required to resample from higher to lower frequency. For instance, if `input_data` is hourly and `to_ts_type` is monthly, you may specify something like:: min_num_obs = {'monthly' : {'daily' : 7}, 'daily' : {'hourly' : 6}} to require at least 6 hours per day and 7 days per month. **kwargs additional input arguments passed to resampling method Returns ------- pandas.Series or xarray.DataArray resampled data object """ if how is None: how = "mean" if how in self.AGGRS_UNIT_PRESERVE: self._last_units_preserved = True else: self._last_units_preserved = False if not isinstance(to_ts_type, TsType): to_ts_type = TsType(to_ts_type) if str(from_ts_type) == "native": from_ts_type = None if from_ts_type is None: if min_num_obs is not None: logger.warning("setting min_num_obs to None since from_ts_type is not specified") min_num_obs = None elif isinstance(from_ts_type, str): from_ts_type = TsType(from_ts_type) if input_data is not None: self.input_data = input_data if self.input_data is None: raise ValueError("Please provide data (Series or DataArray)") self.last_setup = dict(min_num_obs=min_num_obs, how=how) if from_ts_type is None: # native == unknown freq = to_ts_type.to_pandas_freq() data_out = self.fun(self.input_data, freq=freq, how=how, **kwargs) elif to_ts_type > from_ts_type: raise TemporalResolutionError( f"Cannot resample time-series from {from_ts_type} to {to_ts_type}" ) elif to_ts_type == from_ts_type: logger.debug( f"Input time frequency {to_ts_type.val} equals current frequency of data. " f"Resampling will be applied anyways which will introduce NaN values " f"at missing time stamps" ) freq = to_ts_type.to_pandas_freq() self._last_units_preserved = True data_out = self.fun(self.input_data, freq=freq, how="mean", **kwargs) elif min_num_obs is None: freq = to_ts_type.to_pandas_freq() if not isinstance(how, str): raise ValueError( f"Temporal resampling without constraints can only use string type " f"argument how (e.g. how=mean). Got {how}" ) data_out = self.fun(self.input_data, freq=freq, how=how, **kwargs) else: _idx = self._gen_idx(from_ts_type, to_ts_type, min_num_obs, how) data_out = self.input_data aggrs = [] for to_ts_type, mno, rshow in _idx: freq = TsType(to_ts_type).to_pandas_freq() data_out = self.fun(data_out, freq=freq, how=rshow, min_num_obs=mno, **kwargs) aggrs.append(rshow) if all([x in self.AGGRS_UNIT_PRESERVE for x in aggrs]): self._last_units_preserved = True else: self._last_units_preserved = False return data_out