Source code for xugrid.regrid.regridder

"""This module is heavily inspired by xemsf.frontend.py"""

import abc
from typing import Callable, Optional, Tuple, Union

import numba
import numpy as np
import pandas as pd
import xarray as xr

import xugrid as xu

# dask as optional dependency
try:
    import dask.array

    DaskArray = dask.array.Array
except ImportError:
    DaskArray = ()

import xugrid
from xugrid.constants import FloatArray
from xugrid.core.sparse import MatrixCOO, MatrixCSR, row_slice
from xugrid.core.wrap import UgridDataArray
from xugrid.regrid import reduce
from xugrid.regrid.structured import StructuredGrid2d
from xugrid.regrid.unstructured import UnstructuredGrid2d


def make_regrid(func):
    """
    Compile a parallel regridding kernel for a reduction function.

    Use a closure to capture func, so numba can compile it efficiently without
    function call overhead.

    Parameters
    ----------
    func: callable
        Reduction of the form ``f(values, weights, workspace) -> float``.
        Must be compilable by numba.

    Returns
    -------
    _regrid: callable
        Jit-compiled function ``(source, A: MatrixCSR, size) -> (n_extra, size)``
        array, with NaN where a target row has no source contributions.
    """
    f = numba.njit(func, inline="always")

    def _regrid(source: FloatArray, A: MatrixCSR, size: int):
        # Pre-allocate the output array
        n_extra = source.shape[0]
        out = np.full((n_extra, size), np.nan)
        # Pre-allocate workspace arrays. Every reduction algorithm should use
        # no more than the size of indices. Every thread gets its own workspace
        # row!
        n_work = np.diff(A.indptr).max()
        workspace = np.empty((n_extra, 2, n_work), dtype=np.float64)
        for extra_index in numba.prange(n_extra):
            source_flat = source[extra_index]
            for target_index in range(A.n):
                # Renamed from ``slice`` to avoid shadowing the builtin.
                row = row_slice(A, target_index)
                indices = A.indices[row]
                weights = A.data[row]

                # Copy the source data for this row to values.
                n_value = len(indices)
                values = workspace[extra_index, 0, :n_value]
                for i, index in enumerate(indices):
                    values[i] = source_flat[index]

                if n_value > 0:
                    out[extra_index, target_index] = f(
                        values, weights, workspace[extra_index, 1, :n_value]
                    )
        return out

    return numba.njit(_regrid, parallel=True, cache=True)


def setup_grid(obj, **kwargs):
    """
    Wrap a grid-like object in the matching regridder grid wrapper.

    Parameters
    ----------
    obj: Ugrid2d, UgridDataArray, UgridDataset, xr.DataArray, or xr.Dataset
    **kwargs:
        ``name_y`` and ``name_x``: coordinate names for structured grids.
        Default to ``"y"`` and ``"x"``.

    Returns
    -------
    grid: UnstructuredGrid2d or StructuredGrid2d

    Raises
    ------
    TypeError
        If ``obj`` is not one of the supported grid-like types.
    """
    if isinstance(obj, (xu.Ugrid2d, xu.UgridDataArray, xu.UgridDataset)):
        return UnstructuredGrid2d(obj)
    elif isinstance(obj, (xr.DataArray, xr.Dataset)):
        return StructuredGrid2d(
            obj, name_y=kwargs.get("name_y", "y"), name_x=kwargs.get("name_x", "x")
        )
    else:
        # An empty TypeError() gives the caller nothing to go on; name the
        # accepted types and what was actually received.
        raise TypeError(
            "Expected Ugrid2d, UgridDataArray, UgridDataset, xarray.DataArray, "
            f"or xarray.Dataset, received: {type(obj).__name__}"
        )


