"""Workflows to connect a wflow model to a 1D model."""
import logging
import geopandas as gpd
import hydromt
import numpy as np
import pandas as pd
import xarray as xr
from shapely.geometry import Point
logger = logging.getLogger(__name__)
__all__ = ["wflow_1dmodel_connection"]
[docs]
def wflow_1dmodel_connection(
gdf_riv: gpd.GeoDataFrame,
ds_model: xr.Dataset,
connection_method: str = "subbasin_area",
area_max: float = 10.0,
add_tributaries: bool = True,
include_river_boundaries: bool = True,
logger=logger,
) -> xr.Dataset:
"""
Connect wflow to a 1D model by deriving linked subcatchs (and tributaries).
There are two methods to connect models:
- `subbasin_area`:
creates subcatchments linked to the 1d river based on an
area threshold (area_max) for the subbasin size. With this method, if a
tributary is larger than the area_max, it will be connected to the 1d river
directly.
- `nodes`:
subcatchments are derived based on the 1driver nodes (used as gauges
locations). With this method, large tributaries can also be derived separately
using the add_tributaries option and adding a area_max threshold for the
tributaries.
If `add_tributary` option is on, you can decide to include or exclude the upstream
boundary of the 1d river as an additionnal tributary using the
`include_river_boundaries` option.
Parameters
----------
gdf_riv : gpd.GeoDataFrame
River geometry.
ds_model : xr.Dataset
Model dataset with 'flwdir', 'rivmsk', 'rivlen', 'uparea'.
connection_method : str, default subbasin_area
Method to connect wflow to the 1D model. Available methods are
{'subbasin_area', 'nodes'}.
area_max : float, default 10.0
Maximum area [km2] of the subbasins to connect to the 1D model in km2 with
connection_method **subbasin_area** or
**nodes** with add_tributaries set to True.
add_tributaries : bool, default True
If True, derive tributaries for the subbasins larger than area_max. Always True
for **subbasin_area** method.
include_river_boundaries : bool, default True
If True, include the upstream boundary(ies) of the 1d river as an additional
tributary(ies).
logger : logging.Logger, optional
Logger object, by default logger
Returns
-------
ds_out: xr.Dataset
Dataset with variables 'subcatch' for the subbasin map, 'subcatch_riv' for the
subbasin map masked with river cells to be able to save river output with wflow
and 'gauges' for the tributaries outflow locations (add_tributaries True or
subbasin_area method).
"""
# Checks
dvars_model = ["flwdir", "rivmsk", "rivlen", "uparea"]
if not np.all([v in ds_model for v in dvars_model]):
raise ValueError(f"One or more variables missing from ds_model: {dvars_model}")
# Reproject
if gdf_riv.crs != ds_model.raster.crs:
gdf_riv = gdf_riv.to_crs(ds_model.raster.crs)
# Derive flwdir
flwdir = hydromt.flw.flwdir_from_da(ds_model["flwdir"])
# If tributaries or subbasins area method,
# need to derive the tributaries areas first
if connection_method == "subbasin_area" or add_tributaries:
logger.info("Linking 1D river to wflow river")
# 1. Derive the river edges / boundaries
# merge multilinestrings in gdf_riv to linestrings
riv1d = gdf_riv.explode().reset_index(drop=True)
# get the edges of the riv1d
riv1d_edges = riv1d.geometry.apply(lambda x: Point(x.coords[0]))
riv1d_edges = pd.concat(
[riv1d_edges, riv1d.geometry.apply(lambda x: Point(x.coords[-1]))]
)
# find geometry that are unique in riv1d_edges
riv1d_edges = gpd.GeoDataFrame(
riv1d_edges[~riv1d_edges.duplicated(keep=False)],
crs=riv1d.crs,
geometry="geometry",
)
# 2. snap edges to wflow river
# TODO if uparea column in riv1d, use it to snap to the closest river
# based on upstream area
da_edges, idxs, ids = hydromt.flw.gauge_map(
ds_model,
xy=(riv1d_edges.geometry.x, riv1d_edges.geometry.y),
stream=ds_model["rivmsk"].values,
flwdir=flwdir,
logger=logger,
)
points = gpd.points_from_xy(*ds_model.raster.idx_to_xy(idxs))
# if csv contains additional columns, these are also written in the staticgeoms
riv1d_edges = gpd.GeoDataFrame(
index=ids.astype(np.int32), geometry=points, crs=ds_model.raster.crs
)
# 3. Derive the subbasins corresponding to the river edges
da_edges_subbas, _ = hydromt.flw.basin_map(
ds_model, flwdir=flwdir, xy=(riv1d_edges.geometry.x, riv1d_edges.geometry.y)
)
da_edges_subbas.raster.set_crs(ds_model.raster.crs)
# convert to gdf
gdf_edges_subbas = da_edges_subbas.raster.vectorize()
# 4. Filter which subbasins are the upstream ones (tributaries)
# and which ones are the downstream ones (main river)
# and should be split into subbasins
# First intersect riv1d with gdf_edges_subbas
rivmerge = gpd.overlay(riv1d, gdf_edges_subbas).explode().reset_index(drop=True)
# Compute len of river
if rivmerge.crs.is_geographic:
rivmerge["len"] = rivmerge.geometry.to_crs(3857).length
else:
rivmerge["len"] = rivmerge.geometry.length
# Groupby value and sum length
rivmerge = rivmerge.groupby("value")["len"].sum()
# Select the subcatch where rivlength is more than 5times rivlen_avg
riv_mask = ds_model["rivmsk"].values == 1
rivlen_avg = ds_model["rivlen"].values[riv_mask].mean()
subids = rivmerge.index[rivmerge > rivlen_avg * 5].values
subcatch_to_split = gdf_edges_subbas[gdf_edges_subbas["value"].isin(subids)]
subcatch_to_split = subcatch_to_split.to_crs(ds_model.raster.crs)
da_subcatch_to_split = ds_model.raster.rasterize(subcatch_to_split)
# First tributaries are the edges that are not included in the subcatch_to_split
gdf_tributaries = riv1d_edges[~riv1d_edges.index.isin(subids)]
# 5. Derive a mask of gdf_riv in the subcatch_to_split wflow rivers
# compute flw_path to mask out the river included in dfm 1d network
xy = (
gdf_tributaries.geometry.x.values.tolist(),
gdf_tributaries.geometry.y.values.tolist(),
)
# Get paths
flowpaths, dists = flwdir.path(
xy=xy, max_length=None, unit="m", direction="down"
)
feats = flwdir.geofeatures(flowpaths)
gdf_paths = gpd.GeoDataFrame.from_features(feats, crs=ds_model.raster.crs)
gdf_paths.index = np.arange(1, len(gdf_paths) + 1)
# Create a mask with subcatch id in subcatch_to_split and flowpath is nodata
da_flwpaths = ds_model.raster.rasterize(gdf_paths)
da_flwpaths = da_flwpaths.where(
da_subcatch_to_split != da_subcatch_to_split.raster.nodata,
da_flwpaths.raster.nodata,
)
# 6. Derive the tributaries
# Find tributaries
logger.info("Deriving tributaries")
trib_msk = da_subcatch_to_split.where(
da_flwpaths == da_flwpaths.raster.nodata, da_subcatch_to_split.raster.nodata
)
trib_msk = trib_msk.where(
(trib_msk != trib_msk.raster.nodata)
& (ds_model["uparea"] > area_max)
& (flwdir.downstream(da_flwpaths) != da_flwpaths.raster.nodata),
trib_msk.raster.nodata,
)
gdf_trib = trib_msk.raster.vectorize()
# Test if there gdf_trib is empty
if gdf_trib.empty:
logger.info("No tributaries found")
if not include_river_boundaries:
gdf_tributaries = gpd.GeoDataFrame()
else:
gdf_trib["geometry"] = gdf_trib.centroid
# Merge with gdf_tributary if include_river_boundaries
# else only keep intersecting tributaries
if include_river_boundaries:
gdf_tributaries = pd.concat(
[gdf_tributaries, gdf_trib.drop(["value"], axis=1)]
)
else:
gdf_tributaries = gdf_trib.drop(["value"], axis=1)
gdf_tributaries.index = np.arange(1, len(gdf_tributaries) + 1)
# 7. Mask the tributaries out of the subatch_to_split map
if not gdf_tributaries.empty:
# Derive the tributary basin map
da_trib_subbas, _ = hydromt.flw.basin_map(
ds_model,
flwdir=flwdir,
xy=(gdf_tributaries.geometry.x, gdf_tributaries.geometry.y),
)
da_trib_subbas.raster.set_crs(ds_model.raster.crs)
# Mask tributaries
da_flwdir_mask = ds_model["flwdir"].where(
(da_subcatch_to_split != da_subcatch_to_split.raster.nodata)
& (da_trib_subbas == da_trib_subbas.raster.nodata),
ds_model["flwdir"].raster.nodata,
)
else:
# Mask subcatch to split only
da_flwdir_mask = ds_model["flwdir"].where(
da_subcatch_to_split != da_subcatch_to_split.raster.nodata,
ds_model["flwdir"].raster.nodata,
)
flwdir_mask = hydromt.flw.flwdir_from_da(da_flwdir_mask)
else:
# The mask for deriving subbasins is the whole wflow model
flwdir_mask = flwdir
gdf_tributaries = gpd.GeoDataFrame()
# 8. Derive the subbasins
if connection_method == "subbasin_area":
logger.info(
"Deriving lateral subbasins based on"
f"subbasin area threshold: {area_max} km2"
)
# calculate subbasins with a minimum stream order 7 and its outlets
subbas, idxs_out = flwdir_mask.subbasins_area(area_max)
da_subbasins = xr.DataArray(
data=subbas.astype(np.int32),
dims=ds_model.raster.dims,
coords=ds_model.raster.coords,
)
da_subbasins.raster.set_nodata(0)
da_subbasins.raster.set_crs(ds_model.raster.crs)
else:
# Get the nodes from gdf_riv
logger.info("Deriving subbasins based on 1D river nodes snapped to wflow river")
# from multiline to line
gdf_riv = gdf_riv.explode(ignore_index=True, index_parts=False)
nodes = []
for bi, branch in gdf_riv.iterrows():
nodes.append([Point(branch.geometry.coords[0]), bi]) # start
nodes.append([Point(branch.geometry.coords[-1]), bi]) # end
gdf_nodes = gpd.GeoDataFrame(
nodes, columns=["geometry", "river_id"], crs=gdf_riv.crs
)
# Drop duplicates geometry
gdf_nodes = gdf_nodes[~gdf_nodes.geometry.duplicated(keep="first")]
gdf_nodes.index = np.arange(1, len(gdf_nodes) + 1)
# Derive subbasins
da_subbasins, _ = hydromt.flw.basin_map(
ds_model,
flwdir=flwdir_mask,
xy=(gdf_nodes.geometry.x, gdf_nodes.geometry.y),
stream=ds_model["rivmsk"].values,
)
da_subbasins.raster.set_crs(ds_model.raster.crs)
da_subbasins.name = "subcatch"
ds_out = da_subbasins.to_dataset()
# Subcatchment map for river cells only (to be able to save river outputs in wflow)
ds_out["subcatch_riv"] = da_subbasins.where(
ds_model["rivmsk"] > 0, da_subbasins.raster.nodata
)
# Add tributaries
if not gdf_tributaries.empty:
ds_out["gauges"] = ds_out.raster.rasterize(
gdf_tributaries, col_name="index", nodata=0
)
return ds_out