Source code for imod.mf6.hfb

import abc
import copy
import textwrap
import typing
from copy import deepcopy
from enum import Enum
from typing import TYPE_CHECKING, Optional, Tuple

import cftime
import numpy as np
import xarray as xr
import xugrid as xu
from fastcore.dispatch import typedispatch

from imod.logging import init_log_decorator
from imod.mf6.boundary_condition import BoundaryCondition
from imod.mf6.interfaces.ilinedatapackage import ILineDataPackage
from imod.mf6.mf6_hfb_adapter import Mf6HorizontalFlowBarrier
from imod.mf6.package import Package
from imod.mf6.utilities.grid import broadcast_to_full_domain
from imod.schemata import EmptyIndexesSchema
from imod.typing import GridDataArray
from imod.util.imports import MissingOptionalModule

if TYPE_CHECKING:
    import geopandas as gpd
else:
    try:
        import geopandas as gpd
    except ImportError:
        gpd = MissingOptionalModule("geopandas")

try:
    import shapely
except ImportError:
    shapely = MissingOptionalModule("shapely")


@typedispatch
def _derive_connected_cell_ids(
    idomain: xr.DataArray, grid: xu.Ugrid2d, edge_index: np.ndarray
):
    """
    Derive the cell ids of the connected cells of an edge on a structured grid.

    Parameters
    ----------
    idomain: xr.DataArray
        The active domain
    grid :
        The unstructured grid of the domain
    edge_index :
        The indices of the edges from which the connected cell ids are computed

    Returns A dataset containing the cell_id1 and cell_id2 data variables. The  cell dimensions are stored in the
    cell_dims coordinates.
    -------

    """
    edge_faces = grid.edge_face_connectivity
    cell2d = edge_faces[edge_index]

    shape = (idomain["y"].size, idomain["x"].size)
    row_1, column_1 = np.unravel_index(cell2d[:, 0], shape)
    row_2, column_2 = np.unravel_index(cell2d[:, 1], shape)

    cell_ids = xr.Dataset()

    cell_ids["cell_id1"] = xr.DataArray(
        np.array([row_1 + 1, column_1 + 1]).T,
        coords={
            "edge_index": edge_index,
            "cell_dims1": ["row_1", "column_1"],
        },
    )

    cell_ids["cell_id2"] = xr.DataArray(
        np.array([row_2 + 1, column_2 + 1]).T,
        coords={
            "edge_index": edge_index,
            "cell_dims2": ["row_2", "column_2"],
        },
    )

    return cell_ids


@typedispatch  # type: ignore[no-redef]
def _derive_connected_cell_ids(
    _: xu.UgridDataArray, grid: xu.Ugrid2d, edge_index: np.ndarray
):
    """
    Derive the cell ids of the connected cells of an edge on an unstructured grid.

    Parameters
    ----------
    grid :
        The unstructured grid of the domain
    edge_index :
        The indices of the edges from which the connected cell ids are computed

    Returns A dataset containing the cell_id1 and cell_id2 data variables. The  cell dimensions are stored in the
    cell_dims coordinates.
    -------

    """
    edge_faces = grid.edge_face_connectivity
    cell2d = edge_faces[edge_index]

    cell2d_1 = cell2d[:, 0]
    cell2d_2 = cell2d[:, 1]

    cell_ids = xr.Dataset()

    cell_ids["cell_id1"] = xr.DataArray(
        np.array([cell2d_1 + 1]).T,
        coords={
            "edge_index": edge_index,
            "cell_dims1": ["cell2d_1"],
        },
    )

    cell_ids["cell_id2"] = xr.DataArray(
        np.array([cell2d_2 + 1]).T,
        coords={
            "edge_index": edge_index,
            "cell_dims2": ["cell2d_2"],
        },
    )

    return cell_ids


