Source code for xugrid.ugrid.burn

from __future__ import annotations

from typing import List, Union

import numba as nb
import numpy as np
import xarray as xr
from numba_celltree.constants import TOLERANCE_ON_EDGE, Point, Triangle
from numba_celltree.geometry_utils import (
    as_point,
    as_triangle,
    cross_product,
    to_vector,
)

import xugrid
from xugrid.constants import FloatArray, IntArray, MissingOptionalModule

try:
    import shapely

except ImportError:
    shapely = MissingOptionalModule("shapely")

try:
    import mapbox_earcut

except ImportError:
    mapbox_earcut = MissingOptionalModule("mapbox_earcut")


@nb.njit(inline="always")
def in_bounds(p: Point, a: Point, b: Point) -> bool:
    """
    Check whether point p falls within the bounding box created by a and b
    (after we've checked the size of the cross product).

    However, we must take into account that a line may be either vertical
    (dx=0) or horizontal (dy=0) and only evaluate the non-zero value.

    If the area created by p, a, b is tiny AND p is within the bounds of a and
    b, the point lies very close to the edge.
    """
    # Already in numba_celltree, unreleased.
    dx = b.x - a.x
    dy = b.y - a.y
    if abs(dx) >= abs(dy):
        if dx > 0:
            return a.x <= p.x and p.x <= b.x
        return b.x <= p.x and p.x <= a.x
    else:
        if dy > 0:
            return a.y <= p.y and p.y <= b.y
        return b.y <= p.y and p.y <= a.y


@nb.njit(inline="always")
def point_in_triangle(p: Point, t: Triangle) -> bool:
    # TODO: move this into numba_celltree instead?
    ap = to_vector(t.a, p)
    bp = to_vector(t.b, p)
    cp = to_vector(t.c, p)
    ab = to_vector(t.a, t.b)
    bc = to_vector(t.b, t.c)
    ca = to_vector(t.c, t.a)
    # Do a half plane check.
    A = cross_product(ab, ap)
    B = cross_product(bc, bp)
    C = cross_product(ca, cp)
    signA = A > 0
    signB = B > 0
    signC = C > 0
    if (signA == signB) and (signB == signC):
        return True

    # Test whether p is located on/very close to edges.
    if (
        (abs(A) < TOLERANCE_ON_EDGE)
        and in_bounds(p, t.a, t.b)
        or (abs(B) < TOLERANCE_ON_EDGE)
        and in_bounds(p, t.b, t.c)
        or (abs(C) < TOLERANCE_ON_EDGE)
        and in_bounds(p, t.c, t.a)
    ):
        return True

    return False


@nb.njit(inline="always", parallel=True, cache=True)
def points_in_triangles(
    points: FloatArray,
    face_indices: IntArray,
    faces: IntArray,
    vertices: FloatArray,
):
    # TODO: move this into numba_celltree instead?
    n_points = len(points)
    inside = np.empty(n_points, dtype=np.bool_)
    for i in nb.prange(n_points):
        face_index = face_indices[i]
        face = faces[face_index]
        triangle = as_triangle(vertices, face)
        point = as_point(points[i])
        inside[i] = point_in_triangle(point, triangle)
    return inside


def _locate_polygon(
    grid: "xu.Ugrid2d",  # type: ignore # noqa
    exterior: FloatArray,
    interiors: List[FloatArray],
    all_touched: bool,
) -> IntArray:
    """
    Locate a single polygon.

    This algorithm burns a polygon vector geometry in a 2d topology by:

    * Extracting the exterior and interiors (holes) coordinates from the
      polygon.
    * Breaking every polygon down into a triangles using an "earcut" algorithm.
    * Searching the grid for these triangles.

    Due to the use of the separating axes theorem, _locate_faces finds all
    intersecting triangles, including those who only touch the edge.
    To enable all_touched=False, we have to search the centroids of candidates
    in the intersecting triangles.

    Parameters
    ----------
    grid: Ugrid2d
    exterior: FloatArray
        Exterior of the polygon.
    interiors: List[FloatArray]
        Interior holes of the polygon.
    all_touched: bool
        Whether to include include cells whose centroid falls inside, of every
        cell that is touched.
    """

    import mapbox_earcut

    rings = np.cumsum([len(exterior)] + [len(interior) for interior in interiors])
    vertices = np.vstack([exterior] + interiors).astype(np.float64)
    triangles = mapbox_earcut.triangulate_float64(vertices, rings).reshape((-1, 3))
    triangle_indices, grid_indices = grid.celltree.locate_faces(vertices, triangles)
    if all_touched:
        return grid_indices
    else:
        centroids = grid.centroids[grid_indices]
        inside = points_in_triangles(
            points=centroids,
            face_indices=triangle_indices,
            faces=triangles,
            vertices=vertices,
        )
        return grid_indices[inside]


