Source code for xugrid.ugrid.snapping

"""Logic for snapping points and lines to face nodes and face edges."""
from typing import Tuple, TypeVar, Union

import numba as nb
import numpy as np
import pandas as pd
import xarray as xr
from scipy import sparse
from scipy.sparse.csgraph import connected_components
from scipy.spatial import cKDTree

import xugrid as xu
from xugrid.constants import (
    FloatArray,
    IntArray,
    IntDType,
    LineArray,
    MissingOptionalModule,
    Point,
    Vector,
)
from xugrid.ugrid import connectivity
from xugrid.ugrid.connectivity import AdjacencyMatrix
from xugrid.ugrid.ugrid2d import Ugrid2d

try:
    import geopandas as gpd

    GeoDataFrameType = gpd.GeoDataFrame
except ImportError:
    gpd = MissingOptionalModule("geopandas")
    # https://stackoverflow.com/questions/61384752/how-to-type-hint-with-an-optional-import
    GeoDataFrameType = TypeVar("GeoDataFrameType")  # avoid ImportError in typehints

try:
    import shapely
except ImportError:
    shapely = MissingOptionalModule("shapely")


def snap_nodes(
    x: FloatArray, y: FloatArray, max_distance: float
) -> Tuple[FloatArray, FloatArray, IntArray]:
    """
    Snap neigbhoring vertices together that are located within a maximum
    distance from each other.

    If vertices are located within a maximum distance, they are merged into a
    single vertex. The coordinates of the merged coordinates are given by the
    centroid of the merging vertices.

    Note that merging is a communicative process: vertex A might lie close to
    vertex B, vertex B might lie close to vertex C, and so on. These points are
    grouped and merged into a single new vertex.

    This function also return an inverse index array. In case of a connectivity
    array, ``inverse`` can be used to index into, yielding the updated
    numbers. E.g.:

    ``updated_face_nodes = inverse[face_nodes]``

    Parameters
    ----------
    x: 1D nd array of floats of size N
    y: 1D nd array of floats of size N
    max_distance: float

    Returns
    -------
    inverse: 1D nd array of ints of size N
        Inverse index array: the new vertex number for every old vertex. Is
        None when no vertices within max_distance of each other.
    x_merged: 1D nd array of floats of size M
        Returns a copy of ``x`` when no vertices within max_distance of each
        other.
    y_merged: 1D nd array of floats of size M
        Returns a copy of ``y`` when no vertices within max_distance of each
        other.
    """
    # First, find all the points that lie within max_distance of each other
    coords = np.column_stack((x, y))
    n = len(coords)
    tree = cKDTree(coords)
    coo_distances = tree.sparse_distance_matrix(
        tree, max_distance=max_distance, output_type="coo_matrix"
    )
    # Get rid of diagonal
    i = coo_distances.row
    j = coo_distances.col
    off_diagonal = i != j
    i = i[off_diagonal]
    j = j[off_diagonal]
    # If no pairs are found, there is nothing to snap
    if len(i) > 0:
        # Next, group the points together.
        # Point A might lie close to point B, point B might lie close to
        # Point C, and so on. These points are grouped, and their centroid
        # is computed.
        coo_content = (np.ones(i.size), (i, j))
        coo_matrix = sparse.coo_matrix(coo_content, shape=(n, n))
        # Directed is true: this matrix is symmetrical
        _, inverse = connected_components(coo_matrix, directed=True)
        new = (
            pd.DataFrame({"label": inverse, "x": x, "y": y})
            .groupby("label")
            .agg(
                {
                    "x": "mean",
                    "y": "mean",
                }
            )
        )
        return inverse, new["x"].to_numpy(), new["y"].to_numpy()
    else:
        return None, x.copy(), y.copy()


