Source code for pyflwdir.dem

# -*- coding: utf-8 -*-
"""Methods to derive topo/hydrographical paramters from elevation data, in some cases 
 in combination with flow direction data."""

import numpy as np
from numba import njit
import math
import heapq

from . import gis_utils, core, core_d8

_mv = core._mv

__all__ = ["slope", "fill_depressions"]


[docs]@njit def fill_depressions( elevtn, outlets="edge", idxs_pit=None, nodata=-9999.0, max_depth=-1.0, elv_max=None, connectivity=8, ): """Fill local depressions in elevation data and derived local D8 flow directions. Outlets are assumed to occur at the edge of valid elevation cells `outlets='edge'`; at the lowest valid edge cell to create one single outlet `outlets='min'`; or at user provided outlet cells `idxs_pit`. Depressions elsewhere are filled based on its lowest pour point elevation. If the pour point depth is larger than the maximum pour point depth `max_depth` a pit is set at the depression local minimum elevation. Based on: Wang, L., & Liu, H. (2006). https://doi.org/10.1080/13658810500433453 Parameters ---------- elevtn: 2D array elevation raster nodata: float, optional nodata value, by default -9999.0 max_depth: float, optional Maximum pour point depth. Depressions with a larger pour point depth are set as pit. A negative value (default) equals an infitely large pour point depth causing all depressions to be filled. connectivity: {4, 8} Number of neighboring cells to consider. outlets: {'edge', 'min'} Initial basin outlet(s) at the edge of all cells ('edge'; default) or only the minimum elevation edge cell ('min') elv_max, float, optional Maximum elevation for outlets, only in combination with `outlets='edge'`. By default None. idxs_pit: 1D array of int Linear indices of outlet cells. Returns ------- elevtn_out: 2D array Depression filled elevation d8: 2D array of uint8 D8 flow directions """ nrow, ncol = elevtn.shape delv = np.zeros_like(elevtn) done = np.isnan(elevtn) if np.isnan(nodata) else elevtn == nodata d8 = np.where(done, np.uint8(247), np.uint8(0)) if connectivity not in [4, 8]: raise ValueError(f'"connectivity" should either be 4 or 8') # pfff.. numba does not allow creation of numpy bool arrays using normal methods struct = np.array([bool(1) for s in range(9)]).reshape((3, 3)) if connectivity == 4: struct[0, 0], struct[-1, -1] = False, False struct[0, -1], struct[-1, 0] = False, False # initiate queue if idxs_pit is None: # with edge cells queued = gis_utils.get_edge(~done, structure=struct) if elv_max is not None: queued = np.logical_and(queued, elevtn <= elv_max) if not np.any(queued): raise ValueError("No initial outlet cells found.") else: # with user defined outlet cells queued = np.array([bool(0) for s in range(elevtn.size)]).reshape((nrow, ncol)) for idx in idxs_pit: queued.flat[idx] = True # queue contains (elevation, boundary, row, col) # boundary is included to favor non-boundary cells over boundary cells with same elevation q = [(elevtn[0, 0], np.uint8(1), np.uint32(0), np.uint32(0)) for _ in range(0)] heapq.heapify(q) for r, c in zip(*np.where(queued)): heapq.heappush(q, (elevtn[r, c], np.uint8(1), np.uint32(r), np.uint32(c))) # restrict queue to global edge mimimum (single outlet) if outlets == "min": q = [heapq.heappop(q)] queued[:, :] = False queued[q[0][-2], q[0][-1]] = True # loop over cells and neighbors with ascending cell elevation. drs, dcs = np.where(struct) drs, dcs = drs - 1, dcs - 1 while len(q) > 0: z0, _, r0, c0 = heapq.heappop(q) for dr, dc in zip(drs, dcs): r = r0 + dr c = c0 + dc if r < 0 or r == nrow or c < 0 or c == ncol or done[r, c]: continue z1 = elevtn[r, c] dz = z0 - z1 # local depression if dz > 0 if max_depth >= 0: # if positive max_depth: don't fill when dz > max_depth if dz >= max_depth: heapq.heappush(q, (z1, np.uint8(0), np.uint32(r), np.uint32(c))) queued[r, c] = True for dr, dc in zip(drs, dcs): # (re)visit neighbors done[r + dr, c + dc] = False continue elif delv[r, c] > 0: # reset cell if previously filled & revisited queued[r, c] = False delv[r, c] = 0 if dz > 0: # check if local depression (dz>0) delv[r, c] = dz z1 += dz if ~queued[r, c]: # add to queue heapq.heappush(q, (z1, np.uint8(0), np.uint32(r), np.uint32(c))) queued[r, c] = True done[r, c] = True d8[r, c] = core_d8._us[dr + 1, dc + 1] return elevtn + delv, d8
@njit def adjust_elevation(idxs_ds, seq, elevtn, mv=_mv): """Given a flow direction map, remove pits in the elevation map. Algorithm based on Yamazaki et al. (2012) .. ref: Yamazaki, D., Baugh, C. A., Bates, P. D., Kanae, S., Alsdorf, D. E. and Oki, T.: Adjustment of a spaceborne DEM for use in floodplain hydrodynamic modeling, J. Hydrol., 436-437, 81-91, doi:10.1016/j.jhydrol.2012.02.045, 2012. """ elevtn_out = elevtn.copy() mask = np.array([bool(0) for _ in range(elevtn.size)]) # True for checked cells for idx0 in seq[::-1]: # from up- to downstream starting from longest stream paths if mask[idx0] == False: # @ head water cell # get downstream indices up to earlier fixed stream path idxs0 = core._trace(idx0, idxs_ds, mv=mv, mask=mask)[0] # fix elevation elevtn1 = _adjust_elevation(elevtn_out[idxs0]) # assert np.all(np.diff(elevtn1) <= 0), elevtn_out[idxs0] elevtn_out[idxs0] = elevtn1 mask[idxs0] = True # update mask return elevtn_out @njit def _adjust_elevation(elevtn): """fix elevation on single streamline based on minimum modification elevtn oderdered from up- to downstream """ n = elevtn.size imax, imin = -1, -1 zmax, zmin = elevtn[0], elevtn[0] # local max / min elevation zi_min1, zi_min2 = zmin, zmin # initialize # all elevtn should be larger than last value elevtn = np.maximum(elevtn, elevtn[-1]) for i in range(elevtn.size): zi = elevtn[i] if zi >= zmax: zmax = zi imax = i if (zi > zi_min1 and zi_min2 >= zi_min1) or (imin >= 0 and i + 1 == n): # pit if imin >= 0: # starting from second pit or end of vector # option 1: dig -> zmod = zmin, for all values larger than zmin, after imin idxs = np.arange(imin, i, dtype=np.uint32) zmod = np.minimum(zmin, elevtn[idxs]) cost = np.sum(np.abs(elevtn[idxs] - zmod)) # option 2: fill -> zmod = zmax, for all values smaller than zmax, previous to imax idxs2 = np.arange(0, imax, dtype=np.uint32) zmod2 = np.maximum(zmax, elevtn[idxs2]) cost2 = np.sum(np.abs(elevtn[idxs2] - zmod2)) if cost2 < cost: cost, idxs, zmod = cost2, idxs2, zmod2 # option 3: dig & fill -> try all values between imin and imax i0, j0, i1, j1 = 0, 0, imax, imax zs = np.unique(elevtn[imin + 1 : i])[::-1] for z in zs[1:]: # skip zmax for j0 in range(i0, imin + 1): # start of zmod if elevtn[j0] <= z: break for j1 in range(i1, i + 1): # end of zmod if elevtn[j1] <= z: break i0, i1 = j0, j1 idxs2 = np.arange(j0, max(imax + 1, j1), dtype=np.uint32) zmod2 = np.full(idxs2.size, z, dtype=elevtn.dtype) cost2 = np.sum(np.abs(elevtn[idxs2] - zmod2)) if cost2 < cost: cost, idxs, zmod = cost2, idxs2, zmod2 # update elevation elevtn[idxs] = zmod # update zmin & zmax imax = i zmax = elevtn[imax] imin = max(0, i - 1) zmin = elevtn[imin] # update zi values if zi_min2 != zi_min1: zi_min2 = zi_min1 zi_min1 = zi return elevtn
[docs]@njit def slope(elevtn, nodata=-9999.0, latlon=False, transform=gis_utils.IDENTITY): """Returns the local gradient Parameters ---------- elevnt : 1D array of float elevation raster nodata : float, optional nodata value, by default -9999.0 latlon : bool, optional True if WGS84 coordinates, by default False transform : affine transform Two dimensional transform for 2D linear mapping, by default gis_utils.IDENTITY Returns ------- 1D array of float slope """ xres, yres, north = transform[0], transform[4], transform[5] slope = np.zeros(elevtn.shape, dtype=np.float32) nrow, ncol = elevtn.shape elev = np.zeros((3, 3), dtype=elevtn.dtype) for r in range(0, nrow): for c in range(0, ncol): if elevtn[r, c] != nodata: # start with matrix based on central value (inside loop) elev[:, :] = elevtn[r, c] for dr in range(-1, 2): row = r + dr i = dr + 1 if row >= 0 and row < nrow: for dc in range(-1, 2): col = c + dc j = dc + 1 if col >= 0 and col < ncol: # fill matrix with elevation, except when nodata if elevtn[row, col] != nodata: elev[i, j] = elevtn[row, col] dzdx = ( (elev[0, 0] + 2 * elev[1, 0] + elev[2, 0]) - (elev[0, 2] + 2 * elev[1, 2] + elev[2, 2]) ) / (8 * abs(xres)) dzdy = ( (elev[0, 0] + 2 * elev[0, 1] + elev[0, 2]) - (elev[2, 0] + 2 * elev[2, 1] + elev[2, 2]) ) / (8 * abs(yres)) if latlon: lat = north + (r + 0.5) * yres deg_y = gis_utils.degree_metres_y(lat) deg_x = gis_utils.degree_metres_x(lat) slp = math.hypot(dzdx / deg_x, dzdy / deg_y) else: slp = math.hypot(dzdx, dzdy) else: slp = nodata slope[r, c] = slp return slope
def height_above_nearest_drain(idxs_ds, seq, drain, elevtn): """Returns the height above the nearest drain (HAND), i.e.: the relative vertical distance (drop) to the nearest dowstream river based on drainage‐normalized topography and flowpaths. Nobre A D et al. (2016) HAND contour: a new proxy predictor of inundation extent Hydrol. Process. 30 320–33 Parameters ---------- idxs_ds : 1D-array of intp index of next downstream cell seq : 1D array of int ordered cell indices from down- to upstream drain : 1D array of bool flattened drainage mask elevnt : 1D array of float flattened elevation raster Returns ------- 1D array of float height above nearest drain """ hand = np.full(drain.size, -9999.0, dtype=np.float64) hand[seq] = 0.0 for idx0 in seq: if drain[idx0] != 1: idx_ds = idxs_ds[idx0] dz = elevtn[idx0] - elevtn[idx_ds] hand[idx0] = hand[idx_ds] + dz return hand def floodplains(idxs_ds, seq, elevtn, uparea, upa_min=1000.0, b=0.3): """Returns floodplain boundaries based on a maximum treshold (h) of HAND which is scaled with upstream area following h ~ A**b. Nardi F et al (2019) GFPLAIN250m, a global high-resolution dataset of Earth’s floodplains Sci. Data 6 180309 Parameters ---------- idxs_ds : 1D-array of intp index of next downstream cell seq : 1D array of int ordered cell indices from down- to upstream elevnt : 1D array of float flattened elevation raster [m] uparea : 1D array of float flattened upstream area raster [km2] upa_min : float, optional minimum upstream area threshold for streams. b : float scale parameter Returns ------- 1D array of int8 floodplain """ drainh = np.full(uparea.size, -9999.0, dtype=np.float32) drainz = np.full(uparea.size, -9999.0, dtype=np.float32) fldpln = np.full(uparea.size, -1, dtype=np.int8) fldpln[seq] = 0 for idx0 in seq: # down- to upstream if uparea[idx0] >= upa_min: drainh[idx0] = uparea[idx0] ** b drainz[idx0] = elevtn[idx0] fldpln[idx0] = 1 else: idx_ds = idxs_ds[idx0] if fldpln[idx_ds] == 1: z0 = drainz[idx_ds] h0 = drainh[idx_ds] dh = elevtn[idx0] - z0 if dh <= h0: fldpln[idx0] = 1 drainz[idx0] = z0 drainh[idx0] = h0 return fldpln @njit def _local_d4(idx0, idx_ds, ncol): """Return indices of d4 neighbors in diagonal d8 direction, e.g.: indices of N, W neigbors if flowdir is NW.""" idxs_d4 = [ idx0 - ncol, idx0 - 1, idx0 + ncol, idx0 + 1, idx0 - ncol, ] # n, w, s, e, n if idx_ds != idx0: idxs_diag = [ idx0 - ncol - 1, idx0 + ncol - 1, idx0 + ncol + 1, idx0 - ncol + 1, ] # nw, sw, se, ne di = idxs_diag.index(idx_ds) return np.asarray(idxs_d4[di : di + 2]) else: return np.asarray(idxs_d4[1:]) @njit def dig_4connectivity( idxs_ds, seq, elv_flat, shape, mask=None, nodata=-9999, dz_min=1e-3 ): """Make sure that for every diagonal D8 downstream flow direction there is an adjacent D4 cell with same or lower elevation""" elv_out = elv_flat.copy() nrow, ncol = shape for idx0 in seq[::-1]: # up- to downstream if mask is not None and not mask[idx0]: continue idx_ds = idxs_ds[idx0] dd = abs(idx0 - idx_ds) if dd > 1 and dd != ncol: # diagonal idxs_d4 = _local_d4(idx0, idx_ds, ncol) # indices of adjacent d4 cells z0 = elv_out[idx0] # elevtn of current cell zs = elv_out[idxs_d4] valid = zs != nodata if not np.any(valid): continue # find adjacent with smallest dz and lower elevation to <= z0 idx_d4_min = idxs_d4[valid][np.argmin(zs[valid] - z0)] # force small change to detect d4 river elv_out[idx_d4_min] = min(elv_out[idx_d4_min] - dz_min, z0) if idxs_ds[idx_ds] == idx_ds: # next pit because we need to know upstream cell r = idx_ds // ncol c = idx_ds % ncol if r == 0 or r == nrow - 1 or c == 0 or c == ncol - 1: # edge continue idxs_d4 = _local_d4(idx_ds, idx_ds, ncol) if np.any(elv_out[idxs_d4] == nodata): # D4 link with nodata continue idxs_d4 = np.asarray([idx for idx in idxs_d4 if idx != idx0]) elv_out[idxs_d4] = np.minimum(elv_out[idx_ds], elv_out[idxs_d4]) return elv_out