# Source code for pyaerocom.colocation.colocator

"""
Classes and methods to perform high-level colocation.
"""

import glob
import logging
import os
import traceback
import warnings
from collections import defaultdict
from collections.abc import Callable
from datetime import datetime
from typing import Any

import pandas as pd
from cf_units import Unit

from pyaerocom import const
from pyaerocom._lowlevel_helpers import chk_make_subdir
from pyaerocom.colocation.colocation_utils import (
    colocate_gridded_gridded,
    colocate_gridded_ungridded,
    correct_model_stp_coldata,
)
from pyaerocom.exceptions import (
    ColocationError,
    ColocationSetupError,
    DataCoverageError,
)
from pyaerocom.griddeddata import GriddedData
from pyaerocom.helpers import (
    get_lowest_resolution,
    start_stop,
    to_datestring_YYYYMMDD,
    to_pandas_timestamp,
)
from pyaerocom.io import ReadCAMS2_83, ReadGridded, ReadUngridded
from pyaerocom.io.helpers import get_all_supported_ids_ungridded
from pyaerocom.io.mscw_ctm.reader import ReadMscwCtm
from pyaerocom.stats.mda8.const import MDA8_INPUT_VARS
from pyaerocom.stats.mda8.mda8 import mda8_colocated_data
from pyaerocom.ungriddeddata import UngriddedData

from .colocated_data import ColocatedData
from .colocation_3d import ColocatedDataLists, colocate_vertical_profile_gridded
from .colocation_setup import ColocationSetup

logger = logging.getLogger(__name__)


class Colocator:
    """High level class for running co-location

    Note
    ----
    This object requires an instance of :class:`ColocationSetup`.
    """

    #: Mapping of reader name -> reader class for gridded data
    SUPPORTED_GRIDDED_READERS: dict = {
        "ReadGridded": ReadGridded,
        "ReadMscwCtm": ReadMscwCtm,
        "ReadCAMS2_83": ReadCAMS2_83,
    }

    #: Reader classes that accept additional keyword args on instantiation
    MODELS_WITH_KWARGS = [ReadMscwCtm]

    #: Codes used in :attr:`processing_status` table
    STATUS_CODES: dict[int, str] = {
        1: "SUCCESS",
        2: "NOT OK: Missing/invalid model variable",
        3: "NOT OK: Missing/invalid obs variable",
        4: "NOT OK: Failed to read model variable",
        5: "NOT OK: Colocation failed",
    }

    def __init__(self, colocation_setup: ColocationSetup | dict, **kwargs):
        if not colocation_setup:
            raise ValueError(
                "An instance of ColocationSetup or a dict must be provided to Colocator."
            )
        if not isinstance(colocation_setup, ColocationSetup):
            # dict input is still accepted but deprecated
            colocation_setup = ColocationSetup(**colocation_setup)
            warnings.warn(
                DeprecationWarning(
                    "Future versions of Pyaerocom will require Colocator to ingest an instance of ColocationSetup."
                )
            )

        self.colocation_setup = colocation_setup

        self._log: Callable | None = None
        self.logging: bool = True
        # cache of model GriddedData objects, keyed by model variable
        self._loaded_model_data: dict | None = {}
        # output of last run (only filled if keep_data is set)
        self.data: dict = {}
        # entries of [model_var, obs_var, status_code], see STATUS_CODES
        self._processing_status: list[str] = []
        self.files_written: list[str] = []

        self._model_reader: ReadGridded | ReadMscwCtm | ReadCAMS2_83 | None = None
        self._obs_reader: Any | None = None
        self._obs_is_vertical_profile: bool = False
        # mutable copy of the obs filters defined in the setup
        self.obs_filters: dict = colocation_setup.obs_filters.copy()

    @property
    def model_vars(self):
        """
        List of all model variables specified in config

        Note
        ----
        This method does not check if the variables are valid or available.

        Returns
        -------
        list
            list of all model variables specified in this setup.
        """
        ovars = self.colocation_setup.obs_vars
        model_vars = []
        for ovar in ovars:
            # use mapped model variable where one is configured, else the
            # obs variable name itself
            if ovar in self.colocation_setup.model_use_vars:
                model_vars.append(self.colocation_setup.model_use_vars[ovar])
            else:
                model_vars.append(ovar)

        for ovar, mvars in self.colocation_setup.model_add_vars.items():
            if ovar not in ovars:
                logger.warning(
                    f"Found entry in model_add_vars for obsvar {ovar} which "
                    f"is not specified in attr obs_vars, and will thus be "
                    f"ignored"
                )
            # extend with the additional model variables (list of str),
            # rather than appending the list object itself
            model_vars += mvars
        return model_vars

    @property
    def obs_is_ungridded(self):
        """
        bool: True if obs_id refers to an ungridded observation, else False
        """
        if self.colocation_setup.pyaro_config is not None:
            return True
        return self.colocation_setup.obs_id in get_all_supported_ids_ungridded()

    @property
    def obs_is_vertical_profile(self):
        """
        bool: True if obs_id refers to a VerticalProfile, else False
        """
        return self._obs_is_vertical_profile

    @obs_is_vertical_profile.setter
    def obs_is_vertical_profile(self, value):
        self._obs_is_vertical_profile = value

    @property
    def model_reader(self):
        """
        Model data reader
        """
        if self._model_reader is not None:
            if self._model_reader.data_id == self.colocation_setup.model_id:
                return self._model_reader
            logger.info(
                f"Reloading outdated model reader. ID of current reader: "
                f"{self._model_reader.data_id}. New ID: {self.colocation_setup.model_id}"
            )
        self._model_reader = self._instantiate_gridded_reader(what="model")
        # cached model data belongs to the previous reader; discard it
        self._loaded_model_data = {}
        return self._model_reader

    def _check_data_id_obs_reader(self):
        """
        Check if obs_reader is instantiated with correct obs ID.

        Returns
        -------
        bool
            True if current obs reader can be used for obs reading, else False.
        """
        reader = self._obs_reader
        if reader is None:
            return False
        elif self.obs_is_ungridded and self.colocation_setup.obs_id in reader.data_ids:
            return True
        elif self.colocation_setup.obs_id == reader.data_id:
            return True
        # explicit fall-through (was implicit None before)
        return False

    @property
    def obs_reader(self):
        """
        Observation data reader
        """
        if not self._check_data_id_obs_reader():
            if self.obs_is_ungridded:
                self._obs_reader = ReadUngridded(
                    data_ids=[self.colocation_setup.obs_id],
                    data_dirs=self.colocation_setup.obs_data_dir,
                    configs=[
                        self.colocation_setup.pyaro_config,
                    ],
                )
            else:
                self._obs_reader = self._instantiate_gridded_reader(what="obs")
        return self._obs_reader

    @property
    def output_dir(self):
        """
        str: Output directory for colocated data NetCDF files
        """
        loc = os.path.join(self.colocation_setup.basedir_coldata, self.get_model_name())
        if not os.path.exists(loc):
            logger.info(f"Creating dir {loc}")
            os.mkdir(loc)
        return loc

    @property
    def processing_status(self):
        """pandas.DataFrame: tabular overview of per-variable processing status"""
        head = ["Model Var", "Obs Var", "Status"]
        tab = []
        for mvar, ovar, key in self._processing_status:
            tab.append([mvar, ovar, self.STATUS_CODES[key]])
        return pd.DataFrame(tab, columns=head)
