# -*- coding: utf-8 -*-
"""Scripts to derive (sub)basin geometries.
Based on pre-cooked basin index files, basin maps or flow direction maps.
"""
import logging
import warnings
from os.path import isdir, isfile
from pathlib import Path
import geopandas as gpd
import numpy as np
import xarray as xr
from shapely.geometry import box
from .. import _compat
from ..data_adapter import GeoDataFrameAdapter
from ..data_catalog import DataCatalog
from ..flw import basin_map, flwdir_from_da, outlet_map, stream_map
# local
from ..models import MODELS
if _compat.HAS_XUGRID:
import xugrid as xu
logger = logging.getLogger(__name__)
__all__ = ["get_basin_geometry", "parse_region"]
[docs]
def parse_region(region, logger=logger, data_catalog=None):
"""Check and return parsed region arguments.
Parameters
----------
region : dict
Dictionary describing region of interest.
For an exact clip of the region:
* {'bbox': [xmin, ymin, xmax, ymax]}
* {'geom': /path/to/polygon_geometry}
For a region based of another models grid:
* {'<model_name>': root}
For a region based of the grid of a raster file:
* {'grid': /path/to/raster}
For a region based on a mesh grid of a mesh file:
* {'mesh': /path/to/mesh}
Entire basin can be defined based on an ID, one or multiple point location
(x, y), or a region of interest (bounding box or geometry) for which the
basin IDs are looked up. The basins withint the area of interest can be further
filtered to only include basins with their outlet within the area of interest
('outlets': true) of stream threshold arguments (e.g.: 'uparea': 1000).
Common use-cases include:
* {'basin': ID}
* {'basin': [ID1, ID2, ..]}
* {'basin': [x, y]}
* {'basin': [[x1, x2, ..], [y1, y2, ..]]}
* {'basin': /path/to/point_geometry}
* {'basin': [xmin, ymin, xmax, ymax]}
* {'basin': [xmin, ymin, xmax, ymax], 'outlets': true}
* {'basin': [xmin, ymin, xmax, ymax], '<variable>': threshold}
Subbasins are defined by its outlet locations and include all area upstream
from these points. The outlet locations can be passed as xy coordinate pairs,
but also derived from the most downstream cell(s) within a area of interest
defined by a bounding box or geometry, optionally refined by stream threshold
arguments.
The method can be speed up by providing an additional ``bounds`` argument which
should contain all upstream cell. If cells upstream of the subbasin are not
within the provide bounds a warning will be raised. Common use-cases include:
* {'subbasin': [x, y], '<variable>': threshold}
* {
'subbasin': [[x1, x2, ..], [y1, y2, ..]],
'<variable>': threshold, 'bounds': [xmin, ymin, xmax, ymax]
}
* {'subbasin': /path/to/point_geometry, '<variable>': threshold}
* {'subbasin': [xmin, ymin, xmax, ymax], '<variable>': threshold}
* {'subbasin': /path/to/polygon_geometry, '<variable>': threshold}
Interbasins are similar to subbasins but are bounded by a bounding box or
geometry and do not include all upstream area. Common use-cases include:
* {'interbasin': [xmin, ymin, xmax, ymax], '<variable>': threshold}
* {'interbasin': [xmin, ymin, xmax, ymax], 'xy': [x, y]}
* {'interbasin': /path/to/polygon_geometry, 'outlets': true}
logger:
The logger to use.
Returns
-------
kind : {'basin', 'subbasin', 'interbasin', 'geom', 'bbox', 'grid'}
region kind
kwargs : dict
parsed region json
"""
if data_catalog is None:
data_catalog = DataCatalog()
kwargs = region.copy()
# NOTE: the order is important to prioritize the arguments
options = {
"basin": ["basid", "geom", "bbox", "xy"],
"subbasin": ["geom", "bbox", "xy"],
"interbasin": ["geom", "bbox", "xy"], # FIXME remove interbasin & xy combi?
"outlet": ["geom", "bbox"], # deprecated!
"geom": ["geom"],
"bbox": ["bbox"],
"grid": ["RasterDataArray"],
"mesh": ["UgridDataArray"],
}
kind = next(iter(kwargs)) # first key of region
value0 = kwargs.pop(kind)
if kind in MODELS:
model_class = MODELS.load(kind)
kwargs = dict(mod=model_class(root=value0, mode="r", logger=logger))
kind = "model"
elif kind == "grid":
kwargs = {"grid": data_catalog.get_rasterdataset(value0, driver_kwargs=kwargs)}
elif kind == "mesh":
if _compat.HAS_XUGRID:
if isinstance(value0, (str, Path)) and isfile(value0):
kwarg = dict(mesh=xu.open_dataset(value0))
elif isinstance(value0, (xu.UgridDataset, xu.UgridDataArray)):
kwarg = dict(mesh=value0)
elif isinstance(value0, (xu.Ugrid1d, xu.Ugrid2d)):
kwarg = dict(
mesh=xu.UgridDataset(value0.to_dataset(optional_attributes=True))
)
else:
raise ValueError(
f"Unrecognised type {type(value0)}."
"Should be a path, data catalog key or xugrid object."
)
kwargs.update(kwarg)
else:
raise ImportError("xugrid is required to read mesh files.")
elif kind not in options:
k_lst = '", "'.join(list(options.keys()) + list(MODELS))
raise ValueError(f'Region key "{kind}" not understood, select from "{k_lst}"')
else:
kwarg = _parse_region_value(value0, data_catalog=data_catalog)
if len(kwarg) == 0 or next(iter(kwarg)) not in options[kind]:
v_lst = '", "'.join(list(options[kind]))
raise ValueError(
f'Region value "{value0}" for kind={kind} not understood, '
f'provide one of "{v_lst}"'
)
kwargs.update(kwarg)
kwargs_str = dict()
for k, v in kwargs.items():
if isinstance(v, gpd.GeoDataFrame):
v = f"GeoDataFrame {v.total_bounds} (crs = {v.crs})"
elif isinstance(v, xr.DataArray):
v = f"DataArray {v.raster.bounds} (crs = {v.raster.crs})"
kwargs_str.update({k: v})
logger.debug(f"Parsed region (kind={kind}): {str(kwargs_str)}")
return kind, kwargs
def _parse_region_value(value, data_catalog):
kwarg = {}
if isinstance(value, np.ndarray):
value = value.tolist() # array to list
if isinstance(value, list):
if np.all([isinstance(p0, int) and abs(p0) > 180 for p0 in value]): # all int
kwarg = dict(basid=value)
elif len(value) == 4: # 4 floats
kwarg = dict(bbox=value)
elif len(value) == 2: # 2 floats
kwarg = dict(xy=value)
elif isinstance(value, tuple) and len(value) == 2: # tuple of x and y coords
kwarg = dict(xy=value)
elif isinstance(value, int): # single int
kwarg = dict(basid=value)
elif isinstance(value, (str, Path)) and isdir(value):
kwarg = dict(root=value)
elif isinstance(value, (str, Path)):
geom = data_catalog.get_geodataframe(value)
kwarg = dict(geom=geom)
elif isinstance(value, gpd.GeoDataFrame): # geometry
kwarg = dict(geom=value)
else:
raise ValueError(f"Region value {value} not understood.")
if "geom" in kwarg and np.all(kwarg["geom"].geometry.type == "Point"):
xy = (
kwarg["geom"].geometry.x.values,
kwarg["geom"].geometry.y.values,
)
kwarg = dict(xy=xy)
return kwarg
def _check_size(ds, logger=logger, threshold=12e3**2):
# warning for large domain
if (
np.multiply(*ds.raster.shape) > threshold
): # 12e3 ** 2 > 10x10 degree at 3 arcsec
logger.warning(
"Loading very large spatial domain to derive a subbasin. "
"Provide initial 'bounds' if this takes too long."
)
[docs]
def get_basin_geometry(
ds,
basin_index=None,
kind="basin",
bounds=None,
bbox=None,
geom=None,
xy=None,
basid=None,
outlets=False,
basins_name="basins",
flwdir_name="flwdir",
ftype="infer",
logger=logger,
buffer=10,
**stream_kwargs,
):
"""Return a geometry of the (sub)(inter)basin(s).
This method derives a geometry of sub-, inter- or full basin based on an input
dataset with flow-direction and optional basins ID raster data in combination
with a matching basin geometry file containing the bounding boxes of each basin.
Either ``bbox``, ``geom``, ``xy`` (or ``basid`` in case of ``kind='basin'``) must
be provided.
Parameters
----------
ds : xarray.Dataset
dataset containing basin and flow direction variables
basin_index: geopandas.GeoDataFrame or GeoDataFrameAdapter
Dataframe with basin geomtries or bounding boxes with "basid" column
corresponding to the ``ds[<basins_name>]`` map.
kind : {"basin", "subbasin", "interbasin"}
kind of basin description
bounds: array_like of float, optional
[xmin, ymin, xmax, ymax] coordinates of total bounding box, i.e. the data is
clipped to this domain before futher processing.
bbox : array_like of float, optional
[xmin, ymin, xmax, ymax] coordinates to infer (sub)(inter)basin(s)
geom : geopandas.GeoDataFrame, optional
polygon geometry describing area of interest
xy : tuple of array_like of float, optional
x, y coordinates of (sub)basin outlet locations
basid : int or array_like of int, optional
basin IDs, must match values in basin maps
outlets: bool, optional
If True, include (sub)basins of outlets within domain only.
flwdir_name : str, optional
Name of flow direction variable in source, by default "flwdir"
basins_name : str, optional
Name of flow direction variable in source, by default "basins"
ftype : {'d8', 'ldd', 'nextxy'}, optional
name of flow direction type, by default None; use input ftype.
stream_kwargs : key-word arguments
name of variable in ds and threshold value
buffer:
The buffer to apply.
logger:
The logger to use.
Returns
-------
basin_geom : geopandas.geoDataFrame
geometry the (sub)basin(s)
outlet_geom : geopandas.geoDataFrame
geometry the outlet point location
"""
kind_lst = ["basin", "subbasin", "interbasin"]
if kind == "outlet":
outlets = True
kind = "basin"
warnings.warn(
'kind="outlets" has been deprecated, use outlets=True in combination with '
'kind="basin" or kind="interbasin" instead.',
DeprecationWarning,
stacklevel=2,
)
elif kind not in kind_lst:
msg = f"Unknown kind: {kind}, select from {kind_lst}."
raise ValueError(msg)
if bool(stream_kwargs.pop("within", False)):
warnings.warn(
'"within" stream argument has been deprecated.',
DeprecationWarning,
stacklevel=2,
)
# check variables
dvars = [flwdir_name] + [v for v in stream_kwargs]
for name in dvars:
if name not in ds.data_vars:
raise ValueError(f"Dataset variable {name} not in ds.")
# for interbasins we can limit the domain based on either bbox / geom or bounds
if kind == "interbasin" and bounds is None:
if bbox is None and geom is None:
raise ValueError('"kind=interbasin" requires either "bbox" or "geom"')
bounds = bbox if bbox is not None else geom.total_bounds
# initial clip based on bounds
if bounds is not None:
ds = ds.raster.clip_bbox(bounds, buffer=buffer)
# convert bbox to geom
if geom is None and bbox is not None:
geom = gpd.GeoDataFrame(geometry=[box(*bbox)], crs=ds.raster.crs)
# check basin index
# TODO understand pfafstetter codes
gdf_bas = None
if basin_index is not None:
if isinstance(basin_index, GeoDataFrameAdapter):
kwargs = dict(variables=["basid"])
if geom is not None:
kwargs.update(geom=geom)
elif xy is not None:
xy0 = np.atleast_1d(xy[0])
xy1 = np.atleast_1d(xy[1])
kwargs.update(
bbox=[
min(xy0) - 0.1,
min(xy1) - 0.1,
max(xy0) + 0.1,
max(xy1) + 0.1,
]
)
gdf_bas = basin_index.get_data(**kwargs)
elif isinstance(basin_index, gpd.GeoDataFrame):
gdf_bas = basin_index
if "basid" not in gdf_bas.columns:
raise ValueError("Basin geometries does not have 'basid' column.")
if gdf_bas.crs != ds.raster.crs:
logger.warning("Basin geometries CRS does not match the input raster CRS.")
gdf_bas = gdf_bas.to_crs(ds.raster.crs)
## BASINS
if kind == "basin" or bounds is None:
dvars = dvars + [basins_name]
if basins_name not in ds:
if gdf_bas is not None:
gdf_bas = None
logger.warning(
"Basin geometries ignored as no corresponding"
+ " basin map is provided."
)
_check_size(ds, logger)
logger.info(f'basin map "{basins_name}" missing, calculating on the fly.')
flwdir = flwdir_from_da(ds[flwdir_name], ftype=ftype)
ds[basins_name] = xr.Variable(ds.raster.dims, flwdir.basins())
elif (
ds[basins_name].raster.nodata != 0
and ds[basins_name].raster.nodata is not None
):
ds[basins_name].where(ds[basins_name] != ds[basins_name].raster.nodata, 0)
ds[basins_name].raster.set_nodata(0)
# clip
ds_clip = ds[dvars]
if geom is not None:
ds_clip = ds[dvars].raster.clip_geom(geom, buffer=buffer, mask=True)
# get basin IDs
if xy is not None:
logger.debug("Getting basin IDs at point locations.")
sel = {
ds.raster.x_dim: xr.IndexVariable("xy", np.atleast_1d(xy[0])),
ds.raster.y_dim: xr.IndexVariable("xy", np.atleast_1d(xy[1])),
}
basid = np.unique(ds_clip[basins_name].sel(**sel, method="nearest").values)
elif basid is None:
if stream_kwargs or outlets:
if stream_kwargs:
stream = stream_map(ds_clip, **stream_kwargs)
if outlets:
outmap = outlet_map(ds_clip[flwdir_name], ftype=ftype)
if stream_kwargs:
stream = stream.where(outmap, 0)
else:
stream = outmap
ds_clip[basins_name] = ds_clip[basins_name].where(stream, 0)
logger.debug("Getting IDs of intersecting basins.")
basid = np.unique(ds_clip[basins_name].values)
basid = np.atleast_1d(basid)
basid = basid[basid > 0]
if basid.size == 0:
raise ValueError("No basins found with given criteria.")
# clip ds to total basin
if gdf_bas is not None:
gdf_match = np.isin(gdf_bas["basid"], basid)
gdf_bas = gdf_bas.loc[gdf_match]
if gdf_bas.index.size > 0:
if geom is not None:
xminbas, yminbas, xmaxbas, ymaxbas = gdf_bas.total_bounds
# Check that total_bounds is at least bigger
# than original geom bounds
xmingeom, ymingeom, xmaxgeom, ymaxgeom = geom.total_bounds
total_bounds = [
min(xminbas, xmingeom),
min(yminbas, ymingeom),
max(xmaxbas, xmaxgeom),
max(ymaxbas, ymaxgeom),
]
else:
total_bounds = gdf_bas.total_bounds
ds = ds[dvars].raster.clip_bbox(total_bounds, buffer=0)
elif np.any(gdf_match):
logger.warning("No matching basin IDs found in basin geometries.")
# get full basin mask and use this mask ds_basin incl flow direction raster
_mask = np.isin(ds[basins_name], basid)
if not np.any(_mask > 0):
raise ValueError(f"No basins found with IDs: {basid}")
ds = ds[dvars].raster.mask(_mask)
ds = ds.raster.clip_mask(ds[basins_name])
bas_mask = ds[basins_name]
# INTER- & SUBBASINS
xy_out = None
if kind in ["subbasin", "interbasin"]:
# get flow directions
_check_size(ds, logger) # warning for large domain
mask = False
# if interbasin, set flwdir mask within geometry / bounding box
if kind == "interbasin": # we have checked before that geom is not None
mask = ds.raster.geometry_mask(geom)
elif basins_name in ds: # set flwdir mask based on basin map
mask = ds[basins_name] > 0
flwdir = flwdir_from_da(ds[flwdir_name], ftype=ftype, mask=mask)
# get area of interest (aoi) mask
if geom is not None:
aoi = ds.raster.geometry_mask(geom)
else:
aoi = xr.DataArray(
coords=ds.raster.coords,
dims=ds.raster.dims,
data=np.full(ds.raster.shape, True, dtype=bool),
) # all True
# Convert xy to tuple
if xy is not None:
xy = (np.atleast_1d(xy[0]), np.atleast_1d(xy[1]))
# get stream mask. Always over entire domain
# to include cells downstream of aoi!
kwargs = dict()
if stream_kwargs:
stream = stream_map(ds, **stream_kwargs)
if not np.any(stream):
raise ValueError(f"No streams found with: {stream_kwargs}.")
if not outlets and xy is None: # get aoi outflow cells if none provided
xy = flwdir.xy(flwdir.outflow_idxs(np.logical_and(stream, aoi).values))
elif not outlets:
kwargs.update(stream=stream.values)
if outlets:
outmap = aoi.where(outlet_map(ds[flwdir_name], ftype=flwdir.ftype), False)
if stream_kwargs:
outmap = outmap.where(stream_kwargs, False)
idxs_out = np.where(outmap.values.ravel())[0]
if not np.any(outmap):
raise ValueError("No outlets found with with given criteria.")
xy = outmap.raster.idx_to_xy(idxs_out)
# get subbasin map
bas_mask, xy_out = basin_map(ds, flwdir, xy, **kwargs)
# is subbasin with bounds check if all upstream cells are included
if kind == "subbasin" and bounds is not None:
geom = gpd.GeoDataFrame(geometry=[box(*bounds)], crs=ds.raster.crs)
mask = ds.raster.geometry_mask(geom)
if np.any(np.logical_and(mask == 0, bas_mask != 0)):
logger.warning("The subbasin does not include all upstream cells.")
if not np.any(bas_mask > 0):
raise ValueError(f"No {kind} found with given criteria.")
bas_mask = bas_mask.astype(np.int32)
bas_mask.raster.set_crs(ds.raster.crs)
bas_mask.raster.set_nodata(0)
w, s, e, n = bas_mask.raster.clip_mask(bas_mask).raster.bounds
logger.info(f"{kind} bbox: [{w:.4f}, {s:.4f}, {e:.4f}, {n:.4f}]")
# vectorize basins and outlets
basin_geom = bas_mask.raster.vectorize()
outlet_geom = None
if xy_out is not None:
points = gpd.points_from_xy(*xy_out)
outlet_geom = gpd.GeoDataFrame(geometry=points, crs=ds.raster.crs)
return basin_geom, outlet_geom