Source code for pyaerocom.io.read_ebas

import fnmatch
import logging
import os
import re

import numpy as np
from geonum.atmosphere import T0_STD, p0
from tqdm import tqdm

from pyaerocom import const
from pyaerocom._lowlevel_helpers import BrowseDict
from pyaerocom.aux_var_helpers import (
    calc_vmro3max,
    compute_ac550dryaer,
    compute_ang4470dryaer_from_dry_scat,
    compute_sc440dryaer,
    compute_sc550dryaer,
    compute_sc700dryaer,
    compute_wetna_from_concprcpna,
    compute_wetnh4_from_concprcpnh4,
    compute_wetno3_from_concprcpno3,
    compute_wetoxn_from_concprcpoxn,
    compute_wetoxs_from_concprcpoxs,
    compute_wetoxs_from_concprcpoxsc,
    compute_wetoxs_from_concprcpoxst,
    compute_wetrdn_from_concprcprdn,
    compute_wetso4_from_concprcpso4,
    concx_to_vmrx,
    make_proxy_drydep_from_O3,
    make_proxy_wetdep_from_O3,
    vmrx_to_concx,
)
from pyaerocom.exceptions import (
    EbasFileError,
    MetaDataError,
    NotInFileError,
    TemporalResolutionError,
    TemporalSamplingError,
    UnitConversionError,
)
from pyaerocom.io.ebas_file_index import EbasFileIndex, EbasSQLRequest
from pyaerocom.io.ebas_nasa_ames import EbasNasaAmesFile
from pyaerocom.io.ebas_varinfo import EbasVarInfo
from pyaerocom.io.helpers import _check_ebas_db_local_vs_remote
from pyaerocom.io.readungriddedbase import ReadUngriddedBase
from pyaerocom.molmasses import get_molmass
from pyaerocom.stationdata import StationData
from pyaerocom.tstype import TsType
from pyaerocom.ungriddeddata import UngriddedData
from pyaerocom.units_helpers import get_unit_conversion_fac

logger = logging.getLogger(__name__)


class ReadEbasOptions(BrowseDict):
    """Options for EBAS reading routine

    Attributes
    ----------
    prefer_statistics : list
        preferred order of data statistics. Some files may contain multiple
        columns for one variable, where each column corresponds to one of
        the statistics defined here that were applied to the data. This
        attribute is only considered for EBAS variables that have not
        explicitly defined which statistics to use (and in which preferred
        order, if applicable). Reading preferences for all EBAS variables
        are specified in the file ebas_config.ini in the data directory of
        pyaerocom.
    ignore_statistics : list
        columns that have any of these statistics applied are ignored for
        variable data reading.
    wavelength_tol_nm : int
        Wavelength tolerance in nm for reading of (wavelength dependent)
        variables. If multiple matches occur (e.g. query -> variable at
        550nm but file contains 3 columns of that variable, e.g. at 520, 530
        and 540 nm), then the closest wavelength to the queried wavelength
        is used within the specified tolerance level.
    shift_wavelengths : bool
        (only for wavelength dependent variables). If True, and a data
        column candidate is valid within the wavelength tolerance around the
        desired wavelength, that column will be considered for data import.
        Defaults to True.
    assume_default_ae_if_unavail : bool
        assume an Angstrom Exponent for applying wavelength shifts of data.
        See :attr:`ReadEbas.ASSUME_AE_SHIFT_WVL` and
        :attr:`ReadEbas.ASSUME_AAE_SHIFT_WVL` for AE and AAE assumptions
        related to scattering and absorption coeffs. Defaults to True.
    check_correct_MAAP_wrong_wvl : bool
        (BETA, do not use): set correct wavelength for certain absorption
        coeff measurements. Defaults to False.
    eval_flags : bool
        If True, the flag columns in the NASA Ames files are read and
        decoded (using :func:`EbasFlagCol.decode`) and the (up to 3) flags
        for each measurement are evaluated as valid / invalid using the
        information in the flags CSV file. The evaluated flags are stored in
        the data files returned by the reading methods
        :func:`ReadEbas.read` and :func:`ReadEbas.read_file`.
    keep_aux_vars : bool
        if True, auxiliary variables required for computed variables will be
        written to the :class:`UngriddedData` object created in
        :func:`ReadEbas.read` (e.g. if sc550dryaer is requested, this
        requires reading of sc550aer and scrh. The latter 2 will be written
        to the data object if this parameter evaluates to True)
    convert_units : bool
        if True, variable units in EBAS files will be checked and attempted
        to be converted into the AeroCom default unit for that variable.
        Defaults to True.
    try_convert_vmr_conc : bool
        attempt to convert vmr data to conc if the user requires conc (e.g.
        user wants conco3 but file only contains vmro3), and vice versa.
    ensure_correct_freq : bool
        if True, the frequency set in NASA Ames files (provided via attr
        *resolution_code*) is checked using time differences inferred from
        start and stop time of each measurement. Measurements that are not
        in that resolution (within 5% tolerance level) will be flagged
        invalid.
    freq_from_start_stop_meas : bool
        infer frequency from start / stop intervals of individual
        measurements.
    freq_min_cov : float
        defines the minimum fraction of measurements that need to correspond
        to the detected sampling frequency in the file within the specified
        tolerance range. Only applies if :attr:`ensure_correct_freq` is
        True. E.g. if a file contains 100 measurements and the most common
        frequency (as inferred from stop - start of each measurement) is
        daily, then, if `freq_min_cov` is 0.75, it is ensured that at least
        75 of the measurements are daily (within +/- 5% tolerance);
        otherwise this file is discarded. Defaults to 0.

    Parameters
    ----------
    **args
        key / value pairs specifying any of the supported settings.
    """

    #: Names of options that correspond to reading filter constraints
    _FILTER_IDS = ["prefer_statistics", "wavelength_tol_nm"]

    def __init__(self, **args):
        self.prefer_statistics = ["arithmetic mean", "median"]
        self.ignore_statistics = ["percentile:15.87", "percentile:84.13"]

        self.wavelength_tol_nm = 50
        self.shift_wavelengths = True
        self.assume_default_ae_if_unavail = True

        self.check_correct_MAAP_wrong_wvl = False

        self.eval_flags = True

        self.keep_aux_vars = False

        self.convert_units = True
        self.try_convert_vmr_conc = True

        self.ensure_correct_freq = False
        self.freq_from_start_stop_meas = True
        self.freq_min_cov = 0.0

        self.update(**args)

    @property
    def filter_dict(self):
        d = {}
        for n in self._FILTER_IDS:
            d[n] = self[n]
        return d
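
# Usage sketch (illustrative only, not part of the module): options behave
# like a dict via BrowseDict and can be set as keyword arguments, e.g.
#
#   opts = ReadEbasOptions(wavelength_tol_nm=30, keep_aux_vars=True)
#   opts.filter_dict
#   # -> {'prefer_statistics': ['arithmetic mean', 'median'],
#   #     'wavelength_tol_nm': 30}
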
class ReadEbas(ReadUngriddedBase):
    """Interface for reading EBAS data

    Parameters
    ----------
    data_id
        string specifying either of the supported datasets that are defined
        in ``SUPPORTED_DATASETS``
    data_dir : str
        directory where data is located (NOTE: needs to point to the
        directory that contains the "ebas_file_index.sqlite3" file and not
        to the underlying directory "data" which contains the actual
        NASA Ames files.)

    TODO
    ----
    - Check for negative values vs. detection limit
    - Read uncertainties from percentiles (where available)
    """

    #: version log of this class (for caching)
    __version__ = "0.52_" + ReadUngriddedBase.__baseversion__

    #: Name of dataset (OBS_ID)
    DATA_ID = const.EBAS_MULTICOLUMN_NAME

    #: Name of subdirectory containing data files (relative to
    #: :attr:`data_dir`)
    FILE_SUBDIR_NAME = "data"

    #: Name of sqlite database file
    SQL_DB_NAME = "ebas_file_index.sqlite3"

    #: List of all datasets supported by this interface
    SUPPORTED_DATASETS = [const.EBAS_MULTICOLUMN_NAME]

    #: For the following data IDs, the sqlite database file will be cached
    #: if const.EBAS_DB_LOCAL_CACHE is True
    CACHE_SQLITE_FILE = [const.EBAS_MULTICOLUMN_NAME]

    TS_TYPE = "undefined"

    MERGE_STATIONS = {
        "Birkenes": "Birkenes II",
        "Rörvik": "Råö",
        "Vavihill": "Hallahus",
        "Virolahti II": "Virolahti III",
    }
    # 'Trollhaugen'   : 'Troll'}

    #: Temporal resolution codes that (so far) can be understood by pyaerocom
    TS_TYPE_CODES = {
        "1mn": "minutely",
        "1h": "hourly",
        "1d": "daily",
        "1w": "weekly",
        "1mo": "monthly",
        "mn": "minutely",
        "h": "hourly",
        "d": "daily",
        "w": "weekly",
        "mo": "monthly",
    }

    #: variables required for computation of auxiliary variables
    AUX_REQUIRES = {
        "sc550dryaer": ["sc550aer", "scrh"],
        "sc440dryaer": ["sc440aer", "scrh"],
        "sc700dryaer": ["sc700aer", "scrh"],
        "ac550dryaer": ["ac550aer", "acrh"],
        "ang4470dryaer": ["sc440dryaer", "sc700dryaer"],
        "wetoxs": ["concprcpoxs", "pr"],
        "wetoxsc": ["concprcpoxsc", "pr"],
        "wetoxst": ["concprcpoxst", "pr"],
        "wetoxn": ["concprcpoxn", "pr"],
        "wetrdn": ["concprcprdn", "pr"],
        "wetso4": ["concprcpso4", "pr"],
        "wetna": ["concprcpna", "pr"],
        "wetno3": ["concprcpno3", "pr"],
        "wetnh4": ["concprcpnh4", "pr"],
        "vmro3max": ["vmro3"],
        # proxy drydep
        # Sulphur based
        "proxydryoxs": ["concprcpoxs", "pr"],
        "proxydryss": ["concprcpna", "pr"],
        "proxydryna": ["concprcpna", "pr"],
        "proxydryso2": ["concprcpoxs", "pr"],
        "proxydryso4": ["concprcpoxs", "pr"],
        # Oxidized nitrogen based
        "proxydryoxn": ["concprcpoxn", "pr"],
        "proxydryno2": ["concprcpoxn", "pr"],
        "proxydryno2no2": ["concprcpoxn", "pr"],
        "proxydryhono": ["concprcpoxn", "pr"],
        "proxydryn2o5": ["concprcpoxn", "pr"],
        "proxydryhno3": ["concprcpoxn", "pr"],
        "proxydryno3c": ["concprcpoxn", "pr"],
        "proxydryno3f": ["concprcpoxn", "pr"],
        # Reduced nitrogen based
        "proxydryrdn": ["concprcprdn", "pr"],
        "proxydrynh3": ["concprcprdn", "pr"],
        "proxydrynh4": ["concprcprdn", "pr"],
        # Other
        "proxydryo3": ["vmro3"],
        "proxydrypm10": ["concprcpoxs", "pr"],
        "proxydrypm25": ["concprcpoxs", "pr"],
        # proxy wetdep
        # Sulphur based
        "proxywetoxs": ["concprcpoxs", "pr"],
        "proxywetso2": ["concprcpoxs", "pr"],
        "proxywetso4": ["concprcpoxs", "pr"],
        # Oxidized nitrogen based
        "proxywetoxn": ["concprcpoxn", "pr"],
        "proxywetno2": ["concprcpoxn", "pr"],
        "proxywetno2no2": ["concprcpoxn", "pr"],
        "proxywethono": ["concprcpoxn", "pr"],
        "proxywetn2o5": ["concprcpoxn", "pr"],
        "proxywethno3": ["concprcpoxn", "pr"],
        "proxywetno3c": ["concprcpoxn", "pr"],
        "proxywetno3f": ["concprcpoxn", "pr"],
        # Reduced nitrogen based
        "proxywetrdn": ["concprcprdn", "pr"],
        "proxywetnh3": ["concprcprdn", "pr"],
        "proxywetnh4": ["concprcprdn", "pr"],
        # Other
        "proxyweto3": ["vmro3"],
        "proxywetpm10": ["concprcpoxs", "pr"],
        "proxywetpm25": ["concprcpoxs", "pr"],
    }

    #: Meta information supposed to be migrated to computed variables
    AUX_USE_META = {
        "sc550dryaer": "sc550aer",
        "sc440dryaer": "sc440aer",
        "sc700dryaer": "sc700aer",
        "ac550dryaer": "ac550aer",
    }

    #: Functions supposed to be used for computation of auxiliary variables
    AUX_FUNS = {
        "sc440dryaer": compute_sc440dryaer,
        "sc550dryaer": compute_sc550dryaer,
        "sc700dryaer": compute_sc700dryaer,
        "ac550dryaer": compute_ac550dryaer,
        "ang4470dryaer": compute_ang4470dryaer_from_dry_scat,
        "wetoxs": compute_wetoxs_from_concprcpoxs,
        "wetoxsc": compute_wetoxs_from_concprcpoxsc,
        "wetoxst": compute_wetoxs_from_concprcpoxst,
        "wetoxn": compute_wetoxn_from_concprcpoxn,
        "wetrdn": compute_wetrdn_from_concprcprdn,
        "wetnh4": compute_wetnh4_from_concprcpnh4,
        "wetno3": compute_wetno3_from_concprcpno3,
        "wetso4": compute_wetso4_from_concprcpso4,
        "wetna": compute_wetna_from_concprcpna,
        "vmro3max": calc_vmro3max,
        # proxy dry dep
        # Sulphur based
        "proxydryoxs": compute_wetoxs_from_concprcpoxs,
        "proxydryss": compute_wetna_from_concprcpna,
        "proxydryna": compute_wetna_from_concprcpna,
        "proxydryso2": compute_wetoxs_from_concprcpoxs,
        "proxydryso4": compute_wetoxs_from_concprcpoxs,
        # Oxidized nitrogen based
        "proxydryoxn": compute_wetoxn_from_concprcpoxn,
        "proxydryno2": compute_wetoxn_from_concprcpoxn,
        "proxydryno2no2": compute_wetoxn_from_concprcpoxn,
        "proxydryhono": compute_wetoxn_from_concprcpoxn,
        "proxydryn2o5": compute_wetoxn_from_concprcpoxn,
        "proxydryhno3": compute_wetoxn_from_concprcpoxn,
        "proxydryno3c": compute_wetoxn_from_concprcpoxn,
        "proxydryno3f": compute_wetoxn_from_concprcpoxn,
        # Reduced nitrogen based
        "proxydryrdn": compute_wetrdn_from_concprcprdn,
        "proxydrynh3": compute_wetrdn_from_concprcprdn,
        "proxydrynh4": compute_wetrdn_from_concprcprdn,
        # Other
        "proxydryo3": make_proxy_drydep_from_O3,
        "proxydrypm10": compute_wetoxs_from_concprcpoxs,
        "proxydrypm25": compute_wetoxs_from_concprcpoxs,
        # proxy wet dep
        # Sulphur based
        "proxywetoxs": compute_wetoxs_from_concprcpoxs,
        "proxywetso2": compute_wetoxs_from_concprcpoxs,
        "proxywetso4": compute_wetoxs_from_concprcpoxs,
        # Oxidized nitrogen based
        "proxywetoxn": compute_wetoxn_from_concprcpoxn,
        "proxywetno2": compute_wetoxn_from_concprcpoxn,
        "proxywetno2no2": compute_wetoxn_from_concprcpoxn,
        "proxywethono": compute_wetoxn_from_concprcpoxn,
        "proxywetn2o5": compute_wetoxn_from_concprcpoxn,
        "proxywethno3": compute_wetoxn_from_concprcpoxn,
        "proxywetno3c": compute_wetoxn_from_concprcpoxn,
        "proxywetno3f": compute_wetoxn_from_concprcpoxn,
        # Reduced nitrogen based
        "proxywetrdn": compute_wetrdn_from_concprcprdn,
        "proxywetnh3": compute_wetrdn_from_concprcprdn,
        "proxywetnh4": compute_wetrdn_from_concprcprdn,
        # Other
        "proxyweto3": make_proxy_wetdep_from_O3,
        "proxywetpm10": compute_wetoxs_from_concprcpoxs,
        "proxywetpm25": compute_wetoxs_from_concprcpoxs,
    }

    #: Custom reading options for individual variables. Keys need to be
    #: valid attributes of :class:`ReadEbasOptions` and anything specified
    #: here (for a given variable) will override the defaults specified in
    #: the options class.
    VAR_READ_OPTS = {
        "pr": dict(convert_units=False, freq_min_cov=0.75),
        "prmm": dict(freq_min_cov=0.75),
    }

    ASSUME_AAE_SHIFT_WVL = 1.0
    ASSUME_AE_SHIFT_WVL = 1  # .5

    #: list of EBAS data files that are flagged invalid and will not be
    #: imported
    IGNORE_FILES = [
        "CA0420G.20100101000000.20190125102503.filter_absorption_photometer.aerosol_absorption_coefficient.aerosol.1y.1h.CA01L_Magee_AE31_ALT.CA01L_aethalometer.lev2.nas",
        "DK0022R.20180101070000.20191014000000.bulk_sampler..precip.1y.15d.DK01L_bs_22.DK01L_IC.lev2.nas",
        "DK0012R.20180101070000.20191014000000.bulk_sampler..precip.1y.15d.DK01L_bs_12.DK01L_IC.lev2.nas",
        "DK0008R.20180101070000.20191014000000.bulk_sampler..precip.1y.15d.DK01L_bs_08.DK01L_IC.lev2.nas",
        "DK0005R.20180101070000.20191014000000.bulk_sampler..precip.1y.15d.DK01L_bs_05.DK01L_IC.lev2.nas",
    ]

    #: Ignore data columns in NASA Ames files that contain any of the listed
    #: attributes
    IGNORE_COLS_CONTAIN = ["fraction", "artifact"]

    # list of all available resolution codes (extracted from SQLite database)
    # 1d 1h 1mo 1w 4w 30mn 2w 3mo 2d 3d 4d 12h 10mn 2h 5mn 6d 3h 15mn

    #: List of variables that are provided by this dataset (will be extended
    #: by auxiliary variables on class init, for details see __init__ method
    #: of base class ReadUngriddedBase)
    def __init__(self, data_id=None, data_dir=None):
        super().__init__(data_id=data_id, data_dir=data_dir)

        self._opts = {"default": ReadEbasOptions()}

        # self.opts = ReadEbasOptions()
        #: loaded instances of aerocom variables (instances of
        #: :class:`Variable` object, is written in get_file_list
        self._loaded_aerocom_vars = {}

        #: loaded instances of variables in EBAS namespace (instances of
        #: :class:`EbasVarInfo` object, is updated in read_file
        self._loaded_ebas_vars = {}

        self._file_dir = None

        self.files_failed = []
        self._read_stats_log = BrowseDict()

        #: SQL database interface class used to retrieve file paths for vars
        self._file_index = None
        self.sql_requests = []

        #: original file lists retrieved for each variable individually
        #: using SQL request. Since some of the files in the lists for each
        #: variable might occur in multiple lists, these are merged into a
        #: single list self.files and information about which variables are
        #: to be extracted for each file is stored in attribute
        #: files_contain

        #: Originally retrieved file lists from SQL database, for each
        #: variable individually
        self._lists_orig = {}

        #: this is filled in method get_file_list and specifies variables
        #: to be read from each file
        self.files_contain = []

        self._all_stats = None

    @property
    def DEFAULT_VARS(self):
        """
        list: list of default variables to be read

        Note
        ----
        Currently a wrapper for :attr:`PROVIDES_VARIABLES`
        """
        return self.PROVIDES_VARIABLES

    @property
    def file_dir(self):
        """Directory containing EBAS NASA Ames files"""
        if self._file_dir is not None:
            return self._file_dir
        return os.path.join(self.data_dir, self.FILE_SUBDIR_NAME)

    @file_dir.setter
    def file_dir(self, val):
        if not isinstance(val, str) or not os.path.exists(val):
            raise FileNotFoundError("Input directory does not exist")

        self._file_dir = val

    @property
    def file_index(self):
        """SQlite file mapping metadata with filenames"""
        if self._file_index is None:
            self._file_index = EbasFileIndex(self.sqlite_database_file)
        return self._file_index

    @property
    def FILE_REQUEST_OPTS(self):
        """List of options for file retrieval"""
        return list(EbasSQLRequest())

    @property
    def _FILEMASK(self):
        raise AttributeError(
            "Irrelevant for EBAS implementation, since SQL "
            "database is used for finding valid files"
        )

    @property
    def NAN_VAL(self):
        """Irrelevant for implementation of EBAS I/O"""
        raise AttributeError(
            "Irrelevant for EBAS implementation: Info about "
            "invalid measurements is extracted from header of "
            "NASA Ames files for each variable individually "
        )

    @property
    def PROVIDES_VARIABLES(self):
        """List of variables provided by the interface"""
        return EbasVarInfo.PROVIDES_VARIABLES()

    @property
    def sqlite_database_file(self):
        """Path to EBAS SQL database"""
        dbname = self.SQL_DB_NAME
        loc_remote = os.path.join(self.data_dir, dbname)
        if self.data_id in self.CACHE_SQLITE_FILE and const.EBAS_DB_LOCAL_CACHE:
            loc_local = os.path.join(const.CACHEDIR, dbname)
            return _check_ebas_db_local_vs_remote(loc_remote, loc_local)

        return loc_remote

    def _merge_lists(self, lists_per_var):
        """Merge dictionary of lists for each variable into one list

        Note
        ----
        In addition to writing the retrieved file list into :attr:`files`,
        this method also fills the list :attr:`files_contain` which (by
        index) defines variables to read for each file path in :attr:`files`

        Parameters
        ----------
        lists_per_var : dict
            dictionary containing file lists (values) for a set of variables
            (keys)

        Returns
        -------
        list
            merged file list (is also written into :attr:`files`)
        """
        # original lists are modified, so make a copy of them
        # lists = deepcopy(lists_per_var)
        lists = lists_per_var
        mapping = {}
        for var, lst in lists.items():
            for fpath in lst:
                mapping[fpath] = [var]
                for other_var, other_lst in lists.items():
                    if var != other_var:
                        try:
                            other_lst.pop(other_lst.index(fpath))
                            mapping[fpath].append(other_var)
                        except ValueError:
                            pass
        self.logger.info(f"Number of files to read reduced to {len(mapping)}")
        files, files_contain = [], []
        for path, contains_vars in mapping.items():
            files.append(path)
            files_contain.append(contains_vars)

        self.files = files
        self.files_contain = files_contain
        return files

    @property
    def all_station_names(self):
        # ToDo: this should probably not be part of this class
        """List of all available station names in EBAS database"""
        if self._all_stats is None:
            self._all_stats = self.file_index.ALL_STATION_NAMES
        return self._all_stats

    def _find_station_matches(self, stats_or_patterns):
        # ToDo: this should probably not be part of this class
        """Find all station names that match input list of names or patterns"""
        val = stats_or_patterns
        all_stats = self.all_station_names
        stats = []
        if isinstance(val, str):
            val = [val]
        if not isinstance(val, list):
            raise ValueError("Need list or string...")

        for name in val:
            if "*" in name:  # handle wildcard
                for stat in all_stats:
                    if fnmatch.fnmatch(stat, name):
                        stats.append(stat)
            elif name in all_stats:
                stats.append(name)
            else:
                logger.warning(f"Ignoring station_names input {name}. No match could be found")
        if not bool(stats):
            raise FileNotFoundError(
                f"No EBAS data files could be found for stations {stats_or_patterns}"
            )
        return list(set(stats))

    def _precheck_vars_to_retrieve(self, vars_to_retrieve):
        """
        Make sure input variables are supported and are only provided once

        Parameters
        ----------
        vars_to_retrieve : str or list, or similar
            make sure input variable names are in AeroCom convention

        Raises
        ------
        ValueError
            if the same variable is input more than once
        VariableDefinitionError
            if one of the input variables is not supported

        Returns
        -------
        list
            list of variables to be retrieved
        """
        if vars_to_retrieve is None:
            vars_to_retrieve = self.DEFAULT_VARS
        elif isinstance(vars_to_retrieve, str):
            vars_to_retrieve = [vars_to_retrieve]
        out = []
        for var in vars_to_retrieve:
            out.append(self.var_info(var).var_name_aerocom)
        return out

    def _check_add_station_filter_sqlquery(self, constraints):
        if "station_names" in constraints:
            stat_matches = self._find_station_matches(constraints["station_names"])
            constraints["station_names"] = stat_matches
        return constraints
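
    # Illustration of the wildcard expansion done in _find_station_matches
    # (the station list here is made up for illustration, not actual
    # database content):
    #
    #   import fnmatch
    #   all_stats = ["Birkenes II", "Jungfraujoch", "Zeppelin mountain"]
    #   [s for s in all_stats if fnmatch.fnmatch(s, "Ze*")]
    #   # -> ['Zeppelin mountain']
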
    def get_file_list(self, vars_to_retrieve, **constraints):
        """Get list of files for all variables to retrieve

        Parameters
        ----------
        vars_to_retrieve : list or str
            list of variables that are supposed to be read
        **constraints
            further EBAS request constraints deviating from default (default
            info for each AEROCOM variable can be found in `ebas_config.ini
            <https://github.com/metno/pyaerocom/blob/master/pyaerocom/data/ebas_config.ini>`__).
            For details on possible input parameters see
            :class:`EbasSQLRequest` (or `this tutorial
            <http://aerocom.met.no/pyaerocom/tutorials.html#ebas-file-query-and-database-browser>`__)

        Returns
        -------
        list
            unified list of file paths, each containing at least one of the
            specified variables
        """
        if isinstance(vars_to_retrieve, str):
            vars_to_retrieve = [vars_to_retrieve]

        # make sure variable names are input correctly
        vars_to_retrieve = self._precheck_vars_to_retrieve(vars_to_retrieve)

        self.logger.info("Fetching data files. This might take a while...")

        db = self.file_index
        files_vars = {}
        files_aux_req = {}

        logger.info(f"Retrieving EBAS files for variables\n{vars_to_retrieve}")
        # directory containing NASA Ames files
        filedir = self.file_dir
        for var in vars_to_retrieve:
            info = self.get_ebas_var(var)

            try:
                constraints = self._check_add_station_filter_sqlquery(constraints)
            except FileNotFoundError:
                # skip this variable
                continue
            requests = info.make_sql_requests(**constraints)
            for _var, req in requests.items():
                filenames = db.get_file_names(req)
                self.sql_requests.append(req)

                paths = []
                for file in filenames:
                    if file in self.IGNORE_FILES:
                        logger.info(f"Ignoring flagged file {file}")
                        continue
                    paths.append(os.path.join(filedir, file))
                if _var in vars_to_retrieve:
                    # this variable is actually to be imported
                    files_vars[_var] = sorted(paths)
                else:
                    # auxiliary variable that is needed to compute one of the
                    # input variables. This variable potentially also
                    # requires other variables to be read, and the
                    # corresponding file lists are merged in a separate step
                    # below.
                    files_aux_req[_var] = sorted(paths)

        for _var in vars_to_retrieve:
            if _var not in files_vars:
                files_vars[_var] = self._merge_auxvar_lists(_var, files_aux_req)

        self._lists_orig = files_vars
        files = self._merge_lists(files_vars)
        if len(files) == 0:
            raise FileNotFoundError(
                f"No files could be found for {vars_to_retrieve} and reading "
                f"constraints {constraints}."
            )
        return files
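    # Usage sketch (illustrative only; assumes an EBAS database is
    # configured locally for pyaerocom). The constraint keywords shown here
    # are assumptions for illustration; the authoritative set is whatever
    # :class:`EbasSQLRequest` accepts:
    #
    #   reader = ReadEbas()
    #   files = reader.get_file_list("sc550aer", station_names="Jungfrau*")
    #   reader.files_contain[0]  # variables to read from the first file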
    def _merge_auxvar_lists(self, aux_var, files_aux_req):
        """
        Merge lists of variables required for input aux_var

        Parameters
        ----------
        aux_var : str
            Name of auxiliary variable that is computed from other
            variables. Must be contained in :attr:`AUX_REQUIRES` and must
            have entry `requires` in file `ebas_config.ini`.
        files_aux_req : dict
            dictionary containing file lists for all auxiliary variables
            needed to compute input variable `aux_var` (see
            :attr:`AUX_REQUIRES`).

        Returns
        -------
        list
            list of all filenames that contain all required variables for
            input variable `aux_var`.
        """
        _required = self.get_ebas_var(aux_var).requires
        if _required is None:
            return []
        remaining = None
        for _vreq in _required:
            if _vreq not in files_aux_req:
                return []
            _lst = files_aux_req[_vreq]
            if remaining is None:
                remaining = _lst
            else:
                remaining = np.intersect1d(remaining, _lst)
        return remaining

    def _get_var_cols(self, ebas_var_info, data):
        """Get all columns in NASA Ames file matching input AeroCom variable

        Note
        ----
        For developers: All AeroCom variable definitions should go into the
        file *variables.ini* in the pyaerocom data directory. All EBAS
        variable definitions for each AeroCom variable should go into the
        file *ebas_config.ini* where section names are the AeroCom namespace
        and contain import constraints.

        Parameters
        -----------
        ebas_var_info : EbasVarInfo
            EBAS variable information (e.g. for ac550aer)
        data : EbasNasaAmesFile
            loaded EBAS file data

        Returns
        -------
        list
            list specifying column matches

        Raises
        ------
        NotInFileError
            if no column in file matches variable specifications
        """
        var = ebas_var_info.var_name
        opts = self.get_read_opts(var)
        if ebas_var_info.component is None:
            raise NotInFileError
        col_matches = []

        check_matrix = False if ebas_var_info["matrix"] is None else True
        check_stats = False if ebas_var_info["statistics"] is None else True
        comps = []
        for colnum, col_info in enumerate(data.var_defs):
            if col_info.name in ebas_var_info.component:  # candidate (name match)
                ok = True
                if check_matrix:
                    if "matrix" in col_info:
                        matrix = col_info["matrix"]
                    else:
                        matrix = data.matrix
                    if matrix not in ebas_var_info["matrix"]:
                        ok = False
                if ok and "statistics" in col_info:
                    # ALWAYS ignore columns containing statistics flagged in
                    # ignore_statistics
                    if col_info["statistics"] in opts.ignore_statistics:
                        ok = False
                    elif check_stats:
                        if col_info["statistics"] not in ebas_var_info["statistics"]:
                            ok = False
                for key in self.IGNORE_COLS_CONTAIN:
                    if key in col_info:
                        ok = False
                        logger.warning(f"\nignore column {col_info}")
                        break
                if ok:
                    col_matches.append(colnum)
                    if col_info.name not in comps:
                        comps.append(col_info.name)
        if len(col_matches) == 0:
            raise NotInFileError(f"Variable {ebas_var_info.var_name} could not be found in file")
        elif len(comps) > 1 and len(ebas_var_info.component) > 1:
            for prefcomp in ebas_var_info.component:
                if prefcomp not in comps:
                    continue
                col_matches = [
                    colnum for colnum in col_matches if prefcomp == data.var_defs[colnum].name
                ]
                break
        return col_matches

    def _resolve_units_cols(self, var_name, result_col, file):
        """
        Identify data columns with the correct unit for input variable

        For instance, O3 is sometimes reported both as vmro3 (nmole mole-1)
        and conco3 (ug m-3) in different columns of the file.

        Parameters
        ----------
        var_name : str
            AeroCom variable name.
        result_col : list
            list of column numbers to be checked.
        file : EbasNasaAmesFile
            file data.

        Returns
        -------
        list
            list of column numbers that match the correct units.
        """
        to_unit = str(self.var_info(var_name).units)
        if to_unit not in ("", "1"):
            _cols = []
            for colnum in result_col:
                try:
                    from_unit = file.var_defs[colnum].units
                    get_unit_conversion_fac(from_unit, to_unit, var_name)
                    _cols.append(colnum)
                except UnitConversionError:
                    continue
            if len(_cols) > 0:
                result_col = _cols
        return result_col

    def _resolve_meas_height_cols(self, result_col, file):
        """
        Identify data column(s) with the lowest tower inlet height

        Parameters
        ----------
        result_col : list
            list of column numbers to be checked. All columns need to have
            attr. `tower_inlet_height`.
        file : EbasNasaAmesFile
            file data.

        Raises
        ------
        ValueError
            If conversion of tower_inlet_height values into float fails.

        Returns
        -------
        list
            list containing columns that match the lowest tower inlet height
            (usually only 1 column, but who knows...)
        """
        lowest = 1e9
        matches = []
        for col in result_col:
            heightstr = file.var_defs[col]["tower_inlet_height"]
            if not heightstr.endswith(" m"):
                raise ValueError(
                    f"value of tower_inlet_height in col {col} "
                    f"is invalid: {heightstr} (needs to end "
                    f"with m)"
                )
            height = float(heightstr.split()[0])
            if height < lowest:
                matches = [col]
                lowest = height
            elif height == lowest:
                matches.append(col)
        assert len(matches) > 0
        return matches

    def _find_best_data_column(self, cols, ebas_var_info, file, check_units_on_multimatch=True):
        """Find best match of data column for variable in multiple columns

        This method is supposed to be used in case no unique match can be
        found for a given variable. For instance, if ``ac550aer`` is
        requested and the file contains several matching columns that
        differ e.g. in matrix or applied statistics.
        """
        var = ebas_var_info["var_name"]
        opts = self.get_read_opts(var)

        preferred_matrix = None
        idx_best_matrix_found = 9999

        matrix_matches = []
        # first find best column match with
        if ebas_var_info["matrix"] is not None:
            preferred_matrix = ebas_var_info["matrix"]

        for colnum in cols:
            col_info = file.var_defs[colnum]
            if "matrix" in col_info:
                if preferred_matrix is None:
                    raise OSError(
                        f"Data file contains multiple column matches for variable {var}, "
                        f"some of which specify different data type matrices. AeroCom "
                        f"import information for this variable, however, "
                        f"does not contain information about the preferred matrix. "
                        f"Please resolve by adding preferred matrix information for "
                        f"{var} in the corresponding section of the ebas_config.ini file"
                    )
                matrix = col_info["matrix"]
                if matrix in preferred_matrix:
                    idx = preferred_matrix.index(matrix)
                    if idx < idx_best_matrix_found:
                        idx_best_matrix_found = idx
                        matrix_matches = []
                        matrix_matches.append(colnum)
                    elif idx == idx_best_matrix_found:
                        matrix_matches.append(colnum)

        if idx_best_matrix_found == 9999:
            matrix_matches = cols

        if len(matrix_matches) == 1:
            return matrix_matches[0]

        preferred_statistics = opts.prefer_statistics
        idx_best_statistics_found = 9999
        result_col = []
        if ebas_var_info["statistics"] is not None:
            preferred_statistics = ebas_var_info["statistics"]
        for colnum in matrix_matches:
            col_info = file.var_defs[colnum]
            if "statistics" in col_info:
                stats = col_info["statistics"]
            elif "statistics" in file.meta:
                stats = file.meta["statistics"]
            else:
                raise EbasFileError(
                    f"Cannot infer data statistics for data column {col_info}. "
                    f"Neither column nor file meta specifications include information "
                    f"about data statistics"
                )
            if stats in preferred_statistics:
                idx = preferred_statistics.index(stats)
                if idx < idx_best_statistics_found:
                    idx_best_statistics_found = idx
                    result_col = []
                    result_col.append(colnum)
                elif idx == idx_best_statistics_found:
                    result_col.append(colnum)

        if len(result_col) > 1 and check_units_on_multimatch:
            result_col = self._resolve_units_cols(var, result_col, file)

        add_msg = ""
        if len(result_col) > 1 and file.all_cols_contain(result_col, "tower_inlet_height"):
            try:
                result_col = self._resolve_meas_height_cols(result_col, file)
            except (ValueError, AssertionError) as e:
                add_msg += f"\n{repr(e)}"

        if len(result_col) > 1:
            comp = ebas_var_info["component"]
            startstop = f"{file.time_stamps[0]} - {file.time_stamps[-1]}"
            msg = (
                f"\n\nFATAL: could not resolve unique data column for "
                f"{var} (EBAS varname: {comp})\nData period: {startstop}), "
                f"\nStation {file.station_name} (col matches: {result_col})"
            )
            for col in result_col:
                msg += f"\nColumn {col}\n{file.var_defs[col]}"
            msg += f"\nFilename: {file.file_name}"
            msg += add_msg
            msg += "\n\nTHIS FILE WILL BE SKIPPED\n"
            logger.warning(msg)
            raise ValueError("failed to identify unique data column")

        return result_col[0]

    def _add_meta(self, data_out, file):
        meta = file.meta
        name = meta["station_name"].replace("/", ";")

        data_out["framework"] = file.project_association
        data_out["filename"] = os.path.basename(file.file)
        data_out["data_id"] = self.data_id
        data_out["PI"] = file["data_originator"]
        data_out["station_id"] = meta["station_code"]
        data_out["set_type_code"] = meta["set_type_code"]
        data_out["station_name"] = name
        if name in self.MERGE_STATIONS:
            data_out["station_name"] = self.MERGE_STATIONS[name]
            data_out["station_name_orig"] = name
        else:
            data_out["station_name"] = name

        # write meta information
        tres_code = meta["resolution_code"]
        try:
            ts_type = self.TS_TYPE_CODES[tres_code]
        except KeyError:
            ival = re.findall(r"\d+", tres_code)[0]
            code = tres_code.split(ival)[-1]
            if code not in self.TS_TYPE_CODES:
                raise NotImplementedError(f"Cannot handle EBAS resolution code {tres_code}")
            ts_type = ival + self.TS_TYPE_CODES[code]
            self.TS_TYPE_CODES[tres_code] = ts_type

        data_out["ts_type"] = ts_type
        # altitude of station
        try:
            altitude = float(meta["station_altitude"].split(" ")[0])
        except Exception:
            altitude = np.nan
        try:
            meas_height = float(meta["measurement_height"].split(" ")[0])
        except KeyError:
            meas_height = 0.0

        data_alt = altitude + meas_height

        # file specific meta information
        # data_out.update(meta)
        data_out["latitude"] = float(meta["station_latitude"])
        data_out["longitude"] = float(meta["station_longitude"])
        data_out["altitude"] = data_alt
        data_out["meas_height"] = meas_height
        data_out["station_altitude"] = altitude
        data_out["instrument_name"] = meta["instrument_name"]
        data_out["instrument_type"] = meta["instrument_type"]

        data_out["matrix"] = meta["matrix"]
        data_out["revision_date"] = file["revision_date"]

        setting, land_use, gaw_type, lev = None, None, None, None
        if "station_setting" in meta:
            setting = meta["station_setting"]
        if "station_land_use" in meta:
            land_use = meta["station_land_use"]
        if "station_gaw_type" in meta:
            gaw_type = meta["station_gaw_type"]
        if "data_level" in meta:
            try:
                lev = int(meta["data_level"])
            except Exception:
                pass

        data_out["station_setting"] = setting
        data_out["station_land_use"] = land_use
        data_out["station_gaw_type"] = gaw_type

        # NOTE: may be also defined per column in attr. var_defs
        data_out["data_level"] = lev

        return data_out

    def _find_wavelength_matches(self, col_matches, file, var_info):
        """Find columns with wavelength closest to variable wavelength"""
        min_diff_wvl = 1e6
        opts = self.get_read_opts(var_info.var_name)
        matches = []

        # get wavelength of column and tolerance
        wvl = var_info.wavelength_nm
        wvl_low = wvl - opts.wavelength_tol_nm
        wvl_high = wvl + opts.wavelength_tol_nm
        for colnum in col_matches:
            colinfo = file.var_defs[colnum]
            if "wavelength" not in colinfo:
                logger.warning(
                    f"Ignoring column {colnum}\n{colinfo}\nVar {var_info.var_name}: "
                    f"column misses wavelength specification!"
                )
                continue
            wvl_col = colinfo.get_wavelength_nm()
            # wavelength is in tolerance range
            if wvl_low <= wvl_col <= wvl_high:
                wvl_diff = wvl_col - wvl
                if abs(wvl_diff) < abs(min_diff_wvl):
                    # the wavelength difference of this column to the
                    # desired wavelength of the variable is smaller than any
                    # of those detected before, so ignore those from earlier
                    # columns by re-init of the matches list
                    min_diff_wvl = wvl_diff
                    matches = []
                    matches.append(colnum)
                elif wvl_diff == min_diff_wvl:
                    matches.append(colnum)

        return (matches, min_diff_wvl)

    def _find_closest_wavelength_cols(self, col_matches, file, var_info):
        """Find data column with wavelength closest to desired wavelength"""
        min_diff_wvl = 1e6
        matches = []
        # get wavelength of column and tolerance
        wvl = var_info.wavelength_nm
        for colnum in col_matches:
            colinfo = file.var_defs[colnum]
            if "wavelength" not in colinfo:
                logger.warning(
                    f"Ignoring column {colnum} ({colinfo}) in EBAS file for reading var "
                    f"{var_info}: column misses wavelength specification"
                )
                continue
            wvl_col = colinfo.get_wavelength_nm()
            # wavelength is in tolerance range
            diff = abs(wvl_col - wvl)
            if diff < min_diff_wvl:
                min_diff_wvl = diff
                matches = [colnum]
            elif diff == min_diff_wvl:
                matches.append(colnum)

        return (matches, min_diff_wvl)

    def _check_shift_wavelength(self, var, col_info, meta, data):
        """
        Where applicable, shift wavelength of input data to another wavelength

        Applies to cases where the input variable corresponds to a
        wavelength (e.g. ac550aer corresponds to 550nm) but the EBAS
        measurement was performed at another wavelength (e.g. 520nm). In
        this case, the data is shifted to the wavelength of that variable
        using an assumed Angstrom Exponent
        (:attr:`ASSUME_AE_SHIFT_WVL`).

        Parameters
        ----------
        var : str
            variable name
        col_info : EbasColDef
            EBAS file column information
        meta : dict
            EBAS file metadata
        data : ndarray
            array containing variable data

        Raises
        ------
        EbasFileError
            if variable is wavelength dependent but the column misses
            wavelength information
        NotImplementedError
            if option to shift wavelength is activated but option
            `assume_default_ae_if_unavail` is set False.

        Returns
        -------
        data : ndarray
            modified input data
        """
        _col = col_info
        vi = self.var_info(var)
        opts = self.get_read_opts(var)
        # make sure this variable has wavelength set
        # vi.ensure_wavelength_avail()
        if vi.is_wavelength_dependent:
            if "wavelength" not in _col:
                raise EbasFileError(
                    f"Cannot access column wavelength information for variable {var}"
                )
            wvlcol = _col.get_wavelength_nm()
            # HARD CODED FIX FOR INVALID WAVELENGTH IN ABSCOEFF EBAS FILES
            if var == "ac550aer" and opts.check_correct_MAAP_wrong_wvl:
                instr = meta["instrument_name"]
                if any([x in instr for x in ["MAAP", "Thermo"]]) and wvlcol != 637:
                    _col["wavelength_WRONG_EBAS"] = f"{wvlcol} nm"
                    _col["wavelength_nm_WRONG_EBAS"] = wvlcol

                    _col["wavelength"] = "637 nm"
                    _col["wavelength_WRONG_EBAS_INFO"] = (
                        "Wavelength of MAAP / Thermo absorption instruments "
                        "is sometimes reported wrongly, in most cases 670nm "
                        "is specified in the EBAS files. Please contact the "
                        "EBAS team if you have any questions regarding this"
                    )
                    wvlcol = 637

            _col["wavelength_nm"] = wvlcol
            if opts.shift_wavelengths:
                towvl = vi.wavelength_nm
                if wvlcol != towvl:
                    # ToDo: add AE if available
                    if opts.assume_default_ae_if_unavail:
                        if var.startswith("ac"):
                            ae = self.ASSUME_AAE_SHIFT_WVL
                        elif var.startswith("sc"):
                            ae = self.ASSUME_AE_SHIFT_WVL
                        else:
                            raise NotImplementedError(
                                f"No Angstrom exponent specified for {var}"
                            )
                        avg_before = np.nanmean(data)
                        data = self._shift_wavelength(
                            vals=data, from_wvl=wvlcol, to_wvl=towvl, angexp=ae
                        )
                        avg = np.nanmean(data)
                        diff = (avg - avg_before) / avg_before * 100
                        _col["wvl_adj"] = True
                        _col["from_wvl"] = wvlcol
                        _col["wvl_adj_angstrom"] = ae
                        _col["wvl_adj_diff"] = f"{diff:.2f} %"
                        _col["wavelength"] = f"{towvl:.1f} nm"
                        _col["wavelength_nm"] = towvl
                    else:
                        raise NotImplementedError(
                            "Cannot correct for wavelength shift, need Angstrom Exp."
                        )
        return data

    def _shift_wavelength(self, vals, from_wvl, to_wvl, angexp):
        return vals * (from_wvl / to_wvl) ** angexp
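    # Worked example of the power-law shift in _shift_wavelength (values
    # are illustrative): shifting a scattering coefficient measured at
    # 520 nm to 550 nm with an assumed Angstrom exponent of 1 scales the
    # data by (520 / 550) ** 1 ~= 0.945, i.e.
    #
    #   import numpy as np
    #   vals = np.array([10.0, 20.0])  # e.g. in 1/Mm, measured at 520 nm
    #   vals * (520 / 550) ** 1        # -> approx. [9.45, 18.91] at 550 nm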
    def find_var_cols(self, vars_to_read, loaded_nasa_ames):
        """Find best-match variable columns in loaded NASA Ames file

        For each of the input variables, try to find one or more matches in
        the input NASA Ames file (loaded data object). If more than one
        match occurs, identify the best one (an example here is: user wants
        sc550aer and file contains scattering coefficients at 530 nm and
        580 nm: in this case the 530 nm column will be used, cf. also the
        accepted wavelength tolerance for reading of wavelength dependent
        variables :attr:`wavelength_tol_nm`).

        Parameters
        ----------
        vars_to_read : list
            list of variables that are supposed to be read
        loaded_nasa_ames : EbasNasaAmesFile
            loaded data object

        Returns
        -------
        dict
            dictionary specifying the best-match variable column for each
            of the input variables.
        """
        file = loaded_nasa_ames

        # dict containing variable column matches
        _vc = {}
        # Loop over all variables that are supposed to be read
        for var in vars_to_read:
            # get corresponding EBAS variable info ...
            ebas_var_info = self.get_ebas_var(var)
            # ... and AeroCom variable definition
            var_info = self.var_info(var)
            # reading options
            opts = self.get_read_opts(var)

            # Find all columns in file that match the current variable.
            # There may be multiple matches, e.g. because the variable may
            # be sampled at different wavelengths or there may be different
            # statistics applied, or there may be different matrices
            # available (e.g. aerosol, pm10, pm25)
            try:
                col_matches = self._get_var_cols(ebas_var_info, file)
            except NotInFileError:
                logger.warning(
                    f"Variable {var} (EBAS name(s): {ebas_var_info.component}) is missing "
                    f"in file {os.path.basename(file.file)} (start: {file.base_date})"
                )
                continue
            # if the AeroCom variable has a wavelength specified, find the
            # column(s) that are closest to this wavelength. There may be
            # multiple column matches, e.g. due to different statistics or
            # matrix columns; these will be sorted out below.
            if var_info.wavelength_nm is not None:
                if opts.shift_wavelengths:
                    (col_matches, diff) = self._find_closest_wavelength_cols(
                        col_matches, file, var_info
                    )
                else:
                    (col_matches, diff) = self._find_wavelength_matches(
                        col_matches, file, var_info
                    )
            if bool(col_matches):
                _vc[var] = col_matches

        if not len(_vc) > 0:
            raise NotInFileError(
                f"None of the specified variables {vars_to_read} could be "
                f"found in file {os.path.basename(file.file)}"
            )

        var_cols = {}
        for var, cols in _vc.items():
            if len(cols) == 1:
                col = cols[0]
            else:
                col = self._find_best_data_column(cols, self.get_ebas_var(var), file)
            var_cols[var] = col
        return var_cols
    def _try_get_pt_conversion(self, meta):
        if "volume_std._temperature" in meta and "volume_std._pressure" in meta:
            try:
                pstr, punit = meta["volume_std._pressure"].split()
                tstr, tunit = meta["volume_std._temperature"].split()
                pconv = get_unit_conversion_fac(punit, "Pa")
                tconv = get_unit_conversion_fac(tunit, "K")
                p = float(pstr) * pconv
                T = float(tstr) * tconv
            except Exception:
                raise MetaDataError(
                    "Failed to convert information strings for p and T into "
                    "floating point numbers with units Pa and K, respectively"
                )
            return (p, T)
        raise MetaDataError("Info not available in metadata")

    def _try_convert_vmr_conc(self, data_out, var, var_info, meta):
        mmol = get_molmass(var)
        mmol_air = get_molmass("air_dry")

        from_unit = var_info["units"]
        to_unit = self.var_info(var).units
        try:
            # try to get pressure and temperature used for unit conversion
            # from column info
            p, T = self._try_get_pt_conversion(var_info)
        except MetaDataError:
            try:
                # try to get pressure and temperature used for unit
                # conversion from file metadata (could happen for single
                # column NASA Ames files)
                p, T = self._try_get_pt_conversion(meta)
            except MetaDataError:
                # use US standard pressure and temperature
                p, T = p0, T0_STD
        if var.startswith("vmr"):
            # requested variable is vmr; the file contains conc
            concvar = var.replace("vmr", "conc")
            to_unit_pre = self.var_info(concvar).units
            cfac_pre = get_unit_conversion_fac(from_unit, to_unit_pre, var_name=concvar)
            cfac = concx_to_vmrx(
                data=1,
                p_pascal=p,
                T_kelvin=T,
                conc_unit=to_unit_pre,
                mmol_var=mmol,
                mmol_air=mmol_air,
                to_unit=to_unit,
            )
            cfac *= cfac_pre
        elif var.startswith("conc"):
            # requested variable is conc; the file contains vmr
            cfac = vmrx_to_concx(
                data=1,
                p_pascal=p,
                T_kelvin=T,
                vmr_unit=from_unit,
                mmol_var=mmol,
                mmol_air=mmol_air,
                to_unit=to_unit,
            )
        else:
            raise UnitConversionError("Data is neither vmr nor conc")
        data_out.var_info[var]["units"] = to_unit
        data_out.var_info[var]["converted_from_units"] = from_unit
        data_out.var_info[var]["units_conv_fac"] = cfac
        data_out[var] *= cfac
        return data_out
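    # Background for the conversion above (a sketch of the underlying
    # physics, not the exact helper implementations): at pressure p and
    # temperature T, mass concentration c and volume mixing ratio x are
    # related via the ideal gas law,
    #
    #   x = c * R * T / (p * M)    # M: molar mass of the species
    #
    # e.g. for O3 (M ~ 48 g/mol) at illustrative standard conditions:
    #
    #   R, p, T, M = 8.314, 101325.0, 288.15, 48.0e-3  # SI units
    #   c = 100e-9                                     # 100 ug m-3 in kg m-3
    #   x = c * R * T / (p * M)                        # -> ~4.9e-8, i.e. ~49 nmol/mol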
    def get_ebas_var(self, var_name):
        """Get instance of :class:`EbasVarInfo` for input AeroCom variable"""
        if var_name not in self._loaded_ebas_vars:
            self._loaded_ebas_vars[var_name] = EbasVarInfo(var_name)
        return self._loaded_ebas_vars[var_name]
    def read_file(
        self, filename, vars_to_retrieve=None, _vars_to_read=None, _vars_to_compute=None
    ):
        """Read EBAS NASA Ames file

        Parameters
        ----------
        filename : str
            absolute path to filename to read
        vars_to_retrieve : :obj:`list`, optional
            list of str with variable names to read; if None (and if not
            both of the alternative possible parameters ``_vars_to_read``
            and ``_vars_to_compute`` are specified explicitly), then the
            default settings are used

        Returns
        -------
        StationData
            dict-like object containing results
        """
        # implemented in base class
        if _vars_to_read is None or _vars_to_compute is None:
            (vars_to_read, vars_to_compute) = self.check_vars_to_retrieve(vars_to_retrieve)
        else:
            vars_to_read, vars_to_compute = _vars_to_read, _vars_to_compute

        file = EbasNasaAmesFile(filename)

        # find columns in NASA Ames file for variables that are to be read
        var_cols = self.find_var_cols(vars_to_read=vars_to_read, loaded_nasa_ames=file)

        # create empty data object (is dictionary with extended functionality)
        data_out = StationData()

        data_out = self._add_meta(data_out, file)

        freq_ebas = data_out["ts_type"]  # resolution code

        # store the raw EBAS meta dictionary (who knows what for later ;P )
        # data_out['ebas_meta'] = meta
        data_out["var_info"] = {}
        for var, colnum in var_cols.items():
            opts = self.get_read_opts(var)

            data_out["var_info"][var] = {}

            _col = file.var_defs[colnum]
            data = file.data[:, colnum]

            if opts.freq_from_start_stop_meas:
                tst = self._check_correct_freq(file, freq_ebas)
                if tst != freq_ebas:
                    logger.info(
                        f"Updating ts_type from {freq_ebas} (EBAS resolution_code) "
                        f"to {tst} (derived from stop_meas-start_meas)"
                    )
                    data_out["ts_type"] = tst

            if opts.eval_flags:
                invalid = ~file.flag_col_info[_col.flag_col].valid
                data_out.data_flagged[var] = invalid
            sf = self.get_ebas_var(var).scale_factor
            if sf != 1:
                data *= sf
                data_out["var_info"][var]["scale_factor"] = sf

            meta = file.meta

            if "unit" not in _col:  # make sure a unit is assigned to data column
                _col["unit"] = file.unit

            data = self._check_shift_wavelength(var, _col, meta, data)

            # TODO: double-check with NILU if this can be assumed
            if "matrix" not in _col:
                _col["matrix"] = meta["matrix"]
            if "statistics" not in _col:
                stats = None
                if "statistics" in meta:
                    stats = meta["statistics"]
                _col["statistics"] = stats

            data_out[var] = data
            var_info = _col.to_dict()
            data_out["var_info"][var].update(var_info)

            if opts.convert_units:
                try:
                    data_out = self._convert_varunit_stationdata(data_out, var)
                except UnitConversionError:
                    if opts.try_convert_vmr_conc:
                        # raises UnitConversionError if it is not possible
                        data_out = self._try_convert_vmr_conc(
                            data_out, var, var_info, file.meta
                        )
                    else:
                        raise

        if len(data_out["var_info"]) == 0:
            raise EbasFileError(
                f"All data columns of specified input variables are NaN in {filename}"
            )

        data_out["dtime"] = file.time_stamps
        data_out["start_meas"] = file.start_meas
        data_out["stop_meas"] = file.stop_meas

        if self.readopts_default.ensure_correct_freq:
            self._flag_incorrect_frequencies(data_out)

        # compute additional variables (if applicable)
        data_out = self.compute_additional_vars(data_out, vars_to_compute)

        return data_out
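    # Usage sketch (illustrative; the file path is a placeholder, not an
    # actual file shipped with pyaerocom):
    #
    #   reader = ReadEbas()
    #   stat = reader.read_file("/path/to/file.nas", vars_to_retrieve="sc550aer")
    #   stat["sc550aer"]               # data vector
    #   stat["var_info"]["sc550aer"]   # column metadata (units, wavelength, ...)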
    def _check_correct_freq(self, file, freq_ebas):
        # ToDo: should go into EbasNasaAmesFile class
        dts = (file.stop_meas - file.start_meas).astype(int)
        if np.min(dts) < 0:
            raise TemporalResolutionError("NASA Ames file contains neg. meas periods...")
        counts = np.bincount(dts)
        most_common_dt = np.argmax(counts)
        # frequency associated based on resolution code
        if TsType(freq_ebas).check_match_total_seconds(most_common_dt):
            return freq_ebas
        logger.warning(
            f"Detected wrong frequency {freq_ebas}. Trying to infer the correct frequency..."
        )
        try:
            freq = TsType.from_total_seconds(most_common_dt)
            return str(freq)
        except TemporalResolutionError:
            raise TemporalResolutionError(
                f"Failed to derive correct sampling frequency in {file.file_name}. "
                f"Most common meas period (stop_meas - start_meas) in file is "
                f"{most_common_dt}s and does not "
                f"correspond to any of the supported frequencies {TsType.VALID_ITER} "
                f"or permutations of those frequencies within the allowed ranges "
                f"{TsType.TS_MAX_VALS}"
            )

    def _flag_incorrect_frequencies(self, filedata):
        # time diffs in units of s for each measurement
        dt = (filedata.stop_meas - filedata.start_meas).astype(float)

        # frequency in file (supposedly)
        tst = TsType(filedata.ts_type)
        # number of seconds in period (e.g. 86400 for ts_type daily)
        numsecs = tst.num_secs
        # tolerance in seconds in period (5% of numsecs, as of 13.1.2021)
        tolsecs = tst.tol_secs

        diffarr = dt - numsecs

        invalid = np.logical_or(diffarr < -tolsecs, diffarr > tolsecs)

        frac_valid = np.sum(~invalid) / len(invalid)

        num = len(filedata["start_meas"])
        for var in filedata.var_info:
            opts = self.get_read_opts(var)
            if opts.freq_min_cov > frac_valid:
                raise TemporalSamplingError(
                    f"Only {frac_valid * 100:.2f}% of measurements are in "
                    f"{tst} resolution. Minimum requirement for {var} is "
                    f"{opts.freq_min_cov * 100:.2f}%"
                )
            if var not in filedata.data_flagged:
                filedata.data_flagged[var] = np.zeros(num).astype(bool)
            filedata.data_flagged[var][invalid] = True
        return filedata

    def _convert_varunit_stationdata(self, sd, var):
        from_unit = sd.var_info[var]["units"]
        to_unit = self.var_info(var)["units"]
        if from_unit != to_unit:
            sd.convert_unit(var, to_unit)
        return sd
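    # Sketch of the frequency inference above (illustrative values): the
    # most common measurement period is found via a histogram over the
    # per-sample durations in seconds,
    #
    #   import numpy as np
    #   dts = np.array([3600, 3600, 3600, 7200])  # three hourly, one 2h sample
    #   int(np.argmax(np.bincount(dts)))          # -> 3600, i.e. hourly
    #
    # and samples deviating by more than the TsType tolerance (5% of the
    # nominal period) from that value are flagged invalid.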
    def compute_additional_vars(self, data, vars_to_compute):
        """Compute additional variables and put into station data

        Note
        ----
        Extended version of :func:`ReadUngriddedBase.compute_additional_vars`

        Parameters
        ----------
        data : dict-like
            data object containing data vectors for variables that are
            required for computation (cf. input param ``vars_to_compute``)
        vars_to_compute : list
            list of variable names that are supposed to be computed.
            Variables that are required for the computation of the variables
            need to be specified in :attr:`AUX_REQUIRES` and need to be
            available as data vectors in the provided data dictionary (key
            is the corresponding variable name of the required variable).

        Returns
        -------
        dict
            updated data object now containing also computed variables
        """
        data = super().compute_additional_vars(data, vars_to_compute)
        for var in vars_to_compute:
            if var not in data:  # variable could not be computed -> ignore
                continue

            data.var_info[var].update(self.get_ebas_var(var))  # self._loaded_ebas_vars[var]

            if var in self.AUX_USE_META:
                to_dict = data["var_info"][var]
                from_var = self.AUX_USE_META[var]
                if from_var in data:
                    from_dict = data["var_info"][from_var]
                    for k, v in from_dict.items():
                        if k not in to_dict or to_dict[k] is None:
                            to_dict[k] = v

                if from_var in data.data_flagged:
                    data.data_flagged[var] = data.data_flagged[from_var]
                if from_var in data.data_err:
                    data.data_err[var] = data.data_err[from_var]
            if "units" not in data["var_info"][var]:
                data["var_info"][var]["units"] = self.var_info(var)["units"]
        return data
    def var_info(self, var_name):
        """AeroCom variable info for input var_name"""
        if var_name not in self._loaded_aerocom_vars:
            self._loaded_aerocom_vars[var_name] = const.VARS[var_name]
        return self._loaded_aerocom_vars[var_name]
    @property
    def readopts_default(self):
        """Default reading options

        These are applied to all variables if not defined explicitly for
        individual variables in :attr:`VAR_READ_OPTS`.
        """
        return self._opts["default"]
    def get_read_opts(self, var_name):
        """
        Get reading options for input variable

        Parameters
        ----------
        var_name : str
            name of variable

        Returns
        -------
        ReadEbasOptions
            options
        """
        if var_name not in self.VAR_READ_OPTS:
            return self._opts["default"]
        if var_name not in self._opts:
            vo = ReadEbasOptions(**self.VAR_READ_OPTS[var_name])
            self._opts[var_name] = vo
        return self._opts[var_name]
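    # Example based on VAR_READ_OPTS above: precipitation amount "pr" gets
    # its own option set, all other variables share the defaults:
    #
    #   reader = ReadEbas()
    #   reader.get_read_opts("pr").convert_units        # -> False
    #   reader.get_read_opts("pr").freq_min_cov         # -> 0.75
    #   reader.get_read_opts("sc550aer").convert_units  # -> True (default)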
    def _check_constraints(self, constraints):
        """
        Separate sqlite constraints (for file retrieval) from reading options

        Parameters
        ----------
        constraints : dict
            constraints and options

        Returns
        -------
        constraints_new : dict
            input constraints that have been filtered for options that can
            be handled by :class:`ReadEbasOptions`.
        update_opts : dict
            key / value pairs of input dict that are handled by
            :class:`ReadEbasOptions`.
        """
        constraints_new = {}
        update_opts = {}
        for key, val in constraints.items():
            if key in self._opts["default"]:
                # key is one of the default options available in
                # ReadEbasOptions (note, they may also be variable
                # dependent), see method _init_read_opts
                update_opts[key] = val
            else:
                constraints_new[key] = val
        return (constraints_new, update_opts)

    def _init_read_opts(self, vars_to_retrieve, constraints):
        """
        Initiate reading options and constraints

        Parameters
        ----------
        vars_to_retrieve : list
            list of variables to be retrieved
        constraints : dict
            reading constraints and options.

        Returns
        -------
        constraints : dict
            updated constraints (e.g. attributes that can be handled in
            file requests using :class:`EbasSQLRequest`).
        """
        constraints, update_opts = self._check_constraints(constraints)
        for var in vars_to_retrieve:
            # the following method returns default opts if this variable is
            # not specified explicitly in VAR_READ_OPTS, else, it will
            # instantiate a new instance of ReadEbasOptions for that
            # variable with the options set therein (and default values for
            # all other options)
            var_opts = self.get_read_opts(var)
            if len(update_opts) > 0:
                var_opts.update(**update_opts)
        return constraints

    def _check_keep_aux_vars(self, vars_to_retrieve):
        """
        Check if auxiliary variables are supposed to be kept for input varlist

        Parameters
        ----------
        vars_to_retrieve : list
            list of variables to be checked

        Returns
        -------
        vars_to_retrieve : list
            input list that may be extended by additional auxiliary
            variables that are needed for reading some of the input
            variables and that are supposed to be imported as well.
        """
        add = []
        for var in vars_to_retrieve:
            if var in self.AUX_REQUIRES and self.get_read_opts(var).keep_aux_vars:
                for auxvar in self.AUX_REQUIRES[var]:
                    if auxvar in add:
                        raise NotImplementedError()
                    add.append(auxvar)
        return vars_to_retrieve + add
    def read(
        self, vars_to_retrieve=None, first_file=None, last_file=None, files=None, **constraints
    ):
        """Method that reads list of files as instance of :class:`UngriddedData`

        Parameters
        ----------
        vars_to_retrieve : :obj:`list` or similar, optional
            list containing variable IDs that are supposed to be read. If
            None, all variables in :attr:`PROVIDES_VARIABLES` are loaded
        first_file : :obj:`int`, optional
            index of first file in file list to read. If None, the very
            first file in the list is used
        last_file : :obj:`int`, optional
            index of last file in list to read. If None, the very last file
            in the list is used
        files : list
            list of files
        **constraints
            further reading constraints deviating from default (default info
            for each AEROCOM variable can be found in `ebas_config.ini
            <https://github.com/metno/pyaerocom/blob/master/pyaerocom/data/ebas_config.ini>`__).
            For details on possible input parameters see
            :class:`EbasSQLRequest` (or `this tutorial
            <http://aerocom.met.no/pyaerocom/tutorials.html#ebas-file-query-and-database-browser>`__)

        Returns
        -------
        UngriddedData
            data object
        """
        vars_to_retrieve = self._precheck_vars_to_retrieve(vars_to_retrieve)

        # check_vars_to_retrieve is implemented in template base class
        (vars_to_read, vars_to_compute) = self.check_vars_to_retrieve(vars_to_retrieve)

        all_vars = vars_to_read + vars_to_compute

        constraints = self._init_read_opts(all_vars, constraints)

        vars_to_retrieve = self._check_keep_aux_vars(vars_to_retrieve)
        if files is None:
            self.get_file_list(vars_to_retrieve, **constraints)
            files = self.files
            files_contain = self.files_contain
        else:
            if isinstance(files, str):  # single file
                files = [files]
            files_contain = [vars_to_retrieve] * len(files)

        if first_file is None:
            first_file = 0
        if last_file is None:
            last_file = len(files)

        files = files[first_file:last_file]
        files_contain = files_contain[first_file:last_file]

        data = self._read_files(files, vars_to_retrieve, files_contain, constraints)
        data.clear_meta_no_data()

        return data
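    # Typical entry point (illustrative; requires access to an EBAS
    # database configured for pyaerocom):
    #
    #   reader = ReadEbas()
    #   data = reader.read(vars_to_retrieve=["sc550aer"], station_names="Alert")
    #
    # Keyword arguments whose names match ReadEbasOptions attributes (e.g.
    # keep_aux_vars=True) update the reading options via _init_read_opts;
    # all remaining keywords are passed on as EbasSQLRequest constraints.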
    def _read_files(self, files, vars_to_retrieve, files_contain, constraints):
        """Helper that reads list of files into UngriddedData

        Note
        ----
        This method is not supposed to be called directly but is used in
        :func:`read` and serves the purpose of parallel loading of data
        """
        self.files_failed = []
        data_obj = UngriddedData(num_points=1000000)

        # Add reading options to filter history of UngriddedData object
        filters = self.readopts_default.filter_dict
        filters.update(constraints)
        data_obj._add_to_filter_history(filters)

        meta_key = 0.0
        idx = 0

        # assign metadata object
        metadata = data_obj.metadata
        meta_idx = data_obj.meta_idx

        # counter that is updated whenever a new variable appears during
        # read (is used for attr. var_idx in UngriddedData object)
        var_count_glob = -1
        logger.info(f"Reading EBAS data from {self.file_dir}")
        num_files = len(files)
        for i in tqdm(range(num_files), disable=None):
            _file = files[i]
            contains = files_contain[i]
            try:
                station_data = self.read_file(_file, vars_to_retrieve=contains)
            except (
                NotInFileError,
                EbasFileError,
                TemporalResolutionError,
                TemporalSamplingError,
            ) as e:
                self.files_failed.append(_file)
                self.logger.warning(
                    f"Skipping reading of EBAS NASA Ames file: {_file}. Reason: {repr(e)}"
                )
                continue
            except Exception as e:
                self.files_failed.append(_file)
                logger.warning(
                    f"Skipping reading of EBAS NASA Ames file: {_file}. Reason: {repr(e)}"
                )
                continue

            # Fill the metadata dict.
            # The location in the data set is time step dependent!
            # Use the lat location here since we have to choose one location
            # in the time series plot
            metadata[meta_key] = {}
            metadata[meta_key].update(station_data.get_meta(add_none_vals=True))

            if "station_name_orig" in station_data:
                metadata[meta_key]["station_name_orig"] = station_data["station_name_orig"]

            metadata[meta_key]["data_revision"] = self.data_revision
            metadata[meta_key]["var_info"] = {}

            # this is a list with indices of this station for each variable
            # not sure yet, if we really need that or if it speeds up things
            meta_idx[meta_key] = {}

            num_times = len(station_data["dtime"])

            contains_vars = list(station_data.var_info)
            # access array containing time stamps
            # TODO: check using index instead (even though not a problem
            # here since all AeroCom data files are of type timeseries)
            times = np.float64(station_data["dtime"])

            append_vars = [x for x in np.intersect1d(vars_to_retrieve, contains_vars)]

            totnum = num_times * len(append_vars)

            # check if size of data object needs to be extended
            if (idx + totnum) >= data_obj._ROWNO:
                # if totnum < data_obj._CHUNKSIZE, then the latter is used
                data_obj.add_chunk(totnum)

            for var_count, var in enumerate(append_vars):
                # data values
                values = station_data[var]

                # get start / stop index for this data vector
                start = idx + var_count * num_times
                stop = start + num_times

                if var not in data_obj.var_idx:
                    var_count_glob += 1
                    var_idx = var_count_glob
                    data_obj.var_idx[var] = var_idx
                else:
                    var_idx = data_obj.var_idx[var]

                # write common meta info for this station (data lon, lat and
                # altitude are set to station locations)
                data_obj._data[start:stop, data_obj._LATINDEX] = station_data["latitude"]
                data_obj._data[start:stop, data_obj._LONINDEX] = station_data["longitude"]
                data_obj._data[start:stop, data_obj._ALTITUDEINDEX] = station_data["altitude"]
                data_obj._data[start:stop, data_obj._METADATAKEYINDEX] = meta_key

                # write data to data object
                data_obj._data[start:stop, data_obj._TIMEINDEX] = times
                data_obj._data[start:stop, data_obj._DATAINDEX] = values
                data_obj._data[start:stop, data_obj._VARINDEX] = var_idx

                if var in station_data.data_flagged:
                    invalid = station_data.data_flagged[var]
                    data_obj._data[start:stop, data_obj._DATAFLAGINDEX] = invalid
                if var in station_data.data_err:
                    errs = station_data.data_err[var]
                    data_obj._data[start:stop, data_obj._DATAERRINDEX] = errs
                var_info = station_data["var_info"][var]
                metadata[meta_key]["var_info"][var] = {}
                metadata[meta_key]["var_info"][var].update(var_info)
                meta_idx[meta_key][var] = np.arange(start, stop)

            metadata[meta_key]["variables"] = append_vars
            idx += totnum
            meta_key += 1

        # shorten data_obj._data to the right number of points
        data_obj._data = data_obj._data[:idx]

        num_failed = len(self.files_failed)
        if num_failed > 0:
            logger.warning(f"{num_failed} out of {num_files} files could not be read...")
        return data_obj