Source code for pyaerocom.aeroval.coldatatojson_helpers

"""
Helpers for conversion of ColocatedData to JSON files for web interface.
"""

import logging
import os
from copy import deepcopy
from datetime import datetime, timezone
from typing import Literal

import numpy as np
import pandas as pd
import xarray as xr

from pyaerocom._warnings import ignore_warnings
from pyaerocom.aeroval.exceptions import ConfigError, TrendsError
from pyaerocom.aeroval.fairmode_stats import fairmode_stats
from pyaerocom.aeroval.helpers import _get_min_max_year_periods, _period_str_to_timeslice
from pyaerocom.aeroval.json_utils import read_json, round_floats, write_json
from pyaerocom.colocateddata import ColocatedData
from pyaerocom.config import ALL_REGION_NAME
from pyaerocom.exceptions import DataCoverageError, TemporalResolutionError
from pyaerocom.helpers import start_stop
from pyaerocom.region import Region, find_closest_region_coord, get_all_default_region_ids
from pyaerocom.region_defs import HTAP_REGIONS_DEFAULT, OLD_AEROCOM_REGIONS
from pyaerocom.stats.stats import _init_stats_dummy, calculate_statistics
from pyaerocom.trends_engine import TrendsEngine
from pyaerocom.trends_helpers import _get_season_from_months
from pyaerocom.tstype import TsType

logger = logging.getLogger(__name__)


def get_heatmap_filename(ts_type):
    return f"glob_stats_{ts_type}.json"


def get_timeseries_file_name(region, obs_name, var_name_web, vert_code):
    return f"{region}-{obs_name}-{var_name_web}-{vert_code}.json"


