"""Workflows to derive river for a wflow model."""
import logging
from os.path import join
import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from hydromt import stats
from hydromt.gis import _gis_utils, flw
from hydromt.gis._raster_utils import _reggrid_area
from hydromt.gis._vector_utils import _nearest
from hydromt.model.processes.rivers import river_depth
from scipy.optimize import curve_fit
from hydromt_wflow.utils import DATADIR # global var
logger = logging.getLogger(__name__)
__all__ = ["river", "river_bathymetry", "river_width", "river_floodplain_volume"]
RESAMPLING = {"climate": "nearest", "precip": "average"}
NODATA = {"discharge": -9999}
def power_law(x, a, b):
    """Return the power-law value ``a * x**b`` (vectorized via numpy)."""
    # np.power broadcasts over array-valued x; multiplication order is irrelevant
    return np.power(x, b) * a
def river(
    ds,
    ds_model=None,
    river_upa=30.0,
    slope_len=2e3,
    min_rivlen_ratio=0.0,
    channel_dir="up",
    logger=logger,
):
    """Return river maps derived from high-res hydrography at the model grid.

    The output maps are:

    - rivmsk : river mask based on upstream area threshold on upstream area
    - rivlen : river length [m]
    - rivslp : smoothed river slope [m/m]
    - rivwth : river width at pixel outlet (if in ds)
    - qbankfull : bankfull discharge at pixel outlet (if in ds)

    Parameters
    ----------
    ds: xr.Dataset
        hydrography dataset containing "flwdir", "uparea", "elevtn" variables;
        and optional "rivwth" and "qbankfull" variable
    ds_model: xr.Dataset, optional
        model dataset with output grid, must contain "uparea"; for subgrid
        rivlen/slp must contain "x_out", "y_out". If None, ds is assumed to be
        the model grid.
    river_upa: float
        minimum threshold to define river cells & pixels, by default 30 [km2]
    slope_len: float
        minimum length over which to calculate the river slope,
        by default 2000 [m]
    min_rivlen_ratio: float
        minimum global river length to avg. cell resolution ratio used as
        threshold in window based smoothing of river length, by default 0.0.
        The smoothing is skipped if min_rivlen_ratio = 0.
    channel_dir: {"up", "down"}
        flow direction in which to calculate (subgrid) river length and width
    logger: logging.Logger, optional
        logger for progress messages, by default the module logger

    Returns
    -------
    ds_out: xr.Dataset
        Dataset with output river attributes
    flwdir
        high-resolution flow direction object (as returned by
        ``flw.flwdir_from_da``), reused by downstream workflows
    """
    # check data variables.
    dvars = ["flwdir", "uparea", "elevtn"]
    dvars_model = ["flwdir", "uparea"]
    if not np.all([v in ds for v in dvars]):
        raise ValueError(f"One or more variables missing from ds: {dvars}.")
    if ds_model is not None and not np.all([v in ds_model for v in dvars_model]):
        raise ValueError(f"One or more variables missing from ds_model: {dvars_model}")
    # use subgrid (high-res) outlet pixels only if their coordinates are available
    subgrid = True
    if ds_model is None or not np.all([v in ds_model for v in ["x_out", "y_out"]]):
        subgrid = False
        ds_model = ds
        logger.info("River length and slope are calculated at model resolution.")
    ## river mask and flow directions at model grid
    logger.debug(f"Set river mask (min uparea: {river_upa} km2) and prepare flow dirs.")
    # NOTE(review): model cells use a strict ">" threshold while the high-res
    # pixels below use ">=" -- confirm this asymmetry is intended
    riv_mask = ds_model["uparea"] > river_upa  # initial mask
    mod_mask = ds_model["uparea"] != ds_model["uparea"].raster.nodata
    ## (high res) flwdir and outlet indices
    riv_mask_org = ds["uparea"].values >= river_upa  # highres riv mask
    flwdir = flw.flwdir_from_da(ds["flwdir"], mask=True)
    if not subgrid:
        # get cell index of river cells
        idxs_out = np.arange(ds_model.raster.size).reshape(ds_model.raster.shape)
        idxs_out = np.where(mod_mask, idxs_out, flwdir._mv)
    else:
        # get subgrid outlet pixel index
        idxs_out = ds.raster.xy_to_idx(
            xs=ds_model["x_out"].values,
            ys=ds_model["y_out"].values,
            mask=mod_mask.values,
            nodata=flwdir._mv,
        )
    ## river length
    # get river length based on in-between distance between two outlet pixels
    logger.debug("Derive river length.")
    rivlen = flwdir.subgrid_rivlen(
        idxs_out=idxs_out,
        mask=riv_mask_org,
        direction=channel_dir,
        unit="m",
    )
    xres, yres = ds_model.raster.res
    if ds_model.raster.crs.is_geographic:  # convert degree to meters
        lat_avg = ds_model.raster.ycoords.values.mean()
        xres, yres = _gis_utils._cellres(lat_avg, xres, yres)
    rivlen = np.where(riv_mask.values, rivlen, -9999)
    # set mean length at most downstream (if channel_dir=down) or upstream
    # (if channel_dir=up) river lengths
    if np.any(rivlen == 0):
        rivlen[rivlen == 0] = np.mean(rivlen[rivlen > 0])
    # smooth river length based on minimum river length
    if min_rivlen_ratio > 0 and hasattr(flwdir, "smooth_rivlen"):
        res = np.mean(np.abs([xres, yres]))
        min_len = res * min_rivlen_ratio
        flwdir_model = flw.flwdir_from_da(ds_model["flwdir"], mask=riv_mask)
        rivlen2 = flwdir_model.smooth_rivlen(rivlen, min_len, nodata=-9999)
        min_len2 = rivlen2[riv_mask].min()
        pmod = (rivlen != rivlen2).sum() / riv_mask.sum() * 100
        logger.debug(
            f"River length smoothed (min length: {min_len2:.0f} m; \
cells modified: {pmod:.1f})%."
        )
        rivlen = rivlen2
    elif min_rivlen_ratio > 0:
        logger.warning(
            "River length smoothing skipped as it requires newer version of pyflwdir."
        )
    ## river slope as derivative of elevation around outlet pixels
    logger.debug("Derive river slope.")
    rivslp = flwdir.subgrid_rivslp(
        idxs_out=idxs_out,
        elevtn=ds["elevtn"].values,
        mask=riv_mask_org,
        length=slope_len,
    )
    rivslp = np.where(riv_mask.values, rivslp, -9999)
    # create xarray dataset for all river variables
    ds_out = xr.Dataset(coords=ds_model.raster.coords)
    dims = ds_model.raster.dims
    # save as uint8 as bool is not supported in nc and tif files
    ds_out["rivmsk"] = riv_mask.astype(np.uint8)
    ds_out["rivmsk"].raster.set_nodata(0)
    attrs = dict(_FillValue=-9999, unit="m")
    ds_out["rivlen"] = xr.Variable(dims, rivlen, attrs=attrs)
    attrs = dict(_FillValue=-9999, unit="m.m-1")
    ds_out["rivslp"] = xr.Variable(dims, rivslp, attrs=attrs)
    # pass through width / bankfull discharge sampled at the outlet pixels,
    # keeping only strictly positive values
    for name in ["rivwth", "qbankfull"]:
        if name in ds:
            logger.debug(f"Derive {name} from hydrography dataset.")
            data = np.full_like(riv_mask, -9999, dtype=np.float32)
            data0 = ds[name].values.flat[idxs_out[riv_mask.values]]
            data[riv_mask.values] = np.where(data0 > 0, data0, -9999)
            ds_out[name] = xr.Variable(dims, data, attrs=dict(_FillValue=-9999))
    return ds_out, flwdir
