Source code for pyaerocom.io.read_earlinet

import logging
import os

import numpy as np
import pandas as pd
import xarray

from pyaerocom import const
from pyaerocom.exceptions import DataUnitError
from pyaerocom.io.readungriddedbase import ReadUngriddedBase
from pyaerocom.stationdata import StationData
from pyaerocom.ungriddeddata import UngriddedData
from pyaerocom.units_helpers import get_unit_conversion_fac
from pyaerocom.variable import Variable
from pyaerocom.vertical_profile import VerticalProfile

logger = logging.getLogger(__name__)


class ReadEarlinet(ReadUngriddedBase):
    """Interface for reading of EARLINET data"""

    #: Mask for identifying datafiles
    _FILEMASK = "*.*"

    #: version log of this class (for caching)
    __version__ = "0.16_" + ReadUngriddedBase.__baseversion__

    #: Name of dataset (OBS_ID)
    DATA_ID = const.EARLINET_NAME

    #: List of all datasets supported by this interface
    SUPPORTED_DATASETS = [const.EARLINET_NAME]

    #: default variables for read method
    DEFAULT_VARS = ["bsc532aer", "ec532aer"]

    #: all data values that exceed this number will be set to NaN on read. This
    #: is because iris, xarray, etc. assign a FILL VALUE of the order of e36
    #: to missing data in the netcdf files
    _MAX_VAL_NAN = 1e6

    #: variable name of altitude in files
    ALTITUDE_ID = "altitude"

    #: temporal resolution
    # Note: This is an approximation based on the fact that MOST of the data
    # appears to be collected at an hourly resolution. Some files are a little
    # less, but typically this is the case.
    TS_TYPE = "hourly"

    #: dictionary specifying the file search patterns for each variable
    # VAR_PATTERNS_FILE = {
    #     "ec532aer": "*.e532",
    #     "ec355aer": "*.e355",
    #     "bsc532aer": "*.b532",
    #     "bsc355aer": "*.b355",
    #     "bsc1064aer": "*.b1064",
    #     "zdust": "*.e*",
    # }
    VAR_PATTERNS_FILE = {
        "ec532aer": "_Lev02_e0532",
        "ec355aer": "_Lev02_e0355",
        "bsc532aer": "_Lev02_b0532",
        "bsc355aer": "_Lev02_b0355",
        "bsc1064aer": "_Lev02_b1064",
        # "zdust": "*.e*",  # not sure if EARLINET has this anymore
    }

    #: dictionary specifying the file column names (values) for each Aerocom
    #: variable (keys)
    VAR_NAMES_FILE = {
        "ec532aer": "extinction",
        "ec355aer": "extinction",
        "ec1064aer": "extinction",
        "bsc532aer": "backscatter",
        "bsc355aer": "backscatter",
        "bsc1064aer": "backscatter",
        "zdust": "DustLayerHeight",  # not sure if EARLINET has this anymore
    }

    META_NAMES_FILE = dict(
        location="location",
        start_utc="measurement_start_datetime",
        stop_utc="measurement_stop_datetime",
        # wavelength_det="DetectionWavelength_nm",
        # res_raw_m="ResolutionRaw_meter",
        instrument_name="system",
        comment="comment",
        PI="PI",
        dataset_name="title",
        # station_name="station_ID",
        website="references",
        wavelength_emis="wavelength",
        # detection_mode="DetectionMode",
        # res_eval="ResolutionEvaluated",
        # input_params="InputParameters",
        altitude="altitude",
        # eval_method="backscatter_evaluation_method",
    )

    #: metadata keys that are needed for reading (must be values in
    #: :attr:`META_NAMES_FILE`)
    META_NEEDED = [
        "location",
        "measurement_start_datetime",
        "measurement_stop_datetime",
    ]

    #: Metadata keys from :attr:`META_NAMES_FILE` that are additional to
    #: standard keys defined in :class:`StationMetaData` and that are supposed
    #: to be inserted into :class:`UngriddedData` object created in :func:`read`
    KEEP_ADD_META = [
        "location",
        "wavelength",
        "zenith_angle",
        "comment",
        "shots",
        "backscatter_evaluation_method",
    ]

    #: Attribute access names for unit reading of variable data
    VAR_UNIT_NAMES = dict(
        extinction=["units"],
        backscatter=["units"],
        dustlayerheight=["units"],
        altitude="units",
    )

    #: Variable names of uncertainty data
    ERR_VARNAMES = dict(
        ec532aer="error_extinction",
        ec355aer="error_extinction",
    )

    #: If true, the uncertainties are also read (where available, cf.
    #: ERR_VARNAMES)
    READ_ERR = True

    PROVIDES_VARIABLES = list(VAR_PATTERNS_FILE)

    EXCLUDE_CASES = ["cirrus.txt"]

    def __init__(self, data_id=None, data_dir=None):
        # initiate base class
        super().__init__(data_id=data_id, data_dir=data_dir)
        # make sure everything is properly set up
        if not all(
            [x in self.VAR_PATTERNS_FILE for x in self.PROVIDES_VARIABLES]
        ):  # pragma: no cover
            raise AttributeError(
                "Please specify file search masks in "
                "header dict VAR_PATTERNS_FILE for each "
                "variable defined in PROVIDES_VARIABLES"
            )
        elif not all(
            [x in self.VAR_NAMES_FILE for x in self.PROVIDES_VARIABLES]
        ):  # pragma: no cover
            raise AttributeError(
                "Please specify file search masks in "
                "header dict VAR_NAMES_FILE for each "
                "variable defined in PROVIDES_VARIABLES"
            )

        #: private dictionary containing loaded Variable instances
        self._var_info = {}

        #: files that are supposed to be excluded from reading
        self.exclude_files = []

        #: files that were actually excluded from reading
        self.excluded_files = []

        self.is_vertical_profile = True
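The attributes above define which AeroCom variables this reader provides and which Level-2 file name substrings and netCDF variable names they map onto. A minimal instantiation sketch follows; the data directory is hypothetical and assumes a local copy of the EARLINET netCDF archive:

# Sketch only: the data directory below is hypothetical
from pyaerocom.io.read_earlinet import ReadEarlinet

reader = ReadEarlinet(data_dir="/path/to/EARLINET")
print(reader.PROVIDES_VARIABLES)  # ['ec532aer', 'ec355aer', 'bsc532aer', 'bsc355aer', 'bsc1064aer']
print(reader.DEFAULT_VARS)        # ['bsc532aer', 'ec532aer']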
    def read_file(self, filename, vars_to_retrieve=None, read_err=None, remove_outliers=True):
        """Read EARLINET file and return it as instance of :class:`StationData`

        Parameters
        ----------
        filename : str
            absolute path to filename to read
        vars_to_retrieve : :obj:`list`, optional
            list of str with variable names to read. If None, use
            :attr:`DEFAULT_VARS`
        read_err : bool
            if True, uncertainty data is also read (where available).
        remove_outliers : bool
            if True, outliers are removed for each variable using the
            `minimum` and `maximum` attributes for that variable (accessed
            via pyaerocom.const.VARS[var_name]).

        Returns
        -------
        StationData
            dict-like object containing results
        """
        if read_err is None:
            # use default setting
            read_err = self.READ_ERR

        if vars_to_retrieve is None:
            vars_to_retrieve = self.DEFAULT_VARS
        elif isinstance(vars_to_retrieve, str):
            vars_to_retrieve = [vars_to_retrieve]

        _vars = []
        for var in vars_to_retrieve:
            if var in self.VAR_PATTERNS_FILE:
                # make sure to only read what is supported by this file
                if self.VAR_PATTERNS_FILE[var] in filename:
                    _vars.append(var)
            elif var in self.AUX_REQUIRES:
                _vars.append(var)
            else:
                raise ValueError(f"{var} is not supported")

        # implemented in base class
        vars_to_read, vars_to_compute = self.check_vars_to_retrieve(_vars)

        # create empty data object (is dictionary with extended functionality)
        data_out = StationData()

        # loss of generality but should work; can also get it from the file
        # if needed: data_in.station_ID
        data_out["station_id"] = filename.split("/")[-1].split("_")[2]
        data_out["data_id"] = self.data_id
        data_out["ts_type"] = self.TS_TYPE

        # create empty arrays for all variables that are supposed to be read
        # from file
        for var in vars_to_read:
            if var not in self._var_info:
                self._var_info[var] = Variable(var)
        var_info = self._var_info

        # open the netCDF file
        self.logger.debug(f"Reading file {filename}")
        data_in = xarray.open_dataset(filename, engine="netcdf4")

        # get the coordinates, since they are no longer in the metadata
        # (they also appear in the attributes; not sure why they appear twice)
        data_out["station_coords"]["longitude"] = data_out["longitude"] = np.float64(
            data_in["longitude"].values
        )
        data_out["station_coords"]["latitude"] = data_out["latitude"] = np.float64(
            data_in["latitude"].values
        )
        # altitude is defined in EARLINET in terms of altitude above sea level.
        # Note: altitude is an array for the data; station altitude is different
        data_out["altitude"] = np.float64(data_in["altitude"].values)
        data_out["station_coords"]["altitude"] = np.float64(data_in.station_altitude)
        # get attrs for altitude units + extra
        data_out["altitude_attrs"] = data_in["altitude"].attrs

        # get intersection of metadata in data_out and data_in
        for k, v in self.META_NAMES_FILE.items():
            if v in self.META_NEEDED:
                _meta = data_in.attrs[v]
            else:
                try:
                    _meta = data_in.attrs[v]
                except Exception:  # pragma: no cover
                    _meta = None
            data_out[k] = _meta

        # get metadata expected in StationData but not in data_in's metadata
        data_out["wavelength_emis"] = data_in["wavelength"]
        data_out["shots"] = np.float64(data_in["shots"])
        data_out["zenith_angle"] = np.float64(data_in["zenith_angle"])
        data_out["filename"] = filename
        if "Lev02" in filename:
            data_out["data_level"] = 2

        loc_split = data_in.attrs["location"].split(", ")
        data_out["station_name"] = loc_split[0]
        if len(loc_split) > 1:
            data_out["country"] = loc_split[1]

        dtime = pd.Timestamp(data_in.measurement_start_datetime).to_numpy().astype("datetime64[s]")
        stop = pd.Timestamp(data_in.measurement_stop_datetime).to_numpy().astype("datetime64[s]")

        # in case measurement goes over midnight into a new day
        if stop < dtime:
            stop = stop + np.timedelta64(1, "D")

        data_out["dtime"] = [dtime]
        data_out["stopdtime"] = [stop]
        data_out["has_zdust"] = False

        for var in vars_to_read:
            data_out["var_info"][var] = {}

            err_read = False
            unit_ok = False
            outliers_removed = False
            has_altitude = False

            netcdf_var_name = self.VAR_NAMES_FILE[var]
            # check if the desired variable is in the file
            if netcdf_var_name not in data_in.variables:
                self.logger.warning(f"Variable {var} not found in file {filename}")
                continue

            info = var_info[var]

            # xarray.DataArray
            arr = data_in.variables[netcdf_var_name]
            # the actual data as numpy array (or float if 0-D data, e.g. zdust)
            val = np.squeeze(np.float64(arr))  # squeeze to 1D array

            # CONVERT UNIT
            unit = None
            unames = self.VAR_UNIT_NAMES[netcdf_var_name]
            for u in unames:
                if u in arr.attrs:
                    unit = arr.attrs[u]
            if unit is None:
                raise DataUnitError(f"Unit of {var} could not be accessed in file {filename}")

            unit_fac = None
            try:
                to_unit = self._var_info[var].units
                unit_fac = get_unit_conversion_fac(unit, to_unit)
                val *= unit_fac
                unit = to_unit
                unit_ok = True
            except Exception as e:
                logger.warning(
                    f"Failed to convert unit of {var} in file {filename} (Earlinet): "
                    f"Error: {repr(e)}"
                )

            # import errors if applicable
            err = np.nan
            if read_err and var in self.ERR_VARNAMES:
                err_name = self.ERR_VARNAMES[var]
                if err_name in data_in.variables:
                    err = np.squeeze(np.float64(data_in.variables[err_name]))
                    if unit_ok:
                        err *= unit_fac
                    err_read = True

            if var == "zdust":
                # 0-D variable (single value per file)
                if not val.ndim == 0:
                    raise ValueError("Fatal: dust layer height data must be single value")

                if unit_ok and not (info.minimum < val < info.maximum):
                    logger.warning(f"zdust value {val} out of range, setting to NaN")
                    val = np.nan

                if np.isnan(val):
                    self.logger.warning(
                        f"Invalid value of variable zdust in file {filename}. Skipping...!"
                    )
                    continue

                data_out["has_zdust"] = True
                data_out[var] = val

            else:
                if not val.ndim == 1:
                    raise ValueError("Extinction data must be one dimensional")
                elif len(val) == 0:
                    continue  # no data

                # Remove NaN equivalent values
                val[val > self._MAX_VAL_NAN] = np.nan

                wvlg = var_info[var].wavelength_nm
                wvlg_str = self.META_NAMES_FILE["wavelength_emis"]

                if not wvlg == float(data_in[wvlg_str]):
                    self.logger.info("No wavelength match")
                    continue

                alt_id = self.ALTITUDE_ID
                alt_data = data_in.variables[alt_id]

                alt_vals = np.float64(alt_data)
                alt_unit = alt_data.attrs[self.VAR_UNIT_NAMES[alt_id]]
                to_alt_unit = const.VARS["alt"].units
                if not alt_unit == to_alt_unit:
                    try:
                        alt_unit_fac = get_unit_conversion_fac(alt_unit, to_alt_unit)
                        alt_vals *= alt_unit_fac
                        alt_unit = to_alt_unit
                    except Exception as e:
                        self.logger.warning(f"Failed to convert unit: {repr(e)}")
                has_altitude = True

                # remove outliers from data, if applicable
                if remove_outliers and unit_ok:
                    # REMOVE OUTLIERS
                    outlier_mask = np.logical_or(val < info.minimum, val > info.maximum)
                    val[outlier_mask] = np.nan

                    if err_read:
                        err[outlier_mask] = np.nan
                    outliers_removed = True

                # remove outliers from errors if applicable
                if err_read:
                    err[err > self._MAX_VAL_NAN] = np.nan

                # create instance of VerticalProfile
                profile = VerticalProfile(
                    data=val,
                    altitude=alt_vals,
                    dtime=dtime,
                    var_name=var,
                    data_err=err,
                    var_unit=unit,
                    altitude_unit=alt_unit,
                )

                # Write everything into profile
                data_out[var] = profile

            data_out["var_info"][var].update(
                unit_ok=unit_ok,
                err_read=err_read,
                outliers_removed=outliers_removed,
                has_altitude=has_altitude,
            )
        return data_out
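A sketch of reading a single Level-2 backscatter file with :func:`read_file`; the paths below are hypothetical, but the file name follows the `_Lev02_b0532` pattern expected by :attr:`VAR_PATTERNS_FILE`:

# Sketch only: the paths below are hypothetical
reader = ReadEarlinet(data_dir="/path/to/EARLINET")
fpath = "/path/to/EARLINET/EARLINET_AerRemSen_pot_Lev02_b0532_201901012030_201901012130.nc"
stat = reader.read_file(fpath, vars_to_retrieve="bsc532aer")

profile = stat["bsc532aer"]            # VerticalProfile with data, altitude and error arrays
print(stat["station_name"], stat["dtime"][0])
print(profile.data.shape, profile.altitude.shape)
print(stat["var_info"]["bsc532aer"])   # unit_ok / err_read / outliers_removed flags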
    def read(
        self,
        vars_to_retrieve=None,
        files=None,
        first_file=None,
        last_file=None,
        read_err=None,
        remove_outliers=True,
        pattern=None,
    ):
        """Method that reads list of files as instance of :class:`UngriddedData`

        Parameters
        ----------
        vars_to_retrieve : :obj:`list` or similar, optional
            list containing variable IDs that are supposed to be read. If
            None, all variables in :attr:`PROVIDES_VARIABLES` are loaded
        files : :obj:`list`, optional
            list of files to be read. If None, then the file list is used
            that is returned on :func:`get_file_list`.
        first_file : :obj:`int`, optional
            index of first file in file list to read. If None, the very
            first file in the list is used
        last_file : :obj:`int`, optional
            index of last file in list to read. If None, the very last file
            in the list is used
        read_err : bool
            if True, uncertainty data is also read (where available). If
            unspecified (None), then the default is used (cf. :attr:`READ_ERR`)
        remove_outliers : bool
            if True, outliers are removed for each variable using the
            `minimum` and `maximum` attributes of that variable
        pattern : str, optional
            string pattern for file search (cf. :func:`get_file_list`)

        Returns
        -------
        UngriddedData
            data object
        """
        if vars_to_retrieve is None:
            vars_to_retrieve = self.DEFAULT_VARS
        elif isinstance(vars_to_retrieve, str):
            vars_to_retrieve = [vars_to_retrieve]

        if read_err is None:
            read_err = self.READ_ERR

        if files is None:
            if len(self.files) == 0:
                self.get_file_list(vars_to_retrieve, pattern=pattern)
            files = self.files

        # turn files into a list because I suspect there may be a bug if you don't
        if isinstance(files, str):
            files = [files]

        if first_file is None:
            first_file = 0
        if last_file is None:
            last_file = len(files)

        # need to +1 here in order to actually get the desired subset
        files = files[first_file : last_file + 1]

        self.read_failed = []

        data_obj = UngriddedData()
        data_obj.is_vertical_profile = True
        col_idx = data_obj.index
        meta_key = -1.0
        idx = 0

        # assign metadata object
        metadata = data_obj.metadata
        meta_idx = data_obj.meta_idx

        # last_station_id = ''
        num_files = len(files)
        disp_each = int(num_files * 0.1)
        if disp_each < 1:
            disp_each = 1

        VAR_IDX = -1
        for i, _file in enumerate(files):
            if i % disp_each == 0:
                print(f"Reading file {i + 1} of {num_files} ({type(self).__name__})")
            try:
                stat = self.read_file(
                    _file,
                    vars_to_retrieve=vars_to_retrieve,
                    read_err=read_err,
                    remove_outliers=remove_outliers,
                )
                if not any([var in stat.vars_available for var in vars_to_retrieve]):
                    self.logger.info(
                        f"Station {stat.station_name} contains none of the desired "
                        f"variables. Skipping station..."
                    )
                    continue
                # if last_station_id != station_id:
                meta_key += 1

                # Fill the metadata dict.
                # The location in the data set is time step dependent!
                # Use the lat location here since we have to choose one location
                # in the time series plot.
                metadata[meta_key] = {}
                metadata[meta_key].update(stat.get_meta())
                for add_meta in self.KEEP_ADD_META:
                    if add_meta in stat:
                        metadata[meta_key][add_meta] = stat[add_meta]
                # metadata[meta_key]['station_id'] = station_id

                metadata[meta_key]["data_revision"] = self.data_revision
                metadata[meta_key]["variables"] = []
                metadata[meta_key]["var_info"] = {}
                # this is a list with indices of this station for each variable
                # not sure yet, if we really need that or if it speeds up things
                meta_idx[meta_key] = {}
                # last_station_id = station_id

                # Is floating point single value
                time = stat.dtime[0]

                for var in stat.vars_available:
                    if var not in data_obj.var_idx:
                        VAR_IDX += 1
                        data_obj.var_idx[var] = VAR_IDX

                    var_idx = data_obj.var_idx[var]

                    val = stat[var]
                    metadata[meta_key]["var_info"][var] = vi = {}
                    if isinstance(val, VerticalProfile):
                        altitude = val.altitude
                        data = val.data
                        add = len(data)
                        err = val.data_err
                        metadata[meta_key]["var_info"]["altitude"] = via = {}

                        vi.update(val.var_info[var])
                        via.update(val.var_info["altitude"])
                    else:
                        add = 1
                        altitude = np.nan
                        data = val
                        if var in stat.data_err:
                            err = stat.data_err[var]
                        else:
                            err = np.nan
                        vi.update(stat.var_info[var])
                    stop = idx + add
                    # check if size of data object needs to be extended
                    if stop >= data_obj._ROWNO:
                        # if totnum < data_obj._CHUNKSIZE, then the latter is used
                        data_obj.add_chunk(add)

                    # write common meta info for this station
                    data_obj._data[idx:stop, col_idx["latitude"]] = stat["station_coords"][
                        "latitude"
                    ]
                    data_obj._data[idx:stop, col_idx["longitude"]] = stat["station_coords"][
                        "longitude"
                    ]
                    data_obj._data[idx:stop, col_idx["altitude"]] = stat["station_coords"][
                        "altitude"
                    ]
                    data_obj._data[idx:stop, col_idx["meta"]] = meta_key

                    # write data to data object
                    data_obj._data[idx:stop, col_idx["time"]] = time
                    data_obj._data[idx:stop, col_idx["stoptime"]] = stat.stopdtime[0]
                    data_obj._data[idx:stop, col_idx["data"]] = data
                    data_obj._data[idx:stop, col_idx["dataaltitude"]] = altitude
                    data_obj._data[idx:stop, col_idx["varidx"]] = var_idx

                    if read_err:
                        data_obj._data[idx:stop, col_idx["dataerr"]] = err

                    if var not in meta_idx[meta_key]:
                        meta_idx[meta_key][var] = []
                    meta_idx[meta_key][var].extend(list(range(idx, stop)))

                    if var not in metadata[meta_key]["variables"]:
                        metadata[meta_key]["variables"].append(var)

                    idx += add

            except Exception as e:
                self.read_failed.append(_file)
                self.logger.exception(
                    f"Failed to read file {os.path.basename(_file)} (ERR: {repr(e)})"
                )

        # shorten data_obj._data to the right number of points
        data_obj._data = data_obj._data[:idx]
        self.data = data_obj
        return data_obj
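A sketch of reading a small subset of files into an :class:`UngriddedData` object; the data directory below is hypothetical, and `last_file` is used to limit the number of files for a quick test:

# Sketch only: the data directory below is hypothetical
reader = ReadEarlinet(data_dir="/path/to/EARLINET")
data = reader.read(vars_to_retrieve="ec532aer", first_file=0, last_file=9)

print(data.is_vertical_profile)  # True
print(len(data.metadata))        # number of stations / files successfully read
print(list(data.var_idx))        # variables that were read, e.g. ['ec532aer']
print(reader.read_failed)        # any files that could not be read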
    def _get_exclude_filelist(self):  # pragma: no cover
        """Get list of filenames that are supposed to be ignored"""
        exclude = []
        import glob

        files = glob.glob(f"{self.data_dir}/EXCLUDE/*.txt")
        for i, file in enumerate(files):
            if not os.path.basename(file) in self.EXCLUDE_CASES:
                continue
            count = 0
            num = None
            indata = False
            with open(file) as f:
                for line in f:
                    if indata:
                        exclude.append(line.strip())
                        count += 1
                    elif "Number of" in line:
                        num = int(line.split(":")[1].strip())
                        indata = True

            if not count == num:
                raise Exception
        self.exclude_files = list(dict.fromkeys(exclude))
        return self.exclude_files
    def get_file_list(self, vars_to_retrieve=None, pattern=None):
        """Perform recursive file search for all input variables

        Note
        ----
        Overloaded implementation of base class, since for EARLINET, the
        paths are variable dependent.

        Parameters
        ----------
        vars_to_retrieve : list
            list of variables to retrieve
        pattern : str, optional
            file name pattern applied to search

        Returns
        -------
        list
            list containing file paths
        """
        if vars_to_retrieve is None:
            vars_to_retrieve = self.DEFAULT_VARS
        elif isinstance(vars_to_retrieve, str):
            vars_to_retrieve = [vars_to_retrieve]

        exclude = self._get_exclude_filelist()

        logger.info("Fetching EARLINET data files. This might take a while...")

        patterns = []
        for var in vars_to_retrieve:
            if var not in self.VAR_PATTERNS_FILE:
                from pyaerocom.exceptions import VarNotAvailableError

                raise VarNotAvailableError(f"Input variable {var} is not supported")
            _pattern = self.VAR_PATTERNS_FILE[var]
            if pattern is not None:
                if "." in pattern:
                    raise NotImplementedError("filetype delimiter . not supported")
                spl = _pattern.split(".")
                if "*" not in spl[0]:
                    raise AttributeError(f"Invalid file pattern: {_pattern}")
                spl[0] = spl[0].replace("*", pattern)
                _pattern = ".".join(spl)
            patterns.append(_pattern)

        matches = []
        for root, dirnames, files in os.walk(self.data_dir, topdown=True):
            paths = [os.path.join(root, f) for f in files]
            for _pattern in patterns:
                for path in paths:
                    file = os.path.basename(path)
                    if _pattern not in file:
                        continue
                    elif file in exclude:
                        self.excluded_files.append(path)
                    else:
                        matches.append(path)
        self.files = files = list(dict.fromkeys(matches))
        return files
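A sketch of the variable-dependent file search performed by this method; the data directory is hypothetical, and the optional `pattern` argument is omitted since the current substring patterns in :attr:`VAR_PATTERNS_FILE` contain no wildcards:

# Sketch only: the data directory below is hypothetical
reader = ReadEarlinet(data_dir="/path/to/EARLINET")
files = reader.get_file_list(vars_to_retrieve=["bsc532aer", "ec532aer"])

print(len(files))                  # number of matching Level-2 files found
print(len(reader.excluded_files))  # files skipped via the EXCLUDE lists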