[docs] def get_stationfile_name(station_name, obs_name, var_name_web, vert_code): """Get name of station timeseries file""" return f"{station_name}_{obs_name}-{var_name_web}_{vert_code}.json"
[docs] def get_json_mapname(obs_name, var_name_web, model_name, model_var, vert_code, period): """Get base name of json file""" # for cams2_83 the periods contain slashes at this point periodmod = period.replace("/", "") return f"{obs_name}-{var_name_web}_{vert_code}_{model_name}-{model_var}_{periodmod}.json"
def _write_stationdata_json(ts_data, out_dir): """ This method writes time series data given in a dictionary to .json files Parameters ---------- ts_data : dict A dictionary containing all processed time series data. out_dir : str or similar output directory Returns ------- None. """ filename = get_stationfile_name( ts_data["station_name"], ts_data["obs_name"], ts_data["var_name_web"], ts_data["vert_code"] ) fp = os.path.join(out_dir, filename) if os.path.exists(fp): current = read_json(fp) else: current = {} current[ts_data["model_name"]] = round_floats(ts_data) write_json(current, fp, round_floats=False) def _write_site_data(ts_objs, dirloc): """Write list of station timeseries files to json""" for ts_data in ts_objs: # writes json file _write_stationdata_json(ts_data, dirloc) def _write_diurnal_week_stationdata_json(ts_data, out_dirs): """ Minor modification of method _write_stationdata_json to allow a further level of sub-directories Parameters ---------- ts_data : dict A dictionary containing all processed time series data. out_dirs : list list of file paths for writing data to Raises ------ Exception Raised if opening json file fails Returns ------- None. """ filename = get_stationfile_name( ts_data["station_name"], ts_data["obs_name"], ts_data["var_name_web"], ts_data["vert_code"] ) fp = os.path.join(out_dirs["ts/diurnal"], filename) if os.path.exists(fp): current = read_json(fp) else: current = {} current[ts_data["model_name"]] = round_floats(ts_data) write_json(current, fp, round_floats=False) def _add_heatmap_entry_json( heatmap_file, result, obs_name, var_name_web, vert_code, model_name, model_var ): if os.path.exists(heatmap_file): current = read_json(heatmap_file) else: current = {} if not var_name_web in current: current[var_name_web] = {} ov = current[var_name_web] if not obs_name in ov: ov[obs_name] = {} on = ov[obs_name] if not vert_code in on: on[vert_code] = {} ovc = on[vert_code] if not model_name in ovc: ovc[model_name] = {} mn = ovc[model_name] mn[model_var] = round_floats(result) write_json(current, heatmap_file, round_floats=False) def _prepare_regions_json_helper(region_ids): regborders, regs = {}, {} for regid in region_ids: reg = Region(regid) name = reg.name regs[name] = reg regborders[name] = rinfo = {} latr = reg.lat_range lonr = reg.lon_range if any(x is None for x in (latr, lonr)): raise ValueError(f"Lat / lon range missing for region {regid}") rinfo["minLat"] = latr[0] rinfo["maxLat"] = latr[1] rinfo["minLon"] = lonr[0] rinfo["maxLon"] = lonr[1] return (regborders, regs) def _prepare_default_regions_json(): return _prepare_regions_json_helper(get_all_default_region_ids()) def _prepare_aerocom_regions_json(): return _prepare_regions_json_helper(OLD_AEROCOM_REGIONS) def _prepare_htap_regions_json(): return _prepare_regions_json_helper(HTAP_REGIONS_DEFAULT) def _prepare_country_regions(region_ids): regs = {} for regid in region_ids: reg = Region(regid) name = reg.name regs[name] = reg return regs def init_regions_web(coldata, regions_how): regborders, regs = {}, {} regborders_default, regs_default = _prepare_default_regions_json() if regions_how == "default": regborders, regs = regborders_default, regs_default elif regions_how == "aerocom": regborders, regs = _prepare_aerocom_regions_json() elif regions_how == "htap": regborders[ALL_REGION_NAME] = regborders_default[ALL_REGION_NAME] regs[ALL_REGION_NAME] = regs_default[ALL_REGION_NAME] add_borders, add_regs = _prepare_htap_regions_json() regborders.update(add_borders) regs.update(add_regs) elif regions_how == "country": regborders[ALL_REGION_NAME] = regborders_default[ALL_REGION_NAME] regs[ALL_REGION_NAME] = regs_default[ALL_REGION_NAME] coldata.check_set_countries(True) regborders.update(coldata.get_country_codes()) add_regs = _prepare_country_regions(coldata.get_country_codes().keys()) regs.update(add_regs) else: raise ValueError("Invalid input for regions_how", regions_how) regnames = {} for regname, reg in regs.items(): regnames[reg.region_id] = regname return (regborders, regs, regnames)
[docs] def update_regions_json(region_defs, regions_json): """Check current regions.json for experiment and update if needed Parameters ---------- region_defs : dict keys are names of region (not IDs!) values define rectangular borders regions_json : str regions.json file (if it does not exist it will be created). Returns ------- dict current content of updated regions.json """ if os.path.exists(regions_json): current = read_json(regions_json) else: current = {} for region_name, region_info in region_defs.items(): if not region_name in current: current[region_name] = round_floats(region_info) write_json(current, regions_json, round_floats=False) return current
def _init_meta_glob(coldata, **kwargs): meta = coldata.metadata # create metadata dictionary that is shared among all timeseries files meta_glob = {} NDSTR = "UNDEFINED" try: meta_glob["pyaerocom_version"] = meta["pyaerocom"] except KeyError: meta_glob["pyaerocom_version"] = NDSTR try: meta_glob["obs_var"] = meta["var_name"][0] meta_glob["mod_var"] = meta["var_name"][1] except KeyError: meta_glob["obs_var"] = NDSTR meta_glob["mod_var"] = NDSTR try: meta_glob["obs_unit"] = meta["var_units"][0] meta_glob["mod_unit"] = meta["var_units"][1] except KeyError: meta_glob["obs_unit"] = NDSTR meta_glob["mod_unit"] = NDSTR try: meta_glob["obs_freq_src"] = meta["ts_type_src"][0] meta_glob["mod_freq_src"] = meta["ts_type_src"][1] except KeyError: meta_glob["obs_freq_src"] = NDSTR meta_glob["mod_freq_src"] = NDSTR try: meta_glob["obs_revision"] = meta["revision_ref"] except KeyError: meta_glob["obs_revision"] = NDSTR meta_glob["processed_utc"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M") meta_glob.update(kwargs) return meta_glob def _init_ts_data(freqs): data = {} for freq in freqs: data[f"{freq}_date"] = [] data[f"{freq}_obs"] = [] data[f"{freq}_mod"] = [] return data def _create_diurnal_weekly_data_object(coldata, resolution): """ Private helper functions that creates the data set containing all the weekly time series at the specified resolution. Called by _process_sites_weekly_ts. The returned xarray.Dataset contains a dummy time variable (currently not used) and the weekly time series as xarray.DataArray objects. Parameters ---------- coldata : ColocatedData ColocatedData object colocated on hourly resolution. resolution : string String specifying the averaging window used to generate representative weekly time series with hourly resolution. Valid values are 'yearly' and 'seasonal'. Raises ------ ValueError If an invalid resolution is given raise ValueError and print what was given. Returns ------- rep_week_full_period : xarray.Dataset Contains the weekly time series as the variable 'rep_week' """ import xarray as xr data_allyears = coldata.data yearkeys = list(data_allyears.groupby("time.year").groups) if resolution == "seasonal": seasons = ["DJF", "MAM", "JJA", "SON"] elif resolution == "yearly": seasons = ["year"] else: raise ValueError(f"Invalid resolution. Got {resolution}.") first = True for yr in yearkeys: data = data_allyears.where(data_allyears["time.year"] == yr, drop=True) for seas in seasons: rep_week_ds = xr.Dataset() if resolution == "seasonal": mon_slice = data.where(data["time.season"] == seas, drop=True) elif resolution == "yearly": mon_slice = data month_stamp = f"{seas}" for day in range(7): day_slice = mon_slice.where(mon_slice["time.dayofweek"] == day, drop=True) rep_day = day_slice.groupby("time.hour").mean(dim="time") rep_day["hour"] = rep_day.hour / 24 + day + 1 if day == 0: rep_week = rep_day else: rep_week = xr.concat([rep_week, rep_day], dim="hour") rep_week = rep_week.rename({"hour": "dummy_time"}) month_stamps = np.zeros(rep_week.dummy_time.shape, dtype="<U5") month_stamps[:] = month_stamp rep_week_ds["rep_week"] = rep_week rep_week_ds["month_stamp"] = (("dummy_time"), month_stamps) if seas in ["DJF", "year"]: rep_week_full_period = rep_week_ds else: rep_week_full_period = xr.concat([rep_week_full_period, rep_week_ds], dim="period") if first: output_array = rep_week_full_period # output_array = output_array.expand_dims({'year':[0]},0) first = False else: output_array = xr.concat([output_array, rep_week_full_period], dim="year") try: output_array.year except AttributeError: output_array = output_array.expand_dims({"year": [0]}, 0) output_array = output_array.assign(dict(year=np.array(yearkeys))) # output_array.year[:] = np.array(yearkeys) return output_array def _get_period_keys(resolution: Literal["seasonal", "yearly"]): period_keys = dict( seasonal=["DJF", "MAM", "JJA", "SON"], yearly=["All"], ) if resolution not in period_keys: raise ValueError(f"Unknown {resolution=}") return period_keys[resolution] def _process_one_station_weekly(stat_name, i, repw_res, meta_glob, time): """ Processes one station data set for all supported averaging windows into a dict of station time series data and metadata. Parameters ---------- stat_name : string Name of station to process i : int Index of the station to process in the xarray.Datasets. repw_res : dict Dictionary of xarray.Datasets for each supported averaging window for the weekly time series meta_glob : TYPE Dictionary containing global metadata. time : list Time index. Returns ------- ts_data : dict Dictinary of time series data and metadata for one station. Contains all resolutions/averaging windows. has_data : bool Set to false if all data is missing for a station. """ has_data = False years = list(repw_res["seasonal"].year.values) yeardict = {} for year in years: yeardict[f"{year}"] = {} ts_data = { "time": time, "seasonal": {"obs": deepcopy(yeardict), "mod": deepcopy(yeardict)}, "yearly": {"obs": deepcopy(yeardict), "mod": deepcopy(yeardict)}, } ts_data["station_name"] = stat_name ts_data.update(meta_glob) for y, year in enumerate(years): for res, repw in repw_res.items(): repw = repw.transpose("year", "period", "data_source", "dummy_time", "station_name") obs_vals = repw[y, :, 0, :, i] if (np.isnan(obs_vals)).all().values: continue has_data = True mod_vals = repw[y, :, 1, :, i] period_keys = _get_period_keys(res) for period_num, pk in enumerate(period_keys): ts_data[res]["obs"][f"{year}"][pk] = obs_vals.sel( period=period_num ).values.tolist() ts_data[res]["mod"][f"{year}"][pk] = mod_vals.sel( period=period_num ).values.tolist() return ts_data, has_data def _process_weekly_object_to_station_time_series(repw_res, meta_glob): """ Process the xarray.Datasets objects returned by _create_diurnal_weekly_data_object into a dictionary containing station time series data and metadata. Parameters ---------- repw_res : dict Dictionary of xarray.Datasets for each supported averaging window for the weekly time series meta_glob : dict Dictionary containing global metadata. Returns ------- ts_objs : list List of dicts containing station time series data and metadata. """ ts_objs = [] dc = 0 time = (np.arange(168) / 24 + 1).round(4).tolist() for i, stat_name in enumerate(repw_res["seasonal"].station_name.values): ts_data, has_data = _process_one_station_weekly(stat_name, i, repw_res, meta_glob, time) if has_data: ts_objs.append(ts_data) dc += 1 return ts_objs def _process_weekly_object_to_country_time_series(repw_res, meta_glob, regions_how, region_ids): """ Process the xarray.Dataset objects returned by _create_diurnal_weekly_data_object into a dictionary containing country average time series data and metadata. ToDo ---- Implement regions_how for other values than country based Parameters ---------- repw_res : dict Dictionary of xarray.Datasets for each supported averaging window for the weekly time series meta_glob : dict Dictionary containing global metadata. regions_how : string String describing how regions are to be processed. Regional time series are only calculated if regions_how = country. region_ids : dict Dict containing mapping of region IDs and names. Returns ------- ts_objs_reg : list List of dicts containing station time series data and metadata. """ ts_objs_reg = [] time = (np.arange(168) / 24 + 1).round(4).tolist() years = list(repw_res["seasonal"].year.values) yeardict = {} for year in years: yeardict[f"{year}"] = {} if regions_how != "country": print("Regional diurnal cycles are only implemented for country regions, skipping...") ts_objs_reg = None else: for regid, regname in region_ids.items(): ts_data = { "time": time, "seasonal": {"obs": deepcopy(yeardict), "mod": deepcopy(yeardict)}, "yearly": {"obs": deepcopy(yeardict), "mod": deepcopy(yeardict)}, } ts_data["station_name"] = regname ts_data.update(meta_glob) for y, year in enumerate(years): for res, repw in repw_res.items(): repw = repw.transpose( "year", "period", "data_source", "dummy_time", "station_name" ) if regid == ALL_REGION_NAME: subset = repw else: subset = repw.where(repw.country == regid) avg = subset.mean(dim="station_name") obs_vals = avg[y, :, 0, :] mod_vals = avg[y, :, 1, :] period_keys = _get_period_keys(res) for period_num, pk in enumerate(period_keys): ts_data[res]["obs"][f"{year}"][pk] = obs_vals.sel( period=period_num ).values.tolist() ts_data[res]["mod"][f"{year}"][pk] = mod_vals.sel( period=period_num ).values.tolist() ts_objs_reg.append(deepcopy(ts_data)) return ts_objs_reg def _process_sites_weekly_ts(coldata, regions_how, region_ids, meta_glob): """ Private helper function to process ColocatedData objects into dictionaries containing represenative weekly time series with hourly resolution. Processing the coloceted data object into a collection of representative weekly time series is done in the private function _create_diurnal_weekly_data_object. This object (an xarray.Dataset) is then further processed into two dictionaries containing station and regional time series respectively. Parameters ---------- coldata : ColocatedData The colocated data to process. regions_how : string Srting describing how regions are to be processed. Regional time series are only calculated if regions_how = country. region_ids : dict Dict containing mapping of region IDs and names. meta_glob : dict Dictionary containing global metadata. Returns ------- ts_objs : list List of dicts containing station time series data and metadata. ts_objs_reg : list List of dicts containing country time series data and metadata. """ assert coldata.dims == ("data_source", "time", "station_name") repw_res = { "seasonal": _create_diurnal_weekly_data_object(coldata, "seasonal")["rep_week"], "yearly": _create_diurnal_weekly_data_object(coldata, "yearly")["rep_week"].expand_dims( "period", axis=0 ), } ts_objs = _process_weekly_object_to_station_time_series(repw_res, meta_glob) ts_objs_reg = _process_weekly_object_to_country_time_series( repw_res, meta_glob, regions_how, region_ids ) return (ts_objs, ts_objs_reg) def _init_site_coord_arrays(data): found = False jsdates = {} for freq, cd in data.items(): if cd is None: continue elif not found: sites = cd.data.station_name.values lats = cd.data.latitude.values.astype(np.float64) lons = cd.data.longitude.values.astype(np.float64) if "altitude" in cd.data.coords: alts = cd.data.altitude.values.astype(np.float64) else: alts = [np.nan] * len(lats) if "country" in cd.data.coords: countries = cd.data.country.values else: countries = ["UNAVAIL"] * len(lats) found = True else: assert all(cd.data.station_name.values == sites) jsdates[freq] = cd.data.jsdate.values.tolist() return (sites, lats, lons, alts, countries, jsdates) def _get_stat_regions(lats, lons, regions, **kwargs): regs = [] regions_how = kwargs.get("regions_how", None) for lat, lon in zip(lats, lons): reg = find_closest_region_coord(lat, lon, regions=regions, regions_how=regions_how) regs.append(reg) return regs def _process_sites(data, regions, regions_how, meta_glob): freqs = list(data) (sites, lats, lons, alts, countries, jsdates) = _init_site_coord_arrays(data) if regions_how == "country": regs = countries elif regions_how == "htap": regs = _get_stat_regions(lats, lons, regions, regions_how=regions_how) else: regs = _get_stat_regions(lats, lons, regions) ts_objs = [] site_indices = [] map_meta = [] for i, site in enumerate(sites): # init empty timeseries data object site_meta = { "station_name": str(site), "latitude": lats[i], "longitude": lons[i], "altitude": alts[i], } if regions_how == "country": site_meta["region"] = [regs[i]] else: site_meta["region"] = regs[i] ts_data = _init_ts_data(freqs) ts_data.update(meta_glob) ts_data.update(site_meta) has_data = False for freq, cd in data.items(): if cd is not None: assert cd.dims == ("data_source", "time", "station_name") sitedata = cd.data.data[:, :, i] if np.all(np.isnan(sitedata)): # skip this site, all is NaN continue ts_data[f"{freq}_date"] = jsdates[freq] ts_data[f"{freq}_obs"] = sitedata[0].tolist() ts_data[f"{freq}_mod"] = sitedata[1].tolist() has_data = True if has_data: # site is valid # register ts_data ts_objs.append(ts_data) # remember site indices in data for faster processing of statistics # below. site_indices.append(i) # init map data for each valid site map_meta.append({**site_meta}) return (ts_objs, map_meta, site_indices) def _get_statistics(obs_vals, mod_vals, min_num, drop_stats): stats = calculate_statistics(mod_vals, obs_vals, min_num_valid=min_num, drop_stats=drop_stats) return _prep_stats_json(stats) def _make_trends_from_timeseries(obs, mod, freq, season, start, stop, min_yrs): """ Function for generating trends from timeseries Includes fomatting in a way that can be serialized to json. A key, map_var, is added for use in the web interface. Parameters ---------- obs : pd.Series Time series of the obs mod : pd.Series Time series of the mod freq : str Frequency for the trends, either monthly or yearly season : str Seasons used for the trends start : int Start year stop : int Stop year min_yrs : int Minimal number of years for the calculation of the trends Raises ------ TrendsError If stop - start is smaller than min_yrs AeroValError If the trend engine returns None Returns ------ (dict, dict) Dicts consiting of the trends data for the obs and mod """ if stop - start < min_yrs: raise TrendsError(f"min_yrs ({min_yrs}) larger than time between start and stop") te = TrendsEngine # The model and observation data are made to pandas times series obs_trend_series = obs mod_trend_series = mod # Translate season to names used in trends_helpers.py. Should be handled there instead! season = _get_season_from_months(season) # Trends are calculated obs_trend = te.compute_trend(obs_trend_series, freq, start, stop, min_yrs, season) mod_trend = te.compute_trend(mod_trend_series, freq, start, stop, min_yrs, season) # Makes pd.Series serializable if obs_trend["data"] is None or mod_trend["data"] is None: raise TrendsError("Trends came back as None", obs_trend["data"], mod_trend["data"]) obs_trend["data"] = obs_trend["data"].to_json() mod_trend["data"] = mod_trend["data"].to_json() obs_trend["map_var"] = f"slp_{start}" mod_trend["map_var"] = f"slp_{start}" return obs_trend, mod_trend def _make_trends(obs_vals, mod_vals, time, freq, season, start, stop, min_yrs): """ Function for generating trends from lists of observations This will calculate pandas time series from the lists and use that to calculate trends Parameters ---------- obs : list Time series of the obs mod : list Time series of the mod freq : str Frequency for the trends, either monthly or yearly season : str Seasons used for the trends start : int Start year stop : int Stop year min_yrs : int Minimal number of years for the calculation of the trends Raises ------ TrendsError If stop - start is smaller than min_yrs AeroValError If the trend engine returns None Returns ------ (dict, dict) Dicts consiting of the trends data for the obs and mod """ # The model and observation data are made to pandas times series obs_trend_series = pd.Series(obs_vals, time) mod_trend_series = pd.Series(mod_vals, time) (obs_trend, mod_trend) = _make_trends_from_timeseries( obs_trend_series, mod_trend_series, freq, season, start, stop, min_yrs ) return obs_trend, mod_trend def _process_map_and_scat( data, map_data, site_indices, periods, scatter_freq, min_num, seasons, add_trends, trends_min_yrs, use_fairmode, obs_var, drop_stats, ): stats_dummy = _init_stats_dummy(drop_stats=drop_stats) scat_data = {} scat_dummy = [np.nan] for freq, cd in data.items(): for per in periods: for season in seasons: use_dummy = cd is None if not use_dummy: try: subset = _select_period_season_coldata(cd, per, season) jsdate = subset.data.jsdate.values.tolist() except (DataCoverageError, TemporalResolutionError): use_dummy = True for i, map_stat in zip(site_indices, map_data): if not freq in map_stat: map_stat[freq] = {} if use_dummy: stats = stats_dummy else: obs_vals = subset.data.data[0, :, i] mod_vals = subset.data.data[1, :, i] stats = _get_statistics( obs_vals, mod_vals, min_num=min_num, drop_stats=drop_stats ) if use_fairmode and freq != "yearly" and not np.isnan(obs_vals).all(): stats["mb"] = np.nanmean(mod_vals - obs_vals) stats["fairmode"] = fairmode_stats(obs_var, stats) # Code for the calculation of trends if add_trends and freq != "daily": (start, stop) = _get_min_max_year_periods([per]) (start, stop) = (start.year, stop.year) if stop - start >= trends_min_yrs: try: time = subset.data.time.values (obs_trend, mod_trend) = _make_trends( obs_vals, mod_vals, time, freq, season, start, stop, trends_min_yrs, ) # The whole trends dicts are placed in the stats dict stats["obs_trend"] = obs_trend stats["mod_trend"] = mod_trend except TrendsError as e: msg = f"Failed to calculate trends, and will skip. This was due to {e}" logger.warning(msg) perstr = f"{per}-{season}" map_stat[freq][perstr] = stats if freq == scatter_freq: # add only sites to scatter data that have data available # in the lowest of the input resolutions (e.g. yearly) site = map_stat["station_name"] if not site in scat_data: scat_data[site] = {} scat_data[site]["latitude"] = map_stat["latitude"] scat_data[site]["longitude"] = map_stat["longitude"] scat_data[site]["altitude"] = map_stat["altitude"] scat_data[site]["region"] = map_stat["region"] if use_dummy: obs = mod = jsdate = scat_dummy else: obs, mod = obs_vals.tolist(), mod_vals.tolist() scat_data[site][perstr] = {"obs": obs, "mod": mod, "date": jsdate} return (map_data, scat_data) def _process_regional_timeseries(data, region_ids, regions_how, meta_glob): ts_objs = [] freqs = list(data) check_countries = True if regions_how == "country" else False for regid, regname in region_ids.items(): ts_data = _init_ts_data(freqs) ts_data["station_name"] = regname ts_data.update(meta_glob) for freq, cd in data.items(): if cd is None: continue jsfreq = cd.data.jsdate.values.tolist() try: subset = cd.filter_region(regid, inplace=False, check_country_meta=check_countries) except DataCoverageError: logger.info(f"no data in {regid} ({freq}) to compute regional timeseries") ts_data[f"{freq}_date"] = jsfreq ts_data[f"{freq}_obs"] = [np.nan] * len(jsfreq) ts_data[f"{freq}_mod"] = [np.nan] * len(jsfreq) continue if subset.has_latlon_dims: avg = subset.data.mean(dim=("latitude", "longitude")) else: avg = subset.data.mean(dim="station_name") obs_vals = avg[0].data.tolist() mod_vals = avg[1].data.tolist() ts_data[f"{freq}_date"] = jsfreq ts_data[f"{freq}_obs"] = obs_vals ts_data[f"{freq}_mod"] = mod_vals ts_objs.append(ts_data) return ts_objs def _apply_annual_constraint_helper(coldata, yearly): arr_yr = yearly.data yrs_cd = coldata.data.time.dt.year yrs_avail = arr_yr.time.dt.year obs_allyrs = arr_yr[0] for i, yr in enumerate(yrs_avail): obs_yr = obs_allyrs[i] nan_sites_yr = obs_yr.isnull() if not nan_sites_yr.any(): continue scond = nan_sites_yr.data tcond = (yrs_cd == yr).data # workaround since numpy sometimes throws IndexError if tcond and # scond are attempted to be applied directly via # coldata.data.data[:,tcond, scond] = np.nan tsel = coldata.data.data[:, tcond] tsel[:, :, scond] = np.nan coldata.data.data[:, tcond] = tsel return coldata def _apply_annual_constraint(data): """ Apply annual filter to data Parameters ---------- data : dict keys are frequencies, values are corresponding instances of `ColocatedData` in that resolution, or None, if colocated data is not available in that resolution (see also :func:`_init_data_default_frequencies`). Raises ------ ConfigError If colocated data in yearly resolution is not available in input data Returns ------- output : dict like input `data` but with annual constrained applied. """ output = {} if not "yearly" in data or data["yearly"] is None: raise ConfigError( "Cannot apply annual_stats_constrained option. " 'Please add "yearly" in your setup (see attribute ' '"statistics_json" in AerocomEvaluation class)' ) yearly = data["yearly"] for tst, cd in data.items(): if cd is None: output[tst] = None else: output[tst] = _apply_annual_constraint_helper(cd, yearly) return output def _prep_stats_json(stats): for k, v in stats.items(): try: stats[k] = np.float64(v) # for json encoder... except Exception: # value is str (e.g. for weighted stats) # 'NOTE': 'Weights were not applied to FGE and kendall and spearman corr (not implemented)' stats[k] = v return stats def _get_extended_stats(coldata, use_weights, drop_stats): stats = coldata.calc_statistics(use_area_weights=use_weights, drop_stats=drop_stats) # Removes the spatial median and temporal mean (see mails between Hilde, Jonas, Augustin and Daniel from 27.09.21) # (stats['R_spatial_mean'], # stats['R_spatial_median']) = _calc_spatial_corr(coldata, use_weights) # (stats['R_temporal_mean'], # stats['R_temporal_median']) = _calc_temporal_corr(coldata) (stats["R_spatial_mean"], _) = _calc_spatial_corr(coldata, use_weights) (_, stats["R_temporal_median"]) = _calc_temporal_corr(coldata) return _prep_stats_json(stats) def _calc_spatial_corr(coldata, use_weights): """ Compute spatial correlation both for median and mean aggregation Parameters ---------- coldata : ColocatedData Input data. use_weights : bool Apply area weights or not. Returns ------- float mean spatial correlation float median spatial correlation """ return ( coldata.calc_spatial_statistics(aggr="mean", use_area_weights=use_weights)["R"], coldata.calc_spatial_statistics(aggr="median", use_area_weights=use_weights)["R"], ) def _calc_temporal_corr(coldata): """ Compute temporal correlation both for median and mean aggregation Parameters ---------- coldata : ColocatedData Input data. Returns ------- float mean temporal correlation float median temporal correlation """ if len(coldata.time) < 3: return np.nan, np.nan elif coldata.has_latlon_dims: coldata = coldata.flatten_latlondim_station_name() # Use only sites that contain at least 3 valid data points (otherwise # correlation will be 1). obs_ok = coldata.data[0].count(dim="time") > 2 arr = [] arr.append(coldata.data[0].where(obs_ok, drop=True)) arr.append(coldata.data[1].where(obs_ok, drop=True)) if np.prod(arr[0].shape) == 0 or np.prod(arr[1].shape) == 0: return np.nan, np.nan corr_time = xr.corr(arr[1], arr[0], dim="time") with ignore_warnings(RuntimeWarning, "Mean of empty slice", "All-NaN slice encountered"): return (np.nanmean(corr_time.data), np.nanmedian(corr_time.data)) def _select_period_season_coldata(coldata, period, season): tslice = _period_str_to_timeslice(period) # expensive, try use solution with numpy indexing directly... # also, keep an eye on: https://github.com/pydata/xarray/issues/2799 arr = coldata.data.sel(time=tslice) if len(arr.time) == 0: raise DataCoverageError(f"No data available in period {period}") if season != "all": if not season in arr.season: raise DataCoverageError(f"No data available in {season} in period {period}") elif TsType(coldata.ts_type) < "monthly": raise TemporalResolutionError( "Season selection is only available for monthly or higher resolution data" ) mask = arr["season"] == season arr = arr.sel(time=arr["time"][mask]) return ColocatedData(arr) def _process_heatmap_data( data, region_ids, use_weights, drop_stats, use_country, meta_glob, periods, seasons, add_trends, trends_min_yrs, ): output = {} stats_dummy = _init_stats_dummy(drop_stats=drop_stats) for freq, coldata in data.items(): output[freq] = hm_freq = {} for regid, regname in region_ids.items(): hm_freq[regname] = {} for per in periods: for season in seasons: use_dummy = coldata is None perstr = f"{per}-{season}" if use_dummy: stats = stats_dummy else: try: subset = _select_period_season_coldata(coldata, per, season) trends_successful = False if add_trends and freq != "daily": # Calculates the start and stop years. (start, stop) = _get_min_max_year_periods([per]) (start, stop) = (start.year, stop.year) if stop - start >= trends_min_yrs: try: subset_time_series = subset.get_regional_timeseries( regid, check_country_meta=use_country ) (obs_trend, mod_trend) = _make_trends_from_timeseries( subset_time_series["obs"], subset_time_series["mod"], freq, season, start, stop, trends_min_yrs, ) trends_successful = True except TrendsError as e: msg = f"Failed to calculate trends, and will skip. This was due to {e}" logger.warning(msg) subset = subset.filter_region( region_id=regid, check_country_meta=use_country ) stats = _get_extended_stats(subset, use_weights, drop_stats) if add_trends and freq != "daily" and trends_successful: # The whole trends dicts are placed in the stats dict stats["obs_trend"] = obs_trend stats["mod_trend"] = mod_trend except (DataCoverageError, TemporalResolutionError) as e: stats = stats_dummy hm_freq[regname][perstr] = stats return output def _map_indices(outer_idx, inner_idx): """ Find index positions of inner array contained in outer array Note ---- Both outer and inner index arrays must be strictly monotonous Parameters ---------- outer_idx : np.ndarray numerical value array (e.g. numerical timestamps in yearly resolution from 2010-2020). Must be strictly monotonous. inner_idx : np.ndarray inner value array, must be contained in outer array, e.g. numerical timestamps in yearly resolution from 2012-2014, all values in this array must be contained Returns ------- mappingg : np.ndarray same shape as `outer_idx` with values -1 where no matches are found and else, corresponding indices of `inner_idx` Example ------- `outer_idx`: [2010, 2011, 2012, 2013, 2014, 2015] `inner_idx`: [2011, 2012] `mapping` : [-1, 0, 1, -1, -1, -1] """ inner_start, inner_stop = inner_idx[0], inner_idx[-1] mapping = np.ones_like(outer_idx) * -1 count = 0 indata = False for i, idx in enumerate(outer_idx): if inner_start == idx: indata = True elif inner_stop < idx: break if indata: mapping[i] = count # sanity checking, will fail, e.g. if input data is not monotonuos assert inner_idx[count] == idx count += 1 return mapping.astype(int) def _process_statistics_timeseries( data, freq, region_ids, use_weights, drop_stats, use_country, data_freq ): """ Compute statistics timeseries for input data Parameters ---------- data : dict dictionary containing colocated data object (values) in different temporal resolutions (keys). freq : str Output frequency (temporal resolution in which statistics timeseries if computed, AeroVal default is monthly) region_ids : dict Region IDs (keys) and corresponding names (values) use_weights : bool calculate statistics using area weights or not (only relevant for 4D colocated data with lat and lon dimension, e.g. from gridded / gridded co-location) use_country : bool Use countries for regional filtering. data_freq : str, optional Base frequency for computation of statistics (if None, `freq` is used). For details see https://github.com/metno/pyaerocom/pull/416. Raises ------ TemporalResolutionError If `data_freq` is lower resolution than `freq`. Returns ------- output : dict Dictionary with results. """ if data_freq is None: data_freq = freq # input frequency is lower resolution than output frequency if TsType(data_freq) < TsType(freq): raise TemporalResolutionError( f"Desired input frequency {data_freq} is lower than desired " f"output frequency {freq}" ) output = {} if not data_freq in data or data[data_freq] is None: raise TemporalResolutionError( f"failed to compute statistics timeseries, no co-located data " f"available in specified base resolution {data_freq}" ) coldata = data[data_freq] # get time index of output frequency to_idx = data[freq].data.time.values tstr = TsType(freq).to_numpy_freq() # list of strings of output timestamps (used below to select the # individual periods) to_idx_str = [str(x) for x in to_idx.astype(f"datetime64[{tstr}]")] jsdate = _get_jsdate(to_idx) for regid, regname in region_ids.items(): output[regname] = {} try: subset = coldata.filter_region(region_id=regid, check_country_meta=use_country) except DataCoverageError: continue for i, js in enumerate(jsdate): per = to_idx_str[i] try: arr = ColocatedData(subset.data.sel(time=per)) stats = arr.calc_statistics(use_area_weights=use_weights, drop_stats=drop_stats) output[regname][str(js)] = _prep_stats_json(stats) except DataCoverageError: pass return output def _get_jsdate(nparr): dt = nparr.astype("datetime64[s]") offs = np.datetime64("1970", "s") return (dt - offs).astype(int) * 1000 def _init_data_default_frequencies(coldata, to_ts_types): """ Compute one colocated data object for each desired statistics frequency Parameters ---------- coldata : ColocatedData Initial colocated data in a certain frequency to_ts_types : list list of desired frequencies for which statistical parameters are being computed. Returns ------- dict keys are elements of `to_ts_types`, values are corresponding instances of `ColocatedData` in that resolution, or None, if resolution is higher than original resolution of input `coldata`. dict like `data_arrs` but values are jsdate instances. """ data_arrs = dict.fromkeys(to_ts_types) from_tst = TsType(coldata.ts_type) for to in to_ts_types: to_tst = TsType(to) if from_tst < to_tst: continue elif from_tst == to_tst: cd = coldata.copy() else: cd = coldata.resample_time(to, settings_from_meta=True, inplace=False) # add season coordinate for later filtering arr = cd.data arr["season"] = arr.time.dt.season jsdates = _get_jsdate(arr.time.values) arr = arr.assign_coords(jsdate=("time", jsdates)) cd.data = arr data_arrs[to] = cd return data_arrs def get_profile_filename(station_or_region_name, obs_name, var_name_web): return f"{station_or_region_name}_{obs_name}_{var_name_web}.json"
[docs] def process_profile_data_for_regions( data: ColocatedData, region_id: str, use_country: bool, periods: list[str], seasons: list[str], ) -> dict: # pragma: no cover """ This method populates the json files in data/profiles which are use for visualization. Analogous to _process_map_and_scat for profile data. Each json file corresponds to a region or station, obs network, and variable. Inside the json, it is broken up by model. Each model has a key for "z" (the vertical dimension), "obs", and "mod" Each "obs" and "mod" is broken up by period. Args: data (ColocatedData): ColocatedData object for this layer region_id (str): Spatial subset to compute the mean profiles over station_name (str): Station to compute mean profiles over for period use_country (boolean): Passed to filter_region(). periods (str): Year part of the temporal range to average over seasons (str): Sesonal part of the temporal range to average over Returns: output (dict): Dictionary to write to json """ output = {"obs": {}, "mod": {}} for freq, coldata in data.items(): if freq not in output["obs"]: output["obs"][freq] = {} if freq not in output["mod"]: output["mod"][freq] = {} for per in periods: for season in seasons: use_dummy = coldata is None perstr = f"{per}-{season}" if use_dummy: output["obs"][freq][perstr] = np.nan output["mod"][freq][perstr] = np.nan else: try: per_season_subset = _select_period_season_coldata(coldata, per, season) subset = per_season_subset.filter_region( region_id=region_id, check_country_meta=use_country ) output["obs"][freq][perstr] = np.nanmean(subset.data[0, :, :]) output["mod"][freq][perstr] = np.nanmean(subset.data[1, :, :]) except (DataCoverageError, TemporalResolutionError) as e: msg = f"Failed to access subset timeseries, and will skip. Reason was: {e}" logger.warning(msg) output["obs"][freq][perstr] = np.nan output["mod"][freq][perstr] = np.nan return output
[docs] def process_profile_data_for_stations( data: ColocatedData, station_name: str, use_country: bool, periods: list[str], seasons: list[str], ) -> dict: # pragma: no cover """ This method populates the json files in data/profiles which are use for visualization. Analogous to _process_map_and_scat for profile data. Each json file corresponds to a region, obs network, and variable. Inside the json, it is broken up by model. Each model has a key for "z" (the vertical dimension), "obs", and "mod" Each "obs" and "mod" is broken up by period. Args: data (ColocatedData): ColocatedData object for this layer region_id (str): Spatial subset to compute the mean profiles over station_name (str): Station to compute mean profiles over for period use_country (boolean): Passed to filter_region(). periods (str): Year part of the temporal range to average over seasons (str): Sesonal part of the temporal range to average over Returns: output (dict): Dictionary to write to json """ output = {"obs": {}, "mod": {}} for freq, coldata in data.items(): if freq not in output["obs"]: output["obs"][freq] = {} if freq not in output["mod"]: output["mod"][freq] = {} for per in periods: for season in seasons: use_dummy = coldata is None perstr = f"{per}-{season}" if use_dummy: output["obs"][freq][perstr] = np.nan output["mod"][freq][perstr] = np.nan else: try: per_season_subset = _select_period_season_coldata(coldata, per, season) subset = per_season_subset.data[ :, :, per_season_subset.data.station_name.values == station_name, # in this case a station ] # Assumes ordering of station name matches output["obs"][freq][perstr] = np.nanmean(subset.data[0, :, :]) output["mod"][freq][perstr] = np.nanmean(subset.data[1, :, :]) except (DataCoverageError, TemporalResolutionError) as e: msg = f"Failed to access subset timeseries, and will skip. Reason was: {e}" logger.warning(msg) output["obs"][freq][perstr] = np.nan output["mod"][freq][perstr] = np.nan return output
[docs] def add_profile_entry_json( profile_file: str, data: ColocatedData, profile_viz: dict, periods: list[str], seasons: list[str], ): # pragma: no cover """ Analogous to _add_heatmap_entry_json for profile data. Every time this function is called it checks to see if the profile_file exists. If so, it reads it, if not it makes a new one. This is because one can not add to json files and so everytime we want to add entries for profile layers we must read in the old file, add the entries, and write a new file. Args: profile_file (str): Name of profile_file data (ColocatedData): For this vertical layer profile_viz (dict): Output of process_profile_data() periods (list[str]): periods to compute over (years) seasons (list[str]): seasons to compute over (e.g., All, DJF, etc.) """ if os.path.exists(profile_file): current = read_json(profile_file) else: current = {} for freq, coldata in data.items(): model_name = coldata.model_name if not model_name in current: current[model_name] = {} midpoint = ( float(coldata.data.attrs["vertical_layer"]["end"]) + float(coldata.data.attrs["vertical_layer"]["start"]) ) / 2 if not "z" in current[model_name]: current[model_name]["z"] = [midpoint] # initalize with midpoint if ( midpoint > current[model_name]["z"][-1] ): # only store incremental increases in the layers current[model_name]["z"].append(midpoint) if not "obs" in current[model_name]: current[model_name]["obs"] = {} if not freq in current[model_name]["obs"]: current[model_name]["obs"][freq] = {} if not "mod" in current[model_name]: current[model_name]["mod"] = {} if not freq in current[model_name]["mod"]: current[model_name]["mod"][freq] = {} for per in periods: for season in seasons: perstr = f"{per}-{season}" if not perstr in current[model_name]["obs"][freq]: current[model_name]["obs"][freq][perstr] = [] if not perstr in current[model_name]["mod"][freq]: current[model_name]["mod"][freq][perstr] = [] current[model_name]["obs"][freq][perstr].append(profile_viz["obs"][freq][perstr]) current[model_name]["mod"][freq][perstr].append(profile_viz["mod"][freq][perstr]) if not "metadata" in current[model_name]: current[model_name]["metadata"] = { "z_unit": coldata.data.attrs["altitude_units"], "z_description": "Altitude ASL", "z_long_description": "Altitude Above Sea Level", "unit": coldata.unitstr, } current[model_name] = round_floats(current[model_name]) write_json(current, profile_file, round_floats=False)