from typing import Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
import scipy.sparse
import xarray as xr
# from xugrid.plot.pyvista import to_pyvista_grid
from xugrid.core.accessorbase import AbstractUgridAccessor
from xugrid.core.utils import UncachedAccessor
from xugrid.core.wrap import UgridDataArray, UgridDataset
from xugrid.plot.plot import _PlotMethods
from xugrid.ugrid import connectivity
from xugrid.ugrid.interpolate import (
interpolate_na_helper,
laplace_interpolate,
nearest_interpolate,
)
from xugrid.ugrid.ugrid1d import Ugrid1d
from xugrid.ugrid.ugrid2d import Ugrid2d
from xugrid.ugrid.ugridbase import UgridType
[docs]
class UgridDataArrayAccessor(AbstractUgridAccessor):
"""
This "accessor" makes operations using the UGRID topology available via the
``.ugrid`` attribute for UgridDataArrays and UgridDatasets.
"""
[docs]
def __init__(self, obj: xr.DataArray, grid: UgridType):
self.obj = obj
self.grid = grid
@property
def grids(self) -> List[UgridType]:
"""
The UGRID topology of this DataArry, as a list. Included for
consistency with UgridDataset.
"""
return [self.grid]
@property
def name(self) -> str:
"""Name of the UGRID topology of this DataArray."""
return self.grid.name
@property
def names(self) -> List[str]:
"""
Name of the UGRID topology, as a list. Included for consistency with
UgridDataset.
"""
return [self.grid.name]
@property
def topology(self) -> Dict[str, UgridType]:
"""Mapping from name to UGRID topology."""
return {self.name: self.grid}
@property
def bounds(self) -> Dict[str, Tuple]:
"""
Mapping from grid name to tuple containing ``minx, miny, maxx, maxy``
values of the grid's node coordinates.
"""
return {self.grid.name: self.grid.bounds}
@property
def total_bounds(self) -> Tuple:
"""
Returns a tuple containing ``minx, miny, maxx, maxy`` values of the grid's
node coordinates.
"""
return next(iter(self.bounds.values()))
plot = UncachedAccessor(_PlotMethods)
[docs]
def rename(self, name: str) -> UgridDataArray:
"""
Give a new name to the UGRID topology and update the associated
coordinate and dimension names in the DataArray.
Parameters
----------
name: str
The new name of the topology.
"""
obj = self.obj
new_grid, name_dict = self.grid.rename(name, return_name_dict=True)
to_rename = tuple(obj.coords) + tuple(obj.dims)
new_obj = obj.rename({k: v for k, v in name_dict.items() if k in to_rename})
return UgridDataArray(new_obj, new_grid)
[docs]
def assign_node_coords(self) -> UgridDataArray:
"""
Assign node coordinates from the grid to the object.
Returns a new object with all the original data in addition to the new
node coordinates of the grid.
Returns
-------
assigned: UgridDataset
"""
return UgridDataArray(self.grid.assign_node_coords(self.obj), self.grid)
[docs]
def assign_edge_coords(self) -> UgridDataArray:
"""
Assign edge coordinates from the grid to the object.
Returns a new object with all the original data in addition to the new
node coordinates of the grid.
Returns
-------
assigned: UgridDataset
"""
return UgridDataArray(self.grid.assign_edge_coords(self.obj), self.grid)
[docs]
def assign_face_coords(self) -> UgridDataArray:
"""
Assign face coordinates from the grid to the object.
Returns a new object with all the original data in addition to the new
node coordinates of the grid.
Returns
-------
assigned: UgridDataset
"""
if self.grid.topology_dimension == 1:
raise TypeError("Cannot set face coords from a Ugrid1D topology")
return UgridDataArray(self.grid.assign_face_coords(self.obj), self.grid)
[docs]
def set_node_coords(self, node_x: str, node_y: str):
"""
Given names of x and y coordinates of the nodes of an object, set them
as the coordinates in the grid.
Parameters
----------
node_x: str
Name of the x coordinate of the nodes in the object.
node_y: str
Name of the y coordinate of the nodes in the object.
"""
self.grid.set_node_coords(node_x, node_y, self.obj)
[docs]
def sel(self, x=None, y=None):
"""
Return a new object, a subselection in the UGRID x and y coordinates.
The indexing for x and y always occurs orthogonally, i.e.:
``.sel(x=[0.0, 5.0], y=[10.0, 15.0])`` results in a four points. For
vectorized indexing (equal to ``zip``ing through x and y), see
``.sel_points``.
Depending on the nature of the x and y indexers, a xugrid or xarray
object is returned:
* slice without step: ``x=slice(-100, 100)``: returns xugrid object, a
part of the unstructured grid.
* slice with step: ``x=slice(-100, 100, 10)``: returns xarray object, a
series of points (x=[-100, -90, -80, ..., 90, 100]).
* a scalar: ``x=5.0``: returns xarray object, a point.
* an array: ``x=[1.0, 15.0, 17.0]``: returns xarray object, a series of
points.
Parameters
----------
x: float, 1d array, slice
y: float, 1d array, slice
Returns
-------
selection: Union[UgridDataArray, UgridDataset, xr.DataArray, xr.Dataset]
"""
result = self.grid.sel(self.obj, x, y)
if isinstance(result, tuple):
return UgridDataArray(*result)
else:
return result
[docs]
def sel_points(self, x, y, out_of_bounds="warn", fill_value=np.nan):
"""
Select points in the unstructured grid.
Out-of-bounds points are ignored. They may be identified via the
``index`` coordinate of the returned selection.
Parameters
----------
x: ndarray of floats with shape ``(n_points,)``
y: ndarray of floats with shape ``(n_points,)``
out_of_bounds: str, default: "warn"
What to do when points are located outside of any feature:
* raise: raise a ValueError.
* ignore: return ``fill_value`` for the out of bounds points.
* warn: give a warning and return NaN for the out of bounds points.
* drop: drop the out of bounds points. They may be identified
via the ``index`` coordinate of the returned selection.
fill_value: scalar, DataArray, Dataset, or callable, optional, default: np.nan
Value to assign to out-of-bounds points if out_of_bounds is warn
or ignore. Forwarded to xarray's ``.where()`` method.
Returns
-------
points: Union[xr.DataArray, xr.Dataset]
"""
return self.grid.sel_points(self.obj, x, y, out_of_bounds, fill_value)
[docs]
def rasterize(self, resolution: float) -> xr.DataArray:
"""
Rasterize unstructured grid by sampling.
Parameters
----------
resolution: float
Spacing in x and y.
Returns
-------
rasterized: xr.DataArray
"""
x, y, index = self.grid.rasterize(resolution)
return self._raster(x, y, index)
[docs]
def rasterize_like(self, other: Union[xr.DataArray, xr.Dataset]) -> xr.DataArray:
"""
Rasterize unstructured grid by sampling on the x and y coordinates
of ``other``.
Parameters
----------
resolution: float
Spacing in x and y.
other: Union[xr.DataArray, xr.Dataset]
Object to take x and y coordinates from.
Returns
-------
rasterized: xr.DataArray
"""
x, y, index = self.grid.rasterize_like(
x=other["x"].to_numpy(),
y=other["y"].to_numpy(),
)
return self._raster(x, y, index)
[docs]
def to_periodic(self):
"""
Convert this grid to a periodic grid, where the rightmost boundary
shares its nodes with the leftmost boundary.
Returns
-------
periodic: UgridDataArray
"""
grid, obj = self.grid.to_periodic(obj=self.obj)
return UgridDataArray(obj, grid)
[docs]
def to_nonperiodic(self, xmax: float):
"""
Convert this grid from a periodic grid (where the rightmost boundary shares its
nodes with the leftmost boundary) to an aperiodic grid, where the leftmost nodes
are separate from the rightmost nodes.
Parameters
----------
xmax: float
The x-value of the newly created rightmost boundary nodes.
Returns
-------
nonperiodic: UgridDataArray
"""
grid, obj = self.grid.to_nonperiodic(xmax=xmax, obj=self.obj)
return UgridDataArray(obj, grid)
[docs]
def intersect_line(
self, start: Sequence[float], end: Sequence[float]
) -> xr.DataArray:
"""
Intersect a line with the grid of this data, and fetch the values of
the intersected faces.
Parameters
----------
obj: xr.DataArray or xr.Dataset
start: sequence of two floats
coordinate pair (x, y), designating the start point of the line.
end: sequence of two floats
coordinate pair (x, y), designating the end point of the line.
Returns
-------
intersection: xr.DataArray
The length along the line is returned as the "s" coordinate.
"""
return self.grid.intersect_line(self.obj, start, end)
[docs]
def intersect_linestring(self, linestring) -> xr.DataArray:
"""
Intersect the grid along a collection of linestrings. Returns a new DataArray
with the values for each intersected segment.
Parameters
----------
linestring: shapely.LineString
Returns
-------
intersection: xr.DataArray
The length along the linestring is returned as the "s" coordinate.
"""
return self.grid.intersect_linestring(self.obj, linestring)
@property
def crs(self):
"""
The Coordinate Reference System (CRS) represented as a ``pyproj.CRS`` object.
Returns None if the CRS is not set.
Returns
-------
crs: dict
A dictionary containing the name of the grid and its CRS.
"""
return {self.grid.name: self.grid.crs}
[docs]
def set_crs(
self,
crs: Union["pyproj.CRS", str] = None, # type: ignore # noqa
epsg: int = None,
allow_override: bool = False,
):
"""
Set the Coordinate Reference System (CRS) of a UGRID topology.
NOTE: The underlying geometries are not transformed to this CRS. To
transform the geometries to a new CRS, use the ``to_crs`` method.
Parameters
----------
crs : pyproj.CRS, optional if `epsg` is specified
The value can be anything accepted
by :meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
epsg : int, optional if `crs` is specified
EPSG code specifying the projection.
allow_override : bool, default False
If the the UGRID topology already has a CRS, allow to replace the
existing CRS, even when both are not equal.
"""
self.grid.set_crs(crs, epsg, allow_override)
[docs]
def to_crs(
self,
crs: Union["pyproj.CRS", str] = None, # type: ignore # noqa
epsg: int = None,
) -> UgridDataArray:
"""
Transform geometries to a new coordinate reference system.
Transform all geometries in an active geometry column to a different coordinate
reference system. The ``crs`` attribute on the current Ugrid must
be set. Either ``crs`` or ``epsg`` may be specified for output.
This method will transform all points in all objects. It has no notion
of projecting the cells. All segments joining points are assumed to be
lines in the current projection, not geodesics. Objects crossing the
dateline (or other projection boundary) will have undesirable behavior.
Parameters
----------
crs : pyproj.CRS, optional if `epsg` is specified
The value can be anything accepted by
:meth:`pyproj.CRS.from_user_input() <pyproj.crs.CRS.from_user_input>`,
such as an authority string (eg "EPSG:4326") or a WKT string.
epsg : int, optional if `crs` is specified
EPSG code specifying output projection.
"""
uda = UgridDataArray(self.obj, self.grid.to_crs(crs, epsg))
if self.grid.node_dimension in self.obj.dims:
return uda.ugrid.assign_node_coords()
else:
return uda
[docs]
def to_geodataframe(
self, name: str = None, dim_order=None
) -> "geopandas.GeoDataFrame": # type: ignore # noqa
"""
Convert data and topology of one facet (node, edge, face) of the grid
to a geopandas GeoDataFrame. This also determines the geometry type of
the geodataframe:
* node: point
* edge: line
* face: polygon
Parameters
----------
dim: str
node, edge, or face dimension. Inferred for DataArray.
name: str
Name to give to the array (required if unnamed).
dim_order:
Hierarchical dimension order for the resulting dataframe. Array content is
transposed to this order and then written out as flat vectors in contiguous
order, so the last dimension in this list will be contiguous in the resulting
DataFrame. This has a major influence on which operations are efficient on the
resulting dataframe.
If provided, must include all dimensions of this DataArray. By default,
dimensions are sorted according to the DataArray dimensions order.
Returns
-------
geodataframe: gpd.GeoDataFrame
"""
import geopandas as gpd
dim = self.obj.dims[-1]
if name is not None:
ds = self.obj.to_dataset(name=name)
else:
ds = self.obj.to_dataset()
variables = [var for var in ds.data_vars if dim in ds.variables[var].dims]
# TODO deal with time-dependent data, etc.
# Basically requires checking which variables are static, which aren't.
# For non-static, requires repeating all geometries.
# Call reset_index on multi-index to generate them as regular columns.
df = ds[variables].to_dataframe(dim_order=dim_order)
geometry = self.grid.to_shapely(dim)
return gpd.GeoDataFrame(df, geometry=geometry, crs=self.grid.crs)
[docs]
def reindex_like(
self,
other: Union[UgridType, UgridDataArray, UgridDataset],
tolerance: float = 0.0,
):
"""
Conform this object to match the topology of another object. The
topologies must be exactly equivalent: only the order of the nodes,
edges, and faces may differ.
Dimension names must match for equivalent topologies.
Parameters
----------
other: Ugrid1d, Ugrid2d, UgridDataArray, UgridDataset
obj: DataArray or Dataset
tolerance: float, default value 0.0.
Maximum distance between inexact coordinate matches.
Returns
-------
reindexed: UgridDataArray
"""
if isinstance(other, (Ugrid1d, Ugrid2d)):
other_grid = other
elif isinstance(other, (UgridDataArray, UgridDataset)):
other_grid = other.ugrid.grid
else:
raise TypeError(
"Expected Ugrid1d, Ugrid2d, UgridDataArray, or UgridDataset,"
f"received instead: {type(other).__name__}"
)
new_obj = self.grid.reindex_like(other_grid, obj=self.obj, tolerance=tolerance)
return UgridDataArray(new_obj, other_grid)
def _binary_iterate(self, iterations: int, mask, value, border_value):
if border_value == value:
exterior = self.grid.exterior_faces
else:
exterior = None
if mask is not None:
mask = mask.to_numpy()
obj = self.obj
if isinstance(obj, xr.DataArray):
output = connectivity._binary_iterate(
self.grid.face_face_connectivity,
obj.to_numpy(),
value,
iterations,
mask,
exterior,
border_value,
)
da = obj.copy(data=output)
return UgridDataArray(da, self.grid.copy())
elif isinstance(obj, xr.Dataset):
raise NotImplementedError
else:
raise ValueError("object should be a xr.DataArray")
[docs]
def binary_dilation(
self,
iterations: int = 1,
mask=None,
border_value=False,
):
"""
Binary dilation can be used on a boolean array to expand the "shape" of
features.
Compare with :py:func:`scipy.ndimage.binary_dilation`.
Parameters
----------
iterations: int, default: 1
mask: 1d array of bool, optional
border_value: bool, default value: False
Returns
-------
dilated: UgridDataArray
"""
return self._binary_iterate(iterations, mask, True, border_value)
[docs]
def binary_erosion(
self,
iterations: int = 1,
mask=None,
border_value=False,
):
"""
Binary erosion can be used on a boolean array to shrink the "shape" of
features.
Compare with :py:func:`scipy.ndimage.binary_erosion`.
Parameters
----------
iterations: int, default: 1
mask: 1d array of bool, optional
border_value: bool, default value: False
Returns
-------
eroded: UgridDataArray
"""
return self._binary_iterate(iterations, mask, False, border_value)
[docs]
def connected_components(self):
"""
Every edge or face is given a component number. If all are connected,
all will have the same number.
Wraps :py:func:`scipy.sparse.csgraph.connected_components``.
Returns
-------
labelled: UgridDataArray
"""
_, labels = scipy.sparse.csgraph.connected_components(
self.grid.face_face_connectivity
)
return UgridDataArray(
xr.DataArray(labels, dims=[self.grid.face_dimension]),
self.grid,
)
[docs]
def reverse_cuthill_mckee(self):
"""
Reduces bandwith of the connectivity matrix.
Wraps :py:func:`scipy.sparse.csgraph.reverse_cuthill_mckee`.
Returns
-------
reordered: Union[UgridDataArray, UgridDataset]
"""
grid = self.grid
reordered_grid, reordering = self.grid.reverse_cuthill_mckee()
reordered_data = self.obj.isel({grid.face_dimension: reordering})
# TODO: this might not work properly if e.g. centroids are stored in obj.
# Not all metadata would be reordered.
return UgridDataArray(
reordered_data,
reordered_grid,
)
[docs]
def interpolate_na(
self,
method: str = "nearest",
max_distance: Optional[float] = None,
):
"""
Fill in NaNs by interpolating.
This function automatically finds the UGRID dimension and broadcasts
over the other dimensions.
Parameters
----------
method: str, default is "nearest"
Currently the only supported method.
max_distance: nonnegative float, optional.
Use ``None`` for no maximum distance.
Returns
-------
filled: UgridDataArray of floats
"""
if method != "nearest":
raise ValueError(f'"{method}" is not a valid interpolator.')
if max_distance is None:
max_distance = np.inf
grid = self.grid
da = self.obj
ugrid_dim = grid.find_ugrid_dim(da)
da_filled = interpolate_na_helper(
da,
ugrid_dim=ugrid_dim,
func=nearest_interpolate,
kwargs={
"coordinates": grid.get_coordinates(ugrid_dim),
"max_distance": max_distance,
},
)
return UgridDataArray(da_filled, grid)
[docs]
def laplace_interpolate(
self,
xy_weights: bool = True,
direct_solve: bool = False,
delta=0.0,
relax=0.0,
rtol=1.0e-5,
atol=0.0,
maxiter: int = 500,
):
"""
Fill in NaNs by using Laplace interpolation.
This function automatically finds the UGRID dimension and broadcasts
over the other dimensions.
This solves Laplace's equation where where there is no data, with data
values functioning as fixed potential boundary conditions.
Note that an iterative solver method will be required for large grids.
In this case, some experimentation with the solver settings may be
required to find a converging solution of sufficient accuracy. Refer to
the documentation of :py:func:`scipy.sparse.linalg.spilu` and
:py:func:`scipy.sparse.linalg.cg`.
Data can be interpolated from nodes or faces. Direct interpolation of edge
associated data is not allowed. Instead, create node associated data first,
then translate that data to the edges.
Parameters
----------
xy_weights: bool, default False.
Wether to use the inverse of the centroid to centroid distance in
the coefficient matrix. If ``False``, defaults to uniform
coefficients of 1 so that each face connection has equal weight.
direct_solve: bool, optional, default ``False``
Whether to use a direct or an iterative solver or a conjugate gradient
solver. Direct method provides an exact answer, but are unsuitable
for large problems.
delta: float, default 0.0.
ILU0 preconditioner non-diagonally dominant correction.
relax: float, default 0.0.
Modified ILU0 preconditioner relaxation factor.
rtol: float, optional, default 1.0e-5.
Convergence tolerance for ``scipy.sparse.linalg.cg``.
atol: float, optional, default 0.0.
Convergence tolerance for ``scipy.sparse.linalg.cg``.
maxiter: int, default 500.
Maximum number of iterations for ``scipy.sparse.linalg.cg``.
Returns
-------
filled: UgridDataArray of floats
"""
grid = self.grid
da = self.obj
grid = self.grid
da = self.obj
ugrid_dim = grid.find_ugrid_dim(da)
if ugrid_dim == grid.edge_dimension:
raise ValueError("Laplace interpolation along edges is not allowed.")
connectivity = grid.get_connectivity_matrix(ugrid_dim, xy_weights=xy_weights)
da_filled = interpolate_na_helper(
da,
ugrid_dim,
func=laplace_interpolate,
kwargs={
"connectivity": connectivity,
"use_weights": xy_weights,
"direct_solve": direct_solve,
"delta": delta,
"relax": relax,
"rtol": rtol,
"atol": atol,
"maxiter": maxiter,
},
)
return UgridDataArray(da_filled, grid)
[docs]
def to_dataset(self, optional_attributes: bool = False):
"""
Convert this UgridDataArray or UgridDataset into a standard
xarray.Dataset.
The UGRID topology information is added as standard data variables.
Parameters
----------
optional_attributes: bool, default: False.
Whether to generate the UGRID optional attributes.
Returns
-------
dataset: UgridDataset
"""
return self.grid.to_dataset(self.obj, optional_attributes)