"""
Mathematical low level utility methods of pyaerocom
"""
import numpy as np
from scipy.stats import kendalltau, pearsonr, spearmanr
from pyaerocom._warnings import ignore_warnings
### LAMBDA FUNCTIONS
def in_range(x, low, high) -> bool:
    """Check whether ``low <= x <= high`` (inclusive on both ends)."""
    return low <= x <= high
### OTHER FUNCTIONS
def is_strictly_monotonic(iter1d) -> bool:
    """
    Check if 1D iterable is strictly monotonically increasing.

    Parameters
    ----------
    iter1d
        1D iterable object to be tested

    Returns
    -------
    bool
        True if every element is strictly larger than its predecessor,
        else False
    """
    # np.all returns a numpy bool; cast so callers always get a builtin bool
    return bool(np.all(np.diff(iter1d) > 0))
def make_binlist(vmin: float, vmax: float, num: int = None) -> list:
    """Create a list of equidistant bin edges between `vmin` and `vmax`.

    Parameters
    ----------
    vmin : float
        lower end of the value range
    vmax : float
        upper end of the value range
    num : int, optional
        number of bins; defaults to 8 if unspecified

    Returns
    -------
    list
        list of ``num + 1`` bin edges (floats), including both endpoints
    """
    if num is None:
        num = 8
    # num bins require num + 1 edges
    return list(np.linspace(vmin, vmax, num + 1))
def weighted_sum(data, weights):
    """Compute weighted sum using numpy dot product

    Parameters
    ----------
    data : ndarray
        data array that is supposed to be summed up
    weights : ndarray
        array containing weights for each point in `data`

    Returns
    -------
    float
        weighted sum of values in input array
    """
    # dot product == sum(data[i] * weights[i])
    return np.dot(data, weights)
def sum(data, weights=None):
    """Summing operation with option to perform weighted sum

    Note
    ----
    This intentionally shadows the builtin ``sum`` within this module;
    the name is kept for backwards compatibility of the public API.

    Parameters
    ----------
    data : ndarray
        data array that is supposed to be summed up
    weights : ndarray, optional
        array containing weights for each point in `data`

    Returns
    -------
    float or int
        (weighted) sum of values in input array
    """
    if weights is None:
        return np.sum(data)
    return weighted_sum(data, weights)
def weighted_mean(data, weights):
    """Compute weighted mean

    Parameters
    ----------
    data : ndarray
        data array that is supposed to be averaged
    weights : ndarray
        array containing weights for each point in `data`

    Returns
    -------
    float or int
        weighted mean of data array
    """
    # sum(w_i * x_i) / sum(w_i); requires array-like inputs supporting "*"
    return np.sum(data * weights) / np.sum(weights)
def weighted_cov(ref_data, data, weights):
    """Compute weighted covariance

    Parameters
    ----------
    ref_data : ndarray
        x data
    data : ndarray
        y data
    weights : ndarray
        array containing weights for each point in `data`

    Returns
    -------
    float
        weighted covariance of `ref_data` and `data`
    """
    avgx = weighted_mean(ref_data, weights)
    avgy = weighted_mean(data, weights)
    # weighted average of the deviation products
    return np.sum(weights * (ref_data - avgx) * (data - avgy)) / np.sum(weights)
def weighted_corr(ref_data, data, weights):
    """Compute weighted Pearson correlation

    Parameters
    ----------
    ref_data : ndarray
        x data
    data : ndarray
        y data
    weights : ndarray
        array containing weights for each point in `data`

    Returns
    -------
    float
        weighted correlation coefficient
    """
    wcovxy = weighted_cov(ref_data, data, weights)
    wcovxx = weighted_cov(ref_data, ref_data, weights)
    wcovyy = weighted_cov(data, data, weights)
    # normalise covariance by the product of the weighted standard deviations
    wsigmaxy = np.sqrt(wcovxx * wcovyy)
    return wcovxy / wsigmaxy
