import glob
import logging
import os
import pathlib
import shutil
from collections import namedtuple
import aerovaldb
from pyaerocom import const
from pyaerocom._lowlevel_helpers import (
DirLoc,
StrType,
TypeValidator,
sort_dict_by_name,
)
from pyaerocom.aeroval.collections import ObsCollection
from pyaerocom.aeroval.glob_defaults import (
VariableInfo,
extended_statistics,
statistics_defaults,
statistics_mean_trend,
statistics_median_trend,
statistics_model_only,
statistics_obs_only,
statistics_trend,
)
from pyaerocom.aeroval.json_utils import round_floats
from pyaerocom.aeroval.modelentry import ModelEntry
from pyaerocom.aeroval import EvalSetup
from pyaerocom.aeroval.varinfo_web import VarinfoWeb
from pyaerocom.colocation.colocated_data import ColocatedData
from pyaerocom.exceptions import EntryNotAvailable, VariableDefinitionError
from pyaerocom.stats.mda8.const import MDA8_OUTPUT_VARS
from pyaerocom.stats.stats import _init_stats_dummy
from pyaerocom.utils import recursive_defaultdict
from pyaerocom.variable_helpers import get_aliases, get_variable
# Metadata parsed from a "map" output filename: obs network/variable,
# vertical code, model name/variable and the time period the file covers.
MapInfo = namedtuple(
    "MapInfo",
    ["obs_network", "obs_var", "vert_code", "mod_name", "mod_var", "time_period"],
)

# Module-level logger, one per module as per project convention.
logger = logging.getLogger(__name__)
[docs]
class ProjectOutput:
    """JSON output for project"""

    proj_id = StrType()
    json_basedir = DirLoc(assert_exists=True)

    def __init__(self, proj_id: str, resource: str | pathlib.Path | aerovaldb.AerovalDB):
        self.proj_id = proj_id

        # Normalise pathlib.Path to str so it takes the string branch below.
        if isinstance(resource, pathlib.Path):
            resource = str(resource)

        if isinstance(resource, aerovaldb.AerovalDB):
            self.avdb = resource
        elif isinstance(resource, str):
            self.avdb = aerovaldb.open(resource)
        else:
            raise ValueError(f"Expected string or AerovalDB, got {type(resource)}.")

        # TODO: Only works for json_files, check if this is needed, and remove / rewrite
        # functionality that requires direct knowledge of _basedir.
        self.json_basedir = self.avdb._basedir

    @property
    def proj_dir(self) -> str:
        """Project directory (created on first access if missing)"""
        path = os.path.join(self.json_basedir, self.proj_id)
        if not os.path.exists(path):
            os.mkdir(path)
            logger.info(f"Creating AeroVal project directory at {path}")
        return path

    @property
    def experiments_file(self) -> str:
        """json file listing the experiments of this project"""
        return os.path.join(self.proj_dir, "experiments.json")

    @property
    def available_experiments(self) -> list:
        """
        List of available experiments
        """
        return list(self.avdb.get_experiments(self.proj_id, default={}))
[docs]
class ExperimentOutput(ProjectOutput):
    """JSON output for experiment"""

    # Descriptor ensuring assignments to `cfg` are EvalSetup instances.
    cfg = TypeValidator(EvalSetup)
def __init__(self, cfg: EvalSetup):
    """
    Parameters
    ----------
    cfg : EvalSetup
        evaluation setup for this experiment
    """
    self.cfg = cfg
    # BUG FIX: both branches of the conditional previously evaluated to
    # json_basedir, so a configured avdb_resource was silently ignored.
    # Pass the avdb resource when one is set, else fall back to the
    # json base directory.
    super().__init__(
        cfg.proj_id,
        (
            cfg.path_manager.json_basedir
            if cfg.path_manager.avdb_resource is None
            else cfg.path_manager.avdb_resource
        ),
    )
    # dictionary that will be filled by json cleanup methods to check for
    # invalid or outdated json files across different output directories
    self._invalid = dict(models=[], obs=[])
@property
def exp_id(self) -> str:
    """Experiment ID (taken from the evaluation setup)"""
    return self.cfg.exp_id
@property
def exp_dir(self) -> str:
    """Experiment directory (created on first access if missing)"""
    path = os.path.join(self.proj_dir, self.exp_id)
    if not os.path.exists(path):
        os.mkdir(path)
        logger.info(f"Creating AeroVal experiment directory at {path}")
    return path
@property
def regions_file(self) -> str:
    """json file containing region specifications"""
    return os.path.join(self.exp_dir, "regions.json")
@property
def statistics_file(self) -> str:
    """json file containing statistics display specifications

    (docstring fixed: previously a copy-paste of :attr:`regions_file`)
    """
    return os.path.join(self.exp_dir, "statistics.json")
