"""Workflows to derive river for a wflow model."""
import logging
from os.path import join
import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from hydromt import flw, gis_utils, stats, workflows
from scipy.optimize import curve_fit
from hydromt_wflow.utils import DATADIR # global var
logger = logging.getLogger(__name__)

# Public API of this module.
__all__ = ["river", "river_bathymetry", "river_width", "river_floodplain_volume"]

# Raster resampling method per predictor dataset when reprojecting to the model grid.
RESAMPLING = {"climate": "nearest", "precip": "average"}
# Nodata values used when accumulating fluxes along the flow directions.
NODATA = {"discharge": -9999}
def power_law(x, a, b):
    """Evaluate the power-law relation ``a * x**b`` element-wise."""
    scaled = np.power(x, b)
    return a * scaled
def river(
    ds,
    ds_model=None,
    river_upa=30.0,
    slope_len=2e3,
    min_rivlen_ratio=0.0,
    channel_dir="up",
    logger=logger,
):
    """Return river maps.

    The output maps are:

    - rivmsk : river mask based on upstream area threshold on upstream area
    - rivlen : river length [m]
    - rivslp : smoothed river slope [m/m]
    - rivzs : elevation of the river bankfull height based on pixel outlet
    - rivwth : river width at pixel outlet (if in ds)
    - qbankfull : bankfull discharge at pixel outlet (if in ds)

    Parameters
    ----------
    ds: xr.Dataset
        hydrography dataset containing "flwdir", "uparea", "elevtn" variables;
        and optional "rivwth" and "qbankfull" variable
    ds_model: xr.Dataset, optional
        model dataset with output grid, must contain "uparea", for subgrid rivlen/slp
        must contain "x_out", "y_out". If None, ds is assumed to be the model grid
    river_upa: float
        minimum threshold to define river cell & pixels, by default 30 [km2]
    slope_len: float
        minimum length over which to calculate the river slope,
        by default 2000 [m] (matches the ``2e3`` signature default; the old
        docstring incorrectly said 1000)
    min_rivlen_ratio: float
        minimum global river length to avg. cell resolution ratio used as threshold
        in window based smoothing of river length, by default 0.0.
        The smoothing is skipped if min_rivlen_ratio = 0.
    channel_dir: {"up", "down"}
        flow direction in which to calculate (subgrid) river length and width
    logger: logging.Logger, optional
        logger used for progress messages, by default the module logger

    Returns
    -------
    ds_out: xr.Dataset
        Dataset with output river attributes
    flwdir: pyflwdir.FlwdirRaster
        (high-res) flow direction object derived from ``ds["flwdir"]``
    """
    # check required data variables
    dvars = ["flwdir", "uparea", "elevtn"]
    dvars_model = ["flwdir", "uparea"]
    if not np.all([v in ds for v in dvars]):
        raise ValueError(f"One or more variables missing from ds: {dvars}.")
    if ds_model is not None and not np.all([v in ds_model for v in dvars_model]):
        raise ValueError(f"One or more variables missing from ds_model: {dvars_model}")
    # determine whether subgrid (high-res) river length/slope can be derived
    subgrid = True
    if ds_model is None or not np.all([v in ds_model for v in ["x_out", "y_out"]]):
        subgrid = False
        ds_model = ds  # fall back: model grid == hydrography grid
        logger.info("River length and slope are calculated at model resolution.")
    ## river mask and flow directions at model grid
    logger.debug(f"Set river mask (min uparea: {river_upa} km2) and prepare flow dirs.")
    riv_mask = ds_model["uparea"] > river_upa  # initial mask
    mod_mask = ds_model["uparea"] != ds_model["uparea"].raster.nodata
    ## (high res) flwdir and outlet indices
    riv_mask_org = ds["uparea"].values >= river_upa  # highres riv mask
    flwdir = flw.flwdir_from_da(ds["flwdir"], mask=True)
    if not subgrid:  # FIX: was non-idiomatic `subgrid == False`
        # get cell index of river cells
        idxs_out = np.arange(ds_model.raster.size).reshape(ds_model.raster.shape)
        idxs_out = np.where(mod_mask, idxs_out, flwdir._mv)
    else:
        # get subgrid outlet pixel index
        idxs_out = ds.raster.xy_to_idx(
            xs=ds_model["x_out"].values,
            ys=ds_model["y_out"].values,
            mask=mod_mask.values,
            nodata=flwdir._mv,
        )
    ## river length
    # get river length based on in-between distance between two outlet pixels
    logger.debug("Derive river length.")
    rivlen = flwdir.subgrid_rivlen(
        idxs_out=idxs_out,
        mask=riv_mask_org,
        direction=channel_dir,
        unit="m",
    )
    xres, yres = ds_model.raster.res
    if ds_model.raster.crs.is_geographic:  # convert degree to meters
        lat_avg = ds_model.raster.ycoords.values.mean()
        xres, yres = gis_utils.cellres(lat_avg, xres, yres)
    rivlen = np.where(riv_mask.values, rivlen, -9999)
    # set mean length at most downstream (if channel_dir=down) or upstream
    # (if channel_dir=up) river lengths
    if np.any(rivlen == 0):
        rivlen[rivlen == 0] = np.mean(rivlen[rivlen > 0])
    # smooth river length based on minimum river length
    if min_rivlen_ratio > 0 and hasattr(flwdir, "smooth_rivlen"):
        res = np.mean(np.abs([xres, yres]))
        min_len = res * min_rivlen_ratio
        flwdir_model = flw.flwdir_from_da(ds_model["flwdir"], mask=riv_mask)
        rivlen2 = flwdir_model.smooth_rivlen(rivlen, min_len, nodata=-9999)
        min_len2 = rivlen2[riv_mask].min()
        pmod = (rivlen != rivlen2).sum() / riv_mask.sum() * 100
        # FIX: parenthesis was misplaced in the log message ("{pmod:.1f})%.")
        logger.debug(
            f"River length smoothed (min length: {min_len2:.0f} m; "
            f"cells modified: {pmod:.1f}%)."
        )
        rivlen = rivlen2
    elif min_rivlen_ratio > 0:
        logger.warning(
            "River length smoothing skipped as it requires newer version of pyflwdir."
        )
    ## river slope as derivative of elevation around outlet pixels
    logger.debug("Derive river slope.")
    rivslp = flwdir.subgrid_rivslp(
        idxs_out=idxs_out,
        elevtn=ds["elevtn"].values,
        mask=riv_mask_org,
        length=slope_len,
    )
    rivslp = np.where(riv_mask.values, rivslp, -9999)
    # create xarray dataset for all river variables
    ds_out = xr.Dataset(coords=ds_model.raster.coords)
    dims = ds_model.raster.dims
    # save as uint8 as bool is not supported in nc and tif files
    ds_out["rivmsk"] = riv_mask.astype(np.uint8)
    ds_out["rivmsk"].raster.set_nodata(0)
    attrs = dict(_FillValue=-9999, unit="m")
    ds_out["rivlen"] = xr.Variable(dims, rivlen, attrs=attrs)
    attrs = dict(_FillValue=-9999, unit="m.m-1")
    ds_out["rivslp"] = xr.Variable(dims, rivslp, attrs=attrs)
    # pass through width / bankfull discharge sampled at the outlet pixels, if present
    for name in ["rivwth", "qbankfull"]:
        if name in ds:
            logger.debug(f"Derive {name} from hydrography dataset.")
            data = np.full_like(riv_mask, -9999, dtype=np.float32)
            data0 = ds[name].values.flat[idxs_out[riv_mask.values]]
            data[riv_mask.values] = np.where(data0 > 0, data0, -9999)
            ds_out[name] = xr.Variable(dims, data, attrs=dict(_FillValue=-9999))
    return ds_out, flwdir