def snap_to_nodes(
    x: FloatArray,
    y: FloatArray,
    to_x: FloatArray,
    to_y: FloatArray,
    max_distance: float,
    tiebreaker=None,
) -> Tuple[FloatArray, FloatArray, IntArray]:
    """
    Snap vertices (x, y) to another set of vertices (to_x, to_y) if within a
    specified maximum distance.

    Parameters
    ----------
    x: 1D nd array of floats of size N
    y: 1D nd array of floats of size N
    to_x: 1D nd array of floats of size M
    to_y: 1D nd array of floats of size M
    max_distance: float

    Returns
    -------
    x_snapped: 1D nd array of floats of size N
    y_snapped: 1D nd array of floats of size N
    """
    if tiebreaker not in (None, "nearest"):
        raise ValueError(
            f"Invalid tiebreaker: {tiebreaker}, should be one of "
            '{None, "nearest"} instead.'
        )
    # Build KDTrees
    coords = np.column_stack((x, y))
    to_coords = np.column_stack((to_x, to_y))
    tree = cKDTree(coords)
    to_tree = cKDTree(to_coords)

    # Build distance matrix, identify points to snap (update) and possible ties
    # row (i) is index in x, y; column (j) is index in to_x, to_y
    # Convert to csr for quicker row indexing, and row reductions.
    distances = tree.sparse_distance_matrix(
        to_tree, max_distance=max_distance, output_type="coo_matrix"
    ).tocsr()
    n_per_row = distances.getnnz(axis=1)
    update = n_per_row == 1
    tie = n_per_row > 1

    # Update the points that snap to a single (to_x, to_y) vertex
    xnew = x.copy()
    ynew = y.copy()
    j_update = distances[update].indices
    xnew[update] = to_x[j_update]
    ynew[update] = to_y[j_update]

    # Resolve ties with multiple (to_x, to_y) candidates
    if tie.any():
        if tiebreaker == "nearest":
            ties = distances[tie].tocoo()
            j_nearest = (
                pd.DataFrame({"i": ties.row, "distance": ties.data}, index=ties.col)
                .groupby("i")["distance"]
                .idxmin()
                .to_numpy()
            )
            xnew[tie] = to_x[j_nearest]
            ynew[tie] = to_y[j_nearest]
        elif tiebreaker is None:
            raise ValueError(
                "Ties detected: multiple options to snap to, given max distance: "
                "set a smaller tolerance or specify a tiebreaker."
            )
        else:
            raise ValueError("Invalid tiebreaker")

    return xnew, ynew


@nb.njit(inline="always")
def to_vector(a: Point, b: Point) -> Vector:
    return Vector(b.x - a.x, b.y - a.y)


@nb.njit(inline="always")
def as_point(a: FloatArray) -> Point:
    return Point(a[0], a[1])


def lines_as_edges(line_coords, line_index) -> FloatArray:
    edges = np.empty((len(line_coords) - 1, 2, 2))
    edges[:, 0, :] = line_coords[:-1]
    edges[:, 1, :] = line_coords[1:]
    return edges[np.diff(line_index) == 0]


@nb.njit(inline="always")
def left_of(a: Point, p: Point, U: Vector) -> bool:
    # Whether point a is left of vector U
    # U: p -> q direction vector
    # TODO: maybe add epsilon for floating point
    return U.x * (a.y - p.y) > U.y * (a.x - p.x)


def coerce_geometry(lines: GeoDataFrameType) -> LineArray:
    geometry = lines.geometry.to_numpy()
    geom_type = shapely.get_type_id(geometry)
    if not ((geom_type == 1) | (geom_type == 2)).all():
        raise ValueError("Geometry should contain only LineStrings and/or LinearRings")
    return geometry


@nb.njit(cache=True)
def snap_to_edges(
    face_indices: IntArray,
    intersection_edges: FloatArray,
    face_edge_connectivity: AdjacencyMatrix,
    centroids: FloatArray,
    edge_centroids: FloatArray,
    edges: IntArray,
    segment_index: IntArray,
) -> Tuple[IntArray, IntArray]:
    """
    Snap the intersected edges to the edges of the surrounding face.

    This algorithm works as follows:

    * It takes the intersected edges; any edge (p to q) to test falls fully
      within a single face.
    * For a face, we take the centroid (a).
    * We loop through every edge of the face.
    * If the edge separates the centroid (a) from the centroid of the edge (b)
      we store that edge as a separating edge.

    We test for separation by:

    * Finding whether a and b are located at opposite sides of the half-plane
      created by the edge p -> q (U).
    * Finding whether p and q are located at opposide sides of the half-plane
      created by a -> b (V).
    * The separation test will return False if the lines are collinear. This is
      desirable here, if U runs collinear with V, U doesn't separate a from b.

    Do a minimum amount of work: reuse a_left, only compute V if needed.
    """
    count = 0
    for i in range(len(face_indices)):
        face = face_indices[i]
        a = as_point(centroids[face])
        p = as_point(intersection_edges[i, 0])
        q = as_point(intersection_edges[i, 1])
        U = to_vector(p, q)
        if U.x == 0 and U.y == 0:
            continue

        a_left = left_of(a, p, U)
        for edge in connectivity.neighbors(face_edge_connectivity, face):
            b = as_point(edge_centroids[edge])
            b_left = left_of(b, p, U)
            if a_left != b_left:
                V = to_vector(a, b)
                if left_of(p, a, V) != left_of(q, a, V):
                    segment_index[count] = i
                    edges[count] = edge
                    count += 1

    return edges[:count], segment_index[:count]


def _find_largest_edges(
    edges: FloatArray,
    edge_index: IntArray,
    line_index: IntArray,
):
    max_edge_index = (
        pd.DataFrame(
            data={
                "edge_index": edge_index,
                "length": ((edges[:, 1] - edges[:, 0]) ** 2).sum(axis=1),
            }
        )
        .groupby("edge_index")
        .idxmax()["length"]
        .to_numpy()
    )

    edge_index = edge_index[max_edge_index]
    line_index = line_index[max_edge_index]
    return edge_index, line_index


