import logging
import os
from time import time
from cf_units import Unit
from numpy.typing import ArrayLike
from pyaerocom import ColocatedData, TsType
from pyaerocom.aeroval._processing_base import ProcessingEngine
from pyaerocom.aeroval.coldatatojson_helpers import (
_add_heatmap_entry_json,
_apply_annual_constraint,
_init_data_default_frequencies,
_init_meta_glob,
_process_heatmap_data,
_process_map_and_scat,
_process_regional_timeseries,
_process_sites,
_process_sites_weekly_ts,
_process_statistics_timeseries,
_write_site_data,
_write_stationdata_json,
add_profile_entry_json,
get_heatmap_filename,
get_json_mapname,
get_profile_filename,
get_timeseries_file_name,
init_regions_web,
process_profile_data_for_regions,
process_profile_data_for_stations,
update_regions_json,
)
from pyaerocom.aeroval.exceptions import ConfigError
from pyaerocom.aeroval.json_utils import write_json
from pyaerocom.exceptions import TemporalResolutionError
logger = logging.getLogger(__name__)
[docs]
class ColdataToJsonEngine(ProcessingEngine):
[docs]
def run(self, files):
"""
Convert colocated data files to json
Parameters
----------
files : list
list of file paths pointing to colocated data objects to be
processed.
Returns
-------
list
list of files that have been converted.
"""
converted = []
for file in files:
logger.info(f"Processing: {file}")
coldata = ColocatedData(data=file)
self.process_coldata(coldata)
converted.append(file)
return converted
[docs]
def process_coldata(self, coldata: ColocatedData):
"""
Creates all json files for one ColocatedData object
Parameters
----------
coldata : ColocatedData
colocated data to be processed.
Raises
------
NotImplementedError
DESCRIPTION.
ValueError
DESCRIPTION.
ConfigError
DESCRIPTION.
Returns
-------
None.
"""
t00 = time()
use_weights = self.cfg.statistics_opts.weighted_stats
drop_stats = self.cfg.statistics_opts.drop_stats
# redundant, but cheap and important to be correct
self.cfg._check_time_config()
freqs = self.cfg.time_cfg.freqs
periods = self.cfg.time_cfg.periods
seasons = self.cfg.time_cfg.get_seasons()
main_freq = self.cfg.time_cfg.main_freq
annual_stats_constrained = self.cfg.statistics_opts.annual_stats_constrained
out_dirs = self.cfg.path_manager.get_json_output_dirs(True)
regions_json = self.exp_output.regions_file
regions_how = self.cfg.webdisp_opts.regions_how
stats_min_num = self.cfg.statistics_opts.MIN_NUM
if hasattr(coldata.data, "altitude_units"):
if Unit(coldata.data.attrs["altitude_units"]) != Unit(
"km"
): # put everything in terms of km for viz
# convert start and end for file naming
self._convert_coldata_altitude_units_to_km(coldata)
vert_code = self._get_vert_code(coldata)
diurnal_only = coldata.get_meta_item("diurnal_only")
add_trends = self.cfg.statistics_opts.add_trends
trends_min_yrs = self.cfg.statistics_opts.trends_min_yrs
use_fairmode = self.cfg.statistics_opts.use_fairmode
use_diurnal = self.cfg.statistics_opts.use_diurnal
# ToDo: some of the checks below could be done automatically in
# EvalSetup, and at an earlier stage
if vert_code == "ModelLevel":
raise NotImplementedError("Coming (not so) soon...")
# this will need to be figured out as soon as there is altitude
elif "altitude" in coldata.data.dims:
raise ValueError("Altitude should have been dealt with already in the colocation")
elif not isinstance(coldata, ColocatedData):
raise ValueError(f"Need ColocatedData object, got {type(coldata)}")
elif coldata.has_latlon_dims and regions_how == "country":
raise NotImplementedError(
"Cannot yet apply country filtering for 4D colocated data instances"
)
elif not main_freq in freqs:
raise ConfigError(f"main_freq {main_freq} is not in experiment frequencies: {freqs}")
if self.cfg.statistics_opts.stats_tseries_base_freq is not None:
if not self.cfg.statistics_opts.stats_tseries_base_freq in freqs:
raise ConfigError(
f"Base frequency for statistics timeseries needs to be "
f"specified in experiment frequencies: {freqs}"
)
# init some stuff
if "var_name_input" in coldata.metadata:
obs_var = coldata.metadata["var_name_input"][0]
model_var = coldata.metadata["var_name_input"][1]
else:
obs_var = model_var = "UNDEFINED"
model_name = coldata.model_name
obs_name = coldata.obs_name
mcfg = self.cfg.model_cfg.get_entry(model_name)
var_name_web = mcfg.get_varname_web(model_var, obs_var)
logger.info(
f"Computing json files for {model_name} ({model_var}) vs. {obs_name} ({obs_var})"
)
meta_glob = _init_meta_glob(
coldata,
vert_code=vert_code,
obs_name=obs_name,
model_name=model_name,
var_name_web=var_name_web,
)
# get region IDs
(regborders, regs, regnames) = init_regions_web(coldata, regions_how)
update_regions_json(regborders, regions_json)
use_country = True if regions_how == "country" else False
data = _init_data_default_frequencies(coldata, freqs)
if annual_stats_constrained:
data = _apply_annual_constraint(data)
if not coldata.data.attrs.get("just_for_viz", False): # make the regular json output
if not diurnal_only:
logger.info("Processing statistics timeseries for all regions")
self._process_stats_timeseries_for_all_regions(
data=data,
coldata=coldata,
main_freq=main_freq,
regnames=regnames,
use_weights=use_weights,
drop_stats=drop_stats,
use_country=use_country,
obs_name=obs_name,
obs_var=obs_var,
var_name_web=var_name_web,
out_dirs=out_dirs,
vert_code=vert_code,
model_name=model_name,
model_var=model_var,
meta_glob=meta_glob,
periods=periods,
seasons=seasons,
add_trends=add_trends,
trends_min_yrs=trends_min_yrs,
regions_how=regions_how,
regs=regs,
stats_min_num=stats_min_num,
use_fairmode=use_fairmode,
)
if coldata.ts_type == "hourly" and use_diurnal:
logger.info("Processing diurnal profiles")
self._process_diurnal_profiles(
coldata=coldata,
regions_how=regions_how,
regnames=regnames,
meta_glob=meta_glob,
out_dirs=out_dirs,
)
else:
logger.info("Processing profile data for vizualization")
self._process_profile_data_for_vizualization(
data=data,
use_country=use_country,
region_names=regnames,
station_names=coldata.data.station_name.values,
periods=periods,
seasons=seasons,
obs_name=obs_name,
var_name_web=var_name_web,
out_dirs=out_dirs,
)
logger.info(
f"Finished computing json files for {model_name} ({model_var}) vs. "
f"{obs_name} ({obs_var})"
)
dt = time() - t00
logger.info(f"Time expired: {dt:.2f} s")
def _convert_coldata_altitude_units_to_km(self, coldata: ColocatedData = None):
alt_units = coldata.data.attrs["altitude_units"]
coldata.data.attrs["vertical_layer"]["start"] = str(
Unit(alt_units).convert(coldata.data.attrs["vertical_layer"]["start"], other="km")
)
coldata.data.attrs["vertical_layer"]["end"] = str(
Unit(alt_units).convert(coldata.data.attrs["vertical_layer"]["end"], other="km")
)
def _get_vert_code(self, coldata: ColocatedData = None):
if hasattr(coldata.data, "vertical_layer"):
# start and end for vertical layers (display on web and name jsons)
start = float(coldata.data.attrs["vertical_layer"]["start"])
end = float(coldata.data.attrs["vertical_layer"]["end"])
# format correctly (e.g., 1, 1.5, 2, 2.5, etc.)
start = f"{round(float(start), 1):g}"
end = f"{round(float(end), 1):g}"
vert_code = f"{start}-{end}km"
else:
vert_code = coldata.get_meta_item("vert_code")
return vert_code
def _process_profile_data_for_vizualization(
self,
data: dict[str, ColocatedData] = None,
use_country: bool = False,
region_names: dict[str:str] = None,
station_names: ArrayLike = None,
periods: list[str] = None,
seasons: list[str] = None,
obs_name: str = None,
var_name_web: str = None,
out_dirs: dict = None,
):
if region_names == None and station_names == None:
raise ValueError("Both region_id and station_name can not both be None")
# Loop through regions
for regid in region_names:
profile_viz = process_profile_data_for_regions(
data=data,
region_id=regid,
use_country=use_country,
periods=periods,
seasons=seasons,
)
fname = get_profile_filename(region_names[regid], obs_name, var_name_web)
outfile_profile = os.path.join(out_dirs["profiles"], fname)
add_profile_entry_json(outfile_profile, data, profile_viz, periods, seasons)
# Loop through stations
for station_name in station_names:
profile_viz = process_profile_data_for_stations(
data=data,
station_name=station_name,
use_country=use_country,
periods=periods,
seasons=seasons,
)
fname = get_profile_filename(station_name, obs_name, var_name_web)
outfile_profile = os.path.join(out_dirs["profiles"], fname)
add_profile_entry_json(outfile_profile, data, profile_viz, periods, seasons)
def _process_stats_timeseries_for_all_regions(
self,
data: dict[str, ColocatedData] = None,
coldata: ColocatedData = None,
main_freq: str = None,
regnames=None,
use_weights: bool = True,
drop_stats: tuple = (),
use_country: bool = False,
obs_name: str = None,
obs_var: str = None,
var_name_web: str = None,
out_dirs: dict = None,
vert_code: str = None,
model_name: str = None,
model_var: str = None,
meta_glob: dict = None,
periods: list[str] = None,
seasons: list[str] = None,
add_trends: bool = False,
trends_min_yrs: int = 7,
regions_how: str = "default",
regs: dict = None,
stats_min_num: int = 1,
use_fairmode: bool = False,
):
input_freq = self.cfg.statistics_opts.stats_tseries_base_freq
for reg in regnames:
try:
stats_ts = _process_statistics_timeseries(
data=data,
freq=main_freq,
region_ids={reg: regnames[reg]},
use_weights=use_weights,
drop_stats=drop_stats,
use_country=use_country,
data_freq=input_freq,
)
except TemporalResolutionError:
stats_ts = {}
fname = get_timeseries_file_name(regnames[reg], obs_name, var_name_web, vert_code)
ts_file = os.path.join(out_dirs["hm/ts"], fname)
_add_heatmap_entry_json(
ts_file, stats_ts, obs_name, var_name_web, vert_code, model_name, model_var
)
logger.info("Processing heatmap data for all regions")
hm_all = _process_heatmap_data(
data,
regnames,
use_weights,
drop_stats,
use_country,
meta_glob,
periods,
seasons,
add_trends,
trends_min_yrs,
)
for freq, hm_data in hm_all.items():
fname = get_heatmap_filename(freq)
hm_file = os.path.join(out_dirs["hm"], fname)
_add_heatmap_entry_json(
hm_file, hm_data, obs_name, var_name_web, vert_code, model_name, model_var
)
logger.info("Processing regional timeseries for all regions")
ts_objs_regional = _process_regional_timeseries(data, regnames, regions_how, meta_glob)
_write_site_data(ts_objs_regional, out_dirs["ts"])
if coldata.has_latlon_dims:
for cd in data.values():
if cd is not None:
cd.data = cd.flatten_latlondim_station_name().data
logger.info("Processing individual site timeseries data")
(ts_objs, map_meta, site_indices) = _process_sites(data, regs, regions_how, meta_glob)
_write_site_data(ts_objs, out_dirs["ts"])
scatter_freq = min(TsType(fq) for fq in self.cfg.time_cfg.freqs)
scatter_freq = min(scatter_freq, main_freq)
logger.info("Processing map and scat data by period")
for period in periods:
# compute map_data and scat_data just for this period
map_data, scat_data = _process_map_and_scat(
data,
map_meta,
site_indices,
[period],
str(scatter_freq),
stats_min_num,
seasons,
add_trends,
trends_min_yrs,
use_fairmode,
obs_var,
drop_stats,
)
# the files in /map and /scat will be split up according to their time period as well
map_name = get_json_mapname(
obs_name, var_name_web, model_name, model_var, vert_code, period
)
outfile_map = os.path.join(out_dirs["map"], map_name)
write_json(map_data, outfile_map, ignore_nan=True)
outfile_scat = os.path.join(out_dirs["scat"], map_name)
write_json(scat_data, outfile_scat, ignore_nan=True)
def _process_diurnal_profiles(
self,
coldata: ColocatedData = None,
regions_how: str = "default",
regnames=None,
meta_glob: dict = None,
out_dirs: dict = None,
):
(ts_objs_weekly, ts_objs_weekly_reg) = _process_sites_weekly_ts(
coldata, regions_how, regnames, meta_glob
)
outdir = os.path.join(out_dirs["ts/diurnal"])
for ts_data_weekly in ts_objs_weekly:
# writes json file
_write_stationdata_json(ts_data_weekly, outdir)
if ts_objs_weekly_reg != None:
for ts_data_weekly_reg in ts_objs_weekly_reg:
# writes json file
_write_stationdata_json(ts_data_weekly_reg, outdir)