Source code for imod.mf6.hfb

import abc
import json
import textwrap
import typing
from copy import deepcopy
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

import cftime
import numpy as np
import numpy.typing as npt
import pandas as pd
import xarray as xr
import xugrid as xu
from fastcore.dispatch import typedispatch

from imod.logging import LogLevel, init_log_decorator, logger
from imod.mf6.boundary_condition import BoundaryCondition
from imod.mf6.interfaces.ilinedatapackage import ILineDataPackage
from imod.mf6.mf6_hfb_adapter import Mf6HorizontalFlowBarrier
from imod.mf6.package import Package
from imod.mf6.utilities.clip import (
    bounding_polygon_from_line_data_and_clip_box,
    clip_line_gdf_by_bounding_polygon,
    clip_line_gdf_by_grid,
)
from imod.mf6.utilities.grid import broadcast_to_full_domain
from imod.mf6.utilities.hfb import (
    _create_zlinestring_from_bound_df,
    _extract_hfb_bounds_from_zpolygons,
    _prepare_index_names,
)
from imod.mf6.validation_context import ValidationContext
from imod.schemata import EmptyIndexesSchema, MaxNUniqueValuesSchema, ValidationError
from imod.typing import GeoDataFrameType, GridDataArray, LineStringType
from imod.typing.grid import as_ugrid_dataarray, ones_like
from imod.util.imports import MissingOptionalModule

if TYPE_CHECKING:
    import geopandas as gpd
else:
    try:
        import geopandas as gpd
    except ImportError:
        gpd = MissingOptionalModule("geopandas")


if TYPE_CHECKING:
    import shapely
else:
    try:
        import shapely
    except ImportError:
        shapely = MissingOptionalModule("shapely")

NO_BARRIER_MSG = textwrap.dedent(
    """
    No barriers could be assigned to cell edges,
    this is caused by either one of the following:
    \t- Barriers fall completely outside the model grid
    \t- Barriers were assigned to the edge of the model domain
    \t- Barriers were assigned to edges of inactive cells
    """
)


@typedispatch
def _derive_connected_cell_ids(
    idomain: xr.DataArray, grid: xu.Ugrid2d, edge_index: np.ndarray
) -> xr.Dataset:
    """
    Derive the cell ids of the connected cells of an edge on a structured grid.

    Parameters
    ----------
    idomain: xr.DataArray
        The active domain
    grid :
        The unstructured grid of the domain
    edge_index :
        The indices of the edges from which the connected cell ids are computed

    Returns A dataset containing the cell_id1 and cell_id2 data variables. The  cell dimensions are stored in the
    cell_dims coordinates.
    -------

    """
    edge_faces = grid.edge_face_connectivity
    cell2d = edge_faces[edge_index]

    shape = (idomain["y"].size, idomain["x"].size)
    row_1, column_1 = np.unravel_index(cell2d[:, 0], shape)
    row_2, column_2 = np.unravel_index(cell2d[:, 1], shape)

    cell_ids = xr.Dataset()

    cell_ids["cell_id1"] = xr.DataArray(
        np.array([row_1 + 1, column_1 + 1]).T,
        coords={
            "edge_index": edge_index,
            "cell_dims1": ["row_1", "column_1"],
        },
    )

    cell_ids["cell_id2"] = xr.DataArray(
        np.array([row_2 + 1, column_2 + 1]).T,
        coords={
            "edge_index": edge_index,
            "cell_dims2": ["row_2", "column_2"],
        },
    )

    return cell_ids


@typedispatch  # type: ignore[no-redef]
def _derive_connected_cell_ids(
    _: xu.UgridDataArray, grid: xu.Ugrid2d, edge_index: np.ndarray
) -> xr.Dataset:
    """
    Derive the cell ids of the connected cells of an edge on an unstructured grid.

    Parameters
    ----------
    grid :
        The unstructured grid of the domain
    edge_index :
        The indices of the edges from which the connected cell ids are computed

    Returns A dataset containing the cell_id1 and cell_id2 data variables. The  cell dimensions are stored in the
    cell_dims coordinates.
    -------

    """
    edge_faces = grid.edge_face_connectivity
    cell2d = edge_faces[edge_index]

    cell2d_1 = cell2d[:, 0]
    cell2d_2 = cell2d[:, 1]

    cell_ids = xr.Dataset()

    cell_ids["cell_id1"] = xr.DataArray(
        np.array([cell2d_1 + 1]).T,
        coords={
            "edge_index": edge_index,
            "cell_dims1": ["cell2d_1"],
        },
    )

    cell_ids["cell_id2"] = xr.DataArray(
        np.array([cell2d_2 + 1]).T,
        coords={
            "edge_index": edge_index,
            "cell_dims2": ["cell2d_2"],
        },
    )

    return cell_ids


