"""Hidden utilities for HYDROTEL and RavenPy models."""
import datetime as dt
from pathlib import Path
from typing import Literal
import numpy as np
import pandas as pd
import xarray as xr
import xscen as xs
import yaml
from xclim.core.units import convert_units_to
with Path(Path(__file__).parent / "variables.yml").open() as f:
VARIABLES = yaml.safe_load(f)
[docs]
def standardize_output(ds, spatial_info: pd.DataFrame | None = None, alt_names: dict[str, str] | None = None) -> xr.Dataset: # noqa: C901
"""
Standardize the output dataset by renaming dimensions and variables, adding relevant coordinates, and correcting attributes.
Parameters
----------
ds : xr.Dataset
The dataset to standardize.
spatial_info : pd.DataFrame | None, optional
A dataframe containing the spatial information of the model (RavenpyModel.hru["hru"] or Hydrotel.rhhu).
alt_names : dict[str, str] | None, optional
A dictionary mapping original variable names to their standardized names.
Returns
-------
xr.Dataset
The standardized dataset.
"""
if alt_names is not None:
ds = ds.rename({k: v for k, v in alt_names.items() if k in ds})
if spatial_info is not None:
spatial_info = spatial_info.rename(columns={k: v for k, v in alt_names.items() if k in spatial_info.columns})
ds = ds.squeeze()
if len(set(ds.dims).difference({"time"})) == 0:
spatial_dim = None
controlled_spatial = None
else:
# Standardize the spatial dimensions
ordered_dims = [
"subbasin_id",
"unit_id",
]
original_dim = [ds[d].dims[0] for d in ordered_dims if d in ds]
original_dim = original_dim[0] if len(original_dim) > 0 else "not_applicable"
# Find the correct spatial dimension
spatial_dim = [d for d in ordered_dims if d in ds]
if len(spatial_dim) > 1:
raise ValueError("The dataset has multiple spatial dimensions.")
elif len(spatial_dim) == 1:
spatial_dim = spatial_dim[0]
else:
raise ValueError(
"The dataset has a spatial dimension that is not recognized. Please provide an `alt_names` "
"dictionary to rename it to a standard name."
)
if spatial_dim != original_dim:
ds = ds.swap_dims({original_dim: spatial_dim}).drop_vars(original_dim, errors="ignore")
# Datasets are exactly the same between Subbasin and Basin levels, so we need to make an educated guess
is_subbasin = any("Subbasin" in ds[v].attrs.get("long_name", "") for v in ds.data_vars) or any(v in ds.data_vars for v in ["apport_lateral"])
controlled_spatial = (
"ComputationalUnit" if spatial_dim != "subbasin_id" else spatial_dim.replace("_id", "").capitalize() if is_subbasin else "DrainageArea"
)
# Since Raven v4.1, 'basin_name' starting with 'sub_' has been renamed to 'basin_fullname', while a new 'basin_name' variable
# without the prefix has been added. This new 'basin_fullname' is not required.
ds = ds.drop_vars("basin_fullname", errors="ignore")
# Ensure that all coordinates ending with "_id" are of type string
for c in [cc for cc in ds.coords if cc.endswith("_id")]:
ds[c] = ds[c].astype(str)
if spatial_info is not None:
for c in [cc for cc in spatial_info.columns if cc.endswith("_id")]:
spatial_info[c] = spatial_info[c].astype(str)
# Add relevant coordinates if available.
model = "raven" if "Raven_version" in ds.attrs else "hydrotel" if "HYDROTEL_version" in ds.attrs else "unknown"
if spatial_dim is not None and spatial_info is not None:
df = spatial_info.drop_duplicates(spatial_dim).set_index(spatial_dim)
columns = [c for c in list(VARIABLES["coordinates"].keys()) if c in df.columns]
if spatial_dim == "subbasin_id":
if is_subbasin is False:
columns = [c for c in columns if not any(c.startswith(s) for s in ["unit", "subbasin"])]
else:
columns = [c for c in columns if not any(c.startswith(s) for s in ["unit"])]
for col in columns:
attrs = {
attr: VARIABLES["coordinates"][col].get(f"{attr}_{model.lower()}", VARIABLES["coordinates"][col].get(attr, "unknown"))
for attr in ["description", "long_name", "units"]
}
attrs = {k: v for k, v in attrs.items() if v != "unknown"}
coord = xr.DataArray(df[col], dims=[spatial_dim], coords={spatial_dim: df.index}, attrs=attrs)
# Special cases
if col.endswith("_id"):
coord = coord.astype(str)
if "area" in col and model == "raven":
coord.attrs["units"] = "m2"
coord = convert_units_to(coord, "km2")
coord[spatial_dim] = coord[spatial_dim].astype(str)
ds = ds.assign_coords({col: coord})
# Also correct the attributes of the spatial dimension
attrs = {
attr: VARIABLES["coordinates"][spatial_dim].get(f"{attr}_{model.lower()}", VARIABLES["coordinates"][spatial_dim].get(attr, "unknown"))
for attr in ["description", "long_name", "units"]
}
attrs = {k: v for k, v in attrs.items() if v != "unknown"}
ds[spatial_dim].attrs = attrs
ds[spatial_dim].attrs["cf_role"] = "timeseries_id"
elif spatial_dim is None and spatial_info is not None and len(spatial_info) == 1:
columns = [c for c in ["elevation", "drainage_area", "centroid_longitude", "centroid_latitude"] if c in spatial_info.columns]
for col in columns:
attrs = {
attr: VARIABLES["coordinates"][col].get(f"{attr}_{model.lower()}", VARIABLES["coordinates"][col].get(attr, "unknown"))
for attr in ["description", "long_name", "units"]
}
attrs = {k: v for k, v in attrs.items() if v != "unknown"}
coord = xr.DataArray(spatial_info[col].iloc[0], attrs=attrs)
ds = ds.assign_coords({col: coord})
# Manage the other variables
controlled_vars = [v for v in ds.data_vars if v in VARIABLES["variables"]]
for v in ds.data_vars:
if v in controlled_vars:
if ds[v].attrs.get("units", "unknown") != VARIABLES["variables"][v].get("canonical_units", "unknown"):
ds[v] = convert_units_to(ds[v], VARIABLES["variables"][v]["canonical_units"])
ds[v].attrs.update({k: v for k, v in VARIABLES["variables"][v].items() if k != "canonical_units"})
ds[v].attrs["long_name"] = ds[v].attrs.get("long_name", "").split(" By")[0]
if controlled_spatial is not None and spatial_dim in ds[v].dims:
ds[v].attrs["aggregation_level"] = controlled_spatial
# Since we squeezed the dataset and renamed the spatial dimension, it is preferable to clean the chunking information.
# Default chunking provided by the hydrological models is often not optimal anyway.
preferred_chunks = xs.io.estimate_chunks(ds, dims=ds.dims, target_mb=100)
if "time" in preferred_chunks and len(ds["time"]) > 3:
time_div = 365 if "D" in xr.infer_freq(ds["time"]) else 720 if "H" in xr.infer_freq(ds["time"]) else 1
preferred_chunks["time"] = np.min([np.max([int(np.round(preferred_chunks["time"] / time_div) * time_div), time_div]), len(ds["time"])])
for v in ds.data_vars:
preferred = {d: preferred_chunks[d] for d in ds[v].dims}
ds[v] = ds[v].chunk(preferred)
ds[v].encoding["chunksizes"] = tuple(preferred[d] if d in preferred else ds[d].shape[0] for d in ds[v].dims)
ds[v].encoding.pop("chunks", None)
ds[v].encoding["preferred_chunks"] = preferred
# Also fix a few more things in the encoding
ds[v].encoding["zlib"] = True
ds[v].encoding["complevel"] = 6
ds[v] = ds[v].astype("float32")
ds[v].encoding["dtype"] = "float32"
return ds
[docs]
def aggregate_output( # noqa: C901
ds: xr.Dataset, by: Literal["hru", "rhhu", "unit", "subbasin"], to: Literal["subbasin", "drainage_area"], weights: xr.DataArray | None = None
) -> tuple[xr.Dataset, xr.DataArray]:
"""
Aggregate the model outputs to a different spatial unit. See the Notes section for more details.
Parameters
----------
ds : xr.Dataset
The dataset to aggregate. The 'standardize_outputs' method must have been called on this dataset beforehand.
by : {"hru", "rhhu", "unit", "subbasin"}
The spatial unit to aggregate from.
"unit" is the generic term for either "hru" or "rhhu", depending on the hydrological model used.
to : {"subbasin", "drainage_area"}
The spatial unit to aggregate to.
weights : xr.DataArray, optional
The weights to use for the aggregation. If None, the method will compute them based on the drainage area of the units.
Returns
-------
xr.Dataset
The aggregated dataset.
xr.DataArray
The weights used for the aggregation.
"""
if by.lower() == to.lower():
raise ValueError("Invalid aggregation levels.")
if by in ["hru", "rhhu"]:
by = "unit"
to_dim = f"{to}_id" if to != "drainage_area" else "subbasin_id"
# Load the coordinates
[ds[c].load() for c in ds.coords]
# Prepare the chunking information for the output dataset, based on the input
chunks_out = None
if ds.chunks is not None:
chunks_out = {d.replace(f"{by}_id", f"{to_dim}"): ds.chunks[d][0] for d in ds.dims if d in ds.chunks}
if "subbasin_id" not in ds.coords:
raise ValueError("The `standardize_outputs` method must be called before using the `aggregate_outputs` method.")
# Prepare the weights
new_dim = ds["subbasin_id"].swap_dims({f"{by}_id": "subbasin_id"}).drop_duplicates("subbasin_id").rename({"subbasin_id": "sbid"})
if weights is None:
if to == "subbasin":
weights = ds["subbasin_id"].expand_dims({"sbid": new_dim})
weights = xr.where(weights == weights["sbid"], ds["unit_drainage_area"].expand_dims({"sbid": new_dim}), 0)
elif to == "drainage_area":
weights = xr.zeros_like(ds[f"{by}_id"]).expand_dims({"sbid": new_dim}).astype(float)
weights = xr.where(
weights["sbid"] == ds["subbasin_id"].expand_dims({"sbid": new_dim}), ds[f"{by}_drainage_area"].expand_dims({"sbid": new_dim}), weights
)
# Head subbasins
status = xr.where(~new_dim["sbid"].isin(ds["dowsub_id"]), 1, 0)
# Recursively find the upstream subbasins for each subbasin, starting from the head subbasins and moving downstream
while status.min() != 2:
for idx in status["sbid"].where(status == 1, drop=True):
if idx.values not in (status["dowsub_id"].sel(sbid=status["sbid"].where(status <= 1, drop=True)).values):
weights.loc[(weights["sbid"] == status["dowsub_id"].sel(sbid=idx))] += weights.sel(sbid=idx.values).values
status = xr.where(status["sbid"] == idx, 2, status)
status = xr.where(status["sbid"] == status["dowsub_id"].sel(sbid=idx), 1, status)
# Perform the aggregation
ds_agg = ds.expand_dims({"sbid": new_dim}).weighted(weights).mean(dim=f"{by}_id")
ds_agg = ds_agg.rename({"sbid": "subbasin_id"})
# Re-add the coordinates and attributes that were lost during the aggregation
ds_for_coords = ds.swap_dims({f"{by}_id": "subbasin_id"}).drop_duplicates("subbasin_id")
prefixes_to_drop = ["unit", "subbasin"] if to == "drainage_area" else ["unit"]
ds_for_coords = ds_for_coords.drop_vars(
[c for c in ds_for_coords.coords if any(prefix in c for prefix in prefixes_to_drop if c != "subbasin_id")]
)
ds_agg = ds_agg.assign_coords({c: ds_for_coords[c] for c in ds_for_coords.coords if not c.startswith(by.lower()) and c not in ds_agg.coords})
for v in ds_agg.data_vars:
ds_agg[v].attrs["aggregation_level"] = "".join([w.capitalize() for w in to.split("_")])
ds_agg[v].attrs["history"] = (
ds_agg[v].attrs.get("history", "")
+ f"{dt.datetime.now().isoformat()}: Aggregated from {by.capitalize()} to {''.join([w.capitalize() for w in to.split('_')])} level."
)
try:
ds_agg = ds_agg.sortby(ds_agg[f"{to_dim}"].astype(int))
except TypeError:
ds_agg = ds_agg.sortby(ds_agg[f"{to_dim}"])
ds_agg = ds_agg.transpose("time", f"{to_dim}", ...)
# Clean the chunking information after the aggregation
if chunks_out is not None:
ds_agg = ds_agg.chunk(chunks_out)
for v in ds_agg.data_vars:
ds_agg[v].encoding.pop("chunks", None)
ds_agg[v].encoding.pop("preferred_chunks", None)
ds_agg[v].encoding.pop("chunksizes", None)
return ds_agg, weights