Source code for xhydro.modelling._model_utils

"""Hidden utilities for HYDROTEL and RavenPy models."""

import datetime as dt
from pathlib import Path
from typing import Literal

import numpy as np
import pandas as pd
import xarray as xr
import xscen as xs
import yaml
from xclim.core.units import convert_units_to


with Path(Path(__file__).parent / "variables.yml").open() as f:
    VARIABLES = yaml.safe_load(f)


[docs] def standardize_output(ds, spatial_info: pd.DataFrame | None = None, alt_names: dict[str, str] | None = None) -> xr.Dataset: # noqa: C901 """ Standardize the output dataset by renaming dimensions and variables, adding relevant coordinates, and correcting attributes. Parameters ---------- ds : xr.Dataset The dataset to standardize. spatial_info : pd.DataFrame | None, optional A dataframe containing the spatial information of the model (RavenpyModel.hru["hru"] or Hydrotel.rhhu). alt_names : dict[str, str] | None, optional A dictionary mapping original variable names to their standardized names. Returns ------- xr.Dataset The standardized dataset. """ if alt_names is not None: ds = ds.rename({k: v for k, v in alt_names.items() if k in ds}) if spatial_info is not None: spatial_info = spatial_info.rename(columns={k: v for k, v in alt_names.items() if k in spatial_info.columns}) ds = ds.squeeze() if len(set(ds.dims).difference({"time"})) == 0: spatial_dim = None controlled_spatial = None else: # Standardize the spatial dimensions ordered_dims = [ "subbasin_id", "unit_id", ] original_dim = [ds[d].dims[0] for d in ordered_dims if d in ds] original_dim = original_dim[0] if len(original_dim) > 0 else "not_applicable" # Find the correct spatial dimension spatial_dim = [d for d in ordered_dims if d in ds] if len(spatial_dim) > 1: raise ValueError("The dataset has multiple spatial dimensions.") elif len(spatial_dim) == 1: spatial_dim = spatial_dim[0] else: raise ValueError( "The dataset has a spatial dimension that is not recognized. Please provide an `alt_names` " "dictionary to rename it to a standard name." ) if spatial_dim != original_dim: ds = ds.swap_dims({original_dim: spatial_dim}).drop_vars(original_dim, errors="ignore") # Datasets are exactly the same between Subbasin and Basin levels, so we need to make an educated guess is_subbasin = any("Subbasin" in ds[v].attrs.get("long_name", "") for v in ds.data_vars) or any(v in ds.data_vars for v in ["apport_lateral"]) controlled_spatial = ( "ComputationalUnit" if spatial_dim != "subbasin_id" else spatial_dim.replace("_id", "").capitalize() if is_subbasin else "DrainageArea" ) # Since Raven v4.1, 'basin_name' starting with 'sub_' has been renamed to 'basin_fullname', while a new 'basin_name' variable # without the prefix has been added. This new 'basin_fullname' is not required. ds = ds.drop_vars("basin_fullname", errors="ignore") # Ensure that all coordinates ending with "_id" are of type string for c in [cc for cc in ds.coords if cc.endswith("_id")]: ds[c] = ds[c].astype(str) if spatial_info is not None: for c in [cc for cc in spatial_info.columns if cc.endswith("_id")]: spatial_info[c] = spatial_info[c].astype(str) # Add relevant coordinates if available. model = "raven" if "Raven_version" in ds.attrs else "hydrotel" if "HYDROTEL_version" in ds.attrs else "unknown" if spatial_dim is not None and spatial_info is not None: df = spatial_info.drop_duplicates(spatial_dim).set_index(spatial_dim) columns = [c for c in list(VARIABLES["coordinates"].keys()) if c in df.columns] if spatial_dim == "subbasin_id": if is_subbasin is False: columns = [c for c in columns if not any(c.startswith(s) for s in ["unit", "subbasin"])] else: columns = [c for c in columns if not any(c.startswith(s) for s in ["unit"])] for col in columns: attrs = { attr: VARIABLES["coordinates"][col].get(f"{attr}_{model.lower()}", VARIABLES["coordinates"][col].get(attr, "unknown")) for attr in ["description", "long_name", "units"] } attrs = {k: v for k, v in attrs.items() if v != "unknown"} coord = xr.DataArray(df[col], dims=[spatial_dim], coords={spatial_dim: df.index}, attrs=attrs) # Special cases if col.endswith("_id"): coord = coord.astype(str) if "area" in col and model == "raven": coord.attrs["units"] = "m2" coord = convert_units_to(coord, "km2") coord[spatial_dim] = coord[spatial_dim].astype(str) ds = ds.assign_coords({col: coord}) # Also correct the attributes of the spatial dimension attrs = { attr: VARIABLES["coordinates"][spatial_dim].get(f"{attr}_{model.lower()}", VARIABLES["coordinates"][spatial_dim].get(attr, "unknown")) for attr in ["description", "long_name", "units"] } attrs = {k: v for k, v in attrs.items() if v != "unknown"} ds[spatial_dim].attrs = attrs ds[spatial_dim].attrs["cf_role"] = "timeseries_id" elif spatial_dim is None and spatial_info is not None and len(spatial_info) == 1: columns = [c for c in ["elevation", "drainage_area", "centroid_longitude", "centroid_latitude"] if c in spatial_info.columns] for col in columns: attrs = { attr: VARIABLES["coordinates"][col].get(f"{attr}_{model.lower()}", VARIABLES["coordinates"][col].get(attr, "unknown")) for attr in ["description", "long_name", "units"] } attrs = {k: v for k, v in attrs.items() if v != "unknown"} coord = xr.DataArray(spatial_info[col].iloc[0], attrs=attrs) ds = ds.assign_coords({col: coord}) # Manage the other variables controlled_vars = [v for v in ds.data_vars if v in VARIABLES["variables"]] for v in ds.data_vars: if v in controlled_vars: if ds[v].attrs.get("units", "unknown") != VARIABLES["variables"][v].get("canonical_units", "unknown"): ds[v] = convert_units_to(ds[v], VARIABLES["variables"][v]["canonical_units"]) ds[v].attrs.update({k: v for k, v in VARIABLES["variables"][v].items() if k != "canonical_units"}) ds[v].attrs["long_name"] = ds[v].attrs.get("long_name", "").split(" By")[0] if controlled_spatial is not None and spatial_dim in ds[v].dims: ds[v].attrs["aggregation_level"] = controlled_spatial # Since we squeezed the dataset and renamed the spatial dimension, it is preferable to clean the chunking information. # Default chunking provided by the hydrological models is often not optimal anyway. preferred_chunks = xs.io.estimate_chunks(ds, dims=ds.dims, target_mb=100) if "time" in preferred_chunks and len(ds["time"]) > 3: time_div = 365 if "D" in xr.infer_freq(ds["time"]) else 720 if "H" in xr.infer_freq(ds["time"]) else 1 preferred_chunks["time"] = np.min([np.max([int(np.round(preferred_chunks["time"] / time_div) * time_div), time_div]), len(ds["time"])]) for v in ds.data_vars: preferred = {d: preferred_chunks[d] for d in ds[v].dims} ds[v] = ds[v].chunk(preferred) ds[v].encoding["chunksizes"] = tuple(preferred[d] if d in preferred else ds[d].shape[0] for d in ds[v].dims) ds[v].encoding.pop("chunks", None) ds[v].encoding["preferred_chunks"] = preferred # Also fix a few more things in the encoding ds[v].encoding["zlib"] = True ds[v].encoding["complevel"] = 6 ds[v] = ds[v].astype("float32") ds[v].encoding["dtype"] = "float32" return ds
[docs] def aggregate_output( # noqa: C901 ds: xr.Dataset, by: Literal["hru", "rhhu", "unit", "subbasin"], to: Literal["subbasin", "drainage_area"], weights: xr.DataArray | None = None ) -> tuple[xr.Dataset, xr.DataArray]: """ Aggregate the model outputs to a different spatial unit. See the Notes section for more details. Parameters ---------- ds : xr.Dataset The dataset to aggregate. The 'standardize_outputs' method must have been called on this dataset beforehand. by : {"hru", "rhhu", "unit", "subbasin"} The spatial unit to aggregate from. "unit" is the generic term for either "hru" or "rhhu", depending on the hydrological model used. to : {"subbasin", "drainage_area"} The spatial unit to aggregate to. weights : xr.DataArray, optional The weights to use for the aggregation. If None, the method will compute them based on the drainage area of the units. Returns ------- xr.Dataset The aggregated dataset. xr.DataArray The weights used for the aggregation. """ if by.lower() == to.lower(): raise ValueError("Invalid aggregation levels.") if by in ["hru", "rhhu"]: by = "unit" to_dim = f"{to}_id" if to != "drainage_area" else "subbasin_id" # Load the coordinates [ds[c].load() for c in ds.coords] # Prepare the chunking information for the output dataset, based on the input chunks_out = None if ds.chunks is not None: chunks_out = {d.replace(f"{by}_id", f"{to_dim}"): ds.chunks[d][0] for d in ds.dims if d in ds.chunks} if "subbasin_id" not in ds.coords: raise ValueError("The `standardize_outputs` method must be called before using the `aggregate_outputs` method.") # Prepare the weights new_dim = ds["subbasin_id"].swap_dims({f"{by}_id": "subbasin_id"}).drop_duplicates("subbasin_id").rename({"subbasin_id": "sbid"}) if weights is None: if to == "subbasin": weights = ds["subbasin_id"].expand_dims({"sbid": new_dim}) weights = xr.where(weights == weights["sbid"], ds["unit_drainage_area"].expand_dims({"sbid": new_dim}), 0) elif to == "drainage_area": weights = xr.zeros_like(ds[f"{by}_id"]).expand_dims({"sbid": new_dim}).astype(float) weights = xr.where( weights["sbid"] == ds["subbasin_id"].expand_dims({"sbid": new_dim}), ds[f"{by}_drainage_area"].expand_dims({"sbid": new_dim}), weights ) # Head subbasins status = xr.where(~new_dim["sbid"].isin(ds["dowsub_id"]), 1, 0) # Recursively find the upstream subbasins for each subbasin, starting from the head subbasins and moving downstream while status.min() != 2: for idx in status["sbid"].where(status == 1, drop=True): if idx.values not in (status["dowsub_id"].sel(sbid=status["sbid"].where(status <= 1, drop=True)).values): weights.loc[(weights["sbid"] == status["dowsub_id"].sel(sbid=idx))] += weights.sel(sbid=idx.values).values status = xr.where(status["sbid"] == idx, 2, status) status = xr.where(status["sbid"] == status["dowsub_id"].sel(sbid=idx), 1, status) # Perform the aggregation ds_agg = ds.expand_dims({"sbid": new_dim}).weighted(weights).mean(dim=f"{by}_id") ds_agg = ds_agg.rename({"sbid": "subbasin_id"}) # Re-add the coordinates and attributes that were lost during the aggregation ds_for_coords = ds.swap_dims({f"{by}_id": "subbasin_id"}).drop_duplicates("subbasin_id") prefixes_to_drop = ["unit", "subbasin"] if to == "drainage_area" else ["unit"] ds_for_coords = ds_for_coords.drop_vars( [c for c in ds_for_coords.coords if any(prefix in c for prefix in prefixes_to_drop if c != "subbasin_id")] ) ds_agg = ds_agg.assign_coords({c: ds_for_coords[c] for c in ds_for_coords.coords if not c.startswith(by.lower()) and c not in ds_agg.coords}) for v in ds_agg.data_vars: ds_agg[v].attrs["aggregation_level"] = "".join([w.capitalize() for w in to.split("_")]) ds_agg[v].attrs["history"] = ( ds_agg[v].attrs.get("history", "") + f"{dt.datetime.now().isoformat()}: Aggregated from {by.capitalize()} to {''.join([w.capitalize() for w in to.split('_')])} level." ) try: ds_agg = ds_agg.sortby(ds_agg[f"{to_dim}"].astype(int)) except TypeError: ds_agg = ds_agg.sortby(ds_agg[f"{to_dim}"]) ds_agg = ds_agg.transpose("time", f"{to_dim}", ...) # Clean the chunking information after the aggregation if chunks_out is not None: ds_agg = ds_agg.chunk(chunks_out) for v in ds_agg.data_vars: ds_agg[v].encoding.pop("chunks", None) ds_agg[v].encoding.pop("preferred_chunks", None) ds_agg[v].encoding.pop("chunksizes", None) return ds_agg, weights