def river_bathymetry(
    ds_model: xr.Dataset,
    gdf_riv: gpd.GeoDataFrame,
    method: str = "powlaw",
    smooth_len: float = 5e3,
    min_rivdph: float = 1.0,
    min_rivwth: float = 30.0,
    logger=logger,
) -> xr.Dataset:
    """Get river width and bankfull discharge.

    Nearest values are taken from `gdf_riv` (if provided) to estimate river
    depth using :py:meth:`hydromt.workflows.river_depth`. Missing values in
    rivwth are first filled using downward filling and remaining (upstream)
    missing values are set to min_rivwth (for rivwth) and 0 (for qbankfull).

    Parameters
    ----------
    ds_model : xr.Dataset
        Model dataset with 'flwdir', 'rivmsk', 'rivlen', 'x_out' and 'y_out' variables.
    gdf_riv : gpd.GeoDataFrame
        River geometry with 'rivwth' and 'rivdph' or 'qbankfull' columns.
        If None, the data is assumed to already be present in ``ds_model``.
    method : {'gvf', 'manning', 'powlaw'}
        see py:meth:`hydromt.workflows.river_depth` for details, by default "powlaw"
    smooth_len : float, optional
        Length [m] over which to smooth the output river width and depth, by default 5e3
    min_rivdph : float, optional
        Minimum river depth [m], by default 1.0
    min_rivwth : float, optional
        Minimum river width [m], by default 30.0
    logger : logging.Logger, optional
        logger for progress messages, by default the module logger

    Returns
    -------
    xr.Dataset
        Dataset with 'rivwth' and 'rivdph' variables
    """
    dims = ds_model.raster.dims
    # check data variables; manning/gvf methods additionally require slope
    # (and subgrid elevation for gvf)
    dvars_model = ["flwdir", "rivmsk", "rivlen"]
    if method == "manning":
        dvars_model += ["rivslp"]
    elif method == "gvf":
        dvars_model += ["rivslp", "subelv"]
    if not np.all([v in ds_model for v in dvars_model]):
        raise ValueError(f"One or more variables missing from ds_model: {dvars_model}")
    # setup flow direction for river
    riv_mask = ds_model["rivmsk"].values == 1
    flwdir_river = flw.flwdir_from_da(ds_model["flwdir"], mask=riv_mask)
    rivlen_avg = ds_model["rivlen"].values[riv_mask].mean()
    ## river width and bankfull discharge
    vars0 = ["rivwth", "rivdph", "qbankfull"]
    # find nearest values from river shape if provided
    # if None assume the data is in ds_model
    if gdf_riv is not None:
        vars = [c for c in vars0 if c in gdf_riv.columns]
        if len(vars) == 0:
            raise ValueError(f" columns {vars0} not found in gdf_riv")
        logger.debug(f"Derive {vars} from shapefile.")
        if "x_out" in ds_model and "y_out" in ds_model:
            # get subgrid outlet pixel index and coordinates
            xs_out = ds_model["x_out"].values[riv_mask]
            ys_out = ds_model["y_out"].values[riv_mask]
        else:
            # get river cell coordinates
            row, col = np.where(riv_mask)
            xs_out = ds_model.raster.xcoords.values[col]
            ys_out = ds_model.raster.ycoords.values[row]
        gdf_out = gpd.GeoDataFrame(
            geometry=gpd.points_from_xy(xs_out, ys_out), crs=ds_model.raster.crs
        )
        idx_nn, dst_nn = _nearest(gdf_out, gdf_riv)
        # get valid river data within max half pixel distance
        xres, yres = ds_model.raster.res
        if ds_model.raster.crs.is_geographic:  # convert degree to meters
            lat_avg = ds_model.raster.ycoords.values.mean()
            xres, yres = _gis_utils._cellres(lat_avg, xres, yres)
        max_dist = np.mean(np.abs([xres, yres])) / 2.0
        nriv, nsnap = xs_out.size, int(np.sum(dst_nn < max_dist))
        logger.debug(
            f"Valid for {nsnap}/{nriv} river cells (max dist: {max_dist:.0f} m)."
        )
        for name in vars:
            data = np.full_like(riv_mask, -9999, dtype=np.float32)
            data[riv_mask] = np.where(
                dst_nn < max_dist, gdf_riv.loc[idx_nn, name].fillna(-9999).values, -9999
            )
            ds_model[name] = xr.Variable(dims, data, attrs=dict(_FillValue=-9999))
    else:
        vars = vars0
        # explicit validation (asserts are stripped under `python -O`)
        if "rivwth" not in ds_model:
            raise ValueError("'rivwth' variable required in ds_model if gdf_riv is None")
        if "qbankfull" not in ds_model and "rivdph" not in ds_model:
            raise ValueError(
                "'qbankfull' or 'rivdph' variable required in ds_model if gdf_riv is None"
            )
    # fill gaps in data using downward filling along flow directions
    for name in vars:
        data = ds_model[name].values
        nodata = ds_model[name].raster.nodata
        if np.all(data[riv_mask] != nodata):
            continue  # no gaps to fill
        data = flwdir_river.fillnodata(data, nodata, direction="down", how="max")
        ds_model[name].values = np.maximum(0, data)
    # smooth by averaging along flow directions and set minimum
    if smooth_len > 0:
        # window size in cells, at least 1
        # BUGFIX: was min(1, ...) which capped the window at a single cell (or
        # zero) regardless of smooth_len; max(1, ...) scales with smooth_len
        nsmooth = max(1, int(round(smooth_len / rivlen_avg / 2)))
        kwgs = dict(n=nsmooth, restrict_strord=True)
        ds_model["rivwth"].values = flwdir_river.moving_average(
            ds_model["rivwth"].values, nodata=ds_model["rivwth"].raster.nodata, **kwgs
        )
    ds_model["rivwth"] = np.maximum(min_rivwth, ds_model["rivwth"]).where(
        riv_mask, ds_model["rivwth"].raster.nodata
    )
    ## river depth
    if "rivdph" not in ds_model:
        # distance to outlet; required for manning and gvf rivdph methods
        if method != "powlaw" and "rivdst" not in ds_model:
            rivlen = ds_model["rivlen"].values
            nodata = ds_model["rivlen"].raster.nodata
            rivdst = flwdir_river.accuflux(rivlen, nodata=nodata, direction="down")
            ds_model["rivdst"] = xr.Variable(
                dims, rivdst, attrs=dict(_FillValue=nodata)
            )
        # add river distance to outlet -> required for manning/gvf method
        rivdph = river_depth(
            data=ds_model,
            flwdir=flwdir_river,
            method=method,
            min_rivdph=min_rivdph,
            rivzs_name="subelv",
        )
        attrs = dict(_FillValue=-9999, unit="m")
        ds_model["rivdph"] = xr.Variable(dims, rivdph, attrs=attrs).fillna(-9999)
    # smooth by averaging along flow directions and set minimum
    if smooth_len > 0:
        ds_model["rivdph"].values = flwdir_river.moving_average(
            ds_model["rivdph"].values, nodata=-9999, **kwgs
        )
    ds_model["rivdph"] = np.maximum(min_rivdph, ds_model["rivdph"]).where(
        riv_mask, -9999
    )
    return ds_model[["rivwth", "rivdph"]]