@ignore_warnings(
    RuntimeWarning, "invalid value encountered in double_scalars", "An input array is constant"
)
def corr(ref_data, data, weights=None):
    """Compute (Pearson) correlation coefficient

    Parameters
    ----------
    ref_data : ndarray
        x data
    data : ndarray
        y data
    weights : ndarray, optional
        array containing weights for each point in `data`; if provided, a
        weighted correlation is computed via :func:`weighted_corr`

    Returns
    -------
    float
        correlation coefficient
    """
    if weights is None:
        # pearsonr returns (statistic, p-value); only the statistic is needed
        return pearsonr(ref_data, data)[0]
    return weighted_corr(ref_data, data, weights)
def _nanmean_and_std(data):
"""
Calculate mean and std for input data (may contain NaN's')
Parameters
----------
data : list or numpy.ndarray
input data
Returns
-------
float
mean value of input data.
float
standard deviation of input data.
"""
if np.all(np.isnan(data)):
return (np.nan, np.nan)
return (np.nanmean(data), np.nanstd(data))
@ignore_warnings(
    RuntimeWarning, "An input array is constant", "invalid value encountered in .*divide"
)
def calc_statistics(
    data, ref_data, lowlim=None, highlim=None, min_num_valid=1, weights=None, drop_stats=None
):
    """Calc statistical properties from two data arrays

    Calculates the following statistical properties based on the two provided
    1-dimensional data arrays and returns them in a dictionary (keys are
    provided after the arrows):

    - Mean value of both arrays -> refdata_mean, data_mean
    - Standard deviation of both arrays -> refdata_std, data_std
    - RMS (Root mean square) -> rms
    - NMB (Normalised mean bias) -> nmb
    - MNMB (Modified normalised mean bias) -> mnmb
    - MB (Mean Bias) -> mb
    - MAB (Mean Absolute Bias) -> mab
    - FGE (Fractional gross error) -> fge
    - R (Pearson correlation coefficient) -> R
    - R_spearman (Spearman corr. coeff) -> R_spearman

    Note
    ----
    Nans are removed from the input arrays, information about no. of removed
    points can be inferred from keys `totnum` and `num_valid` in return dict.

    Parameters
    ----------
    data : ndarray
        array containing data, that is supposed to be compared with reference
        data
    ref_data : ndarray
        array containing data, that is used to compare `data` array with
    lowlim : float
        lower end of considered value range (e.g. if set 0, then all datapoints
        where either ``data`` or ``ref_data`` is smaller than 0 are removed)
    highlim : float
        upper end of considered value range
    min_num_valid : int
        minimum number of valid measurements required to compute statistical
        parameters.
    weights: ndarray
        array containing weights if computing weighted statistics
    drop_stats: tuple
        tuple which drops the provided statistics from computed json files.
        For example, setting drop_stats = ("mb", "mab"), results in json files
        in hm/ts with entries which do not contain the mean bias and mean
        absolute bias, but the other statistics are preserved.

    Returns
    -------
    dict
        dictionary containing computed statistics

    Raises
    ------
    ValueError
        if either of the input arrays has dimension other than 1
    """
    data = np.asarray(data)
    ref_data = np.asarray(ref_data)
    if not data.ndim == 1 or not ref_data.ndim == 1:
        raise ValueError("Invalid input. Data arrays must be one dimensional")
    result = {}
    # elementwise product of boolean arrays acts as a logical AND: keep only
    # points that are valid (non-NaN) in BOTH arrays
    mask = ~np.isnan(ref_data) * ~np.isnan(data)
    num_points = mask.sum()
    data, ref_data = data[mask], ref_data[mask]
    weighted = False if weights is None else True
    # totnum counts all input points, num_valid only the non-NaN pairs
    result["totnum"] = float(len(mask))
    result["num_valid"] = float(num_points)
    ref_mean, ref_std = _nanmean_and_std(ref_data)
    data_mean, data_std = _nanmean_and_std(data)
    result["refdata_mean"] = ref_mean
    result["refdata_std"] = ref_std
    result["data_mean"] = data_mean
    result["data_std"] = data_std
    result["weighted"] = weighted
    # too few valid points: fill remaining stats with NaN and return early
    if not num_points >= min_num_valid:
        # NOTE(review): this filtering has no effect on the returned values
        # (all stats below are set to NaN and nothing is recomputed) —
        # presumably leftover/dead code; kept as-is to preserve behavior
        if lowlim is not None:
            valid = np.logical_and(data > lowlim, ref_data > lowlim)
            data = data[valid]
            ref_data = ref_data[valid]
        if highlim is not None:
            valid = np.logical_and(data < highlim, ref_data < highlim)
            data = data[valid]
            ref_data = ref_data[valid]
        result["rms"] = np.nan
        result["nmb"] = np.nan
        result["mnmb"] = np.nan
        result["fge"] = np.nan
        result["R"] = np.nan
        result["R_spearman"] = np.nan
        return result
    # restrict both arrays to the considered value range (if limits are given)
    if lowlim is not None:
        valid = np.logical_and(data > lowlim, ref_data > lowlim)
        data = data[valid]
        ref_data = ref_data[valid]
    if highlim is not None:
        valid = np.logical_and(data < highlim, ref_data < highlim)
        data = data[valid]
        ref_data = ref_data[valid]
    difference = data - ref_data
    diffsquare = difference**2
    if weights is not None:
        # align weights with the NaN-mask applied above and normalise to max 1
        weights = weights[mask]
        weights = weights / weights.max()
        result[
            "NOTE"
        ] = "Weights were not applied to FGE and kendall and spearman corr (not implemented)"
    # np.average treats weights=None as unweighted, so this covers both cases
    result["rms"] = np.sqrt(np.average(diffsquare, weights=weights))
    # NO implementation to apply weights yet ...
    if num_points > 1:
        result["R"] = corr(data, ref_data, weights)
        result["R_spearman"] = spearmanr(data, ref_data)[0]
        result["R_kendall"] = kendalltau(data, ref_data)[0]
    else:
        # correlation is undefined for a single point
        result["R"] = np.nan
        result["R_spearman"] = np.nan
        result["R_kendall"] = np.nan
    # NB: "sum" here is this module's own (optionally weighted) sum function
    sum_diff = sum(difference, weights=weights)
    sum_refdata = sum(ref_data, weights=weights)
    # guard against division by zero when computing NMB
    if sum_refdata == 0:
        if sum_diff == 0:
            nmb = 0
            mb = 0
        else:
            nmb = np.nan
            mb = np.nan
    else:
        nmb = sum_diff / sum_refdata
        mb = sum_diff
    # NOTE(review): the `mb` values assigned above are unconditionally
    # overwritten below; kept to preserve the original statement order
    sum_data_refdata = data + ref_data
    # for MNMB, and FGE: don't divide by 0 ...
    mask = ~np.isnan(sum_data_refdata)
    num_points = mask.sum()
    if num_points == 0:
        mnmb = np.nan
        fge = np.nan
        mb = np.nan
        mab = np.nan
    else:
        tmp = difference[mask] / sum_data_refdata[mask]
        if weights is not None:
            # re-mask weights to match the reduced arrays used for MNMB/FGE
            weights = weights[mask]
        mnmb = 2.0 / num_points * sum(tmp, weights=weights)
        fge = 2.0 / num_points * sum(np.abs(tmp), weights=weights)
        mb = sum(difference[mask]) / num_points
        mab = sum(np.abs(difference[mask])) / num_points
    result["nmb"] = nmb
    result["mnmb"] = mnmb
    result["fge"] = fge
    result["mb"] = mb
    result["mab"] = mab
    # optionally remove unwanted statistics from the output dict
    if drop_stats:
        for istat in drop_stats:
            result.pop(istat, None)
    return result
