"""Local frequency analysis functions and utilities."""
import datetime
import numpy as np
import statsmodels
import xarray as xr
import xclim.indices.stats
from scipy.stats.mstats import plotting_positions
from statsmodels.tools import eval_measures
from xscen.utils import standardize_periods
__all__ = [
"criteria",
"fit",
"parametric_quantiles",
]
def fit(
    ds,
    *,
    distributions: list[str] | None = None,
    min_years: int | None = None,
    method: str = "ML",
    periods: list[str] | list[list[str]] | None = None,
) -> xr.Dataset:
    """Fit multiple distributions to data.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset containing the data to fit. All variables will be fitted.
    distributions : list of str, optional
        List of distribution names as defined in `scipy.stats`. See https://docs.scipy.org/doc/scipy/reference/stats.html#continuous-distributions.
        Defaults to ["genextreme", "pearson3", "gumbel_r", "expon"].
    min_years : int, optional
        Minimum number of years required for a distribution to be fitted.
    method : str
        Fitting method. Defaults to "ML" (maximum likelihood).
    periods : list of str or list of list of str, optional
        Either [start, end] or list of [start, end] of periods to be considered.
        If multiple periods are given, the output will have a `horizon` dimension.
        If None, all data is used.

    Returns
    -------
    xr.Dataset
        Dataset containing the parameters of the fitted distributions, with a new
        dimension `scipy_dist` containing the distribution names.

    Notes
    -----
    In order to combine the parameters of multiple distributions, the size of the
    `dparams` dimension is set to the maximum number of unique parameters between
    the distributions.
    """
    if not distributions:
        distributions = ["genextreme", "pearson3", "gumbel_r", "expon"]
    if periods is None:
        # Default to a single period spanning the whole record.
        periods = [
            [str(int(ds.time.dt.year.min())), str(int(ds.time.dt.year.max()))]
        ]
    else:
        periods = standardize_periods(periods, multiple=True)

    horizons = []
    for start, end in periods:
        sub = ds.sel(time=slice(start, end))
        fitted_vars = []
        for name in ds.data_vars:
            per_dist = [
                xclim.indices.stats.fit(
                    sub[name].chunk({"time": -1}), dist=dist, method=method
                )
                .assign_coords(scipy_dist=dist)
                .expand_dims("scipy_dist")
                for dist in distributions
            ]
            params = xr.concat(per_dist, dim="scipy_dist")

            # Reorder dparams so it matches across all distributions (shape
            # parameters first, alphabetically, then loc/scale), since
            # subsequent operations rely on this.
            shape_names = sorted(set(params.dparams.values) - {"loc", "scale"})
            params = params.sel(dparams=shape_names + ["loc", "scale"])

            if min_years is not None:
                # Mask out cells with fewer valid years than requested.
                params = params.where(
                    sub[name].notnull().sum("time") >= min_years
                )
            params = params.expand_dims({"horizon": [f"{start}-{end}"]})
            params.attrs.update(
                {
                    "scipy_dist": distributions,
                    "description": "Parameters of the distributions",
                    "long_name": "Distribution parameters",
                    "min_years": min_years,
                }
            )
            fitted_vars.append(params)
        horizons.append(xr.merge(fitted_vars))

    out = xr.concat(horizons, dim="horizon").chunk({"dparams": -1})
    out.attrs = ds.attrs
    if len(out.horizon) == 1:
        # If only one period was asked, remove the horizon dimension, but keep
        # the period in the coordinates.
        out = out.squeeze("horizon")
    return out