def _create_output_dataset(
    lines: GeoDataFrameType,
    topology: "xu.Ugrid2d",
    edges: IntArray,
    line_index: IntArray,
) -> xu.UgridDataset:
    uds = xu.UgridDataset(grids=[topology])
    data = np.full(topology.n_edge, np.nan)
    data[edges] = line_index
    uds["line_index"] = xr.DataArray(
        data=data,
        dims=[topology.edge_dimension],
    )
    for column in lines.columns:
        if column == "geometry":
            continue
        data = np.full(topology.n_edge, np.nan)
        data[edges] = lines[column].iloc[line_index]
        uds[column] = xr.DataArray(
            data=data,
            dims=[topology.edge_dimension],
        )
    return uds


def _create_output_gdf(
    lines,
    vertices,
    edge_node_connectivity,
    edges,
    shapely_index,
):
    edge_vertices = vertices[edge_node_connectivity[edges]]
    geometry = shapely.linestrings(edge_vertices)
    return gpd.GeoDataFrame(
        lines.drop(columns="geometry").iloc[shapely_index], geometry=geometry
    )


[docs] def snap_to_grid( lines: GeoDataFrameType, grid: Union[xr.DataArray, xu.UgridDataArray], max_snap_distance: float, ) -> Tuple[IntArray, Union[pd.DataFrame, GeoDataFrameType]]: """ Snap a collection of lines to a grid. A line is included and snapped to a grid edge when the line separates the centroid of the cell with the centroid of the edge. When a line in a cell is snapped to an edge that is **not** shared with another cell, this is denoted with a value of -1 in the second column of ``cell_to_cell``. Parameters ---------- lines: gpd.GeoDataFrame Line data. Geometry colum should contain exclusively LineStrings. grid: xr.DataArray or xu.UgridDataArray of integers Grid of cells to snap lines to. Cells with a value of 0 are not included. max_snap_distance: float Returns ------- cell_to_cell: ndarray of integers with shape ``(N, 2)`` Cells whose centroids are separated from each other by a line. segment_data: pd.DataFrame or gpd.DataFrame Data for every segment. GeoDataFrame if ``return_geometry`` is ``True``. """ if isinstance(grid, Ugrid2d): topology = grid elif isinstance(grid, xr.DataArray): # Convert structured to unstructured representation topology = Ugrid2d.from_structured(grid) elif isinstance(grid, xu.UgridDataArray): topology = grid.ugrid.grid else: raise TypeError( "Expected xarray.DataArray or xugrid.UgridDataArray, received: " f" {type(grid).__name__}" ) vertices = topology.node_coordinates edge_centroids = topology.edge_coordinates edge_node_connectivity = topology.edge_node_connectivity face_edge_connectivity = topology.face_edge_connectivity A = connectivity.to_sparse(face_edge_connectivity, fill_value=-1) n, m = A.shape face_edge_connectivity = AdjacencyMatrix(A.indices, A.indptr, A.nnz, n, m) # Create geometric data line_geometry = coerce_geometry(lines) line_coords, shapely_index = shapely.get_coordinates( line_geometry, return_index=True ) # Snap line_coords to grid x, y = snap_to_nodes( *line_coords.T, *vertices.T, max_snap_distance, tiebreaker="nearest" ) line_edges = lines_as_edges(np.column_stack([x, y]), shapely_index) # Search for intersections. Every edge is potentially divided into smaller # segments: The segment_indices contain (repeated) values of the # # * line_index: for each segment, the index of the shapely geometry. # * face_indices: for each segment, the index of the topology face. # * segment_edges: for each segment, start and end xy-coordinates. # line_index, face_indices, segment_edges = topology.celltree.intersect_edges( line_edges ) # Create edges from the intersected lines a: line can only snap to N - 1 # edges for N edges of a face. Pre-allocate the arrays here. For some # reason, recent versions of numba refuse np.empty or np.zeros calls in # this module (!). # TODO: investigate... # # * edge_index: which edge of the topology (may contain duplicates) # * segment_index: which segment max_n_new_edges = len(face_indices) * topology.n_max_node_per_face - 1 edge_index = np.empty(max_n_new_edges, dtype=IntDType) segment_index = np.empty(max_n_new_edges, dtype=IntDType) edge_index, segment_index = snap_to_edges( face_indices, segment_edges, face_edge_connectivity, topology.centroids, edge_centroids, edge_index, # out segment_index, # out ) line_index = line_index[segment_index] segment_edges = segment_edges[segment_index] # When multiple line parts are snapped to the same edge, use the ones with # the greatest length inside the cell. edges, line_index = _find_largest_edges(segment_edges, edge_index, line_index) shapely_index = shapely_index[line_index] uds = _create_output_dataset(lines, topology, edges, shapely_index) gdf = _create_output_gdf( lines, vertices, edge_node_connectivity, edges, shapely_index ) return uds, gdf