Source code for hydromt_sfincs.sfincs

"""
SfincsModel class
"""
from __future__ import annotations

import glob
import logging
import os
from os.path import abspath, basename, dirname, isabs, isfile, join
from pathlib import Path
from typing import Any, Dict, List, Tuple, Union

import geopandas as gpd
import hydromt
import numpy as np
import pandas as pd
import xarray as xr
from hydromt.models.model_grid import GridModel
from hydromt.vector import GeoDataArray, GeoDataset
from hydromt.workflows.forcing import da_to_timedelta
from pyproj import CRS
from shapely.geometry import LineString, box

from . import DATADIR, plots, utils, workflows
from .regulargrid import RegularGrid
from .subgrid import SubgridTableRegular
from .sfincs_input import SfincsInput

__all__ = ["SfincsModel"]

logger = logging.getLogger(__name__)


class SfincsModel(GridModel):
    # GLOBAL static class variables that can be used by all methods within the
    # SfincsModel class. Typically a list of variables (e.g. _MAPS) or a
    # dict with varname - filename pairs (e.g. thin_dams: thd)
    _NAME = "sfincs"
    _GEOMS = {
        "observation_points": "obs",
        "observation_lines": "crs",
        "weirs": "weir",
        "thin_dams": "thd",
        "drainage_structures": "drn",
    }  # parsed to dict of geopandas.GeoDataFrame
    _FORCING_1D = {
        # timeseries (can be multiple), locations tuple
        "waterlevel": (["bzs"], "bnd"),
        "waves": (["bzi"], "bnd"),
        "discharge": (["dis"], "src"),
        "precip": (["precip"], None),
        "wind": (["wnd"], None),
        "wavespectra": (["bhs", "btp", "bwd", "bds"], "bwv"),
        "wavemaker": (["whi", "wti", "wst"], "wvp"),  # TODO check names and test
    }
    _FORCING_NET = {
        # 2D forcing sfincs name, rename tuple
        "waterlevel": ("netbndbzsbzi", {"zs": "bzs", "zi": "bzi"}),
        "discharge": ("netsrcdis", {"discharge": "dis"}),
        "precip_2d": ("netampr", {"Precipitation": "precip_2d"}),
        "press_2d": ("netamp", {"barometric_pressure": "press_2d"}),
        "wind_2d": (
            "netamuamv",
            {"eastward_wind": "wind_u", "northward_wind": "wind_v"},
        ),
    }
    _FORCING_SPW = {"spiderweb": "spw"}  # TODO add read and write functions
    _MAPS = ["msk", "dep", "scs", "manning", "qinf", "smax", "seff", "ks", "vol"]
    _STATES = ["rst", "ini"]
    _FOLDERS = []
    _CLI_ARGS = {"region": "setup_grid_from_region", "res": "setup_grid_from_region"}
    _CONF = "sfincs.inp"
    _DATADIR = DATADIR
    _ATTRS = {
        "dep": {"standard_name": "elevation", "unit": "m+ref"},
        "msk": {"standard_name": "mask", "unit": "-"},
        "scs": {
            "standard_name": "potential maximum soil moisture retention",
            "unit": "in",
        },
        "qinf": {"standard_name": "infiltration rate", "unit": "mm.hr-1"},
        "manning": {"standard_name": "manning roughness", "unit": "s.m-1/3"},
        "vol": {"standard_name": "storage volume", "unit": "m3"},
        "bzs": {"standard_name": "waterlevel", "unit": "m+ref"},
        "bzi": {"standard_name": "wave height", "unit": "m"},
        "dis": {"standard_name": "discharge", "unit": "m3.s-1"},
        "precip": {"standard_name": "precipitation", "unit": "mm.hr-1"},
        "precip_2d": {"standard_name": "precipitation", "unit": "mm.hr-1"},
        "press_2d": {"standard_name": "barometric pressure", "unit": "Pa"},
        "wind_u": {"standard_name": "eastward wind", "unit": "m/s"},
        "wind_v": {"standard_name": "northward wind", "unit": "m/s"},
        "wnd": {"standard_name": "wind", "unit": "m/s"},
    }
    def __init__(
        self,
        root: str = None,
        mode: str = "w",
        config_fn: str = "sfincs.inp",
        write_gis: bool = True,
        data_libs: Union[List[str], str] = None,
        logger=logger,
    ):
        """
        The SFINCS model class (SfincsModel) contains methods to read, write, setup
        and edit `SFINCS <https://sfincs.readthedocs.io/en/latest/>`_ models.

        Parameters
        ----------
        root: str, Path, optional
            Path to model folder
        mode: {'w', 'r+', 'r'}
            Open model in write, append or reading mode, by default 'w'
        config_fn: str, Path, optional
            Filename of model config file, by default "sfincs.inp"
        write_gis: bool
            Write model files additionally to geotiff and geojson, by default True
        data_libs: List, str
            List of data catalog yaml files, by default None
        """
        # model folders
        self._write_gis = write_gis
        if write_gis and "gis" not in self._FOLDERS:
            self._FOLDERS.append("gis")

        super().__init__(
            root=root,
            mode=mode,
            config_fn=config_fn,
            data_libs=data_libs,
            logger=logger,
        )

        # placeholder grid classes
        self.grid_type = None
        self.reggrid = None
        self.quadtree = None
        self.subgrid = xr.Dataset()
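    # Example (illustrative sketch): create a new, empty model in write mode.
    # The root folder name and the "artifact_data" catalog are assumptions made
    # for illustration only.
    #
    #     from hydromt_sfincs import SfincsModel
    #     sf = SfincsModel(root="sfincs_model", mode="w", data_libs=["artifact_data"])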
    @property
    def mask(self) -> xr.DataArray | None:
        """Returns model mask"""
        if self.grid_type == "regular":
            if "msk" in self.grid:
                return self.grid["msk"]
            elif self.reggrid is not None:
                return self.reggrid.empty_mask

    @property
    def region(self) -> gpd.GeoDataFrame:
        """Returns the geometry of the active model cells."""
        # NOTE overwrites property in GridModel
        region = gpd.GeoDataFrame()
        if "region" in self.geoms:
            region = self.geoms["region"]
        elif "msk" in self.grid and np.any(self.grid["msk"] > 0):
            da = xr.where(self.mask > 0, 1, 0).astype(np.int16)
            da.raster.set_nodata(0)
            region = da.raster.vectorize().dissolve()
        elif self.reggrid is not None:
            region = self.reggrid.empty_mask.raster.box
        return region

    @property
    def crs(self) -> CRS | None:
        """Returns the model crs"""
        if self.grid_type == "regular":
            return self.reggrid.crs
        elif self.grid_type == "quadtree":
            return self.quadtree.crs

    def set_crs(self, crs: Any) -> None:
        """Sets the model crs"""
        if self.grid_type == "regular":
            self.reggrid.crs = CRS.from_user_input(crs)
            self.grid.raster.set_crs(self.reggrid.crs)
        elif self.grid_type == "quadtree":
            self.quadtree.crs = CRS.from_user_input(crs)
    def setup_grid(
        self,
        x0: float,
        y0: float,
        dx: float,
        dy: float,
        nmax: int,
        mmax: int,
        rotation: float,
        epsg: int,
    ):
        """Setup a regular or quadtree grid.

        Parameters
        ----------
        x0, y0 : float
            x,y coordinates of the origin of the grid
        dx, dy : float
            grid cell size in x and y direction
        mmax, nmax : int
            number of grid cells in x and y direction
        rotation : float
            rotation of the grid [degrees]
        epsg : int
            EPSG code of the coordinate reference system
        """
        # TODO gdf_refinement for quadtree
        self.config.update(
            x0=x0,
            y0=y0,
            dx=dx,
            dy=dy,
            nmax=nmax,
            mmax=mmax,
            rotation=rotation,
            epsg=epsg,
        )

        self.update_grid_from_config()
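    # Example (illustrative sketch): define a 150 x 250 cell grid of 100 m
    # resolution in UTM zone 31N. All coordinate values are assumptions for
    # illustration only.
    #
    #     sf.setup_grid(
    #         x0=318650.0, y0=5040000.0, dx=100.0, dy=100.0,
    #         nmax=150, mmax=250, rotation=0.0, epsg=32631,
    #     )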
[docs] def setup_grid_from_region( self, region: dict, res: float = 100, crs: Union[str, int] = "utm", rotated: bool = False, hydrography_fn: str = None, basin_index_fn: str = None, align: bool = False, dec_origin: int = 0, dec_rotation: int = 3, ): """Setup a regular or quadtree grid from a region. Parameters ---------- region : dict Dictionary describing region of interest, e.g.: * {'bbox': [xmin, ymin, xmax, ymax]} * {'geom': 'path/to/polygon_geometry'} Note: For the 'bbox' option the coordinates need to be provided in WG84/EPSG:4326. For a complete overview of all region options, see :py:func:`hydromt.workflows.basin_mask.parse_region` res : float, optional grid resolution, by default 100 m crs : Union[str, int], optional coordinate reference system of the grid if "utm" (default) the best UTM zone is selected else a pyproj crs string or epsg code (int) can be provided grid_type : str, optional grid type, "regular" (default) or "quadtree" rotated : bool, optional if True, a minimum rotated rectangular grid is fitted around the region, by default False hydrography_fn : str Name of data source for hydrography data. basin_index_fn : str Name of data source with basin (bounding box) geometries associated with the 'basins' layer of `hydrography_fn`. Only required if the `region` is based on a (sub)(inter)basins without a 'bounds' argument. align : bool, optional If True (default), align target transform to resolution. Note that this has only been implemented for non-rotated grids. dec_origin : int, optional number of decimals to round the origin coordinates, by default 0 dec_rotation : int, optional number of decimals to round the rotation angle, by default 3 See Also -------- hydromt.workflows.basin_mask.parse_region """ # setup `region` of interest of the model. self.setup_region( region=region, hydrography_fn=hydrography_fn, basin_index_fn=basin_index_fn, ) # get pyproj crs of best UTM zone if crs=utm pyproj_crs = hydromt.gis_utils.parse_crs( crs, self.region.to_crs(4326).total_bounds ) if self.geoms["region"].crs != pyproj_crs: self.geoms["region"] = self.geoms["region"].to_crs(pyproj_crs) # update config for geographic coordinates if pyproj_crs.is_geographic: self.set_config("crsgeo", 1) # create grid from region # NOTE keyword rotated is added to still have the possibility to create unrotated grids if needed (e.g. for FEWS?) if rotated: geom = self.geoms["region"].unary_union x0, y0, mmax, nmax, rot = utils.rotated_grid( geom, res, dec_origin=dec_origin, dec_rotation=dec_rotation ) else: x0, y0, x1, y1 = self.geoms["region"].total_bounds if align: x0 = round(x0 / res) * res y0 = round(y0 / res) * res else: x0, y0 = round(x0, dec_origin), round(y0, dec_origin) mmax = int(np.ceil((x1 - x0) / res)) nmax = int(np.ceil((y1 - y0) / res)) rot = 0 self.setup_grid( x0=x0, y0=y0, dx=res, dy=res, nmax=nmax, mmax=mmax, rotation=rot, epsg=pyproj_crs.to_epsg(), )
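    # Example (illustrative sketch): build the grid from a WGS84 bounding box.
    # The bbox and the 50 m resolution are assumptions; crs="utm" lets the
    # method select the best UTM zone.
    #
    #     sf.setup_grid_from_region(
    #         region={"bbox": [12.05, 45.30, 12.85, 45.65]},
    #         res=50,
    #         crs="utm",
    #         rotated=False,
    #     )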
    def setup_dep(
        self,
        datasets_dep: List[dict],
        buffer_cells: int = 0,  # not in list
        interp_method: str = "linear",  # used for buffer cells only
    ):
        """Interpolate topobathy (dep) data to the model grid.

        Adds model grid layers:

        * **dep**: combined elevation/bathymetry [m+ref]

        Parameters
        ----------
        datasets_dep : List[dict]
            List of dictionaries with topobathy data, each containing a dataset name or
            Path (elevtn) and optional merge arguments e.g.:
            [{'elevtn': merit_hydro, 'zmin': 0.01}, {'elevtn': gebco, 'offset': 0,
            'merge_method': 'first', 'reproj_method': 'bilinear'}]
            For a complete overview of all merge options,
            see :py:func:`hydromt.workflows.merge_multi_dataarrays`
        buffer_cells : int, optional
            Number of cells between datasets to ensure smooth transition of bed levels,
            by default 0
        interp_method : str, optional
            Interpolation method used to fill the buffer cells, by default "linear"
        """
        # retrieve model resolution to determine zoom level for xyz-datasets
        # TODO fix for quadtree
        if not self.mask.raster.crs.is_geographic:
            res = np.abs(self.mask.raster.res[0])
        else:
            res = np.abs(self.mask.raster.res[0]) * 111111.0

        datasets_dep = self._parse_datasets_dep(datasets_dep, res=res)

        if self.grid_type == "regular":
            da_dep = workflows.merge_multi_dataarrays(
                da_list=datasets_dep,
                da_like=self.mask,
                buffer_cells=buffer_cells,
                interp_method=interp_method,
                logger=self.logger,
            )
            # check that no nan data is present in the bed levels
            nmissing = int(np.sum(np.isnan(da_dep.values)))
            if nmissing > 0:
                self.logger.warning(f"Interpolate elevation at {nmissing} cells")
                da_dep = da_dep.raster.interpolate_na(
                    method="rio_idw", extrapolate=True
                )

            self.set_grid(da_dep, name="dep")
            # FIXME this shouldn't be necessary, since da_dep should already have a crs
            if self.crs is not None and self.grid.raster.crs is None:
                self.grid.set_crs(self.crs)

            if "depfile" not in self.config:
                self.config.update({"depfile": "sfincs.dep"})
        elif self.grid_type == "quadtree":
            raise NotImplementedError(
                "Create dep not yet implemented for quadtree grids."
            )
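    # Example (illustrative sketch): merge a land DEM with ocean bathymetry into
    # the dep layer. The catalog entries "merit_hydro" and "gebco" are assumed to
    # be available in the loaded data catalogs.
    #
    #     sf.setup_dep(
    #         datasets_dep=[
    #             {"elevtn": "merit_hydro", "zmin": 0.001},
    #             {"elevtn": "gebco", "reproj_method": "bilinear"},
    #         ],
    #         buffer_cells=2,
    #     )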
    def setup_mask_active(
        self,
        mask: Union[str, Path, gpd.GeoDataFrame] = None,
        include_mask: Union[str, Path, gpd.GeoDataFrame] = None,
        exclude_mask: Union[str, Path, gpd.GeoDataFrame] = None,
        mask_buffer: int = 0,
        zmin: float = None,
        zmax: float = None,
        fill_area: float = 10.0,
        drop_area: float = 0.0,
        connectivity: int = 8,
        all_touched: bool = True,
        reset_mask: bool = True,
    ):
        """Setup active model cells.

        The SFINCS model mask defines inactive (msk=0), active (msk=1), waterlevel
        boundary (msk=2) and outflow boundary (msk=3) cells. This method sets the
        active and inactive cells.

        Active model cells are based on a region and cells with valid elevation
        (i.e. not nodata), optionally bounded by areas inside the include geometries,
        outside the exclude geometries, greater than or equal to a minimum elevation
        threshold and smaller than or equal to a maximum elevation threshold.
        All conditions are combined using a logical AND operation.

        Sets model layers:

        * **msk** map: model mask [-]

        Parameters
        ----------
        mask: str, Path, gpd.GeoDataFrame, optional
            Path or data source name of polygons to initialize the active mask with;
            the subsequent arguments can be used to include/exclude cells.
            If not given, the existing mask (if present) is used, else the mask is
            initialized empty.
        include_mask, exclude_mask: str, Path, gpd.GeoDataFrame, optional
            Path or data source name of polygons to include/exclude from the active
            model domain. Note that include (second last) and exclude (last) areas are
            processed after the other criteria, i.e. `zmin`, `zmax` and `drop_area`,
            and thus overrule these criteria for active model cells.
        mask_buffer: float, optional
            If larger than zero, extend the `include_mask` geometry with a buffer [m],
            by default 0.
        zmin, zmax : float, optional
            Minimum and maximum elevation thresholds for active model cells.
        fill_area : float, optional
            Maximum area [km2] of contiguous cells below `zmin` or above `zmax` but
            surrounded by cells within the valid elevation range to be kept as active
            cells, by default 10 km2.
        drop_area : float, optional
            Maximum area [km2] of contiguous cells to be set as inactive cells,
            by default 0 km2.
        connectivity: {4, 8}
            The connectivity used to define contiguous cells, if 4 only horizontal and
            vertical connections are used, if 8 (default) also diagonal connections.
        all_touched: bool, optional
            If True (default), include (or exclude) a cell in the mask if it touches
            any of the include (or exclude) geometries. If False, include a cell only
            if its center is within one of the shapes, or if it is selected by
            Bresenham's line algorithm.
        reset_mask: bool, optional
            If True (default), reset the existing mask layer. If False, update the
            existing mask.
        """
        # read geometries
        gdf_mask, gdf_include, gdf_exclude = None, None, None
        bbox = self.region.to_crs(4326).total_bounds
        if mask is not None:
            if not isinstance(mask, gpd.GeoDataFrame) and str(mask).endswith(".pol"):
                # NOTE polygons should be in same CRS as model
                gdf_mask = utils.polygon2gdf(
                    feats=utils.read_geoms(fn=mask), crs=self.region.crs
                )
            else:
                gdf_mask = self.data_catalog.get_geodataframe(mask, bbox=bbox)
            if mask_buffer > 0:  # NOTE assumes model in projected CRS!
                gdf_mask["geometry"] = gdf_mask.to_crs(self.crs).buffer(mask_buffer)
        if include_mask is not None:
            if not isinstance(include_mask, gpd.GeoDataFrame) and str(
                include_mask
            ).endswith(".pol"):
                # NOTE polygons should be in same CRS as model
                gdf_include = utils.polygon2gdf(
                    feats=utils.read_geoms(fn=include_mask), crs=self.region.crs
                )
            else:
                gdf_include = self.data_catalog.get_geodataframe(
                    include_mask, bbox=bbox
                )
        if exclude_mask is not None:
            if not isinstance(exclude_mask, gpd.GeoDataFrame) and str(
                exclude_mask
            ).endswith(".pol"):
                gdf_exclude = utils.polygon2gdf(
                    feats=utils.read_geoms(fn=exclude_mask), crs=self.region.crs
                )
            else:
                gdf_exclude = self.data_catalog.get_geodataframe(
                    exclude_mask, bbox=bbox
                )

        # get mask
        if self.grid_type == "regular":
            da_mask = self.reggrid.create_mask_active(
                da_mask=self.grid["msk"] if "msk" in self.grid else None,
                da_dep=self.grid["dep"] if "dep" in self.grid else None,
                gdf_mask=gdf_mask,
                gdf_include=gdf_include,
                gdf_exclude=gdf_exclude,
                zmin=zmin,
                zmax=zmax,
                fill_area=fill_area,
                drop_area=drop_area,
                connectivity=connectivity,
                all_touched=all_touched,
                reset_mask=reset_mask,
                logger=self.logger,
            )
            self.set_grid(da_mask, name="msk")
            # update config
            if "mskfile" not in self.config:
                self.config.update({"mskfile": "sfincs.msk"})
            if "indexfile" not in self.config:
                self.config.update({"indexfile": "sfincs.ind"})

        # update region
        if np.any(da_mask >= 1):
            self.logger.info("Derive region geometry based on active cells.")
            # make mask with ones and zeros only -> vectorize ones
            region = da_mask.where(da_mask <= 1, 1).raster.vectorize()
            if region.empty:
                raise ValueError("No region found.")
            self.set_geoms(region, "region")
        else:
            self.logger.warning("No active cells found.")
    def setup_mask_bounds(
        self,
        btype: str = "waterlevel",
        include_mask: Union[str, Path, gpd.GeoDataFrame] = None,
        exclude_mask: Union[str, Path, gpd.GeoDataFrame] = None,
        include_mask_buffer: int = 0,
        zmin: float = None,
        zmax: float = None,
        connectivity: int = 8,
        all_touched: bool = False,
        reset_bounds: bool = False,
    ):
        """Set boundary cells in the model mask.

        The SFINCS model mask defines inactive (msk=0), active (msk=1), waterlevel
        boundary (msk=2) and outflow boundary (msk=3) cells. Active cells are set
        using the `setup_mask_active` method, while this method sets both types of
        boundary cells, see the `btype` argument.

        Boundary cells are set at the edge of the active model domain, optionally
        bounded by areas inside the include geometries, outside the exclude
        geometries, greater than or equal to a minimum elevation threshold and
        smaller than or equal to a maximum elevation threshold.
        All conditions are combined using a logical AND operation.

        Updates model layers:

        * **msk** map: model mask [-]

        Parameters
        ----------
        btype: {'waterlevel', 'outflow'}
            Boundary type
        include_mask, exclude_mask: str, Path, gpd.GeoDataFrame, optional
            Path or data source name for geometries with areas to include/exclude
            from the model boundary.
        zmin, zmax : float, optional
            Minimum and maximum elevation thresholds for boundary cells.
            Note that when include and exclude areas are used, the elevation range is
            only applied on cells within the include area and outside the exclude area.
        reset_bounds: bool, optional
            If True, reset existing boundary cells of the selected boundary type
            (`btype`) before setting new boundary cells, by default False.
        all_touched: bool, optional
            If True, include (or exclude) a cell in the mask if it touches any of the
            include (or exclude) geometries. If False (default), include a cell only
            if its center is within one of the shapes, or if it is selected by
            Bresenham's line algorithm.
        connectivity: {4, 8}
            The connectivity used to detect the model edge, if 4 only horizontal and
            vertical connections are used, if 8 (default) also diagonal connections.
        """
        # get include / exclude geometries
        gdf_include, gdf_exclude = None, None
        bbox = self.mask.raster.transform_bounds(4326)

        if include_mask is not None:
            if not isinstance(include_mask, gpd.GeoDataFrame) and str(
                include_mask
            ).endswith(".pol"):
                # NOTE polygons should be in same CRS as model
                gdf_include = utils.polygon2gdf(
                    feats=utils.read_geoms(fn=include_mask), crs=self.region.crs
                )
            else:
                gdf_include = self.data_catalog.get_geodataframe(
                    include_mask, bbox=bbox
                )
            if include_mask_buffer > 0:
                if self.crs.is_geographic:
                    include_mask_buffer = include_mask_buffer / 111111.0
                gdf_include["geometry"] = gdf_include.to_crs(self.crs).buffer(
                    include_mask_buffer
                )
        if exclude_mask is not None:
            if not isinstance(exclude_mask, gpd.GeoDataFrame) and str(
                exclude_mask
            ).endswith(".pol"):
                gdf_exclude = utils.polygon2gdf(
                    feats=utils.read_geoms(fn=exclude_mask), crs=self.region.crs
                )
            else:
                gdf_exclude = self.data_catalog.get_geodataframe(
                    exclude_mask, bbox=bbox
                )

        # mask values
        if self.grid_type == "regular":
            da_mask = self.reggrid.create_mask_bounds(
                da_mask=self.grid["msk"],
                btype=btype,
                gdf_include=gdf_include,
                gdf_exclude=gdf_exclude,
                da_dep=self.grid["dep"] if "dep" in self.grid else None,
                zmin=zmin,
                zmax=zmax,
                connectivity=connectivity,
                all_touched=all_touched,
                reset_bounds=reset_bounds,
                logger=self.logger,
            )

            self.set_grid(da_mask, name="msk")
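    # Example (illustrative sketch): mark offshore edge cells below -2 m as
    # waterlevel boundary cells (msk=2). The zmax threshold is an assumption.
    #
    #     sf.setup_mask_bounds(btype="waterlevel", zmax=-2, reset_bounds=True)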
[docs] def setup_subgrid( self, datasets_dep: List[dict], datasets_rgh: List[dict] = [], datasets_riv: List[dict] = [], buffer_cells: int = 0, nlevels: int = 10, nbins: int = None, nr_subgrid_pixels: int = 20, nrmax: int = 2000, # blocksize max_gradient: float = 5.0, z_minimum: float = -99999.0, manning_land: float = 0.04, manning_sea: float = 0.02, rgh_lev_land: float = 0.0, write_dep_tif: bool = False, write_man_tif: bool = False, ): """Setup method for subgrid tables based on a list of elevation and Manning's roughness datasets. These datasets are used to derive relations between the water level and the volume in a cell to do the continuity update, and a representative water depth used to calculate momentum fluxes. This allows that one can compute on a coarser computational grid, while still accounting for the local topography and roughness. Parameters ---------- datasets_dep : List[dict] List of dictionaries with topobathy data. Each should minimally contain a data catalog source name, data file path, or xarray raster object ('elevtn'). Optional merge arguments include: 'zmin', 'zmax', 'mask', 'offset', 'reproj_method', and 'merge_method', see example below. For a complete overview of all merge options, see :py:func:`hydromt.workflows.merge_multi_dataarrays` :: [ {'elevtn': 'merit_hydro', 'zmin': 0.01}, {'elevtn': 'gebco', 'offset': 0, 'merge_method': 'first', reproj_method: 'bilinear'} ] datasets_rgh : List[dict], optional List of dictionaries with Manning's n datasets. Each dictionary should at least contain one of the following: * manning: filename (or Path) of gridded data with manning values * lulc (and reclass_table): a combination of a filename of gridded landuse/landcover and a mapping table. In additon, optional merge arguments can be provided, e.g.: :: [ {'manning': 'manning_data'}, {'lulc': 'esa_worlcover', 'reclass_table': 'esa_worlcover_mapping'} ] datasets_riv : List[dict], optional List of dictionaries with river datasets. Each dictionary should at least contain a river centerline data and optionally a river mask: * centerlines: filename (or Path) of river centerline with attributes rivwth (river width [m]; required if not river mask provided), rivdph or rivbed (river depth [m]; river bedlevel [m+REF]), manning (Manning's n [s/m^(1/3)]; optional) * mask (optional): filename (or Path) of river mask * river attributes (optional): "rivdph", "rivbed", "rivwth", "manning" to fill missing values * arguments to the river burn method (optional): segment_length [m] (default 500m) and riv_bank_q [0-1] (default 0.5) which used to estimate the river bank height in case river depth is provided. For more info see :py:func:`hydromt.workflows.bathymetry.burn_river_rect` :: [{'centerlines': 'river_lines', 'mask': 'river_mask', 'manning': 0.035}] buffer_cells : int, optional Number of cells between datasets to ensure smooth transition of bed levels, by default 0 nbins : int, optional Number of bins in which hypsometry is subdivided, by default 10 Note that this keyword is deprecated and will be removed in future versions. 
nlevels: int, optional Number of levels to describe hypsometry, by default 10 nr_subgrid_pixels : int, optional Number of subgrid pixels per computational cell, by default 20 nrmax : int, optional Maximum number of cells per subgrid-block, by default 2000 These blocks are used to prevent memory issues while working with large datasets max_gradient : float, optional If slope in hypsometry exceeds this value, then smoothing is applied, to prevent numerical stability problems, by default 5.0 z_minimum : float, optional Minimum depth in the subgrid tables, by default -99999.0 manning_land, manning_sea : float, optional Constant manning roughness values for land and sea, by default 0.04 and 0.02 s.m-1/3 Note that these values are only used when no Manning's n datasets are provided, or to fill the nodata values rgh_lev_land : float, optional Elevation level to distinguish land and sea roughness (when using manning_land and manning_sea), by default 0.0 write_dep_tif, write_man_tif : bool, optional Write geotiff of the merged topobathy / roughness on the subgrid resolution. These files are not used by SFINCS, but can be used for visualisation and downscaling of the floodmaps. Unlinke the SFINCS files it is written to disk at execution of this method. By default False """ # retrieve model resolution # TODO fix for quadtree if not self.mask.raster.crs.is_geographic: res = np.abs(self.mask.raster.res[0]) / nr_subgrid_pixels else: res = np.abs(self.mask.raster.res[0]) * 111111.0 / nr_subgrid_pixels datasets_dep = self._parse_datasets_dep(datasets_dep, res=res) if len(datasets_rgh) > 0: # NOTE conversion from landuse/landcover to manning happens here datasets_rgh = self._parse_datasets_rgh(datasets_rgh) if len(datasets_riv) > 0: datasets_riv = self._parse_datasets_riv(datasets_riv) # folder where high-resolution topobathy and manning geotiffs are stored if write_dep_tif or write_man_tif: highres_dir = os.path.join(self.root, "subgrid") if not os.path.isdir(highres_dir): os.makedirs(highres_dir) else: highres_dir = None if nbins is not None: logger.warning( "Keyword nbins is deprecated and will be removed in future versions. Please use nlevels instead." ) nlevels = nbins if self.grid_type == "regular": self.reggrid.subgrid.build( da_mask=self.mask, datasets_dep=datasets_dep, datasets_rgh=datasets_rgh, datasets_riv=datasets_riv, buffer_cells=buffer_cells, nlevels=nlevels, nr_subgrid_pixels=nr_subgrid_pixels, nrmax=nrmax, max_gradient=max_gradient, z_minimum=z_minimum, manning_land=manning_land, manning_sea=manning_sea, rgh_lev_land=rgh_lev_land, write_dep_tif=write_dep_tif, write_man_tif=write_man_tif, highres_dir=highres_dir, logger=self.logger, ) self.subgrid = self.reggrid.subgrid.to_xarray( dims=self.mask.raster.dims, coords=self.mask.raster.coords ) elif self.grid_type == "quadtree": pass # when building a new subgrid table, always update config # NOTE from now onwards, netcdf subgrid tables are used self.config.update({"sbgfile": "sfincs_subgrid.nc"}) # if "sbgfile" not in self.config: # only add sbgfile if not already present # self.config.update({"sbgfile": "sfincs.sbg"}) # subgrid is used so no depfile or manningfile needed if "depfile" in self.config: self.config.pop("depfile") # remove depfile from config if "manningfile" in self.config: self.config.pop("manningfile") # remove manningfile from config
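    # Example (illustrative sketch): derive subgrid tables from elevation and
    # landuse-based roughness. The catalog entries "merit_hydro", "gebco", "vito"
    # and "vito_mapping" are assumed names for illustration only.
    #
    #     sf.setup_subgrid(
    #         datasets_dep=[{"elevtn": "merit_hydro", "zmin": 0.001}, {"elevtn": "gebco"}],
    #         datasets_rgh=[{"lulc": "vito", "reclass_table": "vito_mapping"}],
    #         nr_subgrid_pixels=20,
    #         write_dep_tif=True,
    #     )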
[docs] def setup_river_inflow( self, rivers: Union[str, Path, gpd.GeoDataFrame] = None, hydrography: Union[str, Path, xr.Dataset] = None, river_upa: float = 10.0, river_len: float = 1e3, river_width: float = 500, merge: bool = False, first_index: int = 1, keep_rivers_geom: bool = False, reverse_river_geom: bool = False, src_type: str = "inflow", ): """Setup discharge (src) points where a river enters the model domain. If `rivers` is not provided, river centerlines are extracted from the `hydrography` dataset based on the `river_upa` threshold. Waterlevel or outflow boundary cells intersecting with the river are removed from the model mask. Discharge is set to zero at these points, but can be updated using the `setup_discharge_forcing` or `setup_discharge_forcing_from_grid` methods. Note: this method assumes the rivers are directed from up- to downstream. Adds model layers: * **dis** forcing: discharge forcing * **mask** map: SFINCS mask layer (only if `river_width` > 0) * **rivers_inflow** geoms: river centerline (if `keep_rivers_geom`; not used by SFINCS) Parameters ---------- rivers : str, Path, gpd.GeoDataFrame, optional Path, data source name, or geopandas object for river centerline data. If present, the 'uparea' and 'rivlen' attributes are used. hydrography: str, Path, xr.Dataset optional Path, data source name, or a xarray raster object for hydrography data. * Required layers: ['uparea', 'flwdir']. river_upa : float, optional Minimum upstream area threshold for rivers [km2], by default 10.0 river_len: float, optional Mimimum river length within the model domain threshhold [m], by default 1 km. river_width: float, optional Estimated constant width [m] of the inflowing river. Boundary cells within half the width are forced to be closed (mask = 1) to avoid instabilities with nearby open or waterlevel boundary cells, by default 500 m. merge: bool, optional If True, merge rivers source points with existing points, by default False. first_index: int, optional First index for the river source points, by default 1. keep_rivers_geom: bool, optional If True, keep a geometry of the rivers "rivers_inflow" in geoms. By default False. reverse_river_geom: bool, optional If True, assume that segments in 'rivers' are drawn from downstream to upstream. Only used if 'rivers' is not None, By default False src_type: {'inflow', 'headwater'}, optional Source type, by default 'inflow' If 'inflow', return points where the river flows into the model domain. If 'headwater', return all headwater (including inflow) points within the model domain. 
See Also -------- setup_discharge_forcing setup_discharge_forcing_from_grid """ # get hydrography data da_uparea = None if hydrography is not None: ds = self.data_catalog.get_rasterdataset( hydrography, bbox=self.mask.raster.transform_bounds(4326), variables=["uparea", "flwdir"], buffer=5, ) da_uparea = ds["uparea"] # reused in river_source_points # get river centerlines if ( isinstance(rivers, str) and rivers == "rivers_outflow" and rivers in self.geoms ): # reuse rivers from setup_river_in/outflow gdf_riv = self.geoms[rivers] elif rivers is not None: gdf_riv = self.data_catalog.get_geodataframe( rivers, geom=self.region ).to_crs(self.crs) elif hydrography is not None: gdf_riv = workflows.river_centerline_from_hydrography( da_flwdir=ds["flwdir"], da_uparea=da_uparea, river_upa=river_upa, river_len=river_len, gdf_mask=self.region, ) elif hydrography is None: raise ValueError("Either hydrography or rivers must be provided.") # estimate buffer based on model resolution buffer = self.reggrid.dx if self.crs.is_geographic: buffer = buffer * 111111.0 # get river inflow / headwater source points gdf_src = workflows.river_source_points( gdf_riv=gdf_riv, gdf_mask=self.region, src_type=src_type, buffer=buffer, river_upa=river_upa, river_len=river_len, da_uparea=da_uparea, reverse_river_geom=reverse_river_geom, logger=self.logger, ) if gdf_src.empty: return # set forcing src pnts gdf_src.index = gdf_src.index + first_index self.set_forcing_1d(gdf_locs=gdf_src.copy(), name="dis", merge=merge) # set river if keep_rivers_geom: self.set_geoms(gdf_riv, name="rivers_inflow") # update mask if river_width > 0 if "rivwth" in gdf_src.columns: river_width = gdf_src["rivwth"].fillna(river_width) if np.any(river_width > 0) and np.any(self.mask > 1): # apply buffer gdf_src["geometry"] = gdf_src.buffer(river_width / 2) # find intersect of buffer and model grid tmp_msk = self.reggrid.create_mask_bounds( xr.where(self.mask > 0, 1, 0).astype(np.uint8), gdf_include=gdf_src ) reset_msk = np.logical_and(tmp_msk > 1, self.mask > 1) # update model mask n = int(np.sum(reset_msk)) if n > 0: da_mask = self.mask.where(~reset_msk, np.uint8(1)) self.set_grid(da_mask, "msk") self.logger.info(f"Boundary cells (n={n}) updated around src points.")
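    # Example (illustrative sketch): add discharge source points where large rivers
    # enter the domain, using the "merit_hydro" hydrography entry (an assumed
    # catalog name) and illustrative thresholds.
    #
    #     sf.setup_river_inflow(
    #         hydrography="merit_hydro",
    #         river_upa=25.0,
    #         river_width=400,
    #         keep_rivers_geom=True,
    #     )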
[docs] def setup_river_outflow( self, rivers: Union[str, Path, gpd.GeoDataFrame] = None, hydrography: Union[str, Path, xr.Dataset] = None, river_upa: float = 10.0, river_len: float = 1e3, river_width: float = 500, keep_rivers_geom: bool = False, reset_bounds: bool = False, btype: str = "outflow", reverse_river_geom: bool = False, ): """Setup open boundary cells (mask=3) where a river flows out of the model domain. If `rivers` is not provided, river centerlines are extracted from the `hydrography` dataset based on the `river_upa` threshold. River outflows that intersect with discharge source point or waterlevel boundary cells are omitted. Note: this method assumes the rivers are directed from up- to downstream. Adds / edits model layers: * **msk** map: edited by adding outflow points (msk=3) * **rivers_outflow** geoms: river centerline (if `keep_rivers_geom`; not used by SFINCS) Parameters ---------- rivers : str, Path, gpd.GeoDataFrame, optional Path, data source name, or geopandas object for river centerline data. If present, the 'uparea' and 'rivlen' attributes are used. hydrography: str, Path, xr.Dataset optional Path, data source name, or a xarray raster object for hydrography data. * Required layers: ['uparea', 'flwdir']. river_upa : float, optional Minimum upstream area threshold for rivers [km2], by default 10.0 river_len: float, optional Mimimum river length within the model domain threshhold [m], by default 1000 m. river_width: int, optional The width [m] of the open boundary cells in the SFINCS msk file. By default 500m, i.e.: 250m to each side of the outflow location. append_bounds: bool, optional If True, write new outflow boundary cells on top of existing. If False (default), first reset existing outflow boundary cells to normal active cells. keep_rivers_geom: bool, optional If True, keep a geometry of the rivers "rivers_outflow" in geoms. By default False. reset_bounds: bool, optional If True, reset existing outlfow boundary cells before setting new boundary cells, by default False. btype: {'waterlevel', 'outflow'} Boundary type reverse_river_geom: bool, optional If True, assume that segments in 'rivers' are drawn from downstream to upstream. 
Only used if rivers is not None, By default False See Also -------- setup_mask_bounds """ # get hydrography data da_uparea = None if hydrography is not None: ds = self.data_catalog.get_rasterdataset( hydrography, bbox=self.mask.raster.transform_bounds(4326), variables=["uparea", "flwdir"], buffer=5, ) da_uparea = ds["uparea"] # reused in river_source_points # get river centerlines if ( isinstance(rivers, str) and rivers == "rivers_inflow" and rivers in self.geoms ): # reuse rivers from setup_river_in/outflow gdf_riv = self.geoms[rivers] elif rivers is not None: gdf_riv = self.data_catalog.get_geodataframe( rivers, geom=self.region ).to_crs(self.crs) elif hydrography is not None: gdf_riv = workflows.river_centerline_from_hydrography( da_flwdir=ds["flwdir"], da_uparea=da_uparea, river_upa=river_upa, river_len=river_len, gdf_mask=self.region, ) else: raise ValueError("Either hydrography or rivers must be provided.") # estimate buffer based on model resolution buffer = self.reggrid.dx if self.crs.is_geographic: buffer = buffer * 111111.0 # get river inflow / headwater source points gdf_out = workflows.river_source_points( gdf_riv=gdf_riv, gdf_mask=self.region, src_type="outflow", buffer=buffer, river_upa=river_upa, river_len=river_len, da_uparea=da_uparea, reverse_river_geom=reverse_river_geom, logger=self.logger, ) if gdf_out.empty: return if len(gdf_out) > 0: if "rivwth" in gdf_out.columns: river_width = gdf_out["rivwth"].fillna(river_width) gdf_out["geometry"] = gdf_out.buffer(river_width / 2) # remove points near waterlevel boundary cells if np.any(self.mask == 2) and btype == "outflow": gdf_msk2 = utils.get_bounds_vector(self.mask) # NOTE: this should be a single geom geom = gdf_msk2[gdf_msk2["value"] == 2].unary_union gdf_out = gdf_out[~gdf_out.intersects(geom)] # remove outflow points near source points if "dis" in self.forcing and len(gdf_out) > 0: geom = self.forcing["dis"].vector.to_gdf().unary_union gdf_out = gdf_out[~gdf_out.intersects(geom)] # update mask n = len(gdf_out.index) self.logger.info(f"Found {n} valid river outflow points.") if n > 0: self.setup_mask_bounds( btype=btype, include_mask=gdf_out, reset_bounds=reset_bounds ) elif reset_bounds: self.setup_mask_bounds(btype=btype, reset_bounds=reset_bounds) # keep river centerlines if keep_rivers_geom and len(gdf_riv) > 0: self.set_geoms(gdf_riv, name="rivers_outflow")
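    # Example (illustrative sketch): companion call to open outflow boundary cells
    # on the same (assumed) hydrography source.
    #
    #     sf.setup_river_outflow(hydrography="merit_hydro", river_upa=25.0, reset_bounds=True)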
# Function to create constant spatially varying infiltration
[docs] def setup_constant_infiltration( self, qinf=None, lulc=None, reclass_table=None, reproj_method="average", ): """Setup spatially varying constant infiltration rate (qinffile). Adds model layers: * **qinf** map: constant infiltration rate [mm/hr] Parameters ---------- qinf : str, Path, or RasterDataset Spatially varying infiltration rates [mm/hr] lulc: str, Path, or RasterDataset Landuse/landcover data set reclass_table: str, Path, or pd.DataFrame Reclassification table to convert landuse/landcover to infiltration rates [mm/hr] reproj_method : str, optional Resampling method for reprojecting the infiltration data to the model grid. By default 'average'. For more information see, :py:meth:`hydromt.raster.RasterDataArray.reproject_like` """ # get infiltration data if qinf is not None: da_inf = self.data_catalog.get_rasterdataset( qinf, bbox=self.mask.raster.transform_bounds(4326), buffer=10, ) elif lulc is not None: # landuse/landcover should always be combined with mapping if reclass_table is None: raise IOError( f"Infiltration mapping file should be provided for {lulc}" ) da_lulc = self.data_catalog.get_rasterdataset( lulc, bbox=self.mask.raster.transform_bounds(4326), buffer=10, variables=["lulc"], ) df_map = self.data_catalog.get_dataframe( reclass_table, variables=["qinf"], index_col=0, # driver kwargs ) # reclassify da_inf = da_lulc.raster.reclassify(df_map)["qinf"] else: raise ValueError( "Either qinf or lulc must be provided when setting up constant infiltration." ) # reproject infiltration data to model grid da_inf = da_inf.raster.mask_nodata() # set nodata to nan da_inf = da_inf.raster.reproject_like(self.mask, method=reproj_method) # check on nan values if np.logical_and(np.isnan(da_inf), self.mask >= 1).any(): self.logger.warning("NaN values found in infiltration data; filled with 0") da_inf = da_inf.fillna(0) da_inf.raster.set_nodata(-9999.0) # set grid mname = "qinf" da_inf.attrs.update(**self._ATTRS.get(mname, {})) self.set_grid(da_inf, name=mname) # update config: remove default inf and set qinf map self.set_config(f"{mname}file", f"sfincs.{mname}") self.config.pop("qinf", None)
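    # Example (illustrative sketch): derive a spatially varying infiltration map
    # from a landuse grid and a reclassification table. The catalog entries "vito"
    # and "vito_qinf_mapping" are hypothetical names.
    #
    #     sf.setup_constant_infiltration(lulc="vito", reclass_table="vito_qinf_mapping")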
# Function to create curve number for SFINCS
[docs] def setup_cn_infiltration(self, cn, antecedent_moisture="avg", reproj_method="med"): """Setup model potential maximum soil moisture retention map (scsfile) from gridded curve number map. Adds model layers: * **scs** map: potential maximum soil moisture retention [inch] Parameters --------- cn: str, Path, or RasterDataset Name of gridded curve number map. * Required layers without antecedent runoff conditions: ['cn'] * Required layers with antecedent runoff conditions: ['cn_dry', 'cn_avg', 'cn_wet'] antecedent_moisture: {'dry', 'avg', 'wet'}, optional Antecedent runoff conditions. None if data has no antecedent runoff conditions. By default `avg` reproj_method : str, optional Resampling method for reprojecting the curve number data to the model grid. By default 'med'. For more information see, :py:meth:`hydromt.raster.RasterDataArray.reproject_like` """ # get data da_org = self.data_catalog.get_rasterdataset( cn, bbox=self.mask.raster.transform_bounds(4326), buffer=10 ) # read variable v = "cn" if antecedent_moisture: v = f"cn_{antecedent_moisture}" if isinstance(da_org, xr.Dataset) and v in da_org.data_vars: da_org = da_org[v] elif not isinstance(da_org, xr.DataArray): raise ValueError(f"Could not find variable {v} in {cn}") # reproject using median da_cn = da_org.raster.reproject_like(self.grid, method=reproj_method) # convert to potential maximum soil moisture retention S (1000/CN - 10) [inch] da_scs = workflows.cn_to_s(da_cn, self.mask > 0).round(3) # set grid mname = "scs" da_scs.attrs.update(**self._ATTRS.get(mname, {})) self.set_grid(da_scs, name=mname) # update config: remove default infiltration values and set scs map self.config.pop("qinf", None) self.set_config(f"{mname}file", f"sfincs.{mname}")
# Function to create curve number for SFINCS including recovery via saturated hydraulic conductivity [mm/hr]
[docs] def setup_cn_infiltration_with_ks( self, lulc, hsg, ksat, reclass_table, effective, block_size=2000 ): """Setup model the Soil Conservation Service (SCS) Curve Number (CN) files for SFINCS including recovery term based on the soil saturation Parameters --------- lulc : str, Path, or RasterDataset Landuse/landcover data set hsg : str, Path, or RasterDataset HSG (Hydrological Similarity Group) in integers ksat : str, Path, or RasterDataset Ksat (saturated hydraulic conductivity) [mm/hr] reclass_table : str, Path, or RasterDataset reclass table to relate landcover with soiltype effective : float estimate of percentage effective soil, e.g. 0.50 for 50% block_size : float maximum block size - use larger values will get more data in memory but can be faster, default=2000 """ # Read the datafiles da_landuse = self.data_catalog.get_rasterdataset( lulc, bbox=self.mask.raster.transform_bounds(4326), buffer=10 ) da_HSG = self.data_catalog.get_rasterdataset( hsg, bbox=self.mask.raster.transform_bounds(4326), buffer=10 ) da_Ksat = self.data_catalog.get_rasterdataset( ksat, bbox=self.mask.raster.transform_bounds(4326), buffer=10 ) df_map = self.data_catalog.get_dataframe(reclass_table, index_col=0) # Define outputs da_smax = xr.full_like(self.mask, -9999, dtype=np.float32) da_ks = xr.full_like(self.mask, -9999, dtype=np.float32) # Compute resolution land use (we are assuming that is the finest) resolution_landuse = np.mean( [abs(da_landuse.raster.res[0]), abs(da_landuse.raster.res[1])] ) if da_landuse.raster.crs.is_geographic: resolution_landuse = ( resolution_landuse * 111111.0 ) # assume 1 degree is 111km # Define the blocks nrmax = block_size nmax = np.shape(self.mask)[0] mmax = np.shape(self.mask)[1] refi = self.config["dx"] / resolution_landuse # finest resolution of landuse nrcb = int(np.floor(nrmax / refi)) # nr of regular cells in a block nrbn = int(np.ceil(nmax / nrcb)) # nr of blocks in n direction nrbm = int(np.ceil(mmax / nrcb)) # nr of blocks in m direction x_dim, y_dim = self.mask.raster.x_dim, self.mask.raster.y_dim # avoid blocks with width or height of 1 merge_last_col = False merge_last_row = False if mmax % nrcb == 1: nrbm -= 1 merge_last_col = True if nmax % nrcb == 1: nrbn -= 1 merge_last_row = True ## Loop through blocks ib = -1 for ii in range(nrbm): bm0 = ii * nrcb # Index of first m in block bm1 = min(bm0 + nrcb, mmax) # last m in block if merge_last_col and ii == (nrbm - 1): bm1 += 1 for jj in range(nrbn): bn0 = jj * nrcb # Index of first n in block bn1 = min(bn0 + nrcb, nmax) # last n in block if merge_last_row and jj == (nrbn - 1): bn1 += 1 # Count ib += 1 self.logger.debug( f"\nblock {ib + 1}/{nrbn * nrbm} -- " f"col {bm0}:{bm1-1} | row {bn0}:{bn1-1}" ) # calculate transform and shape of block at cell and subgrid level da_mask_block = self.mask.isel( {x_dim: slice(bm0, bm1), y_dim: slice(bn0, bn1)} ).load() # Call workflow ( da_smax_block, da_ks_block, ) = workflows.curvenumber.scs_recovery_determination( da_landuse, da_HSG, da_Ksat, df_map, da_mask_block ) # New place in the overall matrix sn, sm = slice(bn0, bn1), slice(bm0, bm1) da_smax[sn, sm] = da_smax_block da_ks[sn, sm] = da_ks_block # Done self.logger.info(f"Done with determination of values (in blocks).") # Specify the effective soil retention (seff) da_seff = da_smax da_seff = da_seff * effective da_seff.raster.set_nodata(da_smax.raster.nodata) # set grids for seff, smax and ks (saturated hydraulic conductivity) names = ["smax", "seff", "ks"] data = [da_smax, da_seff, da_ks] for name, da in zip(names, data): 
# Give metadata to the layer and set grid da.attrs.update(**self._ATTRS.get(name, {})) self.set_grid(da, name=name) # update config: set maps self.set_config(f"{name}file", f"sfincs.{name}") # give it to SFINCS # Remove qinf variable in sfincs self.config.pop("qinf", None)
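    # Example (illustrative sketch): curve-number infiltration with recovery. All
    # catalog entry names below (landuse, hydrologic soil group, saturated
    # conductivity, reclass table) are hypothetical.
    #
    #     sf.setup_cn_infiltration_with_ks(
    #         lulc="nlcd", hsg="soil_hsg", ksat="soil_ksat",
    #         reclass_table="cn_reclass", effective=0.5,
    #     )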
# Roughness
    def setup_manning_roughness(
        self,
        datasets_rgh: List[dict] = [],
        manning_land=0.04,
        manning_sea=0.02,
        rgh_lev_land=0,
    ):
        """Setup model manning roughness map (manningfile) from gridded manning data
        or a combination of gridded land-use/land-cover map and manning roughness
        mapping table.

        Adds model layers:

        * **man** map: manning roughness coefficient [s.m-1/3]

        Parameters
        ----------
        datasets_rgh : List[dict], optional
            List of dictionaries with Manning's n datasets. Each dictionary should at
            least contain one of the following:

            * (1) manning: filename (or Path) of gridded data with manning values
            * (2) lulc (and reclass_table): a combination of a filename of gridded
              landuse/landcover and a mapping table.

            In addition, optional merge arguments can be provided e.g.:
            merge_method, gdf_valid_fn
        manning_land, manning_sea : float, optional
            Constant manning roughness values for land and sea,
            by default 0.04 and 0.02 s.m-1/3
            Note that these values are only used when no Manning's n datasets are
            provided, or to fill the nodata values
        rgh_lev_land : float, optional
            Elevation level to distinguish land and sea roughness
            (when using manning_land and manning_sea), by default 0.0
        """
        if len(datasets_rgh) > 0:
            datasets_rgh = self._parse_datasets_rgh(datasets_rgh)
        else:
            datasets_rgh = []

        # fromdep keeps track of whether any manning values should be based on the depth or not
        fromdep = len(datasets_rgh) == 0
        if self.grid_type == "regular":
            if len(datasets_rgh) > 0:
                da_man = workflows.merge_multi_dataarrays(
                    da_list=datasets_rgh,
                    da_like=self.mask,
                    interp_method="linear",
                    logger=self.logger,
                )
                fromdep = np.isnan(da_man).where(self.mask > 0, False).any()
            if "dep" in self.grid and fromdep:
                da_man0 = xr.where(
                    self.grid["dep"] >= rgh_lev_land, manning_land, manning_sea
                )
            elif fromdep:
                da_man0 = xr.full_like(self.mask, manning_land, dtype=np.float32)
            if len(datasets_rgh) > 0 and fromdep:
                self.logger.warning("nan values in manning roughness array")
                da_man = da_man.where(~np.isnan(da_man), da_man0)
            elif fromdep:
                da_man = da_man0
            da_man.raster.set_nodata(-9999.0)

            # set grid
            mname = "manning"
            da_man.attrs.update(**self._ATTRS.get(mname, {}))
            self.set_grid(da_man, name=mname)

            # update config: remove default manning values and set manning map
            for v in ["manning_land", "manning_sea", "rgh_lev_land"]:
                self.config.pop(v, None)
            self.set_config(f"{mname}file", f"sfincs.{mname[:3]}")
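    # Example (illustrative sketch): landuse-based roughness with constant fallback
    # values. The catalog entries "vito" and "vito_manning_mapping" are assumed names.
    #
    #     sf.setup_manning_roughness(
    #         datasets_rgh=[{"lulc": "vito", "reclass_table": "vito_manning_mapping"}],
    #         manning_land=0.04,
    #         manning_sea=0.02,
    #     )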
    def setup_observation_points(
        self,
        locations: Union[str, Path, gpd.GeoDataFrame],
        merge: bool = True,
        **kwargs,
    ):
        """Setup model observation point locations.

        Adds model layers:

        * **obs** geom: observation point locations

        Parameters
        ----------
        locations: str, Path, gpd.GeoDataFrame, optional
            Path, data source name, or geopandas object for observation point locations.
        merge: bool, optional
            If True, merge the new observation points with the existing ones.
            By default True.
        """
        name = self._GEOMS["observation_points"]
        # FIXME ensure the catalog is loaded before adding any new entries
        self.data_catalog.sources

        gdf_obs = self.data_catalog.get_geodataframe(
            locations, geom=self.region, assert_gtype="Point", **kwargs
        ).to_crs(self.crs)

        if not gdf_obs.geometry.type.isin(["Point"]).all():
            raise ValueError("Observation points must be of type Point.")

        if merge and name in self.geoms:
            gdf0 = self._geoms.pop(name)
            gdf_obs = gpd.GeoDataFrame(pd.concat([gdf_obs, gdf0], ignore_index=True))
            self.logger.info("Adding new observation points to existing ones.")

        self.set_geoms(gdf_obs, name)
        self.set_config(f"{name}file", f"sfincs.{name}")
    def setup_observation_lines(
        self,
        locations: Union[str, Path, gpd.GeoDataFrame],
        merge: bool = True,
        **kwargs,
    ):
        """Setup model observation lines (cross-sections) to monitor discharges.

        Adds model layers:

        * **crs** geom: observation lines (cross-sections)

        Parameters
        ----------
        locations: str, Path, gpd.GeoDataFrame, optional
            Path, data source name, or geopandas object for observation lines
            (cross-sections).
        merge: bool, optional
            If True, merge the new observation lines with the existing ones.
            By default True.
        """
        name = self._GEOMS["observation_lines"]
        # FIXME ensure the catalog is loaded before adding any new entries
        self.data_catalog.sources

        # FIXME assert_gtype="LineString" does not work for MultiLineString
        # and default seems to be Point (??)
        gdf_obs = self.data_catalog.get_geodataframe(
            locations, geom=self.region, assert_gtype=None, **kwargs
        ).to_crs(self.crs)

        # make sure MultiLineString are converted to LineString
        gdf_obs = gdf_obs.explode(index_parts=True).reset_index(drop=True)

        if not gdf_obs.geometry.type.isin(["LineString"]).all():
            raise ValueError("Observation lines must be of type LineString.")

        if merge and name in self.geoms:
            gdf0 = self._geoms.pop(name)
            gdf_obs = gpd.GeoDataFrame(pd.concat([gdf_obs, gdf0], ignore_index=True))
            self.logger.info("Adding new observation lines to existing ones.")

        self.set_geoms(gdf_obs, name)
        self.set_config(f"{name}file", f"sfincs.{name}")
[docs] def setup_structures( self, structures: Union[str, Path, gpd.GeoDataFrame], stype: str, dep: Union[str, Path, xr.DataArray] = None, buffer: float = None, dz: float = None, merge: bool = True, **kwargs, ): """Setup thin dam or weir structures. Adds model layer (depending on `stype`): * **thd** geom: thin dam * **weir** geom: weir / levee Parameters ---------- structures : str, Path Path, data source name, or geopandas object to structure line geometry file. The "name" (for thd and weir), "z" and "par1" (for weir only) variables are optional. For weirs: `dz` must be provided if gdf has no "z" column or ZLineString; "par1" defaults to 0.6 if gdf has no "par1" column. stype : {'thd', 'weir'} Structure type. dep : str, Path, xr.DataArray, optional Path, data source name, or xarray raster object ('elevtn') describing the depth in an alternative resolution which is used for sampling the weir. buffer : float, optional If provided, describes the distance from the centerline to the foot of the structure. This distance is supplied to the raster.sample as the window (wdw). merge : bool, optional If True, merge with existing'stype' structures, by default True. dz: float, optional If provided, for weir structures the z value is calculated from the model elevation (dep) plus dz. """ # read, clip and reproject gdf_structures = self.data_catalog.get_geodataframe( structures, geom=self.region, **kwargs ).to_crs(self.crs) cols = { "thd": ["name", "geometry"], "weir": ["name", "z", "par1", "geometry"], } assert stype in cols, f"stype must be one of {list(cols.keys())}" gdf = gdf_structures[ [c for c in cols[stype] if c in gdf_structures.columns] ] # keep relevant cols structs = utils.gdf2linestring(gdf) # check if it parsed correct # sample zb values from dep file and set z = zb + dz if stype == "weir" and (dep is not None or dz is not None): if dep is None or dep == "dep": assert "dep" in self.grid, "dep layer not found" elv = self.grid["dep"] else: elv = self.data_catalog.get_rasterdataset( dep, geom=self.region, buffer=5, variables=["elevtn"] ) # calculate window size from buffer if buffer is not None: res = abs(elv.raster.res[0]) if elv.raster.crs.is_geographic: res = res * 111111.0 window_size = int(np.ceil(buffer / res)) else: window_size = 0 self.logger.debug(f"Sampling elevation with window size {window_size}") structs_out = [] for s in structs: pnts = gpd.points_from_xy(x=s["x"], y=s["y"]) zb = elv.raster.sample( gpd.GeoDataFrame(geometry=pnts, crs=self.crs), wdw=window_size ) if zb.ndim > 1: zb = zb.max(axis=1) s["z"] = zb.values if dz is not None: s["z"] += float(dz) structs_out.append(s) gdf = utils.linestring2gdf(structs_out, crs=self.crs) # Else function if you define elevation of weir elif stype == "weir" and np.any(["z" not in s for s in structs]): raise ValueError("Weir structure requires z values.") # combine with existing structures if present if merge and stype in self.geoms: gdf0 = self._geoms.pop(stype) gdf = gpd.GeoDataFrame(pd.concat([gdf, gdf0], ignore_index=True)) self.logger.info(f"Adding {stype} structures to existing structures.") # set structures self.set_geoms(gdf, stype) self.set_config(f"{stype}file", f"sfincs.{stype}")
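    # Example (illustrative sketch): add a weir whose crest level is sampled from
    # the model dep layer plus 0.5 m freeboard. The "my_levees.geojson" file name
    # is hypothetical.
    #
    #     sf.setup_structures(structures="my_levees.geojson", stype="weir", dep="dep", dz=0.5)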
[docs] def setup_drainage_structures( self, structures: Union[str, Path, gpd.GeoDataFrame], stype: str = "pump", discharge: float = 0.0, merge: bool = True, **kwargs, ): """Setup drainage structures. Adds model layer: * **drn** geom: drainage pump or culvert Parameters ---------- structures : str, Path Path, data source name, or geopandas object to structure line geometry file. The line should consist of only 2 points (else first and last points are used), ordered from up to downstream. The "type" (1 for pump and 2 for culvert), "par1" ("discharge" also accepted) variables are optional. If "type" or "par1" are not provided, they are based on stype or discharge arguments. stype : {'pump', 'culvert'}, optional Structure type, by default "pump". stype is converted to integer "type" to match with SFINCS expectations. discharge : float, optional Discharge of the structure, by default 0.0. For culverts, this is the maximum discharge, since actual discharge depends on waterlevel gradient merge : bool, optional If True, merge with existing drainage structures, by default True. """ stype = stype.lower() svalues = {"pump": 1, "culvert": 2} if stype not in svalues: raise ValueError('stype must be one of "pump", "culvert"') svalue = svalues[stype] # read, clip and reproject gdf_structures = self.data_catalog.get_geodataframe( structures, geom=self.region, **kwargs ).to_crs(self.crs) # check if type (int) is present in gdf, else overwrite from args # TODO also add check if type is interger? if "type" not in gdf_structures: gdf_structures["type"] = svalue # if discharge is provided, rename to par1 if "discharge" in gdf_structures: gdf_structures = gdf_structures.rename(columns={"discharge": "par1"}) # add par1, par2, par3, par4, par5 if not present # NOTE only par1 is used in the model if "par1" not in gdf_structures: gdf_structures["par1"] = discharge if "par2" not in gdf_structures: gdf_structures["par2"] = 0 if "par3" not in gdf_structures: gdf_structures["par3"] = 0 if "par4" not in gdf_structures: gdf_structures["par4"] = 0 if "par5" not in gdf_structures: gdf_structures["par5"] = 0 # multi to single lines lines = gdf_structures.explode(column="geometry").reset_index(drop=True) # get start [0] and end [1] points endpoints = lines.boundary.explode(index_parts=True).unstack() # merge start and end points into a single linestring gdf_structures["geometry"] = endpoints.apply( lambda x: LineString(x.values.tolist()), axis=1 ) # combine with existing structures if present if merge and "drn" in self.geoms: gdf0 = self._geoms.pop("drn") gdf_structures = gpd.GeoDataFrame( pd.concat([gdf_structures, gdf0], ignore_index=True) ) self.logger.info(f"Adding {stype} structures to existing structures.") # set structures self.set_geoms(gdf_structures, "drn") self.set_config("drnfile", "sfincs.drn")
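    # Example (illustrative sketch): add a pump with a fixed 2.5 m3/s capacity.
    # The line geometry file name is hypothetical; lines should be ordered from
    # intake to outlet.
    #
    #     sf.setup_drainage_structures(structures="pump_lines.geojson", stype="pump", discharge=2.5)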
    def setup_storage_volume(
        self,
        storage_locs: Union[str, Path, gpd.GeoDataFrame],
        volume: Union[float, List[float]] = None,
        height: Union[float, List[float]] = None,
        merge: bool = True,
    ):
        """Setup storage volume.

        Adds model layer:

        * **vol** map: storage volume for green infrastructure

        Parameters
        ----------
        storage_locs : str, Path
            Path, data source name, or geopandas object to storage location polygon or
            point geometry file. Optional "volume" or "height" attributes can be
            provided to set the storage volume.
        volume : float, optional
            Storage volume [m3], by default None
        height : float, optional
            Storage height [m], by default None
        merge : bool, optional
            If True, merge with existing storage volumes, by default True.
        """
        # read, clip and reproject
        gdf = self.data_catalog.get_geodataframe(
            storage_locs,
            geom=self.region,
            buffer=10,
        ).to_crs(self.crs)

        if self.grid_type == "regular":
            # if merge, add new storage volumes to existing ones
            if merge and "vol" in self.grid:
                da_vol = self.grid["vol"]
            else:
                da_vol = xr.full_like(self.mask, 0, dtype=np.float64)

            # add storage volumes from gdf to da_vol
            da_vol = workflows.add_storage_volume(
                da_vol,
                gdf,
                volume=volume,
                height=height,
                logger=self.logger,
            )

            # set grid
            mname = "vol"
            da_vol.attrs.update(**self._ATTRS.get(mname, {}))
            self.set_grid(da_vol, name=mname)

            # update config
            self.set_config(f"{mname}file", f"sfincs.{mname[:3]}")
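    # Example (illustrative sketch): assign 5000 m3 of storage to each polygon in a
    # hypothetical green-infrastructure file.
    #
    #     sf.setup_storage_volume(storage_locs="green_infra.geojson", volume=5000)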
### FORCING
[docs]    def set_forcing_1d(
        self,
        df_ts: pd.DataFrame = None,
        gdf_locs: gpd.GeoDataFrame = None,
        name: str = "bzs",
        merge: bool = True,
    ):
        """Set 1D forcing time series for 'bzs' or 'dis' boundary conditions.

        1D forcing consists of point locations `gdf_locs` and associated timeseries
        `df_ts`. If `gdf_locs` is None, the currently set locations are used.

        If merge is True, time series in `df_ts` with the same index will overwrite
        existing data. Time series with new indices are added to the existing forcing.

        In case the forcing time series have a numeric index, the index is converted
        to a datetime index assuming the index is in seconds since `tref`.

        Parameters
        ----------
        df_ts : pd.DataFrame, optional
            1D forcing time series data. If None, dummy forcing data is added.
        gdf_locs : gpd.GeoDataFrame, optional
            Location of the boundary points. If None, the currently set locations are used.
        name : str, optional
            Name of the forcing time series file, by default 'bzs'.
        merge : bool, optional
            If True, merge with existing forcing data, by default True.
        """
        # check dtypes
        if gdf_locs is not None:
            if not isinstance(gdf_locs, gpd.GeoDataFrame):
                raise ValueError("gdf_locs must be a gpd.GeoDataFrame")
            if not (gdf_locs.index.is_integer() and gdf_locs.index.is_unique):
                raise ValueError("gdf_locs index must be unique integer values")
            if not gdf_locs.geometry.type.isin(["Point"]).all():
                raise ValueError("gdf_locs geometry must be Point")
            if gdf_locs.crs != self.crs:
                gdf_locs = gdf_locs.to_crs(self.crs)
        elif name in self.forcing:
            gdf_locs = self.forcing[name].vector.to_gdf()
        if df_ts is not None:
            if not isinstance(df_ts, pd.DataFrame):
                raise ValueError("df_ts must be a pd.DataFrame")
            if not (df_ts.columns.is_integer() and df_ts.columns.is_unique):
                raise ValueError("df_ts column names must be unique integer values")
        # parse datetime index
        if df_ts is not None and df_ts.index.is_numeric():
            if "tref" not in self.config:
                raise ValueError(
                    "tref must be set in config to convert numeric index to datetime index"
                )
            tref = utils.parse_datetime(self.config["tref"])
            df_ts.index = tref + pd.to_timedelta(df_ts.index, unit="sec")
        # parse location index
        if (
            gdf_locs is not None
            and df_ts is not None
            and gdf_locs.index.size == df_ts.columns.size
            and not set(gdf_locs.index) == set(df_ts.columns)
        ):
            # loop over integer columns and find matching index
            for col in gdf_locs.select_dtypes(include=np.integer).columns:
                if set(gdf_locs[col]) == set(df_ts.columns):
                    gdf_locs = gdf_locs.set_index(col)
                    self.logger.info(f"Setting gdf_locs index to {col}")
                    break
            if not set(gdf_locs.index) == set(df_ts.columns):
                gdf_locs = gdf_locs.set_index(df_ts.columns)
                self.logger.info(
                    "No matching index column found in gdf_locs; assuming the order is correct"
                )
        # merge with existing data
        if name in self.forcing and merge:
            # read existing data
            da = self.forcing[name]
            gdf0 = da.vector.to_gdf()
            df0 = da.transpose(..., da.vector.index_dim).to_pandas()
            if set(gdf0.index) != set(gdf_locs.index):
                # merge locations; overwrite existing locations with the same name
                gdf0 = gdf0.drop(gdf_locs.index, errors="ignore")
                gdf_locs = pd.concat([gdf0, gdf_locs], axis=0).sort_index()
                # gdf_locs = gpd.GeoDataFrame(gdf_locs, crs=gdf0.crs)
                df0 = df0.reindex(gdf_locs.index, axis=1, fill_value=0)
            if df_ts is None:
                df_ts = df0
            elif set(df0.columns) != set(df_ts.columns):
                # merge timeseries; overwrite existing timeseries with the same name
                df0 = df0.drop(columns=df_ts.columns, errors="ignore")
                df_ts = pd.concat([df0, df_ts], axis=1).sort_index()
                # use linear interpolation and backfill to fill in missing values
                df_ts = df_ts.sort_index()
                df_ts = df_ts.interpolate(method="linear").bfill().fillna(0)
        # location data is required
        if gdf_locs is None:
            raise ValueError(
                f"gdf_locs must be provided if not merged with existing {name} forcing data"
            )
        # fill in missing timeseries
        if df_ts is None:
            df_ts = pd.DataFrame(
                index=pd.date_range(*self.get_model_time(), periods=2),
                data=0,
                columns=gdf_locs.index,
            )
        # set forcing with consistent names
        if not set(gdf_locs.index) == set(df_ts.columns):
            raise ValueError("The gdf_locs index and df_ts columns must be the same")
        gdf_locs.index.name = "index"
        df_ts.columns.name = "index"
        df_ts.index.name = "time"
        da = GeoDataArray.from_gdf(gdf_locs.to_crs(self.crs), data=df_ts, name=name)
        self.set_forcing(da.transpose("time", "index"))
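A minimal usage sketch of `set_forcing_1d` (illustrative only; the model root, coordinates, times and values are made up): two waterlevel boundary points with a two-step time series.

import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

from hydromt_sfincs import SfincsModel

sf = SfincsModel(root="sfincs_test", mode="r+")  # hypothetical existing model folder
# two boundary points; the integer index must match the timeseries columns
gdf_locs = gpd.GeoDataFrame(
    index=[1, 2], geometry=[Point(0, 0), Point(1000, 0)], crs=sf.crs
)
# a numeric (non-datetime) index would instead be interpreted as seconds since tref
df_ts = pd.DataFrame(
    index=pd.date_range("2023-01-01", periods=2, freq="D"),
    data={1: [0.0, 0.5], 2: [0.0, 0.8]},
)
sf.set_forcing_1d(df_ts=df_ts, gdf_locs=gdf_locs, name="bzs", merge=False)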
[docs] def setup_waterlevel_forcing( self, geodataset: Union[str, Path, xr.Dataset] = None, timeseries: Union[str, Path, pd.DataFrame] = None, locations: Union[str, Path, gpd.GeoDataFrame] = None, offset: Union[str, Path, xr.Dataset] = None, buffer: float = 5e3, merge: bool = True, ): """Setup waterlevel forcing. Waterlevel boundary conditions are read from a `geodataset` (geospatial point timeseries) or a tabular `timeseries` dataframe. At least one of these must be provided. The tabular timeseries data is combined with `locations` if provided, or with existing 'bnd' locations if previously set. Adds model forcing layers: * **bzs** forcing: waterlevel time series [m+ref] Parameters ---------- geodataset: str, Path, xr.Dataset, optional Path, data source name, or xarray data object for geospatial point timeseries. timeseries: str, Path, pd.DataFrame, optional Path, data source name, or pandas data object for tabular timeseries. locations: str, Path, gpd.GeoDataFrame, optional Path, data source name, or geopandas object for bnd point locations. It should contain a 'index' column matching the column names in `timeseries`. offset: str, Path, xr.Dataset, float, optional Path, data source name, constant value or xarray raster data for gridded offset between vertical reference of elevation and waterlevel data, The offset is added to the waterlevel data. buffer: float, optional Buffer [m] around model water level boundary cells to select waterlevel gauges, by default 5 km. merge : bool, optional If True, merge with existing forcing data, by default True. See Also -------- set_forcing_1d """ gdf_locs, df_ts = None, None tstart, tstop = self.get_model_time() # model time # buffer around msk==2 values if np.any(self.mask == 2): region = self.mask.where(self.mask == 2, 0).raster.vectorize() else: region = self.region # read waterlevel data from geodataset or geodataframe if geodataset is not None: # read and clip data in time & space da = self.data_catalog.get_geodataset( geodataset, geom=region, buffer=buffer, variables=["waterlevel"], time_tuple=(tstart, tstop), crs=self.crs, ) df_ts = da.transpose(..., da.vector.index_dim).to_pandas() gdf_locs = da.vector.to_gdf() elif timeseries is not None: df_ts = self.data_catalog.get_dataframe( timeseries, time_tuple=(tstart, tstop), # kwargs below only applied if timeseries not in data catalog parse_dates=True, index_col=0, ) df_ts.columns = df_ts.columns.map(int) # parse column names to integers # read location data (if not already read from geodataset) if gdf_locs is None and locations is not None: gdf_locs = self.data_catalog.get_geodataframe( locations, geom=region, buffer=buffer, crs=self.crs ).to_crs(self.crs) if "index" in gdf_locs.columns: gdf_locs = gdf_locs.set_index("index") # filter df_ts timeseries based on gdf_locs index # this allows to use a subset of the locations in the timeseries if df_ts is not None and np.isin(gdf_locs.index, df_ts.columns).all(): df_ts = df_ts.reindex(gdf_locs.index, axis=1, fill_value=0) elif gdf_locs is None and "bzs" in self.forcing: gdf_locs = self.forcing["bzs"].vector.to_gdf() elif gdf_locs is None: raise ValueError("No waterlevel boundary (bnd) points provided.") # optionally read offset data and correct df_ts if offset is not None and gdf_locs is not None: if isinstance(offset, (float, int)): df_ts += offset else: da_offset = self.data_catalog.get_rasterdataset( offset, bbox=self.mask.raster.transform_bounds(4326), buffer=5, ) offset_pnts = da_offset.raster.sample(gdf_locs) df_offset = 
offset_pnts.to_pandas().reindex(df_ts.columns).fillna(0) df_ts = df_ts + df_offset offset = offset_pnts.mean().values self.logger.debug( f"waterlevel forcing: applied offset (avg: {offset:+.2f})" ) # set/ update forcing self.set_forcing_1d(df_ts=df_ts, gdf_locs=gdf_locs, name="bzs", merge=merge)
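A sketch of a typical `setup_waterlevel_forcing` call on an initialized `SfincsModel` instance `sf` (the data source names below are placeholders for entries in the user's data catalog):

sf.setup_waterlevel_forcing(
    geodataset="gtsm_hourly",  # hypothetical point-timeseries source with a 'waterlevel' variable
    offset="dtu10_mdt",        # hypothetical mean dynamic topography grid to shift the vertical datum
    buffer=10e3,               # select gauges within 10 km of the waterlevel boundary cells
    merge=True,
)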
[docs] def setup_waterlevel_bnd_from_mask( self, distance: float = 1e4, merge: bool = True, ): """Setup waterlevel boundary (bnd) points along model waterlevel boundary (msk=2). The waterlevel boundary (msk=2) should be set before calling this method, e.g.: with `setup_mask_bounds` Waterlevels (bzs) are set to zero at these points, but can be updated with `setup_waterlevel_forcing`. Parameters ---------- distance: float, optional Distance [m] between waterlevel boundary points, by default 10 km. merge : bool, optional If True, merge with existing forcing data, by default True. See Also -------- setup_waterlevel_forcing setup_mask_bounds """ # get waterlevel boundary vector based on mask gdf_msk = utils.get_bounds_vector(self.mask) gdf_msk2 = gdf_msk[gdf_msk["value"] == 2] # convert to meters if crs is geographic if self.mask.raster.crs.is_geographic: distance = distance / 111111.0 # create points along boundary points = [] for _, row in gdf_msk2.iterrows(): distances = np.arange(0, row.geometry.length, distance) for d in distances: point = row.geometry.interpolate(d) points.append((point.x, point.y)) # create geodataframe with points gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy(*zip(*points)), crs=self.crs) # set waterlevel boundary self.set_forcing_1d(gdf_locs=gdf, name="bzs", merge=merge)
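For example (assuming the waterlevel boundary cells have already been set, e.g. with `setup_mask_bounds`), boundary points can be generated every 2 km and later receive actual water levels:

sf.setup_waterlevel_bnd_from_mask(distance=2e3)
# replace the zero water levels afterwards, e.g. from a csv file (hypothetical path)
sf.setup_waterlevel_forcing(timeseries="waterlevels.csv", merge=True)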
[docs] def setup_discharge_forcing( self, geodataset=None, timeseries=None, locations=None, merge=True, buffer: float = None, ): """Setup discharge forcing. Discharge timeseries are read from a `geodataset` (geospatial point timeseries) or a tabular `timeseries` dataframe. At least one of these must be provided. The tabular timeseries data is combined with `locations` if provided, or with existing 'src' locations if previously set, e.g., with the `setup_river_inflow` method. Adds model layers: * **dis** forcing: discharge time series [m3/s] Parameters ---------- geodataset: str, Path, xr.Dataset, optional Path, data source name, or xarray data object for geospatial point timeseries. timeseries: str, Path, pd.DataFrame, optional Path, data source name, or pandas data object for tabular timeseries. locations: str, Path, gpd.GeoDataFrame, optional Path, data source name, or geopandas object for bnd point locations. It should contain a 'index' column matching the column names in `timeseries`. merge : bool, optional If True, merge with existing forcing data, by default True. buffer: float, optional Buffer [m] around model boundary within the model region select discharge gauges, by default None. See Also -------- setup_river_inflow """ gdf_locs, df_ts = None, None tstart, tstop = self.get_model_time() # model time # buffer region = self.region if buffer is not None: # TODO this assumes the model crs is projected region = region.boundary.buffer(buffer).clip(self.region) # read waterlevel data from geodataset or geodataframe if geodataset is not None: # read and clip data in time & space da = self.data_catalog.get_geodataset( geodataset, geom=region, variables=["discharge"], time_tuple=(tstart, tstop), crs=self.crs, ) df_ts = da.transpose(..., da.vector.index_dim).to_pandas() gdf_locs = da.vector.to_gdf() elif timeseries is not None: df_ts = self.data_catalog.get_dataframe( timeseries, time_tuple=(tstart, tstop), # kwargs below only applied if timeseries not in data catalog parse_dates=True, index_col=0, ) df_ts.columns = df_ts.columns.map(int) # parse column names to integers # read location data (if not already read from geodataset) if gdf_locs is None and locations is not None: gdf_locs = self.data_catalog.get_geodataframe( locations, geom=region, crs=self.crs ).to_crs(self.crs) if "index" in gdf_locs.columns: gdf_locs = gdf_locs.set_index("index") # filter df_ts timeseries based on gdf_locs index # this allows to use a subset of the locations in the timeseries if df_ts is not None and np.isin(gdf_locs.index, df_ts.columns).all(): df_ts = df_ts.reindex(gdf_locs.index, axis=1, fill_value=0) elif gdf_locs is None and "dis" in self.forcing: gdf_locs = self.forcing["dis"].vector.to_gdf() elif gdf_locs is None: raise ValueError("No discharge boundary (src) points provided.") # set/ update forcing self.set_forcing_1d(df_ts=df_ts, gdf_locs=gdf_locs, name="dis", merge=merge)
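A sketch combining a tabular timeseries with point locations (both file names are hypothetical; the csv needs a time index and one column per location matching the 'index' column of the points):

sf.setup_discharge_forcing(
    timeseries="discharge.csv",
    locations="river_inflow_points.geojson",
    merge=True,
)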
[docs] def setup_discharge_forcing_from_grid( self, discharge, locations=None, uparea=None, wdw=1, rel_error=0.05, abs_error=50, ): """Setup discharge forcing based on a gridded discharge dataset. Discharge boundary timesereis are read from the `discharge` dataset with gridded discharge time series data. The `locations` are snapped to the `uparea` grid if provided based their uparea attribute. If not provided, the nearest grid cell is used. Adds model layers: * **dis** forcing: discharge time series [m3/s] Adds meta layer (not used by SFINCS): * **src_snapped** geom: snapped gauge location on discharge grid Parameters ---------- discharge: str, Path, xr.DataArray optional Path, data source name or xarray data object for gridded discharge timeseries dataset. * Required variables: ['discharge' (m3/s)] * Required coordinates: ['time', 'y', 'x'] locations: str, Path, gpd.GeoDataFrame, optional Path, data source name, or geopandas data object for point location dataset. Not required if point location have previously been set, e.g. using the :py:meth:`~hydromt_sfincs.SfincsModel.setup_river_inflow` method. * Required variables: ['uparea' (km2)] uparea: str, Path, optional Path, data source name, or xarray data object for upstream area grid. * Required variables: ['uparea' (km2)] wdw: int, optional Window size in number of cells around discharge boundary locations to snap to, only used if ``uparea`` is provided. By default 1. rel_error, abs_error: float, optional Maximum relative error (default 0.05) and absolute error (default 50 km2) between the discharge boundary location upstream area and the upstream area of the best fit grid cell, only used if "discharge" geoms has a "uparea" column. See Also -------- setup_river_inflow """ if locations is not None: gdf = self.data_catalog.get_geodataframe( locations, geom=self.region, assert_gtype="Point" ).to_crs(self.crs) elif "dis" in self.forcing: gdf = self.forcing["dis"].vector.to_gdf() else: raise ValueError("No discharge boundary (src) points provided.") # read data ds = self.data_catalog.get_rasterdataset( discharge, bbox=self.mask.raster.transform_bounds(4326), buffer=2, time_tuple=self.get_model_time(), # model time variables=["discharge"], single_var_as_array=False, ) if uparea is not None and "uparea" in gdf.columns: da_upa = self.data_catalog.get_rasterdataset( uparea, bbox=self.mask.raster.transform_bounds(4326), buffer=2, variables=["uparea"], ) # make sure ds and da_upa align ds["uparea"] = da_upa.raster.reproject_like(ds, method="nearest") elif "uparea" not in gdf.columns: self.logger.warning('No "uparea" column found in location data.') # TODO use hydromt core method ds_snapped = workflows.snap_discharge( ds=ds, gdf=gdf, wdw=wdw, rel_error=rel_error, abs_error=abs_error, uparea_name="uparea", discharge_name="discharge", logger=self.logger, ) # set zeros for src points without matching discharge da_q = ds_snapped["discharge"].reindex(index=gdf.index, fill_value=0).fillna(0) df_q = da_q.transpose("time", ...).to_pandas() # update forcing self.set_forcing_1d(df_ts=df_q, gdf_locs=gdf, name="dis") # keep snapped locations self.set_geoms(ds_snapped.vector.to_gdf(), "src_snapped")
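A sketch of snapping gridded discharge to existing src points (catalog entries and thresholds are illustrative; the src points are assumed to carry an 'uparea' column and to have been created before, e.g. with `setup_river_inflow`):

sf.setup_discharge_forcing_from_grid(
    discharge="glofas_discharge",  # hypothetical gridded source with a 'discharge' variable
    uparea="merit_hydro",          # hypothetical grid with an 'uparea' variable
    wdw=2,                         # search window of 2 cells around each point
    rel_error=0.05,
    abs_error=50,
)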
[docs] def setup_precip_forcing_from_grid( self, precip, dst_res=None, aggregate=False, **kwargs ): """Setup precipitation forcing from a gridded spatially varying data source. If aggregate is True, spatially uniform precipitation forcing is added to the model based on the mean precipitation over the model domain. If aggregate is False, distributed precipitation is added to the model as netcdf file. The data is reprojected to the model CRS (and destination resolution `dst_res` if provided). Adds one of these model layer: * **netamprfile** forcing: distributed precipitation [mm/hr] * **precipfile** forcing: uniform precipitation [mm/hr] Parameters ---------- precip, str, Path Path to precipitation rasterdataset netcdf file. * Required variables: ['precip' (mm)] * Required coordinates: ['time', 'y', 'x'] dst_res: float output resolution (m), by default None and computed from source data. Only used in combination with aggregate=False aggregate: bool, {'mean', 'median'}, optional Method to aggregate distributed input precipitation data. If True, mean aggregation is used, if False (default) the data is not aggregated and spatially distributed precipitation is returned. """ # get data for model domain and config time range precip = self.data_catalog.get_rasterdataset( precip, bbox=self.mask.raster.transform_bounds(4326), buffer=2, time_tuple=self.get_model_time(), variables=["precip"], ) # aggregate or reproject in space if aggregate: stat = aggregate if isinstance(aggregate, str) else "mean" self.logger.debug(f"Aggregate precip using {stat}.") zone = self.region.dissolve() # make sure we have a single (multi)polygon precip_out = precip.raster.zonal_stats(zone, stats=stat)[f"precip_{stat}"] df_ts = precip_out.where(precip_out >= 0, 0).fillna(0).squeeze().to_pandas() self.setup_precip_forcing(df_ts.to_frame()) else: # reproject to model utm crs # NOTE: currently SFINCS errors (stack overflow) on large files, # downscaling to model grid is not recommended kwargs0 = dict(align=dst_res is not None, method="nearest_index") kwargs0.update(kwargs) meth = kwargs0["method"] self.logger.debug(f"Resample precip using {meth}.") precip_out = precip.raster.reproject( dst_crs=self.crs, dst_res=dst_res, **kwargs ).fillna(0) # only resample in time if freq < 1H, else keep input values if da_to_timedelta(precip_out) < pd.to_timedelta("1H"): precip_out = hydromt.workflows.resample_time( precip_out, freq=pd.to_timedelta("1H"), conserve_mass=True, upsampling="bfill", downsampling="sum", logger=self.logger, ) precip_out = precip_out.rename("precip_2d") # add to forcing self.set_forcing(precip_out, name="precip_2d")
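For example (using a hypothetical 'era5_hourly' catalog entry with a 'precip' variable), either distributed or domain-averaged precipitation can be added:

# distributed precipitation, later written as netamprfile
sf.setup_precip_forcing_from_grid(precip="era5_hourly", aggregate=False)
# or a single spatially uniform timeseries (precipfile), averaged over the domain
sf.setup_precip_forcing_from_grid(precip="era5_hourly", aggregate="mean")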
[docs] def setup_precip_forcing(self, timeseries=None, magnitude=None): """Setup spatially uniform precipitation forcing (precip). Adds model layers: * **precipfile** forcing: uniform precipitation [mm/hr] Parameters ---------- timeseries: str, Path Path to tabulated timeseries csv file with time index in first column and location IDs in the first row, see :py:meth:`hydromt.open_timeseries_from_table`, for details. Note: tabulated timeseries files cannot yet be set through the data_catalog yml file. magnitude: float Precipitation magnitude [mm/hr] to use if no timeseries is provided. """ tstart, tstop = self.get_model_time() if timeseries is not None: df_ts = self.data_catalog.get_dataframe( timeseries, time_tuple=(tstart, tstop), # kwargs below only applied if timeseries not in data catalog parse_dates=True, index_col=0, ) elif magnitude is not None: times = pd.date_range(*self.get_model_time(), freq="10T") df_ts = pd.DataFrame( index=times, data=np.full((len(times), 1), magnitude, dtype=float) ) else: raise ValueError("Either timeseries or magnitude must be provided") if isinstance(df_ts, pd.DataFrame): df_ts = df_ts.squeeze() if not isinstance(df_ts, pd.Series): raise ValueError("df_ts must be a pandas.Series") df_ts.name = "precip" df_ts.index.name = "time" self.set_forcing(df_ts.to_xarray(), name="precip")
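For instance, a constant design rainfall can be added without any input file:

# spatially uniform rainfall of 20 mm/hr over the full simulation period
sf.setup_precip_forcing(magnitude=20.0)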
[docs] def setup_pressure_forcing_from_grid( self, press, dst_res=None, fill_value=101325, **kwargs ): """Setup pressure forcing from a gridded spatially varying data source. Adds one model layer: * **netampfile** forcing: distributed barometric pressure [Pa] Parameters ---------- press, str, Path, xr.Dataset, xr.DataArray Path to pressure rasterdataset netcdf file or xarray dataset. * Required variables: ['press' (Pa)] * Required coordinates: ['time', 'y', 'x'] dst_res: float output resolution (m), by default None and computed from source data. fill_value: float value to use when no data is available. Standard atmospheric pressure (101325 Pa) is used if no value is given. """ # get data for model domain and config time range press = self.data_catalog.get_rasterdataset( press, geom=self.region, buffer=2, time_tuple=self.get_model_time(), variables=["press"], ) # reproject to model utm crs # NOTE: currently SFINCS errors (stack overflow) on large files, # downscaling to model grid is not recommended kwargs0 = dict(align=dst_res is not None, method="nearest_index") kwargs0.update(kwargs) meth = kwargs0["method"] self.logger.debug(f"Resample pressure using {meth}.") press_out = press.raster.reproject( dst_crs=self.crs, dst_res=dst_res, **kwargs ).fillna(fill_value) # only resample in time if freq < 1H, else keep input values if da_to_timedelta(press_out) < pd.to_timedelta("1H"): press_out = hydromt.workflows.resample_time( press_out, freq=pd.to_timedelta("1H"), conserve_mass=False, upsampling="interpolate", downsampling="interpolate", logger=self.logger, ) press_out = press_out.rename("press_2d") # add to forcing self.set_forcing(press_out, name="press_2d")
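A sketch of adding gridded barometric pressure (the catalog entry is a placeholder; missing values fall back to standard atmospheric pressure):

sf.setup_pressure_forcing_from_grid(press="era5_hourly", fill_value=101325)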
[docs]    def setup_wind_forcing_from_grid(self, wind, dst_res=None, **kwargs):
        """Setup wind forcing from a gridded spatially varying data source.

        Adds one model layer:

        * **netamuamv** forcing: distributed wind [m/s]

        Parameters
        ----------
        wind : str, Path, xr.Dataset
            Path to wind rasterdataset (including eastward and northward components)
            netcdf file or xarray dataset.

            * Required variables: ['wind_u' (m/s), 'wind_v' (m/s)]
            * Required coordinates: ['time', 'y', 'x']
        dst_res: float
            output resolution (m), by default None and computed from source data.
        """
        # get data for model domain and config time range
        wind = self.data_catalog.get_rasterdataset(
            wind,
            geom=self.region,
            buffer=2,
            time_tuple=self.get_model_time(),
            variables=["wind_u", "wind_v"],
        )

        # reproject to model utm crs
        # NOTE: currently SFINCS errors (stack overflow) on large files,
        # downscaling to model grid is not recommended
        kwargs0 = dict(align=dst_res is not None, method="nearest_index")
        kwargs0.update(kwargs)
        meth = kwargs0["method"]
        self.logger.debug(f"Resample wind using {meth}.")
        wind = wind.raster.reproject(
            dst_crs=self.crs, dst_res=dst_res, **kwargs
        ).fillna(0)

        # only resample in time if freq < 1H, else keep input values
        if da_to_timedelta(wind) < pd.to_timedelta("1H"):
            wind_out = xr.Dataset()
            # resample in time
            for var in wind.data_vars:
                wind_out[var] = hydromt.workflows.resample_time(
                    wind[var],
                    freq=pd.to_timedelta("1H"),
                    conserve_mass=False,
                    upsampling="interpolate",
                    downsampling="interpolate",
                    logger=self.logger,
                )
        else:
            wind_out = wind

        # add to forcing
        self.set_forcing(wind_out, name="wind_2d")
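A sketch of adding gridded wind (the catalog entry is a placeholder for a source with 'wind_u' and 'wind_v' variables):

sf.setup_wind_forcing_from_grid(wind="era5_hourly")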
[docs] def setup_wind_forcing(self, timeseries=None, magnitude=None, direction=None): """Setup spatially uniform wind forcing (wind). Adds model layers: * **windfile** forcing: uniform wind magnitude [m/s] and direction [deg] Parameters ---------- timeseries, str, Path Path to tabulated timeseries csv file with time index in first column, magnitude in second column and direction in third column see :py:meth:`hydromt.open_timeseries_from_table`, for details. Note: tabulated timeseries files cannot yet be set through the data_catalog yml file. magnitude: float Magnitude of the wind [m/s] direction: float Direction where the wind is coming from [deg], e.g. 0 is north, 90 is east, etc. """ tstart, tstop = self.get_model_time() if timeseries is not None: df_ts = self.data_catalog.get_dataframe( timeseries, time_tuple=(tstart, tstop), # kwargs below only applied if timeseries not in data catalog parse_dates=True, index_col=0, ) elif magnitude is not None and direction is not None: df_ts = pd.DataFrame( index=pd.date_range(*self.get_model_time(), periods=2), data=np.array([[magnitude, direction], [magnitude, direction]]), columns=["mag", "dir"], ) else: raise ValueError( "Either timeseries or magnitude and direction must be provided" ) df_ts.name = "wnd" df_ts.index.name = "time" df_ts.columns.name = "index" da = xr.DataArray( df_ts.values, dims=("time", "index"), coords={"time": df_ts.index, "index": ["mag", "dir"]}, ) self.set_forcing(da, name="wnd")
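For instance, a constant uniform wind field can be set directly from a magnitude and direction:

# 15 m/s wind coming from the south-west (225 degrees)
sf.setup_wind_forcing(magnitude=15.0, direction=225.0)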
[docs] def setup_tiles( self, path: Union[str, Path] = None, region: dict = None, datasets_dep: List[dict] = [], zoom_range: Union[int, List[int]] = [0, 13], z_range: List[int] = [-20000.0, 20000.0], create_index_tiles: bool = True, create_topobathy_tiles: bool = True, fmt: str = "bin", ): """Create both index and topobathy tiles in webmercator format. Parameters ---------- path : Union[str, Path] Directory in which to store the index tiles, if None, the model root + tiles is used. region : dict Dictionary describing region of interest, e.g.: * {'bbox': [xmin, ymin, xmax, ymax]}. Note bbox should be provided in WGS 84 * {'geom': 'path/to/polygon_geometry'} If None, the model region is used. datasets_dep : List[dict] List of dictionaries with topobathy data, each containing a dataset name or Path (elevtn) and optional merge arguments e.g.: [{'elevtn': merit_hydro, 'zmin': 0.01}, {'elevtn': gebco, 'offset': 0, 'merge_method': 'first', reproj_method: 'bilinear'}] For a complete overview of all merge options, see :py:func:`~hydromt.workflows.merge_multi_dataarrays` Note that subgrid/dep_subgrid.tif is automatically used if present and datasets_dep is left empty. zoom_range : Union[int, List[int]], optional Range of zoom levels for which tiles are created, by default [0,13] z_range : List[int], optional Range of valid elevations that are included in the topobathy tiles, by default [-20000.0, 20000.0] create_index_tiles : bool, optional If True, index tiles are created, by default True create_topobathy_tiles : bool, optional If True, topobathy tiles are created, by default True. fmt : str, optional Format of the tiles: "bin" (binary, default), or "png". """ # use model root if path not provided if path is None: path = os.path.join(self.root, "tiles") # use model region if region not provided if region is None: region = self.region else: _kind, _region = hydromt.workflows.parse_region(region=region) if "bbox" in _region: bbox = _region["bbox"] region = gpd.GeoDataFrame(geometry=[box(*bbox)], crs=4326) elif "geom" in _region: region = _region["geom"] if region.crs is None: raise ValueError('Model region "geom" has no CRS') # if only one zoom level is specified, create tiles up to that zoom level (inclusive) if isinstance(zoom_range, int): zoom_range = [0, zoom_range] # create index tiles if create_index_tiles: # only binary and png are supported for index tiles so set to binary if tif fmt_ind = "bin" if fmt == "tif" else fmt if self.grid_type == "regular": self.reggrid.create_index_tiles( region=region, root=path, zoom_range=zoom_range, fmt=fmt_ind, logger=self.logger, ) elif self.grid_type == "quadtree": raise NotImplementedError( "Index tiles not yet implemented for quadtree grids." 
) # create topobathy tiles if create_topobathy_tiles: # compute resolution of highest zoom level # resolution of zoom level 0 on equator: 156543.03392804097 res = 156543.03392804097 / 2 ** zoom_range[1] datasets_dep = self._parse_datasets_dep(datasets_dep, res=res) # if no datasets provided, check if high-res subgrid geotiff is there if len(datasets_dep) == 0: if os.path.exists(os.path.join(self.root, "subgrid")): # check if there is a dep_subgrid.tif dep = os.path.join(self.root, "subgrid", "dep_subgrid.tif") if os.path.exists(dep): da = self.data_catalog.get_rasterdataset(dep) datasets_dep.append({"da": da}) else: raise ValueError("No topobathy datasets provided.") # create topobathy tiles workflows.tiling.create_topobathy_tiles( root=path, region=region, datasets_dep=datasets_dep, index_path=os.path.join(path, "indices"), zoom_range=zoom_range, z_range=z_range, fmt=fmt, )
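A sketch of creating index and topobathy tiles for a web viewer (dataset names are placeholders for catalog entries; with `path=None` the tiles are written to `<root>/tiles`):

sf.setup_tiles(
    datasets_dep=[{"elevtn": "merit_hydro", "zmin": 0.01}, {"elevtn": "gebco"}],
    zoom_range=[0, 12],
    fmt="png",
)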
# Plotting
[docs] def plot_forcing(self, fn_out=None, forcings="all", **kwargs): """Plot model timeseries forcing. For distributed forcing a spatial avarage, minimum or maximum is plotted. Parameters ---------- fn_out: str Path to output figure file. If a basename is given it is saved to <model_root>/figs/<fn_out> If None, no file is saved. forcings : str List of forcings to plot, by default 'all'. If 'all', all available forcings are plotted. See :py:attr:`~hydromt_sfincs.SfincsModel.forcing.keys()` for available forcings. **kwargs : dict Additional keyword arguments passed to :py:func:`hydromt.plotting.plot_forcing`. Returns ------- fig, axes Model fig and ax objects """ import matplotlib.dates as mdates import matplotlib.pyplot as plt if self.forcing: forcing = {} if forcings == "all": forcings = list(self.forcing.keys()) elif isinstance(forcings, str): forcings = [forcings] for name in forcings: if name not in self.forcing: self.logger.warning(f'No forcing named "{name}" found in model.') continue if isinstance(self.forcing[name], xr.Dataset): self.logger.warning( f'Skipping forcing "{name}" as it is a dataset.' ) continue # plot only dataarrays forcing[name] = self.forcing[name].copy() # update missing attributes for plot labels forcing[name].attrs.update(**self._ATTRS.get(name, {})) if len(forcing) > 0: fig, axes = plots.plot_forcing(forcing, **kwargs) # set xlim to model tstart - tend tstart, tstop = self.get_model_time() axes[-1].set_xlim(mdates.date2num([tstart, tstop])) # save figure if fn_out is not None: if not os.path.isabs(fn_out): fn_out = join(self.root, "figs", fn_out) if not os.path.isdir(dirname(fn_out)): os.makedirs(dirname(fn_out)) plt.savefig(fn_out, dpi=225, bbox_inches="tight") return fig, axes else: raise ValueError("No forcing found in model.")
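For example, all forcing timeseries can be plotted and saved to the model's figs folder:

fig, axes = sf.plot_forcing(fn_out="forcing.png")  # saved to <root>/figs/forcing.png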
[docs] def plot_basemap( self, fn_out: str = None, variable: Union[str, xr.DataArray] = "dep", shaded: bool = False, plot_bounds: bool = True, plot_region: bool = False, plot_geoms: bool = True, bmap: str = None, zoomlevel: int = "auto", figsize: Tuple[int] = None, geom_names: List[str] = None, geom_kwargs: Dict = {}, legend_kwargs: Dict = {}, **kwargs, ): """Create basemap plot. Parameters ---------- fn_out: str, optional Path to output figure file, by default None. If a basename is given it is saved to <model_root>/figs/<fn_out> If None, no file is saved. variable : str, xr.DataArray, optional Map of variable in ds to plot, by default 'dep' Alternatively, provide a xr.DataArray shaded : bool, optional Add shade to variable (only for variable = 'dep' and non-rotated grids), by default False plot_bounds : bool, optional Add waterlevel (msk=2) and open (msk=3) boundary conditions to plot. plot_region : bool, optional If True, plot region outline. plot_geoms : bool, optional If True, plot available geoms. bmap : str, optional background map souce name, by default None. Default image tiles "sat", and "osm" are fetched from cartopy image tiles. If contextily is installed, xyzproviders tiles can be used as well. zoomlevel : int, optional zoomlevel, by default 'auto' figsize : Tuple[int], optional figure size, by default None geom_names : List[str], optional list of model geometries to plot, by default all model geometries. geom_kwargs : Dict of Dict, optional Model geometry styling per geometry, passed to geopandas.GeoDataFrame.plot method. For instance: {'src': {'markersize': 30}}. legend_kwargs : Dict, optional Legend kwargs, passed to ax.legend method. Returns ------- fig, axes Model fig and ax objects """ import matplotlib.pyplot as plt # combine geoms and forcing locations sg = self.geoms.copy() for fname, gname in self._FORCING_1D.values(): if fname[0] in self.forcing and gname is not None: try: sg.update({gname: self.forcing[fname[0]].vector.to_gdf()}) except ValueError: self.logger.debug(f'unable to plot forcing location: "{fname}"') if plot_region and "region" not in self.geoms: sg.update({"region": self.region}) # make sure grid are set if isinstance(variable, xr.DataArray): ds = variable.to_dataset() variable = variable.name elif variable.startswith("subgrid.") and self.subgrid is not None: ds = self.subgrid.copy() variable = variable.replace("subgrid.", "") else: ds = self.grid.copy() if "msk" not in ds: ds["msk"] = self.mask fig, ax = plots.plot_basemap( ds, sg, variable=variable, shaded=shaded, plot_bounds=plot_bounds, plot_region=plot_region, plot_geoms=plot_geoms, bmap=bmap, zoomlevel=zoomlevel, figsize=figsize, geom_names=geom_names, geom_kwargs=geom_kwargs, legend_kwargs=legend_kwargs, **kwargs, ) if fn_out is not None: if not os.path.isabs(fn_out): fn_out = join(self.root, "figs", fn_out) if not os.path.isdir(dirname(fn_out)): os.makedirs(dirname(fn_out)) plt.savefig(fn_out, dpi=225, bbox_inches="tight") return fig, ax
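A sketch of a basemap plot on satellite imagery with boundary cells and point styling (the styling values are illustrative):

fig, ax = sf.plot_basemap(
    fn_out="basemap.png",
    variable="dep",
    plot_bounds=True,
    bmap="sat",
    zoomlevel=12,
    figsize=(11, 7),
    geom_kwargs={"src": {"markersize": 30}},
)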
# I/O
[docs] def read(self, epsg: int = None): """Read the complete model schematization and configuration from file.""" self.read_config(epsg=epsg) if epsg is None and "epsg" not in self.config: raise ValueError("Please specify epsg to read this model") self.read_grid() self.read_subgrid() self.read_geoms() self.read_forcing() self.logger.info("Model read")
[docs]    def write(self):
        """Write the complete model schematization and configuration to file."""
        self.logger.info(f"Writing model data to {self.root}")
        # TODO - add check for subgrid & quadtree > give flags to self.write_grid() and self.write_config()
        self.write_grid()
        self.write_subgrid()
        self.write_geoms()
        self.write_forcing()
        self.write_states()
        # config last; might be updated when writing maps, states or forcing
        self.write_config()
        # write data catalog with used data sources
        self.write_data_catalog()  # new in hydromt v0.4.4
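A sketch of a full read/write round trip (the folder names are hypothetical; `set_root` is inherited from the hydromt `Model` base class):

from hydromt_sfincs import SfincsModel

sf = SfincsModel(root="sfincs_existing", mode="r")
sf.read()                             # reads config, grid, subgrid, geoms and forcing
sf.set_root("sfincs_copy", mode="w")  # switch to a new model folder
sf.write()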
[docs]    def read_grid(self, data_vars: Union[List, str] = None) -> None:
        """Read SFINCS binary grid files and save to `grid` attribute.
        Filenames are taken from the `config` attribute (i.e. input file).

        Parameters
        ----------
        data_vars : Union[List, str], optional
            List of data variables to read, by default None (all)
        """
        if self._grid is None:
            self._grid = xr.Dataset()  # avoid reading grid twice

        da_lst = []
        if data_vars is None:
            data_vars = self._MAPS
        elif isinstance(data_vars, str):
            data_vars = [data_vars]

        # read index file
        ind_fn = self.get_config("indexfile", fallback="sfincs.ind", abs_path=True)
        if not isfile(ind_fn):
            raise IOError(f".ind path {ind_fn} does not exist")

        dtypes = {"msk": "u1"}
        mvs = {"msk": 0}
        if self.reggrid is not None:
            ind = self.reggrid.read_ind(ind_fn=ind_fn)

            for name in data_vars:
                if f"{name}file" in self.config:
                    fn = self.get_config(
                        f"{name}file", fallback=f"sfincs.{name}", abs_path=True
                    )
                    if not isfile(fn):
                        self.logger.warning(f"{name}file not found at {fn}")
                        continue
                    dtype = dtypes.get(name, "f4")
                    mv = mvs.get(name, -9999.0)
                    da = self.reggrid.read_map(fn, ind, dtype, mv, name=name)
                    da_lst.append(da)
            ds = xr.merge(da_lst)
            epsg = self.config.get("epsg", None)
            if epsg is not None:
                ds.raster.set_crs(epsg)
            self.set_grid(ds)

            # keep some metadata maps from gis directory
            fns = glob.glob(join(self.root, "gis", "*.tif"))
            fns = [
                fn
                for fn in fns
                if basename(fn).split(".")[0] not in self.grid.data_vars
            ]
            if fns:
                ds = hydromt.open_mfraster(fns).load()
                self.set_grid(ds)
                ds.close()
[docs]    def write_grid(self, data_vars: Union[List, str] = None):
        """Write SFINCS grid to binary files including map index file.
        Filenames are taken from the `config` attribute (i.e. input file).
        If `write_gis` property is True, all grid variables are written to geotiff
        files in a "gis" subfolder.

        Parameters
        ----------
        data_vars : Union[List, str], optional
            List of data variables to write, by default None (all)
        """
        self._assert_write_mode

        dtypes = {"msk": "u1"}  # default to f4
        if self.reggrid and len(self.grid.data_vars) > 0 and "msk" in self.grid:
            # make sure orientation is S->N
            ds_out = self.grid
            if ds_out.raster.res[1] < 0:
                ds_out = ds_out.raster.flipud()
            mask = ds_out["msk"].values

            self.logger.debug("Write binary map indices based on mask.")
            ind_fn = self.get_config("indexfile", abs_path=True)
            self.reggrid.write_ind(ind_fn=ind_fn, mask=mask)

            if data_vars is None:  # write all maps
                data_vars = [v for v in self._MAPS if v in ds_out]
            elif isinstance(data_vars, str):
                data_vars = [data_vars]
            self.logger.debug(f"Write binary map files: {data_vars}.")
            for name in data_vars:
                if f"{name}file" not in self.config:
                    self.set_config(f"{name}file", f"sfincs.{name}")
                # do not write dep and manning files if subgrid is used
                if (name == "dep" or name == "manning") and self.subgrid:
                    continue
                self.reggrid.write_map(
                    map_fn=self.get_config(f"{name}file", abs_path=True),
                    data=ds_out[name].values,
                    mask=mask,
                    dtype=dtypes.get(name, "f4"),
                )

        if self._write_gis:
            self.write_raster("grid")
[docs] def read_subgrid(self): """Read SFINCS subgrid file and add to `subgrid` attribute. Filename is taken from the `config` attribute (i.e. input file).""" self._assert_read_mode if "sbgfile" in self.config: fn = self.get_config("sbgfile", abs_path=True) if not isfile(fn): self.logger.warning(f"sbgfile not found at {fn}") return # re-initialize subgrid (different variables for old/new version) # TODO: come up with a better way to handle this self.reggrid.subgrid = SubgridTableRegular() self.subgrid = xr.Dataset() # read subgrid file if fn.parts[-1].endswith(".sbg"): # read binary file self.reggrid.subgrid.read_binary(file_name=fn, mask=self.mask) else: # read netcdf file self.reggrid.subgrid.read(file_name=fn, mask=self.mask) self.subgrid = self.reggrid.subgrid.to_xarray( dims=self.mask.raster.dims, coords=self.mask.raster.coords )
[docs] def write_subgrid(self): """Write SFINCS subgrid file.""" self._assert_write_mode if self.subgrid: if "sbgfile" not in self.config: # apparently no subgrid was read, so set default filename self.set_config("sbgfile", "sfincs_subgrid.nc") fn = self.get_config("sbgfile", abs_path=True) if fn.parts[-1].endswith(".sbg"): # write binary file self.reggrid.subgrid.write_binary(file_name=fn, mask=self.mask) else: # write netcdf file self.reggrid.subgrid.write(file_name=fn, mask=self.mask)
[docs] def read_geoms(self): """Read geometry files and save to `geoms` attribute. Known geometry files mentioned in the sfincs.inp configuration file are read, including: bnd/src/obs xy(n) files, thd/weir structure files and drn drainage structure files. If other geojson files are present in a "gis" subfolder folder, those are read as well. """ self._assert_read_mode if self._geoms is None: self._geoms = {} # avoid reading geoms twice # read _GEOMS model files for gname in self._GEOMS.values(): if f"{gname}file" in self.config: fn = self.get_config(f"{gname}file", abs_path=True) if fn is None: continue elif not isfile(fn): self.logger.warning(f"{gname}file not found at {fn}") continue if gname in ["thd", "weir", "crs"]: struct = utils.read_geoms(fn) gdf = utils.linestring2gdf(struct, crs=self.crs) elif gname == "obs": gdf = utils.read_xyn(fn, crs=self.crs) elif gname == "drn": gdf = utils.read_drn(fn, crs=self.crs) else: gdf = utils.read_xy(fn, crs=self.crs) # this seems to be required for new pandas versions gdf.set_geometry("geometry", inplace=True) self.set_geoms(gdf, name=gname) # read additional geojson files from gis directory for fn in glob.glob(join(self.root, "gis", "*.geojson")): name = basename(fn).replace(".geojson", "") gnames = [f[1] for f in self._FORCING_1D.values() if f[1] is not None] skip = gnames + list(self._GEOMS.values()) if name in skip: continue gdf = hydromt.open_vector(fn, crs=self.crs) self.set_geoms(gdf, name=name)
[docs] def write_geoms(self, data_vars: Union[List, str] = None): """Write geoms to bnd/src/obs xy files and thd/weir structure files. Filenames are based on the `config` attribute. If `write_gis` property is True, all geoms are written to geojson files in a "gis" subfolder. Parameters ---------- data_vars : list of str, optional List of data variables to write, by default None (all) """ self._assert_write_mode # change precision of coordinates according to crs if self.crs.is_geographic: fmt = "%.6f" else: fmt = "%.1f" if self.geoms: dvars = self._GEOMS.values() if data_vars is not None: dvars = [name for name in data_vars if name in self._GEOMS.values()] self.logger.info("Write geom files") for gname, gdf in self.geoms.items(): if gname in dvars: if f"{gname}file" not in self.config: self.set_config(f"{gname}file", f"sfincs.{gname}") fn = self.get_config(f"{gname}file", abs_path=True) if gname in ["thd", "weir", "crs"]: struct = utils.gdf2linestring(gdf) utils.write_geoms(fn, struct, stype=gname, fmt=fmt) elif gname == "obs": utils.write_xyn(fn, gdf, fmt=fmt) elif gname == "drn": utils.write_drn(fn, gdf, fmt=fmt) else: hydromt.io.write_xy(fn, gdf, fmt="%8.2f") # NOTE: all geoms are written to geojson files in a "gis" subfolder if self._write_gis: self.write_vector(variables=["geoms"])
[docs] def read_forcing(self, data_vars: List = None): """Read forcing files and save to `forcing` attribute. Known forcing files mentioned in the sfincs.inp configuration file are read, including: bzs/dis/precip ascii files and the netampr netcdf file. Parameters ---------- data_vars : list of str, optional List of data variables to read, by default None (all) """ self._assert_read_mode if self._forcing is None: self._forcing = {} # avoid reading forcing twice if isinstance(data_vars, str): data_vars = list(data_vars) # 1D dvars_1d = self._FORCING_1D if data_vars is not None: dvars_1d = [name for name in data_vars if name in dvars_1d] tref = utils.parse_datetime(self.config["tref"]) for name in dvars_1d: ts_names, xy_name = self._FORCING_1D[name] # read time series da_lst = [] for ts_name in ts_names: ts_fn = self.get_config(f"{ts_name}file", abs_path=True) if ts_fn is None or not isfile(ts_fn): if ts_fn is not None: self.logger.warning(f"{ts_name}file not found at {ts_fn}") continue df = utils.read_timeseries(ts_fn, tref) df.index.name = "time" if xy_name is not None: df.columns.name = "index" da = xr.DataArray(df, dims=("time", "index"), name=ts_name) else: # spatially uniform forcing da = xr.DataArray(df[df.columns[0]], dims=("time"), name=ts_name) da_lst.append(da) ds = xr.merge(da_lst[:]) # read xy if xy_name is not None: xy_fn = self.get_config(f"{xy_name}file", abs_path=True) if xy_fn is None or not isfile(xy_fn): if xy_fn is not None: self.logger.warning(f"{xy_name}file not found at {xy_fn}") else: gdf = utils.read_xy(xy_fn, crs=self.crs) # read attribute data from gis files; merge based on index gis_fn = join(self.root, "gis", f"{xy_name}.geojson") if isfile(gis_fn): gdf1 = gpd.read_file(gis_fn) if "index" in gdf1.columns: gdf1 = gdf1.set_index("index") if not np.all(np.isin(gdf.index, gdf1.index)): self.logger.warning( f"Index in {xy_name}file does not match {gis_fn}" ) else: for col in gdf1.columns: if col not in gdf.columns: gdf[col] = gdf1.loc[gdf.index, col] # set locations and attributes as coordinates of dataset ds = ds.assign_coords(index=gdf.index.values) ds = GeoDataset.from_gdf(gdf, ds, index_dim="index") # save in self.forcing if len(ds) > 1: # keep wave forcing together self.set_forcing(ds, name=name, split_dataset=False) elif len(ds) > 0: self.set_forcing(ds, split_dataset=True) # 2D NETCDF format dvars_2d = self._FORCING_NET if data_vars is not None: dvars_2d = [name for name in data_vars if name in dvars_2d] for name in dvars_2d: fname, rename = self._FORCING_NET[name] fn = self.get_config(f"{fname}file", abs_path=True) if fn is None or not isfile(fn): if fn is not None: self.logger.warning(f"{name}file not found at {fn}") continue elif name in ["netbndbzsbzi", "netsrcdis"]: ds = GeoDataset.from_netcdf(fn, crs=self.crs, chunks="auto") else: ds = xr.open_dataset(fn, chunks="auto") rename = {k: v for k, v in rename.items() if k in ds} if len(rename) > 0: ds = ds.rename(rename).squeeze(drop=True)[list(rename.values())] self.set_forcing(ds, split_dataset=True) else: logger.warning(f"No forcing variables found in {fname}file")
[docs] def write_forcing(self, data_vars: Union[List, str] = None): """Write forcing to ascii or netcdf (netampr) files. Filenames are based on the `config` attribute. Parameters ---------- data_vars : list of str, optional List of data variables to write, by default None (all) """ self._assert_write_mode # change precision of coordinates according to crs if self.crs.is_geographic: fmt = "%.6f" else: fmt = "%.1f" if self.forcing: self.logger.info("Write forcing files") tref = utils.parse_datetime(self.config["tref"]) # for nc files -> time in minutes since tref tref_str = tref.strftime("%Y-%m-%d %H:%M:%S") # 1D timeseries + location text files dvars_1d = self._FORCING_1D if data_vars is not None: dvars_1d = [name for name in data_vars if name in self._FORCING_1D] for name in dvars_1d: ts_names, xy_name = self._FORCING_1D[name] if ( name in self._FORCING_NET and f"{self._FORCING_NET[name][0]}file" in self.config ): continue # write NC file instead of text files # work with wavespectra dataset and bzs/dis dataarray if name in self.forcing and isinstance(self.forcing[name], xr.Dataset): ds = self.forcing[name] else: ds = self.forcing # dict # write timeseries da = None for ts_name in ts_names: if ts_name not in ds or ds[ts_name].ndim > 2: continue # parse data to dataframe da = ds[ts_name].transpose("time", ...) df = da.to_pandas() # get filenames from config if f"{ts_name}file" not in self.config: self.set_config(f"{ts_name}file", f"sfincs.{ts_name}") fn = self.get_config(f"{ts_name}file", abs_path=True) # write timeseries utils.write_timeseries(fn, df, tref) # write xy if xy_name and da is not None: # parse data to geodataframe try: gdf = da.vector.to_gdf() except Exception: raise ValueError(f"Locations missing for {name} forcing") # get filenames from config if f"{xy_name}file" not in self.config: self.set_config(f"{xy_name}file", f"sfincs.{xy_name}") fn_xy = self.get_config(f"{xy_name}file", abs_path=True) # write xy hydromt.io.write_xy(fn_xy, gdf, fmt=fmt) if self._write_gis: # write geojson file to gis folder self.write_vector(variables=f"forcing.{ts_names[0]}") # netcdf forcing encoding = dict( time={"units": f"minutes since {tref_str}", "dtype": "float64"} ) dvars_2d = self._FORCING_NET if data_vars is not None: dvars_2d = [name for name in data_vars if name in self._FORCING_NET] for name in dvars_2d: if ( name in self._FORCING_1D and f"{self._FORCING_1D[name][1]}file" in self.config ): continue # timeseries + xy file already written fname, rename = self._FORCING_NET[name] # combine variables and rename to output names rename = {v: k for k, v in rename.items() if v in self.forcing} if len(rename) == 0: continue ds = xr.merge([self.forcing[v] for v in rename.keys()]).rename(rename) # get filename from config if f"{fname}file" not in self.config: self.set_config(f"{fname}file", f"{name}.nc") fn = self.get_config(f"{fname}file", abs_path=True) # write 1D timeseries if fname in ["netbndbzsbzi", "netsrcdis"]: ds.vector.to_xy().to_netcdf(fn, encoding=encoding) if self._write_gis: # write geojson file to gis folder self.write_vector(variables=f"forcing.{list(rename.keys())[0]}") # write 2D gridded timeseries else: ds.to_netcdf(fn, encoding=encoding)
[docs]    def read_states(self):
        """Read waterlevel state (zsini) from binary file and save to `states` attribute.
        The inifile, if mentioned in the sfincs.inp configuration file, is read.
        """
        self._assert_read_mode

        # read index file
        # TODO make reggrid a property where we trigger the initialization of reggrid
        if self.reggrid is None:
            self.update_grid_from_config()
        if self.reggrid is not None:
            ind_fn = self.get_config("indexfile", fallback="sfincs.ind", abs_path=True)
            if "msk" in self.grid:  # triggers reading grid if empty and in read mode
                ind = self.reggrid.ind(self.grid["msk"].values)
            elif isfile(ind_fn):
                ind = self.reggrid.read_ind(ind_fn=ind_fn)
            else:
                raise IOError(f"indexfile {ind_fn} does not exist")

            if "inifile" in self.config:
                fn = self.get_config("inifile", abs_path=True)
                if not isfile(fn):
                    self.logger.warning(f"inifile not found at {fn}")
                    return
                zsini = self.reggrid.read_map(
                    fn, ind, dtype="f4", mv=-9999.0, name="zsini"
                )
                if self.crs is not None:
                    zsini.raster.set_crs(self.crs)
                self.set_states(zsini, "zsini")
[docs] def write_states(self): """Write waterlevel state (zsini) to binary map file. The filenames is based on the `config` attribute. """ self._assert_write_mode name = "zsini" if name not in self.states: self.logger.warning(f"{name} not in states, skipping") return if self.reggrid and "msk" in self.grid: # make sure orientation is S->N ds_out = self.grid if ds_out.raster.res[1] < 0: ds_out = ds_out.raster.flipud() mask = ds_out["msk"].values self.logger.debug("Write binary map indices based on mask.") # write index file ind_fn = self.get_config("indexfile", abs_path=True) self.reggrid.write_ind(ind_fn=ind_fn, mask=mask) if "inifile" not in self.config: self.set_config("inifile", f"sfincs.{name}") fn = self.get_config("inifile", abs_path=True) da = self.states[name] if da.raster.res[1] < 0: da = da.raster.flipud() self.logger.debug("Write binary water level state inifile") self.reggrid.write_map( map_fn=fn, data=da.values, mask=mask, dtype="f4", ) if self._write_gis: self.write_raster("states")
[docs] def read_results( self, chunksize=100, drop=["crs", "sfincsgrid"], fn_map="sfincs_map.nc", fn_his="sfincs_his.nc", **kwargs, ): """Read results from sfincs_map.nc and sfincs_his.nc and save to the `results` attribute. The staggered nc file format is translated into hydromt.RasterDataArray formats. Additionally, hmax is computed from zsmax and zb if present. Parameters ---------- chunksize: int, optional chunk size along time dimension, by default 100 drop: list, optional list of variables to drop, by default ["crs", "sfincsgrid"] fn_map: str, optional filename of sfincs_map.nc, by default "sfincs_map.nc" fn_his: str, optional filename of sfincs_his.nc, by default "sfincs_his.nc" """ if not isabs(fn_map): fn_map = join(self.root, fn_map) if isfile(fn_map): ds_face, ds_edge = utils.read_sfincs_map_results( fn_map, ds_like=self.grid, # TODO: fix for quadtree drop=drop, logger=self.logger, **kwargs, ) # save as dict of DataArray self.set_results(ds_face, split_dataset=True) self.set_results(ds_edge, split_dataset=True) if not isabs(fn_his): fn_his = join(self.root, fn_his) if isfile(fn_his): ds_his = utils.read_sfincs_his_results( fn_his, crs=self.crs, chunksize=chunksize ) # drop double vars (map files has priority) drop_vars = [v for v in ds_his.data_vars if v in self.results or v in drop] ds_his = ds_his.drop_vars(drop_vars) self.set_results(ds_his, split_dataset=True)
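After a simulation, results can be loaded and inspected, for example (assuming the default output file names in the model root):

sf.read_results(fn_map="sfincs_map.nc", fn_his="sfincs_his.nc")
zsmax = sf.results["zsmax"]  # available variables depend on the SFINCS output; 'hmax' is added when zsmax and zb are present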
def write_raster( self, variables=["grid", "states", "results.hmax"], root=None, driver="GTiff", compress="deflate", **kwargs, ): """Write model 2D raster variables to geotiff files. NOTE: these files are not used by the model by just saved for visualization/ analysis purposes. Parameters ---------- variables: str, list, optional Model variables are a combination of attribute and layer (optional) using <attribute>.<layer> syntax. Known ratster attributes are ["grid", "states", "results"]. Different variables can be combined in a list. By default, variables is ["grid", "states", "results.hmax"] root: Path, str, optional The output folder path. If None it defaults to the <model_root>/gis folder (Default) kwargs: Key-word arguments passed to hydromt.RasterDataset.to_raster(driver='GTiff', compress='lzw'). """ # check variables if isinstance(variables, str): variables = [variables] if not isinstance(variables, list): raise ValueError(f'"variables" should be a list, not {type(list)}.') # check root if root is None: root = join(self.root, "gis") if not os.path.isdir(root): os.makedirs(root) # save to file for var in variables: vsplit = var.split(".") attr = vsplit[0] obj = getattr(self, f"_{attr}") if obj is None or len(obj) == 0: continue # empty self.logger.info(f"Write raster file(s) for {var} to 'gis' subfolder") layers = vsplit[1:] if len(vsplit) >= 2 else list(obj.keys()) for layer in layers: if layer not in obj: self.logger.warning(f"Variable {attr}.{layer} not found: skipping.") continue da = obj[layer] if len(da.dims) != 2: # try to reduce to 2D by taking maximum over time dimension if "time" in da.dims: da = da.max("time") elif "timemax" in da.dims: da = da.max("timemax") # if still not 2D, skip if len(da.dims) != 2: self.logger.warning( f"Variable {attr}.{layer} has more than 2 dimensions: skipping." ) continue # only write active cells to gis files da = da.where(self.mask > 0, da.raster.nodata).raster.mask_nodata() if da.raster.res[1] > 0: # make sure orientation is N->S da = da.raster.flipud() da.raster.to_raster( join(root, f"{layer}.tif"), driver=driver, compress=compress, **kwargs, ) def write_vector( self, variables=["geoms", "forcing.bzs", "forcing.dis"], root=None, gdf=None, **kwargs, ): """Write model vector (geoms) variables to geojson files. NOTE: these files are not used by the model by just saved for visualization/ analysis purposes. Parameters ---------- variables: str, list, optional geoms variables. By default all geoms are saved. root: Path, str, optional The output folder path. If None it defaults to the <model_root>/gis folder (Default) kwargs: Key-word arguments passed to geopandas.GeoDataFrame.to_file(driver='GeoJSON'). 
""" kwargs.update(driver="GeoJSON") # fixed # check variables if isinstance(variables, str): variables = [variables] if not isinstance(variables, list): raise ValueError(f'"variables" should be a list, not {type(list)}.') # check root if root is None: root = join(self.root, "gis") if not os.path.isdir(root): os.makedirs(root) # save to file for var in variables: vsplit = var.split(".") attr = vsplit[0] obj = getattr(self, f"_{attr}") if obj is None or len(obj) == 0: continue # empty self.logger.info(f"Write vector file(s) for {var} to 'gis' subfolder") names = vsplit[1:] if len(vsplit) >= 2 else list(obj.keys()) for name in names: if name not in obj: self.logger.warning(f"Variable {attr}.{name} not found: skipping.") continue if isinstance(obj[name], gpd.GeoDataFrame): gdf = obj[name] else: try: gdf = obj[name].vector.to_gdf() # xy name -> difficult! name = [ v[-1] for v in self._FORCING_1D.values() if name in v[0] ][0] except: self.logger.debug( f"Variable {attr}.{name} could not be written to vector file." ) pass gdf.to_file(join(root, f"{name}.geojson"), **kwargs) ## model configuration
[docs] def read_config(self, config_fn: str = None, epsg: int = None) -> None: """Parse config from SFINCS input file. If in write-only mode the config is initialized with default settings unless a path to a template config file is provided. Parameters ---------- config_fn: str Filename of config file, by default None. epsg: int EPSG code of the model CRS. Only used if missing in the SFINCS input file, by default None. """ inp = SfincsInput() # initialize with defaults if config_fn is not None or self._read: if config_fn is None: # read from default location config_fn = self._config_fn if not isabs(config_fn) and self._root: # read from model root config_fn = abspath(join(self.root, config_fn)) if not isfile(config_fn): raise IOError(f"SFINCS input file not found {config_fn}") # read inp file inp.read(inp_fn=config_fn) # overwrite / initialize config attribute self._config = inp.to_dict() if epsg is not None and "epsg" not in self.config: self.set_config("epsg", int(epsg)) # update grid properties based on sfincs.inp self.update_grid_from_config()
[docs] def write_config(self, config_fn: str = "sfincs.inp"): """Write config to <root/config_fn>""" self._assert_write_mode if not isabs(config_fn) and self._root: config_fn = join(self.root, config_fn) inp = SfincsInput.from_dict(self.config) inp.write(inp_fn=abspath(config_fn))
[docs] def update_spatial_attrs(self): """Update geospatial `config` (sfincs.inp) attributes based on grid""" dx, dy = self.res # TODO check self.bounds with rotation!! origin not necessary equal to total_bounds west, south, _, _ = self.bounds if self.crs is not None: self.set_config("epsg", self.crs.to_epsg()) self.set_config("mmax", self.width) self.set_config("nmax", self.height) self.set_config("dx", dx) self.set_config("dy", abs(dy)) # dy is always positive (orientation is S -> N) self.set_config("x0", west) self.set_config("y0", south)
[docs]    def update_grid_from_config(self):
        """Update grid properties based on `config` (sfincs.inp) attributes"""
        self.grid_type = (
            "quadtree" if self.config.get("qtrfile") is not None else "regular"
        )
        if self.grid_type == "regular":
            self.reggrid = RegularGrid(
                x0=self.config.get("x0"),
                y0=self.config.get("y0"),
                dx=self.config.get("dx"),
                dy=self.config.get("dy"),
                nmax=self.config.get("nmax"),
                mmax=self.config.get("mmax"),
                rotation=self.config.get("rotation", 0),
                epsg=self.config.get("epsg"),
            )
        else:
            raise NotImplementedError("Quadtree grid not implemented yet")
# self.quadtree = QuadtreeGrid()
[docs] def get_model_time(self): """Return (tstart, tstop) tuple with parsed model start and end time""" tstart = utils.parse_datetime(self.config["tstart"]) tstop = utils.parse_datetime(self.config["tstop"]) return tstart, tstop
## helper method def _parse_datasets_dep(self, datasets_dep, res): """Parse filenames or paths of Datasets in list of dictionaries datasets_dep into xr.DataArray and gdf.GeoDataFrames: * "elevtn" is parsed into da (xr.DataArray) * "offset" is parsed into da_offset (xr.DataArray) * "mask" is parsed into gdf (gpd.GeoDataFrame) Parameters ---------- datasets_dep : List[dict] List of dictionaries with topobathy data, each containing a dataset name or Path (dep) and optional merge arguments. res : float Resolution of the model grid in meters. Used to obtain the correct zoom level of the depth datasets. """ parse_keys = ["elevtn", "offset", "mask", "da"] copy_keys = ["zmin", "zmax", "reproj_method", "merge_method", "offset"] datasets_out = [] for dataset in datasets_dep: dd = {} # read in depth datasets; replace dep (source name; filename or xr.DataArray) if "elevtn" in dataset or "da" in dataset: try: da_elv = self.data_catalog.get_rasterdataset( dataset.get("elevtn", dataset.get("da")), bbox=self.mask.raster.transform_bounds(4326), buffer=10, variables=["elevtn"], zoom_level=(res, "meter"), ) # TODO remove ValueError after fix in hydromt core except (IndexError, ValueError): data_name = dataset.get("elevtn") self.logger.warning(f"No data in domain for {data_name}, skipped.") continue dd.update({"da": da_elv}) else: raise ValueError( "No 'elevtn' (topobathy) dataset provided in datasets_dep." ) # read offset filenames # NOTE offsets can be xr.DataArrays and floats if "offset" in dataset and not isinstance(dataset["offset"], (float, int)): da_offset = self.data_catalog.get_rasterdataset( dataset.get("offset"), bbox=self.mask.raster.transform_bounds(4326), buffer=10, ) dd.update({"offset": da_offset}) # read geodataframes describing valid areas if "mask" in dataset: gdf_valid = self.data_catalog.get_geodataframe( dataset.get("mask"), bbox=self.mask.raster.transform_bounds(4326), ) dd.update({"gdf_valid": gdf_valid}) # copy remaining keys for key, value in dataset.items(): if key in copy_keys and key not in dd: dd.update({key: value}) elif key not in copy_keys + parse_keys: self.logger.warning(f"Unknown key {key} in datasets_dep. Ignoring.") datasets_out.append(dd) return datasets_out def _parse_datasets_rgh(self, datasets_rgh): """Parse filenames or paths of Datasets in list of dictionaries datasets_rgh into xr.DataArrays and gdf.GeoDataFrames: * "manning" is parsed into da (xr.DataArray) * "lulc" is parsed into da (xr.DataArray) using reclass table in "reclass_table" * "mask" is parsed into gdf_valid (gpd.GeoDataFrame) Parameters ---------- datasets_rgh : List[dict], optional List of dictionaries with Manning's n datasets. Each dictionary should at least contain one of the following: * (1) manning: filename (or Path) of gridded data with manning values * (2) lulc (and reclass_table): a combination of a filename of gridded landuse/landcover and a reclassify table. 
In additon, optional merge arguments can be provided e.g.: merge_method, mask """ parse_keys = ["manning", "lulc", "reclass_table", "mask", "da"] copy_keys = ["reproj_method", "merge_method"] datasets_out = [] for dataset in datasets_rgh: dd = {} if "manning" in dataset or "da" in dataset: da_man = self.data_catalog.get_rasterdataset( dataset.get("manning", dataset.get("da")), bbox=self.mask.raster.transform_bounds(4326), buffer=10, ) dd.update({"da": da_man}) elif "lulc" in dataset: # landuse/landcover should always be combined with mapping lulc = dataset.get("lulc") reclass_table = dataset.get("reclass_table", None) if reclass_table is None and isinstance(lulc, str): reclass_table = join(DATADIR, "lulc", f"{lulc}_mapping.csv") if reclass_table is None: raise IOError( f"Manning roughness 'reclass_table' csv file must be provided" ) da_lulc = self.data_catalog.get_rasterdataset( lulc, bbox=self.mask.raster.transform_bounds(4326), buffer=10, variables=["lulc"], ) df_map = self.data_catalog.get_dataframe(reclass_table, index_col=0) # reclassify da_man = da_lulc.raster.reclassify(df_map[["N"]])["N"] dd.update({"da": da_man}) else: raise ValueError("No 'manning' dataset provided in datasets_rgh.") # read geodataframes describing valid areas if "mask" in dataset: gdf_valid = self.data_catalog.get_geodataframe( dataset.get("mask"), bbox=self.mask.raster.transform_bounds(4326), ) dd.update({"gdf_valid": gdf_valid}) # copy remaining keys for key, value in dataset.items(): if key in copy_keys and key not in dd: dd.update({key: value}) elif key not in copy_keys + parse_keys: self.logger.warning(f"Unknown key {key} in datasets_rgh. Ignoring.") datasets_out.append(dd) return datasets_out def _parse_datasets_riv(self, datasets_riv): """Parse filenames or paths of Datasets in list of dictionaries datasets_riv into xr.DataArrays and gdf.GeoDataFrames: see SfincsModel.setup_subgrid for details """ # option 1: rectangular river cross-sections based on river centerline # depth/bedlevel, manning attributes are specified on the river centerline # TODO: make this work with LineStringZ geometries for bedlevel # the width is either specified on the river centerline or river mask # option 2: (TODO): irregular river cross-sections # cross-sections are specified as a series of points (river_crosssections) parse_keys = [ "centerlines", "mask", "gdf_riv", "gdf_riv_mask", ] copy_keys = [] attrs = ["rivwth", "rivdph", "rivbed", "manning"] datasets_out = [] for dataset in datasets_riv: dd = {} # parse rivers if "centerlines" in dataset: rivers = dataset.get("centerlines") if isinstance(rivers, str) and rivers in self.geoms: gdf_riv = self.geoms[rivers].copy() else: gdf_riv = self.data_catalog.get_geodataframe( rivers, geom=self.mask.raster.box, buffer=1e3, # 1km ).to_crs(self.crs) # update missing attributes based on global values for key in attrs: if key in dataset: value = dataset.pop(key) if key not in gdf_riv.columns: # update all gdf_riv[key] = value elif np.any(np.isnan(gdf_riv[key])): # fill na gdf_riv[key] = gdf_riv[key].fillna(value) if not gdf_riv.columns.isin(["rivbed", "rivdph"]).any(): raise ValueError("No 'rivbed' or 'rivdph' attribute found.") else: raise ValueError("No 'centerlines' dataset provided.") dd.update({"gdf_riv": gdf_riv}) # parse mask if "mask" in dataset: gdf_riv_mask = self.data_catalog.get_geodataframe( dataset.get("mask"), geom=self.mask.raster.box, ) dd.update({"gdf_riv_mask": gdf_riv_mask}) elif "rivwth" not in gdf_riv: raise ValueError( "Either mask must be provided or centerlines " 
"should contain a 'rivwth' attribute." ) # copy remaining keys for key, value in dataset.items(): if key in copy_keys and key not in dd: dd.update({key: value}) elif key not in copy_keys + parse_keys: self.logger.warning(f"Unknown key {key} in datasets_riv. Ignoring.") datasets_out.append(dd) return datasets_out