Source code for hydromt.stats.design_events

"""Functions for design events."""
import math as math
from typing import Optional

import numpy as np
import xarray as xr
from numba import njit

__all__ = [
    "get_peak_hydrographs",
]


[docs] def get_peak_hydrographs( da: xr.DataArray, da_peaks: xr.DataArray, wdw_size: int, n_peaks: Optional[int] = None, normalize: bool = True, ) -> xr.DataArray: """Return peak hydrographs. Return a hydrograph of `wdw_size` length around each peak in `da_peaks` with a max value or 1 at the peak. The mean hydrograph can be derived by applying statistics along the 'peak' output dimension. Parameters ---------- da : xr.DataArray Timeseries data, must have a regular spaced 'time' dimension. da_peaks : xr.DataArray Timeseries data with only peak values, all other values are set to NaN wdw_size : int Length of hydrographs measured in the time series time step. n_peaks : int, optional N largest peaks to return. If None (default) all peaks are returned. normalize : bool, optional If True (default) return peak hydrographs normalized by peak value. Returns ------- xr.DataArray Hydrographs with new 'peak' and 'dt' dimensions. """ assert da.shape == da_peaks.shape, "da and da_peaks must have identical shape" if da_peaks.dtype != "bool": da_peaks = np.isfinite(da_peaks) if n_peaks is None: # n_peaks required for output dimensions n_peaks = da_peaks.sum("time").max().compute().item() # temp method with arguments set def _func(ts, peaks, wdw_size=wdw_size, n_peaks=n_peaks, normalize=normalize): return hydrograph_1d(ts, peaks, wdw_size, n_peaks, normalize) if da.ndim == 1: # fix case with single dim da = da.expand_dims("index") da_peaks = da_peaks.expand_dims("index") da_shape = xr.apply_ufunc( _func, da.chunk({"time": -1}), da_peaks.chunk({"time": -1}), input_core_dims=[["time"], ["time"]], output_core_dims=[["peak", "dt"]], dask_gufunc_kwargs=dict(output_sizes={"peak": n_peaks, "dt": wdw_size}), vectorize=True, dask="parallelized", output_dtypes=[float], ).rename({"dt": "time"}) # set time coordinate t0 = int(np.floor(wdw_size / 2)) da_shape["time"] = xr.IndexVariable("time", np.arange(-t0, t0 + wdw_size % 2)) return da_shape.squeeze()
@njit def hydrograph_1d( ts: np.ndarray, peaks: np.ndarray, wdw_size: int, n_peaks: Optional[int] = None, normalize: bool = True, ) -> np.ndarray: """Return hydrograph 1D. Return 2D array of shape (`n_peaks`, `wdw_size`) with normalized hydrographs from time series `ts`. Parameters ---------- ts : np.ndarray of float 1D array with constant spaced time series peaks : np.ndarray of bool 1D array with constant spaced time series, True where peaks wdw_size : int Size of hydrograph in unit of time series time step n_peaks : int, optional N largest peaks to return. If None (default) all peaks are returned. normalize : bool, optional If True (default) return peak hydrographs normalized by peak value. Returns ------- np.ndarray normalized hydrographs """ assert ts.shape == peaks.shape, "the shapes of ts and peaks mismatch" idxs = np.where(peaks)[0] n0 = idxs.size if n_peaks is None else int(n_peaks) out = np.full((n0, wdw_size), np.nan, ts.dtype) seq = np.argsort(ts[idxs])[::-1] # sort from large to small n = ts.size d0 = int(np.floor(wdw_size / 2)) d1 = wdw_size - d0 for i in range(n0): idx = idxs[seq[i]] idx0 = idx - d0 idx1 = idx + d1 s = slice(max(0, idx0), min(n + 1, idx1)) s1 = slice(max(0, -idx0), n - idx0 if idx1 > n else idx1 - idx0) out[i, s1] = ts[s] / ts[idx] if normalize else ts[s] return out