def convert_to_match(source, target):
    """
    Promote source and target grids to a common grid type.

    E.g. a structured-unstructured pair is promoted to a pair of unstructured
    grids, so a single regridding implementation can be used.

    Returns
    -------
    source, target:
        Both converted to the promoted grid type.

    Raises
    ------
    TypeError
        If the combination of grid types is not supported.
    """
    PROMOTIONS = {
        frozenset({StructuredGrid2d}): StructuredGrid2d,
        frozenset({StructuredGrid2d, UnstructuredGrid2d}): UnstructuredGrid2d,
        frozenset({UnstructuredGrid2d}): UnstructuredGrid2d,
        #    {StructuredGrid3d, ExplicitStructuredGrid3d}: ExplicitStructuredGrid3d,
        #    {LayeredUnstructuredGrid2d, StructuredGrid2d}: StructuredGrid2d,
        #    {StructuredGrid3d, StructuredGrid2d}: StructuredGrid2d,
        #    # etc.
    }
    types = frozenset({type(source), type(target)})
    try:
        matched_type = PROMOTIONS[types]
    except KeyError as e:
        # A bare KeyError on a frozenset is hard to interpret; raise a
        # descriptive TypeError instead.
        raise TypeError(
            "Cannot regrid between grid types: "
            f"{type(source).__name__} and {type(target).__name__}"
        ) from e
    return source.convert_to(matched_type), target.convert_to(matched_type)


