"""hydromt_sfincs.components.forcing.water_level: water level boundary forcing for SFINCS."""

import logging
from os.path import join
from pathlib import Path
from typing import TYPE_CHECKING, List, Union

from cht_tide import predict
import geopandas as gpd
import numpy as np
import pandas as pd
from pyproj import Transformer
import shapely
import xarray as xr

from hydromt import hydromt_step
from hydromt.gis.vector import GeoDataset
from hydromt_sfincs import utils
from .deltares_ini import IniStruct
from .boundary_conditions import SfincsBoundaryBase

if TYPE_CHECKING:
    from hydromt_sfincs import SfincsModel

logger = logging.getLogger(f"hydromt.{__name__}")


class SfincsWaterLevel(SfincsBoundaryBase):
    """Water level boundary component for SFINCS models.

    This component handles reading and writing of water level boundary
    conditions in SFINCS format, including both ASCII and netCDF files.
    Furthermore, it allows generating time series from astronomical
    constituents (if available).
    """

    _default_varname = "bzs"
    def __init__(self, model: "SfincsModel"):
        super().__init__(model)
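    # Illustrative usage sketch (not part of the class): the component is
    # normally created and owned by the model. Assuming the model exposes it
    # as ``model.water_level`` (attribute name assumed for illustration):
    #
    #     from hydromt_sfincs import SfincsModel
    #
    #     model = SfincsModel(root="path/to/model", mode="r+")
    #     wl = model.water_level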
    def read(self, format: str = None):
        """Read SFINCS boundary conditions (.bnd, .bzs, .bca files) or netcdf file.

        The format of the boundary conditions files can be specified;
        otherwise it is determined from the model configuration.

        Parameters
        ----------
        format : str, optional
            Format of the boundary conditions files, "asc" or "netcdf".
        """
        if format is None:
            if self.model.config.get("netbndbzsbzifile"):
                format = "netcdf"
            else:
                format = "asc"

        if format == "asc":
            gdf = self.read_boundary_points()
            # Check if there are any points
            if not gdf.empty:
                df = self.read_boundary_conditions_timeseries()
                self.set(df=df, gdf=gdf, merge=False, drop_duplicates=False)
                # Read astro if bcafile is defined
                if self.model.config.get("bcafile"):
                    self.read_boundary_conditions_astro()
        elif format == "netcdf":
            # Read netcdf file
            ds = self.read_boundary_conditions_netcdf()
            self.set(geodataset=ds, merge=False, drop_duplicates=False)
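    # Sketch of the format dispatch in read() above, with ``wl`` referring to
    # this component (see the sketch after __init__):
    #
    #     wl.read()                 # "netcdf" if netbndbzsbzifile is set, else "asc"
    #     wl.read(format="asc")     # force .bnd/.bzs (and .bca if configured)
    #     wl.read(format="netcdf")  # force the netbndbzsbzifile route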
    def read_boundary_points(self, filename: str | Path = None):
        """Read SFINCS boundary condition points (.bnd) file."""
        # Check that read mode is on
        self.root._assert_read_mode()

        # Get absolute file name and set it in config if filename is not None
        abs_file_path = self.model.config.get_set_file_variable(
            "bndfile", value=filename
        )

        # Check if abs_file_path is None
        if abs_file_path is None:
            # File name not defined
            return gpd.GeoDataFrame()

        # Check if bnd file exists
        if not abs_file_path.exists():
            raise FileNotFoundError(f"Boundary points file not found: {abs_file_path}")

        # Read bnd file
        # TODO check if we want read_xyn? Before we used read_xy, so without name column
        gdf = utils.read_xyn(abs_file_path, crs=self.model.crs)

        return gdf

    def read_boundary_conditions_timeseries(self, filename: str | Path = None):
        """Read SFINCS boundary condition timeseries (.bzs) file."""
        # Check that read mode is on
        self.root._assert_read_mode()

        # Get absolute file name and set it in config if filename is not None
        abs_file_path = self.model.config.get_set_file_variable(
            "bzsfile", value=filename
        )

        # Check if abs_file_path is None
        if abs_file_path is None:
            # File name not defined
            return

        # Check if bzs file exists
        if not abs_file_path.exists():
            raise FileNotFoundError(
                f"Boundary condition timeseries file not found: {abs_file_path}"
            )

        # Read bzs file (this creates one DataFrame with all timeseries)
        df = utils.read_timeseries(abs_file_path, tref=self.model.config.get("tref"))
        df.index.name = "time"
        df.columns.name = "index"

        return df

    def read_boundary_conditions_astro(self, filename: str | Path = None):
        """Read SFINCS boundary condition astro (.bca) file."""
        # Check that read mode is on
        self.root._assert_read_mode()

        # Get absolute file name and set it in config if filename is not None
        abs_file_path = self.model.config.get_set_file_variable(
            "bcafile", value=filename
        )

        # Check if abs_file_path is None
        if abs_file_path is None:
            # File name not defined
            return

        # Check if bca file exists
        if not abs_file_path.exists():
            raise FileNotFoundError(
                f"Boundary condition astro file not found: {abs_file_path}"
            )

        if self.nr_points == 0:
            return

        # Read bca file, which is actually some sort of toml file
        d = IniStruct(filename=abs_file_path)

        # Store all constituents in a list
        section_data = [d.section[i].data for i in range(self.nr_points)]

        # Add constituents
        self._data = add_constituents(self.data, section_data)

    def read_boundary_conditions_netcdf(self, filename: str | Path = None):
        """Read SFINCS boundary conditions netcdf file."""
        # Check that read mode is on
        self.root._assert_read_mode()

        # Get absolute file name and set it in config if filename is not None
        abs_file_path = self.model.config.get_set_file_variable(
            "netbndbzsbzifile", value=filename
        )

        # Check if abs_file_path is None
        if abs_file_path is None:
            # File name not defined
            return

        # Check if netbndbzsbzifile exists
        if not abs_file_path.exists():
            raise FileNotFoundError(
                f"Boundary condition netcdf file not found: {abs_file_path}"
            )

        # Read netcdf file
        ds = GeoDataset.from_netcdf(abs_file_path, crs=self.model.crs, chunks="auto")

        return ds
    def write(self, format: str = None):
        """Write SFINCS boundary conditions (.bnd, .bzs, .bca files) or netcdf file.

        The format of the boundary conditions files can be specified;
        otherwise it is determined from the model configuration.

        Parameters
        ----------
        format : str, optional
            Format of the boundary conditions files, "asc" (default) or "netcdf".
        """
        if self.nr_points == 0:
            # There are no boundary points
            return

        if format is None:
            if self.model.config.get("netbndbzsbzifile"):
                format = "netcdf"
            else:
                format = "asc"

        if format == "asc":
            self.write_boundary_points()
            self.write_boundary_conditions_timeseries()
            if self.model.config.get("bcafile"):
                self.write_boundary_conditions_astro()
        else:
            self.write_boundary_conditions_netcdf()

        if self.model.write_gis:
            utils.write_vector(
                self.gdf,
                name="bnd",
                root=join(self.model.root.path, "gis"),
                logger=logger,
            )
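    # Usage sketch for write(), assuming an open model in write mode with
    # boundary points set and ``wl`` referring to this component:
    #
    #     wl.write()                 # format inferred from config; ASCII by default
    #     wl.write(format="netcdf")  # write a single netcdf forcing file instead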
    def write_boundary_points(self, filename: str | Path = None):
        """Write SFINCS boundary condition points (.bnd) file."""
        # Check that write mode is on
        self.root._assert_write_mode()

        # Get absolute file name and set it in config if filename is not None
        abs_file_path = self.model.config.get_set_file_variable(
            "bndfile", value=filename, default="sfincs.bnd"
        )

        # Create parent directories if they do not exist
        abs_file_path.parent.mkdir(parents=True, exist_ok=True)

        # Write bnd file
        # Change precision of coordinates according to crs
        if self.model.crs.is_geographic:
            fmt = "%11.6f"
        else:
            fmt = "%11.1f"
        # TODO check whether write_xyn or write_xy
        utils.write_xyn(abs_file_path, self.gdf, fmt=fmt)

    def write_boundary_conditions_timeseries(self, filename: str | Path = None):
        """Write SFINCS boundary condition timeseries (.bzs) file."""
        # Check that write mode is on
        self.root._assert_write_mode()

        # Get absolute file name and set it in config if filename is not None
        abs_file_path = self.model.config.get_set_file_variable(
            "bzsfile", value=filename, default="sfincs.bzs"
        )

        # Create parent directories if they do not exist
        abs_file_path.parent.mkdir(parents=True, exist_ok=True)

        # Parse data to dataframe
        da = self.data["bzs"].transpose("time", ...)
        df = da.to_pandas()

        # Write to file
        utils.write_timeseries(abs_file_path, df, self.model.config.get("tref"))

    def write_boundary_conditions_astro(self, filename: str | Path = None):
        """Write SFINCS boundary condition astro (.bca) file."""
        # Check that write mode is on
        self.root._assert_write_mode()

        # Get absolute file name and set it in config if filename is not None
        abs_file_path = self.model.config.get_set_file_variable(
            "bcafile", value=filename, default="sfincs.bca"
        )

        # Create parent directories if they do not exist
        abs_file_path.parent.mkdir(parents=True, exist_ok=True)

        amp = self.data["amplitude"].to_pandas()
        pha = self.data["phase"].to_pandas()

        with open(abs_file_path, "w") as fid:
            for ip in self.data.index.values:
                # Use names from the dataset if available
                if "name" in self.data.coords:
                    name = f"sfincs_{int(self.data.name.sel(index=ip).item()):04d}"
                else:
                    name = f"sfincs_{ip + 1:04d}"

                fid.write("[forcing]\n")
                fid.write(f"Name = {name}\n")
                fid.write("Function = astronomic\n")
                fid.write("Quantity = astronomic component\n")
                fid.write("Unit = -\n")
                fid.write("Quantity = waterlevelbnd amplitude\n")
                fid.write("Unit = m\n")
                fid.write("Quantity = waterlevelbnd phase\n")
                fid.write("Unit = deg\n")

                # Write each constituent (skip NaN)
                for constituent in self.data.constituent.values:
                    a = amp.loc[ip, constituent]
                    p = pha.loc[ip, constituent]
                    if not (pd.isna(a) or pd.isna(p)):
                        fid.write(f"{constituent:6s}{a:10.5f}{p:10.2f}\n")

                fid.write("\n")

    def write_boundary_conditions_netcdf(self, filename: str | Path = None):
        """Write SFINCS boundary condition netcdf (.nc) file."""
        # Check that write mode is on
        self.root._assert_write_mode()

        # Get absolute file name and set it in config if filename is not None
        abs_file_path = self.model.config.get_set_file_variable(
            "netbndbzsbzifile", value=filename, default="sfincs_netbndbzsbzifile.nc"
        )

        # Check if abs_file_path is None
        if abs_file_path is None:
            # File name not defined
            return

        # Create parent directories if they do not exist
        abs_file_path.parent.mkdir(parents=True, exist_ok=True)

        ds = self.data.load()

        # Write netcdf file safely (might get locked)
        final_path = utils.write_netcdf_safely(ds, abs_file_path)
        if final_path != abs_file_path:
            self.model.config.set("netbndbzsbzifile", final_path.name)

    def delete(self, index: Union[int, List[int]]):
        """Delete boundary points and clear config when no points remain."""
        super().delete(index)
        if self.nr_points == 0:
            self.model.config.set("bndfile", None)
            self.model.config.set("bzsfile", None)
            self.model.config.set("bcafile", None)
            self.model.config.set("netbndbzsbzifile", None)

    def clear(self):
        """Clear boundary points and unset associated config keys."""
        super().clear()
        self.model.config.set("bndfile", None)
        self.model.config.set("bzsfile", None)
        self.model.config.set("bcafile", None)
        self.model.config.set("netbndbzsbzifile", None)
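    # Sketch: removing points by position; once the last point is gone the
    # related config entries are unset, exactly as in clear():
    #
    #     wl.delete(index=0)       # drop a single boundary point
    #     wl.delete(index=[1, 2])  # drop several points at once
    #     wl.clear()               # drop all points and unset bnd/bzs/bca keys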
    @hydromt_step
    def create(
        self,
        geodataset: Union[str, Path, xr.Dataset] = None,
        timeseries: Union[str, Path, pd.DataFrame] = None,
        locations: Union[str, Path, gpd.GeoDataFrame] = None,
        offset: Union[str, Path, xr.Dataset, float] = None,
        buffer: float = 5e3,
        merge: bool = True,
        drop_duplicates: bool = True,
    ):
        """Setup waterlevel forcing.

        Waterlevel boundary conditions are read from a `geodataset`
        (geospatial point timeseries) or a tabular `timeseries` dataframe.
        At least one of these must be provided.

        The tabular timeseries data is combined with `locations` if provided,
        or with existing 'bnd' locations if previously set.

        Adds model forcing layers:

        * **bzs** forcing: waterlevel time series [m+ref]

        Parameters
        ----------
        geodataset: str, Path, xr.Dataset, optional
            Path, data source name, or xarray data object for geospatial point
            timeseries.
        timeseries: str, Path, pd.DataFrame, optional
            Path, data source name, or pandas data object for tabular timeseries.
        locations: str, Path, gpd.GeoDataFrame, optional
            Path, data source name, or geopandas object for bnd point locations.
            It should contain an 'index' column matching the column names in
            `timeseries`.
        offset: str, Path, xr.Dataset, float, optional
            Path, data source name, constant value or xarray raster data for
            gridded offset between the vertical reference of elevation and
            waterlevel data. The offset is added to the waterlevel data.
        buffer: float, optional
            Buffer [m] around model water level boundary cells to select
            waterlevel gauges, by default 5 km.
        merge : bool, optional
            If True, merge with existing forcing data, by default True.
        drop_duplicates : bool, optional
            If True, drop duplicate points in gdf based on 'name' column or geometry.

        See Also
        --------
        set_forcing_1d
        """
        gdf_locs, df_ts = None, None
        tstart, tstop = self.model.get_model_time()  # model time

        # Buffer around msk==2 values
        if self.model.grid_type != "quadtree":
            if np.any(self.model.grid.mask == 2):
                # Get region around waterlevel boundary cells
                region = self.model.grid.mask.where(
                    self.model.grid.mask == 2, 0
                ).raster.vectorize()
            else:
                raise ValueError(
                    "No waterlevel boundary cells (mask==2) in model grid."
                )
        else:
            region = self.model.region

        # Read waterlevel data from geodataset or geodataframe
        if geodataset is not None:
            # Read and clip data in time & space
            da = self.data_catalog.get_geodataset(
                geodataset,
                geom=region,
                buffer=buffer,
                variables=["waterlevel"],
                time_range=(tstart, tstop),
            )
            df_ts = da.transpose(..., da.vector.index_dim).to_pandas()
            gdf_locs = da.vector.to_gdf()
        elif timeseries is not None:
            df_ts = self.data_catalog.get_dataframe(
                timeseries,
                time_range=(tstart, tstop),
                source_kwargs={
                    "driver": {
                        "name": "pandas",
                        "options": {"index_col": 0, "parse_dates": True},
                    }
                },
            )
            df_ts.columns = df_ts.columns.map(int)  # parse column names to integers

        used_existing = False
        # Read location data (if not already read from geodataset)
        if gdf_locs is None and locations is not None:
            gdf_locs = self.data_catalog.get_geodataframe(
                locations,
                geom=region,
                buffer=buffer,
            ).to_crs(self.model.crs)
            if "index" in gdf_locs.columns:
                gdf_locs = gdf_locs.set_index("index")
            # Filter df_ts timeseries based on gdf_locs index;
            # this allows using a subset of the locations in the timeseries
            if df_ts is not None and np.isin(gdf_locs.index, df_ts.columns).all():
                df_ts = df_ts.reindex(gdf_locs.index, axis=1, fill_value=0)
        elif gdf_locs is None and "bzs" in self.data:
            # No locations provided; use existing waterlevel boundary points from data
            used_existing = True
            gdf_locs = self.data["bzs"].vector.to_gdf()
            # NOTE this is now done in set_timeseries ...
        elif gdf_locs is None:
            raise ValueError("No waterlevel boundary (bnd) points provided.")

        # It is still possible that all points are outside the region+buffer;
        # this error should provide clear feedback
        if gdf_locs.is_empty.all():
            raise ValueError(
                "All waterlevel boundary points provided are outside the active "
                "model domain plus specified buffer. Check the provided locations "
                "or increase the value of the buffer argument."
            )

        # Optionally read offset data and correct df_ts
        if offset is not None and gdf_locs is not None:
            if isinstance(offset, (float, int)):
                df_ts += offset
            else:
                da_offset = self.data_catalog.get_rasterdataset(
                    offset,
                    bbox=self.model.bbox,
                    buffer=5,
                )
                offset_pnts = da_offset.raster.sample(gdf_locs)
                df_offset = offset_pnts.to_pandas().reindex(df_ts.columns).fillna(0)
                df_ts = df_ts + df_offset
                offset = offset_pnts.mean().values
            logger.debug(f"waterlevel forcing: applied offset (avg: {offset:+.2f})")

        # Set / update forcing
        if used_existing:
            gdf_locs = None  # only update timeseries for existing points
        self.set(df=df_ts, gdf=gdf_locs, merge=merge, drop_duplicates=drop_duplicates)

        # Update config
        if geodataset is not None:
            # When reading from a geodataset, keep the format to netcdf
            self.model.config.set("netbndbzsbzifile", "sfincs_netbndbzsbzifile.nc")
        else:
            self.model.config.set("bndfile", "sfincs.bnd")
            self.model.config.set("bzsfile", "sfincs.bzs")
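    # Usage sketch for create(); "gtsm_hourly" and the file names below are
    # hypothetical placeholders. Any input may also be an in-memory object
    # (xr.Dataset, pd.DataFrame, gpd.GeoDataFrame):
    #
    #     # geospatial point timeseries from a data-catalog source, 10 km buffer
    #     model.water_level.create(geodataset="gtsm_hourly", buffer=10e3)
    #
    #     # tabular timeseries combined with point locations from file
    #     model.water_level.create(
    #         timeseries="waterlevels.csv",  # columns must match location indices
    #         locations="bnd_points.geojson",
    #         offset=0.1,  # constant datum shift [m] added to the series
    #     )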
    @hydromt_step
    def create_timeseries(
        self,
        index: Union[int, List[int]] = None,
        shape: str = "constant",
        timestep: float = 600.0,
        offset: float = 0.0,
        amplitude: float = 1.0,
        phase: float = 0.0,
        period: float = 43200.0,
        peak: float = 1.0,
        tpeak: float = 86400.0,
        duration: float = 43200.0,
    ):
        """Create a synthetic water level time series for each boundary point.

        The same series is applied to all selected points.

        Parameters
        ----------
        index : int or list of int, optional
            Index or indices of the boundary points to set; by default all points.
        shape : str
            Shape of the time series. Options are "constant", "sine",
            "gaussian", "astronomical".
        timestep : float
            Time step [s]
        offset : float
            Offset of the time series [m]
        amplitude : float
            Amplitude of the sine wave [m]
        phase : float
            Phase of the sine wave [degrees]
        period : float
            Period of the sine wave [s]
        peak : float
            Peak of the Gaussian wave [m]
        tpeak : float
            Time of the peak of the Gaussian wave [s]
        duration : float
            Duration of the Gaussian wave [s]
        """
        if self.nr_points == 0:
            raise ValueError(
                "Cannot create timeseries without existing waterlevel boundary points"
            )

        t0 = np.datetime64(self.model.config.get("tstart"))
        t1 = np.datetime64(self.model.config.get("tstop"))

        if shape == "constant":
            dt = np.timedelta64(int((t1 - t0).astype(float) / 1e6), "s")
        else:
            dt = np.timedelta64(int(timestep), "s")

        time = np.arange(t0, t1 + dt, dt)

        # Convert time to seconds since tref
        tsec = (
            (time - np.datetime64(self.model.config.get("tref")))
            .astype("timedelta64[s]")
            .astype(float)
        )
        nt = len(tsec)

        if shape == "constant":
            wl = [offset] * nt
        elif shape == "sine":
            wl = offset + amplitude * np.sin(
                2 * np.pi * tsec / period + phase * np.pi / 180
            )
        elif shape == "gaussian":
            wl = offset + peak * np.exp(-(((tsec - tpeak) / (0.25 * duration)) ** 2))
        elif shape == "astronomical":
            # Use existing method
            self.generate_bzs_from_bca(dt=timestep, offset=offset, write_file=False)
            return
        else:
            raise NotImplementedError(
                f"Shape '{shape}' is not implemented. Use 'constant', 'sine', "
                "'gaussian' or 'astronomical'."
            )

        # Build the index from the same time axis as the computed values to
        # avoid length mismatches when the duration is not a multiple of dt
        times = pd.DatetimeIndex(time)

        if index is None:
            index = list(self.data.index.values)
        elif not isinstance(index, list):
            index = [index]

        # Create DataFrame: rows = time, columns = locations (index), values = wl (same for all)
        df = pd.DataFrame(
            data=np.tile(wl, (len(index), 1)).T, index=times, columns=index
        )

        # Call set_timeseries to update the component's data
        self.set_timeseries(df)
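    # Sketch of the synthetic shapes above (metres and seconds), assuming
    # boundary points already exist:
    #
    #     wl.create_timeseries(shape="constant", offset=1.0)
    #     wl.create_timeseries(shape="sine", amplitude=0.75, period=44714.0)  # ~M2
    #     wl.create_timeseries(
    #         shape="gaussian", peak=2.0, tpeak=86400.0, duration=43200.0,
    #         index=[0, 1],  # only the points with these index labels
    #     )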
    @hydromt_step
    def create_timeseries_from_astro(
        self,
        dt: float = 600.0,
        offset: float = 0.0,
    ):
        """Generate boundary time series from astronomical constituents.

        Parameters
        ----------
        dt : float, optional, default 600.0
            Time step [s]
        offset : float, optional, default 0.0
            Offset of the time series [m]
        """
        if self.nr_points == 0:
            return

        times = pd.date_range(
            start=self.model.config.get("tstart"),
            end=self.model.config.get("tstop"),
            freq=pd.tseries.offsets.DateOffset(seconds=dt),
        )

        df_ts = pd.DataFrame(index=times)

        for ip in self.data.index.values:
            df = pd.DataFrame(
                {
                    "constituent": self.data.constituent.values,
                    "amplitude": self.data["amplitude"].loc[ip].values,
                    "phase": self.data["phase"].loc[ip].values,
                }
            )
            df = df.dropna(subset=["amplitude", "phase"])  # remove NaN paddings
            df = df.set_index("constituent")
            v = predict(df, times) + offset
            ts = pd.Series(v, index=times)
            df_ts[ip] = ts

        # Call set_timeseries to update the component's data
        self.set_timeseries(df_ts)
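    # Sketch: after reading a .bca file the amplitude/phase variables can be
    # expanded into a bzs time series at, e.g., 5-minute resolution:
    #
    #     wl.read_boundary_conditions_astro()
    #     wl.create_timeseries_from_astro(dt=300.0)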
    def generate_bzs_from_bca(
        self, dt: float = 600.0, offset: float = 0.0, write_file: bool = True
    ):
        """Generate a bzs file from a bca file; called by CoSMoS.

        Should probably update CoSMoS and only use create_timeseries_from_astro.
        """
        self.create_timeseries_from_astro(dt=dt, offset=offset)
        if write_file:
            self.write_boundary_conditions_timeseries()
    def create_boundary_points_from_mask(self, min_dist=None, bnd_dist=5000.0):
        """Create boundary points from the mask.

        Should become a utils function, as the SnapWave boundary conditions use
        nearly the same code. Also, the regular grid has similar code; maybe
        that is more efficient or better.
        """
        if self.model.grid_type == "regular":
            # Get waterlevel boundary vector based on mask
            gdf_msk = utils.get_bounds_vector(self.model.grid.mask)
            gdf_msk2 = gdf_msk[gdf_msk["value"] == 2]

            # Convert bnd_dist from metres to degrees if crs is geographic
            if self.model.crs.is_geographic:
                bnd_dist = bnd_dist / 111111.0

            # Create points along boundary
            points = []
            for _, row in gdf_msk2.iterrows():
                distances = np.arange(0, row.geometry.length, bnd_dist)
                for d in distances:
                    point = row.geometry.interpolate(d)
                    points.append((point.x, point.y))

            # Create geodataframe with points
            gdf = gpd.GeoDataFrame(
                geometry=gpd.points_from_xy(*zip(*points)), crs=self.model.crs
            )
        elif self.model.grid_type == "quadtree":
            if min_dist is None:
                # Set minimum distance between two grid boundary points on a polyline to 2 * dx
                min_dist = self.model.quadtree_grid.data.attrs["dx"] * 2

            mask = self.model.quadtree_grid.data["mask"]
            ibnd = np.where(mask == 2)
            xz, yz = self.model.quadtree_grid.face_coordinates
            xp = xz[ibnd]
            yp = yz[ibnd]

            # Make boolean array for points that are included in a polyline
            used = np.full(xp.shape, False, dtype=bool)

            # Make list of polylines. Each polyline is a list of indices of boundary points.
            polylines = []

            while True:
                if np.all(used):
                    # All boundary grid points have been used. We can stop now.
                    break

                # Find the first unused point
                i1 = np.where(~used)[0][0]

                # Set this point to used
                used[i1] = True

                # Start new polyline with index i1
                polyline = [i1]

                while True:
                    # Compute distances to all points that have not been used
                    xpunused = xp[~used]
                    ypunused = yp[~used]
                    # Get all indices of unused points
                    unused_indices = np.where(~used)[0]

                    dst = np.sqrt((xpunused - xp[i1]) ** 2 + (ypunused - yp[i1]) ** 2)
                    if np.all(np.isnan(dst)):
                        break
                    inear = np.nanargmin(dst)
                    inearall = unused_indices[inear]
                    if dst[inear] < min_dist:
                        # Found next point along polyline
                        polyline.append(inearall)
                        used[inearall] = True
                        i1 = inearall
                    else:
                        # Last point found
                        break

                # Now work the other way
                # Start with first point of polyline
                i1 = polyline[0]
                while True:
                    if np.all(used):
                        # All boundary grid points have been used. We can stop now.
                        break
                    # Now we go in the other direction
                    xpunused = xp[~used]
                    ypunused = yp[~used]
                    unused_indices = np.where(~used)[0]
                    dst = np.sqrt((xpunused - xp[i1]) ** 2 + (ypunused - yp[i1]) ** 2)
                    inear = np.nanargmin(dst)
                    inearall = unused_indices[inear]
                    if dst[inear] < min_dist:
                        # Found next point along polyline
                        polyline.insert(0, inearall)
                        used[inearall] = True
                        # Set index of next point
                        i1 = inearall
                    else:
                        # Last nearby point found
                        break

                if len(polyline) > 1:
                    polylines.append(polyline)

            gdf_list = []
            ip = 0

            # Transformer to web mercator to get distances in metres
            if self.model.crs.is_geographic:
                transformer = Transformer.from_crs(self.model.crs, 3857, always_xy=True)

            # Loop through polylines
            for polyline in polylines:
                x = xp[polyline]
                y = yp[polyline]
                points = [(xi, yi) for xi, yi in zip(x.ravel(), y.ravel())]
                line = shapely.geometry.LineString(points)
                if self.model.crs.is_geographic:
                    # Line in web mercator (to get length in metres)
                    xm, ym = transformer.transform(x, y)
                    pointsm = [(xmi, ymi) for xmi, ymi in zip(xm.ravel(), ym.ravel())]
                    linem = shapely.geometry.LineString(pointsm)
                    num_points = int(linem.length / bnd_dist) + 2
                else:
                    num_points = int(line.length / bnd_dist) + 2

                # Interpolate to new points
                new_points = [
                    line.interpolate(i / float(num_points - 1), normalized=True)
                    for i in range(num_points)
                ]

                # Loop through points in polyline
                for point in new_points:
                    name = str(ip + 1).zfill(4)
                    d = {
                        "name": name,
                        "timeseries": pd.DataFrame(),
                        "astro": pd.DataFrame(),
                        "geometry": point,
                    }
                    gdf_list.append(d)
                    ip += 1

            gdf = gpd.GeoDataFrame(gdf_list, crs=self.model.crs)

        self.set_locations(gdf, merge=False)
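    # Sketch: place boundary points roughly every 2 km along the mask==2
    # boundary; for quadtree grids, min_dist controls when nearby boundary
    # cells are chained into one polyline (default 2 * dx):
    #
    #     wl.create_boundary_points_from_mask(bnd_dist=2000.0)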
def add_constituents(ds, section_data):
    """Attach tidal constituent data to an existing Dataset that already has a
    'bzs(time, index)' variable.

    Parameters
    ----------
    ds : xr.Dataset or xr.DataArray
        Dataset containing 'bzs(time, index)'.
    section_data : list of pandas.DataFrame
        One DataFrame per index point. Each DataFrame must have:
        - index: constituent names
        - first column: amplitude
        - second column: phase

    Returns
    -------
    xr.Dataset
        Same dataset, with two new variables:
        - amplitude(index, constituent)
        - phase(index, constituent)
    """
    # Ensure we have a Dataset
    if isinstance(ds, xr.DataArray):
        ds = ds.to_dataset(name="bzs")

    # Collect all constituent names across all points
    all_constituents = sorted(set().union(*[df.index for df in section_data]))

    # Allocate arrays (index x constituent)
    n_points = len(section_data)
    n_const = len(all_constituents)

    amp = np.full((n_points, n_const), np.nan)
    pha = np.full((n_points, n_const), np.nan)

    # Fill arrays
    for i, df in enumerate(section_data):
        for cname in df.index:
            j = all_constituents.index(cname)
            amp[i, j] = df.iloc[df.index.get_loc(cname), 0]  # amplitude
            pha[i, j] = df.iloc[df.index.get_loc(cname), 1]  # phase

    # Attach to dataset
    ds["amplitude"] = (("index", "constituent"), amp)
    ds["phase"] = (("index", "constituent"), pha)
    ds = ds.assign_coords(constituent=all_constituents)

    return ds
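# Illustrative round-trip for add_constituents with a hypothetical two-point
# setup: each per-point DataFrame is indexed by constituent name, amplitude in
# the first column and phase in the second (as parsed from a .bca file).
#
#     import pandas as pd
#     import xarray as xr
#
#     pt1 = pd.DataFrame({"amp": [1.0], "phi": [30.0]}, index=["M2"])
#     pt2 = pd.DataFrame({"amp": [0.4, 0.5], "phi": [10.0, 20.0]}, index=["M2", "S2"])
#     bzs = xr.DataArray([[0.0, 0.0]], dims=("time", "index"), name="bzs")
#
#     ds = add_constituents(bzs, [pt1, pt2])
#     # ds["amplitude"] has dims (index: 2, constituent: 2), NaN where a
#     # constituent is missing for a point (here: pt1 has no S2).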