def river_floodplain_volume(
    ds,
    ds_model,
    river_upa=30,
    flood_depths=(0.5, 1.0, 1.5, 2.0, 2.5),
    dtype=np.float64,
    logger=logger,
):
    """Calculate floodplain volume at given flood depths based on (subgrid) HAND map.

    Parameters
    ----------
    ds: xr.Dataset
        hydrography dataset containing "flwdir", "uparea", "elevtn" variables
    ds_model: xr.Dataset
        Model dataset with output grid, must contain "rivmsk", "rivwth", "rivlen";
        for subgrid must contain "x_out", "y_out".
    river_upa: float
        minimum threshold to define the river when calculating HAND, by default 30 [km2]
    flood_depths : sequence of float, optional
        flood depths at which a volume is derived,
        by default (0.5, 1.0, 1.5, 2.0, 2.5).
        Note: default is an immutable tuple (was a mutable list).
    dtype: numpy.dtype, optional
        output dtype
    logger : logging.Logger, optional
        logger for progress messages, by default the module logger

    Returns
    -------
    da_out : xr.DataArray with dims (flood_depth, y, x)
        Floodplain volume [m3]
    """
    if ds.raster.crs != ds_model.raster.crs:
        raise ValueError("Hydrography dataset CRS does not match model CRS")
    # check data variables.
    dvars = ["flwdir", "elevtn", "uparea"]
    dvars_model = ["rivmsk", "rivwth", "rivlen"]
    if not np.all([v in ds for v in dvars]):
        raise ValueError(f"One or more variables missing from ds: {dvars}.")
    if not np.all([v in ds_model for v in dvars_model]):
        raise ValueError(f"One or more variables missing from ds_model: {dvars_model}")
    # initialize flow directions
    flwdir = flw.flwdir_from_da(ds["flwdir"], mask=None)
    # get river cell coordinates
    riv_mask = ds_model["rivmsk"].values > 0
    if "x_out" in ds_model and "y_out" in ds_model:  # use subgrid
        idxs_out = ds.raster.xy_to_idx(
            xs=ds_model["x_out"].values,
            ys=ds_model["y_out"].values,
            mask=riv_mask,
            nodata=flwdir._mv,
        )
    else:
        # get cell index of river cells
        idxs_out = np.arange(ds_model.raster.size).reshape(ds_model.raster.shape)
        idxs_out = np.where(riv_mask, idxs_out, flwdir._mv)
    # height above nearest drain (HAND) relative to the high-res river network
    hand = flwdir.hand(
        drain=ds["uparea"].values >= river_upa, elevtn=ds["elevtn"].values
    )
    # calculate volume at user defined flood depths
    # note that the ucat_map is not saved currently but is required if
    # we want to create a flood map by postprocessing
    flood_depths = np.atleast_1d(flood_depths).ravel().astype(dtype)  # force 1D
    _, ucat_vol = flwdir.ucat_volume(idxs_out=idxs_out, hand=hand, depths=flood_depths)
    # force minimum volume based on river width * length
    rivlen = np.maximum(ds_model["rivlen"].values, 1)  # avoid zero division errors
    min_vol = ds_model["rivwth"].values * rivlen * flood_depths[0]
    ucat_vol[0, :, :] = np.maximum(ucat_vol[0, :, :], min_vol)
    fldwth = ucat_vol[0, :, :] / rivlen / flood_depths[0]
    for i in range(1, ucat_vol.shape[0]):
        dh = flood_depths[i] - flood_depths[i - 1]
        # each depth increment must add at least flooded width * length * dh
        min_vol = ucat_vol[i - 1, ...] + (fldwth * rivlen * dh)
        ucat_vol[i, :, :] = np.maximum(ucat_vol[i, :, :], min_vol)
        fldwth = (ucat_vol[i, :, :] - ucat_vol[i - 1, :, :]) / rivlen / dh
    # return xarray DataArray
    da_out = (
        xr.DataArray(
            coords={"flood_depth": flood_depths, **ds_model.raster.coords},
            dims=("flood_depth", *ds_model.raster.dims),
            data=ucat_vol,
        )
        .reset_coords(drop=True)
        .where(ds_model["rivmsk"] > 0, -9999.0)
    )
    da_out.raster.set_nodata(-9999.0)
    da_out.raster.set_crs(ds_model.raster.crs)
    # TODO return a second dataset with hand and ucat_map variables for postprocessing
    return da_out