class BaseRegridder(abc.ABC):
    """
    Abstract base class for regridders.

    Subclasses compute a sparse weight matrix mapping a source grid onto a
    target grid (``_compute_weights``), and apply it to numpy or dask backed
    data.
    """

    # Mapping of method name -> jit-compiled regridding function; filled in
    # by concrete subclasses.
    _JIT_FUNCTIONS = {}

    def __init__(
        self,
        source: "xugrid.Ugrid2d",
        target: "xugrid.Ugrid2d",
    ):
        self._source = setup_grid(source)
        self._target = setup_grid(target)
        self._weights = None
        self._compute_weights(self._source, self._target)
        return

    @property
    @abc.abstractmethod
    def weights(self):
        pass

    @abc.abstractmethod
    def _compute_weights(self, source, target):
        pass

    def _setup_regrid(self, func) -> None:
        """
        Bind ``self._regrid`` to a jit-compiled regridding function.

        Parameters
        ----------
        func: str or callable
            Either the name of a pre-compiled reduction in ``_JIT_FUNCTIONS``,
            or a custom reduction function to compile with numba.
        """
        if isinstance(func, str):
            functions = self._JIT_FUNCTIONS
            try:
                self._regrid = functions[func]
            except KeyError as e:
                raise ValueError(
                    "Invalid regridding method. Available methods are: {}".format(
                        functions.keys()
                    )
                ) from e
        elif callable(func):
            self._regrid = make_regrid(func)
        else:
            # Fixed: the original formatted ``type(func).__name`` (missing
            # trailing underscores), which raised AttributeError instead of
            # this TypeError.
            raise TypeError(
                f"method must be string or callable, received: {type(func).__name__}"
            )
        return

    def _regrid_array(self, source):
        """
        Apply the weights to a raw numpy or dask array.

        The topology dimensions must be the last dimensions of ``source``.
        """
        source_grid = self._source
        first_dims_shape = source.shape[: -source_grid.ndim]

        # The regridding can be mapped over additional dimensions, e.g. for
        # every time slice. This is the `extra_index` iteration in _regrid().
        # But it should work consistently even if no additional present: in
        # that case we create a 1-sized additional dimension in front, so the
        # `extra_index` iteration always applies.
        if source.ndim == source_grid.ndim:
            source = source[np.newaxis]

        # All additional dimension are flattened into one, in front.
        # E.g.:
        #
        #   * ("dummy", "face") -> ("dummy", "face")
        #   * ("time", "layer", "y", "x") -> ("stacked_time_layer", "stacked_y_x")
        #   * ("time", "layer", "face") -> ("stacked_time_layer", "face")
        #
        # Source is always 2D after this step, sized: (n_extra, size).
        source = source.reshape((-1, source_grid.size))

        size = self._target.size

        if isinstance(source, DaskArray):
            # It's possible that the topology dimensions are chunked (e.g. from
            # reading multiple partitions). The regrid operation does not
            # support this, since we might need multiple source chunks for a
            # single target chunk, which destroys the 1:1 relation between
            # chunks. Here we ensure that the topology dimensions are contained
            # in a single contiguous chunk.
            contiguous_chunks = (source.chunks[0], (source.shape[-1],))
            source = source.rechunk(contiguous_chunks)
            chunks = source.chunks[:-1] + (self._target.size,)
            out = dask.array.map_blocks(
                self._regrid,  # func
                source,  # *args
                self._weights,  # *args
                size,  # *args
                dtype=np.float64,
                chunks=chunks,
                meta=np.array((), dtype=source.dtype),
            )
        elif isinstance(source, np.ndarray):
            out = self._regrid(source, self._weights, size)
        else:
            raise TypeError(
                "Expected dask.array.Array or numpy.ndarray. Received: "
                f"{type(source).__name__}"
            )
        # E.g.: sizes of ("time", "layer") + ("y", "x")
        out_shape = first_dims_shape + self._target.shape
        return out.reshape(out_shape)

    def regrid_dataarray(self, source: xr.DataArray, source_dims: Tuple[str]):
        """Apply the regridding over ``source_dims`` of a DataArray."""
        # Do not set vectorize=True: numba will run the for loop more
        # efficiently, and guarantees a single large allocation.
        out = xr.apply_ufunc(
            self._regrid_array,
            source,
            input_core_dims=[source_dims],
            exclude_dims=set(source_dims),
            output_core_dims=[self._target.dims],
            dask="allowed",
            keep_attrs=True,
            output_dtypes=[source.dtype],
        )
        return out

    def regrid(self, object) -> UgridDataArray:
        """
        Regrid the data from a DataArray from its old grid topology to the new
        target topology.

        Automatically regrids over additional dimensions (e.g. time).

        Supports lazy evaluation for dask arrays inside the DataArray.

        Parameters
        ----------
        object: UgridDataArray or xarray.DataArray

        Returns
        -------
        regridded: UgridDataArray or xarray.DataArray
        """

        # FIXME: this should work:
        # source_dims = self._source.dims
        #
        # But it causes problems with initializing a regridder
        # from_dataset, because the name has been changed to
        # __source_nFace.
        if isinstance(object, UgridDataArray):
            obj = object.ugrid.obj
            source_dims = (object.ugrid.grid.face_dimension,)
        else:
            obj = object
            source_dims = ("y", "x")

        missing_dims = set(source_dims).difference(object.dims)
        if missing_dims:
            raise ValueError(
                f"object does not contain regridder source dimensions: {missing_dims}"
            )

        regridded = self.regrid_dataarray(obj, source_dims)

        if isinstance(self._target, StructuredGrid2d):
            regridded = regridded.assign_coords(coords=self._target.coords)
            return regridded
        else:
            return UgridDataArray(
                regridded,
                self._target.ugrid_topology,
            )

    def to_dataset(self) -> xr.Dataset:
        """Store the computed weights and target in a dataset for re-use."""
        weights_ds = xr.Dataset(
            {f"__regrid_{k}": v for k, v in zip(self._weights._fields, self._weights)}
        )
        source_ds = self._source.to_dataset("__source")
        target_ds = self._target.to_dataset("__target")
        return xr.merge((weights_ds, source_ds, target_ds))

    def weights_as_dataframe(self) -> pd.DataFrame:
        """
        Return the weights as a three column dataframe:

        * source index
        * target index
        * weight

        Returns
        -------
        weights: pd.DataFrame
        """
        matrix = self._weights
        if matrix is None:
            raise ValueError("Weights have not been computed yet.")
        if isinstance(matrix, MatrixCSR):
            matrix = matrix.to_coo()
        return pd.DataFrame(
            {
                "target_index": matrix.row,
                "source_index": matrix.col,
                "weight": matrix.data,
            }
        )

    @staticmethod
    def _csr_from_dataset(dataset: xr.Dataset) -> MatrixCSR:
        """
        Create a compressed sparse row matrix from the dataset variables.

        Variables n and nnz are expected to be scalar variables.
        """
        return MatrixCSR(
            dataset["__regrid_data"].to_numpy(),
            dataset["__regrid_indices"].to_numpy(),
            dataset["__regrid_indptr"].to_numpy(),
            dataset["__regrid_n"].item(),
            dataset["__regrid_m"].item(),
            dataset["__regrid_nnz"].item(),
        )

    @staticmethod
    def _coo_from_dataset(dataset: xr.Dataset) -> MatrixCOO:
        """
        Create a coordinate/triplet sparse row matrix from the dataset variables.

        Variables n and nnz are expected to be scalar variables.
        """
        return MatrixCOO(
            dataset["__regrid_data"].to_numpy(),
            dataset["__regrid_row"].to_numpy(),
            dataset["__regrid_col"].to_numpy(),
            dataset["__regrid_n"].item(),
            dataset["__regrid_m"].item(),
            dataset["__regrid_nnz"].item(),
        )

    @abc.abstractmethod
    def _weights_from_dataset(cls, dataset: xr.Dataset) -> Union[MatrixCOO, MatrixCSR]:
        """Return either COO or CSR weights."""

    @classmethod
    def from_weights(
        cls, weights, target: Union["xugrid.Ugrid2d", xr.DataArray, xr.Dataset]
    ):
        """
        Reconstruct a regridder from a weights dataset and a target grid,
        skipping weight computation.
        """
        instance = cls.__new__(cls)
        instance._weights = cls._weights_from_dataset(weights)
        instance._target = setup_grid(target)
        unstructured = weights["__source_type"].attrs["type"] == "UnstructuredGrid2d"
        if unstructured:
            instance._source = setup_grid(xu.Ugrid2d.from_dataset(weights, "__source"))
        else:
            instance._source = setup_grid(
                weights, name_x="__source_x", name_y="__source_y"
            )
        return instance

    @classmethod
    def from_dataset(cls, dataset: xr.Dataset):
        """
        Reconstruct the regridder from a dataset with source, target indices
        and weights.
        """
        unstructured = dataset["__target_type"].attrs["type"] == "UnstructuredGrid2d"
        if not unstructured:
            # The original code left ``target`` unbound for structured targets,
            # which raised an opaque NameError below. Fail with a clear message
            # instead; structured target reconstruction is not supported by
            # this code path.
            raise NotImplementedError(
                "Can only reconstruct a regridder from a dataset with an "
                "unstructured (UnstructuredGrid2d) target."
            )
        target = xu.Ugrid2d.from_dataset(dataset, "__target")
        return cls.from_weights(dataset, target)


