"""Basemap workflows for Wflow plugin."""
import logging
from typing import Optional
import geopandas as gpd
import numpy as np
import pyflwdir
import xarray as xr
from hydromt import flw, gis_utils
logger = logging.getLogger(__name__)
__all__ = ["hydrography", "topography"]
[docs]
def hydrography(
ds: xr.Dataset,
res: float,
xy: Optional[gpd.GeoDataFrame] = None,
upscale_method: str = "ihu",
flwdir_name: str = "flwdir",
uparea_name: str = "uparea",
basins_name: str = "basins",
strord_name: str = "strord",
ftype: str = "infer",
logger=logger,
):
"""Return hydrography maps (see list below) and FlwdirRaster object.
Based on gridded flow direction and elevation data input.
The output maps are:
- flwdir : flow direction [-]
- basins : basin map [-]
- uparea : upstream area [km2]
- strord : stream order [-]
If the resolution is lower than the source resolution, the flow direction data is
upscaled and river length and slope are based on subgrid flow paths and
the following maps are added:
- subare : contributing area to each subgrid outlet pixel \
(unit catchment area) [km2]
- subelv : elevation at subgrid outlet pixel [m+REF]
Parameters
----------
ds : xarray.DataArray
Dataset containing gridded flow direction and elevation data.
res : float
output resolution
xy : geopandas.GeoDataFrame, optional
Subbasin pits. Only required when upscaling a subbasin.
upscale_method : {'ihu', 'eam', 'dmm'}
Upscaling method for flow direction data, by default 'ihu', see [1]_
ftype : {'d8', 'ldd', 'nextxy', 'nextidx', 'infer'}, optional
name of flow direction type, infer from data if 'infer', by default is 'infer'
flwdir_name, elevtn_name, uparea_name, basins_name, strord_name : str, optional
Name of flow direction [-], elevation [m], upstream area [km2], basin index [-]
and stream order [-] variables in ds
Returns
-------
ds_out : xarray.DataArray
Dataset containing gridded hydrography data
flwdir_out : pyflwdir.FlwdirRaster
Flow direction raster object.
References
----------
.. [1] Eilander et al. (2021). A hydrography upscaling method for scale-invariant \
parametrization of distributed hydrological models.
Hydrology and Earth System Sciences, 25(9), 5287–5313. https://doi.org/10.5194/hess-25-5287-2021
See Also
--------
pyflwdir.FlwdirRaster.upscale_flwdir
"""
# TODO add check if flwdir in ds, calculate if not
flwdir = None
basins = None
outidx = None
if "mask" not in ds.coords and xy is None:
ds.coords["mask"] = xr.Variable(
dims=ds.raster.dims, data=np.ones(ds.raster.shape, dtype=bool)
)
elif "mask" not in ds.coords:
# NOTE if no subbasin mask is provided calculate it here
logger.debug(f"Delineate {xy[0].size} subbasin(s).")
flwdir = flw.flwdir_from_da(ds[flwdir_name], ftype=ftype)
basins = flwdir.basins(xy=xy).astype(np.int32)
ds.coords["mask"].data = basins != 0
if not np.any(ds.coords["mask"]):
raise ValueError("Delineating subbasins not successfull.")
elif xy is not None:
# NOTE: this mask is passed on from get_basin_geometry method
logger.debug("Mask in dataset assumed to represent subbasins.")
ncells = np.sum(ds["mask"].values)
scale_ratio = int(np.round(res / ds.raster.res[0]))
if ncells < 4:
raise ValueError(
"(Sub)basin at original resolution should at least consist of two cells on "
f"each axis and the total number of cells is {ncells}. "
"Consider using a larger domain or higher spatial resolution. "
"For subbasin models, consider a (higher) threshold on for example "
"upstream area or stream order to snap the outlet."
)
elif ncells < 100 and scale_ratio > 1:
logger.warning(
f"(Sub)basin at original resolution is small and has {ncells} cells. "
"This may results in errors later when upscaling flow directions. "
"If so, consider using a larger domain or higher spatial resolution. "
"For subbasin models, consider a (higher) threshold on for example "
"upstream area or stream order to snap the outlet."
)
else:
logger.debug(f"(Sub)basin at original resolution has {ncells} cells.")
if scale_ratio > 1: # upscale flwdir
if flwdir is None:
# NOTE initialize with mask is FALSE
flwdir = flw.flwdir_from_da(ds[flwdir_name], ftype=ftype, mask=False)
if xy is not None:
logger.debug("Burn subbasin outlet in upstream area data.")
if isinstance(xy, gpd.GeoDataFrame):
assert xy.crs == ds.raster.crs
xy = xy.geometry.x, xy.geometry.y
idxs_pit = flwdir.index(*xy)
flwdir.add_pits(idxs=idxs_pit)
uparea = ds[uparea_name].values
uparea.flat[idxs_pit] = uparea.max() + 1.0
ds[uparea_name].data = uparea
logger.info(
f"Upscale flow direction data: {scale_ratio:d}x, {upscale_method} method."
)
da_flw, flwdir_out = flw.upscale_flwdir(
ds,
flwdir=flwdir,
scale_ratio=scale_ratio,
method=upscale_method,
uparea_name=uparea_name,
flwdir_name=flwdir_name,
logger=logger,
)
da_flw.raster.set_crs(ds.raster.crs)
# make sure x_out and y_out get saved
ds_out = da_flw.to_dataset().reset_coords(["x_out", "y_out"])
dims = ds_out.raster.dims
# find pits within basin mask
idxs_pit0 = flwdir_out.idxs_pit
outlon = ds_out["x_out"].values.ravel()
outlat = ds_out["y_out"].values.ravel()
sel = {
ds.raster.x_dim: xr.Variable("yx", outlon[idxs_pit0]),
ds.raster.y_dim: xr.Variable("yx", outlat[idxs_pit0]),
}
outbas_pit = ds.coords["mask"].sel(sel, method="nearest").values
# derive basins
if np.any(outbas_pit != 0):
idxs_pit = idxs_pit0[outbas_pit != 0]
basins = flwdir_out.basins(idxs=idxs_pit).astype(np.int32)
ds_out.coords["mask"] = xr.Variable(
dims=ds_out.raster.dims, data=basins != 0, attrs=dict(_FillValue=0)
)
else:
# This is a patch for basins which are clipped based on bbox or wrong geom
mask_int = ds["mask"].astype(np.int8)
mask_int.raster.set_nodata(-1) # change nodata value
ds_out.coords["mask"] = mask_int.raster.reproject_like(
da_flw, method="nearest"
).astype(bool)
basins = ds_out["mask"].values.astype(np.int32)
logger.warning(
"The basin delineation might be wrong as no original resolution outlets"
" are found in the upscaled map."
)
ds_out[basins_name] = xr.Variable(dims, basins, attrs=dict(_FillValue=0))
# calculate upstream area using subgrid ucat cell areas
outidx = np.where(
ds_out["mask"], da_flw.coords["idx_out"].values, flwdir_out._mv
)
subare = flwdir.ucat_area(outidx, unit="km2")[1]
uparea = flwdir_out.accuflux(subare)
attrs = dict(_FillValue=-9999, unit="km2")
ds_out[uparea_name] = xr.Variable(dims, uparea, attrs=attrs).astype(np.float32)
# NOTE: subgrid cella area is currently not used in wflow
ds_out["subare"] = xr.Variable(dims, subare, attrs=attrs).astype(np.float32)
if "elevtn" in ds:
subelv = ds["elevtn"].values.flat[outidx]
subelv = np.where(outidx >= 0, subelv, -9999)
attrs = dict(_FillValue=-9999, unit="m+REF")
ds_out["subelv"] = xr.Variable(dims, subelv, attrs=attrs)
# initiate masked flow dir
flwdir_out = flw.flwdir_from_da(
ds_out[flwdir_name], ftype=flwdir.ftype, mask=True
)
else:
# NO upscaling : source resolution equals target resolution
# NOTE (re-)initialize with mask is TRUE
ftype = flwdir.ftype if flwdir is not None and ftype == "infer" else ftype
flwdir = flw.flwdir_from_da(ds[flwdir_name], ftype=ftype, mask=True)
flwdir_out = flwdir
ds_out = xr.DataArray(
name=flwdir_name,
data=flwdir_out.to_array(),
coords=ds.raster.coords,
dims=ds.raster.dims,
attrs=dict(
long_name=f"{ftype} flow direction",
_FillValue=flwdir_out._core._mv,
),
).to_dataset()
dims = ds_out.raster.dims
ds_out.coords["mask"] = xr.Variable(
dims=dims, data=flwdir_out.mask.reshape(flwdir_out.shape)
)
# copy data variables from source if available
for dvar in [basins_name, uparea_name, strord_name]:
if dvar in ds.data_vars:
ds_out[dvar] = xr.where(
ds_out["mask"],
ds[dvar],
ds[dvar].dtype.type(ds[dvar].raster.nodata),
)
ds_out[dvar].attrs.update(ds[dvar].attrs)
# basins
if basins_name not in ds_out.data_vars:
if basins is None:
basins = flwdir_out.basins(idxs=flwdir_out.idxs_pit).astype(np.int32)
ds_out[basins_name] = xr.Variable(dims, basins, attrs=dict(_FillValue=0))
else:
# make sure dtype in ds_out is np.int32
ds_out[basins_name] = ds_out[basins_name].astype(np.int32)
# upstream area
if uparea_name not in ds_out.data_vars:
uparea = flwdir_out.upstream_area("km2") # km2
attrs = dict(_FillValue=-9999, unit="km2")
ds_out[uparea_name] = xr.Variable(dims, uparea, attrs=attrs).astype(
np.float32
)
# cell area
# NOTE: subgrid cella area is currently not used in wflow
ys, xs = ds.raster.ycoords.values, ds.raster.xcoords.values
subare = gis_utils.reggrid_area(ys, xs) / 1e6 # km2
attrs = dict(_FillValue=-9999, unit="km2")
ds_out["subare"] = xr.Variable(dims, subare, attrs=attrs).astype(np.float32)
# logging
npits = flwdir_out.idxs_pit.size
xy_pit = flwdir_out.xy(flwdir_out.idxs_pit[:5])
xy_pit_str = ", ".join([f"({x:.5f},{y:.5f})" for x, y in zip(*xy_pit)])
# stream order
if strord_name not in ds_out.data_vars:
logger.debug("Derive stream order.")
strord = flwdir_out.stream_order()
ds_out[strord_name] = xr.Variable(dims, strord)
ds_out[strord_name].raster.set_nodata(255)
# clip to basin extent
ds_out = ds_out.raster.clip_mask(da_mask=ds_out[basins_name])
ds_out.raster.set_crs(ds.raster.crs)
logger.debug(
f"Map shape: {ds_out.raster.shape}; active cells: {flwdir_out.ncells}."
)
logger.debug(f"Outlet coordinates ({len(xy_pit[0])}/{npits}): {xy_pit_str}.")
if np.any(np.asarray(ds_out.raster.shape) == 1):
raise ValueError(
"The output extent at model resolution should at least consist of two "
"cells on each axis. Consider using a larger domain or higher spatial "
"resolution. For subbasin models, consider a (higher) threshold to snap "
"the outlet."
)
return ds_out, flwdir_out
[docs]
def topography(
ds: xr.Dataset,
ds_like: xr.Dataset,
elevtn_name: str = "elevtn",
lndslp_name: str = "lndslp",
method: str = "average",
logger=logger,
):
"""Return topography maps (see list below) at model resolution.
Based on gridded elevation data input.
The following topography maps are calculated:
- elevtn : average elevation [m]
- lndslp : average land surface slope [m/m]
Parameters
----------
ds : xarray.DataArray
Dataset containing gridded flow direction and elevation data.
ds_like : xarray.DataArray
Dataset at model resolution.
elevtn_name, lndslp_name : str, optional
Name of elevation [m] and land surface slope [m/m] variables in ds
method: str, optional
Resample method used to reproject the input data, by default "average"
Returns
-------
ds_out : xarray.DataArray
Dataset containing gridded hydrography data
flwdir_out : pyflwdir.FlwdirRaster
Flow direction raster object.
See Also
--------
pyflwdir.dem.slope
"""
if lndslp_name not in ds.data_vars:
logger.debug(f"Slope map {lndslp_name} not found: derive from elevation map.")
crs = ds[elevtn_name].raster.crs
nodata = ds[elevtn_name].raster.nodata
ds[lndslp_name] = xr.Variable(
dims=ds.raster.dims,
data=pyflwdir.dem.slope(
elevtn=ds[elevtn_name].values,
nodata=nodata,
latlon=crs is not None and crs.to_epsg() == 4326,
transform=ds[elevtn_name].raster.transform,
),
)
ds[lndslp_name].raster.set_nodata(nodata)
# clip or reproject if non-identical grids
ds_out = ds[[elevtn_name, lndslp_name]].raster.reproject_like(ds_like, method)
ds_out[elevtn_name].attrs.update(unit="m")
ds_out[lndslp_name].attrs.update(unit="m.m-1")
return ds_out