Source code for dfm_tools.meshkernel_helpers

import xugrid as xu
import meshkernel
import xarray as xr
import datetime as dt
from dfm_tools import __version__
import getpass
import numpy as np
from dfm_tools.coastlines import get_coastlines_gdb
import geopandas as gpd
from shapely import MultiPolygon, LineString, MultiLineString
from shapely.ops import linemerge
from itertools import groupby
from shapely import Polygon
from meshkernel import GeometryList, MeshKernel

__all__ = [
    "meshkernel_delete_withcoastlines",
    "meshkernel_delete_withshp",
    "meshkernel_delete_withgdf",
    "meshkernel_get_illegalcells",
    "meshkernel_to_UgridDataset",
    "meshkernel_get_bbox",
    "make_basegrid",
    "refine_basegrid",
    "generate_bndpli_cutland",
    "interpolate_bndpli",
    ]

[docs] def meshkernel_get_bbox(mk:meshkernel.MeshKernel): """ get the bbox of a meshkernel instance as (xmin, ymin, xmax, ymax) """ mesh2d = mk.mesh2d_get() xcoords = mesh2d.node_x ycoords = mesh2d.node_y bbox = (xcoords.min(), ycoords.min(), xcoords.max(), ycoords.max()) return bbox
[docs] def meshkernel_delete_withcoastlines(mk:meshkernel.MeshKernel, res:str='f', min_area:float = 0, crs:(int,str) = None): """ Wrapper around meshkernel_delete_withgdf, which automatically gets the bbox from the meshkernel object and retrieves the coastlines_gdf. Parameters ---------- mk : meshkernel.MeshKernel DESCRIPTION. res : str, optional DESCRIPTION. The default is 'f'. min_area : float, optional DESCRIPTION. The default is 0. crs : (int,str), optional DESCRIPTION. The default is None. Returns ------- None. """ bbox = meshkernel_get_bbox(mk) coastlines_gdf = get_coastlines_gdb(bbox=bbox, res=res, min_area=min_area, crs=crs) meshkernel_delete_withgdf(mk, coastlines_gdf)
[docs] def meshkernel_delete_withshp(mk:meshkernel.MeshKernel, coastlines_shp:str, crs:(int,str) = None): """ Delete parts of mesh that are inside the shapefile polygon. Parameters ---------- mk : meshkernel.MeshKernel DESCRIPTION. coastlines_shp : str Path to the shp file. crs : (int,str), optional DESCRIPTION. The default is None. Returns ------- None. """ bbox = meshkernel_get_bbox(mk) coastlines_gdb = gpd.read_file(coastlines_shp, bbox=bbox) if crs: coastlines_gdb = coastlines_gdb.to_crs(crs) meshkernel_delete_withgdf(mk, coastlines_gdb)
[docs] def meshkernel_delete_withgdf(mk:meshkernel.MeshKernel, coastlines_gdf:gpd.GeoDataFrame): """ Delete parts of mesh that are inside the polygons/Linestrings in a GeoDataFrame. Parameters ---------- mk : meshkernel.MeshKernel DESCRIPTION. coastlines_gdf : gpd.GeoDataFrame DESCRIPTION. Returns ------- None. """ for coastline_geom in coastlines_gdf['geometry']: xx, yy = coastline_geom.exterior.coords.xy xx = np.array(xx) yy = np.array(yy) delete_pol_geom = meshkernel.GeometryList(x_coordinates=xx, y_coordinates=yy) #TODO: .copy()/to_numpy() makes the array contiguous in memory, which is necessary for meshkernel.mesh2d_delete() mk.mesh2d_delete(geometry_list=delete_pol_geom, delete_option=meshkernel.DeleteMeshOption.INSIDE_NOT_INTERSECTED, invert_deletion=False)
[docs] def meshkernel_get_illegalcells(mk): # first reset the mesh to get a blended inner boundary # TODO: remove this after fixing https://github.com/Deltares/MeshKernelPy/issues/253 mk2 = MeshKernel(projection=mk.get_projection()) mk2.mesh2d_set(mk.mesh2d_get()) # get illegalcells from meshkernel instance illegalcells_geom = mk2.mesh2d_get_mesh_inner_boundaries_as_polygons() # convert xy coords to numpy array illegalcells_np = np.c_[illegalcells_geom.x_coordinates, illegalcells_geom.y_coordinates] # split illegalcells array based on the geomtry_separator xy_lists = [list(g) for k, g in groupby(illegalcells_np, lambda x: (x != illegalcells_geom.geometry_separator).all()) if k] # convert to geodataframe of Polygons list_polygons = [Polygon(xylist) for xylist in xy_lists] illegalcells_gdf = gpd.GeoDataFrame(geometry=list_polygons) # only include polygons around cells up to 6 edges/nodes (polygon of 7 points) # larger "cells" are not considered as cells by the FM kernel and meshkernel pol_npoints = illegalcells_gdf.count_coordinates() bool_illegal = pol_npoints <= 7 illegalcells_gdf = illegalcells_gdf.loc[bool_illegal] return illegalcells_gdf
def geographic_to_meshkernel_projection(is_geographic:bool) -> meshkernel.ProjectionType: """ converts is_geographic boolean to meshkernel.ProjectionType (SPHERICAL OR CARTESIAN) Parameters ---------- is_geographic : bool DESCRIPTION. Returns ------- projection : TYPE DESCRIPTION. """ if is_geographic: projection = meshkernel.ProjectionType.SPHERICAL else: projection = meshkernel.ProjectionType.CARTESIAN return projection def meshkernel_is_geographic(mk): if mk.get_projection()==meshkernel.ProjectionType.CARTESIAN: is_geographic = False else: is_geographic = True return is_geographic
[docs] def meshkernel_to_UgridDataset(mk:meshkernel.MeshKernel, crs:(int,str) = None) -> xu.UgridDataset: """ Convert a meshkernel object to a UgridDataset. The UgridDataset enables bathymetry interpolation and writing to netfile. Parameters ---------- mk : meshkernel.MeshKernel DESCRIPTION. crs : (int,str), optional DESCRIPTION. The default is None. Returns ------- TYPE DESCRIPTION. """ #check if both crs and grid are geograpic or not crs_is_geographic = crs_to_isgeographic(crs) grid_is_geographic = meshkernel_is_geographic(mk) if crs_is_geographic != grid_is_geographic: raise ValueError(f"crs has is_geographic={crs_is_geographic} and grid has is_geographic={grid_is_geographic}. This is not allowed.") # convert meshkernel to Ugrid2d mesh2d_grid = mk.mesh2d_get() xu_ugrid2d = xu.Ugrid2d.from_meshkernel(mesh2d_grid, crs=crs) # convert 0-based to 1-based indices for connectivity variables like face_node_connectivity # this is required by delft3dfm xu_ugrid2d.start_index = 1 # convert to uds and add attrs and crs uds = xu.UgridDataset(grids=[xu_ugrid2d]) # temporarily assign_node_coords to avoid conversion to xr.Dataset when calling .assign_attrs() # TODO: to be fixed in https://github.com/Deltares/xugrid/issues/412 uds = uds.ugrid.assign_node_coords() uds = uds.assign_attrs({ 'institution': 'Deltares', 'references': 'https://www.deltares.nl', 'source': f'Created with meshkernel {meshkernel.__version__}, xugrid {xu.__version__} and dfm_tools {__version__}', 'history': 'Created on %s, %s'%(dt.datetime.now().strftime('%Y-%m-%dT%H:%M:%S%z'),getpass.getuser()), #TODO: add timezone }) # add crs including attrs if crs is not None: uds.ugrid.set_crs(crs) return uds
def crs_to_isgeographic(crs=None): if crs is None: is_geographic = False else: import pyproj crs = pyproj.CRS.from_user_input(crs) is_geographic = crs.is_geographic return is_geographic
[docs] def make_basegrid(lon_min,lon_max,lat_min,lat_max,dx,dy,angle=0, crs=None, is_geographic=None): """ empty docstring """ if is_geographic is not None: raise ValueError("'is_geographic' was deprecated as argument for dfmt.make_basegrid(), it is now derived from 'crs' instead") is_geographic = crs_to_isgeographic(crs) projection = geographic_to_meshkernel_projection(is_geographic) # create base grid make_grid_parameters = meshkernel.MakeGridParameters(angle=angle, origin_x=lon_min, origin_y=lat_min, upper_right_x=lon_max, upper_right_y=lat_max, block_size_x=dx, block_size_y=dy) mk = meshkernel.MeshKernel(projection=projection) mk.curvilinear_compute_rectangular_grid_on_extension(make_grid_parameters) mk.curvilinear_convert_to_mesh2d() #convert to ugrid/mesh2d return mk
[docs] def refine_basegrid( mk:meshkernel.MeshKernel, data_bathy_sel:xr.DataArray, polygon: GeometryList = GeometryList(), **kwargs, ): """ Refine the grid based on gridded bathymetry. Parameters ---------- mk : meshkernel.MeshKernel The meshkernel instance containing the base grid. Updated in place. data_bathy_sel : xr.DataArray The bathymetry data used for the refinement. Is converted to meshkernel.GriddedSamples(). polygon : GeometryList, optional A polygon in the format of meshkernel.GeometryList. Grid refinement will only happen within the specified polygon. The default is an empty GeometryList(), resulting in refinement everywhere. **kwargs : TYPE Arguments passed on to meshkernel.MeshRefinementParameters(). Some options from the meshkernelpy docs: * min_edge_size (float): Minimum edge size. Meshkernelpy default is `0.5`. Overwritten with 500 here. * refinement_type (RefinementType): Refinement criterion type. Meshkernelpy default is `RefinementType.REFINEMENT_LEVELS`. Overwritten with `RefinementType.WAVE_COURANT` here. * smoothing_iterations (int, optional): The number of smoothing iterations. Meshkernelpy default is `5`. Overwritten with 2 here. * connect_hanging_nodes (bool): Whether to connect hanging nodes at the end of the iteration. Meshkernelpy default is `True`. Set to False to do multiple refinement steps (e.g. for multiple regions) * max_courant_time (double, optional): Maximum courant time in seconds. Meshkernelpy default is `120`. """ if 'min_edge_size' not in kwargs: # min_edge_size (float): Minimum edge size. Meshkernelpy default is # `0.5`. Overwrite with more sensible value for coastal models. kwargs['min_edge_size'] = 500 if 'refinement_type' not in kwargs: # Refinement criterion type. Meshkernelpy default is # `RefinementType.REFINEMENT_LEVELS`. kwargs['refinement_type'] = meshkernel.RefinementType.WAVE_COURANT if 'smoothing_iterations' not in kwargs: # The number of smoothing iterations. Meshkernelpy default is `5`. kwargs['smoothing_iterations'] = 2 lon_np = data_bathy_sel.lon.to_numpy() lat_np = data_bathy_sel.lat.to_numpy() values_np = data_bathy_sel.to_numpy().flatten() gridded_samples = meshkernel.GriddedSamples( x_coordinates=lon_np, y_coordinates=lat_np, values=values_np) #refinement mesh_refinement_parameters = meshkernel.MeshRefinementParameters(**kwargs) mk.mesh2d_refine_based_on_gridded_samples( gridded_samples=gridded_samples, mesh_refinement_params=mesh_refinement_parameters, use_nodal_refinement=True, polygon=polygon, )
def meshkernel_get_outer_xycoords(mk:meshkernel.MeshKernel): mesh_bnds = mk.mesh2d_get_mesh_boundaries_as_polygons() xcoords = mesh_bnds.x_coordinates ycoords = mesh_bnds.y_coordinates if mesh_bnds.geometry_separator in xcoords: # TODO: would be nice to get only the outer xycoords instead # or to have easier selection of outer xycoords raise Exception('use meshkernel_get_outer_xycoords() on an uncut grid') return xcoords, ycoords
[docs] def generate_bndpli_cutland(mk:meshkernel.MeshKernel, res:str='f', min_area:float = 0, crs:(int,str) = None, buffer:float = 0): """ Generate a boundary polyline from the meshkernel object and cut away the landward part. Be sure to do this on the base/refined grid, not on the grid where the landward cells were already cut. Parameters ---------- mk : meshkernel.MeshKernel DESCRIPTION. res : str, optional DESCRIPTION. The default is 'f'. min_area : float, optional DESCRIPTION. The default is 0. crs : (int,str), optional DESCRIPTION. The default is None. buffer : float, optional DESCRIPTION. The default is 0. Returns ------- bnd_gdf : TYPE DESCRIPTION. """ bbox = meshkernel_get_bbox(mk) coastlines_gdf = get_coastlines_gdb(bbox=bbox, res=res, min_area=min_area, crs=crs) xcoords,ycoords = meshkernel_get_outer_xycoords(mk) mesh_bnds_xy = np.c_[xcoords,ycoords] meshbnd_ls = LineString(mesh_bnds_xy) coastlines_mp = MultiPolygon(coastlines_gdf.geometry.tolist()) coastlines_mp = coastlines_mp.buffer(buffer) bnd_ls = meshbnd_ls.difference(coastlines_mp) #attempt to merge MultiLineString to single LineString if isinstance(bnd_ls,MultiLineString): print('attemting to merge lines in MultiLineString to single LineString (if connected)') bnd_ls = linemerge(bnd_ls) #convert MultiLineString/LineString to GeoDataFrame if isinstance(bnd_ls,MultiLineString): bnd_gdf = gpd.GeoDataFrame(geometry=list(bnd_ls.geoms)) elif isinstance(bnd_ls,LineString): bnd_gdf = gpd.GeoDataFrame(geometry=[bnd_ls]) #set crs from coastlines bnd_gdf.crs = coastlines_gdf.crs return bnd_gdf
[docs] def interpolate_bndpli(bnd_gdf,res): """ interpolate bnd_gdf to a new resolution """ #TODO: keep corners of grid, maybe use mk.polygon_refine() bnd_gdf_interp = bnd_gdf.copy() for irow,row in bnd_gdf_interp.iterrows(): bnd_ls = row.geometry interp_range = np.arange(0,bnd_ls.length,res) bnd_ls_interp_points = bnd_ls.interpolate(interp_range) if len(bnd_ls_interp_points)==1: #no change if interp results in only one point continue bnd_ls_interp = LineString(bnd_ls_interp_points) bnd_gdf_interp.loc[irow,'geometry'] = bnd_ls_interp return bnd_gdf_interp