def to_connected_cells_dataset(
    idomain: GridDataArray,
    grid: xu.Ugrid2d,
    edge_index: np.ndarray,
    edge_values: dict,
) -> xr.Dataset:
    """
    Converts a cell edge grid with values defined on the edges to a dataset with the cell ids of the connected cells,
    the layer of the cells and the value of the edge. The connected cells are returned in cellid notation e.g.(row,
    column) for structured grid, (mesh2d_nFaces) for unstructured grids.

    Parameters
    ----------
    idomain: GridDataArray
        active domain
    grid: xu.Ugrid2d,
        unstructured grid containing the edge to cell array
    edge_index: np.ndarray
        indices of the edges for which the edge values will be converted to values in the connected cells
    edge_values: dict
        dictionary containing the value name and the edge values that are applied on the edges identified by the
        edge_index

    Returns
    ----------
        a dataset containing:
            - cell_id1
            - cell_id2
            - layer
            - value name

    """
    barrier_dataset = _derive_connected_cell_ids(idomain, grid, edge_index)

    for name, values in edge_values.items():
        barrier_dataset[name] = xr.DataArray(
            values,
            dims=["layer", "edge_index"],
            coords={
                "edge_index": edge_index,
                "layer": values.coords["layer"],
            },
        )

    barrier_dataset = barrier_dataset.stack(
        cell_id=("layer", "edge_index"), create_index=True
    )

    return barrier_dataset.dropna("cell_id")


def _make_linestring_from_polygon(
    dataframe: GeoDataFrameType,
) -> List[LineStringType]:
    """
    Make linestring from a polygon with one axis in the vertical dimension (z),
    and one axis in the horizontal plane (x & y dimension).
    """
    coordinates, index = shapely.get_coordinates(dataframe.geometry, return_index=True)
    df = pd.DataFrame(
        {"polygon_index": index, "x": coordinates[:, 0], "y": coordinates[:, 1]}
    )
    df = df.drop_duplicates().reset_index(drop=True)
    df = df.set_index("polygon_index")

    linestrings = [
        shapely.LineString(gb.values) for _, gb in df.groupby("polygon_index")
    ]

    return linestrings


def _select_dataframe_with_snapped_line_index(
    snapped_dataset: xr.Dataset, edge_index: np.ndarray, dataframe: GeoDataFrameType
):
    """
    Select dataframe rows with line indices of snapped edges. Usually, the
    broadcasting results in a larger dataframe where individual rows of input
    dataframe are repeated for multiple edges.
    """
    line_index = snapped_dataset["line_index"].values
    line_index = line_index[edge_index].astype(int)
    return dataframe.iloc[line_index]


def _extract_mean_hfb_bounds_from_dataframe(
    dataframe: GeoDataFrameType,
) -> Tuple[pd.Series, pd.Series]:
    """
    Extract hfb bounds from dataframe. Requires dataframe geometry to be of type
    shapely "Z Polygon".

    For the upper z bounds, function takes the average of the depth of the two
    upper nodes. The same holds for the lower z bounds, but then with the two
    lower nodes.

    As a visual representation, this happens for each z bound:

    .                 .
     \         >>>
      \   .    >>>     ---  .
       \ /     >>>         -
        .                 .

    """
    dataframe = _prepare_index_names(dataframe)

    if not dataframe.geometry.has_z.all():
        raise TypeError("GeoDataFrame geometry has no z, which is required.")

    lower, upper = _extract_hfb_bounds_from_zpolygons(dataframe)
    # Compute means inbetween nodes.
    index_names = lower.index.names
    lower_mean = lower.groupby(index_names)["z"].mean()
    upper_mean = upper.groupby(index_names)["z"].mean()

    # Assign to dataframe to map means to right index.
    df_to_broadcast = dataframe.copy()
    df_to_broadcast["lower"] = lower_mean
    df_to_broadcast["upper"] = upper_mean

    return df_to_broadcast["lower"], df_to_broadcast["upper"]


def _fraction_layer_overlap(
    snapped_dataset: xu.UgridDataset,
    edge_index: np.ndarray,
    dataframe: GeoDataFrameType,
    top: xu.UgridDataArray,
    bottom: xu.UgridDataArray,
) -> xr.DataArray:
    """
    Computes the fraction a barrier occupies inside a layer.
    """
    left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T
    top_mean = _mean_left_and_right(top, left, right)
    bottom_mean = _mean_left_and_right(bottom, left, right)

    n_layer, n_edge = top_mean.shape
    layer_bounds = np.empty((n_edge, n_layer, 2), dtype=float)
    layer_bounds[..., 0] = typing.cast(np.ndarray, bottom_mean.values).T
    layer_bounds[..., 1] = typing.cast(np.ndarray, top_mean.values).T

    zmin, zmax = _extract_mean_hfb_bounds_from_dataframe(dataframe)
    hfb_bounds = np.empty((n_edge, n_layer, 2), dtype=float)
    hfb_bounds[..., 0] = zmin.values[:, np.newaxis]
    hfb_bounds[..., 1] = zmax.values[:, np.newaxis]

    overlap = _vectorized_overlap(hfb_bounds, layer_bounds)
    height = layer_bounds[..., 1] - layer_bounds[..., 0]
    # Avoid runtime warnings when diving by 0:
    height[height <= 0] = np.nan
    fraction = (overlap / height).T

    return xr.ones_like(top_mean) * fraction


