Source code for hydromt.stats.extremes

"""Functions for extreme value analysis."""
import math as math
from typing import Optional

import dask
import numpy as np
import xarray as xr
from numba import njit
from scipy import stats

__all__ = [

_RPS = np.array([2, 5, 10, 25, 50, 100, 250, 500])  # years
_DISTS = {
    "POT": ["exp", "gpd"],
    "BM": ["gumb", "gev"],

## high level methods

[docs] def eva( da: xr.DataArray, ev_type: str = "BM", min_dist: int = 0, qthresh: float = 0.9, period: str = "365.25D", min_sample_size: int = 0, distribution: Optional[str] = None, rps: np.ndarray = _RPS, criterium: str = "AIC", ) -> xr.Dataset: """Return Extreme Value Analysis. Extreme value analysis based on block maxima (BM) or Peaks Over Threshold (POT). The method selects the peaks, fits a distribution and calculates return values for provided return periods. Parameters ---------- da : xr.DataArray Timeseries data, must have a regular spaced 'time' dimension. ev_type: {"POT", "BM"} Peaks over threshold (POT) or block maxima (BM) peaks, by default "BM" period : str, optional Period string, by default "365.25D". See pandas.Timedelta for options. min_dist : int, optional Minimum distance between peaks measured in time steps, by default 0 qthresh : float, optional Quantile threshold used with peaks over threshold method, by default 0.9 min_sample_size : int, optional Minimum number of finite values in a valid block, by default 0. Peaks of invalid blocks are set to NaN. distribution : str, optional Short name of distribution. If None (default) the optimal block maxima distribution ("gumb" or "gev" for BM and "exp" or "gpd" for POT) is selected based on `criterium`. rps : np.ndarray, optional Array of return periods, by default [2, 5, 10, 25, 50, 100, 250, 500] criterium: {'AIC', 'AICc', 'BIC'} Selection criterium, by default "AIC" Returns ------- xr.Dataset Dataset with peaks timeseries, distribution name and parameters and return values. """ da_peaks = get_peaks( da, ev_type=ev_type, min_dist=min_dist, qthresh=qthresh, period=period, min_sample_size=min_sample_size, ) # fit distribution using lmom da_params = fit_extremes( da_peaks, ev_type=ev_type, distribution=distribution, criterium=criterium ) # get return values da_rps = get_return_value(da_params, rps=rps) # combine data return xr.merge([da_peaks, da_params, da_rps])
# In theory could be removed because redundant with eva def eva_block_maxima( da: xr.DataArray, period: str = "365.25D", min_dist: int = 0, min_sample_size: int = 0, distribution: Optional[str] = None, rps: np.ndarray = _RPS, criterium: str = "AIC", ) -> xr.Dataset: """Return EVA based on block maxima. Extreme valua analysis based on block maxima. The method selects the peaks, fits a distribution and calculates return values for provided return periods. Parameters ---------- da : xr.DataArray Timeseries data, must have a regular spaced 'time' dimension. period : str, optional Period string, by default "365.25D". See pandas.Timedelta for options. min_dist : int, optional Minimum distance between peaks measured in time steps, by default 0 min_sample_size : int, optional Minimum number of finite values in a valid block, by default 0. Peaks of invalid blocks are set to NaN. distribution : str, optional Short name of distribution. If None (default) the optimal block maxima distribution ("gumb" or "gev") is selected based on `criterium`. rps : np.ndarray, optional Array of return periods, by default [2, 5, 10, 25, 50, 100, 250, 500] criterium: {'AIC', 'AICc', 'BIC'} Selection criterium, by default "AIC" Returns ------- xr.Dataset Dataset with peaks timeseries, distribution name and parameters and return values. """ da_bm = get_peaks( da, ev_type="BM", min_dist=min_dist, min_sample_size=min_sample_size, period=period, ) # fit distribution using lmom da_params = fit_extremes( da_bm, ev_type="BM", distribution=distribution, criterium=criterium ) # get return values da_rps = get_return_value(da_params, rps=rps) # combine data return xr.merge([da_bm, da_params, da_rps]) # In theory could be removed because redundant with eva def eva_peaks_over_threshold( da: xr.DataArray, qthresh: float = 0.9, min_dist: int = 0, min_sample_size: int = 0, period: str = "365.25D", distribution: Optional[str] = None, rps: np.ndarray = _RPS, criterium: str = "AIC", ) -> xr.Dataset: """Return EVA based on peaks over threshold. Extreme valua analysis based on peaks over threshold. The method selects the peaks, fits a distribution and calculates return values for provided return periods. Parameters ---------- da : xr.DataArray Timeseries data, must have a regular spaced 'time' dimension. qthresh : float, optional Quantile threshold used with peaks over threshold method, by default 0.9 min_dist : int, optional Minimum distance between peaks measured in time steps, by default 0 min_sample_size : int, optional Minumimum number of finite values in a valid block, by default 0. Peaks of period : str, optional Period string, by default "365.25D". See pandas.Timedelta for options. distribution : str, optional Short name of distribution. If None (default) the optimal block maxima distribution ("exp" or "gpd") is selected based on `criterium`. rps : np.ndarray, optional Array of return periods, by default [1.5, 2, 5, 10, 20, 50, 100, 200, 500] criterium: {'AIC', 'AICc', 'BIC'} distrition selection criterium, by default "AIC" Returns ------- xr.Dataset Dataset with peaks timeseries, distribution name and parameters and return values. """ da_bm = get_peaks( da, ev_type="POT", min_dist=min_dist, period=period, qthresh=qthresh, min_sample_size=min_sample_size, ) # fit distribution using lmom da_params = fit_extremes( da_bm, ev_type="POT", distribution=distribution, criterium=criterium ) # get return values da_rps = get_return_value(da_params, rps=rps) return xr.merge([da_bm, da_params, da_rps])
[docs] def get_peaks( da: xr.DataArray, ev_type: str = "BM", min_dist: int = 0, qthresh: float = 0.9, period: str = "year", min_sample_size: int = 0, time_dim: str = "time", ) -> xr.DataArray: """Return peaks from time series. Return the timeseries with all but the peak values set to NaN. For block maxima (BM) peaks, peaks are determined by finding the maximum within each period and then ensuring a minimum distance (min_dist) and minimum sample size (min_sample_size) per period. For Peaks Over Threshold (POT), peaks are determined solely based on the minimum distance between peaks. The average interarrival time (extreme_rates) is calculated by dividing the number of peaks by the total duration of the timeseries and converting it to a yearly rate. Parameters ---------- da : xr.DataArray Timeseries data, must have a regular spaced 'time' dimension. ev_type : {"POT", "BM"} Peaks over threshold (POT) or block maxima (BM) peaks, by default "BM" min_dist : int, optional Minimum distance between peaks measured in time steps, by default 0 qthresh : float, optional Quantile threshold used with peaks over threshold method, by default 0.9 period : {'year', 'month', 'quarter', pandas.Timedelta}, optional Period string, by default "year". min_sample_size : int, optional Minimum number of finite values in a valid block, by default 0. Peaks of invalid bins are set to NaN. Returns ------- xr.DataArray Timeseries data with only peak values, all other values are set to NaN. Average interarrival time calculated based on the average number of peaks per year and stored as "extreme_rates" """ if not (0 < qthresh < 1.0): raise ValueError("Quantile 'qthresh' should be between (0,1)") if time_dim not in da.dims: raise ValueError(f"Input array should have a '{time_dim}' dimension") if ev_type.upper() not in _DISTS.keys(): raise ValueError( f"Unknown ev_type {ev_type.upper()}, select from {_DISTS.keys()}." ) bins = None nyears = (da[time_dim][-1] - da[time_dim][0]).dt.days / 365.2425 if period in ["year", "quarter", "month"] and ev_type.upper() == "BM": bins = getattr(da[time_dim].dt, period).values elif ev_type.upper() == "BM": tstart = da[time_dim].resample(time=period, label="left").first() bins = tstart.reindex_like(da, method="ffill").values.astype(float) else: min_sample_size = 0 # min_sample_size not used for POT def func(x): return local_max_1d( x, min_dist=min_dist, bins=bins, min_sample_size=min_sample_size ) duck = dask.array if isinstance(, dask.array.Array) else np lmax = duck.apply_along_axis(func, da.get_axis_num(time_dim), da) # apply POT threshold peaks = da.where(lmax) if ev_type.upper() == "POT": peaks = da.where(peaks > da.quantile(qthresh, dim=time_dim)) # get extreme rate per year da_rate = np.isfinite(peaks).sum(time_dim) / nyears peaks = peaks.assign_coords({"extremes_rate": da_rate}) = "peaks" return peaks
[docs] def get_return_value( da_params: xr.DataArray, rps: np.ndarray = _RPS, extremes_rate=1.0 ) -> xr.DataArray: """Return return value based on EVA. Return return values based on a fitted extreme value distribution using the :py:meth:`fit_extremes` method based on the scipy inverse survival function (isf). Parameters ---------- da_params : xr.DataArray Short name and parameters of extreme value distribution, see also :py:meth:`fit_extremes`. rps : np.ndarray, optional Array of return periods in years, by default [1.5, 2, 5, 10, 20, 50, 100, 200, 500] extremes_rate : float, optional Average number of peaks per period, by default 1.0 Only used if extremes_rate is not provided as coordinate in da_params. Returns ------- xr.DataArray Return values """ def _return_values_1d(p, r, d, rps=rps): if np.isnan(p).all(): return np.full(rps.size, np.nan) if d == "gumb" and len(p) == 3: p = p[1:] return _get_return_values(p, d, rps=rps, extremes_rate=r) if isinstance(rps, list): rps = np.asarray(rps) elif not isinstance(rps, np.ndarray): raise ValueError("rps should be a list or numpy array") if "dparams" not in da_params.dims: raise ValueError("da_params should have a 'dparams' dimension") if "distribution" not in da_params.coords: raise ValueError("da_params should have a 'distribution' coordinate") distributions = da_params["distribution"].load() if "extremes_rate" in da_params.coords: extremes_rate = da_params["extremes_rate"].load() elif isinstance(extremes_rate, (int, float)): extremes_rate = np.full(distributions.shape, extremes_rate) elif extremes_rate.shape != distributions.shape: raise ValueError( "extremes_rate should be a scalar or have the same shape as distribution" ) if da_params.ndim == 1: # fix case of single dim da_params = da_params.expand_dims("index") da_rvs = xr.apply_ufunc( _return_values_1d, da_params, extremes_rate, distributions, input_core_dims=(["dparams"], [], []), output_core_dims=[["rps"]], vectorize=True, dask="parallelized", dask_gufunc_kwargs=dict(output_sizes={"rps": rps.size}), output_dtypes=[float], ) da_rvs["rps"] = xr.IndexVariable("rps", rps) = "return_values" return da_rvs.squeeze()
[docs] def fit_extremes( da_peaks: xr.DataArray, distribution: Optional[str] = None, ev_type: str = "BM", criterium: str = "AIC", time_dim: str = "time", ) -> xr.DataArray: """Return distribution fit from extremes. Return the fitted parameters of the extreme value `distribution` based on the lmoments method. If no distribution name is provided `distribution=None`, the optimal distribution is selected based on `criterium` from a list of distributions associated with `ev_type`. Block maximum distributions: gumbel ("gumb") and general extreme value ("gev"). Peak over threshold distributions: exponential ("exp") and general pareto distribution ("gdp"). Parameters ---------- da_peaks : xr.DataArray Timeseries data with only peak values, any other values are set to NaN. The DataArray should contain as coordinate `extreme_rates` indicating the yearly rate of extreme events. If not provided, this value is set to 1.0 distribution: {'gev', 'gpd', 'gumb', 'exp'}, optional Short distribution name. If None (default) the optimal distribution is calculated based on `criterium` ev_type : {"POT", "BM"} Peaks over threshold (POT) or block maxima (BM) peaks, by default "BM" criterium: {'AIC', 'AICc', 'BIC'} Selection criterium, by default "AIC" Returns ------- xr.DataArray Parameters and short name of optimal extreme value distribution. """ distributions = _DISTS.get(ev_type.upper(), None) if distribution is not None: if isinstance(distribution, str): distributions = [distribution] elif not isinstance(distribution, list): raise ValueError( f"distribution should be a string or list, got {type(distribution)}" ) elif ev_type.upper() not in _DISTS: raise ValueError( f"Unknown ev_type {ev_type.upper()}, select from {_DISTS.keys()}." ) def _fitopt_1d(x, distributions=distributions, criterium=criterium): if np.isnan(x).all(): return np.array([np.nan, np.nan, np.nan, -1]) params, d = lmoment_fitopt(x, distributions=distributions, criterium=criterium) if len(params) == 2: params = np.concatenate([[0], params]) # trick to include distribution name; -1 if too little data to fit -> NA idist = distributions.index(d) if d in distributions else -1 return np.concatenate([params, [idist]]) if da_peaks.ndim == 1: # fix case of single dim da_peaks = da_peaks.expand_dims("index") da_params = xr.apply_ufunc( _fitopt_1d, da_peaks, input_core_dims=[[time_dim]], output_core_dims=[["dparams"]], dask_gufunc_kwargs=dict(output_sizes={"dparams": 4}), vectorize=True, dask="parallelized", output_dtypes=[float], ) # split output idist = da_params.isel(dparams=-1).values.astype(int) distributions = np.atleast_1d(np.array(distributions + ["NA"])[idist]) da_params = da_params.isel(dparams=slice(0, -1)) = "parameters" # add coordinates dist_dims = list([d for d in da_params.dims if d != "dparams"]) coords = dict( dparams=xr.IndexVariable("dparams", ["shape", "loc", "scale"]), distribution=xr.DataArray(dims=dist_dims, data=distributions), ) # forward extremes_rate if provided if "extremes_rate" in da_peaks.coords: coords["extremes_rate"] = da_peaks["extremes_rate"] da_params = da_params.assign_coords(coords) return da_params.squeeze()
## PEAKS @njit def local_max_1d( arr: np.ndarray, bins: np.ndarray = None, min_dist: int = 0, min_sample_size: int = 0, ) -> np.ndarray: """Return boolean index of local maxima in `arr` which are `min_dist` apart. Parameters ---------- arr : np.ndarray 1D time series bins : np.ndarray, optional 1D array of with uniquely numbered bins (blocks), by default None. If provided only the largest peak per block is flagged. min_dist : int, optional Minimum distance between peaks, by default 0 min_sample_size : int, optional minimum number of samples per block, by default 0 Returns ------- np.ndarray boolean index of local maxima """ a0 = arr[0] amax = -np.inf # peak value imax = -min_dist # peak index bsize = 0 min_sample_size = 0 if bins is None else min_sample_size up = False # sign of difference between subsequent values out = np.array([bool(0) for _ in range(arr.size)]) for i in range(arr.size): a1 = arr[i] if not np.isfinite(a1): a0 = a1 continue dd = i - 1 - imax # distance to previous peak if (imax > 0) and ( (bins is None and dd == (min_dist + 1)) or (bins is not None and bins[i - 1] != bins[imax] and dd > min_dist) ): if bsize >= min_sample_size: out[imax] = True amax = -np.inf bsize = 0 if up and a1 < a0 and a0 > amax: # peak imax = i - 1 amax = a0 if a1 < a0: up = False elif a1 > a0: up = True bsize += 1 a0 = a1 if imax > 0 and bsize >= min_sample_size: out[imax] = True return out ## LINK TO SCIPY.STATS def get_dist(distribution): """Return scipy.stats distribution.""" _DISTS = { "gev": "genextreme", "gpd": "genpareto", "gumb": "gumbel_r", "exp": "genpareto", } _scipy_dist_name = _DISTS.get(distribution, distribution) dist = getattr(stats, _scipy_dist_name, None) if dist is None: raise ValueError(f'Distribution "{_scipy_dist_name}" not found in scipy.stats.') return dist def get_frozen_dist(params, distribution): """Return frozen distribution. Returns scipy.stats frozen distribution, i.e.: with set parameters. """ return get_dist(distribution)(*params[:-2], loc=params[-2], scale=params[-1]) ## STATS # TODO add ks and cmv tests # cvm = stats.cramervonmises(x, frozen_dist.cdf) # ks = stats.kstest(x, frozen_dist.cdf) def _aic(x, params, distribution): """Return Akaike Information Criterion for a frozen distribution.""" k = len(params) nll = get_frozen_dist(params, distribution).logpdf(x).sum() aic = 2 * k - 2 * nll return aic def _aicc(x, params, distribution): """Return AICC. Return Akaike Information Criterion with correction for small sample size for a frozen distribution. """ k = len(params) aic = _aic(x, params, distribution) aicc = aic + ((2 * k) ** 2 + 2 * k) / (len(x) - k - 1) return aicc def _bic(x, params, distribution): """Return Bayesian Information Criterion for a frozen distribution.""" k = len(params) nll = get_frozen_dist(params, distribution).logpdf(x).sum() bic = k * np.log(len(x)) - 2 * nll return bic ## TRANSFORMATIONS def _get_return_values(params, distribution, rps=_RPS, extremes_rate=1.0): q = 1 / rps / extremes_rate return get_frozen_dist(params, distribution).isf(q) def _get_return_periods(x, a=0.0, extremes_rate=1.0): assert np.all(np.isfinite(x)) b = 1.0 - 2.0 * a ranks = (len(x) + 1) - stats.rankdata(x, method="average") freq = ((ranks - a) / (len(x) + b)) * extremes_rate rps = 1 / freq return rps ## CONFIDENCE INTERVALS def lmoment_ci(x, distribution, nsample=1000, alpha=0.9, rps=_RPS, extremes_rate=1.0): q = 1 / rps / extremes_rate dist = get_dist(distribution) def func(x, distribution=distribution, q=q): p = lmoment_fit(x, distribution) return dist.isf(q, *p[:-2], loc=p[-2], scale=p[-1]) # np.random.seed(12456) x_sample = np.random.choice(x, size=[nsample, x.size], replace=True) xrv = np.apply_along_axis(func, 1, x_sample) percentiles = np.array([(1 - alpha) / 2, 1 - (1 - alpha) / 2]) * 100 return np.percentile(xrv, percentiles, axis=0) ## PLOTS def plot_return_values( x: xr.DataArray, params: xr.DataArray, distribution: str, x_scale: Optional[str] = "gumbel", ax=None, color: Optional[str] = "k", a: Optional[float] = 0, alpha: Optional[float] = 0.9, nsample: Optional[int] = 1000, rps: Optional[np.ndarray] = _RPS, extremes_rate: float = 1.0, ): # TODO: add description to this function Dirk - done very simply now # Maybe extremes_rate should be checked from the params? """Return figure of EVA fit and empirical data. Parameters ---------- x : xr.DataArray Timeseries data with only peak values, all other values are set to NaN params : xr.DataArray Short name and parameters of extreme value distribution, see also :py:meth:`fit_extremes`. distribution : str Name of distribution used for fit. Can be set to `empirical` if no fit is performed x_scale : str, optional transformation of the x-axis for the figure ax : color : str, optional Color of the line. By default colord is black 'k'. a : float, optional Float for the Gringorten position. By default a = 0 alpha : float, optional alpha value for the confidence interval of the EV fit. By default, alpha = 0.9 nsample : int, optional number of samples used to calculate the confidence interval. By default, nsample = 1000 rps : np.ndarray, optional Specific return periods to highlight as points in the plot. By default [2, 5, 10, 25, 50, 100, 250, 500] years extremes_rate : float. Average interarrival time calculated based on the average number of peaks per year. For annual maxima, extremes_rate = 1 Returns ------- ax.figure Figure of EVA fit and empirical data """ import matplotlib.pyplot as plt rvs_obs = np.sort(x[np.isfinite(x)]) if distribution != "empirical": params = params[-2:] if distribution == "gumb" else params rvs_sim = _get_return_values( params, distribution, rps=rps, extremes_rate=extremes_rate ) else: rvs_sim = rvs_obs rps_obs = _get_return_periods(rvs_obs, a=a, extremes_rate=extremes_rate) if x_scale == "gumbel": xsim = -np.log(-np.log(1.0 - 1.0 / rps)) xobs = -np.log(-np.log(1.0 - 1.0 / rps_obs)) else: xsim = rps xobs = rps_obs if ax is None: _, ax = plt.subplots(1, 1, figsize=(10, 6)) ax.plot(xobs, rvs_obs, color=color, marker="o", label="plot position", lw=0) if distribution != "empirical": ax.plot( xsim, rvs_sim, color=color, ls="--", label=f"{distribution.upper()} fit" ) if alpha is not None and nsample > 0 and distribution != "empirical": urvs = lmoment_ci( x, distribution, nsample=nsample, alpha=alpha, rps=rps, extremes_rate=extremes_rate, ) ax.plot( xsim, urvs[0, :], color=color, ls=":", label=f"conf. interval (alpha = {alpha:.2f})", ) ax.plot( xsim, urvs[1, :], color=color, ls=":", ) ax.legend(loc="upper left") ymin = 0 ymax = np.nanmax([np.nanmax(rvs_obs), np.nanmax(rvs_sim), 0]) ymax = ymax * 1.1 ax.set_ylim(ymin, ymax) ax.set_ylabel("Return value") if x_scale == "gumbel": ax.set_xticks(xsim) ax.set_xticklabels(rps) ax.set_xlabel("Return period") ax.grid() return ax ## LMOMENTS FITTING # credits Ferdinand Diermanse def lmoment_fitopt(x, distributions=None, criterium="AIC"): """Return parameters based on lmoments fit. Lmomentfit routine derive parameters of a distribution function based on lmoments. The distribution selection is based on the AIC criterium. Based on the theory of Hosking and Wallis (1997) Appendix A. Parameters ---------- X: 1D array of float data series distributions: iterable of {'gev', 'gpd', 'gumb', 'exp'} iterable of distribution names criterium: {'AIC', 'AICc', 'BIC'} distrition selection criterium Returns ------- params: ndarray of float array of distribution parameters distribution: str selected distribution """ fgof = {"AIC": _aic, "AICC": _aicc, "BIC": _bic}.get(criterium.upper()) # make sure the timeseries does not contain NaNs x = x[~np.isnan(x)] # derive first four L-moments from data lmom = get_lmom(x, 4) # derive parameters of distribution function params = {} gof_values = [] for distribution in distributions: params[distribution] = _lmomentfit(lmom, distribution) gof_values.append(fgof(x, params[distribution], distribution)) distribution = distributions[np.argmin(gof_values)] return params[distribution], distribution def lmoment_fit(x, distribution): """Return parameters based on lmoments. Lmomentfit routine derive parameters of a distribution function based on lmoments. Based on the theory of Hosking and Wallis (1997) Appendix A. Parameters ---------- X: 1D array of float data series distribution: {'gev', 'gpd', 'gumb', 'exp'} Short name of distribution function to be fitted. Returns ------- params: ndarray of float array of distribution parameters lambda: 1D array of float vector of (nmom) L-moments """ # make sure the timesiries does not contain NaNs x = x[~np.isnan(x)] # derive first four L-moments from data lmom = get_lmom(x, 4) # derive parameters of distribution function params = _lmomentfit(lmom, distribution) return params def _lmomentfit(lmom, distribution): """Return parameters based on lmoments. Lmomentfit routine to derive parameters of a distribution function based on given lmoments. Based on the theory of Hosking and Wallis (1997) Appendix A. Parameters ---------- lmom: 1D array of float l-moments, derived from data distribution: {'gev', 'gpd', 'gumb', 'exp'} Short name of distribution function to be fitted. Returns ------- params: ndarray of float array of distribution parameters """ # l-moment ratios from l-moments # tau = lmom[2]/lmom[1] # tau = L-CV tau3 = lmom[2] / lmom[1] # tau3 = L-SK # tau4 = lmom[4]/lmom[2] # tau4 = L-KU # derive parameters for selected distribution if distribution in ["gev", "genextreme"]: c1 = 2.0 / (3.0 + tau3) - np.log(2.0) / np.log(3.0) k1 = 7.859 * c1 + 2.9554 * (c1**2.0) s1 = (lmom[1] * k1) / ((1.0 - 2.0 ** (-k1)) * math.gamma(1.0 + k1)) m1 = lmom[0] - (s1 / k1) * (1.0 - math.gamma(1.0 + k1)) params = (k1, m1, s1) elif distribution in ["gumb", "gumbel_r"]: s1 = lmom[1] / np.log(2.0) m1 = lmom[0] - 0.5772 * s1 params = (m1, s1) elif distribution in ["gpd", "genpareto"]: k1 = (1 - 3 * tau3) / (1 + tau3) s1 = (1 + k1) * (2 + k1) * lmom[1] m1 = lmom[0] - (2 + k1) * lmom[1] params = (-k1, m1, s1) elif distribution in ["exp", "genexpon"]: k1 = 1e-8 s1 = (1 + k1) * (2 + k1) * lmom[1] m1 = lmom[0] - (2 + k1) * lmom[1] params = (0.0, m1, s1) else: raise ValueError("Unknown distribution") return params def legendre_shift_poly(n): """Shifted Legendre polynomial. Based on recurrence relation (n + 1)Pn+1 (x) - (1 + 2 n)(2 x - 1)Pn (x) + n Pn-1 (x) = 0 Given nonnegative integer n, compute the shifted Legendre polynomial P_n. Return the result as a vector whose mth element is the coefficient of x^(n+1-m). polyval(legendre_shift_poly(n),x) evaluates P_n(x). """ if n == 0: pk = 1 elif n == 1: pk = [2, -1] else: pkm2 = np.zeros(n + 1) pkm2[-1] = 1 pkm1 = np.zeros(n + 1) pkm1[-1] = -1 pkm1[-2] = 2 for k in range(2, n + 1): pk = np.zeros(n + 1) for e in range(n - k + 1, n + 1): pk[e - 1] = ( (4 * k - 2) * pkm1[e] + (1 - 2 * k) * pkm1[e - 1] + (1 - k) * pkm2[e - 1] ) pk[-1] = (1 - 2 * k) * pkm1[-1] + (1 - k) * pkm2[-1] pk = pk / k if k < n: pkm2 = pkm1 pkm1 = pk return pk def get_lmom(x, nmom=4): """Compute L-moments for a data series. Based on calculation of probability weighted moments and the coefficient of the shifted Legendre polynomial. lmom by Kobus N. Bekker, 14-09-2004 Parameters ---------- x: 1D array of float data series nmom: int number of L-Moments to be computed, by default 4. Returns ------- lmom: 1D array of float vector of (nmom) L-moments """ n = len(x) xs = np.sort(x, axis=0) bb = np.zeros(nmom - 1) ll = np.zeros(nmom - 1) b0 = xs.mean(axis=0) for r in range(1, nmom): Num1 = np.kron(np.ones((r, 1)), np.arange(r + 1, n + 1)) Num2 = np.kron(np.ones((n - r, 1)), np.arange(1, r + 1)).T Num = - Num2, axis=0) Den =, r)), n) - np.arange(1, r + 1)) bb[r - 1] = (((Num / Den) * xs[r:n]).sum()) / n B = np.concatenate([np.array([b0]), bb.T])[::-1] for i in range(1, nmom): Spc = np.zeros(len(B) - (i + 1)) Coeff = np.concatenate([Spc, legendre_shift_poly(i)]) ll[i - 1] = np.sum(Coeff * B) lmom = np.concatenate([np.array([b0]), ll.T]) return lmom