class CentroidLocatorRegridder(BaseRegridder):
    """
    The CentroidLocatorRegridder regrids by searching the source grid for the
    centroids of the target grid.

    If a centroid is exactly located on an edge between two faces, the value of
    either face may be used.

    Parameters
    ----------
    source: Ugrid2d, UgridDataArray
    target: Ugrid2d, UgridDataArray
    weights: Optional[MatrixCOO]
    """

    def _compute_weights(self, source, target):
        # Store a COO matrix: each target centroid maps to (at most) one
        # source face.
        source, target = convert_to_match(source, target)
        source_index, target_index, weight_values = source.locate_centroids(target)
        self._weights = MatrixCOO.from_triplet(
            target_index,
            source_index,
            weight_values,
            n=target.size,
            m=source.size,
        )
        return

    @staticmethod
    @numba.njit(parallel=True, cache=True)
    def _regrid(source: FloatArray, A: MatrixCOO, size: int):
        # Pure lookup: copy the source value of the located face into every
        # target entry; untouched targets stay NaN.
        n_extra = source.shape[0]
        out = np.full((n_extra, size), np.nan)
        for extra_index in numba.prange(n_extra):
            source_flat = source[extra_index]
            for target_index, source_index in zip(A.row, A.col):
                out[extra_index, target_index] = source_flat[source_index]
        return out

    @property
    def weights(self):
        return self.to_dataset()

    @weights.setter
    def weights(self, weights: MatrixCOO):
        # Fixed: the original setter declared an extra ``target`` parameter,
        # but a property setter only ever receives the assigned value, so the
        # setter could never be called successfully. It now matches the
        # setters of the other regridders.
        if not isinstance(weights, MatrixCOO):
            raise TypeError(f"Expected MatrixCOO, received: {type(weights).__name__}")
        self._weights = weights
        return

    @classmethod
    def _weights_from_dataset(cls, dataset: xr.Dataset) -> MatrixCOO:
        return cls._coo_from_dataset(dataset)
