"""Implement Wflow model class."""
# Implement model class following model API
import logging
import os
import tomllib
from os.path import isfile, join
from pathlib import Path
from typing import Any, Dict, List
import geopandas as gpd
import hydromt
import numpy as np
import pandas as pd
import pyflwdir
import pyproj
import xarray as xr
from hydromt import hydromt_step
from hydromt._typing import ModeLike, NoDataStrategy
from hydromt.gis import flw
from hydromt.model import Model
from hydromt.model.processes.basin_mask import get_basin_geometry
from hydromt.model.processes.region import (
_parse_region_value,
)
import hydromt_wflow.utils as utils
from hydromt_wflow import workflows
from hydromt_wflow.components import (
WflowConfigComponent,
WflowForcingComponent,
WflowGeomsComponent,
WflowOutputCsvComponent,
WflowOutputGridComponent,
WflowOutputScalarComponent,
WflowStatesComponent,
WflowStaticmapsComponent,
WflowTablesComponent,
)
from hydromt_wflow.naming import _create_hydromt_wflow_mapping_sbm
from hydromt_wflow.version_upgrade import (
convert_reservoirs_to_wflow_v1_sbm,
convert_to_wflow_v1_sbm,
)
__all__ = ["WflowModel"]
__hydromt_eps__ = ["WflowModel"] # core entrypoints
logger = logging.getLogger(f"hydromt.{__name__}")
class WflowModel(Model):
"""Read or Write a wflow model.
Parameters
----------
root : str, optional
Model root, by default None (current working directory)
config_filename : str, optional
A path relative to the root where the configuration file will
be read and written if user does not provide a path themselves.
By default "wflow_sbm.toml"
mode : {'r','r+','w'}, optional
read/append/write mode, by default "w"
data_libs : list[str] | str, optional
List of data catalog configuration files, by default None
**catalog_keys:
Additional keyword arguments to be passed down to the DataCatalog.
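Examples
--------
A minimal sketch of initializing a model; the root path and the
"artifact_data" catalog are illustrative and should match your setup::
>>> from hydromt_wflow import WflowModel
>>> mod = WflowModel(root="./wflow_test", mode="w", data_libs=["artifact_data"])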
"""
name: str = "wflow"
_MODEL_VERSION = None
# TODO supported model version should be filled by the plugins
# e.g. _MODEL_VERSION = ">=1.0, <1.1"
_DATADIR = utils.DATADIR
_DEFAULT_TEMPLATE_FILENAME = join(_DATADIR, "wflow", "wflow_sbm.toml")
_CATALOGS = join(_DATADIR, "parameters_data.yml")
def __init__(
self,
root: str | None = None,
config_filename: str = "wflow_sbm.toml",
mode: str = "w",
data_libs: list[str] | str | None = None,
**catalog_keys,
):
# Define components when they are implemented
components = {
"config": WflowConfigComponent(
self,
filename=str(config_filename),
default_template_filename=self._DEFAULT_TEMPLATE_FILENAME,
),
"forcing": WflowForcingComponent(self, region_component="staticmaps"),
"geoms": WflowGeomsComponent(self, region_component="staticmaps"),
"states": WflowStatesComponent(self, region_component="staticmaps"),
"staticmaps": WflowStaticmapsComponent(self),
"tables": WflowTablesComponent(self),
"output_grid": WflowOutputGridComponent(
self, region_component="staticmaps"
),
"output_scalar": WflowOutputScalarComponent(self),
"output_csv": WflowOutputCsvComponent(
self, locations_component="staticmaps"
),
}
super().__init__(
root,
components=components,
mode=mode,
region_component="staticmaps",
data_libs=data_libs,
**catalog_keys,
)
# wflow specific
self._flwdir = None
self.data_catalog.from_yml(self._CATALOGS)
# Supported Wflow.jl version
logger.info("Supported Wflow.jl version v1+")
# hydromt mapping and wflow variable names
self._MAPS, self._WFLOW_NAMES = _create_hydromt_wflow_mapping_sbm(
self.config.data
)
## Properties
# Components
@property
def config(self) -> WflowConfigComponent:
"""Return the config component."""
return self.components["config"]
@property
def forcing(self) -> WflowForcingComponent:
"""Return the forcing component."""
return self.components["forcing"]
@property
def geoms(self) -> WflowGeomsComponent:
"""Return the geoms component."""
return self.components["geoms"]
@property
def states(self) -> WflowStatesComponent:
"""Return the states component."""
return self.components["states"]
@property
def staticmaps(self) -> WflowStaticmapsComponent:
"""Return the staticmaps component."""
return self.components["staticmaps"]
@property
def tables(self) -> WflowTablesComponent:
"""Return the WflowTablesComponent instance."""
return self.components["tables"]
@property
def output_grid(self) -> WflowOutputGridComponent:
"""Return the WflowOutputGridComponent instance."""
return self.components["output_grid"]
@property
def output_scalar(self) -> WflowOutputScalarComponent:
"""Return the WflowOutputScalarComponent instance."""
return self.components["output_scalar"]
@property
def output_csv(self) -> WflowOutputCsvComponent:
"""Return the WflowOutputCsvComponent instance."""
return self.components["output_csv"]
# Non model component properties
@property
def basins(self) -> gpd.GeoDataFrame | None:
"""Returns a basin(s) geometry as a geopandas.GeoDataFrame."""
if "basins" in self.geoms.data:
gdf = self.geoms.get("basins")
elif self._MAPS["basins"] in self.staticmaps.data:
gdf = (
self.staticmaps.data[self._MAPS["basins"]]
.raster.vectorize()
.set_index("value")
.sort_index()
)
self.set_geoms(gdf, name="basins")
else:
logger.warning(f"Basin map {self._MAPS['basins']} not found in grid.")
gdf = None
return gdf
@property
def basins_highres(self) -> gpd.GeoDataFrame | None:
"""Returns a high resolution basin(s) geometry."""
if "basins_highres" in self.geoms.data:
gdf = self.geoms.get("basins_highres")
else:
gdf = self.basins
return gdf
@property
def rivers(self) -> gpd.GeoDataFrame | None:
"""Return a river geometry as a geopandas.GeoDataFrame.
If available, the stream order and upstream area values are added to
the geometry properties.
"""
if "rivers" in self.geoms.data:
gdf = self.geoms.get("rivers")
elif self._MAPS["rivmsk"] in self.staticmaps.data:
rivmsk = self.staticmaps.data[self._MAPS["rivmsk"]].values != 0
# Check if there are river cells in the model before continuing
if np.any(rivmsk):
# add stream order 'strord' column
strord = self.flwdir.stream_order(mask=rivmsk)
feats = self.flwdir.streams(mask=rivmsk, strord=strord)
gdf = gpd.GeoDataFrame.from_features(feats)
gdf.crs = pyproj.CRS.from_user_input(self.crs)
self.set_geoms(gdf, name="rivers")
else:
logger.warning("No river cells detected in the selected basin.")
gdf = None
return gdf
## SETUP METHODS
@hydromt_step
def setup_config(self, data: Dict[str, Any]):
"""Set the config dictionary at key(s) with values.
Parameters
----------
data : Dict[str, Any]
A dictionary with the values to be set. Keys can be dotted like in
:py:meth:`~hydromt_wflow.components.config.WflowConfigComponent.set`.
Examples
--------
Setting data as a nested dictionary::
>>> self.setup_config({'a': 1, 'b': {'c': {'d': 2}}})
>>> self.config.data
{'a': 1, 'b': {'c': {'d': 2}}}
Setting data using dotted notation::
>>> self.setup_config({'a.d.f.g': 1, 'b': {'c': {'d': 2}}})
>>> self.config.data
{'a': {'d': {'f': {'g': 1}}}, 'b': {'c': {'d': 2}}}
"""
self.config.update(data)
@hydromt_step
def setup_basemaps(
self,
region: Dict,
hydrography_fn: str | xr.Dataset,
basin_index_fn: str | gpd.GeoDataFrame | None = None,
res: float | int = 1 / 120.0,
upscale_method: str = "ihu",
output_names: Dict = {
"basin__local_drain_direction": "local_drain_direction",
"subbasin_location__count": "subcatchment",
"land_surface__slope": "land_slope",
},
):
"""
Build the DEM and flow direction for a Wflow model.
Setup basemaps sets the `region` of interest and `res`
(resolution in degrees) of the model.
All DEM and flow direction related maps are then built.
We strongly recommend using the region types ``basin`` or ``subbasin`` to build
a wflow model. If you know what you are doing, you can also use the ``bbox``
region type (e.g. for an island) with the bbox coordinates in EPSG 4326, or the
``geom`` region type (e.g. if basin polygons have been pre-processed and match
EXACTLY with ``hydrography_fn``).
Example of a `region` argument for a subbasin based on a point, snapped using an
upstream area threshold in `hydrography_fn`, where the maximum bounding box of
the output subbasin is known:
region = {'subbasin': [x, y], 'uparea': 10, 'bounds': [xmin, ymin, xmax, ymax]}
(Sub)Basin delineation is done using the hydromt.workflows.get_basin_geometry
method. Because the delineation is computed from the flow direction data in
memory, to avoid memory errors when using large datasets in `hydrography_fn`, the
user can either supply 'bounds' in `region` or a basin index dataset in
`basin_index_fn` to limit the flow direction data to the region of interest.
The basin index dataset is a GeoDataFrame containing either basin polygons or
bounding boxes of basin boundaries. To select the correct basins, the basin ID
'basins' in `hydrography_fn` and `basin_index_fn` should match.
If the model resolution is coarser than the source data resolution,
the flow direction is upscaled using the `upscale_method`, by default the
Iterative Hydrography Upscaling (IHU) method.
The default `hydrography_fn` is "merit_hydro"
(`MERIT hydro <http://hydro.iis.u-tokyo.ac.jp/~yamadai/MERIT_Hydro/index.html>`_
at 3 arcsec resolution).
Alternative sources include "merit_hydro_1k" at 30 arcsec resolution.
Users can also supply their own elevation and flow direction data
in any CRS and not only EPSG:4326. The ArcGIS D8 convention is supported
(see also the `PyFlwDir documentation
<https://deltares.github.io/pyflwdir/latest/_examples/flwdir.html>`_).
Note that in order to define the region using points or a bounding box,
the coordinates of the points / bounding box
should be in the same CRS as the hydrography data.
The wflow model will then also be in the same CRS as the
hydrography data in order to avoid assumptions and reprojection errors.
If the user wishes to use a different CRS,
we recommend first reprojecting the hydrography data separately,
before calling hydromt build.
You can find examples on how to reproject or prepare hydrography data in the
`prepare flow directions example notebook
<https://deltares.github.io/hydromt_wflow/latest/_examples/prepare_ldd.html>`_.
Adds model layers:
* **local_drain_direction** map: flow direction in LDD format [-]
* **subcatchment** map: basin ID map [-]
* **meta_upstream_area** map: upstream area [km2]
* **meta_streamorder** map: Strahler stream order [-]
* **land_elevation** map: average elevation [m+REF]
* **meta_subgrid_elevation** map: subgrid outlet elevation [m+REF]
* **land_slope** map: average land surface slope [m/m]
* **basins** geom: basins boundary vector
* **region** geom: region boundary vector
Parameters
----------
region : dict
Dictionary describing region of interest.
See :py:meth:`hydromt.workflows.basin_mask.parse_region()` for all options
hydrography_fn : str, xarray.Dataset
Name of RasterDataset source for basemap parameters.
* Required variables: 'flwdir' [LLD or D8 or NEXTXY], 'elevtn' [m+REF]
* Required variables if used with `basin_index_fn`: 'basins' [-]
* Required variables if used for snapping in `region`: 'uparea' [km2],
'strord' [-]
* Optional variables: 'lndslp' [m/m], 'mask' [bool]
basin_index_fn : str, geopandas.GeoDataFrame, optional
Name of GeoDataFrame source for basin_index data linked to hydrography_fn.
* Required variables: 'basid' [-]
res : float, optional
Output model resolution
upscale_method : {'ihu', 'eam', 'dmm'}, optional
Upscaling method for flow direction data, by default 'ihu'.
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
See Also
--------
hydromt.workflows.parse_region
hydromt.workflows.get_basin_geometry
workflows.hydrography
workflows.topography
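Examples
--------
A minimal sketch, assuming ``mod`` is a WflowModel instance in write mode and
"merit_hydro" is available in your data catalog (names and coordinates are
illustrative)::
>>> mod.setup_basemaps(
...     region={"subbasin": [12.2, 45.8], "uparea": 30},
...     hydrography_fn="merit_hydro",
...     res=1 / 120.0,
... )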
"""
logger.info("Preparing base hydrography basemaps.")
# update self._MAPS and self._WFLOW_NAMES with user defined output names
self._update_naming(output_names)
geometries, xy, ds_org = workflows.parse_region(
data_catalog=self.data_catalog,
region=region,
hydrography_fn=hydrography_fn,
resolution=res,
basin_index_fn=basin_index_fn,
)
# setup hydrography maps and set staticmap attribute with renamed maps
ds_base, _ = workflows.hydrography(
ds=ds_org,
res=res,
xy=xy,
upscale_method=upscale_method,
)
# Rename and add to grid
# rename idx_out coords
if "idx_out" in ds_base:
ds_base = ds_base.rename({"idx_out": "meta_subgrid_outlet_idx"})
rmdict = {k: self._MAPS.get(k, k) for k in ds_base.data_vars}
self.set_grid(ds_base.rename(rmdict))
# Add basin geometries after grid is set to avoid warning
logger.info("Adding basin shape to staticgeoms.")
for name, _geom in geometries.items():
self.set_geoms(_geom, name=name)
# update config
# skip adding elevtn to config as it will only be used if floodplain 2d are on
rmdict = {k: v for k, v in rmdict.items() if k != "elevtn"}
self._update_config_variable_name(ds_base.rename(rmdict).data_vars, None)
# Call basins once to set it
self.basins
# setup topography maps
ds_topo = workflows.topography(
ds=ds_org,
ds_like=self.staticmaps.data,
method="average",
)
rmdict = {k: self._MAPS.get(k, k) for k in ds_topo.data_vars}
self.set_grid(ds_topo.rename(rmdict))
# update config
# skip adding elevtn to config as it will only be used if floodplain 2d are on
rmdict = {k: v for k, v in rmdict.items() if k != "elevtn"}
self._update_config_variable_name(ds_topo.rename(rmdict).data_vars)
# update toml for degree/meters if needed
if ds_base.raster.crs.is_projected:
self.set_config("model.cell_length_in_meter__flag", True)
@hydromt_step
def setup_rivers(
self,
hydrography_fn: str | xr.Dataset,
river_geom_fn: str | gpd.GeoDataFrame | None = None,
river_upa: float = 30,
rivdph_method: str = "powlaw",
slope_len: float = 2e3,
min_rivlen_ratio: float = 0.0,
min_rivdph: float = 1,
min_rivwth: float = 30,
smooth_len: float = 5e3,
rivman_mapping_fn: str
| Path
| pd.DataFrame = "roughness_river_mapping_default",
elevtn_map: str = "land_elevation",
river_routing: str = "kinematic-wave",
connectivity: int = 8,
strord_name: str = "meta_streamorder",
output_names: Dict = {
"river_location__mask": "river_mask",
"river__length": "river_length",
"river__width": "river_width",
"river_bank_water__depth": "river_depth",
"river__slope": "river_slope",
"river_water_flow__manning_n_parameter": "river_manning_n",
"river_bank_water__elevation": "river_bank_elevation",
},
):
"""
Set all river parameter maps.
The river mask is defined by all cells with a minimum upstream area threshold
``river_upa`` [km2].
The river length is defined as the distance from the subgrid outlet pixel to
the next upstream subgrid outlet pixel. The ``min_rivlen_ratio`` is the minimum
global river length to avg. cell resolution ratio and is used as a threshold in
window based smoothing of river length.
The river slope is derived from the subgrid elevation difference between pixels
at a half distance ``slope_len`` [m] up-
and downstream from the subgrid outlet pixel.
The river manning roughness coefficient is derived based on reclassification
of the streamorder map using a lookup table ``rivman_mapping_fn``.
The river width is derived from the nearest river segment in ``river_geom_fn``.
Data gaps are filled by the nearest valid upstream value and averaged along
the flow directions over a length ``smooth_len`` [m].
The river depth can be directly derived from ``river_geom_fn`` property or
calculated using the ``rivdph_method``, by default powlaw:
h = hc*Qbf**hp, which is based on qbankfull discharge from the nearest river
segment in ``river_geom_fn`` and takes optional arguments for the hc
(default = 0.27) and hp (default = 0.30) parameters. For other methods see
:py:meth:`hydromt.workflows.river_depth`.
If ``river_routing`` is set to "local-inertial", the bankfull elevation map
can be conditioned based on the average cell elevation ("land_elevation")
or subgrid outlet pixel elevation ("meta_subgrid_elevation").
The subgrid elevation might provide a better representation
of the river elevation profile, however in combination with
local-inertial land routing (see :py:meth:`setup_floodplains`)
the subgrid elevation will likely overestimate the floodplain storage capacity.
Note that the same input elevation map should be used for river bankfull
elevation and land elevation when using local-inertial land routing.
Adds model layers:
* **wflow_river** map: river mask [-]
* **river_length** map: river length [m]
* **river_width** map: river width [m]
* **river_depth** map: bankfull river depth [m]
* **river_slope** map: river slope [m/m]
* **river_manning_n** map: Manning coefficient for river cells [s.m^1/3]
* **rivers** geom: river vector based on wflow_river mask
* **river_bank_elevation** map: hydrologically conditioned elevation [m+REF]
Parameters
----------
hydrography_fn : str, Path, xarray.Dataset
Name of RasterDataset source for hydrography data.
Must be same as setup_basemaps for consistent results.
* Required variables: 'flwdir' [LLD or D8 or NEXTXY], 'uparea' [km2],
'elevtn'[m+REF]
* Optional variables: 'rivwth' [m], 'qbankfull' [m3/s]
river_geom_fn : str, Path, geopandas.GeoDataFrame, optional
Name of GeoDataFrame source for river data.
* Required variables: 'rivwth' [m], 'rivdph' [m] or 'qbankfull' [m3/s]
river_upa : float, optional
Minimum upstream area threshold for the river map [km2]. By default 30.0
slope_len : float, optional
Length over which the river slope is calculated [m]. By default 2e3
min_rivlen_ratio : float, optional
Minimum river length to average cell resolution ratio, used as a threshold
in a moving window based smoothing of river length, by default 0.0.
The river length smoothing is skipped if `min_rivlen_ratio` = 0.
For details about the river length smoothing,
see :py:meth:`pyflwdir.FlwdirRaster.smooth_rivlen`
rivdph_method : {'gvf', 'manning', 'powlaw'}
see :py:meth:`hydromt.workflows.river_depth` for details, by default \
"powlaw"
river_routing : {'kinematic-wave', 'local-inertial'}
Routing methodology to be used, by default "kinematic-wave".
smooth_len : float, optional
Length [m] over which to smooth the output river width and depth,
by default 5e3
min_rivdph : float, optional
Minimum river depth [m], by default 1.0
min_rivwth : float, optional
Minimum river width [m], by default 30.0
elevtn_map : str, optional
Name of the elevation map in the current WflowModel.staticmaps.
By default "land_elevation"
connectivity : int, optional
Connectivity used for the hydrological conditioning of the river bank
elevation (4 or 8), by default 8.
strord_name : str, optional
Name of the stream order map in staticmaps, by default "meta_streamorder".
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
See Also
--------
workflows.river_bathymetry
hydromt.workflows.river_depth
pyflwdir.FlwdirRaster.river_depth
setup_floodplains
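Examples
--------
A minimal sketch, assuming ``mod`` is a WflowModel instance on which
setup_basemaps has been run; the data source names are illustrative::
>>> mod.setup_rivers(
...     hydrography_fn="merit_hydro",
...     river_geom_fn="rivers_lin2019_v1",
...     river_upa=30,
...     river_routing="kinematic-wave",
... )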
"""
logger.info("Preparing river maps.")
# update self._MAPS and self._WFLOW_NAMES with user defined output names
self._update_naming(output_names)
# check for streamorder
if self._MAPS["strord"] not in self.staticmaps.data:
if strord_name not in self.staticmaps.data:
raise ValueError(
f"Streamorder map {strord_name} not found in grid. "
"Please run setup_basemaps or update the strord_name argument."
)
else:
self._MAPS["strord"] = strord_name
# Check that river_upa threshold is smaller than the maximum uparea in the grid
if river_upa > float(self.staticmaps.data[self._MAPS["uparea"]].max()):
raise ValueError(
f"river_upa threshold {river_upa} should be smaller than the maximum \
uparea in the grid {float(self.staticmaps.data[self._MAPS['uparea']].max())} in order \
to create river cells."
)
rivdph_methods = ["gvf", "manning", "powlaw"]
if rivdph_method not in rivdph_methods:
raise ValueError(f'"{rivdph_method}" unknown. Select from {rivdph_methods}')
routing_options = ["kinematic-wave", "local-inertial"]
if river_routing not in routing_options:
raise ValueError(
f'river_routing="{river_routing}" unknown. \
Select from {routing_options}.'
)
# read data
ds_hydro = self.data_catalog.get_rasterdataset(
hydrography_fn, geom=self.region, buffer=10
)
ds_hydro.coords["mask"] = ds_hydro.raster.geometry_mask(self.region)
# get rivmsk, rivlen, rivslp
# read model maps and revert wflow to hydromt map names
inv_rename = {v: k for k, v in self._MAPS.items() if v in self.staticmaps.data}
ds_riv = workflows.river(
ds=ds_hydro,
ds_model=self.staticmaps.data.rename(inv_rename),
river_upa=river_upa,
slope_len=slope_len,
channel_dir="up",
min_rivlen_ratio=min_rivlen_ratio,
)[0]
ds_riv["rivmsk"] = ds_riv["rivmsk"].assign_attrs(
river_upa=river_upa, slope_len=slope_len, min_rivlen_ratio=min_rivlen_ratio
)
dvars = ["rivmsk", "rivlen", "rivslp"]
rmdict = {k: self._MAPS.get(k, k) for k in dvars}
self.set_grid(ds_riv[dvars].rename(rmdict))
# update config
for dvar in dvars:
if dvar == "rivmsk":
self._update_config_variable_name(self._MAPS[dvar], data_type=None)
else:
self._update_config_variable_name(self._MAPS[dvar])
# TODO make separate workflows.river_manning method
# Make river_manning_n map from csv file with mapping
# between streamorder and river_manning_n value
strord = self.staticmaps.data[self._MAPS["strord"]].copy()
df = self.data_catalog.get_dataframe(rivman_mapping_fn)
# max streamorder value above which values get the same river_manning_n value
max_str = df.index[-2]
nodata = df.index[-1]
# if streamorder value larger than max_str, assign last value
strord = strord.where(strord <= max_str, max_str)
# handle missing value (last row of csv is mapping of missing values)
strord = strord.where(strord != strord.raster.nodata, nodata)
strord.raster.set_nodata(nodata)
ds_nriver = workflows.landuse(
da=strord,
ds_like=self.staticmaps.data,
df=df,
)
rmdict = {k: self._MAPS.get(k, k) for k in ds_nriver.data_vars}
self.set_grid(ds_nriver.rename(rmdict))
# update config
self._update_config_variable_name(ds_nriver.rename(rmdict).data_vars)
# get rivdph, rivwth
# while we still have setup_riverwidth one can skip river_bathymetry here
# TODO make river_geom_fn required after removing setup_riverwidth
if river_geom_fn is not None:
gdf_riv = self.data_catalog.get_geodataframe(
river_geom_fn, geom=self.region
)
# reread model data to get river maps
inv_rename = {
v: k for k, v in self._MAPS.items() if v in self.staticmaps.data
}
ds_riv1 = workflows.river_bathymetry(
ds_model=self.staticmaps.data.rename(inv_rename),
gdf_riv=gdf_riv,
method=rivdph_method,
smooth_len=smooth_len,
min_rivdph=min_rivdph,
min_rivwth=min_rivwth,
)
rmdict = {k: self._MAPS.get(k, k) for k in ds_riv1.data_vars}
self.set_grid(ds_riv1.rename(rmdict))
# update config
self._update_config_variable_name(ds_riv1.rename(rmdict).data_vars)
logger.info("Adding rivers vector to geoms.")
if "rivers" in self.geoms.data:
self.geoms.pop("rivers") # remove old rivers if in geoms
self.rivers # add new rivers to geoms
# Add hydrologically conditioned elevation map for the river, if required
logger.info(f'Update wflow config model.river_routing="{river_routing}"')
self.set_config("model.river_routing", river_routing)
if river_routing == "local-inertial":
postfix = {
"land_elevation": "_avg",
"meta_subgrid_elevation": "_subgrid",
}.get(elevtn_map, "")
name = f"river_bank_elevation{postfix}"
# Check if users wanted a specific name for the hydrodem
hydrodem_var = self._WFLOW_NAMES.get(self._MAPS["hydrodem"])
if hydrodem_var in output_names:
name = output_names[hydrodem_var]
self._update_naming({hydrodem_var: name})
ds_out = flw.dem_adjust(
da_flwdir=self.staticmaps.data[self._MAPS["flwdir"]],
da_elevtn=self.staticmaps.data[elevtn_map],
da_rivmsk=self.staticmaps.data[self._MAPS["rivmsk"]],
flwdir=self.flwdir,
connectivity=connectivity,
river_d8=True,
).rename(name)
self.set_grid(ds_out)
# update the config input section with the new map name
self._update_config_variable_name(name)
@hydromt_step
def setup_floodplains(
self,
hydrography_fn: str | xr.Dataset,
floodplain_type: str,
### Options for 1D floodplains
river_upa: float | None = None,
flood_depths: List = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0],
### Options for 2D floodplains
elevtn_map: str = "land_elevation",
connectivity: int = 4,
output_names: Dict = {
"floodplain_water__sum_of_volume-per-depth": "floodplain_volume",
"river_bank_elevation": "river_bank_elevation_avg_D4",
},
):
"""
Add floodplain information to the model schematisation.
The user can define what type of floodplains are required (1D or 2D),
through the ``floodplain_type`` argument.
If ``floodplain_type`` is set to "1d", a floodplain profile is derived for every
river cell. It adds a map with floodplain volume per flood depth,
which is used in the wflow 1D floodplain schematisation.
Note, it is important to use the same river uparea value as used in the
:py:meth:`setup_rivers` method.
If ``floodplain_type`` is set to "2d", this component adds
a hydrologically conditioned elevation (river_bank_elevation) map for
land routing (local-inertial). For this option, land cells need to be
conditioned to D4 flow directions, otherwise pits may remain in the land cells.
The conditioned elevation can be based on the average cell elevation
("land_elevation") or subgrid outlet pixel elevation ("meta_subgrid_elevation").
Note that the subgrid elevation will likely overestimate
the floodplain storage capacity.
Additionally, note that the same input elevation map should be used for river
bankfull elevation and land elevation when using local-inertial land routing.
Requires :py:meth:`setup_rivers` to be executed beforehand
(with ``river_routing`` set to "local-inertial").
Adds model layers:
* **floodplain_volume** map: map with floodplain volumes, has flood depth as \
third dimension [m3] (for 1D floodplains)
* **river_bank_elevation** map: hydrologically conditioned elevation [m+REF]
(for 2D floodplains)
Parameters
----------
floodplain_type: {"1d", "2d"}
Option defining the type of floodplains, see below what arguments
are related to the different floodplain types
hydrography_fn : str, Path, xarray.Dataset
Name of RasterDataset source for hydrography data. Must be same as
setup_basemaps for consistent results.
* Required variables: ['flwdir', 'uparea', 'elevtn']
river_upa : float, optional
(1D floodplains) minimum upstream area threshold for drain in the HAND.
Optional value, as it is inferred from the grid metadata,
to be consistent with setup_rivers.
flood_depths : tuple of float, optional
(1D floodplains) flood depths at which a volume is derived.
By default [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0]
elevtn_map: {"land_elevation", "meta_subgrid_elevation"}
(2D floodplains) Name of staticmap to hydrologically condition.
By default "land_elevation"
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
See Also
--------
workflows.river_floodplain_volume
hydromt.flw.dem_adjust
pyflwdir.FlwdirRaster.dem_adjust
setup_rivers
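Examples
--------
A minimal sketch for 1D floodplains, assuming ``mod`` is a WflowModel instance
on which setup_rivers was run with river_routing="local-inertial" (the data
source name is illustrative)::
>>> mod.setup_floodplains(
...     hydrography_fn="merit_hydro",
...     floodplain_type="1d",
...     flood_depths=[0.5, 1.0, 1.5, 2.0],
... )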
"""
if self.get_config("model.river_routing") != "local-inertial":
raise ValueError(
"Floodplains (1d or 2d) are currently only supported with \
local inertial river routing"
)
# update self._MAPS and self._WFLOW_NAMES with user defined output names
var = "floodplain_water__sum_of_volume-per-depth"
if var in output_names:
self._update_naming({var: output_names[var]})
r_list = ["1d", "2d"]
if floodplain_type not in r_list:
raise ValueError(
f'river_routing="{floodplain_type}" unknown. Select from {r_list}.'
)
# Adjust settings based on floodplain_type selection
if floodplain_type == "1d":
floodplain_1d = True
land_routing = "kinematic-wave"
if not hasattr(pyflwdir.FlwdirRaster, "ucat_volume"):
logger.warning("This method requires pyflwdir >= 0.5.6")
return
logger.info("Preparing 1D river floodplain_volume map.")
# read data
ds_hydro = self.data_catalog.get_rasterdataset(
hydrography_fn, geom=self.region, buffer=10
)
ds_hydro.coords["mask"] = ds_hydro.raster.geometry_mask(self.region)
# try to get river uparea from grid, throw error if not specified
# or when found but different from specified value
new_river_upa = self.staticmaps.data[self._MAPS["rivmsk"]].attrs.get(
"river_upa", river_upa
)
if new_river_upa is None:
raise ValueError(
"No value for `river_upa` specified, and the value cannot \
be inferred from the grid attributes"
)
elif new_river_upa != river_upa and river_upa is not None:
raise ValueError(
f"Value specified for river_upa ({river_upa}) is different from \
the value found in the grid ({new_river_upa})"
)
logger.debug(f"Using river_upa value value of: {new_river_upa}")
# get river floodplain volume
inv_rename = {
v: k for k, v in self._MAPS.items() if v in self.staticmaps.data
}
da_fldpln = workflows.river_floodplain_volume(
ds=ds_hydro,
ds_model=self.staticmaps.data.rename(inv_rename),
river_upa=new_river_upa,
flood_depths=flood_depths,
)
# check if the layer already exists, since overwriting with different
# flood_depth values is not working properly if this is the case
if self._MAPS["floodplain_volume"] in self.staticmaps.data:
logger.warning(
"Layer `floodplain_volume` already in grid, removing layer \
and `flood_depth` dimension to ensure correctly \
setting new flood_depth dimensions"
)
self._grid = self._grid.drop_dims("flood_depth")
da_fldpln.name = self._MAPS["floodplain_volume"]
self.set_grid(da_fldpln)
self._update_config_variable_name(da_fldpln.name)
elif floodplain_type == "2d":
floodplain_1d = False
land_routing = "local-inertial"
if elevtn_map not in self.staticmaps.data:
raise ValueError(f'"{elevtn_map}" not found in grid')
postfix = {
"land_elevation": "_avg",
"meta_subgrid_elevation": "_subgrid",
}.get(elevtn_map, "")
name = f"river_bank_elevation{postfix}_D{connectivity}"
# Check if users wanted a specific name for the river_bank_elevation
hydrodem_var = self._WFLOW_NAMES.get(self._MAPS["hydrodem"])
lndelv_var = self._WFLOW_NAMES.get(self._MAPS["elevtn"])
# river_bank_elevation is used for two wflow variables
if hydrodem_var in output_names:
name = output_names[hydrodem_var]
self._update_naming(
{
hydrodem_var: name,
lndelv_var: elevtn_map,
}
)
logger.info(f"Preparing {name} map for land routing.")
ds_out = flw.dem_adjust(
da_flwdir=self.staticmaps.data[self._MAPS["flwdir"]],
da_elevtn=self.staticmaps.data[elevtn_map],
da_rivmsk=self.staticmaps.data[self._MAPS["rivmsk"]],
flwdir=self.flwdir,
connectivity=connectivity,
river_d8=True,
).rename(name)
self.set_grid(ds_out)
# Update the bankfull elevation map
self.set_config("input.static.river_bank_water__elevation", name)
# river_bank_elevation is also used as the ground elevation for land routing
self.set_config(
"input.static.land_surface_water_flow__ground_elevation", elevtn_map
)
# Update config
logger.info(
f'Update wflow config model.floodplain_1d__flag="{floodplain_1d}"',
)
self.set_config("model.floodplain_1d__flag", floodplain_1d)
logger.info(f'Update wflow config model.land_routing="{land_routing}"')
self.set_config("model.land_routing", land_routing)
if floodplain_type == "1d":
# Add states
self.set_config(
"state.variables.floodplain_water__instantaneous_volume_flow_rate",
"floodplain_instantaneous_q",
)
self.set_config(
"state.variables.floodplain_water__instantaneous_depth",
"floodplain_instantaneous_h",
)
self.set_config(
"state.variables.land_surface_water__instantaneous_volume_flow_rate",
"land_instantaneous_q",
)
# Remove local-inertial land states
self.config.remove(
"state.variables.land_surface_water__x_component_of_instantaneous_volume_flow_rate",
errors="ignore",
)
self.config.remove(
"state.variables.land_surface_water__y_component_of_instantaneous_volume_flow_rate",
errors="ignore",
)
# Remove from output.netcdf_grid section
self.config.remove(
"output.netcdf_grid.variables.land_surface_water__x_component_of_instantaneous_volume_flow_rate",
errors="ignore",
)
self.config.remove(
"output.netcdf_grid.variables.land_surface_water__y_component_of_instantaneous_volume_flow_rate",
errors="ignore",
)
else:
# Add local-inertial land routing states
self.set_config(
"state.variables.land_surface_water__x_component_of_instantaneous_volume_flow_rate",
"land_instantaneous_qx",
)
self.set_config(
"state.variables.land_surface_water__y_component_of_instantaneous_volume_flow_rate",
"land_instantaneous_qy",
)
# Remove kinematic-wave and 1d floodplain states
self.config.remove(
"state.variables.land_surface_water__instantaneous_volume_flow_rate",
errors="ignore",
)
self.config.remove(
"state.variables.floodplain_water__instantaneous_volume_flow_rate",
errors="ignore",
)
self.config.remove(
"state.variables.floodplain_water__instantaneous_depth",
errors="ignore",
)
# Remove from output.netcdf_grid section
self.config.remove(
"output.netcdf_grid.variables.land_surface_water__instantaneous_volume_flow_rate",
errors="ignore",
)
@hydromt_step
def setup_riverwidth(
self,
predictor: str = "discharge",
fill: bool = False,
fit: bool = False,
min_wth: float = 1.0,
precip_fn: str | xr.DataArray = "chelsa",
climate_fn: str | xr.DataArray = "koppen_geiger",
output_name: str = "river_width",
**kwargs,
):
"""
Set the river width parameter based on power-law relationship with a predictor.
By default the riverwidth is estimated based on discharge as ``predictor``
and used to set the riverwidth globally based on pre-defined power-law
parameters per climate class. With ``fit`` set to True,
the power-law relationships parameters are set on-the-fly.
With ``fill`` set to True, the estimated river widths are only used
to fill gaps in the observed data. Alternative ``predictor`` values
are precip (accumulated precipitation) and uparea (upstream area).
For these predictors, ``fit`` defaults to True.
By default the predictor is based on discharge which is estimated through
multiple linear regression with precipitation and upstream area
per climate zone.
Adds model layers:
* **river_width** map: river width [m]
Parameters
----------
predictor : {"discharge", "precip", "uparea"}
Predictor used in the power-law equation: width = a * predictor ^ b.
Discharge is based on multiple linear regression per climate zone.
Precip is based on 10x the daily average
accumulated precipitation [m3/s].
Uparea is based on the upstream area grid [km2].
Other variables, e.g. bankfull discharge, can also be provided if present
in the grid.
fill : bool, optional
If True, use the estimate only to fill gaps, outliers and lake/reservoir
areas in the observed width data (if present);
if False (default), set all river widths based on the predictor
(automatic choice if no observations are found).
fit : bool, optional
If True, the power-law parameters are fitted on the fly.
By default True for all but the "discharge" predictor.
A-priori derived parameters will be overwritten if True.
a, b : float, optional kwarg
Manual power-law parameters
min_wth : float, optional
Minimum river width, by default 1.0
precip_fn : str, xarray.DataArray
Source of long term precipitation grid if the predictor
is set to 'discharge' or 'precip'. By default "chelsa".
climate_fn: str, xarray.DataArray
Source of long-term climate grid if the predictor is set to 'discharge'.
By default "koppen_geiger".
output_name : str
The name of the output river__width map.
"""
logger.warning(
'The "setup_riverwidth" method has been deprecated \
and will soon be removed. '
'You can now use the "setup_rivers" method for all river parameters.'
)
if self._MAPS["rivmsk"] not in self.staticmaps.data:
raise ValueError(
"The setup_riverwidth method requires the setup_rivers method to be run first."
)
# update self._MAPS and self._WFLOW_NAMES with user defined output names
wflow_var = "river__width"
self._update_naming({wflow_var: output_name})
# derive river width
data = {}
if predictor in ["discharge", "precip"]:
da_precip = self.data_catalog.get_rasterdataset(
precip_fn, geom=self.region, buffer=2
)
da_precip.name = precip_fn
data["da_precip"] = da_precip
if predictor == "discharge":
da_climate = self.data_catalog.get_rasterdataset(
climate_fn, geom=self.region, buffer=2
)
da_climate.name = climate_fn
data["da_climate"] = da_climate
inv_rename = {v: k for k, v in self._MAPS.items() if v in self.staticmaps.data}
da_rivwth = workflows.river_width(
ds_like=self.staticmaps.data.rename(inv_rename),
flwdir=self.flwdir,
data=data,
fill=fill,
fill_outliers=kwargs.pop("fill_outliers", fill),
min_wth=min_wth,
mask_names=["lake_area_id", "reservoir_area_id"],
predictor=predictor,
a=kwargs.get("a", None),
b=kwargs.get("b", None),
fit=fit,
**kwargs,
)
self.set_grid(da_rivwth, name=output_name)
self._update_config_variable_name(output_name)
@hydromt_step
def setup_lulcmaps(
self,
lulc_fn: str | xr.DataArray,
lulc_mapping_fn: str | Path | pd.DataFrame | None = None,
lulc_vars: Dict = {
"landuse": None,
"vegetation_kext": "vegetation_canopy__light-extinction_coefficient",
"land_manning_n": "land_surface_water_flow__manning_n_parameter",
"soil_compacted_fraction": "soil~compacted__area_fraction",
"vegetation_root_depth": "vegetation_root__depth",
"vegetation_leaf_storage": "vegetation__specific-leaf_storage",
"vegetation_wood_storage": "vegetation_wood_water__storage_capacity",
"land_water_fraction": "land~water-covered__area_fraction",
"vegetation_crop_factor": "vegetation__crop_factor",
"vegetation_feddes_alpha_h1": "vegetation_root__feddes_critial_pressure_head_h~1_reduction_coefficient", # noqa: E501
"vegetation_feddes_h1": "vegetation_root__feddes_critial_pressure_head_h~1",
"vegetation_feddes_h2": "vegetation_root__feddes_critial_pressure_head_h~2",
"vegetation_feddes_h3_high": "vegetation_root__feddes_critial_pressure_head_h~3~high", # noqa: E501
"vegetation_feddes_h3_low": "vegetation_root__feddes_critial_pressure_head_h~3~low", # noqa: E501
"vegetation_feddes_h4": "vegetation_root__feddes_critial_pressure_head_h~4",
},
output_names_suffix: str | None = None,
):
"""
Derive several wflow maps based on landuse-landcover (LULC) data.
The mapping table `lulc_mapping_fn` converts the landuse classes to model
parameter values based on literature. The data is remapped at its original
resolution and then resampled to the model resolution using the average value,
unless noted differently.
Currently, if `lulc_fn` is set to "vito", "globcover", "esa_worldcover",
"corine" or "glmnco", default lookup tables are available and will be used if
`lulc_mapping_fn` is not provided.
Adds model layers:
* **landuse** map:
Landuse class [-]
* **vegetation_kext** map:
Extinction coefficient in the canopy gap fraction equation [-]
* **vegetation_leaf_storage** map:
Specific leaf storage [mm]
* **vegetation_wood_storage** map:
Fraction of wood in the vegetation/plant [-]
* **vegetation_root_depth** map:
Length of vegetation roots [mm]
* **soil_compacted_fraction** map:
The fraction of compacted or urban area per grid cell [-]
* **land_water_fraction** map:
The fraction of open water per grid cell [-]
* **land_manning_n** map:
Manning Roughness [-]
* **vegetation_crop_factor** map:
Crop coefficient [-]
* **vegetation_feddes_alpha_h1** map:
Root water uptake reduction at soil water pressure head
h1 (0 or 1) [-]
* **vegetation_feddes_h1** map:
Soil water pressure head h1 at which root water
uptake is reduced (Feddes) [cm]
* **vegetation_feddes_h2** map:
Soil water pressure head h2 at which root water
uptake is reduced (Feddes) [cm]
* **vegetation_feddes_h3_high** map:
Soil water pressure head h3 (high) at which root water uptake is
reduced (Feddes) [cm]
* **vegetation_feddes_h3_low** map:
Soil water pressure head h3 (low) at which root water uptake is
reduced (Feddes) [cm]
* **vegetation_feddes_h4** map:
Soil water pressure head h4 at which root water
uptake is reduced (Feddes) [cm]
Parameters
----------
lulc_fn : str, xarray.DataArray
Name of RasterDataset source in data_sources.yml file.
lulc_mapping_fn : str, Path, pd.DataFrame
Path to a mapping csv file from landuse in source name to parameter values
in lulc_vars. If lulc_fn is one of {"globcover", "vito", "corine",
"esa_worldcover", "glmnco"}, a default mapping is used and this argument
becomes optional.
lulc_vars : Dict
Dictionary of landuse parameters to prepare. The names are the
columns of the mapping file and the values are the corresponding
Wflow.jl variables, if any.
output_names_suffix : str, optional
Suffix to be added to the output names to avoid having to rename all the
columns of the mapping tables. For example if the suffix is "vito", all
variables in lulc_vars will be renamed to "landuse_vito", "Kext_vito", etc.
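Examples
--------
A minimal sketch, assuming ``mod`` is a WflowModel instance and "vito" is
available in your data catalog with a default mapping table::
>>> mod.setup_lulcmaps(
...     lulc_fn="vito",
...     output_names_suffix="vito",
... )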
"""
output_names = {
v: f"{k}_{output_names_suffix}" if output_names_suffix else k
for k, v in lulc_vars.items()
}
self._update_naming(output_names)
# As landuse is not a wflow variable, we update the name manually
rmdict = {"landuse": "meta_landuse"} if "landuse" in lulc_vars else {}
if output_names_suffix is not None:
self._MAPS["landuse"] = f"meta_landuse_{output_names_suffix}"
# rename dict for the staticmaps (hydromt names are not used in that case)
rmdict = {k: f"{k}_{output_names_suffix}" for k in lulc_vars.keys()}
if "landuse" in lulc_vars:
rmdict["landuse"] = f"meta_landuse_{output_names_suffix}"
logger.info("Preparing LULC parameter maps.")
if lulc_mapping_fn is None:
lulc_mapping_fn = f"{lulc_fn}_mapping_default"
# read landuse map to DataArray
da = self.data_catalog.get_rasterdataset(
lulc_fn, geom=self.region, buffer=2, variables=["landuse"]
)
df_map = self.data_catalog.get_dataframe(
lulc_mapping_fn,
driver_kwargs={"index_col": 0}, # only used if fn_map is a file path
)
# process landuse
ds_lulc_maps = workflows.landuse(
da=da,
ds_like=self.staticmaps.data,
df=df_map,
params=list(lulc_vars.keys()),
)
self.set_grid(ds_lulc_maps.rename(rmdict))
# Add entries to the config
self._update_config_variable_name(ds_lulc_maps.rename(rmdict).data_vars)
@hydromt_step
def setup_lulcmaps_from_vector(
self,
lulc_fn: str | gpd.GeoDataFrame,
lulc_mapping_fn: str | Path | pd.DataFrame | None = None,
lulc_vars: Dict = {
"landuse": None,
"vegetation_kext": "vegetation_canopy__light-extinction_coefficient",
"land_manning_n": "land_surface_water_flow__manning_n_parameter",
"soil_compacted_fraction": "soil~compacted__area_fraction",
"vegetation_root_depth": "vegetation_root__depth",
"vegetation_leaf_storage": "vegetation__specific-leaf_storage",
"vegetation_wood_storage": "vegetation_wood_water__storage_capacity",
"land_water_fraction": "land~water-covered__area_fraction",
"vegetation_crop_factor": "vegetation__crop_factor",
"vegetation_feddes_alpha_h1": "vegetation_root__feddes_critial_pressure_head_h~1_reduction_coefficient", # noqa: E501
"vegetation_feddes_h1": "vegetation_root__feddes_critial_pressure_head_h~1",
"vegetation_feddes_h2": "vegetation_root__feddes_critial_pressure_head_h~2",
"vegetation_feddes_h3_high": "vegetation_root__feddes_critial_pressure_head_h~3~high", # noqa: E501
"vegetation_feddes_h3_low": "vegetation_root__feddes_critial_pressure_head_h~3~low", # noqa: E501
"vegetation_feddes_h4": "vegetation_root__feddes_critial_pressure_head_h~4",
},
lulc_res: float | int | None = None,
all_touched: bool = False,
buffer: int = 1000,
save_raster_lulc: bool = False,
output_names_suffix: str | None = None,
):
"""
Derive several wflow maps based on vector landuse-landcover (LULC) data.
The vector LULC data is first rasterized at the model resolution
or at a higher resolution specified in ``lulc_res`` (recommended).
The mapping table `lulc_mapping_fn` converts the landuse classes to model
parameter values based on literature. The data is remapped at its original
resolution and then resampled to the model resolution using the average value,
unless noted differently.
Adds model layers:
* **landuse** map:
Landuse class [-]
* **vegetation_kext** map:
Extinction coefficient in the canopy gap fraction equation [-]
* **vegetation_leaf_storage** map:
Specific leaf storage [mm]
* **vegetation_wood_storage** map:
Fraction of wood in the vegetation/plant [-]
* **vegetation_root_depth** map:
Length of vegetation roots [mm]
* **soil_compacted_fraction** map:
The fraction of compacted or urban area per grid cell [-]
* **land_water_fraction** map:
The fraction of open water per grid cell [-]
* **land_manning_n** map: Manning Roughness [-]
* **vegetation_crop_factor** map:
Crop coefficient [-]
* **vegetation_feddes_alpha_h1** map:
Root water uptake reduction at soil water pressure head
h1 (0 or 1) [-]
* **vegetation_feddes_h1** map:
Soil water pressure head h1 at which root water
uptake is reduced (Feddes) [cm]
* **vegetation_feddes_h2** map:
Soil water pressure head h2 at which root water
uptake is reduced (Feddes) [cm]
* **vegetation_feddes_h3_high** map:
Soil water pressure head h3 (high) at which root water uptake is
reduced (Feddes) [cm]
* **vegetation_feddes_h3_low** map:
Soil water pressure head h3 (low) at which root water uptake is
reduced (Feddes) [cm]
* **vegetation_feddes_h4** map:
Soil water pressure head h4 at which root water
uptake is reduced (Feddes) [cm]
Parameters
----------
lulc_fn : str, gpd.GeoDataFrame
GeoDataFrame or name in data catalog / path to (vector) landuse map.
* Required columns: 'landuse' [-]
lulc_mapping_fn : str, Path, pd.DataFrame
Path to a mapping csv file from landuse in source name to parameter values
in lulc_vars. If lulc_fn is one of {"globcover", "vito", "corine",
"esa_worldcover", "glmnco"}, a default mapping is used and this argument
becomes optional.
lulc_vars : Dict
Dictionary of landuse parameters to prepare. The names are the
columns of the mapping file and the values are the corresponding
Wflow.jl variables.
lulc_res : float, int, optional
Resolution of the intermediate rasterized landuse map. The unit (meter or
degree) depends on the CRS of lulc_fn (projected or not). By default None,
which uses the model resolution.
all_touched : bool, optional
If True, all pixels touched by the vector will be burned in the raster,
by default False.
buffer : int, optional
Buffer around the bounding box of the vector data to ensure that all
landuse classes are included in the rasterized map, by default 1000.
save_raster_lulc : bool, optional
If True, the (high) resolution rasterized landuse map will be saved to
maps/landuse_raster.tif, by default False.
output_names_suffix : str, optional
Suffix to be added to the output names to avoid having to rename all the
columns of the mapping tables. For example if the suffix is "vito", all
variables in lulc_vars will be renamed to "landuse_vito", "Kext_vito", etc.
See Also
--------
workflows.landuse_from_vector
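Examples
--------
A minimal sketch, assuming ``mod`` is a WflowModel instance; the vector source
and mapping table names are illustrative::
>>> mod.setup_lulcmaps_from_vector(
...     lulc_fn="my_landuse_vector",
...     lulc_mapping_fn="my_landuse_mapping.csv",
...     lulc_res=0.0025,
... )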
"""
output_names = {
v: f"{k}_{output_names_suffix}" if output_names_suffix else k
for k, v in lulc_vars.items()
}
self._update_naming(output_names)
# As landuse is not a wflow variable, we update the name manually
rmdict = {"landuse": "meta_landuse"} if "landuse" in lulc_vars else {}
if output_names_suffix is not None:
self._MAPS["landuse"] = f"meta_landuse_{output_names_suffix}"
# rename dict for the staticmaps (hydromt names are not used in that case)
rmdict = {k: f"{k}_{output_names_suffix}" for k in lulc_vars.keys()}
if "landuse" in lulc_vars:
rmdict["landuse"] = f"meta_landuse_{output_names_suffix}"
logger.info("Preparing LULC parameter maps.")
# Read mapping table
if lulc_mapping_fn is None:
lulc_mapping_fn = f"{lulc_fn}_mapping_default"
df_map = self.data_catalog.get_dataframe(
lulc_mapping_fn,
driver_kwargs={"index_col": 0}, # only used if fn_map is a file path
)
# read landuse map
gdf = self.data_catalog.get_geodataframe(
lulc_fn,
bbox=self.staticmaps.data.raster.bounds,
buffer=buffer,
variables=["landuse"],
)
if save_raster_lulc:
lulc_out = join(self.root.path, "maps", "landuse_raster.tif")
else:
lulc_out = None
# process landuse
ds_lulc_maps = workflows.landuse_from_vector(
gdf=gdf,
ds_like=self.staticmaps.data,
df=df_map,
params=list(lulc_vars.keys()),
lulc_res=lulc_res,
all_touched=all_touched,
buffer=buffer,
lulc_out=lulc_out,
)
self.set_grid(ds_lulc_maps.rename(rmdict))
# update config variable names
self._update_config_variable_name(ds_lulc_maps.rename(rmdict).data_vars)
@hydromt_step
def setup_laimaps(
self,
lai_fn: str | xr.DataArray,
lulc_fn: str | xr.DataArray | None = None,
lulc_sampling_method: str = "any",
lulc_zero_classes: List[int] = [],
buffer: int = 2,
output_name: str = "vegetation_leaf_area_index",
):
"""
Set leaf area index (LAI) climatology maps per month [1,2,3,...,12].
The values are resampled to the model resolution using the average value.
Currently, only LAI data that is already cyclic (one value per month) is
supported.
If `lulc_fn` is provided, mapping tables from landuse classes to LAI values
will be derived from the LULC data. These tables can then be re-used later if
you would like to add new LAI maps derived from this mapping table and new
landuse scenarios. We advise to use a larger `buffer` to ensure that LAI values
can be assigned for all landuse classes and based on a large enough sample of
the LULC data.
Adds model layers:
* **vegetation_leaf_area_index** map: Leaf Area Index climatology [-]
Resampled from source data using average. Assuming that missing values
correspond to bare soil, these are set to zero before resampling.
Parameters
----------
lai_fn : str, xarray.DataArray
Name of RasterDataset source for LAI parameters, see data/data_sources.yml.
* Required variables: 'LAI' [-]
* Required dimensions: 'time' = [1,2,3,...,12] (months)
lulc_fn : str, xarray.DataArray, optional
Name of RasterDataset source for landuse-landcover data.
If provided, the LAI values are mapped to landuse classes and will be saved
to a csv file.
lulc_sampling_method : str, optional
Resampling method for the LULC data to the LAI resolution. Two methods are
supported:
* 'any' (default): if any cell of the desired landuse class is present in
the resampling window (even just one), it will be used to derive LAI
values. This method is less exact but will provide LAI values for all
landuse classes for the high resolution landuse map.
* 'mode': the most frequent value in the resampling window is
used. This method is less precise as for cells with a lot of different
landuse classes, the most frequent value might still be only a small
fraction of the cell. More landuse classes should however be covered and
it can always be used with the landuse map of the wflow model instead of
the original high resolution one.
* 'q3': only cells with the most frequent value (mode) and that cover 75%
(q3) of the resampling window will be used. This method is more exact but
for small basins, you may have less or no samples to derive LAI values
for some classes.
lulc_zero_classes : list, optional
List of landuse classes that should have zero for leaf area index values
for example waterbodies, open ocean etc. For very high resolution landuse
maps, urban surfaces and bare areas can be included here as well.
By default empty.
buffer : int, optional
Buffer in pixels around the region to read the data, by default 2.
output_name : str
Name of the output vegetation__leaf-area_index map.
By default "vegetation_leaf_area_index".
"""
# retrieve data for region
logger.info("Preparing LAI maps.")
wflow_var = self._WFLOW_NAMES[self._MAPS["LAI"]]
self._update_naming({wflow_var: output_name})
da = self.data_catalog.get_rasterdataset(
lai_fn, geom=self.region, buffer=buffer
)
if lulc_fn is not None:
logger.info("Preparing LULC-LAI mapping table.")
da_lulc = self.data_catalog.get_rasterdataset(
lulc_fn, geom=self.region, buffer=buffer
)
# derive mapping
df_lai_mapping = workflows.create_lulc_lai_mapping_table(
da_lulc=da_lulc,
da_lai=da.copy(),
sampling_method=lulc_sampling_method,
lulc_zero_classes=lulc_zero_classes,
)
# Save to csv
if isinstance(lulc_fn, str) and not isfile(lulc_fn):
df_fn = f"lai_per_lulc_{lulc_fn}.csv"
else:
df_fn = "lai_per_lulc.csv"
df_lai_mapping.to_csv(join(self.root.path, df_fn))
# Resample LAI data to wflow model resolution
da_lai = workflows.lai(
da=da,
ds_like=self.staticmaps.data,
)
# Rename the first dimension to time
rmdict = {da_lai.dims[0]: "time"}
self.set_grid(da_lai.rename(rmdict), name=self._MAPS["LAI"])
self._update_config_variable_name(self._MAPS["LAI"], data_type="cyclic")
@hydromt_step
def setup_laimaps_from_lulc_mapping(
self,
lulc_fn: str | xr.DataArray,
lai_mapping_fn: str | pd.DataFrame,
output_name: str = "vegetation_leaf_area_index",
):
"""
Derive cyclic LAI maps from a LULC data source and a LULC-LAI mapping table.
Adds model layers:
* **vegetation_leaf_area_index** map: Leaf Area Index climatology [-]
Resampled from source data using average. Assuming that missing values
correspond to bare soil, these are set to zero before resampling.
Parameters
----------
lulc_fn : str, xarray.DataArray
Name of RasterDataset source for landuse-landcover data.
lai_mapping_fn : str, pd.DataFrame
Path to a mapping csv file from landuse in source name to
LAI values. The csv file should contain rows with landuse classes
and LAI values for each month. The columns should be named as the
months (1,2,3,...,12).
This table can be created using the :py:meth:`setup_laimaps` method.
output_name : str
Name of the output vegetation__leaf-area_index map.
By default "vegetation_leaf_area_index".
"""
logger.info("Preparing LAI maps from LULC data using LULC-LAI mapping table.")
# update self._MAPS and self._WFLOW_NAMES with user defined output names
wflow_var = self._WFLOW_NAMES[self._MAPS["LAI"]]
self._update_naming({wflow_var: output_name})
# read landuse map to DataArray
da = self.data_catalog.get_rasterdataset(
lulc_fn, geom=self.region, buffer=2, variables=["landuse"]
)
df_lai_mapping = self.data_catalog.get_dataframe(
lai_mapping_fn,
driver_kwargs={"index_col": 0}, # only used if fn_map is a file path
)
# process landuse with LULC-LAI mapping table
da_lai = workflows.lai_from_lulc_mapping(
da=da,
ds_like=self.staticmaps.data,
df=df_lai_mapping,
)
# Add to grid
self.set_grid(da_lai, name=self._MAPS["LAI"])
# Add to config
self._update_config_variable_name(self._MAPS["LAI"], data_type="cyclic")
@hydromt_step
def setup_config_output_timeseries(
self,
mapname: str,
toml_output: str | None = "csv",
header: List[str] | None = ["river_q"],
param: List[str] | None = ["river_water__volume_flow_rate"],
reducer: List[str] | None = None,
):
"""Set the default gauge map based on basin outlets.
Adds model layers:
* **csv.column** config: csv timeseries to save based on mapname locations
* **netcdf.variable** config: netcdf timeseries to save based on mapname \
locations
Parameters
----------
mapname : str
Name of the gauge map (in staticmaps.nc) to use for scalar output.
toml_output : str, optional
One of ['csv', 'netcdf_scalar', None] to update [output.csv] or
[output.netcdf_scalar] section of wflow toml file or do nothing. By
default, 'csv'.
header : list, optional
Save specific model parameters in csv section. This option defines
the header of the csv file.
By default saves river_q (for river_water__volume_flow_rate).
param: list, optional
Save specific model parameters in csv section. This option defines
the wflow variable corresponding to the
names in header. By default saves river_water__volume_flow_rate
(for river_q).
reducer: list, optional
If map is an area rather than a point location, provides the reducer
for the parameters to save. By default None.
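Examples
--------
A minimal sketch, assuming ``mod`` is a WflowModel instance with an existing
"gauges" map in its staticmaps (map and header names are illustrative)::
>>> mod.setup_config_output_timeseries(
...     mapname="gauges",
...     toml_output="csv",
...     header=["river_q"],
...     param=["river_water__volume_flow_rate"],
... )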
"""
# Add a new output csv or netcdf_scalar section in the config
if toml_output in ["csv", "netcdf_scalar"]:
logger.info(f"Adding {param} to {toml_output} section of toml.")
# Add map to the input section of config
self.set_config(f"input.{mapname}", mapname)
# Settings and add csv or netcdf sections if not already in config
# csv
if toml_output == "csv":
header_name = "header"
var_name = "column"
current_config = self.get_config("output.csv")
if current_config is None or len(current_config) == 0:
self.set_config("output.csv.path", "output.csv")
# netcdf
if toml_output == "netcdf_scalar":
header_name = "name"
var_name = "variable"
current_config = self.get_config("output.netcdf_scalar")
if current_config is None or len(current_config) == 0:
self.set_config("output.netcdf_scalar.path", "output_scalar.nc")
# initialise column / variable section
if self.get_config(f"output.{toml_output}.{var_name}") is None:
self.set_config(f"output.{toml_output}.{var_name}", [])
# Add new output column/variable to config
for o in range(len(param)):
gauge_toml_dict = {
header_name: header[o],
"map": mapname,
"parameter": param[o],
}
if reducer is not None:
gauge_toml_dict["reducer"] = reducer[o]
# If the gauge column/variable already exists skip writing twice
variables = self.get_config(f"output.{toml_output}.{var_name}")
if gauge_toml_dict not in variables:
variables.append(gauge_toml_dict)
self.set_config(f"output.{toml_output}.{var_name}", variables)
else:
logger.info(
f"toml_output set to {toml_output}, \
skipping adding gauge specific outputs to the toml."
)
@hydromt_step
def setup_outlets(
self,
river_only: bool = True,
toml_output: str = "csv",
gauge_toml_header: List[str] = ["river_q"],
gauge_toml_param: List[str] = ["river_water__volume_flow_rate"],
):
"""Set the default gauge map based on basin outlets.
If the subcatchment map is available, the catchment outlet IDs will match
the subcatchment IDs. If not, IDs from 1 to the number of outlets are used.
Can also add csv/netcdf_scalar output settings in the TOML.
Adds model layers:
* **outlets** map: IDs map from catchment outlets [-]
* **outlets** geom: polygon of catchment outlets
Parameters
----------
river_only : bool, optional
Only derive outlet locations if they are located on a river instead of
locations for all catchments, by default True.
toml_output : str, optional
One of ['csv', 'netcdf_scalar', None] to update [output.csv] or
[output.netcdf_scalar] section of wflow toml file or do nothing. By
default, 'csv'.
gauge_toml_header : list, optional
Save specific model parameters in csv section. This option defines
the header of the csv file.
By default saves river_q (for river_water__volume_flow_rate).
gauge_toml_param: list, optional
Save specific model parameters in csv section. This option defines
the wflow variable corresponding to the names in gauge_toml_header.
By default saves river_water__volume_flow_rate (for river_q).
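Examples
--------
A minimal usage sketch, given an initialised WflowModel instance ``model``:
>>> model.setup_outlets(river_only=True, toml_output="csv")
This adds the "outlets" map and geometry and a "river_q" column to the
[output.csv] section of the TOML.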
"""
# Read existing geoms; important to get the right basin when updating
# TODO: handle this in the geoms component read/set methods
self.geoms
logger.info("Gauges locations set based on river outlets.")
idxs_out = self.flwdir.idxs_pit
# Only keep river outlets for gauges
if river_only:
idxs_out = idxs_out[
(self.staticmaps.data[self._MAPS["rivmsk"]] > 0).values.flat[idxs_out]
]
# Use the subcatchment ids
if self._MAPS["basins"] in self.staticmaps.data:
ids = self.staticmaps.data[self._MAPS["basins"]].values.flat[idxs_out]
else:
ids = None
da_out, idxs_out, ids_out = flw.gauge_map(
self.staticmaps.data,
idxs=idxs_out,
ids=ids,
flwdir=self.flwdir,
)
self.set_grid(da_out, name="outlets")
points = gpd.points_from_xy(*self.staticmaps.data.raster.idx_to_xy(idxs_out))
gdf = gpd.GeoDataFrame(
index=ids_out.astype(np.int32), geometry=points, crs=self.crs
)
gdf["fid"] = ids_out.astype(np.int32)
self.set_geoms(gdf, name="outlets")
logger.info("Gauges map based on catchment river outlets added.")
self.setup_config_output_timeseries(
mapname="outlets",
toml_output=toml_output,
header=gauge_toml_header,
param=gauge_toml_param,
)
@hydromt_step
def setup_gauges(
self,
gauges_fn: str | Path | gpd.GeoDataFrame,
index_col: str | None = None,
snap_to_river: bool = True,
mask: np.ndarray | None = None,
snap_uparea: bool = False,
max_dist: float = 10e3,
wdw: int = 3,
rel_error: float = 0.05,
abs_error: float = 50.0,
fillna: bool = False,
derive_subcatch: bool = False,
basename: str | None = None,
toml_output: str = "csv",
gauge_toml_header: List[str] = ["river_q", "precip"],
gauge_toml_param: List[str] = [
"river_water__volume_flow_rate",
"atmosphere_water__precipitation_volume_flux",
],
**kwargs,
):
"""Set a gauge map based on ``gauges_fn`` data.
Supported gauge datasets include data catalog entries, a direct GeoDataFrame
or "<path_to_source>" for user-supplied csv or geometry files
with gauge locations. If a csv file is provided, an "x" or "lon" and
a "y" or "lat" column are required and the first column will be used as
IDs in the map.
There are four available methods to prepare the gauge map:
* no snapping: ``mask=None``, ``snap_to_river=False``, ``snap_uparea=False``.
The gauge locations are used as is.
* snapping to mask: the gauge locations are snapped to a boolean mask map based
on the closest downstream cell within the mask:
either provide ``mask`` or set ``snap_to_river=True``
to snap to the river cells (default).
``max_dist`` can be used to set the maximum distance to snap to the mask.
* snapping based on upstream area matching: ``snap_uparea=True``.
The gauge locations are snapped to the closest matching upstream area value.
Requires gauges_fn to have an ``uparea`` [km2] column. The closest value will
be looked for in a cell window of size ``wdw`` and the absolute and relative
differences between the gauge and the closest value should be smaller than
``abs_error`` and ``rel_error``.
* snapping based on upstream area matching and mask: ``snap_uparea=True``,
``mask`` or ``snap_to_river=True``. The gauge locations are snapped to the
closest matching upstream area value within the mask.
If ``derive_subcatch`` is set to True, an additional subcatch map is derived
from the gauge locations.
Finally the output locations can be added to wflow TOML file sections
[output.csv] or [output.netcdf_scalar] using the ``toml_output`` option. The
``gauge_toml_header`` and ``gauge_toml_param`` options can be used to define
the header and corresponding wflow variable names in the TOML file.
Adds model layers:
* **gauges_source** map: gauge IDs map from source [-] (if gauges_fn)
* **subcatchment_source** map: subcatchment based on gauge locations [-] \
(if derive_subcatch)
* **gauges_source** geom: polygon of gauges from source
* **subcatchment_source** geom: polygon of subcatchment based on \
gauge locations [-] (if derive_subcatch)
Parameters
----------
gauges_fn : str, Path, geopandas.GeoDataFrame
Catalog source name, path to gauges file geometry file or
geopandas.GeoDataFrame.
* Required variables if snap_uparea is True: 'uparea' [km2]
index_col : str, optional
Column in gauges_fn to use for ID values, by default None
(use the default index column)
mask : np.ndarray of bool, optional
If provided, snaps to the mask, else snaps to the river (default).
snap_to_river : bool, optional
Snap point locations to the closest downstream river cell, by default True
snap_uparea: bool, optional
Snap gauges based on upstream area. Gauges_fn should have "uparea"
in its attributes.
max_dist : float, optional
Maximum distance [m] between original and snapped point location.
A warning is logged if exceeded. By default 10 000 m.
wdw: int, optional
Window size in number of cells around the gauge locations
to snap uparea to, only used if ``snap_uparea`` is True. By default 3.
rel_error: float, optional
Maximum relative error (default 0.05)
between the gauge location upstream area and the upstream area of
the best fit grid cell, only used if snap_uparea is True.
abs_error: float, optional
Maximum absolute error (default 50.0)
between the gauge location upstream area and the upstream area of
the best fit grid cell, only used if snap_uparea is True.
fillna: bool, optional
Fill missing values in the gauges uparea column with the values from wflow
upstream area (ie no snapping). By default False and the gauges with NaN
values are skipped.
derive_subcatch : bool, optional
Derive subcatch map for gauges, by default False
basename : str, optional
Map name in grid (gauges_basename)
if None use the gauges_fn basename.
toml_output : str, optional
One of ['csv', 'netcdf_scalar', None] to update [output.csv] or
[output.netcdf_scalar] section of wflow toml file or do nothing. By
default, 'csv'.
gauge_toml_header : list, optional
Save specific model parameters in csv section.
This option defines the header of the csv file.
By default saves river_q (for river_water__volume_flow_rate) and
precip (for "atmosphere_water__precipitation_volume_flux").
gauge_toml_param: list, optional
Save specific model parameters in csv section. This option defines
the wflow variable corresponding to the names in gauge_toml_header.
By default saves river_water__volume_flow_rate (for river_q) and
"atmosphere_water__precipitation_volume_flux" (for precip).
kwargs : dict, optional
Additional keyword arguments to pass to the get_data method ie
get_geodataframe or get_geodataset depending on the data_type of gauges_fn.
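Examples
--------
A usage sketch with upstream area snapping; "gauges_obs" is a hypothetical
data catalog entry with an 'uparea' [km2] column:
>>> model.setup_gauges(
...     gauges_fn="gauges_obs",
...     snap_uparea=True,
...     wdw=3,
...     rel_error=0.05,
...     abs_error=50.0,
...     derive_subcatch=True,
...     basename="obs",
... )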
"""
# Read data
if isinstance(gauges_fn, gpd.GeoDataFrame):
gdf_gauges = gauges_fn
if not np.all(np.isin(gdf_gauges.geometry.type, "Point")):
raise ValueError(f"{gauges_fn} contains other geometries than Point")
elif isfile(gauges_fn):
# hydromt#1243
# try to get epsg number directly, important when writing back data_catalog
if hasattr(self.crs, "to_epsg"):
code = self.crs.to_epsg()
else:
code = self.crs
kwargs.update(crs=code)
gdf_gauges = self.data_catalog.get_geodataframe(
gauges_fn,
geom=self.basins,
# assert_gtype="Point", hydromt#1243
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
elif self.data_catalog.contains_source(gauges_fn):
if self.data_catalog.get_source(gauges_fn).data_type == "GeoDataFrame":
gdf_gauges = self.data_catalog.get_geodataframe(
gauges_fn,
geom=self.basins,
# assert_gtype="Point", hydromt#1243
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
elif self.data_catalog.get_source(gauges_fn).data_type == "GeoDataset":
da = self.data_catalog.get_geodataset(
gauges_fn,
geom=self.basins,
# assert_gtype="Point", hydromt#1243
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
gdf_gauges = da.vector.to_gdf()
# Check for point geometry
if not np.all(np.isin(gdf_gauges.geometry.type, "Point")):
raise ValueError(
f"{gauges_fn} contains other geometries than Point"
)
else:
raise ValueError(
f"{gauges_fn} data source not found or incorrect data_type "
f"({self.data_catalog.get_source(gauges_fn).data_type} instead of "
"GeoDataFrame or GeoDataset)."
)
# Create basename
if basename is None:
basename = os.path.basename(gauges_fn).split(".")[0].replace("_", "-")
# Check if there is data found
if gdf_gauges is None:
logger.info("Skipping method, as no data has been found")
return
# Create the gauges map
logger.info(
f"{gdf_gauges.index.size} {basename} gauge locations found within domain"
)
# read existing geoms; important to get the right basin when updating
self.geoms
# Reproject to model crs
gdf_gauges = gdf_gauges.to_crs(self.crs).copy()
# Get coords, index and ID
xs, ys = np.vectorize(lambda p: (p.xy[0][0], p.xy[1][0]))(
gdf_gauges["geometry"]
)
idxs = self.staticmaps.data.raster.xy_to_idx(xs, ys)
if index_col is not None and index_col in gdf_gauges.columns:
gdf_gauges = gdf_gauges.set_index(index_col)
if np.any(gdf_gauges.index == 0):
logger.warning("Gauge ID 0 is not allowed, setting to 1")
gdf_gauges.index = gdf_gauges.index.values + 1
ids = gdf_gauges.index.values
# if snap_to_river use river map as the mask
if snap_to_river and mask is None:
mask = self._MAPS["rivmsk"]
if mask is not None:
mask = self.staticmaps.data[mask].values
if snap_uparea and "uparea" in gdf_gauges.columns:
# Derive gauge map based on upstream area snapping
da, idxs, ids = workflows.gauge_map_uparea(
self.staticmaps.data,
gdf_gauges,
uparea_name=self._MAPS["uparea"],
mask=mask,
wdw=wdw,
rel_error=rel_error,
abs_error=abs_error,
fillna=fillna,
)
else:
# Derive gauge map
da, idxs, ids = flw.gauge_map(
self.staticmaps.data,
idxs=idxs,
ids=ids,
stream=mask,
flwdir=self.flwdir,
max_dist=max_dist,
)
# Filter gauges that could not be snapped to rivers
if snap_to_river:
ids_old = ids.copy()
da = da.where(
self.staticmaps.data[self._MAPS["rivmsk"]] != 0, da.raster.nodata
)
ids_new = np.unique(da.values[da.values > 0])
idxs = idxs[np.isin(ids_old, ids_new)]
ids = da.values.flat[idxs]
# Check if there are gauges left
if ids.size == 0:
logger.warning(
"No gauges found within domain after snapping, skipping method."
)
return
# Add to grid
mapname = f"gauges_{basename}"
self.set_grid(da, name=mapname)
# geoms
points = gpd.points_from_xy(*self.staticmaps.data.raster.idx_to_xy(idxs))
# if csv contains additional columns, these are also written in the geoms
gdf_snapped = gpd.GeoDataFrame(
index=ids.astype(np.int32), geometry=points, crs=self.crs
)
# Set the index name of gdf snapped based on original gdf
if gdf_gauges.index.name is not None:
gdf_snapped.index.name = gdf_gauges.index.name
else:
gdf_snapped.index.name = "fid"
gdf_gauges.index.name = "fid"
# Add gdf attributes to gdf_snapped (filter on snapped index before merging)
df_attrs = pd.DataFrame(gdf_gauges.drop(columns="geometry"))
df_attrs = df_attrs[np.isin(df_attrs.index, gdf_snapped.index)]
gdf_snapped = gdf_snapped.merge(df_attrs, how="inner", on=gdf_gauges.index.name)
# Add gdf_snapped to geoms
self.set_geoms(gdf_snapped, name=mapname)
# Add output timeseries for gauges in the toml
self.setup_config_output_timeseries(
mapname=mapname,
toml_output=toml_output,
header=gauge_toml_header,
param=gauge_toml_param,
)
# add subcatch
if derive_subcatch:
da_basins = flw.basin_map(
self.staticmaps.data, self.flwdir, idxs=idxs, ids=ids
)[0]
mapname = self._MAPS["basins"] + "_" + basename
self.set_grid(da_basins, name=mapname)
gdf_basins = self.staticmaps.data[mapname].raster.vectorize()
self.set_geoms(gdf_basins, name=mapname)
@hydromt_step
def setup_areamap(
self,
area_fn: str | gpd.GeoDataFrame,
col2raster: str,
nodata: int | float = -1,
output_names: Dict = {},
):
"""Set area map from vector data to save wflow outputs for specific area.
Adds model layer:
* **col2raster** map: output area data map
Parameters
----------
area_fn : str, geopandas.GeoDataFrame
Name of GeoDataFrame data corresponding to wflow output area.
col2raster : str
Name of the column from `area_fn` to rasterize.
nodata : int/float, optional
Nodata value to use when rasterizing. Should match the dtype of `col2raster`
. By default -1.
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file. If another name is provided than col2raster, this name
will be used.
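Examples
--------
A minimal sketch; "water_boards" is a hypothetical data catalog entry with an
"office_id" column to rasterize:
>>> model.setup_areamap(
...     area_fn="water_boards",
...     col2raster="office_id",
...     nodata=-1,
... )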
"""
logger.info(f"Preparing '{col2raster}' map from '{area_fn}'.")
self._update_naming(output_names)
gdf_org = self.data_catalog.get_geodataframe(
area_fn, geom=self.basins, dst_crs=self.crs
)
if gdf_org.empty:
logger.warning(
f"No shapes of {area_fn} found within region, skipping areamap."
)
return
else:
da_area = self.staticmaps.data.raster.rasterize(
gdf=gdf_org,
col_name=col2raster,
nodata=nodata,
all_touched=True,
)
if any(output_names):
if len(output_names) > 1:
raise ValueError(
"Only one output name is allowed for areamap, \
please provide a dictionary with one key."
)
col2raster_name = list(output_names.values())[0]
self._update_config_variable_name(col2raster_name)
else:
col2raster_name = col2raster
self.set_grid(da_area.rename(col2raster_name))
@hydromt_step
def setup_reservoirs_no_control(
self,
reservoirs_fn: str | Path | gpd.GeoDataFrame,
rating_curve_fns: List[str | Path | pd.DataFrame] | None = None,
overwrite_existing: bool = False,
duplicate_id: str = "error",
min_area: float = 10.0,
output_names: Dict = {
"reservoir_area__count": "reservoir_area_id",
"reservoir_location__count": "reservoir_outlet_id",
"reservoir_surface__area": "reservoir_area",
"reservoir_water_surface__initial_elevation": "reservoir_initial_depth",
"reservoir_water_flow_threshold-level__elevation": "reservoir_outflow_threshold", # noqa: E501
"reservoir_water__rating_curve_coefficient": "reservoir_b",
"reservoir_water__rating_curve_exponent": "reservoir_e",
"reservoir_water__rating_curve_type_count": "reservoir_rating_curve",
"reservoir_water__storage_curve_type_count": "reservoir_storage_curve",
"reservoir~lower_location__count": "reservoir_lower_id",
},
geom_name: str = "meta_reservoirs_no_control",
**kwargs,
):
"""Generate maps of reservoir areas, outlets and parameters.
This function adds (uncontrolled) reservoirs such as natural lakes or weirs to
the model. It prepares rating and storage curves parameters for the reservoirs
modelled with the following rating curve types (see
`Wflow reservoir concepts <https://deltares.github.io/Wflow.jl/stable/model_docs/routing/reservoirs.html>`__ ):
* 1 for Q = f(H) from reservoir data and interpolation
* 2 for Q = b(H - H0)^e (general power law)
* 3 for Q = b(H - H0)^2 (Modified Puls Approach)
Created reservoirs can be added to already existing ones in the model with
``overwrite_existing=False`` (default), or can replace them with
``overwrite_existing=True``.
Reservoir data is generated from features with ``min_area`` [km2] (default 10
km2) from a database with reservoir geometry, IDs and metadata. Parameters can
be directly provided in the GeoDataFrame or derived using common properties such
as average depth, area and discharge.
If rating curve data is available for storage and discharge they can be prepared
via ``rating_curve_fns`` (see below for syntax and requirements).
Else the parameters 'reservoir_b' and 'reservoir_e' will be used for discharge,
and a rectangular profile will be used to compute storage. This corresponds to
the following storage curve types in Wflow:
* 1 for S = A * H
* 2 for S = f(H) from reservoir data and interpolation
Adds model layers:
* **reservoir_area_id** map: reservoir IDs [-]
* **reservoir_outlet_id** map: reservoir IDs at outlet locations [-]
* **reservoir_area** map: reservoir area [m2]
* **reservoir_initial_depth** map: reservoir average water level [m]
* **reservoir_outflow_threshold** map: reservoir outflow threshold water level [m]
* **meta_reservoir_mean_outflow** map: reservoir average discharge [m3/s]
* **reservoir_b** map: reservoir rating curve coefficient [-]
* **reservoir_e** map: reservoir rating curve exponent [-]
* **reservoir_rating_curve** map: option to compute rating curve [-]
* **reservoir_storage_curve** map: option to compute storage curve [-]
* **reservoir_lower_id** map: optional, lower linked reservoir locations [-]
* **meta_reservoirs_no_control** geom: polygon with reservoirs (e.g. lakes or
weirs) and wflow parameters.
* **reservoirs** geom: polygon with all reservoirs as in the model
Parameters
----------
reservoirs_fn : str, Path, gpd.GeoDataFrame
Name of GeoDataFrame source for uncontrolled reservoir parameters.
* Required variables for direct use: \
'waterbody_id' [-], 'Area_avg' [m2], 'Depth_avg' [m], 'Dis_avg' [m3/s], 'reservoir_b' [-], \
'reservoir_e' [-], 'reservoir_rating_curve' [-], 'reservoir_storage_curve' [-], \
'reservoir_outflow_threshold' [m], 'reservoir_lower_id' [-]
* Required variables for parameter estimation: \
'waterbody_id' [-], 'Area_avg' [m2], 'Vol_avg' [m3], 'Depth_avg' [m], 'Dis_avg'[m3/s]
rating_curve_fns: str, Path, pandas.DataFrame, List, optional
Data catalog entry/entries, path(s) or pandas.DataFrame containing rating
curve values for reservoirs. If None then will be derived from properties of
`reservoirs_fn`.
Assumes one file per reservoir (with all variables) and that the reservoir
ID is either in the filename or data catalog entry name (eg using
placeholder). The ID should be placed at the end separated by an underscore
(eg 'rating_curve_12.csv' or 'rating_curve_12')
* Required variables for storage curve: 'elevtn' [m+REF], 'volume' [m3]
* Required variables for rating curve: 'elevtn' [m+REF], 'discharge' [m3/s]
overwrite_existing : bool, optional
If False (default), update existing reservoirs in the model with the new
reservoirs_fn data.
duplicate_id: str, optional {"error", "skip"}
Action to take if duplicate reservoir IDs are found when merging with
existing reservoirs. Options are "error" to raise an error (default); "skip"
to skip adding new reservoirs.
min_area : float, optional
Minimum reservoir area threshold [km2], by default 10.0 km2.
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
geom_name : str, optional
Name of the reservoir geometry in the staticgeoms folder, by default
'meta_reservoirs_no_control' for meta_reservoirs_no_control.geojson.
kwargs: optional
Keyword arguments passed to the method
hydromt.DataCatalog.get_rasterdataset()
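Examples
--------
A usage sketch; "reservoirs_db" is a hypothetical data catalog entry and the
rating curve files follow the '<name>_<waterbody_id>.csv' convention
described above:
>>> model.setup_reservoirs_no_control(
...     reservoirs_fn="reservoirs_db",
...     rating_curve_fns=["rating_curve_12.csv", "rating_curve_34.csv"],
...     min_area=10.0,
... )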
""" # noqa: E501
# retrieve data for basin
logger.info("Preparing reservoir maps.")
kwargs.setdefault("predicate", "contains")
gdf_org = self.data_catalog.get_geodataframe(
reservoirs_fn,
geom=self.basins_highres,
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
if gdf_org is None:
logger.info("Skipping method, as no data has been found")
return
# Derive reservoir area and outlet maps
ds_reservoirs, gdf_org = workflows.reservoir_id_maps(
gdf=gdf_org,
ds_like=self.staticmaps.data,
min_area=min_area,
uparea_name=self._MAPS["uparea"],
)
if ds_reservoirs is None:
# No reservoirs of sufficient size found
return
self._update_naming(output_names)
# If rating_curve_fn prepare rating curve dict
rating_dict = dict()
if rating_curve_fns is not None:
rating_curve_fns = np.atleast_1d(rating_curve_fns)
# Find ids in rating_curve_fns
fns_ids = []
for fn in rating_curve_fns:
try:
fns_ids.append(int(fn.split("_")[-1].split(".")[0]))
except Exception:
logger.warning(
"Could not parse integer reservoir index from "
f"rating curve fn {fn}. Skipping."
)
# assume reservoir index will be in the path
# Assume one rating curve per reservoir index
for _id in gdf_org["waterbody_id"].values:
_id = int(_id)
# Check if _id is one of the ids parsed from rating_curve_fns
if _id in fns_ids:
# Update path based on current waterbody_id
i = fns_ids.index(_id)
rating_fn = rating_curve_fns[i]
# Read data
if isfile(rating_fn) or rating_fn in self.data_catalog.sources:
logger.info(
f"Preparing reservoir rating curve data from {rating_fn}"
)
df_rate = self.data_catalog.get_dataframe(rating_fn)
# Add to dict
rating_dict[_id] = df_rate
else:
logger.warning(
f"Rating curve file not found for reservoir with id {_id}. "
"Using default storage/outflow function parameters."
)
else:
logger.info(
"No rating curve data provided. "
"Using default storage/outflow function parameters."
)
# add reservoir parameters
ds_reservoirs, gdf_reservoirs, rating_curves = (
workflows.reservoirs.reservoir_parameters(
ds_reservoirs,
gdf_org,
rating_dict,
)
)
# merge with existing reservoirs
if (
not overwrite_existing
and self._MAPS["reservoir_area"] in self.staticmaps.data
):
inv_rename = {
v: k
for k, v in self._MAPS.items()
if v in self.staticmaps.data.data_vars
}
ds_reservoirs = workflows.reservoirs.merge_reservoirs(
ds_reservoirs,
self.staticmaps.data.rename(inv_rename),
duplicate_id=duplicate_id,
)
# Check if ds_res is None ie duplicate IDs
if ds_reservoirs is None:
logger.warning(
"Duplicate reservoir IDs found. Skipping adding new reservoirs."
)
return
else:
# remove all reservoir layers from the grid as some control parameters
# like demand will not be in ds_reservoirs and won't be overwritten
reservoir_maps = [
self._MAPS.get(k, k) for k in workflows.reservoirs.RESERVOIR_LAYERS
]
self.staticmaps.drop_vars(reservoir_maps, errors="ignore")
# add to grid
rmdict = {k: self._MAPS.get(k, k) for k in ds_reservoirs.data_vars}
self.set_grid(ds_reservoirs.rename(rmdict))
# write reservoirs with attr tables to static geoms.
self.set_geoms(gdf_reservoirs, name=geom_name)
# Prepare a combined geoms of all reservoirs
gdf_res_all = workflows.reservoirs.create_reservoirs_geoms(
ds_reservoirs.rename(rmdict),
)
self.set_geoms(gdf_res_all, name="reservoirs")
# add the tables
for k, v in rating_curves.items():
self.set_tables(v, name=k)
# Reservoir settings in the toml to update
self.set_config("model.reservoir__flag", True)
self.set_config(
"state.variables.reservoir_water_surface__instantaneous_elevation",
"reservoir_instantaneous_water_level",
)
for dvar in ds_reservoirs.data_vars:
if dvar in [
"reservoir_area_id",
"reservoir_outlet_id",
"reservoir_lower_id",
]:
self._update_config_variable_name(self._MAPS[dvar], data_type=None)
elif dvar in self._WFLOW_NAMES:
self._update_config_variable_name(self._MAPS[dvar])
@hydromt_step
def setup_reservoirs_simple_control(
self,
reservoirs_fn: str | gpd.GeoDataFrame,
timeseries_fn: str | None = None,
overwrite_existing: bool = False,
duplicate_id: str = "error",
min_area: float = 1.0,
output_names: Dict = {
"reservoir_area__count": "reservoir_area_id",
"reservoir_location__count": "reservoir_outlet_id",
"reservoir_surface__area": "reservoir_area",
"reservoir_water_surface__initial_elevation": "reservoir_initial_depth",
"reservoir_water__rating_curve_type_count": "reservoir_rating_curve",
"reservoir_water__storage_curve_type_count": "reservoir_storage_curve",
"reservoir_water__max_volume": "reservoir_max_volume",
"reservoir_water~min-target__volume_fraction": "reservoir_target_min_fraction", # noqa: E501
"reservoir_water~full-target__volume_fraction": "reservoir_target_full_fraction", # noqa: E501
"reservoir_water_demand~required~downstream__volume_flow_rate": "reservoir_demand", # noqa: E501
"reservoir_water_release-below-spillway__max_volume_flow_rate": "reservoir_max_release", # noqa: E501
},
geom_name: str = "meta_reservoirs_simple_control",
**kwargs,
):
"""Generate maps of controlled reservoir areas, outlets and parameters.
Also generates parameters with average reservoir area, demand,
min and max target storage capacities and discharge capacity values.
This function adds reservoirs with simple control operations to the model. It
prepares rating and storage curves parameters for the reservoirs modelled with
the following rating curve types (see
`Wflow reservoir concepts <https://deltares.github.io/Wflow.jl/stable/model_docs/routing/reservoirs.html>`__
):
* 4 simple reservoir operational parameters
Created reservoirs can be added to already existing ones in the model with
``overwrite_existing=False`` (default), or can replace them with
``overwrite_existing=True``.
The data is generated from features with ``min_area`` [km2] (default is 1 km2)
from a database with reservoir geometry, IDs and metadata. Parameters can
be directly provided in the GeoDataFrame or derived using common properties such
as average depth, area and discharge.
Data requirements for direct use (i.e. wflow parameters are data already present
in reservoirs_fn) are reservoir ID 'waterbody_id', area 'reservoir_area' [m2],
initial depth 'reservoir_initial_depth' [m], rating curve type
'reservoir_rating_curve' [-], storage curve type 'reservoir_storage_curve' [-],
maximum volume 'reservoir_max_volume' [m3], the targeted minimum and maximum
fraction of water volume in the reservoir 'reservoir_target_min_fraction' and
'reservoir_target_full_fraction' [-], the average water demand
'reservoir_demand' [m3/s] and the maximum release of the reservoir before
spilling 'reservoir_max_release' [m3/s].
In case the wflow parameters are not directly available they can be computed by
HydroMT based on time series of reservoir surface water area.
These time series can be retrieved from either the hydroengine or the gwwapi
package, based on the Hylak_id of the reservoir, found in the GRanD database.
The required variables for computation of the parameters with time series data
are reservoir ID 'waterbody_id', reservoir ID in the HydroLAKES database
'Hylak_id', average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m],
average discharge 'Dis_avg' [m3/s] and dam height 'Dam_height' [m].
To compute parameters without using time series data, the required variables in
reservoirs_fn are reservoir ID 'waterbody_id', average area 'Area_avg' [m2],
average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m], average discharge
'Dis_avg' [m3/s] and dam height 'Dam_height' [m]
and minimum / normal / maximum storage capacity of the dam 'Capacity_min',
'Capacity_norm', 'Capacity_max' [m3].
Adds model layers:
* **reservoir_area_id** map: reservoir IDs [-]
* **reservoir_outlet_id** map: reservoir IDs at outlet locations [-]
* **reservoir_area** map: reservoir area [m2]
* **reservoir_initial_depth** map: reservoir initial water level [m]
* **reservoir_rating_curve** map: option to compute rating curve [-]
* **reservoir_storage_curve** map: option to compute storage curve [-]
* **reservoir_max_volume** map: reservoir max volume [m3]
* **reservoir_target_min_fraction** map: reservoir target min frac [m3/m3]
* **reservoir_target_full_fraction** map: reservoir target full frac [m3/m3]
* **reservoir_demand** map: reservoir demand flow [m3/s]
* **reservoir_max_release** map: reservoir max release flow [m3/s]
* **meta_reservoirs_simple_control** geom: polygon with reservoirs and parameters
* **reservoirs** geom: polygon with all reservoirs as in the model
Parameters
----------
reservoirs_fn : str
Name of data source for reservoir parameters, see data/data_sources.yml.
* Required variables for direct use:
'waterbody_id' [-], 'reservoir_area' [m2], 'reservoir_max_volume' [m3],
'reservoir_initial_depth' [m], 'reservoir_rating_curve' [-],
'reservoir_storage_curve' [-], 'reservoir_target_min_fraction' [m3/m3],
'reservoir_target_full_fraction' [m3/m3], 'reservoir_demand' [m3/s],
'reservoir_max_release' [m3/s]
* Required variables for computation with timeseries_fn:
'waterbody_id' [-], 'Hylak_id' [-], 'Vol_avg' [m3], 'Depth_avg' [m],
'Dis_avg' [m3/s], 'Dam_height' [m]
* Required variables for computation without timeseries_fn:
'waterbody_id' [-], 'Area_avg' [m2], 'Vol_avg' [m3], 'Depth_avg' [m],
'Dis_avg' [m3/s], 'Capacity_max' [m3], 'Capacity_norm' [m3],
'Capacity_min' [m3], 'Dam_height' [m]
timeseries_fn : {'gww', 'hydroengine', None}, optional
Download and use time series of reservoir surface water area to calculate
and overwrite the reservoir volume/areas of the data source. Timeseries are
either downloaded from Global Water Watch 'gww' (using the gwwapi package)
or JRC 'hydroengine' (using the hydroengine package). By default None.
overwrite_existing : bool, optional
If False (default), update existing reservoirs in the model with the new
reservoirs_fn data.
duplicate_id: str, optional {"error", "skip"}
Action to take if duplicate reservoir IDs are found when merging with
existing reservoirs. Options are "error" to raise an error (default); "skip"
to skip adding new reservoirs.
min_area : float, optional
Minimum reservoir area threshold [km2], by default 1.0 km2.
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
geom_name : str, optional
Name of the reservoirs geometry in the staticgeoms folder, by default
"meta_reservoirs_simple_control" for meta_reservoirs_simple_control.geojson.
kwargs: optional
Keyword arguments passed to the method
hydromt.DataCatalog.get_rasterdataset()
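Examples
--------
A usage sketch; "reservoirs_db" is a hypothetical data catalog entry with the
required GRanD-style attributes listed above:
>>> model.setup_reservoirs_simple_control(
...     reservoirs_fn="reservoirs_db",
...     timeseries_fn="gww",
...     min_area=1.0,
... )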
""" # noqa: E501
# retrieve data for basin
logger.info("Preparing reservoir with simple control maps.")
kwargs.setdefault("predicate", "contains")
gdf_org = self.data_catalog.get_geodataframe(
reservoirs_fn,
geom=self.basins_highres,
handle_nodata=NoDataStrategy.IGNORE,
**kwargs,
)
# Skip method if no data is returned
if gdf_org is None:
logger.info("Skipping method, as no data has been found")
return
# Derive reservoir area and outlet maps
ds_res, gdf_org = workflows.reservoir_id_maps(
gdf=gdf_org,
ds_like=self.staticmaps.data,
min_area=min_area,
uparea_name=self._MAPS["uparea"],
)
if ds_res is None:
# No reservoir of sufficient size found
return
self._update_naming(output_names)
# add parameters
ds_res, gdf_res = workflows.reservoir_simple_control_parameters(
gdf=gdf_org,
ds_reservoirs=ds_res,
timeseries_fn=timeseries_fn,
output_folder=self.root.path,
)
# merge with existing reservoirs
if (
not overwrite_existing
and self._MAPS["reservoir_area"] in self.staticmaps.data
):
inv_rename = {
v: k
for k, v in self._MAPS.items()
if v in self.staticmaps.data.data_vars
}
ds_res = workflows.reservoirs.merge_reservoirs(
ds_res,
self.staticmaps.data.rename(inv_rename),
duplicate_id=duplicate_id,
)
# Check if ds_res is None ie duplicate IDs
if ds_res is None:
logger.warning(
"Duplicate reservoir IDs found. Skipping adding new reservoirs."
)
return
else:
# remove all reservoir layers from the grid as some parameters
# like b or e will not be in ds_res and won't be overwritten
reservoir_maps = [
self._MAPS.get(k, k) for k in workflows.reservoirs.RESERVOIR_LAYERS
]
self.staticmaps.drop_vars(reservoir_maps, errors="ignore")
# add to grid
rmdict = {k: self._MAPS.get(k, k) for k in ds_res.data_vars}
self.set_grid(ds_res.rename(rmdict))
# write reservoirs with param values to geoms
self.set_geoms(gdf_res, name=geom_name)
# Prepare a combined geoms of all reservoirs
gdf_res_all = workflows.reservoirs.create_reservoirs_geoms(
ds_res.rename(rmdict),
)
self.set_geoms(gdf_res_all, name="reservoirs")
# update toml
self.set_config("model.reservoir__flag", True)
self.set_config(
"state.variables.reservoir_water_surface__instantaneous_elevation",
"reservoir_instantaneous_water_level",
)
for dvar in ds_res.data_vars:
if dvar in ["reservoir_area_id", "reservoir_outlet_id"]:
self._update_config_variable_name(self._MAPS[dvar], data_type=None)
elif dvar in self._WFLOW_NAMES:
self._update_config_variable_name(self._MAPS[dvar], data_type="static")
@hydromt_step
def setup_soilmaps(
self,
soil_fn: str = "soilgrids",
ptf_ksatver: str = "brakensiek",
wflow_thicknesslayers: List[int] = [100, 300, 800],
output_names: Dict = {
"soil_water__saturated_volume_fraction": "soil_theta_s",
"soil_water__residual_volume_fraction": "soil_theta_r",
"soil_surface_water__vertical_saturated_hydraulic_conductivity": "soil_ksat_vertical", # noqa: E501
"soil__thickness": "soil_thickness",
"soil_water__vertical_saturated_hydraulic_conductivity_scale_parameter": "soil_f", # noqa: E501
"soil_layer_water__brooks-corey_exponent": "soil_brooks_corey_c",
},
):
"""
Derive several (layered) soil parameters.
Based on a database with physical soil properties using available point-scale
(pedo)transfer functions (PTFs) from literature with upscaling rules to
ensure flux matching across scales.
Currently, supported ``soil_fn`` is "soilgrids" and "soilgrids_2020".
``ptf_ksatver`` (PTF for the vertical hydraulic conductivity) options are
"brakensiek" and "cosby". "soilgrids" provides data at 7 specific depths,
while "soilgrids_2020" provides data averaged over 6 depth intervals.
This leads to small changes in the workflow:
(1) M parameter uses midpoint depths in soilgrids_2020 versus \
specific depths in soilgrids,
(2) weighted average of soil properties over soil thickness is done with \
the trapezoidal rule in soilgrids versus simple block weighted average in \
soilgrids_2020,
(3) the soil_brooks_corey_c parameter is computed as weighted average over \
wflow_sbm soil layers defined in ``wflow_thicknesslayers``.
The required data from soilgrids are soil bulk density 'bd_sl*' [g/cm3], \
clay content 'clyppt_sl*' [%], silt content 'sltppt_sl*' [%], organic carbon content \
'oc_sl*' [%], pH 'ph_sl*' [-], sand content 'sndppt_sl*' [%] and soil thickness \
'soilthickness' [cm].
A ``soil_mapping_fn`` can optionally be provided to derive parameters based
on soil texture classes. A default table *soil_mapping_default* is available
to derive the infiltration capacity of the soil.
The following maps are added to grid:
* **soil_theta_s** map:
average saturated soil water content [m3/m3]
* **soil_theta_r** map:
average residual water content [m3/m3]
* **soil_ksat_vertical** map:
vertical saturated hydraulic conductivity at soil surface [mm/day]
* **soil_thickness** map:
soil thickness [mm]
* **soil_f** map: scaling parameter controlling the decline of ksat_vertical \
[mm-1] (fitted with curve_fit (scipy.optimize)), bounds are checked
* **soil_f_** map:
scaling parameter controlling the decline of soil_ksat_vertical \
[mm-1] (fitted with numpy linalg regression), bounds are checked
* **soil_brooks_corey_c_n** map:
Brooks Corey coefficients [-] based on pore size distribution, \
a map for each of the wflow_sbm soil layers (n in total)
* **meta_{soil_fn}_ksat_vertical_[z]cm** map: vertical hydraulic conductivity
[mm/day] at soil depths [z] of ``soil_fn`` data
[0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0]
* **meta_soil_texture** map: soil texture based on USDA soil texture triangle \
(mapping: [1:Clay, 2:Silty Clay, 3:Silty Clay-Loam, 4:Sandy Clay, 5:Sandy Clay-Loam, \
6:Clay-Loam, 7:Silt, 8:Silt-Loam, 9:Loam, 10:Sand, 11: Loamy Sand, 12:Sandy Loam])
Parameters
----------
soil_fn : {'soilgrids', 'soilgrids_2020'}
Name of RasterDataset source for soil parameter maps, see
data/data_sources.yml.
Should contain info for the 7 soil depths of soilgrids
(or 6 depths intervals for soilgrids_2020).
* Required variables: \
'bd_sl*' [g/cm3], 'clyppt_sl*' [%], 'sltppt_sl*' [%], 'oc_sl*' [%], 'ph_sl*' [-], \
'sndppt_sl*' [%], 'soilthickness' [cm]
ptf_ksatver : {'brakensiek', 'cosby'}
Pedotransfer function (PTF) to use for calculation of ksat vertical
(vertical saturated hydraulic conductivity [mm/day]).
By default 'brakensiek'.
wflow_thicknesslayers : list of int, optional
Thickness of soil layers [mm] for wflow_sbm soil model.
By default [100, 300, 800] for layers at depths 100, 400, 1200 and >1200 mm.
Used only for Brooks Corey coefficients.
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
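Examples
--------
A minimal usage sketch using the soilgrids_2020 source with the default
layers:
>>> model.setup_soilmaps(
...     soil_fn="soilgrids_2020",
...     ptf_ksatver="brakensiek",
...     wflow_thicknesslayers=[100, 300, 800],
... )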
"""
logger.info("Preparing soil parameter maps.")
self._update_naming(output_names)
# TODO add variables list with required variable names
dsin = self.data_catalog.get_rasterdataset(soil_fn, geom=self.region, buffer=2)
dsout = workflows.soilgrids(
ds=dsin,
ds_like=self.staticmaps.data,
ptfKsatVer=ptf_ksatver,
soil_fn=soil_fn,
wflow_layers=wflow_thicknesslayers,
).reset_coords(drop=True)
rmdict = {k: self._MAPS.get(k, k) for k in dsout.data_vars}
self.set_grid(dsout.rename(rmdict))
# Update the toml file
self.set_config("model.soil_layer__thickness", wflow_thicknesslayers)
self._update_config_variable_name(dsout.rename(rmdict).data_vars)
@hydromt_step
def setup_ksathorfrac(
self,
ksat_fn: str | xr.DataArray,
variable: str | None = None,
resampling_method: str = "average",
output_name: str | None = None,
):
"""Set KsatHorFrac parameter values from a predetermined map.
This predetermined map contains (preferably) 'calibrated' values of \
the KsatHorFrac parameter. This map is either selected from the wflow \
Deltares data or created by a third party or individual.
Parameters
----------
ksat_fn : str, xr.DataArray
The identifier of the KsatHorFrac dataset in the data catalog.
variable : str, optional
The variable name for the subsurface_ksat_horizontal_ratio map to
use in ``ksat_fn`` in case ``ksat_fn`` contains several variables.
By default None.
resampling_method : str, optional
The resampling method used when up- or downscaling, by default "average".
output_name : str, optional
The name of the output map. If None (default), the name will be set
to the name of the ksat_fn DataArray.
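Examples
--------
A minimal usage sketch; "ksathorfrac_cal" is a hypothetical data catalog
entry:
>>> model.setup_ksathorfrac(
...     ksat_fn="ksathorfrac_cal",
...     resampling_method="average",
... )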
"""
logger.info("Preparing KsatHorFrac parameter map.")
wflow_var = "subsurface_water__horizontal-to-vertical_saturated_hydraulic_conductivity_ratio" # noqa: E501
dain = self.data_catalog.get_rasterdataset(
ksat_fn,
geom=self.region,
buffer=2,
variables=variable,
single_var_as_array=True,
)
# Ensure its a DataArray
if isinstance(dain, xr.Dataset):
raise ValueError(
"The ksat_fn data contains several variables. \
Select the variable to use for subsurface_ksat_horizontal_ratio \
using 'variable' argument."
)
# Create scaled subsurface_ksat_horizontal_ratio map
daout = workflows.ksat_horizontal_ratio(
dain,
ds_like=self.staticmaps.data,
resampling_method=resampling_method,
)
if output_name is not None:
daout.name = output_name
self._update_naming({wflow_var: daout.name})
# Set the grid
self.set_grid(daout)
self._update_config_variable_name(daout.name)
@hydromt_step
def setup_ksatver_vegetation(
self,
soil_fn: str = "soilgrids",
alfa: float = 4.5,
beta: float = 5,
output_name: str = "soil_ksat_vertical_vegetation",
):
"""Correct vertical saturated hydraulic conductivity with vegetation properties.
This allows accounting for biologically-promoted soil structure and \
heterogeneities in natural landscapes based on the work of \
Bonetti et al. (2021) https://www.nature.com/articles/s43247-021-00180-0.
This method requires setup_soilmaps and setup_lai to have been run first.
The following map is added to grid:
* **soil_ksat_vertical_vegetation** map: saturated hydraulic conductivity \
considering vegetation characteristics [mm/d]
Parameters
----------
soil_fn : {'soilgrids', 'soilgrids_2020'}
Name of RasterDataset source for soil parameter maps, see
data/data_sources.yml.
Should contain info for the sand percentage of the upper layer
* Required variable: 'sndppt_sl1' [%]
alfa : float, optional
Shape parameter. The default is 4.5 when using LAI.
beta : float, optional
Shape parameter. The default is 5 when using LAI.
output_name : str, optional
Name of the output map. By default 'soil_ksat_vertical_vegetation'.
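Examples
--------
A minimal usage sketch (requires setup_soilmaps and setup_lai to have been
run first):
>>> model.setup_ksatver_vegetation(soil_fn="soilgrids", alfa=4.5, beta=5)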
"""
logger.info("Modifying ksat_vertical based on vegetation characteristics")
wflow_var = self._WFLOW_NAMES[self._MAPS["ksat_vertical"]]
# open soil dataset to get sand percentage
sndppt = self.data_catalog.get_rasterdataset(
soil_fn, geom=self.region, buffer=2, variables=["sndppt_sl1"]
)
# in ksatver_vegetation, ksat_vertical should be provided in mm/d
inv_rename = {
v: k for k, v in self._MAPS.items() if v in self.staticmaps.data.data_vars
}
ksatver_vegetation = workflows.ksatver_vegetation(
ds_like=self.staticmaps.data.rename(inv_rename),
sndppt=sndppt,
alfa=alfa,
beta=beta,
)
self._update_naming({wflow_var: output_name})
# add to grid
self.set_grid(ksatver_vegetation, output_name)
# update config file
self._update_config_variable_name(output_name)
@hydromt_step
def setup_lulcmaps_with_paddy(
self,
lulc_fn: str | Path | xr.DataArray,
paddy_class: int,
output_paddy_class: int | None = None,
lulc_mapping_fn: str | Path | pd.DataFrame | None = None,
paddy_fn: str | Path | xr.DataArray | None = None,
paddy_mapping_fn: str | Path | pd.DataFrame | None = None,
soil_fn: str | Path | xr.DataArray = "soilgrids",
wflow_thicknesslayers: List[int] = [50, 100, 50, 200, 800],
target_conductivity: List[None | int | float] = [
None,
None,
5,
None,
None,
],
lulc_vars: Dict = {
"landuse": None,
"vegetation_kext": "vegetation_canopy__light-extinction_coefficient",
"land_manning_n": "land_surface_water_flow__manning_n_parameter",
"soil_compacted_fraction": "soil~compacted__area_fraction",
"vegetation_root_depth": "vegetation_root__depth",
"vegetation_leaf_storage": "vegetation__specific-leaf_storage",
"vegetation_wood_storage": "vegetation_wood_water__storage_capacity",
"land_water_fraction": "land~water-covered__area_fraction",
"vegetation_crop_factor": "vegetation__crop_factor",
"vegetation_feddes_alpha_h1": "vegetation_root__feddes_critial_pressure_head_h~1_reduction_coefficient", # noqa: E501
"vegetation_feddes_h1": "vegetation_root__feddes_critial_pressure_head_h~1",
"vegetation_feddes_h2": "vegetation_root__feddes_critial_pressure_head_h~2",
"vegetation_feddes_h3_high": "vegetation_root__feddes_critial_pressure_head_h~3~high", # noqa: E501
"vegetation_feddes_h3_low": "vegetation_root__feddes_critial_pressure_head_h~3~low", # noqa: E501
"vegetation_feddes_h4": "vegetation_root__feddes_critial_pressure_head_h~4",
},
paddy_waterlevels: Dict = {
"demand_paddy_h_min": 20,
"demand_paddy_h_opt": 50,
"demand_paddy_h_max": 80,
},
save_high_resolution_lulc: bool = False,
output_names_suffix: str | None = None,
):
"""Set up landuse maps and parameters including for paddy fields.
THIS FUNCTION SHOULD BE RUN AFTER setup_soilmaps.
Landuse classes are converted to model parameters based on literature using
the lookup table ``lulc_mapping_fn``. The data is remapped at its original
resolution and then resampled to the model resolution using the average
value, unless noted differently.
If paddies are present, either directly as a class in ``lulc_fn`` or in a
separate ``paddy_fn``, the paddy class is used to derive the paddy parameters.
To allow for water to pool on the surface (for paddy/rice fields), the layers in
the model can be updated to new depths, such that we can allow a thin layer with
limited vertical conductivity. These updated layers mean that the
``soil_brooks_corey_c`` parameter needs to be calculated again. Next, the
soil_ksat_vertical_factor layer corrects the vertical conductivity
(by multiplying) such that the bottom of the layer corresponds to the
``target_conductivity`` for that layer. This currently assumes the wflow models
to have an exponential declining vertical conductivity (using the ``f``
parameter). If no target_conductivity is specified for a layer (``None``),
the soil_ksat_vertical_factor value is set to 1.
The different values for the minimum/optimal/maximum water levels for paddy
fields will be added as constant values in the toml file, through the
``land~irrigated-paddy__min_depth.value = 20`` interface.
Adds model layers:
* **landuse** map:
Landuse class [-]
* **vegetation_kext** map:
Extinction coefficient in the canopy gap fraction equation [-]
* **vegetation_leaf_storage** map:
Specific leaf storage [mm]
* **vegetation_wood_storage** map:
Storage capacity of the wood part of the vegetation [mm]
* **vegetation_root_depth** map:
Length of vegetation roots [mm]
* **soil_compacted_fraction** map:
The fraction of compacted or urban area per grid cell [-]
* **land_water_fraction** map:
The fraction of open water per grid cell [-]
* **land_manning_n** map:
Manning Roughness [-]
* **vegetation_crop_factor** map:
Crop coefficient [-]
* **vegetation_feddes_alpha_h1** map:
Root water uptake reduction at soil water pressure head
h1 (0 or 1) [-]
* **vegetation_feddes_h1** map:
Soil water pressure head h1 at which root water
uptake is reduced (Feddes) [cm]
* **vegetation_feddes_h2** map:
Soil water pressure head h2 at which root water
uptake is reduced (Feddes) [cm]
* **vegetation_feddes_h3_high** map:
Soil water pressure head h3 (high) at which root water uptake is
reduced (Feddes) [cm]
* **vegetation_feddes_h3_low** map:
Soil water pressure head h3 (low) at which root water uptake is
reduced (Feddes) [cm]
* **vegetation_feddes_h4** map:
Soil water pressure head h4 at which root water
uptake is reduced (Feddes) [cm]
* **demand_paddy_h_min** value:
Minimum required water depth for paddy fields [mm]
* **demand_paddy_h_opt** value:
Optimal water depth for paddy fields [mm]
* **demand_paddy_h_max** value:
Maximum water depth for paddy fields [mm]
* **soil_ksat_vertical_factor**:
Map with a multiplication factor for the vertical conductivity [-]
Updates model layers:
* **soil_brooks_corey_c**:
Brooks Corey coefficients [-] based on pore size
distribution, a map for each of the wflow_sbm soil layers (updated based
on the newly specified layers)
Parameters
----------
lulc_fn : str, Path, xr.DataArray
RasterDataset or name in data catalog / path to landuse map.
paddy_class : int
Landuse class value for paddy fields, either in ``lulc_fn`` or in
``paddy_fn`` if provided.
output_paddy_class : int, optional
Landuse class value for paddy fields in the output landuse map. If None,
the ``paddy_class`` is used, by default None. This can be useful when
merging paddy locations from ``paddy_fn`` into ``lulc_fn``.
lulc_mapping_fn : str, Path, pd.DataFrame, optional
Path to a mapping csv file from landuse in source name to parameter values
in lulc_vars. If lulc_fn is one of {"globcover", "vito", "corine",
"esa_worldcover", "glmnco"}, a default mapping is used and this argument
becomes optional.
paddy_fn : str, Path, xr.DataArray, optional
RasterDataset or name in data catalog / path to paddy map.
paddy_mapping_fn : str, Path, pd.DataFrame, optional
Path to a mapping csv file from paddy in source name to parameter values
in lulc_vars. A default mapping table for rice parameters is used if not
provided.
soil_fn : str, Path, xr.DataArray, optional
Soil data to be used to recalculate the Brooks-Corey coefficients
(`soil_brooks_corey_c` parameter), based on the provided
``wflow_thicknesslayers``, by default "soilgrids", but should ideally
be equal to the data used in :py:meth:`setup_soilmaps`
* Required variables: 'bd_sl*' [g/cm3], 'clyppt_sl*' [%], 'sltppt_sl*' [%],
'ph_sl*' [-].
wflow_thicknesslayers: list
List of soil thickness per layer [mm], by default [50, 100, 50, 200, 800].
target_conductivity: list
List of target vertical conductivities [mm/day] for each layer in
``wflow_thicknesslayers``. Set value to `None` if no specific value is
required, by default [None, None, 5, None, None].
lulc_vars : Dict
Dictionary of landuse parameters to prepare. The names are the
columns of the mapping file and the values are the corresponding
Wflow.jl variables.
paddy_waterlevels : dict
Dictionary with the minimum, optimal and maximum water levels for paddy
fields [mm]. By default {"demand_paddy_h_min": 20, "demand_paddy_h_opt": 50,
"demand_paddy_h_max": 80}
save_high_resolution_lulc : bool
Save the high resolution landuse map merged with the paddies to the static
folder. By default False.
output_names_suffix : str, optional
Suffix to be added to the output names to avoid having to rename all the
columns of the mapping tables. For example if the suffix is "vito", all
variables in lulc_vars will be renamed to "meta_landuse_vito",
"vegetation_kext_vito", etc.
Note that the suffix will also be used to rename the paddy parameter
soil_ksat_vertical_factor but not the soil_brooks_corey_c parameter.
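Examples
--------
A usage sketch merging a hypothetical "rice_extent" paddy map into the "vito"
landuse map; the paddy class value 100 is illustrative:
>>> model.setup_lulcmaps_with_paddy(
...     lulc_fn="vito",
...     paddy_class=100,
...     paddy_fn="rice_extent",
...     output_names_suffix="vito",
... )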
"""
logger.info("Preparing LULC parameter maps including paddies.")
if output_names_suffix is not None:
# rename lulc_vars with the suffix
output_names = {
v: f"{k}_{output_names_suffix}" for k, v in lulc_vars.items()
}
# Add soil_ksat_vertical_factor
output_names[self._WFLOW_NAMES[self._MAPS["soil_ksat_vertical_factor"]]] = (
f"soil_ksat_vertical_factor_{output_names_suffix}"
)
else:
output_names = {v: k for k, v in lulc_vars.items()}
# update self._MAPS and self._WFLOW_NAMES with user defined output names
self._update_naming(output_names)
# As landuse is not a wflow variable, we update the name manually in self._MAPS
rmdict = {"landuse": "meta_landuse"} if "landuse" in lulc_vars else {}
if output_names_suffix is not None:
self._MAPS["landuse"] = f"meta_landuse_{output_names_suffix}"
# rename dict for the staticmaps (hydromt names are not used in that case)
rmdict = {k: f"{k}_{output_names_suffix}" for k in lulc_vars.keys()}
if "landuse" in lulc_vars:
rmdict["landuse"] = f"meta_landuse_{output_names_suffix}"
# Check if soil data is available
if self._MAPS["ksat_vertical"] not in self.staticmaps.data.data_vars:
raise ValueError(
"ksat_vertical and f are required to update the soil parameters with "
"paddies. Please run setup_soilmaps first."
)
if lulc_mapping_fn is None:
lulc_mapping_fn = f"{lulc_fn}_mapping_default"
# read landuse map and mapping table
landuse = self.data_catalog.get_rasterdataset(
lulc_fn, geom=self.region, buffer=2, variables=["landuse"]
)
df_mapping = self.data_catalog.get_dataframe(
lulc_mapping_fn,
driver_kwargs={"index_col": 0}, # only used if fn_map is a file path
)
output_paddy_class = (
paddy_class if output_paddy_class is None else output_paddy_class
)
# if needed, add paddies to landuse
if paddy_fn is not None:
# Read paddy map and mapping table
paddy = self.data_catalog.get_rasterdataset(
paddy_fn, geom=self.region, buffer=2, variables=["paddy"]
)
if paddy_mapping_fn is None:
paddy_mapping_fn = "paddy_mapping_default"
df_paddy_mapping = self.data_catalog.get_dataframe(
paddy_mapping_fn,
driver_kwargs={"index_col": 0},
)
landuse, df_mapping = workflows.add_paddy_to_landuse(
landuse,
paddy,
paddy_class,
output_paddy_class=output_paddy_class,
df_mapping=df_mapping,
df_paddy_mapping=df_paddy_mapping,
)
if save_high_resolution_lulc:
output_dir = join(self.root.path, "maps")
if not os.path.exists(output_dir):
os.makedirs(output_dir)
landuse.raster.to_raster(join(output_dir, "landuse_with_paddy.tif"))
df_mapping.to_csv(join(output_dir, "landuse_with_paddy_mapping.csv"))
# Prepare landuse parameters
landuse_maps = workflows.landuse(
da=landuse,
ds_like=self.staticmaps.data,
df=df_mapping,
params=list(lulc_vars.keys()),
)
self.set_grid(landuse_maps.rename(rmdict))
# update config
self._update_config_variable_name(landuse_maps.rename(rmdict).data_vars)
# Update soil parameters if there are paddies in the domain
# Get paddy pixels at model resolution
wflow_paddy = landuse_maps["landuse"] == output_paddy_class
if wflow_paddy.any():
if self.get_config("model.soil_layer__thickness") == len(
wflow_thicknesslayers
):
logger.info(
"same thickness already present, skipping updating"
" `soil_brooks_corey_c` parameter"
)
update_c = False
else:
logger.info(
"Different thicknesslayers requested, updating "
"`soil_brooks_corey_c` parameter"
)
update_c = True
# Read soil data
soil = self.data_catalog.get_rasterdataset(
soil_fn, geom=self.region, buffer=2
)
# update soil parameters soil_brooks_corey_c and soil_ksat_vertical_factor
inv_rename = {
v: k
for k, v in self._MAPS.items()
if v in self.staticmaps.data.data_vars
}
soil_maps = workflows.update_soil_with_paddy(
ds=soil,
ds_like=self.staticmaps.data.rename(inv_rename),
paddy_mask=wflow_paddy,
soil_fn=soil_fn,
update_c=update_c,
wflow_layers=wflow_thicknesslayers,
target_conductivity=target_conductivity,
)
self.set_grid(
soil_maps["soil_ksat_vertical_factor"],
name=self._MAPS["soil_ksat_vertical_factor"],
)
self._update_config_variable_name(self._MAPS["soil_ksat_vertical_factor"])
if "soil_brooks_corey_c" in soil_maps:
self.set_grid(
soil_maps["soil_brooks_corey_c"],
name=self._MAPS["soil_brooks_corey_c"],
)
self._update_config_variable_name(self._MAPS["soil_brooks_corey_c"])
self.set_config("model.soil_layer__thickness", wflow_thicknesslayers)
# Add paddy water levels to the config
for key, value in paddy_waterlevels.items():
self.set_config(f"input.static.{self._WFLOW_NAMES[key]}.value", value)
# Update the states
self.set_config(
"state.variables.land_surface_water~paddy__depth", "demand_paddy_h"
)
else:
logger.info("No paddy fields found, skipping updating soil parameters")
@hydromt_step
def setup_glaciers(
self,
glaciers_fn: str | Path | gpd.GeoDataFrame,
min_area: float = 1.0,
output_names: Dict = {
"glacier_surface__area_fraction": "glacier_fraction",
"glacier_ice__initial_leq-depth": "glacier_initial_leq_depth",
},
geom_name: str = "glaciers",
):
"""
Generate maps of glacier areas, area fraction and volume fraction.
The data is generated from features with ``min_area`` [km2] (default is 1 km2)
from a database with glacier geometry, IDs and metadata.
The required variable from the ``glaciers_fn`` dataset is the glacier ID
'simple_id'. Optionally, glacier area 'AREA' [km2] can be present to filter
the glaciers by size. If not present, it will be computed on the fly.
Adds model layers:
* **meta_glacier_area_id** map: glacier IDs [-]
* **glacier_fraction** map: area fraction of glacier per cell [-]
* **glacier_initial_leq_depth** map: storage (volume) of glacier per cell [mm]
Parameters
----------
glaciers_fn :
Name of data source for glaciers, see data/data_sources.yml.
* Required variables: ['simple_id']
min_area : float, optional
Minimum glacier area threshold [km2], by default 1.0 km2.
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
geom_name : str, optional
Name of the geometry to be used in the model, by default "glaciers" for
glaciers.geojson.
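Examples
--------
A minimal usage sketch; "glaciers_db" is a hypothetical data catalog entry
with a 'simple_id' column:
>>> model.setup_glaciers(glaciers_fn="glaciers_db", min_area=1.0)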
"""
self._update_naming(output_names)
# retrieve data for basin
logger.info("Preparing glacier maps.")
gdf_org = self.data_catalog.get_geodataframe(
glaciers_fn,
geom=self.basins_highres,
predicate="intersects",
handle_nodata=NoDataStrategy.IGNORE,
)
# Check if there are glaciers found
if gdf_org is None:
logger.info("Skipping method, as no data has been found")
return
# skip small size glacier
if "AREA" in gdf_org.columns and gdf_org.geometry.size > 0:
gdf_org = gdf_org[gdf_org["AREA"] >= min_area]
# get glacier maps and parameters
nb_glac = gdf_org.geometry.size
if nb_glac == 0:
logger.warning(
"No glaciers of sufficient size found within region! "
"Skipping glacier procedures!"
)
return
logger.info(f"{nb_glac} glaciers of sufficient size found within region.")
# add glacier maps
ds_glac = workflows.glaciermaps(
gdf=gdf_org,
ds_like=self.staticmaps.data,
id_column="simple_id",
elevtn_name=self._MAPS["elevtn"],
)
rmdict = {k: self._MAPS.get(k, k) for k in ds_glac.data_vars}
self.set_grid(ds_glac.rename(rmdict))
# update config
self._update_config_variable_name(ds_glac.rename(rmdict).data_vars)
self.set_config("model.glacier__flag", True)
self.set_config("state.variables.glacier_ice__leq-depth", "glacier_leq_depth")
# update geoms
self.set_geoms(gdf_org, name=geom_name)
@hydromt_step
def setup_constant_pars(self, **kwargs):
"""Generate constant parameter maps for all active model cells.
Adds model layer:
* **param_name** map: constant parameter map.
Parameters
----------
kwargs
"param_name: value" pairs to set as constant values.
param_name should be the Wflow.jl variable name.
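Examples
--------
A minimal usage sketch; wflow variable names can contain characters that are
not valid Python identifiers, so they can be passed via a dictionary (the
value 0.8 is illustrative):
>>> model.setup_constant_pars(
...     **{"vegetation_canopy__light-extinction_coefficient": 0.8}
... )
This sets the corresponding ``input.static.<wflow_var>.value`` entry in the
TOML.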
"""
wflow_variables = list(self._WFLOW_NAMES.values())
for wflow_var, value in kwargs.items():
if wflow_var not in wflow_variables:
raise ValueError(
f"Parameter {wflow_var} not recognised as a Wflow variable. "
f"Please check the name."
)
# check if param is already in toml and will be overwritten
if self.get_config(f"input.static.{wflow_var}") is not None:
logger.info(
f"Parameter {wflow_var} already in toml and will be overwritten."
)
# remove previous (possibly map-based) entry from the config dict
self.config.data.get("input", {}).get("static", {}).pop(wflow_var, None)
# Add to config
self.set_config(f"input.static.{wflow_var}.value", value)
@hydromt_step
def setup_grid_from_raster(
self,
raster_fn: str | xr.Dataset,
reproject_method: str,
variables: List[str] | None = None,
wflow_variables: List[str] | None = None,
fill_method: str | None = None,
) -> List[str]:
"""
Add data variable(s) from ``raster_fn`` to grid object.
If raster is a dataset, all variables will be added unless ``variables``
list is specified. The config toml can also be updated to include
the new maps using ``wflow_variables``.
Adds model layers:
* **raster.name** or **variables** grid: data from raster_fn
Parameters
----------
raster_fn: str, xr.Dataset
Source name of RasterDataset in data_catalog or the dataset itself.
reproject_method: str
Reprojection method from rasterio.enums.Resampling.
Available methods: ['nearest', 'bilinear', 'cubic', 'cubic_spline', \
'lanczos', 'average', 'mode', 'gauss', 'max', 'min', 'med', 'q1', 'q3', \
'sum', 'rms']
variables: list, optional
List of variables to add to grid from raster_fn. By default all.
wflow_variables: list, optional
List of corresponding wflow variables to update the config toml
(e.g: ["vegetation_root__depth"]).
Should match the variables list. The variables list should be provided unless
raster_fn contains a single variable (len 1).
fill_method : str, optional
If specified, fills nodata values using fill_nodata method.
Available methods are {'linear', 'nearest', 'cubic', 'rio_idw'}.
Returns
-------
list
Names of added model staticmap layers.
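Examples
--------
A minimal sketch, assuming an initialized ``WflowModel`` instance ``mod``;
"root_depth_grid" and its "root_depth" variable are hypothetical catalog
entries::

    layers = mod.setup_grid_from_raster(
        raster_fn="root_depth_grid",
        reproject_method="average",
        variables=["root_depth"],
        wflow_variables=["vegetation_root__depth"],
        fill_method="nearest",
    )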
"""
logger.info(f"Preparing grid data from raster source {raster_fn}")
# Read raster data and select variables
ds = self.data_catalog.get_rasterdataset(
raster_fn,
geom=self.region,
buffer=2,
variables=variables,
single_var_as_array=False,
)
# Fill nodata
if fill_method is not None:
ds = ds.raster.interpolate_na(method=fill_method)
# Reprojection
ds_out = ds.raster.reproject_like(self.staticmaps.data, method=reproject_method)
# Add to grid
self.set_grid(ds_out)
# Update config
if wflow_variables is not None:
logger.info(f"Updating the config for wflow_variables: {wflow_variables}")
if variables is None:
if len(ds_out.data_vars) == 1:
variables = list(ds_out.data_vars.keys())
else:
raise ValueError(
"Cannot update the toml if raster_fn has more than \
one variable and variables list is not provided."
)
# Check on len
if len(wflow_variables) != len(variables):
raise ValueError(
f"Length of variables {variables} does not match wflow_variables "
f"{wflow_variables}. Cannot update the toml."
)
for wflow_var, var in zip(wflow_variables, variables):
self.set_config(f"input.static.{wflow_var}", var)
return list(ds_out.data_vars.keys())
[docs]
@hydromt_step
def setup_precip_forcing(
self,
precip_fn: str | xr.DataArray,
precip_clim_fn: str | xr.DataArray | None = None,
chunksize: int | None = None,
**kwargs,
) -> None:
"""Generate gridded precipitation forcing at model resolution.
Adds model layer:
* **precip**: precipitation [mm]
Parameters
----------
precip_fn : str, xarray.DataArray
Precipitation RasterDataset source.
* Required variable: 'precip' [mm]
* Required dimension: 'time' [timestamp]
precip_clim_fn : str, xarray.DataArray, optional
High resolution climatology precipitation RasterDataset source to correct
precipitation.
* Required variable: 'precip' [mm]
* Required dimension: 'time' [cyclic month]
chunksize: int, optional
Chunksize on time dimension for processing data (not for saving to disk!).
If None, the data chunksize is used; this can however be optimized for
large/small catchments. By default None.
**kwargs : dict, optional
Additional arguments passed to the forcing function.
See hydromt.model.processes.meteo.precip for more details.
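Examples
--------
A minimal sketch, assuming an initialized ``WflowModel`` instance ``mod``
and an "era5" precipitation source in the data catalog (illustrative, not a
shipped default)::

    mod.setup_precip_forcing(precip_fn="era5")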
"""
starttime = self.get_config("time.starttime")
endtime = self.get_config("time.endtime")
freq = pd.to_timedelta(self.get_config("time.timestepsecs"), unit="s")
mask = self.staticmaps.data[self._MAPS["basins"]].values > 0
precip = self.data_catalog.get_rasterdataset(
precip_fn,
geom=self.region,
buffer=2,
time_tuple=(starttime, endtime),
variables=["precip"],
)
precip = precip.astype("float32")
if chunksize is not None:
precip = precip.chunk({"time": chunksize})
clim = None
if precip_clim_fn is not None:
clim = self.data_catalog.get_rasterdataset(
precip_clim_fn,
geom=precip.raster.box,
buffer=2,
variables=["precip"],
)
clim = clim.astype("float32")
precip_out = hydromt.model.processes.meteo.precip(
precip=precip,
da_like=self.staticmaps.data[self._MAPS["elevtn"]],
clim=clim,
freq=freq,
resample_kwargs=dict(label="right", closed="right"),
)
# Update meta attributes (used for default output filename later)
precip_out.attrs.update({"precip_fn": precip_fn})
if precip_clim_fn is not None:
precip_out.attrs.update({"precip_clim_fn": precip_clim_fn})
self.set_forcing(precip_out.where(mask), name="precip")
self._update_config_variable_name(self._MAPS["precip"], data_type="forcing")
[docs]
@hydromt_step
def setup_precip_from_point_timeseries(
self,
precip_fn: str | pd.DataFrame | xr.Dataset,
interp_type: str = "nearest",
precip_stations_fn: str | gpd.GeoDataFrame | None = None,
index_col: str | None = None,
buffer: float = 1e5,
**kwargs,
) -> None:
"""
Generate gridded precipitation from point timeseries (requires wradlib).
Adds model layer:
* **precip**: precipitation [mm]
Supported interpolation methods:
* uniform: Applies spatially uniform precipitation to the model. \
Only works when `precip_fn` contains a single timeseries.
* nearest: Nearest-neighbour interpolation, also works with a single station.
* idw: Inverse-distance weighting using 1 / distance ** p.
* linear: Linear interpolation using scipy.interpolate.LinearNDInterpolator, \
may result in missing values when station coverage is limited.
* ordinarykriging: Interpolate using Ordinary Kriging, see wradlib \
documentation for a full explanation: `wradlib.ipol.OrdinaryKriging <https://docs.wradlib.org/en/latest/generated/wradlib.ipol.OrdinaryKriging.html>`_.
* externaldriftkriging: Kriging interpolation including an external drift, \
see wradlib documentation for a full explanation: \
`wradlib.ipol.ExternalDriftKriging <https://docs.wradlib.org/en/latest/generated/wradlib.ipol.ExternalDriftKriging.html>`_.
Parameters
----------
precip_fn : str, pd.DataFrame, xr.Dataset
Precipitation source as DataFrame or GeoDataset. \
- DataFrame: the index column should contain time and the other \
columns should correspond to the name or ID values of the stations \
in `precip_stations_fn`.
- GeoDataset: the dataset should contain the variable 'precip' and \
the dimensions 'time' and 'index'.
* Required variables: 'time', 'precip' [mm]
interp_type : str
Interpolation method. Options: "nearest", "idw", "linear", \
"ordinarykriging", "externaldriftkriging".
precip_stations_fn : str, gpd.GeoDataFrame, optional
Source for the locations of the stations as points: (x, y) or (lat, lon). \
Only required if precip_fn is of type DataFrame.
index_col : str, optional
Column in precip_stations_fn to use for station ID values, by default None.
buffer: float, optional
Buffer around the basins in metres to determine which
stations to include. Set to 100 km (1e5 metres) by default.
**kwargs
Additional keyword arguments passed to the interpolation function. \
Supported arguments depend on the interpolation type:
- nnearest: Maximum number of neighbors for interpolation (default: 4).
- p: Power parameter for IDW interpolation (default: 2).
- remove_missing: Mask NaN values in the input data (default: False).
- cov: Covariance model for Kriging (default: '1.0 Exp(10000.)').
- src_drift: External drift values at source points (stations).
- trg_drift: External drift values at target points (grid).
See Also
--------
hydromt_wflow.workflows.forcing.spatial_interpolation
`wradlib.ipol.interpolate <https://docs.wradlib.org/en/latest/ipol.html#wradlib.ipol.interpolate>`_
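Examples
--------
A minimal sketch using IDW, assuming an initialized ``WflowModel`` instance
``mod``; "precip_ts" (a DataFrame source), "precip_stations" (the matching
station points) and the "station_id" column are hypothetical::

    mod.setup_precip_from_point_timeseries(
        precip_fn="precip_ts",
        interp_type="idw",
        precip_stations_fn="precip_stations",
        index_col="station_id",
        p=2,
    )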
"""
starttime = self.get_config("time.starttime")
endtime = self.get_config("time.endtime")
timestep = self.get_config("time.timestepsecs")
freq = pd.to_timedelta(timestep, unit="s")
mask = self.staticmaps.data[self._MAPS["basins"]].values > 0
# Check data type of precip_fn if it is provided through the data catalog
if isinstance(precip_fn, str) and precip_fn in self.data_catalog:
_data_type = self.data_catalog[precip_fn].data_type
else:
_data_type = None
# Read the precipitation timeseries
if isinstance(precip_fn, xr.Dataset) or _data_type == "GeoDataset":
da_precip = self.data_catalog.get_geodataset(
precip_fn,
geom=self.region,
buffer=buffer,
variables=["precip"],
time_tuple=(starttime, endtime),
single_var_as_array=True,
)
else:
# Read timeseries
df_precip = self.data_catalog.get_dataframe(
precip_fn,
time_tuple=(starttime, endtime),
)
# Get locs
if interp_type == "uniform":
# Uniform interpolation requires a single timeseries
if df_precip.shape[1] != 1:
raise ValueError(
f"Data source ({precip_fn}) should contain a single "
f"timeseries, not {df_precip.shape[1]}."
)
# Use basin centroid as 'station' for uniform case
gdf_stations = gpd.GeoDataFrame(
data=None,
geometry=[self.basins.union_all().centroid],
index=df_precip.columns,
crs=self.crs,
)
interp_type = "nearest"
logger.info("Uniform interpolation is applied using method 'nearest'.")
elif precip_stations_fn is None:
raise ValueError(
"Using a DataFrame as precipitation source requires that station "
"locations are provided separately through precip_station_fn."
)
else:
# Load the stations and their coordinates
gdf_stations = self.data_catalog.get_geodataframe(
precip_stations_fn,
geom=self.basins,
buffer=buffer,
assert_gtype="Point",
handle_nodata=NoDataStrategy.IGNORE,
)
# Use station ids from gdf_stations when reading the DataFrame
if index_col is not None:
gdf_stations = gdf_stations.set_index(index_col)
# Index is required to construct the GeoDataArray
if gdf_stations.index.name is None:
gdf_stations.index.name = "stations"
# Convert to geodataset
da_precip = hydromt.vector.GeoDataArray.from_gdf(
gdf=gdf_stations,
data=df_precip,
name="precip",
index_dim=None,
dims=["time", gdf_stations.index.name],
keep_cols=False,
merge_index="gdf",
)
# Calling interpolation workflow
precip = workflows.forcing.spatial_interpolation(
forcing=da_precip,
interp_type=interp_type,
ds_like=self.staticmaps.data,
mask_name=self._MAPS["basins"],
**kwargs,
)
# Use precip workflow to create the forcing file
precip_out = hydromt.model.processes.meteo.precip(
precip=precip,
da_like=self.staticmaps.data[self._MAPS["elevtn"]],
clim=None,
freq=freq,
resample_kwargs=dict(label="right", closed="right"),
)
# Update meta attributes (used for default output filename later)
precip_out.attrs.update({"precip_fn": precip_fn})
precip_out = precip_out.astype("float32")
self.set_forcing(precip_out.where(mask), name="precip")
self._update_config_variable_name(self._MAPS["precip"], data_type="forcing")
# Add to geoms
gdf_stations = da_precip.vector.to_gdf().to_crs(self.crs)
self.set_geoms(gdf_stations, name="stations_precipitation")
[docs]
@hydromt_step
def setup_temp_pet_forcing(
self,
temp_pet_fn: str | xr.Dataset,
pet_method: str = "debruin",
press_correction: bool = True,
temp_correction: bool = True,
wind_correction: bool = True,
wind_altitude: int = 10,
reproj_method: str = "nearest_index",
fillna_method: str | None = None,
dem_forcing_fn: str | xr.DataArray | None = None,
skip_pet: bool = False,
chunksize: int | None = None,
) -> None:
"""Generate gridded temperature and reference evapotranspiration forcing.
If `temp_correction` is True, the temperature will be reprojected and then
downscaled to model resolution using the elevation lapse rate. For better
accuracy, you can provide the elevation grid of the climate data in
`dem_forcing_fn`. If not present, the upscaled elevation grid of the wflow model
is used ('land_elevation').
To compute PET (`skip_pet` is False), several methods are available. Before
computation, both the temperature and pressure can be downscaled. The PET
methods expect wind speed at 2m altitude; if the wind data is given at another
altitude, it can be corrected when `wind_correction` is True and the data
altitude is provided in `wind_altitude` [m].
Several methods to compute pet are available: {'debruin', 'makkink',
'penman-monteith_rh_simple', 'penman-monteith_tdew'}.
Depending on the method, `temp_pet_fn` should contain temperature 'temp' [°C],
pressure 'press_msl' [hPa], incoming shortwave radiation 'kin' [W/m2], outgoing
shortwave radiation 'kout' [W/m2], relative humidity 'rh' [%], dew point
temperature 'temp_dew' [°C], and wind speed given either as total 'wind' [m/s]
or as the U and V components 'wind10_u' and 'wind10_v' [m/s].
Adds model layer:
* **pet**: reference evapotranspiration [mm]
* **temp**: temperature [°C]
Parameters
----------
temp_pet_fn : str, xarray.Dataset
Name or path of RasterDataset source with variables to calculate temperature
and reference evapotranspiration.
* Required variable for temperature: 'temp' [°C]
* Required variables for De Bruin reference evapotranspiration: \
'temp' [°C], 'press_msl' [hPa], 'kin' [W/m2], 'kout' [W/m2]
* Required variables for Makkink reference evapotranspiration: \
'temp' [°C], 'press_msl' [hPa], 'kin'[W/m2]
* Required variables for daily Penman-Monteith \
reference evapotranspiration: \
either {'temp' [°C], 'temp_min' [°C], 'temp_max' [°C], 'wind' [m/s], 'rh' [%], 'kin' \
[W/m2]} for 'penman-monteith_rh_simple' or {'temp' [°C], 'temp_min' [°C], 'temp_max' \
[°C], 'temp_dew' [°C], 'wind' [m/s], 'kin' [W/m2], 'press_msl' [hPa], 'wind10_u' [m/s],\
"wind10_v" [m/s]} for 'penman-monteith_tdew' (these are the variables available in ERA5)
pet_method : {'debruin', 'makkink', 'penman-monteith_rh_simple', \
'penman-monteith_tdew'}, optional
Reference evapotranspiration method, by default 'debruin'.
If penman-monteith is used, requires the installation of the pyet package.
press_correction, temp_correction : bool, optional
If True, pressure and temperature are corrected using the elevation lapse
rate, by default True.
dem_forcing_fn : str, default None
Elevation data source with coverage of entire meteorological forcing domain.
If temp_correction is True and dem_forcing_fn is provided this is used in
combination with elevation at model resolution to correct the temperature.
* Required variable: 'elevtn' [m+REF]
wind_correction : bool, optional
If True wind speed is corrected to wind at 2m altitude using
``wind_altitude``. By default True.
wind_altitude : int, optional
Altitude of wind speed [m] variable, by default 10. Only used if
``wind_correction`` is True.
skip_pet : bool, optional
If True, only prepare temperature and skip the PET calculation.
By default False.
reproj_method : str, optional
Reprojection method from rasterio.enums.Resampling to reproject the climate
data to the model resolution. By default 'nearest_index'.
fillna_method: str, optional
Method to fill NaN cells within the active model domain in the
temperature data, e.g. 'nearest'.
By default None for no interpolation.
chunksize: int, optional
Chunksize on time dimension for processing data (not for saving to disk!).
If None, the data chunksize is used; this can however be optimized for
large/small catchments. By default None.
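Examples
--------
A minimal sketch with De Bruin PET, assuming an initialized ``WflowModel``
instance ``mod``; "era5" and "era5_orography" are illustrative catalog
entries::

    mod.setup_temp_pet_forcing(
        temp_pet_fn="era5",
        pet_method="debruin",
        dem_forcing_fn="era5_orography",
    )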
"""
starttime = self.get_config("time.starttime")
endtime = self.get_config("time.endtime")
timestep = self.get_config("time.timestepsecs")
freq = pd.to_timedelta(timestep, unit="s")
mask = self.staticmaps.data[self._MAPS["basins"]].values > 0
variables = ["temp"]
if not skip_pet:
if pet_method == "debruin":
variables += ["press_msl", "kin", "kout"]
elif pet_method == "makkink":
variables += ["press_msl", "kin"]
elif pet_method == "penman-monteith_rh_simple":
variables += ["temp_min", "temp_max", "wind", "rh", "kin"]
elif pet_method == "penman-monteith_tdew":
variables += [
"temp_min",
"temp_max",
"wind10_u",
"wind10_v",
"temp_dew",
"kin",
"press_msl",
]
else:
methods = [
"debruin",
"makkink",
"penman-monteith_rh_simple",
"penman-monteith_tdew",
]
raise ValueError(
f"Unknown pet method {pet_method}, select from {methods}"
)
ds = self.data_catalog.get_rasterdataset(
temp_pet_fn,
geom=self.region,
buffer=1,
time_tuple=(starttime, endtime),
variables=variables,
single_var_as_array=False, # always return dataset
)
if chunksize is not None:
ds = ds.chunk({"time": chunksize})
for var in ds.data_vars:
ds[var] = ds[var].astype("float32")
dem_forcing = None
if dem_forcing_fn is not None:
dem_forcing = self.data_catalog.get_rasterdataset(
dem_forcing_fn,
geom=ds.raster.box, # clip dem with forcing bbox for full coverage
buffer=2,
variables=["elevtn"],
).squeeze()
dem_forcing = dem_forcing.astype("float32")
temp_in = hydromt.model.processes.meteo.temp(
ds["temp"],
dem_model=self.staticmaps.data[self._MAPS["elevtn"]],
dem_forcing=dem_forcing,
lapse_correction=temp_correction,
freq=None, # resample time after pet workflow
)
if (
"penman-monteith" in pet_method
): # temp_min and temp_max also need to be downscaled for Penman-Monteith
temp_max_in = hydromt.model.processes.meteo.temp(
ds["temp_max"],
dem_model=self.staticmaps.data[self._MAPS["elevtn"]],
dem_forcing=dem_forcing,
lapse_correction=temp_correction,
freq=None, # resample time after pet workflow
)
temp_max_in.name = "temp_max"
temp_min_in = hydromt.model.processes.meteo.temp(
ds["temp_min"],
dem_model=self.staticmaps.data[self._MAPS["elevtn"]],
dem_forcing=dem_forcing,
lapse_correction=temp_correction,
freq=None, # resample time after pet workflow
)
temp_min_in.name = "temp_min"
temp_in = xr.merge([temp_in, temp_max_in, temp_min_in])
if not skip_pet:
pet_out = hydromt.model.processes.meteo.pet(
ds[variables[1:]],
temp=temp_in,
dem_model=self.staticmaps.data[self._MAPS["elevtn"]],
method=pet_method,
press_correction=press_correction,
wind_correction=wind_correction,
wind_altitude=wind_altitude,
reproj_method=reproj_method,
freq=freq,
resample_kwargs=dict(label="right", closed="right"),
)
# Update meta attributes with setup opt
opt_attr = {
"pet_fn": temp_pet_fn,
"pet_method": pet_method,
}
pet_out.attrs.update(opt_attr)
self.set_forcing(pet_out.where(mask), name="pet")
self._update_config_variable_name(self._MAPS["pet"], data_type="forcing")
# make sure only temp is written to netcdf
if "penman-monteith" in pet_method:
temp_in = temp_in["temp"]
# resample temp after pet workflow
temp_out = hydromt.model.processes.meteo.resample_time(
temp_in,
freq,
upsampling="bfill", # we assume right labeled original data
downsampling="mean",
label="right",
closed="right",
conserve_mass=False,
)
# Update meta attributes with setup opt (used for default naming later)
opt_attr = {
"temp_fn": temp_pet_fn,
"temp_correction": str(temp_correction),
}
temp_out.attrs.update(opt_attr)
if fillna_method is not None:
temp_out = temp_out.raster.interpolate_na(
dim=temp_out.raster.x_dim,
method=fillna_method,
fill_value="extrapolate",
)
self.set_forcing(temp_out.where(mask), name="temp")
self._update_config_variable_name(self._MAPS["temp"], data_type="forcing")
[docs]
@hydromt_step
def setup_pet_forcing(
self,
pet_fn: str | xr.DataArray,
chunksize: int | None = None,
):
"""
Prepare PET forcing from existing PET data.
Adds model layer:
* **pet**: reference evapotranspiration [mm]
Parameters
----------
pet_fn: str, xr.DataArray
RasterDataset source or data for PET to be resampled.
* Required variable: 'pet' [mm]
chunksize: int, optional
Chunksize on time dimension for processing data (not for saving to disk!).
If None, the data chunksize is used; this can however be optimized for
large/small catchments. By default None.
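Examples
--------
A minimal sketch, assuming an initialized ``WflowModel`` instance ``mod``;
"era5_pet" is a hypothetical catalog entry with a 'pet' variable::

    mod.setup_pet_forcing(pet_fn="era5_pet")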
"""
logger.info("Preparing potential evapotranspiration forcing maps.")
starttime = self.get_config("time.starttime")
endtime = self.get_config("time.endtime")
freq = pd.to_timedelta(self.get_config("time.timestepsecs"), unit="s")
pet = self.data_catalog.get_rasterdataset(
pet_fn,
geom=self.region,
buffer=2,
variables=["pet"],
time_range=(starttime, endtime),
)
pet = pet.astype("float32")
pet_out = workflows.forcing.pet(
pet=pet,
ds_like=self.staticmaps.data,
freq=freq,
mask_name=self._MAPS["basins"],
chunksize=chunksize,
)
# Update meta attributes (used for default output filename later)
pet_out.attrs.update({"pet_fn": pet_fn})
self.set_forcing(pet_out, name="pet")
self._update_config_variable_name(self._MAPS["pet"], data_type="forcing")
[docs]
@hydromt_step
def setup_rootzoneclim(
self,
run_fn: str | Path | xr.Dataset,
forcing_obs_fn: str | Path | xr.Dataset,
forcing_cc_hist_fn: str | Path | xr.Dataset | None = None,
forcing_cc_fut_fn: str | Path | xr.Dataset | None = None,
chunksize: int | None = 100,
return_period: List[int] = [2, 3, 5, 10, 15, 20, 25, 50, 60, 100],
Imax: float = 2.0,
start_hydro_year: str = "Sep",
start_field_capacity: str = "Apr",
LAI: bool = False,
rootzone_storage: bool = False,
correct_cc_deficit: bool = False,
time_tuple: tuple | None = None,
time_tuple_fut: tuple | None = None,
missing_days_threshold: int | None = 330,
output_name_rootingdepth: str = "vegetation_root_depth_obs_20",
) -> None:
"""
Set the vegetation_root_depth.
Done by estimating the catchment-scale root-zone storage capacity from observed
hydroclimatic data (and optionally also for climate change historical and
future periods).
This presents an alternative approach to determine the vegetation_root_depth
based on hydroclimatic data instead of through a look-up table relating
land use to rooting depth (as usually done for the wflow_sbm model).
The method is based on the estimation of maximum annual storage deficits
based on precipitation and estimated actual evaporation time series,
which in turn are estimated from observed streamflow data and
long-term precipitation and potential evap. data, as explained in
Bouaziz et al. (2022).
The main assumption is that vegetation adapts its rootzone storage capacity
to overcome dry spells with a certain return period (typically 20 years for
forest ecosystems). In response to a changing climate,
it is likely that vegetation also adapts its rootzone storage capacity,
thereby changing model parameters for future conditions.
This method also allows estimating the change in rootzone storage capacity
in response to a changing climate.
As the method requires precipitation and potential evaporation timeseries,
it may be useful to run this method as an update step in the setting-up of
the hydrological model, once the forcing files have already been derived.
In addition, the setup_soilmaps method is required to calculate
the vegetation_root_depth (rootzone_storage / (theta_s-theta_r)).
The setup_laimaps method is also required if LAI is set to True
(interception capacity estimated from LAI maps, instead of providing
a default maximum interception capacity).
References
----------
Bouaziz, L. J. E., Aalbers, E. E., Weerts, A. H., Hegnauer, M., Buiteveld,
H., Lammersen, R., Stam, J., Sprokkereef, E., Savenije, H. H. G. and
Hrachowitz, M. (2022). Ecosystem adaptation to climate change: the
sensitivity of hydrological predictions to time-dynamic model parameters,
Hydrology and Earth System Sciences, 26(5), 1295-1318. DOI:
10.5194/hess-26-1295-2022.
Adds model layer:
* **vegetation_root_depth_{forcing}_{RP}** map: rooting depth [mm of the soil \
column] estimated from hydroclimatic data {forcing: obs, cc_hist or cc_fut} for \
different return periods RP. The translation to vegetation_root_depth is done by \
dividing the rootzone_storage by (theta_s - theta_r).
* **meta_rootzone_storage_{forcing}_{RP}** geom: polygons of rootzone \
storage capacity [mm of water] for each catchment, estimated before filling \
missing values with data from downstream catchments.
* **meta_rootzone_storage_{forcing}_{RP}** map: rootzone storage capacity \
[mm of water] estimated from hydroclimatic data {forcing: obs, cc_hist or cc_fut} for \
different return periods RP. Only if rootzone_storage is set to True!
Parameters
----------
run_fn : str, Path, xr.Dataset
Geodataset with streamflow timeseries (m3/s) per x,y location.
The geodataset expects the coordinate names "index" (for each station id)
and the variable name "discharge".
forcing_obs_fn : str, Path, xr.Dataset
Gridded timeseries with the observed forcing [mm/timestep].
Expects to have variables "precip" and "pet".
forcing_cc_hist_fn : str, Path, xr.Dataset, optional
Gridded timeseries with the simulated historical forcing [mm/timestep],
based on a climate model. Expects to have variables "precip" and "pet".
The default is None.
forcing_cc_fut_fn : str, optional
Gridded timeseries with the simulated climate forcing [mm/timestep],
based on a climate model. Expects to have variables "precip" and "pet".
The default is None.
chunksize : int, optional
Chunksize on time dimension for processing data (not for saving to
disk!). The default is 100.
return_period : list, optional
List with one or more values indicating the return period(s) (in
years) for which the rootzone storage depth should be calculated. The
default is [2,3,5,10,15,20,25,50,60,100] years.
Imax : float, optional
The maximum interception storage capacity [mm]. The default is 2.0 mm.
start_hydro_year : str, optional
The start month (abbreviated to the first three letters of the month,
starting with a capital letter) of the hydrological year. The
default is 'Sep'.
start_field_capacity : str, optional
The end of the wet season / commencement of dry season. This is the
moment when the soil is at field capacity, i.e. there is no storage
deficit yet. The default is 'Apr'.
LAI : bool, optional
Determine whether the leaf area index will be used to
determine Imax. The default is False.
If set to True, requires to have run setup_laimaps.
rootzone_storage : bool, optional
Determines whether the rootzone storage maps
should be stored in the grid or not. The default is False.
correct_cc_deficit : bool, optional
Determines whether a bias-correction of the future deficit should be
applied using the cc_hist deficit. Only works if the time periods of
cc_hist and cc_fut are the same. If the climate change scenario and
hist period are bias-corrected, this should probably be set to False.
The default is False.
time_tuple: tuple, optional
Select which time period to read from all the forcing files.
There should be some overlap between the time period available in the
forcing files for the historical period and in the observed streamflow data.
time_tuple_fut: tuple, optional
Select which time period to read from the future climate forcing file
(`forcing_cc_fut_fn`).
missing_days_threshold: int, optional
Minimum number of days within a year for that year to be counted in
the long-term Budyko analysis.
output_name_rootingdepth: str, optional
Update the wflow_sbm model config of the vegetation_root_depth variable with
the estimated vegetation_root_depth.
The default is vegetation_root_depth_obs_20,
which requires RP 20 to be in the list provided for \
the return_period argument.
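Examples
--------
A minimal sketch for the observed-climate case, assuming an initialized
``WflowModel`` instance ``mod``; "discharge_obs" and "meteo_obs" are
hypothetical catalog entries::

    mod.setup_rootzoneclim(
        run_fn="discharge_obs",
        forcing_obs_fn="meteo_obs",
        return_period=[2, 10, 20],
        output_name_rootingdepth="vegetation_root_depth_obs_20",
    )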
"""
logger.info("Preparing climate based root zone storage parameter maps.")
# Open the data sets
ds_obs = self.data_catalog.get_rasterdataset(
forcing_obs_fn,
geom=self.region,
buffer=2,
variables=["pet", "precip"],
time_tuple=time_tuple,
)
ds_cc_hist = None
if forcing_cc_hist_fn is not None:
ds_cc_hist = self.data_catalog.get_rasterdataset(
forcing_cc_hist_fn,
geom=self.region,
buffer=2,
variables=["pet", "precip"],
time_tuple=time_tuple,
)
ds_cc_fut = None
if forcing_cc_fut_fn is not None:
ds_cc_fut = self.data_catalog.get_rasterdataset(
forcing_cc_fut_fn,
geom=self.region,
buffer=2,
variables=["pet", "precip"],
time_tuple=time_tuple_fut,
)
# observed streamflow data
dsrun = self.data_catalog.get_geodataset(
run_fn, single_var_as_array=False, time_tuple=time_tuple
)
# make sure dsrun overlaps with ds_obs, otherwise give error
if dsrun.time[0] < ds_obs.time[0]:
dsrun = dsrun.sel(time=slice(ds_obs.time[0], None))
if dsrun.time[-1] > ds_obs.time[-1]:
dsrun = dsrun.sel(time=slice(None, ds_obs.time[-1]))
if len(dsrun.time) == 0:
logger.error(
"No overlapping period between the meteo and observed streamflow data"
)
# check that setup_laimaps was run if LAI is True
# and that setup_soilmaps was run (theta_s and theta_r are required)
if LAI and (self._MAPS["LAI"] not in self.staticmaps.data):
logger.error(
"LAI variable not found in grid. \
Set LAI to False or run setup_laimaps first"
)
if (self._MAPS["theta_r"] not in self.staticmaps.data) or (
self._MAPS["theta_s"] not in self.staticmaps.data
):
logger.error(
"theta_s or theta_r variables not found in grid. \
Run setup_soilmaps first"
)
# Run the rootzone clim workflow
inv_rename = {
v: k for k, v in self._MAPS.items() if v in self.staticmaps.data.data_vars
}
dsout, gdf = workflows.rootzoneclim(
dsrun=dsrun,
ds_obs=ds_obs,
ds_like=self.staticmaps.data.rename(inv_rename),
flwdir=self.flwdir,
ds_cc_hist=ds_cc_hist,
ds_cc_fut=ds_cc_fut,
return_period=return_period,
Imax=Imax,
start_hydro_year=start_hydro_year,
start_field_capacity=start_field_capacity,
LAI=LAI,
rootzone_storage=rootzone_storage,
correct_cc_deficit=correct_cc_deficit,
chunksize=chunksize,
missing_days_threshold=missing_days_threshold,
)
# set nodata value outside basin
dsout = dsout.where(self.staticmaps.data[self._MAPS["basins"]] > 0, -999)
for var in dsout.data_vars:
dsout[var].raster.set_nodata(-999)
self.set_grid(dsout)
self.set_geoms(gdf, name="rootzone_storage")
# update config
self.set_config("input.static.vegetation_root__depth", output_name_rootingdepth)
[docs]
@hydromt_step
def setup_1dmodel_connection(
self,
river1d_fn: str | Path | gpd.GeoDataFrame,
connection_method: str = "subbasin_area",
area_max: float = 30.0,
add_tributaries: bool = True,
include_river_boundaries: bool = True,
mapname: str = "1dmodel",
update_toml: bool = True,
toml_output: str = "netcdf_scalar",
**kwargs,
):
"""
Connect wflow to a 1D model by deriving linked subcatch (and tributaries).
There are two methods to connect models:
- `subbasin_area`:
creates subcatchments linked to the 1d river based
on an area threshold (area_max) for the subbasin size. With this method,
if a tributary is larger than the `area_max`, it will be connected to
the 1d river directly.
- `nodes`:
subcatchments are derived based on the 1d river nodes (used as
gauge locations). With this method, large tributaries can also be derived
separately using the `add_tributaries` option and adding an `area_max`
threshold for the tributaries.
If the `add_tributaries` option is on, you can decide to include or exclude the
upstream boundary of the 1d river as an additional tributary using the
`include_river_boundaries` option.
River edges or river nodes are snapped to the closest downstream wflow river
cell using the :py:meth:`hydromt.flw.gauge_map` method.
Optionally, the toml file can also be updated to save
river_water_inflow~lateral__volume_flow_rate (all river inflows) for the
subcatchments and river_water__volume_flow_rate (river discharge) for the
tributaries using :py:meth:`hydromt_wflow.wflow.setup_config_output_timeseries`.
Adds model layer:
* **subcatchment_{mapname}** map/geom: connection subbasins between
wflow and the 1D model.
* **subcatchment_river_{mapname}** map/geom: connection subbasins between
wflow and the 1D model for river cells only.
* **gauges_{mapname}** map/geom, optional: outlets of the tributaries
flowing into the 1D model.
Parameters
----------
river1d_fn : str, Path, gpd.GeoDataFrame
GeoDataFrame with the 1D model river network and the nodes where to derive
subbasins for connection_method **nodes**.
connection_method : str, default subbasin_area
Method to connect wflow to the 1D model. Available methods are {
'subbasin_area', 'nodes'}.
area_max : float, default 30.0
Maximum area [km2] of the subbasins to connect to the 1D model with
connection_method **subbasin_area**, or **nodes** with add_tributaries
set to True.
add_tributaries : bool, default True
If True, derive tributaries for the subbasins larger than area_max. Always
True for **subbasin_area** method.
include_river_boundaries : bool, default True
If True, include the upstream boundary(ies) of the 1d river as an
additional tributary(ies).
mapname : str, default 1dmodel
Name of the map to save the subcatchments and tributaries in the wflow model
staticmaps and geoms (subcatchment_{mapname}).
update_toml : bool, default True
If True, updates the wflow configuration file to save the required outputs
for the 1D model.
toml_output : str, optional
One of ['csv', 'netcdf_scalar', None] to update [output.csv] or
[output.netcdf_scalar] section of wflow toml file or do nothing. By
default, 'netcdf_scalar'.
**kwargs
Additional keyword arguments passed to the snapping method
hydromt.flw.gauge_map. See its documentation for more information.
See Also
--------
hydromt.flw.gauge_map
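Examples
--------
A minimal sketch, assuming an initialized ``WflowModel`` instance ``mod``;
"river_1d.geojson" is a hypothetical file with the 1D river network::

    mod.setup_1dmodel_connection(
        river1d_fn="river_1d.geojson",
        connection_method="subbasin_area",
        area_max=30.0,
    )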
"""
# Check connection method values
if connection_method not in ["subbasin_area", "nodes"]:
raise ValueError(
f"Unknown connection method {connection_method},"
"select from ['subbasin_area', 'nodes']"
)
# read 1d model river network
gdf_riv = self.data_catalog.get_geodataframe(
river1d_fn,
geom=self.region,
buffer=2,
)
# derive subcatchments and tributaries
inv_rename = {v: k for k, v in self._MAPS.items() if v in self.staticmaps.data}
ds_out = workflows.wflow_1dmodel_connection(
gdf_riv,
ds_model=self.staticmaps.data.rename(inv_rename),
connection_method=connection_method,
area_max=area_max,
add_tributaries=add_tributaries,
include_river_boundaries=include_river_boundaries,
**kwargs,
)
# Derive tributary gauge map
if "gauges" in ds_out.data_vars:
self.set_grid(ds_out["gauges"], name=f"gauges_{mapname}")
# Derive the gauges staticgeoms
gdf_tributary = ds_out["gauges"].raster.vectorize()
centroid = utils.planar_operation_in_utm(
gdf_tributary["geometry"], lambda geom: geom.centroid
)
gdf_tributary["geometry"] = centroid
gdf_tributary["value"] = gdf_tributary["value"].astype(
ds_out["gauges"].dtype
)
self.set_geoms(gdf_tributary, name=f"gauges_{mapname}")
# Add a check that all gauges are on the river
if (
self.staticmaps.data[self._MAPS["rivmsk"]].raster.sample(gdf_tributary)
== self.staticmaps.data[self._MAPS["rivmsk"]].raster.nodata
).any():
river_upa = self.staticmaps.data[self._MAPS["rivmsk"]].attrs.get(
"river_upa", ""
)
logger.warning(
"Not all tributary gauges are on the river network and river "
"discharge cannot be saved. You should use a subbasin area "
f"threshold higher than {area_max} to better match the "
f"wflow river in your model (river_upa: {river_upa})."
)
all_gauges_on_river = False
else:
all_gauges_on_river = True
# Update toml
if update_toml and all_gauges_on_river:
self.setup_config_output_timeseries(
mapname=f"gauges_{mapname}",
toml_output=toml_output,
header=["Q"],
param=["river_water__volume_flow_rate"],
reducer=None,
)
# Derive subcatchment map
self.set_grid(ds_out["subcatch"], name=f"subcatchment_{mapname}")
gdf_subcatch = ds_out["subcatch"].raster.vectorize()
gdf_subcatch["value"] = gdf_subcatch["value"].astype(ds_out["subcatch"].dtype)
self.set_geoms(gdf_subcatch, name=f"subcatchment_{mapname}")
# Subcatchment map for river cells only (to be able to save river outputs
# in wflow)
self.set_grid(ds_out["subcatch_riv"], name=f"subcatchment_riv_{mapname}")
gdf_subcatch_riv = ds_out["subcatch_riv"].raster.vectorize()
gdf_subcatch_riv["value"] = gdf_subcatch_riv["value"].astype(
ds_out["subcatch"].dtype
)
self.set_geoms(gdf_subcatch_riv, name=f"subcatchment_riv_{mapname}")
# Update toml
if update_toml:
self.setup_config_output_timeseries(
mapname=f"subcatchment_river_{mapname}",
toml_output=toml_output,
header=["Qlat"],
param=["river_water_inflow~lateral__volume_flow_rate"],
reducer=["sum"],
)
[docs]
@hydromt_step
def setup_allocation_areas(
self,
waterareas_fn: str | gpd.GeoDataFrame,
priority_basins: bool = True,
minimum_area: float = 50.0,
output_name: str = "demand_allocation_area_id",
):
"""Create water demand allocation areas.
The areas are based on the wflow model basins (at model resolution), the
wflow model rivers and water areas or regions for allocation.
Water regions are generally defined by sub-river-basins within a country. To
mimic reality, it is advisable to avoid cross-border abstractions. Whenever
information is available, it is strongly recommended to align the water
regions with the actual areas managed by water management authorities, such
as regional water boards.
The allocation area will be an intersection of the wflow model basins and the
water areas. For areas that do not contain river cells after intersection with
the water areas, the priority_basins flag can be used to decide if these basins
should be merged with the closest downstream basin or with any large enough
basin in the same water area.
Parameters
----------
waterareas_fn : str | gpd.GeoDataFrame
Administrative boundaries GeoDataFrame data; this could be
e.g. water management areas by water boards or the administrative
boundaries of countries.
priority_basins : bool, optional
If True, merge the basins with the closest downstream basin, else merge
with any large enough basin in the same water area, by default True.
minimum_area : float
Minimum area of the subbasins to keep in km2. Default is 50 km2.
output_name : str, optional
Name of the allocation areas map to be saved in the wflow model staticmaps
and staticgeoms. Default is 'demand_allocation_area_id'.
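Examples
--------
A minimal sketch, assuming an initialized ``WflowModel`` instance ``mod``;
"waterboards" is a hypothetical catalog entry with water management areas::

    mod.setup_allocation_areas(
        waterareas_fn="waterboards",
        priority_basins=True,
        minimum_area=50.0,
    )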
"""
logger.info("Preparing water demand allocation map.")
self._update_naming({"land_water_allocation_area__count": output_name})
# Read the data
waterareas = self.data_catalog.get_geodataframe(
waterareas_fn,
geom=self.region,
)
# Create the allocation grid
inv_rename = {v: k for k, v in self._MAPS.items() if v in self.staticmaps.data}
da_alloc, gdf_alloc = workflows.demand.allocation_areas(
ds_like=self.staticmaps.data.rename(inv_rename),
waterareas=waterareas,
basins=self.basins,
priority_basins=priority_basins,
minimum_area=minimum_area,
)
self.set_grid(da_alloc, name=output_name)
# Update the config
self.set_config("input.static.land_water_allocation_area__count", output_name)
# Add alloc to geoms
self.set_geoms(gdf_alloc, name=output_name)
[docs]
@hydromt_step
def setup_allocation_surfacewaterfrac(
self,
gwfrac_fn: str | xr.DataArray,
waterareas_fn: str | xr.DataArray | None = None,
gwbodies_fn: str | xr.DataArray | None = None,
ncfrac_fn: str | xr.DataArray | None = None,
interpolate_nodata: bool = False,
mask_and_scale_gwfrac: bool = True,
output_name: str = "demand_surface_water_ratio",
):
"""Create the fraction of water allocated from surface water.
This fraction describes the division of the water demand between surface
water, groundwater (aquifers) and non-conventional sources (e.g. desalination
plants). The surface water fraction is based on the raw groundwater fraction,
the presence of groundwater bodies (these are absent in e.g. mountainous
regions), the fraction of water obtained from non-conventional sources, and
the water source areas.
Non-conventional water could e.g. be water acquired by desalination of ocean or
other brackish water.
Adds model layer:
* **demand_surface_water_ratio**: fraction of water allocated from surface water
[0-1]
Parameters
----------
gwfrac_fn : str | xr.DataArray
The raw groundwater fraction per grid cell. The values of these cells need
to be between 0 and 1.
waterareas_fn : str | xr.DataArray
The areas over which the water has to be distributed. This may either be
a global or a more local map. If not provided, the allocation areas created
by `setup_allocation_areas` will be used.
gwbodies_fn : str | xr.DataArray | None
The presence of groundwater bodies per grid cell. The values ought to
be binary (either 0 or 1). If not provided, we assume groundwater
bodies are present where gwfrac is more than 0.
ncfrac_fn : str | xr.DataArray | None
The non-conventional fraction. Same types of values apply as for
`gwfrac_fn`. If not provided, we assume no non-conventional sources are
used.
interpolate_nodata : bool, optional
If True, nodata values in the resulting demand_surface_water_ratio map will
be linearly interpolated. Else a default value of 1 will be used for nodata
values (default).
mask_and_scale_gwfrac : bool, optional
If True, gwfrac will be masked for areas with no groundwater bodies. To keep
the average gwfrac used over waterareas similar after the masking, gwfrac
for areas with groundwater bodies can increase. If False, gwfrac will be
used as is. By default True.
output_name : str, optional
Name of the fraction of surface water used map to be saved in the wflow
model staticmaps file. Default is 'demand_surface_water_ratio'.
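Examples
--------
A minimal sketch, assuming an initialized ``WflowModel`` instance ``mod``
and hypothetical catalog entries; with waterareas_fn omitted, the areas
from setup_allocation_areas are reused::

    mod.setup_allocation_surfacewaterfrac(
        gwfrac_fn="gwfrac_raw",
        gwbodies_fn="gwbodies",
        ncfrac_fn="ncfrac",
    )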
"""
logger.info("Preparing surface water fraction map.")
# Load the data
gwfrac_raw = self.data_catalog.get_rasterdataset(
gwfrac_fn,
geom=self.region,
buffer=2,
)
if gwbodies_fn is not None:
gwbodies = self.data_catalog.get_rasterdataset(
gwbodies_fn,
geom=self.region,
buffer=2,
)
else:
gwbodies = None
if ncfrac_fn is not None:
ncfrac = self.data_catalog.get_rasterdataset(
ncfrac_fn,
geom=self.region,
buffer=2,
)
else:
ncfrac = None
# check whether to use the models own allocation areas
if waterareas_fn is None:
logger.info("Using wflow model allocation areas.")
if self._MAPS["allocation_areas"] not in self.staticmaps.data:
logger.error(
"No allocation areas found. Run setup_allocation_areas first "
"or provide a waterareas_fn."
)
return
waterareas = self.staticmaps.data[self._MAPS["allocation_areas"]]
else:
waterareas = self.data_catalog.get_rasterdataset(
waterareas_fn,
geom=self.region,
buffer=2,
)
# Call the workflow
w_frac = workflows.demand.surfacewaterfrac_used(
gwfrac_raw=gwfrac_raw,
da_like=self.staticmaps.data[self._MAPS["elevtn"]],
waterareas=waterareas,
gwbodies=gwbodies,
ncfrac=ncfrac,
interpolate=interpolate_nodata,
mask_and_scale_gwfrac=mask_and_scale_gwfrac,
)
# Update the settings toml
wflow_var = "land_surface_water__withdrawal_fraction"
self._update_naming({wflow_var: output_name})
self.set_config(f"input.static.{wflow_var}", output_name)
# Set the dataarray to the wflow grid
self.set_grid(w_frac, name=output_name)
[docs]
@hydromt_step
def setup_domestic_demand(
self,
domestic_fn: str | xr.Dataset,
population_fn: str | xr.Dataset | None = None,
domestic_fn_original_res: float | None = None,
output_names: Dict = {
"land~domestic__gross_water_demand_volume_flux": "demand_domestic_gross",
"land~domestic__net_water_demand_volume_flux": "demand_domestic_net",
},
):
"""
Prepare domestic water demand maps from a raster dataset.
Both gross and net domestic demand should be provided in `domestic_fn`. They
can either be cyclic or non-cyclic.
To improve accuracy, the domestic demand can be downscaled based on a provided
population dataset. If the data you are using was already downscaled using a
different source for population data, you may decide to first resample to the
original resolution of `domestic_fn` before downscaling with `population_fn`.
For example, the pcr_globwb dataset is at a resolution of 0.0083333333 degrees,
while the original data has a resolution of 0.5 degrees. Use the
`domestic_fn_original_res` parameter to specify the original resolution.
Adds model layer:
* **demand_domestic_gross**: gross domestic water demand [mm/day]
* **demand_domestic_net**: net domestic water demand [mm/day]
Parameters
----------
domestic_fn : str | xr.Dataset
The domestic dataset. This can either be the dataset directly (xr.Dataset),
a string referring to an entry in the data catalog or a dictionary
containing the name of the dataset (keyword: `source`) and any optional
keyword arguments (e.g. `version`). The data can be cyclic
(with a `time` dimension) or non-cyclic. Allowed cyclic data can be monthly
(12) or dayofyear (365 or 366).
* Required variables: 'domestic_gross' [mm/day], 'domestic_net' [mm/day]
population_fn : str | xr.Dataset
The population dataset in capita. Either provided as a dataset directly or
as a string referring to an entry in the data catalog.
domestic_fn_original_res : float, optional
The original resolution of the domestic dataset, by default None to skip
upscaling before downscaling with population.
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
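Examples
--------
A minimal sketch, assuming an initialized ``WflowModel`` instance ``mod``;
"pcr_globwb" and "ghs_pop" are illustrative catalog entries::

    mod.setup_domestic_demand(
        domestic_fn="pcr_globwb",
        population_fn="ghs_pop",
        domestic_fn_original_res=0.5,
    )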
"""
logger.info("Preparing domestic demand maps.")
self._update_naming(output_names)
# Set flag for cyclic data
_cyclic = False
# Read data
domestic_raw = self.data_catalog.get_rasterdataset(
domestic_fn,
geom=self.region,
buffer=2,
variables=["domestic_gross", "domestic_net"],
)
# Increase the buffer if original resolution is provided
if domestic_fn_original_res is not None:
buffer = np.ceil(domestic_fn_original_res / abs(domestic_raw.raster.res[0]))
domestic_raw = self.data_catalog.get_rasterdataset(
domestic_fn,
geom=self.region,
buffer=buffer,
variables=["domestic_gross", "domestic_net"],
)
# Check if data is time dependent
if "time" in domestic_raw.coords:
# Check that this is indeed cyclic data
if len(domestic_raw.time) in [12, 365, 366]:
_cyclic = True
domestic_raw["time"] = domestic_raw.time.astype("int32")
else:
raise ValueError(
"The provided domestic demand data is cyclic but the time "
"dimension does not match the expected length of 12, 365 or 366."
)
# Get population data
pop_raw = None
if population_fn is not None:
pop_raw = self.data_catalog.get_rasterdataset(
population_fn,
bbox=domestic_raw.raster.bounds,
buffer=2,
)
# Compute domestic demand
domestic, pop = workflows.demand.domestic(
domestic_raw,
ds_like=self.staticmaps.data,
popu=pop_raw,
original_res=domestic_fn_original_res,
)
# Add to grid
rmdict = {k: self._MAPS.get(k, k) for k in domestic.data_vars}
self.set_grid(domestic.rename(rmdict))
if population_fn is not None:
self.set_grid(pop, name="meta_population")
# Update toml
self.set_config("model.water_demand.domestic__flag", True)
data_type = "cyclic" if _cyclic else "static"
self._update_config_variable_name(domestic.rename(rmdict).data_vars, data_type)
[docs]
@hydromt_step
def setup_domestic_demand_from_population(
self,
population_fn: str | xr.Dataset,
domestic_gross_per_capita: float | List[float],
domestic_net_per_capita: float | List[float] | None = None,
output_names: Dict = {
"land~domestic__gross_water_demand_volume_flux": "demand_domestic_gross",
"land~domestic__net_water_demand_volume_flux": "demand_domestic_net",
},
):
"""
Prepare domestic water demand maps from statistics per capita.
Gross and net demands per capita can be provided as cyclic (list) or non-cyclic
(constant). The statistics are then multiplied by the population dataset to
derive the gross and net domestic demand.
Adds model layer:
* **demand_domestic_gross**: gross domestic water demand [mm/day]
* **demand_domestic_net**: net domestic water demand [mm/day]
Parameters
----------
population_fn : str | xr.Dataset
The (gridded) population dataset in capita. Either provided as a dataset
directly or as a string referring to an entry in the data catalog.
domestic_gross_per_capita : float | List[float]
The gross domestic water demand per capita [m3/day]. If cyclic, provide a
list with 12 values for monthly data or 365/366 values for daily data.
domestic_net_per_capita : float | List[float] | None
The net domestic water demand per capita [m3/day]. If cyclic, provide a
list with 12 values for monthly data or 365/366 values for daily data. If
not provided, the gross demand will be used as net demand.
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
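Examples
--------
A minimal sketch with a constant gross demand, assuming an initialized
``WflowModel`` instance ``mod``; "ghs_pop" and the per-capita value are
illustrative::

    mod.setup_domestic_demand_from_population(
        population_fn="ghs_pop",
        domestic_gross_per_capita=0.125,
    )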
"""
logger.info("Preparing domestic demand maps based on population.")
# Set flag for cyclic data
_cyclic = False
self._update_naming(output_names)
# Check if data is time dependent
time_length = len(np.atleast_1d(domestic_gross_per_capita))
if time_length in [12, 365, 366]:
_cyclic = True
elif time_length > 1:
raise ValueError(
"The provided domestic demand data is cyclic but the length "
f"({time_length})does not match the expected length of 12, 365 or 366."
)
if domestic_net_per_capita is None:
domestic_net_per_capita = domestic_gross_per_capita
logger.info("Net domestic demand not provided, using gross demand.")
# Get population data
popu = self.data_catalog.get_rasterdataset(
population_fn,
bbox=self.staticmaps.data.raster.bounds,
buffer=1000,
)
# Compute domestic demand
domestic, popu_scaled = workflows.demand.domestic_from_population(
popu,
ds_like=self.staticmaps.data,
gross_per_capita=domestic_gross_per_capita,
net_per_capita=domestic_net_per_capita,
)
# Add to grid
rmdict = {k: self._MAPS.get(k, k) for k in domestic.data_vars}
self.set_grid(domestic.rename(rmdict))
if population_fn is not None:
self.set_grid(popu_scaled, name="meta_population")
# Update toml
self.set_config("model.water_demand.domestic__flag", True)
data_type = "cyclic" if _cyclic else "static"
self._update_config_variable_name(domestic.rename(rmdict).data_vars, data_type)
[docs]
@hydromt_step
def setup_other_demand(
self,
demand_fn: str | Dict[str, Dict[str, Any]] | xr.Dataset,
variables: list = [
"industry_gross",
"industry_net",
"livestock_gross",
"livestock_net",
],
resampling_method: str = "average",
output_names: Dict = {
"land~industry__gross_water_demand_volume_flux": "demand_industry_gross",
"land~industry__net_water_demand_volume_flux": "demand_industry_net",
"land~livestock__gross_water_demand_volume_flux": "demand_livestock_gross",
"land~livestock__net_water_demand_volume_flux": "demand_livestock_net",
},
):
"""Create water demand maps from other sources (e.g. industry, livestock).
These maps are created from a supplied dataset that either contains one
or all of the following variables:
- `Industrial` water demand
- `Livestock` water demand
- `Domestic` water demand (without population downscaling)
For each of these datasets/variables, a gross and a net water demand
should be provided. They can either be provided cyclic or non-cyclic. The
maps are then resampled to the model resolution using the provided
`resampling_method`.
Adds model layer:
* **{var}_gross**: gross water demand [mm/day]
* **{var}_net**: net water demand [mm/day]
Parameters
----------
demand_fn : str | Dict[str, Dict[str, Any]] | xr.Dataset
The water demand dataset. This can either be the dataset directly
(xr.Dataset), a string referring to an entry in the data catalog or
a dictionary containing the name of the dataset (keyword: `source`) and
any optional keyword arguments (e.g. `version`). The data can be cyclic
(with a `time` dimension) or non-cyclic. Allowed cyclic data can be monthly
(12) or dayofyear (365 or 366).
* Required variables: variables listed in `variables` in [mm/day].
variables : list, optional
The variables to be processed. Supported variables are ['domestic_gross',
'domestic_net', 'industry_gross', 'industry_net', 'livestock_gross',
'livestock_net']. By default gross and net demand for industry and livestock
are processed.
resampling_method : str, optional
Resampling method for the demand maps, by default "average"
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
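Examples
--------
A minimal sketch preparing industry demand only, assuming an initialized
``WflowModel`` instance ``mod``; "pcr_globwb" is an illustrative catalog
entry::

    mod.setup_other_demand(
        demand_fn="pcr_globwb",
        variables=["industry_gross", "industry_net"],
    )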
"""
logger.info(f"Preparing water demand maps for {variables}.")
# Set flag for cyclic data
_cyclic = False
self._update_naming(output_names)
# Selecting data
demand_raw = self.data_catalog.get_rasterdataset(
demand_fn,
geom=self.region,
buffer=2,
variables=variables,
)
if "time" in demand_raw.coords:
# Check that this is indeed cyclic data
if len(demand_raw.time) in [12, 365, 366]:
_cyclic = True
demand_raw["time"] = demand_raw.time.astype("int32")
else:
raise ValueError(
"The provided demand data is cyclic but the time dimension does "
"not match the expected length of 12, 365 or 366."
)
# Create static water demand rasters
demand = workflows.demand.other_demand(
demand_raw,
ds_like=self.staticmaps.data,
ds_method=resampling_method,
)
rmdict = {k: self._MAPS.get(k, k) for k in demand.data_vars}
self.set_grid(demand.rename(rmdict))
# Update the settings toml
if "domestic_gross" in demand.data_vars:
self.set_config("model.water_demand.domestic__flag", True)
if "industry_gross" in demand.data_vars:
self.set_config("model.water_demand.industry__flag", True)
if "livestock_gross" in demand.data_vars:
self.set_config("model.water_demand.livestock__flag", True)
data_type = "cyclic" if _cyclic else "static"
self._update_config_variable_name(demand.rename(rmdict).data_vars, data_type)
[docs]
@hydromt_step
def setup_irrigation(
self,
irrigated_area_fn: str | Path | xr.DataArray,
irrigation_value: List[int],
cropland_class: List[int],
paddy_class: List[int] = [],
area_threshold: float = 0.6,
lai_threshold: float = 0.2,
lulcmap_name: str = "meta_landuse",
output_names: Dict = {
"land~irrigated-paddy_area__count": "demand_paddy_irrigated_mask",
"land~irrigated-non-paddy_area__count": "demand_nonpaddy_irrigated_mask",
"land~irrigated-paddy__irrigation_trigger_flag": "demand_paddy_irrigation_trigger", # noqa: E501
"land~irrigated-non-paddy__irrigation_trigger_flag": "demand_nonpaddy_irrigation_trigger", # noqa: E501
},
):
"""
Add required information to simulate irrigation water demand from grid.
THIS FUNCTION SHOULD BE RUN AFTER LANDUSE AND LAI MAPS ARE CREATED.
The function requires data that contains information about the location of the
irrigated areas (``irrigated_area_fn``). This, combined with the wflow landuse
map that contains classes for cropland (``cropland_class``) and optionally for
paddy (rice) (``paddy_class``), determines which locations are considered to be
paddy irrigation, and which locations are considered to be non-paddy irrigation.
Next, the irrigated area map is reprojected to the model resolution, where a
threshold (``area_threshold``) determines when pixels are considered to be
classified as irrigation or rainfed cells (both paddy and non-paddy). It adds
the resulting maps to the input data.
To determine when irrigation is allowed to occur, an irrigation trigger map is
defined. This is a cyclic map, that defines (with a mask) when irrigation is
expected to occur. This is done based on the Leaf Area Index (LAI), that is
already present in the wflow model configuration. We follow the procedure
described by Peano et al. (2019). They describe a threshold value based on the
LAI variability to determine the growing season. This threshold is defined as
20% (default value) of the LAI variability, but can be adjusted via the
``lai_threshold`` argument.
Adds model layers:
* **demand_nonpaddy_irrigated_mask**: Irrigated (non-paddy) mask [-]
* **demand_paddy_irrigated_mask**: Irrigated (paddy) mask [-]
* **demand_paddy_irrigation_trigger**: Map with monthly values, indicating
whether irrigation is allowed (1) or not (0) [-] for paddy areas
* **demand_nonpaddy_irrigation_trigger**: Map with monthly values, indicating
whether irrigation is allowed (1) or not (0) [-] for non-paddy areas
Parameters
----------
irrigated_area_fn: str, Path, xarray.DataArray
Name of the (gridded) dataset that contains the location of irrigated areas.
irrigation_value: list
List of values that are considered to be irrigated areas in
``irrigated_area_fn``.
cropland_class: list
List of values that are considered to be cropland in the wflow landuse data.
paddy_class: list
List of values in the wflow landuse data that are considered as paddy or
rice. Leave empty if not present (default).
area_threshold: float
Minimum fraction of irrigated area within a (wflow) pixel for it to be
classified as an irrigated pixel, by default 0.6.
lai_threshold: float
Value of LAI variability to be used to determine the irrigation trigger. By
default 0.2.
lulcmap_name: str
Name of the landuse map layer in the wflow model staticmaps. By default
'meta_landuse'. Please update if your landuse map has a different name
(eg 'landuse_globcover').
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
See Also
--------
workflows.demand.irrigation
References
----------
Peano, D., Materia, S., Collalti, A., Alessandri, A., Anav, A., Bombelli, A., &
Gualdi, S. (2019). Global variability of simulated and observed vegetation
growing season. Journal of Geophysical Research: Biogeosciences, 124, 3569–3587.
https://doi.org/10.1029/2018JG004881
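Examples
--------
A minimal sketch, assuming an initialized ``WflowModel`` instance ``mod``;
"irrigated_area" is a hypothetical raster source and the class values refer
to a hypothetical landuse legend::

    mod.setup_irrigation(
        irrigated_area_fn="irrigated_area",
        irrigation_value=[1],
        cropland_class=[11, 14, 20, 30],
        paddy_class=[12],
    )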
"""
logger.info("Preparing irrigation maps.")
if lulcmap_name in self.staticmaps.data:
# update the internal mapping
self._MAPS["landuse"] = lulcmap_name
else:
raise ValueError(
f"Landuse map {lulcmap_name} not found in the model grid. Please "
"provide a valid landuse map name or run setup_lulcmaps."
)
# Extract irrigated area dataset
irrigated_area = self.data_catalog.get_rasterdataset(
irrigated_area_fn, bbox=self.staticmaps.data.raster.bounds, buffer=3
)
# Get irrigation areas for paddy, non paddy and irrigation trigger
inv_rename = {v: k for k, v in self._MAPS.items() if v in self.staticmaps.data}
ds_irrigation = workflows.demand.irrigation(
da_irrigation=irrigated_area,
ds_like=self.staticmaps.data.rename(inv_rename),
irrigation_value=irrigation_value,
cropland_class=cropland_class,
paddy_class=paddy_class,
area_threshold=area_threshold,
lai_threshold=lai_threshold,
)
# Check if paddy and non paddy are present
cyclic_lai = len(self.staticmaps.data[self._MAPS["LAI"]].dims) > 2
if (
"demand_paddy_irrigated_mask" in ds_irrigation.data_vars
and ds_irrigation["demand_paddy_irrigated_mask"]
.raster.mask_nodata()
.sum()
.values
!= 0
):
# Select the paddy variables in output_names
paddy_names = {
k: v for k, v in output_names.items() if "irrigated-paddy" in k
}
self._update_naming(paddy_names)
ds_paddy = ds_irrigation[
["demand_paddy_irrigated_mask", "demand_paddy_irrigation_trigger"]
]
rmdict = {k: self._MAPS.get(k, k) for k in ds_paddy.data_vars}
self.set_grid(ds_paddy.rename(rmdict))
self.set_config("model.water_demand.paddy__flag", True)
self._update_config_variable_name(
self._MAPS.get(
"demand_paddy_irrigated_mask", "demand_paddy_irrigated_mask"
),
"static",
)
data_type = "cyclic" if cyclic_lai else "static"
self._update_config_variable_name(
self._MAPS.get(
"demand_paddy_irrigation_trigger", "demand_paddy_irrigation_trigger"
),
data_type,
)
else:
self.set_config("model.water_demand.paddy__flag", False)
if (
ds_irrigation["demand_nonpaddy_irrigated_mask"]
.raster.mask_nodata()
.sum()
.values
!= 0
):
nonpaddy_names = {
k: v for k, v in output_names.items() if "irrigated-non-paddy" in k
}
self._update_naming(nonpaddy_names)
ds_nonpaddy = ds_irrigation[
["demand_nonpaddy_irrigated_mask", "demand_nonpaddy_irrigation_trigger"]
]
rmdict = {k: self._MAPS.get(k, k) for k in ds_nonpaddy.data_vars}
self.set_grid(ds_nonpaddy.rename(rmdict))
# Update the config
self.set_config("model.water_demand.nonpaddy__flag", True)
self._update_config_variable_name(
self._MAPS.get(
"demand_nonpaddy_irrigated_mask", "demand_nonpaddy_irrigated_mask"
),
"static",
)
data_type = "cyclic" if cyclic_lai else "static"
self._update_config_variable_name(
self._MAPS.get(
"demand_nonpaddy_irrigation_trigger",
"demand_nonpaddy_irrigation_trigger",
),
data_type,
)
else:
self.set_config("model.water_demand.nonpaddy__flag", False)
@hydromt_step
def setup_irrigation_from_vector(
self,
irrigated_area_fn: str | Path | gpd.GeoDataFrame,
cropland_class: List[int],
paddy_class: List[int] = [],
area_threshold: float = 0.6,
lai_threshold: float = 0.2,
output_names: Dict = {
"land~irrigated-paddy_area__count": "demand_paddy_irrigated_mask",
"land~irrigated-non-paddy_area__count": "demand_nonpaddy_irrigated_mask",
"land~irrigated-paddy__irrigation_trigger_flag": "demand_paddy_irrigation_trigger", # noqa: E501
"land~irrigated-non-paddy__irrigation_trigger_flag": "demand_nonpaddy_irrigation_trigger", # noqa: E501
},
):
"""
Add required information to simulate irrigation water demand from vector.
THIS FUNCTION SHOULD BE RUN AFTER LANDUSE AND LAI MAPS ARE CREATED.
The function requires data that contains information about the location of the
irrigated areas (``irrigated_area_fn``). This, combined with the wflow landuse
map that contains classes for cropland (``cropland_class``) and optionally for
paddy (rice) (``paddy_class``), determines which locations are considered to be
paddy irrigation, and which locations are considered to be non-paddy irrigation.
Next, the irrigated area geometries are rasterized, and a threshold
(``area_threshold``) on the irrigated fraction of each pixel determines
whether it is classified as an irrigated or a rainfed cell (for both paddy
and non-paddy), as sketched below. The resulting maps are added to the model
staticmaps.
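Schematically, the rasterization step works like this (a simplified sketch
using hydromt's raster accessor; ``gdf``, ``da_like`` and the subgrid factor
are illustrative, and the actual workflow may differ):
.. code-block:: python

    # rasterize polygons on a ~10x finer subgrid, then average back to the
    # wflow resolution to obtain the irrigated fraction per wflow pixel
    da_sub = da_like.raster.reproject(dst_res=abs(da_like.raster.res[0]) / 10)
    mask_sub = da_sub.raster.rasterize(gdf, col_name="index", nodata=0) > 0
    frac = mask_sub.astype("float32").raster.reproject_like(
        da_like, method="average"
    )
    irrigated_mask = frac >= area_threshold  # e.g. 0.6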
To determine when irrigation is allowed to occur, an irrigation trigger map is
defined. This is a cyclic map that defines (via a mask) when irrigation is
expected to occur, based on the Leaf Area Index (LAI), which is already
present in the wflow model configuration. We follow the procedure described
by Peano et al. (2019), who use a threshold based on the LAI variability to
determine the growing season. This threshold is defined as 20% (default
value) of the LAI variability, but can be adjusted via the ``lai_threshold``
argument.
Adds model layers:
* **demand_paddy_irrigated_mask**: Irrigated (paddy) mask [-]
* **demand_nonpaddy_irrigated_mask**: Irrigated (non-paddy) mask [-]
* **demand_paddy_irrigation_trigger**: Map with monthly values, indicating
whether irrigation is allowed (1) or not (0) [-] for paddy areas
* **demand_nonpaddy_irrigation_trigger**: Map with monthly values, indicating
whether irrigation is allowed (1) or not (0) [-] for non-paddy areas
Parameters
----------
irrigated_area_fn: str, Path, geopandas.GeoDataFrame
Name of the (vector) dataset that contains the location of irrigated areas.
cropland_class: list
List of values that are considered to be cropland in the wflow landuse data.
paddy_class: list
Classes in the wflow landuse data that are considered as paddy or rice.
Leave empty if not present (default).
area_threshold: float
Fractional area of a (wflow) pixel before it gets classified as an irrigated
pixel, by default 0.6
lai_threshold: float
Value of LAI variability to be used to determine the irrigation trigger. By
default 0.2.
output_names : dict, optional
Dictionary with output names that will be used in the model netcdf input
files. Users should provide the Wflow.jl variable name followed by the name
in the netcdf file.
See Also
--------
workflows.demand.irrigation
References
----------
Peano, D., Materia, S., Collalti, A., Alessandri, A., Anav, A., Bombelli, A., &
Gualdi, S. (2019). Global variability of simulated and observed vegetation
growing season. Journal of Geophysical Research: Biogeosciences, 124, 3569–3587.
https://doi.org/10.1029/2018JG004881
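Examples
--------
A usage sketch (the data source name and class values are illustrative)::

    mod.setup_irrigation_from_vector(
        irrigated_area_fn="irrigated_areas",
        cropland_class=[11, 14, 20, 30],
        paddy_class=[12],
        area_threshold=0.6,
        lai_threshold=0.2,
    )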
"""
logger.info("Preparing irrigation maps.")
# Extract irrigated area dataset
irrigated_area = self.data_catalog.get_geodataframe(
irrigated_area_fn,
bbox=self.staticmaps.data.raster.bounds,
buffer=1000,
predicate="intersects",
handle_nodata=NoDataStrategy.IGNORE,
).copy() # Ensure we have a copy to resolve SettingWithCopyWarning
# Check if the geodataframe is empty
if irrigated_area is None or irrigated_area.empty:
logger.info("No irrigated areas found in the provided geodataframe.")
return
# Get irrigation areas for paddy, non paddy and irrigation trigger
inv_rename = {v: k for k, v in self._MAPS.items() if v in self.staticmaps.data}
ds_irrigation = workflows.demand.irrigation_from_vector(
gdf_irrigation=irrigated_area,
ds_like=self.staticmaps.data.rename(inv_rename),
cropland_class=cropland_class,
paddy_class=paddy_class,
area_threshold=area_threshold,
lai_threshold=lai_threshold,
)
# Check if paddy and non paddy are present
cyclic_lai = len(self.staticmaps.data[self._MAPS["LAI"]].dims) > 2
if (
"demand_paddy_irrigated_mask" in ds_irrigation.data_vars
and ds_irrigation["demand_paddy_irrigated_mask"]
.raster.mask_nodata()
.sum()
.values
!= 0
):
paddy_names = {
k: v for k, v in output_names.items() if "irrigated-paddy" in k
}
self._update_naming(paddy_names)
ds_paddy = ds_irrigation[
["demand_paddy_irrigated_mask", "demand_paddy_irrigation_trigger"]
]
rmdict = {k: self._MAPS.get(k, k) for k in ds_paddy.data_vars}
self.set_grid(ds_paddy.rename(rmdict))
# Update the config
self.set_config("model.water_demand.paddy__flag", True)
self._update_config_variable_name(
self._MAPS.get(
"demand_paddy_irrigated_mask", "demand_paddy_irrigated_mask"
),
"static",
)
data_type = "cyclic" if cyclic_lai else "static"
self._update_config_variable_name(
self._MAPS.get(
"demand_paddy_irrigation_trigger", "demand_paddy_irrigation_trigger"
),
data_type,
)
else:
self.set_config("model.water_demand.paddy__flag", False)
if (
ds_irrigation["demand_nonpaddy_irrigated_mask"]
.raster.mask_nodata()
.sum()
.values
!= 0
):
nonpaddy_names = {
k: v for k, v in output_names.items() if "irrigated-non-paddy" in k
}
self._update_naming(nonpaddy_names)
ds_nonpaddy = ds_irrigation[
["demand_nonpaddy_irrigated_mask", "demand_nonpaddy_irrigation_trigger"]
]
rmdict = {k: self._MAPS.get(k, k) for k in ds_nonpaddy.data_vars}
self.set_grid(ds_nonpaddy.rename(rmdict))
# Update the config
self.set_config("model.water_demand.nonpaddy__flag", True)
self._update_config_variable_name(
self._MAPS.get(
"demand_nonpaddy_irrigated_mask", "demand_nonpaddy_irrigated_mask"
),
"static",
)
data_type = "cyclic" if cyclic_lai else "static"
self._update_config_variable_name(
self._MAPS.get(
"demand_nonpaddy_irrigation_trigger",
"demand_nonpaddy_irrigation_trigger",
),
data_type,
)
else:
self.set_config("model.water_demand.nonpaddy__flag", False)
@hydromt_step
def setup_cold_states(
self,
timestamp: str | None = None,
) -> None:
"""Prepare cold states for Wflow.
To be run last, as it requires some soil parameters or constant_pars to be
computed already.
To be run after the setup_reservoirs and setup_glaciers methods, so that cold
states are also created for them if they are present in the basin.
This function is mainly useful in case the wflow model is read into Delft-FEWS.
Adds model layers:
* **soil_saturated_depth**: saturated store [mm]
* **snow_leq_depth**: snow storage [mm]
* **soil_temp**: top soil temperature [°C]
* **soil_unsaturated_depth**: amount of water in the unsaturated store, per
layer [mm]
* **snow_water_depth**: liquid water content in the snow pack [mm]
* **vegetation_water_depth**: canopy storage [mm]
* **river_instantaneous_q**: river discharge [m3/s]
* **river_instantaneous_h**: river water level [m]
* **subsurface_q**: subsurface flow [m3/d]
* **land_instantaneous_h**: land water level [m]
* **land_instantaneous_q** or **land_instantaneous_qx** +
**land_instantaneous_qy**: overland flow for kinwave [m3/s] or
overland flow in x/y directions for local-inertial [m3/s]
If reservoirs, also adds:
* **reservoir_instantaneous_water_level**: reservoir water level [m]
If glaciers, also adds:
* **glacier_leq_depth**: water within the glacier [mm]
If paddy, also adds:
* **demand_paddy_h**: water on the paddy fields [mm]
Parameters
----------
timestamp : str, optional
Timestamp of the cold states. By default uses the (starttime - timestepsecs)
from the config.
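Examples
--------
A usage sketch (the timestamp is illustrative)::

    mod.setup_cold_states(timestamp="2010-01-01T00:00:00")
    mod.write_states()
    mod.write_config()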
"""
states, states_config = workflows.prepare_cold_states(
self.staticmaps.data,
config=self.config.data,
timestamp=timestamp,
mask_name_land=self._MAPS["basins"],
mask_name_river=self._MAPS["rivmsk"],
)
self.set_states(states)
# Update config to read the states
self.set_config("model.cold_start__flag", False)
# Update states variables names in config
for option in states_config:
self.set_config(option, states_config[option])
@hydromt_step
def upgrade_to_v1_wflow(self):
"""
Upgrade the model to wflow v1 format.
The function reads a TOML from wflow v0.x and converts it to the wflow v1.x
format. The other components stay the same.
Lakes and reservoirs are merged into one structure, and their parameters in
the resulting staticmaps are combined.
This function should be followed by write_config() to write the upgraded
file, as in the sketch below.
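A typical upgrade sequence (a sketch; the root path is illustrative):
.. code-block:: python

    mod = WflowModel(root="path/to/v0x_model", mode="r+")
    mod.read()
    mod.upgrade_to_v1_wflow()
    mod.write_grid()
    mod.write_config()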
"""
config_v0 = self.config.data.copy()
config_out = convert_to_wflow_v1_sbm(self.config.data)
# Update the config
with open(self._DATADIR / "default_config_headers.toml", "rb") as file:
self.config._data = tomllib.load(file)
for option in config_out:
self.set_config(option, config_out[option])
# Merge lakes and reservoirs layers
ds_res, vars_to_remove, config_opt = convert_reservoirs_to_wflow_v1_sbm(
self.staticmaps.data, config_v0
)
if ds_res is not None:
# Remove older maps from grid
self.staticmaps.drop_vars(vars_to_remove)
# Add new reservoir maps to grid
self.set_grid(ds_res)
# Update the config with the new names
for option in config_opt:
self.set_config(option, config_opt[option])
# I/O
@hydromt_step
def write(
self,
config_filename: str | None = None,
grid_filename: str | None = None,
geoms_folder: str | None = "staticgeoms",
forcing_filename: str | None = None,
states_filename: str | None = None,
):
"""
Write the complete model schematization and configuration to file.
From this function, the output filenames/folder of the different components can
be set. If not set, the default filenames/folder are used.
To change more advanced settings, use the specific write methods directly.
Parameters
----------
config_filename : str, optional
Name of the config file, relative to model root. By default None to use the
default name.
grid_filename : str, optional
Name of the grid file, relative to model root/dir_input. By default None
to use the name as defined in the model config file.
geoms_folder : str, optional
Name of the geoms folder relative to grid_filename (i.e. model
root/dir_input). By default 'staticgeoms'.
forcing_filename : str, optional
Name of the forcing file relative to model root/dir_input. By default None
to use the name as defined in the model config file.
states_filename : str, optional
Name of the states file relative to model root/dir_input. By default None
to use the name as defined in the model config file.
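Examples
--------
A usage sketch with a custom forcing filename (illustrative)::

    mod.write(forcing_filename="inmaps_era5_2000_2010.nc")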
"""
logger.info(f"Write model data to {self.root.path}")
# if in r, r+ mode, only write updated components
if not self.root.is_writing_mode():
logger.warning("Cannot write in read-only mode")
return
self.write_data_catalog()
_ = self.config.data # try to read default if not yet set
if "staticmaps" in self.components:
self.write_grid(filename=grid_filename)
if "geoms" in self.components:
self.write_geoms(folder=geoms_folder)
if "forcing" in self.components:
self.write_forcing(filename=forcing_filename)
if "tables" in self.components:
self.write_tables()
if "states" in self.components:
self.write_states(filename=states_filename)
# Write the config last as variables can get set in other write methods
self.write_config(filename=config_filename)
@hydromt_step
def read(
self,
config_filename: str | None = None,
geoms_folder: str = "staticgeoms",
):
"""Read components from disk.
From this function, the input filenames/folder of the config and geoms
components can be set. For the others, the filenames/folder as defined in the
config file are used.
To change more advanced settings, use the specific read methods directly.
Parameters
----------
config_filename : str | None, optional
Name of the config file, relative to model root. By default None to use the
default name.
geoms_folder : str | None, optional
Name of the geoms folder relative to grid_filename (i.e. model
root/dir_input). By default 'staticgeoms'.
"""
self.read_config(filename=config_filename)
self.read_grid()
self.read_geoms(folder=geoms_folder)
self.read_forcing()
self.read_states()
self.read_tables()
@hydromt_step
def read_config(
self,
filename: str | None = None,
):
"""
Read config from <root/filename>.
Parameters
----------
filename : str, optional
Name of the config file. By default None to use the default name
wflow_sbm.toml.
"""
# Call the component
self.config.read(filename)
@hydromt_step
def write_config(
self,
filename: str | None = None,
config_root: Path | str | None = None,
):
"""
Write config to <(config_)root/config_fn>.
Parameters
----------
filename : str, optional
Name of the config file. By default None to use the default name
wflow_sbm.toml.
config_root : str, optional
Root folder to write the config file if different from model root (default).
Can be absolute or relative to model root.
"""
# Call the component
self.config.write(filename, config_root)
def read_grid(
self,
**kwargs,
):
"""Read grid model data.
Checks the path of the file in the config toml using both ``input.path_static``
and ``dir_input``. If not found uses the default path ``staticmaps.nc`` in the
root folder.
Key-word arguments are passed to :py:meth:`~hydromt._io.readers._read_nc`
Parameters
----------
**kwargs : dict
Additional keyword arguments to be passed to the `read_nc` method.
"""
# Call the component method
self.staticmaps.read(**kwargs)
@hydromt_step
def write_grid(
self,
filename: str | None = None,
**kwargs,
):
"""
Write grid to wflow static data file.
Checks the path of the file in the config toml using both ``input.path_static``
and ``dir_input``. If not found uses the default path ``staticmaps.nc`` in the
root folder.
If filename is supplied, the config will be updated.
Parameters
----------
filename : Path, str, optional
Name or path to the outgoing staticmaps file (including extension).
This is the path/name relative to the root folder and if present the
``dir_input`` folder. By default None.
**kwargs : dict
Additional keyword arguments to be passed to the `write_nc` method.
"""
# Call the component write method
self.staticmaps.write(filename=filename, **kwargs)
def set_grid(
self,
data: xr.DataArray | xr.Dataset,
name: str | None = None,
):
"""Add data to grid.
All layers of grid must have identical spatial coordinates. If basin data is
available, the grid will be masked to the basin upon setting.
This method applies two fixes on top of the default behaviour. The first fix
concerns data with a time axis. Since Wflow.jl v0.7.3, cyclic data of
different lengths (12, 365, 366) is supported, as long as the dimension name
starts with "time". This function checks whether a time axis with that exact
shape is already present in the grid object, and uses that dimension (and its
name) to set the data. If a time dimension with that shape does not yet
exist, it is created following the format "time_{length_data}".
The second fix concerns updating the model with a different number of soil
layers, which is not handled correctly by default. With this fix, the old
"layer" dimension is removed (including all associated data), and the new
data is added with the correct "layer" dimension.
Parameters
----------
data: xarray.DataArray or xarray.Dataset
new map layer to add to grid
name: str, optional
Name of new map layer, this is used to overwrite the name of a DataArray and
ignored if data is a Dataset
"""
"""
# Call the staticmaps set method
self.staticmaps.set(data, name=name)
@hydromt_step
def read_geoms(
self,
folder: str = "staticgeoms",
):
"""
Read static geometries and add them to ``geoms``.
If ``dir_input`` is set in the config, the path where all static geometries
are read is constructed as ``<model_root>/<dir_input>/<folder>``, where
``dir_input`` is relative to the model root.
Parameters
----------
folder : str, optional
Folder name/path where the static geometries are stored relative to the
model root and ``dir_input`` if any. By default "staticgeoms".
"""
self.geoms.read(folder=folder)
@hydromt_step
def write_geoms(
self,
folder: str = "staticgeoms",
precision: int | None = None,
to_wgs84: bool = False,
**kwargs: dict,
):
"""
Write geoms in GeoJSON format.
Checks the path of ``folder`` using both model root and ``dir_input``. If not
found, uses the default path ``staticgeoms`` in the root folder.
Parameters
----------
folder : str, optional
Folder name/path where the static geometries are stored relative to the
model root and ``dir_input`` if any. By default "staticgeoms".
precision : int, optional
Decimal precision to write the geometries. By default None to use 1 decimal
for projected crs and 6 for non-projected crs.
to_wgs84 : bool, optional
If True, geometries are transformed to WGS84 before writing. By default
False, which means geometries are written in their original CRS.
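Examples
--------
Write geometries reprojected to WGS84 with 6-decimal precision (sketch)::

    mod.write_geoms(folder="staticgeoms", precision=6, to_wgs84=True)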
"""
# Call the component write method
self.geoms.write(
folder=folder,
to_wgs84=to_wgs84,
precision=precision,
**kwargs,
)
def set_geoms(self, geometry: gpd.GeoDataFrame | gpd.GeoSeries, name: str):
"""
Set geometries to the model.
This is an inherited method from HydroMT-core's GeomsModel.set_geoms.
"""
self.geoms.set(
geom=geometry,
name=name,
)
@hydromt_step
def read_forcing(
self,
**kwargs,
):
"""
Read forcing.
Checks the path of the file in the config toml using both ``input.path_forcing``
and ``dir_input``. If not found uses the default path ``inmaps.nc`` in the
root folder.
If multiple files are specified using '*' in ``input.path_forcing``, all
matching files are read and merged into one xarray dataset before being split
into one xarray DataArray per forcing variable in the hydromt ``forcing``
dictionary.
Parameters
----------
**kwargs : dict
Additional keyword arguments to be passed to the `read_nc` method.
"""
self.forcing.read(**kwargs)
@hydromt_step
def write_forcing(
self,
filename: str | None = None,
output_frequency: str | None = None,
time_chunk: int = 1,
time_units="days since 1900-01-01T00:00:00",
decimals: int = 2,
overwrite: bool = False,
**kwargs,
):
"""Write forcing at <root/dir_input/filename> in model ready format.
If no ``filename`` path is provided and ``input.path_forcing`` from the wflow
toml exists, the following default filenames are used:
* Default name format (with downscaling):
inmaps_sourcePd_sourceTd_methodPET_freq_startyear_endyear.nc
* Default name format (no downscaling):
inmaps_sourceP_sourceT_methodPET_freq_startyear_endyear.nc
Parameters
----------
filename : Path | str, optional
Path to save output netcdf file; if None the name is read from the wflow
toml file.
output_frequency : str (Offset), optional
Write several files for the forcing according to output_frequency. For
example 'Y' for one file per year or 'M' for one file per month.
By default writes a single file.
For more options, \
see https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases
time_chunk : int, optional
Chunksize on time dimension when saving to disk. By default 1.
time_units : str, optional
Common time units when writing several netcdf forcing files.
By default "days since 1900-01-01T00:00:00".
decimals : int, optional
Number of decimals to use when writing the forcing data.
By default 2.
overwrite : bool, optional
Whether to overwrite existing files. By default False.
**kwargs : dict
Additional keyword arguments to be passed to the `write_nc` method.
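Examples
--------
Write one forcing file per year with two decimals (sketch)::

    mod.write_forcing(output_frequency="Y", decimals=2)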
"""
# Call the component
self.forcing.write(
filename=filename,
output_frequency=output_frequency,
time_chunk=time_chunk,
time_units=time_units,
decimals=decimals,
overwrite=overwrite,
**kwargs,
)
def set_forcing(
self,
data: xr.DataArray | xr.Dataset,
name: str | None = None,
):
"""Add data for the forcing component."""
self.forcing.set(data=data, name=name)
@hydromt_step
def read_states(self):
"""
Read states at <root/dir_input/state.path_input>.
Checks the path of the file in the config toml using both ``state.path_input``
and ``dir_input``. If not found uses the default path ``instate/instates.nc``
in the root folder.
"""
self.states.read()
@hydromt_step
def write_states(self, filename: str | None = None):
"""
Write states at <root/dir_input/state.path_input> in model ready format.
Checks the path of the file in the config toml using both ``state.path_input``
and ``dir_input``. If not found uses the default path ``instate/instates.nc``
in the root folder.
If filename is provided, it will be used and config ``state.path_input``
will be updated accordingly.
Parameters
----------
filename : str, Path, optional
Name of the states file, relative to model root and ``dir_input`` if any.
By default None to use the name as defined in the model config file.
"""
# Write
self.states.write(filename=filename)
def set_states(
self,
data: xr.DataArray | xr.Dataset,
name: str | None = None,
):
"""Add data to states.
All layers of states must have identical spatial coordinates. This is an
inherited method from HydroMT-core's StatesModel.set_states with some fixes.
If basin data is available the states will be masked to that upon setting.
Parameters
----------
data: xarray.DataArray or xarray.Dataset
new map layer to add to states
name: str, optional
Name of new map layer, this is used to overwrite the name of a DataArray and
ignored if data is a Dataset
"""
if self._MAPS["basins"] in self.staticmaps.data:
data = utils.mask_raster_from_layer(
data, self.staticmaps.data[self._MAPS["basins"]]
)
# fall back on default set_states behaviour
self.states.set(data, name=name)
@hydromt_step
def read_outputs(self):
"""
Read outputs at <root/dir_output>.
Reads netcdf_grid at ``output.netcdf_grid.path``, netcdf_scalar at
``output.netcdf_scalar.path`` and csv outputs at ``output.csv.path``.
Checks if ``dir_output`` is present.
"""
self.output_grid.read()
self.output_scalar.read()
self.output_csv.read()
@hydromt_step
def read_tables(self, **kwargs):
"""Read table files at <root> and parse to dict of dataframes.
Parameters
----------
**kwargs : dict
Additional keyword arguments to be passed to the `pd.read_csv` method
Returns
-------
None
The tables are read and stored in the `self.tables.data` attribute.
"""
self.tables.read(float_precision="round_trip", **kwargs)
@hydromt_step
def write_tables(self):
"""Write tables at <root>."""
self.tables.write()
def set_tables(self, df: pd.DataFrame, name: str):
"""Add table <pandas.DataFrame> to model."""
self.tables.set(tables=df, name=name)
def set_root(self, root: Path | str, mode: ModeLike = "w"):
"""Set the model root folder.
Parameters
----------
root : Path, str
Path to the model root folder.
mode : str, optional
Mode to open the model root folder, by default 'w' (write).
Can also be 'r' for read-only or 'r+' for read/append.
"""
self.root.set(root, mode=mode)
def get_config(
self,
*args,
fallback: Any = None,
abs_path: bool = False,
) -> str | None:
"""Get a config value at key.
Parameters
----------
args : tuple, str
keys can be given by multiple args: ('key1', 'key2')
or a string with '.' indicating a new level: ('key1.key2')
fallback: Any, optional
fallback value if key(s) not found in config, by default None.
abs_path: bool, optional
If True return the absolute path relative to the model root,
by default False.
NOTE: this assumes the config is located in model root!
Returns
-------
value : Any
dictionary value
Examples
--------
>>> # self.config = {'a': 1, 'b': {'c': {'d': 2}}}
>>> get_config('a')
1
>>> get_config('b', 'c', 'd')  # identical to get_config('b.c.d')
2
>>> get_config('b.c')  # identical to get_config('b', 'c')
{'d': 2}
"""
return self.config.get_value(
*args,
fallback=fallback,
abs_path=abs_path,
)
def set_config(self, *args):
"""
Update the config toml at key(s) with values.
This function is made to maintain the structure of your toml file.
When adding keys it will look for the most specific header present in
the toml file and add the key under that header.
This means that if you have an empty config toml and you run
``wflow_model.set_config("input.forcing.scale", 1)``
it will result in the following file:
.. code-block:: toml
input.forcing.scale = 1
however if your toml file looks like this before:
.. code-block:: toml
[input.forcing]
(i.e. you have a header in there that has no keys)
then after the insertion it will look like this:
.. code-block:: toml
[input.forcing]
scale = 1
.. warning::
Due to limitations of the underlying library it is currently not possible to
create new headers (i.e. groups like ``input.forcing`` in the example above)
programmatically, and they will need to be added to the default config
toml document.
Parameters
----------
args : str, tuple, list
if tuple or list, minimal length of two
keys can be given by multiple args: ('key1', 'key2', 'value')
or a string with '.' indicating a new level: ('key1.key2', 'value')
Examples
--------
.. code-block:: ipython
>>> self.config
{'a': 1, 'b': {'c': {'d': 2}}}
>>> self.set_config('a', 99)
>>> self.config
{'a': 99, 'b': {'c': {'d': 2}}}
>>> self.set_config('b', 'c', 'd', 99)  # identical to set_config('b.c.d', 99)
>>> self.config
{'a': 99, 'b': {'c': {'d': 99}}}
"""
key = ".".join(args[:-1])
value = args[-1]
self.config.set(key, value)
def _update_naming(self, rename_dict: dict):
"""Update the naming of the model variables.
Parameters
----------
rename_dict: dict
Dictionary with the wflow variable and new output name in file.
"""
_wflow_names_inv = {v: k for k, v in self._WFLOW_NAMES.items()}
_hydromt_names_inv = {v: k for k, v in self._MAPS.items()}
for wflow_var, new_name in rename_dict.items():
if wflow_var is None:
continue
# Find the previous name in self._WFLOW_NAMES
old_name = _wflow_names_inv.get(wflow_var, None)
if old_name is not None:
# Rename the variable in self._WFLOW_NAMES
self._WFLOW_NAMES.pop(old_name)
self._WFLOW_NAMES[new_name] = wflow_var
# Rename the variable in self._MAPS
hydromt_name = _hydromt_names_inv.get(old_name, None)
if hydromt_name is not None:
self._MAPS[hydromt_name] = new_name
else:
logger.warning(f"Wflow variable {wflow_var} not found, check spelling.")
def _update_config_variable_name(
self, data_vars: str | List[str], data_type: str | None = "static"
):
"""Update the variable names in the config file.
Parameters
----------
data_vars: str or list of str
Variable name(s) to update in the config file.
data_type: str, optional
Type of data (static, forcing, cyclic, None), by default "static"
"""
data_vars = [data_vars] if isinstance(data_vars, str) else data_vars
_prefix = f"input.{data_type}" if data_type is not None else "input"
for var in data_vars:
if var in self._WFLOW_NAMES:
# Get the name from the Wflow variable name
wflow_var = self._WFLOW_NAMES[var]
# Update the config variable name
self.set_config(f"{_prefix}.{wflow_var}", var)
# else not a wflow variable
# (spelling mistakes should have been checked in _update_naming)
## WFLOW specific data and method
@property
def flwdir(self) -> pyflwdir.FlwdirRaster:
"""Return the pyflwdir.FlwdirRaster object parsed from wflow ldd."""
if self._flwdir is None:
self.set_flwdir()
return self._flwdir
def set_flwdir(self, ftype="infer"):
"""Parse pyflwdir.FlwdirRaster object parsed from the wflow ldd."""
flwdir_name = self._MAPS["flwdir"]
self._flwdir = flw.flwdir_from_da(
self.staticmaps.data[flwdir_name],
ftype=ftype,
check_ftype=True,
mask=(self.staticmaps.data[self._MAPS["basins"]] > 0),
)
## WFLOW specific modification (clip for now) methods
@hydromt_step
def clip_grid(self, region, buffer=0, align=None, crs=4326, inverse_clip=False):
"""Clip grid to subbasin.
Parameters
----------
region : dict
See :meth:`models.wflow.WflowModel.setup_basemaps`
buffer : int, optional
Buffer around subbasin in number of pixels, by default 0
align : float, optional
Align bounds of region to raster with resolution <align>, by default None
crs: int, optional
Fallback CRS (EPSG code) to set on the grid if it has none defined, by
default 4326.
inverse_clip: bool, optional
Flag to perform "inverse clipping": removing an upstream part of the model
instead of the subbasin itself, by default False
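Examples
--------
Clip to a subbasin draining to an outlet point (the coordinates are
illustrative)::

    mod.clip_grid(region={"subbasin": [12.2, 45.8]}, buffer=2)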
"""
basins_name = self._MAPS["basins"]
flwdir_name = self._MAPS["flwdir"]
# translate basin and outlet kinds to geom
# get basin geometry and clip data
kind = next(iter(region))
if kind in ["basin", "subbasin"]:
# parse_region_basin does not return xy, only geom...
# should be fixed in core
region_kwargs = _parse_region_value(
region.pop(kind),
data_catalog=self.data_catalog,
)
region_kwargs.update(region)
geom, _ = get_basin_geometry(
ds=self.staticmaps.data,
kind=kind,
basins_name=basins_name,
flwdir_name=flwdir_name,
**region_kwargs,
)
elif kind == "bbox":
logger.warning(
"Kind 'bbox' for the region is not recommended as it can lead "
"to mistakes in the catchment delineation. Use carefully."
)
geom = hydromt.model.processes.region.parse_region_bbox(region)
elif kind == "geom":
logger.warning(
"Kind 'geom' for the region is not recommended as it can lead "
"to mistakes in the catchment delineation. Use carefully."
)
geom = hydromt.model.processes.region.parse_region_geom(region)
else:
raise ValueError(
f"wflow region kind not understood or supported: {kind}. "
"Use 'basin', 'subbasin', 'bbox' or 'geom'."
)
# Remove upstream part from model
if inverse_clip:
geom = self.basins.overlay(geom, how="difference")
# clip based on subbasin args, geom or bbox
ds_grid = self.staticmaps.data.raster.clip_geom(
geom, align=align, buffer=buffer
)
ds_grid.coords["mask"] = ds_grid.raster.geometry_mask(geom)
ds_grid[basins_name] = ds_grid[basins_name].where(
ds_grid.coords["mask"], self.staticmaps.data[basins_name].raster.nodata
)
ds_grid[basins_name].attrs.update(
_FillValue=self.staticmaps.data[basins_name].raster.nodata
)
# Update flwdir grid and geoms
if self.crs is None and crs is not None:
self.set_crs(crs)
self.staticmaps._data = xr.Dataset()
self.set_grid(ds_grid)
# add pits at edges after clipping
self._flwdir = None # make sure old flwdir object is removed
self.staticmaps.data[self._MAPS["flwdir"]].data = self.flwdir.to_array("ldd")
# Reinitialise geoms and re-create basins/rivers
self.geoms.clear()
self.basins
self.rivers
# Update reservoirs
if self._MAPS["reservoir_area_id"] in self.staticmaps.data:
reservoir = self.staticmaps.data[self._MAPS["reservoir_area_id"]]
if not np.any(reservoir > 0):
remove_maps = [
self._MAPS["reservoir_area_id"],
self._MAPS["reservoir_outlet_id"],
self._MAPS["reservoir_lower_id"],
self._MAPS["reservoir_storage_curve"],
self._MAPS["reservoir_rating_curve"],
self._MAPS["reservoir_area"],
self._MAPS["reservoir_initial_depth"],
self._MAPS["reservoir_demand"],
self._MAPS["reservoir_target_full_fraction"],
self._MAPS["reservoir_target_min_fraction"],
self._MAPS["reservoir_max_release"],
self._MAPS["reservoir_max_volume"],
"meta_reservoir_mean_outflow", # this is a hydromt meta map
self._MAPS["reservoir_outflow_threshold"],
self._MAPS["reservoir_b"],
self._MAPS["reservoir_e"],
]
self.staticmaps.drop_vars(remove_maps, errors="ignore")
# Update config
# Remove the absolute path and if needed remove reservoirs
# change reservoir__flag = true to false
self.set_config("model.reservoir__flag", False)
# remove states
self.config.remove(
"state.variables.reservoir_water_surface__instantaneous_elevation",
errors="ignore",
)
# Update tables
ids = np.unique(reservoir)
for res_id in ids:
keys_to_remove = [k for k in self.tables.data if str(res_id) in k]
for key in keys_to_remove:
del self.tables._tables[key]
@hydromt_step
def clip_forcing(self):
"""Return clipped forcing for subbasin.
Returns
-------
xarray.DataSet
Clipped forcing.
"""
if len(self.forcing.data) > 0:
logger.info("Clipping NetCDF forcing..")
ds_forcing = self.forcing.data.raster.clip_bbox(
self.staticmaps.data.raster.bounds
)
self.set_forcing(ds_forcing)
@hydromt_step
def clip_states(self):
"""Return clippped states for subbasin.
Returns
-------
xarray.DataSet
Clipped states.
"""
if len(self.states.data) > 0:
logger.info("Clipping NetCDF states..")
ds_states = self.states.data.raster.clip_bbox(
self.staticmaps.data.raster.bounds
)
# Check for reservoirs presence in the clipped model
remove_maps = []
if self._MAPS["reservoir_area_id"] not in self.staticmaps.data:
state_name = self.get_config(
"state.variables.reservoir_water_surface__instantaneous_elevation",
fallback="reservoir_instantaneous_water_level",
)
if state_name in ds_states:
remove_maps.extend([state_name])
ds_states = ds_states.drop_vars(remove_maps)
self.states._data = xr.Dataset() # Clear existing states
self.set_states(ds_states)