"""
Methods and / or classes to perform 3D colocation
"""
from __future__ import annotations
import logging
import os
from typing import NamedTuple
import iris
import numpy as np
from cf_units import Unit
from pyaerocom import __version__ as pya_ver
from pyaerocom import const
from pyaerocom.climatology_config import ClimatologyConfig
from pyaerocom._lowlevel_helpers import LayerLimits, RegridResDeg
from pyaerocom.exceptions import (
DataUnitError,
DimensionOrderError,
MetaDataError,
TemporalResolutionError,
VarNotAvailableError,
)
from pyaerocom.filter import Filter
from pyaerocom.helpers import make_datetime_index
from pyaerocom.tstype import TsType
from .colocated_data import ColocatedData
from .colocation_utils import (
_colocate_site_data_helper,
_colocate_site_data_helper_timecol,
_regrid_gridded,
check_time_ival,
check_ts_type,
resolve_var_name,
)
logger = logging.getLogger(__name__)
[docs]
class ColocatedDataLists(NamedTuple):
colocateddata_for_statistics: list[ColocatedData]
colocateddata_for_profile_viz: list[ColocatedData]
def _colocate_vertical_profile_gridded(
data,
data_ref,
start=None,
stop=None,
filter_name=None,
harmonise_units=True,
var_ref=None,
min_num_obs=None,
colocate_time=False,
use_climatology_ref=False,
resample_how=None,
layer_limits: LayerLimits | None = None,
obs_stat_data=None,
ungridded_lons=None,
ungridded_lats=None,
col_freq=None,
col_tst=None,
var=None,
var_aerocom=None,
var_ref_aerocom=None,
) -> list[ColocatedData]:
if layer_limits is None:
raise Exception("layer limits must be provided")
data_ref_unit = None
ts_type_src_ref = None
if not harmonise_units:
data_unit = str(data.units)
else:
data_unit = None
pd_freq = col_tst.to_pandas_freq()
time_idx = make_datetime_index(start, stop, pd_freq)
time_num = len(time_idx)
stat_num = len(obs_stat_data)
arr = np.full((2, time_num, stat_num), np.nan)
lons = np.full(stat_num, np.nan)
lats = np.full(stat_num, np.nan)
alts = np.full(stat_num, np.nan)
station_names = [""] * stat_num
dataset_ref = data_ref.contains_datasets[0]
ts_type_src_data = data.ts_type
list_of_colocateddata_objects = []
for vertical_layer in layer_limits:
# Think about efficency here in terms of order of loops. candidate for parallelism
# create the 2D layer data
arr = np.full((2, time_num, stat_num), np.nan)
try:
data_this_layer = (
data.extract(
iris.Constraint(
coord_values={
"altitude": lambda cell: vertical_layer["start"]
< cell
< vertical_layer["end"]
}
)
)
.collapsed("altitude", iris.analysis.MEAN)
.copy()
)
except Exception as e:
logger.warning(f"No altitude in model data layer {vertical_layer}")
logger.debug(f"Raised: {e}")
continue
grid_stat_data_this_layer = data_this_layer.to_time_series(
longitude=ungridded_lons,
latitude=ungridded_lats,
)
# loop over all stations and append to colocated data object
for i, obs_stat in enumerate(obs_stat_data):
# Add coordinates to arrays required for xarray.DataArray below
lons[i] = obs_stat.longitude
lats[i] = obs_stat.latitude
alts[i] = obs_stat.station_coords[
"altitude"
] # altitude refers to altitude of the data. be explcit where getting from
station_names[i] = obs_stat.station_name
# ToDo: consider removing to keep ts_type_src_ref (this was probably
# introduced for EBAS were the original data frequency is not constant
# but can vary from site to site)
if ts_type_src_ref is None:
ts_type_src_ref = obs_stat["ts_type_src"]
elif obs_stat["ts_type_src"] != ts_type_src_ref:
spl = ts_type_src_ref.split(";")
if obs_stat["ts_type_src"] not in spl:
spl.append(obs_stat["ts_type_src"])
ts_type_src_ref = ";".join(spl)
if data_ref_unit is None:
try:
data_ref_unit = obs_stat["var_info"][var_ref]["units"]
except KeyError as e: # variable information or unit is not defined
logger.exception(repr(e))
try:
unit = obs_stat["var_info"][var_ref]["units"]
except Exception:
unit = None
if not unit == data_ref_unit:
raise ValueError(
f"Cannot perform colocation. "
f"Ungridded data object contains different units ({var_ref})"
)
# get observations (Note: the index of the observation time series
# is already in the specified frequency format, and thus, does not
# need to be updated, for details (or if errors occur), cf.
# UngriddedData.to_station_data, where the conversion happens)
# get model station data
grid_stat_this_layer = grid_stat_data_this_layer[i]
# Make a copy of the station data and resample it to the mean based on hourly resolution. Needs testing!
obs_stat_this_layer = obs_stat.copy()
try:
obs_stat_this_layer[var_ref] = (
obs_stat_this_layer.select_altitude(
var_name=var_ref, altitudes=list(vertical_layer.values())
)
.mean("altitude", skipna=True) # very important to skip nans here
.to_series() # make pandas series
)
except ValueError:
logger.warning(
f"Var: {var_ref}. Skipping {obs_stat_this_layer.station_name} in altitude layer {vertical_layer} because no data"
)
continue
if harmonise_units:
grid_unit = grid_stat_this_layer.get_unit(var)
obs_unit = obs_stat_this_layer.get_unit(var_ref)
if not grid_unit == obs_unit:
grid_stat_this_layer.convert_unit(var, obs_unit)
if data_unit is None:
data_unit = obs_unit
try:
if colocate_time:
_df = _colocate_site_data_helper_timecol(
stat_data=grid_stat_this_layer,
stat_data_ref=obs_stat_this_layer,
var=var,
var_ref=var_ref,
ts_type=col_freq,
resample_how=resample_how,
min_num_obs=min_num_obs,
use_climatology_ref=use_climatology_ref,
)
else:
_df = _colocate_site_data_helper(
stat_data=grid_stat_this_layer,
stat_data_ref=obs_stat_this_layer,
var=var,
var_ref=var_ref,
ts_type=col_freq,
resample_how=resample_how,
min_num_obs=min_num_obs,
use_climatology_ref=use_climatology_ref,
)
try:
# assign the unified timeseries data to the colocated data array
arr[0, :, i] = _df["ref"].values
arr[1, :, i] = _df["data"].values
except ValueError:
try:
mask = _df.index.intersection(time_idx)
_df = _df.loc[mask]
arr[0, :, i] = _df["ref"].values
arr[1, :, i] = _df["data"].values
except ValueError as e:
logger.warning(
f"Failed to colocate time for station {obs_stat.station_name}. "
f"This station will be skipped (error: {e})"
)
except TemporalResolutionError as e:
# resolution of obsdata is too low
logger.warning(
f"{var_ref} data from site {obs_stat.station_name} will "
f"not be added to ColocatedData. Reason: {e}"
)
try:
revision = data_ref.data_revision[dataset_ref]
except Exception:
try:
revision = data_ref._get_data_revision_helper(dataset_ref)
except MetaDataError:
revision = "MULTIPLE"
except Exception:
revision = "n/a"
files = [os.path.basename(x) for x in data.from_files]
meta = {
"data_source": [dataset_ref, data.data_id],
"var_name": [var_ref_aerocom, var_aerocom],
"var_name_input": [var_ref, var],
"ts_type": col_freq, # will be updated below if resampling
"filter_name": filter_name,
"ts_type_src": [ts_type_src_ref, ts_type_src_data],
"var_units": [data_ref_unit, data_unit],
"data_level": 3,
"revision_ref": revision,
"from_files": files,
"from_files_ref": None,
"colocate_time": colocate_time,
"obs_is_clim": True if isinstance(use_climatology_ref, ClimatologyConfig) else False,
"pyaerocom": pya_ver,
"min_num_obs": min_num_obs,
"resample_how": resample_how,
"vertical_layer": vertical_layer,
}
# create coordinates of DataArray
coords = {
"data_source": meta["data_source"],
"time": time_idx,
"station_name": station_names,
"latitude": ("station_name", lats),
"longitude": ("station_name", lons),
"altitude": ("station_name", alts),
}
dims = ["data_source", "time", "station_name"]
coldata = ColocatedData(data=arr, coords=coords, dims=dims, name=var, attrs=meta)
# add correct units for lat / lon dimensions
coldata.latitude.attrs["standard_name"] = data.latitude.standard_name
coldata.latitude.attrs["units"] = str(data.latitude.units)
coldata.longitude.attrs["standard_name"] = data.longitude.standard_name
coldata.longitude.attrs["units"] = str(data.longitude.units)
coldata.data.attrs["altitude_units"] = str(data.altitude.units)
coldata.vertical_layer = vertical_layer
list_of_colocateddata_objects.append(coldata)
return list_of_colocateddata_objects
[docs]
def colocate_vertical_profile_gridded(
data,
data_ref,
ts_type: str = None,
start: str | None = None,
stop: str | None = None,
filter_name: str = None,
regrid_res_deg: float | RegridResDeg | None = None,
harmonise_units: bool = True,
regrid_scheme: str = "areaweighted",
var_ref: str = None,
update_baseyear_gridded: int = None,
min_num_obs: int | dict | None = None,
colocate_time: bool = False,
use_climatology_ref: dict = False,
resample_how: str | dict = None,
colocation_layer_limits: tuple[LayerLimits, ...] | None = None,
profile_layer_limits: tuple[LayerLimits, ...] | None = None,
**kwargs,
) -> ColocatedDataLists:
"""
Colocated vertical profile data with gridded (model) data
The guts of this function are placed in a helper function as not to repeat the code.
This is done because colocation must occur twice:
i) at the the statistics are computed
ii) at a finder vertical resoltuion for profile vizualization
Some things you do not want to compute twice, however.
So (most of) the things that correspond to both colocation instances are computed here,
and then passed to the helper function.
Returns
colocated_data_lists : ColocatedDataLists
-------
"""
if filter_name is None:
filter_name = const.DEFAULT_REG_FILTER
try:
data.check_dimcoords_tseries()
except DimensionOrderError:
data.reorder_dimensions_tseries()
var, var_aerocom = resolve_var_name(data)
if var_ref is None:
var_ref = var_aerocom
var_ref_aerocom = var_aerocom
else:
var_ref_aerocom = const.VARS[var_ref].var_name_aerocom
if var_ref not in data_ref.contains_vars:
raise VarNotAvailableError(
f"Variable {var_ref} is not available in ungridded "
f"data (which contains {data_ref.contains_vars})"
)
elif len(data_ref.contains_datasets) > 1:
raise AttributeError(
f"Colocation can only be performed with ungridded data objects "
f"that only contain a single dataset (input data contains: "
f"{data_ref.contains_datasets}. Use method `extract_dataset` of "
f"UngriddedData object to extract single datasets."
)
if any(
not {"start", "end"}.issubset(layer)
for layer in colocation_layer_limits + profile_layer_limits
):
raise KeyError(
"start and end must be provided for profiles in each vertical layer in colocate_vertical_profile_gridded"
)
data_ref_meta_idxs_with_var_info = []
for i in range(len(data_ref.metadata)):
if "altitude" not in data_ref.metadata[i]["var_info"]:
logger.warning(
f"Warning: Station {data_ref.metadata[i]['station_name']} does not have any var_info"
)
else:
data_ref_meta_idxs_with_var_info.append(i)
if any(
data.altitude.units != Unit(data_ref.metadata[i]["var_info"]["altitude"]["units"])
for i in data_ref_meta_idxs_with_var_info
):
logger.info(
f"Mismatching units in colocation_3d.py. Model has units {data.altitude.units} whereas not all observations have this unit. Debug to find out where."
)
raise DataUnitError
if update_baseyear_gridded is not None:
# update time dimension in gridded data
data.base_year = update_baseyear_gridded
# apply region filter to data
regfilter = Filter(name=filter_name)
data_ref = regfilter.apply(data_ref)
data = regfilter.apply(data)
# check time overlap and crop model data if needed
start, stop = check_time_ival(data, start, stop)
data = data.crop(time_range=(start, stop))
if regrid_res_deg is not None: # pragma: no cover
data = _regrid_gridded(data, regrid_scheme, regrid_res_deg)
# Special ts_typs for which all stations with ts_type< are removed
reduce_station_data_ts_type = ts_type
# ts_type_src_data = data.ts_type
ts_type, ts_type_data = check_ts_type(data, ts_type)
if not colocate_time and ts_type < ts_type_data:
data = data.resample_time(str(ts_type), min_num_obs=min_num_obs, how=resample_how)
ts_type_data = ts_type
if isinstance(use_climatology_ref, ClimatologyConfig): # pragma: no cover
col_freq = use_climatology_ref.freq
obs_start = use_climatology_ref.start
obs_stop = use_climatology_ref.stop
else:
col_freq = str(ts_type)
obs_start = start
obs_stop = stop
# colocation frequency
col_tst = TsType(col_freq)
latitude = data.latitude.points
longitude = data.longitude.points
altitude = data.altitude.points
lat_range = [np.min(latitude), np.max(latitude)]
lon_range = [np.min(longitude), np.max(longitude)]
alt_range = [np.min(altitude), np.max(altitude)]
# use only sites that are within model domain
# filter_by_meta wipes is_vertical_profile
# Also note that filter_by_meta may not be calling alt_range. Function fitler_altitude is defined but not used
data_ref = data_ref.filter_by_meta(latitude=lat_range, longitude=lon_range, altitude=alt_range)
# get timeseries from all stations in provided time resolution
# (time resampling is done below in main loop)
all_stats = data_ref.to_station_data_all(
vars_to_convert=var_ref,
start=obs_start,
stop=obs_stop,
by_station_name=True,
ts_type_preferred=reduce_station_data_ts_type,
**kwargs,
)
if len(all_stats["stats"]) == 0:
raise VarNotAvailableError(
f"Variable {var_ref} is not available in specified time interval ({start}-{stop})"
)
# Colocation has to occur twice for vertical profiles.
# Once for the colocation which we will compute the statistics over.
# The second time is just to show the vertical profiles on the web. This needs to be finer
# Here we make a list with the list of ColocatedData objects for both colocation purposes
output_prep = [
_colocate_vertical_profile_gridded(
data=data,
data_ref=data_ref,
start=start,
stop=stop,
filter_name=filter_name,
harmonise_units=harmonise_units,
var_ref=var_ref,
min_num_obs=min_num_obs,
colocate_time=colocate_time,
use_climatology_ref=use_climatology_ref,
resample_how=resample_how,
layer_limits=layer_limits,
obs_stat_data=all_stats["stats"],
ungridded_lons=all_stats["longitude"],
ungridded_lats=all_stats["latitude"],
col_freq=col_freq,
col_tst=col_tst,
var=var,
var_aerocom=var_aerocom,
var_ref_aerocom=var_ref_aerocom,
)
for layer_limits in (colocation_layer_limits, profile_layer_limits)
]
# Create a namedtuple for output.
# Each element in the tuple is a list of ColocatedData objects.
# The length of these lists is the same as the number of colocation layers
for coldata in output_prep[1]:
coldata.data.attrs["just_for_viz"] = 1
colocated_data_lists = ColocatedDataLists(
*output_prep
) # put the list of prepared output into namedtuple object s.t. both position and named arguments can be used
return colocated_data_lists