# TODO: methods below are redundant after version v0.1.4 and will be removed in
# future versions
def river_width(
    ds_like,
    flwdir,
    data=None,
    fit=False,
    fill=True,
    fill_outliers=True,
    min_wth=1,
    mask_names=None,
    predictor="discharge",
    rivwth_name="rivwth",
    obs_postfix="_obs",
    rivmsk_name="rivmsk",
    a=None,
    b=None,
    logger=logger,
    **kwargs,
):
    """Calculate river width from a predictor using a power-law relation.

    River width is estimated as ``width = a * predictor**b``. The (a, b)
    parameters are either supplied, taken from regression tables (for the
    "discharge" predictor) or fitted on the fly against observed widths
    ("<rivwth_name><obs_postfix>" in ``ds_like``).

    Parameters
    ----------
    ds_like : xr.Dataset
        Model dataset with river mask (``rivmsk_name``) and, optionally,
        observed river widths and predictor variables.
    flwdir : flow direction object
        used to accumulate the predictor along the river network
    data : dict, optional
        keyword arguments passed to the predictor helper
        (``_discharge`` or ``_precip``), by default None (empty)
    fit : bool
        fit the power-law (a, b) parameters against observed widths
    fill : bool
        overwrite estimated widths with valid observed widths
    fill_outliers : bool
        mask observation outliers before filling
    min_wth : float
        minimum river width [m]
    mask_names : list of str, optional
        variables in ds_like (e.g. lakes/reservoirs) masked out of the
        observations, by default None (none)
    predictor : str
        predictor: "discharge", "precip" or a variable present in ds_like
    rivwth_name, obs_postfix, rivmsk_name : str
        variable names in ds_like
    a, b : float, optional
        manual power-law parameters
    logger : logging.Logger, optional
        logger for progress messages

    Returns
    -------
    xr.DataArray
        river width [m] with -9999 fill value
    """
    # avoid mutable default arguments (were `dict()` / `[]`)
    data = {} if data is None else data
    mask_names = [] if mask_names is None else mask_names
    nopars = a is None or b is None  # no manual a, b parameters
    fit = fit or (nopars and predictor not in ["discharge"])  # fit power-law on the fly
    nowth = f"{rivwth_name}{obs_postfix}" not in ds_like  # no observed width
    fill = fill and not nowth  # fill datagaps and masked areas (lakes/res) in obs
    if nowth and fit:
        raise ValueError(
            f'Observed rivwth "{rivwth_name}{obs_postfix}" required to fit riverwidths.'
        )
    elif nowth:
        logger.warning(f'Observed rivwth "{rivwth_name}{obs_postfix}" not found.')
    # get predictor
    logger.debug(f'Deriving predictor "{predictor}" values')
    if predictor == "discharge":
        values, pars = _discharge(ds_like, flwdir=flwdir, logger=logger, **data)
        if not fit:
            # NOTE(review): manual a/b are overridden by the regression
            # parameters for the discharge predictor -- confirm intended
            a, b = pars
        # TODO: check units
    elif predictor == "precip":
        values = _precip(ds_like, flwdir=flwdir, logger=logger, **data)
    else:
        if predictor not in ds_like:
            raise ValueError(f"required {predictor} variable missing in grid.")
        values = ds_like[predictor].values
    # read river width observations
    if not nowth:
        rivwth_org = ds_like[f"{rivwth_name}{obs_postfix}"].values
        nodata = ds_like[f"{rivwth_name}{obs_postfix}"].raster.nodata
        rivmsk = rivwth_org != nodata
        rivwth_org = np.where(rivmsk, rivwth_org, -9999)
        # mask zero and negative riverwidth and waterbodies if present
        mask = rivwth_org > min_wth
        for name in mask_names:
            if name in ds_like:
                mask[ds_like[name].values != ds_like[name].raster.nodata] = False
                logger.debug(f"{name} masked out in rivwth data.")
            else:
                logger.warning(f'mask variable "{name}" not found in maps.')
        # fit predictor
        wth = rivwth_org[mask]
        val = values[mask]
        if fit:
            logger.info(f"Fitting power-law a,b parameters based on {predictor}")
            a, b = _width_fit(wth, val, mask, logger=logger)
        wsim = power_law(val, a, b)
        res = np.abs(wsim - wth)
        # outliers: large absolute (>200 m) AND relative (>100%) error
        outliers = np.logical_and(res > 200, (res / wsim) > 1.0)
        pout = np.sum(outliers) / outliers.size * 100
        # validate
        wsim = power_law(val[~outliers], a, b)
        nse = stats.skills._nse(wsim, wth[~outliers])
        logger.info(
            f'Using "width = a*{predictor}^b"; where a={a:.2f}, b={b:.2f} '
            f"(nse={nse:.3f}; outliers={pout:.2f}%)"
        )
    # compute new riverwidth
    rivmsk = ds_like[rivmsk_name].values != 0
    rivwth_out = np.full(ds_like.raster.shape, -9999.0, dtype=np.float32)
    rivwth_out[rivmsk] = np.maximum(min_wth, power_law(values[rivmsk], a, b))
    # overwrite rivwth data with original data at valid points
    if fill:
        if fill_outliers:
            mask[mask] = ~outliers
            logger.info(f"masking {np.sum(outliers):.0f} outliers")
        # NOTE(review): with fill_outliers=False and outliers present, the
        # sizes of rivwth_out[mask] and wth[~outliers] differ -- verify
        rivwth_out[mask] = wth[~outliers]
    fills = "for missing data" if fill else "globally"
    logger.info(f"rivwth set {fills} based on width-{predictor} relationship.")
    # return xarray dataarray
    attrs = dict(_FillValue=-9999, unit="m")
    return xr.DataArray(rivwth_out, dims=ds_like.raster.dims, attrs=attrs)
