Source code for imod.prepare.wells

"""
Assign wells to layers.
"""

from typing import Optional, Union

import numpy as np
import pandas as pd
import xarray as xr
import xugrid as xu

import imod


def vectorized_overlap(bounds_a, bounds_b):
    """
    Vectorized overlap computation.
    Compare with:
    overlap = max(0, min(a[1], b[1]) - max(a[0], b[0]))
    """
    return np.maximum(
        0.0,
        np.minimum(bounds_a[:, 1], bounds_b[:, 1])
        - np.maximum(bounds_a[:, 0], bounds_b[:, 0]),
    )


def compute_overlap(wells, top, bottom):
    # layer bounds shape of (n_well, n_layer, 2)
    layer_bounds = np.stack((bottom, top), axis=-1)
    well_bounds = np.broadcast_to(
        np.stack(
            (wells["bottom"].to_numpy(), wells["top"].to_numpy()),
            axis=-1,
        )[np.newaxis, :, :],
        layer_bounds.shape,
    )
    overlap = vectorized_overlap(
        well_bounds.reshape((-1, 2)),
        layer_bounds.reshape((-1, 2)),
    )
    return overlap


def locate_wells(
    wells: pd.DataFrame,
    top: Union[xr.DataArray, xu.UgridDataArray],
    bottom: Union[xr.DataArray, xu.UgridDataArray],
    k: Optional[Union[xr.DataArray, xu.UgridDataArray]],
    validate: bool = True,
):
    if not isinstance(top, (xu.UgridDataArray, xr.DataArray)):
        raise TypeError(
            "top and bottom should be DataArray or UgridDataArray, received: "
            f"{type(top).__name__}"
        )

    # Default to a xy_k value of 1.0: weigh every layer equally.
    xy_k = 1.0
    first = wells.groupby("id").first()
    x = first["x"].to_numpy()
    y = first["y"].to_numpy()

    xy_top = imod.select.points_values(top, x=x, y=y, out_of_bounds="ignore")
    xy_bottom = imod.select.points_values(bottom, x=x, y=y, out_of_bounds="ignore")

    # Raise exception if not all wells could be mapped onto the domain
    if validate and len(x) > len(xy_top["index"]):
        inside = imod.select.points_in_bounds(top, x=x, y=y)
        out = np.where(~inside)
        raise ValueError(
            f"well at x = {x[out[0]]} and y = {y[out[0]]} could not be mapped on the grid"
        )

    if k is not None:
        xy_k = imod.select.points_values(k, x=x, y=y, out_of_bounds="ignore")

    # Discard out-of-bounds wells.
    index = xy_top["index"]
    if validate and not np.array_equal(xy_bottom["index"], index):
        raise ValueError("bottom grid does not match top grid")
    if validate and k is not None and not np.array_equal(xy_k["index"], index):  # type: ignore
        raise ValueError("k grid does not match top grid")
    id_in_bounds = first.index[index]

    return id_in_bounds, xy_top, xy_bottom, xy_k


[docs] def assign_wells( wells: pd.DataFrame, top: Union[xr.DataArray, xu.UgridDataArray], bottom: Union[xr.DataArray, xu.UgridDataArray], k: Optional[Union[xr.DataArray, xu.UgridDataArray]] = None, minimum_thickness: Optional[float] = 0.05, minimum_k: Optional[float] = 1.0, validate: bool = True, ) -> pd.DataFrame: """ Distribute well pumping rate according to filter length when ``k=None``, or to transmissivity of the sediments surrounding the filter. Minimum thickness and minimum k should be set to avoid placing wells in clay layers. Wells located outside of the grid are removed. Parameters ---------- wells: pd.DataFrame Should contain columns x, y, id, top, bottom, rate. top: xr.DataArray or xu.UgridDataArray Top of the model layers. bottom: xr.DataArray or xu.UgridDataArray Bottom of the model layers. k: xr.DataArray or xu.UgridDataArray, optional Horizontal conductivity of the model layers. minimum_thickness: float, optional, default: 0.01 minimum_k: float, optional, default: 1.0 Minimum conductivity validate: bool raise an excpetion if one of the wells is not in the domain Returns ------- placed_wells: pd.DataFrame Wells with rate subdivided per layer. Contains the original columns of ``wells``, as well as layer, overlap, transmissivity. """ names = {"x", "y", "id", "top", "bottom", "rate"} missing = names.difference(wells.columns) if missing: raise ValueError(f"Columns are missing in wells dataframe: {missing}") types = [type(arg) for arg in (top, bottom, k) if arg is not None] if len(set(types)) != 1: members = ",".join([t.__name__ for t in types]) raise TypeError( "top, bottom, and optionally k should be of the same type, " f"received: {members}" ) id_in_bounds, xy_top, xy_bottom, xy_k = locate_wells( wells, top, bottom, k, validate ) wells_in_bounds = wells.set_index("id").loc[id_in_bounds].reset_index() first = wells_in_bounds.groupby("id").first() overlap = compute_overlap(first, xy_top, xy_bottom) if k is None: k = 1.0 else: k = xy_k.values.ravel() # Distribute rate according to transmissivity. n_layer, n_well = xy_top.shape df = pd.DataFrame( index=pd.Index(np.tile(first.index, n_layer), name="id"), data={ "layer": np.repeat(top["layer"], n_well), "overlap": overlap, "k": k, "transmissivity": overlap * k, }, ) # remove entries # -in very thin layers or when the wellbore penetrates the layer very little # -in low conductivity layers df = df.loc[(df["overlap"] >= minimum_thickness) & (df["k"] >= minimum_k)] df["rate"] = df["transmissivity"] / df.groupby("id")["transmissivity"].transform( "sum" ) # Create a unique index for every id-layer combination. df["index"] = np.arange(len(df)) df = df.reset_index() # Get rid of those that are removed because of minimum thickness or # transmissivity. wells_in_bounds = wells_in_bounds.loc[wells_in_bounds["id"].isin(df["id"].unique())] # Use pandas multi-index broadcasting. # Maintain all other columns as-is. wells_in_bounds["index"] = 1 # N.B. integer! wells_in_bounds["overlap"] = 1.0 wells_in_bounds["k"] = 1.0 wells_in_bounds["transmissivity"] = 1.0 columns = list(set(wells_in_bounds.columns).difference(df.columns)) indexes = ["id"] for dim in ["species", "time"]: if dim in wells_in_bounds: indexes.append(dim) columns.remove(dim) df[columns] = 1 # N.B. integer! assigned = ( wells_in_bounds.set_index(indexes) * df.set_index(["id", "layer"]) ).reset_index() return assigned