def _mean_left_and_right(
    cell_values: xu.UgridDataArray, left: np.ndarray, right: np.ndarray
) -> xr.Dataset:
    """
    This method computes the mean value of cell pairs. The left array specifies the first cell, the right array
    the second cells. The mean is computed by (first_cell+second_cell/2.0)

    Parameters
    ----------
    cell_values: xu.UgridDataArray
        The array containing the data values of all the cells
    left :
        The array containing indices to the first cells
    right :
        The array containing indices to the second cells

    Returns
    -------
        The means of the cells

    """
    facedim = cell_values.ugrid.grid.face_dimension
    uda_left = cell_values.ugrid.obj.isel({facedim: left}).drop_vars(facedim)
    uda_right = cell_values.ugrid.obj.isel({facedim: right}).drop_vars(facedim)

    return xr.concat((uda_left, uda_right), dim="two").mean("two")


def _vectorized_overlap(bounds_a: np.ndarray, bounds_b: np.ndarray) -> np.ndarray:
    """
    Vectorized overlap computation. Returns the overlap of 2 vectors along the same axis.
    If there is no overlap zero will be returned.

            b1

      a1    |
      ▲     |
      |     |
      |     ▼
      ▼     b0
      a0

    To compute the overlap of the 2 vectors the maximum of a0,b0, is subtracted from the minimum of a1,b1.

    Compare with:

    overlap = max(0, min(a[1], b[1]) - max(a[0], b[0]))
    """
    return np.maximum(
        0.0,
        np.minimum(bounds_a[..., 1], bounds_b[..., 1])
        - np.maximum(bounds_a[..., 0], bounds_b[..., 0]),
    )


def _prepare_barrier_dataset_for_mf6_adapter(dataset: xr.Dataset) -> xr.Dataset:
    """
    Prepare barrier dataset for the initialization of Mf6HorizontalFlowBarrier.
    The dataset is expected to have "edge_index" and "layer" coordinates and a
    multi-index "cell_id" coordinate. The dataset contains as variables:
    "cell_id1", "cell_id2", and "hydraulic_characteristic".

    - Reset coords to get a coordless "cell_id" dimension instead of a multi-index coord
    - Assign "layer" as variable to dataset instead of as coord.
    """
    # Store layer to work around multiindex issue where dropping the edge_index
    # removes the layer as well.
    layer = dataset.coords["layer"].values
    # Drop leftover coordinate and reset cell_id.
    dataset = dataset.drop_vars("edge_index").reset_coords()
    # Attach layer again
    dataset["layer"] = ("cell_id", layer)
    return dataset


def _snap_to_grid_and_aggregate(
    barrier_dataframe: GeoDataFrameType, grid2d: xu.Ugrid2d, vardict_agg: dict[str, str]
) -> tuple[xu.UgridDataset, npt.NDArray]:
    """
    Snap barrier dataframe to grid and aggregate multiple lines with a list of
    methods per variable.

    Parameters
    ----------
    barrier_dataframe: geopandas.GeoDataFrame
        GeoDataFrame with barriers, should have variable "line_index".
    grid2d: xugrid.Ugrid2d
        Grid to snap lines to
    vardict_agg: dict
        Mapping of variable name to aggregation method

    Returns
    -------
    snapping_dataset: xugrid.UgridDataset
        Dataset with all variables snapped and aggregated to cell edges
    edge_index: numpy.array
        1D array with indices of cell edges that lines were snapped to
    """
    snapping_df = xu.create_snap_to_grid_dataframe(
        barrier_dataframe, grid2d, max_snap_distance=0.5
    )
    # Map other variables to snapping_df with line indices
    line_index = snapping_df["line_index"]
    vars_to_snap = list(vardict_agg.keys())
    if "line_index" in vars_to_snap:
        vars_to_snap.remove("line_index")  # snapping_df already has line_index
    for varname in vars_to_snap:
        snapping_df[varname] = barrier_dataframe[varname].iloc[line_index].to_numpy()
    # Aggregate to grid edges
    gb_edge = snapping_df.groupby("edge_index")
    # Initialize dataset and dataarray with the right shape and dims
    snapped_dataset = xu.UgridDataset(grids=[grid2d])
    new = xr.DataArray(np.full(grid2d.n_edge, np.nan), dims=[grid2d.edge_dimension])
    edge_index = np.array(list(gb_edge.indices.keys()))
    # Aggregate with different methods per variable
    for varname, method in vardict_agg.items():
        snapped_dataset[varname] = new.copy()
        snapped_dataset[varname].data[edge_index] = gb_edge[varname].aggregate(method)

    return snapped_dataset, edge_index


class BarrierType(Enum):
    HydraulicCharacteristic = 0
    Multiplier = 1
    Resistance = 2


