import numpy as np
import pandas as pd
import xarray as xr
import imod
[docs]
def stability_constraint_wel(wel, top_bot, porosity=0.3, R=1.0):
r"""
Computes sink/source stability constraint as applied in MT3D for adaptive
timestepping (Zheng & Wang, 1999 p54).
.. math:: \Delta t \leq \frac{R\theta }{\left | q_{s} \right |}
For the WEL package, a flux is known
beforehand, so we can evaluate beforehand if a flux assigned to a cell
will necessitate a small timestap, thereby slowing down calculations.
Returns a ipf DataFrame that includes a column for the specific discharge and
resulting minimum timestep.
Parameters
----------
wel: pd.DataFrame
pd.DataFrame that defines a WEL package. Minimally includes
x, y, layer and Q column.
top_bot: xr.Dataset of floats, containing 'top', 'bot' and optionally
'dz' of layers.
Dimensions must be exactly ``("layer", "y", "x")``.
porosity: float or xr.DataArray of floats, optional (default 0.3)
If xr.DataArray, dimensions must be exactly ``("layer", "y", "x")``.
R: Retardation factor, optional (default)
Only when sorption is a factor.
Returns
-------
wel: pd.DataFrame containing addition qs (specific discharge) and
dt (minimum timestep) columns
"""
b = imod.select.points_in_bounds(
top_bot, x=wel["x"], y=wel["y"], layer=wel["layer"]
)
indices = imod.select.points_indices(
top_bot, x=wel.loc[b, "x"], y=wel.loc[b, "y"], layer=wel.loc[b, "layer"]
)
if "dz" not in top_bot: # create if not present
top_bot["dz"] = top_bot["top"] - top_bot["bot"]
top_bot["volume"] = np.abs(top_bot["dz"] * top_bot.dx * top_bot.dy)
wel.loc[b, "qs"] = wel.loc[b, "Q"].abs() / top_bot["volume"].isel(indices).values
wel.loc[b, "dt"] = R * porosity / wel.loc[b, "qs"]
return wel
[docs]
def stability_constraint_advection(front, lower, right, top_bot, porosity=0.3, R=1.0):
r"""
Computes advection stability constraint as applied in MT3D for adaptive
timestepping (Zheng & Wang, 1999 p54):
.. math:: \Delta t \leq \frac{R}{\frac{\left | v_{x} \right |}{\Delta x}+\frac{\left | v_{y} \right |}{\Delta y}+\frac{\left | v_{z} \right |}{\Delta z}}
This function can be used to select
which cells necessitate a small timestap, thereby slowing down calculations.
Front, lower, and right arguments refer to iMOD face flow budgets, in cubic
meters per day. In terms of flow direction these are defined as:
* ``front``: positive with ``y`` (negative with row index)
* ``lower``: positive with ``layer`` (positive with layer index)
* ``right``: negative with ``x`` (negative with column index)
Returns the minimum timestep that is required to satisfy this constraint.
The resulting dt xr.DataArray is the minimum timestep over all three directions,
dt_xyz is an xr.Dataset containing minimum timesteps for the three directions
separately.
Parameters
----------
front: xr.DataArray of floats, optional
Dimensions must be exactly ``("layer", "y", "x")``.
lower: xr.DataArray of floats, optional
Dimensions must be exactly ``("layer", "y", "x")``.
right: xr.DataArray of floats, optional
Dimensions must be exactly ``("layer", "y", "x")``.
top_bot: xr.Dataset of floats, containing 'top', 'bot' and optionally
'dz' of layers.
Dimensions must be exactly ``("layer", "y", "x")``.
porosity: float or xr.DataArray of floats, optional (default 0.3)
If xr.DataArray, dimensions must be exactly ``("layer", "y", "x")``.
R: Retardation factor, optional (default)
Only when sorption is a factor.
Returns
-------
dt: xr.DataArray of floats
dt_xyz: xr.Dataset of floats
"""
# top_bot reselect to bdg bounds
top_bot = top_bot.sel(x=right.x, y=right.y, layer=right.layer)
# Compute flow velocities
v_x, v_y, v_z = imod.evaluate.flow_velocity(front, lower, right, top_bot, porosity)
if "dz" not in top_bot:
top_bot["dz"] = top_bot["top"] - top_bot["bot"]
# assert all dz positive - Issue #140
if not np.all(top_bot["dz"].values[~np.isnan(top_bot["dz"].values)] >= 0):
raise ValueError("All dz values should be positive")
# absolute velocities (m/d)
abs_v_x = np.abs(v_x)
abs_v_y = np.abs(v_y)
abs_v_z = np.abs(v_z)
# dt of constituents (d), set zero velocities to nans
dt_x = R / (abs_v_x.where(abs_v_x > 0) / top_bot.dx)
dt_y = R / (abs_v_y.where(abs_v_y > 0) / np.abs(top_bot.dy))
dt_z = R / (abs_v_z.where(abs_v_z > 0) / top_bot.dz)
# overall dt due to advection criterion (d)
dt = 1.0 / (1.0 / dt_x + 1.0 / dt_y + 1.0 / dt_z)
dt_xyz = xr.concat(
(dt_x, dt_y, dt_z), dim=pd.Index(["x", "y", "z"], name="direction")
)
return dt, dt_xyz
def _calculate_intra_cell_dt(
source_stage, source_cond, sink_stage, sink_cond, eff_volume
):
"""Calculate intra-cell dt by assuming a flux from a higher source_stage to a lower sink_stage,
ignoring other head influences. Use limiting (lowest) conductance. eff_volume is the effective
volume per cell (cell volume * effective porosity)"""
source_cond, sink_cond = xr.align(source_cond, sink_cond, join="inner", copy=False)
cond = np.minimum(source_cond, sink_cond)
Q = cond * (source_stage - sink_stage)
Q = Q.where(source_stage > sink_stage)
return eff_volume / Q
[docs]
def intra_cell_boundary_conditions(
top_bot, porosity=0.3, riv=None, ghb=None, drn=None, drop_allnan=True
):
"""Function to pre-check boundary-conditions against one another for large intra-cell fluxes.
ghb and river can function as source and sink, drn only as sink.
Parameters
----------
top_bot : xr.Dataset of floats
'top_bot' should at least contain `top` and `bot` data_vars
porosity : float or xr.DataArray of floats, optional
Effective porosity of model cells
riv : (dict or list of) imod.RiverPackage, optional
ghb : (dict or list of) imod.GeneralHeadBoundaryPackage, optional
drn : (dict or list of) imod.DrainagePackage, optional
drop_allnan : boolean, optional
Whether source-sink combinations without overlap should be dropped from result (default True)
Returns
-------
dt_min: xr.DataArray of floats
`dt_min` is the minimum calculated timestep over all combinations of boundary conditions
dt_all: xr.DataArray of floats
`dt_all` is the calculated timestep for all combinations of boundary conditions
"""
if riv is None and ghb is None:
raise ValueError(
"At least one source boundary condition must be supplied through riv or ghb."
)
# convert all inputs to dictionaries of packages
if riv is None:
riv = {}
elif isinstance(riv, dict):
pass
elif isinstance(riv, (list, tuple)):
riv = {f"riv_{i}": single_riv for i, single_riv in enumerate(riv)}
else:
riv = {"riv": riv}
if ghb is None:
ghb = {}
elif isinstance(ghb, dict):
pass
elif isinstance(ghb, (list, tuple)):
ghb = {f"ghb_{i}": single_riv for i, single_riv in enumerate(ghb)}
else:
ghb = {"ghb": ghb}
if drn is None:
drn = {}
elif isinstance(drn, dict):
pass
elif isinstance(drn, (list, tuple)):
drn = {f"drn_{i}": single_riv for i, single_riv in enumerate(drn)}
else:
drn = {"drn": drn}
# get sources and sinks:
sources = {}
sources.update(ghb)
sources.update(riv)
sinks = {}
sinks.update(ghb)
sinks.update(riv)
sinks.update(drn)
# determine effective volume
if "dz" not in top_bot:
top_bot["dz"] = top_bot["top"] - top_bot["bot"]
if (top_bot["dz"] <= 0.0).any():
raise ValueError("Cells with thickness <= 0 present in top_bot Dataset.")
eff_volume = top_bot["dz"] * top_bot.dx * np.abs(top_bot.dy) * porosity
def _get_stage_name(sid):
if sid in riv:
return "stage"
elif sid in ghb:
return "head"
elif sid in drn:
return "elevation"
# for all possible combinations: determine dt
resultids = []
results = []
for sourceid, source in sources.items():
for sinkid, sink in sinks.items():
if sourceid == sinkid:
continue
comb = f"{sourceid}-{sinkid}"
if comb not in resultids:
# source in riv: only where stage > bottom elev
if sourceid in riv:
source = source.where(source["stage"] > source["bottom_elevation"])
dt = _calculate_intra_cell_dt(
source_stage=source[_get_stage_name(sourceid)],
source_cond=source["conductance"].load(),
sink_stage=sink[_get_stage_name(sinkid)],
sink_cond=sink["conductance"].load(),
eff_volume=eff_volume,
)
if not drop_allnan or not dt.isnull().all():
results.append(dt)
resultids.append(comb)
dt_all = xr.concat(
results, pd.Index(resultids, name="combination"), coords="minimal"
)
# overall dt
dt_min = dt_all.min(dim="combination")
return dt_min, dt_all