import logging
import numpy as np
from tqdm import tqdm
from pyaerocom import const
from pyaerocom.exceptions import (
AeronetReadError,
MetaDataError,
StationCoordinateError,
VariableNotFoundError,
)
from pyaerocom.helpers import varlist_aerocom
from pyaerocom.io.readungriddedbase import ReadUngriddedBase
from pyaerocom.mathutils import numbers_in_str
from pyaerocom.time_config import TS_TYPES
from pyaerocom.ungriddeddata import UngriddedData
logger = logging.getLogger(__name__)
[docs]
class ReadAeronetBase(ReadUngriddedBase):
"""TEMPLATE: Abstract base class template for reading of Aeronet data
Extended abstract base class, derived from low-level base class
:class:`ReadUngriddedBase` that contains some more functionality.
"""
__baseversion__ = "0.14_" + ReadUngriddedBase.__baseversion__
#: column delimiter in data block of files
COL_DELIM = ","
#: dictionary assigning temporal resolution flags for supported datasets
#: that are provided in a defined temporal resolution. Key is the name
#: of the dataset and value is the corresponding ts_type
TS_TYPES = {}
#: dictionary specifying the file column names (values) for each Aerocom
#: variable (keys)
VAR_NAMES_FILE = {}
#: Mappings for identifying variables in file (may be specified in addition
#: to explicit variable names specified in VAR_NAMES_FILE)
VAR_PATTERNS_FILE = {}
#: OPTIONAL: dictionary specifying alternative column names for variables
#: defined in :attr:`VAR_NAMES_FILE`
ALT_VAR_NAMES_FILE = {}
#: dictionary specifying the file column names (values) for each
#: metadata key (cf. attributes of :class:`StationData`, e.g.
#: 'station_name', 'longitude', 'latitude', 'altitude')
META_NAMES_FILE = {}
META_NAMES_FILE_ALT = ({},)
#: name of measurement instrument
INSTRUMENT_NAME = "sun_photometer"
#: Default data unit that is assigned to all variables that are not
#: specified in UNITS dictionary (cf. :attr:`UNITS`)
DEFAULT_UNIT = "1"
#: Variable specific units, only required for variables that deviate from
#: :attr:`DEFAULT_UNIT` (is irrelevant for all variables that are
#: so far supported by the implemented Aeronet products, i.e. all variables
#: are dimensionless as specified in :attr:`DEFAULT_UNIT`)
UNITS = {}
IGNORE_META_KEYS = ["date", "time", "day_of_year"]
def __init__(self, data_id=None, data_dir=None):
super().__init__(data_id=data_id, data_dir=data_dir)
# dictionary that contains information about the file columns
# is written in method _update_col_index
self._col_index = {}
# header string referring to the content in attr. col_index. Is
# updated whenever the former is updated (i.e. when method
# _update_col_index is called). Can be used to check if
# file structure changed between subsequent files so that
# col_index is only recomputed when the file structure changes
# and not for each file individually
self._last_col_index_str = None
self._last_col_order = []
self._alt_var_cols = {}
def _ts_type_from_data_id(self):
if "." in self.data_id:
ts_type = self.data_id.split(".")[-1]
if ts_type in TS_TYPES:
self.TS_TYPES[self.data_id] = ts_type
return ts_type
raise AttributeError("Failed to retrieve ts_type from data_id")
@property
def TS_TYPE(self):
"""Default implementation of string for temporal resolution"""
try:
return self.TS_TYPES[self.data_id]
except KeyError:
try:
return self._ts_type_from_data_id()
except AttributeError:
return "undefined"
@property
def col_index(self):
"""Dictionary that specifies the index for each data column
Note
----
Implementation depends on the data. For instance, if the variable
information is provided in all files (of all stations) and always in
the same column, then this can be set as a fixed dictionary in the
__init__ function of the implementation (see e.g. class
:class:`ReadAeronetSunV2`).
In other cases, it may not be ensured
that each variable is available in all files or the column definition
may differ between different stations. In the latter case you may
automise the column index retrieval by providing the header names for
each meta and data column you want to extract using the attribute
dictionaries :attr:`META_NAMES_FILE` and :attr:`VAR_NAMES_FILE` by
calling :func:`_update_col_index` in your implementation of
:func:`read_file` when you reach the line that contains the header
information.
"""
return self._col_index
[docs]
def infer_wavelength_colname(self, colname, low=250, high=2000):
"""Get variable wavelength from column name
Parameters
----------
colname : str
string of column name
low : int
lower limit of accepted value range
high : int
upper limit of accepted value range
Returns
-------
str
wavelength in nm as floating str
Raises
------
ValueError
if None or more than one number is detected in variable string
"""
nums = numbers_in_str(colname)
if len(nums) == 1:
if low <= int(nums[0]) <= high:
self.logger.debug(
f"Succesfully extracted wavelength {nums[0]} nm from column name {colname}"
)
return nums[0]
raise ValueError(f"Failed to extract wavelength from colname {colname}")
def _update_col_index(self, col_index_str):
"""Update file column information for fast access during read_file
Note
----
If successful (no exceptions raised), then this methods overwrites the
current column index information stored in :attr:`col_index`.
Parameters
----------
col_index_str : str
header string of data table in files
Returns
-------
dict
dictionary containing indices (values) for each data /
metadata key specified in ``VAR_NAMES_FILE`` and ``META_NAMES_FILE``.
Raises
------
MetaDataError
if one of the specified meta data columns does not exist in data
"""
cols = col_index_str.strip().split(self.COL_DELIM)
mapping = {}
for idx, info_str in enumerate(cols):
if info_str in mapping:
mapping[info_str] = "MULTI"
else:
mapping[info_str] = idx
col_index = self._find_vars_name_based(mapping, cols)
self._col_index = col_index
self._last_col_index_str = col_index_str
self._last_col_order = cols
return col_index
def _check_alternative_colnames(self, val, mapping):
if val in self.META_NAMES_FILE_ALT:
alt_names = self.META_NAMES_FILE_ALT[val]
if isinstance(alt_names, str) and alt_names in mapping:
return alt_names
elif isinstance(alt_names, list):
for alt_name in alt_names:
if alt_name in mapping:
return alt_name
raise MetaDataError(
f"Required meta-information string {val} could not be found in file header"
)
def _find_vars_name_based(self, mapping, cols):
col_index = {}
# find meta indices
for key, val in self.META_NAMES_FILE.items():
if val not in mapping:
val = self._check_alternative_colnames(val, mapping)
col_index[key] = mapping[val]
for var, colname in self.VAR_NAMES_FILE.items():
if colname in mapping:
col_index[var] = mapping[colname]
elif const.OBS_ALLOW_ALT_WAVELENGTHS:
known = False
if var in self.ALT_VAR_NAMES_FILE:
for alt_colname in self.ALT_VAR_NAMES_FILE[var]:
if alt_colname in mapping:
known = True
col_index[var] = mapping[alt_colname]
if not known:
try:
idx = self._search_var_wavelength_tol(var, cols)
col_index[var] = idx
except Exception as e:
self.logger.info(
f"Failed to infer data column of variable {var} "
f"within wavelength tolerance range. Error:\n{repr(e)}"
)
return col_index
def _search_var_wavelength_tol(self, var, cols):
"""Find alternative variable within acceptance range"""
var_info = const.VARS[var]
colname = self.VAR_NAMES_FILE[var]
wvl = var_info.wavelength_nm
tol = var_info.obs_wavelength_tol_nm
low, high = wvl - tol, wvl + tol
if wvl is None:
raise AttributeError(f"Variable {var} does not contain wavelength information")
# variable information exists and contains wavelength info
wvl_str = self.infer_wavelength_colname(colname)
check_mask = colname.replace(wvl_str, "")
if not wvl == float(wvl_str):
raise ValueError(
f"Wavelength mismatch between pyaerocom Variable {var} and "
f"wavelength inferred from Aeronet column name {colname}"
)
# it is possible to extract wavelength from column
# name and the extracted number corresponds to
# the expected wavelength as inferred from
# pyaerocom.Variable instance
wvl_diff_min = 1e6
# loop over header
for i, col in enumerate(cols):
try:
wvl_str_col = self.infer_wavelength_colname(col)
except Exception:
pass
else:
wvl_col = float(wvl_str_col)
if low <= wvl_col <= high:
mask = col.replace(wvl_str_col, "")
if check_mask == mask:
diff = abs(wvl_col - wvl)
if diff < wvl_diff_min:
wvl_diff_min = diff
if var not in self._alt_var_cols:
self._alt_var_cols[var] = []
if col not in self._alt_var_cols[var]:
self._alt_var_cols[var].append(col)
return i
raise VariableNotFoundError(
f"Did not find an alternative data column for variable {var} "
f"within allowed wavelength tolerance range of +/- {tol} nm."
)
[docs]
def print_all_columns(self):
for col in self._last_col_order:
print(col)
[docs]
def read(
self,
vars_to_retrieve=None,
files=None,
first_file=None,
last_file=None,
file_pattern=None,
common_meta=None,
):
"""Method that reads list of files as instance of :class:`UngriddedData`
Parameters
----------
vars_to_retrieve : :obj:`list` or similar, optional,
list containing variable IDs that are supposed to be read. If None,
all variables in :attr:`PROVIDES_VARIABLES` are loaded
files : :obj:`list`, optional
list of files to be read. If None, then the file list is used that
is returned on :func:`get_file_list`.
first_file : :obj:`int`, optional
index of first file in file list to read. If None, the very first
file in the list is used. Note: is ignored if input parameter
`file_pattern` is specified.
last_file : :obj:`int`, optional
index of last file in list to read. If None, the very last file
in the list is used. Note: is ignored if input parameter
`file_pattern` is specified.
file_pattern : str, optional
string pattern for file search (cf :func:`get_file_list`)
common_meta : dict, optional
dictionary that contains additional metadata shared for this
network (assigned to each metadata block of the
:class:`UngriddedData` object that is returned)
Returns
-------
UngriddedData
data object
"""
if common_meta is None:
common_meta = {}
if vars_to_retrieve is None:
vars_to_retrieve = self.DEFAULT_VARS
elif isinstance(vars_to_retrieve, str):
vars_to_retrieve = [vars_to_retrieve]
vars_to_retrieve = varlist_aerocom(vars_to_retrieve)
if files is None:
if len(self.files) == 0:
self.get_file_list(pattern=file_pattern)
files = self.files
if file_pattern is None:
if first_file is None:
first_file = 0
if last_file is None:
last_file = len(files)
files = files[first_file:last_file]
self.read_failed = []
data_obj = UngriddedData()
meta_key = 0.0
idx = 0
# assign metadata object
metadata = data_obj.metadata
meta_idx = data_obj.meta_idx
num_vars = len(vars_to_retrieve)
num_files = len(files)
logger.info("Reading AERONET data")
skipped = 0
for i in tqdm(range(num_files), disable=None):
_file = files[i]
try:
station_data = self.read_file(_file, vars_to_retrieve=vars_to_retrieve)
except AeronetReadError as e:
self.logger.warning(f"\n{repr(e)}.")
skipped += 1
continue
try:
statmeta = station_data.get_meta()
except StationCoordinateError as e:
stat = station_data.station_name
if isinstance(stat, list | np.ndarray):
stat = stat[0]
logger.warning(f"\nSkipping station {stat}. Reason: {repr(e)}.\n")
skipped += 1
continue
# Fill the metatdata dict
# the location in the data set is time step dependant!
# use the lat location here since we have to choose one location
# in the time series plot
meta = {}
meta["var_info"] = {}
meta.update(statmeta)
meta["data_id"] = self.data_id
meta["ts_type"] = self.TS_TYPE
# meta['variables'] = vars_to_retrieve
if "instrument_name" in station_data and station_data["instrument_name"] is not None:
instr = station_data["instrument_name"]
else:
instr = self.INSTRUMENT_NAME
meta["instrument_name"] = instr
meta["data_revision"] = self.data_revision
meta["filename"] = _file
meta.update(**common_meta)
# this is a list with indices of this station for each variable
# not sure yet, if we really need that or if it speeds up things
meta_idx[meta_key] = {}
num_times = len(station_data["dtime"])
# access array containing time stamps
# TODO: check using index instead (even though not a problem here
# since all Aerocom data files are of type timeseries)
times = np.float64(station_data["dtime"])
totnum = num_times * num_vars
# check if size of data object needs to be extended
if (idx + totnum) >= data_obj._ROWNO:
# if totnum < data_obj._CHUNKSIZE, then the latter is used
data_obj.add_chunk(totnum)
for var_idx, var in enumerate(vars_to_retrieve):
values = station_data[var]
start = idx + var_idx * num_times
stop = start + num_times
# write common meta info for this station (data lon, lat and
# altitude are set to station locations)
data_obj._data[start:stop, data_obj._LATINDEX] = station_data["latitude"]
data_obj._data[start:stop, data_obj._LONINDEX] = station_data["longitude"]
data_obj._data[start:stop, data_obj._ALTITUDEINDEX] = station_data["altitude"]
data_obj._data[start:stop, data_obj._METADATAKEYINDEX] = meta_key
# write data to data object
data_obj._data[start:stop, data_obj._TIMEINDEX] = times
data_obj._data[start:stop, data_obj._DATAINDEX] = values
data_obj._data[start:stop, data_obj._VARINDEX] = var_idx
meta_idx[meta_key][var] = np.arange(start, stop)
if var in station_data["var_info"]:
if "units" in station_data["var_info"][var]:
u = station_data["var_info"][var]["units"]
elif "unit" in station_data["var_info"][var]:
from pyaerocom.exceptions import MetaDataError
raise MetaDataError("Metadata attr unit is deprecated, please use units")
elif var in self.UNITS:
# support other units coming from the reader class
# needed for non 1 proxy data from aeronet
u = self.UNITS[var]
else:
u = self.DEFAULT_UNIT
elif var in self.UNITS:
u = self.UNITS[var]
else:
u = self.DEFAULT_UNIT
meta["var_info"][var] = dict(units=u)
if var not in data_obj.var_idx:
data_obj.var_idx[var] = var_idx
idx += totnum
metadata[meta_key] = meta
meta_key = meta_key + 1.0
if skipped:
logger.warning(
f"{skipped} out of {len(files)} files have been skipped (for "
f"details see output)."
)
# shorten data_obj._data to the right number of points
data_obj._data = data_obj._data[:idx]
# data_obj.data_revision[self.data_id] = self.data_revision
return data_obj