"""Logic for snapping points and lines to face nodes and face edges."""
from typing import Tuple, TypeVar, Union
import numba as nb
import numpy as np
import pandas as pd
import xarray as xr
from scipy.spatial import cKDTree
import xugrid as xu
from xugrid.constants import (
FloatArray,
IntArray,
IntDType,
LineArray,
MissingOptionalModule,
Point,
Vector,
)
from xugrid.core.sparse import MatrixCSR, columns_and_values, row_slice
from xugrid.ugrid import connectivity
from xugrid.ugrid.connectivity import AdjacencyMatrix
from xugrid.ugrid.ugrid2d import Ugrid2d
try:
import geopandas as gpd
GeoDataFrameType = gpd.GeoDataFrame
except ImportError:
gpd = MissingOptionalModule("geopandas")
# https://stackoverflow.com/questions/61384752/how-to-type-hint-with-an-optional-import
GeoDataFrameType = TypeVar("GeoDataFrameType") # avoid ImportError in typehints
try:
import shapely
except ImportError:
shapely = MissingOptionalModule("shapely")
@nb.njit(cache=True)
def _snap_to_nearest(A: MatrixCSR, snap_candidates: IntArray, max_distance) -> IntArray:
"""
Find a closest target for each node.
The kD tree distance matrix will have stored for each node the other nodes
that are within snapping distance. These are the rows in the sparse matrix
that have more than one entry: the snap_candidates.
The first condition for a point to become a TARGET is if it hasn't been
connected to another point yet, i.e. it is UNVISITED. Once a point becomes
an TARGET, it looks for nearby points within the max_distance. These nearby
points are connected if: they are UNVISITED (i.e. they don't have a target
yet), or the current target is closer than the previous.
"""
UNVISITED = -1
TARGET = -2
nearest = np.full(A.n, max_distance + 1.0)
visited = np.full(A.n, UNVISITED)
for i in snap_candidates:
if visited[i] != UNVISITED:
continue
visited[i] = TARGET
# Now iterate through every node j that is within max_distance of node i.
for j, dist in columns_and_values(A, row_slice(A, i)):
if i == j or visited[j] == TARGET:
# Continue if we're looking at the distance to ourselves
# (i==j), or other node is a target.
continue
if visited[j] == UNVISITED or dist < nearest[j]:
# If unvisited node, or already visited but we're closer, set
# to i.
visited[j] = i
nearest[j] = dist
return visited
[docs]
def snap_nodes(
x: FloatArray, y: FloatArray, max_snap_distance: float
) -> Tuple[FloatArray, FloatArray, IntArray]:
"""
Snap neigbhoring vertices together that are located within a maximum
snapping distance from each other.
If vertices are located within a maximum distance, some of them are snapped
to their neighbors ("targets"), thereby guaranteeing a minimum distance
between nodes in the result. The determination of whether a point becomes a
target itself or gets snapped to another point is primarily based on the
order in which points are processed and their spatial relationships.
This function also return an inverse index array. In case of a connectivity
array, ``inverse`` can be used to index into, yielding the updated
numbers. E.g.:
``updated_face_nodes = inverse[face_nodes]``
Parameters
----------
x: 1D nd array of floats of size N
y: 1D nd array of floats of size N
max_snap_distance: float
Returns
-------
inverse: 1D nd array of ints of size N
Inverse index array: the new vertex number for every old vertex. Is
None when no vertices within max_distance of each other.
x_snapped: 1D nd array of floats of size M
Returns a copy of ``x`` when no vertices within max_distance of each
other.
y_snapped: 1D nd array of floats of size M
Returns a copy of ``y`` when no vertices within max_distance of each
other.
"""
# First, find all the points that lie within max_distance of each other
coords = np.column_stack((x, y))
tree = cKDTree(coords)
distances = tree.sparse_distance_matrix(
tree, max_distance=max_snap_distance, output_type="coo_matrix"
).tocsr()
should_snap = distances.getnnz(axis=1) > 1
if should_snap.any():
index = np.arange(x.size)
visited = _snap_to_nearest(
A=MatrixCSR.from_csr_matrix(distances),
snap_candidates=index[should_snap],
max_distance=max_snap_distance,
)
targets = visited < 0 # i.e. still UNVISITED or TARGET valued.
visited[targets] = index[targets]
deduplicated, inverse = np.unique(visited, return_inverse=True)
return inverse, x[deduplicated], y[deduplicated]
else:
return None, x.copy(), y.copy()
def snap_to_nodes(
x: FloatArray,
y: FloatArray,
to_x: FloatArray,
to_y: FloatArray,
max_distance: float,
tiebreaker=None,
) -> Tuple[FloatArray, FloatArray, IntArray]:
"""
Snap vertices (x, y) to another set of vertices (to_x, to_y) if within a
specified maximum distance.
Parameters
----------
x: 1D nd array of floats of size N
y: 1D nd array of floats of size N
to_x: 1D nd array of floats of size M
to_y: 1D nd array of floats of size M
max_distance: float
Returns
-------
x_snapped: 1D nd array of floats of size N
y_snapped: 1D nd array of floats of size N
"""
if tiebreaker not in (None, "nearest"):
raise ValueError(
f"Invalid tiebreaker: {tiebreaker}, should be one of "
'{None, "nearest"} instead.'
)
# Build KDTrees
coords = np.column_stack((x, y))
to_coords = np.column_stack((to_x, to_y))
tree = cKDTree(coords)
to_tree = cKDTree(to_coords)
# Build distance matrix, identify points to snap (update) and possible ties
# row (i) is index in x, y; column (j) is index in to_x, to_y
# Convert to csr for quicker row indexing, and row reductions.
distances = tree.sparse_distance_matrix(
to_tree, max_distance=max_distance, output_type="coo_matrix"
).tocsr()
n_per_row = distances.getnnz(axis=1)
update = n_per_row == 1
tie = n_per_row > 1
# Update the points that snap to a single (to_x, to_y) vertex
xnew = x.copy()
ynew = y.copy()
j_update = distances[update].indices
xnew[update] = to_x[j_update]
ynew[update] = to_y[j_update]
# Resolve ties with multiple (to_x, to_y) candidates
if tie.any():
if tiebreaker == "nearest":
ties = distances[tie].tocoo()
j_nearest = (
pd.DataFrame({"i": ties.row, "distance": ties.data}, index=ties.col)
.groupby("i")["distance"]
.idxmin()
.to_numpy()
)
xnew[tie] = to_x[j_nearest]
ynew[tie] = to_y[j_nearest]
elif tiebreaker is None:
raise ValueError(
"Ties detected: multiple options to snap to, given max distance: "
"set a smaller tolerance or specify a tiebreaker."
)
else:
raise ValueError("Invalid tiebreaker")
return xnew, ynew
@nb.njit(inline="always")
def to_vector(a: Point, b: Point) -> Vector:
return Vector(b.x - a.x, b.y - a.y)
@nb.njit(inline="always")
def as_point(a: FloatArray) -> Point:
return Point(a[0], a[1])
def lines_as_edges(line_coords, line_index) -> FloatArray:
edges = np.empty((len(line_coords) - 1, 2, 2))
edges[:, 0, :] = line_coords[:-1]
edges[:, 1, :] = line_coords[1:]
keep = np.diff(line_index) == 0
return edges[keep], line_index[1:][keep]
@nb.njit(inline="always")
def left_of(a: Point, p: Point, U: Vector) -> bool:
# Whether point a is left of vector U
# U: p -> q direction vector
# TODO: maybe add epsilon for floating point
return U.x * (a.y - p.y) > U.y * (a.x - p.x)
def coerce_geometry(lines: GeoDataFrameType) -> LineArray:
geometry = lines.geometry.to_numpy()
geom_type = shapely.get_type_id(geometry)
if not ((geom_type == 1) | (geom_type == 2)).all():
raise ValueError("Geometry should contain only LineStrings and/or LinearRings")
return geometry
@nb.njit(cache=True)
def snap_to_edges(
face_indices: IntArray,
intersection_edges: FloatArray,
face_edge_connectivity: AdjacencyMatrix,
centroids: FloatArray,
edge_centroids: FloatArray,
edges: IntArray,
segment_index: IntArray,
) -> Tuple[IntArray, IntArray]:
"""
Snap the intersected edges to the edges of the surrounding face.
This algorithm works as follows:
* It takes the intersected edges; any edge (p to q) to test falls fully
within a single face.
* For a face, we take the centroid (a).
* We loop through every edge of the face.
* If the edge separates the centroid (a) from the centroid of the edge (b)
we store that edge as a separating edge.
We test for separation by:
* Finding whether a and b are located at opposite sides of the half-plane
created by the edge p -> q (U).
* Finding whether p and q are located at opposide sides of the half-plane
created by a -> b (V).
* The separation test will return False if the lines are collinear. This is
desirable here, if U runs collinear with V, U doesn't separate a from b.
Do a minimum amount of work: reuse a_left, only compute V if needed.
"""
count = 0
for i in range(len(face_indices)):
face = face_indices[i]
a = as_point(centroids[face])
p = as_point(intersection_edges[i, 0])
q = as_point(intersection_edges[i, 1])
U = to_vector(p, q)
if U.x == 0 and U.y == 0:
continue
a_left = left_of(a, p, U)
for edge in connectivity.neighbors(face_edge_connectivity, face):
b = as_point(edge_centroids[edge])
b_left = left_of(b, p, U)
if a_left != b_left:
V = to_vector(a, b)
if left_of(p, a, V) != left_of(q, a, V):
segment_index[count] = i
edges[count] = edge
count += 1
return edges[:count], segment_index[:count]
def _create_output_dataset(
lines: GeoDataFrameType,
topology: "xu.Ugrid2d",
edges: IntArray,
line_index: IntArray,
) -> xu.UgridDataset:
uds = xu.UgridDataset(grids=[topology])
data = np.full(topology.n_edge, np.nan)
data[edges] = line_index
uds["line_index"] = xr.DataArray(
data=data,
dims=[topology.edge_dimension],
)
for column in lines.columns:
if column == "geometry":
continue
data = np.full(topology.n_edge, np.nan)
data[edges] = lines[column].iloc[line_index]
uds[column] = xr.DataArray(
data=data,
dims=[topology.edge_dimension],
)
return uds
def _create_output_gdf(
lines,
vertices,
edge_node_connectivity,
edges,
shapely_index,
):
edge_vertices = vertices[edge_node_connectivity[edges]]
geometry = shapely.linestrings(edge_vertices)
return gpd.GeoDataFrame(
lines.drop(columns="geometry").iloc[shapely_index], geometry=geometry
)
[docs]
def create_snap_to_grid_dataframe(
lines: GeoDataFrameType,
grid: Union[xr.DataArray, xu.UgridDataArray],
max_snap_distance: float,
) -> pd.DataFrame:
"""
Create a dataframe required to snap line geometries to a Ugrid2d topology.
A line is included and snapped to a grid edge when the line separates
the centroid of the cell with the centroid of the edge.
Parameters
----------
lines: gpd.GeoDataFrame
Line data. Geometry colum should contain exclusively LineStrings.
grid: xugrid.Ugrid2d
Grid of cells to snap lines to.
max_snap_distance: float
Returns
-------
result: pd.DataFrame
DataFrame with columns:
* line_index: the index of the geodataframe geometry.
* edge_index: the index of the
* x0: start x-coordinate of edge segment.
* y0: start y-coordinate of edge segment.
* x1: end x-coordinate of edge segment.
* y1: end y-coordinate of edge segment.
* length: length of the edge.
Examples
--------
First create data frame:
>>> snapping_df = create_snap_to_grid_dataframe(lines, grid2d, max_snap_distance=0.5)
Use the ``line_index`` column to assign values from ``lines`` to this new dataframe:
>>> snapping_df["my_variable"] = lines["my_variable"].iloc[snapping_df["line_index"]].to_numpy()
Run some reduction on the variable, to create an aggregated value per grid edge:
>>> aggregated = snapping_df.groupby("edge_index").sum()
Assign the aggregated values to a Ugrid2d topology:
>>> new = xu.full_like(edge_data, np.nan)
>>> new.data[aggregated.index] = aggregated["my_variable"]
"""
if not isinstance(grid, Ugrid2d):
raise TypeError(f"Expected Ugrid2d, received: {type(grid).__name__}")
topology = grid
vertices = topology.node_coordinates
edge_centroids = topology.edge_coordinates
face_edge_connectivity = topology.face_edge_connectivity
A = connectivity.to_sparse(face_edge_connectivity)
n, m = A.shape
face_edge_connectivity = AdjacencyMatrix(A.indices, A.indptr, A.nnz, n, m)
# Create geometric data
line_geometry = coerce_geometry(lines)
line_coords, shapely_vertex_index = shapely.get_coordinates(
line_geometry, return_index=True
)
# Snap line_coords to grid
x, y = snap_to_nodes(
*line_coords.T, *vertices.T, max_snap_distance, tiebreaker="nearest"
)
line_edges, shapely_line_index = lines_as_edges(
np.column_stack([x, y]), shapely_vertex_index
)
# Search for intersections. Every edge is potentially divided into smaller
# segments: The segment_indices contain (repeated) values of the
#
# * line_index: for each segment, the index of the shapely geometry.
# * face_indices: for each segment, the index of the topology face.
# * segment_edges: for each segment, start and end xy-coordinates.
#
line_index, face_indices, segment_edges = topology.celltree.intersect_edges(
line_edges
)
# Create edges from the intersected lines a: line can only snap to N - 1
# edges for N edges of a face. Pre-allocate the arrays here. For some
# reason, recent versions of numba refuse np.empty or np.zeros calls in
# this module (!).
# TODO: investigate...
#
# * edge_index: which edge of the topology (may contain duplicates)
# * segment_index: which segment
max_n_new_edges = len(face_indices) * topology.n_max_node_per_face - 1
edge_index = np.empty(max_n_new_edges, dtype=IntDType)
segment_index = np.empty(max_n_new_edges, dtype=IntDType)
edge_index, segment_index = snap_to_edges(
face_indices,
segment_edges,
face_edge_connectivity,
topology.centroids,
edge_centroids,
edge_index, # out
segment_index, # out
)
line_index = line_index[segment_index]
segment_edges = segment_edges[segment_index]
return pd.DataFrame(
data={
"line_index": shapely_line_index[line_index],
"edge_index": edge_index,
"x0": segment_edges[:, 0, 0],
"y0": segment_edges[:, 0, 1],
"x1": segment_edges[:, 1, 0],
"y1": segment_edges[:, 1, 1],
"length": ((segment_edges[:, 1] - segment_edges[:, 0]) ** 2).sum(axis=1),
}
)
[docs]
def snap_to_grid(
lines: GeoDataFrameType,
grid: Union[xr.DataArray, xu.UgridDataArray],
max_snap_distance: float,
) -> Tuple[IntArray, Union[pd.DataFrame, GeoDataFrameType]]:
"""
Snap a collection of lines to a grid.
A line is included and snapped to a grid edge when the line separates
the centroid of the cell with the centroid of the edge.
Parameters
----------
lines: gpd.GeoDataFrame
Line data. Geometry colum should contain exclusively LineStrings.
grid: xr.DataArray or xu.UgridDataArray of integers
Grid of cells to snap lines to.
max_snap_distance: float
Returns
-------
uds: UgridDataset
Snapped line geometries as edges in a Ugrid2d topology. Contains a
``line_index`` variable identifying the original geodataframe line.
gdf: gpd.GeoDataFrame
Snapped line geometries.
"""
if isinstance(grid, Ugrid2d):
topology = grid
elif isinstance(grid, xr.DataArray):
# Convert structured to unstructured representation
topology = Ugrid2d.from_structured(grid)
elif isinstance(grid, xu.UgridDataArray):
topology = grid.ugrid.grid
else:
raise TypeError(
"Expected xarray.DataArray or xugrid.UgridDataArray, received: "
f" {type(grid).__name__}"
)
result = create_snap_to_grid_dataframe(lines, topology, max_snap_distance)
# When multiple line parts are snapped to the same edge, use the ones with
# the greatest length inside the cell.
max_edge_index = result.groupby("edge_index").idxmax()["length"].to_numpy()
line_index = result["line_index"].to_numpy()[max_edge_index]
edges = result["edge_index"].to_numpy()[max_edge_index]
uds = _create_output_dataset(lines, topology, edges, line_index)
gdf = _create_output_gdf(
lines,
topology.node_coordinates,
topology.edge_node_connectivity,
edges,
line_index,
)
return uds, gdf