def _width_fit(
    wth,
    val,
    mask,
    p0=(0.15, 0.65),  # rhine uparea based
    logger=logger,
):
    """Fit power-law ``width = a * val**b`` parameters, iteratively removing outliers.

    Parameters
    ----------
    wth, val : np.ndarray
        observed widths and predictor values (already selected with ``mask``)
    mask : np.ndarray of bool
        selection mask; only its True-count is used here
    p0 : tuple of float, optional
        initial (a, b) parameters; also the fallback when fitting fails or
        there are too few data points
    logger : logging.Logger, optional
        logger for warnings

    Returns
    -------
    (a, b) : tuple of float
        fitted (or fallback) power-law parameters
    """
    outliers = np.full(np.sum(mask), False, dtype=bool)
    # fit only with sufficient data, otherwise fall back to global parameters
    if np.sum(mask) > 10:
        pars = p0
        # iteratively refit while removing outliers, at most 20 fits
        for _ in range(20):
            # `np.sum(~outliers)` is equivalent to the former
            # `np.sum(mask[mask][~outliers])` (same length, all True)
            if np.sum(~outliers) <= 10:
                # BUGFIX: the original `while n < 20` only advanced the
                # counter after a successful fit, so reaching this state
                # spun forever; keep the last parameters and stop instead
                break
            pars = curve_fit(
                f=power_law,
                xdata=val[~outliers],
                ydata=wth[~outliers],
                p0=p0,
                bounds=([0.01, 0.01], [100.0, 1.0]),
            )[0]
            if np.allclose(p0, pars):
                break  # converged
            # outliers based on absolute and relative diff
            wsim = power_law(val, *pars)
            res = np.abs(wsim - wth)
            outliers = np.logical_and(res > 200, (res / wsim) > 1.0)
            if np.sum(outliers) > np.sum(mask) * 0.2:  # max 20 % outliers
                pars = p0
                outliers[:] = False
                logger.warning("Fit not successful.")
                break
            p0 = pars
        a, b = pars
    else:
        a, b = p0
        logger.warning("Insufficient data points, using global parameters.")
    return a, b
