from itertools import chain
from typing import Any, Dict, Sequence, Tuple, Union
import numpy as np
import pandas as pd
import xarray as xr
from numpy.typing import ArrayLike
import xugrid
from xugrid import conversion
from xugrid.constants import (
FILL_VALUE,
BoolArray,
FloatArray,
FloatDType,
IntArray,
IntDType,
LineArray,
)
from xugrid.core.utils import either_dict_or_kwargs
from xugrid.ugrid import connectivity, conventions
from xugrid.ugrid.ugridbase import AbstractUgrid, as_pandas_index
[docs]
class Ugrid1d(AbstractUgrid):
"""
This class stores the topological data of a "1-D unstructured grid": a
collection of connected line elements, such as a river network.
Parameters
----------
node_x: ndarray of floats
node_y: ndarray of floats
fill_value: int
edge_node_connectivity: ndarray of integers
name: string, optional
Network name. Defaults to "network1d".
dataset: xr.Dataset, optional
indexes: Dict[str, str], optional
When a dataset is provided, a mapping from the UGRID role to the dataset
variable name. E.g. {"face_x": "mesh2d_face_lon"}.
projected: bool, optional
Whether node_x and node_y are longitude and latitude or projected x and
y coordinates. Used to write the appropriate standard_name in the
coordinate attributes.
crs: Any, optional
Coordinate Reference System of the geometry objects. Can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
attrs: Dict[str, str], optional
UGRID topology attributes. Should not be provided together with
dataset: if other names are required, update the dataset instead.
A name entry is ignored, as name is given explicitly.
start_index: int, 0 or 1, default is 0.
Start index of the connectivity arrays. Must match the start index
of the provided face_node_connectivity and edge_node_connectivity.
"""
[docs]
def __init__(
self,
node_x: FloatArray,
node_y: FloatArray,
fill_value: int,
edge_node_connectivity: IntArray = None,
name: str = "network1d",
dataset: xr.Dataset = None,
indexes: Dict[str, str] = None,
projected: bool = True,
crs: Any = None,
attrs: Dict[str, str] = None,
start_index: int = 0,
):
self.node_x = np.ascontiguousarray(node_x)
self.node_y = np.ascontiguousarray(node_y)
self.fill_value = fill_value
self.start_index = start_index
self.edge_node_connectivity = edge_node_connectivity - self.start_index
self.name = name
self.projected = projected
self._initialize_indexes_attrs(name, dataset, indexes, attrs)
self._dataset = dataset
# Optional attributes, deferred initialization
# Meshkernel
self._mesh = None
self._meshkernel = None
# Celltree
self._celltree = None
# Bounds
self._xmin = None
self._xmax = None
self._ymin = None
self._ymax = None
# Edges
self._edge_x = None
self._edge_y = None
# Connectivity
self._node_node_connectivity = None
self._node_edge_connectivity = None
# crs
if crs is None:
self.crs = None
else:
import pyproj
self.crs = pyproj.CRS.from_user_input(crs)
[docs]
@classmethod
def from_dataset(cls, dataset: xr.Dataset, topology: str = None):
"""
Extract the 1D UGRID topology information from an xarray Dataset.
Parameters
----------
dataset: xr.Dataset
Dataset containing topology information stored according to UGRID conventions.
Returns
-------
grid: Ugrid1dAdapter
"""
ds = dataset
if not isinstance(ds, xr.Dataset):
raise TypeError(
"Ugrid should be initialized with xarray.Dataset. "
f"Received instead: {type(ds)}"
)
if topology is None:
topology = cls._single_topology(ds)
indexes = {}
# Collect names
connectivity = ds.ugrid_roles.connectivity[topology]
coordinates = ds.ugrid_roles.coordinates[topology]
ugrid_vars = (
[topology]
+ list(connectivity.values())
+ list(chain.from_iterable(chain.from_iterable(coordinates.values())))
)
# Take the first coordinates by default.
# They can be reset with .set_node_coords()
x_index = coordinates["node_coordinates"][0][0]
y_index = coordinates["node_coordinates"][1][0]
node_x_coordinates = ds[x_index].astype(FloatDType).to_numpy()
node_y_coordinates = ds[y_index].astype(FloatDType).to_numpy()
edge_nodes = connectivity["edge_node_connectivity"]
fill_value = ds[edge_nodes].encoding.get("_FillValue", -1)
start_index = ds[edge_nodes].attrs.get("start_index", 0)
edge_node_connectivity = cls._prepare_connectivity(
ds[edge_nodes], fill_value, dtype=IntDType
).to_numpy()
indexes["node_x"] = x_index
indexes["node_y"] = y_index
projected = False # TODO
return cls(
node_x_coordinates,
node_y_coordinates,
fill_value,
edge_node_connectivity,
name=topology,
dataset=dataset[ugrid_vars],
indexes=indexes,
projected=projected,
crs=None,
start_index=start_index,
)
def _clear_geometry_properties(self):
"""Clear all properties that may have been invalidated"""
# Meshkernel
self._mesh = None
self._meshkernel = None
# Celltree
self._celltree = None
# Bounds
self._xmin = None
self._xmax = None
self._ymin = None
self._ymax = None
# Edges
self._edge_x = None
self._edge_y = None
[docs]
@classmethod
def from_meshkernel(
cls,
mesh,
name: str = "network1d",
projected: bool = True,
crs: Any = None,
):
"""
Create a 1D UGRID topology from a MeshKernel Mesh1d object.
Parameters
----------
mesh: MeshKernel.Mesh2d
name: str
Mesh name. Defaults to "network1d".
projected: bool
Whether node_x and node_y are longitude and latitude or projected x and
y coordinates. Used to write the appropriate standard_name in the
coordinate attributes.
crs: Any, optional
Coordinate Reference System of the geometry objects. Can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
Returns
-------
grid: Ugrid1d
"""
return cls(
mesh.node_x,
mesh.node_y,
fill_value=FILL_VALUE,
edge_node_connectivity=mesh.edge_nodes.reshape((-1, 2)),
name=name,
projected=projected,
crs=crs,
)
[docs]
def to_dataset(
self, other: xr.Dataset = None, optional_attributes: bool = False
) -> xr.Dataset:
node_x = self._indexes["node_x"]
node_y = self._indexes["node_y"]
edge_nodes = self._attrs["edge_node_connectivity"]
edge_nodes_attrs = conventions.DEFAULT_ATTRS["edge_node_connectivity"]
data_vars = {
self.name: 0,
edge_nodes: xr.DataArray(
data=self._adjust_connectivity(self.edge_node_connectivity),
attrs=edge_nodes_attrs,
dims=(self.edge_dimension, "two"),
),
}
attrs = {"Conventions": "CF-1.9 UGRID-1.0"}
if other is not None:
attrs.update(other.attrs)
dataset = xr.Dataset(data_vars, attrs=attrs)
if self._dataset:
dataset = dataset.merge(self._dataset, compat="override")
if other is not None:
dataset = dataset.merge(other)
if node_x not in dataset or node_y not in dataset:
dataset = self.assign_node_coords(dataset)
if optional_attributes:
dataset = self.assign_edge_coords(dataset)
dataset[self.name].attrs = self._filtered_attrs(dataset)
return dataset
@property
def topology_dimension(self):
"""Highest dimensionality of the geometric elements: 1"""
return 1
@property
def core_dimension(self):
return self.node_dimension
@property
def dims(self):
"""Set of UGRID dimension names: node dimension, edge dimension."""
return {self.node_dimension, self.edge_dimension}
@property
def sizes(self):
return {self.node_dimension: self.n_node, self.edge_dimension: self.n_edge}
@property
def coords(self):
"""Dictionary for grid coordinates."""
return {
self.node_dimension: self.node_coordinates,
self.edge_dimension: self.edge_coordinates,
}
[docs]
def get_coordinates(self, dim: str) -> FloatArray:
"""Return the coordinates for the specified UGRID dimension."""
if dim == self.node_dimension:
return self.node_coordinates
elif dim == self.edge_dimension:
return self.edge_coordinates
else:
raise ValueError(
f"Expected {self.node_dimension} or {self.edge_dimension}; got: {dim}"
)
[docs]
def get_connectivity_matrix(self, dim: str, xy_weights: bool):
"""Return the connectivity matrix for the specified UGRID dimension."""
if dim == self.node_dimension:
connectivity = self.node_node_connectivity.copy()
coordinates = self.node_coordinates
else:
raise ValueError(f"Expected {self.node_dimension}; got: {dim}")
if xy_weights:
connectivity.data = self._connectivity_weights(connectivity, coordinates)
return connectivity
# These are all optional attributes. They are not computed by default, only
# when called upon.
@property
def mesh(self) -> "mk.Mesh1d": # type: ignore # noqa
"""
Create if needed, and return meshkernel Mesh1d object.
Returns
-------
mesh: meshkernel.Mesh1d
"""
import meshkernel as mk
if self._mesh is None:
edge_nodes = self.edge_node_connectivity.ravel().astype(np.int32)
self._mesh = mk.Mesh1d(
node_x=self.node_x,
node_y=self.node_y,
edge_nodes=edge_nodes,
)
return self._mesh
@property
def meshkernel(self) -> "mk.MeshKernel": # type: ignore # noqa
"""
Create if needed, and return meshkernel MeshKernel instance.
Returns
-------
meshkernel: meshkernel.MeshKernel
"""
import meshkernel as mk
if self._meshkernel is None:
if self.is_geographic:
mk_projection = mk.ProjectionType.SPHERICAL
else:
mk_projection = mk.ProjectionType.CARTESIAN
self._meshkernel = mk.MeshKernel(mk_projection)
self._meshkernel.mesh1d_set(self.mesh)
return self._meshkernel
[docs]
@classmethod
def from_geodataframe(cls, geodataframe: "geopandas.GeoDataFrame") -> "Ugrid1d": # type: ignore # noqa
"""
Convert geodataframe of linestrings into a UGRID1D topology.
Parameters
----------
geodataframe: geopandas GeoDataFrame
Returns
-------
topology: Ugrid1d
"""
import geopandas as gpd
if not isinstance(geodataframe, gpd.GeoDataFrame):
raise TypeError(
f"Expected GeoDataFrame, received: {type(geodataframe).__name__}"
)
return cls.from_shapely(geodataframe.geometry.to_numpy(), crs=geodataframe.crs)
[docs]
@staticmethod
def from_shapely(geometry: LineArray, crs=None) -> "Ugrid1d":
"""
Convert an array of shapely linestrings to UGRID1D topology.
Parameters
----------
geometry: np.ndarray of shapely linestrings
crs: Any, optional
Coordinate Reference System of the geometry objects. Can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
"""
import shapely
if not (shapely.get_type_id(geometry) == shapely.GeometryType.LINESTRING).all():
raise TypeError(
"Can only create Ugrid1d from shapely LineString geometries, "
"geometry contains other types of geometries."
)
x, y, edge_node_connectivity = conversion.linestrings_to_edges(geometry)
return Ugrid1d(x, y, FILL_VALUE, edge_node_connectivity, crs=crs)
def to_pygeos(self, dim):
from warnings import warn
warn(
".to_pygeos has been deprecated. Use .to_shapely instead.",
DeprecationWarning,
)
return self.to_shapely(dim)
[docs]
def to_shapely(self, dim):
"""
Convert UGRID topology to shapely objects.
* nodes: points
* edges: linestrings
Parameters
----------
dim: str
Node or edge dimension.
Returns
-------
geometry: ndarray of shapely.Geometry
"""
if dim == self.node_dimension:
return conversion.nodes_to_points(
self.node_x,
self.node_y,
)
elif dim == self.edge_dimension:
return conversion.edges_to_linestrings(
self.node_x,
self.node_y,
self.edge_node_connectivity,
)
else:
raise ValueError(
f"Dimension {dim} is not a node or edge dimension of the"
" Ugrid1d topology."
)
[docs]
def isel(self, indexers=None, return_index=False, **indexers_kwargs):
"""
Select based on node or edge.
Edge selection always results in a valid UGRID topology. Node selection
may result in invalid topologies (incomplete edges), and will error in
such a case.
Parameters
----------
indexers: dict of str to np.ndarray of integers or bools
return_index: bool, optional
Whether to return node_index, edge_index.
Returns
-------
obj: xr.Dataset or xr.DataArray
grid: Ugrid2d
indexes: dict
Dictionary with keys node dimension, edge dimension and values
their respective index. Only returned if return_index is True.
"""
indexers = either_dict_or_kwargs(indexers, indexers_kwargs, "isel")
alldims = self.dims
invalid = indexers.keys() - alldims
if invalid:
raise ValueError(
f"Dimensions {invalid} do not exist. Expected one of {alldims}"
)
indexers = {k: as_pandas_index(v, self.sizes[k]) for k, v in indexers.items()}
nodedim = self.node_dimension
edgedim = self.edge_dimension
edge_index = {}
if nodedim in indexers:
node_index = indexers[nodedim]
edge_index[nodedim] = np.unique(
self.node_edge_connectivity[node_index].data
)
if edgedim in indexers:
edge_index[edgedim] = indexers[edgedim]
# Convert all to pandas index.
edge_index = {k: as_pandas_index(v, self.n_edge) for k, v in edge_index.items()}
# Check the indexes against each other.
index = self._precheck(edge_index)
grid, finalized_indexers = self.topology_subset(index, return_index=True)
self._postcheck(indexers, finalized_indexers)
if return_index:
return grid, finalized_indexers
else:
return grid
def _validate_indexer(self, indexer) -> Tuple[float, float]:
if isinstance(indexer, slice):
if indexer.step is not None:
raise ValueError("Ugrid1d does not support steps in slices")
if indexer.start >= indexer.stop:
raise ValueError("slice start should be smaller than slice stop")
else:
raise ValueError("Ugrid1d only supports slice indexing")
return indexer.start, indexer.stop
[docs]
def sel(self, obj, x, y) -> Tuple:
"""
Select a selection of edges, based on edge centroids.
Parameters
----------
x: slice
y: slice
Returns
-------
dimension: str
as_ugrid: bool
index: 1d array of integers
coords: dict
"""
xmin, xmax = self._validate_indexer(x)
ymin, ymax = self._validate_indexer(y)
edge_index = np.nonzero(
(self.edge_x >= xmin)
& (self.edge_x < xmax)
& (self.edge_y >= ymin)
& (self.edge_y < ymax)
)[0]
grid, indexes = self.topology_subset(edge_index, return_index=True)
indexes = {k: v for k, v in indexes.items() if k in obj.dims}
new_obj = obj.isel(indexes)
return new_obj, grid
[docs]
def topology_subset(
self, edge_index: Union[BoolArray, IntArray], return_index: bool = False
):
"""
Create a new UGRID1D topology for a subset of this topology.
Parameters
----------
edge_index: 1d array of integers or bool
Edges of the subset.
return_index: bool, optional
Whether to return node_index, edge_index.
Returns
-------
subset: Ugrid1d
indexes: dict
Dictionary with keys node dimension and edge dimension and values
their respective index. Only returned if return_index is True.
"""
if not isinstance(edge_index, pd.Index):
edge_index = as_pandas_index(edge_index, self.n_edge)
range_index = pd.RangeIndex(0, self.n_edge)
if edge_index.size == self.n_edge and edge_index.equals(range_index):
if return_index:
indexes = {
self.node_dimension: pd.RangeIndex(0, self.n_node),
self.edge_dimension: range_index,
}
return self, indexes
else:
return self
# N.B. edges do not contain fill values, as there are always two nodes
# required to form an edge.
edge_subset = self.edge_node_connectivity[edge_index]
node_index = np.unique(edge_subset.ravel())
new_edges = connectivity.renumber(edge_subset)
node_x = self.node_x[node_index]
node_y = self.node_y[node_index]
grid = Ugrid1d(
node_x,
node_y,
FILL_VALUE,
new_edges,
name=self.name,
indexes=self._indexes,
projected=self.projected,
crs=self.crs,
attrs=self._attrs,
)
self._propagate_properties(grid)
if return_index:
indexes = {
self.node_dimension: pd.Index(node_index),
self.edge_dimension: edge_index,
}
return grid, indexes
else:
return grid
def clip_box(
self,
xmin: float,
ymin: float,
xmax: float,
ymax: float,
):
return self.sel(x=slice(xmin, xmax), y=slice(ymin, ymax))
def sel_points(self, obj, x, y, out_of_bounds, fill_value):
return obj
def intersect_line(self, obj, start, stop):
return obj
def intersect_linestring(self, obj, linestring):
return obj
def to_periodic(self, obj):
return self, obj
def to_nonperiodic(self, xmax, obj):
return self, obj
[docs]
def topological_sort_by_dfs(self) -> IntArray:
"""
Return an array of vertices in topological order.
Returns
-------
sorted_vertices: np.ndarray of integer
"""
return connectivity.topological_sort_by_dfs(
self.directed_node_node_connectivity
)
[docs]
def contract_vertices(self, indices: IntArray) -> "Ugrid1d":
"""
Return a simplified network topology by removing all nodes that are
not listed in ``indices``.
Parameters
----------
indices: np.ndarray of integers
Returns
-------
contracted: Ugrid1d
"""
edges = connectivity.contract_vertices(
self.directed_node_node_connectivity, indices
)
node_index = np.unique(edges.ravel())
new_edges = connectivity.renumber(edges)
return Ugrid1d(
node_x=self.node_x[node_index],
node_y=self.node_y[node_index],
fill_value=self.fill_value,
edge_node_connectivity=new_edges,
name=self.name,
indexes=self._indexes,
projected=self.projected,
crs=self.crs,
attrs=self._attrs,
)
[docs]
@staticmethod
def merge_partitions(
grids: Sequence["Ugrid1d"]
) -> tuple["Ugrid1d", dict[str, np.array]]:
"""
Merge grid partitions into a single whole.
Duplicate edges are included only once, and removed from subsequent
partitions before merging.
Parameters
----------
grids: sequence of Ugrid1d
Returns
-------
merged: Ugrid1d
"""
from xugrid.ugrid import partitioning
# Grab a sample grid
grid = next(iter(grids))
fill_value = grid.fill_value
node_coordinates, node_indexes, node_inverse = partitioning.merge_nodes(grids)
new_edges, edge_indexes = partitioning.merge_edges(grids, node_inverse)
indexes = {
grid.node_dimension: node_indexes,
grid.edge_dimension: edge_indexes,
}
merged_grid = Ugrid1d(
*node_coordinates.T,
fill_value,
new_edges,
name=grid.name,
indexes=grid._indexes,
projected=grid.projected,
crs=grid.crs,
attrs=grid._attrs,
)
return merged_grid, indexes
[docs]
def reindex_like(
self,
other: "Ugrid1d",
obj: Union[xr.DataArray, xr.Dataset],
tolerance: float = 0.0,
):
"""
Conform a DataArray or Dataset to match the topology of another Ugrid1D
topology. The topologies must be exactly equivalent: only the order of
the nodes and edges may differ.
Parameters
----------
other: Ugrid1d
obj: DataArray or Dataset
tolerance: float, default value 0.0.
Maximum distance between inexact coordinate matches.
Returns
-------
reindexed: DataArray or Dataset
"""
if not isinstance(other, Ugrid1d):
raise TypeError(f"Expected Ugrid1d, received: {type(other).__name__}")
indexers = {
self.node_dimension: connectivity.index_like(
xy_a=self.node_coordinates,
xy_b=other.node_coordinates,
tolerance=tolerance,
),
self.edge_dimension: connectivity.index_like(
xy_a=self.edge_coordinates,
xy_b=other.edge_coordinates,
tolerance=tolerance,
),
}
return obj.isel(indexers, missing_dims="ignore")
[docs]
def create_data_array(self, data: ArrayLike, facet: str) -> "xugrid.UgridDataArray":
"""
Create a UgridDataArray from this grid and a 1D array of values.
Parameters
----------
data: array like
Values for this array. Must be a ``numpy.ndarray`` or castable to
it.
grid: Ugrid1d, Ugrid2d
facet: str
With which facet to associate the data. Options for Ugrid1d are,
``"node"`` or ``"edge"``. Options for Ugrid2d are ``"node"``,
``"edge"``, or ``"face"``.
Returns
-------
uda: UgridDataArray
"""
if facet == "node":
dimension = self.node_dimension
elif facet == "edge":
dimension = self.edge_dimension
else:
raise ValueError(f"Invalid facet: {facet}. Must be one of: node, edge.")
return self._create_data_array(data, dimension)