Source code for pyaerocom.aeroval.modelmaps_engine

import logging
import os

from pyaerocom import GriddedData, TsType, const
from pyaerocom.aeroval._processing_base import DataImporter, ProcessingEngine
from pyaerocom.aeroval.modelmaps_helpers import (
    calc_contour_json,
    plot_overlay_pixel_maps,
    _jsdate_list,
    CONTOUR,
    OVERLAY,
)
from pyaerocom.colocation.colocator import Colocator

from pyaerocom.aeroval.varinfo_web import VarinfoWeb
from pyaerocom.exceptions import (
    DataCoverageError,
    DataDimensionError,
    DataQueryError,
    ModelVarNotAvailable,
    TemporalResolutionError,
    VariableDefinitionError,
    VarNotAvailableError,
    EntryNotAvailable,
)

logger = logging.getLogger(__name__)

MODELREADERS_USE_MAP_FREQ = ["ReadMscwCtm"]


[docs] class ModelMapsEngine(ProcessingEngine, DataImporter): """ Engine for processing of model maps """ def _get_run_kwargs(self, **kwargs): try: model_list = kwargs["model_list"] except KeyError: model_list = self.cfg.model_cfg.keylist() try: var_list = kwargs["var_list"] except Exception: var_list = None return model_list, var_list
[docs] def run(self, **kwargs): model_list, var_list = self._get_run_kwargs(**kwargs) all_files = [] for model in model_list: try: files = self._run_model(model, var_list) except VarNotAvailableError: files = [] if not files: logger.warning(f"no data for model {model}, skipping") continue all_files.extend(files) return files
def _get_vars_to_process(self, model_name, var_list): mvars = self.cfg.model_cfg.get_entry(model_name).get_vars_to_process( self.cfg.obs_cfg.get_all_vars() )[1] all_vars = sorted(list(set(mvars))) if var_list is not None: all_vars = [var for var in var_list if var in all_vars] return all_vars def _get_obs_vars_to_process(self, obs_name, var_list): ovars = self.cfg.obs_cfg.get(obs_name).get_all_vars() all_vars = sorted(list(set(ovars))) if var_list is not None: all_vars = [var for var in var_list if var in all_vars] return all_vars def _run_model(self, model_name: str, var_list): """Run evaluation of map processing Create json files for model-maps display. This analysis does not require any observation data but processes model output at all model grid points, which is then displayed on the website in the maps section. Parameters ---------- model_name : str name of model to be processed var_list : list, optional name of variable to be processed. If None, all available observation variables are used. """ try: var_list = self._get_vars_to_process(model_name, var_list) except EntryNotAvailable: var_list = self._get_obs_vars_to_process(model_name, var_list) files = [] for var in var_list: logger.info(f"Processing model maps for {model_name} ({var})") try: # pragma: no cover make_contour, make_overlay = False, False if isinstance(self.cfg.modelmaps_opts.plot_types, dict): make_contour = CONTOUR in self.cfg.modelmaps_opts.plot_types.get( model_name, False ) make_overlay = OVERLAY in self.cfg.modelmaps_opts.plot_types.get( model_name, False ) if self.cfg.modelmaps_opts.plot_types == {CONTOUR} or make_contour: _files = self._process_contour_map_var( model_name, var, self.reanalyse_existing ) files.extend(_files) if self.cfg.modelmaps_opts.plot_types == {OVERLAY} or make_overlay: # create overlay (pixel) plots _files = self._process_overlay_map_var( model_name, var, self.reanalyse_existing ) except ModelVarNotAvailable as ex: logger.warning(f"{ex}") except ( TemporalResolutionError, DataCoverageError, VariableDefinitionError, DataQueryError, ) as e: if self.raise_exceptions: raise logger.warning(f"Failed to process maps for {model_name} {var} data. Reason: {e}.") return files def _check_dimensions(self, data: GriddedData) -> "GriddedData": if not data.has_latlon_dims: raise DataDimensionError("data needs to have latitude an longitude dimension") elif not data.has_time_dim: raise DataDimensionError("data needs to have latitude an longitude dimension") if data.ndim == 4: data = data.extract_surface_level() return data def _process_contour_map_var(self, model_name, var, reanalyse_existing): # pragma: no cover """ Process model data to create map geojson files Parameters ---------- model_name : str name of model var : str name of variable reanalyse_existing : bool if True, already existing json files will be reprocessed Raises ------ ValueError If vertical code of data is invalid or not set AttributeError If the data has the incorrect number of dimensions or misses either of time, latitude or longitude dimension. ModelVarNotAvailable If model/var data cannot be read """ try: data = self._read_model_data(model_name, var) except Exception as e: raise ModelVarNotAvailable( f"Cannot read data for model {model_name} (variable {var}): {e}" ) var_ranges_defaults = self.cfg.var_scale_colmap if var in var_ranges_defaults.keys(): cmapinfo = var_ranges_defaults[var] varinfo = VarinfoWeb(var, cmap=cmapinfo["colmap"], cmap_bins=cmapinfo["scale"]) else: cmapinfo = var_ranges_defaults["default"] varinfo = VarinfoWeb(var, cmap=cmapinfo["colmap"], cmap_bins=cmapinfo["scale"]) data = self._check_dimensions(data) outdir = self.cfg.path_manager.get_json_output_dirs()["contour"] outname = f"{var}_{model_name}" fp_geojson = os.path.join(outdir, f"{outname}.geojson") if not reanalyse_existing: if os.path.exists(fp_geojson): logger.info(f"Skipping contour processing of {outname}: data already exists.") return [] freq = self._get_maps_freq() tst = TsType(data.ts_type) if tst < freq: raise TemporalResolutionError(f"need {freq} or higher, got{tst}") elif tst > freq: data = data.resample_time(str(freq)) data.check_unit() # first calcualate and save geojson with contour levels contourjson = calc_contour_json(data, cmap=varinfo.cmap, cmap_bins=varinfo.cmap_bins) with self.avdb.lock(): self.avdb.put_contour( contourjson, self.exp_output.proj_id, self.exp_output.exp_id, var, model_name, ) return fp_geojson def _process_overlay_map_var(self, model_name, var, reanalyse_existing): # pragma: no cover """Process overlay map (pixels) for either model or obserations argument model_name is a misnomer because this can also be applied to observation networks Args: model_name (str): name of model or obs to make overlay pixel maps of var (str): variable name """ try: data = self.read_gridded_obsdata(model_name, var) except EntryNotAvailable: try: data = self._read_model_data(model_name, var) except Exception as e: raise ModelVarNotAvailable( f"Cannot read data for model {model_name} (variable {var}): {e}" ) var_ranges_defaults = self.cfg.var_scale_colmap if var in var_ranges_defaults.keys(): cmapinfo = var_ranges_defaults[var] varinfo = VarinfoWeb(var, cmap=cmapinfo["colmap"], cmap_bins=cmapinfo["scale"]) else: cmapinfo = var_ranges_defaults["default"] varinfo = VarinfoWeb(var, cmap=cmapinfo["colmap"], cmap_bins=cmapinfo["scale"]) data = self._check_dimensions(data) outdir = self.cfg.path_manager.get_json_output_dirs()["contour/overlay"] freq = self._get_maps_freq() tst = TsType(data.ts_type) if tst < freq: raise TemporalResolutionError(f"need {freq} or higher, got{tst}") elif tst > freq: data = data.resample_time(str(freq)) data.check_unit() tst = _jsdate_list(data) data = data.to_xarray() for i, date in enumerate(tst): outname = f"{model_name}_{var}_{date}" # Note should matche the output location defined in aerovaldb fp_overlay = os.path.join(outdir, outname) if not reanalyse_existing: if os.path.exists(fp_overlay): logger.info(f"Skipping overlay processing of {outname}: data already exists.") continue overlay_plot = plot_overlay_pixel_maps( data[i], cmap=varinfo.cmap, cmap_bins=varinfo.cmap_bins, format=self.cfg.modelmaps_opts.overlay_save_format, ) with self.avdb.lock(): self.avdb.put_map_overlay( overlay_plot, self.exp_output.proj_id, self.exp_output.exp_id, model_name, var, date, ) def _get_maps_freq(self) -> TsType: """ Gets the maps reading frequency. If maps_freq in cfg is coarsest, it takes the coarsest of the given frequencies. Else it just returns the maps_freq Returns ------- TSType """ maps_freq = TsType(self.cfg.modelmaps_opts.maps_freq) if maps_freq == "coarsest": # TODO: Implement this in terms of a TsType object. #1267 freq = min(TsType(fq) for fq in self.cfg.time_cfg.freqs) freq = min(freq, self.cfg.time_cfg.main_freq) else: freq = maps_freq return freq def _get_read_model_freq(self, model_ts_types: list) -> TsType: """ Tries to find the best TS type to read. Checks for available ts types with the following priority 1. If the freq from _get_maps_freq is available 2. If maps_freq is explicitly given, and is available 3. Iterates through the freqs given in the config, and find the coarsest available ts type Raises ------- ValueError If no ts types are possible to read Returns ------- TSType """ wanted_freq = self._get_maps_freq() if wanted_freq in model_ts_types: return wanted_freq maps_freq = TsType(self.cfg.modelmaps_opts.maps_freq) if maps_freq != "coarsest": if maps_freq not in model_ts_types: raise ValueError( f"Could not find any model data for given maps_freq. {maps_freq} is not in {model_ts_types}" ) return maps_freq for freq in sorted(TsType(fq) for fq in self.cfg.time_cfg.freqs): if freq in model_ts_types: logger.info(f"Found coarsest maps_freq that is available as model data: {freq}") return freq raise ValueError("Could not find any TS type to read maps") def _read_model_data(self, model_name: str, var: str) -> GriddedData: """ Function for reading the model data without going through the colocation object. This means that none of the checks normally done in the colocation class are run. Parameters ---------- model_name : str name of model var : str name of variable Returns ----------- Griddeddata the read data """ start, stop = self.cfg.colocation_opts.start, self.cfg.colocation_opts.stop if self.cfg.colocation_opts.model_use_climatology: # overwrite start and stop to read climatology file for model start, stop = 9999, None data_id = self.cfg.model_cfg.get_entry(model_name).model_id try: data_dir = self.cfg.model_cfg.get_entry(model_name).model_data_dir except Exception as e: logger.info(f"Could not find model dir. Setting to None. Error {str(e)}") data_dir = None try: model_reader = self.cfg.model_cfg.get_entry(model_name).gridded_reader_id["model"] except Exception as e: logger.info(f"Could not find model reader. Setting to None. Error {str(e)}") model_reader = None if model_reader is not None: reader_class = Colocator.SUPPORTED_GRIDDED_READERS[model_reader] else: reader_class = Colocator.SUPPORTED_GRIDDED_READERS["ReadGridded"] reader = reader_class( data_id=data_id, data_dir=data_dir, **self.cfg.colocation_opts.model_kwargs, ) if var in self.cfg.model_cfg.get_entry(model_name).model_read_aux: aux_instructions = self.cfg.model_cfg.get_entry(model_name).model_read_aux[var] reader.add_aux_compute(var_name=var, **aux_instructions) kwargs = {} kwargs.update(**self.cfg.colocation_opts.model_kwargs) if var in self.cfg.colocation_opts.model_read_opts: kwargs.update(self.cfg.colocation_opts.model_read_opts[var]) kwargs.update(self.cfg.get_model_entry(model_name).get("model_kwargs", {})) if model_reader is not None and model_reader in MODELREADERS_USE_MAP_FREQ: ts_types = reader.ts_types ts_type_read = str(self._get_read_model_freq(ts_types)) else: ts_type_read = self.cfg.time_cfg.main_freq data = reader.read_var( var, start=start, stop=stop, ts_type=ts_type_read, vert_which=self.cfg.colocation_opts.obs_vert_type, flex_ts_type=self.cfg.colocation_opts.flex_ts_type, **kwargs, ) rm_outliers = self.cfg.colocation_opts.model_remove_outliers outlier_ranges = self.cfg.colocation_opts.model_outlier_ranges if rm_outliers: if var in outlier_ranges: low, high = outlier_ranges[var] else: var_info = const.VARS[var] low, high = var_info.minimum, var_info.maximum data.check_unit() data.remove_outliers(low, high, inplace=True) return data