def river_bathymetry(
    ds_model: xr.Dataset,
    gdf_riv: gpd.GeoDataFrame,
    method: str = "powlaw",
    smooth_len: float = 5e3,
    min_rivdph: float = 1.0,
    min_rivwth: float = 30.0,
    logger=logger,
    **kwargs,
) -> xr.Dataset:
    """Get river width and bankfull discharge.

    From `gdf_riv` to estimate river depth
    using :py:meth:`hydromt.workflows.river_depth`. Missing values in rivwth are first
    filled using downward filling and remaining (upstream) missing values are set
    to min_rivwth (for rivwth) and 0 (for qbankfull).

    Parameters
    ----------
    ds_model : xr.Dataset
        Model dataset with 'flwdir', 'rivmsk', 'rivlen', 'x_out' and 'y_out' variables.
    gdf_riv : gpd.GeoDataFrame
        River geometry with 'rivwth' and 'rivdph' or 'qbankfull' columns.
        If None, the data is assumed to be present in ``ds_model`` already.
    method : {'gvf', 'manning', 'powlaw'}
        see py:meth:`hydromt.workflows.river_depth` for details, by default "powlaw"
    smooth_len : float, optional
        Length [m] over which to smooth the output river width and depth, by default 5e3
    min_rivdph : float, optional
        Minimum river depth [m], by default 1.0
    min_rivwth : float, optional
        Minimum river width [m], by default 30.0
    logger : logging.Logger, optional
        logger for progress messages, by default the module logger
    **kwargs
        Additional keyword arguments passed to
        :py:meth:`hydromt.workflows.river_depth`.

    Returns
    -------
    xr.Dataset
        Dataset with 'rivwth' and 'rivdph' variables
    """
    dims = ds_model.raster.dims
    # check data variables.
    dvars_model = ["flwdir", "rivmsk", "rivlen"]
    if method != "powlaw":
        dvars_model += ["rivslp", "rivzs"]
    if not np.all([v in ds_model for v in dvars_model]):
        raise ValueError(f"One or more variables missing from ds_model: {dvars_model}")
    # setup flow direction restricted to river cells
    riv_mask = ds_model["rivmsk"].values == 1
    flwdir_river = flw.flwdir_from_da(ds_model["flwdir"], mask=riv_mask)
    rivlen_avg = ds_model["rivlen"].values[riv_mask].mean()
    ## river width and bankfull discharge
    vars0 = ["rivwth", "rivdph", "qbankfull"]
    # find nearest values from river shape if provided
    # if None assume the data is in ds_model
    if gdf_riv is not None:
        vars = [c for c in vars0 if c in gdf_riv.columns]
        if len(vars) == 0:
            # FIX: removed stray leading space in the error message
            raise ValueError(f"columns {vars0} not found in gdf_riv")
        logger.debug(f"Derive {vars} from shapefile.")
        if "x_out" in ds_model and "y_out" in ds_model:
            # get subgrid outlet pixel coordinates
            xs_out = ds_model["x_out"].values[riv_mask]
            ys_out = ds_model["y_out"].values[riv_mask]
        else:
            # get river cell center coordinates
            row, col = np.where(riv_mask)
            xs_out = ds_model.raster.xcoords.values[col]
            ys_out = ds_model.raster.ycoords.values[row]
        gdf_out = gpd.GeoDataFrame(
            geometry=gpd.points_from_xy(xs_out, ys_out), crs=ds_model.raster.crs
        )
        idx_nn, dst_nn = gis_utils.nearest(gdf_out, gdf_riv)
        # get valid river data within max half pixel distance
        xres, yres = ds_model.raster.res
        if ds_model.raster.crs.is_geographic:  # convert degree to meters
            lat_avg = ds_model.raster.ycoords.values.mean()
            xres, yres = gis_utils.cellres(lat_avg, xres, yres)
        max_dist = np.mean(np.abs([xres, yres])) / 2.0
        nriv, nsnap = xs_out.size, int(np.sum(dst_nn < max_dist))
        logger.debug(
            f"Valid for {nsnap}/{nriv} river cells (max dist: {max_dist:.0f} m)."
        )
        for name in vars:
            data = np.full_like(riv_mask, -9999, dtype=np.float32)
            data[riv_mask] = np.where(
                dst_nn < max_dist, gdf_riv.loc[idx_nn, name].fillna(-9999).values, -9999
            )
            ds_model[name] = xr.Variable(dims, data, attrs=dict(_FillValue=-9999))
    else:
        vars = vars0
        # NOTE(review): assert-based validation kept for backward compatibility
        # (callers may rely on AssertionError); consider raising ValueError.
        assert "rivwth" in ds_model
        assert "qbankfull" in ds_model or "rivdph" in ds_model
    # fill gaps in data using downward filling along flow directions
    for name in vars:
        if name not in ds_model:
            # FIX: when gdf_riv is None only a subset of vars0 may be present;
            # skip missing optional variables instead of raising KeyError.
            continue
        data = ds_model[name].values
        nodata = ds_model[name].raster.nodata
        if np.all(data[riv_mask] != nodata):
            continue
        data = flwdir_river.fillnodata(data, nodata, direction="down", how="max")
        ds_model[name].values = np.maximum(0, data)
    # smooth by averaging along flow directions and set minimum
    if smooth_len > 0:
        # FIX: use max (was min); min(1, ...) capped the moving-average window
        # at a single cell, which made the smooth_len argument ineffective.
        nsmooth = max(1, int(round(smooth_len / rivlen_avg / 2)))
        kwgs = dict(n=nsmooth, restrict_strord=True)
        ds_model["rivwth"].values = flwdir_river.moving_average(
            ds_model["rivwth"].values, nodata=ds_model["rivwth"].raster.nodata, **kwgs
        )
    # enforce minimum width and restore nodata outside the river mask
    ds_model["rivwth"] = np.maximum(min_rivwth, ds_model["rivwth"]).where(
        riv_mask, ds_model["rivwth"].raster.nodata
    )
    ## river depth
    if "rivdph" not in ds_model:
        # distance to outlet; required for manning and gvf rivdph methods
        if method != "powlaw" and "rivdst" not in ds_model:
            rivlen = ds_model["rivlen"].values
            nodata = ds_model["rivlen"].raster.nodata
            rivdst = flwdir_river.accuflux(rivlen, nodata=nodata, direction="down")
            ds_model["rivdst"] = xr.Variable(
                dims, rivdst, attrs=dict(_FillValue=nodata)
            )
        # estimate depth from width / bankfull discharge
        rivdph = workflows.river_depth(
            data=ds_model,
            flwdir=flwdir_river,
            method=method,
            min_rivdph=min_rivdph,
            **kwargs,
        )
        attrs = dict(_FillValue=-9999, unit="m")
        ds_model["rivdph"] = xr.Variable(dims, rivdph, attrs=attrs).fillna(-9999)
    # smooth by averaging along flow directions and set minimum
    if smooth_len > 0:
        ds_model["rivdph"].values = flwdir_river.moving_average(
            ds_model["rivdph"].values, nodata=-9999, **kwgs
        )
    ds_model["rivdph"] = np.maximum(min_rivdph, ds_model["rivdph"]).where(
        riv_mask, -9999
    )
    return ds_model[["rivwth", "rivdph"]]