class BaseOverlapRegridder(BaseRegridder, abc.ABC):
    """
    Shared weight computation for overlap based regridders.

    Stores the weights as a CSR matrix, so a reduction can run row by row
    over all source faces overlapping a target face.
    """

    def _compute_weights(self, source, target, relative: bool) -> None:
        source, target = convert_to_match(source, target)
        source_index, target_index, weight_values = source.overlap(
            target, relative=relative
        )
        self._weights = MatrixCSR.from_triplet(
            target_index, source_index, weight_values, n=target.size, m=source.size
        )
        return

    @property
    def weights(self):
        return self.to_dataset()

    @weights.setter
    def weights(self, weights: MatrixCSR):
        if not isinstance(weights, MatrixCSR):
            raise TypeError(f"Expected MatrixCSR, received: {type(weights).__name__}")
        self._weights = weights
        return

    @classmethod
    def _weights_from_dataset(cls, dataset: xr.Dataset) -> MatrixCSR:
        # Fixed annotation: this returns a MatrixCSR (was annotated MatrixCOO).
        return cls._csr_from_dataset(dataset)
class OverlapRegridder(BaseOverlapRegridder):
    """
    The OverlapRegridder regrids by computing which target faces overlap with
    which source faces. It stores the area of overlap, which can be used in
    multiple ways to aggregate the values associated with the source faces.

    Currently supported aggregation methods are:

    * ``"mean"``
    * ``"harmonic_mean"``
    * ``"geometric_mean"``
    * ``"sum"``
    * ``"minimum"``
    * ``"maximum"``
    * ``"mode"``
    * ``"median"``
    * ``"max_overlap"``
    * percentiles 5, 10, 25, 50, 75, 90, 95: as ``"p5"``, ``"p10"``, etc.

    Custom aggregation functions are also supported, if they can be compiled
    by Numba. See the User Guide.

    Any percentile method can be created via:
    ``method = OverlapRegridder.create_percentile_method(percentile)``
    See the examples.

    Parameters
    ----------
    source: Ugrid2d, UgridDataArray
    target: Ugrid2d, UgridDataArray
    method: str, function, optional
        Default value is ``"mean"``.

    Examples
    --------
    Create an OverlapRegridder to regrid with mean:

    >>> regridder = OverlapRegridder(source_grid, target_grid, method="mean")
    >>> regridder.regrid(source_data)

    Setup a custom percentile method and apply it:

    >>> p33_3 = OverlapRegridder.create_percentile_method(33.3)
    >>> regridder = OverlapRegridder(source_grid, target_grid, method=p33_3)
    >>> regridder.regrid(source_data)
    """

    _JIT_FUNCTIONS = {
        k: make_regrid(f) for k, f in reduce.ABSOLUTE_OVERLAP_METHODS.items()
    }

    def __init__(
        self,
        source: UgridDataArray,
        target: UgridDataArray,
        method: Union[str, Callable] = "mean",
    ):
        super().__init__(source=source, target=target)
        self._setup_regrid(method)

    def _compute_weights(self, source, target) -> None:
        # Absolute overlap areas: the reduction functions normalize as needed.
        super()._compute_weights(source, target, relative=False)

    @staticmethod
    def create_percentile_method(percentile: float) -> Callable:
        return reduce.create_percentile_method(percentile)

    @classmethod
    def from_weights(
        cls,
        weights: xr.Dataset,
        target: Union["xugrid.Ugrid2d", xr.DataArray, xr.Dataset],
        method: Union[str, Callable] = "mean",
    ):
        instance = super().from_weights(weights, target)
        instance._setup_regrid(method)
        return instance
