Source code for imod.prepare.wells

"""
Assign wells to layers.
"""

from typing import Optional

import numpy as np
import numpy.typing as npt
import pandas as pd
import xarray as xr
import xugrid as xu

import imod
from imod.typing import GridDataArray


def compute_vectorized_overlap(
    bounds_a: npt.NDArray[np.float64], bounds_b: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
    """
    Vectorized overlap computation.
    Compare with:
    overlap = max(0, min(a[1], b[1]) - max(a[0], b[0]))
    """
    return np.maximum(
        0.0,
        np.minimum(bounds_a[:, 1], bounds_b[:, 1])
        - np.maximum(bounds_a[:, 0], bounds_b[:, 0]),
    )


def compute_point_filter_overlap(
    bounds_wells: npt.NDArray[np.float64], bounds_layers: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
    """
    Special case for filters with zero filter length, these are set to layer
    thickness. Filters which are not in a layer or have a nonzero filter length
    are set to zero overlap.
    """
    # Unwrap for readability
    wells_top = bounds_wells[:, 1]
    wells_bottom = bounds_wells[:, 0]
    layers_top = bounds_layers[:, 1]
    layers_bottom = bounds_layers[:, 0]

    has_zero_filter_length = wells_top == wells_bottom
    in_layer = (layers_top >= wells_top) & (layers_bottom < wells_bottom)
    layer_thickness = layers_top - layers_bottom
    # Multiplication to set any elements not meeting the criteria to zero.
    point_filter_overlap = (
        has_zero_filter_length.astype(float) * in_layer.astype(float) * layer_thickness
    )
    return point_filter_overlap


def compute_overlap(
    wells: pd.DataFrame, top: GridDataArray, bottom: GridDataArray
) -> npt.NDArray[np.float64]:
    # layer bounds stack shape of (n_well, n_layer, 2)
    layer_bounds_stack = np.stack((bottom, top), axis=-1)
    well_bounds = np.broadcast_to(
        np.stack(
            (wells["bottom"].to_numpy(), wells["top"].to_numpy()),
            axis=-1,
        )[np.newaxis, :, :],
        layer_bounds_stack.shape,
    ).reshape(-1, 2)
    layer_bounds = layer_bounds_stack.reshape(-1, 2)

    # Deal with filters with a nonzero length
    interval_filter_overlap = compute_vectorized_overlap(
        well_bounds,
        layer_bounds,
    )
    # Deal with filters with zero length
    point_filter_overlap = compute_point_filter_overlap(
        well_bounds,
        layer_bounds,
    )
    return np.maximum(interval_filter_overlap, point_filter_overlap)


def locate_wells(
    wells: pd.DataFrame,
    top: GridDataArray,
    bottom: GridDataArray,
    k: Optional[GridDataArray] = None,
    validate: bool = True,
) -> tuple[
    npt.NDArray[np.object_], GridDataArray, GridDataArray, float | GridDataArray
]:
    if not isinstance(top, (xu.UgridDataArray, xr.DataArray)):
        raise TypeError(
            "top and bottom should be DataArray or UgridDataArray, received: "
            f"{type(top).__name__}"
        )

    # Default to a xy_k value of 1.0: weigh every layer equally.
    xy_k: float | GridDataArray = 1.0
    first = wells.groupby("id").first()
    x = first["x"].to_numpy()
    y = first["y"].to_numpy()

    xy_top = imod.select.points_values(top, x=x, y=y, out_of_bounds="ignore")
    xy_bottom = imod.select.points_values(bottom, x=x, y=y, out_of_bounds="ignore")

    # Raise exception if not all wells could be mapped onto the domain
    if validate and len(x) > len(xy_top["index"]):
        inside = imod.select.points_in_bounds(top, x=x, y=y)
        out = np.where(~inside)
        raise ValueError(
            f"well at x = {x[out[0]]} and y = {y[out[0]]} could not be mapped on the grid"
        )

    if k is not None:
        xy_k = imod.select.points_values(k, x=x, y=y, out_of_bounds="ignore")

    # Discard out-of-bounds wells.
    index = xy_top["index"]
    if validate and not np.array_equal(xy_bottom["index"], index):
        raise ValueError("bottom grid does not match top grid")
    if validate and k is not None and not np.array_equal(xy_k["index"], index):  # type: ignore
        raise ValueError("k grid does not match top grid")
    id_in_bounds = first.index[index]

    return id_in_bounds, xy_top, xy_bottom, xy_k


def validate_well_columnnames(
    wells: pd.DataFrame, names: set = {"x", "y", "id"}
) -> None:
    missing = names.difference(wells.columns)
    if missing:
        raise ValueError(f"Columns are missing in wells dataframe: {missing}")


def validate_arg_types_equal(**kwargs):
    types = [type(arg) for arg in (kwargs.values()) if arg is not None]
    if len(set(types)) != 1:
        members = ", ".join([t.__name__ for t in types])
        names = ", ".join(kwargs.keys())
        raise TypeError(f"{names} should be of the same type, received: {members}")


[docs] def assign_wells( wells: pd.DataFrame, top: GridDataArray, bottom: GridDataArray, k: Optional[GridDataArray] = None, minimum_thickness: Optional[float] = 0.05, minimum_k: Optional[float] = 1.0, validate: bool = True, ) -> pd.DataFrame: """ Distribute well pumping rate according to filter length when ``k=None``, or to transmissivity of the sediments surrounding the filter. Minimum thickness and minimum k should be set to avoid placing wells in clay layers. Wells where well screen_top equals screen_bottom are assigned to the layer they are located in, without any subdivision. Wells located outside of the grid are removed. Parameters ---------- wells: pandas.DataFrame Should contain columns x, y, id, top, bottom, rate. top: xarray.DataArray or xugrid.UgridDataArray Top of the model layers. bottom: xarray.DataArray or xugrid.UgridDataArray Bottom of the model layers. k: xarray.DataArray or xugrid.UgridDataArray, optional Horizontal conductivity of the model layers. minimum_thickness: float, optional, default: 0.01 minimum_k: float, optional, default: 1.0 Minimum conductivity validate: bool raise an excpetion if one of the wells is not in the domain Returns ------- placed_wells: pd.DataFrame Wells with rate subdivided per layer. Contains the original columns of ``wells``, as well as layer, overlap, transmissivity. """ columnnames = {"x", "y", "id", "top", "bottom", "rate"} validate_well_columnnames(wells, columnnames) validate_arg_types_equal(top=top, bottom=bottom, k=k) id_in_bounds, xy_top, xy_bottom, xy_k = locate_wells( wells, top, bottom, k, validate ) wells_in_bounds = wells.set_index("id").loc[id_in_bounds].reset_index() first = wells_in_bounds.groupby("id").first() overlap = compute_overlap(first, xy_top, xy_bottom) if isinstance(xy_k, (xr.DataArray, xu.UgridDataArray)): k_for_df = xy_k.values.ravel() else: k_for_df = xy_k # Distribute rate according to transmissivity. n_layer, n_well = xy_top.shape df_factor = pd.DataFrame( index=pd.Index(np.tile(first.index, n_layer), name="id"), data={ "layer": np.repeat(top["layer"], n_well), "overlap": overlap, "k": k_for_df, "transmissivity": overlap * k_for_df, }, ) # remove entries # -in very thin layers or when the wellbore penetrates the layer very little # -in low conductivity layers df_factor = df_factor.loc[ (df_factor["overlap"] >= minimum_thickness) & (df_factor["k"] >= minimum_k) ] df_factor["rate"] = df_factor["transmissivity"] / df_factor.groupby("id")[ "transmissivity" ].transform("sum") # Create a unique index for every id-layer combination. df_factor["index"] = np.arange(len(df_factor)) df_factor = df_factor.reset_index() # Get rid of those that are removed because of minimum thickness or # transmissivity. wells_in_bounds = wells_in_bounds.loc[ wells_in_bounds["id"].isin(df_factor["id"].unique()) ] # Use pandas multi-index broadcasting. # Maintain all other columns as-is. wells_in_bounds["index"] = 1 # N.B. integer! wells_in_bounds["overlap"] = 1.0 wells_in_bounds["k"] = 1.0 wells_in_bounds["transmissivity"] = 1.0 columns = list(set(wells_in_bounds.columns).difference(df_factor.columns)) indexes = ["id"] for dim in ["species", "time"]: if dim in wells_in_bounds: indexes.append(dim) columns.remove(dim) df_factor[columns] = 1 # N.B. integer! assigned = ( wells_in_bounds.set_index(indexes) * df_factor.set_index(["id", "layer"]) ).reset_index() return assigned