def river_floodplain_volume(
    ds,
    ds_model,
    river_upa=30,
    flood_depths=[0.5, 1.0, 1.5, 2.0, 2.5],
    dtype=np.float64,
    logger=logger,
):
    """Calculate floodplain volume at given flood depths based on (subgrid) HAND map.

    Parameters
    ----------
    ds: xr.Dataset
        hydrography dataset containing "flwdir", "uparea", "elevtn" variables;
    ds_model: xr.Dataset, optional
        Model dataset with output grid, must contain "rivmsk", "rivwth", "rivlen"
        for subgrid must contain "x_out", "y_out".
    river_upa: float
        minimum threshold to define the river when calculating HAND, by default 30 [km2]
    flood_depths : list of float, optional
        flood depths at which a volume is derived, by default [0.5,1.0,1.5,2.0,2.5]
        (mutable default is safe here: it is copied by ``np.atleast_1d`` below
        and never modified in place)
    dtype: numpy.dtype, optional
        output dtype
    logger: logging.Logger, optional
        logger used for messages, by default the module logger

    Returns
    -------
    da_out : xr.DataArray with dims (flood_depth, y, x)
        Floodplain volume [m3]
    """
    # hydrography and model grids must share a CRS for coordinate lookups below
    if not ds.raster.crs == ds_model.raster.crs:
        raise ValueError("Hydrography dataset CRS does not match model CRS")
    # check data variables.
    dvars = ["flwdir", "elevtn", "uparea"]
    dvars_model = ["rivmsk", "rivwth", "rivlen"]
    if not np.all([v in ds for v in dvars]):
        raise ValueError(f"One or more variables missing from ds: {dvars}.")
    if not np.all([v in ds_model for v in dvars_model]):
        raise ValueError(f"One or more variables missing from ds_model: {dvars_model}")
    # initialize flood directions
    flwdir = flw.flwdir_from_da(ds["flwdir"], mask=None)
    # get river cell coordinates
    riv_mask = ds_model["rivmsk"].values > 0
    if "x_out" in ds_model and "y_out" in ds_model:  # use subgrid
        # map subgrid outlet (x, y) locations to linear pixel indices in ds
        idxs_out = ds.raster.xy_to_idx(
            xs=ds_model["x_out"].values,
            ys=ds_model["y_out"].values,
            mask=riv_mask,
            nodata=flwdir._mv,
        )
    else:
        # no subgrid: get cell index of river cells on the model grid itself
        idxs_out = np.arange(ds_model.raster.size).reshape(ds_model.raster.shape)
        idxs_out = np.where(riv_mask, idxs_out, flwdir._mv)
    # calculate hand (height above nearest drain, relative to the river network)
    hand = flwdir.hand(
        drain=ds["uparea"].values >= river_upa, elevtn=ds["elevtn"].values
    )
    # calculate volume at user defined flood depths
    # note that the ucat_map is not saved currently but is required if
    # we want to create a flood map by postprocessing
    flood_depths = np.atleast_1d(flood_depths).ravel().astype(dtype)  # force 1D
    _, ucat_vol = flwdir.ucat_volume(idxs_out=idxs_out, hand=hand, depths=flood_depths)
    # force minimum volume based on river width * length
    rivlen = np.maximum(ds_model["rivlen"].values, 1)  # avoid zero division errors
    min_vol = ds_model["rivwth"].values * rivlen * flood_depths[0]
    ucat_vol[0, :, :] = np.maximum(ucat_vol[0, :, :], min_vol)
    # implied flood width at the first depth; reused to extrapolate minima upward
    fldwth = ucat_vol[0, :, :] / rivlen / flood_depths[0]
    for i in range(1, ucat_vol.shape[0]):
        dh = flood_depths[i] - flood_depths[i - 1]
        # enforce volumes monotonically increasing with depth
        min_vol = ucat_vol[i - 1, ...] + (fldwth * rivlen * dh)
        ucat_vol[i, :, :] = np.maximum(ucat_vol[i, :, :], min_vol)
        fldwth = (ucat_vol[i, :, :] - ucat_vol[i - 1, :, :]) / rivlen / dh
    # return xarray DataArray with nodata outside the river mask
    da_out = (
        xr.DataArray(
            coords={"flood_depth": flood_depths, **ds_model.raster.coords},
            dims=("flood_depth", *ds_model.raster.dims),
            data=ucat_vol,
        )
        .reset_coords(drop=True)
        .where(ds_model["rivmsk"] > 0, -9999.0)
    )
    da_out.raster.set_nodata(-9999.0)
    da_out.raster.set_crs(ds_model.raster.crs)
    # TODO return a second dataset with hand and ucat_map variables for postprocessing
    return da_out
