Source code for pyaerocom.colocation_auto

"""
Classes and methods to perform high-level colocation.
"""

import glob
import logging
import os
import traceback
from datetime import datetime
from pathlib import Path
from typing import Optional

import pandas as pd
from cf_units import Unit

from pyaerocom import const
from pyaerocom._lowlevel_helpers import BrowseDict, ListOfStrings, StrWithDefault, chk_make_subdir
from pyaerocom.colocateddata import ColocatedData
from pyaerocom.colocation import (
    colocate_gridded_gridded,
    colocate_gridded_ungridded,
    correct_model_stp_coldata,
)
from pyaerocom.colocation_3d import ColocatedDataLists, colocate_vertical_profile_gridded
from pyaerocom.config import ALL_REGION_NAME
from pyaerocom.exceptions import ColocationError, ColocationSetupError, DataCoverageError
from pyaerocom.helpers import (
    get_lowest_resolution,
    start_stop,
    to_datestring_YYYYMMDD,
    to_pandas_timestamp,
)
from pyaerocom.io import ReadGridded, ReadUngridded
from pyaerocom.io.helpers import get_all_supported_ids_ungridded
from pyaerocom.io.mscw_ctm.reader import ReadMscwCtm
from pyaerocom.io.pyaro.pyaro_config import PyaroConfig

logger = logging.getLogger(__name__)


