# Source code for hydromt_wflow.workflows.river

# -*- coding: utf-8 -*-
"""
@author: haag (2019)

Derives river/stream widths to construct a riverwidth map (wflow_riverwidth.map) to be used in a wflow model.
"""

import os
from os.path import join
import json
import numpy as np
from scipy.optimize import curve_fit
import xarray as xr
import pandas as pd
import logging
import pyflwdir
from pyflwdir import FlwdirRaster

from hydromt import gis_utils, stats, flw
from hydromt_wflow import DATADIR  # global var

logger = logging.getLogger(__name__)

__all__ = [
    "river",
    "river_width",
]

RESAMPLING = {"climate": "nearest", "precip": "average"}
NODATA = {"discharge": -9999}


def power_law(x, a, b):
    return a * np.power(x, b)


[docs]def river( ds, ds_like=None, river_upa=30.0, slope_len=1e3, channel_dir="up", min_rivlen_ratio=0.1, logger=logger, **kwargs, ): """Returns river maps The output maps are:\ - rivmsk : river mask based on upstream area threshold on upstream area\ - rivlen : river length [m], minimum set to 1/4 cell res\ - rivslp : smoothed river slope [m/m]\ - rivwth_obs : river width at pixel outlet (if in ds) - rivbed : elevation of the river bed based on pixel outlet Parameters ---------- ds: xr.Dataset dataset containing "flwdir", "uparea", "elevtn" variables; and optional "rivwth" variable ds_like: xr.Dataset, optional dataset with output grid, must contain "uparea", for subgrid rivlen/slp must contain "x_out", "y_out". If None, takes ds grid for output river_upa: float minimum threshold to define river cell & pixels, by default 30 [km2] slope_len: float minimum length over which to calculate the river slope, by default 1000 [m] min_rivlen_ratio: float minimum global river length to avg. cell resolution ratio, by default 0.1 channel_dir: {"up", "down"} flow direcition in which to calculate (subgrid) river length and width Returns: ds_out: xr.Dataset Dataset with output river attributes """ # sort options dvars = ["flwdir", "uparea", "elevtn"] dvars_like = ["x_out", "y_out", "uparea"] subgrid = True for name in dvars: if name not in ds.data_vars: raise ValueError(f"Dataset variable {name} not in ds.") if ds_like is None or not np.all([v in ds_like for v in dvars_like]): subgrid = False logger.info("River length and slope are calculated at model resolution.") ds_like = ds logger.debug(f"Set river mask with upstream area threshold: {river_upa} km2.") dims = ds_like.raster.dims _mask = ds_like["uparea"] > river_upa # initial mask _mask_org = ds["uparea"].values >= river_upa # highres riv mask logger.debug("Derive river length.") flwdir = flw.flwdir_from_da(ds["flwdir"], mask=True) if subgrid == False: # get cell index of river cells idxs_out = 
np.arange(ds_like.raster.size).reshape(ds_like.raster.shape) idxs_out = np.where(_mask, idxs_out, flwdir._mv) else: # get subgrid outlet pixel index idxs_out = ds.raster.xy_to_idx( xs=ds_like["x_out"].values, ys=ds_like["y_out"].values, mask=_mask.values, nodata=flwdir._mv, ) # get river length based on on-between distance between two outlet pixels msk = _mask.values rivlen = flwdir.subgrid_rivlen( idxs_out=idxs_out, mask=_mask_org, direction=channel_dir, unit="m", ) # minimum river length equal 10% of cellsize xres, yres = ds_like.raster.res if ds_like.raster.crs.is_geographic: # convert degree to meters xres, yres = gis_utils.cellres(ds_like.raster.ycoords.values.mean(), xres, yres) min_len = np.mean(np.abs([xres, yres])) * min_rivlen_ratio rivlen = np.where(msk, np.maximum(rivlen, min_len), -9999) # bed level # readout elevation of bedlevel on outlet pixels bed_level = ds["elevtn"].values.flat[idxs_out] # make model resolution masked flwdir for rivers da_flw_model = ds_like["flwdir"].copy() # da_flw_model = da_flw_model.assign_coords(mask=_mask) # indices van flwdir flwdir_model = flw.flwdir_from_da(da_flw_model, mask=_mask) # hydrologically adjust bed_level_adjust = flwdir_model.dem_adjust(bed_level) # (bed_level[msk] != bed_level_adjust[msk]).sum() # mask to keep river cells bed_level_adjust = np.where(msk, bed_level_adjust, -9999) # add diff and log diff = flwdir_model.downstream(bed_level_adjust) - bed_level_adjust if np.all(diff <= 0) == False: logger.warning( "Erroneous increase in riverbed level in downstream direction found." 
) # set mean length at pits when taking the downstream length if channel_dir == "down": rivlen.flat[flwdir.idxs_pit] = np.mean(rivlen[_mask]) # get river slope as derivative of elevation around outlet pixels logger.debug("Derive river slope.") rivslp = flwdir.subgrid_rivslp( idxs_out=idxs_out, elevtn=ds["elevtn"].values, mask=_mask_org, length=slope_len, ) rivslp = np.where(msk, rivslp, -9999) # create xarray dataset for river mask, length and width ds_out = xr.Dataset(coords=ds_like.raster.coords) ds_out["rivmsk"] = xr.Variable(dims, msk, attrs=dict(_FillValue=0)) attrs = dict(_FillValue=-9999, unit="m") ds_out["rivlen"] = xr.Variable(dims, rivlen, attrs=attrs) attrs = dict(_FillValue=-9999, unit="m.m-1") ds_out["rivslp"] = xr.Variable(dims, rivslp, attrs=attrs) attrs = dict(_FillValue=-9999, unit="m") ds_out["rivbed"] = xr.Variable(dims, bed_level_adjust, attrs=attrs) # add river width at outlet pixels if in source if "rivwth" in ds: rivwth = np.full(msk.shape, -9999.0, dtype=np.float32) rivwth[msk] = ds["rivwth"].values.flat[idxs_out[msk]] attrs = dict(_FillValue=-9999, unit="m") ds_out["rivwth_obs"] = xr.Variable(dims, rivwth, attrs=attrs) return ds_out, flwdir
[docs]def river_width( ds_like, flwdir, data=dict(), fit=False, fill=True, fill_outliers=True, min_wth=1, mask_names=[], predictor="discharge", rivwth_name="rivwth", obs_postfix="_obs", rivmsk_name="rivmsk", a=None, b=None, logger=logger, **kwargs, ): nopars = a is None or b is None # no manual a, b parameters fit = fit or (nopars and predictor not in ["discharge"]) # fit power-law on the fly nowth = ( f"{rivwth_name}{obs_postfix}" not in ds_like ) # no obseved with in staticmaps fill = fill and nowth == False # fill datagaps and masked areas (lakes/res) in obs if nowth and fit: raise ValueError( f'Observed rivwth "{rivwth_name}{obs_postfix}" required to fit riverwidths.' ) elif nowth: logger.warning(f'Observed rivwth "{rivwth_name}{obs_postfix}" not found.') # get predictor logger.debug(f'Deriving predictor "{predictor}" values') if predictor == "discharge": values, pars = _discharge(ds_like, flwdir=flwdir, logger=logger, **data) if fit == False: a, b = pars # TODO: check units elif predictor == "precip": values = _precip(ds_like, flwdir=flwdir, logger=logger, **data) else: if predictor not in ds_like: raise ValueError(f"required {predictor} variable missing in staticmaps.") values = ds_like[predictor].values # read river width observations if nowth == False: rivwth_org = ds_like[f"{rivwth_name}{obs_postfix}"].values nodata = ds_like[f"{rivwth_name}{obs_postfix}"].raster.nodata rivmsk = rivwth_org != nodata rivwth_org = np.where(rivmsk, rivwth_org, -9999) # mask zero and negative riverwidth and waterbodies if present mask = rivwth_org > min_wth for name in mask_names: if name in ds_like: mask[ds_like[name].values != ds_like[name].raster.nodata] = False logger.debug(f"{name} masked out in rivwth data.") else: logger.warning(f'mask variable "{name}" not found in maps.') # fit predictor wth = rivwth_org[mask] val = values[mask] if fit: logger.info(f"Fitting power-law a,b parameters based on {predictor}") a, b = _width_fit(wth, val, mask, logger=logger) wsim = 
power_law(val, a, b) res = np.abs(wsim - wth) outliers = np.logical_and(res > 200, (res / wsim) > 1.0) pout = np.sum(outliers) / outliers.size * 100 # validate wsim = power_law(val[~outliers], a, b) nse = stats._nse(wsim, wth[~outliers]) logger.info( f'Using "width = a*{predictor}^b"; where a={a:.2f}, b={b:.2f} ' f"(nse={nse:.3f}; outliers={pout:.2f}%)" ) # compute new riverwidth rivmsk = ds_like[rivmsk_name].values rivwth_out = np.full(ds_like.raster.shape, -9999.0, dtype=np.float32) rivwth_out[rivmsk] = np.maximum(min_wth, power_law(values[rivmsk], a, b)) # overwrite rivwth data with original data at valid points if fill: if fill_outliers: mask[mask] = ~outliers logger.info(f"masking {np.sum(outliers):.0f} outliers") rivwth_out[mask] = wth[~outliers] fills = "for missing data" if fill else "globally" logger.info(f"rivwth set {fills} based on width-{predictor} relationship.") # return xarray dataarray attrs = dict(_FillValue=-9999, unit="m") return xr.DataArray(rivwth_out, dims=ds_like.raster.dims, attrs=attrs)
def _width_fit( wth, val, mask, p0=[0.15, 0.65], logger=logger, # rhine uparea based ): outliers = np.full(np.sum(mask), False, dtype=np.bool) a, b = None, None # check if sufficient data if np.sum(mask) > 10: n = 0 # fit while removing outliers. while n < 20: if np.sum(mask[mask][~outliers]) > 10: pars = curve_fit( f=power_law, xdata=val[~outliers], ydata=wth[~outliers], p0=p0, bounds=([0.01, 0.01], [100.0, 1.0]), )[0] # print(n, pars, np.sum(~outliers)) if np.allclose(p0, pars): break # outliers based on absolute and relative diff wsim = power_law(val, *pars) res = np.abs(wsim - wth) outliers = np.logical_and(res > 200, (res / wsim) > 1.0) if np.sum(outliers) > np.sum(mask) * 0.2: # max 20 % outliers pars = p0 outliers[:] = False logger.warning("Fit not successful.") break p0 = pars n += 1 a, b = pars elif a is None or b is None: a, b = p0 logger.warning("Insufficient data points, using global parameters.") return a, b def _precip(ds_like, flwdir, da_precip, logger=logger): """ """ precip = ( da_precip.raster.reproject_like(ds_like, method=RESAMPLING["precip"]).values / 1e3 ) # [m/yr] precip = np.maximum(precip, 0) lat, lon = ds_like.raster.ycoords.values, ds_like.raster.xcoords.values area = gis_utils.reggrid_area(lat, lon) # 10 x average flow accu_precip = flwdir.accuflux(precip * area / (86400 * 365) * 10) # [m3/s] return accu_precip def _discharge(ds_like, flwdir, da_precip, da_climate, logger=logger): """ """ # read clim classes and regression parameters from data dir precip_fn = da_precip.name climate_fn = da_climate.name fn_regr = join(DATADIR, "rivwth", f"regr_{precip_fn}.csv") fn_clim = join(DATADIR, "rivwth", f"{climate_fn}.csv") clim_map = pd.read_csv(fn_clim, index_col="class") regr_map = pd.read_csv(fn_regr, index_col="source").loc[climate_fn] regr_map = regr_map.set_index("base_class") # get overall catchment climate classification (based on mode) # TODO: convert to xarray method by using scipy.stats.mode in xr.apply_ufuncs # TODO: reproject climate 
and mask cells outside basin np_climate = da_climate.values basin_climate = np.bincount(np_climate.flatten()).argmax() # convert detailed climate classification to higher level base class base_class, class_name = clim_map.loc[basin_climate] logger.debug(f"Basin base climate classification: {base_class} ({class_name})") params = regr_map.loc[base_class] # precipitation (TO-DO: add checks that were present in old version?) precip = da_precip.raster.reproject_like( ds_like, method=RESAMPLING["precip"] ).values # apply scaling factor to make sure accumulated precipitation will stay # (approximately) the same scaling_factor_1 = np.round(da_precip.raster.res[0] / ds_like.raster.res[0], 4) scaling_factor_2 = np.round(da_precip.raster.res[1] / ds_like.raster.res[1], 4) scaling_factor = (scaling_factor_1 + scaling_factor_2) / 2 precip = precip / scaling_factor ** 2 # derive cell areas (m2) lat, lon = ds_like.raster.ycoords.values, ds_like.raster.xcoords.values areagrid = gis_utils.reggrid_area(lat, lon) / 1e6 # calculate "local runoff" (note: set missings in precipitation to zero) runoff = (np.maximum(precip, 0) * params["precip"]) + (areagrid * params["area"]) runoff = np.maximum(runoff, 0) # make sure runoff is not negative # get discharges discharge = flwdir.accuflux(runoff, nodata=NODATA["discharge"], direction="up") return discharge, (params["discharge_a"], params["discharge_b"])