class RelativeOverlapRegridder(BaseOverlapRegridder):
    """
    The RelativeOverlapRegridder regrids by computing which target faces
    overlap with which source faces. It stores the area of overlap, which can
    be used in multiple ways to aggregate the values associated with the
    source faces. Unlike the OverlapRegridder, the intersection area is
    divided by the total area of the source face. This is required for e.g.
    first-order conservative regridding.

    Currently supported aggregation methods are:

    * ``"max_overlap"``

    Custom aggregation functions are also supported, if they can be compiled
    by Numba. See the User Guide.

    Parameters
    ----------
    source: Ugrid2d, UgridDataArray
    target: Ugrid2d, UgridDataArray
    method: str, function, optional
        Default value is "first_order_conservative".
    """

    _JIT_FUNCTIONS = {
        k: make_regrid(f) for k, f in reduce.RELATIVE_OVERLAP_METHODS.items()
    }

    def __init__(
        self,
        source: UgridDataArray,
        target: UgridDataArray,
        method: Union[str, Callable] = "first_order_conservative",
    ):
        super().__init__(source=source, target=target)
        self._setup_regrid(method)

    def _compute_weights(self, source, target) -> None:
        # Relative overlap: intersection area divided by source face area.
        super()._compute_weights(source, target, relative=True)

    @classmethod
    def from_weights(
        cls,
        weights: MatrixCSR,
        target: "xugrid.Ugrid2d",
        method: Union[str, Callable] = "first_order_conservative",
    ):
        instance = super().from_weights(weights, target)
        instance._setup_regrid(method)
        return instance
class BarycentricInterpolator(BaseRegridder):
    """
    The BarycentricInterpolator searches the centroid of every face of the
    target grid in the source grid. It finds by which source faces the
    centroid is surrounded (via its centroidal voronoi tesselation), and
    computes barycentric weights which can be used to interpolate smoothly
    between the values associated with the source faces.

    Parameters
    ----------
    source: Ugrid2d, UgridDataArray
    target: Ugrid2d, UgridDataArray
    """

    _JIT_FUNCTIONS = {"mean": make_regrid(reduce.mean)}

    def __init__(
        self,
        source: UgridDataArray,
        target: UgridDataArray,
    ):
        super().__init__(source, target)
        # Since the weights for a target face sum up to 1.0, a weighted mean
        # is appropriate, and takes care of NaN values in the source data.
        self._setup_regrid("mean")

    def _compute_weights(self, source, target):
        source, target = convert_to_match(source, target)
        # Structured sources yield linear interpolation weights; unstructured
        # sources yield barycentric weights.
        if isinstance(source, StructuredGrid2d):
            source_index, target_index, weights = source.linear_weights(target)
        else:
            source_index, target_index, weights = source.barycentric(target)
        self._weights = MatrixCSR.from_triplet(
            target_index, source_index, weights, n=target.size, m=source.size
        )
        return

    @property
    def weights(self):
        return self.to_dataset()

    @weights.setter
    def weights(self, weights: MatrixCSR):
        if not isinstance(weights, MatrixCSR):
            raise TypeError(f"Expected MatrixCSR, received: {type(weights).__name__}")
        self._weights = weights
        return

    @classmethod
    def from_weights(cls, weights: MatrixCSR, target: Optional["xugrid.Ugrid2d"]):
        instance = super().from_weights(weights, target)
        instance._setup_regrid("mean")
        return instance

    @classmethod
    def _weights_from_dataset(cls, dataset: xr.Dataset) -> MatrixCSR:
        # Fixed annotation: this returns a MatrixCSR (was annotated MatrixCOO).
        return cls._csr_from_dataset(dataset)