class HorizontalFlowBarrierBase(BoundaryCondition, ILineDataPackage):
    _pkg_id = "hfb"

    _period_data = ()
    _init_schemata = {}
    _write_schemata = {"geometry": [EmptyIndexesSchema()]}

    def __init__(
        self,
        geometry: "gpd.GeoDataFrame",
        print_input: bool = False,
    ) -> None:
        dict_dataset = {"print_input": print_input}
        super().__init__(dict_dataset)
        self.line_data = geometry

    def __deepcopy__(self, memo):
        # There is an issue with deepcopy and the GeoDataFrame geometry.
        # The GeoDataFrame is stored in the dataset as an ExtensionArray.
        # For some reason this object causes an infinite recursion when being deep copied.
        # Therefor in this method the geometry is copied separately to the cloned object

        ds_vars = list(self.dataset.keys())
        for var in self._get_variable_names_for_gdf():
            ds_vars.remove(var)

        cls = type(self)
        new = cls._from_dataset(deepcopy(self.dataset[ds_vars]))
        new.line_data = gpd.GeoDataFrame(
            deepcopy(self.dataset[self._get_variable_names_for_gdf()].to_dataframe()),
            geometry="geometry",
        )

        return new

    def _get_variable_names_for_gdf(self) -> list[str]:
        return [
            self._get_variable_name(),
            "geometry",
        ] + self._get_vertical_variables()

    @property
    def line_data(self) -> GeoDataFrameType:
        variables_for_gdf = self._get_variable_names_for_gdf()
        return gpd.GeoDataFrame(
            self.dataset[variables_for_gdf].to_dataframe(),
            geometry="geometry",
        )

    @line_data.setter
    def line_data(self, value: GeoDataFrameType) -> None:
        variables_for_gdf = self._get_variable_names_for_gdf()
        self.dataset = self.dataset.merge(
            value.to_xarray(), overwrite_vars=variables_for_gdf, join="right"
        )

    def render(self, directory, pkgname, globaltimes, binary):
        raise NotImplementedError(
            f"""{self.__class__.__name__} is a grid-agnostic package and does not have a render method. To render the
            package, first convert to a Modflow6 package by calling pkg.to_mf6_pkg()"""
        )

    def to_netcdf(
        self, *args, mdal_compliant: bool = False, crs: Optional[Any] = None, **kwargs
    ):
        kwargs.update({"encoding": self._netcdf_encoding()})

        new = deepcopy(self)
        new.dataset["geometry"] = new.line_data.to_json()
        new.dataset.to_netcdf(*args, **kwargs)

    def _netcdf_encoding(self):
        return {"geometry": {"dtype": "str"}}

    @classmethod
    def from_file(cls, path, **kwargs):
        instance = super().from_file(path, **kwargs)
        geometry = json.loads(instance.dataset["geometry"].values.item())
        instance.line_data = gpd.GeoDataFrame.from_features(geometry)

        return instance

    def _compute_barrier_values(
        self, snapped_dataset, edge_index, idomain, top, bottom, k
    ):
        raise NotImplementedError()

    def _to_connected_cells_dataset(
        self,
        idomain: GridDataArray,
        top: GridDataArray,
        bottom: GridDataArray,
        k: GridDataArray,
        strict_hfb_validation: bool = True,
    ) -> xr.Dataset:
        """
        Method does the following
        - forces input grids to unstructured
        - snaps lines to cell edges
        - remove edge values connected to cell edges
        - compute barrier values
        - remove edge values to inactive cells
        - finds connected cells in dataset

        Returns
        -------
        dataset with connected cells, containing:
            - cell_id1
            - cell_id2
            - layer
            - value name
        """
        top, bottom = broadcast_to_full_domain(idomain, top, bottom)
        k = idomain * k
        # Enforce unstructured
        unstructured_grid, top, bottom, k = (
            as_ugrid_dataarray(grid) for grid in [idomain, top, bottom, k]
        )
        snapped_dataset, edge_index = self._snap_to_grid(idomain)
        edge_index = self.__remove_invalid_edges(unstructured_grid, edge_index)

        barrier_values = self._compute_barrier_values(
            snapped_dataset, edge_index, idomain, top, bottom, k
        )
        barrier_values = self.__remove_edge_values_connected_to_inactive_cells(
            barrier_values, unstructured_grid, edge_index
        )

        if (barrier_values.size == 0) | np.isnan(barrier_values).all():
            loglevel = LogLevel.ERROR if strict_hfb_validation else LogLevel.WARNING
            logger.log(
                loglevel=loglevel,
                message=NO_BARRIER_MSG,
                additional_depth=0,
            )
            if strict_hfb_validation:
                raise ValidationError(NO_BARRIER_MSG)

        return to_connected_cells_dataset(
            idomain,
            unstructured_grid.ugrid.grid,
            edge_index,
            {
                "hydraulic_characteristic": self.__to_hydraulic_characteristic(
                    barrier_values
                )
            },
        )

    def _barrier_dataset_to_mf6_pkg(
        self, barrier_dataset: xr.Dataset
    ) -> Mf6HorizontalFlowBarrier:
        """
        Internal method, which does the following
        - final coordinate cleanup of barrier dataset
        - adds missing options to dataset

        Parameters
        ----------
        barrier_dataset: xr.Dataset
            Xarray dataset with dimensions "cell_dims1", "cell_dims2", "cell_id".
            Additional coordinates should be "layer" and "edge_index".

        Returns
        -------
        Mf6HorizontalFlowBarrier
        """
        barrier_dataset["print_input"] = self.dataset["print_input"]
        barrier_dataset = _prepare_barrier_dataset_for_mf6_adapter(barrier_dataset)
        return Mf6HorizontalFlowBarrier(**barrier_dataset.data_vars)

    def _to_mf6_pkg(
        self,
        idomain: GridDataArray,
        top: GridDataArray,
        bottom: GridDataArray,
        k: GridDataArray,
        validation_context: ValidationContext,
    ) -> Mf6HorizontalFlowBarrier:
        barrier_dataset = self._to_connected_cells_dataset(
            idomain, top, bottom, k, validation_context.strict_hfb_validation
        )
        return self._barrier_dataset_to_mf6_pkg(barrier_dataset)

    def to_mf6_pkg(
        self,
        idomain: GridDataArray,
        top: GridDataArray,
        bottom: GridDataArray,
        k: GridDataArray,
        validate=True,
        strict_hfb_validation=True,
    ) -> Mf6HorizontalFlowBarrier:
        """
        Write package to Modflow 6 package.

        Based on the model grid, top and bottoms, the layers in which the barrier belong are computed. If the
        barrier only partially occupies a layer an effective resistance or hydraulic conductivity for that layer is
        calculated. This calculation is skipped for the Multiplier type.

        Parameters
        ----------
        idomain: GridDataArray
             Grid with active cells.
        top: GridDataArray
            Grid with top of model layers.
        bottom: GridDataArray
            Grid with bottom of model layers.
        k: GridDataArray
            Grid with hydraulic conductivities.
        validate: bool
            Validate HorizontalFlowBarrier upon calling this method.
        strict_hfb_validation: bool
            Turn on strict horizontal flow barrier validation.

        Returns
        -------
        Mf6HorizontalFlowBarrier
            Low level representation of the HFB package as MODFLOW 6 expects it.
        """
        validation_context = ValidationContext(
            validate=validate, strict_hfb_validation=strict_hfb_validation
        )

        return self._to_mf6_pkg(idomain, top, bottom, k, validation_context)

    def is_empty(self) -> bool:
        if super().is_empty():
            return True

        linestrings = self.dataset["geometry"]
        only_empty_lines = all(ls.is_empty for ls in linestrings.values.ravel())
        return only_empty_lines

    def _resistance_layer(
        self,
        snapped_dataset: xu.UgridDataset,
        edge_index: np.ndarray,
        idomain: xu.UgridDataArray,
    ) -> xr.DataArray:
        """
        Returns layered xarray with barrier resistance distributed over layers
        """
        hfb_resistance = snapped_dataset[self._get_variable_name()].values[edge_index]
        hfb_layer = snapped_dataset["layer"].values[edge_index]
        nlay = idomain.layer.size
        model_layer = np.repeat(idomain.layer.values, hfb_resistance.size).reshape(
            (nlay, hfb_resistance.size)
        )
        data = np.where(model_layer == hfb_layer, hfb_resistance, np.nan)
        return xr.DataArray(
            data=data,
            coords={
                "layer": np.arange(nlay) + 1,
            },
            dims=["layer", "mesh2d_nFaces"],
        )

    def _resistance_layer_overlap(
        self,
        snapped_dataset: xu.UgridDataset,
        edge_index: np.ndarray,
        top: xu.UgridDataArray,
        bottom: xu.UgridDataArray,
        k: xu.UgridDataArray,
    ) -> xr.DataArray:
        """
        Computes the effective value of a barrier that partially overlaps a cell in the z direction.
        A barrier always lies on an edge in the xy-plane, however in doesn't have to fully extend in the z-direction to
        cover the entire layer. This method computes the effective resistance in that case.

                        Barrier
        ......................................  ▲     ▲
        .                @@@@                .  |     |
        .                @Rb@                .  | Lb  |
        .    Cell1       @@@@     Cell2      .  ▼     | H
        .                :  :                .        |
        .                :  :                .        |
        .................:..:.................        ▼
                k1                    k2

        The effective value of a partially emerged barrier in a layer is computed by:
        c_total = 1.0 / (fraction / Rb + (1.0 - fraction) / c_aquifer)
        c_aquifer = 1.0 / k_mean = 1.0 / ((k1 + k2) / 2.0)
        fraction = Lb / H

        """
        left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T
        k_mean = _mean_left_and_right(k, left, right)

        resistance = self.__to_resistance(
            snapped_dataset[self._get_variable_name()]
        ).values[edge_index]

        dataframe = _select_dataframe_with_snapped_line_index(
            snapped_dataset, edge_index, self.line_data
        )
        fraction = _fraction_layer_overlap(
            snapped_dataset, edge_index, dataframe, top, bottom
        )

        c_aquifer = 1.0 / k_mean
        inverse_c = (fraction / resistance) + ((1.0 - fraction) / c_aquifer)
        c_total = 1.0 / inverse_c

        return self.__from_resistance(c_total)

    def __to_resistance(self, value: xu.UgridDataArray) -> xu.UgridDataArray:
        match self._get_barrier_type():
            case BarrierType.HydraulicCharacteristic:
                return 1.0 / value
            case BarrierType.Multiplier:
                return -1.0 / value
            case BarrierType.Resistance:
                return value

        raise ValueError(r"Unknown barrier type {barrier_type}")

    def __from_resistance(self, resistance: xr.DataArray) -> xr.DataArray:
        match self._get_barrier_type():
            case BarrierType.HydraulicCharacteristic:
                return 1.0 / resistance
            case BarrierType.Multiplier:
                return -1.0 / resistance
            case BarrierType.Resistance:
                return resistance

        raise ValueError(r"Unknown barrier type {barrier_type}")

    def __to_hydraulic_characteristic(self, value: xr.DataArray) -> xr.DataArray:
        match self._get_barrier_type():
            case BarrierType.HydraulicCharacteristic:
                return value
            case BarrierType.Multiplier:
                return -1.0 * value
            case BarrierType.Resistance:
                return 1.0 / value

        raise ValueError(r"Unknown barrier type {barrier_type}")

    @abc.abstractmethod
    def _get_barrier_type(self) -> BarrierType:
        raise NotImplementedError

    @abc.abstractmethod
    def _get_variable_name(self) -> str:
        raise NotImplementedError

    @abc.abstractmethod
    def _get_vertical_variables(self) -> list:
        raise NotImplementedError

    def clip_box(
        self,
        time_min: Optional[cftime.datetime | np.datetime64 | str] = None,
        time_max: Optional[cftime.datetime | np.datetime64 | str] = None,
        layer_min: Optional[int] = None,
        layer_max: Optional[int] = None,
        x_min: Optional[float] = None,
        x_max: Optional[float] = None,
        y_min: Optional[float] = None,
        y_max: Optional[float] = None,
        top: Optional[GridDataArray] = None,
        bottom: Optional[GridDataArray] = None,
    ) -> "HorizontalFlowBarrierBase":
        """
        Clip a package by a bounding box (time, layer, y, x).

        Slicing intervals may be half-bounded, by providing None:

        * To select 500.0 <= x <= 1000.0:
          ``clip_box(x_min=500.0, x_max=1000.0)``.
        * To select x <= 1000.0: ``clip_box(x_min=None, x_max=1000.0)``
          or ``clip_box(x_max=1000.0)``.
        * To select x >= 500.0: ``clip_box(x_min = 500.0, x_max=None.0)``
          or ``clip_box(x_min=1000.0)``.

        Parameters
        ----------
        time_min: optional
        time_max: optional
        layer_min: optional, int
        layer_max: optional, int
        x_min: optional, float
        x_max: optional, float
        y_min: optional, float
        y_max: optional, float
        top: optional, GridDataArray
        bottom: optional, GridDataArray

        Returns
        -------
        sliced : Package
        """
        new = deepcopy(self)

        if x_min or x_max or y_min or y_max:
            # Create bounding polygon
            bounding_gdf = bounding_polygon_from_line_data_and_clip_box(
                self.line_data, x_min, x_max, y_min, y_max
            )
            new.line_data = clip_line_gdf_by_bounding_polygon(
                self.line_data, bounding_gdf
            )
        return new

    def mask(self, _) -> Package:
        """
        The mask method is irrelevant for this package as it is
        grid-agnostic, instead this method retuns a copy of itself.
        """
        return deepcopy(self)

    def _snap_to_grid(
        self, idomain: GridDataArray
    ) -> Tuple[xu.UgridDataset, np.ndarray]:
        variable_name = self._get_variable_name()
        has_layer = "layer" in self._get_vertical_variables()
        # Create geodataframe with barriers
        if has_layer:
            varnames_for_df = [variable_name, "geometry", "layer"]
        else:
            varnames_for_df = [variable_name, "geometry"]
        barrier_dataframe = gpd.GeoDataFrame(
            self.dataset[varnames_for_df].to_dataframe()
        )
        # Convert vertical polygon to linestring
        if not has_layer:
            lower, _ = _extract_hfb_bounds_from_zpolygons(barrier_dataframe)
            linestring = _create_zlinestring_from_bound_df(lower)
            barrier_dataframe["geometry"] = linestring["geometry"]
        # Clip barriers outside domain
        active = ones_like(idomain.sel(layer=1, drop=True))
        barrier_dataframe = clip_line_gdf_by_grid(barrier_dataframe, active)
        # Prepare variable names and methods for aggregation
        vardict_agg = {"line_index": "first", variable_name: "sum"}
        if has_layer:
            vardict_agg["layer"] = "first"
        # Create grid from structured
        grid2d = as_ugrid_dataarray(active).grid

        return _snap_to_grid_and_aggregate(barrier_dataframe, grid2d, vardict_agg)

    @staticmethod
    def __remove_invalid_edges(
        unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray
    ) -> np.ndarray:
        """
        Remove invalid edges indices. An edge is considered invalid when:
        - it lies on an exterior boundary (face_connectivity equals the grid fill value)
        - The corresponding connected cells are inactive
        """
        grid = unstructured_grid.ugrid.grid
        face_dimension = unstructured_grid.ugrid.grid.face_dimension
        face_connectivity = grid.edge_face_connectivity[edge_index]

        valid_edges = np.all(face_connectivity != grid.fill_value, axis=1)

        connected_cells = -np.ones((len(edge_index), 2))
        connected_cells[valid_edges, 0] = (
            unstructured_grid.sel(layer=1)
            .loc[{face_dimension: face_connectivity[valid_edges, 0]}]
            .values
        )
        connected_cells[valid_edges, 1] = (
            unstructured_grid.sel(layer=1)
            .loc[{face_dimension: face_connectivity[valid_edges, 1]}]
            .values
        )

        valid = (connected_cells > 0).all(axis=1)

        return edge_index[valid]

    @staticmethod
    def __remove_edge_values_connected_to_inactive_cells(
        values, unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray
    ):
        face_dimension = unstructured_grid.ugrid.grid.face_dimension

        face_connectivity = unstructured_grid.ugrid.grid.edge_face_connectivity[
            edge_index
        ]
        connected_cells_left = unstructured_grid.loc[
            {face_dimension: face_connectivity[:, 0]}
        ]
        connected_cells_right = unstructured_grid.loc[
            {face_dimension: face_connectivity[:, 1]}
        ]

        return values.where(
            (connected_cells_left.drop(face_dimension) > 0)
            & (connected_cells_right.drop(face_dimension) > 0)
        )