@property
def var_ranges_file(self) -> str:
    """json file containing variable display ranges / colormap info

    (docstring fixed: previously a copy-paste of :attr:`regions_file`)
    """
    return os.path.join(self.exp_dir, "ranges.json")
@property
def menu_file(self) -> str:
    """json file containing the experiment menu specifications

    (docstring fixed: previously a copy-paste of :attr:`regions_file`)
    """
    return os.path.join(self.exp_dir, "menu.json")
@property
def results_available(self) -> bool:
    """
    bool: True if results are available for this experiment, else False
    """
    # experiment directory must exist and contain at least one map file
    if self.exp_id not in os.listdir(self.proj_dir):
        return False
    return len(self._get_json_output_files("map")) > 0
@property
def out_dirs_json(self) -> dict:
    """json output directories (`dict`)"""
    return self.cfg.path_manager.get_json_output_dirs()
[docs]
def update_interface(self) -> None:
    """
    Update web interface

    Steps:

    1. Check if results are available, and if so:
    2. Add entry for this experiment in experiments.json
    3. Create/update ranges.json file in experiment directory
    4. Update menu.json against available output and evaluation setup
    5. Synchronise content of heatmap json files with menu
    6. Create/update file statistics.json in experiment directory
    7. Copy json version of EvalSetup into experiment directory

    Returns
    -------
    None
    """
    # nothing to publish if no map output exists yet
    if not self.results_available:
        logger.warning(f"no output available for experiment {self.exp_id} in {self.proj_id}")
        return
    exp_data = {"public": self.cfg.exp_info.public}
    self._add_entry_experiments_json(self.exp_id, exp_data)
    self._create_var_ranges_json()
    # NOTE(review): update_menu is not defined in this chunk — presumably
    # provided elsewhere on this class; confirm before refactoring.
    self.update_menu()
    self._sync_heatmaps_with_menu_and_regions()
    self._create_statistics_json()
    # AeroVal frontend needs periods to be set in config json file...
    # make sure they are
    self.cfg._check_time_config()
    with self.avdb.lock():
        self.avdb.put_config(self.cfg.json_repr(), self.proj_id, self.exp_id)
def _sync_heatmaps_with_menu_and_regions(self) -> None:
    """
    Synchronise content of heatmap json files with content of menu.json
    """
    with self.avdb.lock():
        menu = self.avdb.get_menu(self.proj_id, self.exp_id, default={})
        all_regions = self.avdb.get_regions(self.proj_id, self.exp_id, default={})
        for uri in self.avdb.list_glob_stats(
            self.proj_id, self.exp_id, access_type=aerovaldb.AccessType.URI
        ):
            data = self.avdb.get_by_uri(uri)
            # rebuild the heatmap dict restricted to the entries listed in
            # the menu (variable -> obs -> vertical code -> model -> modvar)
            hm = {}
            for vardisp, info in menu.items():
                obs_dict = info["obs"]
                if vardisp not in hm:
                    hm[vardisp] = {}
                for obs, vdict in obs_dict.items():
                    if obs not in hm[vardisp]:
                        hm[vardisp][obs] = {}
                    for vert_code, mdict in vdict.items():
                        if vert_code not in hm[vardisp][obs]:
                            hm[vardisp][obs][vert_code] = {}
                        for mod, minfo in mdict.items():
                            if mod not in hm[vardisp][obs][vert_code]:
                                hm[vardisp][obs][vert_code][mod] = {}
                            modvar = minfo["model_var"]
                            hm_data = data[vardisp][obs][vert_code][mod][modvar]
                            # pad regions missing in this subset with dummy stats
                            hm_data = self._check_hm_all_regions_avail(all_regions, hm_data)
                            hm[vardisp][obs][vert_code][mod][modvar] = hm_data
            self.avdb.put_by_uri(hm, uri)
def _check_hm_all_regions_avail(self, all_regions, hm_data) -> dict:
    """Fill dummy statistics for any region missing in heatmap data."""
    missing = [region for region in all_regions if region not in hm_data]
    if not missing:
        return hm_data
    # some regions are not available in this subset
    periods = self.cfg.time_cfg._get_all_period_strings()
    dummy_stats = _init_stats_dummy()
    for region in missing:
        hm_data[region] = {per: dummy_stats for per in periods}
    return hm_data
