"""Rootzoneclim workflows for Wflow plugin."""
import logging
from typing import Optional
import numpy as np
import pandas as pd
import pyflwdir
import xarray as xr
from hydromt import flw
from scipy import optimize
logger = logging.getLogger(__name__)
__all__ = ["rootzoneclim"]
def determine_budyko_curve_terms(
ds_sub_annual,
):
"""
Determine the Budyko terms.
(Note that the discharge coefficient and evaporative
index for the future climate, if present, are not correct yet and will
be adjusted at a later stage).
Parameters
----------
ds_sub_annual : xr.Dataset
Dataset containing per subcatchment the annual precipitation,
potential evaporation and specific discharge sums.
Returns
-------
ds_sub_annual : xr.Dataset
Similar to input, but containing the discharge coefficient, aridity
index and the evaporative index as long term averages.
"""
ds_sub_annual["discharge_coeff"] = (
ds_sub_annual["specific_Q"] / ds_sub_annual["precip_mean"]
).mean("time", skipna=True)
ds_sub_annual["aridity_index"] = (
ds_sub_annual["pet_mean"] / ds_sub_annual["precip_mean"]
).mean("time", skipna=True)
ds_sub_annual["evap_index"] = 1 - ds_sub_annual["discharge_coeff"]
# Make sure Ea = P - Q < Ep. If not, we will not use that subcatchment for
# the calculations.
ds_sub_annual["discharge_coeff"] = ds_sub_annual["discharge_coeff"].where(
ds_sub_annual["evap_index"] < ds_sub_annual["aridity_index"]
)
ds_sub_annual["aridity_index"] = ds_sub_annual["aridity_index"].where(
ds_sub_annual["evap_index"] < ds_sub_annual["aridity_index"]
)
ds_sub_annual["evap_index"] = ds_sub_annual["evap_index"].where(
ds_sub_annual["evap_index"] < ds_sub_annual["aridity_index"]
)
# Final check, if a coefficient < 0.0 or >>1.0, set to nan
ds_sub_annual["discharge_coeff"] = (
ds_sub_annual["discharge_coeff"]
.where(ds_sub_annual["discharge_coeff"] < 10.0)
.where(ds_sub_annual["discharge_coeff"] > 0.0)
)
ds_sub_annual["aridity_index"] = (
ds_sub_annual["aridity_index"]
.where(ds_sub_annual["aridity_index"] < 10.0)
.where(ds_sub_annual["aridity_index"] > 0.0)
)
ds_sub_annual["evap_index"] = (
ds_sub_annual["evap_index"]
.where(ds_sub_annual["evap_index"] < 10.0)
.where(ds_sub_annual["evap_index"] > 0.0)
)
return ds_sub_annual
def determine_omega(ds_sub_annual):
"""
Determine the omega parameter.
This function uses the Zhang function (as defined in Teuling et al., 2019)
to determine the omega parameter for ds_sub_annual.
Teuling, A. J., de Badts, E. A. G., Jansen, F. A., Fuchs, R., Buitink, J.,
Hoek van Dijke, A. J., and Sterling, S. M.: Climate change, reforestation/
afforestation, and urbanization impacts on evapotranspiration and streamflow
in Europe, Hydrol. Earth Syst. Sci., 23, 3631–3652,
https://doi.org/10.5194/hess-23-3631-2019, 2019.
Parameters
----------
ds_sub_annual : xr.Dataset
Dataset containing at least the discharge coefficient, aridity
index and evaporative index for the different forcing types.
Returns
-------
ds_sub_annual : xr.Dataset
Same as above, but with the omega parameter added. The omega parameter
is the same for all forcing types and based on the observations.
"""
# Constract the output variable in the dataset
ds_sub_annual = ds_sub_annual.assign(
omega=lambda ds_sub_annual: ds_sub_annual["discharge_coeff"] * np.nan
)
# Load the aridity index and evaporative index as np arrays (this saves
# calculation time in the loop below)
# calculate omega for "obs" and "cc_hist":
if "cc_hist" in ds_sub_annual.forcing_type:
forcing_types = ["obs", "cc_hist"]
else:
forcing_types = ["obs"]
for forcing_type in forcing_types:
aridity_index = ds_sub_annual.sel(forcing_type=forcing_type)[
"aridity_index"
].values
evap_index = ds_sub_annual.sel(forcing_type=forcing_type)["evap_index"].values
# Set the temporary omega variable
omega = np.zeros((len(ds_sub_annual.index),))
for subcatch_index_nr in range(len(ds_sub_annual.index)):
if evap_index[subcatch_index_nr] > 0: # make sure evap index is not nan.
try:
omega_temp = optimize.brentq(
Zhang,
0.000000000001,
100,
args=(
aridity_index[subcatch_index_nr],
evap_index[subcatch_index_nr],
),
)
omega[subcatch_index_nr] = omega_temp
# possible error that occurs: "ValueError: f(a) and f(b) must
# have different signs") -- increase a and b range solves the issue.
except ValueError:
logger.warning("No value for omega could be derived.")
omega[subcatch_index_nr] = np.nan
else:
omega[subcatch_index_nr] = np.nan
# Add omega to the xr dataset
ds_sub_annual["omega"].loc[dict(forcing_type=forcing_type)] = omega
return ds_sub_annual
def determine_Peffective_Interception_explicit(ds_sub, Imax, intercep_vars_sub=None):
"""
Determine the effective precipitation, interception evaporation and canopy storage.
Based on (daily) values of precipitation and potential evaporation.
Parameters
----------
ds_sub : xr.Dataset
Dataset containing precipitation and potential evaporation
(precip_mean and pet_mean).
Imax : float
The maximum interception storage capacity [mm].
intercep_vars_sub : xr.Dataarray
Dataarray from staticmaps containing the Imax values for a number
of time steps, e.g. every month. If present, this is used to determine
Imax per time step. The default is None.
Returns
-------
ds_sub : xr.Dataset
same as above, but with effective precipitation, interception evaporation
and canopy storage added.
"""
# Make the output dataset ready for the output
ds_sub["evap_interception"] = xr.full_like(
ds_sub["precip_mean"], fill_value=ds_sub["precip_mean"].vector.nodata
)
ds_sub["evap_interception"].load()
ds_sub["precip_effective"] = xr.full_like(
ds_sub["precip_mean"],
fill_value=ds_sub["precip_mean"].vector.nodata,
)
ds_sub["precip_effective"].load()
ds_sub["canopy_storage"] = xr.full_like(
ds_sub["precip_mean"],
fill_value=ds_sub["precip_mean"].vector.nodata,
)
ds_sub["canopy_storage"].load()
# Calculate it per forcing type
for forcing_type in ds_sub["forcing_type"].values:
nr_time_steps = len(
ds_sub[["precip_mean", "pet_mean"]]
.sel(forcing_type=forcing_type)
.dropna("time")
.time
)
# Add new empty variables that will be filled in the loop
evap_interception = np.zeros(
(len(ds_sub.sel(forcing_type=forcing_type)["index"]), nr_time_steps)
)
precip_effective = np.zeros(
(len(ds_sub.sel(forcing_type=forcing_type)["index"]), nr_time_steps)
)
canopy_storage = np.zeros(
(len(ds_sub.sel(forcing_type=forcing_type)["index"]), nr_time_steps)
)
# Load the potential evaporation and precipitation into memory to speed up
# the subsequent for loop.
# make sure order of coord is the same.
Epdt = (
ds_sub.sel(forcing_type=forcing_type)["pet_mean"]
.dropna("time")
.transpose("index", "time")
.values
)
Pdt = (
ds_sub.sel(forcing_type=forcing_type)["precip_mean"]
.dropna("time")
.transpose("index", "time")
.values
)
# Loop through the time steps and determine the variables per time step.
for i in range(0, nr_time_steps):
if intercep_vars_sub is not None:
# TODO: for now assumed that LAI contains monthly data,
# change this for future
month = pd.to_datetime(ds_sub.time[i].values).month
Imax = intercep_vars_sub["Imax"].sel(time=month)
# Determine the variables with a simple interception reservoir approach
canopy_storage[:, i] = canopy_storage[:, i] + Pdt[:, i]
precip_effective[:, i] = np.maximum(0, canopy_storage[:, i] - Imax)
canopy_storage[:, i] = canopy_storage[:, i] - precip_effective[:, i]
evap_interception[:, i] = np.minimum(Epdt[:, i], canopy_storage[:, i])
canopy_storage[:, i] = canopy_storage[:, i] - evap_interception[:, i]
# Update Si for the next time step
if i < nr_time_steps - 1:
canopy_storage[:, i + 1] = canopy_storage[:, i]
# insert in ds for the time that is available in each forcing type for
# precip and pet
time_forcing_type = (
ds_sub[["precip_mean", "pet_mean"]]
.sel(forcing_type=forcing_type)
.dropna("time")
.time
)
ds_sub["evap_interception"].loc[
dict(forcing_type=forcing_type, time=time_forcing_type)
] = evap_interception
ds_sub["precip_effective"].loc[
dict(forcing_type=forcing_type, time=time_forcing_type)
] = precip_effective
ds_sub["canopy_storage"].loc[
dict(forcing_type=forcing_type, time=time_forcing_type)
] = canopy_storage
return ds_sub
def determine_storage_deficit(ds_sub, correct_cc_deficit):
"""
Determine the storage deficit for every time step.
Also for the subcatchment location and dataset in ds_sub.
Parameters
----------
ds_sub : xr.Dataset
Dataset containing the daily or higher-resolution data.
Returns
-------
ds_sub : xr.Dataset
Same as above, but containing the storage deficits per time step for all
forcing types.
"""
# make sure the order of the coordinates is always the same.
# Calculate it per forcing type
ds_sub["storage_deficit"] = xr.full_like(
ds_sub["precip_mean"],
fill_value=ds_sub["precip_mean"].vector.nodata,
)
ds_sub["storage_deficit"].load()
for forcing_type in ds_sub["forcing_type"].values:
time_forcing_type = (
ds_sub["precip_effective"]
.sel(forcing_type=forcing_type)
.dropna("time")
.time.values
)
transpiration = (
ds_sub["transpiration"]
.sel(forcing_type=forcing_type, time=time_forcing_type)
.transpose("index", "time")
.values
)
precip_effective = (
ds_sub["precip_effective"]
.sel(forcing_type=forcing_type, time=time_forcing_type)
.transpose("index", "time")
.values
)
storage_deficit = np.zeros((len(ds_sub["index"]), len(time_forcing_type)))
# Determine the storage deficit per time step
for i in range(1, len(time_forcing_type)):
storage_deficit[:, i] = np.minimum(
0,
storage_deficit[:, i - 1]
+ (precip_effective[:, i] - transpiration[:, i]),
)
# Create the storage deficit data array
ds_sub["storage_deficit"].loc[
dict(forcing_type=forcing_type, time=time_forcing_type)
] = storage_deficit
# If there are climate projections present, adjust the storage deficit for
# the future projections based on Table S1 in Bouaziz et al., 2002, HESS.
# cc_hist remains as is (see Table S1)
if (len(ds_sub.forcing_type) > 1) & (correct_cc_deficit == True):
if (
len(
ds_sub["precip_mean"]
.sel(forcing_type=["cc_fut", "cc_hist"])
.dropna("time")
.time
)
> 0
):
ds_sub["storage_deficit"].loc[dict(forcing_type="cc_fut")] = ds_sub[
"storage_deficit"
].sel(forcing_type="obs") + np.minimum(
0.0,
ds_sub["storage_deficit"].sel(forcing_type="cc_fut")
- ds_sub["storage_deficit"].sel(forcing_type="cc_hist"),
)
else:
logger.warning(
"Time period of cc_hist and cc_fut does not overlap. \
Correct_cc_deficit not applied."
)
return ds_sub
def fut_discharge_coeff(ds_sub_annual, correct_cc_deficit):
"""
Determine the future discharge coefficient.
Based on a given omega value (generally same as in the
current-climate observations), the aridity index of the future climate and the
difference in Q and PET between the simulated historical and
simulated future climate.
Parameters
----------
ds_sub_annual : xr.Dataset
Dataset containing at least the future omega and aridity index.
Returns
-------
ds_sub_annual : xr.Dataset
Similar to previous, but containing the new future discharge coefficient
and evaporative index.
"""
# Determine the delP and delEP for cc_fut
Ep = ds_sub_annual["pet_mean"].sel(forcing_type="obs").mean("time")
P = ds_sub_annual["precip_mean"].sel(forcing_type="obs").mean("time")
delP = (
ds_sub_annual["precip_mean"]
.sel(forcing_type=["cc_hist", "cc_fut"])
.mean("time")
.diff("forcing_type")
.sel(forcing_type="cc_fut")
)
delEp = (
ds_sub_annual["pet_mean"]
.sel(forcing_type=["cc_hist", "cc_fut"])
.mean("time")
.diff("forcing_type")
.sel(forcing_type="cc_fut")
)
# Determine the difference in discharge between the observations and the
# future climate simulations
Q_obs_mean = ds_sub_annual["specific_Q"].mean("time")
# if correct_cc_deficit=False -- omega is from cc_hist, else omega is from obs
if correct_cc_deficit == True:
omega = ds_sub_annual["omega"].sel(forcing_type="obs")
else:
omega = ds_sub_annual["omega"].sel(forcing_type="cc_hist")
aridity_index_fut = (Ep + delEp) / (P + delP)
change_Q_total = Zhang_future(omega, aridity_index_fut) * (P + delP) - Q_obs_mean
# Determine the discharge coefficient for the future climate
ds_sub_annual["discharge_coeff"].loc[dict(forcing_type="cc_fut")] = (
Q_obs_mean + change_Q_total
) / (P + delP)
return ds_sub_annual
def gumbel_su_calc_xr(
storage_deficit_annual, storage_deficit_count, return_period, threshold
):
"""
Determine the Gumbel distribution of the annual maximum storage deficits.
For a set of return periods.
Parameters
----------
storage_deficit_annual : xr.Dataset
Dataset with the minimum deficit per year as a positive value.
storage_deficit_count : xr.Dataset
Dataset containing the number of days with data per year. This
indicates on how many days a minimum in year_min_storage_deficit is
based.
return_period : list
List with one or more values indiciating the return period(s) (in
years) for wich the rootzone storage depth should be calculated.
threshold : int
Required minimum number of days in a year containing data to take the
year into account for the calculation..
Returns
-------
gumbel : xr.Dataset
Dataset, similar to year_min_storage_deficit, but containing the
root-zone storage capacity per return period.
"""
# Only take the years into account that contain more than [threshold] days
# of data.
storage_deficit_annual = storage_deficit_annual.where(
storage_deficit_count > threshold
)
# Calculate the mean and standard deviation for the Gumbel distribution
annual_mean = storage_deficit_annual.mean("time", skipna=True)
annual_std = storage_deficit_annual.std("time", skipna=True)
# Calculate alpha and mu
alpha = (np.sqrt(6.0) * annual_std) / np.pi
mu = annual_mean - 0.5772 * alpha
# Create the output dataset
gumbel = storage_deficit_annual.to_dataset(name="storage_deficit")
# Set the return periods
RP = return_period
gumbel["RP"] = RP
gumbel["rootzone_storage"] = (
("index", "forcing_type", "RP"),
np.zeros(
(len(gumbel["index"]), len(gumbel["forcing_type"]), len(gumbel["RP"]))
),
)
# Determine the root zone storage for different return periods using alpha
# and mu
gumbel["yt"] = -np.log(-np.log(1 - (1 / gumbel["RP"])))
gumbel["rootzone_storage"] = mu + alpha * gumbel["yt"]
return gumbel
def Zhang(omega, aridity_index, evap_index):
"""
Calculate omega according to Zhang.
This is the Zhang equation with omega as in Teuling et al., 2019.
This function is used to get omega for historical situations when
aridity_index and evap_index are known (assuming evap_index = 1 - discharge_coeff).
This equation is solved for Zhang eq = 0.
Parameters
----------
omega : float
Parameter of the Budyko curve.
aridity_index : float
The aridity index.
evap_index : float
The evaporative index.
Returns
-------
Value 0.
References
----------
Fu, B.: On the calculation of the evaporation from land surface,
Scientia Atmospherica Sinica, 5, 23–31, 1981 (in Chinese).
Teuling, A. J., de Badts, E. A. G., Jansen, F. A., Fuchs, R., Buitink, J.,
Hoek van Dijke, A. J., and Sterling, S. M.: Climate change, reforestation/
afforestation, and urbanization impacts on evapotranspiration and streamflow
in Europe, Hydrol. Earth Syst. Sci., 23, 3631–3652,
https://doi.org/10.5194/hess-23-3631-2019, 2019.
Zhang, L., Hickel, K., Dawes, W. R., Chiew, F. H., Western, A. W., and
Briggs, P. R.: A rational function approach for estimating mean annual
evapotranspiration, Water Resour. Res., 40, 1–14,
https://doi.org/10.1029/2003WR002710, 2004.
"""
return 1 + aridity_index - (1 + aridity_index**omega) ** (1 / omega) - evap_index
def Zhang_future(omega, aridity_index):
"""
Calculate discharge coefficients according to Zhang.
Once omega has been derived for historical situations, it can be used to
derive the new discharge coefficient when the future aridity index (Ep/P)
is known. This will allow the discharge coeffcient to shift over the same
line as the historical situation.
Parameters
----------
omega : xr.Datarray
Datarray containing the future omega values per sub catchment.
aridity_index : xr.Datarray
Datarray containing the future aridity index values per sub
catchment.
Returns
-------
Datarray containing the future discharge coefficient.
"""
return -(aridity_index - (1 + aridity_index**omega) ** (1 / omega))
def check_inputs(
start_hydro_year, start_field_capacity, dsrun, ds_obs, ds_cc_hist, ds_cc_fut
):
# Start with some initial checks
list_of_months = [
"Jan",
"Feb",
"Mar",
"Apr",
"May",
"Jun",
"Jul",
"Aug",
"Sep",
"Oct",
"Nov",
"Dec",
]
if start_hydro_year not in list_of_months:
raise ValueError(
f"start_hydro_year not in {list_of_months}: provide a valid month"
)
if start_field_capacity not in list_of_months:
raise ValueError(
f"start_field_capacity not in {list_of_months}: provide a valid month"
)
if "discharge" not in list(dsrun.keys()):
raise ValueError("Variable discharge not in run_fn")
if "precip" not in list(ds_obs.keys()):
raise ValueError("Variable precip not in forcing_obs_fn")
if "pet" not in list(ds_obs.keys()):
raise ValueError("Variable pet not in forcing_obs_fn")
if ds_cc_hist is not None:
if "precip" not in list(ds_cc_hist.keys()):
raise ValueError("Variable precip not in forcing_cc_hist_fn")
if "pet" not in list(ds_cc_hist.keys()):
raise ValueError("Variable pet not in forcing_cc_hist_fn")
if ds_cc_fut is not None:
if "precip" not in list(ds_cc_fut.keys()):
raise ValueError("Variable precip not in forcing_cc_fut_fn")
if "pet" not in list(ds_cc_fut.keys()):
raise ValueError("Variable pet not in forcing_cc_fut_fn")
return None
[docs]
def rootzoneclim(
dsrun: xr.Dataset,
ds_obs: xr.Dataset,
ds_like: xr.Dataset,
flwdir: xr.DataArray,
ds_cc_hist: Optional[xr.Dataset] = None,
ds_cc_fut: Optional[xr.Dataset] = None,
return_period: list = [2, 3, 5, 10, 15, 20, 25, 50, 60, 100],
Imax: float = 2.0,
start_hydro_year: str = "Sep",
start_field_capacity: str = "Apr",
LAI: bool = False,
rootzone_storage: bool = False,
correct_cc_deficit: bool = False,
chunksize: int = 100,
missing_days_threshold: int = 330,
logger=logger,
):
"""
Estimates the root zone storage parameter.
for current observed and (optionally) for future climate-based streamflow data.
The root zone storage capacity parameter is calculated per subcatchment and is
converted to a gridded map at model resolution. Optionally, this function
can return the wflow_sbm parameter RootingDepth by dividing the root zone
storage parameter by (theta_s - theta_r).
The method is based on the estimation of maximum annual storage deficits based on
precipitation and estimated actual evaporation time series, which in turn are
estimated from observed streamflow data and long-term precipitation and
potential evap. data, as explained in Bouaziz et al. (2022).
The main assumption is that vegetation adapts its rootzone storage capacity to
overcome dry spells with a certain return period
(typically 20 years for forest ecosystems).
In response to a changing climtate, it is likely that vegetation also adapts its
rootzone storage capacity, thereby changing model parameters for future conditions.
This method also allows to estimate the change in rootzone storage capacity in
response to a changing climate.
Parameters
----------
dsrun : xr.Dataset
Geodataset with streamflow locations and timeseries, named "discharge" (m3/s).
The geodataset expects the coordinate names "index" (for each station id).
ds_obs : xr.Dataset
Dataset with the observed forcing data (precip and pet) [mm/timestep].
ds_like : xr.Dataset
Dataset with staticmaps at model resolution.
flwdir : FlwDirRaster
flwdir object
ds_cc_hist : xr.Dataset
Dataset with the simulated historical forcing data (precip and pet) \
[mm/timestep],
based on a climate model.
The default is None.
ds_cc_fut : xr.Dataset
Dataset with the simulated future climate forcing data (precip and pet) \
[mm/timestep],
based on a climate model.
The default is None.
return_period : list
List with one or more values indiciating the return period(s) (in
years) for wich the rootzone storage depth should be calculated.
The default is [2,3,5,10,15,20,25,50,60,100]
Imax : float
The maximum interception storage capacity [mm].
The default is 2 mm.
start_hydro_year : str
The start month (abreviated to the first three letters of the month,
starting with a capital letter) of the hydrological year.
The default is "Sep".
start_field_capacity : str
The end of the wet season / commencement of dry season. This is the
moment when the soil is at field capacity, i.e. there is no storage
deficit yet.
The default is "Apr".
rootzone_storage : bool
Boolean to indicate whether the rootzone storage maps
should be stored in the staticmaps or not. The default is False.
LAI : bool
Determine whether the LAI will be used to determine Imax.
Requires to have run setup_laimaps.
The default is False.
chunksize : int
Chunksize on time dimension for processing data (not for saving to
disk!). A default value of 100 is used on the time dimension.
correct_cc_deficit : bool
Determines whether a bias-correction of the future deficit should be
applied. If the climate change scenario and hist period are bias-corrected,
this should probably be set to False.
missing_days_threshold: int, optional
Minimum number of days within a year for that year to be counted in the
long-term Budyko analysis.
Returns
-------
ds_out : xr.Dataset
Dataset containing root zone storage capacity (optional) and RootingDepth for
several forcing and return periods.
gdf_basins_all : GeoDataFrame
Geodataframe containing the root zone storage capacity values for
each basin before filling NaN.
References
----------
Bouaziz, L. J. E., Aalbers, E. E., Weerts, A. H., Hegnauer, M., Buiteveld,
H., Lammersen, R., Stam, J., Sprokkereef, E., Savenije, H. H. G. and
Hrachowitz, M. (2022). Ecosystem adaptation to climate change: the
sensitivity of hydrological predictions to time-dynamic model parameters,
Hydrology and Earth System Sciences, 26(5), 1295-1318. DOI:
10.5194/hess-26-1295-2022.
"""
# Start with some initial checks
check_inputs(
start_hydro_year, start_field_capacity, dsrun, ds_obs, ds_cc_hist, ds_cc_fut
)
# If LAI = True, create a new xr dataset containing the interception pars
if LAI == True:
intercep_vars = ds_like.LAI.to_dataset(name="LAI")
intercep_vars["Swood"] = ds_like["Swood"]
intercep_vars["Sl"] = ds_like["Sl"]
# Set the output dataset at model resolution
ds_out = xr.Dataset(coords=ds_like.raster.coords)
x_dim = ds_out.raster.x_dim
y_dim = ds_out.raster.y_dim
# Make a basin map containing all subcatchments from geodataset of observed
# streamflow
x_dim_dsrun = dsrun.vector.x_name
y_dim_dsrun = dsrun.vector.y_name
x, y, ids = dsrun[x_dim_dsrun].values, dsrun[y_dim_dsrun].values, dsrun.index.values
gdf_basins = pd.DataFrame()
# Loop over basins and get per gauge location a polygon of the upstream
# area.
for i, id in enumerate(dsrun.index.values):
ds_basin_single = flw.basin_map(
ds_like,
flwdir,
ids=ids[i],
xy=(x[i], y[i]),
stream=ds_like["wflow_river"],
)[0]
ds_basin_single.name = int(dsrun.index.values[i])
ds_basin_single.raster.set_crs(ds_like.raster.crs)
gdf_basin_single = ds_basin_single.raster.vectorize()
gdf_basins = pd.concat([gdf_basins, gdf_basin_single])
# Set index to catchment id
gdf_basins.index = gdf_basins.value.astype("int")
# Add the catchment area to gdf_basins and sort in a descending order
# make sure to also snap to river when retrieving areas
idxs_gauges = flwdir.snap(xy=(x, y), mask=ds_like["wflow_river"].values)[0]
areas_uparea = ds_like["wflow_uparea"].values.flat[idxs_gauges]
df_areas = pd.DataFrame(index=ids, data=areas_uparea * 1e6, columns=["area"])
gdf_basins = pd.concat([gdf_basins, df_areas], axis=1)
gdf_basins = gdf_basins.sort_values(by="area", ascending=False)
# drop basins where area is NaN.
gdf_basins = gdf_basins[(gdf_basins["area"] >= 0)]
# calculate mean areal precip and pot evap for the full upstream area of each gauge.
# loop over ds_obs, ds_cc_hist and ds_cc_fut as they might have
# different coordinate systems and then merge.
ds_sub_obs = ds_obs.raster.zonal_stats(gdf_basins, stats=["mean"])
logger.info("Computing zonal statistics for obs, this can take a while")
ds_sub_obs = ds_sub_obs.compute()
if ds_cc_hist is not None:
ds_sub_cc_hist = ds_cc_hist.raster.zonal_stats(gdf_basins, stats=["mean"])
logger.info("Computing zonal statistics for cc_hist, this can take a while")
ds_sub_cc_hist = ds_sub_cc_hist.compute()
if ds_cc_fut is not None:
ds_sub_cc_fut = ds_cc_fut.raster.zonal_stats(gdf_basins, stats=["mean"])
logger.info("Computing zonal statistics for cc_fut, this can take a while")
ds_sub_cc_fut = ds_sub_cc_fut.compute()
# Concatenate all forcing types (obs, cc_hist, cc_fut) into a
# xr dataset after zonal stats
if ds_cc_hist is not None and ds_cc_fut is not None:
ds_sub = xr.concat(
[ds_sub_obs, ds_sub_cc_hist, ds_sub_cc_fut],
pd.Index(["obs", "cc_hist", "cc_fut"], name="forcing_type"),
)
else:
ds_sub = xr.concat([ds_sub_obs], pd.Index(["obs"], name="forcing_type"))
# Also get the zonal statistics of the intercep_vars
if LAI == True:
intercep_vars_sub = intercep_vars.raster.zonal_stats(gdf_basins, stats=["mean"])
intercep_vars_sub = intercep_vars_sub.compute()
# Determine the Imax for every time step in the LAI data
intercep_vars_sub["Imax"] = (
intercep_vars_sub["Swood_mean"]
+ intercep_vars_sub["LAI_mean"] * intercep_vars_sub["Sl_mean"]
)
else:
intercep_vars_sub = None
# Get the time step of the datasets and make sure they all have a daily
# time step. If not, resample.
time_step = 86400 # in seconds
if (ds_sub.time[1].values - ds_sub.time[0].values).astype("timedelta64[s]").astype(
np.int32
) != time_step:
ds_sub = ds_sub.resample(time="1D").sum("time", skipna=True)
if (dsrun.time[1].values - dsrun.time[0].values).astype("timedelta64[s]").astype(
np.int32
) != time_step:
dsrun = dsrun.resample(time="1D").mean("time", skipna=True)
# Determine effective precipitation, interception evporation and canopy
# storage
logger.info(
"Determine effective precipitation, interception evporation and canopy storage"
)
ds_sub = determine_Peffective_Interception_explicit(
ds_sub, Imax=Imax, intercep_vars_sub=intercep_vars_sub
)
# Add specific discharge per location-index value to ds_sub
# First, sort dsrun based on descending subcatchment area (which is already
# done in ds_sub)
dsrun = dsrun.sel(index=ds_sub.index.values)
# Get the specific discharge (mm/timestep) per location in order to have
# everything in mm/timestep
dsrun = dsrun.assign(
specific_Q=dsrun["discharge"].transpose("time", "index")
/ np.array(gdf_basins["area"])
* time_step
* 1000.0
)
# Add specific discharge to ds_sub
ds_sub = xr.merge([ds_sub, dsrun["specific_Q"].to_dataset()], compat="override")
# Rechunk data
ds_sub = ds_sub.chunk(
chunks={"index": len(ds_sub.index), "forcing_type": 1, "time": chunksize}
)
# Get year sums of ds_sub
# a threshold is used to use only years with sufficient data
ds_sub_annual = ds_sub.resample(time=f"AS-{start_hydro_year}").sum(
"time", skipna=True, min_count=missing_days_threshold
)
# Determine discharge coefficient, the aridity index and the evaporative
# index
ds_sub_annual = determine_budyko_curve_terms(
ds_sub_annual,
)
# set runoff coefficient of cc_hist equal to runoff coeff of obs
if correct_cc_deficit == True:
ds_sub_annual["discharge_coeff"].loc[
dict(forcing_type="cc_hist")
] = ds_sub_annual["discharge_coeff"].sel(forcing_type="obs")
# Determine omega
logger.info("Calculating the omega values, this can take a while")
ds_sub_annual = determine_omega(ds_sub_annual)
# Determine future discharge ratio for cc_fut if ds_cc_fut exists
if ds_cc_fut is not None:
ds_sub_annual = fut_discharge_coeff(ds_sub_annual, correct_cc_deficit)
# Determine long-term interception, potential evaporation and tranpsiration;
# use runoff coefficient instead of Qobs to calculate actual evaporation for
# each climate projection
transpiration_long_term = (
ds_sub_annual["precip_effective"].mean("time")
- ds_sub_annual["precip_mean"].mean("time") * ds_sub_annual["discharge_coeff"]
) # ds_annual['Qobs_mm'].mean('time')
interception_long_term = ds_sub_annual["evap_interception"].mean("time")
pet_long_term = ds_sub_annual["pet_mean"].mean("time")
# Determine the transpiration on the finer time step (e.g. daily)
ds_sub["transpiration"] = (
(ds_sub["pet_mean"] - ds_sub["evap_interception"])
* transpiration_long_term
/ (pet_long_term - interception_long_term)
)
transpiration_long_term = None
interception_long_term = None
pet_long_term = None
# Determine storage deficit
logger.info("Determining the storage deficit")
ds_sub = determine_storage_deficit(ds_sub, correct_cc_deficit)
# From the storage deficit, determine the rootzone storage capacity using
# a Gumbel distribution.
logger.info("Calculating the Gumbel distribution and rootzone storage capacity")
# Determine the yearly minima in storage deficit (and make sure the values
# are positive)
storage_deficit_annual = -(
ds_sub["storage_deficit"]
.resample(time=f"AS-{start_field_capacity}")
.min("time", skipna=True)
)
# A counter will be used to only use years with sufficient days containing
# data for the Gumbel distribution
storage_deficit_count = (
ds_sub["storage_deficit"]
.resample(time=f"AS-{start_field_capacity}")
.count("time")
)
# Subsequently, determine the Gumbel distribution
gumbel = gumbel_su_calc_xr(
storage_deficit_annual,
storage_deficit_count.sel(time=storage_deficit_count.time[1:]),
return_period=return_period,
threshold=missing_days_threshold,
)
# Create a new geopandas dataframe, which will be used to rasterize the
# results
ds_basins_all = flw.basin_map(
ds_like,
flwdir,
ids=ids,
xy=(x, y),
stream=ds_like["wflow_river"],
)[0]
ds_basins_all.raster.set_crs(ds_like.raster.crs)
gdf_basins_all = ds_basins_all.raster.vectorize()
gdf_basins_all.index = gdf_basins_all.value.astype("int")
# Add the area and sort by area (from large to small)
# use previous df
gdf_basins_all = pd.concat([gdf_basins_all, df_areas], axis=1)
gdf_basins_all = gdf_basins_all.sort_values(by="area", ascending=False)
# again drop basins where area is NaN
gdf_basins_all = gdf_basins_all[(gdf_basins_all["area"] >= 0)]
# Add the rootzone storage to gdf_basins_all, per forcing type and return
# period
for return_period in gumbel.RP.values:
for forcing_type in gumbel.forcing_type.values:
gdf_basins_all[
f"rootzone_storage_{forcing_type}_{str(return_period)}"
] = gumbel["rootzone_storage"].sel(
RP=return_period, forcing_type=forcing_type
)
# Make sure to give the NaNs a value, otherwise they will become 0.0
gdf_basins_all[
f"rootzone_storage_{forcing_type}_{str(return_period)}"
] = gdf_basins_all[
f"rootzone_storage_{forcing_type}_{str(return_period)}"
].fillna(
-999
)
# Rasterize this (from large subcatchments to small ones)
for return_period in gumbel.RP.values:
for forcing_type in gumbel.forcing_type.values:
da_area = ds_like.raster.rasterize(
gdf=gdf_basins_all,
col_name=f"rootzone_storage_{forcing_type}_{str(return_period)}",
nodata=-999,
all_touched=True,
).to_dataset(name="rasterized_temp")
# Fill up the not a numbers with the data from a downstream point
out_raster = pyflwdir.FlwdirRaster.fillnodata(
flwdir,
data=da_area["rasterized_temp"],
nodata=-999,
direction="up",
how="max",
)
# Make sure to fill up full domain with value of most downstream point
# that contains values
fill_value = None
for value in gdf_basins_all[
f"rootzone_storage_{forcing_type}_{str(return_period)}"
]:
if value > 0.0:
if fill_value is None:
fill_value = value
out_raster = np.where(out_raster == -999.0, fill_value, out_raster)
# Store the rootzone_storage in ds_out is rootzone_storage flag is
# set to True.
if rootzone_storage == True:
ds_out[f"rootzone_storage_{forcing_type}_{str(return_period)}"] = (
(y_dim, x_dim),
out_raster,
)
# Store the RootingDepth in ds_out
ds_out[f"RootingDepth_{forcing_type}_{str(return_period)}"] = (
(y_dim, x_dim),
out_raster / (ds_like["thetaS"].values - ds_like["thetaR"].values),
)
return ds_out, gdf_basins_all