Source code for pyaerocom.io.iris_io

"""
Module containing helper functions related to iris I/O methods. These contain
reading of Cubes, and some methods to perform quality checks of the data, e.g.

1. checking and correction of time definition
2. number and length of dimension coordinates must match data array
3. Longitude definition from -180 to 180 (corrected if defined on 0 -> 360 intervall)
"""

import logging
from datetime import datetime

import cf_units
import iris
import iris.coords
import iris.util

try:
    # as of iris version 3
    from iris.util import equalise_attributes
except ImportError:
    # old iris version installed
    from iris.experimental.equalise_cubes import equalise_attributes

from pathlib import Path
from traceback import format_exc

import numpy as np

from pyaerocom import const
from pyaerocom.exceptions import (
    FileConventionError,
    NetcdfError,
    UnresolvableTimeDefinitionError,
    VariableDefinitionError,
)
from pyaerocom.helpers import cftime_to_datetime64, make_datetimeindex_from_year
from pyaerocom.io.fileconventions import FileConventionRead
from pyaerocom.io.helpers import add_file_to_log
from pyaerocom.tstype import TsType

logger = logging.getLogger(__name__)


[docs] def load_cubes_custom(files, var_name=None, file_convention=None, perform_fmt_checks=True): """Load multiple NetCDF files into CubeList Note ---- This function does not apply any concatenation or merging of the variable data in the individual files, it only loads the files into individual instances of :class:`iris.cube.Cube`, which can be accessed via the returned list. Parameters ---------- files : list list of netcdf file paths var_name : str name of variable to be imported from input files. file_convention : :obj:`FileConventionRead`, optional Aerocom file convention. If provided, then the data content (e.g. dimension definitions) is tested against definition in file name perform_fmt_checks : bool if True, additional quality checks (and corrections) are (attempted to be) performed. Returns ------- list loaded cube instances. list list containing all files from which the input variable could be successfully loaded. """ cubes = [] loaded_files = [] for i, _file in enumerate(files): try: cube = load_cube_custom( file=_file, var_name=var_name, file_convention=file_convention, perform_fmt_checks=perform_fmt_checks, ) cubes.append(cube) loaded_files.append(_file) except Exception: msg = f"Failed to load {_file}. Reason: {format_exc()}" logger.warning(msg) if const.WRITE_FILEIO_ERR_LOG: add_file_to_log(_file, msg) return (cubes, loaded_files)
[docs] def load_cube_custom(file, var_name=None, file_convention=None, perform_fmt_checks=None): """Load netcdf file as iris.Cube Parameters ---------- file : str netcdf file var_name : str name of variable to read quality_check : bool if True, then a quality check of data is performed against the information provided in the filename file_convention : :obj:`FileConventionRead`, optional Aerocom file convention. If provided, then the data content (e.g. dimension definitions) is tested against definition in file name perform_fmt_checks : bool if True, additional quality checks (and corrections) are (attempted to be) performed. Returns ------- iris.cube.Cube loaded data as Cube """ if isinstance(file, Path): file = str(file) # iris load does not like PosixPath if perform_fmt_checks is None: perform_fmt_checks = const.GRID_IO.PERFORM_FMT_CHECKS cube_list = iris.load(file) cube = None if var_name is None: if not len(cube_list) == 1: vars_avail = [c.var_name for c in cube_list] raise NetcdfError( f"Could not load single cube from {file}. Please " f"specify var_name. Input file contains the " f"following variables: {vars_avail}" ) cube = cube_list[0] var_name = cube.var_name else: for c in cube_list: if c.var_name == var_name: cube = c break if cube is None: raise NetcdfError(f"Variable {var_name} not available in file {file}") if perform_fmt_checks: cube = _cube_quality_check(cube, file, file_convention) return cube
def _cube_quality_check(cube, file, file_convention=None): """Perform quality check of loaded cube data This includes the following checks (not all of them may be applicable): - Make sure dimensionless variables have unit 1 (and not empty string) """ coords = get_coord_names_cube(cube) try: cube = _check_cube_unitless(cube) except VariableDefinitionError: pass grid_io = const.GRID_IO if grid_io.CHECK_TIME_FILENAME: try: cube = _check_correct_time_dim(cube, file, file_convention) except FileConventionError: logger.warning( "WARNING: failed to check / validate " "time dim. using information in " "filename. Reason: invalid file name " "convention" ) if grid_io.CHECK_DIM_COORDS: cube = check_dim_coords_cube(cube) if "time" in coords and grid_io.DEL_TIME_BOUNDS: cube.coord("time").bounds = None if "longitude" in coords and grid_io.SHIFT_LONS: cube = check_and_regrid_lons_cube(cube) return cube
[docs] def check_and_regrid_lons_cube(cube): """Checks and corrects for if longitudes of :attr:`grid` are 0 -> 360 Note ---- This method checks if the maximum of the current longitudes array exceeds 180. Thus, it is not recommended to use this function after subsetting a cube, rather, it should be checked directly when the file is loaded (cf. :func:`load_input`) Parameters ---------- cube : iris.cube.Cube gridded data loaded as iris.Cube Returns ------- bool True, if longitudes were on 0 -> 360 and have been rolled, else False """ if cube.coord("longitude").points.max() > 180: logger.info( "Rearranging longitude dimension from 0 -> 360 definition to -180 -> 180 definition" ) cube = cube.intersection(longitude=(-180, 180)) return cube
[docs] def check_dim_coord_names_cube(cube): from pyaerocom import const coords = dict( lon=const.COORDINFO["lon"], lat=const.COORDINFO["lat"], time=const.COORDINFO["time"] ) for coord in cube.dim_coords: cv, cs, cn = coord.var_name, coord.standard_name, coord.long_name c = None if cv in coords: c = coords[cv] elif cs in coords: c = coords[cs] elif cn in coords: c = coords[cn] if c is not None: var_name = c.var_name std_name = c.standard_name lng_name = c.long_name if not coord.var_name == var_name: logger.warning( f"Invalid var_name {coord.standard_name} for " f"coord {coord.var_name} in cube. Overwriting with {std_name}" ) coord.var_name = var_name if not coord.standard_name == std_name: logger.warning( f"Invalid standard_name {coord.standard_name} for " f"coord {coord.var_name} in cube. Overwriting with {std_name}" ) coord.standard_name = std_name if not coord.long_name == lng_name: logger.warning( f"Invalid long_name {coord.long_name} for " f"coord {coord.var_name} in cube. Overwriting with {lng_name}" ) coord.long_name = lng_name return cube
[docs] def check_dim_coords_cube(cube): """Checks, and if necessary and applicable, updates coords names in Cube Parameters ---------- cube : iris.cube.Cube input cube Returns ------- iris.cube.Cube updated or unchanged cube """ cube = check_dim_coord_names_cube(cube) return cube
def _check_cube_unitless(cube): """Make sure unit in Cube is 1 if variable is dimensionless""" var = cube.var_name if not var in const.VARS: raise VariableDefinitionError(f"No such pyaerocom default variable: {cube.var_name}") unit = cf_units.Unit(cube.units) if str(const.VARS[var].units) == "1" and unit.is_unknown(): cube.units = cf_units.Unit("1") return cube def _get_info_from_filename(file, file_convention=None): """Load meta-information from filename Parameters ---------- """ if file_convention is None: file_convention = FileConventionRead(from_file=file) return file_convention.get_info_from_file(file) def _check_correct_time_dim(cube, file, file_convention=None): """Check if time dimension in input Cube is correct Note ----- Needs information about time dimension encoded in filename, since the check is done against what is specified in the filename. E.g. AeroCom format Parameters ---------- cube : iris.cube.Cube loaded Cube instance, for which time dimension is supposed to be checked file : str path to file from which the Cube was imported file_convention : FileConventionRead file naming convention specifying how time dimension information is encoded in the filenames. """ finfo = _get_info_from_filename(file, file_convention) ts_type = TsType(finfo["ts_type"]) year = finfo["year"] if not const.MIN_YEAR <= year <= const.MAX_YEAR: raise FileConventionError(f"Invalid year in file: {year}") elif year == 9999: logger.info( "Cannot compare NetCDF time dimension for climatological data " "(9999 in filename). Skipping this check." ) else: try: check_time_coord(cube, ts_type, year) except UnresolvableTimeDefinitionError as e: raise UnresolvableTimeDefinitionError(repr(e)) except Exception as e: msg = f"Invalid time dimension coordinate in file:\n{file}.\nError: repr({e})\n" logger.warning(msg) if const.GRID_IO.CORRECT_TIME_FILENAME: add_msg = "Attempting to correct time coordinate using information in file name" msg += add_msg logger.info(add_msg) try: cube = correct_time_coord(cube, ts_type=finfo["ts_type"], year=finfo["year"]) except Exception: add_msg = ( f"Unable to correct time dimension using the " f"information provided in the file name. Error:\n" f"{format_exc()}.\n\nThe file will be imported regardless!" ) msg += add_msg logger.warning(msg) if const.WRITE_FILEIO_ERR_LOG: add_file_to_log(file, msg) return cube def _check_leap_year(num, num_per, ts_type): if ts_type == "daily" and num + 1 == num_per: return True elif ts_type == "3hourly" and num + 8 == num_per: return True elif ts_type == "hourly" and num + 24 == num_per: return True return False
[docs] def check_time_coord(cube, ts_type, year): """Method that checks the time coordinate of an iris Cube This method checks if the time dimension of a cube is accessible and according to the standard (i.e. fully usable). It only checks, and does not correct. For the latter, please see :func:`correct_time_coord`. Parameters ---------- cube : Cube cube containing data ts_type : str pyaerocom ts_type year : year of data Returns ------- bool True, if time dimension is ok, False if not """ if isinstance(ts_type, str): ts_type = TsType(ts_type) if not "time" in get_coord_names_cube(cube): raise AttributeError("Cube does not contain time dimension") tdim = cube.coord("time") if not isinstance(tdim, iris.coords.DimCoord): raise AttributeError("Time is not a DimCoord instance") try: cftime_to_datetime64(0, cfunit=tdim.units) except Exception: raise ValueError("Could not convert time unit string") freq = ts_type.to_pandas_freq() tidx = make_datetimeindex_from_year(freq, year) num_per = len(tidx) num = len(tdim.points) if not num == num_per: if tidx[0].is_leap_year: if not _check_leap_year(num, num_per, ts_type): raise UnresolvableTimeDefinitionError( f"Expected {len(tidx)} timestamps but data has {num}" ) else: raise UnresolvableTimeDefinitionError( f"Expected {len(tidx)} timestamps but data has {num}" ) # ToDo: check why MS is not working for period conversion if freq == "MS": freq = "M" # convert first and last timestamps of index array into periods # (e.g. January and December for monthly data) per0 = tidx[0].to_period(freq) per1 = tidx[-1].to_period(freq) # first and last timestamp in data t0, t1 = cftime_to_datetime64([tdim.points[0], tdim.points[-1]], cfunit=tdim.units) if not per0.start_time <= t0 <= per0.end_time: raise ValueError(f"First timestamp of data {t0} does not lie in first period: {per0}") elif not per1.start_time <= t1 <= per1.end_time: raise ValueError(f"Last timestamp of data {t1} does not lie in end period: {per1}")
[docs] def get_dim_names_cube(cube): return [c.name() for c in cube.dim_coords]
[docs] def get_coord_names_cube(cube): return [c.name() for c in cube.coords()]
def _get_time_index_cube(cube): """ Get array index of time dimension for input Cube Parameters ---------- cube : iris.cube.Cube data cube. Raises ------ IndexError if index cannot be retrieved (e.g. data does not contain time dimension). Returns ------- int """ dim_names = get_dim_names_cube(cube) if "time" in dim_names: return dim_names.index("time") idx_miss = [] if cube.ndim != len(dim_names): # one dimension is missing for idx in range(len(cube.shape)): coords = cube.coords(contains_dimension=idx, dim_coords=True) if len(coords) == 0: idx_miss.append(idx) if len(idx_miss) == 1: return idx_miss[0] raise IndexError(f"Failed to identify data index of time dimension in cube {repr(cube)}")
[docs] def correct_time_coord(cube, ts_type, year): """Method that corrects the time coordinate of an iris Cube Parameters ---------- cube : Cube cube containing data ts_type : TsType or str temporal resolution of data (e.g. "hourly", "daily"). This information is e.g. encoded in the filename of a NetCDF file and may be accessed using :class:`pyaerocom.io.FileConventionRead` year : int integer specifying start year, e.g. 2017 Returns ------- Cube the same instance of the input cube with corrected time dimension axis """ tindex_cube = _get_time_index_cube(cube) coords = get_coord_names_cube(cube) if isinstance(ts_type, str): ts_type = TsType(ts_type) tres_str = ts_type.cf_base_unit conv = ts_type.datetime64_str tunit_str = f"{tres_str} since {year}-01-01 00:00:00" num = cube.shape[tindex_cube] tunit = cf_units.Unit(tunit_str, calendar=cf_units.CALENDAR_STANDARD) tres_np = ts_type.timedelta64_str # TSTR_TO_NP_TD[ts_type] base = np.datetime64(f"{year}-01-01 00:00:00").astype(conv) times = base + np.arange(0, num, 1).astype(tres_np) # see this thread https://github.com/matplotlib/matplotlib/issues/2259/ times_dt = times.astype("datetime64[s]").astype(datetime) time_nums = [tunit.date2num(t) for t in times_dt] pd_freq = ts_type.to_pandas_freq() num_expected = len(make_datetimeindex_from_year(pd_freq, year)) num_inferred = len(time_nums) if not num_inferred == num_expected: raise UnresolvableTimeDefinitionError( f"expected {num_expected} timestamps for {year} and " f"freq {ts_type} but got {num_inferred}" ) tcoord = iris.coords.DimCoord(time_nums, standard_name="time", units=tunit) if "time" in coords: cube.remove_coord("time") cube.add_dim_coord(tcoord, tindex_cube) cube.attributes["timedim-corrected"] = True return cube
def _check_correct_dtypes_timedim_cube_list(cubes): try: dtypes = np.unique([cube.coord("time").points.dtype for cube in cubes]) except iris.exceptions.CoordinateNotFoundError: return False corrected = False if len(dtypes) > 1: corrected = True for cube in cubes: new = cube.coord("time").points.astype(float) cube.coord("time").points = new return corrected
[docs] def concatenate_iris_cubes(cubes, error_on_mismatch=True): """Concatenate list of :class:`iris.Cube` instances cubes into single Cube Helper method for concatenating list of cubes This method is not supposed to be called directly but rather :func:`concatenate_cubes` (which ALWAYS returns instance of :class:`Cube` or raises Exception) or :func:`concatenate_possible_cubes` (which ALWAYS returns instance of :class:`CubeList` or raises Exception) Parameters ---------- cubes : CubeList or list(Cubes) list of individual cubes error_on_mismatch boolean specifying whether an Exception is supposed to be raised or not Returns ------- :obj:`Cube` result of concatenation Raises ------ iris.exceptions.ConcatenateError if ``error_on_mismatch=True`` and input cubes could not all concatenated into a single instance of :class:`iris.Cube` class. """ cubes = iris.cube.CubeList(cubes) var_name = cubes[0].var_name if const.GRID_IO.EQUALISE_METADATA: meta_init = cubes[0].metadata if not all([x.metadata == meta_init for x in cubes]): logger.warning( f"{var_name} cubes to be concatenated have different meta data settings. " f"These will be unified using the metadata dictionary of the first cube " f"(otherwise the method concatenate of the iris package won't work)" ) for cube in cubes: cube.metadata = meta_init # now put the CubeList together and form one cube # 1st equalise the cubes (remove non common attributes) equalise_attributes(cubes) # unify time units iris.util.unify_time_units(cubes) # now concatenate the cube list to one cube try: cubes_concat = iris.cube.CubeList.concatenate_cube(cubes, error_on_mismatch) except Exception as e: if _check_correct_dtypes_timedim_cube_list(cubes): cubes_concat = iris.cube.CubeList.concatenate_cube(cubes, error_on_mismatch) else: raise return cubes_concat