def _burn_polygons(
    polygons: "geopandas.GeoSeries",  # type: ignore # noqa
    like: "xugrid.Ugrid2d",
    values: np.ndarray,
    all_touched: bool,
    output: FloatArray,
) -> None:
    exterior_coordinates = [
        shapely.get_coordinates(exterior) for exterior in polygons.exterior
    ]
    interior_coordinates = [
        [shapely.get_coordinates(p_interior) for p_interior in p_interiors]
        for p_interiors in polygons.interiors
    ]
    to_burn = np.empty(like.n_face, dtype=bool)

    for exterior, interiors, value in zip(
        exterior_coordinates, interior_coordinates, values
    ):
        to_burn = _locate_polygon(like, exterior, interiors, all_touched)
        output[to_burn] = value

    return


def _burn_points(
    points: "geopandas.GeoSeries",  # type: ignore # noqa
    like: "xugrid.Ugrid2d",
    values: np.ndarray,
    output: FloatArray,
) -> None:
    """Simply searches the points in the ``like`` 2D topology."""
    xy = shapely.get_coordinates(points)
    to_burn = like.locate_points(xy)
    output[to_burn] = values
    return


def _burn_lines(
    lines: "geopandas.GeoSeries",  # type: ignore # noqa
    like: "xugrid.Ugrid2d",
    values: np.ndarray,
    output: FloatArray,
) -> None:
    """
    Burn the line values into the underlying faces.

    This algorithm breaks any linestring down into edges (two x, y points). We
    search and intersect every edge in the ``like`` grid, the intersections are
    discarded.
    """
    xy, index = shapely.get_coordinates(lines, return_index=True)
    # From the coordinates and the index, create the (n_edge, 2, 2) shape array
    # containing the edge_coordinates.
    linear_index = np.arange(index.size)
    segments = np.column_stack([linear_index[:-1], linear_index[1:]])
    # Only connections with vertices with the same index are valid.
    valid = np.diff(index) == 0
    segments = segments[valid]
    edges = xy[segments]
    # Now query the grid for these edges.
    edge_index, face_index, _ = like.intersect_edges(edges)
    # Find the associated values.
    line_index = index[1:][valid]
    value_index = line_index[edge_index]
    output[face_index] = values[value_index]
    return


