Source code for hydromt_wflow.wflow

"""Implement Wflow model class."""

# Implement model class following model API

import codecs
import glob
import logging
import os
from os.path import basename, dirname, isdir, isfile, join
from pathlib import Path
from typing import Dict, List, Optional, Union

import geopandas as gpd

# from dask.distributed import LocalCluster, Client, performance_report
import hydromt
import numpy as np
import pandas as pd
import pyflwdir
import pyproj
import toml
import xarray as xr
from dask.diagnostics import ProgressBar
from hydromt import flw
from hydromt.models.model_grid import GridModel
from hydromt.nodata import NoDataStrategy
from pyflwdir import core_conversion, core_d8, core_ldd
from shapely.geometry import box

from . import DATADIR, utils, workflows

__all__ = ["WflowModel"]

logger = logging.getLogger(__name__)


class WflowModel(GridModel):
    """Wflow model class."""

    _NAME = "wflow"
    _CONF = "wflow_sbm.toml"
    _CLI_ARGS = {"region": "setup_basemaps", "res": "setup_basemaps"}
    _DATADIR = DATADIR
    _GEOMS = {}
    _MAPS = {
        "flwdir": "wflow_ldd",
        "elevtn": "wflow_dem",
        "subelv": "dem_subgrid",
        "uparea": "wflow_uparea",
        "strord": "wflow_streamorder",
        "basins": "wflow_subcatch",
        "rivlen": "wflow_riverlength",
        "rivmsk": "wflow_river",
        "rivwth": "wflow_riverwidth",
        "lndslp": "Slope",
        "rivslp": "RiverSlope",
        "rivdph": "RiverDepth",
        "rivman": "N_River",
        "gauges": "wflow_gauges",
        "landuse": "wflow_landuse",
        "resareas": "wflow_reservoirareas",
        "reslocs": "wflow_reservoirlocs",
        "lakeareas": "wflow_lakeareas",
        "lakelocs": "wflow_lakelocs",
        "glacareas": "wflow_glacierareas",
        "glacfracs": "wflow_glacierfrac",
        "glacstore": "wflow_glacierstore",
    }
    _FOLDERS = [
        "staticgeoms",
        "instate",
        "run_default",
    ]
    _CATALOGS = join(_DATADIR, "parameters_data.yml")
    def __init__(
        self,
        root: Optional[str] = None,
        mode: Optional[str] = "w",
        config_fn: Optional[str] = None,
        data_libs: Union[List, str] = None,
        logger=logger,
        **artifact_keys,
    ):
        if data_libs is None:
            data_libs = []
        for lib, version in artifact_keys.items():
            logger.warning(
                "Adding a predefined data catalog as keyword argument is deprecated, "
                f"add the catalog as '{lib}={version}'"
                " to the data_libs list instead."
            )
            if not version:  # False or None
                continue
            elif isinstance(version, str):
                lib += f"={version}"
            data_libs = [lib] + data_libs

        super().__init__(
            root=root,
            mode=mode,
            config_fn=config_fn,
            data_libs=data_libs,
            logger=logger,
        )

        # wflow specific
        self._intbl = dict()
        self._tables = dict()
        self._flwdir = None
        self.data_catalog.from_yml(self._CATALOGS)

        # To be deprecated from v0.6.0 onwards
        self._staticmaps = None
        self._staticgeoms = None
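    # Editor's note - illustrative usage sketch, not part of the original module.
    # Instantiating the model class directly, e.g. for an interactive build; the
    # root path and the "artifact_data" catalog name are placeholder assumptions.
    # The wflow parameter catalog (parameters_data.yml) is loaded automatically.
    #
    #     from hydromt_wflow import WflowModel
    #
    #     mod = WflowModel(root="./wflow_test", mode="w", data_libs=["artifact_data"])
    #     print(mod._CONF)  # default config template: "wflow_sbm.toml"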
# COMPONENTS
    def setup_basemaps(
        self,
        region: Dict,
        hydrography_fn: Union[str, xr.Dataset],
        basin_index_fn: Union[str, xr.Dataset] = None,
        res: Union[float, int] = 1 / 120.0,
        upscale_method: str = "ihu",
    ):
        """
        Build the DEM and flow direction for a Wflow model.

        Setup basemaps sets the `region` of interest and `res`
        (resolution in degrees) of the model.
        All DEM and flow direction related maps are then built.

        Example of a `region` argument for a subbasin based on a point, snapped
        using an upstream area threshold in `hydrography_fn`, where the maximum
        bounding box of the output subbasin is known:
        region = {'subbasin': [x,y], 'uparea': 10, 'bounds': [xmin, ymin, xmax, ymax]}

        (Sub)Basin delineation is done using the
        hydromt.workflows.get_basin_geometry method. Because the delineation is
        computed from the flow direction data in memory, to avoid memory errors
        when using large datasets in `hydrography_fn`, the user can either supply
        'bounds' in `region` or a basin index dataset in `basin_index_fn` to limit
        the flow direction data to the region of interest.
        The basin index dataset is a GeoDataFrame containing either basin polygons
        or bounding boxes of basin boundaries. To select the correct basins, the
        basin ID 'basins' in `hydrography_fn` and `basin_index_fn` should match.

        If the model resolution is larger than the source data resolution,
        the flow direction is upscaled using the `upscale_method`, by default the
        Iterative Hydrography Upscaling (IHU).

        The default `hydrography_fn` is "merit_hydro"
        (`MERIT hydro <http://hydro.iis.u-tokyo.ac.jp/~yamadai/MERIT_Hydro/index.html>`_
        at 3 arcsec resolution).
        Alternative sources include "merit_hydro_1k" at 30 arcsec resolution.
        Users can also supply their own elevation and flow direction data in any
        CRS, not only EPSG:4326. Note that in order to define the region using
        points or a bounding box, the coordinates of the points / bounding box
        should be in the same CRS as the hydrography data. The wflow model will
        then also be in the same CRS as the hydrography data in order to avoid
        assumptions and reprojection errors. If the user wishes to use a different
        CRS, we recommend reprojecting the hydrography data separately before
        calling hydromt build. You can find examples on how to reproject or
        prepare hydrography data in the `prepare flow directions example notebook
        <https://deltares.github.io/hydromt_wflow/latest/_examples/prepare_ldd.html>`_.

        Adds model layers:

        * **wflow_ldd** map: flow direction in LDD format [-]
        * **wflow_subcatch** map: basin ID map [-]
        * **wflow_uparea** map: upstream area [km2]
        * **wflow_streamorder** map: Strahler stream order [-]
        * **wflow_dem** map: average elevation [m+REF]
        * **dem_subgrid** map: subgrid outlet elevation [m+REF]
        * **Slope** map: average land surface slope [m/m]
        * **basins** geom: basins boundary vector
        * **region** geom: region boundary vector

        Parameters
        ----------
        region : dict
            Dictionary describing region of interest.
            See :py:meth:`hydromt.workflows.basin_mask.parse_region()` for all options
        hydrography_fn : str, xarray.Dataset
            Name of RasterDataset source for basemap parameters.

            * Required variables: 'flwdir' [LDD or D8 or NEXTXY], 'elevtn' [m+REF]

            * Required variables if used with `basin_index_fn`: 'basins' [-]

            * Required variables if used for snapping in `region`: 'uparea' [km2],
              'strord' [-]

            * Optional variables: 'lndslp' [m/m], 'mask' [bool]
        basin_index_fn : str, geopandas.GeoDataFrame, optional
            Name of GeoDataFrame source for basin_index data linked to hydrography_fn.

            * Required variables: 'basid' [-]
        res : float, optional
            Output model resolution
        upscale_method : {'ihu', 'eam', 'dmm'}, optional
            Upscaling method for flow direction data, by default 'ihu'.

        See Also
        --------
        hydromt.workflows.parse_region
        hydromt.workflows.get_basin_geometry
        workflows.hydrography
        workflows.topography
        """
        self.logger.info("Preparing base hydrography basemaps.")
        # retrieve global data (lazy!)
        ds_org = self.data_catalog.get_rasterdataset(hydrography_fn)

        # get basin geometry and clip data
        kind, region = hydromt.workflows.parse_region(region, logger=self.logger)
        xy = None
        if kind in ["basin", "subbasin", "outlet"]:
            if basin_index_fn is not None:
                bas_index = self.data_catalog[basin_index_fn]
            else:
                bas_index = None
            geom, xy = hydromt.workflows.get_basin_geometry(
                ds=ds_org,
                kind=kind,
                basin_index=bas_index,
                logger=self.logger,
                **region,
            )
        elif "bbox" in region:
            bbox = region.get("bbox")
            geom = gpd.GeoDataFrame(geometry=[box(*bbox)], crs=4326)
        elif "geom" in region:
            geom = region.get("geom")
        else:
            raise ValueError(f"wflow region argument not understood: {region}")
        if geom is not None and geom.crs is None:
            raise ValueError("wflow region geometry has no CRS")

        ds_org = ds_org.raster.clip_geom(geom, align=res, buffer=10)
        ds_org.coords["mask"] = ds_org.raster.geometry_mask(geom)
        self.logger.debug("Adding basins vector to geoms.")
        self.set_geoms(geom, name="basins")

        # Check on resolution (degree vs meter) depending on ds_org res/crs
        scale_ratio = int(np.round(res / ds_org.raster.res[0]))
        if scale_ratio < 1:
            raise ValueError(
                f"The model resolution {res} should be "
                f"larger than the {hydrography_fn} resolution {ds_org.raster.res[0]}"
            )
        if ds_org.raster.crs.is_geographic:
            if res > 1:  # 111 km
                raise ValueError(
                    f"The model resolution {res} should be smaller than 1 degree "
                    "(111km) for geographic coordinate systems. "
                    "Make sure you provided res in degree rather than in meters."
                )

        # setup hydrography maps and set staticmap attribute with renamed maps
        ds_base, _ = workflows.hydrography(
            ds=ds_org,
            res=res,
            xy=xy,
            upscale_method=upscale_method,
            logger=self.logger,
        )
        # Convert flow direction from d8 to ldd format
        flwdir_data = ds_base["flwdir"].values.astype(np.uint8)  # force dtype
        # if d8 convert to ldd
        if core_d8.isvalid(flwdir_data):
            data = core_conversion.d8_to_ldd(flwdir_data)
            da_flwdir = xr.DataArray(
                name="flwdir",
                data=data,
                coords=ds_base.raster.coords,
                dims=ds_base.raster.dims,
                attrs=dict(
                    long_name="ldd flow direction",
                    _FillValue=core_ldd._mv,
                ),
            )
            ds_base["flwdir"] = da_flwdir
        # Rename and add to grid
        rmdict = {k: v for k, v in self._MAPS.items() if k in ds_base.data_vars}
        self.set_grid(ds_base.rename(rmdict))

        # setup topography maps
        ds_topo = workflows.topography(
            ds=ds_org, ds_like=self.grid, method="average", logger=self.logger
        )
        ds_topo["lndslp"] = np.maximum(ds_topo["lndslp"], 0.0)
        rmdict = {k: v for k, v in self._MAPS.items() if k in ds_topo.data_vars}
        self.set_grid(ds_topo.rename(rmdict))
        # set basin geometry
        self.logger.debug("Adding region vector to geoms.")
        self.set_geoms(self.region, name="region")

        # update toml for degree/meters if needed
        if ds_base.raster.crs.is_projected:
            self.set_config("model.sizeinmetres", True)
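    # Editor's note - illustrative usage sketch, not part of the original module.
    # A typical setup_basemaps call; "merit_hydro" is the default hydrography
    # source named in the docstring above, while "merit_hydro_index" and the
    # subbasin point / uparea values are placeholder assumptions.
    #
    #     mod.setup_basemaps(
    #         region={"subbasin": [12.2051, 45.8331], "uparea": 30},
    #         hydrography_fn="merit_hydro",
    #         basin_index_fn="merit_hydro_index",
    #         res=1 / 120.0,
    #         upscale_method="ihu",
    #     )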
[docs] def setup_rivers( self, hydrography_fn: Union[str, xr.Dataset], river_geom_fn: Union[str, gpd.GeoDataFrame] = None, river_upa: float = 30, rivdph_method: str = "powlaw", slope_len: float = 2e3, min_rivlen_ratio: float = 0.0, min_rivdph: float = 1, min_rivwth: float = 30, smooth_len: float = 5e3, rivman_mapping_fn: Union[ str, Path, pd.DataFrame ] = "roughness_river_mapping_default", elevtn_map: str = "wflow_dem", river_routing: str = "kinematic-wave", connectivity: int = 8, **kwargs, ): """ Set all river parameter maps. The river mask is defined by all cells with a mimimum upstream area threshold ``river_upa`` [km2]. The river length is defined as the distance from the subgrid outlet pixel to the next upstream subgrid outlet pixel. The ``min_rivlen_ratio`` is the minimum global river length to avg. cell resolution ratio and is used as a threshold in window based smoothing of river length. The river slope is derived from the subgrid elevation difference between pixels at a half distance ``slope_len`` [m] up- and downstream from the subgrid outlet pixel. The river manning roughness coefficient is derived based on reclassification of the streamorder map using a lookup table ``rivman_mapping_fn``. The river width is derived from the nearest river segment in ``river_geom_fn``. Data gaps are filled by the nearest valid upstream value and averaged along the flow directions over a length ``smooth_len`` [m] The river depth is calculated using the ``rivdph_method``, by default powlaw: h = hc*Qbf**hp, which is based on qbankfull discharge from the nearest river segment in ``river_geom_fn`` and takes optional arguments for the hc (default = 0.27) and hp (default = 0.30) parameters. For other methods see :py:meth:`hydromt.workflows.river_depth`. If ``river_routing`` is set to "local-inertial", the bankfull elevation map can be conditioned based on the average cell elevation ("wflow_dem") or subgrid outlet pixel elevation ("dem_subgrid"). The subgrid elevation might provide a better representation of the river elevation profile, however in combination with local-inertial land routing (see :py:meth:`setup_floodplains`) the subgrid elevation will likely overestimate the floodplain storage capacity. Note that the same input elevation map should be used for river bankfull elevation and land elevation when using local-inertial land routing. Adds model layers: * **wflow_river** map: river mask [-] * **wflow_riverlength** map: river length [m] * **wflow_riverwidth** map: river width [m] * **RiverDepth** map: bankfull river depth [m] * **RiverSlope** map: river slope [m/m] * **N_River** map: Manning coefficient for river cells [s.m^1/3] * **rivers** geom: river vector based on wflow_river mask * **hydrodem** map: hydrologically conditioned elevation [m+REF] Parameters ---------- hydrography_fn : str, Path, xarray.Dataset Name of RasterDataset source for hydrography data. Must be same as setup_basemaps for consistent results. * Required variables: 'flwdir' [LLD or D8 or NEXTXY], 'uparea' [km2], 'elevtn'[m+REF] * Optional variables: 'rivwth' [m], 'qbankfull' [m3/s] river_geom_fn : str, Path, geopandas.GeoDataFrame, optional Name of GeoDataFrame source for river data. * Required variables: 'rivwth' [m], 'qbankfull' [m3/s] river_upa : float, optional Minimum upstream area threshold for the river map [km2]. By default 30.0 slope_len : float, optional Length over which the river slope is calculated [km]. 
By default 2.0 min_rivlen_ratio: float, optional Ratio of cell resolution used minimum length threshold in a moving window based smoothing of river length, by default 0.0 The river length smoothing is skipped if `min_riverlen_ratio` = 0. For details about the river length smoothing, see :py:meth:`pyflwdir.FlwdirRaster.smooth_rivlen` rivdph_method : {'gvf', 'manning', 'powlaw'} see :py:meth:`hydromt.workflows.river_depth` for details, by default \ "powlaw" river_routing : {'kinematic-wave', 'local-inertial'} Routing methodology to be used, by default "kinematic-wave". smooth_len : float, optional Length [m] over which to smooth the output river width and depth, by default 5e3 min_rivdph : float, optional Minimum river depth [m], by default 1.0 min_rivwth : float, optional Minimum river width [m], by default 30.0 elevtn_map : str, optional Name of the elevation map in the current WflowModel.grid. By default "wflow_dem" See Also -------- workflows.river_bathymetry hydromt.workflows.river_depth pyflwdir.FlwdirRaster.river_depth setup_floodplains """ self.logger.info("Preparing river maps.") rivdph_methods = ["gvf", "manning", "powlaw"] if rivdph_method not in rivdph_methods: raise ValueError(f'"{rivdph_method}" unknown. Select from {rivdph_methods}') routing_options = ["kinematic-wave", "local-inertial"] if river_routing not in routing_options: raise ValueError( f'river_routing="{river_routing}" unknown. \ Select from {routing_options}.' ) # read data ds_hydro = self.data_catalog.get_rasterdataset( hydrography_fn, geom=self.region, buffer=10 ) ds_hydro.coords["mask"] = ds_hydro.raster.geometry_mask(self.region) # get rivmsk, rivlen, rivslp # read model maps and revert wflow to hydromt map names inv_rename = {v: k for k, v in self._MAPS.items() if v in self.grid} ds_riv = workflows.river( ds=ds_hydro, ds_model=self.grid.rename(inv_rename), river_upa=river_upa, slope_len=slope_len, channel_dir="up", min_rivlen_ratio=min_rivlen_ratio, logger=self.logger, )[0] ds_riv["rivmsk"] = ds_riv["rivmsk"].assign_attrs( river_upa=river_upa, slope_len=slope_len, min_rivlen_ratio=min_rivlen_ratio ) dvars = ["rivmsk", "rivlen", "rivslp"] rmdict = {k: self._MAPS.get(k, k) for k in dvars} self.set_grid(ds_riv[dvars].rename(rmdict)) # TODO make separate workflows.river_manning method # Make N_River map from csv file with mapping # between streamorder and N_River value strord = self.grid[self._MAPS["strord"]].copy() df = self.data_catalog.get_dataframe(rivman_mapping_fn) # max streamorder value above which values get the same N_River value max_str = df.index[-2] # if streamorder value larger than max_str, assign last value strord = strord.where(strord <= max_str, max_str) # handle missing value (last row of csv is mapping of missing values) strord = strord.where(strord != strord.raster.nodata, -999) strord.raster.set_nodata(-999) ds_nriver = workflows.landuse( da=strord, ds_like=self.grid, df=df, logger=self.logger, ) self.set_grid(ds_nriver) # get rivdph, rivwth # while we still have setup_riverwidth one can skip river_bathymetry here # TODO make river_geom_fn required after removing setup_riverwidth if river_geom_fn is not None: gdf_riv = self.data_catalog.get_geodataframe( river_geom_fn, geom=self.region ) # reread model data to get river maps inv_rename = {v: k for k, v in self._MAPS.items() if v in self.grid} ds_riv1 = workflows.river_bathymetry( ds_model=self.grid.rename(inv_rename), gdf_riv=gdf_riv, method=rivdph_method, smooth_len=smooth_len, min_rivdph=min_rivdph, min_rivwth=min_rivwth, logger=self.logger, 
**kwargs, ) rmdict = {k: v for k, v in self._MAPS.items() if k in ds_riv1.data_vars} self.set_grid(ds_riv1.rename(rmdict)) # update config self.set_config("input.lateral.river.bankfull_depth", self._MAPS["rivdph"]) self.logger.debug("Adding rivers vector to geoms.") self.geoms.pop("rivers", None) # remove old rivers if in geoms self.rivers # add new rivers to geoms # Add hydrologically conditioned elevation map for the river, if required if river_routing == "local-inertial": postfix = {"wflow_dem": "_avg", "dem_subgrid": "_subgrid"}.get( elevtn_map, "" ) name = f"hydrodem{postfix}" ds_out = flw.dem_adjust( da_flwdir=self.grid[self._MAPS["flwdir"]], da_elevtn=self.grid[elevtn_map], da_rivmsk=self.grid[self._MAPS["rivmsk"]], flwdir=self.flwdir, connectivity=connectivity, river_d8=True, logger=self.logger, ).rename(name) self.set_grid(ds_out) # update toml model.river_routing self.logger.debug( f'Update wflow config model.river_routing="{river_routing}"' ) self.set_config("model.river_routing", river_routing) self.set_config("input.lateral.river.bankfull_depth", self._MAPS["rivdph"]) self.set_config("input.lateral.river.bankfull_elevation", name) else: self.set_config("model.river_routing", river_routing)
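    # Editor's note - illustrative usage sketch, not part of the original module.
    # Deriving the river maps after setup_basemaps; "rivers_lin2019_v1" is a
    # placeholder river geometry source, the other values mirror the documented
    # defaults.
    #
    #     mod.setup_rivers(
    #         hydrography_fn="merit_hydro",
    #         river_geom_fn="rivers_lin2019_v1",
    #         river_upa=30,
    #         rivdph_method="powlaw",
    #         min_rivdph=1,
    #         min_rivwth=30,
    #         river_routing="kinematic-wave",
    #     )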
[docs] def setup_floodplains( self, hydrography_fn: Union[str, xr.Dataset], floodplain_type: str, ### Options for 1D floodplains river_upa: Optional[float] = None, flood_depths: List = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0], ### Options for 2D floodplains elevtn_map: str = "wflow_dem", connectivity: int = 4, ): """ Add floodplain information to the model schematisation. The user can define what type of floodplains are required (1D or 2D), through the ``floodplain_type`` argument. If ``floodplain_type`` is set to "1d", a floodplain profile is derived for every river cell. It adds a map with floodplain volume per flood depth, which is used in the wflow 1D floodplain schematisation. Note, it is important to use the same river uparea value as used in the :py:meth:`setup_rivers` method. If ``floodplain_type`` is set to "2d", this component adds a hydrologically conditioned elevation (hydrodem) map for land routing (local-inertial). For this options, landcells need to be conditioned to D4 flow directions otherwise pits may remain in the land cells. The conditioned elevation can be based on the average cell elevation ("wflow_dem") or subgrid outlet pixel elevation ("dem_subgrid"). Note that the subgrid elevation will likely overestimate the floodplain storage capacity. Additionally, note that the same input elevation map should be used for river bankfull elevation and land elevation when using local-inertial land routing. Requires :py:meth:`setup_rivers` to be executed beforehand (with ``river_routing`` set to "local-inertial"). Adds model layers: * **floodplain_volume** map: map with floodplain volumes, has flood depth as \ third dimension [m3] (for 1D floodplains) * **hydrodem** map: hydrologically conditioned elevation [m+REF] (for 2D \ floodplains) Parameters ---------- floodplain_type: {"1d", "2d"} Option defining the type of floodplains, see below what arguments are related to the different floodplain types hydrography_fn : str, Path, xarray.Dataset Name of RasterDataset source for hydrography data. Must be same as setup_basemaps for consistent results. * Required variables: ['flwdir', 'uparea', 'elevtn'] river_upa : float, optional (1D floodplains) minimum upstream area threshold for drain in the HAND. Optional value, as it is inferred from the grid metadata, to be consistent with setup_rivers. flood_depths : tuple of float, optional (1D floodplains) flood depths at which a volume is derived. By default [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0] elevtn_map: {"wflow_dem", "dem_subgrid"} (2D floodplains) Name of staticmap to hydrologically condition. By default "wflow_dem" See Also -------- workflows.river_floodplain_volume hydromt.flw.dem_adjust pyflwdir.FlwdirRaster.dem_adjust setup_rivers """ if self.get_config("model.river_routing") != "local-inertial": raise ValueError( "Floodplains (1d or 2d) are currently only supported with \ local intertial river routing" ) r_list = ["1d", "2d"] if floodplain_type not in r_list: raise ValueError( f'river_routing="{floodplain_type}" unknown. Select from {r_list}.' 
) # Adjust settings based on floodplain_type selection if floodplain_type == "1d": floodplain_1d = True land_routing = "kinematic-wave" if not hasattr(pyflwdir.FlwdirRaster, "ucat_volume"): self.logger.warning("This method requires pyflwdir >= 0.5.6") return self.logger.info("Preparing 1D river floodplain_volume map.") # read data ds_hydro = self.data_catalog.get_rasterdataset( hydrography_fn, geom=self.region, buffer=10 ) ds_hydro.coords["mask"] = ds_hydro.raster.geometry_mask(self.region) # try to get river uparea from grid, throw error if not specified # or when found but different from specified value new_river_upa = self.grid[self._MAPS["rivmsk"]].attrs.get( "river_upa", river_upa ) if new_river_upa is None: raise ValueError( "No value for `river_upa` specified, and the value cannot \ be inferred from the grid attributes" ) elif new_river_upa != river_upa and river_upa is not None: raise ValueError( f"Value specified for river_upa ({river_upa}) is different from \ the value found in the grid ({new_river_upa})" ) self.logger.debug(f"Using river_upa value value of: {new_river_upa}") # get river floodplain volume inv_rename = {v: k for k, v in self._MAPS.items() if v in self.grid} da_fldpln = workflows.river_floodplain_volume( ds=ds_hydro, ds_model=self.grid.rename(inv_rename), river_upa=new_river_upa, flood_depths=flood_depths, logger=self.logger, ) # check if the layer already exists, since overwriting with different # flood_depth values is not working properly if this is the case if "floodplain_volume" in self.grid: self.logger.warning( "Layer `floodplain_volume` already in grid, removing layer \ and `flood_depth` dimension to ensure correctly \ setting new flood_depth dimensions" ) self._grid = self._grid.drop_dims("flood_depth") self.set_grid(da_fldpln, "floodplain_volume") elif floodplain_type == "2d": floodplain_1d = False land_routing = "local-inertial" if elevtn_map not in self.grid: raise ValueError(f'"{elevtn_map}" not found in grid') postfix = {"wflow_dem": "_avg", "dem_subgrid": "_subgrid"}.get( elevtn_map, "" ) name = f"hydrodem{postfix}" self.logger.info(f"Preparing {name} map for land routing.") name = f"hydrodem{postfix}_D{connectivity}" self.logger.info(f"Preparing {name} map for land routing.") ds_out = flw.dem_adjust( da_flwdir=self.grid[self._MAPS["flwdir"]], da_elevtn=self.grid[elevtn_map], da_rivmsk=self.grid[self._MAPS["rivmsk"]], flwdir=self.flwdir, connectivity=connectivity, river_d8=True, logger=self.logger, ).rename(name) self.set_grid(ds_out) # Update config self.logger.debug(f'Update wflow config model.floodplain_1d="{floodplain_1d}"') self.set_config("model.floodplain_1d", floodplain_1d) self.logger.debug(f'Update wflow config model.land_routing="{land_routing}"') self.set_config("model.land_routing", land_routing) if floodplain_type == "1d": # include new input data self.set_config( "input.lateral.river.floodplain.volume", "floodplain_volume" ) # Add states self.set_config("state.lateral.river.floodplain.q", "q_floodplain") self.set_config("state.lateral.river.floodplain.h", "h_floodplain") self.set_config("state.lateral.land.q", "q_land") # Remove local-inertial land states if self.get_config("state.lateral.land.qx") is not None: self.config["state"]["lateral"]["land"].pop("qx", None) if self.get_config("state.lateral.land.qy") is not None: self.config["state"]["lateral"]["land"].pop("qy", None) if self.get_config("output.lateral.land.qx") is not None: self.config["output"]["lateral"]["land"].pop("qx", None) if self.get_config("output.lateral.land.qy") is 
not None: self.config["output"]["lateral"]["land"].pop("qy", None) else: # include new input data self.set_config("input.lateral.river.bankfull_elevation", name) self.set_config("input.lateral.land.elevation", name) # Add local-inertial land routing states self.set_config("state.lateral.land.qx", "qx_land") self.set_config("state.lateral.land.qy", "qy_land") # Remove kinematic-wave and 1d floodplain states if self.get_config("state.lateral.land.q") is not None: self.config["state"]["lateral"]["land"].pop("q", None) if self.get_config("state.lateral.river.floodplain.q") is not None: self.config["state"]["lateral"]["river"]["floodplain"].pop("q", None) if self.get_config("state.lateral.river.floodplain.h") is not None: self.config["state"]["lateral"]["river"]["floodplain"].pop("h", None) if self.get_config("output.lateral.land.q") is not None: self.config["output"]["lateral"]["land"].pop("q", None)
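    # Editor's note - illustrative usage sketch, not part of the original module.
    # Adding 1D floodplain volumes; this assumes setup_rivers was run beforehand
    # with river_routing="local-inertial". The flood depths shown are the
    # documented defaults.
    #
    #     mod.setup_floodplains(
    #         hydrography_fn="merit_hydro",
    #         floodplain_type="1d",
    #         flood_depths=[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0],
    #     )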
def setup_riverwidth( self, predictor: str = "discharge", fill: bool = False, fit: bool = False, min_wth: float = 1.0, precip_fn: Union[str, xr.DataArray] = "chelsa", climate_fn: Union[str, xr.DataArray] = "koppen_geiger", **kwargs, ): """ Set the river width parameter based on power-law relationship with a predictor. By default the riverwidth is estimated based on discharge as ``predictor`` and used to set the riverwidth globally based on pre-defined power-law parameters per climate class. With ``fit`` set to True, the power-law relationsship paramters are set on-the-fly. With ``fill`` set to True, the estimated river widths are only used to fill gaps in the observed data. Alternative ``predictor`` values are precip (accumulated precipitation) and uparea (upstream area). For these predictors values ``fit`` default to True. By default the predictor is based on discharge which is estimated through multiple linear regression with precipitation and upstream area per climate zone. * **wflow_riverwidth** map: river width [m] Parameters ---------- predictor : {"discharge", "precip", "uparea"} Predictor used in the power-law equation: width = a * predictor ^ b. Discharge is based on multiple linear regression per climate zone. Precip is based on the 10x the daily average accumulated precipitation [m3/s]. Uparea is based on the upstream area grid [km2]. Other variables, e.g. bankfull discharge, can also be provided if present in the grid fill : bool, optional If True (default), use estimate to fill gaps, outliers and lake/res areas in observed width data (if present); if False, set all riverwidths based on predictor (automatic choice if no observations found) fit : bool, optional If True, the power-law parameters are fitted on the fly By default True for all but "discharge" predictor. A-priori derived parameters will be overwritten if True. a, b : float, optional kwarg Manual power-law parameters min_wth : float, optional Minimum river width, by default 1.0 precip_fn : str, xarray.DataArray Source of long term precipitation grid if the predictor is set to 'discharge' or 'precip'. By default "chelsa". climate_fn: str, xarray.DataArray Source of long-term climate grid if the predictor is set to 'discharge'. By default "koppen_geiger". """ self.logger.warning( 'The "setup_riverwidth" method has been deprecated \ and will soon be removed. ' 'You can now use the "setup_river" method for all river parameters.' ) if self._MAPS["rivmsk"] not in self.grid: raise ValueError( 'The "setup_riverwidth" method requires \ to run setup_river method first.' ) # derive river width data = {} if predictor in ["discharge", "precip"]: da_precip = self.data_catalog.get_rasterdataset( precip_fn, geom=self.region, buffer=2 ) da_precip.name = precip_fn data["da_precip"] = da_precip if predictor == "discharge": da_climate = self.data_catalog.get_rasterdataset( climate_fn, geom=self.region, buffer=2 ) da_climate.name = climate_fn data["da_climate"] = da_climate inv_rename = {v: k for k, v in self._MAPS.items() if v in self.grid} da_rivwth = workflows.river_width( ds_like=self.grid.rename(inv_rename), flwdir=self.flwdir, data=data, fill=fill, fill_outliers=kwargs.pop("fill_outliers", fill), min_wth=min_wth, mask_names=["lakeareas", "resareas", "glacareas"], predictor=predictor, a=kwargs.get("a", None), b=kwargs.get("b", None), logger=self.logger, fit=fit, **kwargs, ) self.set_grid(da_rivwth, name=self._MAPS["rivwth"])
[docs] def setup_lulcmaps( self, lulc_fn: Union[str, xr.DataArray], lulc_mapping_fn: Union[str, Path, pd.DataFrame] = None, lulc_vars: List = [ "landuse", "Kext", "N", "PathFrac", "RootingDepth", "Sl", "Swood", "WaterFrac", ], ): """ Derive several wflow maps based on landuse-landcover (LULC) data. Lookup table `lulc_mapping_fn` columns are converted to lulc classes model parameters based on literature. The data is remapped at its original resolution and then resampled to the model resolution using the average value, unless noted differently. Currently, if `lulc_fn` is set to the "vito", "globcover", "esa_worldcover" or "corine", default lookup tables are available and will be used if `lulc_mapping_fn` is not provided. Adds model layers: * **landuse** map: Landuse class [-] * **Kext** map: Extinction coefficient in the canopy gap fraction equation [-] * **Sl** map: Specific leaf storage [mm] * **Swood** map: Fraction of wood in the vegetation/plant [-] * **RootingDepth** map: Length of vegetation roots [mm] * **PathFrac** map: The fraction of compacted or urban area per grid cell [-] * **WaterFrac** map: The fraction of open water per grid cell [-] * **N** map: Manning Roughness [-] Parameters ---------- lulc_fn : str, xarray.DataArray Name of RasterDataset source in data_sources.yml file. lulc_mapping_fn : str, Path, pd.DataFrame Path to a mapping csv file from landuse in source name to parameter values in lulc_vars. If lulc_fn is one of {"globcover", "vito", "corine", "esa_worldcover"}, a default mapping is used and this argument becomes optional. lulc_vars : list List of landuse parameters to keep. By default \ ["landuse","Kext","N","PathFrac","RootingDepth","Sl","Swood","WaterFrac"] """ self.logger.info("Preparing LULC parameter maps.") if lulc_mapping_fn is None: fn_map = f"{lulc_fn}_mapping_default" else: fn_map = lulc_mapping_fn if not isfile(fn_map) and fn_map not in self.data_catalog: raise ValueError(f"LULC mapping file not found: {fn_map}") # read landuse map to DataArray da = self.data_catalog.get_rasterdataset( lulc_fn, geom=self.region, buffer=2, variables=["landuse"] ) df_map = self.data_catalog.get_dataframe( fn_map, driver_kwargs={"index_col": 0}, # only used if fn_map is a file path ) # process landuse ds_lulc_maps = workflows.landuse( da=da, ds_like=self.grid, df=df_map, params=lulc_vars, logger=self.logger, ) rmdict = {k: v for k, v in self._MAPS.items() if k in ds_lulc_maps.data_vars} self.set_grid(ds_lulc_maps.rename(rmdict))
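    # Editor's note - illustrative usage sketch, not part of the original module.
    # Deriving landuse parameter maps from the "vito" source, one of the sources
    # for which a default mapping table is shipped (see docstring above).
    #
    #     mod.setup_lulcmaps(
    #         lulc_fn="vito",
    #         lulc_mapping_fn=None,  # falls back to "vito_mapping_default"
    #     )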
    def setup_laimaps(self, lai_fn: Union[str, xr.DataArray]):
        """
        Set leaf area index (LAI) climatology maps per month [1,2,3,...,12].

        The values are resampled to the model resolution using the average value.
        Currently, only LAI data that is already provided as a monthly (cyclic)
        climatology is supported.

        Adds model layers:

        * **LAI** map: Leaf Area Index climatology [-]
            Resampled from source data using average. Assuming that missing values
            correspond to bare soil, these are set to zero before resampling.

        Parameters
        ----------
        lai_fn : str, xarray.DataArray
            Name of RasterDataset source for LAI parameters, see data/data_sources.yml.

            * Required variables: 'LAI' [-]

            * Required dimensions: 'time' = [1,2,3,...,12] (months)
        """
        # retrieve data for region
        self.logger.info("Preparing LAI maps.")
        da = self.data_catalog.get_rasterdataset(lai_fn, geom=self.region, buffer=2)
        da_lai = workflows.lai(
            da=da,
            ds_like=self.grid,
            logger=self.logger,
        )
        # Rename the first dimension to time
        rmdict = {da_lai.dims[0]: "time"}
        self.set_grid(da_lai.rename(rmdict), name="LAI")
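    # Editor's note - illustrative usage sketch, not part of the original module.
    # Adding the monthly LAI climatology; "modis_lai" is a placeholder catalog
    # entry, any RasterDataset with an 'LAI' variable and a 12-step time
    # dimension works.
    #
    #     mod.setup_laimaps(lai_fn="modis_lai")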
    def setup_config_output_timeseries(
        self,
        mapname: str,
        toml_output: Optional[str] = "csv",
        header: Optional[List[str]] = ["Q"],
        param: Optional[List[str]] = ["lateral.river.q_av"],
        reducer: Optional[List[str]] = None,
    ):
        """Add scalar output timeseries for a gauge map to the wflow TOML config.

        Adds model layers:

        * **csv.column** config: csv timeseries to save based on mapname locations
        * **netcdf.variable** config: netcdf timeseries to save based on mapname \
locations

        Parameters
        ----------
        mapname : str
            Name of the gauge map (in staticmaps.nc) to use for scalar output.
        toml_output : str, optional
            One of ['csv', 'netcdf', None] to update [csv] or [netcdf] section of
            wflow toml file or do nothing. By default, 'csv'.
        header : list, optional
            Save specific model parameters in csv section. This option defines
            the header of the csv file. By default saves Q (for lateral.river.q_av).
        param: list, optional
            Save specific model parameters in csv section. This option defines
            the wflow variable corresponding to the names in header.
            By default saves lateral.river.q_av (for Q).
        reducer: list, optional
            If map is an area rather than a point location, provides the reducer
            for the parameters to save. By default None.
        """
        # Add new outputcsv section in the config
        if toml_output == "csv" or toml_output == "netcdf":
            self.logger.info(f"Adding {param} to {toml_output} section of toml.")
            # Add map to the input section of config
            basename = (
                mapname
                if not mapname.startswith("wflow")
                else mapname.replace("wflow_", "")
            )
            self.set_config(f"input.{basename}", mapname)
            # Settings and add csv or netcdf sections if not already in config
            # csv
            if toml_output == "csv":
                header_name = "header"
                var_name = "column"
                if self.get_config("csv") is None:
                    self.set_config("csv.path", "output.csv")
            # netcdf
            if toml_output == "netcdf":
                header_name = "name"
                var_name = "variable"
                if self.get_config("netcdf") is None:
                    self.set_config("netcdf.path", "output_scalar.nc")
            # initialise column / variable section
            if self.get_config(f"{toml_output}.{var_name}") is None:
                self.set_config(f"{toml_output}.{var_name}", [])

            # Add new output column/variable to config
            for o in range(len(param)):
                gauge_toml_dict = {
                    header_name: header[o],
                    "map": basename,
                    "parameter": param[o],
                }
                if reducer is not None:
                    gauge_toml_dict["reducer"] = reducer[o]
                # If the gauge column/variable already exists skip writing twice
                if gauge_toml_dict not in self.config[toml_output][var_name]:
                    self.config[toml_output][var_name].append(gauge_toml_dict)
        else:
            self.logger.info(
                f"toml_output set to {toml_output}, "
                "skipping adding gauge specific outputs to the toml."
            )
[docs] def setup_outlets( self, river_only=True, toml_output="csv", gauge_toml_header=["Q"], gauge_toml_param=["lateral.river.q_av"], ): """Set the default gauge map based on basin outlets. If wflow_subcatch is available, the catchment outlets IDs will be matching the wflow_subcatch IDs. If not, then IDs from 1 to number of outlets are used. Can also add csv/netcdf output settings in the TOML. Adds model layers: * **wflow_gauges** map: gauge IDs map from catchment outlets [-] * **gauges** geom: polygon of catchment outlets Parameters ---------- river_only : bool, optional Only derive outlet locations if they are located on a river instead of locations for all catchments, by default True. toml_output : str, optional One of ['csv', 'netcdf', None] to update [csv] or [netcdf] section of wflow toml file or do nothing. By default, 'csv'. gauge_toml_header : list, optional Save specific model parameters in csv section. This option defines the header of the csv file. By default saves Q (for lateral.river.q_av). gauge_toml_param: list, optional Save specific model parameters in csv section. This option defines the wflow variable corresponding to the names in gauge_toml_header. By default saves lateral.river.q_av (for Q). """ # read existing geoms; important to get the right basin when updating # fix in set_geoms / set_geoms method self.geoms self.logger.info("Gauges locations set based on river outlets.") idxs_out = self.flwdir.idxs_pit # Only keep river outlets for gauges if river_only: idxs_out = idxs_out[ (self.grid[self._MAPS["rivmsk"]] > 0).values.flat[idxs_out] ] # Use the wflow_subcatch ids if self._MAPS["basins"] in self.grid: ids = self.grid[self._MAPS["basins"]].values.flat[idxs_out] else: ids = None da_out, idxs_out, ids_out = flw.gauge_map( self.grid, idxs=idxs_out, ids=ids, flwdir=self.flwdir, logger=self.logger, ) self.set_grid(da_out, name=self._MAPS["gauges"]) points = gpd.points_from_xy(*self.grid.raster.idx_to_xy(idxs_out)) gdf = gpd.GeoDataFrame( index=ids_out.astype(np.int32), geometry=points, crs=self.crs ) gdf["fid"] = ids_out.astype(np.int32) self.set_geoms(gdf, name="gauges") self.logger.info("Gauges map based on catchment river outlets added.") self.setup_config_output_timeseries( mapname="wflow_gauges", toml_output=toml_output, header=gauge_toml_header, param=gauge_toml_param, )
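    # Editor's note - illustrative usage sketch, not part of the original module.
    # Adding gauges at the basin river outlets and writing discharge to the csv
    # section of the TOML; all values shown are the documented defaults.
    #
    #     mod.setup_outlets(
    #         river_only=True,
    #         toml_output="csv",
    #         gauge_toml_header=["Q"],
    #         gauge_toml_param=["lateral.river.q_av"],
    #     )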
[docs] def setup_gauges( self, gauges_fn: Union[str, Path, gpd.GeoDataFrame], index_col: Optional[str] = None, snap_to_river: bool = True, mask: Optional[np.ndarray] = None, snap_uparea: bool = False, max_dist: float = 10e3, wdw: int = 3, rel_error: float = 0.05, abs_error: float = 50.0, fillna: bool = False, derive_subcatch: bool = False, basename: Optional[str] = None, toml_output: str = "csv", gauge_toml_header: List[str] = ["Q", "P"], gauge_toml_param: List[str] = [ "lateral.river.q_av", "vertical.precipitation", ], **kwargs, ): """Set a gauge map based on ``gauges_fn`` data. Supported gauge datasets include data catlog entries, direct GeoDataFrame or "<path_to_source>" for user supplied csv or geometry files with gauge locations. If a csv file is provided, a "x" or "lon" and "y" or "lat" column is required and the first column will be used as IDs in the map. There are four available methods to prepare the gauge map: * no snapping: ``mask=None``, ``snap_to_river=False``, ``snap_uparea=False``. The gauge locations are used as is. * snapping to mask: the gauge locations are snapped to a boolean mask map based on the closest dowsntream cell within the mask: either provide ``mask`` or set ``snap_to_river=True`` to snap to the river cells (default). ``max_dist`` can be used to set the maximum distance to snap to the mask. * snapping based on upstream area matching: : ``snap_uparea=True``. The gauge locations are snapped to the closest matching upstream area value. Requires gauges_fn to have an ``uparea`` [km2] column. The closest value will be looked for in a cell window of size ``wdw`` and the absolute and relative differences between the gauge and the closest value should be smaller than ``abs_error`` and ``rel_error``. * snapping based on upstream area matching and mask: ``snap_uparea=True``, ``mask`` or ``snap_to_river=True``. The gauge locations are snapped to the closest matching upstream area value within the mask. If ``derive_subcatch`` is set to True, an additional subcatch map is derived from the gauge locations. Finally the output locations can be added to wflow TOML file sections [csv] or [netcdf] using the ``toml_output`` option. The ``gauge_toml_header`` and ``gauge_toml_param`` options can be used to define the header and corresponding wflow variable names in the TOML file. Adds model layers: * **wflow_gauges_source** map: gauge IDs map from source [-] (if gauges_fn) * **wflow_subcatch_source** map: subcatchment based on gauge locations [-] \ (if derive_subcatch) * **gauges_source** geom: polygon of gauges from source * **subcatch_source** geom: polygon of subcatchment based on \ gauge locations [-] (if derive_subcatch) Parameters ---------- gauges_fn : str, Path, geopandas.GeoDataFrame Catalog source name, path to gauges file geometry file or geopandas.GeoDataFrame. * Required variables if snap_uparea is True: 'uparea' [km2] index_col : str, optional Column in gauges_fn to use for ID values, by default None (use the default index column) mask : np.boolean, optional If provided snaps to the mask, else snaps to the river (default). snap_to_river : bool, optional Snap point locations to the closest downstream river cell, by default True snap_uparea: bool, optional Snap gauges based on upstream area. Gauges_fn should have "uparea" in its attributes. max_dist : float, optional Maximum distance [m] between original and snapped point location. A warning is logged if exceeded. By default 10 000m. 
wdw: int, optional Window size in number of cells around the gauge locations to snap uparea to, only used if ``snap_uparea`` is True. By default 3. rel_error: float, optional Maximum relative error (default 0.05) between the gauge location upstream area and the upstream area of the best fit grid cell, only used if snap_uparea is True. abs_error: float, optional Maximum absolute error (default 50.0) between the gauge location upstream area and the upstream area of the best fit grid cell, only used if snap_uparea is True. fillna: bool, optional Fill missing values in the gauges uparea column with the values from wflow upstream area (ie no snapping). By default False and the gauges with NaN values are skipped. derive_subcatch : bool, optional Derive subcatch map for gauges, by default False basename : str, optional Map name in grid (wflow_gauges_basename) if None use the gauges_fn basename. toml_output : str, optional One of ['csv', 'netcdf', None] to update [csv] or [netcdf] section of wflow toml file or do nothing. By default, 'csv'. gauge_toml_header : list, optional Save specific model parameters in csv section. This option defines the header of the csv file. By default saves Q (for lateral.river.q_av) and P (for vertical.precipitation). gauge_toml_param: list, optional Save specific model parameters in csv section. This option defines the wflow variable corresponding to the names in gauge_toml_header. By default saves lateral.river.q_av (for Q) and vertical.precipitation (for P). kwargs : dict, optional Additional keyword arguments to pass to the get_data method ie get_geodataframe or get_geodataset depending on the data_type of gauges_fn. """ # Read data kwargs = {} if isinstance(gauges_fn, gpd.GeoDataFrame): gdf_gauges = gauges_fn if not np.all(np.isin(gdf_gauges.geometry.type, "Point")): raise ValueError(f"{gauges_fn} contains other geometries than Point") elif isfile(gauges_fn): # try to get epsg number directly, important when writting back data_catalog if hasattr(self.crs, "to_epsg"): code = self.crs.to_epsg() else: code = self.crs kwargs.update(crs=code) gdf_gauges = self.data_catalog.get_geodataframe( gauges_fn, geom=self.basins, assert_gtype="Point", handle_nodata=NoDataStrategy.IGNORE, **kwargs, ) elif gauges_fn in self.data_catalog: if self.data_catalog[gauges_fn].data_type == "GeoDataFrame": gdf_gauges = self.data_catalog.get_geodataframe( gauges_fn, geom=self.basins, assert_gtype="Point", handle_nodata=NoDataStrategy.IGNORE, **kwargs, ) elif self.data_catalog[gauges_fn].data_type == "GeoDataset": da = self.data_catalog.get_geodataset( gauges_fn, geom=self.basins, handle_nodata=NoDataStrategy.IGNORE, **kwargs, ) gdf_gauges = da.vector.to_gdf() # Check for point geometry if not np.all(np.isin(gdf_gauges.geometry.type, "Point")): raise ValueError( f"{gauges_fn} contains other geometries than Point" ) else: raise ValueError( f"{gauges_fn} data source not found or \ incorrect data_type (GeoDataFrame or GeoDataset)." 
) # Create basename if basename is None: basename = os.path.basename(gauges_fn).split(".")[0].replace("_", "-") # Check if there is data found if gdf_gauges is None: self.logger.info("Skipping method, as no data has been found") return # Create the gauges map self.logger.info( f"{gdf_gauges.index.size} {basename} gauge locations found within domain" ) # read existing geoms; important to get the right basin when updating self.geoms # Reproject to model crs gdf_gauges = gdf_gauges.to_crs(self.crs).copy() # Get coords, index and ID xs, ys = np.vectorize(lambda p: (p.xy[0][0], p.xy[1][0]))( gdf_gauges["geometry"] ) idxs = self.grid.raster.xy_to_idx(xs, ys) if index_col is not None and index_col in gdf_gauges.columns: gdf_gauges = gdf_gauges.set_index(index_col) if np.any(gdf_gauges.index == 0): self.logger.warning("Gauge ID 0 is not allowed, setting to 1") gdf_gauges.index = gdf_gauges.index.values + 1 ids = gdf_gauges.index.values # if snap_to_river use river map as the mask if snap_to_river and mask is None: mask = self._MAPS["rivmsk"] if mask is not None: mask = self.grid[mask].values if snap_uparea and "uparea" in gdf_gauges.columns: # Derive gauge map based on upstream area snapping da, idxs, ids = workflows.gauge_map_uparea( self.grid, gdf_gauges, uparea_name="wflow_uparea", mask=mask, wdw=wdw, rel_error=rel_error, abs_error=abs_error, fillna=fillna, logger=self.logger, ) else: # Derive gauge map da, idxs, ids = flw.gauge_map( self.grid, idxs=idxs, ids=ids, stream=mask, flwdir=self.flwdir, max_dist=max_dist, logger=self.logger, ) # Filter gauges that could not be snapped to rivers if snap_to_river: ids_old = ids.copy() da = da.where(self.grid[self._MAPS["rivmsk"]] != 0, da.raster.nodata) ids_new = np.unique(da.values[da.values > 0]) idxs = idxs[np.isin(ids_old, ids_new)] ids = da.values.flat[idxs] # Check if there are gauges left if ids.size == 0: self.logger.warning( "No gauges found within domain after snapping, skipping method." ) return # Add to grid mapname = f'{str(self._MAPS["gauges"])}_{basename}' self.set_grid(da, name=mapname) # geoms points = gpd.points_from_xy(*self.grid.raster.idx_to_xy(idxs)) # if csv contains additional columns, these are also written in the geoms gdf_snapped = gpd.GeoDataFrame( index=ids.astype(np.int32), geometry=points, crs=self.crs ) # Set the index name of gdf snapped based on original gdf if gdf_gauges.index.name is not None: gdf_snapped.index.name = gdf_gauges.index.name else: gdf_snapped.index.name = "fid" gdf_gauges.index.name = "fid" # Add gdf attributes to gdf_snapped (filter on snapped index before merging) df_attrs = pd.DataFrame(gdf_gauges.drop(columns="geometry")) df_attrs = df_attrs[np.isin(df_attrs.index, gdf_snapped.index)] gdf_snapped = gdf_snapped.merge(df_attrs, how="inner", on=gdf_gauges.index.name) # Add gdf_snapped to geoms self.set_geoms(gdf_snapped, name=mapname.replace("wflow_", "")) # Add output timeseries for gauges in the toml self.setup_config_output_timeseries( mapname=mapname, toml_output=toml_output, header=gauge_toml_header, param=gauge_toml_param, ) # add subcatch if derive_subcatch: da_basins = flw.basin_map(self.grid, self.flwdir, idxs=idxs, ids=ids)[0] mapname = self._MAPS["basins"] + "_" + basename self.set_grid(da_basins, name=mapname) gdf_basins = self.grid[mapname].raster.vectorize() self.set_geoms(gdf_basins, name=mapname.replace("wflow_", ""))
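    # Editor's note - illustrative usage sketch, not part of the original module.
    # Adding gauges from a user point file with upstream-area snapping; the file
    # path is a placeholder and the csv is assumed to contain x/y coordinates and
    # an 'uparea' [km2] column (see docstring above).
    #
    #     mod.setup_gauges(
    #         gauges_fn="./gauges_obs.csv",
    #         snap_to_river=True,
    #         snap_uparea=True,
    #         wdw=3,
    #         rel_error=0.05,
    #         abs_error=50.0,
    #         derive_subcatch=True,
    #         basename="obs",
    #     )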
[docs] def setup_areamap( self, area_fn: Union[str, gpd.GeoDataFrame], col2raster: str, nodata: Union[int, float] = -1, ): """Set area map from vector data to save wflow outputs for specific area. Adds model layer: * **col2raster** map: output area data map Parameters ---------- area_fn : str, geopandas.GeoDataFrame Name of GeoDataFrame data corresponding to wflow output area. col2raster : str Name of the column from `area_fn` to rasterize. nodata : int/float, optional Nodata value to use when rasterizing. Should match the dtype of `col2raster` . By default -1. """ self.logger.info(f"Preparing '{col2raster}' map from '{area_fn}'.") gdf_org = self.data_catalog.get_geodataframe( area_fn, geom=self.basins, dst_crs=self.crs ) if gdf_org.empty: self.logger.warning( f"No shapes of {area_fn} found within region, skipping areamap." ) return else: da_area = self.grid.raster.rasterize( gdf=gdf_org, col_name=col2raster, nodata=nodata, all_touched=True, ) self.set_grid(da_area.rename(col2raster))
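    # Editor's note - illustrative usage sketch, not part of the original module.
    # Rasterizing an output area from a user polygon file; the path and column
    # name are placeholder assumptions.
    #
    #     mod.setup_areamap(
    #         area_fn="./regions_of_interest.geojson",
    #         col2raster="region_id",
    #         nodata=-1,
    #     )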
[docs] def setup_lakes( self, lakes_fn: Union[str, Path, gpd.GeoDataFrame], rating_curve_fns: List[Union[str, Path, pd.DataFrame]] = None, min_area: float = 10.0, add_maxstorage: bool = False, **kwargs, ): """Generate maps of lake areas and outlets. Also meant to generate parameters with average lake area, depth and discharge values. The data is generated from features with ``min_area`` [km2] (default 1 km2) from a database with lake geometry, IDs and metadata. Data required are lake ID 'waterbody_id', average area 'Area_avg' [m2], average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m] and average discharge 'Dis_avg' [m3/s]. If rating curve data is available for storage and discharge they can be prepared via ``rating_curve_fns`` (see below for syntax and requirements). Else the parameters 'Lake_b' and 'Lake_e' will be used for discharge and for storage a rectangular profile lake is assumed. See Wflow documentation for more information. If ``add_maxstorage`` is True, the maximum storage of the lake is added to the output (controlled lake) based on 'Vol_max' [m3] column of lakes_fn. Adds model layers: * **wflow_lakeareas** map: lake IDs [-] * **wflow_lakelocs** map: lake IDs at outlet locations [-] * **LakeArea** map: lake area [m2] * **LakeAvgLevel** map: lake average water level [m] * **LakeAvgOut** map: lake average discharge [m3/s] * **Lake_b** map: lake rating curve coefficient [-] * **LakeOutflowFunc** map: option to compute rating curve [-] * **LakeStorFunc** map: option to compute storage curve [-] * **LakeMaxStorage** map: optional, maximum storage of lake [m3] * **lakes** geom: polygon with lakes and wflow lake parameters Parameters ---------- lakes_fn : Name of GeoDataFrame source for lake parameters, see data/data_sources.yml. * Required variables for direct use: \ 'waterbody_id' [-], 'Area_avg' [m2], 'Depth_avg' [m], 'Dis_avg' [m3/s], 'Lake_b' [-], \ 'Lake_e' [-], 'LakeOutflowFunc' [-], 'LakeStorFunc' [-], 'LakeThreshold' [m], \ 'LinkedLakeLocs' [-] * Required variables for parameter estimation: \ 'waterbody_id' [-], 'Area_avg' [m2], 'Vol_avg' [m3], 'Depth_avg' [m], 'Dis_avg'[m3/s] rating_curve_fns: str, Path, pandas.DataFrame, List, optional Data catalog entry/entries, path(s) or pandas.DataFrame containing rating curve values for lakes. If None then will be derived from properties of `lakes_fn`. Assumes one file per lake (with all variables) and that the lake ID is either in the filename or data catalog entry name (eg using placeholder). The ID should be placed at the end separated by an underscore (eg 'rating_curve_12.csv' or 'rating_curve_12') * Required variables for storage curve: 'elevtn' [m+REF], 'volume' [m3] * Required variables for rating curve: 'elevtn' [m+REF], 'discharge' [m3/s] min_area : float, optional Minimum lake area threshold [km2], by default 10.0 km2. add_maxstorage : bool, optional If True, maximum storage of the lake is added to the output (controlled lake) based on 'Vol_max' [m3] column of lakes_fn. By default False (natural lake). 
kwargs: optional Keyword arguments passed to the method hydromt.DataCatalog.get_rasterdataset() """ # Derive lake are and outlet maps gdf_org, ds_lakes = self._setup_waterbodies( lakes_fn, "lake", min_area, **kwargs ) if ds_lakes is None: self.logger.info("Skipping method, as no data has been found") return rmdict = {k: v for k, v in self._MAPS.items() if k in ds_lakes.data_vars} ds_lakes = ds_lakes.rename(rmdict) # If rating_curve_fn prepare rating curve dict rating_dict = dict() if rating_curve_fns is not None: rating_curve_fns = np.atleast_1d(rating_curve_fns) # Find ids in rating_curve_fns fns_ids = [] for fn in rating_curve_fns: try: fns_ids.append(int(fn.split("_")[-1].split(".")[0])) except Exception: self.logger.warning( f"Could not parse integer lake index from \ rating curve fn {fn}. Skipping." ) # assume lake index will be in the path # Assume one rating curve per lake index for id in gdf_org["waterbody_id"].values: id = int(id) # Find if id is is one of the paths in rating_curve_fns if id in fns_ids: # Update path based on current waterbody_id i = fns_ids.index(id) rating_fn = rating_curve_fns[i] # Read data if isfile(rating_fn) or rating_fn in self.data_catalog: self.logger.info( f"Preparing lake rating curve data from {rating_fn}" ) df_rate = self.data_catalog.get_dataframe(rating_fn) # Add to dict rating_dict[id] = df_rate else: self.logger.warning( f"Rating curve file not found for lake with id {id}. \ Using default storage/outflow function parameters." ) else: self.logger.info( "No rating curve data provided. \ Using default storage/outflow function parameters." ) # add waterbody parameters ds_lakes, gdf_lakes, rating_curves = workflows.waterbodies.lakeattrs( ds_lakes, gdf_org, rating_dict, add_maxstorage=add_maxstorage ) # add to grid self.set_grid(ds_lakes) # write lakes with attr tables to static geoms. self.set_geoms(gdf_lakes, name="lakes") # add the tables for k, v in rating_curves.items(): self.set_tables(v, name=k) # if there are lakes, change True in toml # Lake seetings in the toml to update lakes_toml = { "model.lakes": True, "state.lateral.river.lake.waterlevel": "waterlevel_lake", "input.lateral.river.lake.area": "LakeArea", "input.lateral.river.lake.areas": "wflow_lakeareas", "input.lateral.river.lake.b": "Lake_b", "input.lateral.river.lake.e": "Lake_e", "input.lateral.river.lake.locs": "wflow_lakelocs", "input.lateral.river.lake.outflowfunc": "LakeOutflowFunc", "input.lateral.river.lake.storfunc": "LakeStorFunc", "input.lateral.river.lake.threshold": "LakeThreshold", "input.lateral.river.lake.linkedlakelocs": "LinkedLakeLocs", "input.lateral.river.lake.waterlevel": "LakeAvgLevel", } if "LakeMaxStorage" in ds_lakes: lakes_toml["input.lateral.river.lake.maxstorage"] = "LakeMaxStorage" for option in lakes_toml: self.set_config(option, lakes_toml[option])
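    # Editor's note - illustrative usage sketch, not part of the original module.
    # Adding lakes larger than 10 km2; "hydro_lakes" is a placeholder catalog
    # entry and the rating curve file name is a placeholder following the
    # 'rating_curve_<lake_id>.csv' pattern described above.
    #
    #     mod.setup_lakes(
    #         lakes_fn="hydro_lakes",
    #         rating_curve_fns=["./rating_curve_1.csv"],
    #         min_area=10.0,
    #     )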
[docs] def setup_reservoirs( self, reservoirs_fn: Union[str, gpd.GeoDataFrame], timeseries_fn: str = None, min_area: float = 1.0, **kwargs, ): """Generate maps of reservoir areas and outlets. Also meant to generate parameters with average reservoir area, demand, min and max target storage capacities and discharge capacity values. The data is generated from features with ``min_area`` [km2] (default is 1 km2) from a database with reservoir geometry, IDs and metadata. Data requirements for direct use (i.e. wflow parameters are data already present in reservoirs_fn) are reservoir ID 'waterbody_id', area 'ResSimpleArea' [m2], maximum volume 'ResMaxVolume' [m3], the targeted minimum and maximum fraction of water volume in the reservoir 'ResTargetMinFrac' and 'ResTargetMaxFrac' [-], the average water demand ResDemand [m3/s] and the maximum release of the reservoir before spilling 'ResMaxRelease' [m3/s]. In case the wflow parameters are not directly available they can be computed by HydroMT based on time series of reservoir surface water area. These time series can be retreived from either the hydroengine or the gwwapi, based on the Hylak_id the reservoir, found in the GrandD database. The required variables for computation of the parameters with time series data are reservoir ID 'waterbody_id', reservoir ID in the HydroLAKES database 'Hylak_id', average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m], average discharge 'Dis_avg' [m3/s] and dam height 'Dam_height' [m]. To compute parameters without using time series data, the required varibales in reservoirs_fn are reservoir ID 'waterbody_id', average area 'Area_avg' [m2], average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m], average discharge 'Dis_avg' [m3/s] and dam height 'Dam_height' [m] and minimum / normal / maximum storage capacity of the dam 'Capacity_min', 'Capacity_norm', 'Capacity_max' [m3]. Adds model layers: * **wflow_reservoirareas** map: reservoir IDs [-] * **wflow_reservoirlocs** map: reservoir IDs at outlet locations [-] * **ResSimpleArea** map: reservoir area [m2] * **ResMaxVolume** map: reservoir max volume [m3] * **ResTargetMinFrac** map: reservoir target min frac [m3/m3] * **ResTargetFullFrac** map: reservoir target full frac [m3/m3] * **ResDemand** map: reservoir demand flow [m3/s] * **ResMaxRelease** map: reservoir max release flow [m3/s] * **reservoirs** geom: polygon with reservoirs and wflow reservoir parameters Parameters ---------- reservoirs_fn : str Name of data source for reservoir parameters, see data/data_sources.yml. * Required variables for direct use: \ 'waterbody_id' [-], 'ResSimpleArea' [m2], 'ResMaxVolume' [m3], 'ResTargetMinFrac' \ [m3/m3], 'ResTargetFullFrac' [m3/m3], 'ResDemand' [m3/s], 'ResMaxRelease' [m3/s] * Required variables for computation with timeseries_fn: \ 'waterbody_id' [-], 'Hylak_id' [-], 'Vol_avg' [m3], 'Depth_avg' [m], 'Dis_avg' [m3/s], \ 'Dam_height' [m] * Required variables for computation without timeseries_fn: \ 'waterbody_id' [-], 'Area_avg' [m2], 'Vol_avg' [m3], 'Depth_avg' [m], 'Dis_avg' \ [m3/s], 'Capacity_max' [m3], 'Capacity_norm' [m3], 'Capacity_min' [m3], 'Dam_height' [m] timeseries_fn : {'gww', 'hydroengine', None}, optional Download and use time series of reservoir surface water area to calculate and overwrite the reservoir volume/areas of the data source. Timeseries are either downloaded from Global Water Watch 'gww' (using gwwapi package) or JRC 'jrc' (using hydroengine package). By default None. 
min_area : float, optional Minimum reservoir area threshold [km2], by default 1.0 km2. kwargs: optional Keyword arguments passed to the method hydromt.DataCatalog.get_rasterdataset() """ # rename to wflow naming convention tbls = { "resarea": "ResSimpleArea", "resdemand": "ResDemand", "resfullfrac": "ResTargetFullFrac", "resminfrac": "ResTargetMinFrac", "resmaxrelease": "ResMaxRelease", "resmaxvolume": "ResMaxVolume", "resid": "expr1", } res_toml = { "model.reservoirs": True, "state.lateral.river.reservoir.volume": "volume_reservoir", "input.lateral.river.reservoir.area": "ResSimpleArea", "input.lateral.river.reservoir.areas": "wflow_reservoirareas", "input.lateral.river.reservoir.demand": "ResDemand", "input.lateral.river.reservoir.locs": "wflow_reservoirlocs", "input.lateral.river.reservoir.maxrelease": "ResMaxRelease", "input.lateral.river.reservoir.maxvolume": "ResMaxVolume", "input.lateral.river.reservoir.targetfullfrac": "ResTargetFullFrac", "input.lateral.river.reservoir.targetminfrac": "ResTargetMinFrac", } gdf_org, ds_res = self._setup_waterbodies( reservoirs_fn, "reservoir", min_area, **kwargs ) # TODO: check if there are missing values in the above columns of # the parameters tbls = # if everything is present, skip calculate reservoirattrs() and # directly make the maps # Skip method if no data is returned if ds_res is None: self.logger.info("Skipping method, as no data has been found") return # Continue method if data has been found rmdict = {k: v for k, v in self._MAPS.items() if k in ds_res.data_vars} self.set_grid(ds_res.rename(rmdict)) # add attributes # if present use directly resattributes = [ "waterbody_id", "ResSimpleArea", "ResMaxVolume", "ResTargetMinFrac", "ResTargetFullFrac", "ResDemand", "ResMaxRelease", ] if np.all(np.isin(resattributes, gdf_org.columns)): intbl_reservoirs = gdf_org[resattributes] reservoir_accuracy = None reservoir_timeseries = None # else compute else: ( intbl_reservoirs, reservoir_accuracy, reservoir_timeseries, ) = workflows.reservoirattrs( gdf=gdf_org, timeseries_fn=timeseries_fn, logger=self.logger ) intbl_reservoirs = intbl_reservoirs.rename(columns=tbls) # create a geodf with id of reservoir and gemoetry at outflow location gdf_org_points = gpd.GeoDataFrame( gdf_org["waterbody_id"], geometry=gpd.points_from_xy(gdf_org.xout, gdf_org.yout), ) intbl_reservoirs = intbl_reservoirs.rename(columns={"expr1": "waterbody_id"}) gdf_org_points = gdf_org_points.merge( intbl_reservoirs, on="waterbody_id" ) # merge # add parameter attributes to polygone gdf: gdf_org = gdf_org.merge(intbl_reservoirs, on="waterbody_id") # write reservoirs with param values to geoms self.set_geoms(gdf_org, name="reservoirs") for name in gdf_org_points.columns[2:]: gdf_org_points[name] = gdf_org_points[name].astype("float32") da_res = ds_res.raster.rasterize( gdf_org_points, col_name=name, dtype="float32", nodata=-999 ) self.set_grid(da_res) # Save accuracy information on reservoir parameters if reservoir_accuracy is not None: reservoir_accuracy.to_csv(join(self.root, "reservoir_accuracy.csv")) if reservoir_timeseries is not None: reservoir_timeseries.to_csv( join(self.root, f"reservoir_timeseries_{timeseries_fn}.csv") ) for option in res_toml: self.set_config(option, res_toml[option])
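    # Illustrative usage sketch (not part of the original source; the source name
    # "hydro_reservoirs" is an assumption and depends on the user's data catalog):
    #
    # >>> mod.setup_reservoirs(
    # ...     reservoirs_fn="hydro_reservoirs",
    # ...     timeseries_fn="gww",  # or "hydroengine", or None
    # ...     min_area=1.0,
    # ... )
    #
    # With timeseries_fn=None, the parameters are derived from the reservoir
    # attributes only (Area_avg, Vol_avg, Depth_avg, Dis_avg, Capacity_*), or used
    # directly when all Res* columns are already present in reservoirs_fn.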
    def _setup_waterbodies(self, waterbodies_fn, wb_type, min_area=0.0, **kwargs):
        """Help with the common workflow of setup_lakes and setup_reservoirs.

        See the specific methods for more info about the arguments.
        """
        # retrieve data for basin
        self.logger.info(f"Preparing {wb_type} maps.")
        if "predicate" not in kwargs:
            kwargs.update(predicate="contains")
        gdf_org = self.data_catalog.get_geodataframe(
            waterbodies_fn,
            geom=self.basins,
            handle_nodata=NoDataStrategy.IGNORE,
            **kwargs,
        )
        if gdf_org is None:
            # Return two times None (similar to main function output),
            # if there is no data found
            return None, None

        # skip small size waterbodies
        if "Area_avg" in gdf_org.columns and gdf_org.geometry.size > 0:
            min_area_m2 = min_area * 1e6
            gdf_org = gdf_org[gdf_org.Area_avg >= min_area_m2]
        else:
            self.logger.warning(
                f"{wb_type}'s database has no area attribute. "
                f"All {wb_type}s will be considered."
            )
        # get waterbodies maps and parameters
        nb_wb = gdf_org.geometry.size
        ds_waterbody = None
        if nb_wb > 0:
            self.logger.info(
                f"{nb_wb} {wb_type}(s) of sufficient size found within region."
            )
            # add waterbody maps
            uparea_name = self._MAPS["uparea"]
            if uparea_name not in self.grid.data_vars:
                self.logger.warning(
                    f"Upstream area map for {wb_type} outlet setup not found. "
                    "Database coordinates used instead"
                )
                uparea_name = None
            ds_waterbody, gdf_wateroutlet = workflows.waterbodymaps(
                gdf=gdf_org,
                ds_like=self.grid,
                wb_type=wb_type,
                uparea_name=uparea_name,
                logger=self.logger,
            )
            # update/replace xout and yout in gdf_org from gdf_wateroutlet:
            gdf_org["xout"] = gdf_wateroutlet["xout"]
            gdf_org["yout"] = gdf_wateroutlet["yout"]

        else:
            self.logger.warning(
                f"No {wb_type}s of sufficient size found within region! "
                f"Skipping {wb_type} procedures!"
            )

        # note: rasterizing the waterbody points/polygons with raster.rasterize
        # requires the model grid to be available
        return gdf_org, ds_waterbody
[docs] def setup_soilmaps( self, soil_fn: str = "soilgrids", ptf_ksatver: str = "brakensiek", wflow_thicknesslayers: List[int] = [100, 300, 800], ): """ Derive several (layered) soil parameters. Based on a database with physical soil properties using available point-scale (pedo)transfer functions (PTFs) from literature with upscaling rules to ensure flux matching across scales. Currently, supported ``soil_fn`` is "soilgrids" and "soilgrids_2020". ``ptf_ksatver`` (PTF for the vertical hydraulic conductivity) options are "brakensiek" and "cosby". "soilgrids" provides data at 7 specific depths, while "soilgrids_2020" provides data averaged over 6 depth intervals. This leads to small changes in the workflow: (1) M parameter uses midpoint depths in soilgrids_2020 versus \ specific depths in soilgrids, (2) weighted average of soil properties over soil thickness is done with \ the trapezoidal rule in soilgrids versus simple block weighted average in \ soilgrids_2020, (3) the c parameter is computed as weighted average over wflow_sbm soil layers \ defined in ``wflow_thicknesslayers``. The required data from soilgrids are soil bulk density 'bd_sl*' [g/cm3], \ clay content 'clyppt_sl*' [%], silt content 'sltppt_sl*' [%], organic carbon content \ 'oc_sl*' [%], pH 'ph_sl*' [-], sand content 'sndppt_sl*' [%] and soil thickness \ 'soilthickness' [cm]. The following maps are added to grid: * **thetaS** map: average saturated soil water content [m3/m3] * **thetaR** map: average residual water content [m3/m3] * **KsatVer** map: vertical saturated hydraulic conductivity at \ soil surface [mm/day] * **SoilThickness** map: soil thickness [mm] * **SoilMinThickness** map: minimum soil thickness [mm] (equal to SoilThickness) * **M** map: model parameter [mm] that controls exponential decline of \ KsatVer with soil depth (fitted with curve_fit (scipy.optimize)), bounds of M are \ checked * **M_** map: model parameter [mm] that controls exponential decline of \ KsatVer with soil depth (fitted with numpy linalg regression), bounds of `M_` are \ checked * **M_original** map: M without checking bounds * **M_original_** map: `M_` without checking bounds * **f** map: scaling parameter controlling the decline of KsatVer [mm-1] \ (fitted with curve_fit (scipy.optimize)), bounds are checked * **f_** map: scaling parameter controlling the decline of KsatVer [mm-1] \ (fitted with numpy linalg regression), bounds are checked * **c_n** map: Brooks Corey coefficients [-] based on pore size distribution, \ a map for each of the wflow_sbm soil layers (n in total) * **KsatVer_[z]cm** map: KsatVer [mm/day] at soil depths [z] of SoilGrids data \ [0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0] * **wflow_soil** map: soil texture based on USDA soil texture triangle \ (mapping: [1:Clay, 2:Silty Clay, 3:Silty Clay-Loam, 4:Sandy Clay, 5:Sandy Clay-Loam, \ 6:Clay-Loam, 7:Silt, 8:Silt-Loam, 9:Loam, 10:Sand, 11: Loamy Sand, 12:Sandy Loam]) Parameters ---------- soil_fn : {'soilgrids', 'soilgrids_2020'} Name of RasterDataset source for soil parameter maps, see data/data_sources.yml. Should contain info for the 7 soil depths of soilgrids (or 6 depths intervals for soilgrids_2020). * Required variables: \ 'bd_sl*' [g/cm3], 'clyppt_sl*' [%], 'sltppt_sl*' [%], 'oc_sl*' [%], 'ph_sl*' [-], \ 'sndppt_sl*' [%], 'soilthickness' [cm] ptf_ksatver : {'brakensiek', 'cosby'} Pedotransfer function (PTF) to use for calculation KsatVer (vertical saturated hydraulic conductivity [mm/day]). By default 'brakensiek'. 
wflow_thicknesslayers : list of int, optional Thickness of soil layers [mm] for wflow_sbm soil model. By default [100, 300, 800] for layers at depths 100, 400, 1200 and >1200 mm. Used only for Brooks Corey coefficients. """ self.logger.info("Preparing soil parameter maps.") # TODO add variables list with required variable names dsin = self.data_catalog.get_rasterdataset(soil_fn, geom=self.region, buffer=2) dsout = workflows.soilgrids( ds=dsin, ds_like=self.grid, ptfKsatVer=ptf_ksatver, soil_fn=soil_fn, wflow_layers=wflow_thicknesslayers, logger=self.logger, ).reset_coords(drop=True) self.set_grid(dsout) # Update the toml file self.set_config("model.thicknesslayers", wflow_thicknesslayers)
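    # Clarifying sketch (not part of the original source): wflow_thicknesslayers
    # lists layer thicknesses [mm]; the corresponding layer bottom depths are
    # their cumulative sum, e.g.
    #
    # >>> import numpy as np
    # >>> np.cumsum([100, 300, 800]).tolist()
    # [100, 400, 1200]
    #
    # i.e. the default gives wflow_sbm soil layers down to 100, 400 and 1200 mm,
    # plus a remaining layer from 1200 mm down to SoilThickness.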
[docs] def setup_ksathorfrac( self, ksat_fn: Union[str, xr.DataArray], variable: str | None = None, resampling_method: str = "average", ): """Set KsatHorFrac parameter values from a predetermined map. This predetermined map contains (preferably) 'calibrated' values of \ the KsatHorFrac parameter. This map is either selected from the wflow Deltares data \ or created by a third party/ individual. Parameters ---------- ksat_fn : str, optional The identifier of the KsatHorFrac dataset in the data catalog. variable : str | None, optional The variable name for the ksathorfrac map to use in ``ksat_fn`` in case \ ``ksat_fn`` contains several variables. By default None. resampling_method : str, optional The resampling method when up- or downscaled, by default "average" """ self.logger.info("Preparing KsatHorFrac parameter map.") dain = self.data_catalog.get_rasterdataset( ksat_fn, geom=self.region, buffer=2, variables=variable, single_var_as_array=True, ) # Ensure its a DataArray if isinstance(dain, xr.Dataset): raise ValueError( "The ksathorfrac data contains several variables. \ Select the variable to use for ksathorfrac using 'variable' argument." ) # Create scaled ksathorfrac map daout = workflows.ksathorfrac( dain, ds_like=self.grid, resampling_method=resampling_method, ) # Set the output variable name if not isinstance(ksat_fn, str): bname = ksat_fn.name if ksat_fn.name is not None else "KsatHorFrac" else: bname = ksat_fn # base name of the outgoing layer name lname = bname if variable is not None: lname += f"_{variable}" # Set the grid self.set_grid(daout, name=lname) self.set_config("input.lateral.subsurface.ksathorfrac", lname)
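    # Illustrative usage sketch (not part of the original source; the catalog
    # entry "ksathorfrac" and variable name "BRT_250" are assumptions):
    #
    # >>> mod.setup_ksathorfrac(ksat_fn="ksathorfrac", variable="BRT_250")
    #
    # Following the naming logic above, the resulting grid layer would be called
    # "ksathorfrac_BRT_250" and input.lateral.subsurface.ksathorfrac in the toml
    # is set to that layer name.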
[docs] def setup_glaciers(self, glaciers_fn="rgi", min_area=1):
        """
        Generate maps of glacier areas, area fraction and volume fraction.

        Also generates tables with temperature threshold, melting factor and
        snow-to-ice conversion fraction.

        The data is generated from features with ``min_area`` [km2] (default is 1 km2)
        from a database with glacier geometry, IDs and metadata.

        The required variable from the glaciers_fn dataset is the glacier ID
        'simple_id'. Optionally, the glacier area 'AREA' [km2] can be present to
        filter the glaciers by size. If not present it will be computed on the fly.

        Adds model layers:

        * **wflow_glacierareas** map: glacier IDs [-]
        * **wflow_glacierfrac** map: area fraction of glacier per cell [-]
        * **wflow_glacierstore** map: storage (volume) of glacier per cell [mm]
        * **G_TT** map: temperature threshold for glacier melt/buildup [°C]
        * **G_Cfmax** map: glacier melting factor [mm/°C*day]
        * **G_SIfrac** map: fraction of snowpack on top of glacier converted to ice, \
added to glacierstore [-]

        Parameters
        ----------
        glaciers_fn : {'rgi'}
            Name of data source for glaciers, see data/data_sources.yml.

            * Required variables: ['simple_id']
        min_area : float, optional
            Minimum glacier area threshold [km2], by default 1.0 km2.
        """
        glac_toml = {
            "model.glacier": True,
            "state.vertical.glacierstore": "glacierstore",
            "input.vertical.glacierstore": "wflow_glacierstore",
            "input.vertical.glacierfrac": "wflow_glacierfrac",
            "input.vertical.g_cfmax": "G_Cfmax",
            "input.vertical.g_tt": "G_TT",
            "input.vertical.g_sifrac": "G_SIfrac",
        }
        # retrieve data for basin
        self.logger.info("Preparing glacier maps.")
        gdf_org = self.data_catalog.get_geodataframe(
            glaciers_fn,
            geom=self.basins,
            predicate="intersects",
            handle_nodata=NoDataStrategy.IGNORE,
        )
        # Check if there are glaciers found
        if gdf_org is None:
            self.logger.info("Skipping method, as no data has been found")
            return

        # skip small size glacier
        if "AREA" in gdf_org.columns and gdf_org.geometry.size > 0:
            gdf_org = gdf_org[gdf_org["AREA"] >= min_area]
        # get glacier maps and parameters
        nb_glac = gdf_org.geometry.size
        ds_glac = None
        if nb_glac > 0:
            self.logger.info(
                f"{nb_glac} glaciers of sufficient size found within region."
            )
            # add glacier maps
            ds_glac = workflows.glaciermaps(
                gdf=gdf_org,
                ds_like=self.grid,
                id_column="simple_id",
                elevtn_name=self._MAPS["elevtn"],
                logger=self.logger,
            )

            rmdict = {k: v for k, v in self._MAPS.items() if k in ds_glac.data_vars}
            self.set_grid(ds_glac.rename(rmdict))

            self.set_geoms(gdf_org, name="glaciers")

            for option in glac_toml:
                self.set_config(option, glac_toml[option])
        else:
            self.logger.warning(
                "No glaciers of sufficient size found within region! "
                "Skipping glacier procedures!"
            )
[docs] def setup_constant_pars( self, dtype: str = "float32", nodata: Union[int, float] = -999, **kwargs ): """Generate constant parameter maps for all active model cells. Adds model layer: * **param_name** map: constant parameter map. Parameters ---------- dtype: str data type nodata: int or float nodata value kwargs "param_name: value" pairs for constant grid. """ for key, value in kwargs.items(): nodata = np.dtype(dtype).type(nodata) da_param = xr.where(self.grid[self._MAPS["basins"]], value, nodata).astype( dtype ) da_param.raster.set_nodata(nodata) da_param = da_param.rename(key) self.set_grid(da_param)
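    # Illustrative usage sketch (not part of the original source; the parameter
    # names and values below are examples only):
    #
    # >>> mod.setup_constant_pars(KsatHorFrac=100.0, Cfmax=3.75)
    #
    # This adds constant "KsatHorFrac" and "Cfmax" maps over the active model
    # cells (where the wflow_subcatch map is non-zero), with -999 elsewhere.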
[docs] def setup_grid_from_raster( self, raster_fn: Union[str, xr.Dataset], reproject_method: str, variables: Optional[List[str]] = None, wflow_variables: Optional[List[str]] = None, fill_method: Optional[str] = None, ) -> List[str]: """ Add data variable(s) from ``raster_fn`` to grid object. If raster is a dataset, all variables will be added unless ``variables`` list is specified. The config toml can also be updated to include the new maps using ``wflow_variables``. Adds model layers: * **raster.name** or **variables** grid: data from raster_fn Parameters ---------- raster_fn: str Source name of RasterDataset in data_catalog. reproject_method: str Reprojection method from rasterio.enums.Resampling. Available methods: ['nearest', 'bilinear', 'cubic', 'cubic_spline', \ 'lanczos', 'average', 'mode', 'gauss', 'max', 'min', 'med', 'q1', 'q3', \ 'sum', 'rms'] variables: list, optional List of variables to add to grid from raster_fn. By default all. wflow_variables: list, optional List of corresponding wflow variables to update the config toml (e.g: ["input.vertical.altitude"]). Should match the variables list. variables list should be provided unless raster_fn contains a single variable (len 1). fill_method : str, optional If specified, fills nodata values using fill_nodata method. Available methods are {'linear', 'nearest', 'cubic', 'rio_idw'}. Returns ------- list Names of added model staticmap layers. """ self.logger.info(f"Preparing grid data from raster source {raster_fn}") # Read raster data and select variables ds = self.data_catalog.get_rasterdataset( raster_fn, geom=self.region, buffer=2, variables=variables, single_var_as_array=False, ) # Fill nodata if fill_method is not None: ds = ds.raster.interpolate_na(method=fill_method) # Reprojection ds_out = ds.raster.reproject_like(self.grid, method=reproject_method) # Add to grid self.set_grid(ds_out) # Update config if wflow_variables is not None: self.logger.info( f"Updating the config for wflow_variables: {wflow_variables}" ) if variables is None: if len(ds_out.data_vars) == 1: variables = list(ds_out.data_vars.keys()) else: raise ValueError( "Cannot update the toml if raster_fn has more than \ one variable and variables list is not provided." ) # Check on len if len(wflow_variables) != len(variables): raise ValueError( f"Length of variables {variables} do not match wflow_variables \ {wflow_variables}. Cannot update the toml." ) else: for i in range(len(variables)): self.set_config(wflow_variables[i], variables[i])
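    # Illustrative usage sketch (not part of the original source; "dem_source" is
    # a hypothetical data catalog entry):
    #
    # >>> mod.setup_grid_from_raster(
    # ...     raster_fn="dem_source",
    # ...     reproject_method="average",
    # ...     variables=["elevtn"],
    # ...     wflow_variables=["input.vertical.altitude"],
    # ...     fill_method="nearest",
    # ... )
    #
    # This adds an "elevtn" layer to the grid and points the toml option
    # input.vertical.altitude to it.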
[docs] def setup_precip_forcing( self, precip_fn: Union[str, xr.DataArray], precip_clim_fn: Optional[Union[str, xr.DataArray]] = None, chunksize: Optional[int] = None, **kwargs, ) -> None: """Generate gridded precipitation forcing at model resolution. Adds model layer: * **precip**: precipitation [mm] Parameters ---------- precip_fn : str, xarray.DataArray Precipitation RasterDataset source, see data/forcing_sources.yml. * Required variable: 'precip' [mm] * Required dimension: 'time' [timestamp] precip_clim_fn : str, xarray.DataArray, optional High resolution climatology precipitation RasterDataset source to correct precipitation, see data/forcing_sources.yml. * Required variable: 'precip' [mm] * Required dimension: 'time' [cyclic month] chunksize: int, optional Chunksize on time dimension for processing data (not for saving to disk!). If None the data chunksize is used, this can however be optimized for large/small catchments. By default None. """ starttime = self.get_config("starttime") endtime = self.get_config("endtime") freq = pd.to_timedelta(self.get_config("timestepsecs"), unit="s") mask = self.grid[self._MAPS["basins"]].values > 0 precip = self.data_catalog.get_rasterdataset( precip_fn, geom=self.region, buffer=2, time_tuple=(starttime, endtime), variables=["precip"], ) if chunksize is not None: precip = precip.chunk({"time": chunksize}) clim = None if precip_clim_fn is not None: clim = self.data_catalog.get_rasterdataset( precip_clim_fn, geom=precip.raster.box, buffer=2, variables=["precip"], ) precip_out = hydromt.workflows.forcing.precip( precip=precip, da_like=self.grid[self._MAPS["elevtn"]], clim=clim, freq=freq, resample_kwargs=dict(label="right", closed="right"), logger=self.logger, **kwargs, ) # Update meta attributes (used for default output filename later) precip_out.attrs.update({"precip_fn": precip_fn}) if precip_clim_fn is not None: precip_out.attrs.update({"precip_clim_fn": precip_clim_fn}) self.set_forcing(precip_out.where(mask), name="precip")
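    # Illustrative usage sketch (not part of the original source; "era5" and
    # "worldclim" are example data catalog entries that may differ per catalog):
    #
    # >>> mod.setup_precip_forcing(precip_fn="era5", precip_clim_fn="worldclim")
    #
    # Precipitation is read for the (starttime, endtime) period from the toml,
    # optionally corrected with the high-resolution climatology, resampled to the
    # model time step and masked to the basin cells.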
[docs] def setup_temp_pet_forcing( self, temp_pet_fn: Union[str, xr.Dataset], pet_method: str = "debruin", press_correction: bool = True, temp_correction: bool = True, wind_correction: bool = True, wind_altitude: int = 10, reproj_method: str = "nearest_index", dem_forcing_fn: Union[str, xr.DataArray] = None, skip_pet: str = False, chunksize: Optional[int] = None, **kwargs, ) -> None: """Generate gridded temperature and reference evapotranspiration forcing. If `temp_correction` is True, the temperature will be reprojected and then downscaled to model resolution using the elevation lapse rate. For better accuracy, you can provide the elevation grid of the climate data in `dem_forcing_fn`. If not present, the upscaled elevation grid of the wflow model is used ('wflow_dem'). To compute PET (`skip_pet` is False), several methods are available. Before computation, both the temperature and pressure can be downscaled. Wind speed should be given at 2m altitude and can be corrected if `wind_correction` is True and the wind data altitude is provided in `wind_altitude` [m]. Several methods to compute pet are available: {'debruin', 'makkink', 'penman-monteith_rh_simple', 'penman-monteith_tdew'}. Depending on the methods, `temp_pet_fn` should contain temperature 'temp' [°C], pressure 'press_msl' [hPa], incoming shortwave radiation 'kin' [W/m2], outgoing shortwave radiation 'kout' [W/m2], wind speed 'wind' [m/s], relative humidity 'rh' [%], dew point temperature 'temp_dew' [°C], wind speed either total 'wind' or the U- 'wind10_u' [m/s] and V- 'wind10_v' components [m/s]. Adds model layer: * **pet**: reference evapotranspiration [mm] * **temp**: temperature [°C] Parameters ---------- temp_pet_fn : str, xarray.Dataset Name or path of RasterDataset source with variables to calculate temperature and reference evapotranspiration, see data/forcing_sources.yml. * Required variable for temperature: 'temp' [°C] * Required variables for De Bruin reference evapotranspiration: \ 'temp' [°C], 'press_msl' [hPa], 'kin' [W/m2], 'kout' [W/m2] * Required variables for Makkink reference evapotranspiration: \ 'temp' [°C], 'press_msl' [hPa], 'kin'[W/m2] * Required variables for daily Penman-Monteith \ reference evapotranspiration: \ either {'temp' [°C], 'temp_min' [°C], 'temp_max' [°C], 'wind' [m/s], 'rh' [%], 'kin' \ [W/m2]} for 'penman-monteith_rh_simple' or {'temp' [°C], 'temp_min' [°C], 'temp_max' \ [°C], 'temp_dew' [°C], 'wind' [m/s], 'kin' [W/m2], 'press_msl' [hPa], 'wind10_u' [m/s],\ "wind10_v" [m/s]} for 'penman-monteith_tdew' (these are the variables available in ERA5) pet_method : {'debruin', 'makkink', 'penman-monteith_rh_simple', \ 'penman-monteith_tdew'}, optional Reference evapotranspiration method, by default 'debruin'. If penman-monteith is used, requires the installation of the pyet package. press_correction, temp_correction : bool, optional If True pressure, temperature are corrected using elevation lapse rate, by default False. dem_forcing_fn : str, default None Elevation data source with coverage of entire meteorological forcing domain. If temp_correction is True and dem_forcing_fn is provided this is used in combination with elevation at model resolution to correct the temperature. * Required variable: 'elevtn' [m+REF] wind_correction : bool, optional If True wind speed is corrected to wind at 2m altitude using ``wind_altitude``. By default True. wind_altitude : int, optional Altitude of wind speed [m] variable, by default 10. Only used if ``wind_correction`` is True. 
skip_pet : bool, optional If True calculate temp only. reproj_method : str, optional Reprojection method from rasterio.enums.Resampling. to reproject the climate data to the model resolution. By default 'nearest_index'. chunksize: int, optional Chunksize on time dimension for processing data (not for saving to disk!). If None the data chunksize is used, this can however be optimized for large/small catchments. By default None. """ starttime = self.get_config("starttime") endtime = self.get_config("endtime") timestep = self.get_config("timestepsecs") freq = pd.to_timedelta(timestep, unit="s") mask = self.grid[self._MAPS["basins"]].values > 0 variables = ["temp"] if not skip_pet: if pet_method == "debruin": variables += ["press_msl", "kin", "kout"] elif pet_method == "makkink": variables += ["press_msl", "kin"] elif pet_method == "penman-monteith_rh_simple": variables += ["temp_min", "temp_max", "wind", "rh", "kin"] elif pet_method == "penman-monteith_tdew": variables += [ "temp_min", "temp_max", "wind10_u", "wind10_v", "temp_dew", "kin", "press_msl", ] else: methods = [ "debruin", "makking", "penman-monteith_rh_simple", "penman-monteith_tdew", ] raise ValueError( f"Unknown pet method {pet_method}, select from {methods}" ) ds = self.data_catalog.get_rasterdataset( temp_pet_fn, geom=self.region, buffer=1, time_tuple=(starttime, endtime), variables=variables, single_var_as_array=False, # always return dataset ) if chunksize is not None: ds = ds.chunk({"time": chunksize}) dem_forcing = None if dem_forcing_fn is not None: dem_forcing = self.data_catalog.get_rasterdataset( dem_forcing_fn, geom=ds.raster.box, # clip dem with forcing bbox for full coverage buffer=2, variables=["elevtn"], ).squeeze() temp_in = hydromt.workflows.forcing.temp( ds["temp"], dem_model=self.grid[self._MAPS["elevtn"]], dem_forcing=dem_forcing, lapse_correction=temp_correction, logger=self.logger, freq=None, # resample time after pet workflow **kwargs, ) if ( "penman-monteith" in pet_method ): # also downscaled temp_min and temp_max for Penman needed temp_max_in = hydromt.workflows.forcing.temp( ds["temp_max"], dem_model=self.grid[self._MAPS["elevtn"]], dem_forcing=dem_forcing, lapse_correction=temp_correction, logger=self.logger, freq=None, # resample time after pet workflow **kwargs, ) temp_max_in.name = "temp_max" temp_min_in = hydromt.workflows.forcing.temp( ds["temp_min"], dem_model=self.grid[self._MAPS["elevtn"]], dem_forcing=dem_forcing, lapse_correction=temp_correction, logger=self.logger, freq=None, # resample time after pet workflow **kwargs, ) temp_min_in.name = "temp_min" temp_in = xr.merge([temp_in, temp_max_in, temp_min_in]) if not skip_pet: pet_out = hydromt.workflows.forcing.pet( ds[variables[1:]], temp=temp_in, dem_model=self.grid[self._MAPS["elevtn"]], method=pet_method, press_correction=press_correction, wind_correction=wind_correction, wind_altitude=wind_altitude, reproj_method=reproj_method, freq=freq, resample_kwargs=dict(label="right", closed="right"), logger=self.logger, **kwargs, ) # Update meta attributes with setup opt opt_attr = { "pet_fn": temp_pet_fn, "pet_method": pet_method, } pet_out.attrs.update(opt_attr) self.set_forcing(pet_out.where(mask), name="pet") # make sure only temp is written to netcdf if "penman-monteith" in pet_method: temp_in = temp_in["temp"] # resample temp after pet workflow temp_out = hydromt.workflows.forcing.resample_time( temp_in, freq, upsampling="bfill", # we assume right labeled original data downsampling="mean", label="right", closed="right", conserve_mass=False, 
logger=self.logger, ) # Update meta attributes with setup opt (used for default naming later) opt_attr = { "temp_fn": temp_pet_fn, "temp_correction": str(temp_correction), } temp_out.attrs.update(opt_attr) self.set_forcing(temp_out.where(mask), name="temp")
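    # Illustrative usage sketch (not part of the original source; "era5" and
    # "era5_orography" are example data catalog entries):
    #
    # >>> mod.setup_temp_pet_forcing(
    # ...     temp_pet_fn="era5",
    # ...     pet_method="debruin",
    # ...     press_correction=True,
    # ...     temp_correction=True,
    # ...     dem_forcing_fn="era5_orography",
    # ... )
    #
    # This adds both "temp" and "pet" forcing layers; with skip_pet=True only the
    # downscaled temperature is prepared.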
[docs] def setup_pet_forcing(
        self,
        pet_fn: Union[str, xr.DataArray],
        chunksize: Optional[int] = None,
    ):
        """
        Prepare PET forcing from existing PET data.

        Adds model layer:

        * **pet**: reference evapotranspiration [mm]

        Parameters
        ----------
        pet_fn: str, xr.DataArray
            RasterDataset source or data for PET to be resampled.

            * Required variable: 'pet' [mm]
        chunksize: int, optional
            Chunksize on time dimension for processing data (not for saving to
            disk!). If None the data chunksize is used, this can however be
            optimized for large/small catchments. By default None.
        """
        self.logger.info("Preparing potential evapotranspiration forcing maps.")

        starttime = self.get_config("starttime")
        endtime = self.get_config("endtime")
        freq = pd.to_timedelta(self.get_config("timestepsecs"), unit="s")

        pet = self.data_catalog.get_rasterdataset(
            pet_fn,
            geom=self.region,
            buffer=2,
            variables=["pet"],
            time_tuple=(starttime, endtime),
        )

        pet_out = workflows.forcing.pet(
            pet=pet,
            ds_like=self.grid,
            freq=freq,
            mask_name=self._MAPS["basins"],
            chunksize=chunksize,
            logger=self.logger,
        )

        # Update meta attributes (used for default output filename later)
        pet_out.attrs.update({"pet_fn": pet_fn})
        self.set_forcing(pet_out, name="pet")
[docs] def setup_rootzoneclim( self, run_fn: Union[str, Path, xr.Dataset], forcing_obs_fn: Union[str, Path, xr.Dataset], forcing_cc_hist_fn: Optional[Union[str, Path, xr.Dataset]] = None, forcing_cc_fut_fn: Optional[Union[str, Path, xr.Dataset]] = None, chunksize: Optional[int] = 100, return_period: Optional[list] = [2, 3, 5, 10, 15, 20, 25, 50, 60, 100], Imax: Optional[float] = 2.0, start_hydro_year: Optional[str] = "Sep", start_field_capacity: Optional[str] = "Apr", LAI: Optional[bool] = False, rootzone_storage: Optional[bool] = False, correct_cc_deficit: Optional[bool] = False, time_tuple: Optional[tuple] = None, time_tuple_fut: Optional[tuple] = None, missing_days_threshold: Optional[int] = 330, update_toml_rootingdepth: Optional[str] = "RootingDepth_obs_20", ) -> None: """ Set the RootingDepth. Done by estimating the catchment-scale root-zone storage capacity from observed hydroclimatic data (and optionally also for climate change historical and future periods). This presents an alternative approach to determine the RootingDepth based on hydroclimatic data instead of through a look-up table relating land use to rooting depth (as usually done for the wflow_sbm model). The method is based on the estimation of maximum annual storage deficits based on precipitation and estimated actual evaporation time series, which in turn are estimated from observed streamflow data and long-term precipitation and potential evap. data, as explained in Bouaziz et al. (2022). The main assumption is that vegetation adapts its rootzone storage capacity to overcome dry spells with a certain return period (typically 20 years for forest ecosystems). In response to a changing climtate, it is likely that vegetation also adapts its rootzone storage capacity, thereby changing model parameters for future conditions. This method also allows to estimate the change in rootzone storage capacity in response to a changing climate. As the method requires precipitation and potential evaporation timeseries, it may be useful to run this method as an update step in the setting-up of the hydrological model, once the forcing files have already been derived. In addition the setup_soilmaps method is also required to calculate the RootingDepth (rootzone_storage / (thetaS-thetaR)). The setup_laimaps method is also required if LAI is set to True (interception capacity estimated from LAI maps, instead of providing a default maximum interception capacity). References ---------- Bouaziz, L. J. E., Aalbers, E. E., Weerts, A. H., Hegnauer, M., Buiteveld, H., Lammersen, R., Stam, J., Sprokkereef, E., Savenije, H. H. G. and Hrachowitz, M. (2022). Ecosystem adaptation to climate change: the sensitivity of hydrological predictions to time-dynamic model parameters, Hydrology and Earth System Sciences, 26(5), 1295-1318. DOI: 10.5194/hess-26-1295-2022. Adds model layer: * **RootingDepth_{forcing}_{RP}** map: rooting depth [mm of the soil column] \ estimated from hydroclimatic data {forcing: obs, cc_hist or cc_fut} for different \ return periods RP. The translation to RootingDepth is done by dividing \ the rootzone_storage by (thetaS - thetaR). * **rootzone_storage_{forcing}_{RP}** geom: polygons of rootzone \ storage capacity [mm of water] for each catchment estimated before filling \ the missings with data from downstream catchments. * **rootzone_storage_{forcing}_{RP}** map: rootzone storage capacity \ [mm of water] estimated from hydroclimatic data {forcing: obs, cc_hist or cc_fut} for \ different return periods RP. 
          Only if rootzone_storage is set to True!

        Parameters
        ----------
        run_fn : str, Path, xr.Dataset
            Geodataset with streamflow timeseries (m3/s) per x,y location.
            The geodataset expects the coordinate names "index"
            (for each station id) and the variable name "discharge".
        forcing_obs_fn : str, Path, xr.Dataset
            Gridded timeseries with the observed forcing [mm/timestep].
            Expects to have variables "precip" and "pet".
        forcing_cc_hist_fn : str, Path, xr.Dataset, optional
            Gridded timeseries with the simulated historical forcing
            [mm/timestep], based on a climate model. Expects to have variables
            "precip" and "pet". The default is None.
        forcing_cc_fut_fn : str, Path, xr.Dataset, optional
            Gridded timeseries with the simulated climate forcing [mm/timestep],
            based on a climate model. Expects to have variables "precip" and
            "pet". The default is None.
        chunksize : int, optional
            Chunksize on time dimension for processing data (not for saving to
            disk!). The default is 100.
        return_period : list, optional
            List with one or more values indicating the return period(s) (in
            years) for which the rootzone storage depth should be calculated.
            The default is [2, 3, 5, 10, 15, 20, 25, 50, 60, 100] years.
        Imax : float, optional
            The maximum interception storage capacity [mm]. The default is 2.0 mm.
        start_hydro_year : str, optional
            The start month (abbreviated to the first three letters of the month,
            starting with a capital letter) of the hydrological year.
            The default is 'Sep'.
        start_field_capacity : str, optional
            The end of the wet season / commencement of dry season. This is the
            moment when the soil is at field capacity, i.e. there is no storage
            deficit yet. The default is 'Apr'.
        LAI : bool, optional
            Determine whether the LAI will be used to determine Imax. The default
            is False. If set to True, requires to have run setup_laimaps.
        rootzone_storage : bool, optional
            Determines whether the rootzone storage maps should be stored in the
            grid or not. The default is False.
        correct_cc_deficit : bool, optional
            Determines whether a bias-correction of the future deficit should be
            applied using the cc_hist deficit. Only works if the time periods of
            cc_hist and cc_fut are the same. If the climate change scenario and
            hist period are bias-corrected, this should probably be set to False.
            The default is False.
        time_tuple: tuple, optional
            Select which time period to read from all the forcing files. There
            should be some overlap between the time period available in the
            forcing files for the historical period and in the observed streamflow
            data.
        missing_days_threshold: int, optional
            Minimum number of days within a year for that year to be counted in
            the long-term Budyko analysis.
        update_toml_rootingdepth: str, optional
            Update the wflow_sbm model config of the RootingDepth variable with
            the estimated RootingDepth. The default is 'RootingDepth_obs_20',
            which requires RP 20 to be included in the list provided for the
            return_period argument.
""" self.logger.info("Preparing climate based root zone storage parameter maps.") # Open the data sets ds_obs = self.data_catalog.get_rasterdataset( forcing_obs_fn, geom=self.region, buffer=2, variables=["pet", "precip"], time_tuple=time_tuple, ) ds_cc_hist = None if forcing_cc_hist_fn is not None: ds_cc_hist = self.data_catalog.get_rasterdataset( forcing_cc_hist_fn, geom=self.region, buffer=2, variables=["pet", "precip"], time_tuple=time_tuple, ) ds_cc_fut = None if forcing_cc_fut_fn is not None: ds_cc_fut = self.data_catalog.get_rasterdataset( forcing_cc_fut_fn, geom=self.region, buffer=2, variables=["pet", "precip"], time_tuple=time_tuple_fut, ) # observed streamflow data dsrun = self.data_catalog.get_geodataset( run_fn, single_var_as_array=False, time_tuple=time_tuple ) # make sure dsrun overlaps with ds_obs, otherwise give error if dsrun.time[0] < ds_obs.time[0]: dsrun = dsrun.sel(time=slice(ds_obs.time[0], None)) if dsrun.time[-1] > ds_obs.time[-1]: dsrun = dsrun.sel(time=slice(None, ds_obs.time[-1])) if len(dsrun.time) == 0: self.logger.error( "No overlapping period between the meteo and observed streamflow data" ) # check if setup_soilmaps and setup_laimaps were run if LAI =True and # if rooting_depth = True" if (LAI == True) and ("LAI" not in self.grid): self.logger.error( "LAI variable not found in grid. \ Set LAI to False or run setup_laimaps first" ) if ("thetaR" not in self.grid) or ("thetaS" not in self.grid): self.logger.error( "thetaS or thetaR variables not found in grid. \ Run setup_soilmaps first" ) # Run the rootzone clim workflow dsout, gdf = workflows.rootzoneclim( dsrun=dsrun, ds_obs=ds_obs, ds_like=self.grid, flwdir=self.flwdir, ds_cc_hist=ds_cc_hist, ds_cc_fut=ds_cc_fut, return_period=return_period, Imax=Imax, start_hydro_year=start_hydro_year, start_field_capacity=start_field_capacity, LAI=LAI, rootzone_storage=rootzone_storage, correct_cc_deficit=correct_cc_deficit, chunksize=chunksize, missing_days_threshold=missing_days_threshold, logger=self.logger, ) # set nodata value outside basin dsout = dsout.where(self.grid[self._MAPS["basins"]] > 0, -999) for var in dsout.data_vars: dsout[var].raster.set_nodata(-999) self.set_grid(dsout) self.set_geoms(gdf, name="rootzone_storage") # update config self.set_config("input.vertical.rootingdepth", update_toml_rootingdepth)
[docs] def setup_1dmodel_connection( self, river1d_fn: Union[str, Path, gpd.GeoDataFrame], connection_method: str = "subbasin_area", area_max: float = 10.0, add_tributaries: bool = True, include_river_boundaries: bool = True, mapname: str = "1dmodel", update_toml: bool = True, toml_output: str = "netcdf", ): """ Connect wflow to a 1D model by deriving linked subcatch (and tributaries). There are two methods to connect models: - `subbasin_area`: creates subcatchments linked to the 1d river based on an area threshold (area_max) for the subbasin size. With this method, if a tributary is larger than the `area_max`, it will be connected to the 1d river directly. - `nodes`: subcatchments are derived based on the 1driver nodes (used as gauges locations). With this method, large tributaries can also be derived separately using the `add_tributaries` option and adding a `area_max` threshold for the tributaries. If `add_tributary` option is on, you can decide to include or exclude the upstream boundary of the 1d river as an additional tributary using the `include_river_boundaries` option. Optionally, the toml file can also be updated to save lateral.river.inwater to save all river inflows for the subcatchments and lateral.river.q_av for the tributaries using :py:meth:`hydromt_wflow.wflow.setup_config_output_timeseries`. Adds model layer: * **wflow_subcatch_{mapname}** map/geom: connection subbasins between wflow and the 1D model. * **wflow_subcatch_riv_{mapname}** map/geom: connection subbasins between wflow and the 1D model for river cells only. * **wflow_gauges_{mapname}** map/geom, optional: outlets of the tributaries flowing into the 1D model. Parameters ---------- river1d_fn : str, Path, gpd.GeoDataFrame GeodataFrame with the 1D model river network and nodes where to derive subbasins for connection_method **nodes**. connection_method : str, default subbasin_area Method to connect wflow to the 1D model. Available methods are { 'subbasin_area', 'nodes'}. area_max : float, default 10.0 Maximum area [km2] of the subbasins to connect to the 1D model in km2 with connection_method **subbasin_area** or **nodes** with add_tributaries set to True. add_tributaries : bool, default True If True, derive tributaries for the subbasins larger than area_max. Always True for **subbasin_area** method. include_river_boundaries : bool, default True If True, include the upstream boundary(ies) of the 1d river as an additional tributary(ies). mapname : str, default 1dmodel Name of the map to save the subcatchments and tributaries in the wflow model staticmaps and geoms (wflow_subcatch_{mapname}). update_toml : bool, default True If True, updates the wflow configuration file to save the required outputs for the 1D model. toml_output : str, optional One of ['csv', 'netcdf', None] to update [csv] or [netcdf] section of wflow toml file or do nothing. By default, 'netcdf'. 
""" # Check connection method values if connection_method not in ["subbasin_area", "nodes"]: raise ValueError( f"Unknown connection method {connection_method}," "select from ['subbasin_area', 'nodes']" ) # read 1d model river network gdf_riv = self.data_catalog.get_geodataframe( river1d_fn, geom=self.region, buffer=2, ) # derive subcatchments and tributaries inv_rename = {v: k for k, v in self._MAPS.items() if v in self.grid} ds_out = workflows.wflow_1dmodel_connection( gdf_riv, ds_model=self.grid.rename(inv_rename), connection_method=connection_method, area_max=area_max, add_tributaries=add_tributaries, include_river_boundaries=include_river_boundaries, logger=self.logger, ) # Derive tributary gauge map if "gauges" in ds_out.data_vars: self.set_grid(ds_out["gauges"], name=f"wflow_gauges_{mapname}") # Derive the gauges staticgeoms gdf_tributary = ds_out["gauges"].raster.vectorize() gdf_tributary["geometry"] = gdf_tributary["geometry"].centroid gdf_tributary["value"] = gdf_tributary["value"].astype( ds_out["gauges"].dtype ) self.set_geoms(gdf_tributary, name=f"gauges_{mapname}") # Update toml if update_toml: self.setup_config_output_timeseries( mapname=f"wflow_gauges_{mapname}", toml_output=toml_output, header=["Q"], param=["lateral.river.q_av"], reducer=None, ) # Derive subcatchment map self.set_grid(ds_out["subcatch"], name=f"wflow_subcatch_{mapname}") gdf_subcatch = ds_out["subcatch"].raster.vectorize() gdf_subcatch["value"] = gdf_subcatch["value"].astype(ds_out["subcatch"].dtype) self.set_geoms(gdf_subcatch, name=f"subcatch_{mapname}") # Subcatchment map for river cells only (to be able to save river outputs # in wflow) self.set_grid(ds_out["subcatch_riv"], name=f"wflow_subcatch_riv_{mapname}") gdf_subcatch_riv = ds_out["subcatch_riv"].raster.vectorize() gdf_subcatch_riv["value"] = gdf_subcatch_riv["value"].astype( ds_out["subcatch"].dtype ) self.set_geoms(gdf_subcatch_riv, name=f"subcatch_riv_{mapname}") # Update toml if update_toml: self.setup_config_output_timeseries( mapname=f"wflow_subcatch_riv_{mapname}", toml_output=toml_output, header=["Qlat"], param=["lateral.river.inwater"], reducer=["sum"], )
[docs] def setup_cold_states( self, timestamp: str = None, ) -> None: """Prepare cold states for Wflow. To be run last as this requires some soil parameters or constant_pars to be computed already. To be run after setup_lakes, setup_reservoirs and setup_glaciers to also create cold states for them if they are present in the basin. This function is mainly useful in case the wflow model is read into Delft-FEWS. Adds model layer: * **satwaterdepth**: saturated store [mm] * **snow**: snow storage [mm] * **tsoil**: top soil temperature [°C] * **ustorelayerdepth**: amount of water in the unsaturated store, per layer [mm] * **snowwater**: liquid water content in the snow pack [mm] * **canopystorage**: canopy storage [mm] * **q_river**: river discharge [m3/s] * **h_river**: river water level [m] * **h_av_river**: river average water level [m] * **ssf**: subsurface flow [m3/d] * **h_land**: land water level [m] * **h_av_land**: land average water level[m] * **q_land** or **qx_land**+**qy_land**: overland flow for kinwave [m3/s] or overland flow in x/y directions for local-inertial [m3/s] If lakes, also adds: * **waterlevel_lake**: lake water level [m] If reservoirs, also adds: * **volume_reservoir**: reservoir volume [m3] If glaciers, also adds: * **glacierstore**: water within the glacier [mm] Parameters ---------- timestamp : str, optional Timestamp of the cold states. By default uses the (starttime - timestepsecs) from the config. """ states, states_config = workflows.prepare_cold_states( self.grid, config=self.config, timestamp=timestamp, ) self.set_states(states) # Update config to read the states self.set_config("model.reinit", False) # Update states variables names in config for option in states_config: self.set_config(option, states_config[option])
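    # Illustrative usage sketch (not part of the original source; the accepted
    # timestamp format depends on workflows.prepare_cold_states):
    #
    # >>> mod.setup_cold_states(timestamp="2010-02-15")
    #
    # If no timestamp is given, (starttime - timestepsecs) from the toml is used;
    # model.reinit is set to False so that wflow reads the prepared states.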
# I/O
[docs] def read(self): """Read the complete model schematization and configuration from file.""" self.read_config() self.read_grid() self.read_intbl() self.read_tables() self.read_geoms() self.read_states() self.read_forcing() self.logger.info("Model read")
[docs] def write(self): """Write the complete model schematization and configuration to file.""" self.logger.info(f"Write model data to {self.root}") # if in r, r+ mode, only write updated components if not self._write: self.logger.warning("Cannot write in read-only mode") return self.write_data_catalog() if self.config: # try to read default if not yet set self.write_config() if self._grid: self.write_grid() if self._tables: self.write_tables() if self._geoms: self.write_geoms() if self._states: self.write_states() if self._forcing: self.write_forcing()
[docs] def read_grid(self, **kwargs): """ Read wflow static input and add to ``grid``. Checks the path of the file in the config toml using both ``input.path_static`` and ``dir_input``. If not found uses the default path ``staticmaps.nc`` in the root folder. If the file is not found, will try to read from the old PCRaster format if map files are found in the ``staticmaps`` folder in the root folder using read_staticmaps_pcr. See Also -------- read_staticmaps_pcr """ fn_default = "staticmaps.nc" fn = self.get_config( "input.path_static", abs_path=True, fallback=join(self.root, fn_default) ) if self.get_config("dir_input") is not None: input_dir = self.get_config("dir_input", abs_path=True) fn = join( input_dir, self.get_config("input.path_static", fallback=fn_default), ) self.logger.info(f"Input directory found {input_dir}") if not self._write: # start fresh in read-only mode self._grid = xr.Dataset() if fn is not None and isfile(fn): self.logger.info(f"Read grid from {fn}") # FIXME: we need a smarter (lazy) solution for big models which also # works when overwriting / appending data in the same source! ds = xr.open_dataset( fn, mask_and_scale=False, decode_coords="all", **kwargs ).load() ds.close() # make sure internally maps are always North -> South oriented if ds.raster.res[1] > 0: ds = ds.raster.flipud() self.set_grid(ds) elif len(glob.glob(join(self.root, "staticmaps", "*.map"))) > 0: self.read_staticmaps_pcr()
def read_staticmaps(self, **kwargs): """Read staticmaps. DEPRECATED for read_grid.""" self.logger.warning( "read_staticmaps is deprecated. Use 'read_grid' instead. " "Will be removed in hydromt_wflow v0.6.0" ) self.read_grid(**kwargs)
[docs] def write_grid(self): """ Write grid to wflow static data file. Checks the path of the file in the config toml using both ``input.path_static`` and ``dir_input``. If not found uses the default path ``staticmaps.nc`` in the root folder. """ if not self._write: raise IOError("Model opened in read-only mode") # clean-up grid and write CRS according to CF-conventions # TODO replace later with hydromt.raster.gdal_compliant method # after core release crs = self.grid.raster.crs ds_out = self.grid.reset_coords() # TODO?! # if ds_out.raster.res[1] < 0: # write data with South -> North orientation # ds_out = ds_out.raster.flipud() x_dim, y_dim, x_attrs, y_attrs = hydromt.gis_utils.axes_attrs(crs) ds_out = ds_out.rename({ds_out.raster.x_dim: x_dim, ds_out.raster.y_dim: y_dim}) ds_out[x_dim].attrs.update(x_attrs) ds_out[y_dim].attrs.update(y_attrs) ds_out = ds_out.drop_vars(["mask", "spatial_ref", "ls"], errors="ignore") ds_out.rio.write_crs(crs, inplace=True) ds_out.rio.write_transform(self.grid.raster.transform, inplace=True) ds_out.raster.set_spatial_dims() # Remove FillValue Nan for x_dim, y_dim encoding = dict() for v in [ds_out.raster.x_dim, ds_out.raster.y_dim]: ds_out[v].attrs.pop("_FillValue", None) encoding[v] = {"_FillValue": None} # filename fn_default = "staticmaps.nc" fn = self.get_config( "input.path_static", abs_path=True, fallback=join(self.root, fn_default) ) # Append inputdir if required if self.get_config("dir_input") is not None: input_dir = self.get_config("dir_input", abs_path=True) fn = join( input_dir, self.get_config("input.path_static", fallback=fn_default), ) # Check if all sub-folders in fn exists and if not create them if not isdir(dirname(fn)): os.makedirs(dirname(fn)) self.logger.info(f"Write grid to {fn}") mask = ds_out[self._MAPS["basins"]] > 0 for v in ds_out.data_vars: # nodata is required for all but boolean fields if ds_out[v].dtype != "bool": ds_out[v] = ds_out[v].where(mask, ds_out[v].raster.nodata) ds_out.to_netcdf(fn, encoding=encoding)
def write_staticmaps(self): """Write staticmaps: DEPRECATED for write_grid.""" self.logger.warning( "write_staticmaps is deprecated. Use 'write_grid' instead. " "Will be removed in hydromt_wflow v0.6.0" ) self.write_grid()
[docs] def read_geoms(
        self,
        geom_fn: str = "staticgeoms",
    ):
        """
        Read static geometries and add them to ``geoms``.

        Assumes that the `geom_fn` folder is located in the same folder as
        the static input data (``input.path_static``). If not found, assumes
        the geometries are in <root/geom_fn>.
        """
        if not self._write:
            self._geoms = dict()  # fresh start in read-only mode
        dir_default = join(self.root, "staticmaps.nc")
        dir_mod = dirname(
            self.get_config("input.path_static", abs_path=True, fallback=dir_default)
        )
        fns = glob.glob(join(dir_mod, geom_fn, "*.geojson"))
        if len(fns) > 1:
            self.logger.info("Reading model staticgeom files.")
        for fn in fns:
            name = basename(fn).split(".")[0]
            if name != "region":
                self.set_geoms(gpd.read_file(fn), name=name)
def read_staticgeoms( self, geom_fn: str = "staticgeoms", ): """Read static geometries: DEPRECATED for read_geoms.""" self.logger.warning( "read_staticgeoms is deprecated. Use 'read_geoms' instead. " "Will be removed in hydromt_wflow v0.6.0" ) self.read_geoms(geom_fn)
[docs] def write_geoms( self, geom_fn: str = "staticgeoms", ): """Write geoms in <root/geom_fn> in GeoJSON format.""" # to write use self.geoms[var].to_file() if not self._write: raise IOError("Model opened in read-only mode") if self.geoms: self.logger.info("Writing model staticgeom to file.") for name, gdf in self.geoms.items(): fn_out = join(self.root, geom_fn, f"{name}.geojson") gdf.to_file(fn_out, driver="GeoJSON")
def write_staticgeoms( self, geom_fn: str = "staticgeoms", ): """Write static geometries: DEPRECATED for write_geoms.""" self.logger.warning( "write_staticgeoms is deprecated. Use 'write_geoms' instead. " "Will be removed in hydromt_wflow v0.6.0" ) self.write_geoms(geom_fn)
[docs] def read_forcing(self):
        """
        Read forcing.

        Checks the path of the file in the config toml using both
        ``input.path_forcing`` and ``dir_input``. If not found uses the default
        path ``inmaps.nc`` in the root folder.

        If several files are used with a '*' in ``input.path_forcing``, all
        corresponding files are read and merged into one xarray dataset before
        being split into one xarray DataArray per forcing variable in the
        hydromt ``forcing`` dictionary.
        """
        fn_default = "inmaps.nc"
        fn = self.get_config(
            "input.path_forcing", abs_path=True, fallback=join(self.root, fn_default)
        )
        if self.get_config("dir_input") is not None:
            input_dir = self.get_config("dir_input", abs_path=True)
            fn = join(
                input_dir,
                self.get_config(
                    "input.path_forcing",
                    fallback=fn_default,
                ),
            )
            self.logger.info(f"Input directory found {input_dir}")

        if not self._write:
            # start fresh in read-only mode
            self._forcing = dict()
        if fn is not None and isfile(fn):
            self.logger.info(f"Read forcing from {fn}")
            ds = xr.open_dataset(fn, chunks={"time": 30}, decode_coords="all")
            for v in ds.data_vars:
                self.set_forcing(ds[v])
        elif "*" in str(fn):
            self.logger.info(f"Read multiple forcing files using {fn}")
            fns = list(fn.parent.glob(fn.name))
            if len(fns) == 0:
                raise IOError(f"No forcing files found using {fn}")
            ds = xr.open_mfdataset(fns, chunks={"time": 30}, decode_coords="all")
            for v in ds.data_vars:
                self.set_forcing(ds[v])
[docs] def write_forcing( self, fn_out=None, freq_out=None, chunksize=1, decimals=2, time_units="days since 1900-01-01T00:00:00", **kwargs, ): """Write forcing at ``fn_out`` in model ready format. If no ``fn_out`` path is provided and path_forcing from the wflow toml exists, the following default filenames are used: * Default name format (with downscaling): \ inmaps_sourcePd_sourceTd_methodPET_freq_startyear_endyear.nc * Default name format (no downscaling): \ inmaps_sourceP_sourceT_methodPET_freq_startyear_endyear.nc Parameters ---------- fn_out: str, Path, optional Path to save output netcdf file; if None the name is read from the wflow toml file. freq_out: str (Offset), optional Write several files for the forcing according to fn_freq. For example 'Y' for one file per year or 'M' for one file per month. By default writes the one file. For more options, \ see https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases chunksize: int, optional Chunksize on time dimension when saving to disk. By default 1. decimals: int, optional Round the ouput data to the given number of decimals. time_units: str, optional Common time units when writting several netcdf forcing files. By default "days since 1900-01-01T00:00:00". """ if not self._write: raise IOError("Model opened in read-only mode") if self.forcing: self.logger.info("Write forcing file") # Get default forcing name from forcing attrs yr0 = pd.to_datetime(self.get_config("starttime")).year yr1 = pd.to_datetime(self.get_config("endtime")).year freq = self.get_config("timestepsecs") # get output filename if fn_out is not None: self.set_config("input.path_forcing", fn_out) self.write_config() # re-write config else: fn_name = self.get_config("input.path_forcing", abs_path=False) if fn_name is not None: if "*" in basename(fn_name): # get rid of * in case model had multiple forcing files and # write to single nc file. self.logger.warning( "Writing multiple forcing files to one file" ) fn_name = join( dirname(fn_name), basename(fn_name).replace("*", "") ) if self.get_config("dir_input") is not None: input_dir = self.get_config("dir_input", abs_path=True) fn_out = join(input_dir, fn_name) else: fn_out = join(self.root, fn_name) else: fn_out = None # get deafult filename if file exists if fn_out is None or isfile(fn_out): self.logger.warning( "Netcdf forcing file from input.path_forcing in the TOML " "already exists, using default name." ) sourceP = "" sourceT = "" methodPET = "" if "precip" in self.forcing: val = self.forcing["precip"].attrs.get("precip_clim_fn", None) Pdown = "d" if val is not None else "" val = self.forcing["precip"].attrs.get("precip_fn", None) if val is not None: sourceP = f"_{val}{Pdown}" if "temp" in self.forcing: val = self.forcing["temp"].attrs.get("temp_correction", "False") Tdown = "d" if val == "True" else "" val = self.forcing["temp"].attrs.get("temp_fn", None) if val is not None: sourceT = f"_{val}{Tdown}" if "pet" in self.forcing: val = self.forcing["pet"].attrs.get("pet_method", None) if val is not None: methodPET = f"_{val}" fn_default = ( f"inmaps{sourceP}{sourceT}{methodPET}_{freq}_{yr0}_{yr1}.nc" ) if self.get_config("dir_input") is not None: input_dir = self.get_config("dir_input", abs_path=True) fn_default_path = join(input_dir, fn_default) else: fn_default_path = join(self.root, fn_default) if isfile(fn_default_path): self.logger.warning( "Netcdf default forcing file already exists, \ skipping write_forcing. 
" "To overwrite netcdf forcing file: \ change name input.path_forcing " "in setup_config section of the build inifile." ) return else: self.set_config("input.path_forcing", fn_default) self.write_config() # re-write config fn_out = fn_default_path # Check if all dates between (starttime, endtime) are in all da forcing # Check if starttime and endtime timestamps are correct start = pd.to_datetime(self.get_config("starttime")) end = pd.to_datetime(self.get_config("endtime")) correct_times = False for da in self.forcing.values(): if "time" in da.coords: # only correct dates in toml for standard calendars: if not hasattr(da.indexes["time"], "to_datetimeindex"): times = da.time.values if (start < pd.to_datetime(times[0])) | (start not in times): start = pd.to_datetime(times[0]) correct_times = True if (end > pd.to_datetime(times[-1])) | (end not in times): end = pd.to_datetime(times[-1]) correct_times = True # merge, process and write forcing ds = xr.merge([da.reset_coords(drop=True) for da in self.forcing.values()]) ds.raster.set_crs(self.crs) # Send warning, and update config with new start and end time if correct_times: self.logger.warning( f"Not all dates found in precip_fn changing starttime to \ {start} and endtime to {end} in the toml." ) self.set_config("starttime", start.to_pydatetime()) self.set_config("endtime", end.to_pydatetime()) self.write_config() if decimals is not None: ds = ds.round(decimals) # clean-up forcing and write CRS according to CF-conventions ds = ds.raster.gdal_compliant(rename_dims=True, force_sn=False) ds = ds.drop_vars(["mask", "idx_out"], errors="ignore") # write with output chunksizes with single timestep and complete # spatial grid to speed up the reading from wflow.jl # dims are always ordered (time, y, x) ds.raster._check_dimensions() chunksizes = (chunksize, ds.raster.ycoords.size, ds.raster.xcoords.size) encoding = { v: {"zlib": True, "dtype": "float32", "chunksizes": chunksizes} for v in ds.data_vars.keys() } # make sure no _FillValue is written to the time / x_dim / y_dim dimension # For several forcing files add common units attributes to time for v in ["time", ds.raster.x_dim, ds.raster.y_dim]: ds[v].attrs.pop("_FillValue", None) encoding[v] = {"_FillValue": None} # Check if all sub-folders in fn_out exists and if not create them if not isdir(dirname(fn_out)): os.makedirs(dirname(fn_out)) forcing_list = [] if freq_out is None: # with compute=False we get a delayed object which is executed when # calling .compute where we can pass more arguments to # the dask.compute method forcing_list.append([fn_out, ds]) else: self.logger.info(f"Writting several forcing with freq {freq_out}") # For several forcing files add common units attributes to time encoding["time"] = {"_FillValue": None, "units": time_units} # Updating path forcing in config fns_out = os.path.relpath(fn_out, self.root) fns_out = f"{str(fns_out)[0:-3]}_*.nc" self.set_config("input.path_forcing", fns_out) self.write_config() # re-write config for label, ds_gr in ds.resample(time=freq_out): # ds_gr = group[1] start = ds_gr["time"].dt.strftime("%Y%m%d")[0].item() fn_out_gr = f"{str(fn_out)[0:-3]}_{start}.nc" forcing_list.append([fn_out_gr, ds_gr]) for fn_out_gr, ds_gr in forcing_list: self.logger.info(f"Process forcing; saving to {fn_out_gr}") delayed_obj = ds_gr.to_netcdf( fn_out_gr, encoding=encoding, mode="w", compute=False ) with ProgressBar(): delayed_obj.compute(**kwargs)
        # To profile, uncomment the lines below to replace the lines above
        # from dask.diagnostics import Profiler, CacheProfiler, ResourceProfiler
        # from dask.diagnostics import visualize
        # import cachey
        # with Profiler() as prof, CacheProfiler(metric=cachey.nbytes) as cprof, \
        #         ResourceProfiler() as rprof:
        #     delayed_obj.compute()
        # visualize([prof, cprof, rprof],
        #           file_path=r'c:\Users\eilan_dk\work\profile2.html')
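    # Example sketch (not part of the class): writing the forcing split into yearly
    # files from an existing model. The root path and frequency below are
    # illustrative placeholders; extra keyword arguments are passed on to
    # dask.compute.
    #
    #   mod = WflowModel(root="wflow_test_model", mode="r+")
    #   mod.read()
    #   mod.write_forcing(freq_out="Y", chunksize=1, decimals=2)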
[docs] def read_states(self): """Read states at <root/instate/> and parse to dict of xr.DataArray.""" fn_default = join("instate", "instates.nc") fn = self.get_config( "state.path_input", abs_path=True, fallback=join(self.root, fn_default) ) if self.get_config("dir_input") is not None: input_dir = self.get_config("dir_input", abs_path=True) fn = join( input_dir, self.get_config("state.path_input", fallback=fn_default), ) self.logger.info(f"Input directory found {input_dir}") if not self._write: # start fresh in read-only mode self._states = dict() if fn is not None and isfile(fn): self.logger.info(f"Read states from {fn}") ds = xr.open_dataset(fn, mask_and_scale=False) for v in ds.data_vars: self.set_states(ds[v])
[docs] def write_states(self, fn_out: Union[str, Path] = None):
        """Write states at <root/instate/> in model ready format."""
        if not self._write:
            raise IOError("Model opened in read-only mode")
        if self.states:
            self.logger.info("Writing states file")

            # get output filename and if needed update and re-write the config
            if fn_out is not None:
                self.set_config("state.path_input", fn_out)
                self.write_config()  # re-write config
            else:
                fn_name = self.get_config(
                    "state.path_input", abs_path=False, fallback=None
                )
                if fn_name is None:
                    fn_name = join("instate", "instates.nc")
                    self.set_config("state.path_input", fn_name)
                    self.write_config()  # re-write config
                if self.get_config("dir_input") is not None:
                    input_dir = self.get_config("dir_input", abs_path=True)
                    fn_out = join(input_dir, fn_name)
                else:
                    fn_out = join(self.root, fn_name)

            # merge, process and write states
            ds = xr.merge(self.states.values())

            # make sure no _FillValue is written to the time dimension
            ds["time"].attrs.pop("_FillValue", None)

            # Check if all sub-folders in fn_out exist and if not create them
            if not isdir(dirname(fn_out)):
                os.makedirs(dirname(fn_out))

            # write states
            ds.to_netcdf(fn_out, mode="w")
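    # Example sketch: states round-trip through <root>/instate/instates.nc unless
    # state.path_input points elsewhere in the TOML; the path below is illustrative.
    #
    #   mod.read_states()
    #   mod.write_states(fn_out=join("instate", "instates.nc"))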
[docs] def read_results(self): """Read results at <root/?/> and parse to dict of xr.DataArray/xr.Dataset.""" if not self._write: # start fresh in read-only mode self._results = dict() output_dir = "" if self.get_config("dir_output") is not None: output_dir = self.get_config("dir_output") # Read gridded netcdf (output section) nc_fn = self.get_config("output.path", abs_path=True) nc_fn = nc_fn.parent / output_dir / nc_fn.name if nc_fn is not None else nc_fn if nc_fn is not None and isfile(nc_fn): self.logger.info(f"Read results from {nc_fn}") ds = xr.open_dataset(nc_fn, chunks={"time": 30}, decode_coords="all") # TODO ? align coords names and values of results nc with grid self.set_results(ds, name="output") # Read scalar netcdf (netcdf section) ncs_fn = self.get_config("netcdf.path", abs_path=True) ncs_fn = ( ncs_fn.parent / output_dir / ncs_fn.name if ncs_fn is not None else ncs_fn ) if ncs_fn is not None and isfile(ncs_fn): self.logger.info(f"Read results from {ncs_fn}") ds = xr.open_dataset(ncs_fn, chunks={"time": 30}) self.set_results(ds, name="netcdf") # Read csv timeseries (csv section) csv_fn = self.get_config("csv.path", abs_path=True) csv_fn = ( csv_fn.parent / output_dir / csv_fn.name if csv_fn is not None else csv_fn ) if csv_fn is not None and isfile(csv_fn): csv_dict = utils.read_csv_results( csv_fn, config=self.config, maps=self.grid ) for key in csv_dict: # Add to results self.set_results(csv_dict[f"{key}"])
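    # Example sketch: results of a completed wflow run can be inspected per output
    # type; the keys below ("output", "netcdf", csv column names) follow the TOML
    # output sections and only exist if the corresponding files were written.
    #
    #   mod = WflowModel(root="wflow_test_model", mode="r")
    #   mod.read_results()
    #   ds_grid_out = mod.results.get("output", None)   # gridded netcdf output
    #   ds_scalar = mod.results.get("netcdf", None)     # scalar netcdf output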
    def write_results(self):
        """Write results at <root/?/> in model ready format."""
        if not self._write:
            raise IOError("Model opened in read-only mode")
        # raise NotImplementedError()

    def read_intbl(self, **kwargs):
        """Read intbl files at <root/intbl> and parse to a dict of pandas.DataFrames."""
        if not self._write:
            self._intbl = dict()  # start fresh in read-only mode
        if not self._read:
            self.logger.info("Reading default intbl files.")
            fns = glob.glob(join(DATADIR, "wflow", "intbl", "*.tbl"))
        else:
            self.logger.info("Reading model intbl files.")
            fns = glob.glob(join(self.root, "intbl", "*.tbl"))
        if len(fns) > 0:
            for fn in fns:
                name = basename(fn).split(".")[0]
                tbl = pd.read_csv(fn, delim_whitespace=True, header=None)
                tbl.columns = [
                    f"expr{i+1}" if i + 1 < len(tbl.columns) else "value"
                    for i in range(len(tbl.columns))
                ]  # rename columns
                self.set_intbl(tbl, name=name)

    def write_intbl(self):
        """Write intbl at <root/intbl> in PCRaster table format."""
        if not self._write:
            raise IOError("Model opened in read-only mode")
        if self.intbl:
            self.logger.info("Writing intbl files.")
            for name in self.intbl:
                fn_out = join(self.root, "intbl", f"{name}.tbl")
                self.intbl[name].to_csv(fn_out, sep=" ", index=False, header=False)

    def set_intbl(self, df, name):
        """Add intbl <pandas.DataFrame> to model."""
        if not (isinstance(df, pd.DataFrame) or isinstance(df, pd.Series)):
            raise ValueError(
                "df type not recognized, should be pandas.DataFrame or pandas.Series."
            )
        if name in self._intbl:
            if not self._write:
                raise IOError(f"Cannot overwrite intbl {name} in read-only mode")
            elif self._read:
                self.logger.warning(f"Overwriting intbl: {name}")
        self._intbl[name] = df
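    # Example sketch (legacy wflow/PCRaster support): intbl lookup tables are read
    # into pandas.DataFrames keyed by file name; the table name below is
    # hypothetical and only present if a matching .tbl file exists.
    #
    #   mod.read_intbl()
    #   n_tbl = mod.intbl.get("N", None)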
[docs] def read_tables(self, **kwargs): """Read table files at <root> and parse to dict of dataframes.""" if not self._write: self._tables = dict() # start fresh in read-only mode self.logger.info("Reading model table files.") fns = glob.glob(join(self.root, "*.csv")) if len(fns) > 0: for fn in fns: name = basename(fn).split(".")[0] tbl = pd.read_csv(fn) self.set_tables(tbl, name=name)
[docs] def write_tables(self): """Write tables at <root>.""" if not self._write: raise IOError("Model opened in read-only mode") if self.tables: self.logger.info("Writing table files.") for name in self.tables: fn_out = join(self.root, f"{name}.csv") self.tables[name].to_csv(fn_out, sep=",", index=False, header=True)
[docs] def set_tables(self, df, name):
        """Add table <pandas.DataFrame> to model."""
        if not (isinstance(df, pd.DataFrame) or isinstance(df, pd.Series)):
            raise ValueError(
                "df type not recognized, should be pandas.DataFrame or pandas.Series."
            )
        if name in self._tables:
            if not self._write:
                raise IOError(f"Cannot overwrite table {name} in read-only mode")
            elif self._read:
                self.logger.warning(f"Overwriting table: {name}")
        self._tables[name] = df
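    # Example sketch (hypothetical values and file name): add an extra csv table to
    # the model and write it to <root>/lake_sh_1.csv.
    #
    #   df = pd.DataFrame({"H": [10.0, 10.5, 11.0], "S": [0.0, 5.0e5, 1.2e6]})
    #   mod.set_tables(df, name="lake_sh_1")
    #   mod.write_tables()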
    def _configread(self, fn):
        with codecs.open(fn, "r", encoding="utf-8") as f:
            fdict = toml.load(f)
        return fdict

    def _configwrite(self, fn):
        with codecs.open(fn, "w", encoding="utf-8") as f:
            toml.dump(self.config, f)

    ## WFLOW specific data and methods

    @property
    def intbl(self):
        """Return a dictionary of pandas.DataFrames representing wflow intbl files."""
        if not self._intbl:
            self.read_intbl()
        return self._intbl

    @property  # Move to core Model API ?
    def tables(self):
        """Return a dictionary of pandas.DataFrames representing wflow table (csv) files."""
        if not self._tables:
            self.read_tables()
        return self._tables

    @property
    def flwdir(self):
        """Return the pyflwdir.FlwdirRaster object parsed from the wflow ldd."""
        if self._flwdir is None:
            self.set_flwdir()
        return self._flwdir
[docs] def set_flwdir(self, ftype="infer"):
        """Parse and set the pyflwdir.FlwdirRaster object from the wflow ldd map."""
        flwdir_name = self._MAPS["flwdir"]
        self._flwdir = flw.flwdir_from_da(
            self.grid[flwdir_name],
            ftype=ftype,
            check_ftype=True,
            mask=(self.grid[self._MAPS["basins"]] > 0),
        )
@property def basins(self): """Returns a basin(s) geometry as a geopandas.GeoDataFrame.""" if "basins" in self.geoms: gdf = self.geoms["basins"] elif self._MAPS["basins"] in self.grid: gdf = ( self.grid[self._MAPS["basins"]] .raster.vectorize() .set_index("value") .sort_index() ) gdf.index.name = self._MAPS["basins"] self.set_geoms(gdf, name="basins") else: self.logger.warning(f"Basin map {self._MAPS['basins']} not found in grid.") gdf = None return gdf @property def rivers(self): """Return a river geometry as a geopandas.GeoDataFrame. If available, the stream order and upstream area values are added to the geometry properties. """ if "rivers" in self.geoms: gdf = self.geoms["rivers"] elif self._MAPS["rivmsk"] in self.grid: rivmsk = self.grid[self._MAPS["rivmsk"]].values != 0 # Check if there are river cells in the model before continuing if np.any(rivmsk): # add stream order 'strord' column strord = self.flwdir.stream_order(mask=rivmsk) feats = self.flwdir.streams(mask=rivmsk, strord=strord) gdf = gpd.GeoDataFrame.from_features(feats) gdf.crs = pyproj.CRS.from_user_input(self.crs) self.set_geoms(gdf, name="rivers") else: self.logger.warning("No river cells detected in the selected basin.") gdf = None return gdf @property def staticgeoms(self): """Return static geometries. Note: deprecated and will be removed in hydromt_wflow v0.6.0. Use :py:attr:`geoms` instead. """ self.logger.warning( "staticgeoms is deprecated. Call 'geoms' instead. " "Will be removed in hydromt_wflow v0.6.0" ) return self.geoms @property def staticmaps(self): """Return staticmaps. Note: deprecated and will be removed in hydromt_wflow v0.6.0. Use :py:attr:`grid` instead. """ self.logger.warning( "staticmaps is deprecated. Call 'grid' instead. " "Will be removed in hydromt_wflow v0.6.0" ) return self.grid ## WFLOW specific modification (clip for now) methods
[docs] def clip_grid(
        self,
        region,
        buffer=0,
        align=None,
        crs=4326,
    ):
        """Clip grid to subbasin.

        Parameters
        ----------
        region : dict
            See :meth:`models.wflow.WflowModel.setup_basemaps`
        buffer : int, optional
            Buffer around subbasin in number of pixels, by default 0
        align : float, optional
            Align bounds of region to raster with resolution <align>, by default None
        crs: int, optional
            Default crs of the grid to clip.

        Returns
        -------
        xarray.Dataset
            Clipped grid.
        """
        basins_name = self._MAPS["basins"]
        flwdir_name = self._MAPS["flwdir"]

        kind, region = hydromt.workflows.parse_region(region, logger=self.logger)
        # translate basin and outlet kinds to geom
        geom = region.get("geom", None)
        bbox = region.get("bbox", None)
        if kind in ["basin", "outlet", "subbasin"]:
            # supply bbox to avoid getting basin bounds first when clipping subbasins
            if kind == "subbasin" and bbox is None:
                region.update(bbox=self.bounds)
            geom, _ = hydromt.workflows.get_basin_geometry(
                ds=self.grid,
                logger=self.logger,
                kind=kind,
                basins_name=basins_name,
                flwdir_name=flwdir_name,
                **region,
            )
        # clip based on subbasin args, geom or bbox
        if geom is not None:
            ds_grid = self.grid.raster.clip_geom(geom, align=align, buffer=buffer)
            ds_grid.coords["mask"] = ds_grid.raster.geometry_mask(geom)
            ds_grid[basins_name] = ds_grid[basins_name].where(
                ds_grid.coords["mask"], self.grid[basins_name].raster.nodata
            )
            ds_grid[basins_name].attrs.update(
                _FillValue=self.grid[basins_name].raster.nodata
            )
        elif bbox is not None:
            ds_grid = self.grid.raster.clip_bbox(bbox, align=align, buffer=buffer)

        # Update flwdir grid and geoms
        if self.crs is None and crs is not None:
            self.set_crs(crs)

        self._grid = xr.Dataset()
        self.set_grid(ds_grid)

        # add pits at edges after clipping
        self._flwdir = None  # make sure old flwdir object is removed
        self.grid[self._MAPS["flwdir"]].data = self.flwdir.to_array("ldd")

        # Reinitialise geoms and re-create basins/rivers
        self._geoms = dict()
        # self.basins
        # self.rivers
        # NOTE: re-created directly below as the geoms properties do not exist in
        # every hydromt version; remove when updating wflow to new objects
        # Basin shape
        basins = (
            self.grid[basins_name].raster.vectorize().set_index("value").sort_index()
        )
        basins.index.name = basins_name
        self.set_geoms(basins, name="basins")

        rivmsk = self.grid[self._MAPS["rivmsk"]].values != 0
        # Check if there are river cells in the model before continuing
        if np.any(rivmsk):
            # add stream order 'strord' column
            strord = self.flwdir.stream_order(mask=rivmsk)
            feats = self.flwdir.streams(mask=rivmsk, strord=strord)
            gdf = gpd.GeoDataFrame.from_features(feats)
            gdf.crs = pyproj.CRS.from_user_input(self.crs)
            self.set_geoms(gdf, name="rivers")

        # Update reservoir and lakes
        remove_reservoir = False
        if self._MAPS["resareas"] in self.grid:
            reservoir = self.grid[self._MAPS["resareas"]]
            if not np.any(reservoir > 0):
                remove_reservoir = True
                remove_maps = [
                    self._MAPS["resareas"],
                    self._MAPS["reslocs"],
                    "ResSimpleArea",
                    "ResDemand",
                    "ResTargetFullFrac",
                    "ResTargetMinFrac",
                    "ResMaxRelease",
                    "ResMaxVolume",
                ]
                self._grid = self.grid.drop_vars(remove_maps)

        remove_lake = False
        if self._MAPS["lakeareas"] in self.grid:
            lake = self.grid[self._MAPS["lakeareas"]]
            if not np.any(lake > 0):
                remove_lake = True
                remove_maps = [
                    self._MAPS["lakeareas"],
                    self._MAPS["lakelocs"],
                    "LinkedLakeLocs",
                    "LakeStorFunc",
                    "LakeOutflowFunc",
                    "LakeArea",
                    "LakeAvgLevel",
                    "LakeAvgOut",
                    "LakeThreshold",
                    "Lake_b",
                    "Lake_e",
                ]
                self._grid = self.grid.drop_vars(remove_maps)

                # Update tables
                ids = np.unique(lake)
                self._tables = {
                    k: v
                    for k, v in self.tables.items()
                    if not any([str(x) in k for x in ids])
                }

        # Update config
        # Remove the absolute path and if needed remove lakes and reservoirs
        if remove_reservoir:
            # change reservoirs = true to false
            self.set_config("model.reservoirs", False)
            # remove states
            if self.get_config("state.lateral.river.reservoir") is not None:
                del self.config["state"]["lateral"]["river"]["reservoir"]

        if remove_lake:
            # change lakes = true to false
            self.set_config("model.lakes", False)
            # remove states
            if self.get_config("state.lateral.river.lake") is not None:
                del self.config["state"]["lateral"]["river"]["lake"]
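    # Example sketch: after clipping, the re-created basin and river geometries are
    # plain geopandas.GeoDataFrames and can be exported for inspection; file names
    # below are illustrative.
    #
    #   mod.geoms["basins"].to_file("basins_clip.geojson", driver="GeoJSON")
    #   if "rivers" in mod.geoms:
    #       mod.geoms["rivers"].to_file("rivers_clip.geojson", driver="GeoJSON")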
def clip_staticmaps( self, region, buffer=0, align=None, crs=4326, ): """Clip staticmaps to subbasin.""" self.logger.warning( "clip_staticmaps is deprecated. Use 'clip_grid' instead. " "Will be removed in hydromt_wflow v0.6.0" ) self.clip_grid(region, buffer, align, crs)
[docs] def clip_forcing(self, crs=4326, **kwargs):
        """Return clipped forcing for subbasin.

        Returns
        -------
        xarray.Dataset
            Clipped forcing.
        """
        if len(self.forcing) > 0:
            self.logger.info("Clipping NetCDF forcing..")
            ds_forcing = xr.merge(self.forcing.values()).raster.clip_bbox(
                self.grid.raster.bounds
            )
            self.set_forcing(ds_forcing)
[docs] def clip_states(self, crs=4326, **kwargs):
        """Return clipped states for subbasin.

        Returns
        -------
        xarray.Dataset
            Clipped states.
        """
        if len(self.states) > 0:
            self.logger.info("Clipping NetCDF states..")
            ds_states = xr.merge(self.states.values()).raster.clip_bbox(
                self.grid.raster.bounds
            )
            # Check for reservoirs/lakes presence in the clipped model
            remove_maps = []
            if self._MAPS["resareas"] not in self.grid:
                if "volume_reservoir" in ds_states:
                    remove_maps.extend(["volume_reservoir"])
            if self._MAPS["lakeareas"] not in self.grid:
                if "waterlevel_lake" in ds_states:
                    remove_maps.extend(["waterlevel_lake"])
            ds_states = ds_states.drop_vars(remove_maps)
            self.set_states(ds_states)
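    # Example sketch of a typical clip workflow (coordinates and paths are
    # placeholders): read an existing model, clip grid, forcing and states to a
    # bounding box, then write everything to a new root.
    #
    #   mod = WflowModel(root="wflow_test_model", mode="r")
    #   mod.read()
    #   mod.clip_grid(region={"bbox": [11.8, 45.8, 12.6, 46.4]})
    #   mod.clip_forcing()
    #   mod.clip_states()
    #   mod.set_root("wflow_test_model_clipped", mode="w")
    #   mod.write()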