# TODO: methods below are redundant after version v0.1.4 and will be removed in
# future versions
def river_width(
    ds_like,
    flwdir,
    data=dict(),
    fit=False,
    fill=True,
    fill_outliers=True,
    min_wth=1,
    mask_names=[],
    predictor="discharge",
    rivwth_name="rivwth",
    obs_postfix="_obs",
    rivmsk_name="rivmsk",
    a=None,
    b=None,
    logger=logger,
    **kwargs,
):
    """Calculate river width using a power-law relation ``width = a * predictor**b``.

    The predictor is derived from discharge, precipitation or a grid variable;
    parameters ``a`` and ``b`` are taken from arguments, a regression table
    (discharge predictor) or fitted on observed widths. Where observations are
    available (``{rivwth_name}{obs_postfix}``) and ``fill`` is True, valid
    observed widths overwrite the power-law estimate.

    Returns an xr.DataArray with river width [m] on river cells and -9999
    elsewhere.
    """
    nopars = a is None or b is None  # no manual a, b parameters
    fit = fit or (nopars and predictor not in ["discharge"])  # fit power-law on the fly
    nowth = f"{rivwth_name}{obs_postfix}" not in ds_like  # no observed width
    fill = fill and not nowth  # fill datagaps and masked areas (lakes/res) in obs
    if nowth and fit:
        raise ValueError(
            f'Observed rivwth "{rivwth_name}{obs_postfix}" required to fit riverwidths.'
        )
    elif nowth:
        logger.warning(f'Observed rivwth "{rivwth_name}{obs_postfix}" not found.')
    # get predictor
    logger.debug(f'Deriving predictor "{predictor}" values')
    if predictor == "discharge":
        values, pars = _discharge(ds_like, flwdir=flwdir, logger=logger, **data)
        if not fit:  # FIX: was non-idiomatic `fit == False`
            a, b = pars
        # TODO: check units
    elif predictor == "precip":
        values = _precip(ds_like, flwdir=flwdir, logger=logger, **data)
    else:
        if predictor not in ds_like:
            raise ValueError(f"required {predictor} variable missing in grid.")
        values = ds_like[predictor].values
    # read river width observations
    if not nowth:  # FIX: was non-idiomatic `nowth == False`
        rivwth_org = ds_like[f"{rivwth_name}{obs_postfix}"].values
        nodata = ds_like[f"{rivwth_name}{obs_postfix}"].raster.nodata
        rivmsk = rivwth_org != nodata
        rivwth_org = np.where(rivmsk, rivwth_org, -9999)
        # mask zero and negative riverwidth and waterbodies if present
        mask = rivwth_org > min_wth
        for name in mask_names:
            if name in ds_like:
                mask[ds_like[name].values != ds_like[name].raster.nodata] = False
                logger.debug(f"{name} masked out in rivwth data.")
            else:
                logger.warning(f'mask variable "{name}" not found in maps.')
        # observed widths and predictor values at valid cells
        wth = rivwth_org[mask]
        val = values[mask]
        if fit:
            logger.info(f"Fitting power-law a,b parameters based on {predictor}")
            a, b = _width_fit(wth, val, mask, logger=logger)
        # flag outliers based on absolute and relative residuals
        wsim = power_law(val, a, b)
        res = np.abs(wsim - wth)
        outliers = np.logical_and(res > 200, (res / wsim) > 1.0)
        pout = np.sum(outliers) / outliers.size * 100
        # validate goodness of fit on non-outlier cells
        wsim = power_law(val[~outliers], a, b)
        nse = stats._nse(wsim, wth[~outliers])
        logger.info(
            f'Using "width = a*{predictor}^b"; where a={a:.2f}, b={b:.2f} '
            f"(nse={nse:.3f}; outliers={pout:.2f}%)"
        )
    # compute new riverwidth
    rivmsk = ds_like[rivmsk_name].values != 0
    rivwth_out = np.full(ds_like.raster.shape, -9999.0, dtype=np.float32)
    rivwth_out[rivmsk] = np.maximum(min_wth, power_law(values[rivmsk], a, b))
    # overwrite rivwth data with original data at valid points
    if fill:
        if fill_outliers:
            mask[mask] = ~outliers  # drop outliers from the valid-observation mask
            logger.info(f"masking {np.sum(outliers):.0f} outliers")
            rivwth_out[mask] = wth[~outliers]
        else:
            # FIX: keep all valid observations when outliers are not masked;
            # previously `wth[~outliers]` was assigned against the unreduced
            # mask, raising a shape mismatch whenever outliers were detected.
            rivwth_out[mask] = wth
    fills = "for missing data" if fill else "globally"
    logger.info(f"rivwth set {fills} based on width-{predictor} relationship.")
    # return xarray dataarray
    attrs = dict(_FillValue=-9999, unit="m")
    return xr.DataArray(rivwth_out, dims=ds_like.raster.dims, attrs=attrs)
