Source code for hydromt_delwaq.workflows.segments

"""Worflows dealing with pointer and Delwaq segments."""

import logging
from typing import List

import numpy as np
import xarray as xr
from hydromt import flw

from .emissions import gridarea, gridlength_gridwidth

logger = logging.getLogger(__name__)


__all__ = [
    "hydromaps",
    "geometrymaps",
    "pointer",
]


[docs] def hydromaps( hydromodel, mask: str = "basins", ): """Return base information maps from hydromodel. The following basemaps are extracted:\ - flwdir - basins - basmsk - elevtn - rivmsk Parameters ---------- hydromodel : hydromt.model HydroMT Model class containing the hydromodel to build DelwaqModel from. mask: str, optional Name of the mask to use to mask the maps. Either 'rivers' or 'basins' (default). Returns ------- ds_out : xarray.Dataset Dataset containing gridded emission map at model resolution. """ ds_out = hydromodel.grid[hydromodel._MAPS["flwdir"]].rename("flwdir").to_dataset() ds_out["basins"] = hydromodel.grid[hydromodel._MAPS["basins"]] ds_out["river"] = hydromodel.grid[hydromodel._MAPS["rivmsk"]] ds_out["rivlen"] = hydromodel.grid[hydromodel._MAPS["rivlen"]] ds_out["rivwth"] = hydromodel.grid[hydromodel._MAPS["rivwth"]] ds_out["elevtn"] = hydromodel.grid[hydromodel._MAPS["elevtn"]] # Surface area maps ds_out["rivarea"] = ds_out["rivlen"] * ds_out["rivwth"] ds_out["rivarea"].raster.set_nodata(ds_out["rivlen"].raster.nodata) if "LakeArea" in hydromodel.grid: ds_out["lakearea"] = hydromodel.grid["LakeArea"] if "ResSimpleArea" in hydromodel.grid: ds_out["resarea"] = hydromodel.grid["ResSimpleArea"] basins_mv = ds_out["basins"].raster.nodata ds_out["basmsk"] = xr.Variable( dims=ds_out.raster.dims, data=(ds_out["basins"] != basins_mv), attrs=dict(_FillValue=False), ) river_mv = ds_out["river"].raster.nodata ds_out["rivmsk"] = xr.Variable( dims=ds_out.raster.dims, data=(ds_out["river"] != river_mv), attrs=dict(_FillValue=False), ) # Add mask if mask == "rivers": da_mask = ds_out["rivmsk"] else: da_mask = ds_out["basmsk"] ds_out = ds_out.drop_vars(["rivmsk", "basmsk"]) ds_out.coords["mask"] = da_mask ds_out["modelmap"] = da_mask.astype(np.int32) ds_out["modelmap"].raster.set_nodata(0) return ds_out
def maps_from_hydromodel( hydromodel, maps=["rivmsk", "lndslp", "strord", "N", "SoilThickness", "thetaS"], logger=logger, ): """Return maps from hydromodel. Parameters ---------- hydromodel : HydroMT Model HydroMT Model class containing the hydromodel to get geometry data from from. maps: list of str List of variables from hydromodel to extract and extend. By default ['rivmsk', 'lndslp', 'strord', 'N', 'SoilThickness', 'thetaS']. Returns ------- ds_out : xarray.Dataset Dataset containing gridded maps at model resolution. """ ds_out = xr.Dataset() for m in maps: if f"{m}_River" in hydromodel.grid: ds_out[m] = hydromodel.grid[f"{m}_River"].where( hydromodel.grid[hydromodel._MAPS["rivmsk"]], hydromodel.grid[m], ) elif m in hydromodel._MAPS: if hydromodel._MAPS[m] in hydromodel.grid: ds_out[m] = hydromodel.grid[hydromodel._MAPS[m]] elif m in hydromodel.grid: ds_out[m] = hydromodel.grid[m] else: logger.warning(f"Map {m} not found in hydromodel, skipping.") # Check if m is 3D and split into several variables if m in ds_out and len(ds_out[m].shape) == 3: # Find the name of the extra dimension dim0 = ds_out[m].raster.dim0 dim0_values = ds_out[dim0].values for i in range(ds_out[m].shape[0]): ds_out[f"{m}_{dim0}_{dim0_values[i]}"] = ds_out[m][i] ds_out = ds_out.drop_vars([m, dim0]) return ds_out
[docs] def geometrymaps( hydromodel, ): """Return geometry information maps from hydromodel. The following basemaps are extracted:\ - surface - length - width - latitude_grid - optional: bankfull_volume if bankfull depth `rivdph` is available Parameters ---------- hydromodel : HydroMT Model HydroMT Model class containing the hydromodel to get geometry data from from. Returns ------- ds_out : xarray.Dataset Dataset containing gridded geometry map at model resolution. """ ### Geometry data ### surface = gridarea(hydromodel.grid) surface.raster.set_nodata(-9999.0) surface = surface.rename("surface") length, width = gridlength_gridwidth(hydromodel.grid) rivlen = hydromodel.grid[hydromodel._MAPS["rivlen"]] rivwth = hydromodel.grid[hydromodel._MAPS["rivwth"]] rivmsk = hydromodel.grid[hydromodel._MAPS["rivmsk"]] # surface surface = surface.where(rivmsk == False, rivlen * rivwth) surface = surface.rename("surface") # Add waterbodies to surface for wb in ["LakeArea", "ResSimpleArea"]: if wb in hydromodel.grid: wb_surface = hydromodel.grid[wb] surface = surface.where(wb_surface == wb_surface.raster.nodata, wb_surface) # length length = rivlen.where(rivmsk, length) length = length.rename("length") # width width = rivwth.where(rivmsk, width) width = width.rename("width") ds_out = surface.to_dataset() ds_out["length"] = length ds_out["width"] = width # Derive latitude of each cell y_dim = ds_out.raster.y_dim x_dim = ds_out.raster.x_dim # Expend latitude dimension values to all cells of ds_out ds_out["latitude_grid"] = xr.DataArray( data=ds_out[y_dim].values.reshape(-1, 1).repeat(ds_out.dims[x_dim], axis=1), coords=ds_out.raster.coords, dims=ds_out.raster.dims, ) ds_out["latitude_grid"].raster.set_nodata(-9999.0) # Bankfull volume if hydromodel._MAPS["rivdph"] in hydromodel.grid: ds_out["bankfull_volume"] = ( rivlen * rivwth * hydromodel.grid[hydromodel._MAPS["rivdph"]] ) ds_out["bankfull_volume"].raster.set_nodata(ds_out["length"].raster.nodata) return ds_out
[docs] def pointer( ds_hydro: xr.Dataset, build_pointer: bool = False, surface_water: str = "sfw", boundaries: List[str] = ["bd"], fluxes: List[str] = ["sfw>sfw", "bd>sfw"], logger=logger, ): """Return map with Delwaq segment ID and pointer. The pointer matrix is built only if ``build_pointer`` is True. Parameters ---------- ds_hydro : xr.Dataset Dataset of the hydromaps, contains 'basins', 'ldd', 'modelmap'. build_pointer: boolean, optional Boolean to build a pointer file (delwaq) or not (demission). If True, compartments, boundaries and fluxes lists must be provided. surface_water : str, optional Name of the surface water layer. By default 'sfw'. boundaries: list of str, optional List of names of boundaries to include. By default a unique boundary called 'bd'. fluxes: list of str List of fluxes to include between surface water/boundaries. Name convention is '{surface_water_name}>{boundary_name}' for a flux from the surface water to a boundary, ex 'sfw>bd'. By default ['sfw>sfw', 'bd>sfw'] for runoff and inwater. Names in the fluxes list should match name in the hydrology_fn source in setup_hydrology_forcing. Returns ------- nrofseg : int Number of segments. da_ptid : xarray.DataArray DataArray containing the Delwaq segment IDs. da_ptiddown : xarray.DataArray DataArray containing the Delwaq downstream segment IDs. pointer: numpy.ndarray, optional Delwaq pointer array (4 columns) for exchanges. bd_id: numpy.array, optional Array with Delwaq boundary IDs. bd_type: numpy.array, optional Array ith Delwaq boundary names. """ # Prepare segments ID layer ptid based on mask of active cells ptid_mv = 0 np_ptid = ds_hydro["mask"].values.flatten().astype(np.int32) ptid = np_ptid[np_ptid != ptid_mv] ptid = np.arange(1, len(ptid) + 1) nrofseg = np.amax(ptid) np_ptid[np_ptid != ptid_mv] = ptid np_ptid = np_ptid.reshape( np.size(ds_hydro["mask"], 0), np.size(ds_hydro["mask"], 1) ) da_ptid = xr.DataArray( data=np_ptid, coords=ds_hydro.raster.coords, dims=ds_hydro.raster.dims, attrs=dict(_FillValue=ptid_mv), ) ### Downstream IDs ### # Start with searching for the ID of the downstream cells for lateral fluxes flwdir = flw.flwdir_from_da(ds_hydro["ldd"], ftype="infer", mask=None) # Boundaries bd_id = [] bd_type = [] # Keep track of the lowest boundary id value lowerid = 0 # Apply mask to ldd da_ldd = ds_hydro["ldd"].where(ds_hydro["mask"], ds_hydro["ldd"].raster.nodata) np_ldd = da_ldd.values # Number of outlets nb_out = len(np_ldd[np_ldd == 5]) ptiddown = flwdir.downstream(da_ptid).astype(np.int32) # Remask cells draining to rivers ptiddown[np_ptid == ptid_mv] = ptid_mv # Outlets are boundaries and ptiddown should be negative outid = np.arange((-lowerid) + 1, (-lowerid) + nb_out + 1) * -1 ptiddown[np_ldd == 5] = outid lowerid = outid[-1] bd_id = np.append(bd_id, (outid * (-1))) bd_type = np.append(bd_type, [f"{surface_water}>out{id}" for id in bd_id]) # Add ptiddown to xarray da_ptiddown = xr.DataArray( data=ptiddown, coords=ds_hydro.raster.coords, dims=ds_hydro.raster.dims, attrs=dict(_FillValue=ptid_mv), ) # Build pointer if build_pointer: nbound = len(boundaries) nflux = len(fluxes) logger.info(f"Preparing pointer with {nbound} boundaries and {nflux} fluxes.") ### Add fluxes ### zeros = np.zeros((nrofseg, 1)) pointer = None for flux in fluxes: flux0 = flux.split(">")[0] flux1 = flux.split(">")[-1] # Lateral flux (runoff) if flux0 == flux1: # Start building pointer with lateral fluxes (runoff) ptid = da_ptid.values ptid = ptid[ptid != ptid_mv].reshape(nrofseg, 1) ptiddown = da_ptiddown.values ptiddown = ptiddown[ptiddown != ptid_mv].reshape(nrofseg, 1) if pointer is None: pointer = np.hstack((ptid, ptiddown, zeros, zeros)) else: pointer = np.vstack( (pointer, np.hstack((ptid, ptiddown, zeros, zeros))) ) # Flux from/to boundaries elif flux0 != surface_water or flux1 != surface_water: # The boundary cells all have the same ID boundid = lowerid - 1 lowerid = boundid bd_id = np.append(bd_id, ([boundid * (-1)])) bd_type = np.append(bd_type, ([flux])) boundid = np.repeat(boundid, nrofseg).reshape(nrofseg, 1) # Flux from boundaries ptid = da_ptid.values ptid = ptid[ptid != ptid_mv].reshape(nrofseg, 1) if flux0 != surface_water: pointerbd = np.hstack((boundid, ptid, zeros, zeros)) else: pointerbd = np.hstack((ptid, boundid, zeros, zeros)) if pointer is None: pointer = pointerbd else: pointer = np.vstack((pointer, pointerbd)) else: raise ValueError( f"Flux {flux} should be of type{surface_water}>{surface_water}" f"or {surface_water}>boundary or boundary>{surface_water}." ) return nrofseg, da_ptid, da_ptiddown, pointer, bd_id, bd_type else: return nrofseg, da_ptid, da_ptiddown