Source code for xscen.aggregate

"""Functions to aggregate data over time and space."""

import datetime
import logging
import os
import warnings
from collections.abc import Sequence
from copy import deepcopy
from pathlib import Path
from types import ModuleType
from typing import Literal

import geopandas as gpd
import numpy as np
import pandas as pd
import scipy
import shapely
import xarray as xr
import xclim as xc
import xclim.core.calendar

from .regrid import create_bounds_gridmapping


try:
    import xesmf as xe
except (ImportError, KeyError) as e:
    if isinstance(e, KeyError):
        if e.args[0] == "Author":
            warnings.warn(
                "The xesmf package could not be imported due to a known KeyError bug that occurs with some "
                "older versions of ESMF and specific execution setups (such as debugging on a Windows machine). "
                "As a workaround, try installing 'importlib-metadata <8.0.0' and/or updating ESMF. If you do not "
                "need 'xesmf' functionalities (e.g. regridding), you can ignore this warning.",
                stacklevel=2,
            )
        else:
            raise e
    xe = None
from xclim.core.indicator import Indicator
from xclim.core.units import pint2cfattrs, units2pint

from .config import parse_config
from .extract import subset_warming_level
from .indicators import compute_indicators
from .spatial import Region, get_grid_mapping, subset
from .utils import stack_dates, standardize_periods, unstack_dates, update_attr


logger = logging.getLogger(__name__)

__all__ = [
    "climatological_op",
    "compute_deltas",
    "produce_horizon",
    "spatial_mean",
]


# Dummy function to make gettext aware of translatable-strings
def _(s):
    return s


