import abc
import collections
import os
import pathlib
import warnings
import cftime
import jinja2
import numpy as np
import pandas as pd
import xarray as xr
import imod
from imod.flow.pkgbase import BoundaryCondition
from imod.flow.pkggroup import PackageGroups
from imod.flow.timeutil import insert_unique_package_times
from imod.util.nested_dict import append_nested_dict, initialize_nested_dict
from imod.util.time import _compose_timestring, timestep_duration, to_datetime_internal
class IniFile(collections.UserDict, abc.ABC):
"""
    Basic support for iMOD ini files.

    These files contain the settings that iMOD uses to run its batch
    functions, for example to convert a model description -- a projectfile
    containing paths to the respective .IDFs of each package -- to a
    Modflow6 model.
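
    Examples
    --------
    A minimal sketch of the intended use (keys and values are illustrative):

    >>> ini = IniFile(function="runfile", sim_type=2)
    >>> ini["sdate"] = "2000-01-01"
    >>> content = ini.render()  # one "key=value" line per setting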
"""
# TODO: Create own key mapping to avoid keys like "edate"?
_template = jinja2.Template(
"{%- for key, value in settings %}\n" "{{key}}={{value}}\n" "{%- endfor %}\n"
)
def _format_datetimes(self):
for timekey in ["sdate", "edate"]:
if timekey in self.keys():
# If not string assume it is in some kind of datetime format
if not isinstance(self[timekey], str):
self[timekey] = _compose_timestring(self[timekey])
def render(self):
self._format_datetimes()
return self._template.render(settings=self.items())
def _relpath(path, to):
# Wraps os.path.relpath
try:
return pathlib.Path(os.path.relpath(path, to))
except ValueError:
        # os.path.relpath raises ValueError when the paths are on different
        # drives (e.g. on Windows); fall back to an absolute path.
return pathlib.Path(os.path.abspath(path))
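# Illustrative example (hypothetical Windows paths): a relative path cannot
# cross drive letters, so _relpath falls back to the absolute path:
#
#   _relpath("C:/model/input", "C:/work")  ->  Path("../model/input")
#   _relpath("C:/model/input", "D:/work")  ->  Path("C:/model/input")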
# This class allows only imod packages as values
class Model(collections.UserDict):
def __setitem__(self, key, value):
# TODO: raise ValueError on setting certain duplicates
# e.g. two solvers
if self.check == "eager":
value._pkgcheck()
super().__setitem__(key, value)
def update(self, *args, **kwargs):
for k, v in dict(*args, **kwargs).items():
self[k] = v
def _delete_empty_packages(self, verbose=False):
to_del = []
for pkg in self.keys():
dv = list(self[pkg].dataset.data_vars)[0]
if not self[pkg][dv].notnull().any().compute():
if verbose:
warnings.warn(
f"Deleting package {pkg}, found no data in parameter {dv}"
)
to_del.append(pkg)
for pkg in to_del:
del self[pkg]
class ImodflowModel(Model):
"""
Class representing iMODFLOW model input. Running it requires iMOD5.
`Download iMOD5 here <https://oss.deltares.nl/web/imod/download-imod5>`_
Attributes
----------
    modelname : str
    check : str, optional
        When to perform model checks {None, "defer", "eager"}. Defaults to
        "defer".
Examples
--------
    >>> m = ImodflowModel("example")
>>> m["riv"] = River(...)
>>> # ...etc.
>>> m.create_time_discretization(endtime)
>>> m.write()
"""
# These templates end up here since they require global information
# from more than one package
_PACKAGE_GROUPS = PackageGroups
def __init__(self, modelname, check="defer"):
super().__init__()
self.modelname = modelname
self.check = check
def _get_pkgkey(self, pkg_id):
"""
Get package key that belongs to a certain pkg_id, since the keys are
user specified.
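        For example (illustrative): a model holding ``{"my_river": River(...)}``
        returns ``"my_river"`` for ``self._get_pkgkey("riv")``.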
"""
key = [pkgname for pkgname, pkg in self.items() if pkg._pkg_id == pkg_id]
nkey = len(key)
if nkey > 1:
raise ValueError(f"Multiple instances of {key} detected")
elif nkey == 1:
return key[0]
else:
return None
def _group(self):
"""
        Group multiple systems of a single package,
        e.g. all river or drainage sub-systems.
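        For example (illustrative): two River packages stored under the keys
        "riv_primary" and "riv_secondary" end up in a single RiverGroup
        containing both systems.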
"""
groups = collections.defaultdict(dict)
groupable = set(self._PACKAGE_GROUPS.__members__.keys())
for key, package in self.items():
pkg_id = package._pkg_id
if pkg_id in groupable:
groups[pkg_id][key] = package
package_groups = []
for pkg_id, group in groups.items():
# Create PackageGroup for every package
# RiverGroup for rivers, DrainageGroup for drainage, etc.
package_groups.append(self._PACKAGE_GROUPS[pkg_id].value(**group))
return package_groups
def _use_cftime(self):
"""
Also checks if datetime types are homogeneous across packages.
"""
types = []
for pkg in self.values():
if pkg._hastime():
types.append(type(np.atleast_1d(pkg["time"].values)[0]))
# Types will be empty if there's no time dependent input
set_of_types = set(types)
if len(set_of_types) == 0:
return None
else: # there is time dependent input
if not len(set_of_types) == 1:
raise ValueError(
f"Multiple datetime types detected: {set_of_types}. "
"Use either cftime or numpy.datetime64[ns]."
)
# Since we compare types and not instances, we use issubclass
if issubclass(types[0], cftime.datetime):
return True
elif issubclass(types[0], np.datetime64):
return False
else:
raise ValueError("Use either cftime or numpy.datetime64[ns].")
def time_discretization(self, times):
warnings.warn(
f"{self.__class__.__name__}.time_discretization() is deprecated. "
f"In the future call {self.__class__.__name__}.create_time_discretization().",
DeprecationWarning,
)
self.create_time_discretization(additional_times=times)
def create_time_discretization(self, additional_times):
"""
        Collect all unique times from model packages and the given
        `additional_times`. These
unique times are used as stress periods in the model. All stress packages must
have the same starting time.
The time discretization in imod-python works as follows:
- The datetimes of all packages you send in are always respected
- Subsequently, the input data you use is always included fully as well
- All times are treated as starting times for the stress: a stress is
always applied until the next specified date
- For this reason, a final time is required to determine the length of
the last stress period
- Additional times can be provided to force shorter stress periods &
more detailed output
- Every stress has to be defined on the first stress period (this is a
modflow requirement)
        Or visually (every letter is a date on the time axis):
>>> recharge a - b - c - d - e - f
>>> river g - - - - h - - - - j
>>> times - - - - - - - - - - - i
>>> model a - b - c h d - e - f i
        with the stress periods defined between these dates. That is, the
        model times are the union of all times you include in the model.
Parameters
----------
        additional_times : str, datetime, or iterable of str/datetime
            Times to add to the time discretization. At least one time
            should be given, which will be used as the ending time of the
            simulation.
Examples
--------
Add a single time:
>>> m.create_time_discretization("2001-01-01")
Add a daterange:
        >>> m.create_time_discretization(pd.date_range("2000-01-01", "2001-01-01"))
Add a list of times:
>>> m.create_time_discretization(["2000-01-01", "2001-01-01"])
"""
# Make sure it's an iterable
if not isinstance(
additional_times, (np.ndarray, list, tuple, pd.DatetimeIndex)
):
additional_times = [additional_times]
# Loop through all packages, check if cftime is required.
self.use_cftime = self._use_cftime()
        # use_cftime is None if no datetimes are present in the packages
        # use_cftime is False if np.datetime64 is present in the packages
        # use_cftime is True if cftime.datetime is present in the packages
for time in additional_times:
if issubclass(type(time), cftime.datetime):
if self.use_cftime is None:
self.use_cftime = True
if self.use_cftime is False:
raise ValueError(
"Use either cftime or numpy.datetime64[ns]. "
f"Received: {type(time)}."
)
if self.use_cftime is None:
self.use_cftime = False
times = [
to_datetime_internal(time, self.use_cftime) for time in additional_times
]
times, first_times = insert_unique_package_times(self.items(), times)
# Check if every transient package commences at the same time.
for key, first_time in first_times.items():
time0 = times[0]
if (first_time != time0) and not self[key]._is_periodic():
raise ValueError(
f"Package {key} does not have a value specified for the "
f"first time: {time0}. Every input must be present in the "
"first stress period. Values are only filled forward in "
"time."
)
duration = timestep_duration(times, self.use_cftime)
        # Generate the time discretization; rely on the default arguments,
        # which should suffice for most use cases.
times = np.array(times)
timestep_duration_da = xr.DataArray(
duration, coords={"time": times[:-1]}, dims=("time",)
)
self["time_discretization"] = imod.flow.TimeDiscretization(
timestep_duration=timestep_duration_da, endtime=times[-1]
)
def _calc_n_entry(self, composed_package, is_boundary_condition):
"""Calculate amount of entries for each timestep and variable."""
def first(d):
"""Get first value of dictionary values"""
return next(iter(d.values()))
if is_boundary_condition:
first_variable = first(first(composed_package))
n_entry = 0
for sys in first_variable.values():
n_entry += len(sys)
return n_entry
else: # No time and no systems in regular packages
first_variable = first(composed_package)
return len(first_variable)
def _compose_timestrings(self, globaltimes):
time_format = "%Y-%m-%d %H:%M:%S"
time_composed = self["time_discretization"]._compose_values_time(
"time", globaltimes
)
time_composed = {
timestep_nr: _compose_timestring(time, time_format=time_format)
for timestep_nr, time in time_composed.items()
}
return time_composed
def _compose_periods(self):
periods = {}
for key, package in self.items():
if package._is_periodic():
# Periodic stresses are defined for all variables
first_var = list(package.dataset.data_vars)[0]
periods.update(package.dataset[first_var].attrs["stress_periodic"])
# Create timestrings for "Periods" section in projectfile
# Basically swap around period attributes and compose timestring
# Note that the timeformat for periods in the Projectfile is different
# from that for stress periods
time_format = "%d-%m-%Y %H:%M:%S"
periods_composed = {
value: _compose_timestring(time, time_format=time_format)
for time, value in periods.items()
}
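        # Illustrative result (hypothetical period name):
        # periods_composed == {"summer": "01-04-2000 00:00:00"}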
return periods_composed
def _compose_all_packages(self, directory, globaltimes):
"""
Compose all transient packages before rendering.
Required because of outer timeloop
Returns
-------
        A nested dictionary containing the composed packages and boundary
        conditions.
"""
bndkey = self._get_pkgkey("bnd")
nlayer = self[bndkey]["layer"].size
composition = initialize_nested_dict(5)
group_packages = self._group()
        # Get the pkg_id from the first value in each group's dictionary
group_pkg_ids = [next(iter(group.values()))._pkg_id for group in group_packages]
for group in group_packages:
group_composition = group.compose(
directory,
globaltimes,
nlayer,
)
append_nested_dict(composition, group_composition)
for key, package in self.items():
if package._pkg_id not in group_pkg_ids:
package_composition = package.compose(
directory.joinpath(key),
globaltimes,
nlayer,
)
append_nested_dict(composition, package_composition)
return composition
def _render_periods(self, periods_composed):
_template_periods = jinja2.Template(
"Periods\n"
"{%- for key, timestamp in periods.items() %}\n"
"{{key}}\n{{timestamp}}\n"
"{%- endfor %}\n"
)
return _template_periods.render(periods=periods_composed)
def _render_projectfile(self, directory):
"""
Render projectfile. The projectfile has the hierarchy:
package - time - system - layer
"""
diskey = self._get_pkgkey("dis")
globaltimes = self[diskey]["time"].values
content = []
composition = self._compose_all_packages(directory, globaltimes)
times_composed = self._compose_timestrings(globaltimes)
periods_composed = self._compose_periods()
# Add period strings to times_composed
# These are the strings atop each stress period in the projectfile
times_composed.update({key: key for key in periods_composed.keys()})
# Add steady-state for packages without time specified
times_composed["steady-state"] = "steady-state"
rendered = []
ignored = ["dis", "oc"]
for key, package in self.items():
pkg_id = package._pkg_id
if (pkg_id in rendered) or (pkg_id in ignored):
continue # Skip if already rendered (for groups) or not necessary to render
kwargs = {
"pkg_id": pkg_id,
"name": package.__class__.__name__,
"variable_order": package._variable_order,
"package_data": composition[pkg_id],
}
if isinstance(package, BoundaryCondition):
kwargs["n_entry"] = self._calc_n_entry(composition[pkg_id], True)
kwargs["times"] = times_composed
else:
kwargs["n_entry"] = self._calc_n_entry(composition[pkg_id], False)
content.append(package._render_projectfile(**kwargs))
rendered.append(pkg_id)
# Add periods definition
content.append(self._render_periods(periods_composed))
return "\n\n".join(content)
def _render_runfile(self, directory):
"""
Render runfile. The runfile has the hierarchy:
time - package - system - layer
"""
raise NotImplementedError("Currently only projectfiles can be rendered.")
def render(self, directory, render_projectfile=True):
"""
Render the runfile as a string, package by package.
"""
if render_projectfile:
return self._render_projectfile(directory)
else:
return self._render_runfile(directory)
def _model_path_management(
self, directory, result_dir, resultdir_is_workdir, render_projectfile
):
# Coerce to pathlib.Path
directory = pathlib.Path(directory)
if result_dir is None:
result_dir = pathlib.Path("results")
else:
result_dir = pathlib.Path(result_dir)
# Create directories if necessary
directory.mkdir(exist_ok=True, parents=True)
result_dir.mkdir(exist_ok=True, parents=True)
if render_projectfile:
ext = ".prj"
else:
ext = ".run"
runfilepath = directory / f"{self.modelname}{ext}"
results_runfilepath = result_dir / f"{self.modelname}{ext}"
# Where will the model run?
# Default is inputdir, next to runfile:
# in that case, resultdir is relative to inputdir
# If resultdir_is_workdir, inputdir is relative to resultdir
# render_dir is the inputdir that is printed in the runfile.
# result_dir is the resultdir that is printed in the runfile.
# caching_reldir is from where to check for files. This location
# is the same as the eventual model working dir.
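        # Illustrative example (hypothetical relative paths), with
        # directory="input" and result_dir="output":
        #   resultdir_is_workdir=False -> render_dir=".", result_dir="../output"
        #   resultdir_is_workdir=True  -> render_dir="../input", result_dir="."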
if resultdir_is_workdir:
caching_reldir = result_dir
if not directory.is_absolute():
render_dir = _relpath(directory, result_dir)
else:
render_dir = directory
result_dir = pathlib.Path(".")
else:
caching_reldir = directory
render_dir = pathlib.Path(".")
if not result_dir.is_absolute():
result_dir = _relpath(result_dir, directory)
return result_dir, render_dir, runfilepath, results_runfilepath, caching_reldir
def write(
self,
directory=pathlib.Path("."),
result_dir=None,
resultdir_is_workdir=False,
convert_to="mf2005_namfile",
):
"""
Writes model input files.
Parameters
----------
directory : str, pathlib.Path
Directory into which the model input will be written. The model
input will be written into a directory called modelname.
result_dir : str, pathlib.Path
Path to directory in which output will be written when running the
model. Is written as the value of the ``result_dir`` key in the
runfile. See the examples.
resultdir_is_workdir: boolean, optional
            Whether to set all input paths in the runfile relative to the output
directory. Because iMOD-wq generates a number of files in its
working directory, it may be advantageous to set the working
directory to a different path than the runfile location.
convert_to: str
The type of object to convert the projectfile to in the
configuration ini file. Should be one of ``["mf2005_namfile",
"mf6_namfile", "runfile"]``.
Returns
-------
None
Examples
--------
Say we wish to write the model input to a file called input, and we
desire that when running the model, the results end up in a directory
called output. We may run:
>>> model.write(directory="input", result_dir="output")
And in the ``config_run.ini``, a value of ``../../output`` will be
written for ``result_dir``. This ``config_run.ini`` has to be called
with iMOD 5 to convert the model projectfile to a Modflow 2005 namfile.
To specify a conversion to a runfile, run:
>>> model.write(directory="input", convert_to="runfile")
You can then run the following command to convert the projectfile to a runfile:
>>> path/to/iMOD5.exe ./input/config_run.ini
`Download iMOD5 here <https://oss.deltares.nl/web/imod/download-imod5>`_
"""
directory = pathlib.Path(directory)
allowed_conversion_settings = ["mf2005_namfile", "mf6_namfile", "runfile"]
if convert_to not in allowed_conversion_settings:
raise ValueError(
f"Got convert_setting: '{convert_to}', should be one of: {allowed_conversion_settings}"
)
        # Currently only projectfiles are supported; iMOD Python cannot yet
        # write a runfile directly.
# TODO: Add runfile support
render_projectfile = True
# TODO: Find a cleaner way to pack and unpack these paths
(
result_dir,
render_dir,
runfilepath,
results_runfilepath,
caching_reldir,
) = self._model_path_management(
directory, result_dir, resultdir_is_workdir, render_projectfile
)
directory = directory.resolve() # Force absolute paths
# TODO
# Check if any caching packages are present, and set necessary states.
# self._set_caching_packages(caching_reldir)
if self.check is not None:
self.package_check()
# TODO Necessary?
# Delete packages without data
# self._delete_empty_packages(verbose=True)
runfile_content = self.render(
directory=directory, render_projectfile=render_projectfile
)
# Start writing
# Write the runfile
with open(runfilepath, "w") as f:
f.write(runfile_content)
# Also write the runfile in the workdir
if resultdir_is_workdir:
with open(results_runfilepath, "w") as f:
f.write(runfile_content)
# Write iMOD TIM file
diskey = self._get_pkgkey("dis")
time_path = directory / f"{diskey}.tim"
self[diskey].save(time_path)
# Create and write INI file to configure conversion/simulation
ockey = self._get_pkgkey("oc")
bndkey = self._get_pkgkey("bnd")
nlayer = self[bndkey]["layer"].size
if ockey is None:
raise ValueError("No OutputControl was specified for the model")
else:
oc_configuration = self[ockey]._compose_oc_configuration(nlayer)
outfilepath = directory / runfilepath
RUNFILE_OPTIONS = {
"mf2005_namfile": {
"sim_type": 2,
"namfile_out": outfilepath.with_suffix(".nam"),
},
"runfile": {"sim_type": 1, "runfile_out": outfilepath.with_suffix(".run")},
"mf6_namfile": {
"sim_type": 3,
"namfile_out": outfilepath.with_suffix(".nam"),
},
}
conversion_settings = RUNFILE_OPTIONS[convert_to]
config = IniFile(
function="runfile",
prjfile_in=directory / runfilepath.name,
iss=1,
timfname=directory / time_path.name,
output_folder=result_dir,
**conversion_settings,
**oc_configuration,
)
config_content = config.render()
with open(directory / "config_run.ini", "w") as f:
f.write(config_content)
# Write all IDFs and IPFs
for pkgname, pkg in self.items():
if (
"x" in pkg.dataset.coords and "y" in pkg.dataset.coords
) or pkg._pkg_id in ["wel", "hfb"]:
try:
pkg.save(directory=directory / pkgname)
except Exception as error:
                    raise type(error)(
                        f"{error}\nAn error occurred while saving package: {pkgname}."
                    ) from error
def _check_top_bottom(self):
"""Check whether bottom of a layer does not exceed a top somewhere."""
basic_ids = ["top", "bot"]
topkey, botkey = [self._get_pkgkey(pkg_id) for pkg_id in basic_ids]
top, bot = [self[key] for key in (topkey, botkey)]
if (top["top"] < bot["bottom"]).any():
raise ValueError(
f"top should be larger than bottom in {topkey} and {botkey}"
)
def package_check(self):
bndkey = self._get_pkgkey("bnd")
active_cells = self[bndkey]["ibound"] != 0
self._check_top_bottom()
for pkg in self.values():
pkg._pkgcheck(active_cells=active_cells)