Source code for hydromt_wflow.wflow

"""Implement wflow model class"""
# Implement model class following model API

import os
from os.path import join, dirname, basename, isfile, isdir
from typing import Union, Optional
import glob
import numpy as np
import pandas as pd
import geopandas as gpd
import xarray as xr
from shapely.geometry import box
import pyproj
import toml
import codecs
from pyflwdir import core_d8, core_ldd, core_conversion
from dask.diagnostics import ProgressBar
import logging
import rioxarray  # required for rio accessor

# from dask.distributed import LocalCluster, Client, performance_report
import hydromt
from hydromt.models.model_api import Model
from hydromt import flw
from hydromt.io import open_mfraster

from . import utils, workflows, DATADIR


__all__ = ["WflowModel"]

logger = logging.getLogger(__name__)

# specify pcraster map types
# NOTE only non-scalar (i.e. not float) data types are listed here
PCR_VS_MAP = {
    "wflow_ldd": "ldd",
    "wflow_river": "bool",
    "wflow_streamorder": "ordinal",
    "wflow_gauges": "nominal",  # to avoid large memory usage in pcraster.aguila
    "wflow_subcatch": "nominal",  # idem.
    "wflow_landuse": "nominal",
    "wflow_soil": "nominal",
    "wflow_reservoirareas": "nominal",
    "wflow_reservoirlocs": "nominal",
    "wflow_lakeareas": "nominal",
    "wflow_lakelocs": "nominal",
    "wflow_glacierareas": "nominal",
}


class WflowModel(Model):
    """This is the wflow model class."""

    _NAME = "wflow"
    _CONF = "wflow_sbm.toml"
    _DATADIR = DATADIR
    _GEOMS = {}
    _MAPS = {
        "flwdir": "wflow_ldd",
        "elevtn": "wflow_dem",
        "subelv": "dem_subgrid",
        "uparea": "wflow_uparea",
        "strord": "wflow_streamorder",
        "basins": "wflow_subcatch",
        "rivlen": "wflow_riverlength",
        "rivmsk": "wflow_river",
        "rivwth": "wflow_riverwidth",
        "lndslp": "Slope",
        "rivslp": "RiverSlope",
        "rivdph": "RiverDepth",
        "rivman": "N_River",
        "gauges": "wflow_gauges",
        "landuse": "wflow_landuse",
        "resareas": "wflow_reservoirareas",
        "reslocs": "wflow_reservoirlocs",
        "lakeareas": "wflow_lakeareas",
        "lakelocs": "wflow_lakelocs",
        "glacareas": "wflow_glacierareas",
        "glacfracs": "wflow_glacierfrac",
        "glacstore": "wflow_glacierstore",
    }
    _FOLDERS = [
        "staticgeoms",
        "instate",
        "run_default",
    ]
    def __init__(
        self,
        root=None,
        mode="w",
        config_fn=None,
        data_libs=None,
        deltares_data=False,
        logger=logger,
    ):
        super().__init__(
            root=root,
            mode=mode,
            config_fn=config_fn,
            data_libs=data_libs,
            deltares_data=deltares_data,
            logger=logger,
        )
        # wflow specific
        self._intbl = dict()
        self._flwdir = None
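    # Usage sketch (illustrative only, not part of the original module): create a
    # model instance against a HydroMT data catalog. The catalog name
    # "artifact_data" and the root path are assumptions for the example.
    #
    #   from hydromt_wflow import WflowModel
    #   mod = WflowModel(root="./wflow_test", mode="w", data_libs=["artifact_data"])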
# COMPONENTS
[docs] def setup_basemaps( self, region, res=1 / 120.0, hydrography_fn="merit_hydro", basin_index_fn="merit_hydro_index", upscale_method="ihu", ): """ This component sets the ``region`` of interest and ``res`` (resolution in degrees) of the model. All DEM and flow direction related maps are then build. If the model resolution is larger than the source data resolution, the flow direction is upscaled using the ``upscale_method``, by default the Iterative Hydrography Upscaling (IHU). The default ``hydrography_fn`` is "merit_hydro" (`MERIT hydro <http://hydro.iis.u-tokyo.ac.jp/~yamadai/MERIT_Hydro/index.html>`_ at 3 arcsec resolution) Alternative sources include "merit_hydro_1k" at 30 arcsec resolution. Users can also supply their own elevation and flow direction data. Note that only EPSG:4326 base data supported. Adds model layers: * **wflow_ldd** map: flow direction in LDD format [-] * **wflow_subcatch** map: basin ID map [-] * **wflow_uparea** map: upstream area [km2] * **wflow_streamorder** map: Strahler stream order [-] * **wflow_dem** map: average elevation [m+REF] * **dem_subgrid** map: subgrid outlet elevation [m+REF] * **Slope** map: average land surface slope [m/m] * **basins** geom: basins boundary vector * **region** geom: region boundary vector Parameters ---------- hydrography_fn : str Name of data source for basemap parameters. * Required variables: ['flwdir', 'uparea', 'basins', 'strord', 'elevtn'] * Optional variables: ['lndslp', 'mask'] basin_index_fn : str Name of data source for basin_index data linked to hydrography_fn. region : dict Dictionary describing region of interest. See :py:function:~basin_mask.parse_region for all options res : float Output model resolution upscale_method : {'ihu', 'eam', 'dmm'} Upscaling method for flow direction data, by default 'ihu'. See Also -------- hydromt.workflows.parse_region hydromt.workflows.get_basin_geometry workflows.hydrography workflows.topography """ self.logger.info(f"Preparing base hydrography basemaps.") # retrieve global data (lazy!) ds_org = self.data_catalog.get_rasterdataset(hydrography_fn) # TODO support and test (user) data from other sources with other crs! 
if ds_org.raster.crs is None or ds_org.raster.crs.to_epsg() != 4326: raise ValueError("Only EPSG:4326 base data supported.") # get basin geometry and clip data kind, region = hydromt.workflows.parse_region(region, logger=self.logger) xy = None if kind in ["basin", "subbasin", "outlet"]: bas_index = self.data_catalog[basin_index_fn] geom, xy = hydromt.workflows.get_basin_geometry( ds=ds_org, kind=kind, basin_index=bas_index, logger=self.logger, **region, ) elif "bbox" in region: bbox = region.get("bbox") geom = gpd.GeoDataFrame(geometry=[box(*bbox)], crs=4326) elif "geom" in region: geom = region.get("geom") else: raise ValueError(f"wflow region argument not understood: {region}") if geom is not None and geom.crs is None: raise ValueError("wflow region geometry has no CRS") ds_org = ds_org.raster.clip_geom(geom, align=res, buffer=10) self.logger.debug(f"Adding basins vector to staticgeoms.") self.set_staticgeoms(geom, name="basins") # setup hydrography maps and set staticmap attribute with renamed maps ds_base, _ = workflows.hydrography( ds=ds_org, res=res, xy=xy, upscale_method=upscale_method, logger=self.logger, ) # Convert flow direction from d8 to ldd format flwdir_data = ds_base["flwdir"].values.astype(np.uint8) # force dtype # if d8 convert to ldd if core_d8.isvalid(flwdir_data): data = core_conversion.d8_to_ldd(flwdir_data) da_flwdir = xr.DataArray( name="flwdir", data=data, coords=ds_base.raster.coords, dims=ds_base.raster.dims, attrs=dict( long_name="ldd flow direction", _FillValue=core_ldd._mv, ), ) ds_base["flwdir"] = da_flwdir # Rename and add to staticmaps rmdict = {k: v for k, v in self._MAPS.items() if k in ds_base.data_vars} self.set_staticmaps(ds_base.rename(rmdict)) # setup topography maps ds_topo = workflows.topography( ds=ds_org, ds_like=self.staticmaps, method="average", logger=self.logger ) ds_topo["lndslp"] = np.maximum(ds_topo["lndslp"], 0.0) rmdict = {k: v for k, v in self._MAPS.items() if k in ds_topo.data_vars} self.set_staticmaps(ds_topo.rename(rmdict)) # set basin geometry self.logger.debug(f"Adding region vector to staticgeoms.") self.set_staticgeoms(self.region, name="region")
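    # Usage sketch for setup_basemaps (illustrative only): the subbasin outlet
    # coordinates and stream-order threshold are example values, and
    # "merit_hydro" / "merit_hydro_index" must be available in the data catalog.
    #
    #   mod.setup_basemaps(
    #       region={"subbasin": [12.2051, 45.8331], "strord": 4},
    #       res=1 / 120.0,
    #       hydrography_fn="merit_hydro",
    #       basin_index_fn="merit_hydro_index",
    #       upscale_method="ihu",
    #   )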
[docs] def setup_rivers( self, hydrography_fn, river_geom_fn=None, river_upa=30, rivdph_method="powlaw", slope_len=2e3, min_rivlen_ratio=0.1, min_rivdph=1, min_rivwth=30, smooth_len=5e3, rivman_mapping_fn=join(DATADIR, "wflow", "N_river_mapping.csv"), **kwargs, ): """ This component sets the all river parameter maps. The river mask is defined by all cells with a mimimum upstream area threshold `river_upa` [km2]. The river length is defined as the distance from the subgrid outlet pixel to the next upstream subgrid outlet pixel. The river slope is derived from the subgrid elevation difference between pixels at a half distance `slope_len` [m] up- and downstream from the subgrid outlet pixel. The river manning roughness coefficient is derived based on reclassification of the streamorder map using a lookup table `rivman_mapping_fn`. The river width is derived from the nearest river segment in `river_geom_fn`. Data gaps are filled by the nearest valid upstream value and averaged along the flow directions over a length `smooth_len` [m] The river depth is calculated using the `rivdph_method`, by default powlaw: h = hc*Qbf**hp, which is based on qbankfull discharge from the nearest river segment in `river_geom_fn` and takes optional arguments for the hc (default = 0.27) and hp (default = 0.30) parameters. For other methods see :py:meth:`hydromt.workflows.river_depth`. Adds model layers: * **wflow_river** map: river mask [-] * **wflow_riverlength** map: river length [m] * **wflow_riverwidth** map: river width [m] * **RiverDepth** map: bankfull river depth [m] * **RiverSlope** map: river slope [m/m] * **N_River** map: Manning coefficient for river cells [s.m^1/3] * **rivers** geom: river vector based on wflow_river mask Parameters ---------- hydrography_fn : str, Path Name of data source for hydrography data. Must be same as setup_basemaps for consistent results. * Required variables: ['flwdir', 'uparea', 'elevtn'] * Optional variables: ['rivwth', 'qbankfull'] river_geom_fn : str, Path, optional Name of data source for river data. * Required variables: ['rivwth', 'qbankfull'] river_upa : float minimum upstream area threshold for the river map [km2] slope_len : float length over which the river slope is calculated [km] min_rivlen_ratio: float minimum global river length to avg. 
cell resolution ratio, by default 0.1 rivdph_method : {'gvf', 'manning', 'powlaw'} see py:meth:`hydromt.workflows.river_depth` for details, by default "powlaw" smooth_len : float, optional Length [m] over which to smooth the output river width and depth, by default 5e3 min_rivdph : float, optional Minimum river depth [m], by default 1.0 min_rivwth : float, optional Minimum river width [m], by default 30.0 See Also -------- workflows.river_bathymetry hydromt.workflows.river_depth pyflwdir.FlwdirRaster.river_depth """ self.logger.info(f"Preparing river maps.") # read data ds_hydro = self.data_catalog.get_rasterdataset( hydrography_fn, geom=self.region, buffer=10 ) # get rivmsk, rivlen, rivslp # read model maps and revert wflow to hydromt map names inv_rename = {v: k for k, v in self._MAPS.items() if v in self.staticmaps} ds_riv = workflows.river( ds=ds_hydro, ds_model=self.staticmaps.rename(inv_rename), river_upa=river_upa, slope_len=slope_len, channel_dir="up", min_rivlen_ratio=min_rivlen_ratio, logger=self.logger, )[0] dvars = ["rivmsk", "rivlen", "rivslp"] rmdict = {k: self._MAPS.get(k, k) for k in dvars} self.set_staticmaps(ds_riv[dvars].rename(rmdict)) # TODO make separate workflows.river_manning method # Make N_River map from csv file with mapping between streamorder and N_River value strord = self.staticmaps[self._MAPS["strord"]].copy() df = pd.read_csv(rivman_mapping_fn, index_col=0, sep=",|;", engine="python") # max streamorder value above which values get the same N_River value max_str = df.index[-2] # if streamorder value larger than max_str, assign last value strord = strord.where(strord <= max_str, max_str) # handle missing value (last row of csv is mapping of missing values) strord = strord.where(strord != strord.raster.nodata, -999) strord.raster.set_nodata(-999) ds_nriver = workflows.landuse( da=strord, ds_like=self.staticmaps, fn_map=rivman_mapping_fn, logger=self.logger, ) self.set_staticmaps(ds_nriver) # get rivdph, rivwth # while we still have setup_riverwidth one can skip river_bathymetry here # TODO make river_geom_fn required after removing setup_riverwidth if river_geom_fn is not None: gdf_riv = self.data_catalog.get_geodataframe( river_geom_fn, geom=self.region ) # reread model data to get river maps inv_rename = {v: k for k, v in self._MAPS.items() if v in self.staticmaps} ds_riv1 = workflows.river_bathymetry( ds_model=self.staticmaps.rename(inv_rename), gdf_riv=gdf_riv, method=rivdph_method, smooth_len=smooth_len, min_rivdph=min_rivdph, min_rivwth=min_rivwth, logger=self.logger, **kwargs, ) rmdict = {k: v for k, v in self._MAPS.items() if k in ds_riv1.data_vars} self.set_staticmaps(ds_riv1.rename(rmdict)) # update config self.set_config("input.lateral.river.bankfull_depth", self._MAPS["rivdph"]) self.logger.debug(f"Adding rivers vector to staticgeoms.") self.staticgeoms.pop("rivers", None) # remove old rivers if in staticgeoms self.rivers # add new rivers to staticgeoms
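    # Usage sketch for setup_rivers (illustrative only): "merit_hydro" and
    # "rivers_lin2019_v1" are example catalog entries; hc and hp are the optional
    # coefficients of the power-law depth relation h = hc * Qbf ** hp described above.
    #
    #   mod.setup_rivers(
    #       hydrography_fn="merit_hydro",
    #       river_geom_fn="rivers_lin2019_v1",
    #       river_upa=30,
    #       rivdph_method="powlaw",
    #       hc=0.27,
    #       hp=0.30,
    #   )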
[docs] def setup_hydrodem( self, elevtn_map="wflow_dem", river_routing="local-inertial", land_routing="kinematic-wave", ): """This component adds a hydrologically conditioned elevation (hydrodem) map for river and/or land local-inertial routing. River cells are always conditioned to D8 flow directions. Land cells are conditioned to D4 flow directions if land_routing="local-inertial", else to D8. If local inertial is selected for land routing, it is required to have a D4 conditioning, otherwise pits may remain in the land cells. The conditioned elevation can be based on the average cell elevation ("wflow_dem") or subgrid outlet pixel elevation ("dem_subgrid"). For local-inertial river routing the subgrid elevation might provide a better representation of the river elevation profile, however in combination with local-inertial land routing the subgrid elevation will likely overestimate the floodplain storage capacity. Note that the same input elevation map should be used for river bankfull elevation and land elevation when using local-inertial land routing. Adds model layers: * **hydrodem** map: hydrologically conditioned elevation [m+REF]. Parameters ---------- elevtn_map: {"wflow_dem", "dem_subgrid"} Name of staticmap to hydrologically condition, by default "wflow_dem". river_routing, land_routing : {"kinematic-wave", "local-inertial"} changes wflow config model.river_routing (by default "local-inertial") and model.land_routing (default "kinematic-wave") See Also -------- hydromt.flw.dem_adjust pyflwdir.FlwdirRaster.dem_adjust """ r_list = ["kinematic-wave", "local-inertial"] if not elevtn_map in self.staticmaps: raise ValueError(f'"{elevtn_map}" not found in staticmaps') if river_routing not in r_list: raise ValueError( f'river_routing="{river_routing}" unknown. Select from {r_list}.' ) if land_routing not in r_list: raise ValueError( f'land_routing="{land_routing}" unknown. Select from {r_list}.' 
) postfix = {"wflow_dem": "_avg", "dem_subgrid": "_subgrid"}.get(elevtn_map, "") name = f"hydrodem{postfix}" self.logger.info(f"Preparing {name} map for routing.") connectivity = {"local-inertial": 4, "kinematic-wave": 8}[land_routing] name = f"hydrodem{postfix}_D{connectivity}" self.logger.info(f"Preparing {name} map for routing.") ds_out = flw.dem_adjust( da_flwdir=self.staticmaps[self._MAPS["flwdir"]], da_elevtn=self.staticmaps[elevtn_map], da_rivmsk=self.staticmaps[self._MAPS["rivmsk"]], flwdir=self.flwdir, connectivity=connectivity, river_d8=True, logger=self.logger, ).rename(name) self.set_staticmaps(ds_out) # update toml model.river_routing self.logger.debug(f'Update wflow config model.river_routing="{river_routing}"') self.set_config("model.river_routing", river_routing) self.logger.debug(f'Update wflow config model.land_routing="{land_routing}"') self.set_config("model.land_routing", land_routing) if river_routing == "local-inertial": self.set_config("input.lateral.river.bankfull_depth", self._MAPS["rivdph"]) self.set_config("input.lateral.river.bankfull_elevation", name) if land_routing == "local-inertial": self.set_config("input.lateral.land.elevation", name) self.config["state"]["lateral"]["land"].pop("q", None) self.config["output"]["lateral"]["land"].pop("q", None) self.set_config("state.lateral.land.qx", "qx_land") self.set_config("state.lateral.land.qy", "qy_land") else: self.config["state"]["lateral"]["land"].pop("qx", None) self.config["state"]["lateral"]["land"].pop("qy", None) self.config["output"]["lateral"]["land"].pop("qx", None) self.config["output"]["lateral"]["land"].pop("qy", None) self.set_config("state.lateral.land.q", "q_land")
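    # Usage sketch for setup_hydrodem (illustrative only): condition the subgrid
    # outlet elevation for local-inertial river routing while keeping
    # kinematic-wave routing (and hence D8 conditioning) on land.
    #
    #   mod.setup_hydrodem(
    #       elevtn_map="dem_subgrid",
    #       river_routing="local-inertial",
    #       land_routing="kinematic-wave",
    #   )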
def setup_riverwidth( self, predictor="discharge", fill=False, fit=False, min_wth=1.0, precip_fn="chelsa", climate_fn="koppen_geiger", **kwargs, ): """ This component sets the river width parameter based on a power-lay relationship with a predictor. By default the riverwidth is estimated based on discharge as ``predictor`` and used to set the riverwidth globally based on pre-defined power-law parameters per climate class. With ``fit`` set to True, the power-law relationsship paramters are set on-the-fly. With ``fill`` set to True, the estimated river widths are only used fill gaps in the observed data. Alternative ``predictor`` values are precip (accumulated precipitation) and uparea (upstream area). For these predictors values ``fit`` default to True. By default the predictor is based on discharge which is estimated through multiple linear regression with precipitation and upstream area per climate zone. * **wflow_riverwidth** map: river width [m] Parameters ---------- predictor : {"discharge", "precip", "uparea"} Predictor used in the power-law equation: width = a * predictor ^ b. Discharge is based on multiple linear regression per climate zone. Precip is based on the 10x the daily average accumulated precipitation [m3/s]. Uparea is based on the upstream area grid [km2]. Other variables, e.g. bankfull discharge, can also be provided if present in the staticmaps fill : bool, optional If True (default), use estimate to fill gaps, outliers and lake/res areas in observed width data (if present); if False, set all riverwidths based on predictor (automatic choice if no observations found) fit : bool, optional kwarg If True, the power-law parameters are fitted on the fly By default True for all but "discharge" predictor. A-priori derived parameters will be overwritten if True. a, b : float, optional kwarg Manual power-law parameters min_wth : float minimum river width precip_fn : {'chelsa'} Source of long term precipitation grid if the predictor is set to 'discharge' or 'precip'. climate_fn: {'koppen_geiger'} Source of long-term climate grid if the predictor is set to 'discharge'. """ self.logger.warning( 'The "setup_riverwidth" method has been deprecated and will soon be removed. ' 'You can now use the "setup_river" method for all river parameters.' ) if not self._MAPS["rivmsk"] in self.staticmaps: raise ValueError( 'The "setup_riverwidth" method requires to run setup_river method first.' ) # derive river width data = {} if predictor in ["discharge", "precip"]: da_precip = self.data_catalog.get_rasterdataset( precip_fn, geom=self.region, buffer=2 ) da_precip.name = precip_fn data["da_precip"] = da_precip if predictor == "discharge": da_climate = self.data_catalog.get_rasterdataset( climate_fn, geom=self.region, buffer=2 ) da_climate.name = climate_fn data["da_climate"] = da_climate inv_rename = {v: k for k, v in self._MAPS.items() if v in self.staticmaps} da_rivwth = workflows.river_width( ds_like=self.staticmaps.rename(inv_rename), flwdir=self.flwdir, data=data, fill=fill, fill_outliers=kwargs.pop("fill_outliers", fill), min_wth=min_wth, mask_names=["lakeareas", "resareas", "glacareas"], predictor=predictor, a=kwargs.get("a", None), b=kwargs.get("b", None), logger=self.logger, fit=fit, **kwargs, ) self.set_staticmaps(da_rivwth, name=self._MAPS["rivwth"])
    def setup_lulcmaps(
        self,
        lulc_fn="globcover",
        lulc_mapping_fn=None,
        lulc_vars=[
            "landuse",
            "Kext",
            "N",
            "PathFrac",
            "RootingDepth",
            "Sl",
            "Swood",
            "WaterFrac",
        ],
    ):
        """
        This component derives several wflow maps based on landuse-landcover
        (LULC) data.

        Currently, ``lulc_fn`` can be set to "vito", "globcover" or "corine",
        for which lookup tables are constructed to convert lulc classes to model
        parameters based on literature. The data is remapped at its original
        resolution and then resampled to the model resolution using the average
        value, unless noted differently.

        Adds model layers:

        * **landuse** map: Landuse class [-]
        * **Kext** map: Extinction coefficient in the canopy gap fraction equation [-]
        * **Sl** map: Specific leaf storage [mm]
        * **Swood** map: Fraction of wood in the vegetation/plant [-]
        * **RootingDepth** map: Length of vegetation roots [mm]
        * **PathFrac** map: The fraction of compacted or urban area per grid cell [-]
        * **WaterFrac** map: The fraction of open water per grid cell [-]
        * **N** map: Manning Roughness [-]

        Parameters
        ----------
        lulc_fn : {"globcover", "vito", "corine"}
            Name of data source in data_sources.yml file.
        lulc_mapping_fn : str
            Path to a mapping csv file from landuse in source name to parameter
            values in lulc_vars.
        lulc_vars : list
            List of landuse parameters to keep. By default
            ["landuse", "Kext", "N", "PathFrac", "RootingDepth", "Sl", "Swood", "WaterFrac"]
        """
        self.logger.info(f"Preparing LULC parameter maps.")
        if lulc_mapping_fn is None:
            fn_map = join(DATADIR, "lulc", f"{lulc_fn}_mapping.csv")
        else:
            fn_map = lulc_mapping_fn
        if not isfile(fn_map):
            self.logger.error(f"LULC mapping file not found: {fn_map}")
            return
        # read landuse map to DataArray
        da = self.data_catalog.get_rasterdataset(
            lulc_fn, geom=self.region, buffer=2, variables=["landuse"]
        )
        # process landuse
        ds_lulc_maps = workflows.landuse(
            da=da,
            ds_like=self.staticmaps,
            fn_map=fn_map,
            params=lulc_vars,
            logger=self.logger,
        )
        rmdict = {k: v for k, v in self._MAPS.items() if k in ds_lulc_maps.data_vars}
        self.set_staticmaps(ds_lulc_maps.rename(rmdict))
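    # Usage sketch for setup_lulcmaps (illustrative only): "vito" is one of the
    # supported sources; the custom mapping path is a hypothetical example of
    # overriding the default lookup table shipped with hydromt_wflow.
    #
    #   mod.setup_lulcmaps(
    #       lulc_fn="vito",
    #       lulc_mapping_fn=None,  # or e.g. "my_lulc_mapping.csv"
    #       lulc_vars=["landuse", "Kext", "N", "PathFrac", "RootingDepth", "Sl", "Swood", "WaterFrac"],
    #   )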
    def setup_laimaps(self, lai_fn="modis_lai"):
        """
        This component sets leaf area index (LAI) climatology maps per month.

        The values are resampled to the model resolution using the average value.
        The only ``lai_fn`` currently supported is "modis_lai" based on MODIS data.

        Adds model layers:

        * **LAI** map: Leaf Area Index climatology [-]
            Resampled from source data using average. Assuming that missing values
            correspond to bare soil, these are set to zero before resampling.

        Parameters
        ----------
        lai_fn : {'modis_lai'}
            Name of data source for LAI parameters, see data/data_sources.yml.

            * Required variables: ['LAI']
        """
        if lai_fn not in self.data_catalog:
            self.logger.warning(f"Invalid source '{lai_fn}', skipping setup_laimaps.")
            return
        # retrieve data for region
        self.logger.info(f"Preparing LAI maps.")
        da = self.data_catalog.get_rasterdataset(lai_fn, geom=self.region, buffer=2)
        da_lai = workflows.lai(
            da=da,
            ds_like=self.staticmaps,
            logger=self.logger,
        )
        # Rename the first dimension to time
        rmdict = {da_lai.dims[0]: "time"}
        self.set_staticmaps(da_lai.rename(rmdict), name="LAI")
[docs] def setup_gauges( self, gauges_fn="grdc", source_gdf=None, index_col=None, snap_to_river=True, mask=None, derive_subcatch=False, derive_outlet=True, basename=None, update_toml=True, gauge_toml_header=None, gauge_toml_param=None, **kwargs, ): """This components sets the default gauge map based on basin outlets and additional gauge maps based on ``gauges_fn`` data. Supported gauge datasets include "grdc" or "<path_to_source>" for user supplied csv or geometry files with gauge locations. If a csv file is provided, a "x" or "lon" and "y" or "lat" column is required and the first column will be used as IDs in the map. If ``snap_to_river`` is set to True, the gauge location will be snapped to the boolean river mask. If ``derive_subcatch`` is set to True, an additional subcatch map is derived from the gauge locations. Adds model layers: * **wflow_gauges** map: gauge IDs map from catchment outlets [-] * **wflow_gauges_source** map: gauge IDs map from source [-] (if gauges_fn) * **wflow_subcatch_source** map: subcatchment based on gauge locations [-] (if derive_subcatch) * **gauges** geom: polygon of catchment outlets * **gauges_source** geom: polygon of gauges from source * **subcatch_source** geom: polygon of subcatchment based on gauge locations [-] (if derive_subcatch) Parameters ---------- gauges_fn : str, {"grdc"}, optional Known source name or path to gauges file geometry file, by default None. source_gdf : geopandas.GeoDataFame, optional Direct gauges file geometry, by default None. index_col : str, optional Column in gauges_fn to use for ID values, by default None (use the default index column) snap_to_river : bool, optional Snap point locations to the closest downstream river cell, by default True mask : np.boolean, optional If provided snaps to the mask, else snaps to the river (default). derive_subcatch : bool, optional Derive subcatch map for gauges, by default False derive_outlet : bool, optional Derive gaugemap based on catchment outlets, by default True basename : str, optional Map name in staticmaps (wflow_gauges_basename), if None use the gauges_fn basename. update_toml : boolean, optional Update [outputcsv] section of wflow toml file. gauge_toml_header : list, optional Save specific model parameters in csv section. This option defines the header of the csv file./ By default saves Q (for lateral.river.q_av) and P (for vertical.precipitation). gauge_toml_param: list, optional Save specific model parameters in csv section. This option defines the wflow variable corresponding to the/ names in gauge_toml_header. By default saves lateral.river.q_av (for Q) and vertical.precipitation (for P). 
""" # read existing staticgeoms; important to get the right basin when updating self.staticgeoms if derive_outlet: self.logger.info(f"Gauges locations set based on river outlets.") da, idxs, ids = flw.gauge_map(self.staticmaps, idxs=self.flwdir.idxs_pit) # Only keep river outlets for gauges da = da.where(self.staticmaps[self._MAPS["rivmsk"]] != 0, da.raster.nodata) ids_da = np.unique(da.values[da.values > 0]) idxs_da = idxs[np.isin(ids, ids_da)] self.set_staticmaps(da, name=self._MAPS["gauges"]) points = gpd.points_from_xy(*self.staticmaps.raster.idx_to_xy(idxs_da)) gdf = gpd.GeoDataFrame( index=ids_da.astype(np.int32), geometry=points, crs=self.crs ) gdf["fid"] = ids_da.astype(np.int32) self.set_staticgeoms(gdf, name="gauges") self.logger.info(f"Gauges map based on catchment river outlets added.") if gauges_fn is not None or source_gdf is not None: # append location from geometry # TODO check snapping locations based on upstream area attribute of the gauge data if gauges_fn is not None: kwargs = {} if isfile(gauges_fn): # try to get epsg number directly, important when writting back data_catalog if self.crs.is_epsg_code: code = int(self.crs["init"].lstrip("epsg:")) else: code = self.crs kwargs.update(crs=code) gdf = self.data_catalog.get_geodataframe( gauges_fn, geom=self.basins, assert_gtype="Point", **kwargs ) gdf = gdf.to_crs(self.crs) elif source_gdf is not None and basename is None: raise ValueError( "Basename is required when setting gauges based on source_gdf" ) elif source_gdf is not None: self.logger.info(f"Gauges locations read from source_gdf") gdf = source_gdf.to_crs(self.crs) if gdf.index.size == 0: self.logger.warning( f"No {gauges_fn} gauge locations found within domain" ) else: if basename is None: basename = ( os.path.basename(gauges_fn).split(".")[0].replace("_", "-") ) self.logger.info( f"{gdf.index.size} {basename} gauge locations found within domain" ) # Set index to index_col if index_col is not None and index_col in gdf: gdf = gdf.set_index(index_col) xs, ys = np.vectorize(lambda p: (p.xy[0][0], p.xy[1][0]))( gdf["geometry"] ) idxs = self.staticmaps.raster.xy_to_idx(xs, ys) ids = gdf.index.values if snap_to_river and mask is None: mask = self.staticmaps[self._MAPS["rivmsk"]].values da, idxs, ids = flw.gauge_map( self.staticmaps, idxs=idxs, ids=ids, stream=mask, flwdir=self.flwdir, logger=self.logger, ) # Filter gauges that could not be snapped to rivers if snap_to_river: ids_old = ids.copy() da = da.where( self.staticmaps[self._MAPS["rivmsk"]] != 0, da.raster.nodata ) ids_new = np.unique(da.values[da.values > 0]) idxs = idxs[np.isin(ids_old, ids_new)] ids = da.values.flat[idxs] # Add to staticmaps mapname = f'{str(self._MAPS["gauges"])}_{basename}' self.set_staticmaps(da, name=mapname) # geoms points = gpd.points_from_xy(*self.staticmaps.raster.idx_to_xy(idxs)) # if csv contains additional columns, these are also written in the staticgeoms gdf_snapped = gpd.GeoDataFrame( index=ids.astype(np.int32), geometry=points, crs=self.crs ) # Set the index name of gdf snapped based on original gdf if gdf.index.name is not None: gdf_snapped.index.name = gdf.index.name else: gdf_snapped.index.name = "fid" gdf.index.name = "fid" # Add gdf attributes to gdf_snapped (filter on snapped index before merging) df_attrs = pd.DataFrame(gdf.drop(columns="geometry")) df_attrs = df_attrs[np.isin(df_attrs.index, gdf_snapped.index)] gdf_snapped = gdf_snapped.merge( df_attrs, how="inner", on=gdf.index.name ) # Add gdf_snapped to staticgeoms self.set_staticgeoms(gdf_snapped, 
name=mapname.replace("wflow_", "")) # # Add new outputcsv section in the config if gauge_toml_param is None and update_toml: gauge_toml_header = ["Q", "P"] gauge_toml_param = ["lateral.river.q_av", "vertical.precipitation"] if update_toml: self.set_config(f"input.gauges_{basename}", f"{mapname}") if self.get_config("csv") is not None: for o in range(len(gauge_toml_param)): gauge_toml_dict = { "header": gauge_toml_header[o], "map": f"gauges_{basename}", "parameter": gauge_toml_param[o], } # If the gauge outcsv column already exists skip writting twice if gauge_toml_dict not in self.config["csv"]["column"]: self.config["csv"]["column"].append(gauge_toml_dict) self.logger.info(f"Gauges map from {basename} added.") # add subcatch if derive_subcatch: da_basins = flw.basin_map( self.staticmaps, self.flwdir, idxs=idxs, ids=ids )[0] mapname = self._MAPS["basins"] + "_" + basename self.set_staticmaps(da_basins, name=mapname) gdf_basins = self.staticmaps[mapname].raster.vectorize() self.set_staticgeoms(gdf_basins, name=mapname.replace("wflow_", ""))
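    # Usage sketch for setup_gauges (illustrative only): "gauges.csv" is a
    # hypothetical user file with an "index", "x" and "y" column; the toml
    # header/parameter lists mirror the defaults documented above.
    #
    #   mod.setup_gauges(
    #       gauges_fn="gauges.csv",
    #       index_col="index",
    #       snap_to_river=True,
    #       derive_subcatch=True,
    #       basename="stations",
    #       gauge_toml_header=["Q", "P"],
    #       gauge_toml_param=["lateral.river.q_av", "vertical.precipitation"],
    #   )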
[docs] def setup_areamap( self, area_fn: str, col2raster: str, nodata: Union[int, float] = -1, ): """Setup area map from vector data to save wflow outputs for specific area. Adds model layer: * **area_fn** map: output area data map Parameters ---------- area_fn : str Name of vector data corresponding to wflow output area. col2raster : str Name of the column from the vector file to rasterize. nodata : int/float, optional Nodata value to use when rasterizing. Should match the dtype of col2raster. By default -1. """ if area_fn not in self.data_catalog: self.logger.warning(f"Invalid source '{area_fn}', skipping setup_areamap.") return self.logger.info(f"Preparing '{area_fn}' map.") gdf_org = self.data_catalog.get_geodataframe( area_fn, geom=self.basins, dst_crs=self.crs ) if gdf_org.empty: self.logger.warning( f"No shapes of {area_fn} found within region, skipping areamap." ) return else: da_area = self.staticmaps.raster.rasterize( gdf=gdf_org, col_name=col2raster, nodata=nodata, all_touched=True, ) self.set_staticmaps(da_area.rename(area_fn))
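    # Usage sketch for setup_areamap (illustrative only): "admin_boundaries" and
    # the "ADM1_ID" column are hypothetical names for a vector source in the data
    # catalog that should be rasterized to an output area map.
    #
    #   mod.setup_areamap(area_fn="admin_boundaries", col2raster="ADM1_ID", nodata=-1)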
[docs] def setup_lakes(self, lakes_fn="hydro_lakes", min_area=10.0): """This component generates maps of lake areas and outlets as well as parameters with average lake area, depth a discharge values. The data is generated from features with ``min_area`` [km2] (default 1 km2) from a database with lake geometry, IDs and metadata. Data required are lake ID 'waterbody_id', average area 'Area_avg' [m2], average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m] and average discharge 'Dis_avg' [m3/s]. Adds model layers: * **wflow_lakeareas** map: lake IDs [-] * **wflow_lakelocs** map: lake IDs at outlet locations [-] * **LakeArea** map: lake area [m2] * **LakeAvgLevel** map: lake average water level [m] * **LakeAvgOut** map: lake average discharge [m3/s] * **Lake_b** map: lake rating curve coefficient [-] * **lakes** geom: polygon with lakes and wflow lake parameters Parameters ---------- lakes_fn : {'hydro_lakes'} Name of data source for lake parameters, see data/data_sources.yml. * Required variables: ['waterbody_id', 'Area_avg', 'Vol_avg', 'Depth_avg', 'Dis_avg'] min_area : float, optional Minimum lake area threshold [km2], by default 1.0 km2. """ lakes_toml = { "model.lakes": True, "state.lateral.river.lake.waterlevel": "waterlevel_lake", "input.lateral.river.lake.area": "LakeArea", "input.lateral.river.lake.areas": "wflow_lakeareas", "input.lateral.river.lake.b": "Lake_b", "input.lateral.river.lake.e": "Lake_e", "input.lateral.river.lake.locs": "wflow_lakelocs", "input.lateral.river.lake.outflowfunc": "LakeOutflowFunc", "input.lateral.river.lake.storfunc": "LakeStorFunc", "input.lateral.river.lake.threshold": "LakeThreshold", "input.lateral.river.lake.linkedlakelocs": "LinkedLakeLocs", "input.lateral.river.lake.waterlevel": "LakeAvgLevel", } gdf_org, ds_lakes = self._setup_waterbodies(lakes_fn, "lake", min_area) if ds_lakes is not None: rmdict = {k: v for k, v in self._MAPS.items() if k in ds_lakes.data_vars} self.set_staticmaps(ds_lakes.rename(rmdict)) # add waterbody parameters # rename to param values gdf_org = gdf_org.rename( columns={ "Area_avg": "LakeArea", "Depth_avg": "LakeAvgLevel", "Dis_avg": "LakeAvgOut", } ) # Minimum value for LakeAvgOut LakeAvgOut = gdf_org["LakeAvgOut"].copy() gdf_org["LakeAvgOut"] = np.maximum(gdf_org["LakeAvgOut"], 0.01) gdf_org["Lake_b"] = gdf_org["LakeAvgOut"].values / ( gdf_org["LakeAvgLevel"].values ) ** (2) gdf_org["Lake_e"] = 2 gdf_org["LakeStorFunc"] = 1 gdf_org["LakeOutflowFunc"] = 3 gdf_org["LakeThreshold"] = 0.0 gdf_org["LinkedLakeLocs"] = 0 # Check if some LakeAvgOut values have been replaced if not np.all(LakeAvgOut == gdf_org["LakeAvgOut"]): self.logger.warning( "Some values of LakeAvgOut have been replaced by a minimum value of 0.01m3/s" ) lake_params = [ "waterbody_id", "LakeArea", "LakeAvgLevel", "LakeAvgOut", "Lake_b", "Lake_e", "LakeStorFunc", "LakeOutflowFunc", "LakeThreshold", "LinkedLakeLocs", ] gdf_org_points = gpd.GeoDataFrame( gdf_org[lake_params], geometry=gpd.points_from_xy(gdf_org.xout, gdf_org.yout), ) # write lakes with attr tables to static geoms. self.set_staticgeoms(gdf_org, name="lakes") for name in lake_params[1:]: da_lake = ds_lakes.raster.rasterize( gdf_org_points, col_name=name, dtype="float32", nodata=-999 ) self.set_staticmaps(da_lake) # if there are lakes, change True in toml for option in lakes_toml: self.set_config(option, lakes_toml[option])
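    # Usage sketch for setup_lakes (illustrative only): keep only lakes of at
    # least 10 km2 from the "hydro_lakes" source before deriving the lake maps
    # and rating-curve parameters listed above.
    #
    #   mod.setup_lakes(lakes_fn="hydro_lakes", min_area=10.0)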
[docs] def setup_reservoirs( self, reservoirs_fn="hydro_reservoirs", min_area=1.0, priority_jrc=True, **kwargs, ): """This component generates maps of reservoir areas and outlets as well as parameters with average reservoir area, demand, min and max target storage capacities and discharge capacity values. The data is generated from features with ``min_area`` [km2] (default is 1 km2) from a database with reservoir geometry, IDs and metadata. Data requirements for direct use (ie wflow parameters are data already present in reservoirs_fn) are reservoir ID 'waterbody_id', area 'ResSimpleArea' [m2], maximum volume 'ResMaxVolume' [m3], the targeted minimum and maximum fraction of water volume in the reservoir 'ResTargetMinFrac' and 'ResTargetMaxFrac' [-], the average water demand ResDemand [m3/s] and the maximum release of the reservoir before spilling 'ResMaxRelease' [m3/s]. In case the wflow parameters are not directly available they can be computed by HydroMT using other reservoir characteristics. If not enough characteristics are available, the hydroengine tool will be used to download additionnal timeseries from the JRC database. The required variables for computation of the parameters with hydroengine are reservoir ID 'waterbody_id', reservoir ID in the HydroLAKES database 'Hylak_id', average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m], average discharge 'Dis_avg' [m3/s] and dam height 'Dam_height' [m]. To compute parameters without using hydroengine, the required varibales in reservoirs_fn are reservoir ID 'waterbody_id', average area 'Area_avg' [m2], average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m], average discharge 'Dis_avg' [m3/s] and dam height 'Dam_height' [m] and minimum / normal / maximum storage capacity of the dam 'Capacity_min', 'Capacity_norm', 'Capacity_max' [m3]. Adds model layers: * **wflow_reservoirareas** map: reservoir IDs [-] * **wflow_reservoirlocs** map: reservoir IDs at outlet locations [-] * **ResSimpleArea** map: reservoir area [m2] * **ResMaxVolume** map: reservoir max volume [m3] * **ResTargetMinFrac** map: reservoir target min frac [m3/m3] * **ResTargetFullFrac** map: reservoir target full frac [m3/m3] * **ResDemand** map: reservoir demand flow [m3/s] * **ResMaxRelease** map: reservoir max release flow [m3/s] * **reservoirs** geom: polygon with reservoirs and wflow reservoir parameters Parameters ---------- reservoirs_fn : {'hydro_reservoirs'} Name of data source for reservoir parameters, see data/data_sources.yml. * Required variables for direct use: ['waterbody_id', 'ResSimpleArea', 'ResMaxVolume', 'ResTargetMinFrac', 'ResTargetFullFrac', 'ResDemand', 'ResMaxRelease'] * Required variables for computation with hydroengine: ['waterbody_id', 'Hylak_id', 'Vol_avg', 'Depth_avg', 'Dis_avg', 'Dam_height'] * Required variables for computation without hydroengine: ['waterbody_id', 'Area_avg', 'Vol_avg', 'Depth_avg', 'Dis_avg', 'Capacity_max', 'Capacity_norm', 'Capacity_min', 'Dam_height'] min_area : float, optional Minimum reservoir area threshold [km2], by default 1.0 km2. priority_jrc : boolean, optional If True, use JRC water occurrence (Pekel,2016) data from GEE to calculate and overwrite the reservoir volume/areas of the data source. 
""" # rename to wflow naming convention tbls = { "resarea": "ResSimpleArea", "resdemand": "ResDemand", "resfullfrac": "ResTargetFullFrac", "resminfrac": "ResTargetMinFrac", "resmaxrelease": "ResMaxRelease", "resmaxvolume": "ResMaxVolume", "resid": "expr1", } res_toml = { "model.reservoirs": True, "state.lateral.river.reservoir.volume": "volume_reservoir", "input.lateral.river.reservoir.area": "ResSimpleArea", "input.lateral.river.reservoir.areas": "wflow_reservoirareas", "input.lateral.river.reservoir.demand": "ResDemand", "input.lateral.river.reservoir.locs": "wflow_reservoirlocs", "input.lateral.river.reservoir.maxrelease": "ResMaxRelease", "input.lateral.river.reservoir.maxvolume": "ResMaxVolume", "input.lateral.river.reservoir.targetfullfrac": "ResTargetFullFrac", "input.lateral.river.reservoir.targetminfrac": "ResTargetMinFrac", } gdf_org, ds_res = self._setup_waterbodies(reservoirs_fn, "reservoir", min_area) # TODO: check if there are missing values in the above columns of the parameters tbls = # if everything is present, skip calculate reservoirattrs() and directly make the maps if ds_res is not None: rmdict = {k: v for k, v in self._MAPS.items() if k in ds_res.data_vars} self.set_staticmaps(ds_res.rename(rmdict)) # add attributes # if present use directly resattributes = [ "waterbody_id", "ResSimpleArea", "ResMaxVolume", "ResTargetMinFrac", "ResTargetFullFrac", "ResDemand", "ResMaxRelease", ] if np.all(np.isin(resattributes, gdf_org.columns)): intbl_reservoirs = gdf_org[resattributes] reservoir_accuracy = None # else compute else: intbl_reservoirs, reservoir_accuracy = workflows.reservoirattrs( gdf=gdf_org, priorityJRC=priority_jrc, usehe=kwargs.get("usehe", True), logger=self.logger, ) intbl_reservoirs = intbl_reservoirs.rename(columns=tbls) # create a geodf with id of reservoir and gemoetry at outflow location gdf_org_points = gpd.GeoDataFrame( gdf_org["waterbody_id"], geometry=gpd.points_from_xy(gdf_org.xout, gdf_org.yout), ) intbl_reservoirs = intbl_reservoirs.rename( columns={"expr1": "waterbody_id"} ) gdf_org_points = gdf_org_points.merge( intbl_reservoirs, on="waterbody_id" ) # merge # add parameter attributes to polygone gdf: gdf_org = gdf_org.merge(intbl_reservoirs, on="waterbody_id") # write reservoirs with param values to staticgeoms self.set_staticgeoms(gdf_org, name="reservoirs") for name in gdf_org_points.columns[2:]: gdf_org_points[name] = gdf_org_points[name].astype("float32") da_res = ds_res.raster.rasterize( gdf_org_points, col_name=name, dtype="float32", nodata=-999 ) self.set_staticmaps(da_res) # Save accuracy information on reservoir parameters if reservoir_accuracy is not None: reservoir_accuracy.to_csv(join(self.root, "reservoir_accuracy.csv")) for option in res_toml: self.set_config(option, res_toml[option])
    def _setup_waterbodies(self, waterbodies_fn, wb_type, min_area=0.0):
        """Helper method with the common workflow of setup_lakes and setup_reservoirs.
        See the specific methods for more info about the arguments."""
        # retrieve data for basin
        self.logger.info(f"Preparing {wb_type} maps.")
        gdf_org = self.data_catalog.get_geodataframe(
            waterbodies_fn, geom=self.basins, predicate="contains"
        )
        # skip small size waterbodies
        if "Area_avg" in gdf_org.columns and gdf_org.geometry.size > 0:
            min_area_m2 = min_area * 1e6
            gdf_org = gdf_org[gdf_org.Area_avg >= min_area_m2]
        else:
            self.logger.warning(
                f"{wb_type}'s database has no area attribute. "
                f"All {wb_type}s will be considered."
            )
        # get waterbodies maps and parameters
        nb_wb = gdf_org.geometry.size
        ds_waterbody = None
        if nb_wb > 0:
            self.logger.info(
                f"{nb_wb} {wb_type}(s) of sufficient size found within region."
            )
            # add waterbody maps
            uparea_name = self._MAPS["uparea"]
            if uparea_name not in self.staticmaps.data_vars:
                self.logger.warning(
                    f"Upstream area map for {wb_type} outlet setup not found. "
                    "Database coordinates used instead"
                )
                uparea_name = None
            ds_waterbody, gdf_wateroutlet = workflows.waterbodymaps(
                gdf=gdf_org,
                ds_like=self.staticmaps,
                wb_type=wb_type,
                uparea_name=uparea_name,
                logger=self.logger,
            )
            # update xout and yout in gdf_org from gdf_wateroutlet:
            if "xout" in gdf_org.columns and "yout" in gdf_org.columns:
                gdf_org.loc[:, "xout"] = gdf_wateroutlet["xout"]
                gdf_org.loc[:, "yout"] = gdf_wateroutlet["yout"]
        else:
            self.logger.warning(
                f"No {wb_type}s of sufficient size found within region! "
                f"Skipping {wb_type} procedures!"
            )
        # points and polygons are rasterized in raster.rasterize -- the staticmaps
        # grid is needed to know where to rasterize
        return gdf_org, ds_waterbody
[docs] def setup_soilmaps(self, soil_fn="soilgrids", ptf_ksatver="brakensiek"): """ This component derives several (layered) soil parameters based on a database with physical soil properties using available point-scale (pedo)transfer functions (PTFs) from literature with upscaling rules to ensure flux matching across scales. Currently, supported ``soil_fn`` is "soilgrids" and "soilgrids_2020". ``ptf_ksatver`` (PTF for the vertical hydraulic conductivity) options are "brakensiek" and "cosby". "soilgrids" provides data at 7 specific depths, while "soilgrids_2020" provides data averaged over 6 depth intervals. This leads to small changes in the workflow: (1) M parameter uses midpoint depths in soilgrids_2020 versus specific depths in soilgrids, (2) weighted average of soil properties over soil thickness is done with the trapezoidal rule in soilgrids versus simple block weighted average in soilgrids_2020, (3) the c parameter is computed as weighted average over wflow_sbm soil layers in soilgrids_2020 versus at specific depths for soilgrids. The following maps are added to staticmaps: * **thetaS** map: average saturated soil water content [m3/m3] * **thetaR** map: average residual water content [m3/m3] * **KsatVer** map: vertical saturated hydraulic conductivity at soil surface [mm/day] * **SoilThickness** map: soil thickness [mm] * **SoilMinThickness** map: minimum soil thickness [mm] (equal to SoilThickness) * **M** map: model parameter [mm] that controls exponential decline of KsatVer with soil depth (fitted with curve_fit (scipy.optimize)), bounds of M are checked * **`M_`** map: model parameter [mm] that controls exponential decline of KsatVer with soil depth (fitted with numpy linalg regression), bounds of `M_` are checked * **M_original** map: M without checking bounds * **`M_original_`** map: `M_` without checking bounds * **f** map: scaling parameter controlling the decline of KsatVer [mm-1] (fitted with curve_fit (scipy.optimize)), bounds are checked * **`f_`** map: scaling parameter controlling the decline of KsatVer [mm-1] (fitted with numpy linalg regression), bounds are checked * **c_0** map: Brooks Corey coefficient [-] based on pore size distribution index at depth of 1st soil layer (100 mm) wflow_sbm * **c_1** map: idem c_0 at depth 2nd soil layer (400 mm) wflow_sbm * **c_2** map: idem c_0 at depth 3rd soil layer (1200 mm) wflow_sbm * **c_3** map: idem c_0 at depth 4th soil layer (> 1200 mm) wflow_sbm * **KsatVer_[z]cm** map: KsatVer [mm/day] at soil depths [z] of SoilGrids data [0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0] * **wflow_soil** map: soil texture based on USDA soil texture triangle (mapping: [1:Clay, 2:Silty Clay, 3:Silty Clay-Loam, 4:Sandy Clay, 5:Sandy Clay-Loam, 6:Clay-Loam, 7:Silt, 8:Silt-Loam, 9:Loam, 10:Sand, 11: Loamy Sand, 12:Sandy Loam]) Parameters ---------- soil_fn : {'soilgrids', 'soilgrids_2020'} Name of data source for soil parameter maps, see data/data_sources.yml. Should contain info for the 7 soil depths of soilgrids (or 6 depths intervals for soilgrids_2020). * Required variables: ['bd_sl*', 'clyppt_sl*', 'sltppt_sl*', 'oc_sl*', 'ph_sl*', 'sndppt_sl*', 'soilthickness', 'tax_usda'] ptf_ksatver : {'brakensiek', 'cosby'} Pedotransfer function (PTF) to use for calculation KsatVer (vertical saturated hydraulic conductivity [mm/day]). By default 'brakensiek'. 
""" self.logger.info(f"Preparing soil parameter maps.") # TODO add variables list with required variable names dsin = self.data_catalog.get_rasterdataset(soil_fn, geom=self.region, buffer=2) dsout = workflows.soilgrids( dsin, self.staticmaps, ptf_ksatver, soil_fn, logger=self.logger, ).reset_coords(drop=True) self.set_staticmaps(dsout)
[docs] def setup_glaciers(self, glaciers_fn="rgi", min_area=1): """ This component generates maps of glacier areas, area fraction and volume fraction, as well as tables with temperature threshold, melting factor and snow-to-ice convertion fraction. The data is generated from features with ``min_area`` [km2] (default is 1 km2) from a database with glacier geometry, IDs and metadata. The required variables from glaciers_fn dataset are glacier ID 'simple_id'. Optionnally glacier area 'AREA' [km2] can be present to filter the glaciers by size. If not present it will be computed on the fly. Adds model layers: * **wflow_glacierareas** map: glacier IDs [-] * **wflow_glacierfrac** map: area fraction of glacier per cell [-] * **wflow_glacierstore** map: storage (volume) of glacier per cell [mm] * **G_TT** map: temperature threshold for glacier melt/buildup [°C] * **G_Cfmax** map: glacier melting factor [mm/°C*day] * **G_SIfrac** map: fraction of snowpack on top of glacier converted to ice, added to glacierstore [-] Parameters ---------- glaciers_fn : {'rgi'} Name of data source for glaciers, see data/data_sources.yml. * Required variables: ['simple_id'] min_area : float, optional Minimum glacier area threshold [km2], by default 0 (all included) """ glac_toml = { "model.glacier": True, "state.vertical.glacierstore": "glacierstore", "input.vertical.glacierstore": "wflow_glacierstore", "input.vertical.glacierfrac": "wflow_glacierfrac", "input.vertical.g_cfmax": "G_Cfmax", "input.vertical.g_tt": "G_TT", "input.vertical.g_sifrac": "G_SIfrac", } # retrieve data for basin self.logger.info(f"Preparing glacier maps.") gdf_org = self.data_catalog.get_geodataframe( glaciers_fn, geom=self.basins, predicate="intersects" ) # skip small size glacier if "AREA" in gdf_org.columns and gdf_org.geometry.size > 0: gdf_org = gdf_org[gdf_org["AREA"] >= min_area] # get glacier maps and parameters nb_glac = gdf_org.geometry.size ds_glac = None if nb_glac > 0: self.logger.info( f"{nb_glac} glaciers of sufficient size found within region." ) # add glacier maps ds_glac = workflows.glaciermaps( gdf=gdf_org, ds_like=self.staticmaps, id_column="simple_id", elevtn_name=self._MAPS["elevtn"], logger=self.logger, ) rmdict = {k: v for k, v in self._MAPS.items() if k in ds_glac.data_vars} self.set_staticmaps(ds_glac.rename(rmdict)) self.set_staticgeoms(gdf_org, name="glaciers") for option in glac_toml: self.set_config(option, glac_toml[option]) else: self.logger.warning( f"No glaciers of sufficient size found within region!" f"Skipping glacier procedures!" )
    def setup_constant_pars(self, dtype="float32", nodata=-999, **kwargs):
        """Setup constant parameter maps for all active model cells.

        Adds model layer:

        * **param_name** map: constant parameter map.

        Parameters
        ----------
        dtype: str
            data type
        nodata: int or float
            nodata value
        kwargs
            "param_name: value" pairs for constant staticmaps.
        """
        for key, value in kwargs.items():
            nodata = np.dtype(dtype).type(nodata)
            da_param = xr.where(
                self.staticmaps[self._MAPS["basins"]], value, nodata
            ).astype(dtype)
            da_param.raster.set_nodata(nodata)
            da_param = da_param.rename(key)
            self.set_staticmaps(da_param)
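    # Usage sketch for setup_constant_pars (illustrative only): each keyword
    # becomes a constant staticmap over the active basin cells; the parameter
    # names and values below are example assumptions.
    #
    #   mod.setup_constant_pars(dtype="float32", nodata=-999, KsatHorFrac=100, Cfmax=3.75)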
[docs] def setup_precip_forcing( self, precip_fn: str = "era5", precip_clim_fn: Optional[str] = None, chunksize: Optional[int] = None, **kwargs, ) -> None: """Setup gridded precipitation forcing at model resolution. Adds model layer: * **precip**: precipitation [mm] Parameters ---------- precip_fn : str, default era5 Precipitation data source, see data/forcing_sources.yml. * Required variable: ['precip'] precip_clim_fn : str, default None High resolution climatology precipitation data source to correct precipitation, see data/forcing_sources.yml. * Required variable: ['precip'] chunksize: int, optional Chunksize on time dimension for processing data (not for saving to disk!). If None the data chunksize is used, this can however be optimized for large/small catchments. By default None. """ if precip_fn is None: return starttime = self.get_config("starttime") endtime = self.get_config("endtime") freq = pd.to_timedelta(self.get_config("timestepsecs"), unit="s") mask = self.staticmaps[self._MAPS["basins"]].values > 0 precip = self.data_catalog.get_rasterdataset( precip_fn, geom=self.region, buffer=2, time_tuple=(starttime, endtime), variables=["precip"], ) if chunksize is not None: precip = precip.chunk({"time": chunksize}) clim = None if precip_clim_fn != None: clim = self.data_catalog.get_rasterdataset( precip_clim_fn, geom=precip.raster.box, buffer=2, variables=["precip"], ) precip_out = hydromt.workflows.forcing.precip( precip=precip, da_like=self.staticmaps[self._MAPS["elevtn"]], clim=clim, freq=freq, resample_kwargs=dict(label="right", closed="right"), logger=self.logger, **kwargs, ) # Update meta attributes (used for default output filename later) precip_out.attrs.update({"precip_fn": precip_fn}) if precip_clim_fn is not None: precip_out.attrs.update({"precip_clim_fn": precip_clim_fn}) self.set_forcing(precip_out.where(mask), name="precip")
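    # Usage sketch for setup_precip_forcing (illustrative only): the start/end
    # times are read from the wflow TOML, so set them first; "era5" and
    # "worldclim" are example catalog entries for forcing and climatology.
    #
    #   mod.setup_config(starttime="2010-01-01T00:00:00", endtime="2010-03-31T00:00:00", timestepsecs=86400)
    #   mod.setup_precip_forcing(precip_fn="era5", precip_clim_fn="worldclim", chunksize=30)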
[docs] def setup_temp_pet_forcing( self, temp_pet_fn: str = "era5", pet_method: str = "debruin", press_correction: bool = True, temp_correction: bool = True, dem_forcing_fn: str = "era5_orography", skip_pet: str = False, chunksize: Optional[int] = None, **kwargs, ) -> None: """Setup gridded reference evapotranspiration forcing at model resolution. Adds model layer: * **pet**: reference evapotranspiration [mm] * **temp**: temperature [°C] Parameters ---------- temp_pet_fn : str, optional Name or path of data source with variables to calculate temperature and reference evapotranspiration, see data/forcing_sources.yml, by default 'era5'. * Required variable for temperature: ['temp'] * Required variables for De Bruin reference evapotranspiration: ['temp', 'press_msl', 'kin', 'kout'] * Required variables for Makkink reference evapotranspiration: ['temp', 'press_msl', 'kin'] pet_method : {'debruin', 'makkink'}, optional Reference evapotranspiration method, by default 'debruin'. press_correction, temp_correction : bool, optional If True pressure, temperature are corrected using elevation lapse rate, by default False. dem_forcing_fn : str, default None Elevation data source with coverage of entire meteorological forcing domain. If temp_correction is True and dem_forcing_fn is provided this is used in combination with elevation at model resolution to correct the temperature. skip_pet : bool, optional If True calculate temp only. chunksize: int, optional Chunksize on time dimension for processing data (not for saving to disk!). If None the data chunksize is used, this can however be optimized for large/small catchments. By default None. """ if temp_pet_fn is None: return starttime = self.get_config("starttime") endtime = self.get_config("endtime") timestep = self.get_config("timestepsecs") freq = pd.to_timedelta(timestep, unit="s") mask = self.staticmaps[self._MAPS["basins"]].values > 0 variables = ["temp"] if not skip_pet: if pet_method == "debruin": variables += ["press_msl", "kin", "kout"] elif pet_method == "makkink": variables += ["press_msl", "kin"] else: methods = ["debruin", "makking"] ValueError(f"Unknown pet method {pet_method}, select from {methods}") ds = self.data_catalog.get_rasterdataset( temp_pet_fn, geom=self.region, buffer=1, time_tuple=(starttime, endtime), variables=variables, single_var_as_array=False, # always return dataset ) if chunksize is not None: ds = ds.chunk({"time": chunksize}) dem_forcing = None if dem_forcing_fn != None: dem_forcing = self.data_catalog.get_rasterdataset( dem_forcing_fn, geom=ds.raster.box, # clip dem with forcing bbox for full coverage buffer=2, variables=["elevtn"], ).squeeze() temp_in = hydromt.workflows.forcing.temp( ds["temp"], dem_model=self.staticmaps[self._MAPS["elevtn"]], dem_forcing=dem_forcing, lapse_correction=temp_correction, logger=self.logger, freq=None, # resample time after pet workflow **kwargs, ) if not skip_pet: pet_out = hydromt.workflows.forcing.pet( ds[variables[1:]], dem_model=self.staticmaps[self._MAPS["elevtn"]], temp=temp_in, method=pet_method, press_correction=press_correction, freq=freq, resample_kwargs=dict(label="right", closed="right"), logger=self.logger, **kwargs, ) # Update meta attributes with setup opt opt_attr = { "pet_fn": temp_pet_fn, "pet_method": pet_method, } pet_out.attrs.update(opt_attr) self.set_forcing(pet_out.where(mask), name="pet") # resample temp after pet workflow temp_out = hydromt.workflows.forcing.resample_time( temp_in, freq, upsampling="bfill", # we assume right labeled original data 
downsampling="mean", label="right", closed="right", conserve_mass=False, logger=self.logger, ) # Update meta attributes with setup opt (used for default naming later) opt_attr = { "temp_fn": temp_pet_fn, "temp_correction": str(temp_correction), } temp_out.attrs.update(opt_attr) self.set_forcing(temp_out.where(mask), name="temp")
# I/O
    def read(self):
        """Method to read the complete model schematization and configuration from file."""
        self.read_config()
        self.read_staticmaps()
        self.read_intbl()
        self.read_staticgeoms()
        self.read_forcing()
        self.logger.info("Model read")
    def write(self):
        """Method to write the complete model schematization and configuration to file."""
        self.logger.info(f"Write model data to {self.root}")
        # if in r, r+ mode, only write updated components
        if not self._write:
            self.logger.warning("Cannot write in read-only mode")
            return
        self.write_data_catalog()
        if self.config:  # try to read default if not yet set
            self.write_config()
        if self._staticmaps:
            self.write_staticmaps()
        if self._staticgeoms:
            self.write_staticgeoms()
        if self._forcing:
            self.write_forcing()
    def read_staticmaps(self, **kwargs):
        """Read staticmaps"""
        fn_default = join(self.root, "staticmaps.nc")
        fn = self.get_config("input.path_static", abs_path=True, fallback=fn_default)
        if not self._write:
            # start fresh in read-only mode
            self._staticmaps = xr.Dataset()
        if fn is not None and isfile(fn):
            self.logger.info(f"Read staticmaps from {fn}")
            # FIXME: we need a smarter (lazy) solution for big models which also
            # works when overwriting / appending data in the same source!
            ds = xr.open_dataset(
                fn, mask_and_scale=False, decode_coords="all", **kwargs
            ).load()
            ds.close()
            # make sure internally maps are always North -> South oriented
            if ds.raster.res[1] > 0:
                ds = ds.raster.flipud()
            self.set_staticmaps(ds)
        elif len(glob.glob(join(self.root, "staticmaps", "*.map"))) > 0:
            self.read_staticmaps_pcr()
[docs] def write_staticmaps(self): """Write staticmaps""" if not self._write: raise IOError("Model opened in read-only mode") # clean-up staticmaps and write CRS according to CF-conventions # TODO replace later with hydromt.raster.gdal_compliant method after core release crs = self.staticmaps.raster.crs ds_out = self.staticmaps.reset_coords() # TODO?! # if ds_out.raster.res[1] < 0: # write data with South -> North orientation # ds_out = ds_out.raster.flipud() x_dim, y_dim, x_attrs, y_attrs = hydromt.gis_utils.axes_attrs(crs) ds_out = ds_out.rename({ds_out.raster.x_dim: x_dim, ds_out.raster.y_dim: y_dim}) ds_out[x_dim].attrs.update(x_attrs) ds_out[y_dim].attrs.update(y_attrs) ds_out = ds_out.drop_vars(["mask", "spatial_ref", "ls"], errors="ignore") ds_out.rio.write_crs(crs, inplace=True) ds_out.rio.write_transform(self.staticmaps.raster.transform, inplace=True) ds_out.raster.set_spatial_dims() # filename fn_default = join(self.root, "staticmaps.nc") fn = self.get_config("input.path_static", abs_path=True, fallback=fn_default) # Check if all sub-folders in fn exists and if not create them if not isdir(dirname(fn)): os.makedirs(dirname(fn)) self.logger.info(f"Write staticmaps to {fn}") mask = ds_out[self._MAPS["basins"]] > 0 for v in ds_out.data_vars: # nodata is required for all but boolean fields if ds_out[v].dtype != "bool": ds_out[v] = ds_out[v].where(mask, ds_out[v].raster.nodata) ds_out.to_netcdf(fn)
# self.write_staticmaps_pcr()
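    # Illustrative check (sketch, not part of the module): the netcdf written above
    # can be re-opened with xarray to inspect the variables; the path assumes the
    # default <root>/staticmaps.nc location and a placeholder root folder.
    #
    #   import xarray as xr
    #   ds = xr.open_dataset(
    #       "wflow_piave/staticmaps.nc", mask_and_scale=False, decode_coords="all"
    #   )
    #   print(list(ds.data_vars))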
[docs]    def read_staticmaps_pcr(self, crs=4326, **kwargs):
        """Read staticmaps at <root/staticmaps> in PCRaster maps format and parse to xarray"""
        if self._read and "chunks" not in kwargs:
            kwargs.update(chunks={"y": -1, "x": -1})
        fn = join(self.root, "staticmaps", "*.map")
        fns = glob.glob(fn)
        if len(fns) == 0:
            self.logger.warning(f"No staticmaps found at {fn}")
            return
        self._staticmaps = open_mfraster(fns, **kwargs)
        path = join(self.root, "staticmaps", "clim", "LAI*")
        if len(glob.glob(path)) > 0:
            da_lai = open_mfraster(
                path, concat=True, concat_dim="time", logger=self.logger, **kwargs
            )
            self.set_staticmaps(da_lai, "LAI")

        # reorganize c_0 etc maps
        da_c = []
        list_c = [v for v in self._staticmaps if str(v).startswith("c_")]
        if len(list_c) > 0:
            for i, v in enumerate(list_c):
                da_c.append(self._staticmaps[f"c_{i:d}"])
            da = xr.concat(
                da_c, pd.Index(np.arange(len(list_c), dtype=int), name="layer")
            ).transpose("layer", ...)
            self.set_staticmaps(da, "c")

        if self.crs is None:
            if crs is None:
                crs = 4326  # default to 4326
            self.set_crs(crs)
[docs] def write_staticmaps_pcr(self): """Write staticmaps at <root/staticmaps> in PCRaster maps format.""" if not self._write: raise IOError("Model opened in read-only mode") ds_out = self.staticmaps if "LAI" in ds_out.data_vars: ds_out = ds_out.rename_vars({"LAI": "clim/LAI"}) if "c" in ds_out.data_vars: for layer in ds_out["layer"]: ds_out[f"c_{layer.item():d}"] = ds_out["c"].sel(layer=layer) ds_out[f"c_{layer.item():d}"].raster.set_nodata( ds_out["c"].raster.nodata ) ds_out = ds_out.drop_vars(["c", "layer"]) self.logger.info("Writing (updated) staticmap files.") # add datatypes for maps with same basenames, e.g. wflow_gauges_grdc pcr_vs_map = PCR_VS_MAP.copy() for var_name in ds_out.raster.vars: base_name = "_".join(var_name.split("_")[:-1]) # clip _<postfix> if base_name in PCR_VS_MAP: pcr_vs_map.update({var_name: PCR_VS_MAP[base_name]}) ds_out.raster.to_mapstack( root=join(self.root, "staticmaps"), mask=True, driver="PCRaster", pcr_vs_map=pcr_vs_map, logger=self.logger, )
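    # Illustrative note (sketch): PCRaster data types for maps with a postfix are
    # resolved by stripping the last "_<postfix>" and looking up the base name in
    # PCR_VS_MAP, e.g. for a hypothetical map named "wflow_gauges_grdc":
    #
    #   base_name = "_".join("wflow_gauges_grdc".split("_")[:-1])  # "wflow_gauges"
    #   PCR_VS_MAP[base_name]  # "nominal"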
[docs]    def read_staticgeoms(self):
        """Read staticgeoms at <root/staticgeoms> and parse to geopandas"""
        if not self._write:
            self._staticgeoms = dict()  # fresh start in read-only mode
        dir_default = join(self.root, "staticmaps.nc")
        dir_mod = dirname(
            self.get_config("input.path_static", abs_path=True, fallback=dir_default)
        )
        fns = glob.glob(join(dir_mod, "staticgeoms", "*.geojson"))
        if len(fns) > 1:
            self.logger.info("Reading model staticgeom files.")
        for fn in fns:
            name = basename(fn).split(".")[0]
            if name != "region":
                self.set_staticgeoms(gpd.read_file(fn), name=name)
[docs]    def write_staticgeoms(self):
        """Write staticgeoms at <root/staticgeoms> in GeoJSON format"""
        # to write use self.staticgeoms[var].to_file()
        if not self._write:
            raise IOError("Model opened in read-only mode")
        if self.staticgeoms:
            self.logger.info("Writing model staticgeom to file.")
            for name, gdf in self.staticgeoms.items():
                fn_out = join(self.root, "staticgeoms", f"{name}.geojson")
                gdf.to_file(fn_out, driver="GeoJSON")
[docs] def read_forcing(self): """Read forcing""" fn_default = join(self.root, "inmaps.nc") fn = self.get_config("input.path_forcing", abs_path=True, fallback=fn_default) if not self._write: # start fresh in read-only mode self._forcing = dict() if fn is not None and isfile(fn): self.logger.info(f"Read forcing from {fn}") ds = xr.open_dataset(fn, chunks={"time": 30}, decode_coords="all") for v in ds.data_vars: self.set_forcing(ds[v])
[docs]    def write_forcing(
        self,
        fn_out=None,
        freq_out=None,
        chunksize=1,
        decimals=2,
        time_units="days since 1900-01-01T00:00:00",
        **kwargs,
    ):
        """Write forcing at `fn_out` in model ready format.

        If no `fn_out` path is provided and the path_forcing from the wflow toml is
        undefined or already exists, the following default filenames are used:

        * Default name format (with downscaling):
          inmaps_sourcePd_sourceTd_methodPET_freq_startyear_endyear.nc
        * Default name format (no downscaling):
          inmaps_sourceP_sourceT_methodPET_freq_startyear_endyear.nc

        Parameters
        ----------
        fn_out: str, Path, optional
            Path to save output netcdf file; if None the name is read from the wflow
            toml file.
        freq_out: str (Offset), optional
            Write several files for the forcing according to freq_out. For example
            'Y' for one file per year or 'M' for one file per month.
            By default writes one file.
            For more options, see
            https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases
        chunksize: int, optional
            Chunksize on time dimension when saving to disk. By default 1.
        decimals: int, optional
            Round the output data to the given number of decimals.
        time_units: str, optional
            Common time units when writing several netcdf forcing files.
            By default "days since 1900-01-01T00:00:00".
        """
        if not self._write:
            raise IOError("Model opened in read-only mode")
        if self.forcing:
            self.logger.info("Write forcing file")

            # Get default forcing name from forcing attrs
            yr0 = pd.to_datetime(self.get_config("starttime")).year
            yr1 = pd.to_datetime(self.get_config("endtime")).year
            freq = self.get_config("timestepsecs")

            # get output filename
            if fn_out is not None:
                self.set_config("input.path_forcing", fn_out)
                self.write_config()  # re-write config
            else:
                fn_out = self.get_config("input.path_forcing", abs_path=True)
                # get default filename if file exists
                if fn_out is None or isfile(fn_out):
                    self.logger.warning(
                        "Netcdf forcing file from input.path_forcing in the TOML "
                        "is not provided or already exists, using default name."
                    )
                    sourceP = ""
                    sourceT = ""
                    methodPET = ""
                    if "precip" in self.forcing:
                        val = self.forcing["precip"].attrs.get("precip_clim_fn", None)
                        Pdown = "d" if val is not None else ""
                        val = self.forcing["precip"].attrs.get("precip_fn", None)
                        if val is not None:
                            sourceP = f"_{val}{Pdown}"
                    if "temp" in self.forcing:
                        val = self.forcing["temp"].attrs.get("temp_correction", "False")
                        Tdown = "d" if val == "True" else ""
                        val = self.forcing["temp"].attrs.get("temp_fn", None)
                        if val is not None:
                            sourceT = f"_{val}{Tdown}"
                    if "pet" in self.forcing:
                        val = self.forcing["pet"].attrs.get("pet_method", None)
                        if val is not None:
                            methodPET = f"_{val}"
                    fn_default = (
                        f"inmaps{sourceP}{sourceT}{methodPET}_{freq}_{yr0}_{yr1}.nc"
                    )
                    fn_default_path = join(self.root, fn_default)
                    if isfile(fn_default_path):
                        self.logger.warning(
                            "Netcdf default forcing file already exists, skipping write_forcing. "
                            "To overwrite netcdf forcing file: change name input.path_forcing "
                            "in setup_config section of the build inifile."
                        )
                        return
                    else:
                        self.set_config("input.path_forcing", fn_default)
                        self.write_config()  # re-write config
                        fn_out = fn_default_path

            # Check if all dates between (starttime, endtime) are in all da forcing
            start = pd.to_datetime(self.get_config("starttime"))
            end = pd.to_datetime(self.get_config("endtime"))
            missings = False
            for da in self.forcing.values():
                if "time" in da.coords:
                    times = da.time.values
                    if start < pd.to_datetime(times[0]):
                        start = pd.to_datetime(times[0])
                        missings = True
                    if end > pd.to_datetime(times[-1]):
                        end = pd.to_datetime(times[-1])
                        missings = True
            # merge, process and write forcing
            ds = xr.merge([da.reset_coords(drop=True) for da in self.forcing.values()])
            ds.raster.set_crs(self.crs)
            # Send warning, slice ds and update config with new start and end time
            if missings:
                self.logger.warning(
                    f"Not all dates found in forcing data, changing starttime to "
                    f"{start} and endtime to {end} in the toml."
                )
                self.set_config("starttime", start.to_pydatetime())
                self.set_config("endtime", end.to_pydatetime())
                self.write_config()
                ds = ds.sel({"time": slice(start, end)})

            if decimals is not None:
                ds = ds.round(decimals)
            # clean-up forcing and write CRS according to CF-conventions
            ds = ds.raster.gdal_compliant(rename_dims=True, force_sn=False)
            ds = ds.drop_vars(["mask", "idx_out"], errors="ignore")

            # write with output chunksizes with single timestep and complete
            # spatial grid to speed up the reading from wflow.jl
            # dims are always ordered (time, y, x)
            ds.raster._check_dimensions()
            chunksizes = (chunksize, ds.raster.ycoords.size, ds.raster.xcoords.size)
            encoding = {
                v: {"zlib": True, "dtype": "float32", "chunksizes": chunksizes}
                for v in ds.data_vars.keys()
            }
            # make sure no _FillValue is written to the time dimension
            # For several forcing files add common units attributes to time
            ds["time"].attrs.pop("_FillValue", None)
            encoding["time"] = {"_FillValue": None}

            # Check if all sub-folders in fn_out exist and if not create them
            if not isdir(dirname(fn_out)):
                os.makedirs(dirname(fn_out))

            forcing_list = []

            if freq_out is None:
                # with compute=False we get a delayed object which is executed when
                # calling .compute where we can pass more arguments to the dask.compute method
                forcing_list.append([fn_out, ds])
            else:
                self.logger.info(f"Writing several forcing files with freq {freq_out}")
                # For several forcing files add common units attributes to time
                encoding["time"] = {"_FillValue": None, "units": time_units}
                # Updating path forcing in config
                fns_out = os.path.relpath(fn_out, self.root)
                fns_out = f"{str(fns_out)[0:-3]}_*.nc"
                self.set_config("input.path_forcing", fns_out)
                for label, ds_gr in ds.resample(time=freq_out):
                    start = ds_gr["time"].dt.strftime("%Y%m%d")[0].item()
                    fn_out_gr = f"{str(fn_out)[0:-3]}_{start}.nc"
                    forcing_list.append([fn_out_gr, ds_gr])

            for fn_out_gr, ds_gr in forcing_list:
                self.logger.info(f"Process forcing; saving to {fn_out_gr}")
                delayed_obj = ds_gr.to_netcdf(
                    fn_out_gr, encoding=encoding, mode="w", compute=False
                )
                with ProgressBar():
                    delayed_obj.compute(**kwargs)
# TO profile uncomment lines below to replace lines above # from dask.diagnostics import Profiler, CacheProfiler, ResourceProfiler # import cachey # with Profiler() as prof, CacheProfiler(metric=cachey.nbytes) as cprof, ResourceProfiler() as rprof: # delayed_obj.compute() # visualize([prof, cprof, rprof], file_path=r'c:\Users\eilan_dk\work\profile2.html')
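    # Example usage (illustrative sketch): write one forcing file per year with a
    # larger time chunksize on disk; "Y" is a pandas offset alias and the values
    # shown are examples, not defaults of this module.
    #
    #   mod.write_forcing(freq_out="Y", chunksize=10, decimals=2)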
[docs] def read_states(self): """Read states at <root/?/> and parse to dict of xr.DataArray""" if not self._write: # start fresh in read-only mode self._states = dict()
# raise NotImplementedError()
[docs] def write_states(self): """write states at <root/?/> in model ready format""" if not self._write: raise IOError("Model opened in read-only mode")
# raise NotImplementedError()
[docs] def read_results(self): """Read results at <root/?/> and parse to dict of xr.DataArray/xr.Dataset""" if not self._write: # start fresh in read-only mode self._results = dict() # Read gridded netcdf (output section) nc_fn = self.get_config("output.path", abs_path=True) if nc_fn is not None and isfile(nc_fn): self.logger.info(f"Read results from {nc_fn}") ds = xr.open_dataset(nc_fn, chunks={"time": 30}, decode_coords="all") # TODO ? align coords names and values of results nc with staticmaps self.set_results(ds, name="output") # Read scalar netcdf (netcdf section) ncs_fn = self.get_config("netcdf.path", abs_path=True) if ncs_fn is not None and isfile(ncs_fn): self.logger.info(f"Read results from {ncs_fn}") ds = xr.open_dataset(ncs_fn, chunks={"time": 30}) self.set_results(ds, name="netcdf") # Read csv timeseries (csv section) csv_fn = self.get_config("csv.path", abs_path=True) if csv_fn is not None and isfile(csv_fn): csv_dict = utils.read_csv_results( csv_fn, config=self.config, maps=self.staticmaps ) for key in csv_dict: # Add to results self.set_results(csv_dict[f"{key}"])
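    # Example usage (illustrative sketch): after read_results the gridded netcdf
    # output is available under the "output" key and the scalar netcdf output under
    # "netcdf"; csv timeseries are added as separate entries parsed by
    # utils.read_csv_results. Paths follow the output/netcdf/csv sections of the toml.
    #
    #   mod.read_results()
    #   print(list(mod.results.keys()))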
    def write_results(self):
        """Write results at <root/?/> in model ready format"""
        if not self._write:
            raise IOError("Model opened in read-only mode")
        # raise NotImplementedError()

    def read_intbl(self, **kwargs):
        """Read intbl files at <root/intbl> and parse to pandas.DataFrame"""
        if not self._write:
            self._intbl = dict()  # start fresh in read-only mode
        if not self._read:
            self.logger.info("Reading default intbl files.")
            fns = glob.glob(join(DATADIR, "wflow", "intbl", "*.tbl"))
        else:
            self.logger.info("Reading model intbl files.")
            fns = glob.glob(join(self.root, "intbl", "*.tbl"))
        if len(fns) > 0:
            for fn in fns:
                name = basename(fn).split(".")[0]
                tbl = pd.read_csv(fn, delim_whitespace=True, header=None)
                tbl.columns = [
                    f"expr{i+1}" if i + 1 < len(tbl.columns) else "value"
                    for i in range(len(tbl.columns))
                ]  # rename columns
                self.set_intbl(tbl, name=name)

    def write_intbl(self):
        """Write intbl at <root/intbl> in PCRaster table format."""
        if not self._write:
            raise IOError("Model opened in read-only mode")
        if self.intbl:
            self.logger.info("Writing intbl files.")
            for name in self.intbl:
                fn_out = join(self.root, "intbl", f"{name}.tbl")
                self.intbl[name].to_csv(fn_out, sep=" ", index=False, header=False)

    def set_intbl(self, df, name):
        """Add intbl <pandas.DataFrame> to model."""
        if not (isinstance(df, pd.DataFrame) or isinstance(df, pd.Series)):
            raise ValueError("df type not recognized, should be pandas.DataFrame.")
        if name in self._intbl:
            if not self._write:
                raise IOError(f"Cannot overwrite intbl {name} in read-only mode")
            elif self._read:
                self.logger.warning(f"Overwriting intbl: {name}")
        self._intbl[name] = df

    def _configread(self, fn):
        with codecs.open(fn, "r", encoding="utf-8") as f:
            fdict = toml.load(f)
        return fdict

    def _configwrite(self, fn):
        with codecs.open(fn, "w", encoding="utf-8") as f:
            toml.dump(self.config, f)

    ## WFLOW specific data and methods

    @property
    def intbl(self):
        """Returns a dictionary of pandas.DataFrames representing the wflow intbl files."""
        if not self._intbl:
            self.read_intbl()
        return self._intbl

    @property
    def flwdir(self):
        """Returns the pyflwdir.FlwdirRaster object parsed from the wflow ldd."""
        if self._flwdir is None:
            self.set_flwdir()
        return self._flwdir
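    # Example usage (illustrative sketch): register a lookup table as intbl and write
    # it in PCRaster table format; the table name and values are placeholders.
    #
    #   tbl = pd.DataFrame({"expr1": [1, 2, 3], "value": [0.1, 0.2, 0.3]})
    #   mod.set_intbl(tbl, name="N")
    #   mod.write_intbl()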
[docs]    def set_flwdir(self, ftype="infer"):
        """Parse the pyflwdir.FlwdirRaster object from the wflow ldd map"""
        flwdir_name = self._MAPS["flwdir"]
        self._flwdir = flw.flwdir_from_da(
            self.staticmaps[flwdir_name],
            ftype=ftype,
            check_ftype=True,
            mask=(self.staticmaps[self._MAPS["basins"]] > 0),
        )
@property def basins(self): """Returns a basin(s) geometry as a geopandas.GeoDataFrame.""" if "basins" in self.staticgeoms: gdf = self.staticgeoms["basins"] else: gdf = flw.basin_shape( self.staticmaps, self.flwdir, basin_name=self._MAPS["basins"] ) self.set_staticgeoms(gdf, name="basins") return gdf @property def rivers(self): """Returns a river geometry as a geopandas.GeoDataFrame. If available, the stream order and upstream area values are added to the geometry properties. """ if "rivers" in self.staticgeoms: gdf = self.staticgeoms["rivers"] elif self._MAPS["rivmsk"] in self.staticmaps: rivmsk = self.staticmaps[self._MAPS["rivmsk"]].values != 0 # Check if there are river cells in the model before continuing if np.any(rivmsk): # add stream order 'strord' column strord = self.flwdir.stream_order(mask=rivmsk) feats = self.flwdir.streams(mask=rivmsk, strord=strord) gdf = gpd.GeoDataFrame.from_features(feats) gdf.crs = pyproj.CRS.from_user_input(self.crs) self.set_staticgeoms(gdf, name="rivers") else: self.logger.warning("No river cells detected in the selected basin.") gdf = None return gdf
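    # Example usage (illustrative sketch): the derived basin and river geometries can
    # be exported for inspection; the output filenames are placeholders.
    #
    #   mod.basins.to_file("basins_check.geojson", driver="GeoJSON")
    #   if mod.rivers is not None:
    #       mod.rivers.to_file("rivers_check.geojson", driver="GeoJSON")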
[docs] def clip_staticmaps( self, region, buffer=0, align=None, crs=4326, ): """Clip staticmaps to subbasin. Parameters ---------- region : dict See :meth:`models.wflow.WflowModel.setup_basemaps` buffer : int, optional Buffer around subbasin in number of pixels, by default 0 align : float, optional Align bounds of region to raster with resolution <align>, by default None crs: int, optional Default crs of the staticmaps to clip. Returns ------- xarray.DataSet Clipped staticmaps. """ basins_name = self._MAPS["basins"] flwdir_name = self._MAPS["flwdir"] kind, region = hydromt.workflows.parse_region(region, logger=self.logger) # translate basin and outlet kinds to geom geom = region.get("geom", None) bbox = region.get("bbox", None) if kind in ["basin", "outlet", "subbasin"]: # supply bbox to avoid getting basin bounds first when clipping subbasins if kind == "subbasin" and bbox is None: region.update(bbox=self.bounds) geom, _ = hydromt.workflows.get_basin_geometry( ds=self.staticmaps, logger=self.logger, kind=kind, basins_name=basins_name, flwdir_name=flwdir_name, **region, ) # clip based on subbasin args, geom or bbox if geom is not None: ds_staticmaps = self.staticmaps.raster.clip_geom( geom, align=align, buffer=buffer ) ds_staticmaps[basins_name] = ds_staticmaps[basins_name].where( ds_staticmaps["mask"], self.staticmaps[basins_name].raster.nodata ) ds_staticmaps[basins_name].attrs.update( _FillValue=self.staticmaps[basins_name].raster.nodata ) elif bbox is not None: ds_staticmaps = self.staticmaps.raster.clip_bbox( bbox, align=align, buffer=buffer ) # Update flwdir staticmaps and staticgeoms if self.crs is None and crs is not None: self.set_crs(crs) self._staticmaps = xr.Dataset() self.set_staticmaps(ds_staticmaps) # add pits at edges after clipping self._flwdir = None # make sure old flwdir object is removed self.staticmaps[self._MAPS["flwdir"]].data = self.flwdir.to_array("ldd") self._staticgeoms = dict() self.basins self.rivers # Update reservoir and lakes remove_reservoir = False if self._MAPS["resareas"] in self.staticmaps: reservoir = self.staticmaps[self._MAPS["resareas"]] if not np.any(reservoir > 0): remove_reservoir = True remove_maps = [ self._MAPS["resareas"], self._MAPS["reslocs"], "ResSimpleArea", "ResDemand", "ResTargetFullFrac", "ResTargetMinFrac", "ResMaxRelease", "ResMaxVolume", ] self._staticmaps = self.staticmaps.drop_vars(remove_maps) remove_lake = False if self._MAPS["lakeareas"] in self.staticmaps: lake = self.staticmaps[self._MAPS["lakeareas"]] if not np.any(lake > 0): remove_lake = True remove_maps = [ self._MAPS["lakeareas"], self._MAPS["lakelocs"], "LinkedLakeLocs", "LakeStorFunc", "LakeOutflowFunc", "LakeArea", "LakeAvgLevel", "LakeAvgOut", "LakeThreshold", "Lake_b", "Lake_e", ] self._staticmaps = self.staticmaps.drop_vars(remove_maps) # Update config # Remove the absolute path and if needed remove lakes and reservoirs if remove_reservoir: # change reservoirs = true to false self.set_config("model.reservoirs", False) # remove states if self.get_config("state.lateral.river.reservoir") is not None: del self.config["state"]["lateral"]["river"]["reservoir"] if remove_lake: # change lakes = true to false self.set_config("model.lakes", False) # remove states if self.get_config("state.lateral.river.lake") is not None: del self.config["state"]["lateral"]["river"]["lake"]
[docs]    def clip_forcing(self, crs=4326, **kwargs):
        """Return clipped forcing for subbasin.

        Returns
        -------
        xarray.Dataset
            Clipped forcing.
        """
        if len(self.forcing) > 0:
            self.logger.info("Clipping NetCDF forcing..")
            ds_forcing = xr.merge(self.forcing.values()).raster.clip_bbox(
                self.staticmaps.raster.bounds
            )
            self.set_forcing(ds_forcing)
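    # Example usage (illustrative sketch): clip an existing model to a smaller
    # subbasin and save it under a new root. The region dict follows the hydromt
    # region conventions (see setup_basemaps); root paths and coordinates are
    # placeholders.
    #
    #   mod = WflowModel(root="wflow_piave", mode="r")
    #   mod.read()
    #   mod.set_root("wflow_piave_clipped", mode="w")
    #   mod.clip_staticmaps(region={"subbasin": [12.3, 46.1]}, buffer=2)
    #   mod.clip_forcing()
    #   mod.write()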