[docs] def burn_vector_geometry( gdf: "geopandas.GeoDataframe", # type: ignore # noqa like: Union["xugrid.Ugrid2d", "xugrid.UgridDataArray", "xugrid.UgridDataset"], column: str | None = None, fill: Union[int, float] = np.nan, all_touched: bool = False, ) -> xugrid.UgridDataArray: """ Burn vector geometries (points, lines, polygons) into a Ugrid2d mesh. If no ``column`` argument is provided, a value of 1.0 will be burned in to the mesh. Parameters ---------- gdf: geopandas.GeoDataFrame Polygons, points, and/or lines to be burned into the grid. like: UgridDataArray, UgridDataset, or Ugrid2d Grid to burn the vector data into. column: str, optional Name of the geodataframe column of which to the values to burn into grid. fill: int, float, optional, default value ``np.nan``. Fill value for nodata areas. all_touched: bool, optional, default value ``False``. All mesh faces (cells) touched by polygons will be updated, not just those whose center point is within the polygon. Returns ------- burned: UgridDataArray """ import geopandas as gpd POINT = shapely.GeometryType.POINT LINESTRING = shapely.GeometryType.LINESTRING LINEARRING = shapely.GeometryType.LINEARRING POLYGON = shapely.GeometryType.POLYGON GEOM_NAMES = {v: k for k, v in shapely.GeometryType.__members__.items()} if not isinstance(gdf, gpd.GeoDataFrame): raise TypeError(f"gdf must be GeoDataFrame, received: {type(gdf).__name__}") if isinstance(like, (xugrid.UgridDataArray, xugrid.UgridDataset)): like = like.ugrid.grid if not isinstance(like, xugrid.Ugrid2d): raise TypeError( "Like must be Ugrid2d, UgridDataArray, or UgridDataset;" f"received: {type(like).__name__}" ) geometry_id = shapely.get_type_id(gdf.geometry) allowed_types = (POINT, LINESTRING, LINEARRING, POLYGON) if not np.isin(geometry_id, allowed_types).all(): received = ", ".join( [GEOM_NAMES[geom_id] for geom_id in np.unique(geometry_id)] ) raise TypeError( "GeoDataFrame contains unsupported geometry types. Can only burn " "Point, LineString, LinearRing, and Polygon geometries. Received: " f"{received}" ) points = gdf.loc[geometry_id == POINT] lines = gdf.loc[(geometry_id == LINESTRING) | (geometry_id == LINEARRING)] polygons = gdf.loc[geometry_id == POLYGON] if column is None: point_values = np.ones(len(points), dtype=float) line_values = np.ones(len(lines), dtype=float) poly_values = np.ones(len(polygons), dtype=float) else: point_values = points[column].to_numpy() line_values = lines[column].to_numpy() poly_values = polygons[column].to_numpy() output = np.full(like.n_face, fill) if len(polygons) > 0: _burn_polygons(polygons.geometry, like, poly_values, all_touched, output) if len(lines) > 0: _burn_lines(lines.geometry, like, line_values, output) if len(points) > 0: _burn_points(points.geometry, like, point_values, output) return xugrid.UgridDataArray( obj=xr.DataArray(output, dims=[like.face_dimension], name=column), grid=like, )
def grid_from_earcut_polygons( polygons: "geopandas.GeoDataFrame", # type: ignore # noqa return_index: bool = False, ): import geopandas as gpd if not isinstance(polygons, gpd.GeoDataFrame): raise TypeError(f"Expected GeoDataFrame, received: {type(polygons).__name__}") geometry = polygons.geometry POLYGON = shapely.GeometryType.POLYGON GEOM_NAMES = {v: k for k, v in shapely.GeometryType.__members__.items()} geometry_id = shapely.get_type_id(geometry) if not (geometry_id == POLYGON).all(): GEOM_NAMES = {v: k for k, v in shapely.GeometryType.__members__.items()} received = ", ".join( [GEOM_NAMES[geom_id] for geom_id in np.unique(geometry_id)] ) raise TypeError( "geometry contains unsupported geometry types. Can only triangulate " f"Polygon geometries. Received: {received}" ) # Shapely does not provide a vectorized manner to get the interior rings # easily, an index argument is always required, which is a poor fit for # heterogeneous polygons (where some have no holes, and some may have # hundreds). # map_box_earcut is also not vectorized for polygons, so we simply loop # over the geometries here. exterior_coordinates = [ shapely.get_coordinates(exterior) for exterior in geometry.exterior ] interior_coordinates = [ [shapely.get_coordinates(p_interior) for p_interior in p_interiors] for p_interiors in geometry.interiors ] all_triangles = [] offset = 0 for exterior, interiors in zip(exterior_coordinates, interior_coordinates): rings = np.cumsum([len(exterior)] + [len(interior) for interior in interiors]) vertices = np.vstack([exterior] + interiors).astype(np.float64) triangles = mapbox_earcut.triangulate_float64(vertices, rings).reshape((-1, 3)) all_triangles.append(triangles + offset) offset += len(vertices) face_nodes = np.concatenate(all_triangles).reshape((-1, 3)) all_vertices = shapely.get_coordinates(geometry) node_x = all_vertices[:, 0] node_y = all_vertices[:, 1] grid = xugrid.Ugrid2d(node_x, node_y, -1, face_nodes) if return_index: n_triangles = [len(triangles) for triangles in all_triangles] index = np.repeat(np.arange(len(geometry)), n_triangles) return grid, index else: return grid
[docs] def earcut_triangulate_polygons( polygons: "geopandas.GeoDataframe", # type: ignore # noqa column: str | None = None, ) -> xugrid.UgridDataArray: """ Break down polygons using mapbox_earcut, and create a mesh from the resulting triangles. If no ``column`` argument is provided, the polygon index will be assigned to the grid faces. Parameters ---------- polygons: geopandas.GeoDataFrame Polygons to convert to triangles. column: str, optional Name of the geodataframe column of which to the values to assign to the grid faces. Returns ------- triangulated: UgridDataArray """ grid, index = grid_from_earcut_polygons(polygons, return_index=True) if column is not None: da = ( polygons[column] .reset_index(drop=True) .to_xarray() .isel(index=index) .rename({"index": grid.face_dimension}) ) else: da = xr.DataArray(data=index, dims=(grid.face_dimension,)) return xugrid.UgridDataArray(da, grid)