def _width_fit(
    wth,
    val,
    mask,
    p0=(0.15, 0.65),  # FIX: tuple instead of mutable list default; Rhine uparea based
    logger=logger,
):
    """Fit power-law ``width = a * val**b`` parameters on observed widths.

    Iteratively refits while removing outliers (residual > 200 m and relative
    residual > 100%). Falls back to ``p0`` when fewer than 10 valid points are
    available or more than 20% of the points are flagged as outliers.

    Returns the fitted (a, b) tuple.
    """
    outliers = np.full(np.sum(mask), False, dtype=bool)
    a, b = None, None
    # check if sufficient data
    if np.sum(mask) > 10:
        n = 0
        # fit while removing outliers; stop when converged or after 20 rounds
        while n < 20:
            if np.sum(mask[mask][~outliers]) > 10:
                pars = curve_fit(
                    f=power_law,
                    xdata=val[~outliers],
                    ydata=wth[~outliers],
                    p0=p0,
                    bounds=([0.01, 0.01], [100.0, 1.0]),
                )[0]
                if np.allclose(p0, pars):
                    break  # converged
                # outliers based on absolute and relative diff
                wsim = power_law(val, *pars)
                res = np.abs(wsim - wth)
                outliers = np.logical_and(res > 200, (res / wsim) > 1.0)
                if np.sum(outliers) > np.sum(mask) * 0.2:  # max 20 % outliers
                    pars = p0
                    outliers[:] = False
                    logger.warning("Fit not successful.")
                    break
            p0 = pars
            n += 1
        a, b = pars
    else:
        # FIX: was `elif a is None or b is None:` which is always true here
        # (a and b are initialized to None above) — plain else is equivalent.
        a, b = p0
        logger.warning("Insufficient data points, using global parameters.")
    return a, b
