Source code for hydromt_sfincs.utils

"""
HydroMT-SFINCS utilities functions for reading and writing SFINCS specific input and output files,
as well as some common data conversions.
"""

import copy
import io
import logging
from configparser import ConfigParser
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Union

import geopandas as gpd
import hydromt
import numpy as np
import pandas as pd
import pyproj
import rasterio
import xarray as xr
from hydromt.io import write_xy
from pyproj.crs.crs import CRS
from shapely.geometry import LineString, Polygon


__all__ = [
    "read_binary_map",
    "write_binary_map",
    "read_binary_map_index",
    "write_binary_map_index",
    "read_ascii_map",
    "write_ascii_map",
    "read_timeseries",
    "write_timeseries",
    "get_bounds_vector",
    "mask2gdf",
    "read_xy",
    "write_xy",
    "read_xyn",
    "write_xyn",
    "read_geoms",
    "write_geoms",
    "gdf2linestring",
    "gdf2polygon",
    "linestring2gdf",
    "polygon2gdf",
    "read_sfincs_map_results",
    "read_sfincs_his_results",
    "downscale_floodmap",
    "rotated_grid",
]

logger = logging.getLogger(__name__)


## BINARY MAPS: sfincs.ind, sfincs.msk, sfincs.dep etc. ##


def write_binary_map_index(fn_ind: Union[str, Path], msk: np.ndarray) -> None:
    """Write flat index of binary map file.

    NOTE: The array should be in S->N and W->E orientation, with origin in the SW corner.

    Parameters
    ----------
    fn_ind: str, Path
        Path to output map index file.
    msk: np.ndarray
        2D array of sfincs mask map, where invalid cells have value 0.
    """
    # the index number file of sfincs starts with the length of the index numbers
    indices = np.where(msk.transpose().flatten() > 0)[0] + 1  # convert to 1-based index
    indices_ = np.array(np.hstack([np.array(len(indices)), indices]), dtype="u4")
    indices_.tofile(fn_ind)

def read_binary_map_index(fn_ind: Union[str, Path]) -> np.ndarray:
    """Read binary map index file.

    Parameters
    ----------
    fn_ind: str, Path
        Path to map index file.

    Returns
    -------
    ind: np.ndarray
        1D array of flat index of binary maps.
    """
    _ind = np.fromfile(fn_ind, dtype="u4")
    ind = _ind[1:] - 1  # convert to zero-based index
    assert _ind[0] == ind.size
    return ind

def write_binary_map(
    fn: Union[str, Path],
    data: np.ndarray,
    msk: np.ndarray,
    dtype: Union[str, np.dtype] = "f4",
) -> None:
    """Write binary map file.

    NOTE: The array should be in S->N and W->E orientation, with origin in the SW corner.

    Parameters
    ----------
    fn: str, Path
        Path to output map file.
    data: np.ndarray
        2D array of sfincs map.
    msk: np.ndarray
        2D array of sfincs mask map, where invalid cells have value 0.
    dtype: str, np.dtype, optional
        Data type, by default "f4". For sfincs.msk file use dtype="u1".
    """
    data_out = np.asarray(data.transpose()[msk.transpose() > 0], dtype=dtype)
    data_out.tofile(fn)

def read_binary_map(
    fn: Union[str, Path],
    ind: np.ndarray,
    shape: Tuple[int, int],
    mv: float = -9999.0,
    dtype: str = "f4",
) -> np.ndarray:
    """Read binary map.

    Parameters
    ----------
    fn: str, Path
        Path to map file.
    ind: np.ndarray
        1D array of flat index of binary maps.
    shape: tuple of int
        (nrow, ncol) shape of output map.
    mv: int or float
        Missing value, by default -9999.0.
    dtype: str, np.dtype, optional
        Data type, by default "f4". For sfincs.msk file use dtype="u1".

    Returns
    -------
    data: np.ndarray
        2D array of sfincs map.
    """
    assert ind.max() <= np.multiply(*shape)
    nrow, ncol = shape
    data = np.full((ncol, nrow), mv, dtype=dtype)
    data.flat[ind] = np.fromfile(fn, dtype=dtype)
    data = data.transpose()
    return data

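# Example (sketch): round-trip a small mask and depth map through the binary
# map helpers above. The 3x3 mask, the 2.5 m depth value and the file names
# "sfincs.ind" / "sfincs.dep" are illustrative only.
def _example_binary_map_roundtrip(tmpdir: Path) -> None:
    msk = np.zeros((3, 3), dtype="u1")
    msk[1:, 1:] = 1  # mark four cells as active
    dep = np.where(msk > 0, 2.5, -9999.0).astype("f4")
    write_binary_map_index(tmpdir / "sfincs.ind", msk)
    write_binary_map(tmpdir / "sfincs.dep", dep, msk, dtype="f4")
    ind = read_binary_map_index(tmpdir / "sfincs.ind")
    dep2 = read_binary_map(tmpdir / "sfincs.dep", ind, shape=msk.shape)
    # values at active cells survive the round trip
    assert np.allclose(dep[msk > 0], dep2[msk > 0])
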
## ASCII maps: sfincs.restart ##
def read_ascii_map(fn: Union[str, Path]) -> np.ndarray:
    """Read ascii map.

    Parameters
    ----------
    fn : str, Path
        Path to ascii map file.

    Returns
    -------
    data : np.ndarray
        2D array of sfincs map.
    """
    data = np.loadtxt(fn).astype(np.float32)
    return data

def write_ascii_map(fn: Union[str, Path], data: np.ndarray, fmt: str = "%8.3f") -> None:
    """Write ascii map.

    NOTE: The array should be in S->N and W->E orientation, with origin in the SW corner.

    Parameters
    ----------
    fn : str, Path
        Path to ascii map file.
    data : np.ndarray
        2D array of sfincs map.
    fmt : str, optional
        Value format, by default "%8.3f". See numpy.savetxt for more options.
    """
    with open(fn, "w") as f:
        np.savetxt(f, data, fmt=fmt)

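# Example (sketch): write and re-read a small ascii (restart-style) map;
# the array values and the file name "sfincs.restart" are illustrative.
def _example_ascii_map_roundtrip(tmpdir: Path) -> None:
    data = np.arange(12, dtype=np.float32).reshape(3, 4)
    write_ascii_map(tmpdir / "sfincs.restart", data, fmt="%8.3f")
    assert np.allclose(read_ascii_map(tmpdir / "sfincs.restart"), data)
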
## XY files: bnd / src ##
# write_xy defined in hydromt.io
def read_xy(fn: Union[str, Path], crs: Union[int, CRS] = None) -> gpd.GeoDataFrame:
    """Read sfincs xy files and parse to GeoDataFrame.

    Parameters
    ----------
    fn : str, Path
        Path to ascii xy file.
    crs: int, CRS
        Coordinate reference system

    Returns
    -------
    gdf: gpd.GeoDataFrame
        GeoDataFrame with point geometries.
    """
    gdf = hydromt.open_vector(fn, crs=crs, driver="xy")
    gdf.index = np.arange(1, gdf.index.size + 1, dtype=int)  # index starts at 1
    return gdf

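# Example (sketch): round-trip boundary points through write_xy (imported above
# from hydromt.io) and read_xy; the coordinates, the EPSG:32633 CRS and the
# file name "sfincs.bnd" are illustrative.
def _example_xy_roundtrip(tmpdir: Path) -> None:
    gdf = gpd.GeoDataFrame(
        geometry=gpd.points_from_xy([1000.0, 2000.0], [500.0, 600.0]), crs=32633
    )
    write_xy(tmpdir / "sfincs.bnd", gdf)
    gdf2 = read_xy(tmpdir / "sfincs.bnd", crs=32633)
    assert len(gdf2) == 2 and gdf2.index[0] == 1  # read_xy index starts at 1
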
def read_xyn(fn: str, crs: int = None) -> gpd.GeoDataFrame:
    """Read sfincs observation point (xyn) files and parse to GeoDataFrame.

    Parameters
    ----------
    fn : str
        Path to ascii xyn file.
    crs : int, CRS
        Coordinate reference system

    Returns
    -------
    gdf: gpd.GeoDataFrame
        GeoDataFrame with point geometries and names.
    """
    df = pd.read_csv(fn, index_col=False, header=None, sep=r"\s+").rename(
        columns={0: "x", 1: "y"}
    )
    if len(df.columns) > 2:
        df = df.rename(columns={2: "name"})
    else:
        df["name"] = df.index

    points = gpd.points_from_xy(df["x"], df["y"])
    gdf = gpd.GeoDataFrame(df.drop(columns=["x", "y"]), geometry=points, crs=crs)

    return gdf

def write_xyn(fn: str = "sfincs.obs", gdf: gpd.GeoDataFrame = None, crs: CRS = None) -> None:
    """Write GeoDataFrame of observation points to sfincs xyn file.

    Parameters
    ----------
    fn : str
        Path to output xyn file.
    gdf : gpd.GeoDataFrame
        GeoDataFrame with point geometries and (optionally) a "name" column.
    crs : CRS
        Coordinate reference system, used to determine the coordinate precision.
    """
    with open(fn, "w") as fid:
        for point in gdf.iterfeatures():
            x, y = point["geometry"]["coordinates"]
            try:
                name = point["properties"]["name"]
            except KeyError:
                name = "obs" + str(point["id"])
            if crs.is_geographic:
                string = f'{x:12.6f}{y:12.6f} "{name}"\n'
            else:
                string = f'{x:12.1f}{y:12.1f} "{name}"\n'
            fid.write(string)

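# Example (sketch): write named observation points and read them back; the
# coordinates and names are illustrative. Note that write_xyn expects a pyproj
# CRS object for its `crs` argument (it checks crs.is_geographic).
def _example_xyn_roundtrip(tmpdir: Path) -> None:
    gdf = gpd.GeoDataFrame(
        {"name": ["obs01", "obs02"]},
        geometry=gpd.points_from_xy([4.20, 4.30], [52.00, 52.10]),
        crs=4326,
    )
    write_xyn(tmpdir / "sfincs.obs", gdf, crs=gdf.crs)
    gdf2 = read_xyn(tmpdir / "sfincs.obs", crs=4326)
    assert list(gdf2["name"]) == ["obs01", "obs02"]
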
## ASCII TIMESERIES: bzs / dis / precip ##
def parse_datetime(dt: Union[str, datetime], format="%Y%m%d %H%M%S") -> datetime:
    """Checks and/or parses datetime from a string, default sfincs datetime string format."""
    if isinstance(dt, str):
        dt = datetime.strptime(dt, format)
    elif not isinstance(dt, datetime):
        raise ValueError(f"Unknown type for datetime: {type(dt)}")
    return dt

def read_timeseries(fn: Union[str, Path], tref: Union[str, datetime]) -> pd.DataFrame:
    """Read ascii timeseries files such as sfincs.bzs, sfincs.dis and sfincs.precip.
    The first column (time index) is parsed to datetime format assuming it represents
    seconds from `tref`.

    Parameters
    ----------
    fn: str, Path
        Path to timeseries file.
    tref: datetime.datetime, str
        Datetime of tref, string in "%Y%m%d %H%M%S" format.

    Returns
    -------
    df: pd.DataFrame
        Dataframe of timeseries with parsed time index.
    """
    tref = parse_datetime(tref)
    df = pd.read_csv(fn, sep=r"\s+", index_col=0, header=None)
    df.index = pd.to_datetime(df.index.values, unit="s", origin=tref)
    df.columns = df.columns.values.astype(int)
    df.index.name = "time"
    df.columns.name = "index"
    return df

def write_timeseries(
    fn: Union[str, Path],
    df: Union[pd.DataFrame, pd.Series],
    tref: Union[str, datetime],
    fmt: str = "%7.2f",
) -> None:
    """Write pandas.DataFrame to fixed width ascii timeseries files
    such as sfincs.bzs, sfincs.dis and sfincs.precip.
    The output time index is given in seconds from tref.

    Parameters
    ----------
    fn: str, Path
        Path to output timeseries file.
    df: pd.DataFrame
        Dataframe of timeseries.
    tref: datetime.datetime, str
        Datetime of tref, string in "%Y%m%d %H%M%S" format.
    fmt: str, optional
        Output value format, by default "%7.2f".
    """
    if isinstance(df, pd.Series):
        df = df.to_frame()
    elif not isinstance(df, pd.DataFrame):
        raise ValueError(f"Unknown type for df: {type(df)}")
    tref = parse_datetime(tref)
    if df.index.size == 0:
        raise ValueError("df does not contain data.")
    data = df.reset_index().values
    data[:, 0] = (df.index - tref).total_seconds()
    # calculate required width for time column; hard coded single decimal precision
    # format for other columns is based on the `fmt` argument
    w = int(np.floor(np.log10(data[-1, 0]))) + 3
    fmt_lst = [f"%{w}.1f"] + [fmt for _ in range(df.columns.size)]
    fmt_out = " ".join(fmt_lst)
    with open(fn, "w") as f:
        np.savetxt(f, data, fmt=fmt_out)

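# Example (sketch): write a single-station discharge timeseries relative to
# tref and parse it back; the times and values are illustrative.
def _example_timeseries_roundtrip(tmpdir: Path) -> None:
    tref = "20230101 000000"
    time = pd.to_datetime("2023-01-01") + pd.to_timedelta(np.arange(4), unit="h")
    df = pd.DataFrame(index=time, data={1: [1.0, 2.0, 3.0, 2.0]})
    write_timeseries(tmpdir / "sfincs.dis", df, tref)
    df2 = read_timeseries(tmpdir / "sfincs.dis", tref)
    assert (df2.index == time).all()
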
## MASK
def get_bounds_vector(da_msk: xr.DataArray) -> gpd.GeoDataFrame:
    """Get bounds of vectorized mask as GeoDataFrame.

    Parameters
    ----------
    da_msk: xr.DataArray
        Mask as DataArray with values 0 (inactive), 1 (active),
        and boundary cells 2 (waterlevels) and 3 (outflow).

    Returns
    -------
    gdf_msk: gpd.GeoDataFrame
        GeoDataFrame with line geometries of mask boundaries.
    """
    gdf_msk = da_msk.raster.vectorize()
    gdf_msk["geometry"] = gdf_msk.buffer(1)  # small buffer for rounding errors
    region = (da_msk >= 1).astype("int16").raster.vectorize()
    region = region[region["value"] == 1].drop(columns="value")
    region["geometry"] = region.boundary
    gdf_msk = gdf_msk[gdf_msk["value"] != 1]
    gdf_msk = gpd.overlay(
        region, gdf_msk, "intersection", keep_geom_type=False
    ).explode()
    gdf_msk = gdf_msk[gdf_msk.length > 0]
    return gdf_msk

def mask2gdf(
    da_mask: xr.DataArray,
    option: str = "all",
) -> gpd.GeoDataFrame:
    """Convert a mask to a GeoDataFrame of points.

    Parameters
    ----------
    da_mask: xr.DataArray
        Mask with integer values.
    option: {"all", "active", "wlev", "outflow"}
        Cells to select, by default "all" (all valid cells).

    Returns
    -------
    gdf: geopandas.GeoDataFrame
        GeoDataFrame of points, or None if no cells are selected.
    """
    if option == "all":
        da_mask = da_mask != da_mask.raster.nodata
    elif option == "active":
        da_mask = da_mask == 1
    elif option == "wlev":
        da_mask = da_mask == 2
    elif option == "outflow":
        da_mask = da_mask == 3

    indices = np.stack(np.where(da_mask), axis=-1)

    if "x" in da_mask.coords:
        x = da_mask.coords["x"].values[indices[:, 1]]
        y = da_mask.coords["y"].values[indices[:, 0]]
    else:
        x = da_mask.coords["xc"].values[indices[:, 0], indices[:, 1]]
        y = da_mask.coords["yc"].values[indices[:, 0], indices[:, 1]]

    points = gpd.GeoDataFrame(geometry=gpd.points_from_xy(x, y), crs=da_mask.raster.crs)
    return points if len(points) > 0 else None

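# Example (sketch): select the water-level boundary cells (msk == 2) of a model
# mask as points; `da_msk` is assumed to be a mask DataArray with CRS metadata,
# e.g. the "msk" variable of a SFINCS model grid.
def _example_wlev_points(da_msk: xr.DataArray) -> gpd.GeoDataFrame:
    return mask2gdf(da_msk, option="wlev")
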
## STRUCTURES: thd / weir ##
def gdf2linestring(gdf: gpd.GeoDataFrame) -> List[Dict]:
    """Convert GeoDataFrame[LineString] to list of structure dictionaries.

    The x,y coordinates are taken from the geometry. For weir structures two
    additional parameters are required: "z" (elevation) and "par1" (Cd
    coefficient in the weir formula). These should be supplied as columns (or
    the z-coordinate) of the GeoDataFrame and contain either a float or a 1D
    array of floats with the same length as the LineString.

    Parameters
    ----------
    gdf: geopandas.GeoDataFrame with LineString geometries
        GeoDataFrame structures.

    Returns
    -------
    feats: list of dict
        List of dictionaries describing structures.
    """
    feats = []
    for _, item in gdf.iterrows():
        feat = item.drop("geometry").dropna().to_dict()
        # check geometry
        line = item.geometry
        if line.geom_type == "MultiLineString" and len(line.geoms) == 1:
            line = line.geoms[0]
        if line.geom_type != "LineString":
            raise ValueError("Invalid geometry type, only LineString is accepted.")
        xyz = tuple(zip(*line.coords[:]))
        feat["x"], feat["y"] = list(xyz[0]), list(xyz[1])
        if len(xyz) == 3:
            feat["z"] = list(xyz[2])
        feats.append(feat)
    return feats

def gdf2polygon(gdf: gpd.GeoDataFrame) -> List[Dict]:
    """Convert GeoDataFrame[Polygon] to list of structure dictionaries.

    The x,y coordinates are taken from the geometry.

    Parameters
    ----------
    gdf: geopandas.GeoDataFrame with Polygon geometries
        GeoDataFrame structures.

    Returns
    -------
    feats: list of dict
        List of dictionaries describing structures.
    """
    feats = []
    for _, item in gdf.iterrows():
        feat = item.drop("geometry").dropna().to_dict()
        # check geometry
        poly = item.geometry
        if poly.geom_type == "MultiPolygon" and len(poly.geoms) == 1:
            poly = poly.geoms[0]
        if poly.geom_type != "Polygon":
            raise ValueError("Invalid geometry type, only Polygon is accepted.")
        x, y = poly.exterior.coords.xy
        feat["x"], feat["y"] = list(x), list(y)
        feats.append(feat)
    return feats

def linestring2gdf(feats: List[Dict], crs: Union[int, CRS] = None) -> gpd.GeoDataFrame:
    """Convert list of structure dictionaries to GeoDataFrame[LineString].

    Parameters
    ----------
    feats: list of dict
        List of dictionaries describing structures.
    crs: int, CRS
        Coordinate reference system

    Returns
    -------
    gdf: geopandas.GeoDataFrame
        GeoDataFrame structures
    """
    records = []
    for f in feats:
        feat = copy.deepcopy(f)
        xyz = [feat.pop("x"), feat.pop("y")]
        if "z" in feat and np.atleast_1d(feat["z"]).size == len(xyz[0]):
            xyz.append(feat.pop("z"))
        feat.update({"geometry": LineString(list(zip(*xyz)))})
        records.append(feat)
    gdf = gpd.GeoDataFrame.from_records(records)
    if crs is not None:
        gdf.set_crs(crs, inplace=True)
    return gdf

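# Example (sketch): convert a single illustrative weir with a constant crest
# level to a GeoDataFrame and back to a structure dictionary; since "z" is a
# scalar it is kept as an attribute column rather than a z-coordinate.
def _example_structure_conversion() -> None:
    feats = [
        {"name": "WEIR01", "x": [0.0, 10.0], "y": [0.0, 0.0], "z": 5.0, "par1": 0.6}
    ]
    gdf = linestring2gdf(feats, crs=32633)
    feats2 = gdf2linestring(gdf)
    assert feats2[0]["x"] == [0.0, 10.0] and feats2[0]["z"] == 5.0
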
def polygon2gdf(
    feats: List[Dict],
    crs: Union[int, CRS] = None,
    zmin: float = None,
    zmax: float = None,
) -> gpd.GeoDataFrame:
    """Convert list of structure dictionaries to GeoDataFrame[Polygon].

    Parameters
    ----------
    feats: list of dict
        List of dictionaries describing polygons.
    crs: int, CRS
        Coordinate reference system
    zmin, zmax: float, optional
        Minimum and maximum elevation, added as "zmin" and "zmax" columns.

    Returns
    -------
    gdf: geopandas.GeoDataFrame
        GeoDataFrame structures
    """
    records = []
    for f in feats:
        feat = copy.deepcopy(f)
        xy = [feat.pop("x"), feat.pop("y")]
        feat.update({"geometry": Polygon(list(zip(*xy)))})
        records.append(feat)
    gdf = gpd.GeoDataFrame.from_records(records)
    gdf["zmin"] = zmin
    gdf["zmax"] = zmax
    if crs is not None:
        gdf.set_crs(crs, inplace=True)
    return gdf

def write_geoms(
    fn: Union[str, Path], feats: List[Dict], stype: str = "thd", fmt="%.1f"
) -> None:
    """Write list of structure dictionaries to file.

    Parameters
    ----------
    fn: str, Path
        Path to output structure file.
    feats: list of dict
        List of dictionaries describing structures.
        For pli, pol and thd files "x" and "y" are required, "name" is optional.
        For weir files "x", "y" and "z" are required, "name" and "par1" are optional.
    stype: {'pli', 'pol', 'thd', 'weir'}
        Geom type: polylines (pli), polygons (pol), thin dams (thd) or weirs (weir).
    fmt: str
        Format for "z" and "par1" fields.

    Examples
    --------
    >>> feats = [
            {
                "name": 'WEIR01',
                "x": [0, 10, 20],
                "y": [100, 100, 100],
                "z": 5.0,
                "par1": 0.6,
            },
            {
                "name": 'WEIR02',
                "x": [100, 110, 120],
                "y": [100, 100, 100],
                "z": [5.0, 5.1, 5.0],
                "par1": 0.6,
            },
        ]
    >>> write_geoms('sfincs.weir', feats, stype='weir')
    """
    cols = {"pli": 2, "pol": 2, "thd": 2, "weir": 4}[stype.lower()]
    fmt = ["%.0f", "%.0f"] + [fmt for _ in range(cols - 2)]
    if stype.lower() == "weir" and np.any(["z" not in f for f in feats]):
        raise ValueError('"z" value missing for weir files.')
    with open(fn, "w") as f:
        for i, feat in enumerate(feats):
            name = feat.get("name", i + 1)
            if isinstance(name, int):
                name = f"{stype:s}{name:02d}"
            rows = len(feat["x"])
            a = np.zeros((rows, cols), dtype=np.float32)
            a[:, 0] = np.asarray(feat["x"]).round(0)
            a[:, 1] = np.asarray(feat["y"]).round(0)
            if stype.lower() == "weir":
                a[:, 2] = feat["z"]
                a[:, 3] = feat.get("par1", 0.6)
            s = io.BytesIO()
            np.savetxt(s, a, fmt=fmt)
            f.write(f"{name}\n")
            f.write(f"{rows:d} {cols:d}\n")
            f.write(s.getvalue().decode())

def read_geoms(fn: Union[str, Path]) -> List[Dict]:
    """Read structure files to list of dictionaries.

    Parameters
    ----------
    fn : str, Path
        Path to structure file.

    Returns
    -------
    feats: list of dict
        List of dictionaries describing structures.
    """
    feats = []
    col_names = ["x", "y", "z", "par1"]
    with open(fn, "r") as f:
        while True:
            name = f.readline().strip()
            if not name:  # EOF
                break
            feat = {"name": name}
            rows, cols = [int(v) for v in f.readline().strip().split(maxsplit=2)]
            for c in range(cols):
                feat[col_names[c]] = [0.0 for _ in range(rows)]
            for r in range(rows):
                for c, v in enumerate(f.readline().strip().split(maxsplit=cols)):
                    feat[col_names[c]][r] = float(v)
            if cols > 2:
                for c in col_names[2:]:
                    if np.unique(feat[c]).size == 1:
                        feat[c] = feat[c][0]
            feats.append(feat)
    return feats

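# Example (sketch): round-trip a thin-dam structure through write_geoms and
# read_geoms; the coordinates and file name "sfincs.thd" are illustrative.
# Note that x,y values are rounded to whole units on write.
def _example_geoms_roundtrip(tmpdir: Path) -> None:
    feats = [{"name": "THD01", "x": [0.0, 10.0, 20.0], "y": [0.0, 5.0, 10.0]}]
    write_geoms(tmpdir / "sfincs.thd", feats, stype="thd")
    feats2 = read_geoms(tmpdir / "sfincs.thd")
    assert feats2[0]["name"] == "THD01" and feats2[0]["x"] == [0.0, 10.0, 20.0]
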
## OUTPUT: sfincs_map.nc, sfincs_his.nc ##
def read_sfincs_map_results(
    fn_map: Union[str, Path],
    ds_like: xr.Dataset,
    chunksize: int = 100,
    drop: List[str] = ["crs", "sfincsgrid"],
    logger=logger,
    **kwargs,
) -> Tuple[xr.Dataset]:
    """Read sfincs_map.nc staggered grid netcdf files and parse to
    two hydromt.RasterDataset objects: one with face and one with edge variables.

    Parameters
    ----------
    fn_map : str, Path
        Path to sfincs_map.nc file
    ds_like: xr.Dataset
        Dataset with grid information to use for parsing.
    chunksize: int, optional
        chunk size along time dimension, by default 100
    drop : List[str], optional
        Variables to drop from reading, by default ["crs", "sfincsgrid"]

    Returns
    -------
    ds_face, ds_edge: hydromt.RasterDataset
        Parsed SFINCS output map file
    """
    rm = {
        "x": "xc",
        "y": "yc",
        "corner_x": "corner_xc",
        "corner_y": "corner_yc",
        "n": "y",
        "m": "x",
        "corner_n": "corner_y",
        "corner_m": "corner_x",
    }
    ds_map = xr.open_dataset(fn_map, chunks={"time": chunksize}, **kwargs)
    ds_map = ds_map.rename(
        {k: v for k, v in rm.items() if (k in ds_map or k in ds_map.dims)}
    )
    ds_map = ds_map.set_coords(
        [var for var in ds_map.data_vars.keys() if (var in rm.values())]
    )

    # split face and edge variables
    scoords = ds_like.raster.coords
    tcoords = {tdim: ds_map[tdim] for tdim in ds_map.dims if tdim.startswith("time")}
    ds_face = xr.Dataset(coords={**scoords, **tcoords})
    ds_edge = xr.Dataset()
    for var in ds_map.data_vars:
        if var in drop:
            continue
        if "x" in ds_map[var].dims and "y" in ds_map[var].dims:
            # drop to overwrite with ds_like.raster.coords
            ds_face[var] = ds_map[var].drop(["xc", "yc"])
        elif ds_map[var].ndim == 0:
            ds_face[var] = ds_map[var]
        else:
            ds_edge[var] = ds_map[var]

    # add crs
    if ds_like.raster.crs is not None:
        ds_face.raster.set_crs(ds_like.raster.crs)
        ds_edge.raster.set_crs(ds_like.raster.crs)

    return ds_face, ds_edge

def read_sfincs_his_results(
    fn_his: Union[str, Path],
    crs: Union[int, CRS] = None,
    chunksize: int = 100,
    **kwargs,
) -> xr.Dataset:
    """Read sfincs_his.nc point timeseries netcdf file and parse to hydromt.GeoDataset object.

    Parameters
    ----------
    fn_his : str, Path
        Path to sfincs_his.nc file
    crs: int, CRS
        Coordinate reference system
    chunksize: int, optional
        chunk size along time dimension, by default 100

    Returns
    -------
    ds_his: xr.Dataset
        Parsed SFINCS output his file.
    """
    ds_his = xr.open_dataset(fn_his, chunks={"time": chunksize}, **kwargs)
    crs = ds_his["crs"].item() if ds_his["crs"].item() != 0 else crs
    dvars = list(ds_his.data_vars.keys())
    # set coordinates & spatial dims
    cvars = ["id", "name", "x", "y"]
    ds_his = ds_his.set_coords([v for v in dvars if v.split("_")[-1] in cvars])
    ds_his.vector.set_spatial_dims(
        x_name="station_x", y_name="station_y", index_dim="stations"
    )
    # set crs
    ds_his.vector.set_crs(crs)
    return ds_his

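# Example (sketch): open the SFINCS output files after a model run; `ds_like`
# is assumed to be a dataset carrying the model grid (e.g. the model's static
# maps) used to georeference the face variables. File names follow the SFINCS
# defaults.
def _example_read_results(ds_like: xr.Dataset) -> xr.Dataset:
    ds_face, ds_edge = read_sfincs_map_results("sfincs_map.nc", ds_like)
    ds_his = read_sfincs_his_results("sfincs_his.nc", crs=ds_like.raster.crs)
    return ds_his
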
def downscale_floodmap(
    zsmax: xr.DataArray,
    dep: xr.DataArray,
    hmin: float = 0.05,
    gdf_mask: gpd.GeoDataFrame = None,
    floodmap_fn: Union[Path, str] = None,
    reproj_method: str = "nearest",
    **kwargs,
) -> xr.Dataset:
    """Create a downscaled floodmap for (model) region.

    Parameters
    ----------
    zsmax : xr.DataArray
        Maximum water level (m). When multiple timesteps are provided,
        the maximum over all timesteps is used.
    dep : xr.DataArray
        High-resolution DEM (m) of model region.
    hmin : float, optional
        Minimum water depth (m) to be considered as "flooded", by default 0.05
    gdf_mask : gpd.GeoDataFrame, optional
        GeoDataFrame with polygons to mask the floodmap, e.g. containing the
        land area, by default None. Note that the area outside the polygons
        is set to nodata.
    floodmap_fn : Union[Path, str], optional
        Name (path) of output floodmap, by default None. If provided, the
        floodmap is written to disk.
    reproj_method : str, optional
        Reprojection method for downscaling the water levels, by default
        "nearest". The other option is "bilinear".
    kwargs : dict, optional
        Additional keyword arguments passed to `RasterDataArray.to_raster`.

    Returns
    -------
    hmax: xr.Dataset
        Downscaled and masked floodmap.

    See Also
    --------
    hydromt.raster.RasterDataArray.to_raster
    """
    # get maximum water level
    timedim = set(zsmax.dims) - set(zsmax.raster.dims)
    if timedim:
        zsmax = zsmax.max(timedim)

    # interpolate zsmax to dep grid
    zsmax = zsmax.raster.reproject_like(dep, method=reproj_method)
    zsmax = zsmax.raster.mask_nodata()  # make sure nodata is nan

    # get flood depth
    hmax = (zsmax - dep).astype("float32")
    hmax.raster.set_nodata(np.nan)

    # mask floodmap
    hmax = hmax.where(hmax > hmin)
    if gdf_mask is not None:
        mask = hmax.raster.geometry_mask(gdf_mask, all_touched=True)
        hmax = hmax.where(mask)

    # write floodmap
    if floodmap_fn is not None:
        if not kwargs:  # write COG by default
            kwargs = dict(
                driver="GTiff",
                tiled=True,
                blockxsize=256,
                blockysize=256,
                compress="deflate",
                predictor=2,
                profile="COG",
            )
        hmax.raster.to_raster(floodmap_fn, **kwargs)

    return hmax

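# Example (sketch): downscale the maximum water levels to a high-resolution DEM
# and write a COG; `zsmax` would typically be the "zsmax" variable returned by
# read_sfincs_map_results and `dep` a DEM DataArray with CRS and nodata set.
# The 0.1 m threshold and output file name are illustrative.
def _example_downscale(zsmax: xr.DataArray, dep: xr.DataArray) -> xr.DataArray:
    return downscale_floodmap(zsmax, dep, hmin=0.1, floodmap_fn="floodmap.tif")
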
def rotated_grid(
    pol: Polygon, res: float, dec_origin=0, dec_rotation=3
) -> Tuple[float, float, int, int, float]:
    """Returns the origin (x0, y0), shape (mmax, nmax) and rotation of the
    rotated grid fitted to the minimum rotated rectangle around the area
    of interest (pol). The grid shape is defined by the resolution (res).

    Parameters
    ----------
    pol : Polygon
        Polygon of the area of interest
    res : float
        Resolution of the grid
    dec_origin : int, optional
        Number of decimals to round the origin to, by default 0
    dec_rotation : int, optional
        Number of decimals to round the rotation to, by default 3

    Returns
    -------
    x0, y0 : float
        Origin of the grid.
    mmax, nmax : int
        Number of grid cells along both axes of the rotated rectangle.
    rot : float
        Rotation of the grid in degrees.
    """

    def _azimuth(point1, point2):
        """azimuth between 2 points (interval 0 - 180)"""
        angle = np.arctan2(point2[1] - point1[1], point2[0] - point1[0])
        return round(np.degrees(angle), dec_rotation)

    def _dist(a, b):
        """distance between points"""
        return np.hypot(b[0] - a[0], b[1] - a[1])

    mrr = pol.minimum_rotated_rectangle
    coords = np.asarray(mrr.exterior.coords)[:-1, :]  # get coordinates of all corners
    # get origin based on the corner with the smallest distance to origin
    # after translation to account for possible negative coordinates
    ib = np.argmin(
        np.hypot(coords[:, 0] - coords[:, 0].min(), coords[:, 1] - coords[:, 1].min())
    )
    ir = (ib + 1) % 4
    il = (ib + 3) % 4
    x0, y0 = coords[ib, :]
    x0, y0 = round(x0, dec_origin), round(y0, dec_origin)
    az1 = _azimuth((x0, y0), coords[ir, :])
    az2 = _azimuth((x0, y0), coords[il, :])
    axis1 = _dist((x0, y0), coords[ir, :])
    axis2 = _dist((x0, y0), coords[il, :])
    if az2 < az1:
        rot = az2
        mmax = int(np.ceil(axis2 / res))
        nmax = int(np.ceil(axis1 / res))
    else:
        rot = az1
        mmax = int(np.ceil(axis1 / res))
        nmax = int(np.ceil(axis2 / res))
    return x0, y0, mmax, nmax, rot

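# Example (sketch): fit a rotated grid with 50 m resolution to an illustrative
# quadrilateral area of interest.
def _example_rotated_grid() -> None:
    pol = Polygon([(0, 0), (1000, 500), (800, 900), (-200, 400)])
    x0, y0, mmax, nmax, rot = rotated_grid(pol, res=50.0)
    print(f"origin=({x0}, {y0}), shape=({mmax}, {nmax}), rotation={rot} deg")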