[docs] class ColocationSetup(BrowseDict): """ Setup class for high-level model / obs co-location. An instance of this setup class can be used to run a colocation analysis between a model and an observation network and will create a number of :class:`pya.ColocatedData` instances, which can be saved automatically as NetCDF files. Apart from co-location, this class also handles reading of the input data for co-location. Supported co-location options are: 1. gridded vs. ungridded data For instance 3D model data (instance of :class:`GriddedData`) with lat, lon and time dimension that is co-located with station based observations which are represented in pyaerocom through :class:`UngriddedData` objects. The co-location function used is :func:`pyaerocom.colocation.colocated_gridded_ungridded`. For this type of co-location, the output co-located data object will be 3-dimensional, with dimensions `data_source` (index 0: obs, index 1: model), `time` and `station_name`. 2. gridded vs. gridded data For instance 3D model data that is co-located with 3D satellite data (both instances of :class:`GriddedData`), both objects with lat, lon and time dimensions. The co-location function used is :func:`pyaerocom.colocation.colocated_gridded_gridded`. For this type of co-location, the output co-located data object will be 4-dimensional, with dimensions `data_source` (index 0: obs, index 1: model), `time` and `latitude` and `longitude`. Attributes ---------- model_id : str ID of model to be used. obs_config: PyaroConfig In the case Pyaro is used, a config must be provided. In that case obs_id(see below) is ignored and only the config is used. obs_id : str ID of observation network to be used. obs_vars : list Variables to be analysed (need to be available in input obs dataset). Variables that are not available in the model data output will be skipped. Alternatively, model variables to be used for a given obs variable can also be specified via attributes :attr:`model_use_vars` and :attr:`model_add_vars`. ts_type : str String specifying colocation output frequency. start Start time of colocation. Input can be integer denoting the year or anything that can be converted into :class:`pandas.Timestamp` using :func:`pyaerocom.helpers.to_pandas_timestamp`. If None, than the first available date in the model data is used. stop stop time of colocation. int or anything that can be converted into :class:`pandas.Timestamp` using :func:`pyaerocom.helpers.to_pandas_timestamp` or None. If None and if ``start`` is on resolution of year (e.g. ``start=2010``) then ``stop`` will be automatically set to the end of that year. Else, it will be set to the last available timestamp in the model data. filter_name : str name of filter to be applied. If None, no filter is used (to be precise, if None, then :attr:`pyaerocom.const.DEFAULT_REG_FILTER` is used which should default to `ALL-wMOUNTAINS`, that is, no filtering). basedir_coldata : str Base directory for storing of colocated data files. save_coldata : bool if True, colocated data objects are saved as NetCDF file. obs_name : str, optional if provided, this string will be used in colocated data filename to specify obsnetwork, else obs_id will be used. obs_data_dir : str, optional location of obs data. If None, attempt to infer obs location based on obs ID. obs_use_climatology : bool BETA if True, pyaerocom default climatology is computed from observation stations (so far only possible for unrgidded / gridded colocation). obs_vert_type : str AeroCom vertical code encoded in the model filenames (only AeroCom 3 and later). Specifies which model file should be read in case there are multiple options (e.g. surface level data can be read from a *Surface*.nc file as well as from a *ModelLevel*.nc file). If input is string (e.g. 'Surface'), then the corresponding vertical type code is used for reading of all variables that are colocated (i.e. that are specified in :attr:`obs_vars`). obs_ts_type_read : str or dict, optional may be specified to explicitly define the reading frequency of the observation data (so far, this does only apply to gridded obsdata such as satellites), either as str (same for all obs variables) or variable specific as dict. For ungridded reading, the frequency may be specified via :attr:`obs_id`, where applicable (e.g. AeronetSunV3Lev2.daily). Not to be confused with :attr:`ts_type`, which specifies the frequency used for colocation. Can be specified variable specific in form of dictionary. obs_filters : dict filters applied to the observational dataset before co-location. In case of gridded / gridded, these are filters that can be passed to :func:`pyaerocom.io.ReadGridded.read_var`, for instance, `flex_ts_type`, or `constraints`. In case the obsdata is ungridded (gridded / ungridded co-locations) these are filters that are handled through keyword `filter_post` in :func:`pyaerocom.io.ReadUngridded.read`. These filters are applied to the :class:`UngriddedData` objects after reading and caching the data, so changing them, will not invalidate the latest cache of the :class:`UngriddedData`. read_opts_ungridded : dict, optional dictionary that specifies reading constraints for ungridded reading, and are passed as `**kwargs` to :func:`pyaerocom.io.ReadUngridded.read`. Note that - other than for `obs_filters` these filters are applied during the reading of the :class:`UngriddedData` objects and specifying them will deactivate caching. model_name : str, optional if provided, this string will be used in colocated data filename to specify model, else obs_id will be used. model_data_dir : str, optional Location of model data. If None, attempt to infer model location based on model ID. model_read_opts : dict, optional options for model reading (passed as keyword args to :func:`pyaerocom.io.ReadUngridded.read`). model_use_vars : dict, optional dictionary that specifies mapping of model variables. Keys are observation variables, values are the corresponding model variables (e.g. model_use_vars=dict(od550aer='od550csaer')). Example: your observation has var *od550aer* but your model model uses a different variable name for that variable, say *od550*. Then, you can specify this via `model_use_vars = {'od550aer' : 'od550'}`. NOTE: in this case, a model variable *od550aer* will be ignored, even if it exists (cf :attr:`model_add_vars`). model_rename_vars : dict, optional rename certain model variables **after** co-location, before storing the associated :class:`ColocatedData` object on disk. Keys are model variables, values are new names (e.g. `model_rename_vars={'od550aer':'MyAOD'}`). Note: this does not impact which variables are read from the model. model_add_vars : dict, optional additional model variables to be processed for one obs variable. E.g. `model_add_vars={'od550aer': ['od550so4', 'od550gt1aer']}` would co-locate both model SO4 AOD (od550so4) and model coarse mode AOD (od550gt1aer) with total AOD (od550aer) from obs (in addition to od550aer vs od550aer if applicable). model_to_stp : bool ALPHA (please do not use): convert model data values to STP conditions after co-location. Note: this only works for very particular settings at the moment and needs revision, as it relies on access to meteorological data. model_ts_type_read : str or dict, optional may be specified to explicitly define the reading frequency of the model data, either as str (same for all obs variables) or variable specific as dict. Not to be confused with :attr:`ts_type`, which specifies the output frequency of the co-located data. model_read_aux : dict, optional may be used to specify additional computation methods of variables from models. Keys are variables to be computed, values are dictionaries with keys `vars_required` (list of required variables for computation of var and `fun` (method that takes list of read data objects and computes and returns var). model_use_climatology : bool if True, attempt to use climatological model data field. Note: this only works if model data is in AeroCom conventions (climatological fields are indicated with 9999 as year in the filename) and if this is active, only single year analysis are supported (i.e. provide int to :attr:`start` to specify the year and leave :attr:`stop` empty). gridded_reader_id : dict BETA: dictionary specifying which gridded reader is supposed to be used for model (and gridded obs) reading. Note: this is a workaround solution and will likely be removed in the future when the gridded reading API is more harmonised (see https://github.com/metno/pyaerocom/issues/174). flex_ts_type : bool Bboolean specifying whether reading frequency of gridded data is allowed to be flexible. This includes all gridded data, whether it is model or gridded observation (e.g. satellites). Defaults to True. min_num_obs : dict or int, optional time resampling constraints applied, defaults to None, in which case no constraints are applied. For instance, say your input is in daily resolution and you want output in monthly and you want to make sure to have roughly 50% daily coverage for the monthly averages. Then you may specify `min_num_obs=15` which will ensure that at least 15 daily averages are available to compute a monthly average. However, you may also define a hierarchical scheme that first goes from daily to weekly and then from weekly to monthly, via a dict. E.g. `min_num_obs=dict(monthly=dict(weekly=4), weekly=dict(daily=3))` would ensure that each week has at least 3 daily values, as well as that each month has at least 4 weekly values. resample_how : str or dict, optional string specifying how data should be aggregated when resampling in time. Default is "mean". Can also be a nested dictionary, e.g. `resample_how={'conco3': 'daily': {'hourly' : 'max'}}` would use the maximum value to aggregate from hourly to daily for variable conco3, rather than the mean. obs_remove_outliers : bool if True, outliers are removed from obs data before colocation, else not. Default is False. Custom outlier ranges for each variable can be specified via :attr:`obs_outlier_ranges`, and for all other variables, the pyaerocom default outlier ranges are used. The latter are specified in `variables.ini` file via `minimum` and `maximum` attributes and can also be accessed through :attr:`pyaerocom.variable.Variable.minimum` and :attr:`pyaerocom.variable.Variable.maximum`, respectively. model_remove_outliers : bool if True, outliers are removed from model data (normally this should be set to False, as the models are supposed to be assessed, including outlier cases). Default is False. Custom outlier ranges for each variable can be specified via :attr:`model_outlier_ranges`, and for all other variables, the pyaerocom default outlier ranges are used. The latter are specified in `variables.ini` file via `minimum` and `maximum` attributes and can also be accessed through :attr:`pyaerocom.variable.Variable.minimum` and :attr:`pyaerocom.variable.Variable.maximum`, respectively. obs_outlier_ranges : dict, optional dictionary specifying outlier ranges for individual obs variables. (e.g. dict(od550aer = [-0.05, 10], ang4487aer=[0,4])). Only relevant if :attr:`obs_remove_outliers` is True. model_outlier_ranges : dict, optional like :attr:`obs_outlier_ranges` but for model variables. Only relevant if :attr:`model_remove_outliers` is True. zeros_to_nan : bool If True, zero's in output co-located data object will be converted to NaN. Default is False. harmonise_units : bool if True, units are attempted to be harmonised during co-location (note: raises Exception if True and in case units cannot be harmonised). regrid_res_deg : int, optional resolution in degrees for regridding of model grid (done before co-location). Default is None. colocate_time : bool if True and if obs and model sampling frequency (e.g. daily) are higher than output colocation frequency (e.g. monthly), then the datasets are first colocated in time (e.g. on a daily basis), before the monthly averages are calculated. Default is False. reanalyse_existing : bool if True, always redo co-location, even if there is already an existing co-located NetCDF file (under the output location specified by :attr:`basedir_coldata` ) for the given variable combination to be co-located. If False and output already exists, then co-location is skipped for the associated variable. Default is True. raise_exceptions : bool if True, Exceptions that may occur for individual variables to be processed, are raised, else the analysis is skipped for such cases. keep_data : bool if True, then all colocated data objects computed when running :func:`run` will be stored in :attr:`data`. Defaults to True. add_meta : dict additional metadata that is supposed to be added to each output :class:`ColocatedData` object. """ #: Dictionary specifying alternative vertical types that may be used to #: read model data. E.g. consider the variable is ec550aer, #: obs_vert_type='Surface' and obs_vert_type_alt=dict(Surface='ModelLevel'). #: Now, if a model that is used for the analysis does not contain a data #: file for ec550aer at the surface ('*ec550aer*Surface*.nc'), then, the #: colocation routine will look for '*ec550aer*ModelLevel*.nc' and if this #: exists, it will load it and extract the surface level. OBS_VERT_TYPES_ALT = {"Surface": "ModelLevel", "2D": "2D"} #: do not raise Exception if invalid item is attempted to be assigned #: (Overwritten from base class) CRASH_ON_INVALID = False FORBIDDEN_KEYS = [ "var_outlier_ranges", # deprecated since v0.12.0 "var_ref_outlier_ranges", # deprecated since v0.12.0 "remove_outliers", # deprecated since v0.12.0 ] ts_type = StrWithDefault("monthly") obs_vars = ListOfStrings() def __init__( self, model_id=None, obs_config: Optional[PyaroConfig] = None, obs_id=None, obs_vars=None, ts_type=None, start=None, stop=None, basedir_coldata=None, save_coldata=False, **kwargs, ): self.model_id = model_id self._obs_id = None self._obs_config = None self.obs_id = obs_id self.obs_config = obs_config self.obs_vars = obs_vars self.ts_type = ts_type self.start = start self.stop = stop # crashes if input filter name is invalid self.filter_name = f"{ALL_REGION_NAME}-wMOUNTAINS" if basedir_coldata is not None: basedir_coldata = self._check_input_basedir_coldata(basedir_coldata) else: basedir_coldata = const.COLOCATEDDATADIR self.basedir_coldata = basedir_coldata self.save_coldata = save_coldata # END OF ASSIGNMENT OF MOST COMMON PARAMETERS - BELOW ARE FURTHER # CONFIG ATTRIBUTES, THAT ARE OPTIONAL AND LESS FREQUENTLY USED # Options related to obs reading and processing self.obs_name = None self.obs_data_dir = None self.obs_use_climatology = False self._obs_cache_only = False # only relevant if obs is ungridded self.obs_vert_type = None self.obs_ts_type_read = None self.obs_filters = {} self._obs_is_vertical_profile = False self.colocation_layer_limits = None self.profile_layer_limits = None self.read_opts_ungridded = {} # Attributes related to model data self.model_name = None self.model_data_dir = None self.model_read_opts = {} self.model_use_vars = {} self.model_rename_vars = {} self.model_add_vars = {} self.model_to_stp = False self.model_ts_type_read = None self.model_read_aux = {} self.model_use_climatology = False self.gridded_reader_id = {"model": "ReadGridded", "obs": "ReadGridded"} self.flex_ts_type = True # Options related to time resampling self.min_num_obs = None self.resample_how = "mean" # Options related to outlier removal self.obs_remove_outliers = False self.model_remove_outliers = False # Custom outlier ranges for model and obs self.obs_outlier_ranges = {} self.model_outlier_ranges = {} self.zeros_to_nan = False self.harmonise_units = False self.regrid_res_deg = None self.colocate_time = False self.reanalyse_existing = True self.raise_exceptions = False self.keep_data = True self.add_meta = {} self.update(**kwargs) def _check_input_basedir_coldata(self, basedir_coldata): """ Make sure input basedir_coldata is str and exists Parameters ---------- basedir_coldata : str or Path basic output directory for colocated data Raises ------ ValueError If input is invalid. Returns ------- str valid output directory """ if isinstance(basedir_coldata, Path): basedir_coldata = str(basedir_coldata) if isinstance(basedir_coldata, str): if not os.path.exists(basedir_coldata): os.mkdir(basedir_coldata) return basedir_coldata raise ValueError(f"Invalid input for basedir_coldata: {basedir_coldata}") def _check_basedir_coldata(self): """ Make sure output directory for colocated data files exists Raises ------ FileNotFoundError If :attr:`basedir_coldata` does not exist and cannot be created. Returns ------- str current value of :attr:`basedir_coldata` """ basedir_coldata = self.basedir_coldata if basedir_coldata is None: basedir_coldata = const.COLOCATEDDATADIR if not os.path.exists(basedir_coldata): logger.info(f"Creating directory: {basedir_coldata}") os.mkdir(basedir_coldata) elif isinstance(basedir_coldata, Path): basedir_coldata = str(basedir_coldata) if isinstance(basedir_coldata, str) and not os.path.exists(basedir_coldata): os.mkdir(basedir_coldata) if not os.path.exists(basedir_coldata): raise FileNotFoundError( f"Output directory for colocated data files {basedir_coldata} does not exist" ) self.basedir_coldata = basedir_coldata return basedir_coldata @property def basedir_logfiles(self): """Base directory for storing logfiles""" p = chk_make_subdir(self.basedir_coldata, "logfiles") return p @property def obs_id(self) -> str: return self._obs_id @obs_id.setter def obs_id(self, val: Optional[str]) -> None: if self.obs_config is not None and val != self.obs_config.name: logger.info( f"Data ID in Pyaro config {self.obs_config.name} does not match obs_id {val}. Setting Pyaro config to None!" ) self.obs_config = None self._obs_id = val @property def obs_config(self) -> PyaroConfig: return self._obs_config @obs_config.setter def obs_config(self, val: Optional[PyaroConfig]) -> None: if val is not None: if isinstance(val, dict): logger.info(f"Obs config was given as dict. Will try to convert to PyaroConfig") val = PyaroConfig(**val) if self.obs_id is not None and val.name != self.obs_id: logger.info( f"Data ID in Pyaro config {val.name} does not match obs_id {self.obs_id}. Setting Obs ID to match Pyaro Config!" ) self.obs_id = val.name if self.obs_id is None: self.obs_id = val.name self._obs_config = val
[docs] def add_glob_meta(self, **kwargs): """ Add global metadata to :attr:`add_meta` Parameters ---------- kwargs metadata to be added Returns ------- None """ self.add_meta.update(**kwargs)
def __setitem__(self, key, val): if key == "basedir_coldata": val = self._check_input_basedir_coldata(val) super().__setitem__(key, val) def _period_from_start_stop(self) -> str: start, stop = start_stop(self.start, self.stop, stop_sub_sec=False) y0, y1 = start.year, stop.year assert y0 <= y1 if y0 == y1: return str(y0) else: return f"{y0}-{y1}"
[docs] class Colocator(ColocationSetup): """High level class for running co-location Note ---- This object inherits from :class:`ColocationSetup` and is also instantiated as such. For setup attributes, please see base class. """ SUPPORTED_GRIDDED_READERS = {"ReadGridded": ReadGridded, "ReadMscwCtm": ReadMscwCtm} STATUS_CODES = { 1: "SUCCESS", 2: "NOT OK: Missing/invalid model variable", 3: "NOT OK: Missing/invalid obs variable", 4: "NOT OK: Failed to read model variable", 5: "NOT OK: Colocation failed", } def __init__(self, **kwargs): super().__init__(**kwargs) self._log = None self.logging = True self._loaded_model_data = {} self.data = {} self._processing_status = [] self.files_written = [] self._model_reader = None self._obs_reader = None @property def model_vars(self): """ List of all model variables specified in config Note ---- This method does not check if the variables are valid or available. Returns ------- list list of all model variables specified in this setup. """ ovars = self.obs_vars model_vars = [] for ovar in ovars: if ovar in self.model_use_vars: model_vars.append(self.model_use_vars[ovar]) else: model_vars.append(ovar) for ovar, mvars in self.model_add_vars.items(): if not ovar in ovars: logger.warning( f"Found entry in model_add_vars for obsvar {ovar} which " f"is not specified in attr obs_vars, and will thus be " f"ignored" ) model_vars += mvars return model_vars @property def obs_is_ungridded(self): """ bool: True if obs_id refers to an ungridded observation, else False """ if self.obs_config is not None: return True return True if self.obs_id in get_all_supported_ids_ungridded() else False @property def obs_is_vertical_profile(self): """ bool: True if obs_id refers to a VerticalProfile, else False """ return self._obs_is_vertical_profile @obs_is_vertical_profile.setter def obs_is_vertical_profile(self, value): self._obs_is_vertical_profile = value @property def model_reader(self): """ Model data reader """ if self._model_reader is not None: if self._model_reader.data_id == self.model_id: return self._model_reader logger.info( f"Reloading outdated model reader. ID of current reader: " f"{self._model_reader.data_id}. New ID: {self.model_id}" ) self._model_reader = self._instantiate_gridded_reader(what="model") self._loaded_model_data = {} return self._model_reader def _check_data_id_obs_reader(self): """ Check if obs_reader is instantiated with correct obs ID. Returns ------- bool True if current obs reader can be used for obs reading, else False. """ reader = self._obs_reader if reader is None: return False elif self.obs_is_ungridded and self.obs_id in reader.data_ids: return True elif self.obs_id == reader.data_id: return True @property def obs_reader(self): """ Observation data reader """ if not self._check_data_id_obs_reader(): if self.obs_is_ungridded: self._obs_reader = ReadUngridded( data_ids=[self.obs_id], data_dirs=self.obs_data_dir, configs=[ self.obs_config, ], ) else: self._obs_reader = self._instantiate_gridded_reader(what="obs") return self._obs_reader @property def output_dir(self): """ str: Output directory for colocated data NetCDF files """ self._check_basedir_coldata() loc = os.path.join(self.basedir_coldata, self.get_model_name()) if not os.path.exists(loc): logger.info(f"Creating dir {loc}") os.mkdir(loc) return loc @property def processing_status(self): head = ["Model Var", "Obs Var", "Status"] tab = [] for mvar, ovar, key in self._processing_status: tab.append([mvar, ovar, self.STATUS_CODES[key]]) return pd.DataFrame(tab, columns=head)
[docs] def get_model_name(self): """ Get name of model Note ---- Not to be confused with :attr:`model_id` which is always the database ID of the model, while model_name can differ from that and is used for output files, etc. Raises ------ AttributeError If neither model_id or model_name are set Returns ------- str preferably :attr:`model_name`, else :attr:`model_id` """ if self.model_name is None: if self.model_id is None: raise AttributeError("Neither model_name nor model_id are set") return self.model_id return self.model_name
[docs] def get_obs_name(self): """ Get name of obsdata source Note ---- Not to be confused with :attr:`obs_id` which is always the database ID of the observation dataset, while obs_name can differ from that and is used for output files, etc. Raises ------ AttributeError If neither obs_id or obs_name are set Returns ------- str preferably :attr:`obs_name`, else :attr:`obs_id` """ if self.obs_name is None: if self.obs_id is None: raise AttributeError("Neither obs_name nor obs_id are set") return self.obs_id return self.obs_name
def get_model_data(self, model_var): if model_var in self._loaded_model_data: mdata = self._loaded_model_data[model_var] if mdata.data_id == self.model_id: return mdata self._check_add_model_read_aux(model_var) mdata = self._read_gridded(var_name=model_var, is_model=True) self._loaded_model_data[model_var] = mdata return mdata def get_obs_data(self, obs_var): if self.obs_is_ungridded: return self._read_ungridded(obs_var) else: return self._read_gridded(obs_var, is_model=False) def get_start_str(self): if self.start is None: raise AttributeError("start time is not set") return to_datestring_YYYYMMDD(to_pandas_timestamp(self.start)) def get_stop_str(self): if self.stop is None: raise AttributeError("stop time is not set") return to_datestring_YYYYMMDD(to_pandas_timestamp(self.stop))
[docs] def prepare_run(self, var_list: list = None) -> dict: """ Prepare colocation run for current setup. Parameters ---------- var_name : str, optional Variable name that is supposed to be analysed. The default is None, in which case all defined variables are attempted to be colocated. Raises ------ AttributeError If no observation variables are defined (:attr:`obs_vars` empty). Returns ------- vars_to_process : dict Mapping of variables to be processed, keys are model vars, values are obs vars. """ try: self._init_log() except Exception: logger.warning("Deactivating logging in Colocator") self.logging = False if isinstance(self.obs_vars, str): self.obs_vars = [self.obs_vars] elif not isinstance(self.obs_vars, list): raise AttributeError("obs_vars not defined or invalid, need list with strings...") self._check_obs_vars_available() self._check_obs_filters() self._check_model_add_vars() self._check_set_start_stop() vars_to_process = self._find_var_matches() if var_list is not None: vars_to_process = self._filter_var_matches_varlist(vars_to_process, var_list) (vars_to_process, ts_types) = self._check_load_model_data(vars_to_process) if self.save_coldata and not self.reanalyse_existing: vars_to_process = self._filter_var_matches_files_not_exist(vars_to_process, ts_types) return vars_to_process
[docs] def run(self, var_list: list = None, **opts): """Perform colocation for current setup See also :func:`prepare_run`. Parameters ---------- var_list : list, optional list of variables supposed to be analysed. The default is None, in which case all defined variables are attempted to be colocated. **opts keyword args that may be specified to change the current setup before colocation Returns ------- dict nested dictionary, where keys are model variables, values are dictionaries comprising key / value pairs of obs variables and associated instances of :class:`ColocatedData`. """ self.update(**opts) data_out = {} # ToDo: see if the following could be solved via custom context manager try: vars_to_process = self.prepare_run(var_list) except Exception as ex: logger.exception(ex) if self.raise_exceptions: self._print_processing_status() self._write_log(f"ABORTED: raise_exceptions is True: {traceback.format_exc()}\n") self._close_log() raise ex vars_to_process = {} self._print_coloc_info(vars_to_process) for mod_var, obs_var in vars_to_process.items(): try: coldata = self._run_helper( mod_var, obs_var ) # note this can be ColocatedData or ColocatedDataLists if not mod_var in data_out: data_out[mod_var] = {} data_out[mod_var][obs_var] = coldata self._processing_status.append([mod_var, obs_var, 1]) except Exception: msg = f"Failed to perform analysis: {traceback.format_exc()}\n" logger.warning(msg) self._processing_status.append([mod_var, obs_var, 5]) self._write_log(msg) if self.raise_exceptions: self._print_processing_status() self._write_log("ABORTED: raise_exceptions is True\n") self._close_log() raise ColocationError(traceback.format_exc()) self._write_log("Colocation finished") self._close_log() self._print_processing_status() if self.keep_data: self.data = data_out return data_out
[docs] def get_nc_files_in_coldatadir(self): """ Get list of NetCDF files in colocated data directory Returns ------- list list of NetCDF file paths found """ mask = f"{self.output_dir}/*.nc" return glob.glob(mask)
def get_available_coldata_files(self, var_list: list = None) -> list: self._check_set_start_stop() def check_meta_match(meta, **kwargs): for key, val in kwargs.items(): if not meta[key] == val: return False return True mname = self.get_model_name() oname = self.get_obs_name() model_vars = self.model_vars obs_vars = self.obs_vars start, stop = self.get_start_str(), self.get_stop_str() valid = [] all_files = self.get_nc_files_in_coldatadir() for file in all_files: try: meta = ColocatedData.get_meta_from_filename(file) except Exception: continue candidate = check_meta_match( meta, model_name=mname, obs_name=oname, start=start, stop=stop ) if candidate and meta["model_var"] in model_vars and meta["obs_var"] in obs_vars: if var_list is None: ok = True else: ok = False for var in var_list: if meta["model_var"] == var or meta["obs_var"] == var: ok = True break if ok: valid.append(file) return valid def _filter_var_matches_varlist(self, vars_to_process, var_list) -> dict: _vars_to_process = {} if isinstance(var_list, str): var_list = [var_list] for var_name in var_list: _subset = self._filter_var_matches_var_name(vars_to_process, var_name) _vars_to_process.update(**_subset) return _vars_to_process def _read_ungridded(self, var_name): """Helper to read UngriddedData Note ---- reading is restricted to single variable UngriddedData objects here, due to multiple possibilities for variable specific obs filter definitions and because the colocation is done sequentially for each variable. Parameters ---------- vars_to_read : str or list, optional variables that should be read from obs-network (:attr:`obs_id`) Returns ------- UngriddedData loaded data object """ obs_reader = self.obs_reader obs_filters_post = self._eval_obs_filters(var_name) obs_data = obs_reader.read( data_ids=[self.obs_id], vars_to_retrieve=var_name, only_cached=self._obs_cache_only, filter_post=obs_filters_post, **self.read_opts_ungridded, ) if self.obs_remove_outliers: oor = self.obs_outlier_ranges if var_name in oor: low, high = oor[var_name] else: var_info = const.VARS[var_name] low, high = var_info.minimum, var_info.maximum obs_data.remove_outliers( var_name, low=low, high=high, inplace=True, move_to_trash=False ) return obs_data def _check_obs_filters(self): obs_vars = self.obs_vars if any([x in self.obs_filters for x in obs_vars]): # variable specific obs_filters for ovar in obs_vars: if not ovar in self.obs_filters: self.obs_filters[ovar] = {} def _check_load_model_data(self, var_matches): """ Try to preload modeldata for input variable matches Note ---- This will load all model fields for each obs variable in lazy mode, so should not require much storage. T Parameters ---------- var_matches : dict dictionary specifying model / obs var pairs for colocation. Raises ------ ColocationError If :attr:`raise_exceptions` is True and if one of the input model variables cannot be loaded. Returns ------- filtered : dict `var_matches` filtered by entries for which model data could be successfully read. ts_types : dict data frequencies for each model variable that could be read. Those depend also on settings for :attr:`flex_ts_type` """ filtered, ts_types = {}, {} for mvar, ovar in var_matches.items(): try: mdata = self.get_model_data(mvar) filtered[mvar] = ovar ts_types[mvar] = mdata.ts_type except Exception as e: msg = f"Failed to load model data: {self.model_id} ({mvar}). Reason {e}" logger.warning(msg) self._write_log(msg + "\n") self._processing_status.append([mvar, ovar, 4]) if self.raise_exceptions: raise ColocationError(msg) return filtered, ts_types def _filter_var_matches_files_not_exist(self, var_matches, ts_types): filtered = {} for mvar, ovar in var_matches.items(): ts_type = ts_types[mvar] fname = self._coldata_savename(ovar, mvar, ts_type) fp = os.path.join(self.output_dir, fname) if not os.path.exists(fp): filtered[mvar] = ovar return filtered def _check_model_add_vars(self): for ovar, mvars in self.model_add_vars.items(): if not isinstance(mvars, list): raise ValueError("Values of model_add_vars need to be list") elif not all([isinstance(x, str) for x in mvars]): raise ValueError("Values of model_add_vars need to be list of strings") def _instantiate_gridded_reader(self, what): """ Create reader for model or observational gridded data. Parameters ---------- what : str Type of reader. ("model" or "obs") Returns ------- Instance of reader class defined in self.SUPPORTED_GRIDDED_READERS """ if what == "model": data_id = self.model_id data_dir = self.model_data_dir else: data_id = self.obs_id data_dir = self.obs_data_dir reader_class = self._get_gridded_reader_class(what=what) reader = reader_class(data_id=data_id, data_dir=data_dir) return reader def _get_gridded_reader_class(self, what): """Returns the class of the reader for gridded data.""" try: reader = self.SUPPORTED_GRIDDED_READERS[self.gridded_reader_id[what]] except KeyError as e: raise NotImplementedError( f"Reader {self.gridded_reader_id[what]} is not supported: {e}" ) return reader def _check_add_model_read_aux(self, model_var): if not model_var in self.model_read_aux: return False info = self.model_read_aux[model_var] if not isinstance(info, dict): raise ValueError( f"Invalid value for model_read_aux of variable {model_var}. " f"Need dictionary, got {info}" ) elif not all([x in info for x in ["vars_required", "fun"]]): raise ValueError( f"Invalid value for model_read_aux dict of variable {model_var}. " f"Require keys vars_required and fun in dict, got {info}" ) try: self.model_reader.add_aux_compute(var_name=model_var, **info) except DataCoverageError: return False return True def _check_obs_vars_available(self): if self.obs_vars == []: raise ColocationSetupError("no observation variables specified...") oreader = self.obs_reader if self.obs_is_ungridded: avail = oreader.get_vars_supported(self.obs_id, self.obs_vars) else: avail = [] for ovar in self.obs_vars: if oreader.has_var(ovar): avail.append(ovar) if len(self.obs_vars) > len(avail): for ovar in self.obs_vars: if not ovar in avail: logger.warning( f"Obs variable {ovar} is not available in {self.obs_id} " f"and will be ignored" ) self._processing_status.append([None, ovar, 3]) if self.raise_exceptions: invalid = [var for var in self.obs_vars if not var in avail] invalid = "; ".join(invalid) raise DataCoverageError(f"Invalid obs var(s) for {self.obs_id}: {invalid}") self.obs_vars = avail def _print_processing_status(self): mname = self.get_model_name() oname = self.get_obs_name() logger.info(f"Colocation processing status for {mname} vs. {oname}") logger.info(self.processing_status) def _filter_var_matches_var_name(self, var_matches, var_name): filtered = {} for mvar, ovar in var_matches.items(): if mvar in var_name or ovar in var_name: filtered[mvar] = ovar if len(filtered) == 0: raise DataCoverageError(var_name) return filtered def _find_var_matches(self): """Find variable matches in model data for input obs variables""" # dictionary that will map model variables (keys) with observation # variables (values) var_matches = {} all_ok = True muv = self.model_use_vars modreader = self.model_reader for ovar in self.obs_vars: if ovar in muv: mvar = muv[ovar] else: mvar = ovar self._check_add_model_read_aux(mvar) if modreader.has_var(mvar): var_matches[mvar] = ovar else: self._processing_status.append([mvar, ovar, 2]) all_ok = False if ovar in self.model_add_vars: # observation variable addvars = self.model_add_vars[ovar] for addvar in addvars: self._check_add_model_read_aux(addvar) if modreader.has_var(addvar): var_matches[addvar] = ovar else: self._processing_status.append([addvar, ovar, 2]) all_ok = False if not all_ok and self.raise_exceptions: raise DataCoverageError("Some model variables are not available") return var_matches def _get_ts_type_read(self, var_name, is_model): """ Get *desired* reading frequency for gridded reading Parameters ---------- var_name : str Name of variable to be read. is_model : bool True if reading refers to model reading, else False (e.g. gridded satellite obs). Raises ------ ValueError Returns ------- str or None frequency to be read. """ tst = self.ts_type # default if is_model and self.model_ts_type_read is not None: tst = self.model_ts_type_read if tst == "": tst = self.ts_type elif not is_model and self.obs_ts_type_read is not None: tst = self.obs_ts_type_read if isinstance(tst, dict): if var_name in tst: tst = tst[var_name] else: tst = self.ts_type return tst def _read_gridded(self, var_name, is_model): start, stop = self.start, self.stop ts_type_read = self._get_ts_type_read(var_name, is_model) kwargs = {} if is_model: reader = self.model_reader vert_which = self.obs_vert_type if self.model_use_climatology: # overwrite start and stop to read climatology file for model start, stop = 9999, None if var_name in self.model_read_opts: kwargs.update(self.model_read_opts[var_name]) else: reader = self.obs_reader vert_which = None ts_type_read = self.obs_ts_type_read kwargs.update(self._eval_obs_filters(var_name)) try: data = reader.read_var( var_name, start=start, stop=stop, ts_type=ts_type_read, vert_which=vert_which, flex_ts_type=self.flex_ts_type, **kwargs, ) except DataCoverageError: vert_which_alt = self._try_get_vert_which_alt(is_model, var_name) data = reader.read_var( var_name, start=start, stop=stop, ts_type=ts_type_read, flex_ts_type=self.flex_ts_type, vert_which=vert_which_alt, ) data = self._check_remove_outliers_gridded(data, var_name, is_model) return data def _try_get_vert_which_alt(self, is_model, var_name): if is_model: if self.obs_vert_type in self.OBS_VERT_TYPES_ALT: return self.OBS_VERT_TYPES_ALT[self.obs_vert_type] raise DataCoverageError(f"No alternative vert type found for {var_name}") def _check_remove_outliers_gridded(self, data, var_name, is_model): if is_model: rm_outliers = self.model_remove_outliers outlier_ranges = self.model_outlier_ranges else: rm_outliers = self.obs_remove_outliers outlier_ranges = self.obs_outlier_ranges if len(outlier_ranges) > 0 and not rm_outliers: logger.warning( f"WARNING: Found definition of outlier ranges for {var_name} " f"({data.data_id}) but outlier removal is deactivated. Consider " f"checking your setup (note: model or obs outlier removal can be " f"activated via attrs. model_remove_outliers and remove_outliers, " f"respectively" ) if rm_outliers: if var_name in outlier_ranges: low, high = outlier_ranges[var_name] else: var_info = const.VARS[var_name] low, high = var_info.minimum, var_info.maximum data.check_unit() data.remove_outliers(low, high, inplace=True) return data def _eval_obs_filters(self, var_name): obs_filters = self["obs_filters"] if var_name in obs_filters: obs_filters = obs_filters[var_name] remaining = {} if not isinstance(obs_filters, dict): raise AttributeError( f"Detected obs_filters attribute in Colocator class, " f"which is not a dictionary: {obs_filters}" ) for key, val in obs_filters.items(): # keep ts_type filter in remaining (added on 17.2.21, 0.10.0 -> 0.10.1) if key in self and not key == "ts_type": # can be handled if isinstance(self[key], dict) and isinstance(val, dict): self[key].update(val) else: self[key] = val else: remaining[key] = val return remaining def _save_coldata(self, coldata): """Helper for saving colocateddata""" obs_var, mod_var = coldata.metadata["var_name_input"] if mod_var in self.model_rename_vars: mvar = self.model_rename_vars[mod_var] logger.info( f"Renaming model variable from {mod_var} to {mvar} in " f"ColocatedData before saving to NetCDF." ) coldata.rename_variable(mod_var, mvar, self.model_id) else: mvar = mod_var if hasattr(coldata, "vertical_layer"): # save colocated vertical layer netCDF files with vertical layers in km if not Unit(coldata.data.altitude_units) == Unit("km"): start = Unit(coldata.data.altitude_units).convert( coldata.vertical_layer["start"], other="km" ) end = Unit(coldata.data.altitude_units).convert( coldata.vertical_layer["end"], other="km" ) vertical_layer = {"start": start, "end": end} else: vertical_layer = coldata.vertical_layer savename = self._coldata_savename( obs_var, mvar, coldata.ts_type, vertical_layer=vertical_layer ) else: savename = self._coldata_savename(obs_var, mvar, coldata.ts_type) fp = coldata.to_netcdf(self.output_dir, savename=savename) self.files_written.append(fp) msg = f"WRITE: {fp}\n" self._write_log(msg) logger.info(msg) def _eval_resample_how(self, model_var, obs_var): rshow = self.resample_how if not isinstance(rshow, dict): return rshow if obs_var in rshow: return rshow[obs_var] elif model_var in rshow: return rshow[model_var] else: return None def _infer_start_stop_yr_from_model_reader(self): """ Infer start / stop year for colocation from gridded model reader Sets :attr:`start` and :attr:`stop` """ # get sorted list of years available in model data (files with year # 9999 denote climatological data) yrs_avail = self.model_reader.years_avail if self.model_use_climatology: if not 9999 in yrs_avail: raise DataCoverageError("No climatology files available") first, last = 9999, None else: if 9999 in yrs_avail: yrs_avail = [x for x in yrs_avail if not x == 9999] first, last = yrs_avail[0], yrs_avail[-1] if first == last: last = None self.start = first self.stop = last def _check_set_start_stop(self): if self.start is None: self._infer_start_stop_yr_from_model_reader() if self.model_use_climatology: if self.stop is not None or not isinstance(self.start, int): raise ColocationSetupError( "Conflict: only single year analyses are support for model " 'climatology fields, please specify "start" as integer ' 'denoting the year, and set "stop"=None' ) self.start, self.stop = start_stop(self.start, self.stop) def _coldata_savename(self, obs_var, mod_var, ts_type, **kwargs): """Get filename of colocated data file for saving""" if "vertical_layer" in kwargs: vertical_layer = kwargs["vertical_layer"] else: vertical_layer = None name = ColocatedData._aerocom_savename( obs_var=obs_var, obs_id=self.get_obs_name(), mod_var=mod_var, mod_id=self.get_model_name(), start_str=self.get_start_str(), stop_str=self.get_stop_str(), ts_type=ts_type, filter_name=self.filter_name, vertical_layer=vertical_layer, ) return f"{name}.nc" def _get_colocation_ts_type(self, model_ts_type, obs_ts_type=None): chk = [self.ts_type, model_ts_type] if obs_ts_type is not None: chk.append(obs_ts_type) return get_lowest_resolution(*chk) @property def _colocation_func(self): """ Function used for colocation Returns ------- callable function the performs co-location operation """ if self.obs_is_vertical_profile: return colocate_vertical_profile_gridded if self.obs_is_ungridded: return colocate_gridded_ungridded else: return colocate_gridded_gridded def _prepare_colocation_args(self, model_var: str, obs_var: str): model_data = self.get_model_data(model_var) obs_data = self.get_obs_data(obs_var) if getattr(obs_data, "is_vertical_profile", None): self.obs_is_vertical_profile = obs_data.is_vertical_profile rshow = self._eval_resample_how(model_var, obs_var) if self.model_use_climatology: baseyr = self.start.year else: baseyr = None # input args shared between all colocation functions args = dict( data=model_data, data_ref=obs_data, start=self.start, stop=self.stop, filter_name=self.filter_name, regrid_res_deg=self.regrid_res_deg, harmonise_units=self.harmonise_units, update_baseyear_gridded=baseyr, min_num_obs=self.min_num_obs, colocate_time=self.colocate_time, resample_how=rshow, ) if self.obs_is_ungridded: ts_type = self._get_colocation_ts_type(model_data.ts_type) args.update( ts_type=ts_type, var_ref=obs_var, use_climatology_ref=self.obs_use_climatology, ) else: ts_type = self._get_colocation_ts_type(model_data.ts_type, obs_data.ts_type) args.update(ts_type=ts_type) if self.obs_is_vertical_profile: args.update( colocation_layer_limits=self.colocation_layer_limits, profile_layer_limits=self.profile_layer_limits, ) return args def _check_dimensionality(self, args): mdata = args["data"] odata = args["data_ref"] from pyaerocom.exceptions import DataDimensionError from pyaerocom.griddeddata import GriddedData if mdata.ndim == 4 and self.obs_vert_type == "Surface": mdata = mdata.extract_surface_level() args["data"] = mdata if isinstance(odata, GriddedData): if odata.ndim == 4 and self.obs_vert_type == "Surface": odata = odata.extract_surface_level() args["data_ref"] = odata elif odata.ndim > 3: raise DataDimensionError( f"cannot co-locate model data with more than 3 dimensions: {odata}" ) return args def _run_helper(self, model_var: str, obs_var: str): logger.info(f"Running {self.model_id} ({model_var}) vs. {self.obs_id} ({obs_var})") args = self._prepare_colocation_args(model_var, obs_var) args = self._check_dimensionality(args) coldata = self._colocation_func(**args) if isinstance(coldata, ColocatedData): coldata.data.attrs["model_name"] = self.get_model_name() coldata.data.attrs["obs_name"] = self.get_obs_name() coldata.data.attrs["vert_code"] = self.obs_vert_type coldata.data.attrs.update(**self.add_meta) if self.zeros_to_nan: coldata = coldata.set_zeros_nan() if self.model_to_stp: coldata = correct_model_stp_coldata(coldata) if self.save_coldata: self._save_coldata(coldata) elif isinstance(coldata, ColocatedDataLists): # look into intertools chain.from_iterable for i_list in coldata: for coldata_obj in i_list: coldata_obj.data.attrs["model_name"] = self.get_model_name() coldata_obj.data.attrs["obs_name"] = self.get_obs_name() coldata_obj.data.attrs["vert_code"] = self.obs_vert_type coldata_obj.data.attrs.update(**self.add_meta) if self.zeros_to_nan: coldata_obj = coldata_obj.set_zeros_nan() if self.model_to_stp: # TODO: check is this needs modifying coldata = correct_model_stp_coldata(coldata_obj) if self.save_coldata: self._save_coldata(coldata_obj) else: raise Exception( f"Invalid coldata type returned by colocation function {self._colocation_func}" ) return coldata def _print_coloc_info(self, var_matches): if not var_matches: logger.info("Nothing to colocate") return logger.info("The following variable combinations will be colocated\nMODEL-VAR\tOBS-VAR") for key, val in var_matches.items(): logger.info(f"{key}\t{val}") def _init_log(self): logdir = chk_make_subdir(self.basedir_logfiles, self.get_model_name()) oname = self.get_obs_name() datestr = datetime.today().strftime("%Y%m%d") datetimestr = datetime.today().strftime("%d-%m-%Y %H:%M") fname = f"{oname}_{datestr}.log" logfile = os.path.join(logdir, fname) self._log = log = open(logfile, "a+") log.write("\n------------------ NEW ----------------\n") log.write(f"Timestamp: {datetimestr}\n\n") log.write("Analysis configuration\n") ignore = ["_log", "logging", "data", "_model_reader", "_obs_reader"] for key, val in self.items(): if key in ignore: continue log.write(f"{key}: {val}\n") def _write_log(self, msg): if self.logging: self._log.write(msg) def _close_log(self): if self._log is not None: self._log.close() self._log = None