[docs] def get_model_name(self): """ Get name of model Note ---- Not to be confused with :attr:`model_id` which is always the database ID of the model, while model_name can differ from that and is used for output files, etc. Raises ------ AttributeError If neither model_id or model_name are set Returns ------- str preferably :attr:`model_name`, else :attr:`model_id` """ if self.colocation_setup.model_name is None: if self.colocation_setup.model_id is None: raise AttributeError( "Neither model_name nor model_id are set. These must be set in ColocationSetup." ) return self.colocation_setup.model_id return self.colocation_setup.model_name
[docs] def get_obs_name(self): """ Get name of obsdata source Note ---- Not to be confused with :attr:`obs_id` which is always the database ID of the observation dataset, while obs_name can differ from that and is used for output files, etc. Raises ------ AttributeError If neither obs_id or obs_name are set Returns ------- str preferably :attr:`obs_name`, else :attr:`obs_id` """ if self.colocation_setup.obs_name is None: if self.colocation_setup.obs_id is None: raise AttributeError("Neither obs_name nor obs_id are set") return self.colocation_setup.obs_id return self.colocation_setup.obs_name
def get_model_data(self, model_var: str): if model_var in self._loaded_model_data: mdata = self._loaded_model_data[model_var] if mdata.data_id == self.colocation_setup.model_id: return mdata self._check_add_model_read_aux(model_var) mdata = self._read_gridded(var_name=model_var, is_model=True) self._loaded_model_data[model_var] = mdata return mdata def get_obs_data(self, obs_var: str): if self.obs_is_ungridded: return self._read_ungridded(obs_var) else: return self._read_gridded(obs_var, is_model=False) def get_start_str(self): if self.start is None: raise AttributeError("start time is not set") return to_datestring_YYYYMMDD(to_pandas_timestamp(self.start)) def get_stop_str(self): if self.stop is None: raise AttributeError("stop time is not set") return to_datestring_YYYYMMDD(to_pandas_timestamp(self.stop))
    def prepare_run(self, var_list: list[str] | None = None) -> dict:
        """
        Prepare colocation run for current setup.

        Parameters
        ----------
        var_list : list, optional
            Variable names that are supposed to be analysed. The default is
            None, in which case all defined variables are attempted to be
            colocated.

        Raises
        ------
        AttributeError
            If no observation variables are defined (:attr:`obs_vars` empty).

        Returns
        -------
        vars_to_process : dict
            Mapping of variables to be processed, keys are model vars, values
            are obs vars.
        """
        try:
            self._init_log()
        except Exception:
            # logging to file is best-effort; disable it rather than abort
            logger.warning("Deactivating logging in Colocator")
            self.logging = False

        # normalise a single obs variable (str) to a tuple
        if isinstance(self.colocation_setup.obs_vars, str):
            self.colocation_setup.obs_vars = (self.colocation_setup.obs_vars,)

        # order matters: obs availability check instantiates the obs reader,
        # start/stop may be inferred from the model reader below
        self._check_obs_vars_available()
        self._check_obs_filters()
        self._check_model_add_vars()
        self._check_set_start_stop()

        vars_to_process = self._find_var_matches()
        if var_list is not None:
            vars_to_process = self._filter_var_matches_varlist(vars_to_process, var_list)

        # preload model fields (lazy) and drop pairs that cannot be read
        (vars_to_process, ts_types) = self._check_load_model_data(vars_to_process)

        # skip pairs whose output file already exists, unless reanalysis is requested
        if self.colocation_setup.save_coldata and not self.colocation_setup.reanalyse_existing:
            vars_to_process = self._filter_var_matches_files_not_exist(vars_to_process, ts_types)
        return vars_to_process
    def run(self, var_list: list[str] | None = None):
        """Perform colocation for current setup

        See also :func:`prepare_run`.

        Parameters
        ----------
        var_list : list, optional
            list of variables supposed to be analysed. The default is None,
            in which case all defined variables are attempted to be colocated.

        Returns
        -------
        dict
            nested dictionary, where keys are model variables, values are
            dictionaries comprising key / value pairs of obs variables and
            associated instances of :class:`ColocatedData`.
        """
        # MDA8 is a daily value so it doesn't make sense to calculate it if
        # no frequency is daily or coarser.
        calc_mda8 = False
        if self.colocation_setup.main_freq in ["daily", "monthly", "yearly"]:
            calc_mda8 = True
        if any(x in ["daily", "monthly", "yearly"] for x in self.colocation_setup.freqs):
            calc_mda8 = True

        data_out = defaultdict(lambda: dict())
        # TODO: see if the following could be solved via custom context manager
        try:
            vars_to_process = self.prepare_run(var_list)
        except Exception as ex:
            logger.exception(ex)
            if self.colocation_setup.raise_exceptions:
                self._print_processing_status()
                self._write_log(f"ABORTED: raise_exceptions is True: {traceback.format_exc()}\n")
                self._close_log()
                raise ex
            # best-effort mode: nothing to process, fall through and finish
            vars_to_process = {}
        self._print_coloc_info(vars_to_process)
        for mod_var, obs_var in vars_to_process.items():
            try:
                coldata = self._run_helper(
                    mod_var, obs_var
                )  # note this can be ColocatedData or ColocatedDataLists
                data_out[mod_var][obs_var] = coldata
                if calc_mda8 and (obs_var in MDA8_INPUT_VARS):
                    try:
                        mda8 = mda8_colocated_data(
                            coldata, obs_var=f"{obs_var}mda8", mod_var=f"{mod_var}mda8"
                        )
                    except ValueError as e:
                        # coldata not suitable for mda8 (e.g. frequency); skip quietly
                        logger.debug(e)
                    else:
                        self._save_coldata(mda8)
                        logger.info(
                            "Successfully calculated mda8 for [%s, %s].",
                            obs_var,
                            mod_var,
                        )
                        data_out[f"{mod_var}mda8"][f"{obs_var}mda8"] = mda8
                self._processing_status.append([mod_var, obs_var, 1])
            except Exception:
                msg = f"Failed to perform analysis: {traceback.format_exc()}\n"
                logger.warning(msg)
                self._processing_status.append([mod_var, obs_var, 5])
                self._write_log(msg)
                if self.colocation_setup.raise_exceptions:
                    self._print_processing_status()
                    self._write_log("ABORTED: raise_exceptions is True\n")
                    self._close_log()
                    raise ColocationError(traceback.format_exc())
        self._write_log("Colocation finished")
        self._close_log()
        self._print_processing_status()
        if self.colocation_setup.keep_data:
            self.data = data_out
        return dict(data_out)