def to_connected_cells_dataset(
    idomain: GridDataArray,
    grid: xu.Ugrid2d,
    edge_index: np.ndarray,
    edge_values: dict,
) -> xr.Dataset:
    """
    Converts a cell edge grid with values defined on the edges to a dataset with the cell ids of the connected cells,
    the layer of the cells and the value of the edge. The connected cells are returned in cellid notation e.g.(row,
    column) for structured grid, (mesh2d_nFaces) for unstructured grids.

    Parameters
    ----------
    idomain: GridDataArray
        active domain
    grid: xu.Ugrid2d,
        unstructured grid containing the edge to cell array
    edge_index: np.ndarray
        indices of the edges for which the edge values will be converted to values in the connected cells
    edge_values: dict
        dictionary containing the value name and the edge values that are applied on the edges identified by the
        edge_index

    Returns
    ----------
        a dataset containing:
            - cell_id1
            - cell_id2
            - layer
            - value name

    """
    barrier_dataset = _derive_connected_cell_ids(idomain, grid, edge_index)

    for name, values in edge_values.items():
        barrier_dataset[name] = xr.DataArray(
            values,
            dims=["layer", "edge_index"],
            coords={
                "edge_index": edge_index,
                "layer": values.coords["layer"],
            },
        )

    barrier_dataset = (
        barrier_dataset.stack(cell_id=("layer", "edge_index"), create_index=False)
        .drop_vars("edge_index")
        .reset_coords()
    )

    return barrier_dataset.dropna("cell_id")


def _fraction_layer_overlap(
    snapped_dataset: xu.UgridDataset,
    edge_index: np.ndarray,
    top: xu.UgridDataArray,
    bottom: xu.UgridDataArray,
):
    """
    Computes the fraction a barrier occupies inside a layer.
    """
    left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T
    top_mean = _mean_left_and_right(top, left, right)
    bottom_mean = _mean_left_and_right(bottom, left, right)

    n_layer, n_edge = top_mean.shape
    layer_bounds = np.empty((n_edge, n_layer, 2), dtype=float)
    layer_bounds[..., 0] = typing.cast(np.ndarray, bottom_mean.values).T
    layer_bounds[..., 1] = typing.cast(np.ndarray, top_mean.values).T

    hfb_bounds = np.empty((n_edge, n_layer, 2), dtype=float)
    hfb_bounds[..., 0] = (
        snapped_dataset["zbottom"].values[edge_index].reshape(n_edge, 1)
    )
    hfb_bounds[..., 1] = snapped_dataset["ztop"].values[edge_index].reshape(n_edge, 1)

    overlap = _vectorized_overlap(hfb_bounds, layer_bounds)
    height = layer_bounds[..., 1] - layer_bounds[..., 0]
    # Avoid runtime warnings when diving by 0:
    height[height <= 0] = np.nan
    fraction = (overlap / height).T

    return xr.ones_like(top_mean) * fraction


def _mean_left_and_right(
    cell_values: xu.UgridDataArray, left: np.ndarray, right: np.ndarray
) -> xr.Dataset:
    """
    This method computes the mean value of cell pairs. The left array specifies the first cell, the right array
    the second cells. The mean is computed by (first_cell+second_cell/2.0)

    Parameters
    ----------
    cell_values: xu.UgridDataArray
        The array containing the data values of all the cells
    left :
        The array containing indices to the first cells
    right :
        The array containing indices to the second cells

    Returns
    -------
        The means of the cells

    """
    facedim = cell_values.ugrid.grid.face_dimension
    uda_left = cell_values.ugrid.obj.isel({facedim: left}).drop_vars(facedim)
    uda_right = cell_values.ugrid.obj.isel({facedim: right}).drop_vars(facedim)

    return xr.concat((uda_left, uda_right), dim="two").mean("two")


def _vectorized_overlap(bounds_a: np.ndarray, bounds_b: np.ndarray):
    """
    Vectorized overlap computation. Returns the overlap of 2 vectors along the same axis.
    If there is no overlap zero will be returned.

            b1

      a1    |
      ▲     |
      |     |
      |     ▼
      ▼     b0
      a0

    To compute the overlap of the 2 vectors the maximum of a0,b0, is subtracted from the minimum of a1,b1.

    Compare with:

    overlap = max(0, min(a[1], b[1]) - max(a[0], b[0]))
    """
    return np.maximum(
        0.0,
        np.minimum(bounds_a[..., 1], bounds_b[..., 1])
        - np.maximum(bounds_a[..., 0], bounds_b[..., 0]),
    )