@staticmethod
def _info_from_map_file(filename: str) -> MapInfo:
    """
    Separate map filename into meta info on obs and model content

    Parameters
    ----------
    filename : str
        name of file in "map" subdirectory of json output directory for
        this experiment

    Raises
    ------
    ValueError
        if input filename is invalid

    Returns
    -------
    MapInfo
        named tuple with obs network, obs variable, vertical code,
        model name, model variable and time period
    """
    spl = os.path.basename(filename).split(".json")[0].split("_")
    if len(spl) != 4:
        # BUG FIX: the error previously printed the literal "(unknown)"
        # instead of the offending filename.
        raise ValueError(
            f"invalid map filename: {filename}. Must "
            f"contain exactly 3 underscores _ to separate "
            f"obsinfo, vertical, model info, and periods"
        )
    obsinfo, vert_code, modinfo, time_period = spl
    # model name may itself contain "-", so the variable is the last part
    mspl = modinfo.split("-")
    mod_var = mspl[-1]
    mod_name = "-".join(mspl[:-1])
    # same convention for the obs part
    ospl = obsinfo.split("-")
    obs_var = ospl[-1]
    obs_network = "-".join(ospl[:-1])
    return MapInfo(obs_network, obs_var, vert_code, mod_name, mod_var, time_period)
def _results_summary(self) -> dict[str, list[str]]:
    """Summarise available map output as unique values per category."""
    keys = ("obs", "ovar", "vc", "mod", "mvar", "per")
    collected: dict[str, set] = {key: set() for key in keys}
    for file in self._get_json_output_files("map"):
        info = self._info_from_map_file(file)
        for key, entry in zip(keys, info):
            collected[key].add(entry)
    return {key: list(values) for key, values in collected.items()}
[docs]
def clean_json_files(self) -> list[str]:
    """Checks all existing json files and removes outdated data

    This may be relevant when updating a model name or similar.

    Returns:
        list[str] :
            The list of file paths that where modified / removed.
    """
    modified = []
    logger.info(
        "Running clean_json_files: Checking json output directories for "
        "outdated or invalid data and cleaning up."
    )
    outdirs = self.out_dirs_json
    mapfiles = self._get_json_output_files("map")
    rmmap = []
    vert_codes = self.cfg.obs_cfg.all_vert_types
    for file_path in mapfiles:
        try:
            (
                obs_network,
                obs_var,
                vert_code,
                mod_name,
                mod_var,
                time_period,
            ) = self._info_from_map_file(file_path)
        except Exception as e:
            logger.warning(
                f"FATAL: invalid file convention for map json file:"
                f" {file_path}. This file will be deleted. Error message: "
                f"{repr(e)}"
            )
            rmmap.append(file_path)
            continue
        if not self._is_part_of_experiment(obs_network, obs_var, mod_name, mod_var):
            rmmap.append(file_path)
        elif vert_code not in vert_codes:
            rmmap.append(file_path)

    scatfiles = os.listdir(outdirs["scat"])
    for file_path in rmmap:  # delete map files
        logger.info(f"Deleting outdated map json file: {file_path}.")
        os.remove(file_path)
        modified.append(file_path)
        # also delete the corresponding scatter file, if it exists
        fname = os.path.basename(file_path)
        if fname in scatfiles:
            scfp = os.path.join(outdirs["scat"], fname)
            logger.info(f"Deleting outdated scatter json file: {scfp}.")
            os.remove(scfp)
            # BUG FIX: record the scatter file path itself (previously the
            # map file path was appended a second time here)
            modified.append(scfp)

    tsfiles = self._get_json_output_files("ts")
    for file_path in tsfiles:
        if self._check_clean_ts_file(file_path):
            modified.append(file_path)
    modified.extend(self._clean_modelmap_files())
    self.update_interface()  # will take care of heatmap data
    return modified
def _check_clean_ts_file(self, fp) -> bool:
    """
    Check a single timeseries json file and clean up outdated content.

    The file is deleted entirely if its vertical code or obs name is
    invalid / outdated, or if it cannot be read; otherwise data of models
    that are no longer part of the experiment is stripped from it.

    Parameters
    ----------
    fp : str
        path of the ts json file to check

    Returns
    -------
    bool
        True if the file was removed or its content modified, else False
    """
    fname = os.path.basename(fp)
    # filename convention (assumed): ..._<obsinfo>_<vert_code>.json
    spl = fname.split(".json")[0].split("_")
    vert_code, obsinfo = spl[-1], spl[-2]
    if vert_code not in self.cfg.obs_cfg.all_vert_types:
        logger.warning(
            f"Invalid or outdated vert code {vert_code} in ts file {fp}. File will be deleted."
        )
        os.remove(fp)
        return True
    # obs name is everything before the last "-" (last part is the obs var)
    obs_name = str.join("-", obsinfo.split("-")[:-1])
    if obs_name in self._invalid["obs"]:
        logger.info(
            f"Invalid or outdated obs name {obs_name} in ts file {fp}. "
            f"File will be deleted."
        )
        os.remove(fp)
        return True
    with self.avdb.lock():
        try:
            # TODO: Hack to get uri. Ideally this should be rewritten to use URIs directly further
            # up.
            uri = self.avdb._get_uri_for_file(fp)
            data = self.avdb.get_by_uri(uri)
        except Exception:
            logger.exception(f"FATAL: detected corrupt json file: {fp}. Removing file...")
            os.remove(fp)
            return True
        models_avail = list(data)
        models_in_exp = self.cfg.model_cfg.web_interface_names
        if all([mod in models_in_exp for mod in models_avail]):
            # nothing to clean up
            return False
        modified = False
        # keep only data of models that are still part of the experiment
        data_new = {}
        for mod_name in models_avail:
            if mod_name in models_in_exp:
                data_new[mod_name] = data[mod_name]
            else:
                modified = True
                logger.info(f"Removing data for model {mod_name} from ts file: {fp}")
        self.avdb.put_by_uri(data_new, uri)
        return modified
