from __future__ import annotations
from itertools import chain
from typing import Any, Dict, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
import xarray as xr
from numba_celltree import CellTree2d
from scipy.sparse import coo_matrix, csr_matrix
from scipy.sparse.csgraph import reverse_cuthill_mckee
import xugrid
from xugrid import conversion
from xugrid import meshkernel_utils as mku
from xugrid.constants import (
BoolArray,
FloatArray,
FloatDType,
IntArray,
IntDType,
PolygonArray,
SparseMatrix,
)
from xugrid.core.utils import either_dict_or_kwargs
from xugrid.ugrid import connectivity, conventions
from xugrid.ugrid.ugridbase import AbstractUgrid, as_pandas_index
from xugrid.ugrid.voronoi import voronoi_topology
def section_coordinates(
edges: FloatArray, xy: FloatArray, dim: str, index: IntArray, name: str
) -> Tuple[IntArray, dict]:
# TODO: add boundaries xy[:, 0] and xy[:, 1]
xy_mid = 0.5 * (xy[:, 0, :] + xy[:, 1, :])
s = np.linalg.norm(xy_mid - edges[0, 0], axis=1)
order = np.argsort(s)
coords = {
f"{name}_x": (dim, xy_mid[order, 0]),
f"{name}_y": (dim, xy_mid[order, 1]),
f"{name}_s": (dim, s[order]),
}
return coords, index[order]
def numeric_bound(v: Union[float, None], other: float):
if v is None:
return other
else:
return v
[docs]
class Ugrid2d(AbstractUgrid):
"""
This class stores the topological data of a 2-D unstructured grid.
Parameters
----------
node_x: ndarray of floats
node_y: ndarray of floats
fill_value: int
face_node_connectivity: ndarray of integers
name: string, optional
Mesh name. Defaults to "mesh2d".
edge_node_connectivity: ndarray of integers, optional
dataset: xr.Dataset, optional
indexes: Dict[str, str], optional
When a dataset is provided, a mapping from the UGRID role to the dataset
variable name. E.g. {"face_x": "mesh2d_face_lon"}.
projected: bool, optional
Whether node_x and node_y are longitude and latitude or projected x and
y coordinates. Used to write the appropriate standard_name in the
coordinate attributes.
crs: Any, optional
Coordinate Reference System of the geometry objects. Can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
attrs: Dict[str, str], optional
UGRID topology attributes. Should not be provided together with
dataset: if other names are required, update the dataset instead.
A name entry is ignored, as name is given explicitly.
"""
[docs]
def __init__(
self,
node_x: FloatArray,
node_y: FloatArray,
fill_value: int,
face_node_connectivity: Union[IntArray, SparseMatrix],
name: str = "mesh2d",
edge_node_connectivity: IntArray = None,
dataset: xr.Dataset = None,
indexes: Dict[str, str] = None,
projected: bool = True,
crs: Any = None,
attrs: Dict[str, str] = None,
):
self.node_x = np.ascontiguousarray(node_x)
self.node_y = np.ascontiguousarray(node_y)
self.fill_value = fill_value
self.name = name
self.projected = projected
if isinstance(face_node_connectivity, np.ndarray):
face_node_connectivity = face_node_connectivity
elif isinstance(face_node_connectivity, (coo_matrix, csr_matrix)):
face_node_connectivity = connectivity.to_dense(
face_node_connectivity, fill_value
)
else:
raise TypeError(
"face_node_connectivity should be an array of integers or a sparse matrix"
)
self.face_node_connectivity = face_node_connectivity
# TODO: do this in validation instead. While UGRID conventions demand it,
# where does it go wrong?
# self.face_node_connectivity = connectivity.counterclockwise(
# face_node_connectivity, self.fill_value, self.node_coordinates
# )
self._initialize_indexes_attrs(name, dataset, indexes, attrs)
self._dataset = dataset
# Optional attributes, deferred initialization
# Meshkernel
self._mesh = None
self._meshkernel = None
# Celltree
self._celltree = None
# Perimeter
self._perimeter = None
# Area
self._area = None
# Centroids
self._centroids = None
self._circumcenters = None
# Bounds
self._xmin = None
self._xmax = None
self._ymin = None
self._ymax = None
# Edges
self._edge_x = None
self._edge_y = None
# Connectivity
self.edge_node_connectivity = edge_node_connectivity
self._edge_face_connectivity = None
self._node_node_connectivity = None
self._node_edge_connectivity = None
self._node_face_connectivity = None
self._face_edge_connectivity = None
self._face_face_connectivity = None
self._boundary_node_connectivity = None
# Derived topology
self._triangulation = None
self._voronoi_topology = None
self._centroid_triangulation = None
# crs
if crs is None:
self.crs = None
else:
import pyproj
self.crs = pyproj.CRS.from_user_input(crs)
def _clear_geometry_properties(self):
"""Clear all properties that may have been invalidated"""
# Meshkernel
self._mesh = None
self._meshkernel = None
# Celltree
self._celltree = None
# Perimeter
self._perimeter = None
# Area
self._area = None
# Centroids
self._centroids = None
self._circumcenters = None
# Bounds
self._xmin = None
self._xmax = None
self._ymin = None
self._ymax = None
# Edges
self._edge_x = None
self._edge_y = None
# Derived topology
self._triangulation = None
self._voronoi_topology = None
self._centroid_triangulation = None
[docs]
@classmethod
def from_meshkernel(
cls,
mesh,
name: str = "mesh2d",
projected: bool = True,
crs: Any = None,
):
"""
Create a 2D UGRID topology from a MeshKernel Mesh2d object.
Parameters
----------
mesh: MeshKernel.Mesh2d
name: str
Mesh name. Defaults to "mesh2d".
projected: bool
Whether node_x and node_y are longitude and latitude or projected x and
y coordinates. Used to write the appropriate standard_name in the
coordinate attributes.
crs: Any, optional
Coordinate Reference System of the geometry objects. Can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
Returns
-------
grid: Ugrid2d
"""
n_face = len(mesh.nodes_per_face)
n_max_node = mesh.nodes_per_face.max()
fill_value = -1
face_node_connectivity = np.full((n_face, n_max_node), fill_value)
isnode = connectivity.ragged_index(n_face, n_max_node, mesh.nodes_per_face)
face_node_connectivity[isnode] = mesh.face_nodes
return cls(
mesh.node_x,
mesh.node_y,
fill_value=fill_value,
face_node_connectivity=face_node_connectivity,
name=name,
projected=projected,
crs=crs,
)
[docs]
@classmethod
def from_dataset(cls, dataset: xr.Dataset, topology: str = None):
"""
Extract the 2D UGRID topology information from an xarray Dataset.
Parameters
----------
dataset: xr.Dataset
Dataset containing topology information stored according to UGRID conventions.
Returns
-------
grid: Ugrid1dAdapter
"""
ds = dataset
if not isinstance(ds, xr.Dataset):
raise TypeError(
"Ugrid should be initialized with xarray.Dataset. "
f"Received instead: {type(ds)}"
)
if topology is None:
topology = cls._single_topology(ds)
indexes = {}
# Collect names
connectivity = ds.ugrid_roles.connectivity[topology]
coordinates = ds.ugrid_roles.coordinates[topology]
ugrid_vars = (
[topology]
+ list(connectivity.values())
+ list(chain.from_iterable(chain.from_iterable(coordinates.values())))
)
x_index = coordinates["node_coordinates"][0][0]
y_index = coordinates["node_coordinates"][1][0]
node_x_coordinates = ds[x_index].astype(FloatDType).to_numpy()
node_y_coordinates = ds[y_index].astype(FloatDType).to_numpy()
face_nodes = connectivity["face_node_connectivity"]
fill_value = ds[face_nodes].encoding.get("_FillValue", -1)
face_node_connectivity = cls._prepare_connectivity(
ds[face_nodes], fill_value, dtype=IntDType
).to_numpy()
edge_nodes = connectivity.get("edge_node_connectivity")
if edge_nodes:
edge_node_connectivity = cls._prepare_connectivity(
ds[edge_nodes], fill_value, dtype=IntDType
).to_numpy()
else:
edge_node_connectivity = None
indexes["node_x"] = x_index
indexes["node_y"] = y_index
projected = False # TODO
return cls(
node_x_coordinates,
node_y_coordinates,
fill_value,
face_node_connectivity,
name=topology,
edge_node_connectivity=edge_node_connectivity,
dataset=ds[ugrid_vars],
indexes=indexes,
projected=projected,
crs=None,
)
def _get_name_and_attrs(self, name: str):
key = f"{name}_connectivity"
attrs = conventions.DEFAULT_ATTRS[key]
if "_FillValue" in attrs:
attrs["_FillValue"] = self.fill_value
return self._attrs[key], attrs
[docs]
def to_dataset(
self, other: xr.Dataset = None, optional_attributes: bool = False
) -> xr.Dataset:
node_x = self._indexes["node_x"]
node_y = self._indexes["node_y"]
face_nodes, face_nodes_attrs = self._get_name_and_attrs("face_node")
nmax_node_dim = self._attrs["max_face_nodes_dimension"]
edge_nodes, edge_nodes_attrs = self._get_name_and_attrs("edge_node")
data_vars = {
self.name: 0,
face_nodes: xr.DataArray(
data=self.face_node_connectivity,
attrs=face_nodes_attrs,
dims=(self.face_dimension, nmax_node_dim),
),
}
if self.edge_node_connectivity is not None or optional_attributes:
data_vars[edge_nodes] = xr.DataArray(
data=self.edge_node_connectivity,
attrs=edge_nodes_attrs,
dims=(self.edge_dimension, "two"),
)
if optional_attributes:
face_edges, face_edges_attrs = self._get_name_and_attrs("face_edge")
face_faces, face_faces_attrs = self._get_name_and_attrs("face_face")
edge_faces, edge_faces_attrs = self._get_name_and_attrs("edge_face")
bound_nodes, bound_nodes_attrs = self._get_name_and_attrs("boundary_node")
fill_value = self.fill_value
boundary_edge_dim = self._attrs["boundary_edge_dimension"]
data_vars[face_edges] = xr.DataArray(
data=self.face_edge_connectivity,
attrs=face_edges_attrs,
dims=(self.face_dimension, nmax_node_dim),
)
data_vars[face_faces] = xr.DataArray(
data=connectivity.to_dense(
self.face_face_connectivity, fill_value, self.n_max_node_per_face
),
attrs=face_faces_attrs,
dims=(self.face_dimension, nmax_node_dim),
)
data_vars[edge_faces] = xr.DataArray(
data=self.edge_face_connectivity,
attrs=edge_faces_attrs,
dims=(self.edge_dimension, "two"),
)
data_vars[bound_nodes] = xr.DataArray(
data=self.boundary_node_connectivity,
attrs=bound_nodes_attrs,
dims=(boundary_edge_dim, "two"),
)
attrs = {"Conventions": "CF-1.9 UGRID-1.0"}
if other is not None:
attrs.update(other.attrs)
dataset = xr.Dataset(data_vars, attrs=attrs)
if self._dataset:
dataset.update(self._dataset)
if other is not None:
dataset = dataset.merge(other)
if node_x not in dataset or node_y not in dataset:
dataset = self.assign_node_coords(dataset)
if optional_attributes:
dataset = self.assign_face_coords(dataset)
dataset = self.assign_edge_coords(dataset)
dataset[self.name].attrs = self._filtered_attrs(dataset)
return dataset
# These are all optional/derived UGRID attributes. They are not computed by
# default, only when called upon.
@property
def n_face(self) -> int:
"""Return the number of faces in the UGRID2D topology."""
return self.face_node_connectivity.shape[0]
@property
def n_max_node_per_face(self) -> int:
"""
Return the maximum number of nodes that a face can contain in the
UGRID2D topology.
"""
return self.face_node_connectivity.shape[1]
@property
def n_node_per_face(self) -> IntArray:
return (self.face_node_connectivity != self.fill_value).sum(axis=1)
@property
def core_dimension(self):
return self.face_dimension
@property
def dimensions(self):
return {
self.node_dimension: self.n_node,
self.edge_dimension: self.n_edge,
self.face_dimension: self.n_face,
}
@property
def max_face_node_dimension(self) -> str:
return self._attrs["max_face_nodes_dimension"]
@property
def max_connectivity_sizes(self) -> dict[str, int]:
return {
self.max_face_node_dimension: self.n_max_node_per_face,
}
@property
def max_connectivity_dimensions(self) -> tuple[str]:
return (self.max_face_node_dimension,)
@property
def topology_dimension(self):
"""Highest dimensionality of the geometric elements: 2"""
return 2
@property
def face_dimension(self):
"""Return the name of the face dimension."""
return self._attrs["face_dimension"]
def _edge_connectivity(self):
(
self._edge_node_connectivity,
self._face_edge_connectivity,
) = connectivity.edge_connectivity(
self.face_node_connectivity,
self.fill_value,
self._edge_node_connectivity,
)
@property
def edge_node_connectivity(self) -> IntArray:
"""
Edge to node connectivity. Every edge consists of a connection between
two nodes.
Returns
-------
connectivity: ndarray of integers with shape ``(n_edge, 2)``.
"""
if self._edge_node_connectivity is None:
self._edge_connectivity()
return self._edge_node_connectivity
@edge_node_connectivity.setter
def edge_node_connectivity(self, value):
self._edge_node_connectivity = value
@property
def face_edge_connectivity(self) -> csr_matrix:
"""
Face to edge connectivity.
Returns
-------
connectivity: csr_matrix
"""
if self._face_edge_connectivity is None:
self._edge_connectivity()
return self._face_edge_connectivity
@property
def boundary_node_connectivity(self) -> IntArray:
"""
Boundary node connectivity
Returns
-------
connectivity: ndarray of integers with shape ``(n_boundary_edge, 2)``
"""
if self._boundary_node_connectivity is None:
self._boundary_node_connectivity = connectivity.boundary_node_connectivity(
self.edge_face_connectivity,
self.fill_value,
self.edge_node_connectivity,
)
return self._boundary_node_connectivity
@property
def centroids(self) -> FloatArray:
"""
Centroid (x, y) of every face.
Returns
-------
centroids: ndarray of floats with shape ``(n_face, 2)``
"""
if self._centroids is None:
self._centroids = connectivity.centroids(
self.face_node_connectivity,
self.fill_value,
self.node_x,
self.node_y,
)
return self._centroids
@property
def circumcenters(self):
"""
Circumenter (x, y) of every face; only works for fully triangular
grids.
"""
if self._circumcenters is None:
self._circumcenters = connectivity.circumcenters(
self.face_node_connectivity,
self.fill_value,
self.node_x,
self.node_y,
)
return self._circumcenters
@property
def area(self) -> FloatArray:
"""Area of every face."""
if self._area is None:
self._area = connectivity.area(
self.face_node_connectivity,
self.fill_value,
self.node_x,
self.node_y,
)
return self._area
@property
def perimeter(self) -> FloatArray:
"""Perimeter length of every face."""
if self._perimeter is None:
self._perimeter = connectivity.perimeter(
self.face_node_connectivity,
self.fill_value,
self.node_x,
self.node_y,
)
return self._perimeter
@property
def face_bounds(self):
"""
Returns a numpy array with columns ``minx, miny, maxx, maxy``,
describing the bounds of every face in the grid.
Returns
-------
face_bounds: np.ndarray of shape (n_face, 4)
"""
x = self.node_x[self.face_node_connectivity]
y = self.node_y[self.face_node_connectivity]
isfill = self.face_node_connectivity == self.fill_value
x[isfill] = np.nan
y[isfill] = np.nan
return np.column_stack(
[
np.nanmin(x, axis=1),
np.nanmin(y, axis=1),
np.nanmax(x, axis=1),
np.nanmax(y, axis=1),
]
)
@property
def face_x(self):
"""x-coordinate of centroid of every face"""
return self.centroids[:, 0]
@property
def face_y(self):
"""y-coordinate of centroid of every face"""
return self.centroids[:, 1]
@property
def face_coordinates(self) -> FloatArray:
"""
Centroid (x, y) of every face.
Returns
-------
centroids: ndarray of floats with shape ``(n_face, 2)``
"""
return self.centroids
@property
def face_node_coordinates(self) -> FloatArray:
"""
Node coordinates of every face.
"Fill node" coordinates are set as NaN.
Returns
-------
face_node_coordinates: ndarray of floats with shape ``(n_face, n_max_node_per_face, 2)``
"""
coords = np.full(
(self.n_face, self.n_max_node_per_face, 2), np.nan, dtype=FloatDType
)
is_node = self.face_node_connectivity != self.fill_value
index = self.face_node_connectivity[is_node]
coords[is_node, :] = self.node_coordinates[index]
return coords
@property
def edge_face_connectivity(self) -> IntArray:
"""
Edge to face connectivity. An edge may belong to a single face
(exterior edge), or it may be shared by two faces (interior edge).
An exterior edge will contain a ``fill_value`` for the second column.
Returns
-------
connectivity: ndarray of integers with shape ``(n_edge, 2)``.
"""
if self._edge_face_connectivity is None:
self._edge_face_connectivity = connectivity.invert_dense(
self.face_edge_connectivity, self.fill_value
)
return self._edge_face_connectivity
@property
def face_face_connectivity(self) -> csr_matrix:
"""
Face to face connectivity. Derived from shared edges.
The connectivity is represented as an adjacency matrix in CSR format,
with the row and column indices as a (0-based) face index. The data of
the matrix contains the edge index as every connection is formed by a
shared edge.
Returns
-------
connectivity: csr_matrix
"""
if self._face_face_connectivity is None:
self._face_face_connectivity = connectivity.face_face_connectivity(
self.edge_face_connectivity, self.fill_value
)
return self._face_face_connectivity
@property
def node_face_connectivity(self):
"""
Node to face connectivity. Inverted from face node connectivity.
Returns
-------
connectivity: csr_matrix
"""
if self._node_face_connectivity is None:
self._node_face_connectivity = connectivity.invert_dense_to_sparse(
self.face_node_connectivity, self.fill_value
)
return self._node_face_connectivity
def connectivity_matrix(self, dim: str, xy_weights: bool):
if dim == self.node_dimension:
connectivity = self.node_node_connectivity.copy()
coordinates = self.node_coordinates
elif dim == self.face_dimension:
connectivity = self.face_face_connectivity.copy()
coordinates = self.centroids
else:
raise ValueError(
f"Expected {self.node_dimension} or {self.face_dimension}; got: {dim}"
)
if xy_weights:
connectivity.data = self._connectivity_weights(connectivity, coordinates)
return connectivity
@property
def mesh(self) -> "mk.Mesh2d": # type: ignore # noqa
"""
Create if needed, and return meshkernel Mesh2d object.
Returns
-------
mesh: meshkernel.Mesh2d
"""
import meshkernel as mk
edge_nodes = self.edge_node_connectivity.ravel().astype(np.int32)
is_node = self.face_node_connectivity != self.fill_value
nodes_per_face = is_node.sum(axis=1).astype(np.int32)
face_nodes = self.face_node_connectivity[is_node].ravel().astype(np.int32)
if self._mesh is None:
self._mesh = mk.Mesh2d(
node_x=self.node_x,
node_y=self.node_y,
edge_nodes=edge_nodes,
face_nodes=face_nodes,
nodes_per_face=nodes_per_face,
)
return self._mesh
@property
def meshkernel(self) -> "mk.MeshKernel": # type: ignore # noqa
"""
Create if needed, and return meshkernel MeshKernel instance.
Returns
-------
meshkernel: meshkernel.MeshKernel
"""
import meshkernel as mk
if self._meshkernel is None:
if self.is_geographic:
mk_projection = mk.ProjectionType.SPHERICAL
else:
mk_projection = mk.ProjectionType.CARTESIAN
self._meshkernel = mk.MeshKernel(mk_projection)
self._meshkernel.mesh2d_set(self.mesh)
return self._meshkernel
@property
def voronoi_topology(self):
"""
Centroidal Voronoi tesselation of this UGRID2D topology.
Returns
-------
vertices: ndarray of floats with shape ``(n_centroids, 2)``
face_node_connectivity: csr_matrix
Describes face node connectivity of voronoi topology.
face_index: 1d array of integers
"""
if self._voronoi_topology is None:
vertices, faces, face_index = voronoi_topology(
self.node_face_connectivity,
self.node_coordinates,
self.centroids,
self.edge_face_connectivity,
self.edge_node_connectivity,
self.fill_value,
add_exterior=True,
add_vertices=False,
)
self._voronoi_topology = vertices, faces, face_index
return self._voronoi_topology
@property
def centroid_triangulation(self):
"""
Triangulation of centroidal voronoi tesselation.
Required for e.g. contouring face data, which takes triangles and
associated values at the triangle vertices.
Returns
-------
vertices: ndarray of floats with shape ``(n_centroids, 2)``
face_node_connectivity: ndarray of integers with shape ``(n_triangle, 3)``
Describes face node connectivity of triangle topology.
face_index: 1d array of integers
"""
if self._centroid_triangulation is None:
nodes, faces, face_index = self.voronoi_topology
triangles, _ = connectivity.triangulate(faces, self.fill_value)
triangulation = (nodes[:, 0].copy(), nodes[:, 1].copy(), triangles)
self._centroid_triangulation = (triangulation, face_index)
return self._centroid_triangulation
@property
def triangulation(self):
"""
Triangulation of the UGRID2D topology.
Returns
-------
triangulation: tuple
Contains node_x, node_y, triangle face_node_connectivity.
triangle_face_connectivity: 1d array of integers
Identifies the original face for every triangle.
"""
if self._triangulation is None:
triangles, triangle_face_connectivity = connectivity.triangulate(
self.face_node_connectivity, self.fill_value
)
triangulation = (self.node_x, self.node_y, triangles)
self._triangulation = (triangulation, triangle_face_connectivity)
return self._triangulation
@property
def exterior_edges(self) -> IntArray:
"""
Get all exterior edges, i.e. edges with no other face.
Returns
-------
edge_index: 1d array of integers
"""
# Numpy argwhere doesn't return a 1D array
return np.nonzero(self.edge_face_connectivity[:, 1] == self.fill_value)[0]
@property
def exterior_faces(self) -> IntArray:
"""
Get all exterior faces, i.e. faces with an unshared edge.
Returns
-------
face_index: 1d array of integers
"""
exterior_edges = self.exterior_edges
exterior_faces = self.edge_face_connectivity[exterior_edges].ravel()
return np.unique(exterior_faces[exterior_faces != self.fill_value])
@property
def celltree(self):
"""
Initializes the celltree if needed, and returns celltree.
A celltree is a search structure for spatial lookups in unstructured grids.
"""
if self._celltree is None:
self._celltree = CellTree2d(
self.node_coordinates, self.face_node_connectivity, self.fill_value
)
return self._celltree
[docs]
def validate_edge_node_connectivity(self):
"""
Mark valid edges, by comparing face_node_connectivity and
edge_node_connectivity. Edges that are not part of a face, as well as
duplicate edges are marked ``False``.
An error is raised if the face_node_connectivity defines more unique
edges than the edge_node_connectivity.
Returns
-------
valid: np.ndarray of bool
Marks for every edge whether it is valid.
Examples
--------
To purge invalid edges and associated data from a dataset that contains
un-associated or duplicate edges:
>>> uds = xugrid.open_dataset("example.nc")
>>> valid = uds.ugrid.grid.validate_edge_node_connectivity()
>>> purged = uds.isel({grid.edge_dimension: valid})
"""
return connectivity.validate_edge_node_connectivity(
self.face_node_connectivity,
self.fill_value,
self.edge_node_connectivity,
)
[docs]
def assign_face_coords(
self,
obj: Union[xr.DataArray, xr.Dataset],
) -> Union[xr.DataArray, xr.Dataset]:
"""
Assign face coordinates from the grid to the object.
Returns a new object with all the original data in addition to the new
node coordinates of the grid.
Parameters
----------
obj: xr.DataArray or xr.Dataset
Returns
-------
assigned (same type as obj)
"""
xname = self._indexes.get("face_x", f"{self.name}_face_x")
yname = self._indexes.get("face_y", f"{self.name}_face_y")
x_attrs = conventions.DEFAULT_ATTRS["face_x"][self.projected]
y_attrs = conventions.DEFAULT_ATTRS["face_y"][self.projected]
coords = {
xname: xr.DataArray(
data=self.face_x,
dims=(self.face_dimension,),
attrs=x_attrs,
),
yname: xr.DataArray(
data=self.face_y,
dims=(self.face_dimension,),
attrs=y_attrs,
),
}
return obj.assign_coords(coords)
[docs]
def locate_points(self, points: FloatArray):
"""
Find in which face points are located.
Parameters
----------
points: ndarray of floats with shape ``(n_point, 2)``
Returns
-------
face_index: ndarray of integers with shape ``(n_points,)``
"""
return self.celltree.locate_points(points)
[docs]
def intersect_edges(self, edges: FloatArray):
"""
Find in which face edges are located and compute the intersection with
the face edges.
Parameters
----------
edges: ndarray of floats with shape ``(n_edge, 2, 2)``
The first dimensions represents the different edges.
The second dimensions represents the start and end of every edge.
The third dimensions reresent the x and y coordinate of every vertex.
Returns
-------
edge_index: ndarray of integers with shape ``(n_intersection,)``
face_index: ndarray of integers with shape ``(n_intersection,)``
intersections: ndarray of float with shape ``(n_intersection, 2, 2)``
"""
return self.celltree.intersect_edges(edges)
[docs]
def locate_bounding_box(
self, xmin: float, ymin: float, xmax: float, ymax: float
) -> IntArray:
"""
Find which faces are located in the bounding box. The centroids of the
faces are used.
Parameters
----------
xmin: float,
ymin: float,
xmax: float,
ymax: float
Returns
-------
face_index: ndarray of bools with shape ``(n_face,)``
"""
return np.nonzero(
(self.face_x >= xmin)
& (self.face_x < xmax)
& (self.face_y >= ymin)
& (self.face_y < ymax)
)[0]
[docs]
def compute_barycentric_weights(
self, points: FloatArray
) -> Tuple[IntArray, FloatArray]:
"""
Find in which face the points are located, and compute the barycentric
weight for every vertex of the face.
Parameters
----------
points: ndarray of floats with shape ``(n_point, 2)``
Returns
-------
face_index: ndarray of integers with shape ``(n_points,)``
weights: ndarray of floats with shape ```(n_points, n_max_node)``
"""
return self.celltree.compute_barycentric_weights(points)
[docs]
def rasterize_like(
self, x: FloatArray, y: FloatArray
) -> Tuple[FloatArray, FloatArray, IntArray]:
"""
Rasterize unstructured grid by sampling on the x and y coordinates.
Parameters
----------
x: 1d array of floats with shape ``(ncol,)``
y: 1d array of floats with shape ``(nrow,)``
Returns
-------
x: 1d array of floats with shape ``(ncol,)``
y: 1d array of floats with shape ``(nrow,)``
face_index: 1d array of integers with shape ``(nrow * ncol,)``
"""
yy, xx = np.meshgrid(y, x, indexing="ij")
nodes = np.column_stack([xx.ravel(), yy.ravel()])
index = self.celltree.locate_points(nodes).reshape((y.size, x.size))
return x, y, index
[docs]
def rasterize(
self,
resolution: float,
bounds: Optional[Tuple[float, float, float, float]] = None,
) -> Tuple[FloatArray, FloatArray, IntArray]:
"""
Rasterize unstructured grid by sampling.
x and y coordinates are generated from the bounds of the UGRID2D
topology and the provided resolution.
Parameters
----------
resolution: float
Spacing in x and y.
bounds: tuple of four floats, optional
xmin, ymin, xmax, ymax
Returns
-------
x: 1d array of floats with shape ``(ncol,)``
y: 1d array of floats with shape ``(nrow,)``
face_index: 1d array of integers with shape ``(nrow * ncol,)``
"""
if bounds is None:
bounds = self.bounds
xmin, ymin, xmax, ymax = bounds
d = abs(resolution)
xmin = np.floor(xmin / d) * d
xmax = np.ceil(xmax / d) * d
ymin = np.floor(ymin / d) * d
ymax = np.ceil(ymax / d) * d
x = np.arange(xmin + 0.5 * d, xmax, d)
y = np.arange(ymax - 0.5 * d, ymin, -d)
return self.rasterize_like(x, y)
[docs]
def topology_subset(
self, face_index: Union[BoolArray, IntArray], return_index: bool = False
):
"""
Create a new UGRID1D topology for a subset of this topology.
Parameters
----------
face_index: 1d array of integers or bool
Edges of the subset.
return_index: bool, optional
Whether to return node_index, edge_index, face_index.
Returns
-------
subset: Ugrid2d
indexes: dict
Dictionary with keys node dimension, edge dimension, face dimension
and values their respective index. Only returned if return_index is
True.
"""
if not isinstance(face_index, pd.Index):
face_index = as_pandas_index(face_index, self.n_face)
# The pandas index may only contain uniques. So if size matches, it may
# be the identity.
range_index = pd.RangeIndex(0, self.n_face)
if face_index.size == self.n_face and face_index.equals(range_index):
# TODO: return self.copy instead?
if return_index:
indexes = {
self.node_dimension: pd.RangeIndex(0, self.n_node),
self.edge_dimension: pd.RangeIndex(0, self.n_edge),
self.face_dimension: range_index,
}
return self, indexes
else:
return self
index = face_index.to_numpy()
face_subset = self.face_node_connectivity[index]
node_index = np.unique(face_subset.ravel())
node_index = node_index[node_index != self.fill_value]
new_faces = connectivity.renumber(face_subset, self.fill_value)
node_x = self.node_x[node_index]
node_y = self.node_y[node_index]
edge_index = None
new_edges = None
if self.edge_node_connectivity is not None:
edge_index = np.unique(self.face_edge_connectivity[index].ravel())
edge_index = edge_index[edge_index != self.fill_value]
edge_subset = self.edge_node_connectivity[edge_index]
new_edges = connectivity.renumber(edge_subset)
grid = self.__class__(
node_x,
node_y,
self.fill_value,
new_faces,
name=self.name,
edge_node_connectivity=new_edges,
indexes=self._indexes,
projected=self.projected,
crs=self.crs,
attrs=self._attrs,
)
if return_index:
indexes = {
self.node_dimension: pd.Index(node_index),
self.face_dimension: face_index,
}
if edge_index is not None:
indexes[self.edge_dimension] = pd.Index(edge_index)
return grid, indexes
else:
return grid
def clip_box(
self,
xmin: float,
ymin: float,
xmax: float,
ymax: float,
):
xmin, ymin, xmax, ymax = self.bounds
bounds = [xmin, ymin, xmax, ymax]
face_index = self.locate_bounding_box(*bounds)
return self.topology_subset(face_index)
[docs]
def isel(self, indexers=None, return_index=False, **indexers_kwargs):
"""
Select based on node, edge, or face.
Face selection always results in a valid UGRID topology.
Node or edge selection may result in invalid topologies (incomplete
faces), and will error in such a case.
Parameters
----------
indexers: dict of str to np.ndarray of integers or bools
return_index: bool, optional
Whether to return node_index, edge_index, face_index.
Returns
-------
obj: xr.Dataset or xr.DataArray
grid: Ugrid2d
indexes: dict
Dictionary with keys node dimension, edge dimension, face dimension
and values their respective index. Only returned if return_index is
True.
"""
indexers = either_dict_or_kwargs(indexers, indexers_kwargs, "isel")
alldims = set(self.dimensions)
invalid = indexers.keys() - alldims
if invalid:
raise ValueError(
f"Dimensions {invalid} do not exist. Expected one of {alldims}"
)
indexers = {
k: as_pandas_index(v, self.dimensions[k]) for k, v in indexers.items()
}
nodedim, edgedim, facedim = self.dimensions
face_index = {}
if nodedim in indexers:
node_index = indexers[nodedim]
face_index[nodedim] = np.unique(
self.node_face_connectivity[node_index].data
)
if edgedim in indexers:
edge_index = indexers[edgedim]
index = np.unique(self.edge_face_connectivity[edge_index])
face_index[edgedim] = index[index != self.fill_value]
if facedim in indexers:
face_index[facedim] = indexers[facedim]
# Convert all to pandas index.
face_index = {k: as_pandas_index(v, self.n_face) for k, v in face_index.items()}
# Check the indexes against each other.
index = self._precheck(face_index)
grid, finalized_indexers = self.topology_subset(index, return_index=True)
self._postcheck(indexers, finalized_indexers)
if return_index:
return grid, finalized_indexers
else:
return grid
def _validate_indexer(self, indexer) -> Union[slice, np.ndarray]:
if isinstance(indexer, slice):
s = indexer
if s.start is not None and s.stop is not None:
if s.start >= s.stop:
raise ValueError(
"slice stop should be larger than slice start, received: "
f"start: {s.start}, stop: {s.stop}"
)
if s.step is not None:
indexer = np.arange(s.start, s.stop, s.step)
elif s.start is None or s.stop is None:
if s.step is not None:
raise ValueError(
"step should be None if slice start or stop is None"
)
else: # Convert it into a 1d numpy array
if isinstance(indexer, xr.DataArray):
indexer = indexer.to_numpy()
if isinstance(indexer, (list, np.ndarray, int, float)):
indexer = np.atleast_1d(indexer)
else:
raise TypeError(
f"Invalid indexer type: {type(indexer).__name__}, "
"allowed types: integer, float, list, numpy array, xarray DataArray"
)
if indexer.ndim > 1:
raise ValueError("index should be 0d or 1d")
return indexer
def _sel_box(
self,
obj,
x: slice,
y: slice,
):
xmin, ymin, xmax, ymax = self.bounds
bounds = [
numeric_bound(x.start, xmin),
numeric_bound(y.start, ymin),
numeric_bound(x.stop, xmax),
numeric_bound(y.stop, ymax),
]
face_index = self.locate_bounding_box(*bounds)
grid, indexes = self.topology_subset(face_index, return_index=True)
indexes = {k: v for k, v in indexes.items() if k in obj.dims}
new_obj = obj.isel(indexes)
return new_obj, grid
def _sel_line(
self,
obj,
start,
end,
):
edges = np.array([[start, end]])
_, index, xy = self.intersect_edges(edges)
coords, index = section_coordinates(
edges, xy, self.face_dimension, index, self.name
)
return obj.isel({self.face_dimension: index}).assign_coords(coords)
def _sel_yline(
self,
obj,
x: float,
y: slice,
):
xmin, _, xmax, _ = self.bounds
if y.size != 1:
raise ValueError(
"If x is a slice without steps, y should be a single value"
)
y = y[0]
xstart = numeric_bound(x.start, xmin)
xstop = numeric_bound(x.stop, xmax)
return self._sel_line(obj, start=(xstart, y), end=(xstop, y))
def _sel_xline(
self,
obj,
x: float,
y: slice,
):
_, ymin, _, ymax = self.bounds
if x.size != 1:
raise ValueError(
"If y is a slice without steps, x should be a single value"
)
x = x[0]
ystart = numeric_bound(y.start, ymin)
ystop = numeric_bound(y.stop, ymax)
return self._sel_line(obj, start=(x, ystart), end=(x, ystop))
[docs]
def sel_points(self, obj, x: FloatArray, y: FloatArray):
"""
Select points in the unstructured grid.
Out-of-bounds points are ignored. They may be identified via the
``index`` coordinate of the returned selection.
Parameters
----------
x: 1d array of floats with shape ``(n_points,)``
y: 1d array of floats with shape ``(n_points,)``
obj: xr.DataArray or xr.Dataset
Returns
-------
selection: xr.DataArray or xr.Dataset
The name of the topology is prefixed in the x, y coordinates.
"""
x = np.atleast_1d(x)
y = np.atleast_1d(y)
if x.shape != y.shape:
raise ValueError("shape of x does not match shape of y")
if x.ndim != 1:
raise ValueError("x and y must be 1d")
dim = self.face_dimension
xy = np.column_stack([x, y])
index = self.locate_points(xy)
valid = index != -1
index = index[valid]
coords = {
f"{self.name}_index": (dim, np.arange(len(valid))[valid]),
f"{self.name}_x": (dim, xy[valid, 0]),
f"{self.name}_y": (dim, xy[valid, 1]),
}
return obj.isel({dim: index}).assign_coords(coords)
[docs]
def intersect_line(self, obj, start: Sequence[float], end: Sequence[float]):
"""
Intersect a line with this grid, and fetch the values of the
intersected faces.
Parameters
----------
obj: xr.DataArray or xr.Dataset
start: sequence of two floats
coordinate pair (x, y), designating the start point of the line.
end: sequence of two floats
coordinate pair (x, y), designating the end point of the line.
Returns
-------
selection: xr.DataArray or xr.Dataset
The name of the topology is prefixed in the x, y and s
(spatium=distance) coordinates.
"""
if (len(start) != 2) or (len(end) != 2):
raise ValueError("Start and end coordinate pairs must have length two")
return self._sel_line(obj, start, end)
[docs]
def intersect_linestring(
self,
obj: Union[xr.DataArray, xr.Dataset],
linestring: "shapely.geometry.LineString", # type: ignore # noqa
) -> Union[xr.DataArray, xr.Dataset]:
"""
Intersect linestrings with this grid, and fetch the values of the
intersected faces.
Parameters
----------
obj: xr.DataArray or xr.Dataset
linestring: shapely.geometry.lineString
Returns
-------
selection: xr.DataArray or xr.Dataset
The name of the topology is prefixed in the x, y and s
(spatium=distance) coordinates.
"""
import shapely
xy = shapely.get_coordinates([linestring])
edges = np.stack((xy[:-1], xy[1:]), axis=1)
edge_index, face_index, intersections = self.intersect_edges(edges)
# Compute the cumulative length along the edges
edge_length = np.linalg.norm(edges[:, 1] - edges[:, 0], axis=1)
cumulative_length = np.empty_like(edge_length)
cumulative_length[0] = 0
np.cumsum(edge_length[:-1], out=cumulative_length[1:])
# Compute the distance for every intersection to the start of the linestring.
intersection_centroid = intersections.mean(axis=1)
distance_node_to_intersection = np.linalg.norm(
intersection_centroid - edges[edge_index, 0], axis=1
)
s = distance_node_to_intersection + cumulative_length[edge_index]
# Now sort everything according to s.
sorter = np.argsort(s)
face_index = face_index[sorter]
intersection_centroid = intersection_centroid[sorter]
intersections = intersections[sorter]
facedim = self.face_dimension
coords = {
f"{self.name}_s": (facedim, s[sorter]),
f"{self.name}_x": (facedim, intersection_centroid[:, 0]),
f"{self.name}_y": (facedim, intersection_centroid[:, 1]),
}
return obj.isel({facedim: face_index}).assign_coords(coords)
[docs]
def sel(self, obj, x=None, y=None):
"""
Find selection in the UGRID x and y coordinates.
The indexing for x and y always occurs orthogonally, i.e.:
``.sel(x=[0.0, 5.0], y=[10.0, 15.0])`` results in a four points. For
vectorized indexing (equal to ``zip``ing through x and y), see
``.sel_points``.
Parameters
----------
obj: xr.DataArray or xr.Dataset
x: float, 1d array, slice
y: float, 1d array, slice
Returns
-------
dimension: str
as_ugrid: bool
index: 1d array of integers
coords: dict
"""
if x is None:
x = slice(None, None)
if y is None:
y = slice(None, None)
x = self._validate_indexer(x)
y = self._validate_indexer(y)
if isinstance(x, slice) and isinstance(y, slice):
f = self._sel_box
elif isinstance(x, slice) and isinstance(y, np.ndarray):
f = self._sel_yline
elif isinstance(x, np.ndarray) and isinstance(y, slice):
f = self._sel_xline
elif isinstance(x, np.ndarray) and isinstance(y, np.ndarray):
# Orthogonal points
y, x = [a.ravel() for a in np.meshgrid(y, x, indexing="ij")]
f = self.sel_points
else:
raise TypeError(
f"Invalid indexer types: {type(x).__name__}, and {type(y).__name__}"
)
return f(obj, x, y)
[docs]
def label_partitions(self, n_part: int) -> "xugrid.UgridDataArray":
"""
Generate partition labesl for this grid topology using METIS:
https://github.com/KarypisLab/METIS
This method utilizes the pymetis Python bindings:
https://github.com/inducer/pymetis
Parameters
----------
n_part: integer
The number of parts to partition the mesh.
Returns
-------
partition_labels: UgridDataArray of integers
"""
import pymetis
adjacency_matrix = self.face_face_connectivity
_, partition_index = pymetis.part_graph(
nparts=n_part,
xadj=adjacency_matrix.indptr,
adjncy=adjacency_matrix.indices,
)
return xugrid.UgridDataArray(
obj=xr.DataArray(
data=np.array(partition_index),
dims=(self.core_dimension,),
name="labels",
),
grid=self,
)
[docs]
def partition(self, n_part: int):
"""
Partition this grid topology using METIS:
https://github.com/KarypisLab/METIS
This method utilizes the pymetis Python bindings:
https://github.com/inducer/pymetis
Parameters
----------
n_part: integer
The number of parts to partition the mesh.
Returns
-------
partitions
"""
from xugrid.ugrid.partitioning import labels_to_indices
labels = self.label_partitions(n_part)
indices = labels_to_indices(labels.values)
return [self.topology_subset(index) for index in indices]
[docs]
@staticmethod
def merge_partitions(
grids: Sequence["Ugrid2d"]
) -> tuple["Ugrid2d", dict[str, np.array]]:
"""
Merge grid partitions into a single whole.
Duplicate faces are included only once, and removed from subsequent
partitions before merging.
Parameters
----------
grids: sequence of Ugrid2d
Returns
-------
merged: Ugrid2d
"""
from xugrid.ugrid import partitioning
# Grab a sample grid
grid = next(iter(grids))
fill_value = grid.fill_value
node_coordinates, node_indexes, node_inverse = partitioning.merge_nodes(grids)
new_faces, face_indexes = partitioning.merge_faces(
grids, node_inverse, fill_value
)
indexes = {
grid.node_dimension: node_indexes,
grid.face_dimension: face_indexes,
}
if grid._edge_node_connectivity is not None:
new_edges, edge_indexes = partitioning.merge_edges(grids, node_inverse)
indexes[grid.edge_dimension] = edge_indexes
else:
new_edges = None
merged_grid = Ugrid2d(
*node_coordinates.T,
fill_value,
new_faces,
name=grid.name,
edge_node_connectivity=new_edges,
indexes=grid._indexes,
projected=grid.projected,
crs=grid.crs,
attrs=grid._attrs,
)
return merged_grid, indexes
[docs]
def to_periodic(self, obj=None):
"""
Convert this grid to a periodic grid, where the rightmost nodes are
equal to the leftmost nodes. Note: for this to work, the y-coordinates
on the left boundary must match those on the right boundary exactly.
Returns
-------
periodic_grid: Ugrid2d
aligned: xr.DataArray or xr.Dataset
"""
xmin, _, xmax, _ = self.bounds
coordinates = self.node_coordinates
is_right = np.isclose(coordinates[:, 0], xmax)
is_left = np.isclose(coordinates[:, 0], xmin)
node_y = coordinates[:, 1]
if not np.allclose(np.sort(node_y[is_left]), np.sort(node_y[is_right])):
raise ValueError(
"y-coordinates of the left and right boundaries do not match"
)
# Discard the rightmost nodes. Preserve the order in the faces, and the
# order of the nodes.
coordinates[is_right, 0] = xmin
_, node_index, inverse = np.unique(
coordinates, return_index=True, return_inverse=True, axis=0
)
# Create a mapping of the inverse index to the new node index.
new_index = connectivity.renumber(node_index)
new_faces = new_index[inverse[self.face_node_connectivity]]
# Get the selection of nodes, and keep the order.
node_index.sort()
new_xy = self.node_coordinates[node_index]
# Preserve the order of the edge_node_connectivity if it is present.
new_edges = None
edge_index = None
if self._edge_node_connectivity is not None:
new_edges = inverse[self.edge_node_connectivity]
new_edges.sort(axis=1)
_, edge_index = np.unique(new_edges, axis=0, return_index=True)
edge_index.sort()
new_edges = new_index[new_edges][edge_index]
new = Ugrid2d(
node_x=new_xy[:, 0],
node_y=new_xy[:, 1],
face_node_connectivity=new_faces,
fill_value=self.fill_value,
name=self.name,
edge_node_connectivity=new_edges,
indexes=self._indexes,
projected=self.projected,
crs=self.crs,
attrs=self.attrs,
)
if obj is not None:
indexes = {
self.face_dimension: pd.RangeIndex(0, self.n_face),
self.node_dimension: pd.Index(node_index),
}
if edge_index is not None:
indexes[self.edge_dimension] = pd.Index(edge_index)
indexes = {k: v for k, v in indexes.items() if k in obj.dims}
return new, obj.isel(**indexes)
else:
return new
[docs]
def to_nonperiodic(self, xmax: float, obj=None):
"""
Convert this grid from a periodic grid (where the rightmost boundary shares its
nodes with the leftmost boundary) to an aperiodic grid, where the leftmost nodes
are separate from the rightmost nodes.
Parameters
----------
xmax: float
The x-value of the newly created rightmost boundary nodes.
obj: xr.DataArray or xr.Dataset
Returns
-------
nonperiodic_grid: Ugrid2d
aligned: xr.DataArray or xr.Dataset
"""
xleft, _, xright, _ = self.bounds
half_domain = 0.5 * (xright - xleft)
# Extract all x coordinates for every face. Then identify the nodes
# which have a value of e.g. -180, while the max x value for the face
# is 180.0. These nodes should be duplicated.
x = self.face_node_coordinates[..., 0]
is_periodic = (np.nanmax(x, axis=1)[:, np.newaxis] - x) > half_domain
periodic_nodes = self.face_node_connectivity[is_periodic]
uniques, new_nodes = np.unique(periodic_nodes, return_inverse=True)
new_x = np.full(uniques.size, xmax)
new_y = self.node_y[uniques]
new_faces = self.face_node_connectivity.copy()
new_faces[is_periodic] = new_nodes + self.n_node
# edge_node_connectivity must be rederived, since we've added a number
# of new edges and new nodes.
new = Ugrid2d(
node_x=np.concatenate((self.node_x, new_x)),
node_y=np.concatenate((self.node_y, new_y)),
face_node_connectivity=new_faces,
fill_value=self.fill_value,
name=self.name,
edge_node_connectivity=None,
indexes=self._indexes,
projected=self.projected,
crs=self.crs,
attrs=self.attrs,
)
edge_index = None
if self._edge_node_connectivity is not None:
# If there is edge associated data, we need to duplicate the data
# of the edges. It is impossible(?) to do this on the edges
# directly, due to the possible presence of "symmetric" edges:
# 2
# /|\
# / | \
# 0__1__0
#
# (0, 1) and (1, 0) are topologically distinct, but only in the
# face definition. In the new grid, the 0 on the right will have
# become node 3, creating distinct edges.
#
# Note that any data with the edge is only stored once, which is
# incorrect(!), but a given for these grids and would be a problem
# for the simulation code producing these results.
#
# We use a casting trick to collapse two integers into one so we
# can use searchsorted easily.
edges = (
np.sort(self.edge_node_connectivity, axis=1)
.astype(np.int32)
.view(np.int64)
.ravel()
)
# Create a mapping of the new nodes created above, to the original nodes.
# Then, find the new edges in the old using searchsorted.
mapping = np.concatenate((np.arange(self.n_node), uniques))
new_edges = (
np.sort(mapping[new.edge_node_connectivity], axis=1)
.astype(np.int32)
.view(np.int64)
.ravel()
)
edge_index = np.searchsorted(edges, new_edges, sorter=np.argsort(edges))
# Reshuffle to keep the original order as intact as possible; how
# much benefit does this actually give?
sorter = np.argsort(edge_index)
new._edge_node_connectivity = new._edge_node_connectivity[sorter]
edge_index = edge_index[sorter]
if obj is not None:
indexes = {
self.face_dimension: pd.RangeIndex(0, self.n_face),
self.node_dimension: pd.Index(
np.concatenate((np.arange(self.n_node), uniques))
),
}
if edge_index is not None:
indexes[self.edge_dimension] = pd.Index(edge_index)
indexes = {k: v for k, v in indexes.items() if k in obj.dims}
return new, obj.isel(**indexes)
else:
return new
[docs]
def reindex_like(
self,
other: "Ugrid2d",
obj: Union[xr.DataArray, xr.Dataset],
tolerance: float = 0.0,
):
"""
Conform a DataArray or Dataset to match the topology of another Ugrid2D
topology. The topologies must be exactly equivalent: only the order of
the nodes, edges, and faces may differ.
Parameters
----------
other: Ugrid2d
obj: DataArray or Dataset
tolerance: float, default value 0.0.
Maximum distance between inexact coordinate matches.
Returns
-------
reindexed: DataArray or Dataset
"""
if not isinstance(other, Ugrid2d):
raise TypeError(f"Expected Ugrid2d, received: {type(other).__name__}")
indexers = {
self.node_dimension: connectivity.index_like(
xy_a=self.node_coordinates,
xy_b=other.node_coordinates,
tolerance=tolerance,
),
self.face_dimension: connectivity.index_like(
xy_a=self.centroids,
xy_b=other.centroids,
tolerance=tolerance,
),
}
if other._edge_node_connectivity is not None:
indexers[self.edge_dimension] = connectivity.index_like(
xy_a=self.edge_coordinates,
xy_b=other.edge_coordinates,
tolerance=tolerance,
)
return obj.isel(indexers, missing_dims="ignore")
[docs]
def triangulate(self):
"""
Triangulate this UGRID2D topology, breaks more complex polygons down
into triangles.
Returns
-------
triangles: Ugrid2d
"""
triangles, _ = connectivity.triangulate(
self.face_node_connectivity, self.fill_value
)
return Ugrid2d(self.node_x, self.node_y, self.fill_value, triangles)
def _tesselate_voronoi(self, centroids, add_exterior, add_vertices):
if add_exterior:
edge_face_connectivity = self.edge_face_connectivity
edge_node_connectivity = self.edge_node_connectivity
else:
edge_face_connectivity = None
edge_node_connectivity = None
vertices, faces, _ = voronoi_topology(
self.node_face_connectivity,
self.node_coordinates,
centroids,
edge_face_connectivity,
edge_node_connectivity,
self.fill_value,
add_exterior,
add_vertices,
)
faces = connectivity.to_dense(faces, self.fill_value)
return Ugrid2d(vertices[:, 0], vertices[:, 1], self.fill_value, faces)
[docs]
def tesselate_centroidal_voronoi(self, add_exterior=True, add_vertices=True):
"""
Create a centroidal Voronoi tesselation of this UGRID2D topology.
Such a tesselation is not guaranteed to produce convex cells. To ensure
convexity, set ``add_vertices=False`` -- this will result in a
different exterior, however.
Parameters
----------
add_exterior: bool, default: True
add_vertices: bool, default: True
Returns
-------
tesselation: Ugrid2d
"""
return self._tesselate_voronoi(self.centroids, add_exterior, add_vertices)
[docs]
def tesselate_circumcenter_voronoi(self, add_exterior=True, add_vertices=True):
"""
Create a circumcenter Voronoi tesselation of this UGRID2D topology.
Such a tesselation is not guaranteed to produce convex cells. To ensure
convexity, set ``add_vertices=False`` -- this will result in a
different exterior, however.
Parameters
----------
add_exterior: bool, default: True
add_vertices: bool, default: True
Returns
-------
tesselation: Ugrid2d
"""
return self._tesselate_voronoi(self.circumcenters, add_exterior, add_vertices)
[docs]
def reverse_cuthill_mckee(self, dimension=None):
"""
Reduces bandwith of the connectivity matrix.
Wraps :py:func:`scipy.sparse.csgraph.reverse_cuthill_mckee`.
Returns
-------
reordered: Ugrid2d
"""
# TODO: dispatch on dimension?
reordering = reverse_cuthill_mckee(
graph=self.face_face_connectivity,
symmetric_mode=True,
)
reordered_grid = Ugrid2d(
self.node_x,
self.node_y,
self.fill_value,
self.face_node_connectivity[reordering],
)
return reordered_grid, reordering
def refine_polygon(
self,
polygon: "shapely.geometry.Polygon", # type: ignore # noqa
min_face_size: float,
refine_intersected: bool = True,
use_mass_center_when_refining: bool = True,
refinement_type: str = "refinement_levels",
connect_hanging_nodes: bool = True,
account_for_samples_outside_face: bool = True,
max_refinement_iterations: int = 10,
):
import meshkernel as mk
geometry_list = mku.to_geometry_list(polygon)
refinement_type = mku.either_string_or_enum(refinement_type, mk.RefinementType)
self._initialize_mesh_kernel()
mesh_refinement_params = mk.MeshRefinementParameters(
refine_intersected,
use_mass_center_when_refining,
min_face_size,
refinement_type,
connect_hanging_nodes,
account_for_samples_outside_face,
max_refinement_iterations,
)
self._meshkernel.mesh2d_refine_based_on_polygon(
geometry_list,
mesh_refinement_params,
)
def delete_polygon(
self,
polygon: "shapely.geometry.Polygon", # type: ignore # noqa
delete_option: str = "all_face_circumenters",
invert_deletion: bool = False,
):
import meshkernel as mk
geometry_list = mku.to_geometry_list(polygon)
delete_option = mku.either_string_or_enum(delete_option, mk.DeleteMeshOption)
self._initialize_mesh_kernel()
self._meshkernel.mesh2d_delete(geometry_list, delete_option, invert_deletion)
@staticmethod
def from_polygon(
polygon: "shapely.geometry.Polygon", # type: ignore # noqa
):
import meshkernel as mk
geometry_list = mku.to_geometry_list(polygon)
_mesh_kernel = mk.MeshKernel()
_mesh_kernel.mesh2d_make_mesh_from_polygon(geometry_list)
mesh = _mesh_kernel.mesh2d_get()
n_max_node = mesh.nodes_per_face.max()
ds = Ugrid2d.topology_dataset(
mesh.node_x,
mesh.node_y,
mesh.face_nodes.reshape((-1, n_max_node)),
)
ugrid = Ugrid2d(ds)
ugrid.mesh = mesh
ugrid._meshkernel = _mesh_kernel
return ugrid
[docs]
@classmethod
def from_geodataframe(cls, geodataframe: "geopandas.GeoDataFrame") -> "Ugrid2d": # type: ignore # noqa
"""
Convert a geodataframe of polygons to UGRID2D topology.
Parameters
----------
geodataframe: geopandas GeoDataFrame
Returns
-------
topology: Ugrid2d
"""
import geopandas as gpd
if not isinstance(geodataframe, gpd.GeoDataFrame):
raise TypeError(
f"Expected GeoDataFrame, received: {type(geodataframe).__name__}"
)
return cls.from_shapely(geodataframe.geometry.to_numpy(), crs=geodataframe.crs)
[docs]
@staticmethod
def from_shapely(geometry: PolygonArray, crs=None) -> "Ugrid2d":
"""
Convert an array of shapely polygons to UGRID2D topology.
Parameters
----------
geometry: np.ndarray of shapely polygons
crs: Any, optional
Coordinate Reference System of the geometry objects. Can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
Returns
-------
topology: Ugrid2d
"""
import shapely
if not (shapely.get_type_id(geometry) == shapely.GeometryType.POLYGON).all():
raise TypeError(
"Can only create Ugrid2d from shapely Polygon geometries, "
"geometry contains other types of geometries."
)
x, y, face_node_connectivity, fill_value = conversion.polygons_to_faces(
geometry
)
return Ugrid2d(x, y, fill_value, face_node_connectivity, crs=crs)
@staticmethod
def _from_intervals_helper(
node_x: np.ndarray,
node_y: np.ndarray,
nx: int,
ny: int,
) -> "Ugrid2d":
linear_index = np.arange(node_x.size, dtype=IntDType).reshape((ny + 1, nx + 1))
# Allocate face_node_connectivity
face_nodes = np.empty((ny * nx, 4), dtype=IntDType)
# Set connectivity in counterclockwise manner
left, right = slice(None, -1), slice(1, None)
lower, upper = slice(None, -1), slice(1, None)
if node_x[1] < node_x[0]: # x_decreasing
left, right = right, left
if node_y[ny + 1] < node_y[0]: # y_decreasing
lower, upper = upper, lower
face_nodes[:, 0] = linear_index[lower, left].ravel()
face_nodes[:, 1] = linear_index[lower, right].ravel()
face_nodes[:, 2] = linear_index[upper, right].ravel()
face_nodes[:, 3] = linear_index[upper, left].ravel()
return Ugrid2d(node_x, node_y, -1, face_nodes)
[docs]
@staticmethod
def from_structured_intervals1d(
x_intervals: np.ndarray,
y_intervals: np.ndarray,
) -> "Ugrid2d":
"""
Create a Ugrid2d topology from a structured topology based on 1D intervals.
Parameters
----------
x_intervals: np.ndarray of shape (M + 1,)
x-coordinate interval values for N row and M columns.
y_intervals: np.ndarray of shape (N + 1,)
y-coordinate interval values for N row and M columns.
"""
x_intervals = np.asarray(x_intervals)
y_intervals = np.asarray(y_intervals)
nx = x_intervals.shape[0] - 1
ny = y_intervals.shape[0] - 1
node_y, node_x = (
a.ravel() for a in np.meshgrid(y_intervals, x_intervals, indexing="ij")
)
return Ugrid2d._from_intervals_helper(node_x, node_y, nx, ny)
[docs]
@staticmethod
def from_structured_intervals2d(
x_intervals: np.ndarray,
y_intervals: np.ndarray,
) -> "Ugrid2d":
"""
Create a Ugrid2d topology from a structured topology based on 2D intervals.
Parameters
----------
x_intervals: np.ndarray of shape shape (N + 1, M + 1)
x-coordinate interval values for N row and M columns.
y_intervals: np.ndarray of shape shape (N + 1, M + 1)
y-coordinate interval values for N row and M columns.
"""
x_intervals = np.asarray(x_intervals)
y_intervals = np.asarray(y_intervals)
shape = x_intervals.shape
if (x_intervals.ndim != 2) or (y_intervals.ndim != 2):
raise ValueError("Dimensions of intervals must be 2D.")
if shape != y_intervals.shape:
raise ValueError(
"Interval shapes must match. Found: "
f"x_intervals: {shape}, versus y_intervals: {y_intervals.shape}"
)
nx = shape[1] - 1
ny = shape[0] - 1
node_x = x_intervals.ravel()
node_y = y_intervals.ravel()
return Ugrid2d._from_intervals_helper(node_x, node_y, nx, ny)
[docs]
@staticmethod
def from_structured_bounds(
x_bounds: np.ndarray,
y_bounds: np.ndarray,
) -> "Ugrid2d":
"""
Create a Ugrid2d topology from a structured topology based on 1D bounds.
The bounds contain the lower and upper cell boundary for each cell.
Parameters
----------
x_bounds: np.ndarray of shape (M, 2)
x-coordinate bounds for N row and M columns.
y_bounds: np.ndarray of shape (N, 2)
y-coordinate bounds for N row and M columns.
Returns
-------
grid: Ugrid2d
"""
nx, _ = x_bounds.shape
ny, _ = y_bounds.shape
x = conversion.bounds_to_vertices(x_bounds)
y = conversion.bounds_to_vertices(y_bounds)
node_y, node_x = (a.ravel() for a in np.meshgrid(y, x, indexing="ij"))
return Ugrid2d._from_intervals_helper(node_x, node_y, nx, ny)
[docs]
@staticmethod
def from_structured(
data: Union[xr.DataArray, xr.Dataset],
x: str | None = None,
y: str | None = None,
) -> "Ugrid2d":
"""
Create a Ugrid2d topology from an axis-aligned rectilinear structured topology.
This method assumes the coordinates are 1D.
Use ``from_structured_multicoord`` for 2D x and y coordinates, e.g. for
(approximated) curvilinear and rotated structured topologies.
Parameters
----------
data: xr.DataArray or xr.Dataset
x: str, optional
Name of the 1D coordinate to use as the UGRID x-coordinate.
y: str, optional
Name of the 1D coordinate to use as the UGRID y-coordinate.
Returns
-------
grid: Ugrid2d
"""
if x is None or y is None:
x, y = conversion.infer_xy_coords(data)
if x is None or y is None:
raise ValueError(
"Could not infer bounds. Please provide x and y explicitly."
)
x_intervals = conversion.infer_interval_breaks1d(data, x)
y_intervals = conversion.infer_interval_breaks1d(data, y)
return Ugrid2d.from_structured_intervals1d(x_intervals, y_intervals)
[docs]
@staticmethod
def from_structured_multicoord(
data: Union[xr.DataArray, xr.Dataset],
x: str,
y: str,
) -> "Ugrid2d":
"""
Create a Ugrid2d topology from a structured topology, including rotated
and (approximated) curvilinear topologies.
This method assumes the coordinates are 2D.
Use ``from_structured`` for 1D x and y coordinates, which is generally
the case for axis-aligned rectilinear topologies (most rasters).
Parameters
----------
data: xr.DataArray or xr.Dataset
x: str
Name of the 2D coordinate to use as the UGRID x-coordinate.
y: str
Name of the 2D coordinate to use as the UGRID y-coordinate.
Returns
-------
grid: Ugrid2d
"""
xv = conversion.infer_interval_breaks(data[x], axis=1, check_monotonic=True)
xv = conversion.infer_interval_breaks(xv, axis=0)
yv = conversion.infer_interval_breaks(data[y], axis=1)
yv = conversion.infer_interval_breaks(yv, axis=0, check_monotonic=True)
return Ugrid2d.from_structured_intervals2d(xv, yv)
[docs]
def to_shapely(self, dim):
"""
Convert UGRID topology to shapely objects.
* nodes: points
* edges: linestrings
* faces: polygons
Parameters
----------
dim: str
Node, edge, or face dimension.
Returns
-------
geometry: ndarray of shapely.Geometry
"""
if dim == self.face_dimension:
return conversion.faces_to_polygons(
self.node_x,
self.node_y,
self.face_node_connectivity,
self.fill_value,
)
elif dim == self.node_dimension:
return conversion.nodes_to_points(
self.node_x,
self.node_y,
)
elif dim == self.edge_dimension:
return conversion.edges_to_linestrings(
self.node_x,
self.node_y,
self.edge_node_connectivity,
)
else:
raise ValueError(
f"Dimension {dim} is not a face, node, or edge dimension of the"
" Ugrid2d topology."
)
[docs]
def bounding_polygon(self) -> "shapely.Polygon": # type: ignore # noqa
"""
Construct the bounding polygon of the grid. This polygon may include
holes if the grid also contains holes.
"""
import shapely
def _bbox_area(bounds):
return (bounds[2] - bounds[0]) * (bounds[3] - bounds[1])
edges = self.node_coordinates[self.boundary_node_connectivity]
collection = shapely.polygonize(shapely.linestrings(edges))
polygon = max(collection.geoms, key=lambda x: _bbox_area(x.bounds))
return polygon