[docs] def get_nc_files_in_coldatadir(self): """ Get list of NetCDF files in colocated data directory Returns ------- list list of NetCDF file paths found """ mask = f"{self.output_dir}/*.nc" return glob.glob(mask)
    def get_available_coldata_files(self, var_list: list = None) -> list:
        """
        Find existing colocated-data NetCDF files matching the current setup.

        Scans :attr:`output_dir` and keeps files whose filename metadata
        matches the current model / obs names, start / stop period and the
        configured model / obs variables, optionally restricted to *var_list*.

        Parameters
        ----------
        var_list : list, optional
            if provided, only files matching one of these variables
            (as model or obs variable) are returned.

        Returns
        -------
        list
            paths of matching NetCDF files.
        """
        self._check_set_start_stop()

        def check_meta_match(meta, **kwargs):
            # True if all key / value pairs occur in the filename metadata
            for key, val in kwargs.items():
                if not meta[key] == val:
                    return False
            return True

        mname = self.get_model_name()
        oname = self.get_obs_name()
        model_vars = self.model_vars
        obs_vars = self.colocation_setup.obs_vars
        start, stop = self.get_start_str(), self.get_stop_str()
        valid = []
        all_files = self.get_nc_files_in_coldatadir()
        for file in all_files:
            try:
                meta = ColocatedData.get_meta_from_filename(file)
            except Exception:
                # filename does not follow colocated-data naming convention
                continue
            candidate = check_meta_match(
                meta, model_name=mname, obs_name=oname, start=start, stop=stop
            )
            if candidate and meta["model_var"] in model_vars and meta["obs_var"] in obs_vars:
                if var_list is None:
                    ok = True
                else:
                    ok = False
                    for var in var_list:
                        if meta["model_var"] == var or meta["obs_var"] == var:
                            ok = True
                            break
                if ok:
                    valid.append(file)
        return valid

    def _filter_var_matches_varlist(self, vars_to_process, var_list) -> dict:
        # Restrict var matches to entries that involve any variable in
        # var_list (a single str is treated as one-element list).
        _vars_to_process = {}
        if isinstance(var_list, str):
            var_list = [var_list]
        for var_name in var_list:
            # raises DataCoverageError if var_name matches nothing
            _subset = self._filter_var_matches_var_name(vars_to_process, var_name)
            _vars_to_process.update(**_subset)
        return _vars_to_process

    def _read_ungridded(self, var_name):
        """Helper to read UngriddedData

        Note
        ----
        reading is restricted to single variable UngriddedData objects here,
        due to multiple possibilities for variable specific obs filter
        definitions and because the colocation is done sequentially for each
        variable.

        Parameters
        ----------
        var_name : str
            variable that should be read from obs-network (:attr:`obs_id`)

        Returns
        -------
        UngriddedData
            loaded data object
        """
        obs_reader = self.obs_reader
        obs_filters_post = self._eval_obs_filters(var_name)

        obs_data = obs_reader.read(
            data_ids=[self.colocation_setup.obs_id],
            vars_to_retrieve=var_name,
            only_cached=self.colocation_setup.obs_cache_only,
            filter_post=obs_filters_post,
            **self.colocation_setup.read_opts_ungridded,
        )
        if self.colocation_setup.obs_remove_outliers:
            oor = self.colocation_setup.obs_outlier_ranges
            if var_name in oor:
                low, high = oor[var_name]
            else:
                # fall back to the default valid range of the variable
                var_info = const.VARS[var_name]
                low, high = var_info.minimum, var_info.maximum
            obs_data.remove_outliers(
                var_name, low=low, high=high, inplace=True, move_to_trash=False
            )
        return obs_data

    def _check_obs_filters(self):
        # If variable specific obs_filters are defined for some obs variable,
        # make sure every obs variable gets a (possibly empty) entry in the
        # instance-level obs_filters dict.
        obs_vars = self.colocation_setup.obs_vars
        if any([x in self.colocation_setup.obs_filters for x in obs_vars]):
            # variable specific obs_filters
            for ovar in obs_vars:
                if ovar not in self.colocation_setup.obs_filters:
                    self.obs_filters[ovar] = {}

    def _check_load_model_data(self, var_matches):
        """
        Try to preload modeldata for input variable matches

        Note
        ----
        This will load all model fields for each obs variable in lazy mode, so
        should not require much storage.

        Parameters
        ----------
        var_matches : dict
            dictionary specifying model / obs var pairs for colocation.

        Raises
        ------
        ColocationError
            If :attr:`raise_exceptions` is True and if one of the input
            model variables cannot be loaded.

        Returns
        -------
        filtered : dict
            `var_matches` filtered by entries for which model data could be
            successfully read.
        ts_types : dict
            data frequencies for each model variable that could be read.
            Those depend also on settings for :attr:`flex_ts_type`
        """
        filtered, ts_types = {}, {}
        for mvar, ovar in var_matches.items():
            try:
                mdata = self.get_model_data(mvar)
                filtered[mvar] = ovar
                ts_types[mvar] = mdata.ts_type
            except Exception as e:
                msg = f"Failed to load model data: {self.colocation_setup.model_id} ({mvar}). Reason {e}"
                logger.warning(msg)
                self._write_log(msg + "\n")
                self._processing_status.append([mvar, ovar, 4])
                if self.colocation_setup.raise_exceptions:
                    raise ColocationError(msg)
        return filtered, ts_types

    def _filter_var_matches_files_not_exist(self, var_matches, ts_types):
        # Keep only pairs whose colocated-data output file does not yet exist
        # (used when reanalyse_existing is False).
        filtered = {}
        for mvar, ovar in var_matches.items():
            ts_type = ts_types[mvar]
            fname = self._coldata_savename(ovar, mvar, ts_type)
            fp = os.path.join(self.output_dir, fname)
            if not os.path.exists(fp):
                filtered[mvar] = ovar
        return filtered

    def _check_model_add_vars(self):
        # Validate that model_add_vars values are lists / tuples of strings.
        for ovar, mvars in self.colocation_setup.model_add_vars.items():
            if not isinstance(mvars, list | tuple):
                raise ValueError("Values of model_add_vars need to be list or tuple")
            elif not all([isinstance(x, str) for x in mvars]):
                raise ValueError("Values of model_add_vars need to be list of strings")

    def _instantiate_gridded_reader(self, what):
        """
        Create reader for model or observational gridded data.

        Parameters
        ----------
        what : str
            Type of reader. ("model" or "obs")

        Returns
        -------
        Instance of reader class defined in self.SUPPORTED_GRIDDED_READERS
        """
        if what == "model":
            data_id = self.colocation_setup.model_id
            data_dir = self.colocation_setup.model_data_dir
        else:
            data_id = self.colocation_setup.obs_id
            data_dir = self.colocation_setup.obs_data_dir
        reader_class = self._get_gridded_reader_class(what=what)
        if what == "model" and reader_class in self.MODELS_WITH_KWARGS:
            # some model readers (e.g. ReadMscwCtm) accept extra kwargs
            reader = reader_class(
                data_id=data_id, data_dir=data_dir, **self.colocation_setup.model_kwargs
            )
        else:
            reader = reader_class(data_id=data_id, data_dir=data_dir)
        return reader

    def _get_gridded_reader_class(self, what):
        """Returns the class of the reader for gridded data."""
        try:
            reader = self.SUPPORTED_GRIDDED_READERS[self.colocation_setup.gridded_reader_id[what]]
        except KeyError as e:
            raise NotImplementedError(
                f"Reader {self.colocation_setup.gridded_reader_id[what]} is not supported: {e}"
            )
        return reader

    def _check_add_model_read_aux(self, model_var):
        # Register an auxiliary (computed) model variable in the model reader
        # if a recipe is defined in model_read_aux. Returns True if a recipe
        # was registered, else False.
        if model_var not in self.colocation_setup.model_read_aux:
            return False
        info = self.colocation_setup.model_read_aux[model_var]
        if not isinstance(info, dict):
            raise ValueError(
                f"Invalid value for model_read_aux of variable {model_var}. "
                f"Need dictionary, got {info}"
            )
        elif not all([x in info for x in ["vars_required", "fun"]]):
            raise ValueError(
                f"Invalid value for model_read_aux dict of variable {model_var}. "
                f"Require keys vars_required and fun in dict, got {info}"
            )
        try:
            self.model_reader.add_aux_compute(var_name=model_var, **info)
        except DataCoverageError:
            return False
        return True

    def _check_obs_vars_available(self):
        # Verify that the configured obs variables are available from the
        # obs reader; unavailable ones are logged (status code 3) or, if
        # raise_exceptions is set, cause a DataCoverageError.
        if not len(self.colocation_setup.obs_vars) > 0:
            raise ColocationSetupError("no observation variables specified...")
        oreader = self.obs_reader
        if self.obs_is_ungridded:
            avail = oreader.get_vars_supported(
                self.colocation_setup.obs_id, self.colocation_setup.obs_vars
            )
        else:
            avail = []
            for ovar in self.colocation_setup.obs_vars:
                if oreader.has_var(ovar):
                    avail.append(ovar)

        if len(self.colocation_setup.obs_vars) > len(avail):
            for ovar in self.colocation_setup.obs_vars:
                if ovar not in avail:
                    logger.warning(
                        f"Obs variable {ovar} is not available in {self.colocation_setup.obs_id} "
                        f"and will be ignored"
                    )
                    self._processing_status.append([None, ovar, 3])

            if self.colocation_setup.raise_exceptions:
                invalid = [var for var in self.colocation_setup.obs_vars if var not in avail]
                invalid = "; ".join(invalid)
                raise DataCoverageError(
                    f"Invalid obs var(s) for {self.colocation_setup.obs_id}: {invalid}"
                )
            # NOTE(review): this sets an instance attribute shadowing the
            # setup's obs_vars; confirm downstream consumers use it
            self.obs_vars = avail

    def _print_processing_status(self):
        # Log the per-variable status table (see STATUS_CODES).
        mname = self.get_model_name()
        oname = self.get_obs_name()
        logger.info(f"Colocation processing status for {mname} vs. {oname}")
        logger.info(self.processing_status)

    def _filter_var_matches_var_name(self, var_matches, var_name):
        # Keep only pairs where var_name appears as model or obs variable.
        filtered = {}
        for mvar, ovar in var_matches.items():
            if mvar == var_name or ovar == var_name:
                filtered[mvar] = ovar
        if len(filtered) == 0:
            raise DataCoverageError(var_name)
        return filtered

    def _find_var_matches(self):
        """Find variable matches in model data for input obs variables"""
        # dictionary that will map model variables (keys) with observation
        # variables (values)
        var_matches = {}

        all_ok = True
        muv = self.colocation_setup.model_use_vars
        modreader = self.model_reader
        for ovar in self.colocation_setup.obs_vars:
            if ovar in muv:
                mvar = muv[ovar]
            else:
                mvar = ovar
            self._check_add_model_read_aux(mvar)
            if modreader.has_var(mvar):
                var_matches[mvar] = ovar
            else:
                self._processing_status.append([mvar, ovar, 2])
                all_ok = False

            if ovar in self.colocation_setup.model_add_vars:  # observation variable
                addvars = self.colocation_setup.model_add_vars[ovar]
                for addvar in addvars:
                    self._check_add_model_read_aux(addvar)
                    if modreader.has_var(addvar):
                        var_matches[addvar] = ovar
                    else:
                        self._processing_status.append([addvar, ovar, 2])
                        all_ok = False

        if not all_ok and self.colocation_setup.raise_exceptions:
            raise DataCoverageError("Some model variables are not available")

        return var_matches

    def _get_ts_type_read(self, var_name, is_model):
        """
        Get *desired* reading frequency for gridded reading

        Parameters
        ----------
        var_name : str
            Name of variable to be read.
        is_model : bool
            True if reading refers to model reading, else False (e.g. gridded
            satellite obs).

        Raises
        ------
        ValueError

        Returns
        -------
        str or None
            frequency to be read.
        """
        tst = self.colocation_setup.ts_type  # default
        if is_model and self.colocation_setup.model_ts_type_read is not None:
            tst = self.colocation_setup.model_ts_type_read
            if tst == "":
                # empty string means "use the default colocation frequency"
                tst = self.colocation_setup.ts_type
        elif not is_model and self.colocation_setup.obs_ts_type_read is not None:
            # NOTE(review): looks like this should be
            # self.colocation_setup.obs_ts_type_read (the instance attribute
            # obs_ts_type_read is never set in __init__) — confirm
            tst = self.obs_ts_type_read
        if isinstance(tst, dict):
            # frequency may be configured per variable
            if var_name in tst:
                tst = tst[var_name]
            else:
                tst = self.colocation_setup.ts_type
        return tst

    def _read_gridded(self, var_name, is_model):
        # Read a gridded (model or obs) variable via the appropriate reader,
        # applying frequency / vertical-type settings and outlier removal.
        start, stop = self.colocation_setup.start, self.colocation_setup.stop
        ts_type_read = self._get_ts_type_read(var_name, is_model)
        kwargs = {}
        if is_model:
            reader = self.model_reader
            vert_which = self.colocation_setup.obs_vert_type
            kwargs.update(**self.colocation_setup.model_kwargs)
            if self.colocation_setup.model_use_climatology:
                # overwrite start and stop to read climatology file for model
                start, stop = 9999, None
            if var_name in self.colocation_setup.model_read_opts:
                kwargs.update(self.colocation_setup.model_read_opts[var_name])
        else:
            reader = self.obs_reader
            vert_which = None
            # NOTE(review): this overwrites the per-variable frequency
            # resolved by _get_ts_type_read above — confirm intended
            ts_type_read = self.colocation_setup.obs_ts_type_read
            kwargs.update(self._eval_obs_filters(var_name))
        try:
            data = reader.read_var(
                var_name,
                start=start,
                stop=stop,
                ts_type=ts_type_read,
                vert_which=vert_which,
                flex_ts_type=self.colocation_setup.flex_ts_type,
                **kwargs,
            )
        except DataCoverageError:
            # retry with an alternative vertical type, if one is configured
            vert_which_alt = self._try_get_vert_which_alt(is_model, var_name)
            data = reader.read_var(
                var_name,
                start=start,
                stop=stop,
                ts_type=ts_type_read,
                flex_ts_type=self.colocation_setup.flex_ts_type,
                vert_which=vert_which_alt,
            )

        data = self._check_remove_outliers_gridded(data, var_name, is_model)
        return data

    def _try_get_vert_which_alt(self, is_model, var_name):
        # Look up an alternative vertical type for the configured
        # obs_vert_type (model reading only).
        if is_model:
            if self.colocation_setup.obs_vert_type in self.colocation_setup.OBS_VERT_TYPES_ALT:
                return self.colocation_setup.OBS_VERT_TYPES_ALT[
                    self.colocation_setup.obs_vert_type
                ]
        raise DataCoverageError(f"No alternative vert type found for {var_name}")

    def _check_remove_outliers_gridded(self, data: GriddedData, var_name: str, is_model: bool):
        # Apply outlier removal to gridded data if activated in the setup;
        # warns if outlier ranges are defined but removal is deactivated.
        if is_model:
            rm_outliers = self.colocation_setup.model_remove_outliers
            outlier_ranges = self.colocation_setup.model_outlier_ranges
        else:
            rm_outliers = self.colocation_setup.obs_remove_outliers
            outlier_ranges = self.colocation_setup.obs_outlier_ranges

        if len(outlier_ranges) > 0 and not rm_outliers:
            logger.warning(
                f"WARNING: Found definition of outlier ranges for {var_name} "
                f"({data.data_id}) but outlier removal is deactivated. Consider "
                f"checking your setup (note: model or obs outlier removal can be "
                f"activated via attrs. model_remove_outliers and remove_outliers, "
                f"respectively"
            )

        if rm_outliers:
            if var_name in outlier_ranges:
                low, high = outlier_ranges[var_name]
            else:
                # fall back to default valid range of the variable
                var_info = const.VARS[var_name]
                low, high = var_info.minimum, var_info.maximum
            data.check_unit()
            data.remove_outliers(low, high, inplace=True)
        return data

    def _eval_obs_filters(self, var_name: str):
        # Resolve the obs filters for var_name: a variable-specific entry
        # (nested dict) takes precedence over the global filter dict.
        obs_filters = self.obs_filters
        if var_name in obs_filters:
            obs_filters = obs_filters[var_name]
        if not isinstance(obs_filters, dict):
            raise AttributeError(
                f"Detected obs_filters attribute in Colocator class, "
                f"which is not a dictionary: {obs_filters}"
            )
        return obs_filters if len(obs_filters) > 0 else {}

    def _save_coldata(self, coldata: ColocatedData):
        """Helper for saving colocateddata"""
        obs_var, mod_var = coldata.metadata["var_name_input"]
        if mod_var in self.colocation_setup.model_rename_vars:
            mvar = self.colocation_setup.model_rename_vars[mod_var]
            logger.info(
                f"Renaming model variable from {mod_var} to {mvar} in "
                f"ColocatedData before saving to NetCDF."
            )
            coldata.rename_variable(mod_var, mvar, self.colocation_setup.model_id)
        else:
            mvar = mod_var
        if hasattr(coldata, "vertical_layer"):
            # save colocated vertical layer netCDF files with vertical layers in km
            if not Unit(coldata.data.altitude_units) == Unit("km"):
                start = Unit(coldata.data.altitude_units).convert(
                    coldata.vertical_layer["start"], other="km"
                )
                end = Unit(coldata.data.altitude_units).convert(
                    coldata.vertical_layer["end"], other="km"
                )
                vertical_layer = {"start": start, "end": end}
            else:
                vertical_layer = coldata.vertical_layer

            savename = self._coldata_savename(
                obs_var, mvar, coldata.ts_type, vertical_layer=vertical_layer
            )
        else:
            savename = self._coldata_savename(obs_var, mvar, coldata.ts_type)
        fp = coldata.to_netcdf(self.output_dir, savename=savename)
        self.files_written.append(fp)
        msg = f"WRITE: {fp}\n"
        self._write_log(msg)
        logger.info(msg)

    def _eval_resample_how(self, model_var: str, obs_var: str):
        # Resolve resample_how: may be a plain value or a dict keyed by
        # variable (obs variable takes precedence over model variable).
        rshow = self.colocation_setup.resample_how
        if not isinstance(rshow, dict):
            return rshow

        if obs_var in rshow:
            return rshow[obs_var]
        elif model_var in rshow:
            return rshow[model_var]
        else:
            return None

    def _infer_start_stop_yr_from_model_reader(self):
        """
        Infer start / stop year for colocation from gridded model reader

        Sets :attr:`start` and :attr:`stop`
        """
        # get sorted list of years available in model data (files with year
        # 9999 denote climatological data)
        yrs_avail = self.model_reader.years_avail
        if self.colocation_setup.model_use_climatology:
            if 9999 not in yrs_avail:
                raise DataCoverageError("No climatology files available")
            first, last = 9999, None
        else:
            if 9999 in yrs_avail:
                yrs_avail = [x for x in yrs_avail if not x == 9999]
            first, last = yrs_avail[0], yrs_avail[-1]
            if first == last:
                last = None
        self.start = first
        self.stop = last

    def _check_set_start_stop(self):
        # Determine and set self.start / self.stop (pandas Timestamps via
        # start_stop); inferred from the model reader if not configured.
        if self.colocation_setup.start is None:
            self._infer_start_stop_yr_from_model_reader()
            start, stop = self.start, self.stop
        else:
            start, stop = self.colocation_setup.start, self.colocation_setup.stop
        if self.colocation_setup.model_use_climatology:
            if self.colocation_setup.stop is not None or not isinstance(
                self.colocation_setup.start, int
            ):
                raise ColocationSetupError(
                    "Conflict: only single year analyses are support for model "
                    'climatology fields, please specify "start" as integer '
                    'denoting the year, and set "stop"=None'
                )
        self.start, self.stop = start_stop(start, stop)

    def _coldata_savename(self, obs_var, mod_var, ts_type, **kwargs):
        """Get filename of colocated data file for saving"""
        if "vertical_layer" in kwargs:
            vertical_layer = kwargs["vertical_layer"]
        else:
            vertical_layer = None
        name = ColocatedData._aerocom_savename(
            obs_var=obs_var,
            obs_id=self.get_obs_name(),
            mod_var=mod_var,
            mod_id=self.get_model_name(),
            start_str=self.get_start_str(),
            stop_str=self.get_stop_str(),
            ts_type=ts_type,
            filter_name=self.colocation_setup.filter_name,
            vertical_layer=vertical_layer,
        )
        return f"{name}.nc"

    def _get_colocation_ts_type(self, model_ts_type, obs_ts_type=None):
        # Colocation frequency is the coarsest of setup ts_type, model and
        # (optionally) obs frequencies.
        chk = [self.colocation_setup.ts_type, model_ts_type]
        if obs_ts_type is not None:
            chk.append(obs_ts_type)
        return get_lowest_resolution(*chk)

    @property
    def _colocation_func(self):
        """
        Function used for colocation

        Returns
        -------
        callable
            function the performs co-location operation
        """
        if self.obs_is_vertical_profile:
            return colocate_vertical_profile_gridded
        if self.obs_is_ungridded:
            return colocate_gridded_ungridded
        else:
            return colocate_gridded_gridded

    def _prepare_colocation_args(self, model_var: str, obs_var: str):
        # Assemble the keyword arguments passed to the colocation function
        # selected by _colocation_func.
        model_data = self.get_model_data(model_var)
        obs_data = self.get_obs_data(obs_var)

        if getattr(obs_data, "is_vertical_profile", None):
            self.obs_is_vertical_profile = obs_data.is_vertical_profile

        rshow = self._eval_resample_how(model_var, obs_var)

        if self.colocation_setup.model_use_climatology:
            baseyr = self.start.year
        else:
            baseyr = None
        # input args shared between all colocation functions
        args = dict(
            data=model_data,
            data_ref=obs_data,
            start=self.start,
            stop=self.stop,
            filter_name=self.colocation_setup.filter_name,
            regrid_res_deg=self.colocation_setup.regrid_res_deg,
            harmonise_units=self.colocation_setup.harmonise_units,
            update_baseyear_gridded=baseyr,
            min_num_obs=self.colocation_setup.min_num_obs,
            colocate_time=self.colocation_setup.colocate_time,
            resample_how=rshow,
            add_meta_keys=[],
        )
        # check if the station_type key has been passed to the ungridded data object
        # (not all readers may do that, currently only the CAMS2_83 reader does)
        if isinstance(obs_data, UngriddedData):
            # set of bools: which stations carry station_type metadata
            are_there_station_types = {
                "station_type" in dict for dict in obs_data.metadata.values()
            }
            if are_there_station_types == {True, False}:
                raise ValueError("some stations have `station_type` metadata while others do not")
            if are_there_station_types == {True}:  # all have station_type
                args.update(add_meta_keys=["station_type"])

        if self.obs_is_ungridded:
            ts_type = self._get_colocation_ts_type(model_data.ts_type)
            args.update(
                ts_type=ts_type,
                var_ref=obs_var,
                use_climatology_ref=self.colocation_setup.obs_use_climatology,
            )
        else:
            ts_type = self._get_colocation_ts_type(model_data.ts_type, obs_data.ts_type)
            args.update(ts_type=ts_type)
        if self.obs_is_vertical_profile:
            args.update(
                colocation_layer_limits=self.colocation_setup.colocation_layer_limits,
                profile_layer_limits=self.colocation_setup.profile_layer_limits,
            )
        return args

    def _check_dimensionality(self, args):
        # Reduce 4D fields to surface level where the setup requests
        # surface colocation; reject >3D ungridded reference data.
        mdata = args["data"]
        odata = args["data_ref"]
        from pyaerocom.exceptions import DataDimensionError
        from pyaerocom.griddeddata import GriddedData

        if mdata.ndim == 4 and self.colocation_setup.obs_vert_type == "Surface":
            mdata = mdata.extract_surface_level()
            args["data"] = mdata

        if isinstance(odata, GriddedData):
            if odata.ndim == 4 and self.colocation_setup.obs_vert_type == "Surface":
                odata = odata.extract_surface_level()
                args["data_ref"] = odata
            elif odata.ndim > 3:
                raise DataDimensionError(
                    f"cannot co-locate model data with more than 3 dimensions: {odata}"
                )
        return args

    def _run_helper(self, model_var: str, obs_var: str):
        # Run colocation for one model / obs variable pair and post-process
        # the result (metadata, zeros-to-NaN, STP correction, saving).
        logger.info(
            f"Running {self.colocation_setup.model_id} ({model_var}) vs. {self.colocation_setup.obs_id} ({obs_var})"
        )
        args = self._prepare_colocation_args(model_var, obs_var)
        args = self._check_dimensionality(args)
        coldata = self._colocation_func(**args)

        if isinstance(coldata, ColocatedData):
            coldata.data.attrs["model_name"] = self.get_model_name()
            coldata.data.attrs["obs_name"] = self.get_obs_name()
            coldata.data.attrs["vert_code"] = self.colocation_setup.obs_vert_type
            coldata.data.attrs.update(**self.colocation_setup.add_meta)

            if self.colocation_setup.zeros_to_nan:
                coldata = coldata.set_zeros_nan()
            if self.colocation_setup.model_to_stp:
                coldata = correct_model_stp_coldata(coldata)
            if self.colocation_setup.save_coldata:
                self._save_coldata(coldata)

        elif isinstance(coldata, ColocatedDataLists):  # look into intertools chain.from_iterable
            for i_list in coldata:
                for coldata_obj in i_list:
                    coldata_obj.data.attrs["model_name"] = self.get_model_name()
                    coldata_obj.data.attrs["obs_name"] = self.get_obs_name()
                    coldata_obj.data.attrs["vert_code"] = self.colocation_setup.obs_vert_type
                    coldata_obj.data.attrs.update(**self.colocation_setup.add_meta)
                    if self.colocation_setup.zeros_to_nan:
                        coldata_obj = coldata_obj.set_zeros_nan()
                    if self.colocation_setup.model_to_stp:  # TODO: check is this needs modifying
                        coldata = correct_model_stp_coldata(coldata_obj)
                    if self.colocation_setup.save_coldata:
                        self._save_coldata(coldata_obj)

        else:
            raise Exception(
                f"Invalid coldata type returned by colocation function {self._colocation_func}"
            )

        return coldata

    def _print_coloc_info(self, var_matches):
        # Log which model / obs variable pairs will be colocated.
        if not var_matches:
            logger.info("Nothing to colocate")
            return
        logger.info("The following variable combinations will be colocated\nMODEL-VAR\tOBS-VAR")

        for key, val in var_matches.items():
            logger.info(f"{key}\t{val}")

    def _init_log(self):
        # Open (append mode) a per-model/obs log file and write the current
        # analysis configuration to it.
        logdir = chk_make_subdir(self.colocation_setup.basedir_coldata, self.get_model_name())
        oname = self.get_obs_name()
        datestr = datetime.today().strftime("%Y%m%d")
        datetimestr = datetime.today().strftime("%d-%m-%Y %H:%M")

        fname = f"{oname}_{datestr}.log"
        logfile = os.path.join(logdir, fname)
        self._log = log = open(logfile, "a+")
        log.write("\n------------------ NEW ----------------\n")
        log.write(f"Timestamp: {datetimestr}\n\n")
        log.write("Analysis configuration\n")
        ignore = ["_log", "logging", "data", "_model_reader", "_obs_reader"]
        # NOTE(review): self.model_dump() looks like it should be
        # self.colocation_setup.model_dump() (pydantic method) — confirm
        for key, val in self.model_dump().items():
            if key in ignore:
                continue
            log.write(f"{key}: {val}\n")

    def _write_log(self, msg):
        # Write to the run log file, if logging is active.
        if self.logging:
            self._log.write(msg)

    def _close_log(self):
        # Close and reset the run log file handle, if open.
        if self._log is not None:
            self._log.close()
            self._log = None