Source code for pyaerocom.aeroval.helpers

import logging
import os
from pathlib import Path

from pyaerocom import const
from pyaerocom.aeroval.modelentry import ModelEntry
from pyaerocom.aeroval.varinfo_web import VarinfoWeb
from pyaerocom.griddeddata import GriddedData
from pyaerocom.helpers import get_highest_resolution, get_max_period_range, make_dummy_cube
from pyaerocom.variable import Variable

logger = logging.getLogger(__name__)


[docs] def check_var_ranges_avail(model_data, var_name): """ Check if lower and upper variable ranges are available for input variable Parameters ---------- model_data : GriddedData modeldata containing variable data var_name : str variable name to be checked (must be the same as model data AeroCom variable name). Raises ------ ValueError if ranges for input variable are not defined and if input model data corresponds to a different variable than the input variable name. Returns ------- None """ try: VarinfoWeb(var_name) except AttributeError: if model_data.var_name_aerocom == var_name: model_data.register_var_glob(delete_existing=True) else: raise ValueError( f"Mismatch between variable name of input model_data " f"({model_data.var_name_aerocom}) and var_name {var_name}" )
def _check_statistics_periods(periods: list) -> list: """ Check input list of period strings is valid Parameters ---------- periods : list list containing period strings to be checked. Raises ------ ValueError if input is not a list or any of the provided periods in that list is not a string or invalid. Returns ------- list list of periods """ checked = [] if not isinstance(periods, list): raise ValueError("statistics_periods needs to be a list") for per in periods: if not isinstance(per, str): raise ValueError("All periods need to be strings") spl = [x.strip() for x in per.split("-")] if len(spl) > 2: raise ValueError( f"Invalid value for period ({per}), can be either single " f"years or period of years (e.g. 2000-2010)." ) _per = "-".join([str(int(val)) for val in spl]) checked.append(_per) return checked def _period_str_to_timeslice(period: str) -> slice: """ Convert input period to a time slice Parameters ---------- period : str period, e.g. "2000-2010" Raises ------ ValueError if input period is invalid Returns ------- slice slice containing start and end strings. """ spl = period.split("-") if len(spl) == 1: return slice(spl[0], spl[0]) elif len(spl) == 2: return slice(*spl) raise ValueError(period) def _get_min_max_year_periods(statistics_periods): """Get lowest and highest available year from all periods Parameters ---------- statistics_periods : list list of periods for experiment Returns ------- int start year int stop year (may be the same as start year, e.g. if periods suggest single year analysis). """ startyr, stopyr = 1e6, -1e6 for per in statistics_periods: sl = _period_str_to_timeslice(per) perstart, perstop = int(sl.start), int(sl.stop) if perstart < startyr: startyr = perstart if perstop > stopyr: stopyr = perstop return startyr, stopyr def make_dummy_model(obs_list: list, cfg) -> str: # Sets up variable for the model register tmpdir = const.LOCAL_TMP_DIR const.add_data_search_dir(tmpdir) model_id = "dummy_model" outdir = os.path.join(tmpdir, f"{model_id}/renamed") os.makedirs(outdir, exist_ok=True) # Finds dates and freq to use, so that all observations are covered (start, stop) = get_max_period_range(cfg.time_cfg.periods) freq = get_highest_resolution(*cfg.time_cfg.freqs) tmp_var_obj = Variable() # Loops over variables in obs for obs in obs_list: for var in cfg.obs_cfg[obs]["obs_vars"]: # Create dummy cube dummy_cube = make_dummy_cube(var, start_yr=start, stop_yr=stop, freq=freq) # Converts cube to GriddedData dummy_grid = GriddedData(dummy_cube) # Set the value to be the mean of acceptable values to prevent incorrect outlier removal # This needs some care though because the defaults are (currently) -inf and inf, which leads to erroneous removal if not ( dummy_grid.var_info.minimum == tmp_var_obj.VMIN_DEFAULT or dummy_grid.var_info.maximum == tmp_var_obj.VMAX_DEFAULT ): dummy_grid.data *= (dummy_grid.var_info.minimum + dummy_grid.var_info.maximum) / 2 # Loop over each year yr_gen = dummy_grid.split_years() for dummy_grid_yr in yr_gen: # Add to netcdf yr = dummy_grid_yr.years_avail()[0] vert_code = cfg.obs_cfg[obs]["obs_vert_type"] save_name = dummy_grid_yr.aerocom_savename(model_id, var, vert_code, yr, freq) dummy_grid_yr.to_netcdf(outdir, savename=save_name) # Add dummy model to cfg cfg.model_cfg["dummy"] = ModelEntry(model_id="dummy_model") return model_id def delete_dummy_model(model_id: str) -> None: tmpdir = const.LOCAL_TMP_DIR const.add_data_search_dir(tmpdir) renamed = Path(tmpdir) / f"{model_id}/renamed" for path in renamed.glob("*.nc"): print(f"Deleting dummy model {path}") path.unlink()