def _clean_modelmap_files(self) -> list[str]:
    """
    Remove outdated or invalid model contour map files.

    Note: to be called after cleanup of files in map subdir.

    Returns
    -------
    list[str]
        paths of all files that were removed.
    """
    json_files = self._get_json_output_files("contour")
    rm = []
    for file in json_files:
        if not self.cfg.webdisp_opts.add_model_maps:
            # model maps are disabled for this experiment -> remove all
            rm.append(file)
        else:
            fname = os.path.basename(file)
            spl = fname.split(".")[0].split("_")
            if not len(spl) == 2:
                msg = f"FATAL: invalid file convention for map json file: {file}."
                if len(spl) > 2:
                    msg += "Likely due to underscore being present in model or variable name."
                rm.append(file)
                logger.warning(msg)
            elif spl[-1] in self._invalid["models"]:
                rm.append(file)
    removed = []
    for file in rm:
        os.remove(file)
        removed.append(file)
        # also remove the companion geojson file if present
        file1 = file.replace(".json", ".geojson")
        if os.path.exists(file1):
            os.remove(file1)
            # BUG FIX: record the geojson path (previously the .json path
            # was appended a second time here)
            removed.append(file1)
    return removed
[docs]
def delete_experiment_data(self, also_coldata=True) -> None:
    """Delete all data associated with a certain experiment

    Note
    ----
    This simply deletes the experiment directory with all the json files
    and, if `also_coldata` is True, also the associated co-located data
    objects.

    Parameters
    ----------
    also_coldata : bool
        if True and if output directory for colocated data is default and
        specific for input experiment ID, then also all associated colocated
        NetCDF files are deleted. Defaults to True.
    """
    self.avdb.rm_experiment_data(self.proj_id, self.exp_id)
    if also_coldata:
        coldata_dir = self.cfg.path_manager.get_coldata_dir()
        if os.path.exists(coldata_dir):
            logger.info(f"Deleting everything under {coldata_dir}")
            shutil.rmtree(coldata_dir)
    self._del_entry_experiments_json(self.exp_id)
def _get_json_output_files(self, dirname) -> list[str]:
    """List all json files in the json output subdirectory `dirname`."""
    directory = self.out_dirs_json[dirname]
    return glob.glob(f"{directory}/*.json")
def _get_cmap_info(self, var) -> dict[str, str | list[float]]:
    """Return display info (scale bins, colormap, unit) for a variable."""
    var_ranges_defaults = self.cfg.var_scale_colmap
    # explicit configuration wins
    if var in var_ranges_defaults:
        return var_ranges_defaults[var]
    try:
        varinfo = VarinfoWeb(var)
        # TODO: get unit from pyaerocom/data/variables.ini
        return dict(scale=varinfo.cmap_bins, colmap=varinfo.cmap, unit=varinfo.unit)
    except (VariableDefinitionError, AttributeError):
        fallback = var_ranges_defaults["default"]
        logger.warning(
            f"Failed to infer cmap and variable "
            f"ranges for {var}, using default "
            f"settings which are {fallback}"
        )
        return fallback
def _create_var_ranges_json(self) -> None:
    """Create/update ranges.json with scale, colormap and unit per variable."""
    with self.avdb.lock():
        ranges = self.avdb.get_ranges(self.proj_id, self.exp_id, default={})
        avail = self._results_summary()
        # all obs and model variables for which map output exists
        all_vars = list(set(avail["ovar"] + avail["mvar"]))
        for var in all_vars:
            # only (re)compute entries that are missing or have an empty scale
            if var not in ranges or ranges[var]["scale"] == []:
                ranges[var] = self._get_cmap_info(var)
                # unit from variables.ini takes precedence over inferred unit
                ranges[var]["unit"] = get_variable(var).units
        self.avdb.put_ranges(ranges, self.proj_id, self.exp_id)
