Source code for pyaerocom.io.read_eea_aqerep_base

"""
Interface for reading EEA AqERep files (formerly known as Airbase data).
"""
import gzip
import logging
import os
import pathlib
import shutil
import tempfile

import cf_units
import numpy as np
import pandas as pd
from tqdm import tqdm

from pyaerocom.exceptions import EEAv2FileError, TemporalResolutionError
from pyaerocom.io.helpers import get_country_name_from_iso
from pyaerocom.io.readungriddedbase import ReadUngriddedBase
from pyaerocom.stationdata import StationData
from pyaerocom.ungriddeddata import UngriddedData

logger = logging.getLogger(__name__)


class ReadEEAAQEREPBase(ReadUngriddedBase):
    """Class for reading EEA AQErep data

    Extended class derived from low-level base class :class:`ReadUngriddedBase`
    that contains some more functionality.

    Note
    ----
    Currently only single variable reading into an :class:`UngriddedData`
    object is supported.
    """

    #: Mask for identifying datafiles
    _FILEMASK = "*.csv"

    #: Version log of this class (for caching)
    __version__ = "0.09"

    #: Column delimiter
    FILE_COL_DELIM = ","

    #: Name of the dataset (OBS_ID)
    DATA_ID = ""  # change this since we added more vars?

    #: List of all datasets supported by this interface
    SUPPORTED_DATASETS = [DATA_ID]

    #: There is no global ts_type, but it is specified in the data files...
    TS_TYPE = "variable"

    #: sampling frequencies found in data files
    TS_TYPES_FILE = {
        "hour": "hourly",
        "day": "daily",
    }

    #: Dictionary specifying values corresponding to invalid measurements.
    #: There's no value for NaNs in this data set; it uses an empty string.
    NAN_VAL = {}

    #: Dictionary specifying the file column names (values) for each Aerocom
    #: variable (keys)
    # There's only one variable in each file, named "concentration"
    VAR_NAMES_FILE = {}
    VAR_NAMES_FILE["concso2"] = "concentration"
    VAR_NAMES_FILE["conco3"] = "concentration"
    VAR_NAMES_FILE["concno2"] = "concentration"
    VAR_NAMES_FILE["concco"] = "concentration"
    VAR_NAMES_FILE["concno"] = "concentration"
    VAR_NAMES_FILE["concpm10"] = "concentration"
    VAR_NAMES_FILE["concpm25"] = "concentration"
    VAR_NAMES_FILE["vmro3"] = "concentration"
    VAR_NAMES_FILE["vmro3max"] = "concentration"
    VAR_NAMES_FILE["vmrno2"] = "concentration"
    VAR_NAMES_FILE["concSso2"] = "concentration"
    VAR_NAMES_FILE["concNno"] = "concentration"
    VAR_NAMES_FILE["concNno2"] = "concentration"

    #: units of variables in files (needs to be defined for each variable supported)
    VAR_UNITS_FILE = {
        "µg/m3": "ug m-3",
        "mg/m3": "mg m-3",
        "µgS/m3": "ug S m-3",
        "µgN/m3": "ug N m-3",
        "ppb": "ppb",
    }

    #: file masks for the data files
    FILE_MASKS = dict(
        concso2="**/??_1_*_timeseries.csv*",
        concpm10="**/??_5_*_timeseries.csv*",
        conco3="**/??_7_*_timeseries.csv*",
        vmro3="**/??_7_*_timeseries.csv*",
        vmro3max="**/??_7_*_timeseries.csv*",
        concno2="**/??_8_*_timeseries.csv*",
        vmrno2="**/??_8_*_timeseries.csv*",
        concco="**/??_10_*_timeseries.csv*",
        concno="**/??_38_*_timeseries.csv*",
        concpm25="**/??_6001_*_timeseries.csv*",
        concSso2="**/??_1_*_timeseries.csv*",
        concNno2="**/??_8_*_timeseries.csv*",
        concNno="**/??_38_*_timeseries.csv*",
    )

    # conversion factors between concX and vmrX
    CONV_FACTOR = {}
    CONV_FACTOR["concSso2"] = np.float_(0.50052292274792)
    CONV_FACTOR["concNno2"] = np.float_(0.3044517868011477)
    CONV_FACTOR["concNno"] = np.float_(0.466788868521913)
    # the following were retrieved using the standard atmosphere from geonum
    # and pyaerocom.mathutils.concx_to_vmrx
    CONV_FACTOR["vmro3"] = np.float_(0.493)
    CONV_FACTOR["vmro3max"] = np.float_(0.493)
    CONV_FACTOR["vmrno2"] = np.float_(0.514)

    # unit of the converted property after the conversion
    CONV_UNIT = {}
    CONV_UNIT["concSso2"] = "µgS/m3"
    CONV_UNIT["concNno2"] = "µgN/m3"
    CONV_UNIT["concNno"] = "µgN/m3"
    CONV_UNIT["vmro3"] = "ppb"
    CONV_UNIT["vmro3max"] = "ppb"
    CONV_UNIT["vmrno2"] = "ppb"

    #: field name of the start time of the measurement (in lower case)
    START_TIME_NAME = "datetimebegin"

    #: field name of the end time of the measurement (in lower case)
    END_TIME_NAME = "datetimeend"

    #: dictionary that connects the EEA variable codes with aerocom variable names
    VAR_CODES = {}
    VAR_CODES["1"] = "concso2"
    VAR_CODES["5"] = "concpm10"
    VAR_CODES["7"] = "conco3"
    VAR_CODES["8"] = "concno2"
    VAR_CODES["10"] = "concco"
    VAR_CODES["38"] = "concno"
    VAR_CODES["6001"] = "concpm25"

    #: column name that holds the EEA variable code
    VAR_CODE_NAME = "airpollutantcode"

    #: List of variables that are provided by this dataset (will be extended
    #: by auxiliary variables on class init, for details see __init__ method of
    #: base class ReadUngriddedBase)
    PROVIDES_VARIABLES = list(VAR_NAMES_FILE)

    #: there's no general instrument name in the data
    INSTRUMENT_NAME = "unknown"

    #: max time steps to read per file (hourly data)
    # used to make the numpy array allocation size static
    MAX_LINES_TO_READ = 24 * 366

    #: file name of the metadata file
    #: this will be prepended with a data path later on
    # this file is in principle updated once a day,
    # so we might consider updating it from within the code later on.
    # URL: https://discomap.eea.europa.eu/map/fme/metadata/PanEuropean_metadata.csv
    DEFAULT_METADATA_FILE = "metadata.csv"

    #: Name of latitude variable in metadata file
    LATITUDENAME = "latitude"

    #: name of longitude variable in metadata file
    LONGITUDENAME = "longitude"

    #: name of altitude variable in metadata file
    ALTITUDENAME = "altitude"

    #: this class reads the European Environment Agency's Eionet data
    #: for details please read
    #: https://www.eea.europa.eu/about-us/countries-and-eionet
    WEBSITE = "https://discomap.eea.europa.eu/map/fme/AirQualityExport.htm"

    #: Eionet offers 2 data revisions:
    #: E2a (near real time) and E1a (quality controlled).
    #: This class reads the E2a data for now.
    # But by changing the base path and this constant, it can also
    # read the E1a data set
    DATA_PRODUCT = ""

    AUX_REQUIRES = {
        "vmro3max": ["conco3"],
        "vmro3": ["conco3"],
        "vmrno2": ["concno2"],
        "concNno2": ["concno2"],
        "concNno": ["concno"],
        "concSso2": ["concso2"],
    }

    # the unit conversion for these auxiliary variables is applied inline in
    # :meth:`read_file` (via CONV_FACTOR / CONV_UNIT), so no post-computation
    # functions are provided here
    AUX_FUNS = {
        "vmro3": NotImplementedError(),
        "vmro3max": NotImplementedError(),
        "vmrno2": NotImplementedError(),
        "concNno2": NotImplementedError(),
        "concNno": NotImplementedError(),
        "concSso2": NotImplementedError(),
    }

    def __init__(self, data_id=None, data_dir=None):
        super().__init__(data_id=data_id, data_dir=data_dir)
        self._metadata = None

    @property
    def DEFAULT_VARS(self):
        """List of default variables"""
        return [self.VAR_CODES["7"]]

    @property
    def DATASET_NAME(self):
        """Name of the dataset"""
        return self.data_id
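    # Illustration (not part of the original reader): the CONV_FACTOR entries
    # above are plain multiplicative factors applied to the "concentration"
    # column read from file. For instance, an O3 mass concentration of
    # 100 µg/m3 converts to a volume mixing ratio of
    #
    #     100 * CONV_FACTOR["vmro3"]  # -> 49.3 ppb
    #
    # and an SO2 mass concentration of 10 µg/m3 to a sulphur mass concentration
    # (mass ratio S/SO2, approx. 0.5005) of
    #
    #     10 * CONV_FACTOR["concSso2"]  # -> ~5.005 µgS/m3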
    def read_file(self, filename, var_name, vars_as_series=False):
        """Read a single EEA file

        Note that there's only a single variable in each file.

        Parameters
        ----------
        filename : str
            Absolute path to filename to read.
        var_name : str
            Name of variable in file.
        vars_as_series : bool
            If True, the data columns of all variables in the result
            dictionary are converted into pandas Series objects.

        Returns
        -------
        StationData
            Dict-like object containing the results.
        """
        if var_name not in self.PROVIDES_VARIABLES:
            raise ValueError(f"Invalid input variable {var_name}")
        # there's only one variable in the file
        aerocom_var_name = var_name

        # Iterate over the lines of the file
        self.logger.info(f"Reading file {filename}")

        file_delimiter = self.FILE_COL_DELIM
        # these are the column indexes of the data to keep from the original
        # read string; they become a time series
        file_indexes_to_keep = [11, 13, 14, 15, 16]
        # used for line length control...
        max_file_index_to_keep = max(file_indexes_to_keep)
        # these columns hold header information
        header_indexes_to_keep = [0, 3, 8, 9, 10, 12]
        # these are the indexes that hold a time and are stored as np.datetime64
        time_indexes = [13, 14]

        # read the file
        # enable alternative reading of .gz files here to save space on the file system
        suffix = pathlib.Path(filename).suffix
        if suffix == ".gz":
            f_out = tempfile.NamedTemporaryFile(delete=False)
            with gzip.open(filename, "r") as f_in:
                shutil.copyfileobj(f_in, f_out)
            read_filename = f_out.name
            f_out.close()
        else:
            read_filename = filename
        # input files can be either UTF-8 or UTF-16 encoded: try both.
        # files are max 3MB in size, so no big deal in terms of RAM usage
        try:
            with open(read_filename) as f:
                lines = f.readlines()
        except UnicodeDecodeError:
            with open(read_filename, encoding="UTF-16") as f:
                lines = f.readlines()
        except Exception:
            if suffix == ".gz":
                os.remove(f_out.name)
            raise EEAv2FileError(f"Found corrupt file {filename}. Consider deleting it.")

        # remove the temp file in case the input file was a gz file
        if suffix == ".gz":
            os.remove(f_out.name)

        header = lines[0].lower().rstrip().split(file_delimiter)
        if len(header) < max_file_index_to_keep:
            raise EEAv2FileError(f"Found corrupt file {filename}. Consider deleting it.")

        # create output dict
        data_dict = {}
        for idx in header_indexes_to_keep:
            data_dict[header[idx]] = ""

        for idx in file_indexes_to_keep:
            if idx in time_indexes:
                data_dict[header[idx]] = np.zeros(self.MAX_LINES_TO_READ, dtype="datetime64[s]")
            else:
                data_dict[header[idx]] = np.empty(self.MAX_LINES_TO_READ, dtype=np.float_)

        # read the data...
        # example data line:
        # DE,http://gdi.uba.de/arcgis/rest/services/inspire/DE.UBA.AQD,NET.DE_BB,STA.DE_DEBB054,DEBB054,SPO.DE_DEBB054_PM2_dataGroup1,SPP.DE_DEBB054_PM2_automatic_light-scat_Duration-30minute,SAM.DE_DEBB054_2,PM2.5,http://dd.eionet.europa.eu/vocabulary/aq/pollutant/6001,hour,3.2000000000,µg/m3,2020-01-04 00:00:00 +01:00,2020-01-04 01:00:00 +01:00,1,2
        lineidx = 0
        for line in lines[1:]:
            rows = line.rstrip().split(file_delimiter)
            # Unfortunately there are a lot of corrupt files:
            # skip the data line if the number of rows is not sufficient
            if len(rows) < max_file_index_to_keep:
                continue
            if lineidx == 0:
                for idx in header_indexes_to_keep:
                    if header[idx] != self.VAR_CODE_NAME:
                        data_dict[header[idx]] = rows[idx]
                    else:
                        # extract the EEA var code from the URL noted in the data file
                        data_dict[header[idx]] = rows[idx].split("/")[-1]

            for idx in file_indexes_to_keep:
                # if the data is a time
                if idx in time_indexes:
                    # make the time string ISO compliant so that numpy can read
                    # it directly; this is not very forgiving of non-standard
                    # time strings, but fast
                    data_dict[header[idx]][lineidx] = np.datetime64(
                        rows[idx][0:10] + "T" + rows[idx][11:19]
                    )
                    # due to the deprecation of the timezone interpretation after numpy 0.11
                    # we have to subtract the offset manually to get to UTC.
                    # np.timedelta64 does not accept a float as parameter, only an integer.
                    # Although there are time zones with a 30 minute offset, these don't
                    # exist in Europe, so just consider integer hours here for speed
                    tz_offset = np.timedelta64(np.int64(rows[idx][20:23]), "h")
                    data_dict[header[idx]][lineidx] = data_dict[header[idx]][lineidx] - tz_offset
                else:
                    # data is not a time.
                    # sometimes there's no value in the file; set that to NaN
                    try:
                        data_dict[header[idx]][lineidx] = np.float_(rows[idx])
                    except (ValueError, IndexError):
                        data_dict[header[idx]][lineidx] = np.nan
            lineidx += 1

        # if the first line in the file was empty
        if data_dict["unitofmeasurement"] == "":
            if rows[12] == "":
                raise EEAv2FileError(
                    f"Unit of measurement could not be inferred from EEA file {filename}"
                )
            else:
                # without loss of generality, get the unitofmeasurement from the
                # last row, column 12 (which should be a kept header)
                data_dict["unitofmeasurement"] = rows[12]

        unit_in_file = data_dict["unitofmeasurement"]
        # adjust the unit and apply the conversion factor in case we read a
        # variable noted in self.AUX_REQUIRES
        if var_name in self.AUX_REQUIRES:
            unit_in_file = self.CONV_UNIT[var_name]
            data_dict[self.VAR_NAMES_FILE[var_name]] = (
                data_dict[self.VAR_NAMES_FILE[var_name]] * self.CONV_FACTOR[var_name]
            )

        try:
            unit = self.VAR_UNITS_FILE[unit_in_file]
        except KeyError:
            # this will raise an exception if cf_units cannot handle the unit,
            # in which case the unit should be added to VAR_UNITS_FILE
            unit = str(cf_units.Unit(unit_in_file))

        # Empty data object (a dictionary with extended functionality)
        data_out = StationData()
        data_out.data_id = self.data_id
        data_out.dataset_name = self.DATASET_NAME
        data_out.station_id = data_dict["airqualitystation"]
        data_out.station_name = data_dict["airqualitystation"]
        data_out.filename = filename
        data_out.instrument_name = self.INSTRUMENT_NAME
        data_out.country_code = data_dict["countrycode"]

        freq = data_dict["averagingtime"]
        try:
            tstype = self.TS_TYPES_FILE[freq]
        except KeyError:
            raise TemporalResolutionError(
                f"Found invalid ts_type {freq}. Please register it in class "
                f"header attr TS_TYPES_FILE"
            )
        data_out.ts_type = tstype

        # ToDo: check "variables" entry, it should not be needed anymore in UngriddedData
        data_out["variables"] = [aerocom_var_name]
        data_out["var_info"][aerocom_var_name] = {}
        data_out["var_info"][aerocom_var_name]["units"] = unit
        # the ts_type is determined per file above, so the class-level TS_TYPE
        # placeholder is not assigned here:
        # data_out['var_info'][aerocom_var_name]['ts_type'] = self.TS_TYPE

        # Sometimes the times in the data files are not ordered in time, which
        # causes problems when doing time interpolations later on. Make sure
        # that the data is ordered in time (only the filled part of the
        # preallocated arrays, i.e. [:lineidx], is considered).
        diff_unsorted = np.diff(data_dict[self.START_TIME_NAME][:lineidx])
        sort_flag = False
        # use a vectorised time_diff instead of a scalar one as before
        time_diff = (
            data_dict[self.END_TIME_NAME][:lineidx] - data_dict[self.START_TIME_NAME][:lineidx]
        ) / 2.0
        # np.min needs an array and fails with ValueError when a scalar is
        # supplied; this is the case for a single line file
        try:
            min_diff = np.min(diff_unsorted)
        except ValueError:
            min_diff = 0
        if min_diff < 0:
            # data needs to be sorted
            ordered_idx = np.argsort(data_dict[self.START_TIME_NAME][:lineidx])
            data_out["dtime"] = (
                data_dict[self.START_TIME_NAME][ordered_idx] + time_diff[ordered_idx]
            )
            sort_flag = True
        else:
            data_out["dtime"] = data_dict[self.START_TIME_NAME][:lineidx] + time_diff

        for key, value in data_dict.items():
            # adjust the variable name to aerocom standard
            if key != self.VAR_NAMES_FILE[aerocom_var_name]:
                data_out[key] = value[:lineidx]
            else:
                if sort_flag:
                    data_out[aerocom_var_name] = value[:lineidx][ordered_idx]
                else:
                    data_out[aerocom_var_name] = value[:lineidx]

        # convert data vectors to pandas.Series (if attribute vars_as_series=True)
        if vars_as_series:
            data_out[aerocom_var_name] = pd.Series(
                data_out[aerocom_var_name], index=data_out["dtime"]
            )

        return data_out
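    # Minimal usage sketch for :meth:`read_file` (the file name and data_dir
    # are hypothetical; in practice a concrete subclass of this base class
    # with a valid DATA_ID is instantiated):
    #
    #     reader = ReadEEAAQEREPBase(data_dir="/path/to/EEA_AQeRep")
    #     station = reader.read_file("DE_7_12345_2020_timeseries.csv", var_name="conco3")
    #     print(station["conco3"][:5], station["dtime"][:5])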
    def _read_metadata_file(self, filename=None):
        """Read EEA metadata file

        Parameters
        ----------
        filename : str
            Absolute path to filename to read.

        Returns
        -------
        metadata
            Dict-like object containing the results, with the keys being a
            combination of the station name and the variable and the values
            being all fields of the metadata file
        """
        if filename is None:
            filename = os.path.join(self.data_dir, self.DEFAULT_METADATA_FILE)
        # test also for a gzipped file...
        if not os.path.isfile(filename):
            filename = filename + ".gz"
        self.logger.warning(f"Reading file {filename}")
        struct_data = {}
        suffix = pathlib.Path(filename).suffix
        if suffix == ".gz":
            f_out = tempfile.NamedTemporaryFile(delete=False)
            with gzip.open(filename, "r") as f_in:
                shutil.copyfileobj(f_in, f_out)
            filename = f_out.name
            # close (and thereby flush) the temp file so it can be re-opened below
            f_out.close()

        with open(filename) as f:
            # read the header; the columns of the metadata file are:
            # Countrycode Timezone Namespace AirQualityNetwork AirQualityStation
            # AirQualityStationEoICode AirQualityStationNatCode SamplingPoint
            # SamplingProces Sample AirPollutantCode ObservationDateBegin
            # ObservationDateEnd Projection Longitude Latitude Altitude
            # MeasurementType AirQualityStationType AirQualityStationArea
            # EquivalenceDemonstrated MeasurementEquipment InletHeight
            # BuildingDistance KerbDistance
            header = f.readline().lower().rstrip().split()
            min_row_no = len(header)
            # create output dict
            data_dict = {}
            for key in header:
                data_dict[key] = []

            lineidx = 0
            bad_line_no = 0
            bad_line_arr = []
            for line in f:
                rows = line.rstrip().split("\t")
                # skip too short lines
                if len(rows) < min_row_no:
                    bad_line_no += 1
                    bad_line_arr.append(line)
                    continue
                temp_dict = {}
                for idx, key in enumerate(header):
                    if header[idx] != self.VAR_CODE_NAME:
                        temp_dict[header[idx]] = rows[idx]
                    else:
                        # extract the EEA var code from the URL noted in the data file
                        temp_dict[header[idx]] = rows[idx].split("/")[-1]
                # keep only the first entry per station / pollutant combination
                meta_key = f"{temp_dict['airqualitystation']}__{temp_dict['airpollutantcode']}"
                if meta_key not in struct_data:
                    struct_data[meta_key] = temp_dict.copy()
                lineidx += 1

        self.logger.info(f"Reading file {filename} done")
        # remove the temp file in case the input file was a gz file
        if suffix == ".gz":
            f_out.close()
            os.remove(f_out.name)
        return struct_data
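    # The dict returned by :meth:`_read_metadata_file` is keyed by
    # "<airqualitystation>__<EEA pollutant code>". Using the station and
    # pollutant code from the example data line above (illustrative only):
    #
    #     meta = reader._read_metadata_file()
    #     lat = meta["STA.DE_DEBB054__6001"]["latitude"]  # still a string here;
    #     # get_station_coords() below converts these fields to float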
    def get_file_list(self, pattern=None):
        """Search all files to be read

        Uses :attr:`_FILEMASK` (+ optional input search pattern, e.g.
        station_name) to find valid files for query.

        Parameters
        ----------
        pattern : str, optional
            file name pattern applied to search

        Returns
        -------
        list
            list containing retrieved file locations

        Raises
        ------
        IOError
            if no files can be found
        """
        import glob
        import os

        from pyaerocom._lowlevel_helpers import list_to_shortstr

        if pattern is None:
            logger.warning("using default pattern *.* for file search")
            pattern = "*.*"

        self.logger.info("Fetching data files. This might take a while...")
        fp = os.path.join(self.data_dir, pattern)
        files = sorted(glob.glob(fp, recursive=True))
        if not len(files) > 0:
            all_str = list_to_shortstr(os.listdir(self.data_dir))
            # raise DataSourceError('No files could be detected matching file '
            #                       'mask {} in dataset {}, files in folder {}:\n'
            #                       'Files in folder:{}'.format(pattern,
            #                                                   self.data_id,
            #                                                   self.data_dir,
            #                                                   all_str))
        return files
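    # Sketch of how the class-header FILE_MASKS drive the file search (paths
    # are illustrative): for "conco3" the pattern "**/??_7_*_timeseries.csv*"
    # recursively matches files such as "DE/DE_7_1234_2020_timeseries.csv" and
    # their gzipped variants:
    #
    #     files = reader.get_file_list(reader.FILE_MASKS["conco3"])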
    def get_station_coords(self, meta_key):
        """
        Get a station's coordinates

        Parameters
        ----------
        meta_key : str
            string with the internal station key

        Returns
        -------
        dict
            dictionary with the keys "latitude", "longitude" and "altitude"
            holding the station coordinates as floats
        """
        ret_data = {}
        ret_data["latitude"] = float(self._metadata[meta_key][self.LATITUDENAME])
        ret_data["longitude"] = float(self._metadata[meta_key][self.LONGITUDENAME])
        ret_data["altitude"] = float(self._metadata[meta_key][self.ALTITUDENAME])
        return ret_data
    def read(
        self, vars_to_retrieve=None, files=None, first_file=None, last_file=None, metadatafile=None
    ):
        """Method that reads list of files as instance of :class:`UngriddedData`

        Parameters
        ----------
        vars_to_retrieve : :obj:`list` or similar, optional
            List containing variable IDs that are supposed to be read. If None,
            all variables in :attr:`PROVIDES_VARIABLES` are loaded.
        files : :obj:`list`, optional
            List of files to be read. If None, then the file list used is the
            one returned by :func:`get_file_list`.
        first_file : :obj:`int`, optional
            Index of the first file in :obj:`files` to be read. If None, the
            very first file in the list is used.
        last_file : :obj:`int`, optional
            Index of the last file in :obj:`files` to be read. If None, the
            very last file in the list is used.
        metadatafile : :obj:`str`, optional
            Fully qualified path to the metadata file. If None, the default
            metadata file will be used.

        Returns
        -------
        UngriddedData
            data object
        """
        if vars_to_retrieve is None:
            vars_to_retrieve = self.DEFAULT_VARS
        elif isinstance(vars_to_retrieve, str):
            vars_to_retrieve = [vars_to_retrieve]

        if len(vars_to_retrieve) > 1:
            raise NotImplementedError("So far, only one variable can be read at a time...")

        var_name = vars_to_retrieve[0]
        logger.info("Reading EEA data")
        if files is None:
            logger.info("Retrieving file list")
            files = self.get_file_list(self.FILE_MASKS[var_name])

        if first_file is None:
            first_file = 0
        if last_file is None:
            last_file = len(files)
        if metadatafile is None:
            metadatafile = os.path.join(self.data_dir, self.DEFAULT_METADATA_FILE)

        files = files[first_file:last_file]

        data_obj = UngriddedData()
        meta_key = 0.0
        idx = 0

        # Assign metadata object and index
        metadata = data_obj.metadata
        meta_idx = data_obj.meta_idx

        logger.info("Reading metadata file")
        # non compliant, but efficiently indexed metadata
        self._metadata = self._read_metadata_file(metadatafile)

        # returns a dict with country codes as keys and the country names as values
        _country_dict = get_country_name_from_iso()

        logger.info("Reading files...")
        for i in tqdm(range(len(files)), disable=None):
            _file = files[i]
            try:
                station_data = self.read_file(_file, var_name=var_name)
            except EEAv2FileError:
                self.logger.warning(f"file {_file} is corrupt! Consider deleting it.")
                continue
            except TemporalResolutionError as e:
                self.logger.warning(f"{_file} has TemporalResolutionError")
                logger.warning(f"{repr(e)}. Skipping file...")
                continue

            # read_file might fail outside of the errors captured by the try statement above
            if station_data is None:
                self.logger.warning(f"file {_file} did not provide data. Skipping...")
                continue

            # to find the metadata quickly, we use a string internally
            _meta_key = f"{station_data['station_id']}__{station_data['airpollutantcode']}"

            # Fill the metadata dict.
            # The location in the data set is time step dependent.
            if _meta_key not in self._metadata:
                self.logger.warning(
                    f"metadata for station {_meta_key} not found! skipping that station!"
                )
                continue

            metadata[meta_key] = {}
            meta_idx[meta_key] = {}
            metadata[meta_key].update(station_data.get_meta())
            metadata[meta_key].update(self.get_station_coords(_meta_key))
            metadata[meta_key]["variables"] = list(station_data.var_info.keys())  # vars_to_retrieve
            metadata[meta_key]["station_classification"] = self._metadata[_meta_key][
                "airqualitystationtype"
            ]
            metadata[meta_key]["area_classification"] = self._metadata[_meta_key][
                "airqualitystationarea"
            ]
            try:
                metadata[meta_key]["country"] = _country_dict[metadata[meta_key]["country_code"]]
            except KeyError:
                pass
            metadata[meta_key]["var_info"] = station_data["var_info"]
            metadata[meta_key]["website"] = self.WEBSITE
            metadata[meta_key]["data_product"] = self.DATA_PRODUCT
            metadata[meta_key]["station_name"] = self._metadata[_meta_key][
                "airqualitystationeoicode"
            ]

            # List with indices of this station for each variable
            num_times = len(station_data["dtime"])

            # Check whether the size of the data object needs to be extended
            if (idx + num_times) >= data_obj._ROWNO:
                # if totnum < data_obj._CHUNKSIZE, then the latter is used
                data_obj.add_chunk(num_times)

            for var_idx, var in enumerate(list(station_data.var_info)):
                # set invalid data to np.nan according to
                # https://dd.eionet.europa.eu/vocabulary/aq/observationvalidity/view
                # data flagged as below the detection limit (values 2 and 3)
                # will remain in the data
                station_data[var][station_data["validity"] < 1] = np.nan
                values = station_data[var]

                start = idx + var_idx * num_times
                stop = start + num_times

                data_obj._data[start:stop, data_obj._METADATAKEYINDEX] = meta_key
                data_obj._data[start:stop, data_obj._TIMEINDEX] = station_data["dtime"]
                data_obj._data[start:stop, data_obj._DATAINDEX] = values
                data_obj._data[start:stop, data_obj._VARINDEX] = var_idx

                meta_idx[meta_key][var] = np.arange(start, stop)

                if var not in data_obj.var_idx:
                    data_obj.var_idx[var] = var_idx

                idx += num_times
            meta_key = meta_key + 1.0

        # Shorten data_obj._data to the right number of points
        data_obj._data = data_obj._data[:idx]
        # data_obj.data_revision[self.DATASET_NAME] = self.data_revision
        self.data = data_obj
        self._metadata = None
        self.files = files
        return data_obj
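
# Minimal end-to-end sketch (assumes a local data directory containing the
# timeseries files and the metadata.csv described above; the path is
# hypothetical and, in practice, a concrete subclass with a valid DATA_ID is
# used rather than this base class):
#
#     from pyaerocom.io.read_eea_aqerep_base import ReadEEAAQEREPBase
#
#     reader = ReadEEAAQEREPBase(data_dir="/path/to/EEA_AQeRep")
#     data = reader.read(vars_to_retrieve="concpm10")
#     print(data)  # UngriddedData with one metadata block per station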