Source code for hydromt_wflow.workflows.connect

"""Workflows to connect a wflow model to a 1D model."""

import logging

import geopandas as gpd
import hydromt
import numpy as np
import pandas as pd
import xarray as xr
from shapely.geometry import LineString, MultiLineString, Point

logger = logging.getLogger(__name__)


__all__ = ["wflow_1dmodel_connection"]


[docs] def wflow_1dmodel_connection( gdf_riv: gpd.GeoDataFrame, ds_model: xr.Dataset, connection_method: str = "subbasin_area", area_max: float = 30.0, add_tributaries: bool = True, include_river_boundaries: bool = True, logger=logger, **kwargs, ) -> xr.Dataset: """ Connect wflow to a 1D model by deriving linked subcatchs (and tributaries). There are two methods to connect models: - `subbasin_area`: creates subcatchments linked to the 1d river based on an area threshold (area_max) for the subbasin size. With this method, if a tributary is larger than the area_max, it will be connected to the 1d river directly. - `nodes`: subcatchments are derived based on the 1driver nodes (used as gauges locations). With this method, large tributaries can also be derived separately using the add_tributaries option and adding a area_max threshold for the tributaries. If `add_tributary` option is on, you can decide to include or exclude the upstream boundary of the 1d river as an additional tributary using the `include_river_boundaries` option. River edges or river nodes are snapped to the closest downstream wflow river cell using the :py:meth:`hydromt.flw.gauge_map` method. Parameters ---------- gdf_riv : gpd.GeoDataFrame River geometry. ds_model : xr.Dataset Model dataset with 'flwdir', 'rivmsk', 'rivlen', 'uparea'. connection_method : str, default subbasin_area Method to connect wflow to the 1D model. Available methods are {'subbasin_area', 'nodes'}. area_max : float, default 10.0 Maximum area [km2] of the subbasins to connect to the 1D model in km2 with connection_method **subbasin_area** or **nodes** with add_tributaries set to True. add_tributaries : bool, default True If True, derive tributaries for the subbasins larger than area_max. Always True for **subbasin_area** method. include_river_boundaries : bool, default True If True, include the upstream boundary(ies) of the 1d river as an additional tributary(ies). logger : logging.Logger, optional Logger object, by default logger **kwargs Additional keyword arguments passed to the snapping method hydromt.flw.gauge_map. See its documentation for more information. Returns ------- ds_out: xr.Dataset Dataset with variables 'subcatch' for the subbasin map, 'subcatch_riv' for the subbasin map masked with river cells to be able to save river output with wflow and 'gauges' for the tributaries outflow locations (add_tributaries True or subbasin_area method). See Also -------- hydromt.flw.gauge_map """ # Checks dvars_model = ["flwdir", "rivmsk", "rivlen", "uparea"] if not np.all([v in ds_model for v in dvars_model]): raise ValueError(f"One or more variables missing from ds_model: {dvars_model}") # Reproject if gdf_riv.crs != ds_model.raster.crs: gdf_riv = gdf_riv.to_crs(ds_model.raster.crs) # Derive flwdir flwdir = hydromt.flw.flwdir_from_da(ds_model["flwdir"]) # Basin mask buffer_cells = 2 basin_mask = ds_model["basins"].raster.vectorize() basin_mask = basin_mask.buffer(-buffer_cells * max(ds_model.raster.res, key=abs)) # If tributaries or subbasins area method, # need to derive the tributaries areas first if connection_method == "subbasin_area" or add_tributaries: # 0. Check that the max_area is not too small # Should be bigger than the wflow river threshold # Get from attrs if available for newer wflow models built with hydromt riv_upa = ds_model["rivmsk"].attrs.get("river_upa", None) if riv_upa is None: # Derive from the uparea and rivmsk riv_upa = xr.where(ds_model["rivmsk"] > 0, ds_model["uparea"], np.nan) riv_upa = float(riv_upa.min()) if area_max < riv_upa: new_area_max = np.ceil(riv_upa / 0.5) * 0.5 logger.warning( f"The area_max {area_max} is smaller than the minimum upstream area of " f"the wflow river {riv_upa} which means tributaries will " "not be connected to the wflow river. Changing and setting area_max to " f"{new_area_max} km2. " f"To keep {area_max} km2 threshold, please update the wflow model to " "include more detailed rivers." ) area_max = new_area_max logger.info("Linking 1D river to wflow river") # 1. Derive the river edges / boundaries # merge multilinestrings in gdf_riv to linestrings if any(gdf_riv.geometry.apply(lambda geom: isinstance(geom, MultiLineString))): logger.debug( "'gdf_riv' contains MultiLineStrings which will be converted into" "LineStrings." ) gdf_riv = gdf_riv.explode(index_parts=True).reset_index(drop=True) # clip to basins riv1d = gdf_riv.clip(basin_mask) if any(riv1d.geometry.apply(lambda geom: isinstance(geom, MultiLineString))): # clipping may result in MultiLineStrings logger.warning( "The provided river geometry contains MultiLineString geometries after" "clipping. Consider checking if the provided river geometry crosses the" "boundary of the Wflow model and if there is sufficient space between" f"the river and the Wflow model boundary (min. {buffer_cells} cells)." ) # we turn these into LineStrings again using its points riv1d["geometry"] = riv1d.geometry.apply( lambda geom: LineString( [coord for line in geom.geoms for coord in line.coords] ) if isinstance(geom, MultiLineString) else geom ) # get the edges of the riv1d riv1d_edges = riv1d.geometry.apply(lambda x: Point(x.coords[0])) riv1d_edges = pd.concat( [riv1d_edges, riv1d.geometry.apply(lambda x: Point(x.coords[-1]))] ) # find geometry that are unique in riv1d_edges riv1d_edges = gpd.GeoDataFrame( riv1d_edges[~riv1d_edges.duplicated(keep=False)], crs=riv1d.crs, geometry="geometry", ) # 2. snap edges to wflow river da_edges, idxs, ids = hydromt.flw.gauge_map( ds_model, xy=(riv1d_edges.geometry.x, riv1d_edges.geometry.y), stream=ds_model["rivmsk"].values, flwdir=flwdir, logger=logger, **kwargs, ) points = gpd.points_from_xy(*ds_model.raster.idx_to_xy(idxs)) # if csv contains additional columns, these are also written in the staticgeoms riv1d_edges = gpd.GeoDataFrame( index=ids.astype(np.int32), geometry=points, crs=ds_model.raster.crs ) # 3. Derive the subbasins corresponding to the river edges da_edges_subbas, _ = hydromt.flw.basin_map( ds_model, flwdir=flwdir, xy=(riv1d_edges.geometry.x, riv1d_edges.geometry.y) ) da_edges_subbas.raster.set_crs(ds_model.raster.crs) # convert to gdf gdf_edges_subbas = da_edges_subbas.raster.vectorize() # 4. Filter which subbasins are the upstream ones (tributaries) # and which ones are the downstream ones (main river) # and should be split into subbasins # First intersect riv1d with gdf_edges_subbas rivmerge = ( gpd.overlay(riv1d, gdf_edges_subbas) .explode(index_parts=True) .reset_index(drop=True) ) # Compute len of river if rivmerge.crs.is_geographic: rivmerge["len"] = rivmerge.geometry.to_crs(3857).length else: rivmerge["len"] = rivmerge.geometry.length # Groupby value and sum length rivmerge = rivmerge.groupby("value")["len"].sum() # Select the subcatch where rivlength is more than 5times rivlen_avg riv_mask = ds_model["rivmsk"].values == 1 rivlen_avg = ds_model["rivlen"].values[riv_mask].mean() subids = rivmerge.index[rivmerge > rivlen_avg * 5].values subcatch_to_split = gdf_edges_subbas[gdf_edges_subbas["value"].isin(subids)] subcatch_to_split = subcatch_to_split.to_crs(ds_model.raster.crs) da_subcatch_to_split = ds_model.raster.rasterize( subcatch_to_split, col_name="value" ).astype(np.float32) # First tributaries are the edges that are not included in the subcatch_to_split gdf_tributaries = riv1d_edges[~riv1d_edges.index.isin(subids)] # 5. Derive a mask of gdf_riv in the subcatch_to_split wflow rivers # compute flw_path to mask out the river included in dfm 1d network xy = ( gdf_tributaries.geometry.x.values.tolist(), gdf_tributaries.geometry.y.values.tolist(), ) # Get paths flowpaths, dists = flwdir.path( xy=xy, max_length=None, unit="m", direction="down" ) feats = flwdir.geofeatures(flowpaths) gdf_paths = gpd.GeoDataFrame.from_features(feats, crs=ds_model.raster.crs) gdf_paths.index = np.arange(1, len(gdf_paths) + 1) # Create a mask with subcatch id in subcatch_to_split and flowpath is nodata da_flwpaths = ds_model.raster.rasterize(gdf_paths) da_flwpaths = da_flwpaths.where( da_subcatch_to_split != da_subcatch_to_split.raster.nodata, da_flwpaths.raster.nodata, ) # 6. Derive the tributaries # Find tributaries logger.info("Deriving tributaries") trib_msk = da_subcatch_to_split.where( da_flwpaths == da_flwpaths.raster.nodata, da_subcatch_to_split.raster.nodata ) trib_msk = trib_msk.where( (trib_msk != trib_msk.raster.nodata) & (ds_model["uparea"] > area_max) & (flwdir.downstream(da_flwpaths) != da_flwpaths.raster.nodata), trib_msk.raster.nodata, ) # Make sure we vectorize to single cells and not to polygons for adjacent cells trib_msk = trib_msk.stack(z=(trib_msk.raster.y_dim, trib_msk.raster.x_dim)) nodata = trib_msk.raster.nodata trib_msk = trib_msk.where(trib_msk != nodata, drop=True) trib_msk.values = np.arange(1, len(trib_msk) + 1) trib_msk = trib_msk.unstack(fill_value=nodata) trib_msk = trib_msk.reindex_like( da_subcatch_to_split, fill_value=nodata ).astype(np.int32) gdf_trib = trib_msk.raster.vectorize() # Test if there gdf_trib is empty if gdf_trib.empty: logger.info("No tributaries found") if not include_river_boundaries: gdf_tributaries = gpd.GeoDataFrame() else: gdf_trib["geometry"] = gdf_trib.centroid # Merge with gdf_tributary if include_river_boundaries # else only keep intersecting tributaries if include_river_boundaries: gdf_tributaries = pd.concat( [gdf_tributaries, gdf_trib.drop(["value"], axis=1)] ) else: gdf_tributaries = gdf_trib.drop(["value"], axis=1) gdf_tributaries.index = np.arange(1, len(gdf_tributaries) + 1) # 7. Mask the tributaries out of the subatch_to_split map if not gdf_tributaries.empty: # Derive the tributary basin map da_trib_subbas, _ = hydromt.flw.basin_map( ds_model, flwdir=flwdir, xy=(gdf_tributaries.geometry.x, gdf_tributaries.geometry.y), ) da_trib_subbas.raster.set_crs(ds_model.raster.crs) # Mask tributaries da_flwdir_mask = ds_model["flwdir"].where( (da_subcatch_to_split != da_subcatch_to_split.raster.nodata) & (da_trib_subbas == da_trib_subbas.raster.nodata), ds_model["flwdir"].raster.nodata, ) else: # Mask subcatch to split only da_flwdir_mask = ds_model["flwdir"].where( da_subcatch_to_split != da_subcatch_to_split.raster.nodata, ds_model["flwdir"].raster.nodata, ) flwdir_mask = hydromt.flw.flwdir_from_da(da_flwdir_mask) else: # The mask for deriving subbasins is the whole wflow model flwdir_mask = flwdir gdf_tributaries = gpd.GeoDataFrame() # 8. Derive the subbasins if connection_method == "subbasin_area": logger.info( "Deriving lateral subbasins based on" f"subbasin area threshold: {area_max} km2" ) # calculate subbasins with a minimum stream order 7 and its outlets subbas, idxs_out = flwdir_mask.subbasins_area(area_max) da_subbasins = xr.DataArray( data=subbas.astype(np.int32), dims=ds_model.raster.dims, coords=ds_model.raster.coords, ) da_subbasins.raster.set_nodata(0) da_subbasins.raster.set_crs(ds_model.raster.crs) else: # Get the nodes from gdf_riv logger.info("Deriving subbasins based on 1D river nodes snapped to wflow river") # from multiline to line gdf_riv = gdf_riv.explode(ignore_index=True, index_parts=False) # Clip to basin gdf_riv = gdf_riv.clip(basin_mask) nodes = [] for bi, branch in gdf_riv.iterrows(): nodes.append([Point(branch.geometry.coords[0]), bi]) # start nodes.append([Point(branch.geometry.coords[-1]), bi]) # end gdf_nodes = gpd.GeoDataFrame( nodes, columns=["geometry", "river_id"], crs=gdf_riv.crs ) # Drop duplicates geometry gdf_nodes = gdf_nodes[~gdf_nodes.geometry.duplicated(keep="first")] gdf_nodes.index = np.arange(1, len(gdf_nodes) + 1) # Snap the nodes to the wflow river da_nodes, idxs, ids = hydromt.flw.gauge_map( ds_model, xy=(gdf_nodes.geometry.x, gdf_nodes.geometry.y), stream=ds_model["rivmsk"].values, flwdir=flwdir, logger=logger, **kwargs, ) # Derive subbasins da_subbasins, _ = hydromt.flw.basin_map( ds_model, flwdir=flwdir_mask, idxs=idxs, ids=ids, stream=ds_model["rivmsk"].values, ) da_subbasins.raster.set_crs(ds_model.raster.crs) da_subbasins.name = "subcatch" ds_out = da_subbasins.to_dataset() # Subcatchment map for river cells only (to be able to save river outputs in wflow) ds_out["subcatch_riv"] = da_subbasins.where( ds_model["rivmsk"] > 0, da_subbasins.raster.nodata ) # Add tributaries if not gdf_tributaries.empty: ds_out["gauges"] = ds_out.raster.rasterize( gdf_tributaries, col_name="index", nodata=0 ) return ds_out