"""Waterbody workflows for Wflow plugin."""
import json
import logging
import geopandas as gp
import numpy as np
import pandas as pd
import shapely
import xarray as xr
# Module-level logger; also used as the default ``logger`` argument of the
# public functions below so callers can inject their own.
logger = logging.getLogger(__name__)
# Explicit public API of this module.
__all__ = ["waterbodymaps", "reservoirattrs", "lakeattrs"]
# [docs] -- Sphinx "viewcode" link artifact from the HTML docs export, not source code.
def waterbodymaps(
    gdf: gp.GeoDataFrame,
    ds_like: xr.Dataset,
    wb_type: str = "reservoir",
    uparea_name: str = "uparea",
    logger=logger,
):
    """Return waterbody (reservoir/lake) maps (see list below).

    Maps are computed at model resolution, based either on gridded upstream
    area data or on the outlet coordinates stored in the waterbody database.

    The following waterbody maps are calculated:

    - resareas/lakeareas : waterbody areas mask [ID]
    - reslocs/lakelocs : waterbody outlets [ID]

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing reservoirs/lakes geometries and attributes.
        Must contain a ``waterbody_id`` column; ``xout``/``yout`` columns are
        used as outlet locations when no upstream area grid is available.
    ds_like : xarray.Dataset
        Dataset at model resolution (hydromt raster accessor is used for
        rasterization and grid metadata).
    wb_type : str, optional either "reservoir" or "lake"
        Option to change the name of the output maps depending on reservoir
        or lake.
    uparea_name : str, optional
        Name of uparea variable in ds_like. If None then database coordinates
        will be used to setup outlets.

    Returns
    -------
    ds_out : xarray.Dataset
        Dataset containing gridded waterbody data (areas and outlets maps).
    outgdf : geopandas.GeoDataFrame
        GeoDataFrame with one row per successfully rasterized waterbody
        (outlet point geometry, or the original geometries in the fallback
        branch where the outlet map equals the area map).
    """
    # Rasterize the GeoDataFrame to get the areas mask of waterbodies
    res_id = gdf["waterbody_id"].values
    da_wbmask = ds_like.raster.rasterize(
        gdf,
        col_name="waterbody_id",
        nodata=-999,
        all_touched=True,
        dtype=None,
        sindex=False,
    )
    da_wbmask = da_wbmask.rename("resareas")
    da_wbmask.attrs.update(_FillValue=-999)
    ds_out = da_wbmask.to_dataset()
    # Drop waterbodies whose ID did not end up in any grid cell (typically
    # geometries too small for the model resolution) and warn the user.
    if not np.all(np.isin(res_id, ds_out["resareas"])):
        gdf = gdf.loc[np.isin(res_id, ds_out["resareas"])]
        nskipped = res_id.size - gdf.index.size
        res_id = gdf["waterbody_id"].values
        logger.warning(
            f"{nskipped} reservoirs are not succesfully rasterized and skipped!!"
            " Consider increasing the lakes min_area threshold."
        )
    # Initialize the waterbody outlet map
    ds_out["reslocs"] = xr.full_like(ds_out["resareas"], -999)
    # If an upstream area map is present in the model, gets outlets coordinates using
    # the maximum uparea in each waterbody mask to match model river network.
    if uparea_name is not None and uparea_name in ds_like.data_vars:
        logger.debug(f"Setting {wb_type} outlet map based maximum upstream area.")
        # create dataframe with x and y coord to be filled in either from uparea or
        # from xout and yout in hydrolakes data
        outdf = gdf[["waterbody_id"]].assign(xout=np.nan, yout=np.nan)
        ydim = ds_like.raster.y_dim
        xdim = ds_like.raster.x_dim
        for i in res_id:
            # Mask uparea to the cells of this waterbody, then take the cell
            # with the maximum upstream area as the outlet.
            res_acc = ds_like[uparea_name].where(ds_out["resareas"] == i).load()
            # NOTE(review): assumes a single cell holds the maximum uparea;
            # with ties, squeeze() keeps multiple cells and yacc/xacc become
            # arrays -- confirm upstream data guarantees uniqueness.
            max_res_acc = res_acc.where(res_acc == res_acc.max(), drop=True).squeeze()
            yacc = max_res_acc[ydim].values
            xacc = max_res_acc[xdim].values
            # Label-based assignment of the outlet cell in the reslocs map.
            ds_out["reslocs"].loc[{f"{ydim}": yacc, f"{xdim}": xacc}] = i
            outdf.loc[outdf.waterbody_id == i, "xout"] = xacc
            outdf.loc[outdf.waterbody_id == i, "yout"] = yacc
        outgdf = gp.GeoDataFrame(
            outdf, geometry=gp.points_from_xy(outdf.xout, outdf.yout)
        )
    # Else use coordinates from the waterbody database
    elif "xout" in gdf.columns and "yout" in gdf.columns:
        logger.debug(f"Setting {wb_type} outlet map based on coordinates.")
        outdf = gdf[["waterbody_id", "xout", "yout"]]
        outgdf = gp.GeoDataFrame(
            outdf, geometry=gp.points_from_xy(outdf.xout, outdf.yout)
        )
        ds_out["reslocs"] = ds_like.raster.rasterize(
            outgdf, col_name="waterbody_id", nodata=-999
        )
    # Else outlet map is equal to areas mask map
    else:
        ds_out["reslocs"] = ds_out["resareas"]
        logger.warning(
            f"Neither upstream area map nor {wb_type}'s outlet coordinates found. "
            f"Setting {wb_type} outlet map equal to the area map."
        )
        # dummy outgdf (no point geometry available in this branch)
        outgdf = gdf[["waterbody_id"]]
    ds_out["reslocs"].attrs.update(_FillValue=-999)
    # fix dtypes: outlet IDs as integers, areas mask as float
    ds_out["reslocs"] = ds_out["reslocs"].astype("int32")
    ds_out["resareas"] = ds_out["resareas"].astype("float32")
    # Rename output maps when processing lakes instead of reservoirs
    if wb_type == "lake":
        ds_out = ds_out.rename({"resareas": "lakeareas", "reslocs": "lakelocs"})
    return ds_out, outgdf