def closest_index(num_array, value):
    """Return index in number array that is closest to input value.

    Parameters
    ----------
    num_array : ndarray or list
        array of numbers to be searched
    value : float
        reference value

    Returns
    -------
    int
        index of the entry with smallest absolute difference to `value`
        (ties resolve to the first occurrence, per :func:`numpy.argmin`)
    """
    return np.argmin(np.abs(np.asarray(num_array) - value))
def numbers_in_str(input_string):
    """This method finds all numbers in a string

    Note
    ----
    - Beta version, please use with care
    - Detects only integer numbers, dots are ignored

    Parameters
    ----------
    input_string : str
        string containing numbers

    Returns
    -------
    list
        list of strings specifying all numbers detected in string

    Example
    -------
    >>> numbers_in_str('Bla42Blub100')
    ['42', '100']
    """
    numbers = []
    in_number = False
    current = None
    for char in input_string:
        try:
            int(char)  # raises ValueError for any non-digit character
            if not in_number:
                # start of a new digit run
                in_number = True
                current = char
            else:
                current += char
        except ValueError:
            if in_number:
                numbers.append(current)
                in_number = False
    # flush a digit run that extends to the end of the string
    if in_number:
        numbers.append(current)
    return numbers
def exponent(num):
    """Get exponent of input number

    Parameters
    ----------
    num : :obj:`float` or iterable
        input number(s); must be non-zero (log10 of 0 is undefined)

    Returns
    -------
    :obj:`int` or :obj:`ndarray` containing ints
        exponent of input number(s)

    Example
    -------
    >>> from pyaerocom.mathutils import exponent
    >>> exponent(2340)
    3
    """
    # floor(log10(|num|)) yields the power-of-10 magnitude of each value
    return np.floor(np.log10(abs(np.asarray(num)))).astype(int)
