Source code for pyaerocom.aeroval.helpers

import logging
import os
from pathlib import Path

from pydantic import BaseModel

from pyaerocom import const
from pyaerocom.aeroval.modelentry import ModelEntry
from pyaerocom.griddeddata import GriddedData
from pyaerocom.helpers import (
    get_highest_resolution,
    get_max_period_range,
    make_dummy_cube,
    start_stop,
    to_pandas_timestamp,
)
from pyaerocom.variable import Variable

logger = logging.getLogger(__name__)


def _check_statistics_periods(periods: list) -> list:
    """
    Check input list of period strings is valid

    Parameters
    ----------
    periods : list
        list containing period strings to be checked.

    Raises
    ------
    ValueError
        if input is not a list or any of the provided periods in that list is
        not a string or invalid.

    Returns
    -------
    list
        list of periods

    """
    checked = []
    if not isinstance(periods, list):
        raise ValueError("statistics_periods needs to be a list")
    for per in periods:
        if not isinstance(per, str):
            raise ValueError("All periods need to be strings")
        spl = [x.strip() for x in per.split("-")]
        # periods can be also dates or date ranges since cams2_83
        if len(spl) == 2:
            if len(spl[0]) != len(spl[1]):
                raise ValueError(f"{spl[0]} not on the same format as {spl[1]}")

        if len(spl) > 2:
            raise ValueError(
                f"Invalid value for period ({per}), can be either single "
                f"years/dates or range of years/dates (e.g. 2000-2010)."
            )
        years = True if len(spl[0]) == 4 else False
        if years:
            _per = "-".join([str(int(val)) for val in spl])
        else:
            # slash in the period string here is required by the aeroval web server logic
            _per = "-".join([to_pandas_timestamp(val).strftime("%Y/%m/%d") for val in spl])
        checked.append(_per)

    return checked


def _period_str_to_timeslice(period: str) -> slice:
    """
    Convert input period to a time slice

    Parameters
    ----------
    period : str
        period, e.g. "2000-2010"

    Raises
    ------
    ValueError
        if input period is invalid

    Returns
    -------
    slice
        slice containing start and end strings.
    """
    spl = period.split("-")
    if len(spl) == 1:
        return slice(spl[0], spl[0])
    elif len(spl) == 2:
        return slice(*spl)
    raise ValueError(period)


def _get_min_max_year_periods(statistics_periods):
    """Get lowest and highest available year from all periods

    Parameters
    ----------
    statistics_periods : list
        list of periods for experiment

    Returns
    -------
    pd.Timestamp
        start year
    pd.Timestamp
        stop year (may be the same as start year, e.g. if periods suggest
        single year analysis).
    """
    startyr, stopyr = start_stop("2100", "1900")
    for per in statistics_periods:
        sl = _period_str_to_timeslice(per)
        perstart, perstop = start_stop(sl.start, sl.stop)
        if perstart < startyr:
            startyr = perstart
        if perstop > stopyr:
            stopyr = perstop
    return startyr, stopyr


[docs] def check_if_year(periods: list[str]) -> bool: """ Checks if the periods in the periods list are years or dates """ years = [] for per in periods: spl = [x.strip() for x in per.split("-")] if len(spl) == 2: if len(spl[0]) != len(spl[1]): raise ValueError(f"{spl[0]} not on the same format as {spl[1]}") if len(spl) > 2: raise ValueError( f"Invalid value for period ({per}), can be either single " f"years/dates or range of years/dates (e.g. 2000-2010)." ) years.append(True if len(spl[0]) == 4 else False) if len(set(years)) != 1: raise ValueError(f"Found mix of years and dates in {periods}") return list(set(years))[0]
def make_dummy_model(obs_list: list, cfg) -> str: # Sets up variable for the model register tmpdir = const.LOCAL_TMP_DIR const.add_data_search_dir(tmpdir) model_id = "dummy_model" outdir = os.path.join(tmpdir, f"{model_id}/renamed") os.makedirs(outdir, exist_ok=True) # Finds dates and freq to use, so that all observations are covered (start, stop) = get_max_period_range(cfg.time_cfg.periods) freq = get_highest_resolution(*cfg.time_cfg.freqs) tmp_var_obj = Variable() # Loops over variables in obs for obs in obs_list: for var in cfg.obs_cfg.get_entry(obs).obs_vars: # Create dummy cube dummy_cube = make_dummy_cube(var, start_yr=start, stop_yr=stop, freq=freq) # Converts cube to GriddedData dummy_grid = GriddedData(dummy_cube) # Set the value to be the mean of acceptable values to prevent incorrect outlier removal # This needs some care though because the defaults are (currently) -inf and inf, which leads to erroneous removal if not ( dummy_grid.var_info.minimum == tmp_var_obj.VMIN_DEFAULT or dummy_grid.var_info.maximum == tmp_var_obj.VMAX_DEFAULT ): dummy_grid.data *= (dummy_grid.var_info.minimum + dummy_grid.var_info.maximum) / 2 # Loop over each year yr_gen = dummy_grid.split_years() for dummy_grid_yr in yr_gen: # Add to netcdf yr = dummy_grid_yr.years_avail()[0] vert_code = cfg.obs_cfg.get_entry(obs).obs_vert_type save_name = dummy_grid_yr.aerocom_savename(model_id, var, vert_code, yr, freq) dummy_grid_yr.to_netcdf(outdir, savename=save_name) # Add dummy model to cfg cfg.model_cfg.add_entry("dummy", ModelEntry(model_id="dummy_model")) return model_id def delete_dummy_model(model_id: str) -> None: tmpdir = const.LOCAL_TMP_DIR const.add_data_search_dir(tmpdir) renamed = Path(tmpdir) / f"{model_id}/renamed" for path in renamed.glob("*.nc"): print(f"Deleting dummy model {path}") path.unlink()
[docs] class BoundingBox(BaseModel): west: float east: float south: float north: float