# [docs] -- Sphinx "viewcode" link artifact from the HTML docs export, not source code.
def reservoirattrs(gdf, timeseries_fn=None, perc_norm=50, perc_min=20, logger=logger):
    """Return reservoir attributes (see list below) needed for modelling.

    When specified, some of the reservoir attributes can be derived from
    earth observation data.
    Two options are currently available: 1. Global Water Watch data
    (Deltares, 2022) using gwwapi and 2. JRC (Pekel, 2016) using hydroengine.

    The following reservoir attributes are calculated:

    - resmaxvolume : reservoir maximum volume [m3]
    - resarea : reservoir area [m2]
    - resdemand : reservoir demand flow [m3/s]
    - resmaxrelease : reservoir maximum release flow [m3/s]
    - resfullfrac : reservoir targeted full volume fraction [m3/m3]
    - resminfrac : reservoir targeted minimum volume fraction [m3/m3]

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing reservoirs geometries and attributes.
    timeseries_fn : str, optional
        Name of database from which time series of reservoir surface water
        area will be retrieved.
        Currently available: ['jrc', 'gww']
        By default None: attributes are computed from the reservoir database
        only, without downloading any time series.
    perc_norm : int, optional
        Percentile of the EO area time series used for the normal
        (operational) surface area. By default 50.
    perc_min: int, optional
        Percentile of the EO area time series used for the minimal
        (operational) surface area. By default 20.

    Returns
    -------
    df_out : pandas.DataFrame
        DataFrame containing reservoir attributes.
    df_plot : pandas.DataFrame
        DataFrame containing debugging values for reservoir building.
    df_ts : pandas.DataFrame
        DataFrame containing all downloaded reservoir time series.
    """
    # Lazy imports: each EO data source requires an optional extra dependency.
    if timeseries_fn == "jrc":
        try:
            import hydroengine as he

            logger.info("Using reservoir timeseries from JRC via hydroengine.")
        except Exception:
            raise ImportError(
                "hydroengine package not found, cannot download jrc \
                reservoir timeseries."
            )
    elif timeseries_fn == "gww":
        try:
            from gwwapi import client as cli
            from gwwapi import utils

            logger.info("Using reservoir timeseries from GWW via gwwapi.")
        except Exception:
            raise ImportError(
                "gwwapi package not found, cannot download gww reservoir timeseries."
            )
    elif timeseries_fn is not None:
        raise ValueError(
            f"timeseries_fn argument {timeseries_fn} not understood, \
                please use one of [gww, jrc] or None."
        )
    else:
        logger.debug(
            "Using reservoir attributes from reservoir database to compute parameters."
        )
    # Initialize output DataFrame with empty values and reservoir ID
    df_out = pd.DataFrame(
        index=range(len(gdf["waterbody_id"])),
        columns=(
            [
                "resid",
                "resmaxvolume",
                "resarea",
                "resdemand",
                "resmaxrelease",
                "resfullfrac",
                "resminfrac",
            ]
        ),
    )
    df_out["resid"] = gdf["waterbody_id"].values
    # Create similar dataframe for EO-derived area/capacity statistics
    df_EO = pd.DataFrame(
        index=range(len(gdf["waterbody_id"])),
        columns=(["resid", "maxarea", "normarea", "minarea", "capmin", "capmax"]),
    )
    df_EO["resid"] = gdf["waterbody_id"].values
    # Create dataframe for accuracy plots (debugging output)
    df_plot = pd.DataFrame(
        index=range(len(gdf["waterbody_id"])),
        columns=(["resid", "factor", "accuracy_min", "accuracy_norm"]),
    )
    df_plot["resid"] = gdf["waterbody_id"].values
    # Create empty dataframe for timeseries exports
    df_ts = pd.DataFrame()
    # Get EO data for each reservoir
    if timeseries_fn == "jrc":
        for i in range(len(gdf["waterbody_id"])):
            ids = str(gdf["waterbody_id"].iloc[i])
            try:
                logger.debug(f"Downloading HydroEngine time series for reservoir {ids}")
                time_series = he.get_lake_time_series(
                    int(gdf["Hylak_id"].iloc[i]), "water_area"
                )
                # Append series to df_ts which will contain all time series
                # of all downloaded reservoirs, with an outer join on the
                # datetime index.
                # NOTE(review): the *1000000 factor presumably converts ms
                # epoch timestamps to ns for pd.to_datetime -- confirm against
                # the hydroengine API.
                ts_index = pd.to_datetime([k * 1000000 for k in time_series["time"]])
                ts_values = time_series["water_area"]
                ts_series = pd.Series(
                    data=ts_values, index=ts_index, name=int(gdf["Hylak_id"].iloc[i])
                )
                # Keep strictly positive, non-missing observations only
                ts_series = ts_series[ts_series > 0].dropna()
                df_ts = pd.concat([df_ts, ts_series], join="outer", axis=1)
                # Save area stats
                area_series = np.array(time_series["water_area"])  # [m2]
                area_series_nozeros = area_series[area_series > 0]
                df_EO.loc[i, "maxarea"] = area_series_nozeros.max()
                df_EO.loc[i, "normarea"] = np.percentile(
                    area_series_nozeros, perc_norm, axis=0
                )
                df_EO.loc[i, "minarea"] = np.percentile(
                    area_series_nozeros, perc_min, axis=0
                )
            except Exception:
                # Best-effort download: a failed reservoir leaves NaN in df_EO
                logger.warning(
                    f"No HydroEngine time series available for reservoir {ids}!"
                )
    if timeseries_fn == "gww":
        # get bounds from gdf input as JSON object that can be used in
        # post request with the gww api
        gdf_bounds = json.dumps(
            shapely.geometry.box(*gdf.total_bounds, ccw=True).__geo_interface__
        )
        # get reservoirs within these bounds
        gww_reservoirs = cli.get_reservoirs_by_geom(gdf_bounds)
        # from the response, create a dictionary, linking the gww_id to the
        # hylak_id (used in the default reservoir database)
        idlink = {
            k["properties"]["source_id"]: k["id"] for k in gww_reservoirs["features"]
        }
        for i in range(len(gdf["waterbody_id"])):
            ids = str(gdf["waterbody_id"].iloc[i])
            try:
                logger.debug(
                    f"Downloading Global Water Watch time series for reservoir {ids}"
                )
                time_series = cli.get_reservoir_ts_monthly(
                    idlink[int(gdf["Hylak_id"].iloc[i])]
                )
                # Append series to df_ts which will contain all time series
                # of all downloaded reservoirs, with an outer join on the
                # datetime index
                ts_series = utils.to_timeseries(
                    time_series, name=f'{int(gdf["Hylak_id"].iloc[i])}'
                ).drop_duplicates()
                df_ts = pd.concat([df_ts, ts_series], join="outer", axis=1)
                # Compute stats
                area_series = utils.to_timeseries(time_series)["area"].to_numpy()
                area_series_nozeros = area_series[area_series > 0]
                df_EO.loc[i, "maxarea"] = area_series_nozeros.max()
                df_EO.loc[i, "normarea"] = np.percentile(
                    area_series_nozeros, perc_norm, axis=0
                )
                df_EO.loc[i, "minarea"] = np.percentile(
                    area_series_nozeros, perc_min, axis=0
                )
            except Exception:
                # Best-effort download: a failed reservoir leaves NaN in df_EO
                logger.warning(f"No GWW time series available for reservoir {ids}!")
    # Sort timeseries dataframe (will be saved to root as .csv later)
    df_ts = df_ts.sort_index()
    # Compute resdemand and resmaxrelease from average discharge
    # (0.5x resp. 4x the mean discharge)
    if "Dis_avg" in gdf.columns:
        df_out["resdemand"] = gdf["Dis_avg"].values * 0.5
        df_out["resmaxrelease"] = gdf["Dis_avg"].values * 4.0
    # Get resarea either from EO or database depending on availability
    if "Area_avg" in gdf.columns:
        df_out["resarea"] = gdf["Area_avg"].values
        if timeseries_fn is not None:
            # EO-derived max area has priority over the database value
            df_out.loc[pd.notna(df_EO["maxarea"]), "resarea"] = df_EO["maxarea"][
                pd.notna(df_EO["maxarea"])
            ].values
        else:
            # Only fill gaps in the database values with EO data
            df_out.loc[pd.isna(df_out["resarea"]), "resarea"] = df_EO["maxarea"][
                pd.isna(df_out["resarea"])
            ].values
    else:
        df_out["resarea"] = df_EO["maxarea"].values
    # Get resmaxvolume from database
    if "Vol_avg" in gdf.columns:
        df_out["resmaxvolume"] = gdf["Vol_avg"].values
    # Compute target min and max fractions
    # First look if data is available from the database
    if "Capacity_min" in gdf.columns:
        df_out["resminfrac"] = gdf["Capacity_min"].values / df_out["resmaxvolume"]
        df_plot["accuracy_min"] = np.repeat(1.0, len(df_plot["accuracy_min"]))
    if "Capacity_norm" in gdf.columns:
        df_out["resfullfrac"] = gdf["Capacity_norm"].values / df_out["resmaxvolume"]
        df_plot["accuracy_norm"] = np.repeat(1.0, len(df_plot["accuracy_norm"]))
    # Then compute from EO data and fill or replace the previous values
    # (if a valid source is provided)
    # TODO for now assumes that the reservoir-db is used
    # (combination of GRanD and HydroLAKES)
    gdf = gdf.fillna(value=np.nan)
    for i in range(len(gdf["waterbody_id"])):
        # Initialise values; np.nanmax with 0.0 turns missing data into 0.0,
        # which is treated as "missing" (mv) in the logic below.
        dam_height = np.nanmax([gdf["Dam_height"].iloc[i], 0.0])
        max_level = np.nanmax([gdf["Depth_avg"].iloc[i], 0.0])
        max_area = np.nanmax([df_out["resarea"].iloc[i], 0.0])
        max_cap = np.nanmax([df_out["resmaxvolume"].iloc[i], 0.0])
        norm_area = np.nanmax([df_EO["normarea"].iloc[i], 0.0])
        if "Capacity_norm" in gdf.columns:
            norm_cap = np.nanmax([gdf["Capacity_norm"].iloc[i], 0.0])
        else:
            norm_cap = 0.0
        min_area = np.nanmax([df_EO["minarea"].iloc[i], 0.0])
        if "Capacity_min" in gdf.columns:
            min_cap = np.nanmax([gdf["Capacity_min"].iloc[i], 0.0])
        else:
            min_cap = 0.0
        # missing-value sentinel for the quantities initialised above
        mv = 0.0
        # Maximum level
        # a validation has shown that GRanD dam height is a more reliable value
        # than HydroLAKES depth average (when it is a larger value)
        if dam_height > max_level:
            max_level_f = dam_height
            factor_shape = dam_height / max_level
            # NOTE(review): round(x, 2) followed by :.4f formatting is
            # redundant (always ends in two zeros) -- likely unintended.
            logger.info(
                "GRanD dam height used as max. level instead of HydroLAKES depth "
                f"average. Difference factor: {round(factor_shape, 2):.4f}"
            )
        else:
            max_level_f = max_level
            factor_shape = 1.0
        # coefficient for linear area-level relationship
        lin_coeff = max_area / max_level_f  # [m2/m]
        # shape-correction factor; currently fixed to 1.0 (the alternative of
        # using factor_shape, selectable via a "method" switch, is disabled)
        factor_used = 1.0
        # Operational (norm) level; accuracy codes record which data were used:
        # 1 = capacity (+area) known, 2 = area only, 3 = fallback from max
        # area, 21/31 = corrected after a consistency check, 5 = clipped.
        if norm_cap != mv and norm_area != mv:
            norm_level = factor_used * (norm_cap / norm_area)  # [m]
            norm_area_f = norm_area
            norm_cap_f = norm_cap
            accuracy_norm = 1
        elif norm_cap != mv:
            norm_level = (norm_cap / lin_coeff) ** (1 / 2)  # [m]
            norm_area_f = norm_cap / norm_level  # [m2]
            norm_cap_f = norm_cap
            accuracy_norm = 1
        elif norm_area != mv:
            norm_level = norm_area / lin_coeff  # [m]
            norm_area_f = norm_area
            norm_cap_f = norm_area * norm_level  # [m3]
            accuracy_norm = 2
        else:
            # the calculation is based on the max area (and not max level or
            # max capacity) as it is the most reliable value
            norm_area_f = max_area * 0.666  # [m2]
            norm_level = norm_area_f / lin_coeff  # [m]
            norm_cap_f = norm_area_f * norm_level  # [m3]
            accuracy_norm = 3
        # CHECK norm level (1): must not exceed the maximum level
        if accuracy_norm == 1 and norm_level > max_level_f:
            # it is assumed that the norm capacity value is not reliable, so
            # this value is deleted and the linear relationship assumption is
            # introduced
            norm_level = norm_area_f / lin_coeff  # [m]
            norm_cap_f = norm_area_f * norm_level  # [m3]
            accuracy_norm = 21
        elif accuracy_norm == 2 and norm_level > max_level_f:
            norm_area_f = max_area * 0.666  # [m2]
            norm_level = norm_area_f / lin_coeff  # [m]
            norm_cap_f = norm_area_f * norm_level  # [m3]
            accuracy_norm = 31
        # Minimum level (same accuracy coding as for the norm level)
        if min_area != mv and min_cap != mv:
            min_level = min_cap / min_area  # [m]
            min_area_f = min_area
            min_cap_f = min_cap
            accuracy_min = 1
        elif min_cap != mv:
            min_level = (min_cap / lin_coeff) ** (1 / 2)  # [m]
            min_area_f = min_cap / min_level  # [m2]
            min_cap_f = min_cap  # [m3]
            accuracy_min = 1
        elif min_area != mv:
            min_level = min_area / lin_coeff  # [m]
            min_area_f = min_area  # [m2]
            min_cap_f = min_area * min_level  # [m3]
            accuracy_min = 2
        else:
            # the calculation is based on the max area (and not max level or
            # max capacity) as it is the most reliable value
            min_area_f = max_area * 0.333  # [m2]
            min_level = min_area_f / lin_coeff  # [m]
            min_cap_f = min_area_f * min_level  # [m3]
            accuracy_min = 3
        # CHECK minimum level (1): must not exceed the norm level
        if accuracy_min == 1 and min_level > norm_level:
            accuracy_min = 21
            min_level = min_area_f / lin_coeff  # [m]
            min_cap_f = min_area_f * min_level  # [m3]
        elif accuracy_min == 2 and min_level > norm_level:
            accuracy_min = 31
            min_area_f = max_area * 0.333  # [m2]
            min_level = min_area_f / lin_coeff  # [m]
            # NOTE(review): the /100 is inconsistent with the parallel
            # accuracy_min == 3 branch (no /100) -- confirm it is intentional
            # and not a leftover unit-conversion bug.
            min_cap_f = (min_area_f * min_level) / 100  # [m3]
        elif min_level > norm_level:
            min_area_f = norm_area_f * 0.45  # [m2]
            min_level = min_area_f / lin_coeff  # [m]
            min_cap_f = min_area_f * min_level  # [m3]
            accuracy_min = 4
        # CHECK norm level (2): norm capacity may not exceed max capacity
        if norm_cap_f > max_cap:
            logger.warning("norm_cap > max_cap! setting norm_cap equal to max_cap.")
            norm_cap_f = max_cap
            accuracy_norm = 5
        # CHECK minimum level (2): min capacity may not exceed norm capacity
        if min_cap_f > norm_cap_f:
            logger.warning("min_cap > norm_cap! setting min_cap equal to norm_cap.")
            min_cap_f = norm_cap_f
            accuracy_min = 5
        # Resume results
        df_EO.loc[i, "capmin"] = min_cap_f
        df_EO.loc[i, "capmax"] = norm_cap_f
        df_plot.loc[i, "factor"] = factor_shape
        df_plot.loc[i, "accuracy_min"] = accuracy_min
        df_plot.loc[i, "accuracy_norm"] = accuracy_norm
    # Depending on priority EO update fullfrac and min frac
    if timeseries_fn is not None:
        # EO-derived capacities replace any database-derived fractions
        df_out.resminfrac = df_EO["capmin"].values / df_out["resmaxvolume"].values
        df_out.resfullfrac = df_EO["capmax"].values / df_out["resmaxvolume"].values
    else:
        # Only fill fractions that the database could not provide
        df_out.loc[pd.isna(df_out["resminfrac"]), "resminfrac"] = (
            df_EO.loc[pd.isna(df_out["resminfrac"]), "capmin"].values
            / df_out.loc[pd.isna(df_out["resminfrac"]), "resmaxvolume"].values
        )
        df_out.loc[pd.isna(df_out["resfullfrac"]), "resfullfrac"] = (
            df_EO.loc[pd.isna(df_out["resfullfrac"]), "capmax"].values
            / df_out.loc[pd.isna(df_out["resfullfrac"]), "resmaxvolume"].values
        )
    return df_out, df_plot, df_ts