def parametric_quantiles(
    p: xr.Dataset, t: float | list[float], mode: str = "max"
) -> xr.Dataset:
    """Compute quantiles from fitted distributions.

    Parameters
    ----------
    p : xr.Dataset
        Dataset containing the parameters of the fitted distributions.
        Must have a dimension `dparams` containing the parameter names and a
        dimension `scipy_dist` containing the distribution names.
    t : float or list of float
        Return period(s) in years.
    mode : {'max', 'min'}
        Whether the return period is the probability of exceedance (max)
        or non-exceedance (min).

    Returns
    -------
    xr.Dataset
        Dataset containing the quantiles of the distributions.
    """
    distributions = list(p["scipy_dist"].values)

    periods = np.atleast_1d(t)
    if mode == "max":
        probs = 1 - 1.0 / periods
    elif mode == "min":
        probs = 1.0 / periods
    else:
        raise ValueError(f"'mode' must be 'max' or 'min', got '{mode}'.")

    results = []
    for name in p.data_vars:
        per_dist = []
        for dist_name in distributions:
            dist_obj = xclim.indices.stats.get_dist(dist_name)
            # Shape parameters (if any) come before loc/scale in dparams.
            shapes = (
                [] if dist_obj.shapes is None else dist_obj.shapes.split(",")
            )
            da = (
                p[name]
                .sel(scipy_dist=dist_name, dparams=shapes + ["loc", "scale"])
                .transpose("dparams", ...)
            )
            da.attrs["scipy_dist"] = dist_name
            qt = (
                xclim.indices.stats.parametric_quantile(da, q=probs)
                .rename({"quantile": "return_period"})
                .assign_coords(scipy_dist=dist_name, return_period=periods)
                .expand_dims("scipy_dist")
            )
            per_dist.append(qt)
        quantiles = xr.concat(per_dist, dim="scipy_dist")

        # Attach the probability behind each return period as a coordinate.
        prob_coord = xr.DataArray(
            probs, dims="return_period", coords={"return_period": periods}
        )
        prob_coord.attrs.update(
            {
                "long_name": (
                    "Probability of exceedance"
                    if mode == "max"
                    else "Probability of non-exceedance"
                ),
                "description": "Parametric distribution quantiles for the given return period.",
                "mode": mode,
            }
        )
        quantiles = quantiles.assign_coords(p_quantile=prob_coord)
        quantiles.attrs.update(
            {
                "scipy_dist": distributions,
                "description": f"Return period ({mode}) estimated with statistic distributions",
                "long_name": "Return period",
                "mode": mode,
            }
        )
        results.append(quantiles)

    out = xr.merge(results)
    out.attrs = p.attrs
    return out
def criteria(ds: xr.Dataset, p: xr.Dataset) -> xr.Dataset:
    """Compute information criteria (AIC, BIC, AICC) from fitted distributions, using the log-likelihood.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset containing the yearly data that was fitted.
    p : xr.Dataset
        Dataset containing the parameters of the fitted distributions.
        Must have a dimension `dparams` containing the parameter names and a
        dimension `scipy_dist` containing the distribution names.

    Returns
    -------
    xr.Dataset
        Dataset containing the information criteria for the distributions.
    """

    def _get_criteria_1d(da, params, dist):
        # Reshape so that time / dparams is the first axis and remaining
        # (flattened) spatial cells are the second.
        da = np.atleast_2d(da).T
        params = np.atleast_2d(params).T

        llf = np.nansum(dist.logpdf(da, *params), axis=0)  # log-likelihood
        nobs = np.sum(np.isfinite(da), axis=0)
        # Degrees of freedom of the model = number of fitted distribution
        # parameters (shape(s) + loc + scale). BUG FIX: this previously used
        # len(p), i.e. the number of data variables in the Dataset, which is
        # unrelated to the model complexity that AIC/BIC/AICC must penalize.
        df_modelwc = len(params)
        dof_eff = nobs - df_modelwc - 1.0

        aic = eval_measures.aic(llf=llf, nobs=nobs, df_modelwc=df_modelwc)
        bic = eval_measures.bic(llf=llf, nobs=nobs, df_modelwc=df_modelwc)
        # Custom AICC, because the one in statsmodels does not support multiple dimensions
        aicc = np.where(
            dof_eff > 0, -2.0 * llf + 2.0 * df_modelwc * nobs / dof_eff, np.nan
        )

        return np.stack([aic, bic, aicc], axis=1)

    out = []
    distributions = list(p["scipy_dist"].values)
    # Only score variables present in both the data and the parameters.
    common_vars = list(set(ds.data_vars).intersection(p.data_vars))
    for v in common_vars:
        c = []
        for d in distributions:
            dist_obj = xclim.indices.stats.get_dist(d)
            shape_params = [] if dist_obj.shapes is None else dist_obj.shapes.split(",")
            dist_params = shape_params + ["loc", "scale"]

            da = ds[v].transpose("time", ...)
            params = (
                p[v].sel(scipy_dist=d, dparams=dist_params).transpose("dparams", ...)
            )

            if len(da.dims) == 1:
                # apply_ufunc needs at least one broadcast dimension; add a
                # dummy one for purely 1-D (time-only) data.
                crit = xr.apply_ufunc(
                    _get_criteria_1d,
                    da.expand_dims("tmp"),
                    params.expand_dims("tmp"),
                    kwargs={"dist": dist_obj},
                    input_core_dims=[["time"], ["dparams"]],
                    output_core_dims=[["criterion"]],
                    dask_gufunc_kwargs=dict(output_sizes={"criterion": 3}),
                    dask="parallelized",
                ).squeeze("tmp")
            else:
                crit = xr.apply_ufunc(
                    _get_criteria_1d,
                    da,
                    params,
                    kwargs={"dist": dist_obj},
                    input_core_dims=[["time"], ["dparams"]],
                    output_core_dims=[["criterion"]],
                    dask_gufunc_kwargs=dict(output_sizes={"criterion": 3}),
                    dask="parallelized",
                )

            # Add attributes
            crit = crit.assign_coords(criterion=["aic", "bic", "aicc"]).expand_dims(
                "scipy_dist"
            )
            crit.attrs = p[v].attrs
            crit.attrs["history"] = (
                crit.attrs.get("history", "")
                + f", [{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
                f"criteria: computed AIC, BIC and AICC. - statsmodels version: {statsmodels.__version__}"
            )
            crit.attrs["description"] = (
                "Information criteria for the distribution parameters."
            )
            crit.attrs["long_name"] = "Information criteria"

            # Remove a few attributes that are not relevant anymore
            crit.attrs.pop("method", None)
            crit.attrs.pop("estimator", None)
            crit.attrs.pop("min_years", None)
            c.append(crit)

        c = xr.concat(c, dim="scipy_dist")
        out.append(c)

    out = xr.merge(out)
    out.attrs = ds.attrs

    return out
