"""
General helper methods for the pyaerocom library.
"""
from __future__ import annotations
import logging
import math as ma
from collections import Counter
from datetime import MINYEAR, date, datetime
import iris
import iris.analysis
import iris.coords
import iris.cube
import numpy as np
import pandas as pd
import xarray as xr
from cf_units import Unit
from pyaerocom import const
from pyaerocom._warnings import ignore_warnings
from pyaerocom.exceptions import (
DataCoverageError,
DataDimensionError,
LongitudeConstraintError,
MetaDataError,
ResamplingError,
TemporalResolutionError,
VariableDefinitionError,
)
from pyaerocom.time_config import (
GREGORIAN_BASE,
PANDAS_RESAMPLE_OFFSETS,
TS_TYPE_DATETIME_CONV,
TS_TYPE_SECS,
TS_TYPE_TO_PANDAS_FREQ,
day_units,
hr_units,
microsec_units,
millisec_units,
min_units,
sec_units,
)
from pyaerocom.tstype import TsType
from pyaerocom.variable_helpers import get_variable
logger = logging.getLogger(__name__)
NUM_KEYS_META = ["longitude", "latitude", "altitude"]
STR_TO_IRIS = dict(
count=iris.analysis.COUNT,
gmean=iris.analysis.GMEAN,
hmean=iris.analysis.HMEAN,
max=iris.analysis.MAX,
mean=iris.analysis.MEAN,
median=iris.analysis.MEDIAN,
sum=iris.analysis.SUM,
nearest=iris.analysis.Nearest,
linear=iris.analysis.Linear,
areaweighted=iris.analysis.AreaWeighted,
)
def varlist_aerocom(varlist):
    """Convert one or more variable names to their AeroCom counterparts

    Parameters
    ----------
    varlist : str or list
        one or more variable names to be converted

    Returns
    -------
    list
        unique AeroCom variable names

    Raises
    ------
    ValueError
        if the input is neither str nor list, or if none of the input
        variables is valid
    """
    if isinstance(varlist, str):
        varlist = [varlist]
    elif not isinstance(varlist, list):
        raise ValueError("Need string or list")
output = []
for var in varlist:
try:
_var = const.VARS[var].var_name_aerocom
            if _var not in output:
output.append(_var)
except VariableDefinitionError as e:
logger.warning(repr(e))
if len(output) == 0:
raise ValueError("None of the input variables appears to be valid")
return output
def delete_all_coords_cube(cube, inplace=True):
"""Delete all coordinates of an iris cube
Parameters
----------
cube : iris.cube.Cube
input cube that is supposed to be cleared of coordinates
inplace : bool
if True, then the coordinates are deleted in the input object, else in
a copy of it
Returns
-------
iris.cube.Cube
input cube without coordinates
"""
if not inplace:
cube = cube.copy()
for aux_fac in cube.aux_factories:
cube.remove_aux_factory(aux_fac)
for coord in cube.coords():
cube.remove_coord(coord)
return cube
def lists_to_tuple_list(*lists):
"""Convert input lists (of same length) into list of tuples
e.g. input 2 lists of latitude and longitude coords, output one list
with tuple coordinates at each index
"""
return list(zip(*lists))
def tuple_list_to_lists(tuple_list):
"""Convert list with tuples (e.g. (lat, lon)) into multiple lists"""
    return list(map(list, zip(*tuple_list)))
def make_dummy_cube_latlon(
lat_res_deg: float = 2,
lon_res_deg: float = 3,
lat_range: list[float] | tuple[float, float] = (-90, 90),
lon_range: list[float] | tuple[float, float] = (-180, 180),
):
"""Make an empty Cube with given latitude and longitude resolution
Dimensions will be lat, lon
Parameters
----------
lat_res_deg : float or int
latitude resolution of grid
lon_res_deg : float or int
longitude resolution of grid
lat_range : tuple or list
2-element list containing latitude range. If `None`, then `(-90, 90)`
is used.
lon_range : tuple or list
2-element list containing longitude range. If `None`, then `(-180, 180)`
is used.
Returns
-------
Cube
dummy cube in input resolution
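    Example
    -------
    A minimal sketch; at 10 degree resolution the global grid has 18 x 36
    cells:

    >>> make_dummy_cube_latlon(lat_res_deg=10, lon_res_deg=10).shape
    (18, 36)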
"""
# Accept lists for lat_range and lon_range, but make sure correct length
assert len(lat_range) == len(lon_range) == 2
lons = np.arange(
lon_range[0] + (lon_res_deg / 2), lon_range[1] + (lon_res_deg / 2), lon_res_deg
)
lats = np.arange(
lat_range[0] + (lat_res_deg / 2), lat_range[1] + (lat_res_deg / 2), lat_res_deg
)
lon_circ = check_coord_circular(lons, modulus=360)
latdim = iris.coords.DimCoord(
lats, var_name="lat", standard_name="latitude", circular=False, units=Unit("degrees")
)
londim = iris.coords.DimCoord(
lons, var_name="lon", standard_name="longitude", circular=lon_circ, units=Unit("degrees")
)
latdim.guess_bounds()
londim.guess_bounds()
dummy = iris.cube.Cube(np.ones((len(lats), len(lons))))
dummy.add_dim_coord(latdim, 0)
dummy.add_dim_coord(londim, 1)
dummy.var_name = "dummy_grid"
return dummy
def check_coord_circular(coord_vals, modulus, rtol=1e-5):
"""Check circularity of coordinate
Parameters
----------
coord_vals : list or ndarray
values of coordinate to be tested
modulus : float or int
modulus of coordinate (e.g. 360 for longitude)
rtol : float
relative tolerance
Returns
-------
bool
True if circularity is given, else False
Raises
------
ValueError
if circularity is given and results in overlap (right end of input
array is mapped to a value larger than the first one at the left end
of the array)
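    Example
    -------
    A minimal sketch; a global 10 degree longitude grid wraps around cleanly:

    >>> check_coord_circular(np.arange(-175.0, 185.0, 10), modulus=360)
    True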
"""
if len(coord_vals) < 2:
logger.warning(
"Checking coordinate values for circularity "
"failed since coord array has less than 2 values"
)
return False
step = coord_vals[-1] - coord_vals[-2]
tol = step * rtol
diff = coord_vals[-1] - coord_vals[0] + step
if diff - tol > modulus:
raise ValueError(
"Circularity is given but results in overlap (right "
"end of input array is mapped to a value larger than "
"the first one at the left end of the array)."
)
if abs(modulus - diff) > tol:
return False
return True
def numpy_to_cube(data, dims=None, var_name=None, units=None, **attrs):
"""Make a cube from a numpy array
Parameters
----------
data : ndarray
input data
dims : list, optional
list of :class:`iris.coord.DimCoord` instances in order of dimensions
of input data array (length of list and shapes of each of the
coordinates must match dimensions of input data)
var_name : str, optional
name of variable
units : str
unit of variable
**attrs
additional attributes to be added to metadata
Returns
-------
iris.cube.Cube
Raises
------
DataDimensionError
if input `dims` is specified and results in conflict
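    Example
    -------
    A minimal sketch with explicit dimension coordinates:

    >>> lat = iris.coords.DimCoord([10.0, 20.0], var_name="lat", standard_name="latitude")
    >>> lon = iris.coords.DimCoord([0.0, 10.0, 20.0], var_name="lon", standard_name="longitude")
    >>> numpy_to_cube(np.zeros((2, 3)), dims=[lat, lon], var_name="dummy").shape
    (2, 3)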
"""
if not isinstance(data, np.ndarray):
raise ValueError("Invalid input, need numpy array")
cube = iris.cube.Cube(data)
cube.var_name = var_name
cube.units = units
sh = data.shape
if dims is not None:
        if not len(dims) == data.ndim:
            raise DataDimensionError("Input number of dimensions must match array dimension number")
        for i, dim in enumerate(dims):
            if not isinstance(dim, iris.coords.DimCoord):
                raise ValueError("Need iris.coords.DimCoord...")
elif not len(dim.points) == sh[i]:
raise DataDimensionError(
f"Length mismatch between {dim.var_name} dim ({len(dim.points)}) "
f"and array dimension {i} ({sh[i]})"
)
cube.add_dim_coord(dim, i)
cube.attributes.update(attrs)
return cube
def copy_coords_cube(to_cube, from_cube, inplace=True):
"""Copy all coordinates from one cube to another
Requires the underlying data to be the same shape.
Warning
--------
This operation will delete all existing coordinates and auxiliary
coordinates and will then copy the ones from the input data object.
No checks of any kind will be performed
    Parameters
    ----------
    to_cube : iris.cube.Cube
        target cube into which the coordinates are to be copied
    from_cube : iris.cube.Cube
        source cube from which the coordinates are copied (needs to be the
        same shape as `to_cube`)
    inplace : bool
        if True, the coordinates are copied into the input `to_cube`, else
        into a copy of it

    Returns
    -------
    iris.cube.Cube
        cube containing the coordinates of `from_cube`
    """
if not all([isinstance(x, iris.cube.Cube) for x in [to_cube, from_cube]]):
raise ValueError("Invalid input. Need instances of iris.cube.Cube class...")
if not from_cube.shape == to_cube.shape:
raise DataDimensionError("Cannot copy coordinates: shape mismatch")
to_cube = delete_all_coords_cube(to_cube, inplace)
for i, dim_coord in enumerate(from_cube.dim_coords):
to_cube.add_dim_coord(dim_coord, i)
for aux_coord, dim in from_cube._aux_coords_and_dims:
to_cube.add_aux_coord(aux_coord, dim)
for aux_fac in from_cube.aux_factories:
to_cube.add_aux_factory(aux_fac)
return to_cube
def infer_time_resolution(time_stamps, dt_tol_percent=5, minfrac_most_common=0.8):
"""Infer time resolution based on input time-stamps
Calculates time difference *dt* between consecutive timestamps provided via
input array or list. Then it counts the most common *dt* (e.g. 86400 s for
daily). Before inferring the frequency it then checks all other *dts*
occurring in the input array to see if they are within a certain interval
around the most common one (e.g. +/- 5% as default, via arg
`dt_tol_percent`), that is, 86390 would be included if most common dt is
    86400 s, but 80000 s would not. Then it checks whether the number of *dts*
    within that tolerance around the most common *dt* exceeds a certain
    fraction (arg `minfrac_most_common`) of the total number of *dts* in the
    input array (default is 80%). If so, the corresponding frequency is
    derived using :func:`TsType.from_total_seconds` based on the most common
    *dt* (in this example that would be *daily*).
Parameters
----------
time_stamps : pandas.DatetimeIndex, or similar
list of time stamps
dt_tol_percent : int
tolerance in percent of accepted range of time diffs with respect to
most common time difference.
minfrac_most_common : float
minimum required fraction of time diffs that have to be equal to, or
within tolerance range, the most common time difference.
Raises
------
TemporalResolutionError
if frequency cannot be derived.
Returns
-------
str
inferred frequency
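    Example
    -------
    A minimal sketch, assuming pyaerocom's standard frequency names (e.g.
    *daily* for 86400 s spacing):

    >>> idx = pd.date_range("2010-1-1", periods=10, freq="D")
    >>> infer_time_resolution(idx)
    'daily'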
"""
if not isinstance(time_stamps, pd.DatetimeIndex):
time_stamps = pd.DatetimeIndex(time_stamps)
vals = time_stamps.values
dts = (vals[1:] - vals[:-1]).astype("timedelta64[s]").astype(int)
if np.min(dts) < 0:
raise TemporalResolutionError("Nasa Ames file contains neg. meas periods...")
counts = Counter(dts).most_common()
most_common_dt, most_common_num = counts[0]
num_within_tol = most_common_num
lower = most_common_dt * (100 - dt_tol_percent) / 100
upper = most_common_dt * (100 + dt_tol_percent) / 100
for dt, num in counts[1:]:
if lower <= dt <= upper:
num_within_tol += num
frac_ok = num_within_tol / len(dts)
if not frac_ok > minfrac_most_common:
raise TemporalResolutionError("Failed to infer ts_type")
tst = TsType.from_total_seconds(most_common_dt)
return str(tst)
def seconds_in_periods(timestamps, ts_type):
"""
Calculates the number of seconds for each period in timestamps.
Parameters
----------
timestamps : numpy.datetime64 or numpy.ndarray
Either a single datetime or an array of datetimes.
ts_type : str
Frequency of timestamps.
Returns
-------
np.array :
Array with same length as timestamps containing number of seconds for
each period.
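    Example
    -------
    A minimal sketch; February 2010 has 28 days:

    >>> seconds_in_periods(np.datetime64("2010-02-15"), "monthly")
    array([2419200])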
"""
ts_type = TsType(ts_type)
if isinstance(timestamps, np.datetime64):
timestamps = np.array([timestamps])
if isinstance(timestamps, np.ndarray):
timestamps = [to_pandas_timestamp(timestamp) for timestamp in timestamps]
    # from here on, timestamps is a list of pandas Timestamps
seconds_in_day = 86400
if ts_type >= TsType("monthly"):
if ts_type == TsType("monthly"):
days_in_months = np.array([timestamp.days_in_month for timestamp in timestamps])
seconds = days_in_months * seconds_in_day
return seconds
if ts_type == TsType("daily"):
return seconds_in_day * np.ones_like(timestamps)
raise NotImplementedError("Only yearly, monthly and daily frequencies implemented.")
elif ts_type == TsType("yearly"):
days_in_year = []
for ts in timestamps:
            # use pandas' calendar logic; a plain year % 4 check fails for
            # century years such as 1900 or 2100
            days_in_year.append(366 if ts.is_leap_year else 365)
seconds = np.array(days_in_year) * seconds_in_day
return seconds
raise TemporalResolutionError(f"Unknown TsType: {ts_type}")
def get_tot_number_of_seconds(ts_type, dtime=None):
"""Get total no. of seconds for a given frequency
ToDo
----
This method needs revision and can be solved simpler probably
Parameters
----------
ts_type : str or TsType
frequency for which number of seconds is supposed to be retrieved
    dtime : pandas.Series, optional
        series of timestamps; required for frequencies with periods of
        variable length (currently only monthly is supported). The default
        is None.

    Raises
    ------
    AttributeError
        if `dtime` is required for the input frequency but not provided.

    Returns
    -------
    int or pandas.Series
        number of seconds per period (one value per timestamp for
        variable-length periods such as months)
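    Example
    -------
    A minimal sketch for monthly frequency (Jan and Feb 2010), assuming
    `dtime` is given as a pandas Series of timestamps:

    >>> dtime = pd.Series(pd.date_range("2010-1-1", periods=2, freq="MS"))
    >>> get_tot_number_of_seconds("monthly", dtime=dtime).tolist()
    [2678400, 2419200]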
"""
    tstype = TsType(ts_type)
    if tstype >= TsType("monthly"):
        if dtime is None:
            raise AttributeError(
                "For frequencies of monthly or higher resolution you need to "
                "provide dtime in order to compute the number of seconds."
            )
if not ts_type == "monthly":
raise NotImplementedError("Can only handle monthly so far...")
# find seconds from dtime
# TODO generalize this
days_in_month = dtime.dt.daysinmonth
return days_in_month * 24 * 60 * 60
else:
return TS_TYPE_SECS[ts_type]
def get_standard_name(var_name):
"""Converts AeroCom variable name to CF standard name
Also handles alias names for variables, etc. or strings corresponding to
older conventions (e.g. names containing 3D).
Parameters
----------
var_name : str
AeroCom variable name
Returns
-------
str
corresponding standard name
"""
return const.VARS[var_name].standard_name
def get_standard_unit(var_name):
"""Gets standard unit of AeroCom variable
Also handles alias names for variables, etc. or strings corresponding to
older conventions (e.g. names containing 3D).
Parameters
----------
var_name : str
AeroCom variable name
Returns
-------
str
corresponding standard unit
"""
return const.VARS[var_name].units
def get_lowest_resolution(ts_type, *ts_types):
"""Get the lowest resolution from several ts_type codes
Parameters
----------
ts_type : str
first ts_type
*ts_types
one or more additional ts_type codes
Returns
-------
str
the ts_type that corresponds to the lowest resolution
Raises
------
ValueError
if one of the input ts_type codes is not supported
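    Example
    -------
    A minimal sketch; monthly is the coarsest of the three inputs:

    >>> get_lowest_resolution("daily", "monthly", "hourly")
    'monthly'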
"""
lowest = TsType(ts_type)
for freq in ts_types:
_temp = TsType(freq)
if _temp < lowest:
lowest = _temp
return lowest.val
def sort_ts_types(ts_types):
"""Sort a list of ts_types
Parameters
----------
ts_types : list
list of strings (or instance of :class:`TsType`) to be sorted
Returns
-------
list
list of strings with sorted frequencies
Raises
------
TemporalResolutionError
if one of the input ts_types is not supported
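    Example
    -------
    A minimal sketch; frequencies are returned from highest to lowest
    resolution:

    >>> sort_ts_types(["monthly", "hourly", "yearly"])
    ['hourly', 'monthly', 'yearly']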
"""
freqs_sorted = []
for ts_type in ts_types:
if isinstance(ts_type, str):
ts_type = TsType(ts_type)
if len(freqs_sorted) == 0:
freqs_sorted.append(ts_type)
else:
insert = False
for i, tt in enumerate(freqs_sorted):
if tt < ts_type:
insert = True
break
if insert:
freqs_sorted.insert(i, ts_type)
else:
freqs_sorted.append(ts_type)
return [str(tt) for tt in freqs_sorted]
def get_highest_resolution(ts_type, *ts_types):
"""Get the highest resolution from several ts_type codes
Parameters
----------
ts_type : str
first ts_type
*ts_types
one or more additional ts_type codes
Returns
-------
str
the ts_type that corresponds to the highest resolution
Raises
------
ValueError
if one of the input ts_type codes is not supported
"""
lst = [ts_type]
lst.extend(ts_types)
return sort_ts_types(lst)[0]
def isnumeric(val):
"""Check if input value is numeric
Parameters
----------
val
input value to be checked
Returns
-------
    bool
        True if input value is a number, else False
    """
    from numbers import Number
    return isinstance(val, Number)
def isrange(val):
"""Check if input value corresponds to a range
Checks if input is list, or array or tuple with 2 entries, or alternatively
a slice that has defined start and stop and has set step to None.
Note
----
No check is performed, whether first entry is smaller than second entry if
all requirements for a range are fulfilled.
Parameters
----------
val
input value to be checked
Returns
-------
bool
True, if input value corresponds to a range, else False.
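    Example
    -------
    Two-element sequences and fully specified slices count as ranges:

    >>> isrange((10, 20)), isrange(slice(10, 20)), isrange(10)
    (True, True, False)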
"""
if isinstance(val, (list, np.ndarray, tuple)):
if len(val) == 2:
return True
return False
elif isinstance(val, slice):
if val.step is not None or val.start is None or val.stop is None:
return False
return True
return False
def _check_stats_merge(statlist, var_name, pref_attr, fill_missing_nan):
has_errs = False
is_3d = []
stats = []
for stat in statlist:
        if var_name not in stat:
            raise DataCoverageError(f"All input stations must contain {var_name} data")
        elif pref_attr is not None and pref_attr not in stat:
raise MetaDataError(
f"Cannot sort station relevance by attribute {pref_attr}. "
f"At least one of the input stations does not contain this attribute"
)
elif not isinstance(stat[var_name], pd.Series):
stat._to_ts_helper(var_name)
# this will raise MetaDataError or TemporalResolutionError if there is
# an unresolvable issue with sampling frequency
stat.get_var_ts_type(var_name)
is_3d.append(stat.check_if_3d(var_name))
if var_name in stat.data_err:
has_errs = True
stats.append(stat)
if np.any(is_3d):
if not np.all(is_3d):
raise ValueError(
"Merge error: some of the input stations contain "
"altitude info (suggesting profile data), others "
"not."
)
is_3d = True
else:
is_3d = False
return (stats, is_3d, has_errs)
def _merge_stats_2d(
stats, var_name, sort_by_largest, pref_attr, add_meta_keys, resample_how, min_num_obs
):
if pref_attr is not None:
stats.sort(key=lambda s: s[pref_attr])
else:
stats.sort(key=lambda s: len(s[var_name].dropna()))
if sort_by_largest:
stats = stats[::-1]
# remove first station from the list
merged = stats.pop(0)
    for stat in stats:
merged.merge_other(
stat,
var_name,
add_meta_keys=add_meta_keys,
resample_how=resample_how,
min_num_obs=min_num_obs,
)
return merged
def _merge_stats_3d(stats, var_name, add_meta_keys, has_errs):
dtime = []
for stat in stats:
_t = stat[var_name].index.unique()
if not len(_t) == 1:
raise NotImplementedError(
"So far, merging of profile data "
"requires that profile values are "
"sampled at the same time"
)
dtime.append(_t[0])
tidx = pd.DatetimeIndex(dtime)
# AeroCom default vertical grid
vert_grid = const.make_default_vert_grid()
_data = np.ones((len(vert_grid), len(tidx))) * np.nan
if has_errs:
_data_err = np.ones((len(vert_grid), len(tidx))) * np.nan
for i, stat in enumerate(stats):
if i == 0:
merged = stat
else:
merged.merge_meta_same_station(stat, add_meta_keys=add_meta_keys)
_data[:, i] = np.interp(vert_grid, stat["altitude"], stat[var_name].values)
if has_errs:
try:
_data_err[:, i] = np.interp(vert_grid, stat["altitude"], stat.data_err[var_name])
            except Exception as e:
                logger.warning(f"Failed to interpolate data errors of {var_name}: {repr(e)}")
_coords = {"time": tidx, "altitude": vert_grid}
d = xr.DataArray(data=_data, coords=_coords, dims=["altitude", "time"], name=var_name)
d = d.sortby("time")
merged[var_name] = d
merged.dtime = d.time
merged.altitude = d.altitude
return merged
def merge_station_data(
stats,
var_name,
pref_attr=None,
sort_by_largest=True,
fill_missing_nan=True,
add_meta_keys=None,
resample_how=None,
min_num_obs=None,
):
"""Merge multiple StationData objects (from one station) into one instance
Note
----
all input :class:`StationData` objects need to have same attributes
``station_name``, ``latitude``, ``longitude`` and ``altitude``
Parameters
----------
stats : list
list containing :class:`StationData` objects (note: all of these
objects must contain variable data for the specified input variable)
var_name : str
data variable name that is to be merged
pref_attr
optional argument that may be used to specify a metadata attribute
that is available in all input :class:`StationData` objects and that
is used to order the input stations by relevance. The associated values
of this attribute need to be sortable (e.g. revision_date). This is
only relevant in case overlaps occur. If unspecified the relevance of
the stations is sorted based on the length of the associated data
arrays.
sort_by_largest : bool
if True, the result from the sorting is inverted. E.g. if
``pref_attr`` is unspecified, then the stations will be sorted based on
the length of the data vectors, starting with the shortest, ending with
the longest. This sorting result will then be inverted, if
        ``sort_by_largest=True``, so that the longest time series gets highest
importance. If, e.g. ``pref_attr='revision_date'``, then the stations
are sorted by the associated revision date value, starting with the
earliest, ending with the latest (which will also be inverted if
this argument is set to True)
fill_missing_nan : bool
if True, the resulting time series is filled with NaNs. NOTE: this
requires that information about the temporal resolution (ts_type) of
the data is available in each of the StationData objects.
add_meta_keys : str or list, optional
additional non-standard metadata keys that are supposed to be
considered for merging.
resample_how : str or dict, optional
in case input stations come in different frequencies they are merged
to the lowest common freq. This parameter can be used to control, which
aggregator(s) are to be used (e.g. mean, median).
min_num_obs : str or dict, optional
in case input stations come in different frequencies they are merged
to the lowest common freq. This parameter can be used to control minimum
number of observation constraints for the downsampling.
Returns
-------
StationData
merged data
"""
if isinstance(var_name, list):
if len(var_name) > 1:
raise NotImplementedError("Merging of multivar data not yet possible")
var_name = var_name[0]
stats, is_3d, has_errs = _check_stats_merge(stats, var_name, pref_attr, fill_missing_nan)
    # ToDo: data_err is not handled at the moment for 2D data; needs
    # revision and should be done in StationData.merge. Also, 3D vs 2D
    # should be handled by StationData directly...
if is_3d:
merged = _merge_stats_3d(stats, var_name, add_meta_keys, has_errs)
else:
merged = _merge_stats_2d(
stats, var_name, sort_by_largest, pref_attr, add_meta_keys, resample_how, min_num_obs
)
if fill_missing_nan:
try:
merged.insert_nans_timeseries(var_name)
except Exception as e:
logger.warning(
f"Could not insert NaNs into timeseries of variable {var_name} "
f"after merging stations. Reason: {repr(e)}"
)
merged["stat_merge_pref_attr"] = pref_attr
return merged
def _get_pandas_freq_and_loffset(freq):
"""Helper to convert resampling info"""
if freq in TS_TYPE_TO_PANDAS_FREQ:
freq = TS_TYPE_TO_PANDAS_FREQ[freq]
loffset = None
if freq in PANDAS_RESAMPLE_OFFSETS:
loffset = PANDAS_RESAMPLE_OFFSETS[freq]
return (freq, loffset)
def make_datetime_index(start, stop, freq):
"""Make pandas.DatetimeIndex for input specs
Note
----
If input frequency is specified in `PANDAS_RESAMPLE_OFFSETS`, an offset
will be added (e.g. 15 days for monthly data).
Parameters
----------
start
start time. Preferably as :class:`pandas.Timestamp`, else it will be
attempted to be converted.
stop
stop time. Preferably as :class:`pandas.Timestamp`, else it will be
attempted to be converted.
freq
frequency of datetime index.
Returns
-------
DatetimeIndex
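    Example
    -------
    A minimal sketch in daily resolution (assuming no offset is defined for
    daily frequency in `PANDAS_RESAMPLE_OFFSETS`):

    >>> idx = make_datetime_index("2010-1-1", "2010-12-31", "daily")
    >>> len(idx)
    365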
"""
if not isinstance(start, pd.Timestamp):
start = to_pandas_timestamp(start)
if not isinstance(stop, pd.Timestamp):
stop = to_pandas_timestamp(stop)
freq, loffset = _get_pandas_freq_and_loffset(freq)
idx = pd.date_range(start=start, end=stop, freq=freq)
if loffset is not None:
idx = idx + pd.Timedelta(loffset)
return idx
def make_datetimeindex_from_year(freq, year):
"""Create pandas datetime index
Parameters
----------
freq : str
pandas frequency str
year : int
year
Returns
-------
pandas.DatetimeIndex
index object
"""
start, stop = start_stop_from_year(year)
return make_datetime_index(start, stop, freq)
def calc_climatology(s, start, stop, min_count=None, set_year=None, resample_how="mean"):
"""Compute climatological timeseries from pandas.Series
Parameters
----------
s : pandas.Series
time series data
start : numpy.datetime64 or similar
start time of data used to compute climatology
    stop : numpy.datetime64 or similar
        stop time of data used to compute climatology
    min_count : int, optional
        minimum number of observations required per aggregated month in
        climatological interval. Months not meeting this requirement will be
        set to NaN.
set_year : int, optional
if specified, the output data will be assigned the input year. Else
the middle year of the climatological interval is used.
resample_how : str
string specifying how the climatological timeseries is to be
aggregated
Returns
-------
DataFrame
dataframe containing climatological timeseries as
well as columns std and count
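    Example
    -------
    A minimal sketch using four years of synthetic monthly data:

    >>> s = pd.Series(np.arange(48.0), index=pd.date_range("2010-1-1", periods=48, freq="MS"))
    >>> clim = calc_climatology(s, "2010", "2013")
    >>> list(clim.columns)
    ['data', 'std', 'numobs']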
"""
if not isinstance(start, pd.Timestamp):
start, stop = start_stop(start, stop)
sc = s[start:stop]
sc.dropna(inplace=True)
if len(sc) == 0:
raise ValueError(
"Cropping input time series in climatological interval resulted in empty series"
)
if set_year is None:
set_year = int(start.year + (stop.year - start.year) / 2) + 1
df = pd.DataFrame(sc)
df["month"] = df.index.month
clim = df.groupby("month").agg([resample_how, "std", "count"])
# clim.columns = clim.columns.droplevel(0)
clim.columns = ["data", "std", "numobs"]
idx = [np.datetime64(f"{set_year}-{x:02d}-15") for x in clim.index.values]
clim.set_index(pd.DatetimeIndex(idx), inplace=True)
if min_count is not None:
mask = clim["numobs"] < min_count
clim.loc[mask, "data"] = np.nan
return clim
def resample_timeseries(ts, freq, how=None, min_num_obs=None):
"""Resample a timeseries (pandas.Series)
Parameters
----------
ts : Series
time series instance
freq : str
new temporal resolution (can be pandas freq. string, or pyaerocom
ts_type)
how
aggregator to be used, accepts everything that is accepted by
:func:`pandas.core.resample.Resampler.agg` and in addition,
percentiles may be provided as str using e.g. 75percentile as input for
the 75% percentile.
min_num_obs : int, optional
minimum number of observations required per period (when downsampling).
E.g. if input is in daily resolution and freq is monthly and
min_num_obs is 10, then all months that have less than 10 days of data
are set to nan.
Returns
-------
Series
resampled time series object
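    Example
    -------
    A minimal sketch; the third month contains only one observation and is
    invalidated via `min_num_obs` (output shown schematically; the index
    offset, if any, comes from `PANDAS_RESAMPLE_OFFSETS`):

    >>> ts = pd.Series(np.ones(60), index=pd.date_range("2010-1-1", periods=60, freq="D"))
    >>> resample_timeseries(ts, "monthly", min_num_obs=25).values
    array([ 1.,  1., nan])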
"""
if how is None:
how = "mean"
elif "percentile" in how:
p = int(how.split("percentile")[0])
how = lambda x: np.nanpercentile(x, p)
freq, loffset = _get_pandas_freq_and_loffset(freq)
resampler = ts.resample(freq)
data = resampler.agg(how)
if min_num_obs is not None:
numobs = resampler.count()
# df = resampler.agg([how, 'count'])
invalid = numobs < min_num_obs
if np.any(invalid):
data.values[invalid] = np.nan
if loffset is not None:
data.index = data.index + pd.Timedelta(loffset)
return data
def resample_time_dataarray(arr, freq, how=None, min_num_obs=None):
"""Resample the time dimension of a :class:`xarray.DataArray`
Note
----
The dataarray must have a dimension coordinate named "time"
Parameters
----------
arr : DataArray
data array to be resampled
freq : str
new temporal resolution (can be pandas freq. string, or pyaerocom
ts_type)
how : str
how to aggregate (e.g. mean, median)
min_num_obs : int, optional
minimum number of observations required per period (when downsampling).
E.g. if input is in daily resolution and freq is monthly and
min_num_obs is 10, then all months that have less than 10 days of data
are set to nan.
Returns
-------
DataArray
resampled data array object
Raises
------
    OSError
if data input `arr` is not an instance of :class:`DataArray`
DataDimensionError
if time dimension is not available in dataset
"""
if how is None:
how = "mean"
elif "percentile" in how:
raise NotImplementedError(
"percentile based resampling is not yet available for xarray based data"
)
if not isinstance(arr, xr.DataArray):
raise OSError(f"Invalid input for arr: need DataArray, got {type(arr)}")
elif not "time" in arr.dims:
raise DataDimensionError("Cannot resample time: input DataArray has no time dimension")
to = TsType(freq)
pd_freq = to.to_pandas_freq()
invalid = None
if min_num_obs is not None:
invalid = arr.resample(time=pd_freq).count(dim="time") < min_num_obs
freq, loffset = _get_pandas_freq_and_loffset(freq)
resampler = arr.resample(time=pd_freq, loffset=loffset)
try:
aggfun = getattr(resampler, how)
except AttributeError:
raise ResamplingError(f"Invalid aggregator {how} for temporal resampling of DataArray...")
arr = aggfun(dim="time")
if invalid is not None:
arr.data[invalid.data] = np.nan
return arr
def str_to_iris(key, **kwargs):
"""Mapping function that converts strings into iris analysis objects
Please see dictionary ``STR_TO_IRIS`` in this module for valid definitions
Parameters
----------
key : str
key of :attr:`STR_TO_IRIS` dictionary
Returns
-------
obj
corresponding iris analysis object (e.g. Aggregator, method)
"""
key = key.lower()
    if key not in STR_TO_IRIS:
        raise KeyError(
            f"No iris.analysis object available for key {key}, please "
            f"choose from {list(STR_TO_IRIS)}"
        )
val = STR_TO_IRIS[key]
if callable(val):
return val(**kwargs)
return val
@ignore_warnings(UserWarning, r"Parsing .* in DD/MM/YYYY format")
def to_pandas_timestamp(value):
"""Convert input to instance of :class:`pandas.Timestamp`
Parameters
----------
value
input value that is supposed to be converted to time stamp
Returns
--------
pandas.Timestamp
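    Example
    -------
    Integers are interpreted as years:

    >>> to_pandas_timestamp(2010)
    Timestamp('2010-01-01 00:00:00')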
"""
if isinstance(value, np.str_):
value = str(value)
if isinstance(value, pd.Timestamp):
return value
elif isinstance(value, (str, np.datetime64, datetime, date)):
return pd.Timestamp(value)
else:
try:
numval = int(value)
if not 0 <= numval <= 10000:
raise ValueError("Could not infer valid year from numerical time input")
return pd.Timestamp(str(numval))
except Exception as e:
raise ValueError(f"Failed to convert {value} to Timestamp: {repr(e)}")
def to_datetime64(value):
"""Convert input value to numpy.datetime64
Parameters
----------
value
input value that is supposed to be converted, needs to be either str,
datetime.datetime, pandas.Timestamp or an integer specifying the
desired year.
Returns
-------
datetime64
input timestamp converted to datetime64
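    Example
    -------
    Integers are interpreted as years (the nanosecond precision stems from
    the intermediate :class:`pandas.Timestamp`):

    >>> to_datetime64(2010)
    numpy.datetime64('2010-01-01T00:00:00.000000000')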
"""
if isinstance(value, np.datetime64):
return value
else:
try:
return to_pandas_timestamp(value).to_datetime64()
except Exception as e:
raise ValueError(f"Failed to convert {value} to datetime64 objectError: {repr(e)}")
def is_year(val):
"""Check if input is / may be year
Parameters
----------
val
input that is supposed to be checked
Returns
-------
bool
True if input is a number between -2000 and 10000, else False
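    Example
    -------
    Anything convertible to an integer in that range counts:

    >>> is_year(2010), is_year("2010"), is_year("spam")
    (True, True, False)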
"""
    try:
        return -2000 < int(val) < 10000
    except Exception:
        return False
def _check_climatology_timestamp(t):
if isnumeric(t) and t == 9999:
return pd.Timestamp("1-1-2222")
elif isinstance(t, np.datetime64):
tstr = str(t)
if tstr.startswith("9999"):
return pd.Timestamp(tstr.replace("9999", "2222"))
elif isinstance(t, str) and "9999" in t:
return pd.Timestamp(t.replace("9999", "2222"))
elif isinstance(t, datetime) and t.year == 9999:
return pd.Timestamp(t.replace(year=2222))
raise ValueError(f"Failed to identify {t} as climatological timestamp...")
def start_stop(start, stop=None, stop_sub_sec=True):
"""Create pandas timestamps from input start / stop values
Note
----
If input suggests climatological data in AeroCom format (i.e. year=9999)
then the year is converted to 2222 instead since pandas cannot handle
year 9999.
Parameters
-----------
start
start time (any format that can be converted to pandas.Timestamp)
stop
stop time (any format that can be converted to pandas.Timestamp)
stop_sub_sec : bool
if True and if input for stop is a year (e.g. 2015) then one second
is subtracted from stop timestamp (e.g. if input stop is
2015 and denotes "until 2015", then for the returned stop timestamp
one second will be subtracted, so it would be 31.12.2014 23:59:59).
Returns
-------
pandas.Timestamp
start timestamp
pandas.Timestamp
stop timestamp
Raises
------
ValueError
if input cannot be converted to pandas timestamps
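    Example
    -------
    If only a year is provided, the returned interval spans that full year:

    >>> start_stop(2010)
    (Timestamp('2010-01-01 00:00:00'), Timestamp('2010-12-31 23:59:59'))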
"""
isclim = False
try:
start = to_pandas_timestamp(start)
except pd.errors.OutOfBoundsDatetime: # probably climatology
start = _check_climatology_timestamp(start)
isclim = True
if stop is None:
if isclim:
yr = 2222
else:
yr = start.year
stop = to_pandas_timestamp(f"{yr}-12-31 23:59:59")
else:
try:
subt_sec = False
if isnumeric(stop):
subt_sec = True
stop = to_pandas_timestamp(stop)
if subt_sec and stop_sub_sec:
stop = stop - pd.Timedelta(1, "s")
except pd.errors.OutOfBoundsDatetime:
stop = _check_climatology_timestamp(stop)
return (start, stop)
def datetime2str(time, ts_type=None):
    """Convert datetime-like input to string, formatted based on ts_type"""
    conv = TS_TYPE_DATETIME_CONV[ts_type]
if is_year(time):
return str(time)
try:
time = to_pandas_timestamp(time).strftime(conv)
except pd.errors.OutOfBoundsDatetime:
logger.warning(f"Failed to convert time {time} to string")
return time
def start_stop_str(start, stop=None, ts_type=None):
    """Convert start / stop times to a (combined) string based on ts_type"""
    conv = TS_TYPE_DATETIME_CONV[ts_type]
if is_year(start) and stop is None:
return str(start)
start, stop = start_stop(start, stop)
start_str = start.strftime(conv)
stop_str = stop.strftime(conv)
if stop_str != start_str:
return f"{start_str}-{stop_str}"
return start_str
def start_stop_from_year(year):
"""Create start / stop timestamp from year
Parameters
----------
year : int
the year for which start / stop is to be instantiated
Returns
-------
numpy.datetime64
start datetime
numpy.datetime64
stop datetime
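    Example
    -------
    A minimal sketch:

    >>> start_stop_from_year(2010)
    (numpy.datetime64('2010-01-01T00:00:00'), numpy.datetime64('2010-12-31T23:59:59'))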
"""
start = np.datetime64(f"{year}-01-01T00:00:00")
stop = np.datetime64(f"{year}-12-31T23:59:59")
return (start, stop)
def to_datestring_YYYYMMDD(value):
"""Convert input time to string with format YYYYMMDD
Parameters
----------
value
input time, may be string, datetime, numpy.datetime64 or
pandas.Timestamp
Returns
-------
str
input formatted to string YYYYMMDD
Raises
------
ValueError
if input is not supported
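    Example
    -------
    A minimal sketch:

    >>> to_datestring_YYYYMMDD(np.datetime64("2010-10-01"))
    '20101001'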
"""
if isinstance(value, str) and len(value) == 8:
logger.info(
"Input is already string containing 8 chars. Assuming it "
"is in the right format and returning unchanged"
)
return value
try:
return to_pandas_timestamp(value).strftime("%Y%m%d")
except Exception as e:
raise ValueError(
f"Invalid input, need str, datetime, numpy.datetime64 or pandas.Timestamp. "
f"Error: {repr(e)}"
)
def cftime_to_datetime64(times, cfunit=None, calendar=None):
"""Convert numerical timestamps with epoch to numpy datetime64
    This method was designed to enhance the performance of datetime conversions
    and is based on the corresponding information provided in the cftime
    package (`see here <https://github.com/Unidata/cftime/blob/master/cftime/
    _cftime.pyx>`__). In particular, it does what :func:`num2date` therein
    does, but faster, as long as the time stamps are not defined on a
    non-standard calendar.
Parameters
----------
times : :obj:`list` or :obj:`ndarray` or :obj:`iris.coords.DimCoord`
array containing numerical time stamps (relative to basedate of
``cfunit``). Can also be a single number.
cfunit : :obj:`str` or :obj:`Unit`, optional
CF unit string (e.g. day since 2018-01-01 00:00:00.00000000 UTC) or
unit. Required if `times` is not an instance of
:class:`iris.coords.DimCoord`
calendar : :obj:`str`, optional
string specifying calendar (only required if ``cfunit`` is of type
``str``).
Returns
-------
ndarray
numpy array containing timestamps as datetime64 objects
Raises
------
ValueError
if cfunit is ``str`` and calendar is not provided or invalid, or if
the cfunit string is invalid
Example
-------
>>> cfunit_str = 'day since 2018-01-01 00:00:00.00000000 UTC'
>>> cftime_to_datetime64(10, cfunit_str, "gregorian")
array(['2018-01-11T00:00:00.000000'], dtype='datetime64[us]')
"""
if isinstance(times, iris.coords.DimCoord): # special case
times, cfunit = times.points, times.units
try:
len(times)
except Exception:
times = [times]
if isinstance(cfunit, str):
if calendar is None:
raise ValueError(
"Require specification of calendar for conversion into datetime64 objects"
)
cfunit = Unit(cfunit, calendar) # raises Error if calendar is invalid
if not isinstance(cfunit, Unit):
raise ValueError(
"Please provide cfunit either as instance of class cf_units.Unit or as a string"
)
calendar = cfunit.calendar
basedate = cfunit.num2date(0)
if (calendar == "proleptic_gregorian" and basedate.year >= MINYEAR) or (
calendar in ["gregorian", "standard"] and basedate > GREGORIAN_BASE
):
# NOTE: changed on 9 July 2018 by jgliss due to error (kernel died)
# after update of dependencies (cf_units). Attribute name does not
# work anymore...
cfu_str = cfunit.origin # cfunit.name
res = cfu_str.split()[0].lower()
if res in microsec_units:
tstr = "us"
elif res in millisec_units:
tstr = "ms"
elif res in sec_units:
tstr = "s"
elif res in min_units:
tstr = "m"
elif res in hr_units:
tstr = "h"
elif res in day_units:
tstr = "D"
else:
raise ValueError("unsupported time units")
basedate = np.datetime64(basedate)
        dt = np.asarray(times, dtype=f"timedelta64[{tstr}]")
return basedate + dt
else:
return np.asarray([np.datetime64(t) for t in cfunit.num2date(times)])
def get_constraint(lon_range=None, lat_range=None, time_range=None, meridian_centre=True):
"""Function that creates an :class:`iris.Constraint` based on input
Note
----
Please be aware of the definition of the longitudes in your data when
cropping within the longitude dimension. The longitudes in your data may be
defined either from **-180 <= lon <= 180** (pyaerocom standard) or from
**0 <= lon <= 360**. In the former case (-180 -> 180) you can leave the
additional input parameter ``meridian_centre=True`` (default).
Parameters
----------
lon_range : :obj:`tuple`, optional
2-element tuple containing longitude range for cropping
Example input to crop around meridian: `lon_range=(-30, 30)`
lat_range : :obj:`tuple`, optional
2-element tuple containing latitude range for cropping.
time_range : :obj:`tuple`, optional
2-element tuple containing time range for cropping. Allowed data
types for specifying the times are
1. a combination of 2 :class:`pandas.Timestamp` instances or
2. a combination of two strings that can be directly converted\
into :class:`pandas.Timestamp` instances (e.g.\
`time_range=("2010-1-1", "2012-1-1")`) or
3. directly a combination of indices (:obj:`int`).
meridian_centre : bool
specifies the coordinate definition range of longitude array. If True,
then -180 -> 180 is assumed, else 0 -> 360
Returns
-------
iris.Constraint
the combined constraint from all valid input parameters
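    Example
    -------
    A minimal sketch combining longitude and latitude constraints:

    >>> c = get_constraint(lon_range=(-30, 30), lat_range=(20, 70))
    >>> isinstance(c, iris.Constraint)
    True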
"""
constraints = []
if lon_range is not None:
constraints.append(get_lon_rng_constraint(*lon_range, meridian_centre))
if lat_range is not None:
constraints.append(get_lat_rng_constraint(*lat_range))
if time_range is not None:
constraints.append(get_time_rng_constraint(*time_range))
if len(constraints) > 0:
c = constraints[0]
for cadd in constraints[1:]:
c = c & cadd
return c
def get_lat_rng_constraint(low, high):
"""Create latitude constraint based on input range
Parameters
----------
low : float or int
lower latitude coordinate
high : float or int
upper latitude coordinate
Returns
-------
iris.Constraint
the corresponding iris.Constraint instance
"""
return iris.Constraint(latitude=lambda v: low <= v <= high)
def get_lon_rng_constraint(low, high, meridian_centre=True):
"""Create longitude constraint based on input range
Parameters
----------
low : float or int
left longitude coordinate
high : float or int
right longitude coordinate
meridian_centre : bool
specifies the coordinate definition range of longitude array of the
data to be cropped. If True, then -180 -> 180 is assumed, else 0 -> 360
Returns
-------
iris.Constraint
the corresponding iris.Constraint instance
Raises
------
ValueError
if first coordinate in lon_range equals or exceeds second
LongitudeConstraintError
if the input implies cropping over border of longitude array
(e.g. 160 -> - 160 if -180 <= lon <= 180).
"""
    if low == high:
        raise ValueError("the specified values are equal")
    elif low > high:
        raise ValueError("Left coordinate must be smaller than right coordinate")
if meridian_centre:
low, high = (low + 180) % 360 - 180, (high + 180) % 360 - 180
else:
low, high = low % 360, high % 360
if low > high:
msg = "Cannot crop over right border of longitude range"
raise LongitudeConstraintError(msg)
return iris.Constraint(longitude=lambda v: low <= v <= high)
def get_time_rng_constraint(start, stop):
"""Create iris.Constraint for data extraction along time axis
Parameters
----------
    start : :obj:`Timestamp` or :obj:`str`
        start time of desired subset. If string, it must be convertible
        into :class:`pandas.Timestamp` (e.g. "2012-1-1")
    stop : :obj:`Timestamp` or :obj:`str`
        stop time of desired subset. If string, it must be convertible
        into :class:`pandas.Timestamp` (e.g. "2012-1-1")
Returns
-------
iris.Constraint
iris Constraint instance that can, e.g., be used as input for
:func:`pyaerocom.griddeddata.GriddedData.extract`
"""
if not isinstance(start, pd.Timestamp):
start = pd.Timestamp(start)
if not isinstance(stop, pd.Timestamp):
stop = pd.Timestamp(stop)
t_lower = iris.time.PartialDateTime(year=start.year, month=start.month, day=start.day)
t_upper = iris.time.PartialDateTime(year=stop.year, month=stop.month, day=stop.day)
return iris.Constraint(time=lambda cell: t_lower <= cell <= t_upper)
def get_max_period_range(periods):
    """Get first and last year from a list of period strings (e.g. "2010-2015")"""
    start = min([int(per.split("-")[0]) for per in periods])
stop = max(int(per.split("-")[1]) if len(per.split("-")) > 1 else int(per) for per in periods)
return start, stop
def make_dummy_cube(
var_name: str, start_yr: int = 2000, stop_yr: int = 2020, freq: str = "daily", dtype=float
) -> iris.cube.Cube:
startstr = f"days since {start_yr}-01-01 00:00"
if freq not in TS_TYPE_TO_PANDAS_FREQ.keys():
raise ValueError(f"{freq} not a recognized frequency")
start_str = f"{start_yr}-01-01 00:00"
stop_str = f"{stop_yr}-12-31 00:00"
times = pd.date_range(start_str, stop_str, freq=TS_TYPE_TO_PANDAS_FREQ[freq])
days_since_start = (times - times[0]).days
unit = get_variable(var_name).units
lat_range = (-90, 90)
lon_range = (-180, 180)
lat_res_deg = 45
lon_res_deg = 90
time_unit = Unit(startstr, calendar="gregorian")
lons = np.arange(
lon_range[0] + (lon_res_deg / 2), lon_range[1] + (lon_res_deg / 2), lon_res_deg
)
lats = np.arange(
lat_range[0] + (lat_res_deg / 2), lat_range[1] + (lat_res_deg / 2), lat_res_deg
)
latdim = iris.coords.DimCoord(
lats,
var_name="lat",
standard_name="latitude",
long_name="Center coordinates for latitudes",
circular=False,
units=Unit("degrees"),
)
londim = iris.coords.DimCoord(
lons,
var_name="lon",
standard_name="longitude",
long_name="Center coordinates for longitudes",
circular=False,
units=Unit("degrees"),
)
timedim = iris.coords.DimCoord(
days_since_start, var_name="time", standard_name="time", long_name="Time", units=time_unit
)
latdim.guess_bounds()
londim.guess_bounds()
dummy = iris.cube.Cube(np.ones((len(times), len(lats), len(lons))), units=unit)
dummy.add_dim_coord(latdim, 1)
dummy.add_dim_coord(londim, 2)
dummy.add_dim_coord(timedim, 0)
dummy.var_name = var_name
dummy.ts_type = freq
dummy.data = dummy.data.astype(dtype)
for coord in dummy.coords():
coord.points = coord.points.astype(dtype)
return dummy