def _precip(ds_like, flwdir, da_precip, logger=logger):
    """Derive a precipitation-based predictor: accumulated 10 x mean annual flow [m3/s]."""
    # resample annual precipitation to the model grid and convert mm/yr -> m/yr
    annual_precip = (
        da_precip.raster.reproject_like(ds_like, method=RESAMPLING["precip"]).values
        / 1e3
    )
    annual_precip = np.maximum(annual_precip, 0)  # clip negative (nodata) values
    # cell area [m2] of the regular lat/lon model grid
    ycoords = ds_like.raster.ycoords.values
    xcoords = ds_like.raster.xcoords.values
    cellarea = gis_utils.reggrid_area(ycoords, xcoords)
    # accumulate 10 x the mean volumetric flow along the flow directions [m3/s]
    return flwdir.accuflux(annual_precip * cellarea / (86400 * 365) * 10)
def _discharge(ds_like, flwdir, da_precip, da_climate, logger=logger):
    """Derive a discharge predictor from precipitation and climate-class regression."""
    # read climate classes and regression parameters shipped with the package
    source_precip = da_precip.name
    source_climate = da_climate.name
    fn_regr = join(DATADIR, "rivwth", f"regr_{source_precip}.csv")
    fn_clim = join(DATADIR, "rivwth", f"{source_climate}.csv")
    clim_map = pd.read_csv(fn_clim, index_col="class")
    regr_table = pd.read_csv(fn_regr, index_col="source").loc[source_climate]
    regr_map = regr_table.set_index("base_class")
    # get overall catchment climate classification (based on mode)
    # TODO: convert to xarray method by using scipy.stats.mode in xr.apply_ufuncs
    # TODO: reproject climate and mask cells outside basin
    basin_climate = np.bincount(da_climate.values.flatten()).argmax()
    # convert detailed climate classification to higher level base class
    base_class, class_name = clim_map.loc[basin_climate]
    logger.debug(f"Basin base climate classification: {base_class} ({class_name})")
    params = regr_map.loc[base_class]
    # precipitation resampled to the model grid
    # (TO-DO: add checks that were present in old version?)
    precip = da_precip.raster.reproject_like(
        ds_like, method=RESAMPLING["precip"]
    ).values
    # rescale so the accumulated precipitation stays (approximately) the same
    fx = np.round(da_precip.raster.res[0] / ds_like.raster.res[0], 4)
    fy = np.round(da_precip.raster.res[1] / ds_like.raster.res[1], 4)
    scaling_factor = (fx + fy) / 2
    precip = precip / scaling_factor**2
    # derive cell areas (km2)
    lat = ds_like.raster.ycoords.values
    lon = ds_like.raster.xcoords.values
    areagrid = gis_utils.reggrid_area(lat, lon) / 1e6
    # "local runoff" per cell (missing precipitation treated as zero), clipped at 0
    runoff = (np.maximum(precip, 0) * params["precip"]) + (areagrid * params["area"])
    runoff = np.maximum(runoff, 0)
    # accumulate runoff along flow directions to obtain discharge
    discharge = flwdir.accuflux(runoff, nodata=NODATA["discharge"], direction="up")
    return discharge, (params["discharge_a"], params["discharge_b"])