import logging
import os
import sys
from datetime import timedelta
from functools import cached_property
from getpass import getuser
from pathlib import Path
from typing import Annotated, Literal
import datetime
from pyaerocom.aeroval.glob_defaults import VarWebInfo, VarWebScaleAndColormap
from pyaerocom.aeroval.obsentry import ObsEntry
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
import aerovaldb
import pandas as pd
from pydantic import (
BaseModel,
ConfigDict,
Field,
PositiveInt,
computed_field,
field_serializer,
field_validator,
)
from pyaerocom import __version__, const
from pyaerocom.aeroval.aux_io_helpers import ReadAuxHandler
from pyaerocom.aeroval.collections import ModelCollection, ObsCollection
from pyaerocom.aeroval.exceptions import ConfigError
from pyaerocom.aeroval.helpers import (
_check_statistics_periods,
_get_min_max_year_periods,
check_if_year,
BoundingBox,
)
from pyaerocom.aeroval.modelmaps_helpers import CONTOUR, OVERLAY
from pyaerocom.aeroval.json_utils import read_json, set_float_serialization_precision
from pyaerocom.colocation.colocation_setup import ColocationSetup
logger = logging.getLogger(__name__)
PLOT_TYPE_OPTIONS = ({OVERLAY}, {CONTOUR}, {OVERLAY, CONTOUR})
[docs]
class OutputPaths(BaseModel):
"""
Setup class for output paths of json files and co-located data
This interface generates all paths required for an experiment.
Attributes
----------
proj_id : str
project ID
exp_id : str
experiment ID
json_basedir : str, Path
avdb_resource : str, Path, None
An aerovaldb resource identifier as expected by aerovaldb.open()[1].
If not provided, pyaerocom will fall back to using json_basedir, for
backwards compatibility.
[1] https://aerovaldb.readthedocs.io/en/latest/api.html#aerovaldb.open
"""
# Pydantic ConfigDict
model_config = ConfigDict(arbitrary_types_allowed=True)
_JSON_SUBDIRS: list[str] = [
"map",
"ts",
"ts/diurnal",
"scat",
"hm",
"hm/ts",
"contour",
"profiles",
]
avdb_resource: Path | str | None = None
json_basedir: Path | str = Field(
default=os.path.join(const.OUTPUTDIR, "aeroval/data"), validate_default=True
)
coldata_basedir: Path | str = Field(
default=os.path.join(const.OUTPUTDIR, "aeroval/coldata"), validate_default=True
)
@field_validator("json_basedir", "coldata_basedir")
@classmethod
def validate_basedirs(cls, v):
if not os.path.exists(v):
tmp = Path(v) if isinstance(v, str) else v
tmp.mkdir(parents=True, exist_ok=True)
return v
proj_id: str
exp_id: str
def _check_init_dir(self, loc, assert_exists):
if assert_exists and not os.path.exists(loc):
os.makedirs(loc)
return loc
def get_coldata_dir(self, assert_exists=True):
loc = os.path.join(self.coldata_basedir, self.proj_id, self.exp_id)
return self._check_init_dir(loc, assert_exists)
def get_json_output_dirs(self, assert_exists=True):
out = {}
base = os.path.join(self.json_basedir, self.proj_id, self.exp_id)
for subdir in self._JSON_SUBDIRS:
loc = self._check_init_dir(os.path.join(base, subdir), assert_exists)
out[subdir] = loc
# for cams2_83 the extra 'forecast' folder will contain the median scores if computed
if self.proj_id == "cams2-83":
loc = self._check_init_dir(os.path.join(base, "forecast"), assert_exists)
out["forecast"] = loc
return out
[docs]
class ModelMapsSetup(BaseModel):
maps_freq: Literal["hourly", "daily", "monthly", "yearly", "coarsest"] = "coarsest"
maps_res_deg: PositiveInt = 5
plot_types: dict[str, str | tuple[str, str]] | set[str] = {CONTOUR}
boundaries: BoundingBox | None = None
map_observations_only_in_right_menu: bool = False
overlay_save_format: Literal["webp", "png"] = "webp"
@field_validator("plot_types")
def validate_plot_types(cls, v):
if isinstance(v, dict):
for m in v:
if not isinstance(v[m], set):
if isinstance(v[m], str):
v[m] = set([v[m]])
else:
v[m] = set([*v[m]])
if v[m] not in PLOT_TYPE_OPTIONS:
raise ConfigError("Model maps set up given a non-valid plot type.")
if isinstance(v, str):
v = set([v])
if isinstance(v, list): # can occur when reading a serialized config
v = set(v)
if v not in PLOT_TYPE_OPTIONS:
raise ConfigError("Model maps set up given a non-valid plot type.")
return v
[docs]
class CAMS2_83Setup(BaseModel):
use_cams2_83: bool = False
[docs]
class StatisticsSetup(BaseModel, extra="allow"):
"""
Setup options for statistical calculations
Attributes
----------
weighted_stats : bool
if True, statistics are calculated using area weights,
this is only relevant for gridded / gridded evaluations.
annual_stats_constrained : bool
if True, then only sites are considered that satisfy a potentially
specified annual resampling constraint (see
:attr:`pyaerocom.colocation.ColocationSetup.min_num_obs`). E.g.
lets say you want to calculate statistics (bias,
correlation, etc.) for monthly model / obs data for a given site and
year. Lets further say, that there are only 8 valid months of data, and
4 months are missing, so statistics will be calculated for that year
based on 8 vs. 8 values. Now if
:attr:`pyaerocom.colocation.ColocationSetup.min_num_obs` is
specified in way that requires e.g. at least 9 valid months to
represent the whole year, then this station will not be considered in
case `annual_stats_constrained` is True, else it will. Defaults to
False.
stats_tseries_base_freq : str, optional
The statistics Time Series display in AeroVal (under Overall Evaluation)
is computed in intervals of a certain frequency, which is specified
via :attr:`TimeSetup.main_freq` (defaults to monthly). That is,
monthly colocated data is used as a basis to compute the statistics
for each month (e.g. if you have 10 sites, then statistics will be
computed based on 10 monthly values for each month of the timeseries,
1 value for each site). `stats_tseries_base_freq` may be specified in
case a higher resolution is supposed to be used as a basis to compute
the timeseries in the resolution specified by
:attr:`TimeSetup.main_freq` (e.g. if daily is specified here, then for
the above example 310 values would be used - 31 for each site - to
compute the statistics for a given month (in this case, a month with 31
days, obviously).
drop_stats: tuple, optional
tuple of strings with names of statistics (as determined by keys in
aeroval.glob_defaults.py's statistics_defaults) to not compute. For example,
setting drop_stats = ("mb", "mab"), results in json files in hm/ts with
entries which do not contain the mean bias and mean absolute bias,
but the other statistics are preserved.
stats_decimals: int, optional
If provided, overwrites the decimals key in glod_defaults for the statistics, which has a deault of 3.
Setting this higher of lower changes the number of decimals shown on the Aeroval webpage.
round_floats_precision: int, optional
Sets the precision argument for the function `pyaerocom.aaeroval.json_utils:set_float_serialization_precision`
Parameters
----------
kwargs
any of the supported attributes, e.g.
`StatisticsSetup(annual_stats_constrained=True)`
"""
# Pydantic ConfigDict
model_config = ConfigDict(protected_namespaces=())
# StatisticsSetup attributes
MIN_NUM: PositiveInt = 1
weighted_stats: bool = True
annual_stats_constrained: bool = False
# Trends config
add_trends: bool = False # Adding trend calculations, only trends over the average time series over stations in a region
avg_over_trends: bool = (
False # Adds calculation of avg over trends of time series of stations in region
)
obs_min_yrs: PositiveInt = 0 # Removes stations with less than this number of years of valid data (a year with data points in all four seasons) Should in most cases be the same as stats_min_yrs
stats_min_yrs: PositiveInt = obs_min_yrs # Calculates trends if number of valid years are equal or more than this. Should in most cases be the same as obs_min_yrs
sequential_yrs: bool = False # Whether or not the min_yrs should be sequential
stats_tseries_base_freq: str | None = None
forecast_evaluation: bool = False
forecast_days: PositiveInt = 4
use_fairmode: bool = False
use_diurnal: bool = False
obs_only_stats: bool = False
model_only_stats: bool = False
drop_stats: tuple[str, ...] = ()
stats_decimals: int | None = None
round_floats_precision: int | None = None
if round_floats_precision:
set_float_serialization_precision(round_floats_precision)
[docs]
class TimeSetup(BaseModel):
DEFAULT_FREQS: Literal["monthly", "yearly"] = "monthly"
SEASONS: list[str] = ["all", "DJF", "MAM", "JJA", "SON"]
main_freq: str = "monthly"
freqs: list[str] = ["monthly", "yearly"]
periods: list[str] = Field(default_factory=list)
add_seasons: bool = True
[docs]
def get_seasons(self):
"""
Get list of seasons to be analysed
Returns :attr:`SEASONS` if :attr:`add_seasons` it True, else `[
'all']` (only whole year).
Returns
-------
list
list of season strings for analysis
"""
if self.add_seasons:
return self.SEASONS
return ["all"]
def _get_all_period_strings(self):
"""
Get list of all period strings for evaluation
Returns
-------
list
list of period / season strings
"""
output = []
for per in self.periods:
for season in self.get_seasons():
perstr = f"{per}-{season}"
output.append(perstr)
return output
[docs]
class WebDisplaySetup(BaseModel):
# Pydantic ConfigDict
model_config = ConfigDict(protected_namespaces=())
# WebDisplaySetup attributes
map_zoom: Literal["World", "Europe", "xEMEP"] = "World"
regions_how: Literal["default", "aerocom", "htap", "country"] = "default"
map_zoom: str = "World"
add_model_maps: bool = False
modelorder_from_config: bool = True
obsorder_from_config: bool = True
var_order_menu: tuple[str, ...] = ()
obs_order_menu: tuple[str, ...] = ()
model_order_menu: tuple[str, ...] = ()
hide_charts: tuple[str, ...] = ()
hide_pages: tuple[str, ...] = ()
ts_annotations: dict[str, str] = Field(default_factory=dict)
pages: tuple[str, ...] = ("maps", "evaluation", "intercomp", "overall", "infos")
[docs]
class EvalRunOptions(BaseModel):
clear_existing_json: bool = True
only_json: bool = False
only_colocation: bool = False
#: If True, process only maps (skip obs evaluation)
only_model_maps: bool = False
obs_only: bool = False
[docs]
class ProjectInfo(BaseModel):
proj_id: str
[docs]
class ExperimentInfo(BaseModel):
exp_id: str
exp_name: str = ""
exp_descr: str = ""
public: bool = False
exp_pi: str = getuser()
pyaerocom_version: str = __version__
creation_date: str = f"{datetime.datetime.now(datetime.timezone.utc):%Y-%m-%dT%H:%M:%S.%fZ}"
[docs]
class EvalSetup(BaseModel):
"""Composite class representing a whole analysis setup
This represents the level at which json I/O happens for configuration
setup files.
"""
###########################
## Pydantic ConfigDict
###########################
model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow", protected_namespaces=())
########################################
## Regular & BaseModel-based Attributes
########################################
io_aux_file: Annotated[
Path | str, ".py file containing additional read methods for modeldata"
] = ""
var_web_info_file: Annotated[Path | str, "config file containing additional variables"] = ""
var_scale_colmap_file: Annotated[
Path | str, "config file containing scales/ranges for variables"
] = ""
_aux_funs: dict = {}
@computed_field
@cached_property
def proj_info(self) -> ProjectInfo:
if not hasattr(self, "model_extra") or self.model_extra is None:
return ProjectInfo()
model_args = {
key: val for key, val in self.model_extra.items() if key in ProjectInfo.model_fields
}
return ProjectInfo(**model_args)
@computed_field
@cached_property
def exp_info(self) -> ExperimentInfo:
model_args = {
key: val for key, val in self.model_extra.items() if key in ExperimentInfo.model_fields
}
return ExperimentInfo(**model_args)
@computed_field
@cached_property
def json_filename(self) -> str:
"""
str: Savename of config file: cfg_<proj_id>_<exp_id>.json
"""
return f"cfg_{self.proj_info.proj_id}_{self.exp_info.exp_id}.json"
@cached_property
def gridded_aux_funs(self) -> dict:
if not bool(self._aux_funs) and os.path.exists(self.io_aux_file):
self._import_aux_funs()
return self._aux_funs
@cached_property
def var_web_info(self) -> VarWebInfo:
return VarWebInfo(config_file=self.var_web_info_file)
@cached_property
def var_scale_colmap(self) -> VarWebScaleAndColormap:
return VarWebScaleAndColormap(config_file=self.var_scale_colmap_file)
@computed_field
@cached_property
def path_manager(self) -> OutputPaths:
if not hasattr(self, "model_extra") or self.model_extra is None:
return OutputPaths()
model_args = {
key: val for key, val in self.model_extra.items() if key in OutputPaths.model_fields
}
return OutputPaths(**model_args)
# Many computed_fields here have this hack to get keys from a general CFG into their appropriate respective classes
# TODO: all these computed fields could be more easily defined if the config were
# rigid enough to have them explicitly defined (e.g., in a TOML file), rather than dumping everything
# into one large config dict and then dishing out the relevant parts to each class.
@computed_field
@cached_property
def time_cfg(self) -> TimeSetup:
if not hasattr(self, "model_extra") or self.model_extra is None:
return TimeSetup()
model_args = {
key: val for key, val in self.model_extra.items() if key in TimeSetup.model_fields
}
return TimeSetup(**model_args)
@computed_field
@cached_property
def modelmaps_opts(self) -> ModelMapsSetup:
if not hasattr(self, "model_extra") or self.model_extra is None:
return ModelMapsSetup()
model_args = {
key: val for key, val in self.model_extra.items() if key in ModelMapsSetup.model_fields
}
return ModelMapsSetup(**model_args)
@computed_field
@cached_property
def cams2_83_cfg(self) -> CAMS2_83Setup:
if not hasattr(self, "model_extra"):
return CAMS2_83Setup()
model_args = {
key: val for key, val in self.model_extra.items() if key in CAMS2_83Setup.model_fields
}
return CAMS2_83Setup(**model_args)
@computed_field
@cached_property
def webdisp_opts(self) -> WebDisplaySetup:
if not hasattr(self, "model_extra") or self.model_extra is None:
return WebDisplaySetup()
model_args = {
key: val
for key, val in self.model_extra.items()
if key in WebDisplaySetup.model_fields
}
return WebDisplaySetup(**model_args)
@computed_field
@cached_property
def processing_opts(self) -> EvalRunOptions:
if not hasattr(self, "model_extra") or self.model_extra is None:
return EvalRunOptions()
model_args = {
key: val for key, val in self.model_extra.items() if key in EvalRunOptions.model_fields
}
return EvalRunOptions(**model_args)
@computed_field
@cached_property
def statistics_opts(self) -> StatisticsSetup:
if not hasattr(self, "model_extra") or self.model_extra is None:
return StatisticsSetup(weighted_stats=True, annual_stats_constrained=False)
model_args = {
key: val
for key, val in self.model_extra.items()
if key in StatisticsSetup.model_fields
}
return StatisticsSetup(**model_args)
@computed_field
@cached_property
def colocation_opts(self) -> ColocationSetup:
if not hasattr(self, "model_extra") or self.model_extra is None:
return ColocationSetup(save_coldata=True, keep_data=False, resample_how="mean")
model_args = {
key: val
for key, val in self.model_extra.items()
if key in ColocationSetup.model_fields
}
# need to pass some default values to the ColocationSetup if not provided in config
default_dict = {
"save_coldata": True,
"keep_data": False,
"resample_how": "mean",
}
for key in default_dict:
if key not in model_args:
model_args[key] = default_dict[key]
return ColocationSetup(**model_args)
##################################
## Non-BaseModel-based attributes
##################################
# These attributes require special attention b/c they're not based on Pydantic's BaseModel class.
@computed_field
@cached_property
def obs_cfg(self) -> ObsCollection:
oc = ObsCollection()
for k, v in self.model_extra.get("obs_cfg", {}).items():
oc.add_entry(k, v)
return oc
@field_serializer("obs_cfg")
def serialize_obs_cfg(self, obs_cfg: ObsCollection):
return obs_cfg.as_dict()
@computed_field
@cached_property
def model_cfg(self) -> ModelCollection:
mc = ModelCollection()
for k, v in self.model_extra.get("model_cfg", {}).items():
mc.add_entry(k, v)
return mc
@field_serializer("model_cfg")
def serialize_model_cfg(self, model_cfg: ModelCollection):
return model_cfg.as_dict()
###########################
## Methods
###########################
[docs]
def get_obs_entry(self, obs_name) -> ObsEntry:
"""Returns ObsEntry instance for network obs_name"""
return self.obs_cfg.get_entry(obs_name)
[docs]
def get_model_entry(self, model_name) -> dict:
"""Get model entry configuration
Since the configuration files for experiments are in json format, they
do not allow the storage of executable custom methods for model data
reading. Instead, these can be specified in a python module that may
be specified via :attr:`add_methods_file` and that contains a
dictionary `FUNS` that maps the method names with the callable methods.
As a result, this means that, by default, custom read methods for
individual models in :attr:`model_config` do not contain the
callable methods but only the names. This method will take care of
handling this and will return a dictionary where potential custom
method strings have been converted to the corresponding callable
methods.
Parameters
----------
model_name : str
name of model
Returns
-------
dict
Dictionary that specifies the model setup ready for the analysis
"""
cfg = self.model_cfg.get_entry(model_name)
cfg = cfg.prep_dict_analysis(self.gridded_aux_funs)
return cfg
[docs]
def to_json(self, outdir: str, ignore_nan: bool = True, indent: int = 3) -> None:
"""
Save configuration as JSON file
Parameters
----------
outdir : str
directory where the config json file is supposed to be stored
ignore_nan : bool
set NaNs to Null when writing
indent : int
json indentation
"""
with aerovaldb.open(
self.path_manager.json_basedir
if self.path_manager.avdb_resource is None
else self.path_manager.json_basedir
) as db:
with db.lock():
db.put_config(self.json_repr(), self.proj_info.proj_id, self.exp_info.exp_id)
[docs]
@staticmethod
def from_json(filepath: str) -> Self:
"""Load configuration from json config file"""
settings = read_json(filepath)
return EvalSetup(**settings)
def json_repr(self):
return self.model_dump()
def _import_aux_funs(self) -> None:
h = ReadAuxHandler(self.io_aux_file)
self._aux_funs.update(**h.import_all())
def _check_time_config(self) -> None:
periods = self.time_cfg.periods
colstart = self.colocation_opts.start
colstop = self.colocation_opts.stop
if len(periods) == 0:
if colstart is None:
raise ConfigError("Either periods or start must be set...")
per = self.colocation_opts._period_from_start_stop()
periods = [per]
logger.info(
f"periods is not set, inferred {per} from start / stop colocation settings."
)
self.time_cfg.periods = _check_statistics_periods(periods)
start, stop = _get_min_max_year_periods(periods)
start_yr = start.year
stop_yr = stop.year
years = check_if_year(periods)
if not years:
if start == stop and isinstance(start, pd.Timestamp):
stop = start + timedelta(hours=23)
elif isinstance(start, pd.Timestamp):
stop = stop + timedelta(hours=23)
if stop_yr == start_yr:
stop_yr += 1
if colstart is None:
self.colocation_opts.start = start.strftime("%Y/%m/%d %H:%M:%S")
if colstop is None:
self.colocation_opts.stop = stop.strftime(
"%Y/%m/%d %H:%M:%S"
) # + 1 # add 1 year since we want to include stop year
else:
if colstart is None:
self.colocation_opts.start = start_yr
if colstop is None:
self.colocation_opts.stop = (
stop_yr + 1
) # add 1 year since we want to include stop year