Source code for hydromt_sfincs.workflows.flwdir

"""Flow direction and river network workflows for SFINCS models."""
import logging
from typing import Tuple

import geopandas as gpd
import hydromt
import numpy as np
import pyflwdir
import xarray as xr

logger = logging.getLogger(__name__)

__all__ = [
    "river_boundary_points",
    "river_centerline_from_hydrography",
]


[docs]def river_centerline_from_hydrography( da_flwdir: xr.DataArray, da_uparea: xr.DataArray, river_upa: float = 10, river_len: float = 1e3, mask: gpd.GeoDataFrame = None, ) -> gpd.GeoDataFrame: """Returns the centerline of rivers based on a flow direction raster data (`da_flwdir`). Parameters ---------- da_flwdir: xarray.DataArray D8 flow direction raster data. da_uparea: xarray.DataArray, optional River mask raster data. Used to mask da_flwdir, by default None. river_upa : float, optional Minimum upstream area threshold for rivers [km2], by default 10.0 river_len: float, optional Mimimum river length [m] within the model domain to define river cells, by default 1000 m. mask: geopandas.GeoDataFrame, optional Polygon to clip river center lines before calculating the river length, by default None. Returns ------- gdf_riv: geopandas.GeoDataFrame River line vector data. """ # get river network from hydrography based on upstream area mask flwdir = hydromt.flw.flwdir_from_da(da_flwdir, mask=da_uparea >= river_upa) gdf_riv = gpd.GeoDataFrame.from_features(flwdir.streams(), crs=da_flwdir.raster.crs) # clip to mask and remove empty geometries if mask is not None: gdf_riv = gdf_riv.to_crs(mask.crs).clip(mask.unary_union) gdf_riv = gdf_riv[~gdf_riv.is_empty] # create river network from gdf to get distance from outlet 'rivlen' gdf_riv["rivlen"] = gdf_riv.geometry.length flwdir = pyflwdir.from_dataframe(gdf_riv.set_index("idx"), ds_col="idx_ds") gdf_riv["rivlen"] = flwdir.accuflux(gdf_riv["rivlen"].values, direction="down") gdf_riv = gdf_riv[gdf_riv["rivlen"] >= river_len] return gdf_riv
[docs]def river_boundary_points( region: gpd.GeoDataFrame, res: float, gdf_riv: gpd.GeoDataFrame = None, da_flwdir: xr.DataArray = None, da_uparea: xr.DataArray = None, river_upa: float = 10, river_len: float = 1e3, inflow: bool = True, ) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: """Returns the locations where a river flows in (`inflow=True`) or out (`inflow=False`) of the model region. Rivers are based on either a river network vector data (`gdf_riv`) or a flow direction raster data (`da_flwdir`). Parameters ---------- region: geopandas.GeoDataFrame Polygon of model region of interest. res: float Model resolution [m]. gdf_riv: geopandas.GeoDataFrame, optional River network vector data, by default None. da_flwdir: xarray.DataArray, optional D8 flow direction raster data, by default None. da_uparea: xarray.DataArray, optional River upstream area raster data, by default None. river_upa : float, optional Minimum upstream area threshold for rivers [km2], by default 10.0 river_len: float, optional Mimimum river length [m] within the model domain to define river cells, by default 1000 m. inflow: bool, optional If True, return inflow otherwise outflow boundary points, by default True. Returns ------- gdf_src, gdf_riv: geopandas.GeoDataFrame In-/outflow points and river line vector data. """ if not isinstance(region, (gpd.GeoDataFrame, gpd.GeoSeries)) and np.all( region.geometry.type == "Polygon" ): raise ValueError("Boundary must be a GeoDataFrame of LineStrings.") if gdf_riv is None and (da_flwdir is None or da_uparea is None): raise ValueError("Either gdf_riv or da_flwdir and da_uparea must be provided.") elif gdf_riv is None: # get river network from hydrography gdf_riv = river_centerline_from_hydrography( da_flwdir, da_uparea, river_upa, river_len, mask=region ) else: # clip river to model region gdf_riv = gdf_riv.to_crs(region.crs).clip(region.unary_union) # filter river network based on uparea and length if "uparea" in gdf_riv.columns: gdf_riv = [gdf_riv["uparea"] >= river_upa] if "rivlen" in gdf_riv.columns: gdf_riv = gdf_riv[gdf_riv["rivlen"] > river_len] dx = res / 5 if inflow else -res / 5 # move point a bit into the model domain gdf_pnt = gdf_riv.interpolate(dx).to_frame("geometry") # keep points on boundary cells bnd = region.boundary.buffer(res).unary_union # NOTE should be single geom gdf_pnt = gdf_pnt[gdf_pnt.within(bnd)].reset_index(drop=True) # add uparea attribute if da_uparea is provided if da_uparea is not None: gdf_pnt["uparea"] = da_uparea.raster.sample(gdf_pnt).values gdf_pnt = gdf_pnt.sort_values("uparea", ascending=False).reset_index(drop=True) if "rivwth" in gdf_riv.columns: gdf_out = hydromt.gis_utils.nearest_merge( gdf_out, gdf_riv, columns=["rivwth"], max_dist=10 ) return gdf_pnt, gdf_riv