import getpass
import logging
import os
from configparser import ConfigParser
from pathlib import Path
import numpy as np
from pyaerocom import obs_io
from pyaerocom._lowlevel_helpers import (
check_dir_access,
check_write_access,
chk_make_subdir,
list_to_shortstr,
)
from pyaerocom.data import resources
from pyaerocom.exceptions import DataIdError, DataSourceError
from pyaerocom.grid_io import GridIO
from pyaerocom.region_defs import ALL_REGION_NAME, HTAP_REGIONS, OLD_AEROCOM_REGIONS
from pyaerocom.varcollection import VarCollection
logger = logging.getLogger(__name__)
[docs]
class Config:
"""Class containing relevant paths for read and write routines
A loaded instance of this class is created on import of pyaerocom and
can be accessed via `pyaerocom.const`.
TODO: provide more information
"""
# NAMES
# default names of the different obs networks
# might get overwritten from paths.ini see func read_config
#: ICP Forests
ICPFORESTS_NAME = "ICPFORESTS"
#: Aeronet Sun V2 access names
AERONET_SUN_V2L15_AOD_DAILY_NAME = "AeronetSunV2Lev1.5.daily"
AERONET_SUN_V2L15_AOD_ALL_POINTS_NAME = "AeronetSun_2.0_NRT"
AERONET_SUN_V2L2_AOD_DAILY_NAME = "AeronetSunV2Lev2.daily"
AERONET_SUN_V2L2_AOD_ALL_POINTS_NAME = "AeronetSunV2Lev2.AP"
#: Aeronet SDA V2 access names
AERONET_SUN_V2L2_SDA_DAILY_NAME = "AeronetSDAV2Lev2.daily"
AERONET_SUN_V2L2_SDA_ALL_POINTS_NAME = "AeronetSDAV2Lev2.AP"
# Aeronet V2 inversion products
AERONET_INV_V2L15_DAILY_NAME = "AeronetInvV2Lev1.5.daily"
AERONET_INV_V2L15_ALL_POINTS_NAME = "AeronetInvV2Lev1.5.AP"
AERONET_INV_V2L2_DAILY_NAME = "AeronetInvV2Lev2.daily"
AERONET_INV_V2L2_ALL_POINTS_NAME = "AeronetInvV2Lev2.AP"
#: Aeronet Sun V3 access names
AERONET_SUN_V3L15_AOD_DAILY_NAME = "AeronetSunV3Lev1.5.daily"
AERONET_SUN_V3L15_AOD_ALL_POINTS_NAME = "AeronetSunV3Lev1.5.AP"
AERONET_SUN_V3L2_AOD_DAILY_NAME = "AeronetSunV3Lev2.daily"
AERONET_SUN_V3L2_AOD_ALL_POINTS_NAME = "AeronetSunV3Lev2.AP"
#: Aeronet SDA V3 access names
AERONET_SUN_V3L15_SDA_DAILY_NAME = "AeronetSDAV3Lev1.5.daily"
AERONET_SUN_V3L15_SDA_ALL_POINTS_NAME = "AeronetSDAV3Lev1.5.AP"
AERONET_SUN_V3L2_SDA_DAILY_NAME = "AeronetSDAV3Lev2.daily"
AERONET_SUN_V3L2_SDA_ALL_POINTS_NAME = "AeronetSDAV3Lev2.AP"
#: Aeronet V3 inversions
AERONET_INV_V3L15_DAILY_NAME = "AeronetInvV3Lev1.5.daily"
AERONET_INV_V3L2_DAILY_NAME = "AeronetInvV3Lev2.daily"
#: CAMS2_83 name
CAMS2_83_NRT_NAME = "CAMS2_83.NRT"
#: EBAS name
EBAS_MULTICOLUMN_NAME = "EBASMC"
#: EEA nmea
EEA_NAME = "EEAAQeRep"
#: EEA.NRT name
EEA_NRT_NAME = "EEAAQeRep.NRT"
#: EEAV2 name
EEA_V2_NAME = "EEAAQeRep.v2"
#: Earlinet access name;
EARLINET_NAME = "EARLINET"
#: GAW TAD subset aas et al paper
GAWTADSUBSETAASETAL_NAME = "GAWTADsubsetAasEtAl"
#: DMS
DMS_AMS_CVO_NAME = "DMS_AMS_CVO"
#: MEP name
MEP_NAME = "MEP"
#: ICOS name
ICOS_NAME = "ICOS"
#: boolean specifying wheter EBAS DB is copied to local cache for faster
#: access, defaults to True
EBAS_DB_LOCAL_CACHE = True
#: Lowest possible year in data
MIN_YEAR = 0
#: Highest possible year in data
MAX_YEAR = 20000
#: standard names for coordinates
STANDARD_COORD_NAMES = ["latitude", "longitude", "altitude"]
#: Information specifying default vertical grid for post processing of
#: profile data. The values are in units of m.
DEFAULT_VERT_GRID_DEF = dict(lower=0, upper=15000, step=250)
#: maximum allowed RH to be considered dry
RH_MAX_PERCENT_DRY = 40
DEFAULT_REG_FILTER = f"{ALL_REGION_NAME}-wMOUNTAINS"
#: Time resample strategies for certain cominations, first level refers
#: to TO, second to FROM and values are minimum number of observations
OBS_MIN_NUM_RESAMPLE = dict(
yearly=dict(monthly=3),
monthly=dict(daily=7),
daily=dict(hourly=6),
hourly=dict(minutely=15),
)
#: This boolean can be used to enable / disable the former (i.e. use
#: available wavelengths of variable in a certain range around variable
#: wavelength).
OBS_ALLOW_ALT_WAVELENGTHS = obs_io.OBS_ALLOW_ALT_WAVELENGTHS
#: Wavelength tolerance for observations imports
OBS_WAVELENGTH_TOL_NM = obs_io.OBS_WAVELENGTH_TOL_NM
CLIM_START = 2005
CLIM_STOP = 2015
CLIM_FREQ = "daily"
CLIM_RESAMPLE_HOW = "mean" # median, ...
# as a function of climatological frequency
CLIM_MIN_COUNT = dict(
daily=30, monthly=5 # at least 30 daily measurements in each month over whole period
) # analogue to daily ...
# names for the satellite data sets
SENTINEL5P_NAME = "Sentinel5P"
AEOLUS_NAME = "AeolusL2A"
OLD_AEROCOM_REGIONS = OLD_AEROCOM_REGIONS
URL_HTAP_MASKS = "https://pyaerocom.met.no/pyaerocom-suppl/htap_masks/"
HTAP_REGIONS = HTAP_REGIONS
RM_CACHE_OUTDATED = True
#: Name of the file containing the revision string of an obs data network
REVISION_FILE = "Revision.txt"
#: timeout to check if one of the supported server locations can be
#: accessed
SERVER_CHECK_TIMEOUT = 1 # s
_outhomename = "MyPyaerocom"
with resources.path("pyaerocom.data", "paths.ini") as path:
_config_ini_lustre = str(path)
with resources.path("pyaerocom.data", "paths_user_server.ini") as path:
_config_ini_user_server = str(path)
with resources.path("pyaerocom.data", "paths_local_database.ini") as path:
_config_ini_localdb = str(path)
# this dictionary links environment ID's with corresponding ini files
_config_files = {
"metno": _config_ini_lustre,
"users-db": _config_ini_user_server,
"local-db": _config_ini_localdb,
}
# this dictionary links environment ID's with corresponding subdirectory
# names that are required to exist in order to load this environment
_check_subdirs_cfg = {"metno": "aerocom", "users-db": "AMAP", "local-db": "modeldata"}
with resources.path("pyaerocom.data", "variables.ini") as path:
_var_info_file = str(path)
with resources.path("pyaerocom.data", "coords.ini") as path:
_coords_info_file = str(path)
# these are searched in preferred order both in root and home
_DB_SEARCH_SUBDIRS = {}
_DB_SEARCH_SUBDIRS["lustre/storeB/project"] = "metno"
_DB_SEARCH_SUBDIRS["metno/aerocom_users_database"] = "users-db"
_DB_SEARCH_SUBDIRS["MyPyaerocom/data"] = "local-db"
DONOTCACHEFILE = None
ERA5_SURFTEMP_FILENAME = "era5.msl.t2m.201001-201012.nc"
_LUSTRE_CHECK_PATH = "/project/aerocom/aerocom1/"
def __init__(self, config_file=None, try_infer_environment=True):
# Directories
self._outputdir = None
self._cache_basedir = None
self._colocateddatadir = None
self._logdir = None
self._filtermaskdir = None
self._local_tmp_dir = None
self._downloaddatadir = None
self._confirmed_access = []
self._rejected_access = []
# Options
self._caching_active = True
self._var_param = None
self._coords = None
# Attributes that are used to store search directories
self.OBSLOCS_UNGRIDDED = {}
self.OBS_UNGRIDDED_POST = {}
self.SUPPLDIRS = {}
self._search_dirs = []
self.WRITE_FILEIO_ERR_LOG = True
self.last_config_file = None
self._ebas_flag_info = None
#: Settings for reading and writing of gridded data
self.GRID_IO = GridIO()
if config_file is not None:
if not os.path.exists(config_file):
raise FileNotFoundError(f"input config file does not exist {config_file}")
elif not config_file.endswith("ini"):
raise ValueError("Need path to an ini file for input config_file")
basedir, config_file = os.path.split(config_file)
elif try_infer_environment:
try:
basedir, config_file = self.infer_basedir_and_config()
except FileNotFoundError:
pass
if config_file is not None:
try:
self.read_config(config_file, basedir=basedir)
except Exception as e:
logger.warning(f"Failed to read config. Error: {repr(e)}")
# create MyPyaerocom directory
chk_make_subdir(self.HOMEDIR, self._outhomename)
def _check_access(self, loc, timeout=None):
"""Uses multiprocessing approach to check if location can be accessed
Parameters
----------
loc : str
path that is supposed to be checked
Returns
-------
bool
True, if location is accessible, else False
"""
if loc is None:
return False
loc = str(Path(loc)) # make sure the path is set correctly
if loc in self._confirmed_access:
return True
elif loc in self._rejected_access:
return False
logger.info(f"Checking access to: {loc}")
if check_dir_access(loc):
self._confirmed_access.append(loc)
return True
self._rejected_access.append(loc)
return False
def _basedirs_search_db(self):
return [self.ROOTDIR, self.HOMEDIR]
def _infer_config_from_basedir(self, basedir):
basedir = os.path.normpath(basedir)
for env_id, chk_sub in self._check_subdirs_cfg.items():
chkdir = os.path.join(basedir, chk_sub)
if self._check_access(chkdir):
return (self._config_files[env_id], env_id)
raise FileNotFoundError(
f"Could not infer environment configuration for input directory: {basedir}"
)
[docs]
def infer_basedir_and_config(self):
"""Boolean specifying whether the lustre database can be accessed"""
for sub_envdir, cfg_id in self._DB_SEARCH_SUBDIRS.items():
for sdir in self._basedirs_search_db():
basedir = os.path.join(sdir, sub_envdir)
if self._check_access(basedir):
_chk_dir = os.path.join(basedir, self._check_subdirs_cfg[cfg_id])
if self._check_access(_chk_dir):
return (basedir, self._config_files[cfg_id])
raise FileNotFoundError("Could not establish access to any registered database")
@property
def has_access_users_database(self):
chk_dir = self._check_subdirs_cfg["users-db"]
chk_paths = [
os.path.join("/metno/aerocom_users_database/", chk_dir),
os.path.join(self.HOMEDIR, "/aerocom_users_database/", chk_dir),
]
for p in chk_paths:
if self._check_access(p):
return True
return False
@property
def has_access_lustre(self):
"""Boolean specifying whether MetNO AeroCom server is accessible"""
for path in self._search_dirs:
if self._LUSTRE_CHECK_PATH in path and self._check_access(path):
return True
return False
@property
def ALL_DATABASE_IDS(self):
"""ID's of available database configurations"""
return list(self._config_files)
@property
def ROOTDIR(self):
"""Local root directory"""
return os.path.abspath(os.sep)
@property
def HOMEDIR(self):
"""Home directory of user"""
return os.path.expanduser("~") + "/"
@property
def OUTPUTDIR(self):
"""Default output directory"""
if not check_write_access(self._outputdir):
self._outputdir = chk_make_subdir(self.HOMEDIR, self._outhomename)
return self._outputdir
@property
def DATA_SEARCH_DIRS(self):
"""
Directories which pyaerocom will consider for data access
Note
----
This corresponds to directories considered for searching gridded
data (e.g. models and level 3 satellite products). Please
see :attr:`OBSLOCS_UNGRIDDED` for available data directories
for reading of ungridded data.
Returns
-------
list
list of directories
"""
return self._search_dirs
@property
def FILTERMASKKDIR(self):
if not check_write_access(self._filtermaskdir):
outdir = self.OUTPUTDIR
self._filtermaskdir = chk_make_subdir(outdir, "filtermasks")
return self._filtermaskdir
@property
def COLOCATEDDATADIR(self):
"""Directory for accessing and saving colocated data objects"""
if not check_write_access(self._colocateddatadir):
outdir = self.OUTPUTDIR
self._colocateddatadir = chk_make_subdir(outdir, "colocated_data")
return self._colocateddatadir
@property
def LOCAL_TMP_DIR(self):
"""Local TEMP directory"""
if self._local_tmp_dir is None:
self._local_tmp_dir = f"{self.OUTPUTDIR}/tmp"
if not self._check_access(self._local_tmp_dir):
os.makedirs(self._local_tmp_dir, exist_ok=True)
return self._local_tmp_dir
@LOCAL_TMP_DIR.setter
def LOCAL_TMP_DIR(self, val):
self._local_tmp_dir = val
@property
def DOWNLOAD_DATADIR(self):
"""Directory where data is downloaded into"""
if self._downloaddatadir is None:
self._downloaddatadir = chk_make_subdir(self.OUTPUTDIR, "data")
return self._downloaddatadir
@DOWNLOAD_DATADIR.setter
def DOWNLOAD_DATADIR(self, val):
if not isinstance(val, str):
raise ValueError("Please provide str")
elif not os.path.exists(val):
try:
os.mkdir(val)
except Exception:
raise OSError(f"Input directory {val} does not exist and can also not be created")
self._downloaddatadir = val
@property
def user(self):
"""User ID"""
return getpass.getuser()
@property
def cache_basedir(self):
"""Base directory for caching
The actual files are cached in user subdirectory, cf :attr:`CACHEDIR`
"""
cd = self._cache_basedir
if not check_write_access(cd):
outdir = self.OUTPUTDIR
cd = chk_make_subdir(outdir, "_cache")
self._cache_basedir = cd
return cd
@cache_basedir.setter
def cache_basedir(self, val):
if not check_write_access(val):
raise ValueError(val)
self._cache_basedir = os.path.abspath(val)
@property
def CACHEDIR(self):
"""Cache directory for UngriddedData objects"""
try:
return chk_make_subdir(self.cache_basedir, self.user)
except Exception as e:
logger.warning(f"Failed to access CACHEDIR: {repr(e)}\nDeactivating caching")
self._caching_active = False
@CACHEDIR.setter
def CACHEDIR(self, val):
"""Cache directory"""
if not check_write_access(val):
raise ValueError(
f"Cannot set cache directory. "
f"Input directory {val} does not exist or write permission is not granted"
)
spl = os.path.split(val)
if spl[-1] == self.user:
val = spl[0]
self._cache_basedir = val
@property
def CACHING(self):
"""Activate writing of and reading from cache files"""
return self._caching_active
@CACHING.setter
def CACHING(self, val):
self._caching_active = bool(val)
@property
def VAR_PARAM(self):
"""Deprecated name, please use :attr:`VARS` instead"""
logger.warning("Deprecated (but still functional) name VAR_PARAM. Please use VARS")
return self.VARS
@property
def VARS(self):
"""Instance of class VarCollection (for default variable information)"""
if self._var_param is None: # has not been accessed before
self._var_param = VarCollection(self._var_info_file)
return self._var_param
@property
def COORDINFO(self):
"""Instance of :class:`VarCollection` containing coordinate info"""
if self._coords is None:
self._coords = VarCollection(self._coords_info_file)
return self._coords
@property
def LOGFILESDIR(self):
"""Directory where logfiles are stored"""
if self._logdir is None:
self._logdir = chk_make_subdir(self.OUTPUTDIR, "_log")
return self._logdir
@property
def ETOPO1_AVAILABLE(self):
"""
Boolean specifying if access to ETOPO1 dataset is provided
Returns
-------
bool
"""
if "etopo1" in self.SUPPLDIRS and os.path.exists(self.SUPPLDIRS["etopo1"]):
return True
return False
@property
def EBAS_FLAGS_FILE(self):
"""Location of CSV file specifying meaning of EBAS flags"""
with resources.path("pyaerocom.data", "ebas_flags.csv") as path:
return str(path)
@property
def OBS_IDS_UNGRIDDED(self):
"""List of all data IDs of supported ungridded observations"""
ids = list(self.OBSLOCS_UNGRIDDED)
ids.extend(self.OBS_UNGRIDDED_POST)
return ids
@property
def ERA5_SURFTEMP_FILE(self):
if "era5" in self.SUPPLDIRS:
sdir = self.SUPPLDIRS["era5"]
if os.path.exists(sdir) and self.ERA5_SURFTEMP_FILENAME in os.listdir(sdir):
return os.path.join(sdir, self.ERA5_SURFTEMP_FILENAME)
raise FileNotFoundError(
"ERA Interim surface temperature data cannot be accessed (check lustre connection)"
)
[docs]
def make_default_vert_grid(self):
"""Makes default vertical grid for resampling of profile data"""
step = self.DEFAULT_VERT_GRID_DEF["step"]
offs = int(step / 2)
return np.arange(
self.DEFAULT_VERT_GRID_DEF["lower"] + offs,
self.DEFAULT_VERT_GRID_DEF["upper"] - offs,
step,
)
[docs]
def add_data_search_dir(self, *dirs):
"""Add data search directories for database browsing"""
for loc in dirs:
if not self._check_access(loc):
raise FileNotFoundError(f"Input location {loc} could not be accessed")
self._search_dirs.append(loc)
[docs]
def add_ungridded_obs(self, obs_id, data_dir, reader=None, check_read=False):
"""Add a network to the data search structure
Parameters
----------
obs_id : str
name of network. E.g. MY_OBS or EBASMC
data_dir : str
directory where data files are stored
reader : pyaerocom.io.ReadUngriddedBase, optional
reading class used to import these data. If `obs_id` is known
(e.g. EBASMC) this is not needed.
Raises
------
AttributeError
if the network name is already reserved in :attr:`OBSLOCS_UNGRIDDED`
ValueError
if the data directory does not exist
"""
if obs_id in self.OBS_IDS_UNGRIDDED:
raise DataIdError(
f"Network with ID {obs_id} is already registered at "
f"{self.OBSLOCS_UNGRIDDED[obs_id]}"
)
elif not self._check_access(data_dir):
raise ValueError("Input data directory cannot be accessed")
if reader is None:
from pyaerocom.io.utils import get_ungridded_reader
reader = get_ungridded_reader(obs_id)
if not obs_id in reader.SUPPORTED_DATASETS:
reader.SUPPORTED_DATASETS.append(obs_id)
self.OBSLOCS_UNGRIDDED[obs_id] = data_dir
if check_read:
self._check_obsreader(obs_id, data_dir, reader)
[docs]
def add_ungridded_post_dataset(
self,
obs_id,
obs_vars,
obs_aux_requires,
obs_merge_how,
obs_aux_funs=None,
obs_aux_units=None,
**kwargs,
):
"""
Register new ungridded dataset
Other than :func:`add_ungridded_obs`, this method adds required logic
for a "virtual" ungridded observation datasets, that is, a dataset that
can only be computed from other ungridded datasets but not read from
disk.
If all input parameters are okay, the new dataset will be registered
in :attr:`OBS_UNGRIDDED_POST` and will then be accessible for import
in ungridded reading factory class :class:`pyaerocom.io.ReadUngridded`.
Parameters
----------
obs_id : str
Name of new dataset.
obs_vars : str or list
variables supported by this dataset.
obs_aux_requires : dict
dicionary specifying required datasets and variables for each
variable supported by the auxiliary dataset.
obs_merge_how : str or dict
info on how to derive each of the supported coordinates (e.g. eval,
combine). For valid input args see
:mod:`pyaerocom.combine_vardata_ungridded`. If value is string,
then the same method is used for all variables.
obs_aux_funs : dict, optional
dictionary specifying computation methods for auxiliary variables
that are supposed to be retrieved via `obs_merge_how='eval'`.
Keys are variable names, values are respective computation methods
(which need to be strings as they will be evaluated via
:func:`pandas.DataFrame.eval` in
:mod:`pyaerocom.combine_vardata_ungridded`). This input is
optional, but mandatory if any of the `obs_vars` is
supposed to be retrieved via `merge_how='eval'`.
obs_aux_units : dict, optional
output units of auxiliary variables (only needed for varibales
that are derived via `merge_how='eval'`)
**kwargs
additional keyword arguments (unused, but serves the purpose to
allow for parsing info from dictionaries and classes that
contain additional attributes than the ones needed here).
Raises
------
ValueError
if input obs_id is already reserved
Returns
-------
None.
"""
if obs_id in self.OBS_IDS_UNGRIDDED:
raise ValueError(f"Network with ID {obs_id} is already registered...")
elif obs_aux_units is None:
obs_aux_units = {}
# this class will do the required sanity checking and will only
# initialise if everything is okay
addinfo = obs_io.AuxInfoUngridded(
data_id=obs_id,
vars_supported=obs_vars,
aux_requires=obs_aux_requires,
aux_merge_how=obs_merge_how,
aux_funs=obs_aux_funs,
aux_units=obs_aux_units,
)
self.OBS_UNGRIDDED_POST[obs_id] = addinfo.to_dict()
def _check_obsreader(self, obs_id, data_dir, reader):
"""
Check if files can be accessed when registering new dataset
Parameters
----------
obs_id : str
name of obsnetwork
data_dir : str
directory containing data files
reader : ReadUngriddedBase
reading interface
"""
check = reader(obs_id)
path = check.data_dir
assert path == data_dir
try:
check.get_file_list()
except DataSourceError:
if not "renamed" in os.listdir(data_dir):
raise
logger.warning(
f"Failed to register {obs_id} at {data_dir} using ungridded "
f"reader {reader} but input dir has a renamed subdirectory, "
f"trying to find valid data files in there instead"
)
chk_dir = os.path.join(data_dir, "renamed")
self.OBSLOCS_UNGRIDDED.pop(obs_id)
self.add_ungridded_obs(obs_id, chk_dir, reader, check_read=True)
@property
def ebas_flag_info(self):
"""Information about EBAS flags
Note
----
Is loaded upon request -> cf.
:attr:`pyaerocom.io.ebas_nasa_ames.EbasFlagCol.FLAG_INFO`
Dictionary containing 3 dictionaries (keys: ```valid, values, info```)
that contain information about validity of each flag (```valid```),
their actual values (```values```, e.g. V, M, I)
"""
if self._ebas_flag_info is None:
from pyaerocom.io.helpers import read_ebas_flags_file
self._ebas_flag_info = read_ebas_flags_file(self.EBAS_FLAGS_FILE)
return self._ebas_flag_info
[docs]
def reload(self, keep_basedirs=True):
"""Reload config file (for details see :func:`read_config`)"""
self.read_config(self.last_config_file, keep_basedirs)
[docs]
def read_config(
self, config_file, basedir=None, init_obslocs_ungridded=False, init_data_search_dirs=False
):
"""
Import paths from one of the config ini files
Parameters
----------
config_file : str
file location of config ini file
basedir : str, optional
Base directory to be used for relative model and obs dirs specified
via BASEDIR in config file. If None, then the BASEDIR value in the
config file is used. The default is None.
init_obslocs_ungridded : bool, optional
If True, :attr:`OBSLOCS_UNGRIDDED` will be re-instantiated (i.e.
all currently set obs locations will be deleted).
The default is False.
init_data_search_dirs : bool, optional
If True, :attr:`DATA_SEARCH_DIRS` will be re-instantiated (i.e.
all currently set data search directories will be deleted).
The default is False.
Raises
------
FileNotFoundError
If input config file is not a file or does not exist.
Returns
-------
None.
"""
if not os.path.isfile(config_file):
raise FileNotFoundError(
f"Configuration file paths.ini at {config_file} does not exist "
f"or is not a file"
)
if init_obslocs_ungridded:
self.OBSLOCS_UNGRIDDED = {}
if init_data_search_dirs:
self._search_dirs = []
cr = ConfigParser()
cr.read(config_file)
# init base directories for Model data
if cr.has_section("modelfolders"):
self._add_searchdirs(cr, basedir)
if cr.has_section("obsfolders"):
self._add_obsconfig(cr, basedir)
if cr.has_section("outputfolders"):
self._init_output_folders_from_cfg(cr)
if cr.has_section("supplfolders"):
if basedir is None and "BASEDIR" in cr["supplfolders"]:
basedir = cr["supplfolders"]["BASEDIR"]
for name, path in cr["supplfolders"].items():
if "${BASEDIR}" in path:
path = path.replace("${BASEDIR}", basedir)
self.SUPPLDIRS[name] = path
cr.clear()
self.GRID_IO.load_aerocom_default()
self.last_config_file = config_file
def _resolve_basedir(self, locs, chk_dirs):
repl = "${BASEDIR}"
for loc in locs:
if repl in loc:
for chk_dir in chk_dirs:
chk = Path(loc.replace(repl, chk_dir))
if self._check_access(chk):
return chk_dir
raise FileNotFoundError("Could not confirm any directory...")
def _add_searchdirs(self, cr, basedir=None):
chk_dirs = []
if basedir is not None and self._check_access(basedir):
chk_dirs.append(basedir)
mcfg = cr["modelfolders"]
# check and update model base directory if applicable
if "BASEDIR" in mcfg:
_dir = mcfg["BASEDIR"]
if "${HOME}" in _dir:
_dir = _dir.replace("${HOME}", os.path.expanduser("~"))
if not _dir in chk_dirs and self._check_access(_dir):
chk_dirs.append(_dir)
if len(chk_dirs) == 0:
return False
# get all locations defined in config file as list
locs = mcfg["dir"].replace("\n", "").split(",")
# find first location that contains BASEDIR to determine
try:
basedir = str(self._resolve_basedir(locs, chk_dirs))
except FileNotFoundError:
basedir = None
repl_str = "${BASEDIR}"
for loc in locs:
if repl_str in loc:
if basedir is None:
continue
loc = loc.replace(repl_str, basedir)
if not loc in self._search_dirs:
self._search_dirs.append(loc)
return True
def _add_obsconfig(self, cr, basedir=None):
chk_dirs = []
if basedir is not None and self._check_access(basedir):
chk_dirs.append(basedir)
cfg = cr["obsfolders"]
# check and update model base directory if applicable
if "BASEDIR" in cfg:
_dir = cfg["BASEDIR"]
if "${HOME}" in _dir:
_dir = _dir.replace("${HOME}", os.path.expanduser("~"))
if not _dir in chk_dirs and self._check_access(_dir):
chk_dirs.append(_dir)
if len(chk_dirs) == 0:
return False
names_cfg = self._add_obsnames_config(cr)
candidates = {}
dirconfirmed = None
repl = "${BASEDIR}"
if cr.has_section("obsfolders"):
for obsname, path in cr["obsfolders"].items():
if obsname.lower() == "basedir":
continue
name_str = f"{obsname.upper()}_NAME"
if name_str in names_cfg:
ID = self.__dict__[name_str]
else:
ID = self._add_obsname(obsname)
candidates[ID] = path
# candidate for checking access
if dirconfirmed is None and repl in path:
for chk_dir in chk_dirs:
chk = Path(path.replace(repl, chk_dir))
if self._check_access(chk):
dirconfirmed = str(chk_dir)
homedir = os.path.expanduser("~")
for name, loc in candidates.items():
if repl in loc:
if dirconfirmed is None:
continue
loc = loc.replace(repl, dirconfirmed)
if "${HOME}" in loc:
loc = loc.replace("${HOME}", homedir)
self.OBSLOCS_UNGRIDDED[name] = loc
def _init_output_folders_from_cfg(self, cr):
cfg = cr["outputfolders"]
if "CACHEDIR" in cfg and not self._check_access(self._cache_basedir):
self._cache_basedir = cfg["CACHEDIR"]
if "OUTPUTDIR" in cfg and not self._check_access(self._outputdir):
self._outputdir = cfg["OUTPUTDIR"]
if "COLOCATEDDATADIR" in cfg and not self._check_access(self._colocateddatadir):
self._colocateddatadir = cfg["COLOCATEDDATADIR"]
if "LOCALTMPDIR" in cfg:
_dir = cfg["LOCALTMPDIR"]
# expand ${HOME}
if "${HOME}" in _dir:
_dir = _dir.replace("${HOME}", os.path.expanduser("~"))
if "${USER}" in _dir:
_dir = _dir.replace("${USER}", getpass.getuser())
self._local_tmp_dir = _dir
def _add_obsname(self, name):
name_str = f"{name.upper()}_NAME"
self[name_str] = name
return name_str
def _add_obsnames_config(self, cr):
names_cfg = []
if cr.has_section("obsnames"):
for obsname, ID in cr["obsnames"].items():
name_str = f"{obsname.upper()}_NAME"
self[name_str] = ID
names_cfg.append(name_str)
return names_cfg
[docs]
def short_str(self):
"""Deprecated method"""
return self.__str__()
def __setitem__(self, key, val):
self.__dict__[key] = val
def __str__(self):
head = f"Pyaerocom {type(self).__name__}"
s = f"\n{head}\n{len(head) * '-'}\n"
for k, v in self.__dict__.items():
if k.startswith("_"):
pass
if k == "VARS":
s += f"\n{k}\n{list_to_shortstr(v.all_vars)}"
elif isinstance(v, dict):
s += f"\n{k} (dict)"
elif isinstance(v, list):
s += f"\n{k} (list)"
s += list_to_shortstr(v)
else:
s += f"\n{k}: {v}"
return s