import xugrid as xu
import meshkernel
import xarray as xr
import datetime as dt
from dfm_tools import __version__
import getpass
import numpy as np
from dfm_tools.coastlines import get_coastlines_gdb
import geopandas as gpd
from shapely import MultiPolygon, LineString, MultiLineString
from shapely.ops import linemerge
from itertools import groupby
from shapely import Polygon
from meshkernel import GeometryList, MeshKernel
__all__ = [
"meshkernel_delete_withcoastlines",
"meshkernel_delete_withshp",
"meshkernel_delete_withgdf",
"meshkernel_get_illegalcells",
"meshkernel_to_UgridDataset",
"meshkernel_get_bbox",
"make_basegrid",
"refine_basegrid",
"generate_bndpli_cutland",
"interpolate_bndpli",
]
[docs]
def meshkernel_get_bbox(mk:meshkernel.MeshKernel):
"""
get the bbox of a meshkernel instance as (xmin, ymin, xmax, ymax)
"""
mesh2d = mk.mesh2d_get()
xcoords = mesh2d.node_x
ycoords = mesh2d.node_y
bbox = (xcoords.min(), ycoords.min(), xcoords.max(), ycoords.max())
return bbox
[docs]
def meshkernel_delete_withcoastlines(mk:meshkernel.MeshKernel, res:str='f', min_area:float = 0, crs:(int,str) = None):
"""
Wrapper around meshkernel_delete_withgdf, which automatically gets the bbox from the meshkernel object and retrieves the coastlines_gdf.
Parameters
----------
mk : meshkernel.MeshKernel
DESCRIPTION.
res : str, optional
DESCRIPTION. The default is 'f'.
min_area : float, optional
DESCRIPTION. The default is 0.
crs : (int,str), optional
DESCRIPTION. The default is None.
Returns
-------
None.
"""
bbox = meshkernel_get_bbox(mk)
coastlines_gdf = get_coastlines_gdb(bbox=bbox, res=res, min_area=min_area, crs=crs)
meshkernel_delete_withgdf(mk, coastlines_gdf)
[docs]
def meshkernel_delete_withshp(mk:meshkernel.MeshKernel, coastlines_shp:str, crs:(int,str) = None):
"""
Delete parts of mesh that are inside the shapefile polygon.
Parameters
----------
mk : meshkernel.MeshKernel
DESCRIPTION.
coastlines_shp : str
Path to the shp file.
crs : (int,str), optional
DESCRIPTION. The default is None.
Returns
-------
None.
"""
bbox = meshkernel_get_bbox(mk)
coastlines_gdb = gpd.read_file(coastlines_shp, bbox=bbox)
if crs:
coastlines_gdb = coastlines_gdb.to_crs(crs)
meshkernel_delete_withgdf(mk, coastlines_gdb)
[docs]
def meshkernel_delete_withgdf(mk:meshkernel.MeshKernel, coastlines_gdf:gpd.GeoDataFrame):
"""
Delete parts of mesh that are inside the polygons/Linestrings in a GeoDataFrame.
Parameters
----------
mk : meshkernel.MeshKernel
DESCRIPTION.
coastlines_gdf : gpd.GeoDataFrame
DESCRIPTION.
Returns
-------
None.
"""
for coastline_geom in coastlines_gdf['geometry']:
xx, yy = coastline_geom.exterior.coords.xy
xx = np.array(xx)
yy = np.array(yy)
delete_pol_geom = meshkernel.GeometryList(x_coordinates=xx, y_coordinates=yy) #TODO: .copy()/to_numpy() makes the array contiguous in memory, which is necessary for meshkernel.mesh2d_delete()
mk.mesh2d_delete(geometry_list=delete_pol_geom,
delete_option=meshkernel.DeleteMeshOption.INSIDE_NOT_INTERSECTED,
invert_deletion=False)
[docs]
def meshkernel_get_illegalcells(mk):
# first reset the mesh to get a blended inner boundary
# TODO: remove this after fixing https://github.com/Deltares/MeshKernelPy/issues/253
mk2 = MeshKernel(projection=mk.get_projection())
mk2.mesh2d_set(mk.mesh2d_get())
# get illegalcells from meshkernel instance
illegalcells_geom = mk2.mesh2d_get_mesh_inner_boundaries_as_polygons()
# convert xy coords to numpy array
illegalcells_np = np.c_[illegalcells_geom.x_coordinates, illegalcells_geom.y_coordinates]
# split illegalcells array based on the geomtry_separator
xy_lists = [list(g) for k, g in groupby(illegalcells_np, lambda x: (x != illegalcells_geom.geometry_separator).all()) if k]
# convert to geodataframe of Polygons
list_polygons = [Polygon(xylist) for xylist in xy_lists]
illegalcells_gdf = gpd.GeoDataFrame(geometry=list_polygons)
# only include polygons around cells up to 6 edges/nodes (polygon of 7 points)
# larger "cells" are not considered as cells by the FM kernel and meshkernel
pol_npoints = illegalcells_gdf.count_coordinates()
bool_illegal = pol_npoints <= 7
illegalcells_gdf = illegalcells_gdf.loc[bool_illegal]
return illegalcells_gdf
def geographic_to_meshkernel_projection(is_geographic:bool) -> meshkernel.ProjectionType:
"""
converts is_geographic boolean to meshkernel.ProjectionType (SPHERICAL OR CARTESIAN)
Parameters
----------
is_geographic : bool
DESCRIPTION.
Returns
-------
projection : TYPE
DESCRIPTION.
"""
if is_geographic:
projection = meshkernel.ProjectionType.SPHERICAL
else:
projection = meshkernel.ProjectionType.CARTESIAN
return projection
def meshkernel_is_geographic(mk):
if mk.get_projection()==meshkernel.ProjectionType.CARTESIAN:
is_geographic = False
else:
is_geographic = True
return is_geographic
[docs]
def meshkernel_to_UgridDataset(mk:meshkernel.MeshKernel, crs:(int,str) = None) -> xu.UgridDataset:
"""
Convert a meshkernel object to a UgridDataset. The UgridDataset enables bathymetry
interpolation and writing to netfile.
Parameters
----------
mk : meshkernel.MeshKernel
DESCRIPTION.
crs : (int,str), optional
DESCRIPTION. The default is None.
Returns
-------
TYPE
DESCRIPTION.
"""
#check if both crs and grid are geograpic or not
crs_is_geographic = crs_to_isgeographic(crs)
grid_is_geographic = meshkernel_is_geographic(mk)
if crs_is_geographic != grid_is_geographic:
raise ValueError(f"crs has is_geographic={crs_is_geographic} and grid has is_geographic={grid_is_geographic}. This is not allowed.")
# convert meshkernel to Ugrid2d
mesh2d_grid = mk.mesh2d_get()
xu_ugrid2d = xu.Ugrid2d.from_meshkernel(mesh2d_grid, crs=crs)
# convert 0-based to 1-based indices for connectivity variables like face_node_connectivity
# this is required by delft3dfm
xu_ugrid2d.start_index = 1
# convert to uds and add attrs and crs
uds = xu.UgridDataset(grids=[xu_ugrid2d])
# temporarily assign_node_coords to avoid conversion to xr.Dataset when calling .assign_attrs()
# TODO: to be fixed in https://github.com/Deltares/xugrid/issues/412
uds = uds.ugrid.assign_node_coords()
uds = uds.assign_attrs({
'institution': 'Deltares',
'references': 'https://www.deltares.nl',
'source': f'Created with meshkernel {meshkernel.__version__}, xugrid {xu.__version__} and dfm_tools {__version__}',
'history': 'Created on %s, %s'%(dt.datetime.now().strftime('%Y-%m-%dT%H:%M:%S%z'),getpass.getuser()), #TODO: add timezone
})
# add crs including attrs
if crs is not None:
uds.ugrid.set_crs(crs)
return uds
def crs_to_isgeographic(crs=None):
if crs is None:
is_geographic = False
else:
import pyproj
crs = pyproj.CRS.from_user_input(crs)
is_geographic = crs.is_geographic
return is_geographic
[docs]
def make_basegrid(lon_min,lon_max,lat_min,lat_max,dx,dy,angle=0,
crs=None, is_geographic=None):
"""
empty docstring
"""
if is_geographic is not None:
raise ValueError("'is_geographic' was deprecated as argument for dfmt.make_basegrid(), it is now derived from 'crs' instead")
is_geographic = crs_to_isgeographic(crs)
projection = geographic_to_meshkernel_projection(is_geographic)
# create base grid
make_grid_parameters = meshkernel.MakeGridParameters(angle=angle,
origin_x=lon_min,
origin_y=lat_min,
upper_right_x=lon_max,
upper_right_y=lat_max,
block_size_x=dx,
block_size_y=dy)
mk = meshkernel.MeshKernel(projection=projection)
mk.curvilinear_compute_rectangular_grid_on_extension(make_grid_parameters)
mk.curvilinear_convert_to_mesh2d() #convert to ugrid/mesh2d
return mk
[docs]
def refine_basegrid(
mk:meshkernel.MeshKernel,
data_bathy_sel:xr.DataArray,
polygon: GeometryList = GeometryList(),
**kwargs,
):
"""
Refine the grid based on gridded bathymetry.
Parameters
----------
mk : meshkernel.MeshKernel
The meshkernel instance containing the base grid. Updated in place.
data_bathy_sel : xr.DataArray
The bathymetry data used for the refinement. Is converted to
meshkernel.GriddedSamples().
polygon : GeometryList, optional
A polygon in the format of meshkernel.GeometryList. Grid refinement will only
happen within the specified polygon. The default is an empty GeometryList(),
resulting in refinement everywhere.
**kwargs : TYPE
Arguments passed on to meshkernel.MeshRefinementParameters(). Some
options from the meshkernelpy docs:
* min_edge_size (float): Minimum edge size. Meshkernelpy default is
`0.5`. Overwritten with 500 here.
* refinement_type (RefinementType): Refinement criterion type.
Meshkernelpy default is `RefinementType.REFINEMENT_LEVELS`. Overwritten
with `RefinementType.WAVE_COURANT` here.
* smoothing_iterations (int, optional): The number of smoothing
iterations. Meshkernelpy default is `5`. Overwritten with 2 here.
* connect_hanging_nodes (bool): Whether to connect hanging nodes at the
end of the iteration. Meshkernelpy default is `True`. Set to False to
do multiple refinement steps (e.g. for multiple regions)
* max_courant_time (double, optional): Maximum courant time in seconds.
Meshkernelpy default is `120`.
"""
if 'min_edge_size' not in kwargs:
# min_edge_size (float): Minimum edge size. Meshkernelpy default is
# `0.5`. Overwrite with more sensible value for coastal models.
kwargs['min_edge_size'] = 500
if 'refinement_type' not in kwargs:
# Refinement criterion type. Meshkernelpy default is
# `RefinementType.REFINEMENT_LEVELS`.
kwargs['refinement_type'] = meshkernel.RefinementType.WAVE_COURANT
if 'smoothing_iterations' not in kwargs:
# The number of smoothing iterations. Meshkernelpy default is `5`.
kwargs['smoothing_iterations'] = 2
lon_np = data_bathy_sel.lon.to_numpy()
lat_np = data_bathy_sel.lat.to_numpy()
values_np = data_bathy_sel.to_numpy().flatten()
gridded_samples = meshkernel.GriddedSamples(
x_coordinates=lon_np,
y_coordinates=lat_np,
values=values_np)
#refinement
mesh_refinement_parameters = meshkernel.MeshRefinementParameters(**kwargs)
mk.mesh2d_refine_based_on_gridded_samples(
gridded_samples=gridded_samples,
mesh_refinement_params=mesh_refinement_parameters,
use_nodal_refinement=True,
polygon=polygon,
)
def meshkernel_get_outer_xycoords(mk:meshkernel.MeshKernel):
mesh_bnds = mk.mesh2d_get_mesh_boundaries_as_polygons()
xcoords = mesh_bnds.x_coordinates
ycoords = mesh_bnds.y_coordinates
if mesh_bnds.geometry_separator in xcoords:
# TODO: would be nice to get only the outer xycoords instead
# or to have easier selection of outer xycoords
raise Exception('use meshkernel_get_outer_xycoords() on an uncut grid')
return xcoords, ycoords
[docs]
def generate_bndpli_cutland(mk:meshkernel.MeshKernel, res:str='f', min_area:float = 0, crs:(int,str) = None, buffer:float = 0):
"""
Generate a boundary polyline from the meshkernel object and cut away the landward part.
Be sure to do this on the base/refined grid, not on the grid where the landward cells were already cut.
Parameters
----------
mk : meshkernel.MeshKernel
DESCRIPTION.
res : str, optional
DESCRIPTION. The default is 'f'.
min_area : float, optional
DESCRIPTION. The default is 0.
crs : (int,str), optional
DESCRIPTION. The default is None.
buffer : float, optional
DESCRIPTION. The default is 0.
Returns
-------
bnd_gdf : TYPE
DESCRIPTION.
"""
bbox = meshkernel_get_bbox(mk)
coastlines_gdf = get_coastlines_gdb(bbox=bbox, res=res, min_area=min_area, crs=crs)
xcoords,ycoords = meshkernel_get_outer_xycoords(mk)
mesh_bnds_xy = np.c_[xcoords,ycoords]
meshbnd_ls = LineString(mesh_bnds_xy)
coastlines_mp = MultiPolygon(coastlines_gdf.geometry.tolist())
coastlines_mp = coastlines_mp.buffer(buffer)
bnd_ls = meshbnd_ls.difference(coastlines_mp)
#attempt to merge MultiLineString to single LineString
if isinstance(bnd_ls,MultiLineString):
print('attemting to merge lines in MultiLineString to single LineString (if connected)')
bnd_ls = linemerge(bnd_ls)
#convert MultiLineString/LineString to GeoDataFrame
if isinstance(bnd_ls,MultiLineString):
bnd_gdf = gpd.GeoDataFrame(geometry=list(bnd_ls.geoms))
elif isinstance(bnd_ls,LineString):
bnd_gdf = gpd.GeoDataFrame(geometry=[bnd_ls])
#set crs from coastlines
bnd_gdf.crs = coastlines_gdf.crs
return bnd_gdf
[docs]
def interpolate_bndpli(bnd_gdf,res):
"""
interpolate bnd_gdf to a new resolution
"""
#TODO: keep corners of grid, maybe use mk.polygon_refine()
bnd_gdf_interp = bnd_gdf.copy()
for irow,row in bnd_gdf_interp.iterrows():
bnd_ls = row.geometry
interp_range = np.arange(0,bnd_ls.length,res)
bnd_ls_interp_points = bnd_ls.interpolate(interp_range)
if len(bnd_ls_interp_points)==1: #no change if interp results in only one point
continue
bnd_ls_interp = LineString(bnd_ls_interp_points)
bnd_gdf_interp.loc[irow,'geometry'] = bnd_ls_interp
return bnd_gdf_interp