import logging
from time import time
from cf_units import Unit
from numpy.typing import ArrayLike
from pyaerocom import ColocatedData, TsType
from pyaerocom.aeroval._processing_base import ProcessingEngine
from pyaerocom.aeroval.coldatatojson_helpers import (
_apply_annual_constraint,
_init_data_default_frequencies,
_init_meta_glob,
_process_heatmap_data,
_process_map_and_scat,
_process_regional_timeseries,
_process_sites,
_process_sites_weekly_ts,
_process_statistics_timeseries,
_remove_less_covered,
init_regions_web,
process_profile_data_for_regions,
process_profile_data_for_stations,
)
from pyaerocom.aeroval.exceptions import ConfigError
from pyaerocom.aeroval.json_utils import round_floats
from pyaerocom.exceptions import TemporalResolutionError
logger = logging.getLogger(__name__)
[docs]
class ColdataToJsonEngine(ProcessingEngine):
[docs]
def run(self, files):
"""
Convert colocated data files to json
Parameters
----------
files : list
list of file paths pointing to colocated data objects to be
processed.
Returns
-------
list
list of files that have been converted.
"""
converted = []
for file in files:
logger.info(f"Processing: {file}")
coldata = ColocatedData(data=file)
self.process_coldata(coldata)
converted.append(file)
return converted
[docs]
def process_coldata(self, coldata: ColocatedData):
"""
Creates all json files for one ColocatedData object
Parameters
----------
coldata : ColocatedData
colocated data to be processed.
Raises
------
NotImplementedError
DESCRIPTION.
ValueError
DESCRIPTION.
ConfigError
DESCRIPTION.
Returns
-------
None.
"""
t00 = time()
use_weights = self.cfg.statistics_opts.weighted_stats
drop_stats = self.cfg.statistics_opts.drop_stats
# redundant, but cheap and important to be correct
self.cfg._check_time_config()
freqs = self.cfg.time_cfg.freqs
periods = self.cfg.time_cfg.periods
seasons = self.cfg.time_cfg.get_seasons()
main_freq = self.cfg.time_cfg.main_freq
annual_stats_constrained = self.cfg.statistics_opts.annual_stats_constrained
out_dirs = self.cfg.path_manager.get_json_output_dirs(True)
regions_how = self.cfg.webdisp_opts.regions_how
stats_min_num = self.cfg.statistics_opts.MIN_NUM
if hasattr(coldata.data, "altitude_units"):
if Unit(coldata.data.attrs["altitude_units"]) != Unit(
"km"
): # put everything in terms of km for viz
# convert start and end for file naming
self._convert_coldata_altitude_units_to_km(coldata)
vert_code = self._get_vert_code(coldata)
diurnal_only = coldata.get_meta_item("diurnal_only")
add_trends = self.cfg.statistics_opts.add_trends
trends_min_yrs = self.cfg.statistics_opts.stats_min_yrs
min_yrs = self.cfg.statistics_opts.obs_min_yrs
sequential_yrs = self.cfg.statistics_opts.sequential_yrs
avg_over_trends = self.cfg.statistics_opts.avg_over_trends
use_fairmode = self.cfg.statistics_opts.use_fairmode
use_diurnal = self.cfg.statistics_opts.use_diurnal
# ToDo: some of the checks below could be done automatically in
# EvalSetup, and at an earlier stage
if vert_code == "ModelLevel":
raise NotImplementedError("Coming (not so) soon...")
# this will need to be figured out as soon as there is altitude
elif "altitude" in coldata.data.dims:
raise ValueError("Altitude should have been dealt with already in the colocation")
elif not isinstance(coldata, ColocatedData):
raise ValueError(f"Need ColocatedData object, got {type(coldata)}")
elif coldata.has_latlon_dims and regions_how == "country":
raise NotImplementedError(
"Cannot yet apply country filtering for 4D colocated data instances"
)
elif main_freq not in freqs:
raise ConfigError(f"main_freq {main_freq} is not in experiment frequencies: {freqs}")
if self.cfg.statistics_opts.stats_tseries_base_freq is not None:
if self.cfg.statistics_opts.stats_tseries_base_freq not in freqs:
raise ConfigError(
f"Base frequency for statistics timeseries needs to be "
f"specified in experiment frequencies: {freqs}"
)
# init some stuff
if "var_name_input" in coldata.metadata:
obs_var = coldata.metadata["var_name_input"][0]
model_var = coldata.metadata["var_name_input"][1]
elif (
"obs_vars" in coldata.metadata
): # Try and get from obs_vars. Should not be needed in reading in a ColocatedData object created with pyaerocom.
obs_var = model_var = coldata.metadata["obs_vars"]
coldata.metadata["var_name_input"] = [obs_var, model_var]
logger.warning(
"ColdataToJsonEngine: Failed to access var_name_input from coldata.metadata. "
"This could be because you're using a ColocatedData object created outside of pyaerocom. "
"Setting obs_var and model_data to value in obs_vars instead. "
"Setting var_input_data to these values as well. "
)
else:
obs_var = model_var = "UNDEFINED"
model_name = coldata.model_name
obs_name = coldata.obs_name
mcfg = self.cfg.model_cfg.get_entry(model_name)
var_name_web = mcfg.get_varname_web(model_var, obs_var)
logger.info(
f"Computing json files for {model_name} ({model_var}) vs. {obs_name} ({obs_var})"
)
meta_glob = _init_meta_glob(
coldata,
vert_code=vert_code,
obs_name=obs_name,
model_name=model_name,
var_name_web=var_name_web,
)
if min_yrs > 0:
logger.info(
f"Removing stations with less than {min_yrs} years of data, with sequential_yrs = {sequential_yrs}"
)
coldata = _remove_less_covered(coldata, min_yrs, sequential_yrs)
# get region IDs
(regborders, regs, regnames) = init_regions_web(coldata, regions_how)
# Synchronise regions.json file
with self.avdb.lock():
regions = self.avdb.get_regions(
self.exp_output.proj_id, self.exp_output.exp_id, default={}
)
for region_name, region_info in regborders.items():
regions[region_name] = round_floats(region_info)
self.avdb.put_regions(regions, self.exp_output.proj_id, self.exp_output.exp_id)
use_country = True if regions_how == "country" else False
data = _init_data_default_frequencies(coldata, freqs)
if annual_stats_constrained:
data = _apply_annual_constraint(data)
if not coldata.data.attrs.get("just_for_viz", False): # make the regular json output
if not diurnal_only:
logger.info("Processing statistics timeseries for all regions")
self._process_stats_timeseries_for_all_regions(
data=data,
coldata=coldata,
main_freq=main_freq,
regnames=regnames,
use_weights=use_weights,
drop_stats=drop_stats,
use_country=use_country,
obs_name=obs_name,
obs_var=obs_var,
var_name_web=var_name_web,
out_dirs=out_dirs,
vert_code=vert_code,
model_name=model_name,
model_var=model_var,
meta_glob=meta_glob,
periods=periods,
seasons=seasons,
add_trends=add_trends,
trends_min_yrs=trends_min_yrs,
regions_how=regions_how,
regs=regs,
stats_min_num=stats_min_num,
use_fairmode=use_fairmode,
avg_over_trends=avg_over_trends,
)
if coldata.ts_type == "hourly" and use_diurnal:
logger.info("Processing diurnal profiles")
self._process_diurnal_profiles(
coldata=coldata,
regions_how=regions_how,
regnames=regnames,
meta_glob=meta_glob,
)
else:
logger.info("Processing profile data for visualization")
self._process_profile_data_for_visualization(
data=data,
use_country=use_country,
region_names=regnames,
station_names=coldata.data.station_name.values,
periods=periods,
seasons=seasons,
obs_name=obs_name,
var_name_web=var_name_web,
)
logger.info(
f"Finished computing json files for {model_name} ({model_var}) vs. "
f"{obs_name} ({obs_var})"
)
dt = time() - t00
logger.info(f"Time expired: {dt:.2f} s")
def _convert_coldata_altitude_units_to_km(self, coldata: ColocatedData = None):
alt_units = coldata.data.attrs["altitude_units"]
coldata.data.attrs["vertical_layer"]["start"] = str(
Unit(alt_units).convert(coldata.data.attrs["vertical_layer"]["start"], other="km")
)
coldata.data.attrs["vertical_layer"]["end"] = str(
Unit(alt_units).convert(coldata.data.attrs["vertical_layer"]["end"], other="km")
)
def _get_vert_code(self, coldata: ColocatedData = None):
if hasattr(coldata.data, "vertical_layer"):
# start and end for vertical layers (display on web and name jsons)
start = float(coldata.data.attrs["vertical_layer"]["start"])
end = float(coldata.data.attrs["vertical_layer"]["end"])
# format correctly (e.g., 1, 1.5, 2, 2.5, etc.)
start = f"{round(float(start), 1):g}"
end = f"{round(float(end), 1):g}"
vert_code = f"{start}-{end}km"
else:
vert_code = coldata.get_meta_item("vert_code")
return vert_code
def _process_profile_data_for_visualization(
self,
data: dict[str, ColocatedData] = None,
use_country: bool = False,
region_names: dict[str:str] = None,
station_names: ArrayLike = None,
periods: tuple[str, ...] = None,
seasons: tuple[str, ...] = None,
obs_name: str = None,
var_name_web: str = None,
):
if region_names is None and station_names is None:
raise ValueError("Both region_id and station_name can not both be None")
# Loop through regions
for regid in region_names:
profile_viz = process_profile_data_for_regions(
data=data,
region_id=regid,
use_country=use_country,
periods=periods,
seasons=seasons,
)
location = region_names[regid]
self.exp_output.add_profile_entry(
data,
profile_viz,
periods,
seasons,
location=location,
network=obs_name,
obsvar=var_name_web,
)
# Loop through stations
for station_name in station_names:
profile_viz = process_profile_data_for_stations(
data=data,
station_name=station_name,
use_country=use_country,
periods=periods,
seasons=seasons,
)
self.exp_output.add_profile_entry(
data,
profile_viz,
periods,
seasons,
location=station_name,
network=obs_name,
obsvar=var_name_web,
)
def _process_stats_timeseries_for_all_regions(
self,
data: dict[str, ColocatedData] = None,
coldata: ColocatedData = None,
main_freq: str = None,
regnames=None,
use_weights: bool = True,
drop_stats: tuple = (),
use_country: bool = False,
obs_name: str = None,
obs_var: str = None,
var_name_web: str = None,
out_dirs: dict = None,
vert_code: str = None,
model_name: str = None,
model_var: str = None,
meta_glob: dict = None,
periods: tuple[str, ...] = None,
seasons: tuple[str, ...] = None,
add_trends: bool = False,
trends_min_yrs: int = 7,
regions_how: str = "default",
regs: dict = None,
stats_min_num: int = 1,
use_fairmode: bool = False,
avg_over_trends: bool = False,
):
input_freq = self.cfg.statistics_opts.stats_tseries_base_freq
for reg in regnames:
try:
stats_ts = _process_statistics_timeseries(
data=data,
freq=main_freq,
region_ids={reg: regnames[reg]},
use_weights=use_weights,
drop_stats=drop_stats,
use_country=use_country,
data_freq=input_freq,
)
except TemporalResolutionError:
stats_ts = {}
region = regnames[reg]
self.exp_output.add_heatmap_timeseries_entry(
stats_ts,
region,
obs_name,
var_name_web,
vert_code,
model_name,
model_var,
)
logger.info("Processing heatmap data for all regions")
hm_all = _process_heatmap_data(
data,
regnames,
use_weights,
drop_stats,
use_country,
meta_glob,
periods,
seasons,
add_trends,
trends_min_yrs,
avg_over_trends,
)
for freq, hm_data in hm_all.items():
self.exp_output.add_heatmap_entry(
hm_data, freq, obs_name, var_name_web, vert_code, model_name, model_var
)
logger.info("Processing regional timeseries for all regions")
ts_objs_regional = _process_regional_timeseries(data, regnames, regions_how, meta_glob)
self.exp_output.write_timeseries(ts_objs_regional)
if coldata.has_latlon_dims:
for cd in data.values():
if cd is not None:
cd.data = cd.flatten_latlondim_station_name().data
logger.info("Processing individual site timeseries data")
(ts_objs, map_meta, site_indices) = _process_sites(data, regs, regions_how, meta_glob)
self.exp_output.write_timeseries(ts_objs)
scatter_freq = min(TsType(fq) for fq in self.cfg.time_cfg.freqs)
scatter_freq = min(scatter_freq, main_freq)
logger.info("Processing map and scat data by period")
for period in periods:
# compute map_data and scat_data just for this period
map_data, scat_data = _process_map_and_scat(
data,
map_meta,
site_indices,
[period],
str(scatter_freq),
stats_min_num,
seasons,
add_trends,
trends_min_yrs,
avg_over_trends,
use_fairmode,
obs_var,
drop_stats,
)
with self.avdb.lock():
self.avdb.put_map(
map_data,
self.exp_output.proj_id,
self.exp_output.exp_id,
obs_name,
var_name_web,
vert_code,
model_name,
model_var,
period.replace("/", ""), # Remove slashes in CAMS2_83 period.
)
self.avdb.put_scatter(
scat_data,
self.exp_output.proj_id,
self.exp_output.exp_id,
obs_name,
var_name_web,
vert_code,
model_name,
model_var,
period.replace("/", ""), # Remove slashes in CAMS2_83 period.
)
def _process_diurnal_profiles(
self,
coldata: ColocatedData = None,
regions_how: str = "default",
regnames=None,
meta_glob: dict = None,
):
ts_objs_weekly, ts_objs_weekly_reg = _process_sites_weekly_ts(
coldata, regions_how, regnames, meta_glob
)
for ts_data_weekly in ts_objs_weekly:
self.exp_output.write_station_data(ts_data_weekly)
if ts_objs_weekly_reg is not None:
for ts_data_weekly_reg in ts_objs_weekly_reg:
self.exp_output.write_station_data(ts_data_weekly_reg)