Source code for hydromt_delwaq.workflows.segments

# -*- coding: utf-8 -*-

import numpy as np
import xarray as xr
import logging

from hydromt import flw


logger = logging.getLogger(__name__)


__all__ = ["hydromaps", "pointer"]


[docs]def hydromaps( hydromodel, logger=logger, ): """Returns base information maps from hydromodel. The following basemaps are extracted:\ - flwdir - basins - basmsk Parameters ---------- hydromodel : hydromt.model HydroMT Model class containing the hydromodel to build DelwaqModel from. Returns ------- ds_out : xarray.Dataset Dataset containing gridded emission map at model resolution. """ ds_out = ( hydromodel.staticmaps[hydromodel._MAPS["flwdir"]].rename("flwdir").to_dataset() ) ds_out["basins"] = hydromodel.staticmaps[hydromodel._MAPS["basins"]] ds_out["rivlen"] = hydromodel.staticmaps[hydromodel._MAPS["rivlen"]] ds_out["rivwth"] = hydromodel.staticmaps[hydromodel._MAPS["rivwth"]] basins_mv = ds_out["basins"].raster.nodata ds_out["basmsk"] = xr.Variable( dims=ds_out.raster.dims, data=(ds_out["basins"] != basins_mv), attrs=dict(_FillValue=0), ) return ds_out
[docs]def pointer( ds_hydro, build_pointer=True, logger=logger, ): """Returns map with Delwaq segment ID. Parameters ---------- ds_hydro : xr.Dataset Dataset of the hydromaps, contains 'basins', 'ldd', 'modelmap'. Returns ------- da_out : xarray.DataArray DataArray containing the Delwaq segment IDs. nrofseg : int Number of segments. """ ptid_mv = ds_hydro["basins"].raster.nodata np_ptid = ds_hydro["basins"].values.flatten() ptid = np_ptid[np_ptid != ptid_mv] ptid = np.arange(1, len(ptid) + 1) nrofseg = np.amax(ptid) np_ptid[np_ptid != ptid_mv] = ptid np_ptid = np_ptid.reshape( np.size(ds_hydro["basins"], 0), np.size(ds_hydro["basins"], 1) ) da_ptid = xr.DataArray( data=np_ptid, coords=ds_hydro.raster.coords, dims=ds_hydro.raster.dims, attrs=dict(_FillValue=ptid_mv), ) nb_cell = len(ptid) if build_pointer: logger.info(f"Preparing pointer with surface runoff and inwater.") # Start with searching for the ID of the downstream cells flwdir = flw.flwdir_from_da(ds_hydro["ldd"], ftype="infer", mask=None) ptiddown = flwdir.downstream(da_ptid).astype(np.int32) # Add boundaries bd_id = [] bd_type = [] # Outlets are boundaries and ptiddown should be negative np_ldd = ds_hydro["ldd"].values nb_out = len(np_ldd[np_ldd == 5]) outid = np.arange(1, nb_out + 1) * -1 ptiddown[np_ldd == 5] = outid # Keep track of the lowest boundary id value lowerid = outid[-1] bd_id = np.append(bd_id, (outid * (-1))) bd_type = np.append(bd_type, np.repeat("Sfw2Outflow", len(outid))) # Add ptiddown to xarray da_ptiddown = xr.DataArray( data=ptiddown, coords=ds_hydro.raster.coords, dims=ds_hydro.raster.dims, attrs=dict(_FillValue=ptid_mv), ) # Start building pointer with lateral fluxes (runoff) ptid = ptid.reshape(nb_cell, 1) ptiddown = ptiddown[ptiddown != ptid_mv].reshape(nb_cell, 1) zeros = np.zeros((nb_cell, 1)) pointer = np.hstack((ptid, ptiddown, zeros, zeros)) # Add the inwater flux for mass balance conservation # The inwater boundaries all have the same ID boundid = lowerid - 1 lowerid = boundid bd_id = np.append(bd_id, ([boundid * (-1)])) bd_type = np.append(bd_type, (["Inw2Sfw"])) boundid = np.repeat(boundid, nb_cell).reshape(nb_cell, 1) pointer = np.vstack((pointer, np.hstack((boundid, ptid, zeros, zeros)))) return nrofseg, da_ptid, da_ptiddown, pointer, bd_id, bd_type else: return nrofseg, da_ptid