def range_magnitude(low, high):
    """Returns magnitude of value range

    Parameters
    ----------
    low : float
        lower end of range
    high : float
        upper end of range

    Returns
    -------
    int
        magnitudes spanned by input numbers (negative if ``high < low``)

    Example
    -------
    >>> range_magnitude(0.1, 100)
    3
    >>> range_magnitude(100, 0.1)
    -3
    >>> range_magnitude(1e-3, 1e6)
    9
    """
    return exponent(high) - exponent(low)
def estimate_value_range(vmin, vmax, extend_percent=0):
    """
    Round and extend input range to estimate lower and upper bounds of range

    Parameters
    ----------
    vmin : float
        lower value of range
    vmax : float
        upper value of range
    extend_percent : int
        percentage specifying to which extent the input range is supposed to be
        extended.

    Returns
    -------
    float
        estimated lower end of range
    float
        estimated upper end of range

    Raises
    ------
    ValueError
        if `vmax` does not exceed `vmin`
    """
    if not vmax > vmin:
        raise ValueError("vmax needs to exceed vmin")
    # extend value range symmetrically by +/- extend_percent percent
    offs = (vmax - vmin) * extend_percent * 0.01
    vmin, vmax = vmin - offs, vmax + offs
    # determine rounding magnitude from vmin; fall back to vmax if vmin is
    # exactly 0 (exponent(0) would be undefined)
    if vmin != 0:
        exp = float(exponent(vmin))
    else:
        exp = float(exponent(vmax))
    # round outward to the nearest multiple of 10**exp
    vmin = np.floor(vmin * 10 ** (-exp)) * 10.0 ** (exp)
    vmax = np.ceil(vmax * 10 ** (-exp)) * 10.0 ** (exp)
    return vmin, vmax
def _init_stats_dummy(drop_stats=None):
    """Build a statistics dict for locations without data (all values NaN).

    Uses the keys produced by :func:`calc_statistics` (minus any in
    `drop_stats`) and maps every one of them to NaN.
    """
    dummy = {key: np.nan for key in calc_statistics([1], [1], drop_stats=drop_stats)}
    # make sure these keys exist even when yearly and season != all
    for extra in (
        "R_spatial_mean",
        "R_spatial_median",
        "R_temporal_mean",
        "R_temporal_median",
    ):
        dummy[extra] = np.nan
    return dummy