def _precip(ds_like, flwdir, da_precip, logger=logger):
    """Derive a simple precipitation-based discharge predictor [m3/s]."""
    # resample annual precipitation to the model grid and convert mm/yr -> m/yr
    resampled = da_precip.raster.reproject_like(ds_like, method=RESAMPLING["precip"])
    precip = resampled.values / 1e3  # [m/yr]
    precip = np.maximum(precip, 0)  # clip negative / missing values to zero
    # cell area [m2] of the regular (lat/lon) grid
    lat = ds_like.raster.ycoords.values
    lon = ds_like.raster.xcoords.values
    area = _reggrid_area(lat, lon)
    # accumulate along flow directions; 10 x average flow
    return flwdir.accuflux(precip * area / (86400 * 365) * 10)  # [m3/s]
def _discharge(ds_like, flwdir, da_precip, da_climate, logger=logger):
    """Derive a discharge-based predictor plus regression (a, b) parameters."""
    # climate-class lookup and regression tables shipped with the package
    precip_fn = da_precip.name
    climate_fn = da_climate.name
    clim_map = pd.read_csv(
        join(DATADIR, "rivwth", f"{climate_fn}.csv"), index_col="class"
    )
    regr_map = (
        pd.read_csv(join(DATADIR, "rivwth", f"regr_{precip_fn}.csv"), index_col="source")
        .loc[climate_fn]
        .set_index("base_class")
    )
    # get overall catchment climate classification (based on mode)
    # TODO: convert to xarray method by using scipy.stats.mode in xr.apply_ufuncs
    # TODO: reproject climate and mask cells outside basin
    basin_climate = np.bincount(da_climate.values.flatten()).argmax()
    # convert detailed climate classification to higher level base class
    base_class, class_name = clim_map.loc[basin_climate]
    logger.debug(f"Basin base climate classification: {base_class} ({class_name})")
    params = regr_map.loc[base_class]
    # precipitation (TO-DO: add checks that were present in old version?)
    precip = da_precip.raster.reproject_like(
        ds_like, method=RESAMPLING["precip"]
    ).values
    # apply scaling factor to make sure accumulated precipitation will stay
    # (approximately) the same
    sf_x = np.round(da_precip.raster.res[0] / ds_like.raster.res[0], 4)
    sf_y = np.round(da_precip.raster.res[1] / ds_like.raster.res[1], 4)
    scaling_factor = (sf_x + sf_y) / 2
    precip = precip / scaling_factor**2
    # derive cell areas (km2)
    lat = ds_like.raster.ycoords.values
    lon = ds_like.raster.xcoords.values
    areagrid = _reggrid_area(lat, lon) / 1e6
    # calculate "local runoff" (note: set missing in precipitation to zero)
    runoff = (np.maximum(precip, 0) * params["precip"]) + (areagrid * params["area"])
    runoff = np.maximum(runoff, 0)  # make sure runoff is not negative
    # accumulate runoff along flow directions to get discharge
    discharge = flwdir.accuflux(runoff, nodata=NODATA["discharge"], direction="up")
    return discharge, (params["discharge_a"], params["discharge_b"])