class BarrierType(Enum):
    HydraulicCharacteristic = 0
    Multiplier = 1
    Resistance = 2


class HorizontalFlowBarrierBase(BoundaryCondition, ILineDataPackage):
    _pkg_id = "hfb"

    _period_data = ()
    _init_schemata = {}
    _write_schemata = {"geometry": [EmptyIndexesSchema()]}

    def __init__(
        self,
        geometry: "gpd.GeoDataFrame",
        print_input: bool = False,
    ) -> None:
        dict_dataset = {"print_input": print_input}
        super().__init__(dict_dataset)
        self.line_data = geometry

    def _get_variable_names_for_gdf(self) -> list[str]:
        return [
            self._get_variable_name(),
            "geometry",
        ] + self._get_vertical_variables()

    @property
    def line_data(self) -> "gpd.GeoDataFrame":
        variables_for_gdf = self._get_variable_names_for_gdf()
        return gpd.GeoDataFrame(
            self.dataset[variables_for_gdf].to_dataframe(),
            geometry="geometry",
        )

    @line_data.setter
    def line_data(self, value: "gpd.GeoDataFrame") -> None:
        variables_for_gdf = self._get_variable_names_for_gdf()
        self.dataset = self.dataset.merge(
            value.to_xarray(), overwrite_vars=variables_for_gdf, join="right"
        )

    def render(self, directory, pkgname, globaltimes, binary):
        raise NotImplementedError(
            f"""{self.__class__.__name__} is a grid-agnostic package and does not have a render method. To render the
            package, first convert to a Modflow6 package by calling pkg.to_mf6_pkg()"""
        )

    def _netcdf_encoding(self):
        return {"geometry": {"dtype": "str"}}

    @classmethod
    def from_file(cls, path, **kwargs):
        instance = super().from_file(path, **kwargs)
        instance.dataset["geometry"] = shapely.wkt.loads(instance.dataset["geometry"])

        return instance

    def _compute_barrier_values(
        self, snapped_dataset, edge_index, idomain, top, bottom, k
    ):
        raise NotImplementedError()

    def to_mf6_pkg(
        self,
        idomain: GridDataArray,
        top: GridDataArray,
        bottom: GridDataArray,
        k: GridDataArray,
        validate: bool = False,
    ) -> Mf6HorizontalFlowBarrier:
        """
        Write package to Modflow 6 package.

        Based on the model grid, top and bottoms, the layers in which the barrier belong are computed. If the
        barrier only partially occupies a layer an effective resistance or hydraulic conductivity for that layer is
        calculated. This calculation is skipped for the Multiplier type.

        Parameters
        ----------
        idomain: GridDataArray
             Grid with active cells.
        top: GridDataArray
            Grid with top of model layers.
        bottom: GridDataArray
            Grid with bottom of model layers.
        k: GridDataArray
            Grid with hydraulic conductivities.
        validate: bool
            Run validation before converting

        Returns
        -------

        """
        if validate:
            self._validate(self._write_schemata)

        top, bottom = broadcast_to_full_domain(idomain, top, bottom)
        k = idomain * k
        unstructured_grid, top, bottom, k = (
            self.__to_unstructured(idomain, top, bottom, k)
            if isinstance(idomain, xr.DataArray)
            else [idomain, top, bottom, k]
        )
        snapped_dataset, edge_index = self.__snap_to_grid(idomain)
        edge_index = self.__remove_invalid_edges(unstructured_grid, edge_index)

        barrier_values = self._compute_barrier_values(
            snapped_dataset, edge_index, idomain, top, bottom, k
        )
        barrier_values = self.__remove_edge_values_connected_to_inactive_cells(
            barrier_values, unstructured_grid, edge_index
        )

        if (barrier_values.size == 0) | np.isnan(barrier_values).all():
            raise ValueError(
                textwrap.dedent(
                    """
                    No barriers could be assigned to cell edges,
                    this is caused by either one of the following:
                    \t- Barriers fall completely outside the model grid
                    \t- Barriers were assigned to the edge of the model domain
                    \t- Barriers were assigned to edges of inactive cells
                    """
                )
            )

        barrier_dataset = to_connected_cells_dataset(
            idomain,
            unstructured_grid.ugrid.grid,
            edge_index,
            {
                "hydraulic_characteristic": self.__to_hydraulic_characteristic(
                    barrier_values
                )
            },
        )

        barrier_dataset["print_input"] = self.dataset["print_input"]

        return Mf6HorizontalFlowBarrier(**barrier_dataset.data_vars)

    def is_empty(self) -> bool:
        if super().is_empty():
            return True

        linestrings = self.dataset["geometry"]
        only_empty_lines = all(ls.is_empty for ls in linestrings.values)
        return only_empty_lines

    def _resistance_layer(
        self,
        snapped_dataset: xu.UgridDataset,
        edge_index: np.ndarray,
        idomain: xu.UgridDataArray,
    ) -> xr.DataArray:
        """
        Returns layered xarray with barrier resistance distributed over layers
        """
        hfb_resistance = snapped_dataset[self._get_variable_name()].values[edge_index]
        hfb_layer = snapped_dataset["layer"].values[edge_index]
        nlay = idomain.layer.size
        model_layer = np.repeat(idomain.layer.values, hfb_resistance.size).reshape(
            (nlay, hfb_resistance.size)
        )
        data = np.where(model_layer == hfb_layer, hfb_resistance, np.nan)
        return xr.DataArray(
            data=data,
            coords={
                "layer": np.arange(nlay) + 1,
            },
            dims=["layer", "mesh2d_nFaces"],
        )

    def _resistance_layer_overlap(
        self,
        snapped_dataset: xu.UgridDataset,
        edge_index: np.ndarray,
        top: xu.UgridDataArray,
        bottom: xu.UgridDataArray,
        k: xu.UgridDataArray,
    ) -> xr.DataArray:
        """
        Computes the effective value of a barrier that partially overlaps a cell in the z direction.
        A barrier always lies on an edge in the xy-plane, however in doesn't have to fully extend in the z-direction to
        cover the entire layer. This method computes the effective resistance in that case.

                        Barrier
        ......................................  ▲     ▲
        .                @@@@                .  |     |
        .                @Rb@                .  | Lb  |
        .    Cell1       @@@@     Cell2      .  ▼     | H
        .                :  :                .        |
        .                :  :                .        |
        .................:..:.................        ▼
                k1                    k2

        The effective value of a partially emerged barrier in a layer is computed by:
        c_total = 1.0 / (fraction / Rb + (1.0 - fraction) / c_aquifer)
        c_aquifer = 1.0 / k_mean = 1.0 / ((k1 + k2) / 2.0)
        fraction = Lb / H

        """
        left, right = snapped_dataset.ugrid.grid.edge_face_connectivity[edge_index].T
        k_mean = _mean_left_and_right(k, left, right)

        resistance = self.__to_resistance(
            snapped_dataset[self._get_variable_name()]
        ).values[edge_index]

        fraction = _fraction_layer_overlap(snapped_dataset, edge_index, top, bottom)

        c_aquifer = 1.0 / k_mean
        inverse_c = (fraction / resistance) + ((1.0 - fraction) / c_aquifer)
        c_total = 1.0 / inverse_c

        return self.__from_resistance(c_total)

    def __to_resistance(self, value: xu.UgridDataArray) -> xu.UgridDataArray:
        match self._get_barrier_type():
            case BarrierType.HydraulicCharacteristic:
                return 1.0 / value
            case BarrierType.Multiplier:
                return -1.0 / value
            case BarrierType.Resistance:
                return value

        raise ValueError(r"Unknown barrier type {barrier_type}")

    def __from_resistance(self, resistance: xr.DataArray) -> xr.DataArray:
        match self._get_barrier_type():
            case BarrierType.HydraulicCharacteristic:
                return 1.0 / resistance
            case BarrierType.Multiplier:
                return -1.0 / resistance
            case BarrierType.Resistance:
                return resistance

        raise ValueError(r"Unknown barrier type {barrier_type}")

    def __to_hydraulic_characteristic(self, value: xr.DataArray) -> xr.DataArray:
        match self._get_barrier_type():
            case BarrierType.HydraulicCharacteristic:
                return value
            case BarrierType.Multiplier:
                return -1.0 * value
            case BarrierType.Resistance:
                return 1.0 / value

        raise ValueError(r"Unknown barrier type {barrier_type}")

    @abc.abstractmethod
    def _get_barrier_type(self) -> BarrierType:
        raise NotImplementedError

    @abc.abstractmethod
    def _get_variable_name(self) -> str:
        raise NotImplementedError

    @abc.abstractmethod
    def _get_vertical_variables(self) -> list:
        raise NotImplementedError

    def clip_box(
        self,
        time_min: Optional[cftime.datetime | np.datetime64 | str] = None,
        time_max: Optional[cftime.datetime | np.datetime64 | str] = None,
        layer_min: Optional[int] = None,
        layer_max: Optional[int] = None,
        x_min: Optional[float] = None,
        x_max: Optional[float] = None,
        y_min: Optional[float] = None,
        y_max: Optional[float] = None,
        top: Optional[GridDataArray] = None,
        bottom: Optional[GridDataArray] = None,
    ) -> "HorizontalFlowBarrierBase":
        """
        Clip a package by a bounding box (time, layer, y, x).

        Slicing intervals may be half-bounded, by providing None:

        * To select 500.0 <= x <= 1000.0:
          ``clip_box(x_min=500.0, x_max=1000.0)``.
        * To select x <= 1000.0: ``clip_box(x_min=None, x_max=1000.0)``
          or ``clip_box(x_max=1000.0)``.
        * To select x >= 500.0: ``clip_box(x_min = 500.0, x_max=None.0)``
          or ``clip_box(x_min=1000.0)``.

        Parameters
        ----------
        time_min: optional
        time_max: optional
        layer_min: optional, int
        layer_max: optional, int
        x_min: optional, float
        x_max: optional, float
        y_min: optional, float
        y_max: optional, float
        top: optional, GridDataArray
        bottom: optional, GridDataArray

        Returns
        -------
        sliced : Package
        """
        cls = type(self)
        new = cls.__new__(cls)
        new.dataset = copy.deepcopy(self.dataset)
        new.line_data = self.line_data
        return new

    def mask(self, _) -> Package:
        """
        The mask method is irrelevant for this package as it is
        grid-agnostic, instead this method retuns a copy of itself.
        """
        return deepcopy(self)

    @staticmethod
    def __to_unstructured(
        idomain: xr.DataArray, top: xr.DataArray, bottom: xr.DataArray, k: xr.DataArray
    ) -> Tuple[
        xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray
    ]:
        unstruct = xu.UgridDataArray.from_structured(idomain)
        top = xu.UgridDataArray.from_structured(top)
        bottom = xu.UgridDataArray.from_structured(bottom)
        k = xu.UgridDataArray.from_structured(k)

        return unstruct, top, bottom, k

    def __snap_to_grid(
        self, idomain: GridDataArray
    ) -> Tuple[xu.UgridDataset, np.ndarray]:
        if "layer" in self.dataset:
            variable_names = [self._get_variable_name(), "geometry", "layer"]
        else:
            variable_names = [self._get_variable_name(), "geometry", "ztop", "zbottom"]
        barrier_dataframe = self.dataset[variable_names].to_dataframe()

        snapped_dataset, _ = typing.cast(
            xu.UgridDataset,
            xu.snap_to_grid(barrier_dataframe, grid=idomain, max_snap_distance=0.5),
        )
        edge_index = np.argwhere(
            snapped_dataset[self._get_variable_name()].notnull().values
        ).ravel()

        return snapped_dataset, edge_index

    @staticmethod
    def __remove_invalid_edges(
        unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray
    ) -> np.ndarray:
        """
        Remove invalid edges indices. An edge is considered invalid when:
        - it lies on an exterior boundary (face_connectivity equals the grid fill value)
        - The corresponding connected cells are inactive
        """
        grid = unstructured_grid.ugrid.grid
        face_dimension = unstructured_grid.ugrid.grid.face_dimension
        face_connectivity = grid.edge_face_connectivity[edge_index]

        valid_edges = np.all(face_connectivity != grid.fill_value, axis=1)

        connected_cells = -np.ones((len(edge_index), 2))
        connected_cells[valid_edges, 0] = (
            unstructured_grid.sel(layer=1)
            .loc[{face_dimension: face_connectivity[valid_edges, 0]}]
            .values
        )
        connected_cells[valid_edges, 1] = (
            unstructured_grid.sel(layer=1)
            .loc[{face_dimension: face_connectivity[valid_edges, 1]}]
            .values
        )

        valid = (connected_cells > 0).all(axis=1)

        return edge_index[valid]

    @staticmethod
    def __remove_edge_values_connected_to_inactive_cells(
        values, unstructured_grid: xu.UgridDataArray, edge_index: np.ndarray
    ):
        face_dimension = unstructured_grid.ugrid.grid.face_dimension

        face_connectivity = unstructured_grid.ugrid.grid.edge_face_connectivity[
            edge_index
        ]
        connected_cells_left = unstructured_grid.loc[
            {face_dimension: face_connectivity[:, 0]}
        ]
        connected_cells_right = unstructured_grid.loc[
            {face_dimension: face_connectivity[:, 1]}
        ]

        return values.where(
            (connected_cells_left.drop(face_dimension) > 0)
            & (connected_cells_right.drop(face_dimension) > 0)
        )


