Source code for imod.wq.wel

from pathlib import Path

import jinja2
import numpy as np

import imod
from imod.wq.pkgbase import BoundaryCondition


def _column_order(df, variable):
    """
    Return ordered columns, and associated timeseries columns.
    """
    if "time" in df:
        assoc_columns = ["time", variable]
        if "layer" in df:
            columns = ["x", "y", "id", "time", "layer", variable]
        else:
            columns = ["x", "y", "id", "time", variable]
    else:
        assoc_columns = None
        if "layer" in df:
            columns = ["x", "y", variable, "layer", "id"]
        else:
            columns = ["x", "y", variable, "id"]
    return columns, assoc_columns


[docs] class Well(BoundaryCondition): """ The Well package is used to simulate a specified flux to individual cells and specified in units of length3/time. Parameters ---------- id_name: str or list of str name of the well(s). x: float or list of floats x coordinate of the well(s). y: float or list of floats y coordinate of the well(s). rate: float or list of floats. pumping rate in the well(s). layer: "None" or int, optional layer from which the pumping takes place. time: "None" or listlike of np.datetime64, datetime.datetime, pd.Timestamp, cftime.datetime time during which the pumping takes place. Only need to specify if model is transient. save_budget: bool, optional is a flag indicating if the budget should be saved (IRIVCB). Default is False. """ _pkg_id = "wel" _template = jinja2.Template( " {%- for time, timedict in wels.items() -%}" " {%- for layer, value in timedict.items() %}\n" " wel_p{{time}}_s{{system_index}}_l{{layer}} = {{value}}" " {%- endfor -%}\n" " {%- endfor -%}" ) # TODO: implement well to concentration IDF and use ssm_template # Ignored for now, since wells are nearly always extracting
[docs] def __init__( self, id_name, x, y, rate, layer=None, time=None, concentration=None, save_budget=False, ): super().__init__() variables = { "id_name": id_name, "x": x, "y": y, "rate": rate, "layer": layer, "time": time, "concentration": concentration, } variables = {k: np.atleast_1d(v) for k, v in variables.items() if v is not None} length = max(map(len, variables.values())) index = np.arange(1, length + 1) self["index"] = index for k, v in variables.items(): if v.size == index.size: self[k] = ("index", v) elif v.size == 1: self[k] = ("index", np.full(length, v)) else: raise ValueError(f"Length of {k} does not match other arguments") self["save_budget"] = save_budget
def _max_active_n(self, varname, nlayer, nrow, ncol): """ Determine the maximum active number of cells that are active during a stress period. Parameters ---------- varname : str name of the variable to use to calculate the maximum number of active cells. Not used for well, here for polymorphism. nlayer, nrow, ncol : int """ nmax = np.unique(self["id_name"]).size if "layer" not in self.dataset.coords: # Then it applies to every layer nmax *= nlayer self._cellcount = nmax self._ssm_cellcount = nmax return nmax def _compose_values_layer(self, directory, nlayer, name, time=None, compress=True): values = {} d = {"directory": directory, "name": name, "extension": ".ipf"} if time is None: if "layer" in self.dataset: for layer in np.unique(self.dataset["layer"]): layer = int(layer) d["layer"] = layer values[layer] = self._compose_path(d) else: values["$"] = self._compose_path(d) else: d["time"] = time if "layer" in self.dataset: # Since the well data is in long table format, it's the only # input that has to be inspected. select = np.argwhere((self.dataset["time"] == time).values) for layer in np.unique(self.dataset["layer"].values[select]): d["layer"] = layer values[layer] = self._compose_path(d) else: values["?"] = self._compose_path(d) if "layer" in self.dataset: # Compose does not accept non-integers, so use 0, then replace d["layer"] = 0 if np.unique(self.dataset["layer"].values).size == nlayer: token_path = imod.util.path.compose(d).as_posix() token_path = token_path.replace("_l0", "_l$") values = {"$": token_path} elif compress: range_path = imod.util.path.compose(d).as_posix() range_path = range_path.replace("_l0", "_l:") # TODO: temporarily disable until imod-wq is fixed values = self._compress_idflayers(values, range_path) return values def _compose_values_time(self, directory, name, globaltimes, nlayer): # TODO: rename to _compose_values_timelayer? values = {"?": self._compose_values_layer(directory, nlayer=nlayer, name=name)} return values def _render(self, directory, globaltimes, system_index, nlayer): d = {"system_index": system_index} d["wels"] = self._compose_values_time(directory, "rate", globaltimes, nlayer) return self._template.render(d) def _render_ssm(self, directory, globaltimes, nlayer): if "concentration" in self.dataset.data_vars: d = {"pkg_id": self._pkg_id} name = "concentration" if "species" in self.dataset["concentration"].coords: concentration = {} for species in self.dataset["concentration"]["species"].values: concentration[species] = self._compose_values_time( directory, f"{name}_c{species}", globaltimes, nlayer=nlayer ) else: concentration = { 1: self._compose_values_time( directory, name, globaltimes, nlayer=nlayer ) } d["concentration"] = concentration return self._ssm_template.render(d) else: return "" def _save_layers(self, df, directory, name, variable): d = {"directory": directory, "name": name, "extension": ".ipf"} d["directory"].mkdir(exist_ok=True, parents=True) if "time" in df: itype = "timeseries" else: itype = None columns, assoc_columns = _column_order(df, variable) path = self._compose_path(d) df = df[columns] if "layer" in df: for layer, layerdf in df.groupby("layer"): # Ensure different IDs per layer are not overwritten. layerdf["id"] = f"{name}_l{layer}/" + layerdf["id"].astype(str) imod.ipf.save( path=path, df=layerdf, itype=itype, assoc_columns=assoc_columns ) else: imod.ipf.save(path=path, df=df, itype=itype, assoc_columns=assoc_columns) return def save(self, directory): directory = Path(directory) all_species = [None] if "concentration" in self.dataset.data_vars: if "species" in self.dataset["concentration"].coords: all_species = self.dataset["concentration"]["species"].values df = self.dataset.to_dataframe().rename(columns={"id_name": "id"}) self._save_layers(df, directory, "rate", "rate") # Loop over species if applicable if "concentration" in self.dataset: for species in all_species: if species is not None: ds = self.dataset.sel(species=species) else: ds = self.dataset df = ds.to_dataframe().rename(columns={"id_name": "id"}) name = "concentration" if species is not None: name = f"{name}_c{species}" self._save_layers(df, directory, name, "concentration") return def _pkgcheck(self, ibound=None): # TODO: implement pass def repeat_stress(self, stress_repeats, use_cftime=False): raise NotImplementedError( "Well does not support repeated stresses: time-varying data is " "saved into associated IPF files. Set explicit timeseries intead." )