def _create_statistics_json(self) -> None:
    """Assemble and write statistics.json for this experiment."""
    if self.cfg.statistics_opts.obs_only_stats:
        base = statistics_obs_only
    elif self.cfg.statistics_opts.model_only_stats:
        base = statistics_model_only
    else:
        base = statistics_defaults
    # BUG FIX: operate on copies. Previously the imported module-level
    # dicts were mutated in place (update / pop / per-entry update below),
    # leaking changes into any later experiment processed in the same
    # Python session.
    stats_info = dict(base)
    stats_info.update(extended_statistics)
    # shallow-copy each entry so the per-entry decimals update below
    # cannot modify the shared default entry dicts
    stats_info = {stat: dict(info) for stat, info in stats_info.items()}
    # configurable statistics - drop any statistics provided in drop_stats
    if self.cfg.statistics_opts.drop_stats:
        for stat in self.cfg.statistics_opts.drop_stats:
            stats_info.pop(stat, None)
    # configure the number of decimals shown in statistics if provided
    if self.cfg.statistics_opts.stats_decimals:
        for stat in stats_info:
            stats_info[stat].update(decimals=self.cfg.statistics_opts.stats_decimals)
    if self.cfg.statistics_opts.add_trends:
        if self.cfg.processing_opts.obs_only:
            # exclude model-related trend statistics in obs-only mode
            trend_stats = {key: val for key, val in statistics_trend.items() if "mod" not in key}
        else:
            trend_stats = statistics_trend
        stats_info.update({key: dict(val) for key, val in trend_stats.items()})
        if self.cfg.statistics_opts.avg_over_trends:
            stats_info.update({key: dict(val) for key, val in statistics_mean_trend.items()})
            stats_info.update({key: dict(val) for key, val in statistics_median_trend.items()})
    with self.avdb.lock():
        self.avdb.put_statistics(stats_info, self.proj_id, self.exp_id)
def _get_var_name_and_type(self, var_name: str) -> VariableInfo:
    """Get menu name and type of observation variable

    Parameters
    ----------
    var_name : str
        Name of variable

    Returns
    -------
    VariableInfo :
        named tuple containing the menu name, vertical type (ie. 2D, 3D)
        and category of this variable.
    """
    web_info = self.cfg.var_web_info
    if var_name not in web_info:
        logger.warning(f"Missing menu name definition for var {var_name}.")
        return VariableInfo(var_name, "UNDEFINED", "UNDEFINED")
    name, tp, cat = web_info[var_name]
    return VariableInfo(name, tp, cat)
def _init_menu_entry(self, var: str) -> dict:
    """Create a fresh menu entry skeleton for variable `var`."""
    name, tp, cat = self._get_var_name_and_type(var)
    try:
        longname = const.VARS[var].description
    except VariableDefinitionError:
        longname = "UNDEFINED"
    entry = {"type": tp, "cat": cat, "name": name, "obs": {}, "longname": longname}
    try:
        # Comes in as a string. split() here breaks up based on space and returns either just the element in a list or the components of the string in a list
        # only return only_use_in if key exists, otherwise do not
        entry["only_use_in"] = const.VARS[var].only_use_in.split(" ")
    except AttributeError:
        pass
    return entry
def _check_ovar_mvar_entry(
    self, mcfg: ModelEntry, mod_var, ocfg: ObsCollection, obs_var
) -> bool:
    """
    Check whether a (model var, obs var) pairing is valid for a model entry.

    Takes model_add_vars, model_use_vars and model_rename_vars of the
    model entry into account, as well as variable aliases.

    Parameters
    ----------
    mcfg : ModelEntry
        model entry to check against
    mod_var : str
        model variable name (as parsed from a map filename)
    ocfg : ObsCollection
        obs entry to check against
    obs_var : str
        obs variable name (as parsed from a map filename)

    Returns
    -------
    bool
        True if the combination is valid for this model entry, else False.
    """
    muv = mcfg.model_use_vars
    mrv = mcfg.model_rename_vars

    mvar_aliases = get_aliases(mod_var)
    for ovar, mvars in mcfg.model_add_vars.items():
        if obs_var in mvars:
            # for evaluation of entries in model_add_vars, the output json
            # files use the model variable both for obs and for model as a
            # workaround for the AeroVal heatmap display (which is based on
            # observation variables on the y-axis). E.g. if
            # model_add_vars=dict(od550aer=['od550so4']) then there will
            # 2 co-located data objects one where model od550aer is
            # co-located with obs od550aer and one where model od550so4
            # is co-located with obs od550aer, thus for the latter,
            # the obs variable is set to od550so4, so it shows up as a
            # separate entry in AeroVal.
            if obs_var in mrv:
                # model_rename_vars is specified for that obs variable,
                # e.g. using the above example, the output obs variable
                # would be od550so4, however, the user want to rename
                # the corresponding model variable via e.g.
                # model_rename_vars=dict(od550so4='MyVar'). Thus,
                # check if model variable is MyVar
                if mod_var == mrv[obs_var]:
                    return True
            elif obs_var == mod_var:
                # if match, then they should be the same here
                return True

    obs_vars = ocfg.get_all_vars()
    if obs_var in obs_vars:
        if obs_var in muv:
            mvar_to_use = muv[obs_var]
            if mvar_to_use == mod_var:
                # obs var is different from mod_var but this mapping is
                # specified in mcfg.model_use_vars
                return True
            elif mvar_to_use in mvar_aliases:
                # user specified an alias name in config for the
                # observation variable e.g. model_use_vars=dict(
                # ac550aer=absc550dryaer).
                return True
            elif mvar_to_use in mrv and mrv[mvar_to_use] == mod_var:
                # user wants to rename the model variable
                return True
        if obs_var in mrv and mrv[obs_var] == mod_var:
            # obs variable is in model_rename_vars
            return True
        elif mod_var in get_aliases(obs_var):
            # model var is an alias to obs var e.g. sconcpm10 to concpm10
            return True
        elif mod_var == obs_var:
            # default setting, includes cases where mcfg.model_use_vars
            # is set and the value of the model variable in
            # mcfg.model_use_vars is an alias for obs_var
            return True
    return False