[docs] @parse_config def climatological_op( # noqa: C901 ds: xr.Dataset, *, op: str | dict = "mean", window: int | None = None, min_periods: int | float | None = None, stride: int = 1, periods: list[str] | list[list[str]] | None = None, rename_variables: bool = True, to_level: str = "climatology", horizons_as_dim: bool = False, **unstack_kwargs, ) -> xr.Dataset: r""" Perform an operation 'op' over time, for given time periods, respecting the temporal resolution of ds. Parameters ---------- ds : xr.Dataset Dataset to use for the computation. op : str or dict Operation to perform over time. The operation can be any method name of xarray.core.rolling.DatasetRolling, 'linregress', or a dictionary. If 'op' is a dictionary, the key is the operation name and the value is a dict of kwargs accepted by the equivalent NumPy function. See the Notes for more information. While other operations are technically possible, the following are recommended and tested: ['max', 'mean', 'median', 'min', 'std', 'sum', 'var', 'linregress', 'theilslopes']. Operations beyond methods of xarray.core.rolling.DatasetRolling include: - 'linregress' : Computes the linear regression over time, using scipy.stats.linregress and employing years as regressors. The output will have a new dimension 'linreg_param' with coordinates: ['slope', 'intercept', 'rvalue', 'pvalue', 'stderr', 'intercept_stderr']. - 'theilslopes' : Computes the Theil-Sen estimator over time, using scipy.stats.theilslopes and employing years as regressors. Correlation and p-value for the correlation are also computed using scipy.stats.kendalltau, as the Theil-Sen estimator is based on Kendall's tau. Other kwargs can be passed by defining an 'op' dictionary as described above but in this case users must specify kwargs for both theilslopes and kendalltau functions : example op={"theilslopes": {"theilslopes":{"alpha": 0.90}, "kendalltau": {"method": "auto"}}}. The output will have a new dimension 'theilslopes_param' with coordinates: ['slope', 'intercept', 'lower_slope', 'upper_slope', 'correlation', 'p_value']. Only one operation per call is supported, so len(op)==1 if a dict. window : int, optional Number of years to use for the rolling operation. If left at None and periods is given, window will be the size of the first period. Hence, if periods are of different lengths, the shortest period should be passed first. If left at None and periods is not given, the window will be the size of the input dataset. min_periods : int or float, optional For the rolling operation, minimum number of years required for a value to be computed. If left at None and the xrfreq is either QS or AS and doesn't start in January, min_periods will be one less than window. Otherwise, if left at None, it will be deemed the same as 'window'. If passed as a float value between 0 and 1, this will be interpreted as the floor of the fraction of the window size. stride : int Stride (in years) at which to provide an output from the rolling window operation. periods : list of str or list of lists of str, optional Either [start, end] or list of [start, end] of continuous periods to be considered. This is needed when the time axis of ds contains some jumps in time. If None, the dataset will be considered continuous. rename_variables : bool If True, '_clim_{op}' will be added to variable names. to_level : str, optional The processing level to assign to the output. If None, the processing level of the inputs is preserved. horizons_as_dim : bool If True, the output will have 'horizon' and the frequency as 'month', 'season' or 'year' as dimensions and coordinates. The 'time' coordinate will be unstacked to horizon and frequency dimensions. Horizons originate from periods and/or windows and their stride in the rolling operation. **unstack_kwargs : dict Other arguments to pass to `py:func:~xscen.utils.unstack_dates`. Returns ------- xr.Dataset Dataset with the results from the climatological operation. See Also -------- scipy.stats.linregress : Linear least-squares regression for two sets of measurements. scipy.stats.theilslopes : Theil-Sen estimator for a set of points (x, y). Notes ----- xarray.core.rolling.DatasetRolling functions do not support kwargs other than 'keep_attrs'. In order to pass additional arguments to the operation, we instead use the 'reduce' method and pass the operation as a numpy function. If possible, a function that handles NaN values will be used (e.g. op='mean' will use `np.nanmean`), as the 'min_periods' argument already decides how many NaN values are acceptable. """ # more than one operation per call is not supported (yet), case for dict if isinstance(op, dict) and len(op) > 1: raise NotImplementedError("xs.climatological_op does not currently support more than one operation per call.") periods = standardize_periods(periods or [[int(ds.time.dt.year[0]), int(ds.time.dt.year[-1])]]) window = window or int(periods[0][1]) - int(periods[0][0]) + 1 ds_unstack = xr.concat([unstack_dates(ds.sel(time=slice(period[0], period[1])), **unstack_kwargs) for period in periods], "time") # Rolling will ignore gaps in time, so raise an exception beforehand if (not all(ds_unstack.time.dt.year.diff(dim="time", n=1) == 1)) & (periods is None): raise ValueError("Data is not continuous. Use the 'periods' argument.") # there is one less occurrence when a period crosses years freq_across_year = [f"{f}-{mon}" for mon in xr.coding.cftime_offsets._MONTH_ABBREVIATIONS.values() for f in ["AS", "QS"] if mon != "JAN"] if ( any( x in freq_across_year for x in [ ds.attrs.get("cat:xrfreq"), (xr.infer_freq(ds.time) if len(ds.time) > 3 else None), ] ) and min_periods is None ): min_periods = window - 1 # unpack min_periods as fraction of window if isinstance(min_periods, float): if 0 < min_periods <= 1: min_periods = int(np.floor(min_periods * window)) else: raise ValueError(f"When 'min_periods' is passed as a 'float', it must be between 0 and 1. Got {min_periods}.") # set min_periods min_periods = min_periods or window if min_periods > window: raise ValueError("'min_periods' should be smaller or equal to 'window'") # if op is a dict, unpack it if isinstance(op, dict): op, op_kwargs = op.copy().popitem() op_kwargs.setdefault("keep_attrs", True) else: op_kwargs = {"keep_attrs": True} # special case for averaging standard deviations: need to convert to variance before averaging ds_has_std = False std_v = [] if op == "mean": for vv in ds_unstack.data_vars: if "std" in vv or "standard deviation" in ds_unstack[vv].attrs.get("description", "").lower(): ds_unstack[vv] = np.square(ds_unstack[vv]) ds_has_std = True std_v.extend([vv]) # Compute the climatological operation concats = [] for period in periods: # Rolling average ds_rolling = ds_unstack.sel(time=slice(period[0], period[1])).rolling(time=window, min_periods=min_periods) # apply operation on rolling object if hasattr(ds_rolling, op) and callable(getattr(ds_rolling, op)): if op not in ["max", "mean", "median", "min", "std", "sum", "var"]: warnings.warn(f"The requested operation '{op}' has not been tested and may produce unexpected results.", stacklevel=2) if len(set(op_kwargs.keys()).difference(["keep_attrs"])) > 0: # Reduce allows for kwargs to be passed to the operation, but it is less efficient try: numpy_op = getattr(np, f"nan{op}") except AttributeError: warnings.warn( f"The requested operation '{op}' has no NaN handling in numpy. Results may ignore the 'min_periods' argument.", stacklevel=2 ) numpy_op = getattr(np, op) ds_rolling = ds_rolling.reduce(numpy_op, **op_kwargs) else: ds_rolling = getattr(ds_rolling, op)(**op_kwargs) # revert variance to std, where applicable if op == "mean" and ds_has_std: for vv in std_v: ds_rolling[vv] = np.sqrt(ds_rolling[vv]) # Shift by window - 1 to position the label at the start of the window # Select the windows at provided stride, dropping the last incomplete windows ds_rolling = ds_rolling.shift(time=-(window - 1)).isel(time=slice(None, -(window - 1), stride)) elif op == "theilslopes": ds_rolling = _common_trend_utils( ds_rolling=ds_rolling, func="theilslopes", op_kwargs=op_kwargs, window=window, stride=stride, min_periods=min_periods ) elif op == "linregress": ds_rolling = _common_trend_utils( ds_rolling=ds_rolling, func="linregress", op_kwargs=op_kwargs, window=window, stride=stride, min_periods=min_periods ) else: raise ValueError(f"Operation '{op}' not implemented.") # build horizons horizons = xr.DataArray( [f"{yr}-{yr + window - 1}" for yr in ds_rolling.time.dt.year.values], dims=dict(time=ds_rolling.time), ).astype(str) ds_rolling = ds_rolling.assign_coords(horizon=horizons) concats.extend([ds_rolling]) # end loop over periods # concatenate results ds_rolling = xr.concat(concats, dim="time", data_vars="minimal") # update data_vars names, attrs, history if rename_variables: ds_rolling = ds_rolling.rename_vars({vv: f"{vv}_clim_{op}" for vv in ds_rolling.data_vars}) for vv in ds_rolling.data_vars: for a in ["description", "long_name"]: try: op_format = dict.fromkeys(("mean", "std", "var", "sum"), "adj") | dict.fromkeys(("max", "min"), "noun") operation = xc.core.formatting.default_formatter.format_field(op, op_format[op]) except (KeyError, ValueError): operation = op update_attr( ds_rolling[vv], a, _("{window}-year climatological {operation} of {attr}."), window=window, operation=operation, ) new_history = ( f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {window}-year climatological {operation} " f"over window (non-centered), with a minimum of {min_periods} years of data - xarray v{xr.__version__}" ) history = new_history + " \n " + ds_rolling[vv].attrs["history"] if "history" in ds_rolling[vv].attrs else new_history ds_rolling[vv].attrs["history"] = history # update processing level if to_level is not None: ds_rolling.attrs["cat:processing_level"] = to_level if not horizons_as_dim: return stack_dates(ds_rolling) out = ds_rolling.swap_dims(time="horizon").drop_vars("time").rename(original_time="time") if out.sizes.get(unstack_kwargs.get("new_dim", "season")) == 1: out = out.squeeze(unstack_kwargs.get("new_dim", "season"), drop=True) return out
[docs] @parse_config def compute_deltas( # noqa: C901 ds: xr.Dataset, reference_horizon: str | xr.Dataset, *, kind: str | dict = "+", rename_variables: bool = True, to_level: str | None = "deltas", ) -> xr.Dataset: """ Compute deltas in comparison to a reference time period, respecting the temporal resolution of ds. Parameters ---------- ds : xr.Dataset Dataset to use for the computation. reference_horizon : str or xr.Dataset Either a YYYY-YYYY string corresponding to the 'horizon' coordinate of the reference period, or a xr.Dataset containing the climatological mean. kind : str or dict ['+', '/', '%'] Whether to provide absolute, relative, or percentage deltas. Can also be a dictionary separated per variable name. rename_variables : bool If True, '_delta_YYYY-YYYY' will be added to variable names. to_level : str, optional The processing level to assign to the output. If None, the processing level of the inputs is preserved. Returns ------- xr.Dataset Returns a Dataset with the requested deltas. """ if isinstance(reference_horizon, str): if reference_horizon not in ds.horizon: raise ValueError(f"The reference horizon {reference_horizon} is not in the dataset.") # Separate the reference from the other horizons if xc.core.utils.uses_dask(ds["horizon"]): ds["horizon"].load() ref = ds.where(ds.horizon == reference_horizon, drop=True) elif isinstance(reference_horizon, xr.Dataset): ref = reference_horizon if "horizon" in ref: reference_horizon = np.unique(ref["horizon"]) if len(reference_horizon) != 1: raise ValueError("The reference dataset appears to contain multiple horizons.") reference_horizon = reference_horizon[0] else: reference_horizon = "unknown_horizon" else: raise ValueError("reference_horizon should be either a string or an xarray.Dataset.") if "time" in ds: if (len(ds.time) >= 3) and (xr.infer_freq(ds.time) == "D"): raise NotImplementedError("xs.compute_deltas does not currently support daily data.") # Remove references to 'year' in REF mindex_coords_1 = xr.Coordinates.from_pandas_multiindex( pd.MultiIndex.from_arrays( [ref.time.dt.month.values, ref.time.dt.day.values], names=["month", "day"], ), dim="time", ) ref = ref.assign_coords(coords=mindex_coords_1).unstack("time") mindex_coords_2 = xr.Coordinates.from_pandas_multiindex( pd.MultiIndex.from_arrays( [ ds.time.dt.year.values, ds.time.dt.month.values, ds.time.dt.day.values, ], names=["year", "month", "day"], ), dim="time", ) other_hz = ds.assign_coords(coords=mindex_coords_2).unstack("time") else: other_hz = ds ref = ref.squeeze() deltas = xr.Dataset(coords=other_hz.coords, attrs=other_hz.attrs) # Calculate deltas for vv in list(ds.data_vars): v_name = vv if rename_variables is False else f"{vv}_delta_{reference_horizon.replace('-', '_')}" with xr.set_options(keep_attrs=True): if (isinstance(kind, dict) and kind[vv] == "+") or kind == "+": _kind = "abs." deltas[v_name] = other_hz[vv] - ref[vv] unit = pint2cfattrs(units2pint(other_hz[vv].attrs["units"]), is_difference=True) deltas[v_name].attrs.update(unit) elif (isinstance(kind, dict) and kind[vv] == "/") or kind == "/": _kind = "rel." deltas[v_name] = other_hz[vv] / ref[vv] deltas[v_name].attrs["units"] = "" elif (isinstance(kind, dict) and kind[vv] == "%") or kind == "%": _kind = "pct." deltas[v_name] = 100 * (other_hz[vv] - ref[vv]) / ref[vv] deltas[v_name].attrs["units"] = "%" else: raise ValueError(f"Delta 'kind' not understood. Should be '+', '/' or '%', received {kind}.") # modify attrs and history deltas[v_name].attrs["delta_kind"] = _kind deltas[v_name].attrs["delta_reference"] = reference_horizon for a in ["description", "long_name"]: update_attr( deltas[v_name], a, _("{attr1}: {kind} delta compared to {refhoriz}."), others=[other_hz[vv]], refhoriz=reference_horizon, kind=_kind, ) new_history = f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {_kind} delta vs. {reference_horizon} - xarray v{xr.__version__}" history = new_history + " \n " + deltas[v_name].attrs["history"] if "history" in deltas[v_name].attrs else new_history deltas[v_name].attrs["history"] = history if "time" in ds: # get back to 1D time deltas = deltas.stack(time=("year", "month", "day")) # rebuild time coord if isinstance(ds.indexes["time"], pd.core.indexes.datetimes.DatetimeIndex): time_coord = list( pd.to_datetime( { "year": deltas.year.values, "month": deltas.month.values, "day": deltas.day.values, } ).values ) elif isinstance(ds.indexes["time"], xr.coding.cftimeindex.CFTimeIndex): time_coord = [ xclim.core.calendar.datetime_classes[ds.time.dt.calendar](y, m, d) for y, m, d in zip(deltas.year.values, deltas.month.values, deltas.day.values, strict=False) ] else: raise ValueError("The type of 'time' could not be understood.") deltas = deltas.drop_vars({"year", "day", "month", "time"}).assign(time=time_coord).transpose("time", ...) deltas = deltas.reindex_like(ds) if to_level is not None: deltas.attrs["cat:processing_level"] = to_level return deltas
@parse_config def spatial_mean( # noqa: C901 ds: xr.Dataset, method: str, *, spatial_subset: bool | None = None, region: Region | Literal["global"] | None = None, simplify_tolerance: float | None = None, kwargs: dict | None = None, to_domain: str | None = None, to_level: str | None = None, ) -> xr.Dataset: r""" Compute the spatial mean using a variety of available methods. Parameters ---------- ds : xr.Dataset Dataset to use for the computation. method : str 'cos-lat' will weight the area covered by each pixel using an approximation based on latitude. 'xesmf' will make use of xESMF's SpatialAverager. This will typically be more precise, especially for irregular regions, but can be much slower than other methods. spatial_subset : bool, optional If True, xscen.spatial.subset will be called prior to the other operations. This requires the 'region' argument. If None, this will automatically become True if 'region' is provided and the subsetting method is 'cos-lat'. region : :py:data:`~xscen.spatial.Region` or 'global', optional Description of the region and the subsetting method. If method=='xesmf', the bounding box or shapefile is given to SpatialAverager. Can also be "global", for global averages. The latter is simply a shortcut for `{'name': 'global', 'method': 'bbox', 'lon_bnds' [-180, 180], 'lat_bnds': [-90, 90]}`. simplify_tolerance : float, optional Only used with 'xesmf' method. Precision (in degree) used to simplify a shapefile before sending it to SpatialAverager(). The simpler the polygons, the faster the averaging, but it will lose some precision. kwargs : dict, optional Arguments to send to SpatialAverager(). In addition, also for SpatialAverager, one can give `skipna` or `output_chunks` here, to be passed to the averager call itself. Additionally, for all methods, the 'latitude_name' and 'longitude_name' can be passed here to specify the names of those coordinates. to_domain : str, optional The domain to assign to the output. If None, the domain of the inputs is preserved. to_level : str, optional The processing level to assign to the output. If None, the processing level of the inputs is preserved. Returns ------- xr.Dataset Returns a Dataset with the spatial dimensions averaged. See Also -------- xesmf.SpatialAverager : The exact average of a gridded array over a geometry. """ if isinstance(ds, xr.DataArray): warnings.warn("Input is a DataArray, but should be a Dataset. This could lead to errors, especially with rotated poles.", stacklevel=2) kwargs = (kwargs or {}).copy() # Determine the coordinates i = 0 while ("latitude" not in ds.cf.coordinates or "longitude" not in ds.cf.coordinates) and i < 2: if i == 0: # First, try to guess the latitude coordinate using CF conventions ds = ds.cf.guess_coord_axis() elif i == 1: missing_coords = [f"{coord}_name" for coord in ["latitude", "longitude"] if coord not in ds.cf.coordinates] # If the coordinates are still not found, use the 'kwargs' argument if any(coord not in kwargs for coord in missing_coords): raise ValueError( "Could not determine the spatial coordinate names using CF conventions. " "Use kwargs = {latitude_name: str, longitude_name: str} to specify them." ) for coord in missing_coords: ds[kwargs[coord]].attrs["standard_name"] = coord.split("_")[0] i += 1 kwargs.pop("latitude_name", None) kwargs.pop("longitude_name", None) if region == "global": region = { "name": "global", "method": "bbox", "lat_bnds": [-90 + 1e-5, 90 - 1e-5], } # `spatial_subset` won't wrap coords on the bbox, we need to fit the system used on ds. if ds.cf["longitude"].min() >= 0: region["lon_bnds"] = [0, 360] else: region["lon_bnds"] = [-180, 180] if (region is not None) and (spatial_subset is None) and (method == "cos-lat"): logger.info("Automatically turning spatial_subset to True based on inputs.") spatial_subset = True # If requested, call xscen.spatial.subset prior to averaging if spatial_subset: ds = subset(ds, **region) if method == "cos-lat": if ds.cf["latitude"].attrs.get("units", "") != "degrees_north": warnings.warn( f"Latitude units are '{ds.cf['latitude'].attrs.get('units', '')}', " "expected 'degrees_north'. Make sure that the computation is right.", stacklevel=2, ) if ((ds.cf["longitude"].min() < -160) & (ds.cf["longitude"].max() > 160)) or ( (ds.cf["longitude"].min() < 20) & (ds.cf["longitude"].max() > 340) ): warnings.warn( "The region appears to be crossing the -180/180° meridian. Bounds computation is currently bugged in cf_xarray. " "Make sure that the computation is right.", stacklevel=2, ) weights = np.cos(np.deg2rad(ds.cf["latitude"])) if ds.cf["longitude"].ndim == 1: dims = ds.cf["longitude"].dims + ds.cf["latitude"].dims else: if "longitude" not in ds.cf.bounds: ds = ds.assign_coords(**create_bounds_gridmapping(ds)) # Weights the weights by the cell area (in °²) weights = weights * xr.DataArray( shapely.area(shapely.polygons(shapely.linearrings(ds.lon_bounds, ds.lat_bounds))), dims=ds.cf["longitude"].dims, coords=ds.cf["longitude"].coords, ) dims = ds.cf["longitude"].dims ds_agg = ds.weighted(weights).mean(dims, keep_attrs=True) # Prepare the History field new_history = ( f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] " f"weighted mean(dim={dims}) using a 'cos-lat' approximation of areacella (in deg2)" ) elif method == "interp_centroid": # FIXME: No backward compatibility for this method since it produced incorrect results, but we want a specific error message for now. raise ValueError("Method 'interp_centroid' has been removed, since it produced incorrect results.") # Uses xesmf.SpatialAverager elif method == "xesmf": if xe is None: raise ImportError("Method xesmf requires xESMF to work, please install that package.") # If the region is a bounding box, call shapely and geopandas to transform it into an input compatible with xesmf if region["method"] == "bbox": polygon_geom = shapely.box( region["lon_bnds"][0], region["lat_bnds"][0], region["lon_bnds"][1], region["lat_bnds"][1], ) polygon = gpd.GeoDataFrame(index=[0], geometry=[polygon_geom]) # Prepare the History field new_history = ( f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] " f"xesmf.SpatialAverager over {region['lon_bnds']}{region['lat_bnds']} - xESMF v{xe.__version__}" ) # If the region is a shapefile, open with geopandas elif region["method"] == "shape": if not isinstance(region["shape"], gpd.GeoDataFrame): polygon = gpd.read_file(region["shape"]) name = Path(region["shape"]).name else: polygon = region["shape"] name = f"{len(polygon)} polygons" # Simplify the geometries to a given tolerance, if needed. # The simpler the polygons, the faster the averaging, but it will lose some precision. if simplify_tolerance is not None: polygon = polygon.copy() polygon["geometry"] = polygon.simplify(tolerance=simplify_tolerance, preserve_topology=True) # Prepare the History field new_history = f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] xesmf.SpatialAverager over {name} - xESMF v{xe.__version__}" else: raise ValueError("'method' should be one of [bbox, shape].") kwargs_copy = deepcopy(kwargs) call_kwargs = {"skipna": kwargs_copy.pop("skipna", False)} if "output_chunks" in kwargs: call_kwargs["output_chunks"] = kwargs_copy.pop("output_chunks") # Preemptive segmentization. Same threshold as xESMF, but there isn't strong analysis behind this choice geoms = shapely.segmentize(polygon.geometry, 1) if ds.cf["longitude"].ndim == 2 and "longitude" not in ds.cf.bounds: ds = ds.assign_coords(**create_bounds_gridmapping(ds)) savg = xe.SpatialAverager(ds, geoms, **kwargs_copy) ds_agg = savg(ds, keep_attrs=True, **call_kwargs) geom_dim_name = kwargs_copy.pop("geom_dim_name", "geom") extra_coords = {col: xr.DataArray(polygon[col], dims=(geom_dim_name,)) for col in polygon.columns if col != "geometry"} extra_coords[geom_dim_name] = xr.DataArray(polygon.index, dims=(geom_dim_name,)) ds_agg = ds_agg.assign_coords(**extra_coords) if len(polygon) == 1: ds_agg = ds_agg.squeeze(geom_dim_name) else: raise ValueError("Subsetting method should be ['cos-lat', 'interp_centroid', 'xesmf']") # If the dataset had a projection, remove it grid_mapping = get_grid_mapping(ds) if grid_mapping: ds_agg = ds_agg.drop_vars(grid_mapping) for v in ds_agg.data_vars: ds_agg[v].attrs.pop("grid_mapping", None) # History history = new_history + " \n " + ds_agg.attrs["history"] if "history" in ds_agg.attrs else new_history ds_agg.attrs["history"] = history # Attrs if to_domain is not None: ds_agg.attrs["cat:domain"] = to_domain if to_level is not None: ds_agg.attrs["cat:processing_level"] = to_level return ds_agg
[docs] @parse_config def produce_horizon( # noqa: C901 ds: xr.Dataset, *, indicators: (str | os.PathLike | Sequence[Indicator] | Sequence[tuple[str, Indicator]] | ModuleType | None) = None, periods: list[str] | list[list[str]] | None = None, warminglevels: dict | None = None, op: str | dict = "mean", to_level: str | None = "horizons", ) -> xr.Dataset: """ Compute indicators, the climatological mean, and unstack dates in order to have a single dataset with all indicators of different frequencies. Once this is done, the function drops 'time' in favor of 'horizon'. This function computes the indicators and does an interannual mean. It stacks the season and month in different dimensions and adds a dimension `horizon` for the period or the warming level, if given. Parameters ---------- ds : xr.Dataset Input dataset with a time dimension. If 'indicators' is None, the dataset should contain the precomputed indicators. indicators : str | os.PathLike | Sequence[Indicator] | Sequence[Tuple[str, Indicator]] | ModuleType, optional Indicators to compute. It will be passed to the `indicators` argument of `xs.compute_indicators`. periods : list of str or list of lists of str, optional Either [start, end] or list of [start_year, end_year] for the period(s) to be evaluated. If both periods and warminglevels are None, the full time series will be used. warminglevels : dict, optional Dictionary of arguments to pass to `py:func:xscen.subset_warming_level`. If 'wl' is a list, the function will be called for each value and produce multiple horizons. If both periods and warminglevels are None, the full time series will be used. op : str or dict Operation to perform over the time dimension. See `py:func:xscen.climatological_op` for details. Default is 'mean'. to_level : str, optional The processing level to assign to the output. If there is only one horizon, you can use "{wl}", "{period0}" and "{period1}" in the string to dynamically include that information in the processing level. Returns ------- xr.Dataset Horizon dataset. """ if "warminglevel" in ds and len(ds.warminglevel) != 1: raise ValueError( "Input dataset should only have `warminglevel` dimension of length 1. " "If you want to use produce_horizon for multiple warming levels, " "extract the full time series and use the `warminglevels` argument instead." ) all_periods = [] if periods is not None: all_periods.extend(standardize_periods(periods)) if warminglevels is not None: if isinstance(warminglevels["wl"], int | float): all_periods.append(warminglevels) elif isinstance(warminglevels["wl"], list): template = deepcopy(warminglevels) for wl in warminglevels["wl"]: all_periods.append({**template, "wl": wl}) else: raise ValueError(f"Could not understand the format of warminglevels['wl']: {warminglevels['wl']}") if len(all_periods) == 0: all_periods = standardize_periods([[int(ds.time.dt.year[0]), int(ds.time.dt.year[-1])]]) out = [] for period in all_periods: if isinstance(period, list): if ds.time.dt.year[0] <= int(period[0]) and ds.time.dt.year[-1] >= int(period[1]): ds_sub = ds.sel(time=slice(period[0], period[1])) else: warnings.warn( f"The requested period {period} is not fully covered by the input dataset. The requested period will be skipped.", stacklevel=2 ) ds_sub = None else: ds_sub = subset_warming_level(ds, **period) if ds_sub is not None: if indicators is not None: # compute indicators ind_dict = compute_indicators( ds=(ds_sub.squeeze(dim="warminglevel") if "warminglevel" in ds_sub.dims else ds_sub), indicators=indicators, ) else: ind_dict = {"skipped": ds_sub} # Compute the window-year mean ds_merge = xr.Dataset() for freq, ds_ind in ind_dict.items(): if freq != "fx": ds_mean = climatological_op( ds_ind, op=op, rename_variables=False, horizons_as_dim=True, ).drop_vars("time") else: ds_mean = ds_ind.expand_dims(dim={"horizon": [f"{ds.time.dt.year[0].item()}-{ds.time.dt.year[-1].item()}"]}) ds_mean["horizon"] = ds_mean["horizon"].astype(str) if "warminglevel" in ds_mean.coords: wl = np.array([ds_mean["warminglevel"].item()]) wl_attrs = ds_mean["warminglevel"].attrs if "warminglevel" in ds_mean.dims: ds_mean = ds_mean.squeeze("warminglevel") ds_mean = ds_mean.drop_vars("warminglevel") ds_mean["horizon"] = wl ds_mean["horizon"].attrs.update(wl_attrs) # put all indicators in one dataset for var in ds_mean.data_vars: ds_merge[var] = ds_mean[var] ds_merge.attrs.update(ds_mean.attrs) out.append(ds_merge) # If automatic attributes are not the same for all indicators, warn the user. if len(out) > 0: for v in out[0].data_vars: if not all( [all([out[0][v].attrs.get(attr) == out[i][v].attrs.get(attr) for i in range(1, len(out))]) for attr in ["long_name", "description"]] ): warnings.warn( f"The attributes for variable {v} are not the same for all horizons, " "probably because the periods were not of the same length. " "Attributes will be kept from the first horizon, but this might not be the most appropriate.", stacklevel=2, ) out = xr.concat(out, dim="horizon") out.attrs["cat:xrfreq"] = "fx" out.attrs["cat:frequency"] = "fx" if to_level: if len(all_periods) == 1: if (isinstance(all_periods[0], dict)) or ("warminglevel" in ds.dims and warminglevels is None): to_level = to_level.format(wl=(ds_sub.warminglevel.values[0] if isinstance(all_periods[0], dict) else ds.warminglevel.values[0])) else: to_level = to_level.format(period0=all_periods[0][0], period1=all_periods[0][1]) out.attrs["cat:processing_level"] = to_level return out else: raise ValueError("No horizon could be computed. Check your inputs.")
def _ulinregress(x, y, **kwargs): # Wrapper for scipy.stats.linregress to unpack multiple return values in xr.apply_ufunc valid_x = ~np.isnan(x) valid_y = ~np.isnan(y) mask = valid_x & valid_y if np.sum(mask) >= kwargs.get("min_periods", 1): kwargs.pop("min_periods", None) x = x[mask] y = y[mask] reg = scipy.stats.linregress(x, y, **kwargs) out = np.array( [ reg.slope, reg.intercept, reg.rvalue, reg.pvalue, reg.stderr, reg.intercept_stderr, ] ) else: out = np.full(6, np.nan) return out def _theilslopes(x, y, **kwargs): # Wrapper for scipy.stats.theilslopes to unpack multiple return values in xr.apply_ufunc valid_x = ~np.isnan(x) valid_y = ~np.isnan(y) mask = valid_x & valid_y if np.sum(mask) >= kwargs.pop("min_periods", 1): if kwargs: # theilslopes and kendalltau can both take kwargs, # make sure that the user is explicit about which kwargs are for which function if not set(kwargs.keys()).issubset({"theilslopes", "kendalltau"}): op_str = 'climatological_op(op={"theilslopes": {"theilslopes":{"alpha": 0.90}, "kendalltau": {"method": "auto"}}})' msg = 'Sending scipy kwargs to climatological_op(op="theilslopes") requires explicit key, \ value pairs for "scipy.stats.theilslopes" and/or "scipy.stats.kendalltau".' msg += f" Example: {op_str}, received {kwargs}" raise ValueError(msg) x = x[mask] y = y[mask] reg = scipy.stats.theilslopes(y, x, **kwargs.get("theilslopes", {})) correlation, p_value = scipy.stats.kendalltau(x, y, **kwargs.get("kendalltau", {})) out = np.array( [ reg.slope, # slope reg.intercept, # intercept reg.low_slope, # lower bound of slope confidence interval reg.high_slope, # upper bound of slope confidence interval correlation, # correlation coefficient for kendall tau p_value, # significance of the correlation for kendall tau ] ) else: out = np.full(6, np.nan) return out def _common_trend_utils(ds_rolling=None, func=None, op_kwargs=None, min_periods=None, window=None, stride=None, **kwargs): # prepare kwargs trend_kwargs = {k: v for k, v in op_kwargs.items() if "keep_attrs" not in k} trend_kwargs["min_periods"] = min_periods # unwrap DatasetRolling object and select years subset dsr_construct = ds_rolling.construct(window_dim="window", keep_attrs=True) dsr_construct = dsr_construct.shift(time=-(window - 1)).isel(time=slice(None, -(window - 1), stride)) # construct array to use years as x values (==regressors) in xr.apply_ufunc years_as_x_values = xr.DataArray( np.arange(dsr_construct.window.size).repeat(dsr_construct.time.size).reshape(dsr_construct.window.size, dsr_construct.time.size) + dsr_construct.time.dt.year.values, dims=["window", "time"], coords={ "window": dsr_construct.window.values, "time": dsr_construct.time, }, ) if func == "linregress": subfunc = _ulinregress elif func == "theilslopes": subfunc = _theilslopes else: raise ValueError(f"Unknown func: {func}") # apply linregress along windows ds_rolling = xr.apply_ufunc( subfunc, years_as_x_values, dsr_construct, input_core_dims=[["window"], ["window"]], output_core_dims=[["trend_param"]], vectorize=True, dask="parallelized", output_dtypes=["float32"], dask_gufunc_kwargs={"output_sizes": {"trend_param": 6}}, keep_attrs="no_conflicts", kwargs=trend_kwargs, ) # label new coords if func == "linregress": ds_rolling = ds_rolling.rename({"trend_param": "linreg_param"}) ds_rolling = ds_rolling.assign_coords(linreg_param=["slope", "intercept", "rvalue", "pvalue", "stderr", "intercept_stderr"]) elif func == "theilslopes": ds_rolling = ds_rolling.rename({"trend_param": "theilslopes_param"}) ds_rolling = ds_rolling.assign_coords(theilslopes_param=["slope", "intercept", "lower_slope", "upper_slope", "correlation", "p_value"]) return ds_rolling