import codecs
import logging
import os
from glob import glob

import numpy as np
import pandas as pd
from tqdm import tqdm

from pyaerocom.exceptions import DataRetrievalError
from import ReadUngriddedBase
from pyaerocom.stationdata import StationData
from pyaerocom.ungriddeddata import UngriddedData

logger = logging.getLogger(__name__)

[docs] class ReadAirNow(ReadUngriddedBase): """ Reading routine for North-American Air Now observations """ # Data type of files _FILETYPE = ".dam" # File search mask to recursively retrieve list of data files _FILEMASK = f"/monthly/*{_FILETYPE}" #: Version log of this class (for caching) __version__ = "0.12" #: Column delimiter FILE_COL_DELIM = "|" #: Columns in data files FILE_COL_NAMES = [ "date", "time", "station_id", "station_name", "time_zone", "variable", "unit", "value", "institute", ] # will be used millions of times, therefore store it FILE_COL_ROW_NUMBER = len(FILE_COL_NAMES) # row # for the variable ROW_VAR_COL = 5 #: Mapping of columns in station metadata file to pyaerocom standard STATION_META_MAP = { "aqsid": "station_id", "name": "station_name", "lat": "latitude", "lon": "longitude", "elevation": "altitude", "city": "city", "address": "address", "timezone": "timezone", "environment": "area_classification", "populationclass": "station_classification", "modificationdate": "modificationdate", "comment": "comment", } #: conversion functions for metadata dtypes STATION_META_DTYPES = { "station_id": str, "station_name": str, "latitude": float, "longitude": float, "altitude": float, "city": str, "address": str, "timezone": str, "area_classification": str, "station_classification": str, "modificationdate": str, "comment": str, } # strings to be replaced in original station names REPLACE_STATNAME = {"&": "and", "/": " ", ":": " ", ".": " ", "'": ""} # Years in timestamps in the files are 2-digit (e.g. 20 for 2020) BASEYEAR = 2000 #: Name of dataset (OBS_ID) DATA_ID = "AirNow" #: List of all datasets supported by this interface SUPPORTED_DATASETS = [DATA_ID] #: Units found in data files UNIT_MAP = { "C": "celcius", "M/S": "m s-1", "MILLIBAR": "mbar", "MM": "mm", "PERCENT": "%", "PPB": "ppb", "PPM": "ppm", "UG/M3": "ug m-3", "WATTS/M2": "W m-2", } #: Variable names in data files VAR_MAP = { "concbc": "BC", "concpm10": "PM10", "concpm25": "PM2.5", "vmrco": "CO", "vmrnh3": "NH3", "vmrno": "NO", "vmrno2": "NO2", "vmrnox": "NOX", "vmrnoy": "NOY", "vmro3": "OZONE", "vmrso2": "SO2", } #: List of variables that are provided PROVIDES_VARIABLES = list(VAR_MAP) #: Default variables DEFAULT_VARS = PROVIDES_VARIABLES #: Frequency of measurements TS_TYPE = "hourly" #: file containing station metadata STAT_METADATA_FILENAME = "allStations_20191224.csv" def __init__(self, data_id=None, data_dir=None): super().__init__(data_id=data_id, data_dir=data_dir) self.make_datetime64_array = np.vectorize(self._date_time_str_to_datetime64) self._station_metadata = None @property def station_metadata(self): """Dictionary containing global metadata for each site""" if self._station_metadata is None: self._init_station_metadata() return self._station_metadata def _date_time_str_to_datetime64(self, date, time): """ Convert date and time string into datetime64 object Parameters ---------- date : str date string as mm/dd/yy as in data files time : str time of the day as HH:MM Returns ------- datetime64[s] """ mm, dd, yy = date.split("/") yr = str(self.BASEYEAR + int(yy)) # returns as datetime64[s] return np.datetime64(f"{yr}-{mm}-{dd}T{time}:00") def _read_metadata_file(self): """ Read station metadatafile Returns ------- cfg : pandas.DataFrame metadata dataframe """ fn = os.path.join(self.data_dir, self.STAT_METADATA_FILENAME) cfg = pd.read_csv(fn, sep=",", converters={"aqsid": lambda x: str(x)}) return cfg def _correct_station_name(self, station_name): """ Remove unwanted chars from original station names Parameters ---------- station_name : str original station name Returns ------- str station name cleaned of chars defined in :attr:`REPLACE_STATNAME` """ for search, replace in self.REPLACE_STATNAME.items(): station_name = station_name.replace(search, replace) return station_name def _init_station_metadata(self): """ Initiate metadata for all stations Returns ------- dict dictionary with metadata dictionaries for all stations """ cfg = self._read_metadata_file() meta_map = self.STATION_META_MAP cols = list(cfg.columns.values) col_idx = {} for from_meta, to_meta in meta_map.items(): col_idx[to_meta] = cols.index(from_meta) arr = cfg.values dtypes = self.STATION_META_DTYPES stats = {} for row in arr: stat = {} for meta_key, col_num in col_idx.items(): stat[meta_key] = dtypes[meta_key](row[col_num]) sid = stat["station_id"] stat["station_name"] = self._correct_station_name(stat["station_name"]) stat["data_id"] = self.data_id stat["ts_type"] = self.TS_TYPE stats[sid] = stat self._station_metadata = stats return stats
[docs] def get_file_list(self): """ Retrieve list of data files Returns ------- list """ basepath = self.data_dir pattern = f"{basepath}{self._FILEMASK}" files = sorted(glob(pattern)) return files
def _read_file(self, file): """ Read one datafile using :func:`pandas.read_csv` Parameters ---------- file : str file path Returns ------- df : pandas.DataFrame DataFrame containing the file data """ # try utf_8 and cp863 reading first, then # determine file encoding and provide that to pandas # just determining the encoding is too slow given the # of files # Airbase consists of # just trying a couple of encodings and not determining the encoding all # the time speeds up reading by a factor of 5 # make sure certain dtypes are set for a few rows (e.g. station code as str) try: encoding = "utf_8" df = pd.read_csv( file, sep=self.FILE_COL_DELIM, names=self.FILE_COL_NAMES, encoding=encoding, on_bad_lines="skip", dtype={2: str, 4: float, 7: float}, ) except UnicodeDecodeError: encoding = "cp863" df = pd.read_csv( file, sep=self.FILE_COL_DELIM, names=self.FILE_COL_NAMES, encoding=encoding, on_bad_lines="skip", dtype={2: str, 4: float, 7: float}, ) except: encoding = self.get_file_encoding(file) df = pd.read_csv( file, sep=self.FILE_COL_DELIM, names=self.FILE_COL_NAMES, encoding=encoding, on_bad_lines="skip", dtype={ 2: str, 4: float, 7: float, }, ) return df def _read_files(self, files, vars_to_retrieve): """ Read input variables from list of files Parameters ---------- files : list list of data files vars_to_retrieve : list list of variables to retrieve Raises ------ NotImplementedError if several timezones are assigned to the same station AttributeError if data unit is unkown Returns ------- stats : list list of StationData objects """"Read AirNow data file(s)") file_vars_to_retrieve = [self.VAR_MAP[x] for x in vars_to_retrieve] # initialize empty dataframe varcol = self.FILE_COL_NAMES.index("variable") arrs = [] unique_stat_ids = None for i in tqdm(range(len(files)), disable=None): fp = files[i] filedata = self._read_file(fp) for i, filevar in enumerate(file_vars_to_retrieve): arrs.append(filedata[filedata["variable"] == filevar].values) if unique_stat_ids is None: unique_stat_ids = np.unique( (arrs[-1][:, self.FILE_COL_NAMES.index("station_id")]) ) else: try: unique_stat_ids = np.union1d( unique_stat_ids, np.unique((arrs[-1][:, self.FILE_COL_NAMES.index("station_id")])), ) except (ValueError, TypeError): print(arrs[-1][:, self.FILE_COL_NAMES.index("station_id")]) raise DataRetrievalError( f"file {fp}: error in creating unique stationlist" ) if len(arrs) == 0: raise DataRetrievalError("None of the input variables could be found in input list") return self._filedata_to_statlist(arrs, vars_to_retrieve, unique_stat_ids=unique_stat_ids) def _filedata_to_statlist( self, arrs: list[np.ndarray], vars_to_retrieve: list[str], unique_stat_ids: list[str] = None, ) -> list[StationData]: """ Convert loaded filedata into list of StationData objects Parameters ---------- arrs : list list of numpy arrays extracted from each file (see :func:`_read_files`). vars_to_retrieve : list list of variables to be retrieved from input data. Returns ------- stats : list list of :class:`StationData` objects, one for each var and station. :type unique_stat_ids: object """ # doubling of RAM usage! data = np.concatenate(arrs) # so kill the input data right afterwards arrs = None"Converting filedata to list of StationData") stat_meta = self.station_metadata stat_ids = list(stat_meta) varcol = self.FILE_COL_NAMES.index("variable") statcol = self.FILE_COL_NAMES.index("station_id") tzonecol = self.FILE_COL_NAMES.index("time_zone") unitcol = self.FILE_COL_NAMES.index("unit") valcol = self.FILE_COL_NAMES.index("value") stats = [] # shortcut for limited testing dataset because the code # below the try statement fails with testing: # E ValueError: cannot call `vectorize` on size 0 inputs unless `otypes` is set # ../../../mambaforge/envs/pyadev-applied/lib/python3.11/site-packages/numpy/lib/ ValueError try: dtime = self.make_datetime64_array(data[:, 0], data[:, 1]) except ValueError: raise DataRetrievalError("None of the input variables could be found in input list") for var in vars_to_retrieve: # extract only variable data (should speed things up) var_in_file = self.VAR_MAP[var] mask = data[:, varcol] == var_in_file # another RAM consuming op, the mask can just be used all the time (for the price of readability...) subset = data[mask] dtime_subset = dtime[mask] # there are stations with a all numeric station ID, but type hints in pd.read_csv made sure # they are read as str... if unique_stat_ids is None: statlist = np.unique((subset[:, statcol])) else: statlist = unique_stat_ids for stat_id in tqdm(statlist, desc=var, disable=None): if not stat_id in stat_ids: continue statmask = subset[:, statcol] == stat_id if statmask.sum() == 0: continue # RAM consuming op... statdata = subset[statmask] timestamps = dtime_subset[statmask] # timezone offsets (there's a half hour time zone!, so float) # not sure why the astype(float) is needed in between, but it does not work without... toffs = statdata[:, tzonecol].astype(float).astype("timedelta64[h]") timestamps += toffs stat = StationData(**stat_meta[stat_id]) vals = statdata[:, valcol] units = np.unique(statdata[:, unitcol]) # errors that did not occur in v0 but that may occur assert len(units) == 1 assert units[0] in self.UNIT_MAP stat["dtime"] = timestamps stat["timezone"] = "UTC" stat[var] = vals unit = self.UNIT_MAP[units[0]] stat["var_info"][var] = dict(units=unit) stats.append(stat) return stats
[docs] def read_file(self, filename, vars_to_retrieve=None): """ This method is returns just the raw content of a file as a dict Parameters ---------- filename : str absolute path to filename to read vars_to_retrieve : :obj:`list`, optional list of str with variable names to read. If None, use :attr:`DEFAULT_VARS` vars_as_series : bool if True, the data columns of all variables in the result dictionary are converted into pandas Series objects Returns ------- StationData dict-like object containing results Raises ------ NotImplementedError """ # unfortunately the files have different encodings, so we have to try them # on the entire file first ret_data = {} encoding = "utf_8" try: with open(filename, encoding=encoding) as infile: linedata = infile.readlines() except UnicodeDecodeError: encoding = "cp863" with open(filename, encoding=encoding) as infile: linedata = infile.readlines() except:"unforeseen encoding in file {filename}! Trying to determine encoding") try: encoding = self.get_file_encoding(filename)"determined encoding: {encoding}") with open(filename, encoding=encoding) as infile: linedata = infile.readlines() except MemoryError:"could not determine encoding due to MemoryError: Skipping file...") raise DataRetrievalError( f"could not determine encoding due to MemoryError: Skipping file..." ) return ret_data if vars_to_retrieve is None: vars_to_retrieve = self.DEFAULT_VARS file_vars_to_retrieve = [self.VAR_MAP[x] for x in vars_to_retrieve] tot_lines_retrieved = 0 for var in vars_to_retrieve: ret_data[var] = {} ret_data[var]["lines_retrieved"] = 0 ret_data[var]["linedata"] = [] for line in linedata: line_arr = line.split(self.FILE_COL_DELIM) # skip malformed lines if len(line_arr) != self.FILE_COL_ROW_NUMBER: continue # skip lines that do not contain data of an interesting variable if line_arr[self.ROW_VAR_COL] not in file_vars_to_retrieve: continue # make the numerical values numerical here already line_arr[4] = float(line_arr[4]) line_arr[7] = float(line_arr[7]) ret_data[var]["linedata"].append(line_arr) ret_data[var]["lines_retrieved"] += 1 tot_lines_retrieved += 1 return ret_data
[docs] def read(self, vars_to_retrieve=None, first_file=None, last_file=None): """ Read variable data Parameters ---------- vars_to_retrieve : str or list, optional List of variables to be retrieved. The default is None. first_file : int, optional Index of first file to be read. The default is None, in which case index 0 in file list is used. last_file : int, optional Index of last file to be read. The default is None, in which case last index in file list is used. Returns ------- data : UngriddedData loaded data object. """ if vars_to_retrieve is None: vars_to_retrieve = self.DEFAULT_VARS elif isinstance(vars_to_retrieve, str): vars_to_retrieve = [vars_to_retrieve] self._init_station_metadata() files = self.get_file_list() if first_file is None: first_file = 0 if last_file is None: last_file = len(files) files = files[first_file:last_file] stats = self._read_files(files, vars_to_retrieve) data = UngriddedData.from_station_data( stats, add_meta_keys=["timezone", "area_classification", "station_classification"] ) return data
[docs] def get_file_bom_encoding(self, filename): with open(filename, "rb") as openfileobject: line = str(openfileobject.readline()) if line[2:14] == str(codecs.BOM_UTF8).split("'")[1]: return "utf_8" if line[2:10] == str(codecs.BOM_UTF16_BE).split("'")[1]: return "utf_16" if line[2:10] == str(codecs.BOM_UTF16_LE).split("'")[1]: return "utf_16" if line[2:18] == str(codecs.BOM_UTF32_BE).split("'")[1]: return "utf_32" if line[2:18] == str(codecs.BOM_UTF32_LE).split("'")[1]: return "utf_32" return ""
[docs] def get_all_file_encodings(self, filename): encoding_list = [] encodings = ( "utf_8", "cp863", "utf_16", "utf_16_le", "utf_16_be", "utf_32", "utf_32_be", "utf_32_le", "cp850", "cp437", "cp852", "cp1252", "cp1250", "ascii", "utf_8_sig", "big5", "big5hkscs", "cp037", "cp424", "cp500", "cp720", "cp737", "cp775", "cp855", "cp856", "cp857", "cp858", "cp860", "cp861", "cp862", "cp864", "cp865", "cp866", "cp869", "cp874", "cp875", "cp932", "cp949", "cp950", "cp1006", "cp1026", "cp1140", "cp1251", "cp1253", "cp1254", "cp1255", "cp1256", "cp1257", "cp1258", "euc_jp", "euc_jis_2004", "euc_jisx0213", "euc_kr", "gb2312", "gbk", "gb18030", "hz", "iso2022_jp", "iso2022_jp_1", "iso2022_jp_2", "iso2022_jp_2004", "iso2022_jp_3", "iso2022_jp_ext", "iso2022_kr", "latin_1", "iso8859_2", "iso8859_3", "iso8859_4", "iso8859_5", "iso8859_6", "iso8859_7", "iso8859_8", "iso8859_9", "iso8859_10", "iso8859_13", "iso8859_14", "iso8859_15", "iso8859_16", "johab", "koi8_r", "koi8_u", "mac_cyrillic", "mac_greek", "mac_iceland", "mac_latin2", "mac_roman", "mac_turkish", "ptcp154", "shift_jis", "shift_jis_2004", "shift_jisx0213", ) for e in encodings: try: fh =, "r", encoding=e) fh.readlines() except UnicodeDecodeError: fh.close() except UnicodeError: fh.close() else: encoding_list.append([e]) fh.close() continue return encoding_list
[docs] def get_file_encoding(self, filename): file_encoding = self.get_file_bom_encoding(filename) if file_encoding != "": return file_encoding encoding_list = self.get_all_file_encodings(filename) file_encoding = str(encoding_list[0][0]) if file_encoding[-3:] == "_be" or file_encoding[-3:] == "_le": file_encoding = file_encoding[:-3] return file_encoding