import logging
import os
import sqlite3
from pyaerocom._lowlevel_helpers import BrowseDict
logger = logging.getLogger(__name__)
[docs]
class EbasSQLRequest(BrowseDict):
"""Low level dictionary like object for EBAS sqlite queries
Attributes
----------
variables : :obj:`tuple`, optional
tuple containing variable names to be extracted (e.g.
``('aerosol_light_scattering_coefficient', 'aerosol_optical_depth')``).
If None, all available is used
start_date : :obj:`str`, optional
start date of data request (format YYYY-MM-DD). If None, all available
is used
stop_date : :obj:`str`, optional
stop date of data request (format YYYY-MM-DD). If None, all available
is used
station_names : :obj:`tuple`, optional
tuple containing station_names of request (e.g.
``('Birkenes II', 'Asa')``).If None, all available is used
matrices : :obj:`tuple`, optional
tuple containing station_names of request (e.g.
``('pm1', 'pm10', 'pm25', 'aerosol')``)
If None, all available is used
altitude_range : :obj:`tuple`, optional
tuple specifying altitude range of station in m (e.g.
``(0.0, 500.0)``). If None, all available is used
lon_range : :obj:`tuple`, optional
tuple specifying longitude range of station in degrees (e.g.
``(-20, 20)``). If None, all available is used
lat_range : :obj:`tuple`, optional
tuple specifying latitude range of station in degrees (e.g.
``(50, 80)``). If None, all available is used
instrument_type : :obj:`str`, optional
string specifying instrument types (e.g.
``("nephelometer")``)
statistics : :obj:`tuple`, optional
string specifying statistics code (e.g.
``("arithmetic mean")``)
Parameters
----------
see Attributes
"""
def __init__(
self,
variables=None,
start_date=None,
stop_date=None,
station_names=None,
matrices=None,
altitude_range=None,
lon_range=None,
lat_range=None,
instrument_types=None,
statistics=None,
datalevel=None,
):
self.variables = variables
self.start_date = start_date
self.stop_date = stop_date
self.station_names = station_names
self.matrices = matrices
self.altitude_range = altitude_range
self.lon_range = lon_range
self.lat_range = lat_range
self.instrument_types = instrument_types
self.statistics = statistics
self.datalevel = datalevel
[docs]
def update(self, **kwargs):
for k, v in kwargs.items():
if k in self:
self[k] = v
else:
logger.warning(f"Unknown EBAS SQL request key {k} (value {v})")
@staticmethod
def _var2sql(var):
if isinstance(var, list):
if len(var) > 1:
var = tuple(var)
else:
var = var[0]
if isinstance(var, tuple):
return f"{var}"
elif isinstance(var, str):
return f"('{var}')"
raise ValueError("Invalid value...")
[docs]
def make_file_query_str(self, distinct=True, **kwargs):
"""Wrapper for base method :func:`make_query_str`
Parameters
----------
distinct : bool
return unique files
**kwargs
update request attributes (e.g. ``lon_range=(30, 60)``)
Returns
-------
str
SQL file request command for current specs
"""
query = self.make_query_str(distinct=distinct, **kwargs)
# add an extsion to get only files that have no fraction variables in them
query = query.replace(
";",
" and not exists (select * from characteristic where var_id=variable.var_id and ct_type='Fraction');",
)
return query
[docs]
def make_query_str(self, what=None, distinct=True, **kwargs):
"""Translate current class state into SQL query command string
Parameters
----------
what : str or tuple, optional
what columns to retrieve (e.g. comp_name for all variables) from
table specified. Defaults to None, in which case "filename" is used
distinct : bool
return unique files
**kwargs
update request attributes (e.g. ``lon_range=(30, 60)``)
Returns
-------
str
SQL file request command for current specs
"""
self.update(**kwargs)
if what is None:
what = "filename"
elif not isinstance(what, str): # tuple or list of parameters to be retrieved
what = ",".join(what)
if distinct:
req = f"select distinct {what} from variable"
else:
req = f"select {what} from variable"
req += " join station on station.station_code=variable.station_code"
add_cond = 0
# add constraints from station table
conv = self._var2sql
if self.station_names is not None:
req += f" where station_name in {conv(self.station_names)}"
add_cond += 1
if self.altitude_range is not None:
low, high = self.altitude_range
req += " and " if add_cond else " where "
req += f"station_altitude>{low} and station_altitude<{high}"
add_cond += 1
if self.lon_range is not None:
l, r = self.lon_range
req += " and " if add_cond else " where "
req += f"station_longitude>{l} and station_longitude<{r}"
add_cond += 1
if self.lat_range is not None:
s, n = self.lat_range
req += " and " if add_cond else " where "
req += f"station_latitude>{s} and station_latitude<{n}"
add_cond += 1
if self.instrument_types is not None:
req += " and " if add_cond else " where "
req += f"instr_type in {conv(self.instrument_types)}"
add_cond += 1
# add constraints from variable table
if self.variables is not None:
req += " and " if add_cond else " where "
req += f"comp_name in {conv(self.variables)}"
add_cond += 1
if self.stop_date is not None:
req += " and " if add_cond else " where "
req += f"first_end < '{self.stop_date}'"
add_cond += 1
if self.start_date is not None:
req += " and " if add_cond else " where "
req += f"last_start > '{self.start_date}'"
add_cond += 1
if self.matrices is not None:
req += " and " if add_cond else " where "
req += f"matrix in {conv(self.matrices)}"
add_cond += 1
if self.statistics is not None:
req += " and " if add_cond else " where "
req += f"statistics in {conv(self.statistics)}"
add_cond += 1
if self.datalevel is not None:
req += " and " if add_cond else " where "
req += f"datalevel={self.datalevel}"
add_cond += 1
return req + ";"
def __str__(self):
head = f"Pyaerocom {type(self).__name__}"
s = f"\n{head}\n{len(head) * '-'}"
for k, v in self.items():
s += f"\n{k}: {v}"
s += f"\nFilename request string:\n{self.make_file_query_str()}"
return s
[docs]
class EbasFileIndex:
"""EBAS SQLite I/O interface
Takes care of connection to database and execution of requests
"""
def __init__(self, database=None):
self._database = database
@property
def database(self):
"""Path to ebas_file_index.sqlite3 file"""
db = self._database
if db is None or not os.path.exists(db):
raise AttributeError(
"EBAS SQLite database file could not be "
"located but is needed in EbasFileIndex class"
)
return db
@property
def ALL_STATION_NAMES(self):
"""List of all available station names in database"""
names = self.execute_request("select distinct station_name from station")
return [x[0] for x in names]
@property
def ALL_STATION_CODES(self):
"""List of all available station codes in database
Note
----
Not tested whether the order is the same as the order in
:attr:`STATION_NAMES`, i.e. the lists should not be linked to each
other
"""
names = self.execute_request("select distinct station_code from station")
return [x[0] for x in names]
@property
def ALL_STATISTICS_PARAMS(self):
"""List of all statistical parameters available
For more info see `here <https://ebas-submit.nilu.no/Submit-Data/
Data-Reporting/Comments/Generic-metadata-comments/Statistics>`__
"""
names = self.execute_request("select distinct statistics from variable")
return [x[0] for x in names]
@property
def ALL_VARIABLES(self):
"""List of all variables available"""
names = self.execute_request("select distinct comp_name from variable")
return [x[0] for x in names]
@property
def ALL_MATRICES(self):
"""List of all matrix values available"""
names = self.execute_request("select distinct matrix from variable")
return [x[0] for x in names]
@property
def ALL_INSTRUMENTS(self):
"""List of all variables available"""
names = self.execute_request("select distinct instr_type from variable")
return [x[0] for x in names]
[docs]
def get_table_names(self):
"""Get all table names in SQLite database file"""
return [
x[0]
for x in self.execute_request("SELECT name FROM sqlite_master WHERE type='table';")
]
[docs]
def get_table_columns(self, table_name):
"""Get all columns of a table in SQLite database file"""
req = f"select * from {table_name} where 1=0;"
with sqlite3.connect(self.database) as con:
cur = con.cursor()
cur.execute(req)
return [f[0] for f in cur.description]
[docs]
def execute_request(self, request, file_request=False):
"""Connect to database and retrieve data for input request
Parameters
----------
request : :obj:`EbasSQLRequest` or :obj:`str`
request specifications
Returns
-------
list
list of tuples containing the retrieved results. The number of
items in each tuple corresponds to the number of requested
parameters (usually one, can be specified in
:func:`make_query_str` using argument ``what``)
"""
if isinstance(request, str):
sql_str = request
elif isinstance(request, EbasSQLRequest):
if not file_request:
sql_str = request.make_query_str()
else:
sql_str = request.make_file_query_str()
else:
raise ValueError(f"Unsupported request type {type(request)}")
with sqlite3.connect(self.database) as con:
cur = con.cursor()
cur.execute(sql_str)
return [f for f in cur.fetchall()]
[docs]
def get_file_names(self, request):
"""Get all files that match the request specifications
Parameters
----------
request : :obj:`EbasSQLRequest` or :obj:`str`
request specifications
Returns
-------
list
list of file paths that match the request
"""
return [f[0] for f in self.execute_request(request, file_request=True)]