def _get_plotting_positions(data_array, alpha=0.4, beta=0.4, return_period=True):
    """Calculate plotting positions for data.

    Parameters
    ----------
    data_array : xarray DataArray
        Input data.
    alpha : float, optional
        Plotting position parameter, by default 0.4.
    beta : float, optional
        Plotting position parameter, by default 0.4.
    return_period : bool, optional
        If True, return periods instead of probabilities, by default True.

    Returns
    -------
    xarray DataArray
        The data with plotting positions assigned.

    Notes
    -----
    See scipy.stats.mstats.plotting_positions for typical values for alpha and beta.
    (0.4, 0.4) : approximately quantile unbiased (Cunnane).
    """
    # Deep copy so the original values can be re-attached to the output after
    # the variables are renamed to "<name>_pp" below.
    data_copy = data_array.copy(deep=True)
    def vec_plotting_positions(vec_data, alpha=0.4, beta=0.4):
        """Calculate plotting positions for vectorized data.

        Parameters
        ----------
        vec_data : ndarray
            Input data, with time dimension first.
        alpha : float, optional
            Plotting position parameter.
        beta : float, optional
            Plotting position parameter.

        Returns
        -------
        ndarray
            Array with plotting positions assigned to valid data points,
            and NaNs assigned to invalid data points.
        """
        out = []
        if vec_data.ndim == 1:
            # Single series: compute positions on the valid points only and
            # scatter them back into a NaN-filled array of the original size.
            valid = ~np.isnan(vec_data)
            pp = plotting_positions(vec_data[valid], alpha, beta)
            out = np.full_like(vec_data.astype(float), np.nan)
            out[valid] = pp
        else:
            # Multiple series: apply the same masked computation row by row,
            # since each row may have a different pattern of NaNs.
            for data in vec_data:
                valid = ~np.isnan(data)
                pp = plotting_positions(data[valid], alpha, beta)
                pp_full = np.full_like(data.astype(float), np.nan)
                pp_full[valid] = pp
                out.append(pp_full)
        return out
    pp = xr.apply_ufunc(
        vec_plotting_positions,
        data_array,
        alpha,
        beta,
        input_core_dims=[["time"], [], []],
        output_core_dims=[["time"]],
    )
    if return_period:
        # Convert non-exceedance probability to a return period: 1 / (1 - pp).
        pp = -1 / (pp - 1)
    # Rename each variable to "<name>_pp" and restore the original data under
    # its original name, so the output carries both values and positions.
    # NOTE: iterating the data_vars of the pre-rename object while rebinding
    # `pp` is intentional; the loop runs over the original variable names.
    for name in pp.data_vars:
        pp = pp.rename({name: name + "_pp"})
        pp[name] = data_copy[name]
    return pp
def _prepare_plots(params, xmin=1, xmax=10000, npoints=100, log=True):
    """Prepare x-values for plotting frequency analysis results.

    Parameters
    ----------
    params : xarray DataArray
        Input data.
    xmin : float, optional
        Minimum x value, by default 1.
    xmax : float, optional
        Maximum x value, by default 10000.
    npoints : int, optional
        Number of x values, by default 100.
    log : bool, optional
        If True, return log-spaced x values, by default True.

    Returns
    -------
    xarray DataArray
        The data with plotting positions assigned.
    """
    # Sample the return-period axis either logarithmically or linearly,
    # then evaluate the fitted distributions at those return periods.
    x_values = (
        np.logspace(np.log10(xmin), np.log10(xmax), npoints, endpoint=True)
        if log
        else np.linspace(xmin, xmax, npoints)
    )
    return parametric_quantiles(params, x_values)