def _is_part_of_experiment(self, obs_name, obs_var, mod_name, mod_var) -> bool:
    """
    Check if input combination of model and obs var is valid

    Note
    ----
    The input parameters are supposed to be retrieved from json files
    stored in the map subdirectory of an existing AeroVal experiment. In
    complex setup cases the variable mapping (model / obs variables)
    used in these json filenames may not be the trivial one expected from
    the configuration. These are cases where one specifies
    model_add_vars, or model_use_vars or model_rename_vars in a model
    entry.

    Parameters
    ----------
    obs_name : str
        Name of obs dataset.
    obs_var : str
        Name of obs variable.
    mod_name : str
        Name of model
    mod_var : str
        Name of model variable

    Returns
    -------
    bool
        True if this combination is valid, else False.
    """
    # MDA8 is computed on-the-fly ONLY if a MDA8_INPUT_VAR at hourly freq is detected.
    # Consequently, it is not specified in a config but should be included as part of the experiment.
    if obs_var in MDA8_OUTPUT_VARS and mod_var in MDA8_OUTPUT_VARS:
        return True

    # get model entry for model name
    try:
        mcfg = self.cfg.model_cfg.get_entry(mod_name)
    except EntryNotAvailable:
        # remember invalid model names for later cleanup of related files
        self._invalid["models"].append(mod_name)
        return False
    # mapping of obs / model variables to be used

    # search obs entry (may have web_interface_name set, so have to
    # check keys of ObsCollection but also the individual entries for
    # occurence of web_interface_name).
    allobs = self.cfg.obs_cfg
    obs_matches = []
    for ocfg in allobs:
        if obs_name == allobs.get_web_interface_name(ocfg.obs_name):
            obs_matches.append(ocfg)
    if len(obs_matches) == 0:
        # remember invalid obs names for later cleanup of related files
        self._invalid["obs"].append(obs_name)
        # obs dataset is not part of experiment
        return False
    # first, check model_add_vars
    for ocfg in obs_matches:
        if self._check_ovar_mvar_entry(mcfg, mod_var, ocfg, obs_var):
            return True
    return False
def _create_menu_dict(self) -> dict:
    """
    Build the nested menu dict (variable -> obs -> vert code -> model)
    from the available map output files of this experiment.
    """
    new = {}
    files = self._get_json_output_files("map")
    for file in files:
        (obs_name, obs_var, vert_code, mod_name, mod_var, per) = self._info_from_map_file(file)
        if self._is_part_of_experiment(obs_name, obs_var, mod_name, mod_var):
            mcfg = self.cfg.model_cfg.get_entry(mod_name)
            # display name of the variable in the web menu
            var = mcfg.get_varname_web(mod_var, obs_var)
            if var not in new:
                new[var] = self._init_menu_entry(var)

            if obs_name not in new[var]["obs"]:
                new[var]["obs"][obs_name] = {}

            if vert_code not in new[var]["obs"][obs_name]:
                new[var]["obs"][obs_name][vert_code] = {}
            if mod_name not in new[var]["obs"][obs_name][vert_code]:
                new[var]["obs"][obs_name][vert_code][mod_name] = {}

            new[var]["obs"][obs_name][vert_code][mod_name] = {
                "model_id": mcfg.model_id,
                "model_var": mod_var,
                "obs_var": obs_var,
            }
        else:
            logger.warning(
                f"Invalid entry: model {mod_name} ({mod_var}), obs {obs_name} ({obs_var})"
            )
    return new
