Source code for xhydro.optimal_interpolation.compare_result

"""Compare results between simulations and observations."""

import logging

import numpy as np
import xarray as xr

import xhydro.optimal_interpolation.utilities as util
from xhydro.modelling.obj_funcs import get_objective_function


logger = logging.getLogger(__name__)

__all__ = ["compare"]


[docs] def compare( qobs: xr.Dataset, qsim: xr.Dataset, flow_l1o: xr.Dataset, station_correspondence: xr.Dataset, observation_stations: list, percentile_to_plot: int = 50, show_comparison: bool = True, ): """ Start the computation of the comparison method. Parameters ---------- qobs : xr.Dataset Streamflow and catchment properties dataset for observed data. qsim : xr.Dataset Streamflow and catchment properties dataset for simulated data. flow_l1o : xr.Dataset Streamflow and catchment properties dataset for simulated leave-one-out cross-validation results. station_correspondence : xr.Dataset Matching between the tag in the simulated files and the observed station number for the obs dataset. observation_stations : list Observed hydrometric dataset stations to be used in the cross-validation step. percentile_to_plot : int Percentile value to plot (default is 50). show_comparison : bool Whether to display the comparison plots (default is True). """ time_range = len(qobs["time"]) # Read percentiles list (which percentile thresholds were used) percentile = flow_l1o["percentile"] # Find position of the desired percentile idx_pct = np.where(percentile == percentile_to_plot)[0] if idx_pct is None: raise ValueError( "The desired percentile is not computed in the results file \ provided. Please make sure your percentile value is expressed \ in percent (i.e. 50th percentile = 50)" ) station_count = len(observation_stations) selected_flow_sim = np.empty((time_range, station_count)) * np.nan selected_flow_obs = np.empty((time_range, station_count)) * np.nan selected_flow_l1o = np.empty((time_range, station_count)) * np.nan for i in range(0, station_count): msg = f"Reading data from station {i + 1} of {station_count}" logger.info(msg) # For each validation station: cv_station_id = observation_stations[i] # Get the station number from the obs database which has the same codification for station ids. index_correspondence = np.where(station_correspondence["station_id"] == cv_station_id)[0][0] station_code = station_correspondence["reach_id"][index_correspondence] # Search for data in the Qsim file index_in_sim = np.where(qsim["station_id"].values == station_code.data)[0] sup_sim = qsim["drainage_area"].values[index_in_sim] selected_flow_sim[:, i] = qsim["q"].isel(station=index_in_sim) / sup_sim # Get data in Qobs file index_in_obs = np.where(qobs["station_id"] == cv_station_id)[0] sup_obs = qobs["drainage_area"].values[index_in_obs] selected_flow_obs[:, i] = qobs["q"].isel(station=index_in_obs) / sup_obs # Get data in Leave one out file index_in_l1o = np.where(flow_l1o["station_id"] == cv_station_id)[0] sup_l1o = qobs["drainage_area"].values[index_in_l1o] selected_flow_l1o[:, i] = flow_l1o["q"].isel(station=index_in_l1o, percentile=idx_pct).squeeze() / sup_l1o # Prepare the arrays for kge results kge = np.empty(station_count) * np.nan nse = np.empty(station_count) * np.nan kge_l1o = np.empty(station_count) * np.nan nse_l1o = np.empty(station_count) * np.nan for n in range(0, station_count): if not np.isnan(selected_flow_obs[:, n]).all(): kge[n] = get_objective_function(selected_flow_obs[:, n], selected_flow_sim[:, n], "kge") nse[n] = get_objective_function(selected_flow_obs[:, n], selected_flow_sim[:, n], "nse") kge_l1o[n] = get_objective_function(selected_flow_obs[:, n], selected_flow_l1o[:, n], "kge") nse_l1o[n] = get_objective_function(selected_flow_obs[:, n], selected_flow_l1o[:, n], "nse") if show_comparison: util.plot_results(kge, kge_l1o, nse, nse_l1o)