Source code for pyaerocom.aeroval.superobs_engine

import logging
from traceback import format_exc

import numpy as np
import xarray as xr

from pyaerocom import ColocatedData
from pyaerocom.aeroval._processing_base import HasColocator, ProcessingEngine
from pyaerocom.aeroval.coldatatojson_engine import ColdataToJsonEngine
from pyaerocom.helpers import get_lowest_resolution

logger = logging.getLogger(__name__)


[docs] class SuperObsEngine(ProcessingEngine, HasColocator): """ Class to handle the processing of combined obs datasets """
[docs] def run(self, model_name, obs_name, var_list, try_colocate_if_missing=True): self._process_entry( model_name=model_name, obs_name=obs_name, var_list=var_list, try_colocate_if_missing=try_colocate_if_missing, )
def _process_entry(self, model_name, obs_name, var_list, try_colocate_if_missing): sobs_cfg = self.cfg.obs_cfg.get_entry(obs_name) if var_list is None: var_list = sobs_cfg.obs_vars elif isinstance(var_list, str): var_list = [var_list] elif not isinstance(var_list, list): raise ValueError(f"invalid input for var_list: {var_list}.") for var_name in var_list: try: self._run_var(model_name, obs_name, var_name, try_colocate_if_missing) except Exception: if self.raise_exceptions: raise logger.warning( f"Failed to process superobs entry for {obs_name}, " f"{model_name}, var {var_name}. Reason: {format_exc()}" ) def _run_var(self, model_name, obs_name, var_name, try_colocate_if_missing): """ Run evaluation of superobs entry Parameters ---------- model_name : str name of model in :attr:`model_config` obs_name : str name of super observation in :attr:`obs_cfg` var_name : str name of variable to be processed. try_colocate_if_missing : bool if True, then missing colocated data objects are computed on the fly. Raises ------ ValueError If multiple (or no) colocated data objects are available for individual obs datasets of which the superobservation is comprised. Returns ------- None """ coldata_files = [] coldata_resolutions = [] vert_codes = [] obs_entry = self.cfg.obs_cfg.get_entry(obs_name) obs_needed = obs_entry.obs_id vert_code = obs_entry.obs_vert_type for oname in obs_needed: fp, ts_type, vert_code = self._get_coldata_fileinfo( model_name, oname, var_name, try_colocate_if_missing ) coldata_files.append(fp) coldata_resolutions.append(ts_type) vert_codes.append(vert_code) if len(np.unique(vert_codes)) > 1 or vert_codes[0] != vert_code: raise ValueError( "Cannot merge observations with different vertical types into " "super observation..." ) if not len(coldata_files) == len(obs_needed): raise ValueError( f"Could not retrieve colocated data files for " f"all required observations for super obs " f"{obs_name}" ) to_freq = get_lowest_resolution(*coldata_resolutions) darrs = [] for fp in coldata_files: darrs.append(self._get_dataarray(fp, to_freq, obs_name)) merged = xr.concat(darrs, dim="station_name") coldata = ColocatedData(data=merged) engine = ColdataToJsonEngine(self.cfg) engine.process_coldata(coldata) def _get_dataarray(self, fp, to_freq, obs_name): """Get dataarray needed for combination to superobs""" data = ColocatedData(data=fp) if data.ts_type != to_freq: data.resample_time(to_ts_type=to_freq, settings_from_meta=True, inplace=True) arr = data.data ds = arr["data_source"].values source_new = [obs_name, ds[1]] arr["data_source"] = source_new # obs, model_id arr.attrs["data_source"] = source_new arr.attrs["obs_name"] = obs_name return arr def _get_coldata_fileinfo(self, model_name, obs_name, var_name, try_colocate_if_missing): """Get fileinfo about existing colocated data object""" col = self.get_colocator(model_name, obs_name) if self.reanalyse_existing: col.run(var_list=[var_name]) cdf = col.files_written else: cdf = col.get_available_coldata_files([var_name]) if len(cdf) == 0 and try_colocate_if_missing: col.run(var_list=[var_name]) cdf = col.files_written if len(cdf) != 1: raise ValueError( f"Fatal: Found multiple colocated data objects for " f"{model_name}, {obs_name}, {var_name}: {cdf}..." ) fp = cdf[0] meta = ColocatedData.get_meta_from_filename(fp) ts_type = meta["ts_type"] vert_code = self.cfg.obs_cfg.get_entry(obs_name).obs_vert_type return (fp, ts_type, vert_code)