def _sort_menu_entries(self, avail: dict) -> dict:
    """
    Used in method :func:`update_menu_evaluation_iface`

    Sorts results of different menu entries (i.e. variables, observations
    and models).

    Parameters
    ----------
    avail : dict
        nested dictionary containing info about available results

    Returns
    -------
    dict
        input dictionary sorted in variable, obs and model layers. The order
        of variables, observations and models may be specified in
        AerocomEvaluation class and if not, alphabetic order is used.
    """
    # sort first layer (i.e. variables)
    avail = sort_dict_by_name(avail, pref_list=self.cfg.webdisp_opts.var_order_menu)
    new_sorted = {}
    for var, info in avail.items():
        new_sorted[var] = info
        # NOTE(review): get_obs_order_menu / get_model_order_menu are not
        # defined in this chunk — presumably provided elsewhere on this class.
        obs_order = self.get_obs_order_menu()
        sorted_obs = sort_dict_by_name(info["obs"], pref_list=obs_order)
        new_sorted[var]["obs"] = sorted_obs
        for obs_name, vert_codes in sorted_obs.items():
            # vertical codes are sorted alphabetically (no pref list)
            vert_codes_sorted = sort_dict_by_name(vert_codes)
            new_sorted[var]["obs"][obs_name] = vert_codes_sorted
            for vert_code, models in vert_codes_sorted.items():
                model_order = self.get_model_order_menu()
                models_sorted = sort_dict_by_name(models, pref_list=model_order)
                new_sorted[var]["obs"][obs_name][vert_code] = models_sorted
    return new_sorted
def _add_entry_experiments_json(self, exp_id: str, data) -> None:
    """Insert or overwrite the entry for `exp_id` in experiments.json."""
    with self.avdb.lock():
        experiments = self.avdb.get_experiments(self.proj_id, default={})
        experiments[exp_id] = data
        self.avdb.put_experiments(experiments, self.proj_id)
def _del_entry_experiments_json(self, exp_id) -> None:
    """
    Remove an entry from experiments.json

    Parameters
    ----------
    exp_id : str
        name of experiment

    Returns
    -------
    None
    """
    with self.avdb.lock():
        experiments = self.avdb.get_experiments(self.proj_id, default={})
        if exp_id in experiments:
            del experiments[exp_id]
        else:
            logger.warning(f"no such experiment registered: {exp_id}")
        self.avdb.put_experiments(experiments, self.proj_id)
[docs]
def reorder_experiments(self, exp_order=None) -> None:
    """Reorder experiment order in evaluation interface

    Puts experiment list into order as specified by `exp_order`, all
    remaining experiments are sorted alphabetically.

    Parameters
    ----------
    exp_order : list, optional
        desired experiment order, if None, then alphabetical order is used.
    """
    if exp_order is None:
        exp_order = []
    if not isinstance(exp_order, list):
        raise ValueError("need list as input")
    with self.avdb.lock():
        experiments = self.avdb.get_experiments(self.proj_id, default={})
        reordered = sort_dict_by_name(experiments, pref_list=exp_order)
        self.avdb.put_experiments(reordered, self.proj_id)
[docs]
def add_heatmap_timeseries_entry(
    self,
    entry: dict,
    region: str,
    network: str,
    obsvar: str,
    layer: str,
    modelname: str,
    modvar: str,
):
    """Adds a heatmap entry to hm/ts

    :param entry: The entry to be added.
    :param region: Region name
    :param network: Observation network
    :param obsvar: Observation variable
    :param layer: Vertical layer
    :param modelname: Model name
    :param modvar: Model variable
    """
    with self.avdb.lock():
        stats = self.avdb.get_heatmap_timeseries(
            self.proj_id, self.exp_id, region, network, obsvar, layer, default={}
        )
        stats = recursive_defaultdict(stats)
        stats[obsvar][network][layer][modelname][modvar] = round_floats(entry)
        self.avdb.put_heatmap_timeseries(
            stats,
            self.proj_id,
            self.exp_id,
            region,
            network,
            obsvar,
            layer,
        )
[docs]
def add_forecast_entry(
    self,
    entry: dict,
    region: str,
    network: str,
    obsvar: str,
    layer: str,
    modelname: str,
    modvar: str,
):
    """Adds a forecast entry to forecast

    :param entry: The entry to be added.
    :param region: Region name
    :param network: Observation network
    :param obsvar: Observation variable
    :param layer: Vertical layer
    :param modelname: Model name
    :param modvar: Model variable
    """
    with self.avdb.lock():
        forecast = self.avdb.get_forecast(
            self.proj_id, self.exp_id, region, network, obsvar, layer, default={}
        )
        forecast = recursive_defaultdict(forecast)
        forecast[obsvar][network][layer][modelname][modvar] = round_floats(entry)
        self.avdb.put_forecast(
            forecast,
            self.proj_id,
            self.exp_id,
            region,
            network,
            obsvar,
            layer,
        )
