Source code for hydromt_sfincs.workflows.flwdir

"""Flow direction and river network workflows for SFINCS models."""
import logging
from typing import Tuple

import geopandas as gpd
import hydromt
import numpy as np
import pyflwdir
import xarray as xr

logger = logging.getLogger(__name__)

__all__ = [
    "river_source_points",
    "river_centerline_from_hydrography",
]


[docs] def river_centerline_from_hydrography( da_flwdir: xr.DataArray, da_uparea: xr.DataArray, river_upa: float = 10, river_len: float = 1e3, gdf_mask: gpd.GeoDataFrame = None, ) -> gpd.GeoDataFrame: """Returns the centerline of rivers based on a flow direction raster data (`da_flwdir`). Parameters ---------- da_flwdir: xarray.DataArray D8 flow direction raster data. da_uparea: xarray.DataArray, optional River mask raster data. Used to mask da_flwdir, by default None. river_upa : float, optional Minimum upstream area threshold for rivers [km2], by default 10.0 river_len: float, optional Mimimum river length [m] within the model domain to define river cells, by default 1000 m. gdf_mask: geopandas.GeoDataFrame, optional Polygon to clip river center lines before calculating the river length, by default None. Returns ------- gdf_riv: geopandas.GeoDataFrame River line vector data. """ # get river network from hydrography based on upstream area mask riv_mask = da_uparea >= river_upa if not riv_mask.any(): return gpd.GeoDataFrame() flwdir = hydromt.flw.flwdir_from_da(da_flwdir, mask=riv_mask) feats = flwdir.streams(uparea=da_uparea.values) gdf_riv = gpd.GeoDataFrame.from_features(feats, crs=da_flwdir.raster.crs) # clip to mask and remove empty geometries if gdf_mask is not None and isinstance(gdf_mask, gpd.GeoDataFrame): gdf_riv = gdf_riv.to_crs(gdf_mask.crs).clip(gdf_mask.unary_union) gdf_riv = gdf_riv[~gdf_riv.is_empty] # create river network from gdf to get distance from outlet 'rivlen' # length of river segments if gdf_riv.crs.is_geographic: gdf_riv["seglen"] = gdf_riv.to_crs("epsg:3857").geometry.length else: gdf_riv["seglen"] = gdf_riv.geometry.length gdf_riv = gdf_riv[gdf_riv["seglen"] > 0] if gdf_riv.empty or river_len == 0: return gdf_riv # accumulate to get river length from outlet flwdir = pyflwdir.from_dataframe(gdf_riv.set_index("idx"), ds_col="idx_ds") gdf_riv["rivdst"] = flwdir.accuflux(gdf_riv["seglen"].values, direction="down") # get maximum river length from outlet (at headwater segments) for each river segment gdf_riv["rivlen"] = flwdir.fillnodata( np.where(flwdir.n_upstream == 0, gdf_riv["rivdst"], 0), 0 ) # filter river network based on total length gdf_riv = gdf_riv[gdf_riv["rivlen"] >= river_len] return gdf_riv
[docs] def river_source_points( gdf_riv: gpd.GeoDataFrame, gdf_mask: gpd.GeoDataFrame, src_type: str = "inflow", buffer: float = 100, river_upa: float = 10, river_len: float = 1e3, da_uparea: xr.DataArray = None, reverse_river_geom: bool = False, logger: logging.Logger = logger, ) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: """Returns the locations where a river flows in (`inflow=True`) or out (`inflow=False`) of the model gdf_mask. Rivers are based on either a river network vector data (`gdf_riv`) or a flow direction raster data (`da_flwdir`). Parameters ---------- gdf_riv: geopandas.GeoDataFrame River network vector data, by default None. Requires 'uparea' and 'rivlen' attributes to check for river length and upstream area thresholds. gdf_mask: geopandas.GeoDataFrame Polygon of model gdf_mask of interest. src_type: ['inflow', 'outflow', 'headwater'], optional Type of river source points to return, by default 'inflow'. If 'inflow', return points where the river flows into the model domain. If 'outflow', return points where the river flows out of the model domain. If 'headwater', return all headwater (including inflow) points within the model domain. buffer: float, optional Buffer around gdf_mask to select river source points, by default 100 m. river_upa : float, optional Minimum upstream area threshold for rivers [km2], by default 10.0 river_len: float, optional Mimimum river length [m] within the model domain to define river cells, by default 1000 m. da_uparea: xarray.DataArray, optional River upstream area raster data, by default None. reverse_river_geom: bool, optional If True, assume that segments in 'rivers' are drawn from downstream to upstream. Only used if 'rivers' is not None, By default False Returns ------- gdf_pnt: geopandas.GeoDataFrame Source points """ # data checks if not ( isinstance(gdf_mask, (gpd.GeoDataFrame, gpd.GeoSeries)) and np.all(np.isin(gdf_mask.geometry.type, ["Polygon", "MultiPolygon"])) ): raise TypeError("gdf_mask must be a GeoDataFrame of Polygons.") if not ( isinstance(gdf_riv, (gpd.GeoDataFrame, gpd.GeoSeries)) and np.all(np.isin(gdf_riv.geometry.type, ["LineString", "MultiLineString"])) ): raise TypeError("gdf_riv must be a GeoDataFrame of LineStrings.") if src_type not in ["inflow", "outflow", "headwater"]: raise ValueError("src_type must be either 'inflow', 'outflow', or 'headwater'.") if gdf_mask.crs.is_geographic: # to pseudo mercator gdf_mask = gdf_mask.to_crs("epsg:3857") # clip river to model gdf_mask gdf_riv = gdf_riv.to_crs(gdf_mask.crs).clip(gdf_mask.unary_union) # filter river network based on uparea and length if "uparea" in gdf_riv.columns: gdf_riv = gdf_riv[gdf_riv["uparea"] >= river_upa] if "rivlen" in gdf_riv.columns: gdf_riv = gdf_riv[gdf_riv["rivlen"] > river_len] if gdf_riv.empty: logger.warning( "No rivers matching the uparea and rivlen thresholds found in gdf_riv." ) return gpd.GeoDataFrame() # get source points 1m before the start/end of the river # a positive dx results in a point near the start of the line (inflow) # a negative dx results in a point near the end of the line (outflow) dx = -1 if reverse_river_geom else 1 gdf_up = gdf_riv.interpolate(dx).to_frame("geometry") gdf_up["riv_idx"] = gdf_riv.index gdf_ds = gdf_riv.interpolate(-dx).to_frame("geometry") gdf_ds["riv_idx"] = gdf_riv.index # get points that do not intersect with up/downstream end of other river segments # use a small buffer of 5m around these points to account for dx and avoid issues with inprecise river geometries if src_type in ["inflow", "headwater"]: pnts_ds = gdf_ds.buffer(5).unary_union gdf_pnt = gdf_up[~gdf_up.intersects(pnts_ds)].reset_index(drop=True) elif src_type == "outflow": pnts_up = gdf_up.buffer(5).unary_union gdf_pnt = gdf_ds[~gdf_ds.intersects(pnts_up)].reset_index(drop=True) # get buffer around gdf_mask, in- and outflow points should be within this buffer if src_type in ["inflow", "outflow"]: bnd = gdf_mask.boundary.buffer(buffer).unary_union gdf_pnt = gdf_pnt[gdf_pnt.intersects(bnd)].reset_index(drop=True) # log numer of source points logger.info(f"Found {gdf_pnt.index.size} {src_type} points.") # add uparea attribute if da_uparea is provided if da_uparea is not None: gdf_pnt["uparea"] = da_uparea.raster.sample(gdf_pnt).values gdf_pnt = gdf_pnt.sort_values("uparea", ascending=False).reset_index(drop=True) return gdf_pnt