"""
HydroMT-SFINCS utilities functions for reading and writing SFINCS specific input and output files,
as well as some common data conversions.
"""
import copy
import io
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Union
import geopandas as gpd
import hydromt
import numpy as np
import pandas as pd
import rasterio
from rasterio.enums import Resampling
from rasterio.rio.overview import get_maximum_overview_level
from rasterio.windows import Window
import xarray as xr
from hydromt.io import write_xy
from pyproj.crs.crs import CRS
from shapely.geometry import LineString, Polygon
__all__ = [
"read_binary_map",
"write_binary_map",
"read_binary_map_index",
"write_binary_map_index",
"read_ascii_map",
"write_ascii_map",
"read_timeseries",
"write_timeseries",
"get_bounds_vector",
"mask2gdf",
"read_xy",
"write_xy", # defined in hydromt.io
"read_xyn",
"write_xyn",
"read_geoms",
"write_geoms",
"read_drn",
"write_drn",
"gdf2linestring",
"gdf2polygon",
"linestring2gdf",
"polygon2gdf",
"read_sfincs_map_results",
"read_sfincs_his_results",
"downscale_floodmap",
"rotated_grid",
"build_overviews",
"find_uv_indices",
]
logger = logging.getLogger(__name__)
## BINARY MAPS: sfincs.ind, sfincs.msk, sfincs.dep etc. ##
[docs]
def write_binary_map_index(fn_ind: Union[str, Path], msk: np.ndarray) -> None:
"""Write flat index of binary map file.
NOTE: The array should be in S->N and W->E orientation, with origin in the SW corner.
Parameters
----------
fn_ind: str, Path
Path to output map index file.
msk: np.ndarray
2D array of sfincs mask map, where invalid cells have value 0.
"""
# the index number file of sfincs starts with the length of the index numbers
indices = np.where(msk.transpose().flatten() > 0)[0] + 1 # convert to 1-based index
indices_ = np.array(np.hstack([np.array(len(indices)), indices]), dtype="u4")
indices_.tofile(fn_ind)
[docs]
def read_binary_map_index(fn_ind: Union[str, Path]) -> np.ndarray:
"""Read binary map index file.
Parameters
----------
fn_ind: str, Path
Path to map index file.
Returns
-------
ind: np.ndarray
1D array of flat index of binary maps.
"""
_ind = np.fromfile(fn_ind, dtype="u4")
ind = _ind[1:] - 1 # convert to zero based index
assert _ind[0] == ind.size
return ind
[docs]
def write_binary_map(
fn: Union[str, Path],
data: np.ndarray,
msk: np.ndarray,
dtype: Union[str, np.dtype] = "f4",
) -> None:
"""Write binary map file.
NOTE: The array should be in S->N and W->E orientation, with origin in the SW corner.
Parameters
----------
fn str, Path
Path to output map index file.
data: np.ndarray
2D array of sfincs map.
msk: np.ndarray
2D array of sfincs mask map, where invalid cells have value 0.
dtype: str, np.dtype, optional
Data type, by default "f4". For sfincs.msk file use dtype="u1".
"""
data_out = np.asarray(data.transpose()[msk.transpose() > 0], dtype=dtype)
data_out.tofile(fn)
[docs]
def read_binary_map(
fn: Union[str, Path],
ind: np.ndarray,
shape: Tuple[int],
mv: float = -9999.0,
dtype: str = "f4",
) -> np.ndarray:
"""Read binary map.
Parameters
----------
fn: str, Path
Path to map file.
ind: np.ndarray
1D array of flat index of binary maps.
shape: tuple of int
(nrow, ncol) shape of output map.
mv: int or float
missing value, by default -9999.0.
dtype: str, np.dtype, optional
Data type, by default "f4". For sfincs.msk file use dtype="u1".
Returns
-------
ind: np.ndarray
1D array of flat index of binary maps.
"""
assert ind.max() <= np.multiply(*shape)
nrow, ncol = shape
data = np.full((ncol, nrow), mv, dtype=dtype)
data.flat[ind] = np.fromfile(fn, dtype=dtype)
data = data.transpose()
return data
## ASCII maps: sfincs.restart ##
[docs]
def read_ascii_map(fn: Union[str, Path]) -> np.ndarray:
"""Read ascii map
Parameters
----------
fn : str, Path
Path to ascii map file.
Returns
-------
data : np.ndarray
2D array of sfincs map.
"""
data = np.loadtxt(fn).astype(np.float32)
return data
[docs]
def write_ascii_map(fn: Union[str, Path], data: np.ndarray, fmt: str = "%8.3f") -> None:
"""Write ascii map
NOTE: The array should be in S->N and W->E orientation, with origin in the SW corner.
Parameters
----------
fn : str, Path
Path to ascii map file.
data : np.ndarray
2D array of sfincs map.
fmt : str, optional
Value format, by default "%8.3f". See numpy.savetxt for more options.
"""
with open(fn, "w") as f:
np.savetxt(f, data, fmt=fmt)
## XY files: bnd / src ##
# write_xy defined in hydromt.io
[docs]
def read_xy(fn: Union[str, Path], crs: Union[int, CRS] = None) -> gpd.GeoDataFrame:
"""Read sfincs xy files and parse to GeoDataFrame.
Parameters
----------
fn : str, Path
Path to ascii xy file.
crs: int, CRS
Coordinate reference system
Returns
-------
gdf: gpd.GeoDataFrame
GeoDataFrame with point geomtries
"""
gdf = hydromt.open_vector(fn, crs=crs, driver="xy")
gdf.index = np.arange(1, gdf.index.size + 1, dtype=int) # index starts at 1
return gdf
[docs]
def read_xyn(fn: str, crs: int = None):
df = pd.read_csv(fn, index_col=False, header=None, delim_whitespace=True).rename(
columns={0: "x", 1: "y"}
)
if len(df.columns) > 2:
df = df.rename(columns={2: "name"})
else:
df["name"] = df.index
points = gpd.points_from_xy(df["x"], df["y"])
gdf = gpd.GeoDataFrame(df.drop(columns=["x", "y"]), geometry=points, crs=crs)
return gdf
[docs]
def write_xyn(fn: str = "sfincs.obs", gdf: gpd.GeoDataFrame = None, fmt: str = "%.1f"):
# strip %-sign of fmt if present
fmt = fmt.replace("%", "")
with open(fn, "w") as fid:
for point in gdf.iterfeatures():
x, y = point["geometry"]["coordinates"]
try:
name = point["properties"]["name"]
except:
name = "obs" + str(point["id"])
string = f'{x:{fmt}} {y:{fmt}} "{name}"\n'
fid.write(string)
## ASCII TIMESERIES: bzs / dis / precip ##
[docs]
def parse_datetime(dt: Union[str, datetime], format="%Y%m%d %H%M%S") -> datetime:
"""Checks and/or parses datetime from a string, default sfincs datetime string format"""
if isinstance(dt, str):
dt = datetime.strptime(dt, format)
elif not isinstance(dt, datetime):
raise ValueError(f"Unknown type for datetime: {type(dt)})")
return dt
[docs]
def read_timeseries(fn: Union[str, Path], tref: Union[str, datetime]) -> pd.DataFrame:
"""Read ascii timeseries files such as sfincs.bzs, sfincs.dis and sfincs.precip.
The first column (time index) is parsed to datetime format assumming it represents
seconds from `tref`.
Parameters
----------
fn: str, Path
Path to output timeseries file.
tref: datetime.datetime, str
Datetime of tref, string in "%Y%m%d %H%M%S" format.
Returns
-------
df: pd.DataFrame
Dataframe of timeseries with parsed time index.
"""
tref = parse_datetime(tref)
df = pd.read_csv(fn, delim_whitespace=True, index_col=0, header=None)
df.index = pd.to_datetime(df.index.values, unit="s", origin=tref)
df.columns = df.columns.values.astype(int)
df.index.name = "time"
df.columns.name = "index"
return df
[docs]
def write_timeseries(
fn: Union[str, Path],
df: Union[pd.DataFrame, pd.Series],
tref: Union[str, datetime],
fmt: str = "%7.2f",
) -> None:
"""Write pandas.DataFrame to fixed width ascii timeseries files
such as sfincs.bzs, sfincs.dis and sfincs.precip. The output time index is given in
seconds from tref.
Parameters
----------
fn: str, Path
Path to output timeseries file.
df: pd.DataFrame
Dataframe of timeseries.
tref: datetime.datetime, str
Datetime of tref, string in "%Y%m%d %H%M%S" format.
fmt: str, optional
Output value format, by default "%7.2f".
"""
if isinstance(df, pd.Series):
df = df.to_frame()
elif not isinstance(df, pd.DataFrame):
raise ValueError(f"Unknown type for df: {type(df)})")
tref = parse_datetime(tref)
if df.index.size == 0:
raise ValueError("df does not contain data.")
data = df.reset_index().values
data[:, 0] = (df.index - tref).total_seconds()
# calculate required width for time column; hard coded single decimal precision
# format for other columns is based on fmt`argument
w = int(np.floor(np.log10(abs(data[-1, 0])))) + 3
fmt_lst = [f"%{w}.1f"] + [fmt for _ in range(df.columns.size)]
fmt_out = " ".join(fmt_lst)
with open(fn, "w") as f:
np.savetxt(f, data, fmt=fmt_out)
## MASK
[docs]
def get_bounds_vector(da_msk: xr.DataArray) -> gpd.GeoDataFrame:
"""Get bounds of vectorized mask as GeoDataFrame.
Parameters
----------
da_msk: xr.DataArray
Mask as DataArray with values 0 (inactive), 1 (active),
and boundary cells 2 (waterlevels) and 3 (outflow).
Returns
-------
gdf_msk: gpd.GeoDataFrame
GeoDataFrame with line geometries of mask boundaries.
"""
gdf_msk = da_msk.raster.vectorize()
# small buffer for rounding errors
if da_msk.raster.crs.is_geographic:
gdf_msk["geometry"] = gdf_msk.buffer(1e-6)
else:
gdf_msk["geometry"] = gdf_msk.buffer(1)
region = (da_msk >= 1).astype("int16").raster.vectorize()
region = region[region["value"] == 1].drop(columns="value")
region["geometry"] = region.boundary
gdf_msk = gdf_msk[gdf_msk["value"] != 1]
gdf_msk = gpd.overlay(
region, gdf_msk, "intersection", keep_geom_type=False
).explode(index_parts=True)
gdf_msk = gdf_msk[gdf_msk.length > 0]
return gdf_msk
[docs]
def mask2gdf(
da_mask: xr.DataArray,
option: str = "all",
) -> gpd.GeoDataFrame:
"""Convert a boolean mask to a GeoDataFrame of polygons.
Parameters
----------
da_mask: xr.DataArray
Mask with integer values.
option: {"all", "active", "wlev", "outflow"}
Returns
-------
gdf: geopandas.GeoDataFrame
GeoDataFrame of Points.
"""
if option == "all":
da_mask = da_mask != da_mask.raster.nodata
elif option == "active":
da_mask = da_mask == 1
elif option == "wlev":
da_mask = da_mask == 2
elif option == "outflow":
da_mask = da_mask == 3
indices = np.stack(np.where(da_mask), axis=-1)
if "x" in da_mask.coords:
x = da_mask.coords["x"].values[indices[:, 1]]
y = da_mask.coords["y"].values[indices[:, 0]]
else:
x = da_mask.coords["xc"].values[indices[:, 0], indices[:, 1]]
y = da_mask.coords["yc"].values[indices[:, 0], indices[:, 1]]
points = gpd.GeoDataFrame(geometry=gpd.points_from_xy(x, y), crs=da_mask.raster.crs)
if len(points) > 0:
return gpd.GeoDataFrame(points, crs=da_mask.raster.crs)
else:
return None
## STRUCTURES: thd / weir ##
[docs]
def gdf2linestring(gdf: gpd.GeoDataFrame) -> List[Dict]:
"""Convert GeoDataFrame[LineString] to list of structure dictionaries
The x,y are taken from the geometry.
For weir structures to additional paramters are required, a "z" (elevation) and
"par1" (Cd coefficient in weir formula) are required which should be supplied
as columns (or z-coordinate) of the GeoDataFrame. These columns should either
contain a float or 1D-array of floats with same length as the LineString.
Parameters
----------
gdf: geopandas.GeoDataFrame with LineStrings geometries
GeoDataFrame structures.
Returns
-------
feats: list of dict
List of dictionaries describing structures.
"""
feats = []
for _, item in gdf.iterrows():
feat = item.drop("geometry").dropna().to_dict()
# check geom
line = item.geometry
if line.geom_type == "MultiLineString" and len(line.geoms) == 1:
line = line.geoms[0]
if line.geom_type != "LineString":
raise ValueError("Invalid geometry type, only LineString is accepted.")
xyz = tuple(zip(*line.coords[:]))
feat["x"], feat["y"] = list(xyz[0]), list(xyz[1])
if len(xyz) == 3:
feat["z"] = list(xyz[2])
feats.append(feat)
return feats
[docs]
def gdf2polygon(gdf: gpd.GeoDataFrame) -> List[Dict]:
"""Convert GeoDataFrame[Polygon] to list of structure dictionaries
The x,y are taken from the geometry.
Parameters
----------
gdf: geopandas.GeoDataFrame with LineStrings geometries
GeoDataFrame structures.
Returns
-------
feats: list of dict
List of dictionaries describing structures.
"""
feats = []
for _, item in gdf.iterrows():
feat = item.drop("geometry").dropna().to_dict()
# check geom
poly = item.geometry
if poly.type == "MultiPolygon" and len(poly.geoms) == 1:
poly = poly.geoms[0]
if poly.type != "Polygon":
raise ValueError("Invalid geometry type, only Polygon is accepted.")
x, y = poly.exterior.coords.xy
feat["x"], feat["y"] = list(x), list(y)
feats.append(feat)
return feats
[docs]
def linestring2gdf(feats: List[Dict], crs: Union[int, CRS] = None) -> gpd.GeoDataFrame:
"""Convert list of structure dictionaries to GeoDataFrame[LineString]
Parameters
----------
feats: list of dict
List of dictionaries describing structures.
crs: int, CRS
Coordinate reference system
Returns
-------
gdf: geopandas.GeoDataFrame
GeoDataFrame structures
"""
records = []
for f in feats:
feat = copy.deepcopy(f)
xyz = [feat.pop("x"), feat.pop("y")]
if "z" in feat and np.atleast_1d(feat["z"]).size == len(xyz[0]):
xyz.append(feat.pop("z"))
feat.update({"geometry": LineString(list(zip(*xyz)))})
records.append(feat)
gdf = gpd.GeoDataFrame.from_records(records)
gdf.set_geometry("geometry", inplace=True)
if crs is not None:
gdf.set_crs(crs, inplace=True)
return gdf
[docs]
def polygon2gdf(
feats: List[Dict],
crs: Union[int, CRS] = None,
zmin: float = None,
zmax: float = None,
) -> gpd.GeoDataFrame:
"""Convert list of structure dictionaries to GeoDataFrame[Polygon]
Parameters
----------
feats: list of dict
List of dictionaries describing polygons.
crs: int, CRS
Coordinate reference system
Returns
-------
gdf: geopandas.GeoDataFrame
GeoDataFrame structures
"""
records = []
for f in feats:
feat = copy.deepcopy(f)
xy = [feat.pop("x"), feat.pop("y")]
feat.update({"geometry": Polygon(list(zip(*xy)))})
records.append(feat)
gdf = gpd.GeoDataFrame.from_records(records)
gdf["zmin"] = zmin
gdf["zmax"] = zmax
gdf.set_geometry("geometry", inplace=True)
if crs is not None:
gdf.set_crs(crs, inplace=True)
return gdf
[docs]
def write_geoms(
fn: Union[str, Path],
feats: List[Dict],
stype: str = "thd",
fmt: str = "%.1f",
fmt_z: str = "%.1f",
) -> None:
"""Write list of structure dictionaries to file
Parameters
----------
fn: str, Path
Path to output structure file.
feats: list of dict
List of dictionaries describing structures.
For pli, pol, thd anc crs files "x" and "y" are required, "name" is optional.
For weir files "x", "y" and "z" are required, "name" and "par1" are optional.
stype: {'pli', 'pol', 'thd', 'weir', 'crs'}
Geom type polylines (pli), polygons (pol) thin dams (thd), weirs (weir)
or cross-sections (crs).
fmt: str
format for "x" and "y" fields.
fmt_z: str
format for "z" and "par1" fields.
Examples
--------
>>> feats = [
{
"name": 'WEIR01',
"x": [0, 10, 20],
"y": [100, 100, 100],
"z": 5.0,
"par1": 0.6,
},
{
"name": 'WEIR02',
"x": [100, 110, 120],
"y": [100, 100, 100],
"z": [5.0, 5.1, 5.0],
"par1": 0.6,
},
]
>>> write_structures('sfincs.weir', feats, stype='weir')
"""
cols = {"pli": 2, "pol": 2, "thd": 2, "weir": 4, "crs": 2}[stype.lower()]
fmt = [fmt, fmt] + [fmt_z for _ in range(cols - 2)]
if stype.lower() == "weir" and np.any(["z" not in f for f in feats]):
raise ValueError('"z" value missing for weir files.')
with open(fn, "w") as f:
for i, feat in enumerate(feats):
name = feat.get("name", i + 1)
if isinstance(name, int):
name = f"{stype:s}{name:02d}"
rows = len(feat["x"])
a = np.zeros((rows, cols), dtype=np.float32)
a[:, 0] = np.asarray(feat["x"])
a[:, 1] = np.asarray(feat["y"])
if stype.lower() == "weir":
a[:, 2] = feat["z"]
a[:, 3] = feat.get("par1", 0.6)
s = io.BytesIO()
np.savetxt(s, a, fmt=fmt)
f.write(f"{name}\n")
f.write(f"{rows:d} {cols:d}\n")
f.write(s.getvalue().decode())
[docs]
def read_geoms(fn: Union[str, Path]) -> List[Dict]:
"""Read structure files to list of dictionaries.
Parameters
----------
fn : str, Path
Path to structure file.
Returns
-------
feats: list of dict
List of dictionaries describing structures.
"""
feats = []
col_names = ["x", "y", "z", "par1"]
with open(fn, "r") as f:
while True:
name = f.readline().strip()
if not name: # EOF
break
feat = {"name": name}
rows, cols = [int(v) for v in f.readline().strip().split(maxsplit=2)]
for c in range(cols):
feat[col_names[c]] = [0.0 for _ in range(rows)]
for r in range(rows):
for c, v in enumerate(f.readline().strip().split(maxsplit=cols)):
feat[col_names[c]][r] = float(v)
if cols > 2:
for c in col_names[2:]:
if np.unique(feat[c]).size == 1:
feat[c] = feat[c][0]
feats.append(feat)
return feats
[docs]
def write_drn(fn: Union[str, Path], gdf_drainage: gpd.GeoDataFrame, fmt="%.1f") -> None:
"""Write structure files from list of dictionaries.
Parameters
----------
fn : str, Path
Path to structure file.
drainage : gpd.GeoDataFrame
Dataframe with drainage structure parameters and geometry.
fmt : str
Format for coordinate values.
"""
# expected columns for drainage structures
col_names = [
"xsnk",
"ysnk",
"xsrc",
"ysrc",
"type",
"par1",
"par2",
"par3",
"par4",
"par5",
]
gdf = copy.deepcopy(gdf_drainage)
# get geometry linestring and convert to xsnk, ysnk, xsrc, ysrc
endpoints = gdf.boundary.explode(index_parts=True).unstack()
gdf["xsnk"] = endpoints[0].x
gdf["ysnk"] = endpoints[0].y
gdf["xsrc"] = endpoints[1].x
gdf["ysrc"] = endpoints[1].y
gdf.drop(["geometry"], axis=1, inplace=True)
# reorder columns based on col_names
gdf = gdf[col_names]
# change the format of the coordinates according to fmt
for col in ["xsnk", "ysnk", "xsrc", "ysrc"]:
gdf[col] = gdf[col].apply(lambda x: fmt % x)
# write to file
gdf.to_csv(fn, sep=" ", index=False, header=False)
[docs]
def read_drn(fn: Union[str, Path], crs: int = None) -> gpd.GeoDataFrame:
"""Read drainage structure files to geodataframe.
Parameters
----------
fn : str, Path
Path to drainge structure file.
crs : int
EPSG code for coordinate reference system.
Returns
-------
gpd.GeoDataFrame
Dataframe with drainage structure parameters and geometry.
"""
# expected columns for drainage structures
col_names = [
"xsnk",
"ysnk",
"xsrc",
"ysrc",
"type",
"par1",
"par2",
"par3",
"par4",
"par5",
]
# read structure file
df = pd.read_csv(fn, sep="\\s+", names=col_names)
# get geometry linestring
geom = [
LineString([(xsnk, ysnk), (xsrc, ysrc)])
for xsnk, ysnk, xsrc, ysrc in zip(
df["xsnk"], df["ysnk"], df["xsrc"], df["ysrc"]
)
]
df.drop(["xsnk", "ysnk", "xsrc", "ysrc"], axis=1, inplace=True)
# convert to geodataframe
gdf = gpd.GeoDataFrame(df, geometry=geom)
if crs is not None:
gdf.set_crs(crs, inplace=True)
return gdf
## OUTPUT: sfincs_map.nc, sfincs_his.nc ##
[docs]
def read_sfincs_map_results(
fn_map: Union[str, Path],
ds_like: xr.Dataset,
chunksize: int = 100,
drop: List[str] = ["crs", "sfincsgrid"],
logger=logger,
**kwargs,
) -> Tuple[xr.Dataset]:
"""Read sfincs_map.nc staggered grid netcdf files and parse to two
hydromt.RasterDataset objects: one with face and one with edge variables.
Parameters
----------
fn_map : str, Path
Path to sfincs_map.nc file
ds_like: xr.Dataset
Dataset with grid information to use for parsing.
chunksize: int, optional
chunk size along time dimension, by default 100
drop : List[str], optional
Variables to drop from reading, by default ["crs", "sfincsgrid"]
Returns
-------
ds_face, ds_edge: hydromt.RasterDataset
Parsed SFINCS output map file
"""
rm = {
"x": "xc",
"y": "yc",
"corner_x": "corner_xc",
"corner_y": "corner_yc",
"n": "y",
"m": "x",
"corner_n": "corner_y",
"corner_m": "corner_x",
}
ds_map = xr.open_dataset(fn_map, chunks={"time": chunksize}, **kwargs)
ds_map = ds_map.rename(
{k: v for k, v in rm.items() if (k in ds_map or k in ds_map.dims)}
)
ds_map = ds_map.set_coords(
[var for var in ds_map.data_vars.keys() if (var in rm.values())]
)
# support for older sfincs_map.nc files
# check if x,y dimensions are in the order y,x
ds_map = ds_map.transpose(..., "y", "x", "corner_y", "corner_x")
# split face and edge variables
scoords = ds_like.raster.coords
tcoords = {tdim: ds_map[tdim] for tdim in ds_map.dims if tdim.startswith("time")}
ds_face = xr.Dataset(coords={**scoords, **tcoords})
ds_edge = xr.Dataset()
for var in ds_map.data_vars:
if var in drop:
continue
if "x" in ds_map[var].dims and "y" in ds_map[var].dims:
# drop to overwrite with ds_like.raster.coords
ds_face[var] = ds_map[var].drop(["xc", "yc"])
elif ds_map[var].ndim == 0:
ds_face[var] = ds_map[var]
else:
ds_edge[var] = ds_map[var]
# add crs
if ds_like.raster.crs is not None:
ds_face.raster.set_crs(ds_like.raster.crs)
ds_edge.raster.set_crs(ds_like.raster.crs)
return ds_face, ds_edge
[docs]
def read_sfincs_his_results(
fn_his: Union[str, Path],
crs: Union[int, CRS] = None,
chunksize: int = 100,
**kwargs,
) -> xr.Dataset:
"""Read sfincs_his.nc point timeseries netcdf file and parse to hydromt.GeoDataset object.
Parameters
----------
fn_his : str, Path
Path to sfincs_his.nc file
crs: int, CRS
Coordinate reference system
chunksize: int, optional
chunk size along time dimension, by default 100
Returns
-------
ds_his: xr.Dataset
Parsed SFINCS output his file.
"""
ds_his = xr.open_dataset(fn_his, chunks={"time": chunksize}, **kwargs)
crs = ds_his["crs"].item() if ds_his["crs"].item() > 0 else crs
dvars = list(ds_his.data_vars.keys())
# set coordinates & spatial dims
cvars = ["id", "name", "x", "y"]
ds_his = ds_his.set_coords([v for v in dvars if v.split("_")[-1] in cvars])
ds_his.vector.set_spatial_dims(
x_name="station_x", y_name="station_y", index_dim="stations"
)
# set crs
ds_his.vector.set_crs(crs)
return ds_his
[docs]
def downscale_floodmap(
zsmax: xr.DataArray,
dep: Union[Path, str, xr.DataArray],
hmin: float = 0.05,
gdf_mask: gpd.GeoDataFrame = None,
floodmap_fn: Union[Path, str] = None,
reproj_method: str = "nearest",
nrmax: int = 2000,
logger=logger,
**kwargs,
):
"""Create a downscaled floodmap for (model) region.
Parameters
----------
zsmax : xr.DataArray
Maximum water level (m). When multiple timesteps provided, maximum over all timesteps is used.
dep : Path, str, xr.DataArray
High-resolution DEM (m) of model region:
* If a Path or str is provided, the DEM is read from disk and the floodmap
is written to disk (recommened for datasets that do not fit in memory.)
* If a xr.DataArray is provided, the floodmap is returned as xr.DataArray
and only written to disk when floodmap_fn is provided.
hmin : float, optional
Minimum water depth (m) to be considered as "flooded", by default 0.05
gdf_mask : gpd.GeoDataFrame, optional
Geodataframe with polygons to mask floodmap, example containing the landarea, by default None
Note that the area outside the polygons is set to nodata.
floodmap_fn : Union[Path, str], optional
Name (path) of output floodmap, by default None. If provided, the floodmap is written to disk.
reproj_method : str, optional
Reprojection method for downscaling the water levels, by default "nearest".
Other option is "bilinear".
nrmax : int, optional
Maximum number of cells per block, by default 2000. These blocks are used to prevent memory issues.
kwargs : dict, optional
Additional keyword arguments passed to `RasterDataArray.to_raster`.
Returns
-------
hmax: xr.Dataset
Downscaled and masked floodmap.
See Also
--------
hydromt.raster.RasterDataArray.to_raster
"""
# get maximum water level
timedim = set(zsmax.dims) - set(zsmax.raster.dims)
if timedim:
zsmax = zsmax.max(timedim)
# Hydromt expects a string so if a Path is provided, convert to str
if isinstance(floodmap_fn, Path):
floodmap_fn = str(floodmap_fn)
if isinstance(dep, xr.DataArray):
hmax = _downscale_floodmap_da(
zsmax=zsmax,
dep=dep,
hmin=hmin,
gdf_mask=gdf_mask,
reproj_method=reproj_method,
)
# write floodmap
if floodmap_fn is not None:
if not kwargs: # write COG by default
kwargs = dict(
driver="GTiff",
tiled=True,
blockxsize=256,
blockysize=256,
compress="deflate",
predictor=2,
profile="COG",
)
hmax.raster.to_raster(floodmap_fn, **kwargs)
# add overviews
build_overviews(fn=floodmap_fn, resample_method="nearest", logger=logger)
hmax.name = "hmax"
hmax.attrs.update({"long_name": "Maximum flood depth", "units": "m"})
return hmax
elif isinstance(dep, (str, Path)):
if floodmap_fn is not None:
raise ValueError(
"floodmap_fn should be provided when dep is a Path or str."
)
with rasterio.open(dep) as src:
# Define block size
n1, m1 = src.shape
nrcb = nrmax # nr of cells in a block
nrbn = int(np.ceil(n1 / nrcb)) # nr of blocks in n direction
nrbm = int(np.ceil(m1 / nrcb)) # nr of blocks in m direction
# avoid blocks with width or height of 1
merge_last_col = False
merge_last_row = False
if m1 % nrcb == 1:
nrbm -= 1
merge_last_col = True
if n1 % nrcb == 1:
nrbn -= 1
merge_last_row = True
profile = dict(
driver="GTiff",
width=src.width,
height=src.height,
count=1,
dtype=np.float32,
crs=src.crs,
transform=src.transform,
tiled=True,
blockxsize=256,
blockysize=256,
compress="deflate",
predictor=2,
profile="COG",
nodata=np.nan,
BIGTIFF="YES", # Add the BIGTIFF option here
)
with rasterio.open(floodmap_fn, "w", **profile):
pass
## Loop through blocks
for ii in range(nrbm):
bm0 = ii * nrcb # Index of first m in block
bm1 = min(bm0 + nrcb, m1) # last m in block
if merge_last_col and ii == (nrbm - 1):
bm1 += 1
for jj in range(nrbn):
bn0 = jj * nrcb # Index of first n in block
bn1 = min(bn0 + nrcb, n1) # last n in block
if merge_last_row and jj == (nrbn - 1):
bn1 += 1
# Define a window to read a block of data
window = Window(bm0, bn0, bm1 - bm0, bn1 - bn0)
# Read the block of data
block_data = src.read(window=window)
# check for nan-data
if np.all(np.isnan(block_data)):
continue
# TODO directly use the rasterio warp method rather than the raster.reproject see PR #145
# Convert row and column indices to pixel coordinates
cols, rows = np.indices((bm1 - bm0, bn1 - bn0))
x_coords, y_coords = src.transform * (cols + bm0, rows + bn0)
# Create xarray DataArray with coordinates
block_dep = xr.DataArray(
block_data.squeeze().transpose(),
dims=("y", "x"),
coords={
"yc": (("y", "x"), y_coords),
"xc": (("y", "x"), x_coords),
},
)
block_dep.raster.set_crs(src.crs)
block_hmax = _downscale_floodmap_da(
zsmax=zsmax,
dep=block_dep,
hmin=hmin,
gdf_mask=gdf_mask,
reproj_method=reproj_method,
)
with rasterio.open(floodmap_fn, "r+") as fm_tif:
fm_tif.write(
np.transpose(block_hmax.values),
window=window,
indexes=1,
)
# add overviews
build_overviews(fn=floodmap_fn, resample_method="nearest", logger=logger)
[docs]
def rotated_grid(
pol: Polygon, res: float, dec_origin=0, dec_rotation=3
) -> Tuple[float, float, int, int, float]:
"""Returns the origin (x0, y0), shape (mmax, nmax) and rotation
of the rotated grid fitted to the minimum rotated rectangle around the
area of interest (pol). The grid shape is defined by the resolution (res).
Parameters
----------
pol : Polygon
Polygon of the area of interest
res : float
Resolution of the grid
"""
def _azimuth(point1, point2):
"""azimuth between 2 points (interval 0 - 180)"""
angle = np.arctan2(point2[1] - point1[1], point2[0] - point1[0])
return round(np.degrees(angle), dec_rotation)
def _dist(a, b):
"""distance between points"""
return np.hypot(b[0] - a[0], b[1] - a[1])
mrr = pol.minimum_rotated_rectangle
coords = np.asarray(mrr.exterior.coords)[:-1, :] # get coordinates of all corners
# get origin based on the corner with the smallest distance to origin
# after translation to account for possible negative coordinates
ib = np.argmin(
np.hypot(coords[:, 0] - coords[:, 0].min(), coords[:, 1] - coords[:, 1].min())
)
ir = (ib + 1) % 4
il = (ib + 3) % 4
x0, y0 = coords[ib, :]
x0, y0 = round(x0, dec_origin), round(y0, dec_origin)
az1 = _azimuth((x0, y0), coords[ir, :])
az2 = _azimuth((x0, y0), coords[il, :])
axis1 = _dist((x0, y0), coords[ir, :])
axis2 = _dist((x0, y0), coords[il, :])
if az2 < az1:
rot = az2
mmax = int(np.ceil(axis2 / res))
nmax = int(np.ceil(axis1 / res))
else:
rot = az1
mmax = int(np.ceil(axis1 / res))
nmax = int(np.ceil(axis2 / res))
return x0, y0, mmax, nmax, rot
def build_overviews(
fn: Union[str, Path],
resample_method: str = "average",
overviews: Union[list, str] = "auto",
logger=logger,
):
"""Build overviews for GeoTIFF file.
Overviews are reduced resolution versions of your dataset that can speed up
rendering when you don’t need full resolution. By precomputing the upsampled
pixels, rendering can be significantly faster when zoomed out.
Parameters
----------
fn : str, Path
Path to GeoTIFF file.
method: str
Resampling method, by default "average". Other option is "nearest".
overviews: list of int, optional
List of overview levels, by default "auto". When set to "auto" the
overview levels are determined based on the size of the dataset.
"""
# Endswith is not a method of Path so convert to str
if isinstance(fn, Path):
fn = str(fn)
# check if fn is a geotiff file
extensions = [".tif", ".tiff"]
assert any(
fn.endswith(ext) for ext in extensions
), f"File {fn} is not a GeoTIFF file."
# open rasterio dataset
with rasterio.open(fn, "r+") as src:
# determine overviews when not provided
if overviews == "auto":
bs = src.profile.get("blockxsize", 256)
max_level = get_maximum_overview_level(src.width, src.height, bs)
overviews = [2**j for j in range(1, max_level + 1)]
if not isinstance(overviews, list):
raise ValueError("overviews should be a list of integers or 'auto'.")
resampling = getattr(Resampling, resample_method, None)
if resampling is None:
raise ValueError(f"Resampling method unknown: {resample_method}")
no = len(overviews)
logger.info(f"Building {no} overviews with {resample_method}")
# create new overviews, resampling with average method
src.build_overviews(overviews, resampling)
# update dataset tags
src.update_tags(ns="rio_overview", resampling=resample_method)
def _downscale_floodmap_da(
zsmax: xr.DataArray,
dep: xr.DataArray,
hmin: float = 0.05,
gdf_mask: gpd.GeoDataFrame = None,
reproj_method: str = "nearest",
) -> xr.DataArray:
"""Create a downscaled floodmap for (model) region.
Parameters
----------
zsmax : xr.DataArray
Maximum water level (m). When multiple timesteps provided, maximum over all timesteps is used.
dep : Path, str, xr.DataArray
High-resolution DEM (m) of model region:
hmin : float, optional
Minimum water depth (m) to be considered as "flooded", by default 0.05
gdf_mask : gpd.GeoDataFrame, optional
Geodataframe with polygons to mask floodmap, example containing the landarea, by default None
Note that the area outside the polygons is set to nodata.
"""
# interpolate zsmax to dep grid
zsmax = zsmax.raster.reproject_like(dep, method=reproj_method)
zsmax = zsmax.raster.mask_nodata() # make sure nodata is nan
# get flood depth
hmax = (zsmax - dep).astype("float32")
hmax.raster.set_nodata(np.nan)
# mask floodmap
hmax = hmax.where(hmax > hmin)
if gdf_mask is not None:
mask = hmax.raster.geometry_mask(gdf_mask, all_touched=True)
hmax = hmax.where(mask)
return hmax
def find_uv_indices(mask: xr.DataArray):
"""The subgrid tables for a regular SFINCS grid are organized as flattened arrays, meaning
2D arrays (y,x) are transformed into 1D arrays, only containing values for active cells.
For the cell centers, this is straightforward, we just find the indices of the active cells.
However, the u and v points are saved in combined arrays. Since u and v points are absent
at the boundaries of the domain, the index arrays are used to determine the location of the
u and v points in the combined flattened arrays.
Parameters
----------
mask: xr.DataArray
Mask with integer values specifying the active cells of the SFINCS domain.
Returns
-------
index_nm: np.ndarray
Index array for the active cell centers.
index_mu1: np.ndarray
Index of upstream u-point in combined uv-array.
index_nu1: np.ndarray
Index of upstream v-point in combined uv-array.
"""
mask = mask.values
# nr of cells
nr_cells = mask.shape[0] * mask.shape[1]
# get the index of the u and v points in a combined array
mu1 = np.zeros(nr_cells, dtype=int) - 1
nu1 = np.zeros(nr_cells, dtype=int) - 1
ms = np.linspace(0, mask.shape[1] - 1, mask.shape[1], dtype=int)
ns = np.linspace(0, mask.shape[0] - 1, mask.shape[0], dtype=int)
m, n = np.meshgrid(ms, ns)
m = np.transpose(m).flatten()
n = np.transpose(n).flatten()
mask = mask.transpose().flatten()
nmax = n.max() + 1
nms = m * nmax + n
for ic in range(nr_cells):
# nu1
nn = n[ic] + 1
if nn < nmax:
mm = m[ic]
nm = mm * nmax + nn
j = binary_search(nms, nm)
if j is not None:
nu1[ic] = j
# mu1
nn = n[ic]
mm = m[ic] + 1
nm = mm * nmax + nn
j = binary_search(nms, nm)
if j is not None:
mu1[ic] = j
# For regular grids, only the points with mask > 0 are stored
# The index arrays determine the location in the flattened arrays (with values for all active points)
# Initialize index arrays with -1, inactive cells will remain -1
index_nm = np.zeros(nr_cells, dtype=int) - 1
index_mu1 = np.zeros(nr_cells, dtype=int) - 1
index_nu1 = np.zeros(nr_cells, dtype=int) - 1
npuv = 0
npc = 0
# Loop through all cells
for ip in range(nr_cells):
# Check if this cell is active
if mask[ip] > 0:
index_nm[ip] = npc
npc += 1
if mu1[ip] >= 0:
if mask[mu1[ip]] > 0:
index_mu1[ip] = npuv
npuv += 1
if nu1[ip] >= 0:
if mask[nu1[ip]] > 0:
index_nu1[ip] = npuv
npuv += 1
return index_nm, index_mu1, index_nu1
def binary_search(vals, val):
indx = np.searchsorted(vals, val)
if indx < np.size(vals):
if vals[indx] == val:
return indx
return None