# [docs] -- Sphinx "viewcode" link artifact from the HTML docs export, not source code.
def lakeattrs(
    ds: xr.Dataset,
    gdf: gp.GeoDataFrame,
    rating_dict: dict = dict(),
    add_maxstorage: bool = False,
    logger=logger,
):
    """
    Return lake attributes (see list below) needed for modelling.

    If rating_dict is not empty, rating tables in wflow format are prepared
    as well.

    The following lake attributes are calculated:

    - waterbody_id : waterbody id
    - LakeArea : lake area [m2]
    - LakeAvgLevel: lake average level [m]
    - LakeAvgOut: lake average outflow [m3/s]
    - Lake_b: lake rating curve coefficient [-]
    - Lake_e: lake rating curve exponent [-]
    - LakeStorFunc: option to compute storage curve [-]
    - LakeOutflowFunc: option to compute rating curve [-]
    - LakeThreshold: minimium threshold for lake outflow [m]
    - LinkedLakeLocs: id of linked lake location if any
    - LakeMaxStorage: maximum storage [m3] (optional)

    Parameters
    ----------
    ds : xr.Dataset
        Dataset containing the lake locations and area
    gdf : gp.GeoDataFrame
        GeoDataFrame containing the lake locations and area
    rating_dict : dict, optional
        Dictionary containing the rating curve parameters, by default dict()
    add_maxstorage : bool, optional
        If True, adds the maximum storage to the output, by default False

    Returns
    -------
    ds : xr.Dataset
        Dataset containing the lake locations with the attributes
    gdf : gp.GeoDataFrame
        GeoDataFrame containing the lake locations with the attributes
    rating_curves : dict
        Dictionary containing the rating curves in wflow format
    """
    # Map the database column names onto the wflow lake parameter names.
    gdf = gdf.rename(
        columns={
            "Area_avg": "LakeArea",
            "Depth_avg": "LakeAvgLevel",
            "Dis_avg": "LakeAvgOut",
        }
    )
    # Maximum volume is only added on request; NaNs are deliberately not
    # filled, as a missing value marks a natural (uncontrolled) lake.
    if add_maxstorage:
        if "Vol_max" not in gdf.columns:
            logger.warning(
                "No maximum storage 'Vol_max' column found, "
                "skip adding LakeMaxStorage map."
            )
        else:
            gdf = gdf.rename(columns={"Vol_max": "LakeMaxStorage"})
    # Enforce a minimum average outflow of 0.01 m3/s; keep the original
    # values to detect whether any were clipped.
    outflow_before = gdf["LakeAvgOut"].copy()
    gdf["LakeAvgOut"] = np.maximum(gdf["LakeAvgOut"], 0.01)
    # Default rating coefficient derived from average outflow and level.
    if "Lake_b" not in gdf.columns:
        gdf["Lake_b"] = gdf["LakeAvgOut"].values / (gdf["LakeAvgLevel"].values) ** (2)
    # Fill in remaining wflow lake parameters with their defaults where the
    # database provides no values.
    for column, default in [
        ("Lake_e", 2),
        ("LakeThreshold", 0.0),
        ("LinkedLakeLocs", 0),
        ("LakeStorFunc", 1),
        ("LakeOutflowFunc", 3),
    ]:
        if column not in gdf.columns:
            gdf[column] = default
    if not np.all(outflow_before == gdf["LakeAvgOut"]):
        logger.warning(
            "Some values of LakeAvgOut have been replaced by "
            "a minimum value of 0.01m3/s"
        )
    # Optional user-supplied rating curves, one table per lake id.
    rating_curves = dict()
    if len(rating_dict) != 0:
        for wb_id in gdf["waterbody_id"].values:
            wb_id = int(wb_id)
            if wb_id not in rating_dict:
                continue
            df_rate = rating_dict[wb_id]
            # Storage curve: switch LakeStorFunc to tabulated mode (2) when
            # volume data is available.
            if "volume" in df_rate.columns:
                gdf.loc[gdf["waterbody_id"] == wb_id, "LakeStorFunc"] = 2
                df_stor = df_rate[["elevtn", "volume"]].dropna(
                    subset=["elevtn", "volume"]
                )
                df_stor = df_stor.rename(columns={"elevtn": "H", "volume": "S"})
                rating_curves[f"lake_sh_{wb_id}"] = df_stor
            else:
                logger.warning(
                    f"Storage data not available for lake {wb_id}. Using default S=AH"
                )
            # Rating curve: switch LakeOutflowFunc to tabulated mode (1) when
            # discharge data is available.
            if "discharge" in df_rate.columns:
                gdf.loc[gdf["waterbody_id"] == wb_id, "LakeOutflowFunc"] = 1
                df_rate = df_rate[["elevtn", "discharge"]].dropna(
                    subset=["elevtn", "discharge"]
                )
                df_rate = df_rate.rename(columns={"elevtn": "H", "discharge": "Q"})
                # Wflow expects one discharge column per julian day of year;
                # repeat Q 366 times, then turn column 0 into the level H.
                df_q = pd.concat([df_rate["Q"]] * (366), axis=1, ignore_index=True)
                df_q[0] = df_rate["H"]
                df_q = df_q.rename(columns={0: "H"})
                rating_curves[f"lake_hq_{wb_id}"] = df_q
            else:
                logger.warning(
                    f"Rating data not available for lake {wb_id}. "
                    "Using default Modified Puls Approach"
                )
    # Rasterize every lake parameter onto the model grid at the outlet points.
    lake_params = [
        "waterbody_id",
        "LakeArea",
        "LakeAvgLevel",
        "LakeAvgOut",
        "Lake_b",
        "Lake_e",
        "LakeStorFunc",
        "LakeOutflowFunc",
        "LakeThreshold",
        "LinkedLakeLocs",
    ]
    if "LakeMaxStorage" in gdf.columns:
        lake_params.append("LakeMaxStorage")
    gdf_org_points = gp.GeoDataFrame(
        gdf[lake_params],
        geometry=gp.points_from_xy(gdf.xout, gdf.yout),
    )
    for name in lake_params[1:]:
        ds[name] = ds.raster.rasterize(
            gdf_org_points, col_name=name, dtype="float32", nodata=-999
        )
    return ds, gdf, rating_curves