"""
Uncertainty analysis module for hydrological frequency analysis.
This module provides functions for bootstrap sampling, distribution fitting,
and uncertainty estimation in regional frequency analysis. It includes tools
for generating bootstrap samples from observed data and fitted distributions,
calculating L-moments, and estimating quantiles with uncertainty bounds.
Functions:
bootstrap_obs: Generate bootstrap samples from observed data.
bootstrap_dist: Generate bootstrap samples from a fitted distribution.
fit_boot_dist: Fit distributions to bootstrap samples.
calc_moments_iter: Calculate L-moments for each bootstrap sample.
calc_q_iter: Calculate quantiles for each bootstrap sample and group.
generate_combinations: Generate combinations of indices for sensitivity analysis.
"""
from itertools import combinations
import numpy as np
import xarray as xr
import xclim
from scipy import stats
import xhydro.frequency_analysis as xhfa
from .regional import (
calc_moments,
calculate_return_period,
remove_small_regions,
)
__all__ = [
"bootstrap_dist",
"bootstrap_obs",
"calc_moments_iter",
"fit_boot_dist",
"generate_combinations",
]
[docs]
def bootstrap_obs(obs: xr.DataArray, *, n_samples: int, seed: int | None = None) -> xr.DataArray:
"""
Generate bootstrap samples from observed data.
Parameters
----------
obs : xarray.DataArray
The observed data to bootstrap.
n_samples : int
The number of bootstrap samples to generate.
seed : int, optional
Seed for the random number generator.
Returns
-------
xarray.DataArray
Bootstrap samples with dimensions [samples, time].
"""
def _gen_boot(f: np.array, n_samples: int, seed: int | None = None) -> np.ndarray:
vals = f[~np.isnan(f)]
rng = np.random.default_rng(seed=seed)
idx = rng.choice(vals, size=(n_samples, len(vals)))
frep = np.dstack([f] * n_samples)[0].T
frep[~np.isnan(frep)] = idx.flatten()
return frep
return xr.apply_ufunc(
_gen_boot,
obs,
n_samples,
seed,
input_core_dims=[["time"], [], []],
output_core_dims=[["samples", "time"]],
vectorize=True,
).assign_coords(samples=range(n_samples))
[docs]
def bootstrap_dist(ds_obs: xr.Dataset, ds_params: xr.Dataset, *, n_samples: int) -> xr.Dataset:
"""
Generate bootstrap samples from a fitted distribution.
Parameters
----------
ds_obs : xarray.Dataset
The observed data.
ds_params : xarray.Dataset
The fitted distribution parameters.
n_samples : int
The number of bootstrap samples to generate.
Returns
-------
xarray.Dataset
Bootstrap samples with dimensions [samples, time].
Notes
-----
This function does not support lazy evaluation.
"""
def _calc_rvs(data, params, dist, p_names, n_samples):
dist_obj = xclim.indices.stats.get_dist(dist)
shape_params = [] if dist_obj.shapes is None else dist_obj.shapes.split(",")
dist_params = shape_params + ["loc", "scale"]
data_length = len(data)
p_names = p_names[~np.isnan(params)]
params = params[~np.isnan(params)]
ordered_params = [params[d == p_names] for d in dist_params]
samples = getattr(stats, dist).rvs(*ordered_params, size=(n_samples, data_length))
samples[:, np.isnan(data)] = np.nan
return samples
ds = xr.apply_ufunc(
_calc_rvs,
ds_obs.load(),
ds_params.load(),
ds_params.scipy_dist,
ds_params.dparams,
n_samples,
input_core_dims=[["time"], ["dparams"], [], ["dparams"], []],
output_core_dims=[["samples", "time"]],
vectorize=True,
).assign_coords(samples=range(n_samples))
return ds
[docs]
def fit_boot_dist(ds: xr.Dataset) -> xr.Dataset:
"""
Fit distributions to bootstrap samples.
Parameters
----------
ds : xarray.Dataset
The bootstrap samples to fit.
Returns
-------
xarray.Dataset
Fitted distribution parameters for each bootstrap sample.
"""
# we can't use xhydro fit as each dist would be fitted over each dist, resulting in nxn fits
params_ince = []
for dist in ds.scipy_dist.values:
ds.sel(scipy_dist=dist)
params_ince.append(xhfa.local.fit(ds.sel(scipy_dist=dist), distributions=[dist]))
return xr.concat(params_ince, dim="scipy_dist", join="outer")
[docs]
def calc_moments_iter(ds_samples: xr.Dataset) -> xr.Dataset:
"""
Calculate L-moments for each bootstrap sample.
Parameters
----------
ds_samples : xarray.Dataset
The bootstrap samples.
Returns
-------
xarray.Dataset
L-moments for each bootstrap sample.
"""
ds_mom = []
for sample in ds_samples.samples.values:
ds = ds_samples.sel(samples=sample)
ds_mom.append(calc_moments(ds))
return xr.concat(ds_mom, dim="samples")
def _calc_q_iter_da(
bv: str,
da_groups: xr.DataArray,
da_moments_iter: xr.DataArray,
*,
return_period: np.array,
small_regions_threshold: int | None = 5,
l1: xr.DataArray | None = None,
) -> xr.DataArray:
"""
Calculate quantiles for each bootstrap sample and group.
Parameters
----------
bv : str
The basin identifier or 'all' to proceed on all basins (needed for ungauged).
The associated dimension must have a 'cf_role: timeseries_id' attribute.
da_groups: xr.DataArray,
The grouped data.
da_moments_iter: xr.DataArray
The L-moments for each bootstrap sample.
return_period : array-like
The return periods to calculate quantiles for.
small_regions_threshold : int, optional
The threshold for removing small regions. Default is 5.
l1 : xr.DataArray, optional
First L-moment (location) values. L-moment can be specified for ungauged catchments.
If `None`, values are taken from ds_moments_iter.
Returns
-------
xarray.DataArray
Quantiles for each bootstrap sample and group.
"""
# We select groups for all or one id
id_dim = da_groups.cf.cf_roles["timeseries_id"][0]
if bv == "all":
ds_temp = da_groups.dropna("region_id", how="all")
else:
ds_temp = da_groups.sel(**{id_dim: bv}).dropna("region_id", how="all")
ds_mom = []
# For each group, we find which id are in it
for region_id in ds_temp.region_id.values:
id_list = da_groups.sel(region_id=region_id).dropna(id_dim, how="all").id.values
# We use moments with ressample previously done, and we create ds_moment_group with iterations
ds_mom.append(da_moments_iter.sel(**{id_dim: id_list}).assign_coords(region_id=region_id).expand_dims("region_id"))
# Concat along region_id
ds_moments_groups = xr.concat(ds_mom, dim="region_id")
da_groups = da_groups.sel(region_id=ds_moments_groups.region_id).dropna(dim="id", how="all")
# With obs and moments of same dims, we calculate
qt = calculate_return_period(
da_groups.to_dataset(),
ds_moments_groups.to_dataset(),
return_period=return_period,
l1=l1,
)
qt = remove_small_regions(qt, thresh=small_regions_threshold)
# For each station we stack regions et bootstrap
if bv == "all":
return qt.rename({"samples": "obs_samples"}).stack(samples=["region_id", "obs_samples"]).to_dataarray().squeeze()
else:
return qt.rename({"samples": "obs_samples"}).stack(samples=["region_id", "obs_samples"]).sel(**{id_dim: bv}).to_dataarray().squeeze()
def calculate_quantiles_over_boostraped_groups(
bv: str,
groups: xr.DataArray | xr.Dataset,
moments_iter: xr.DataArray | xr.Dataset,
return_period: np.array,
small_regions_threshold: int | None = 5,
l1: xr.DataArray | None = None,
) -> xr.DataArray:
"""
Calculate quantiles for each bootstrap sample and group.
Parameters
----------
bv : str
The basin identifier or 'all' to proceed on all basins (needed for ungauged).
The associated dimension must have a 'cf_role: timeseries_id' attribute.
groups : xr.DataArray or xr.Dataset
The grouped data.
moments_iter : xr.DataArray or xr.Dataset
The L-moments for each bootstrap sample.
return_period : array-like
The return periods to calculate quantiles for.
small_regions_threshold : int, optional
The threshold for removing small regions. Default is 5.
l1 : xr.DataArray, optional
First L-moment (location) values. L-moment can be specified for ungauged catchments.
If `None`, values are taken from ds_moments_iter.
Returns
-------
xr.DataArray or xr.Dataset
Quantiles for each bootstrap sample and group. Returns a Dataset if input groups
and moments_iter are Datasets, otherwise returns a DataArray.
"""
if all(isinstance(input, xr.DataArray) for input in [groups, moments_iter]):
ds = False
elif all(isinstance(input, xr.Dataset) for input in [groups, moments_iter]):
ds = True
else:
raise TypeError("groups and moments_iter must be both xr.DataArray or xr.Dataset")
if ds:
ds = xr.Dataset()
groups_var = list(groups.keys())
if groups_var != list(moments_iter.keys()):
raise Exception("Variables in groups and moments_iter must be the same")
for var in groups_var:
ds[var] = _calc_q_iter_da(
bv,
groups[var],
moments_iter[var],
return_period=return_period,
small_regions_threshold=small_regions_threshold,
l1=l1,
).expand_dims("id")
return ds
else:
return _calc_q_iter_da(
bv,
groups,
moments_iter,
return_period=return_period,
small_regions_threshold=small_regions_threshold,
l1=l1,
).expand_dims("id")
[docs]
def generate_combinations(da: xr.DataArray, *, n: int) -> list:
"""
Generate combinations of indices omitting up to N indices.
Parameters
----------
da : xarray.DataArray
Input DataArray.
n : int
Number of indices to omit in each combination.
Returns
-------
list
List of all combinations.
"""
# Get the list of indices
for ids in da.indexes:
if ids != "components":
indices = da[ids].values
# Variable to store all combinations
all_combinations = []
# Generate combinations for each case
for i in range(0, n + 1):
# Generate combinations of indices omitting i indices
index_combinations = list(combinations(indices, len(indices) - i))
# Add each combination to the variable
all_combinations.extend(index_combinations)
return all_combinations