[docs] class HorizontalFlowBarrierHydraulicCharacteristic(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - hydraulic_characteristic: the hydraulic characteristic of the barriers print_input: bool Examples -------- >>> x = [-10.0, 0.0, 10.0] >>> y = [10.0, 0.0, -10.0] >>> ztop = [10.0, 20.0, 15.0] >>> zbot = [-10.0, -20.0, 0.0] >>> polygons = linestring_to_trapezoid_zpolygons(x, y, ztop, zbot) >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=polygons, >>> data={ >>> "resistance": [1e-3, 1e-3], >>> }, >>> ) >>> hfb = imod.mf6.HorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf) """
[docs] @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input)
def _get_barrier_type(self): return BarrierType.HydraulicCharacteristic def _get_variable_name(self) -> str: return "hydraulic_characteristic" def _get_vertical_variables(self) -> list: return [] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self._resistance_layer_overlap( snapped_dataset, edge_index, top, bottom, k ) return barrier_values
[docs] class SingleLayerHorizontalFlowBarrierHydraulicCharacteristic( HorizontalFlowBarrierBase ): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - hydraulic_characteristic: the hydraulic characteristic of the barriers - layer: model layer for the barrier, only 1 single layer can be entered. print_input: bool Examples -------- >>> barrier_x = [-1000.0, 0.0, 1000.0] >>> barrier_y = [500.0, 250.0, 500.0] >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], >>> data={ >>> "hydraulic_characteristic": [1e-3,], >>> "layer": [1,] >>> }, >>> ) >>> hfb = imod.mf6.SingleLayerHorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf) """ _write_schemata = { "geometry": [EmptyIndexesSchema()], "layer": [MaxNUniqueValuesSchema(1)], }
[docs] @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input)
def _get_barrier_type(self): return BarrierType.HydraulicCharacteristic def _get_variable_name(self) -> str: return "hydraulic_characteristic" def _get_vertical_variables(self) -> list: return ["layer"] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self._resistance_layer( snapped_dataset, edge_index, idomain, ) return barrier_values
[docs] class HorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf If parts of the barrier overlap a layer the multiplier is applied to the entire layer. Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - multiplier: the multiplier of the barriers print_input: bool Examples -------- >>> x = [-10.0, 0.0, 10.0] >>> y = [10.0, 0.0, -10.0] >>> ztop = [10.0, 20.0, 15.0] >>> zbot = [-10.0, -20.0, 0.0] >>> polygons = linestring_to_trapezoid_zpolygons(x, y, ztop, zbot) >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=polygons, >>> data={ >>> "multiplier": [10.0, 10.0], >>> }, >>> ) >>> hfb = imod.mf6.HorizontalFlowBarrierMultiplier(barrier_gdf) """
[docs] @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input)
def _get_barrier_type(self): return BarrierType.Multiplier def _get_variable_name(self) -> str: return "multiplier" def _get_vertical_variables(self) -> list: return [] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): dataframe = _select_dataframe_with_snapped_line_index( snapped_dataset, edge_index, self.line_data ) fraction = _fraction_layer_overlap( snapped_dataset, edge_index, dataframe, top, bottom ) barrier_values = ( fraction.where(fraction) * snapped_dataset[self._get_variable_name()].values[edge_index] ) return barrier_values
[docs] class SingleLayerHorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - multiplier: the multiplier of the barriers - layer: model layer for the barrier, only 1 single layer can be entered. print_input: bool Examples -------- >>> barrier_x = [-1000.0, 0.0, 1000.0] >>> barrier_y = [500.0, 250.0, 500.0] >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], >>> data={ >>> "multiplier": [1.5,], >>> "layer": [1,], >>> }, >>> ) >>> hfb = imod.mf6.SingleLayerHorizontalFlowBarrierMultiplier(barrier_gdf) """ _write_schemata = { "geometry": [EmptyIndexesSchema()], "layer": [MaxNUniqueValuesSchema(1)], }
[docs] @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input)
def _get_barrier_type(self): return BarrierType.Multiplier def _get_variable_name(self) -> str: return "multiplier" def _get_vertical_variables(self) -> list: return ["layer"] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self.__multiplier_layer(snapped_dataset, edge_index, idomain) return barrier_values def __multiplier_layer( self, snapped_dataset: xu.UgridDataset, edge_index: np.ndarray, idomain: xu.UgridDataArray, ) -> xr.DataArray: """ Returns layered xarray with barrier multiplier distrubuted over layers """ hfb_multiplier = snapped_dataset[self._get_variable_name()].values[edge_index] hfb_layer = snapped_dataset["layer"].values[edge_index] nlay = idomain.layer.size model_layer = np.repeat(idomain.layer.values, hfb_multiplier.shape[0]).reshape( (nlay, hfb_multiplier.shape[0]) ) data = np.where(model_layer == hfb_layer, hfb_multiplier, np.nan) return xr.DataArray( data=data, coords={ "layer": np.arange(nlay) + 1, }, dims=["layer", "mesh2d_nFaces"], )
[docs] class HorizontalFlowBarrierResistance(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - resistance: the resistance of the barriers print_input: bool Examples -------- >>> x = [-10.0, 0.0, 10.0] >>> y = [10.0, 0.0, -10.0] >>> ztop = [10.0, 20.0, 15.0] >>> zbot = [-10.0, -20.0, 0.0] >>> polygons = linestring_to_trapezoid_zpolygons(x, y, ztop, zbot) >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=polygons, >>> data={ >>> "resistance": [1e3, 1e3], >>> }, >>> ) >>> hfb = imod.mf6.HorizontalFlowBarrierResistance(barrier_gdf) """
[docs] @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input)
def _get_barrier_type(self): return BarrierType.Resistance def _get_variable_name(self) -> str: return "resistance" def _get_vertical_variables(self) -> list: return [] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self._resistance_layer_overlap( snapped_dataset, edge_index, top, bottom, k ) return barrier_values
[docs] class SingleLayerHorizontalFlowBarrierResistance(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - resistance: the resistance of the barriers - layer: model layer for the barrier, only 1 single layer can be entered. print_input: bool Examples -------- >>> barrier_x = [-1000.0, 0.0, 1000.0] >>> barrier_y = [500.0, 250.0, 500.0] >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], >>> data={ >>> "resistance": [1e3,], >>> "layer": [2,], >>> }, >>> ) >>> hfb = imod.mf6.SingleLayerHorizontalFlowBarrierResistance(barrier_gdf) """ _write_schemata = { "geometry": [EmptyIndexesSchema()], "layer": [MaxNUniqueValuesSchema(1)], }
[docs] @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input)
def _get_barrier_type(self): return BarrierType.Resistance def _get_variable_name(self) -> str: return "resistance" def _get_vertical_variables(self) -> list: return ["layer"] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self._resistance_layer( snapped_dataset, edge_index, idomain, ) return barrier_values
[docs] @classmethod def from_imod5_data(cls, key: str, imod5_data: Dict[str, Dict[str, GridDataArray]]): imod5_keys = list(imod5_data.keys()) if key not in imod5_keys: raise ValueError("hfb key not present.") hfb_dict = imod5_data[key] if not list(hfb_dict.keys()) == ["geodataframe", "layer"]: raise ValueError("hfb is not a SingleLayerHorizontalFlowBarrierResistance") layer = hfb_dict["layer"] if layer == 0: raise ValueError( "assigning to layer 0 is not supported for " "SingleLayerHorizontalFlowBarrierResistance. " "Try HorizontalFlowBarrierResistance class." ) geometry_layer = hfb_dict["geodataframe"] geometry_layer["layer"] = layer return cls(geometry_layer)