Source code for xhydro.frequency_analysis.regional

"""
Provides functions for performing regional frequency analysis on hydrological data.

Key Functions:
- cluster_indices: Get indices of elements with a specific cluster number.
- get_groups_indices: Retrieve indices of groups from a clustering result.
- get_group_from_fit: Obtain indices of groups from a fitted model.
- fit_pca: Perform Principal Component Analysis (PCA) on the input dataset.
- calc_moments: Calculate L-moments for each station in a dataset.
- calc_h_z: Calculate the heterogeneity measure H and the Z-score for each group.
- mask_h_z: Build a boolean mask from H and Z-score thresholds.
- calculate_rp_from_afr: Calculate return period values for each group.
- group_ds: Group a dataset according to a list of station groups.
- remove_small_regions: Remove groups with fewer stations than a threshold.

The module utilizes NumPy, pandas, scikit-learn, and xarray for efficient data manipulation and analysis.

Example Usage:
    import xarray as xr
    from xhydro.frequency_analysis.regional import fit_pca, calc_moments

    # Load your dataset
    ds = xr.open_dataset('your_data.nc')

    # Perform PCA
    data_pca, pca_obj = fit_pca(ds, n_components=3)

    # Calculate L-moments
    l_moments = calc_moments(ds)

This module is designed for hydrologists and data scientists working with regional frequency analysis in water resources.
"""

import math
import warnings
from collections.abc import Callable

import numpy as np
import pandas as pd
import xarray as xr
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

from xhydro import __version__
from xhydro.utils import update_history


def cluster_indices(
    clust_num: int | np.ndarray, labels_array: int | np.ndarray
) -> np.ndarray:
    """
    Get the indices of elements with a specific cluster number using NumPy.

    Parameters
    ----------
    clust_num : numpy.ndarray or int
        Cluster number to find indices for.
    labels_array : numpy.ndarray or int
        Array containing cluster labels.

    Returns
    -------
    numpy.ndarray
        Indices of elements with the specified cluster number.
    """
    return np.where(labels_array == clust_num)[0]
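
# Illustrative usage sketch (not part of the library; the label values below are made up):
#
#     import numpy as np
#     labels = np.array([0, 1, 0, 2, 1, -1])
#     cluster_indices(1, labels)
#     # -> array([1, 4])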
def get_groups_indices(cluster: list, sample: xr.Dataset) -> list:
    """
    Get indices of groups from a clustering result, excluding the group labeled -1.

    Parameters
    ----------
    cluster : list
        Clustering result with a ``labels_`` attribute.
    sample : xr.Dataset
        Data sample to fit the model.

    Returns
    -------
    list
        Indices for each non-excluded group.
    """
    grouped: list = [
        sample.index.to_numpy()[cluster_indices(i, cluster.labels_)]
        for i in range(np.max(cluster.labels_) + 1)
    ]
    return grouped
def get_group_from_fit(
    model: Callable, param: dict, sample: xr.Dataset | xr.DataArray
) -> list:
    """
    Get indices of groups from a fit using the specified model and parameters.

    Parameters
    ----------
    model : callable
        Model class or instance with a fit method.
    param : dict
        Parameters for the model.
    sample : xr.Dataset or xr.DataArray
        Data sample to fit the model.

    Returns
    -------
    list
        List of indices for each non-excluded group.
    """
    sample = (
        sample.to_dataframe(name="value")
        .reset_index()
        .pivot(index="Station", columns="components")
    )
    return get_groups_indices(model(**param).fit(sample), sample)
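
# Hedged usage sketch: 'data_pca' is assumed to be the DataArray returned by fit_pca
# (with 'Station' and 'components' coordinates); the AgglomerativeClustering settings
# are illustrative only.
#
#     from sklearn.cluster import AgglomerativeClustering
#
#     groups = get_group_from_fit(AgglomerativeClustering, {"n_clusters": 3}, data_pca)
#     # 'groups' is a list of arrays of station identifiers, one array per cluster.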
def fit_pca(ds: xr.Dataset, **kwargs) -> tuple:
    r"""
    Perform Principal Component Analysis (PCA) on the input dataset.

    This function scales the input data, applies the PCA transformation, and returns
    the transformed data along with the fitted PCA object.

    Parameters
    ----------
    ds : xr.Dataset
        Input dataset to perform PCA on.
    \*\*kwargs : dict
        Additional keyword arguments to pass to the PCA constructor.

    Returns
    -------
    tuple
        A tuple containing:
        - data_pca (xr.DataArray): PCA-transformed data with 'Station' and 'components' as coordinates.
        - obj_pca (sklearn.decomposition.PCA): Fitted PCA object.

    Notes
    -----
    - The input data is scaled before PCA is applied.
    - The number of components in the output depends on the n_components parameter passed to PCA.
    """
    ds = _scale_data(ds)
    df = ds.to_dataframe()
    pca = PCA(**kwargs)
    obj_pca = pca.fit(df)
    data_pca = pca.transform(df)
    data_pca = xr.DataArray(
        data_pca,
        coords={
            "Station": ds.coords["Station"].values,
            "components": list(range(pca.n_components_)),
        },
    )
    data_pca.attrs["long_name"] = "Fitted Scaled Data"
    data_pca.attrs["description"] = (
        "Fitted scaled data with StandardScaler and PCA from sklearn.preprocessing and sklearn.decomposition"
    )
    data_pca.attrs["fitted_variables"] = [v for v in ds.var()]
    return data_pca, obj_pca
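
# Minimal usage sketch, assuming 'ds' is a Dataset of catchment descriptors indexed by
# a 'Station' dimension (the variable names below are hypothetical):
#
#     data_pca, obj_pca = fit_pca(ds[["drainage_area", "mean_slope"]], n_components=2)
#     print(obj_pca.explained_variance_ratio_)
#     # data_pca has dimensions ('Station', 'components').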
def _scale_data(ds: xr.Dataset) -> xr.Dataset:
    scaler = StandardScaler()
    df = ds.to_dataframe()
    scaled_data = pd.DataFrame(scaler.fit_transform(df))  # Scale the data
    # Set column names and index from the original dataframe on the scaled dataframe
    scaled_data.columns = df.columns
    scaled_data.index = df.index
    return xr.Dataset(scaled_data)


def _moment_l_vector(x_vec: np.array) -> list:
    return [_moment_l(x[~np.isnan(x)]) for x in x_vec]


# L-moments calculation
def _moment_l(x: np.array) -> list:
    """
    Calculate L-moments for a given dataset.

    This function computes various L-moments and L-moment ratios for a given array of
    data and returns them as a list.

    Parameters
    ----------
    x : list or array-like
        Input data for which to calculate L-moments.

    Returns
    -------
    list
        List in the order: [lambda1, lambda2, lambda3, tau, tau3, tau4].

        L-moments calculated:
        - lambda1: First L-moment (location)
        - lambda2: Second L-moment (scale)
        - lambda3: Third L-moment
        - tau: L-CV (Coefficient of L-Variation)
        - tau3: L-CS (L-skewness)
        - tau4: L-CK (L-kurtosis)

    Notes
    -----
    - NaN values are removed from the input data before calculations.
    - The input data is sorted in descending order for the calculations.
    """
    x = x[~np.isnan(x)]

    # Sort the data in descending order
    x_sort = np.sort(x)
    x_sort = x_sort[::-1]

    n = np.size(x_sort)
    j = np.arange(1, n + 1)

    # Probability-weighted moments
    b0 = np.mean(x_sort)
    b1 = np.dot((n - j) / (n * (n - 1)), x_sort)
    b2 = np.dot((n - j) * (n - j - 1) / (n * (n - 1) * (n - 2)), x_sort)
    b3 = np.dot(
        (n - j) * (n - j - 1) * (n - j - 2) / (n * (n - 1) * (n - 2) * (n - 3)), x_sort
    )

    # L-moments
    lambda1 = b0
    lambda2 = 2 * b1 - b0
    lambda3 = 6 * b2 - 6 * b1 + b0
    lambda4 = 20 * b3 - 30 * b2 + 12 * b1 - b0

    # L-moment ratios
    tau = lambda2 / lambda1  # L-CV (tau2 in Anctil, tau in Hosking et al.)
    tau3 = lambda3 / lambda2  # L-CS
    tau4 = lambda4 / lambda2  # L-CK

    return [lambda1, lambda2, lambda3, tau, tau3, tau4]
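
# b0..b3 above are the sample probability-weighted moments of Hosking & Wallis (1997),
# computed on the descending-sorted sample; the L-moments then follow as
# lambda1 = b0, lambda2 = 2*b1 - b0, lambda3 = 6*b2 - 6*b1 + b0, and so on.
# Quick sanity check on made-up values:
#
#     import numpy as np
#     lmom = _moment_l(np.array([10.0, 20.0, 30.0, 40.0, 50.0]))
#     lmom[0]  # lambda1 == 30.0 (the sample mean)
#     lmom[1]  # lambda2 == 10.0 (half the Gini mean difference)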
def calc_h_z(
    ds_groups: xr.Dataset,
    ds_moments_groups: xr.Dataset,
    kap: object,
    seed: int | None = None,
) -> xr.Dataset:
    """
    Calculate the heterogeneity measure H and the Z-score for regional frequency analysis.

    Parameters
    ----------
    ds_groups : xr.Dataset
        Dataset containing grouped data.
    ds_moments_groups : xr.Dataset
        Dataset containing L-moments for grouped data.
    kap : object
        Kappa distribution object exposing ``lmom_fit`` and ``rvs`` methods.
    seed : int, optional
        Random seed for reproducibility. Default is None.

    Returns
    -------
    xr.Dataset
        Dataset containing the calculated H values and Z-scores for each group.

    Notes
    -----
    This function applies the heterogeneity measure and Z-score calculations to grouped data
    for regional frequency analysis. It uses L-moments and the kappa distribution in the process.
    This function does not support lazy evaluation.
    Equations are based on Hosking, J. R. M., & Wallis, J. R. (1997). Regional frequency analysis (p. 240).
    """
    tau = ds_moments_groups.sel(lmom="tau").load()
    tau3 = ds_moments_groups.sel(lmom="tau3").load()
    tau4 = ds_moments_groups.sel(lmom="tau4").load()
    longeur = ds_groups.copy().count(dim="time").load()
    station_dim = ds_groups.cf.cf_roles["timeseries_id"][0]
    ds_h, ds_b4, ds_sigma4, ds_tau4_r = xr.apply_ufunc(
        _heterogeneite_et_score_z,
        kap,
        longeur,
        tau,
        tau3,
        tau4,
        seed,
        input_core_dims=[
            [],
            [station_dim],
            [station_dim],
            [station_dim],
            [station_dim],
            [],
        ],
        output_core_dims=[[], [], [], []],
        vectorize=True,
    )

    ds_tau4 = _calculate_gev_tau4(ds_groups.load(), ds_moments_groups.load())
    z_score = (ds_tau4 - ds_tau4_r + ds_b4) / ds_sigma4

    z_score = _append_ds_vars_names(z_score, "_Z")
    ds_h = _append_ds_vars_names(ds_h, "_H")
    ds = _combine_h_z(xr.merge([z_score, ds_h]))
    ds["crit"].attrs["description"] = (
        f"H and Z score based on Hosking, J. R. M., & Wallis, J. R. (1997). Regional frequency analysis (p. 240). - xhydro version: {__version__}"
    )
    ds["crit"].attrs["long_name"] = "Score"
    for v in ds.var():
        ds[v].attrs["description"] = (
            f"H and Z score based on Hosking, J. R. M., & Wallis, J. R. (1997). Regional frequency analysis (p. 240). - xhydro version: {__version__}"
        )
    return ds
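
# For reference, the statistics computed above follow Hosking & Wallis (1997):
#     H = (V - mu_V) / sigma_V, where V is the weighted standard deviation of the sample
#         L-CVs and mu_V, sigma_V come from the kappa-distribution simulations;
#     Z = (tau4_GEV - tau4_R + B4) / sigma4, the GEV goodness-of-fit measure.
# Hedged usage sketch, assuming a four-parameter kappa distribution object such as the
# one provided by the lmoments3 package (an external dependency not imported here):
#
#     import lmoments3.distr as distr
#     ds_crit = calc_h_z(ds_groups, ds_moments_groups, distr.kap, seed=42)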
def _calculate_gev_tau4(
    ds_groups: xr.Dataset, ds_moments_groups: xr.Dataset
) -> xr.Dataset:
    # Hosking and Wallis
    lambda_r_1, lambda_r_2, lambda_r_3 = _calc_lambda_r(ds_groups, ds_moments_groups)
    kappa = _calc_kappa(lambda_r_2, lambda_r_3)
    # Hosking and Wallis, eq. A53
    tau4 = (5 * (1 - 4**-kappa) - 10 * (1 - 3**-kappa) + 6 * (1 - 2**-kappa)) / (
        1 - 2**-kappa
    )
    return tau4


def _heterogeneite_et_score_z(
    kap: Callable, n: np.array, t: np.array, t3: np.array, t4: np.array, seed=None
) -> tuple:
    # Remove stations with NaN moments or zero record length.
    # If there are not enough values to calculate some moments, the other moments are removed as well.
    bool_mask = (n != 0) & (~np.isnan(t)) & (~np.isnan(t3)) & (~np.isnan(t4))
    n = n[bool_mask]
    t = t[bool_mask]
    t3 = t3[bool_mask]
    t4 = t4[bool_mask]

    # Hosking and Wallis, eq. 4.3
    tau_r = np.dot(n, t) / np.sum(n)
    tau3_r = np.dot(n, t3) / np.sum(n)
    tau4_r = np.dot(n, t4) / np.sum(n)

    # Weighted standard deviation of the sample L-CVs (eq. 4.4)
    v = np.sqrt(np.dot(n, (t - tau_r) ** 2) / np.sum(n))

    # Kappa distribution
    # Fit a kappa distribution to the regional average L-moment ratios:
    try:
        kappa_param = kap.lmom_fit(lmom_ratios=[1, tau_r, tau3_r, tau4_r])
    except ValueError as error:
        # FIXME: This message could be more informative about the cause of the ValueError.
        warnings.warn(
            f"Kappa distribution fit raised a ValueError, returning all NaNs. Error: {error}."
        )
        return (
            np.nan,
            np.nan,
            np.nan,
            np.nan,
        )  # Returning NaNs for H, b4, sigma4, tau4_r
    except Exception as error:
        if "Failed to converge" in repr(error):
            warnings.warn(
                f"Kappa distribution fit failed to converge, returning all NaNs. Error: {error}."
            )
            return (
                np.nan,
                np.nan,
                np.nan,
                np.nan,
            )  # Returning NaNs for H, b4, sigma4, tau4_r
        else:
            raise error

    n_sim = 500  # Number of simulated "virtual regions"

    def _calc_tsim(
        _kappa_param: dict, _length: float, _n_sim: int, _seed=seed
    ) -> np.array:
        # For each station, draw n_sim vectors of the same length as the observations
        rvs = kap.rvs(
            _kappa_param["k"],
            _kappa_param["h"],
            _kappa_param["loc"],
            _kappa_param["scale"],
            size=(_n_sim, int(_length)),
            random_state=_seed,
        )
        return _momentl_optim(rvs)

    t_sim_tau4m = [_calc_tsim(kappa_param, length, n_sim) for length in n]
    t_sim = np.array([tt[3] for tt in t_sim_tau4m])  # tau is at index 3 of the L-moment list.
    tau4m = np.array([tt[5] for tt in t_sim_tau4m])  # tau4 is at index 5 of the L-moment list.
    b4 = np.mean(tau4m - tau4_r)
    sigma4 = np.sqrt(
        (1 / (n_sim - 1)) * (np.sum((tau4m - tau4_r) ** 2) - (n_sim * b4 * b4))
    )

    # Calculating V
    tau_r_sim = np.dot(n, t_sim) / np.sum(n)
    v_sim = np.sqrt(np.dot(n, (t_sim - tau_r_sim) ** 2) / np.sum(n))

    mu_v = np.mean(v_sim)
    sigma_v = np.std(v_sim)

    h = (v - mu_v) / sigma_v
    return h, b4, sigma4, tau4_r


# Calculating L-moments
def _momentl_optim(x: np.array) -> list:
    if x.ndim == 1:
        x = x[~np.isnan(x)]

        # Sort in descending order
        x_sort = np.sort(x)
        x_sort = x_sort[::-1]

        n = np.size(x_sort)
        j = np.arange(1, n + 1)

        b0 = np.mean(x_sort)
        b1 = np.dot((n - j) / (n * (n - 1)), x_sort)
        b2 = np.dot((n - j) * (n - j - 1) / (n * (n - 1) * (n - 2)), x_sort)
        b3 = np.dot(
            (n - j) * (n - j - 1) * (n - j - 2) / (n * (n - 1) * (n - 2) * (n - 3)),
            x_sort,
        )
    elif x.ndim == 2:
        x.sort()
        x_sort = np.fliplr(x)

        nn = np.shape(x_sort)
        j = np.repeat([np.arange(1, nn[1] + 1)], nn[0], axis=0)
        n = nn[1]

        b0 = np.mean(x_sort, axis=1)
        b1 = np.dot((n - j) / (n * (n - 1)), x_sort.T)[0]
        b2 = np.dot((n - j) * (n - j - 1) / (n * (n - 1) * (n - 2)), x_sort.T)[0]
        b3 = np.dot(
            (n - j) * (n - j - 1) * (n - j - 2) / (n * (n - 1) * (n - 2) * (n - 3)),
            x_sort.T,
        )[0]
    else:
        raise NotImplementedError("Only 1d and 2d arrays have been implemented")

    # L-moments
    lambda1 = b0
    lambda2 = 2 * b1 - b0
    lambda3 = 6 * b2 - 6 * b1 + b0
    lambda4 = 20 * b3 - 30 * b2 + 12 * b1 - b0

    # L-moment ratios
    tau = lambda2 / lambda1  # L-CV (tau2 in Anctil, tau in Hosking et al.)
    tau3 = lambda3 / lambda2  # L-CS
    tau4 = lambda4 / lambda2  # L-CK

    return [lambda1, lambda2, lambda3, tau, tau3, tau4]


def _append_ds_vars_names(ds: xr.Dataset, suffix: str) -> xr.Dataset:
    for name in ds.data_vars:
        ds = ds.rename({name: name + suffix})
    return ds
def mask_h_z(
    ds: xr.Dataset, thresh_h: float | None = 1, thresh_z: float | None = 1.64
) -> xr.DataArray:
    """
    Create a boolean mask based on the heterogeneity measure H and Z-score thresholds.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset containing H and Z values for each group.
    thresh_h : float, optional
        Threshold for the heterogeneity measure H. Default is 1.
    thresh_z : float, optional
        Threshold for the absolute Z-score. Default is 1.64.

    Returns
    -------
    xr.DataArray
        Boolean mask where True indicates groups that meet both threshold criteria.
    """
    ds_out = (ds.sel(crit="H") < thresh_h) & (abs(ds.sel(crit="Z")) < thresh_z)
    for v in ds_out.var():
        ds_out[v].attrs["H_threshold"] = thresh_h
        ds_out[v].attrs["Z_threshold"] = thresh_z
        ds_out[v].attrs["long_name"] = "Mask"
        ds_out[v].attrs["description"] = "Mask for regions based on H & Z thresholds"
        ds_out[v].attrs["history"] = update_history(
            f"Mask for regions based on H ({thresh_h}) & Z ({thresh_z}) thresholds",
            ds_out[v],
        )
    return ds_out
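
# Hedged usage sketch, following on from calc_h_z above ('ds_crit' is assumed to be its output):
#
#     mask = mask_h_z(ds_crit, thresh_h=1, thresh_z=1.64)
#     # True where a group is both acceptably homogeneous (H < 1) and passes the
#     # GEV goodness-of-fit test (|Z| < 1.64).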
def _combine_h_z(ds: xr.Dataset) -> xr.Dataset:
    new_ds = xr.Dataset()
    for v in ds:
        if "_Z" in v:
            new_ds[v.removesuffix("_Z")] = xr.concat(
                [ds[v.replace("_Z", "_H")], ds[v]], dim="crit"
            ).assign_coords(crit=["H", "Z"])
    return new_ds
def calculate_rp_from_afr(
    ds_groups: xr.Dataset,
    ds_moments_groups: xr.Dataset,
    rp: np.array,
    l1: xr.DataArray | None = None,
) -> xr.DataArray:
    """
    Calculate return periods from Annual Flow Regime (AFR) analysis.

    Parameters
    ----------
    ds_groups : xr.Dataset
        Dataset containing grouped flow data.
    ds_moments_groups : xr.Dataset
        Dataset containing L-moments for grouped data.
    rp : array-like
        Return periods to calculate.
    l1 : xr.DataArray, optional
        First L-moment (location) values. The L-moment can be specified for ungauged catchments.
        If None, values are taken from ds_moments_groups.

    Returns
    -------
    xr.DataArray
        Calculated return periods for each group and specified return period.

    Notes
    -----
    This function calculates return periods using the Annual Flow Regime method.
    If l1 is not provided, it uses the first L-moment from ds_moments_groups.
    The function internally calls _calculate_ic_from_afr to compute the flood index.
    Equations are based on Hosking, J. R. M., & Wallis, J. R. (1997). Regional frequency analysis (p. 240).
    """
    if l1 is None:
        station_dim = ds_moments_groups.cf.cf_roles["timeseries_id"][0]
        l1 = ds_moments_groups.sel(lmom="l1").dropna(dim=station_dim, how="all")
    ds = _calculate_ic_from_afr(ds_groups, ds_moments_groups, rp) * l1
    for v in ds.var():
        ds[v].attrs["long_name"] = "Return period"
        ds[v].attrs["description"] = (
            "Calculated return periods for each group and specified return period."
        )
        ds[v].attrs["history"] = update_history("Computed return periods", ds[v])
        ds[v].attrs["units"] = ds_groups[v].attrs["units"]
    return ds
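
# Hedged usage sketch ('ds_groups' and 'ds_moments_groups' are assumed to come from
# group_ds and calc_moments defined further down; the return periods are illustrative):
#
#     import numpy as np
#     rp = np.array([2, 20, 100])
#     ds_rp = calculate_rp_from_afr(ds_groups, ds_moments_groups, rp)
#     # ds_rp gains a 'return_period' dimension with one quantile per requested return period.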
def _calculate_ic_from_afr(
    ds_groups: xr.Dataset, ds_moments_groups: xr.Dataset, rp: list
) -> xr.Dataset:
    lambda_r_1, lambda_r_2, lambda_r_3 = _calc_lambda_r(ds_groups, ds_moments_groups)

    # GEV parameters: xi = location, alpha = scale, kappa = shape
    kappa = _calc_kappa(lambda_r_2, lambda_r_3)
    term = xr.apply_ufunc(_calc_gamma, (1 + kappa), vectorize=True)
    # Hosking and Wallis, eq. A56, and Anctil et al. 1998, eq. 7 and 8.
    alpha = (lambda_r_2 * kappa) / ((1 - (2**-kappa)) * term)
    xi = lambda_r_1 + (alpha * (term - 1)) / kappa

    # Calculating the requested return periods
    t = xr.DataArray(data=rp, dims="return_period").assign_coords(return_period=rp)

    # Hosking and Wallis, eq. A44, and Anctil et al. 1998, eq. 5.
    q_rt = xi + alpha * (1 - (-np.log((t - 1) / t)) ** kappa) / kappa

    return q_rt


def _calc_gamma(val):
    return math.gamma(val)
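
# The regional growth curve above is the GEV quantile function expressed with the
# regional L-moments (Hosking and Wallis, 1997, eq. A44):
#     q(T) = xi + alpha * (1 - (-ln((T - 1) / T))**kappa) / kappa
# where (T - 1) / T is the non-exceedance probability associated with return period T.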
def remove_small_regions(ds: xr.Dataset, thresh: int = 5) -> xr.Dataset:
    """
    Remove regions from the dataset that have fewer than the threshold number of stations.

    Parameters
    ----------
    ds : xr.Dataset
        The input dataset containing regions and stations.
    thresh : int, optional
        The minimum number of stations required for a region to be kept. Default is 5.

    Returns
    -------
    xr.Dataset
        The dataset with small regions removed.
    """
    station_dim = ds.cf.cf_roles["timeseries_id"][0]
    for gr in ds.group_id:
        if (
            len(ds.sel(group_id=gr).dropna(dim=station_dim, how="all")[station_dim])
            < thresh
        ):
            ds = ds.drop_sel(group_id=gr)
    return ds
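
# Hedged usage sketch ('ds_groups' as returned by group_ds below; the threshold is illustrative):
#
#     ds_groups = remove_small_regions(ds_groups, thresh=5)
#     # Groups with fewer than 5 contributing stations are dropped from 'group_id'.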
def _calc_kappa(lambda_r_2, lambda_r_3):
    # Hosking and Wallis, eq. A55
    c = (2 / (3 + (lambda_r_3 / lambda_r_2))) - (np.log(2) / np.log(3))
    # Hosking and Wallis, eq. A55 (generally acceptable approximation)
    kappa = 7.8590 * c + 2.9554 * (c**2)
    return kappa


def _calc_lambda_r(
    ds_groups: xr.Dataset, ds_moments_groups: xr.Dataset
) -> tuple[int, xr.Dataset, xr.Dataset]:
    station_dim = ds_groups.cf.cf_roles["timeseries_id"][0]
    nr = ds_moments_groups.count(dim=station_dim).isel(lmom=0)
    nk = ds_groups.dropna(dim=station_dim, how="all").count(dim="time")
    wk = (nk * nr) / (nk + nr)

    lambda_k0 = ds_moments_groups.sel(lmom="l1").dropna(dim=station_dim, how="all")
    lambda_k1 = ds_moments_groups.sel(lmom="l2").dropna(dim=station_dim, how="all")
    lambda_k2 = ds_moments_groups.sel(lmom="l3").dropna(dim=station_dim, how="all")

    l2 = (lambda_k1 / lambda_k0) * wk
    l3 = (lambda_k2 / lambda_k0) * wk

    lambda_r_1 = 1  # Anctil et al. 1998 (p. 362)
    lambda_r_2 = l2.sum(dim=station_dim) / wk.sum(dim=station_dim)
    lambda_r_3 = l3.sum(dim=station_dim) / wk.sum(dim=station_dim)

    return lambda_r_1, lambda_r_2, lambda_r_3
def calc_moments(ds: xr.Dataset) -> xr.Dataset:
    """
    Calculate L-moments for multiple stations.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset of stations, where each station holds an array-like time series
        for which to calculate L-moments.

    Returns
    -------
    xr.Dataset
        L-moment dataset with a new lmom dimension.

    Notes
    -----
    NaN values in each station are removed before calculating L-moments.
    The function uses the `_moment_l_vector` helper to calculate L-moments for each individual station.
    Equations are based on Hosking, J. R. M., & Wallis, J. R. (1997). Regional frequency analysis (p. 240).
    """
    ds = xr.apply_ufunc(
        _moment_l_vector,
        ds,
        input_core_dims=[["time"]],
        output_core_dims=[["lmom"]],
        keep_attrs=True,
    ).assign_coords(lmom=["l1", "l2", "l3", "tau", "tau3", "tau4"])

    for v in ds.var():
        ds[v].attrs["long_name"] = "L-moments"
        ds[v].attrs["description"] = (
            "L-moments based on Hosking, J. R. M., & Wallis, J. R. (1997). Regional frequency analysis (p. 240)"
        )
        ds[v].attrs["history"] = update_history("Computed L-moments", ds[v])
        ds[v].attrs.pop("cell_methods", None)
        ds[v].attrs.pop("units", None)
    return ds
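
# Hedged usage sketch ('ds' is assumed to hold annual maxima with a 'time' dimension):
#
#     ds_moments = calc_moments(ds)
#     ds_moments.sel(lmom="tau")  # L-CV for every station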
def group_ds(ds: xr.Dataset, groups: list) -> xr.Dataset:
    """
    Group a dataset by a list of groups.

    Parameters
    ----------
    ds : xr.Dataset
        The input dataset to be grouped.
    groups : list
        A list of groups to be used for grouping the dataset.

    Returns
    -------
    xr.Dataset
        A new dataset with the grouped data.
    """
    ds_groups = xr.concat(
        [
            ds.sel(id=groups[i]).assign_coords(group_id=i).expand_dims("group_id")
            for i in range(len(groups))
        ],
        dim="group_id",
    )
    ds_groups["group_id"].attrs["cf_role"] = "group_id"
    for v in ds_groups.var():
        ds_groups[v].attrs["history"] = update_history("Grouped with", ds_groups[v])
    return ds_groups
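
# Hedged usage sketch tying the pieces together ('groups' as returned by
# get_group_from_fit, and 'ds' holding annual maxima with an 'id' station dimension):
#
#     ds_groups = group_ds(ds, groups)
#     ds_moments_groups = calc_moments(ds_groups)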