[docs]
def add_heatmap_entry(
    self,
    entry,
    frequency: str,
    network: str,
    obsvar: str,
    layer: str,
    modelname: str,
    modvar: str,
):
    """Adds a heatmap entry to glob_stats

    :param entry: The entry to be added.
    :param frequency: Temporal frequency of the stats (fixed docstring:
        previously documented a non-existent ``region`` parameter)
    :param network: Observation network
    :param obsvar: Observation variable.
    :param layer: Vertical Layer (eg. SURFACE)
    :param modelname: Model name
    :param modvar: Model variable.
    """
    project = self.proj_id
    experiment = self.exp_id
    with self.avdb.lock():
        glob_stats = self.avdb.get_glob_stats(project, experiment, frequency, default={})
        glob_stats = recursive_defaultdict(glob_stats)
        glob_stats[obsvar][network][layer][modelname][modvar] = entry
        self.avdb.put_glob_stats(glob_stats, project, experiment, frequency)
[docs]
def write_station_data(self, data):
    """Writes timeseries weekly.

    :param data: Data to be written.
    """
    # all routing metadata is read from the data object itself
    location = data["station_name"]
    network = data["obs_name"]
    obsvar = data["var_name_web"]
    layer = data["vert_code"]
    modelname = data["model_name"]
    route = (self.proj_id, self.exp_id, location, network, obsvar, layer)
    with self.avdb.lock():
        station_data = self.avdb.get_timeseries_weekly(*route, default={})
        station_data[modelname] = round_floats(data)
        self.avdb.put_timeseries_weekly(station_data, *route)
[docs]
def write_timeseries(self, data):
    """Write timeseries

    Args:
        data: The timeseries object (or list of objects) to be written.

    Note:
    -----
    All necessary metadata will be read from the data object.
    """
    entries = data if isinstance(data, list) else [data]
    with self.avdb.lock():
        for entry in entries:
            route = (
                self.proj_id,
                self.exp_id,
                entry["station_name"],
                entry["obs_name"],
                entry["var_name_web"],
                entry["vert_code"],
            )
            timeseries = self.avdb.get_timeseries(*route, default={})
            timeseries[entry["model_name"]] = round_floats(entry)
            self.avdb.put_timeseries(timeseries, *route)
[docs]
def add_profile_entry(
    self,
    data: ColocatedData,
    profile_viz: dict,
    periods: list[str],
    seasons: list[str],
    location,
    network,
    obsvar,
):
    """Adds an entry for the colocated data to profiles.json.

    Args:
        data (ColocatedData): For this vertical layer
        profile_viz (dict): Output of process_profile_data()
        periods (list[str]): periods to compute over (years)
        seasons (list[str]): seasons to compute over (e.g., All, DJF, etc.)
        location: station / region name used for DB routing
        network: observation network used for DB routing
        obsvar: observation variable used for DB routing
    """
    with self.avdb.lock():
        current = self.avdb.get_profiles(
            self.proj_id, self.exp_id, location, network, obsvar, default={}
        )
        current = recursive_defaultdict(current)
        # NOTE(review): despite the ColocatedData annotation, `data` is
        # iterated like a mapping of frequency -> coldata here — confirm
        # the actual type passed by callers.
        for freq, coldata in data.items():
            model_name = coldata.model_name

            # layer midpoint from the vertical_layer attrs of the data
            midpoint = (
                float(coldata.data.attrs["vertical_layer"]["end"])
                + float(coldata.data.attrs["vertical_layer"]["start"])
            ) / 2
            if "z" not in current[model_name]:
                current[model_name]["z"] = [midpoint]  # initalize with midpoint

            if (
                midpoint > current[model_name]["z"][-1]
            ):  # only store incremental increases in the layers
                current[model_name]["z"].append(midpoint)

            # old boilerplate to get around recursive_default_dict issues
            if "obs" not in current[model_name]:
                current[model_name]["obs"] = {}
            if freq not in current[model_name]["obs"]:
                current[model_name]["obs"][freq] = {}

            if "mod" not in current[model_name]:
                current[model_name]["mod"] = {}
            if freq not in current[model_name]["mod"]:
                current[model_name]["mod"][freq] = {}

            for per in periods:
                for season in seasons:
                    perstr = f"{per}-{season}"

                    if perstr not in current[model_name]["obs"][freq]:
                        current[model_name]["obs"][freq][perstr] = []
                    if perstr not in current[model_name]["mod"][freq]:
                        current[model_name]["mod"][freq][perstr] = []

                    current[model_name]["obs"][freq][perstr].append(
                        profile_viz["obs"][freq][perstr]
                    )
                    current[model_name]["mod"][freq][perstr].append(
                        profile_viz["mod"][freq][perstr]
                    )

            if "metadata" not in current[model_name]:
                # metadata about the vertical axis and data unit
                current[model_name]["metadata"] = {
                    "z_unit": coldata.data.attrs["altitude_units"],
                    "z_description": "Altitude ASL",
                    "z_long_description": "Altitude Above Sea Level",
                    "unit": coldata.unitstr,
                }
            current[model_name] = round_floats(current[model_name])
        self.avdb.put_profiles(current, self.proj_id, self.exp_id, location, network, obsvar)