import glob
import logging
import os
import shutil
from pyaerocom import const
from pyaerocom._lowlevel_helpers import DirLoc, StrType, TypeValidator, sort_dict_by_name
from pyaerocom.aeroval.glob_defaults import (
extended_statistics,
statistics_defaults,
statistics_model_only,
statistics_obs_only,
statistics_trend,
var_ranges_defaults,
var_web_info,
)
from pyaerocom.aeroval.json_utils import check_make_json, read_json, write_json
from pyaerocom.aeroval.setupclasses import EvalSetup
from pyaerocom.aeroval.varinfo_web import VarinfoWeb
from pyaerocom.exceptions import EntryNotAvailable, VariableDefinitionError
from pyaerocom.stats.stats import _init_stats_dummy
from pyaerocom.variable_helpers import get_aliases
logger = logging.getLogger(__name__)
[docs]
class ProjectOutput:
"""JSON output for project"""
proj_id = StrType()
json_basedir = DirLoc(assert_exists=True)
def __init__(self, proj_id: str, json_basedir: str):
self.proj_id = proj_id
self.json_basedir = json_basedir
@property
def proj_dir(self) -> str:
"""Project directory"""
fp = os.path.join(self.json_basedir, self.proj_id)
if not os.path.exists(fp):
os.mkdir(fp)
logger.info(f"Creating AeroVal project directory at {fp}")
return fp
@property
def experiments_file(self) -> str:
"""json file containing region specifications"""
fp = os.path.join(self.proj_dir, "experiments.json")
fp = check_make_json(fp)
return fp
@property
def available_experiments(self) -> list:
"""
List of available experiments
"""
return list(read_json(self.experiments_file))
[docs]
class ExperimentOutput(ProjectOutput):
"""JSON output for experiment"""
cfg = TypeValidator(EvalSetup)
def __init__(self, cfg):
self.cfg = cfg
super().__init__(cfg.proj_id, cfg.path_manager.json_basedir)
# dictionary that will be filled by json cleanup methods to check for
# invalid or outdated json files across different output directories
self._invalid = dict(models=[], obs=[])
@property
def exp_id(self) -> str:
"""Experiment ID"""
return self.cfg.exp_id
@property
def exp_dir(self) -> str:
"""Experiment directory"""
fp = os.path.join(self.proj_dir, self.exp_id)
if not os.path.exists(fp):
os.mkdir(fp)
logger.info(f"Creating AeroVal experiment directory at {fp}")
return fp
@property
def regions_file(self) -> str:
"""json file containing region specifications"""
fp = os.path.join(self.exp_dir, "regions.json")
fp = check_make_json(fp)
return fp
@property
def statistics_file(self) -> str:
"""json file containing region specifications"""
fp = os.path.join(self.exp_dir, "statistics.json")
fp = check_make_json(fp)
return fp
@property
def var_ranges_file(self) -> str:
"""json file containing region specifications"""
fp = os.path.join(self.exp_dir, "ranges.json")
check_make_json(fp)
return fp
@property
def menu_file(self) -> str:
"""json file containing region specifications"""
fp = os.path.join(self.exp_dir, "menu.json")
check_make_json(fp)
return fp
@property
def results_available(self) -> bool:
"""
bool: True if results are available for this experiment, else False
"""
if not self.exp_id in os.listdir(self.proj_dir):
return False
elif not len(self._get_json_output_files("map")) > 0:
return False
return True
@property
def out_dirs_json(self) -> dict:
"""
json output directories (`dict`)
"""
return self.cfg.path_manager.get_json_output_dirs()
[docs]
def update_interface(self) -> None:
"""
Update web interface
Steps:
1. Check if results are available, and if so:
2. Add entry for this experiment in experiments.json
3. Create/update ranges.json file in experiment directory
4. Update menu.json against available output and evaluation setup
5. Synchronise content of heatmap json files with menu
6. Create/update file statistics.json in experiment directory
7. Copy json version of EvalSetup into experiment directory
Returns
-------
None
"""
if not self.results_available:
logger.warning(f"no output available for experiment {self.exp_id} in {self.proj_id}")
return
exp_data = {"public": self.cfg.exp_info.public}
self._add_entry_experiments_json(self.exp_id, exp_data)
self._create_var_ranges_json()
self.update_menu()
self._sync_heatmaps_with_menu_and_regions()
self._create_statistics_json()
# AeroVal frontend needs periods to be set in config json file...
# make sure they are
self.cfg._check_time_config()
self.cfg.to_json(self.exp_dir)
def _sync_heatmaps_with_menu_and_regions(self) -> None:
"""
Synchronise content of heatmap json files with content of menu.json
"""
menu = read_json(self.menu_file)
all_regions = read_json(self.regions_file)
for fp in self._get_json_output_files("hm"):
data = read_json(fp)
hm = {}
for vardisp, info in menu.items():
obs_dict = info["obs"]
if not vardisp in hm:
hm[vardisp] = {}
for obs, vdict in obs_dict.items():
if not obs in hm[vardisp]:
hm[vardisp][obs] = {}
for vc, mdict in vdict.items():
if not vc in hm[vardisp][obs]:
hm[vardisp][obs][vc] = {}
for mod, minfo in mdict.items():
if not mod in hm[vardisp][obs][vc]:
hm[vardisp][obs][vc][mod] = {}
modvar = minfo["model_var"]
hm_data = data[vardisp][obs][vc][mod][modvar]
hm_data = self._check_hm_all_regions_avail(all_regions, hm_data)
hm[vardisp][obs][vc][mod][modvar] = hm_data
write_json(hm, fp, ignore_nan=True)
def _check_hm_all_regions_avail(self, all_regions, hm_data) -> dict:
if all([x in hm_data for x in all_regions]):
return hm_data
# some regions are not available in this subset
periods = self.cfg.time_cfg._get_all_period_strings()
dummy_stats = _init_stats_dummy()
for region in all_regions:
if not region in hm_data:
hm_data[region] = {}
for per in periods:
hm_data[region][per] = dummy_stats
return hm_data
@staticmethod
def _info_from_map_file(filename) -> tuple[str]:
"""
Separate map filename into meta info on obs and model content
Parameters
----------
filename : str
name of file in "map" subdirectory of json output directory for
this experiment
Raises
------
ValueError
if input filename is invalid
Returns
-------
str
name of observation network
str
name of observation variable
str
name of vertical code (e.g. Surface)
str
name of model
str
name of model variable
"""
spl = os.path.basename(filename).split(".json")[0].split("_")
if len(spl) != 4:
raise ValueError(
f"invalid map filename: {filename}. Must "
f"contain exactly 3 underscores _ to separate "
f"obsinfo, vertical, model info, and periods"
)
obsinfo = spl[0]
vert_code = spl[1]
modinfo = spl[2]
per = spl[3]
mspl = modinfo.split("-")
mvar = mspl[-1]
mname = "-".join(mspl[:-1])
ospl = obsinfo.split("-")
ovar = ospl[-1]
oname = "-".join(ospl[:-1])
return (oname, ovar, vert_code, mname, mvar, per)
def _results_summary(self) -> dict:
res = [[], [], [], [], [], []]
files = self._get_json_output_files("map")
tab = []
for file in files:
item = self._info_from_map_file(file)
for i, entry in enumerate(item):
res[i].append(entry)
output = {}
for i, name in enumerate(["obs", "ovar", "vc", "mod", "mvar", "per"]):
output[name] = list(set(res[i]))
return output
[docs]
def clean_json_files(self) -> list:
"""Checks all existing json files and removes outdated data
This may be relevant when updating a model name or similar.
"""
modified = []
logger.info(
"Running clean_json_files: Checking json output directories for "
"outdated or invalid data and cleaning up."
)
outdirs = self.out_dirs_json
mapfiles = self._get_json_output_files("map")
rmmap = []
vert_codes = self.cfg.obs_cfg.all_vert_types
for file in mapfiles:
try:
(
obs_name,
obs_var,
vert_code,
mod_name,
mod_var,
period,
) = self._info_from_map_file(file)
except Exception as e:
logger.warning(
f"FATAL: invalid file convention for map json file:"
f" {file}. This file will be deleted. Error message: "
f"{repr(e)}"
)
rmmap.append(file)
continue
if not self._is_part_of_experiment(obs_name, obs_var, mod_name, mod_var):
rmmap.append(file)
elif not vert_code in vert_codes:
rmmap.append(file)
scatfiles = os.listdir(outdirs["scat"])
for file in rmmap: # delete map files
logger.info(f"Deleting outdated map json file: {file}.")
os.remove(file)
modified.append(file)
fname = os.path.basename(file)
if fname in scatfiles:
scfp = os.path.join(outdirs["scat"], fname)
logger.info(f"Deleting outdated scatter json file: {file}.")
os.remove(scfp)
modified.append(file)
tsfiles = self._get_json_output_files("ts")
for file in tsfiles:
if self._check_clean_ts_file(file):
modified.append(file)
modified.extend(self._clean_modelmap_files())
self.update_interface() # will take care of heatmap data
return modified
def _check_clean_ts_file(self, fp) -> bool:
fname = os.path.basename(fp)
spl = fname.split(".json")[0].split("_")
vc, obsinfo = spl[-1], spl[-2]
if not vc in self.cfg.obs_cfg.all_vert_types:
logger.warning(
f"Invalid or outdated vert code {vc} in ts file {fp}. File will be deleted."
)
os.remove(fp)
return True
obs_name = str.join("-", obsinfo.split("-")[:-1])
if obs_name in self._invalid["obs"]:
logger.info(
f"Invalid or outdated obs name {obs_name} in ts file {fp}. "
f"File will be deleted."
)
os.remove(fp)
return True
try:
data = read_json(fp)
except Exception:
logger.exception(f"FATAL: detected corrupt json file: {fp}. Removing file...")
os.remove(fp)
return True
models_avail = list(data)
models_in_exp = self.cfg.model_cfg.web_iface_names
if all([mod in models_in_exp for mod in models_avail]):
# nothing to clean up
return False
modified = False
data_new = {}
for mod_name in models_avail:
if mod_name in models_in_exp:
data_new[mod_name] = data[mod_name]
else:
modified = True
logger.info(f"Removing data for model {mod_name} from ts file: {fp}")
write_json(data_new, fp)
return modified
def _clean_modelmap_files(self) -> list[str]:
# Note: to be called after cleanup of files in map subdir
json_files = self._get_json_output_files("contour")
rm = []
for file in json_files:
if not self.cfg.webdisp_opts.add_model_maps:
rm.append(file)
else:
fname = os.path.basename(file)
spl = fname.split(".")[0].split("_")
if not len(spl) == 2:
msg = f"FATAL: invalid file convention for map json file: {file}."
if len(spl) > 2:
msg += "Likely due to underscore being present in model or variable name."
rm.append(file)
logger.warning(msg)
elif spl[-1] in self._invalid["models"]:
rm.append(file)
removed = []
for file in rm:
os.remove(file)
removed.append(file)
file1 = file.replace(".json", ".geojson")
if os.path.exists(file1):
os.remove(file1)
removed.append(file)
return removed
[docs]
def delete_experiment_data(self, also_coldata=True) -> None:
"""Delete all data associated with a certain experiment
Note
----
This simply deletes the experiment directory with all the json files
and, if `also_coldata` is True, also the associated co-located data
objects.
Parameters
----------
base_dir : str, optional
basic output direcory (containing subdirs of all projects)
proj_name : str, optional
name of project, if None, then this project is used
exp_name : str, optional
name experiment, if None, then this project is used
also_coldata : bool
if True and if output directory for colocated data is default and
specific for input experiment ID, then also all associated colocated
NetCDF files are deleted. Defaults to True.
"""
if os.path.exists(self.exp_dir):
logger.info(f"Deleting everything under {self.exp_dir}")
shutil.rmtree(self.exp_dir)
if also_coldata:
coldir = self.cfg.path_manager.get_coldata_dir()
if os.path.exists(coldir):
logger.info(f"Deleting everything under {coldir}")
shutil.rmtree(coldir)
self._del_entry_experiments_json(self.exp_id)
def _get_json_output_files(self, dirname) -> list[str]:
dirloc = self.out_dirs_json[dirname]
return glob.glob(f"{dirloc}/*.json")
def _get_cmap_info(self, var) -> list[float]:
if var in var_ranges_defaults:
return var_ranges_defaults[var]
try:
varinfo = VarinfoWeb(var)
info = dict(scale=varinfo.cmap_bins, colmap=varinfo.cmap)
except (VariableDefinitionError, AttributeError):
info = var_ranges_defaults["default"]
logger.warning(
f"Failed to infer cmap and variable "
f"ranges for {var}, using default "
f"settings which are {info}"
)
return info
def _create_var_ranges_json(self) -> None:
try:
ranges = read_json(self.var_ranges_file)
except FileNotFoundError:
ranges = {}
avail = self._results_summary()
all_vars = list(set(avail["ovar"] + avail["mvar"]))
for var in all_vars:
if not var in ranges or ranges[var]["scale"] == []:
ranges[var] = self._get_cmap_info(var)
write_json(ranges, self.var_ranges_file, indent=4)
def _create_statistics_json(self) -> None:
if self.cfg.statistics_opts.obs_only_stats:
stats_info = statistics_obs_only
elif self.cfg.statistics_opts.model_only_stats:
stats_info = statistics_model_only
else:
stats_info = statistics_defaults
stats_info.update(extended_statistics)
# configurable statistics - drop any statistics provided in drop_stats
if self.cfg.statistics_opts.drop_stats:
for stat in self.cfg.statistics_opts.drop_stats:
stats_info.pop(stat, None)
# configure the number of decimals shown in statistics if provided
if self.cfg.statistics_opts.stats_decimals:
for stat in stats_info:
stats_info[stat].update(decimals=self.cfg.statistics_opts.stats_decimals)
if self.cfg.statistics_opts.add_trends:
if self.cfg.processing_opts.obs_only:
obs_statistics_trend = {
key: val for key, val in statistics_trend.items() if "mod" not in key
}
stats_info.update(obs_statistics_trend)
else:
stats_info.update(statistics_trend)
write_json(stats_info, self.statistics_file, indent=4)
def _get_var_name_and_type(self, var_name) -> str:
"""Get menu name and type of observation variable
Parameters
----------
var_name : str
Name of variable
Returns
-------
str
menu name of this variable
str
vertical type of this variable (2D, 3D)
str
variable category
"""
if var_name in self.cfg.var_web_info:
name, tp, cat = self.cfg.var_mapping[var_name]
elif var_name in var_web_info:
name, tp, cat = var_web_info[var_name]
else:
name, tp, cat = var_name, "UNDEFINED", "UNDEFINED"
logger.warning(f"Missing menu name definition for var {var_name}.")
return (name, tp, cat)
def _init_menu_entry(self, var: str) -> dict:
name, tp, cat = self._get_var_name_and_type(var)
out = {"type": tp, "cat": cat, "name": name, "obs": {}}
try:
lname = const.VARS[var].description
except VariableDefinitionError:
lname = "UNDEFINED"
out["longname"] = lname
try:
# Comes in as a string. split() here breaks up based on space and returns either just the element in a list or the components of the string in a list
only_use_in = const.VARS[var].only_use_in.split(" ")
# only return only_use_in if key exists, otherwise do not
out["only_use_in"] = only_use_in
except AttributeError:
pass
return out
def _check_ovar_mvar_entry(self, mcfg, mod_var, ocfg, obs_var) -> bool:
muv = mcfg.model_use_vars
mrv = mcfg.model_rename_vars
mvar_aliases = get_aliases(mod_var)
for ovar, mvars in mcfg.model_add_vars.items():
if obs_var in mvars:
# for evaluation of entries in model_add_vars, the output json
# files use the model variable both for obs and for model as a
# workaround for the AeroVal heatmap display (which is based on
# observation variables on the y-axis). E.g. if
# model_add_vars=dict(od550aer=['od550so4']) then there will
# 2 co-located data objects one where model od550aer is
# co-located with obs od550aer and one where model od550so4
# is co-located with obs od550aer, thus for the latter,
# the obs variable is set to od550so4, so it shows up as a
# separate entry in AeroVal.
if obs_var in mrv:
# model_rename_vars is specified for that obs variable,
# e.g. using the above example, the output obs variable
# would be od550so4, however, the user want to rename
# the corresponding model variable via e.g.
# model_rename_vars=dict(od550so4='MyVar'). Thus,
# check if model variable is MyVar
if mod_var == mrv[obs_var]:
return True
elif obs_var == mod_var:
# if match, then they should be the same here
return True
obs_vars = ocfg.get_all_vars()
if obs_var in obs_vars:
if obs_var in muv:
mvar_to_use = muv[obs_var]
if mvar_to_use == mod_var:
# obs var is different from mod_var but this mapping is
# specified in mcfg.model_use_vars
return True
elif mvar_to_use in mvar_aliases:
# user specified an alias name in config for the
# observation variable e.g. model_use_vars=dict(
# ac550aer=absc550dryaer).
return True
elif mvar_to_use in mrv and mrv[mvar_to_use] == mod_var:
# user wants to rename the model variable
return True
if obs_var in mrv and mrv[obs_var] == mod_var:
# obs variable is in model_rename_vars
return True
elif mod_var in get_aliases(obs_var):
# model var is an alias to obs var e.g. sconcpm10 to concpm10
return True
elif mod_var == obs_var:
# default setting, includes cases where mcfg.model_use_vars
# is set and the value of the model variable in
# mcfg.model_use_vars is an alias for obs_var
return True
return False
def _is_part_of_experiment(self, obs_name, obs_var, mod_name, mod_var) -> bool:
"""
Check if input combination of model and obs var is valid
Note
----
The input parameters are supposed to be retrieved from json files
stored in the map subdirectory of an existing AeroVal experiment. In
complex setup cases the variable mapping (model / obs variables)
used in these json filenames may not be the trivial one expected from
the configuaration. These are cases where one specifies
model_add_vars, or model_use_vars or model_rename_vars in a model
entry.
Parameters
----------
obs_name : str
Name of obs dataset.
obs_var : str
Name of obs variable.
mod_name : str
Name of model
mod_var : str
Name of model variable
Returns
-------
bool
True if this combination is valid, else False.
"""
# get model entry for model name
try:
mcfg = self.cfg.model_cfg.get_entry(mod_name)
except EntryNotAvailable:
self._invalid["models"].append(mod_name)
return False
# mapping of obs / model variables to be used
# search obs entry (may have web_interface_name set, so have to
# check keys of ObsCollection but also the individual entries for
# occurence of web_interface_name).
allobs = self.cfg.obs_cfg
obs_matches = []
for key, ocfg in allobs.items():
if obs_name == allobs.get_web_iface_name(key):
obs_matches.append(ocfg)
if len(obs_matches) == 0:
self._invalid["obs"].append(obs_name)
# obs dataset is not part of experiment
return False
# first, check model_add_vars
for ocfg in obs_matches:
if self._check_ovar_mvar_entry(mcfg, mod_var, ocfg, obs_var):
return True
return False
def _create_menu_dict(self) -> dict:
new = {}
files = self._get_json_output_files("map")
for file in files:
(obs_name, obs_var, vert_code, mod_name, mod_var, per) = self._info_from_map_file(file)
if self._is_part_of_experiment(obs_name, obs_var, mod_name, mod_var):
mcfg = self.cfg.model_cfg.get_entry(mod_name)
var = mcfg.get_varname_web(mod_var, obs_var)
if not var in new:
new[var] = self._init_menu_entry(var)
if not obs_name in new[var]["obs"]:
new[var]["obs"][obs_name] = {}
if not vert_code in new[var]["obs"][obs_name]:
new[var]["obs"][obs_name][vert_code] = {}
if not mod_name in new[var]["obs"][obs_name][vert_code]:
new[var]["obs"][obs_name][vert_code][mod_name] = {}
model_id = mcfg["model_id"]
new[var]["obs"][obs_name][vert_code][mod_name] = {
"model_id": model_id,
"model_var": mod_var,
"obs_var": obs_var,
}
else:
logger.warning(
f"Invalid entry: model {mod_name} ({mod_var}), obs {obs_name} ({obs_var})"
)
return new
def _sort_menu_entries(self, avail) -> dict:
"""
Used in method :func:`update_menu_evaluation_iface`
Sorts results of different menu entries (i.e. variables, observations
and models).
Parameters
----------
avail : dict
nested dictionary contining info about available results
config : AerocomEvaluation
Configuration class
Returns
-------
dict
input dictionary sorted in variable, obs and model layers. The order
of variables, observations and models may be specified in
AerocomEvaluation class and if not, alphabetic order is used.
"""
# sort first layer (i.e. variables)
avail = sort_dict_by_name(avail, pref_list=self.cfg.webdisp_opts.var_order_menu)
new_sorted = {}
for var, info in avail.items():
new_sorted[var] = info
obs_order = self.get_obs_order_menu()
sorted_obs = sort_dict_by_name(info["obs"], pref_list=obs_order)
new_sorted[var]["obs"] = sorted_obs
for obs_name, vert_codes in sorted_obs.items():
vert_codes_sorted = sort_dict_by_name(vert_codes)
new_sorted[var]["obs"][obs_name] = vert_codes_sorted
for vert_code, models in vert_codes_sorted.items():
model_order = self.get_model_order_menu()
models_sorted = sort_dict_by_name(models, pref_list=model_order)
new_sorted[var]["obs"][obs_name][vert_code] = models_sorted
return new_sorted
def _get_valid_obs_vars(self, obs_name) -> dict:
if obs_name in self._valid_obs_vars:
return self._valid_obs_vars[obs_name]
obs_vars = self.obs_config[obs_name]["obs_vars"]
add = []
for mname, mcfg in self.model_config.items():
if "model_add_vars" in mcfg:
for ovar, mvar in mcfg["model_add_vars"].items():
if ovar in obs_vars and not mvar in add:
add.append(mvar)
obs_vars.extend(add)
self._valid_obs_vars[obs_name] = obs_vars
return obs_vars
def _add_entry_experiments_json(self, exp_id, data) -> None:
fp = self.experiments_file
current = read_json(fp)
current[exp_id] = data
write_json(
current,
self.experiments_file,
indent=4,
)
def _del_entry_experiments_json(self, exp_id) -> None:
"""
Remove an entry from experiments.json
Parameters
----------
exp_id : str
name of experiment
Returns
-------
None
"""
current = read_json(self.experiments_file)
try:
del current[exp_id]
except KeyError:
logger.warning(f"no such experiment registered: {exp_id}")
write_json(
current,
self.experiments_file,
indent=4,
)
[docs]
def reorder_experiments(self, exp_order=None) -> None:
"""Reorder experiment order in evaluation interface
Puts experiment list into order as specified by `exp_order`, all
remaining experiments are sorted alphabetically.
Parameters
----------
exp_order : list, optional
desired experiment order, if None, then alphabetical order is used.
"""
if exp_order is None:
exp_order = []
elif not isinstance(exp_order, list):
raise ValueError("need list as input")
current = read_json(self.experiments_file)
current = sort_dict_by_name(current, pref_list=exp_order)
write_json(
current,
self.experiments_file,
indent=4,
)