"""Implement the ravenpy handler class for emulating raven models in ravenpy."""
import datetime as dt
import logging
import os
import shutil
import subprocess # noqa: S404
import tempfile
import warnings
from copy import deepcopy
from pathlib import Path
from typing import Literal
import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from xclim.core.units import convert_units_to
try:
import ravenpy
import ravenpy.config as rc
import ravenpy.extractors
run = ravenpy.ravenpy.run
ravenpy_err_msg = None
except (ImportError, RuntimeError) as e:
run = None
ravenpy_err_msg = e
from ._hm import HydrologicalModel
from ._model_utils import aggregate_output, standardize_output
logger = logging.getLogger(__name__)
__all__ = ["RavenpyModel"]
[docs]
class RavenpyModel(HydrologicalModel):
r"""
Initialize the RavenPy model class.
Parameters
----------
overwrite : bool
If True, overwrite the existing project files. Default is False.
workdir : str | Path | None
Path to save the .rv files and model outputs. Default is None, which creates a temporary directory.
executable : str | os.PathLike | None, optional
Path to the Raven executable, bypassing RavenPy.
If None (default), the Raven executable from your current Python environment ('raven-hydro') will be used.
run_name : str, optional
Name of the run, which will be used to name the project files. Defaults to "raven" if not provided.
model_name : {"Blended", "GR4JCN", "HBVEC", "HMETS", "HYPR", "Mohyse", "SACSMA"}, optional
The name of the RavenPy model to run. Only optional if the project files already exist.
start_date : dt.datetime | str, optional
The first date of the simulation. Only optional if the project files already exist.
end_date : dt.datetime | str, optional
The last date of the simulation. Only optional if the project files already exist.
parameters : np.ndarray | list[float], optional
The model parameters for simulation or calibration. Only optional if the project files already exist.
qobs_file : str | Path, optional
Path to the file containing the observed streamflow data.
If there are multiple stations, the file should contain a 'basin_id' variable that identifies the subbasin for each time series.
If a 'station_id' variable is present, it will be used to identify the station.
alt_name_flow : str, optional
Name of the streamflow variable in the observed data file. If not provided, it will be assumed to be "q".
hru : gpd.GeoDataFrame | dict | os.PathLike, optional
A GeoDataFrame, or dictionary containing the HRU properties. Only optional if the project files already exist.
For distributed models, it should be readable by ravenpy.extractors.BasinMakerExtractor.
For lumped models, should contain the following variables:
- area: The watershed drainage area, in km².
- elevation: The elevation of the watershed, in meters.
- latitude: The latitude of the watershed centroid.
- longitude: The longitude of the watershed centroid.
- HRU_ID: The ID of the HRU (required for gridded data, optional for station data).
If the meteorological data is gridded, the HRU dataset must also contain a SubId, DowSubId, valid geometry and crs.
If the input is modified, a new shapefile will be created in the workdir/weights subdirectory.
output_subbasins : {"all", "qobs"} | list[int] | None, optional
If "all", all subbasins will be outputted. If "qobs", only the subbasins with observed flow will be outputted.
Leave as None to use the value as defined in the HRU file ('Has_Gauge' column). Only applicable for distributed HBVEC models.
minimum_reservoir_area : str, optional
Quantified string (e.g. "20 km2") representing the minimum lake area to consider the lake explicitly as a reservoir.
If not provided, all lakes with the 'HRU_IsLake' column set to 1 in the HRU file will be considered as reservoirs.
Note that 'reservoirs' in Raven can also refer to natural lakes with weir-like outflows.
Only applicable for distributed HBVEC models.
meteo_file : str | Path, optional
Path to the file containing the observed meteorological data. Only optional if the project files already exist.
The meteorological data can be either station or gridded data. Use the 'xhydro.modelling.format_input' function to ensure the data
is in the correct format. Unless the input is a single station accompanied by 'meteo_station_properties', the file should contain
the following coordinates:
- elevation: The elevation of the station / grid cell, in meters.
- latitude: The latitude of the station / grid cell centroid.
- longitude: The longitude of the station / grid cell centroid.
data_type : list[str], optional
The list of types of data provided to Raven in the meteorological file. Only optional if the project files already exist.
See https://github.com/CSHS-CWRA/RavenPy/blob/master/src/ravenpy/config/conventions.py for the list of available types.
alt_names_meteo : dict, optional
A dictionary that allows users to link the names of meteorological variables in their dataset to Raven-compliant names.
The keys should be the Raven names as listed in the data_type parameter.
meteo_station_properties : dict, optional
Additional properties of the weather stations providing the meteorological data. Only required if absent from the 'meteo_file'.
For single stations, the format is {"ALL": {"elevation": elevation, "latitude": latitude, "longitude": longitude}}.
This has not been tested for multiple stations or gridded data.
gridweights : str | Path | None
If using gridded meteorological data, path to a text file containing the weights linking the grid cells to the HRUs.
If None, the weights will be computed using ravenpy.extractors.GridWeightExtractor and saved in a 'weights' subdirectory
of the project folder, using a "{meteo_file}_vs_{hru_file}_weights.txt" pattern.
\*\*kwargs : dict, optional
Additional parameters to pass to the RavenPy emulator, to modify the default modules used by a given hydrological model.
Typical entries include RainSnowFraction, Evaporation, GlobalParameters, etc.
See https://raven.uwaterloo.ca/Downloads.html for the latest Raven documentation. Currently, model templates are listed in Appendix F.
"""
def __init__(
self,
overwrite: bool = False,
*,
workdir: str | os.PathLike | None = None,
executable: str | os.PathLike | None = None,
run_name: str | None = None,
model_name: (Literal["Blended", "GR4JCN", "HBVEC", "HMETS", "HYPR", "Mohyse", "SACSMA"] | None) = None,
start_date: dt.datetime | str | None = None,
end_date: dt.datetime | str | None = None,
parameters: np.ndarray | list[float] | None = None,
qobs_file: os.PathLike | str | None = None,
alt_name_flow: str | None = "q",
hru: gpd.GeoDataFrame | dict | os.PathLike | str | None = None,
output_subbasins: Literal["all", "qobs"] | list[int] | None = None,
minimum_reservoir_area: str | None = None,
meteo_file: os.PathLike | str | None = None,
data_type: list[str] | None = None,
alt_names_meteo: dict | None = None,
meteo_station_properties: dict | None = None,
gridweights: str | os.PathLike | None = None,
**kwargs,
):
"""Initialize the RavenPy model class."""
# Set a few variables
if workdir is None:
self.workdir = Path(tempfile.mkdtemp(prefix="raven-hydro_"))
else:
self.workdir = Path(workdir)
self.workdir.mkdir(parents=True, exist_ok=True)
self.executable = executable
self.model_name = model_name
self.run_name = run_name or "raven"
self.parameters = parameters
self.start_date = start_date
self.end_date = end_date
self.kwargs = deepcopy(kwargs)
# Updated when the project files are created and never really changed afterwards,
# but useful to have it here to allow the user to access it.
self.emulator_config = None
self.qobs = None
self.hru = None
self.meteo = None
if any(opt is not None for opt in [qobs_file, hru, meteo_file]):
self.update_data(
qobs_file=qobs_file,
alt_name_flow=alt_name_flow,
hru=hru,
output_subbasins=output_subbasins,
minimum_reservoir_area=minimum_reservoir_area,
meteo_file=meteo_file,
data_type=data_type,
alt_names_meteo=alt_names_meteo,
meteo_station_properties=meteo_station_properties,
gridweights=gridweights,
)
# Check for existing project files
files = [f for f in self.workdir.rglob(f"{self.run_name}.rv*")]
# If the project files already exist and overwrite is False, we stop here
if len(files) > 0 and not overwrite:
if any(opt is not None for opt in [self.meteo, self.hru, self.qobs]):
warnings.warn(
"Meteorological data, HRU, or observed streamflow data were provided, but the project files already exist "
"and 'overwrite' is set to False. These data will not be used. If you want to update the "
"project files, use the 'update_config' method.",
stacklevel=2,
)
else:
logger.info("Project already exists and files will not be overwritten.")
return
# If the project files do not exist, we create them if the required data is provided
else:
if self.meteo is None or self.hru is None:
warnings.warn(
"The meteorological data and/or HRU are not provided. Project files will not be created. Please provide the missing data, "
"then call the 'create_rv' method to create the project files.",
stacklevel=2,
)
return
if len(files) > 0:
logger.info("Project already exists, but will be overwritten.")
# Create the project files
self.create_rv(
overwrite=overwrite,
)
[docs]
def create_rv(
self,
*,
overwrite: bool = False,
):
r"""
Write the RavenPy project files.
Parameters
----------
overwrite : bool
If True, overwrite the existing project files. Default is False.
Note that to prevent inconsistencies, all files containing the 'run_name' will be removed, including the output files.
"""
if run is None:
raise RuntimeError(
"RavenPy is not installed or not properly configured. The RavenpyModel.create_rv method cannot be used without it."
f" Original error: {ravenpy_err_msg}"
)
required = [
"model_name",
"run_name",
"start_date",
"end_date",
"parameters",
"meteo",
"hru",
]
missing = [input for input in required if getattr(self, input) is None]
if missing:
raise ValueError(f"The following required inputs are missing: {', '.join(missing)}")
# Remove any existing files in the project directory
if len([f for f in self.workdir.rglob(f"{self.run_name}.rv*")]) > 0:
if overwrite:
for file in self.workdir.rglob(f"{self.run_name}*.*"):
file.unlink()
else:
raise FileExistsError(f"Project {self.run_name} in {self.workdir} already exists, but 'overwrite' is set to False.")
kwargs = deepcopy(self.kwargs)
# Add the observed streamflow data if provided
if self.qobs is not None:
kwargs["ObservationData"] = self.qobs["rc"]
# Add the HRU properties
kwargs = kwargs | self.hru["keys"]
# Add the meteorological data
kwargs = kwargs | self.meteo["keys"]
# Create the emulator configuration
self.emulator_config = dict(
RunName=self.run_name,
params=self.parameters,
StartDate=self.start_date,
EndDate=self.end_date,
**kwargs,
)
model = getattr(rc.emulators, self.model_name)(
**self.emulator_config,
)
model.write_rv(workdir=self.workdir, overwrite=overwrite)
[docs]
def update_data(
self,
*,
qobs_file: os.PathLike | str | None = None,
alt_name_flow: str | None = "q",
hru: gpd.GeoDataFrame | dict | os.PathLike | str | None = None,
output_subbasins: Literal["all", "qobs"] | list[int] | None = None,
minimum_reservoir_area: str | None = None,
meteo_file: os.PathLike | str | None = None,
data_type: list[str] | None = None,
alt_names_meteo: dict | None = None,
meteo_station_properties: dict | None = None,
gridweights: str | os.PathLike | None = None,
):
"""
Update the model configuration with new observed data (self.qobs), HRU properties (self.hru), or meteorological data (self.meteo).
Parameters
----------
qobs_file : os.PathLike | str
Path to the NetCDF file containing the observed streamflow data.
If there are multiple stations, the file should contain a 'basin_id' variable that identifies the subbasin for each time series.
If a 'station_id' variable is present, it will be used to identify the station.
alt_name_flow : str, optional
Alternative name for the streamflow variable in the observed data.
hru : gpd.GeoDataFrame | dict | os.PathLike | str
A GeoDataFrame, or dictionary containing the HRU properties. Alternatively, a path to a shapefile containing the HRU properties.
For distributed models, it should be readable by ravenpy.extractors.BasinMakerExtractor.
For lumped models, should contain the following variables:
- area: The watershed drainage area, in km².
- elevation: The elevation of the watershed, in meters.
- latitude: The latitude of the watershed centroid.
- longitude: The longitude of the watershed centroid.
- HRU_ID: The ID of the HRU (required for gridded data, optional for station data).
If the meteorological data is gridded, the HRU dataset must also contain a SubId, DowSubId, valid geometry and crs.
If the input is modified, a new shapefile will be created in the workdir/weights subdirectory.
output_subbasins : {"all", "qobs"} | list[int] | None, optional
If "all", all subbasins will be outputted.
If "qobs", subbasins with observed flow will be outputted, as defined by the basin IDs in the observed streamflow data.
If a list of integers is provided, it should contain the basin IDs to output.
Leave as None to use the value as defined in the HRU file ('Has_Gauge' column).
minimum_reservoir_area : str, optional
Quantified string (e.g. "20 km2") representing the minimum lake area to consider the lake explicitly as a reservoir.
If not provided, all lakes with the 'HRU_IsLake' column set to 1 in the HRU file will be considered as reservoirs.
Note that 'reservoirs' in Raven can also refer to natural lakes with weir-like outflows.
Only applicable for distributed HBVEC models.
meteo_file : str | Path, optional
Path to the file containing the observed meteorological data. Only optional if the project files already exist.
The meteorological data can be either station or gridded data. Use the 'xhydro.modelling.format_input' function to ensure the data
is in the correct format. Unless the input is a single station accompanied by 'meteo_station_properties', the file should contain
the following coordinates:
- elevation: The elevation of the station / grid cell, in meters.
- latitude: The latitude of the station / grid cell centroid.
- longitude: The longitude of the station / grid cell centroid.
data_type : list[str], optional
The list of types of data provided to Raven in the meteorological file. Only optional if the project files already exist.
See https://github.com/CSHS-CWRA/RavenPy/blob/master/src/ravenpy/config/conventions.py for the list of available types.
alt_names_meteo : dict, optional
A dictionary that allows users to link the names of meteorological variables in their dataset to Raven-compliant names.
The keys should be the Raven names as listed in the data_type parameter.
meteo_station_properties : dict, optional
Additional properties of the weather stations providing the meteorological data. Only required if absent from the 'meteo_file'.
For single stations, the format is {"ALL": {"elevation": elevation, "latitude": latitude, "longitude": longitude}}.
This has not been tested for multiple stations or gridded data.
gridweights : str | Path | None
If using gridded meteorological data, path to a text file containing the weights linking the grid cells to the HRUs.
If None, the weights will be computed using ravenpy.extractors.GridWeightExtractor and saved in a 'weights' subdirectory
of the project folder, using a "{meteo_file}_vs_{hru_file}_weights.txt" pattern.
Notes
-----
If the meteorological data is gridded, new weights will be computed using the HRU file in the RavenpyModel instance and saved
in a 'weights' subdirectory of the project folder, under the name 'meteo-name_vs_hru-name.txt'.
"""
if run is None:
raise RuntimeError(
"RavenPy is not installed or not properly configured. The RavenpyModel.update_data method cannot be used without it."
f" Original error: {ravenpy_err_msg}"
)
if (any(opt is not None for opt in [output_subbasins, minimum_reservoir_area]) and hru is None) or (
any(opt is not None for opt in [data_type, alt_names_meteo, meteo_station_properties]) and meteo_file is None
):
raise ValueError("All relevant inputs must be provided if updating the HRU properties or meteorological data")
# Q_obs must be read first, as it might be used to update the subbasins to output
if qobs_file is not None:
self._read_qobs(qobs_file, alt_name_flow=alt_name_flow)
# HRU data must be read before the meteorological data, as it might be used to update the meteorological data weights
if hru is not None:
self._read_hru(
hru,
output_subbasins=output_subbasins,
minimum_reservoir_area=minimum_reservoir_area,
)
# Meteorological data must be last
if meteo_file is not None:
self._read_meteo(
meteo_file=meteo_file,
data_type=data_type,
alt_names_meteo=alt_names_meteo,
meteo_station_properties=meteo_station_properties,
gridweights=gridweights,
)
[docs]
def update_config( # noqa: C901
self,
*,
rvi_dates: bool = False,
rvi_commands: list[str] | None = None,
rvt: bool = False,
rvh: bool = False,
) -> None:
"""
Manually update some aspects of the configuration of the RavenPy model.
Parameters
----------
rvi_dates : bool
If True, update the .rvi file with the 'start_date' and 'end_date' defined in the model.
rvi_commands : list[str] | None
A list of commands to include in the .rvi file. If None, no additional commands will be added.
Warning: These commands will be added at the end of the .rvi file, with no checks. Use with caution.
rvt : bool
If True, update the .rvt file with the meteorological data and observed streamflow data defined in the model.
rvh : bool
If True, update the .rvh file with the list of subbasins to output. Nothing else will be changed in that file.
Notes
-----
Ideally, users should favor using the `update_data` method to update the model configuration, then call the `create_rv`
method to recreate the project files from scratch. This method assumes that the changes brought to the model configuration
are minimal, such as wanting to change the meteorological data or the simulation start and end dates.
Be aware that:
- The .rvh will be rewritten entirely. If multiple sources of data were mentioned, such as both meteorological and observed streamflow data,
all of them must be included in the RavenpyModel instance.
- If the meteorological data is gridded, new weights will be computed using the HRU file in the RavenpyModel instance. If that HRU
file is different from the one used to create the original .rvh file, it may lead to inconsistencies or errors.
- Similarly, only the list of subbasins to output will be modified in the new .rvh file. Any additional changes to the HRU or
other components might also lead to inconsistencies or errors.
A backup of the original files will be created before any modifications are made.
"""
# Update the .rvi file
if rvi_dates:
# Backup the existing .rvi file
shutil.copy(
self.workdir / f"{self.run_name}.rvi",
self.workdir / f"{self.run_name}_backup_{dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.rvi",
)
# Read the existing .rvi file and update the start date and end date
with (self.workdir / f"{self.run_name}.rvi").open("r") as file:
lines = file.readlines()
start_date = pd.to_datetime(self.start_date).strftime("%Y-%m-%d %H:%M:%S")
idx = [i for i, line in enumerate(lines) if line.startswith(":StartDate")][0]
lines[idx] = f":StartDate {start_date}\n"
end_date = pd.to_datetime(self.end_date).strftime("%Y-%m-%d %H:%M:%S")
idx = [i for i, line in enumerate(lines) if line.startswith(":EndDate")][0]
lines[idx] = f":EndDate {end_date}\n"
with (self.workdir / f"{self.run_name}.rvi").open("w") as file:
file.writelines(lines)
if rvi_commands is not None:
if not isinstance(rvi_commands, list):
raise ValueError("rvi_commands must be a list of strings.")
with (self.workdir / f"{self.run_name}.rvi").open("a") as file:
file.write("\n")
for command in rvi_commands:
file.write(command + "\n")
if rvt:
if all(data is None for data in [self.meteo, self.qobs]):
warnings.warn(
"Meteorological data and/or observed streamflow data were not provided. The .rvt file will not be updated.", stacklevel=2
)
else:
if run is None:
raise RuntimeError(
"RavenPy is not installed or not properly configured. The rvt file cannot currently be updated without it."
f" Original error: {ravenpy_err_msg}"
)
# Backup the existing .rvt file
shutil.copy(
self.workdir / f"{self.run_name}.rvt",
self.workdir / f"{self.run_name}_backup_{dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.rvt",
)
# Update the meteorological data and observed streamflow data in the .rvt file
rvt = rc.rvs.RVT(**(self.meteo["keys"] | ({"observation_data": self.qobs["rc"]} if self.qobs is not None else {})))
rvt_lines = rvt.to_rv() + "\n"
rvt_lines = rvt_lines.split("\n")
rvt_lines = [line + "\n" for line in rvt_lines]
# Read the existing .rvt file and replace the GriddedForcing or Gauge section
with (self.workdir / f"{self.run_name}.rvt").open("r") as file:
lines = file.readlines()
output_lines = []
for line in lines:
# If the line starts with :GriddedForcing, :Gauge, or :ObservationalData, replace everything after with the new rvt_lines
if line.startswith(":GriddedForcing") or line.startswith(":Gauge") or line.startswith(":ObservationalData"):
output_lines.extend(rvt_lines)
break
else:
output_lines.append(line)
# Overwrite the .rvt file with the new lines
with (self.workdir / f"{self.run_name}.rvt").open("w") as file:
file.writelines(output_lines)
if rvh:
# Get the requested output
output = self.hru.get("output_subbasins", None)
if output is None:
warnings.warn(
"Changes to the .rvh file were requested, but no output subbasins were defined in the HRU properties. "
"The .rvh file will not be updated.",
stacklevel=2,
)
else:
# Backup the existing .rvh file
shutil.copy(
self.workdir / f"{self.run_name}.rvh",
self.workdir / f"{self.run_name}_backup_{dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.rvh",
)
# Read the existing .rvh file and update the hydrological response variables
with (self.workdir / f"{self.run_name}.rvh").open("r") as file:
lines = file.readlines()
output = [int(o) for o in output]
output_lines = []
changes_active = False
for line in lines:
if line == ":SubBasins\n":
changes_active = True
hruid_idx = None
gauged_idx = None
if line == ":EndSubBasins\n":
changes_active = False
if changes_active:
if "GAUGED" in line:
l_split = line.split(",")
l_split = [li for li in l_split if li.strip() != ":Attributes"]
hruid_idx = [idx for idx, line in enumerate(l_split) if " ID" in line]
gauged_idx = [idx for idx, line in enumerate(l_split) if "GAUGED" in line]
if len(hruid_idx) != 1 and len(gauged_idx) != 1:
raise ValueError("Could not determine unique HRU and GAUGED columns in the .rvh file.")
hruid_idx = hruid_idx[0]
gauged_idx = gauged_idx[0]
if gauged_idx is not None:
l_split = line.split()
try:
idx = int(l_split[hruid_idx])
# Replace the gauged element, accounting for the real whitespace between each non-space value
whitespaces = np.diff([i for i, c in enumerate(line) if c == " "])
actual_gauged_idx = (np.array([i for i, val in enumerate(whitespaces) if val != 1]))[gauged_idx]
actual_gauged_idx = np.sum(whitespaces[:actual_gauged_idx]) + 1
line = [li for li in line]
if idx in output:
line[actual_gauged_idx] = "1"
else:
line[actual_gauged_idx] = "0"
line = "".join(line)
except ValueError as e:
if "invalid literal for int()" in str(e):
pass # Skip lines that do not contain a valid HRU ID
else:
raise e
output_lines.append(line)
# Overwrite the .rvh file with the new lines
with (self.workdir / f"{self.run_name}.rvh").open("w") as file:
file.writelines(output_lines)
[docs]
def run(self, *, overwrite: bool = False, standardize: bool = True, return_streamflow: bool = True) -> xr.Dataset | None:
"""
Run the Raven hydrological model and return simulated streamflow.
Parameters
----------
overwrite : bool
If True, overwrite the existing output files. Default is False.
standardize : bool
If True, standardize the output files to ensure they are in a consistent format. Default is True.
return_streamflow : bool
If True, return the simulated streamflow. Default is True.
Returns
-------
xr.Dataset
The simulated streamflow.
"""
# TODO: Compare the tagged version of the files with the raven-hydro version
# FIXME: overwrite is currently not working as intended in RavenPy. Remove this once it is fixed.
if overwrite is False and Path.is_file(self.workdir / "output" / f"{self.run_name}_Hydrographs.nc"):
raise FileExistsError(f"Output files already exist in {self.workdir / 'output'}. Use 'overwrite=True' to overwrite them.")
if self.executable is None:
run(modelname=self.run_name, configdir=self.workdir, overwrite=overwrite)
else:
executable = str(Path(self.executable))
if "raven" not in executable.lower():
raise ValueError("The executable command does not seem to be a valid Raven command. Please check the 'executable' parameter.")
# Since we bypassed RavenPy, we need to clean up the output directory
for file in (self.workdir / "output").glob(f"{self.run_name}*.nc"):
file.unlink()
subprocess.run( # noqa: S603
[
executable,
self.workdir / f"{self.run_name}",
"-o",
self.workdir / "output",
],
check=True,
stdin=subprocess.DEVNULL,
)
if standardize:
self.standardize_outputs()
if return_streamflow:
return self.get_outputs("q")
[docs]
def get_streamflow(self, output: Literal["q", "all", "path"] = "q", **kwargs) -> xr.Dataset | Path:
r"""
Return the simulated streamflow from the Raven model.
Parameters
----------
output : {"q", "all", "path"}
The type of output to return. If "q", return only the streamflow variable.
If "all", return the entire hydrograph dataset.
If "path", return the path to the streamflow file.
\*\*kwargs : dict
Keyword arguments to pass to :py:func:`xarray.open_dataset`.
Returns
-------
xr.Dataset
The streamflow file.
Path
The path to the streamflow file if output is set to "path".
"""
warnings.warn(
"The 'get_streamflow' method is deprecated and will be removed in a future version. Please use the 'get_outputs' method instead.",
FutureWarning,
stacklevel=2,
)
if output == "path":
return self.get_outputs("q", return_paths=True)[0]
else:
if output == "q":
return self.get_outputs("q")
else:
return self.get_outputs("Hydrographs", **kwargs)
[docs]
def get_outputs(self, output: str, return_paths: bool = False, **kwargs) -> xr.Dataset | Path | list[Path]:
r"""
Return the outputs of the Raven model.
Parameters
----------
output : str
"path" to return the output directory.
"q" to only return the streamflow variable.
Alternatively, a string matching the name of the output file to return (e.g. "Hydrographs", "Storage", "ByHRU", etc.).
return_paths : bool
If True, return the path to the output file(s) instead of the dataset. Default is False.
\*\*kwargs : dict
Keyword arguments to pass to :py:func:`xarray.open_dataset`.
Returns
-------
xr.Dataset
The requested output variable.
Path
The path to the output directory if output is set to "path".
list[Path]
The path to the output file(s) if return_path is True.
"""
outdir = self.workdir / "output"
if output == "path":
return outdir
if output == "q":
file = list(outdir.glob(f"{self.run_name}_*Hydrographs*.nc"))
if return_paths:
return file
else:
with xr.open_dataset(file[0], **kwargs) as ds:
return ds[["q"]]
else:
matching_files = list(outdir.glob(f"{self.run_name}_*{output}*.nc"))
if return_paths:
return matching_files
else:
if len(matching_files) == 0:
raise ValueError(f"No output files matching '{self.run_name}_*{output}*.nc' were found.")
else:
kwargs = deepcopy(kwargs)
kwargs.setdefault("combine", "by_coords")
kwargs.setdefault("data_vars", "minimal")
with xr.open_mfdataset(matching_files, **kwargs) as ds:
return ds
[docs]
def aggregate_outputs(
self, by: Literal["hru", "unit", "subbasin"], to: Literal["subbasin", "drainage_area"], subset: list[str] | None = None, **kwargs
) -> None:
r"""
Aggregate the model outputs to a different spatial unit. See the Notes section for more details.
Parameters
----------
by : {"hru", "unit", "subbasin"}
The spatial unit to aggregate from.
"unit" is the generic term for "hru".
to : {"subbasin", "drainage_area"}
The spatial unit to aggregate to.
subset : list[str] | None
The list of variables to aggregate. If None, all variables will be processed.
The strings should match the names produced by the Raven model, typically found under ":CustomOutput" in the .rvi file.
\*\*kwargs : dict
Keyword arguments to pass to :py:func:`xarray.open_dataset`.
Returns
-------
None
The aggregated outputs will be saved as new NetCDF files in the output directory, with a name pattern
following what is produced by the Raven model (e.g. "{run_name}_variable}_By{aggregation}.nc").
Aggregation will be 'ByHRU', 'BySubbasin', or 'ByDrainageArea', depending on the 'to' parameter.
If a file with the same name already exists, a new file will be saved with a "_v{n}" suffix.
Notes
-----
This method expects that relevant spatial information has been provided to the RavenPy model, either through the initial configuration or
through the `update_data` method. Furthermore, that spatial information should be consistent with ravenpy.extractors.BasinMakerExtractor
expectations, as well as the Data Specifications of Basin Maker (https://hydrology.uwaterloo.ca/basinmaker/) and the outputs of
BasinMaker's `Generate_HRUs` function. In particular, the following variables should be present in the HRU file:
- Always:
- SubId: The ID of the subbasins.
- BasArea: The area of the subbasins.
- by == "hru":
- HRU_ID: The ID of the HRUs.
- HRU_Area: The area of the HRUs, in units consistent with the area of the subbasins.
- to == "drainage_area":
- DowSubId: The ID of the downstream subbasin for each HRU.
"""
clean = {
"hru": "HRU",
"unit": "HRU",
"subbasin": "Subbasin",
"drainage_area": "DrainageArea",
}
# Get the files to aggregate
files = self.get_outputs(output=f"_By{clean[by]}", return_paths=True)
if subset is not None:
files = [file for file in files if any(s in file.name for s in subset)]
if len(files) == 0:
raise ValueError(f"No output files matching '{self.run_name}_*_By{clean[by]}*.nc' were found.")
weights = None
for file in files:
with xr.open_dataset(file, **kwargs) as ds:
ds_agg, weights = aggregate_output(ds, by=by, to=to, weights=weights)
file_out = file.parent / file.name.replace(f"_By{clean[by]}", f"_By{clean[to]}")
if file_out.exists():
warnings.warn(
f"The file {file_out} already exists.",
stacklevel=2,
)
files_exist = list(file_out.parent.glob(file_out.stem.replace("[", "[[]").replace("]_", "[]]_") + "*.nc"))
file_out = Path(str(file_out).replace(".nc", f"_v{len(files_exist) + 1}.nc"))
ds_agg.to_netcdf(file_out)
def _read_qobs(self, qobs_file: os.PathLike | str, alt_name_flow: str | None = "q") -> None:
"""Read the observed streamflow data from a NetCDF file and update the .qobs properties of the RavenPy model."""
self.qobs = {
"file": Path(qobs_file),
"alt_name_flow": alt_name_flow,
}
with xr.open_dataset(qobs_file, chunks={}) as ds_qobs:
ds_qobs = ds_qobs.squeeze()
# Try to get the basin name or ID variable as a string
basin_id = None
if "timeseries_id" in ds_qobs.cf.cf_roles:
basin_id = ds_qobs.cf.cf_roles["timeseries_id"][0]
else:
basin_name = [a for a in ["basin_id", "basin_name", "subbasin_id"] if a in ds_qobs]
if len(basin_name) == 1:
basin_id = basin_name[0]
elif len(basin_name) > 1:
raise ValueError(
"Multiple possible basin ID variables found in the observed streamflow dataset. Please ensure only one of 'basin_id', "
"'basin_name', or 'subbasin_id' is present."
)
station_id = "station_id" if "station_id" in ds_qobs else None
# If the dataset is a single time series, we can manage a missing basin_id
if basin_id is None and ds_qobs.sizes.keys() == {"time"}:
self.qobs["basin_id"] = [1]
self.qobs["station_id"] = ["0"] if station_id is None else ds_qobs[station_id].astype(str).values.tolist()
elif basin_id is None:
raise ValueError(
"The observed streamflow dataset must contain a basin ID variable. Please ensure one of 'basin_id', 'basin_name', "
"or 'subbasin_id' is present, or that the dataset contains a time series with an attribute 'cf_role' set to 'timeseries_id'."
)
else:
# Extract the basin and station IDs
self.qobs["basin_id"] = [int(i) for i in ds_qobs[basin_id].values]
self.qobs["station_id"] = (
[str(i) for i in range(len(self.qobs["basin_id"]))] if station_id is None else ds_qobs[station_id].astype(str).values.tolist()
)
self.qobs["rc"] = [
rc.commands.ObservationData.from_nc(
qobs_file,
alt_names=alt_name_flow,
station_idx=i + 1,
uid=self.qobs["basin_id"][i],
)
for i in range(len(self.qobs["basin_id"]))
]
def _read_hru( # noqa: C901
self,
hru: gpd.GeoDataFrame | dict | os.PathLike | str,
output_subbasins: Literal["all", "qobs"] | list[int] | None = None,
minimum_reservoir_area: str | None = None,
) -> gpd.GeoDataFrame:
"""Read the HRU properties from a GeoDataFrame and update the .hru properties of the RavenPy model. Might also create a new HRU file."""
hru = deepcopy(hru)
self.hru = {"keys": {}}
has_changed = False
hru_file = None
if isinstance(hru, gpd.GeoDataFrame | dict):
has_changed = True
# Manage the input to ensure it is a GeoDataFrame
if isinstance(hru, str | os.PathLike):
hru_file = Path(hru)
hru = gpd.read_file(Path(hru))
elif isinstance(hru, dict):
gpd_info = {k: [v] for k, v in hru.items() if k not in ["geometry", "crs"]}
hru = gpd.GeoDataFrame(
gpd_info,
geometry=[hru.get("geometry")],
crs=hru.get("crs"),
)
elif not isinstance(hru, gpd.GeoDataFrame):
raise TypeError("The 'hru' parameter must be a GeoDataFrame, a dictionary, or a path to a shapefile.")
# Manage the output subbasins
if output_subbasins is not None:
has_changed = True
if output_subbasins == "qobs":
if self.qobs is None:
raise ValueError("The 'qobs' parameter is set to 'qobs', but no observed streamflow data is provided.")
basin_ids = self.qobs["basin_id"]
# Also fill in the station names to match the observed streamflow data
station_ids = self.qobs["station_id"]
hru["Obs_NM"] = hru["SubId"].map(dict(zip(basin_ids, station_ids, strict=False)))
elif output_subbasins == "all":
basin_ids = hru["SubId"].unique()
elif isinstance(output_subbasins, list):
basin_ids = output_subbasins
else:
raise ValueError(
f"The 'output_subbasins' parameter must be either 'all', 'qobs', or a list of basin IDs. Got '{output_subbasins}' instead."
) from None
self.hru["output_subbasins"] = basin_ids
hru["Has_Gauge"] = hru["SubId"].isin(basin_ids)
hru["Obs_NM"] = hru["Obs_NM"].fillna("")
try:
# Extract the basin properties
rvh_config = ravenpy.extractors.BasinMakerExtractor(hru).extract()
self.hru["keys"]["HRUs"] = rvh_config["hrus"]
except KeyError as err:
rvh_config = None
# If the HRU is not compatible with the BasinMakerExtractor, we try to manage it manually
if len(hru) != 1:
raise ValueError(
"If using multiple HRUs, the HRU GeoDataFrame must contain every property required by the BasinMakerExtractor."
) from err
if any(col not in hru.columns for col in ["HRU_ID", "hru_type", "SubId", "DowSubId"]):
has_changed = True
hru["HRU_ID"] = hru.get("HRU_ID", "1")
hru["hru_type"] = hru.get("hru_type", "land")
# These two are only required for gridded meteorological data, but we add them anyway for consistency
hru["SubId"] = hru.get("SubId", 1)
hru["DowSubId"] = hru.get("DowSubId", -1)
hru_dict = hru.reset_index().squeeze()
self.hru["keys"]["HRUs"] = [
{
"area": hru_dict["area"],
"elevation": hru_dict["elevation"],
"latitude": hru_dict["latitude"],
"longitude": hru_dict["longitude"],
"hru_type": hru_dict["hru_type"],
"hru_id": hru_dict["HRU_ID"],
}
]
self.hru["hru"] = hru
self.hru["saved_on_disk"] = False if has_changed else True
if hru_file is None or has_changed:
self.hru["file"] = self.workdir / "geospatial" / f"{self.run_name}_hru.gpkg"
else:
self.hru["file"] = hru_file
# Special considerations for HBVEC
if rvh_config is not None and self.model_name == "HBVEC":
# Q_REFERENCE is needed for the DIFFUSIVE_ROUTE routing scheme, which is not default but could be used by the user.
if "sub_basins" in rvh_config and any(q in hru.columns for q in ["Q_Mean", "Q_REFERENCE"]):
if any(k in self.kwargs for k in ["SubBasinProperties", "sub_basin_properties"]):
warnings.warn(
"The 'SubBasinProperties' parameter is already set in the model configuration. "
"The Q_REFERENCE values from the HRU file will not be added to it.",
stacklevel=2,
)
else:
col_name = "Q_Mean" if "Q_Mean" in hru.columns else "Q_REFERENCE"
# Warning: This entry would completely overwrite the default SubBasinProperties, so we need to copy the existing ones first
# and add Q_REFERENCE to it.
default_sub_basin_properties = rc.emulators.HBVEC.model_fields["sub_basin_properties"].default
sb_parameters = default_sub_basin_properties["parameters"].copy()
records_values = default_sub_basin_properties["records"][0]["values"]
self.hru["keys"]["sub_basin_properties"] = {
"parameters": sb_parameters + ["Q_REFERENCE"],
"records": [
{
"sb_id": subid["subbasin_id"],
"values": tuple(
list(records_values)
+ [
hru.loc[
hru["SubId"] == subid["subbasin_id"],
col_name,
].mean()
]
),
}
for subid in rvh_config["sub_basins"]
],
}
if "reservoirs" in rvh_config and len(rvh_config["reservoirs"]) > 0:
# Set the initial storage of implicit lakes at 1000 mm
if "hru_state_variable_table" in self.kwargs:
warnings.warn(
"The 'hru_state_variable_table' parameter is already set in the model configuration. "
"The initial storage for implicit lakes will not be modified.",
stacklevel=2,
)
else:
storage_name = (
self.kwargs.get("LakeStorage") or self.kwargs.get("lake_storage") or "SOIL[2]" # Default for HBVEC
)
self.hru["keys"]["hru_state_variable_table"] = [
{
"hru_id": res["hru_id"],
"data": {storage_name: 1000},
}
for res in rvh_config["reservoirs"]
]
# Filter the reservoirs based on the minimum reservoir area, but after setting an initial storage on all lakes
if minimum_reservoir_area is not None:
minimum_reservoir_area = convert_units_to(minimum_reservoir_area, "m2")
reservoirs = hru.loc[
(hru["LakeArea"] >= minimum_reservoir_area) & (hru["HRU_IsLake"] > 0),
"SubId",
].unique()
rvh_config["reservoirs"] = [r for r in rvh_config["reservoirs"] if r["subbasin_id"] in reservoirs]
# Add information from the BasinMakerExtractor
for key in rvh_config.keys():
if key != "hrus" and len(rvh_config[key]) > 0:
self.hru["keys"][key] = rvh_config[key]
def _read_meteo(
self,
meteo_file: os.PathLike | str,
data_type: list[str],
alt_names_meteo: dict | None = None,
meteo_station_properties: dict | None = None,
gridweights: os.PathLike | str | None = None,
) -> None:
"""Read the meteorological data from a NetCDF file and update the .meteo properties of the RavenPy model."""
self.meteo = {
"file": Path(meteo_file),
"data_type": data_type,
"alt_names_meteo": alt_names_meteo,
"station_properties": meteo_station_properties,
"keys": {},
}
# Get some properties from the meteorological file
with xr.open_dataset(self.meteo["file"], chunks={}) as ds:
# Station-based meteorological data (Hard-coding is fine here, since RavenPy only supports station data with a 'station_id' dimension)
if "station_id" in ds.dims or ds.sizes.keys() == {"time"}:
self.meteo["type"] = "station"
self.meteo["station_len"] = len(ds.station_id) if "station_id" in ds.dims else 1
if self.meteo["station_len"] > 5:
warnings.warn(
"Multiple stations were provided in the meteorological data. Be aware that Raven is very inefficient and will"
" open the file for each station and each variable. Try to use gridded data if possible for better performance.",
UserWarning,
stacklevel=2,
)
elif ds.cf.axes.get("X") is not None:
if self.hru is None:
raise ValueError("The HRU properties must be defined before reading gridded meteorological data.")
self.meteo["type"] = "grid"
# Other required properties
self.meteo["dim_names"] = (ds.cf.axes["X"][0], ds.cf.axes["Y"][0])
self.meteo["var_names"] = (
ds.cf.coordinates["longitude"][0],
ds.cf.coordinates["latitude"][0],
)
self.meteo["elevation_name"] = ds.cf.coordinates["vertical"][0]
# Raven requires that the data is in T,Y,X order
for v in self.meteo["data_type"]:
v = alt_names_meteo.get(v, v)
if ds[v].dims != (
"time",
self.meteo["dim_names"][1],
self.meteo["dim_names"][0],
):
raise ValueError(
"All variables in the meteorological dataset must have the dimensions (time, Y, X). "
"Please use the 'xhydro.modelling.format_input' function to ensure the data is in the correct format."
)
else:
raise ValueError(
"Could not determine the type of meteorological data. Please use the 'xhydro.modelling.format_input' function "
"to ensure the data is in the correct format."
)
# Prepare the meteorological data commands
if self.meteo["type"] == "station":
self.meteo["keys"]["Gauge"] = [
rc.commands.Gauge.from_nc(
self.meteo["file"],
data_type=self.meteo["data_type"],
alt_names=self.meteo["alt_names_meteo"],
data_kwds=self.meteo["station_properties"],
station_idx=i + 1, # RavenPy uses 1-based indexing for stations
)
for i in range(self.meteo["station_len"])
]
else:
if not self.hru["saved_on_disk"]:
Path(self.hru["file"].parent).mkdir(parents=True, exist_ok=True)
self.hru["hru"].to_file(str(self.hru["file"]))
self.hru["saved_on_disk"] = True
# Compute the weights
if gridweights is None:
gridweights = self.workdir / "weights" / f"{self.meteo['file'].stem}_vs_{self.hru['file'].stem}_weights.txt"
weights = ravenpy.extractors.GridWeightExtractor(
input_file_path=self.meteo["file"],
routing_file_path=self.hru["file"],
dim_names=self.meteo["dim_names"],
var_names=self.meteo["var_names"],
routing_id_field="HRU_ID",
).extract()
gw_cmd = rc.commands.GridWeights(**weights)
gridweights.parent.mkdir(parents=True, exist_ok=True)
gridweights.write_text(gw_cmd.to_rv() + "\n")
# Meteo configuration
self.meteo["keys"]["GriddedForcing"] = [
rc.commands.GriddedForcing.from_nc(
self.meteo["file"],
data_type=v,
alt_names=(self.meteo["alt_names_meteo"][v] if self.meteo["alt_names_meteo"] is not None else None),
data_kwds=self.meteo["station_properties"],
station_idx=None, # FIXME: This can be removed once we have ravenpy >= 0.18.3
engine="h5netcdf",
GridWeights=rc.commands.RedirectToFile(gridweights),
ElevationVarNameNC=self.meteo["elevation_name"],
DimNamesNC=list(self.meteo["dim_names"]) # This must always be X, Y, T regardless of the input data
+ ["time"],
# Longitude/Latitude names are only set if they differ from the dimension names (aka. non-regular grids)
LongitudeVarNameNC=(self.meteo["var_names"][0] if self.meteo["var_names"][0] != self.meteo["dim_names"][0] else None),
LatitudeVarNameNC=(self.meteo["var_names"][1] if self.meteo["var_names"][1] != self.meteo["dim_names"][1] else None),
)
for v in data_type
]
[docs]
def standardize_outputs(self, files: list[str] | None = None, **kwargs):
r"""
Standardize the outputs of the simulation to be more consistent with CF conventions.
Parameters
----------
files : list[str] | None
Names of the output files to standardize. If None, all output files will be standardized.
The strings can be part of the file name (e.g. "Hydrographs", "Storage", "ByHRU", etc.).
\*\*kwargs : dict
Keyword arguments to pass to :py:func:`xarray.open_dataset`.
Notes
-----
Be aware that since systems such as Windows do not allow to overwrite files that are currently open,
a temporary file will be created and then renamed to overwrite the original file.
"""
if files is None:
patterns = [f"{self.run_name}_*.nc"]
else:
patterns = [f"{self.run_name}_*{file.replace('.nc', '')}*.nc" for file in files]
files = []
for pattern in patterns:
files.extend(self.get_outputs(output="path").glob(pattern))
alt_names = {
# Dimensions
"basin_name": "subbasin_id",
"SBID": "subbasin_id",
"HRU_ID": "unit_id",
# HRU properties
"SubId": "subbasin_id",
"DowSubId": "dowsub_id",
"DrainArea": "drainage_area",
"BasArea": "subbasin_drainage_area",
"MeanElev": "subbasin_elevation",
"Obs_NM": "station_id",
"centroid_x": "subbasin_centroid_longitude",
"centroid_y": "subbasin_centroid_latitude",
"HRU_CenX": "unit_centroid_longitude",
"HRU_CenY": "unit_centroid_latitude",
"HRU_E_mean": "unit_elevation",
"HRU_Area": "unit_drainage_area",
# Lumped models
"area": "drainage_area",
"elevation": "elevation",
"longitude": "centroid_longitude",
"latitude": "centroid_latitude",
# Variables
"q_sim": "q",
}
for file in files:
with xr.open_dataset(file, **kwargs) as ds:
[ds[c].load() for c in ds.coords]
# Global attributes are already pretty good, but make the Raven version explicit
# Since the executable used might differ from raven-hydro, we trust the dataset's history
ds.attrs["Raven_version"] = ds.attrs.get("history", "Raven unknown").split("Raven ")[-1]
if run is not None:
ds.attrs["RavenPy_version"] = ravenpy.__version__
ds = standardize_output(ds, spatial_info=self.hru["hru"], alt_names=alt_names)
# Save the file
ds.to_netcdf(file.parent / f"{file.stem}_tmp.nc")
# Remove the original file and rename the new one
file.unlink()
(file.parent / f"{file.stem}_tmp.nc").rename(
file,
)