[docs] class HorizontalFlowBarrierHydraulicCharacteristic(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - hydraulic_characteristic: the hydraulic characteristic of the barriers - ztop: the top z-value of the barriers - zbottom: the bottom z-value of the barriers print_input: bool Examples -------- >>> barrier_x = [-1000.0, 0.0, 1000.0] >>> barrier_y = [500.0, 250.0, 500.0] >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], >>> data={ >>> "hydraulic_characteristic": [1e-3,], >>> "ztop": [10.0,], >>> "zbottom": [0.0,], >>> }, >>> ) >>> hfb = imod.mf6.HorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf) """
[docs] @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input)
def _get_barrier_type(self): return BarrierType.HydraulicCharacteristic def _get_variable_name(self) -> str: return "hydraulic_characteristic" def _get_vertical_variables(self) -> list: return ["ztop", "zbottom"] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self._resistance_layer_overlap( snapped_dataset, edge_index, top, bottom, k ) return barrier_values
class LayeredHorizontalFlowBarrierHydraulicCharacteristic(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - hydraulic_characteristic: the hydraulic characteristic of the barriers - layer: model layer for the barrier print_input: bool Examples -------- >>> barrier_x = [-1000.0, 0.0, 1000.0] >>> barrier_y = [500.0, 250.0, 500.0] >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], >>> data={ >>> "hydraulic_characteristic": [1e-3,], >>> "layer": [1,] >>> }, >>> ) >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierHydraulicCharacteristic(barrier_gdf) """ @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input) def _get_barrier_type(self): return BarrierType.HydraulicCharacteristic def _get_variable_name(self) -> str: return "hydraulic_characteristic" def _get_vertical_variables(self) -> list: return ["layer"] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self._resistance_layer( snapped_dataset, edge_index, idomain, ) return barrier_values
[docs] class HorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf If parts of the barrier overlap a layer the multiplier is applied to the entire layer. Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - multiplier: the multiplier of the barriers - ztop: the top z-value of the barriers - zbottom: the bottom z-value of the barriers print_input: bool Examples -------- >>> barrier_x = [-1000.0, 0.0, 1000.0] >>> barrier_y = [500.0, 250.0, 500.0] >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], >>> data={ >>> "multiplier": [1.5,], >>> "ztop": [10.0,], >>> "zbottom": [0.0,], >>> }, >>> ) >>> hfb = imod.mf6.HorizontalFlowBarrierMultiplier(barrier_gdf) """
[docs] @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input)
def _get_barrier_type(self): return BarrierType.Multiplier def _get_variable_name(self) -> str: return "multiplier" def _get_vertical_variables(self) -> list: return ["ztop", "zbottom"] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): fraction = _fraction_layer_overlap(snapped_dataset, edge_index, top, bottom) barrier_values = ( fraction.where(fraction) * snapped_dataset[self._get_variable_name()].values[edge_index] ) return barrier_values
class LayeredHorizontalFlowBarrierMultiplier(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf If parts of the barrier overlap a layer the multiplier is applied to the entire layer. Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - multiplier: the multiplier of the barriers - layer: model layer for the barrier print_input: bool Examples -------- >>> barrier_x = [-1000.0, 0.0, 1000.0] >>> barrier_y = [500.0, 250.0, 500.0] >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], >>> data={ >>> "multiplier": [1.5,], >>> "layer": [1,], >>> }, >>> ) >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierMultiplier(barrier_gdf) """ @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input) def _get_barrier_type(self): return BarrierType.Multiplier def _get_variable_name(self) -> str: return "multiplier" def _get_vertical_variables(self) -> list: return ["layer"] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self.__multiplier_layer(snapped_dataset, edge_index, idomain) return barrier_values def __multiplier_layer( self, snapped_dataset: xu.UgridDataset, edge_index: np.ndarray, idomain: xu.UgridDataArray, ) -> xr.DataArray: """ Returns layered xarray with barrier multiplier distrubuted over layers """ hfb_multiplier = snapped_dataset[self._get_variable_name()].values[edge_index] hfb_layer = snapped_dataset["layer"].values[edge_index] nlay = idomain.layer.size model_layer = np.repeat(idomain.layer.values, hfb_multiplier.shape[0]).reshape( (nlay, hfb_multiplier.shape[0]) ) data = np.where(model_layer == hfb_layer, hfb_multiplier, np.nan) return xr.DataArray( data=data, coords={ "layer": np.arange(nlay) + 1, }, dims=["layer", "mesh2d_nFaces"], )
[docs] class HorizontalFlowBarrierResistance(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - resistance: the resistance of the barriers - ztop: the top z-value of the barriers - zbottom: the bottom z-value of the barriers print_input: bool Examples -------- >>> barrier_x = [-1000.0, 0.0, 1000.0] >>> barrier_y = [500.0, 250.0, 500.0] >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], >>> data={ >>> "resistance": [1e3,], >>> "ztop": [10.0,], >>> "zbottom": [0.0,], >>> }, >>> ) >>> hfb = imod.mf6.HorizontalFlowBarrierResistance(barrier_gdf) """
[docs] @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input)
def _get_barrier_type(self): return BarrierType.Resistance def _get_variable_name(self) -> str: return "resistance" def _get_vertical_variables(self) -> list: return ["ztop", "zbottom"] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self._resistance_layer_overlap( snapped_dataset, edge_index, top, bottom, k ) return barrier_values
class LayeredHorizontalFlowBarrierResistance(HorizontalFlowBarrierBase): """ Horizontal Flow Barrier (HFB) package Input to the Horizontal Flow Barrier (HFB) Package is read from the file that has type "HFB6" in the Name File. Only one HFB Package can be specified for a GWF model. https://water.usgs.gov/water-resources/software/MODFLOW-6/mf6io_6.2.2.pdf Parameters ---------- geometry: gpd.GeoDataFrame Dataframe that describes: - geometry: the geometries of the barriers, - resistance: the resistance of the barriers - layer: model layer for the barrier print_input: bool Examples -------- >>> barrier_x = [-1000.0, 0.0, 1000.0] >>> barrier_y = [500.0, 250.0, 500.0] >>> barrier_gdf = gpd.GeoDataFrame( >>> geometry=[shapely.linestrings(barrier_x, barrier_y),], >>> data={ >>> "resistance": [1e3,], >>> "layer": [2,], >>> }, >>> ) >>> hfb = imod.mf6.LayeredHorizontalFlowBarrierResistance(barrier_gdf) """ @init_log_decorator() def __init__( self, geometry: "gpd.GeoDataFrame", print_input=False, ): super().__init__(geometry, print_input) def _get_barrier_type(self): return BarrierType.Resistance def _get_variable_name(self) -> str: return "resistance" def _get_vertical_variables(self) -> list: return ["layer"] def _compute_barrier_values( self, snapped_dataset, edge_index, idomain, top, bottom, k ): barrier_values = self._resistance_layer( snapped_dataset, edge_index, idomain, ) return barrier_values