"""
Utilities for parsing an iMOD project file.
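
Example (an illustrative sketch; "model.prj" is a hypothetical file name)::

    content = read_projectfile("model.prj")
    data, repeat_stress = open_projectfile_data("model.prj")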
"""
import shlex
import textwrap
from collections import defaultdict
from dataclasses import asdict, dataclass
from dataclasses import field as data_field
from datetime import datetime
from itertools import chain
from os import PathLike
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
import xarray as xr
import imod
import imod.logging
from imod.logging.loglevel import LogLevel
FilePath = Union[str, "PathLike[str]"]
KEYS = {
"(bnd)": ("ibound",),
"(top)": ("top",),
"(bot)": ("bottom",),
"(thk)": ("thickness",),
"(khv)": ("kh",),
"(kva)": ("vertical_anisotropy",),
"(kdw)": ("transmissivity",),
"(kvv)": ("kv",),
"(vcw)": ("resistance",),
"(shd)": ("head",),
"(sto)": ("storage_coefficient",),
"(spy)": ("specific_yield",),
"(por)": ("porosity",),
"(ani)": ("factor", "angle"),
"(hfb)": ("gen",),
"(ibs)": (None),
"(pwt)": (None),
"(sft)": (None),
"(obs)": (None),
"(cbi)": (None),
"(sco)": (None),
"(dsp)": (None),
"(ics)": (None),
"(fcs)": (None),
"(ssc)": (None),
"(fod)": (None),
"(fos)": (None),
"(rct)": (None),
"(con)": (None),
"(pst)": (None),
}
DATE_KEYS = {
"(uzf)": (None,),
"(rch)": ("rate",),
"(evt)": ("rate", "surface", "depth"),
"(drn)": ("conductance", "elevation"),
"(olf)": ("elevation",),
"(riv)": ("conductance", "stage", "bottom_elevation", "infiltration_factor"),
"(isg)": ("isg",),
"(sfr)": ("isg",),
"(lak)": (None,),
"(wel)": ("ipf",),
"(mnw)": (None,),
"(ghb)": ("conductance", "head"),
"(chd)": ("head",),
"(fhb)": (None,),
"(fde)": (None,),
"(tvc)": (None,),
}
METASWAP_VARS = (
"boundary",
"landuse",
"rootzone_thickness",
"soil_physical_unit",
"meteo_station_number",
"surface_elevation",
"artificial_recharge",
"artificial_recharge_layer",
"artificial_recharge_capacity",
"wetted_area",
"urban_area",
"urban_ponding_depth",
"rural_ponding_depth",
"urban_runoff_resistance",
"rural_runoff_resistance",
"urban_runon_resistance",
"rural_runon_resistance",
"urban_infiltration_capacity",
"rural_infiltration_capacity",
"perched_water_table_level",
"soil_moisture_fraction",
"conductivitiy_factor",
"plot_number",
"steering_location",
"plot_drainage_level",
"plot_drainage_resistance",
)
class _LineIterator:
"""
    Like iter(lines), but we can go back and check whether we're finished.
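
    For example (illustrative):

    >>> it = _LineIterator([["a"], ["b"]])
    >>> next(it)
    ['a']
    >>> it.back()
    >>> next(it)
    ['a']
    >>> it.finished
    False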
"""
def __init__(self, lines: List[List[str]]):
self.lines = lines
self.count = -1
self.length = len(lines)
def __iter__(self):
return self
def __next__(self) -> List[str]:
if self.finished:
raise StopIteration
self.count += 1
return self.lines[self.count]
def back(self) -> None:
self.count = max(self.count - 1, -1)
@property
def finished(self) -> bool:
return (self.count + 1) >= self.length
def _tokenize(line: str) -> List[str]:
"""
A value separator in Fortran list-directed input is:
* A comma if period decimal edit mode is POINT.
* One or more contiguous spaces (blanks); no tabs.
Other remarks:
* Values, except for character strings, cannot contain blanks.
    * Strings may be unquoted if they do not start with a digit and contain
      no value separators.
* Character strings can be quoted strings, using pairs of quotes ("), pairs
of apostrophes (').
    * A quote or apostrophe must be preceded by a value separator to initiate
      a quoted string.
* An empty entry consists of two consecutive commas (or semicolons).
For the use here (parsing IMOD's project files), we ignore:
* A semicolon value separator if period decimal edit mode is COMMA.
* Complex constants given as two real constants separated by a comma and
enclosed in parentheses.
* Repetition counts: 4*(3.,2.) 2*, 4*'hello'
Furthermore, we do not expect commas inside of the project file entries,
since we expect:
* Package names: unquoted character strings.
    * File paths: will not contain commas, no single apostrophe, nor a single
      quote symbol, may contain whitespace if quoted.
    * Integers for counts and settings.
* Floats for addition and multiplication values.
* Simple character strings for period names (summer, winter). These
technically could contain commas if quoted, which is very unlikely.
* No quotes or apostrophes are escaped.
With these assumptions, we can limit complexity considerably (see the
PyLiDiRe link for a complete implementation):
    * First we split by comma (we do not expect commas in quoted strings).
* Next we split by whitespace, unless quoted.
We can expect both single and double quotes, even within a single line:
shlex.split() handles this. Note that additional entries are likely
allowed, as the Fortran implementation only reads what is necessary,
then stops parsing.
See also:
* https://stackoverflow.com/questions/36165050/python-equivalent-of-fortran-list-directed-input
* https://gitlab.com/everythingfunctional/PyLiDiRe
* https://docs.oracle.com/cd/E19957-01/805-4939/6j4m0vnc5/index.html
* The Fortran 2003 Handbook
Examples
--------
    Raises ValueError, due to a missing closing quotation. (This may be
    configurable via shlex.split(s, posix=False).)

    >> _tokenize("That's life")

    >> _tokenize("That 's life'")
    >> ["That", "s life"]

    >> _tokenize("That,'s life'")
    >> ["That", "s life"]
"""
values = [v.strip().replace("\\", "/") for v in line.split(",")]
tokens = list(chain.from_iterable(shlex.split(v) for v in values))
return tokens
def _wrap_error_message(
exception: Exception, description: str, lines: _LineIterator
) -> None:
lines.back()
content = next(lines)
number = lines.count + 1
raise type(exception)(
f"{exception}\n"
f"Failed to parse {description} for line {number} with content:\n{content}"
)
def _parse_blockheader(lines: _LineIterator) -> Tuple[int, str, str]:
try:
no_result = None, None, None
line = next(lines)
# Skip if it's an empty line.
if len(line) == 0:
return no_result
first = line[0].lower()
if first in ("periods", "species"):
return 1, first, None
        # The line must contain at least nper, key, active.
elif len(line) >= 3:
n = int(first)
key = line[1].lower()
active = line[2]
return n, key, active
# It's a comment or something.
else:
return no_result
except Exception as e:
_wrap_error_message(e, "block header", lines)
def _parse_time(lines: _LineIterator) -> str:
try:
line = next(lines)
date = line[0].lower()
if len(line) > 1:
time = line[1]
return f"{date} {time}"
else:
return date
except Exception as e:
_wrap_error_message(e, "date time", lines)
def _parse_blockline(
    lines: _LineIterator, time: Optional[str] = None
) -> Dict[str, Any]:
try:
line = next(lines)
content = {
"active": bool(int(line[0])),
"is_constant": int(line[1]),
"layer": int(line[2]),
"factor": float(line[3]),
"addition": float(line[4]),
"constant": float(line[5]),
}
if content["is_constant"] == 2:
content["path"] = Path(line[6]).resolve()
if time is not None:
content["time"] = time
return content
except Exception as e:
_wrap_error_message(e, "entries", lines)
def _parse_nsub_nsystem(lines: _LineIterator) -> Tuple[int, int]:
try:
line = next(lines)
n_entry = int(line[0])
n_system = int(line[1])
return n_entry, n_system
except Exception as e:
_wrap_error_message(e, "number of sub-entries and number of systems", lines)
def _parse_notimeblock(
lines: _LineIterator,
fields: List[str],
) -> Dict[str, Any]:
n_entry, n_system = _parse_nsub_nsystem(lines)
if len(fields) != n_entry:
raise ValueError(
f"Expected NSUB entry of {len(fields)} for {fields}, read: {n_entry}"
)
content = {
field: [_parse_blockline(lines) for _ in range(n_system)] for field in fields
}
content["n_system"] = n_system
return content
def _parse_capblock(
lines: _LineIterator,
) -> Dict[str, Any]:
fields = METASWAP_VARS
n_entry, n_system = _parse_nsub_nsystem(lines)
    if n_entry == 21:
        # Remove layer entry.
        fields = list(fields[:22])
        fields.pop(8)
elif n_entry == 22:
fields = fields[:22]
elif n_entry == 26:
pass
else:
raise ValueError(
f"Expected NSUB entry of 21, 22, or 26 for {fields}, read: {n_entry}"
)
content = {
field: [_parse_blockline(lines) for _ in range(n_system)] for field in fields
}
content["n_system"] = n_system
return content
def _parse_extrablock(lines: _LineIterator, n: int) -> Dict[str, List[str]]:
"""Parse the MetaSWAP "extra files" block"""
return {"paths": [next(lines) for _ in range(n)]}
def _parse_timeblock(
    lines: _LineIterator,
fields: List[str],
n: int,
) -> Dict[str, Any]:
n_fields = len(fields)
content = defaultdict(list)
for _ in range(n):
time = _parse_time(lines)
content["time"].append(time)
n_entry, n_system = _parse_nsub_nsystem(lines)
if n_fields != n_entry:
raise ValueError(
f"Expected NSUB entry of {n_fields} for {fields}, read: {n_entry}"
)
for field in fields:
content[field].extend(
[_parse_blockline(lines, time) for _ in range(n_system)]
)
content["n_system"] = n_system
return content
def _parse_pcgblock(lines: _LineIterator) -> Dict[str, Any]:
try:
line = next(lines)
# TODO: which are optional? How many to expect?
# Check for an empty line to terminate the block?
types = {
"mxiter": int,
"iter1": int,
"hclose": float,
"rclose": float,
"relax": float,
"npcond": int,
"iprpcg": int,
"mutpcg": int,
"damppcg": float,
"damppcgt": float,
"iqerror": int,
"qerror": float,
}
if len(line) == 12:
line_iterator = iter(line)
content = {
k: valuetype(next(line_iterator)) for k, valuetype in types.items()
}
elif any("=" in s for s in line):
pcglines = [line] + [next(lines) for _ in range(11)]
content = {}
for line in pcglines:
# undo separation, partition on equality sign instead.
line = "".join(line)
key, _, value = line.lower().partition("=")
value = types[key](value)
content[key] = value
else:
raise ValueError(
f"Expected 12 KEY = VALUE pairs, or 12 values. Found {len(line)}"
)
return content
except Exception as e:
_wrap_error_message(e, "PCG entry", lines)
def _parse_periodsblock(lines: _LineIterator) -> Dict[str, str]:
try:
periods = {}
while not lines.finished:
line = next(lines)
# Stop if we encounter an empty line.
if len(line) == 0:
break
# Read the alias
alias = line[0]
# Now read the time associated with it.
start = _parse_time(lines)
periods[alias] = start
return periods
except Exception as e:
_wrap_error_message(e, "periods data block", lines)
def _parse_speciesblock(lines: _LineIterator):
try:
species = {}
while not lines.finished:
line = next(lines)
# Stop if we encounter an empty line.
if len(line) == 0:
break
name, nr = line
nr = int(nr)
species[nr] = name
return species
except Exception as e:
_wrap_error_message(e, "species entry", lines)
def _parse_block(lines: _LineIterator, content: Dict[str, Any]) -> None:
"""
Mutates content dict.
"""
n = key = active = None
# A project file may contain any number of lines outside of a "topic"
# block. _parse_blockheader will return triple None in that case.
while key is None and not lines.finished:
n, key, active = _parse_blockheader(lines)
try:
if key in KEYS:
if n != 1:
raise ValueError(f"Expected N=1 for {key}, read: {n}")
fields = KEYS[key]
blockcontent = _parse_notimeblock(lines, fields)
elif key in DATE_KEYS:
fields = DATE_KEYS[key]
blockcontent = _parse_timeblock(lines, fields, n)
elif key == "(cap)":
blockcontent = _parse_capblock(lines)
elif key == "(pcg)":
blockcontent = _parse_pcgblock(lines)
elif key == "periods":
blockcontent = _parse_periodsblock(lines)
elif key == "species":
blockcontent = _parse_speciesblock(lines)
elif key == "extra":
blockcontent = _parse_extrablock(lines, n)
else:
other = ("(pcg)", "(gcg)", "(vdf)")
options = tuple(KEYS.keys()) + tuple(DATE_KEYS.keys()) + other
lines.back()
line = next(lines)
number = lines.count + 1
raise ValueError(
f"Failed to recognize header keyword: {key}. Expected one of keywords {options}"
f"\nErrored in line {number} with entries:\n{line}"
)
except Exception as e:
raise type(e)(f"{e}\nError occurred for keyword: {key}")
if blockcontent is not None and active is not None:
blockcontent["active"] = active
content[key] = blockcontent
return
def _process_package_entry(entry: Dict):
"""
The iMOD project file supports constants in lieu of IDFs.
"""
coords = {"layer": entry["layer"]}
dims = ("layer",)
if "path" not in entry:
path = None
header = {"coords": coords}
value = entry["constant"]
else:
path = entry["path"]
header = imod.idf.header(path, pattern="{name}")
value = None
header["dims"] = dims
return path, header, value
def _merge_coords(headers: List[Dict[str, Any]]) -> Dict[str, np.ndarray]:
coords = defaultdict(list)
for header in headers:
for key, value in header["coords"].items():
coords[key].append(value)
return {k: np.unique(coords[k]) for k in coords}
def _try_read_with_func(func, path, *args, **kwargs):
try:
return func(path, *args, **kwargs)
except Exception as e:
raise type(e)(f"{e}. Error thrown while opening file: {path}")
def _get_array_transformation_parameters(
headers: List[Dict[str, Any]], key: str, dim: str
) -> Union[xr.DataArray, float]:
"""
    In iMOD5 prj files, one can add linear transformation parameters to
    transform the data read from an IDF file: a multiplication factor and a
    constant that is added to the values. The factor and addition parameters
    can be scalar (if applied to a single IDF), or they can be xr.DataArrays
    if the factor and addition are, for example, layer- or time-dependent
    (if both, we apply the transformations one at a time).

Parameters
----------
headers: List[Dict[str, Any]]
prj-file lines which we want to import, serialized as a dictionary.
key: str
specifies the name of the transformation parameter in the idf file.
Usually "factor" or "addition"
dim: str
the name of the dimension over which transformation parameters are
        expected to differ for the current import. Usually "time" or "layer".
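
    For example (an illustrative sketch), layer-dependent factors yield a
    DataArray along "layer"::

        headers = [
            {"layer": 1, "factor": 2.0, "addition": 0.0},
            {"layer": 2, "factor": 0.5, "addition": 0.0},
        ]
        _get_array_transformation_parameters(headers, "factor", "layer")
        # -> xr.DataArray([2.0, 0.5], dims=("layer",), coords={"layer": [1, 2]})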
"""
if dim in headers[0].keys():
return xr.DataArray(
data=[header[key] for header in headers],
dims=(dim,),
coords={dim: [header[dim] for header in headers]},
)
else:
return headers[0][key]
def _create_dataarray_from_paths(
paths: List[str], headers: List[Dict[str, Any]], dim: str
) -> xr.DataArray:
factor = _get_array_transformation_parameters(headers, "factor", dim)
addition = _get_array_transformation_parameters(headers, "addition", dim)
da = _try_read_with_func(
imod.formats.array_io.reading._load,
paths,
use_cftime=False,
_read=imod.idf._read,
headers=headers,
)
# Ensure factor and addition do not have more dimensions than da
if isinstance(factor, xr.DataArray):
missing_dims = set(factor.dims) - set(da.dims)
if missing_dims:
factor = factor.isel({d: 0 for d in missing_dims}, drop=True)
addition = addition.isel({d: 0 for d in missing_dims}, drop=True)
return da * factor + addition
def _create_dataarray_from_values(
values: List[float], headers: List[Dict[str, Any]], dim: str
):
factor = _get_array_transformation_parameters(headers, "factor", dim)
addition = _get_array_transformation_parameters(headers, "addition", dim)
coords = _merge_coords(headers)
firstdims = headers[0]["dims"]
shape = [len(coord) for coord in coords.values()]
da = xr.DataArray(np.reshape(values, shape), dims=firstdims, coords=coords)
return da * factor + addition
def _create_dataarray(
paths: List[str], headers: List[Dict[str, Any]], values: List[float], dim: str
) -> xr.DataArray:
"""
Create a DataArray from a list of IDF paths, or from constant values.
There are mixed cases possible, where some of the layers or stress periods
contain only a single constant value, and the others are specified as IDFs.
In that case, we cannot do a straightforward concatenation.
"""
values_valid = []
paths_valid = []
headers_paths = []
headers_values = []
for path, header, value in zip(paths, headers, values):
if path is None:
headers_values.append(header)
values_valid.append(value)
else:
headers_paths.append(header)
paths_valid.append(path)
if paths_valid and values_valid:
# Both lists contain entries: mixed case.
dap = _create_dataarray_from_paths(paths_valid, headers_paths, dim=dim)
dav = _create_dataarray_from_values(values_valid, headers_values, dim=dim)
dap.name = "tmp"
dav.name = "tmp"
da = xr.merge((dap, dav), join="outer")["tmp"]
elif paths_valid:
# Only paths provided
da = _create_dataarray_from_paths(paths_valid, headers_paths, dim=dim)
elif values_valid:
# Only scalar values provided
da = _create_dataarray_from_values(values_valid, headers_values, dim=dim)
return da
def _open_package_idf(
block_content: Dict[str, Any], variables: Sequence[str]
) -> list[dict[str, xr.DataArray]]:
das = {}
for variable in variables:
variable_content = block_content[variable]
paths = []
headers = []
values = []
for entry in variable_content:
path, header, value = _process_package_entry(entry)
header["name"] = variable
header["dims"] = ["layer"]
header["layer"] = entry["layer"]
header["addition"] = entry["addition"]
header["factor"] = entry["factor"]
paths.append(path)
headers.append(header)
values.append(value)
das[variable] = _create_dataarray(paths, headers, values, dim="layer")
return [das]
def _process_time(time: str, yearfirst: bool = True):
if time == "steady-state":
time = None
else:
if yearfirst:
if len(time) == 19:
time = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
elif len(time) == 10:
time = datetime.strptime(time, "%Y-%m-%d")
else:
raise ValueError(
f"time data {time} does not match format "
'"%Y-%m-%d %H:%M:%S" or "%Y-%m-%d"'
)
else:
if len(time) == 19:
time = datetime.strptime(time, "%d-%m-%Y %H:%M:%S")
elif len(time) == 10:
time = datetime.strptime(time, "%d-%m-%Y")
else:
raise ValueError(
f"time data {time} does not match format "
'"%d-%m-%Y %H:%M:%S" or "%d-%m-%Y"'
)
return time
def _process_boundary_condition_entry(entry: Dict, periods: Dict[str, datetime]):
"""
The iMOD project file supports constants in lieu of IDFs.
Also process repeated stress periods (on a yearly basis): substitute the
original date here.
"""
coords = {}
timestring = entry["time"]
# Resolve repeating periods first:
time = periods.get(timestring)
if time is not None:
repeat = time
else:
# this resolves e.g. "steady-state"
time = _process_time(timestring)
repeat = None
if time is None:
dims = ()
else:
dims = ("time",)
coords["time"] = time
# 0 signifies that the layer must be determined on the basis of
# bottom elevation and stage.
layer = entry["layer"]
if layer <= 0:
        layer = None
else:
coords["layer"] = layer
dims = dims + ("layer",)
if "path" not in entry:
path = None
header = {"coords": coords}
value = entry["constant"]
else:
path = entry["path"]
header = imod.idf.header(path, pattern="{name}")
value = None
header["addition"] = entry["addition"]
header["factor"] = entry["factor"]
header["dims"] = dims
if layer is not None:
header["layer"] = layer
if time is not None:
header["time"] = time
return path, header, value, repeat
def _open_boundary_condition_idf(
block_content, variables, periods: Dict[str, datetime]
) -> Tuple[List[Dict[str, xr.DataArray]], List[datetime]]:
"""
Read the variables specified from block_content.
"""
n_system = block_content["n_system"]
n_time = len(block_content["time"])
n_total = n_system * n_time
das = [{} for _ in range(n_system)]
for variable in variables:
variable_content = block_content[variable]
n = len(variable_content)
if n != n_total:
raise ValueError(
f"Expected n_time * n_system = {n_time} * {n_system} = "
f"{n_total} entries for variable {variable}. Received: {n}"
)
# Group the paths and headers by system.
system_paths = defaultdict(list)
system_headers = defaultdict(list)
system_values = defaultdict(list)
all_repeats = set()
for i, entry in enumerate(variable_content):
path, header, value, repeat = _process_boundary_condition_entry(
entry, periods
)
header["name"] = variable
key = i % n_system
system_paths[key].append(path)
system_headers[key].append(header)
system_values[key].append(value)
if repeat:
all_repeats.add(repeat)
# Concat one system at a time.
for i, (paths, headers, values) in enumerate(
zip(system_paths.values(), system_headers.values(), system_values.values())
):
das[i][variable] = _create_dataarray(paths, headers, values, dim="time")
repeats = sorted(all_repeats)
return das, repeats
def _read_package_gen(
block_content: Dict[str, Any], has_topbot: bool
) -> List[Dict[str, Any]]:
out = []
for entry in block_content["gen"]:
gdf = imod.gen.read(entry["path"])
if has_topbot:
gdf["resistance"] = entry["factor"] * entry["addition"]
else:
gdf["multiplier"] = entry["factor"] * entry["addition"]
d = {
"geodataframe": gdf,
"layer": entry["layer"],
}
out.append(d)
return out
IPF_LOG_MESSAGE_TEMPLATE = """\
A well with the same x, y, id, filter_top and
filter_bot was already imported. This happened at x =
{row[1]}, y = {row[2]}, id = {path_assoc.stem}. Now the
    ID for this new well was appended with the suffix _{suffix}.
"""
@dataclass
class IpfResult:
has_associated: bool = data_field(default_factory=bool)
dataframe: list[pd.DataFrame] = data_field(default_factory=list)
layer: list[int] = data_field(default_factory=list)
time: list[str] = data_field(default_factory=list)
factor: list[float] = data_field(default_factory=list)
addition: list[float] = data_field(default_factory=list)
def append(
self,
dataframe: pd.DataFrame,
layer: int,
time: str,
factor: float,
addition: float,
):
self.dataframe.append(dataframe)
self.layer.append(layer)
self.time.append(time)
self.factor.append(factor)
self.addition.append(addition)
def _process_ipf_time(
entry: Dict[str, Any], periods: Dict[str, datetime], repeats: list[str]
) -> tuple[Optional[str], list[str]]:
timestring = entry["time"]
time = periods.get(timestring)
if time is None:
time = _process_time(timestring)
else:
repeats.append(time)
return time, repeats
def _prepare_df_unassociated(ipf_df: pd.DataFrame, layer: int) -> pd.DataFrame:
columns = ("x", "y", "rate")
if layer <= 0:
df = ipf_df.iloc[:, :5]
columns = columns + ("filt_top", "filt_bot")
else:
df = ipf_df.iloc[:, :3]
df.columns = columns
return df
def _prepare_df_associated(
ipf_df: pd.DataFrame,
imported_wells: dict[tuple, int],
indexcol: int,
path: Path,
ext: str,
) -> pd.DataFrame:
dfs = []
for row in ipf_df.itertuples():
filename = row[indexcol]
path_assoc = path.parent.joinpath(f"{filename}.{ext}")
well_characteristics_dict = {
"x": row[1],
"y": row[2],
"id": path_assoc.stem,
"filt_top": row[4],
"filt_bot": row[5],
}
df_assoc = _try_read_with_func(imod.ipf.read_associated, path_assoc).iloc[:, :2]
df_assoc.columns = ["time", "rate"]
df_assoc = df_assoc.assign(**well_characteristics_dict)
well_characteristics = tuple(well_characteristics_dict.values())
if well_characteristics not in imported_wells.keys():
imported_wells[well_characteristics] = 0
else:
suffix = imported_wells[well_characteristics] + 1
imported_wells[well_characteristics] = suffix
df_assoc["id"] = df_assoc["id"] + f"_{suffix}"
log_message = textwrap.dedent(
IPF_LOG_MESSAGE_TEMPLATE.format(
row=row, path_assoc=path_assoc, suffix=suffix
)
)
imod.logging.logger.log(
loglevel=LogLevel.WARNING,
message=log_message,
additional_depth=2,
)
dfs.append(df_assoc)
return pd.concat(dfs, ignore_index=True, sort=False)
def _read_package_ipf(
block_content: Dict[str, Any], periods: Dict[str, datetime]
) -> Tuple[Dict[str, Dict], List[datetime]]:
out = defaultdict(IpfResult)
repeats = []
    # We store in this dict the tuples of (x, y, id, well_top, well_bot),
    # which should be unique for each well, mapped to a duplicate counter.
imported_wells = {}
for entry in block_content["ipf"]:
time, repeats = _process_ipf_time(entry, periods, repeats)
layer = entry["layer"]
factor = entry["factor"]
addition = entry["addition"]
# Ensure the columns are identifiable.
path = Path(entry["path"])
ipf_df, indexcol, ext = _try_read_with_func(imod.ipf._read_ipf, path)
if indexcol == 0:
# No associated files
has_associated = False
df = _prepare_df_unassociated(ipf_df, layer)
else:
has_associated = True
df = _prepare_df_associated(ipf_df, imported_wells, indexcol, path, ext)
df["rate"] = df["rate"] * factor + addition
out[path.stem].has_associated = has_associated
out[path.stem].append(df, layer, time, factor, addition)
out_dict_ls: dict[str, dict] = {key: asdict(o) for key, o in out.items()}
repeats = sorted(repeats)
return out_dict_ls, repeats
def read_projectfile(path: FilePath) -> Dict[str, Any]:
"""
Read an iMOD project file into a collection of nested dictionaries.
    The top-level keys are the "topic" entries such as "bnd" or "riv" in the
project file. An example structure of the dictionaries is visualized below:
.. code-block::
content
├── bnd
│ ├── active: bool
│ └── ibound: list of dictionaries for each layer
├── riv
│ ├── active: bool
│ ├── conductance: list of dictionaries for each time and layer.
│ ├── stage: idem.
│ ├── bottom_elevation: idem.
│ └── infiltration_factor: idem.
etc.
Time and layer are flattened into a single list and time is included in
every dictionary:
.. code-block::
stage
├── 0 # First entry in list
│ ├── active: bool
│ ├── is_constant: bool
│ ├── layer: int
│ ├── factor: float
│ ├── addition: float
│ ├── constant: float
│ ├── path: str
│ └── time: str
│
├── 1 # Second entry in list
│ ├── etc.
etc.
Parameters
----------
path: str or Path
Returns
-------
content: Dict[str, Any]
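
    Examples
    --------
    Illustrative usage ("model.prj" is a hypothetical file name):

    >>> content = read_projectfile("model.prj")  # doctest: +SKIP
    >>> content["(riv)"]["stage"][0]["path"]  # doctest: +SKIP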
"""
# Force to Path
path = Path(path)
with open(path) as f:
lines = f.readlines()
tokenized = []
    for i, line in enumerate(lines, start=1):
try:
tokenized.append(_tokenize(line))
except Exception as e:
raise type(e)(f"{e}\nError occurred in line {i}")
lines = _LineIterator(tokenized)
content = {}
wdir = path.parent
    # Temporarily change directory to the projectfile directory to resolve
    # relative paths.
with imod.util.cd(wdir):
while not lines.finished:
_parse_block(lines, content)
return content
def _is_var_ipf_and_path(block_content: dict[str, Any], var: str):
block_item = block_content[var]
path = block_item[0]["path"]
is_ipf = path.suffix.lower() == ".ipf"
return is_ipf, path
def open_projectfile_data(path: FilePath) -> tuple[dict[str, Any], dict[str, Any]]:
"""
Read the contents of an iMOD project file and read/open the data present in
it:
* IDF data is lazily loaded into xarray.DataArrays.
* GEN data is eagerly loaded into geopandas.GeoDataFrames
* IPF data is eagerly loaded into pandas.DataFrames
* Non-file based entries (such as the PCG settings) are kept as a dictionary.
When multiple systems are present, they are numbered starting from one, e.g.:
* drn-1
* drn-2
Xarray requires valid dates for the time coordinate. Aliases such as
"summer" and "winter" that are associated with dates in the project file
Periods block cannot be used in the time coordinate. Hence, this function
will instead insert the dates associated with the aliases, with the year
replaced by 1899; as the iMOD calendar starts at 1900, this ensures that
the repeats are always first and that no date collisions will occur.
Parameters
----------
path: pathlib.Path or str.
Returns
-------
    data: Dict[str, Any]
        Keys are the iMOD project file "topics", without parentheses.
    repeat_stress: Dict[str, Any]
        Keys are the same as in ``data`` (where repeats are present); values
        are the dates associated with repeated stress periods (period
        aliases).
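
    Examples
    --------
    Illustrative usage ("model.prj" is a hypothetical file name):

    >>> data, repeat_stress = open_projectfile_data("model.prj")  # doctest: +SKIP
    >>> data["bnd"]["ibound"]  # doctest: +SKIP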
"""
content = read_projectfile(path)
periods_block = content.pop("periods", None)
if periods_block is None:
periods = {}
else:
# Set the year of a repeat date to 1899: this ensures it falls outside
# of the iMOD calendar. Collisions are then always avoided.
periods = {
key: _process_time(time, yearfirst=False).replace(year=1899)
for key, time in periods_block.items()
}
    # Pop the species block; at the moment we do not do much with it,
    # since most regional models do not include solute transport.
content.pop("species", None)
has_topbot = "(top)" in content and "(bot)" in content
prj_data = {}
repeat_stress = {}
for key, block_content in content.items():
repeats = None
try:
if key == "(hfb)":
data = _read_package_gen(block_content, has_topbot)
elif key == "(wel)":
data, repeats = _read_package_ipf(block_content, periods)
elif key == "(cap)":
maybe_ipf_var = "artificial_recharge_layer"
is_ipf, maybe_ipf_path = _is_var_ipf_and_path(
block_content, maybe_ipf_var
)
                # If it's an IPF, drop it and manually add it by calling the
                # ipf.read function. If it's not an IPF, then it's an IDF,
                # which doesn't need any special treatment.
if is_ipf:
block_content.pop(maybe_ipf_var)
variables = set(METASWAP_VARS).intersection(block_content.keys())
data = _open_package_idf(block_content, variables)
# Read and reattach ipf data to package data.
if is_ipf:
data[0][maybe_ipf_var] = imod.ipf.read(maybe_ipf_path)
elif key in ("extra", "(pcg)"):
data = [block_content]
elif key in KEYS:
variables = KEYS[key]
data = _open_package_idf(block_content, variables)
elif key in DATE_KEYS:
variables = DATE_KEYS[key]
data, repeats = _open_boundary_condition_idf(
block_content, variables, periods
)
else:
raise KeyError(f"Unsupported key: '{key}'")
except Exception as e:
raise type(e)(f"{e}. Errored while opening/reading data entries for: {key}")
strippedkey = key.strip("(").strip(")")
if strippedkey == "wel":
for key, d in data.items():
named_key = f"{strippedkey}-{key}"
prj_data[named_key] = d
repeat_stress[named_key] = repeats
elif len(data) > 1:
for i, da in enumerate(data):
numbered_key = f"{strippedkey}-{i + 1}"
prj_data[numbered_key] = da
repeat_stress[numbered_key] = repeats
else:
prj_data[strippedkey] = data[0]
repeat_stress[strippedkey] = repeats
repeat_stress = {k: v for k, v in repeat_stress.items() if v}
return prj_data, repeat_stress
def read_timfile(path: FilePath) -> List[Dict]:
def parsetime(time: str) -> datetime:
# Check for steady-state:
if time == "00000000000000":
return None
return datetime.strptime(time, "%Y%m%d%H%M%S")
with open(path, "r") as f:
lines = f.readlines()
# A line contains 2, 3, or 4 values:
# time, isave, nstp, tmult
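    # For example (illustrative), the line "20000101000000,1,1,1.0" yields:
    # {"time": datetime(2000, 1, 1), "save": True, "n_timestep": 1,
    #  "timestep_multiplier": 1.0}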
casters = {
"time": parsetime,
"save": lambda x: bool(int(x)),
"n_timestep": int,
"timestep_multiplier": float,
}
content = []
for line in lines:
stripped = line.strip()
if stripped == "":
continue
parts = stripped.split(",")
entry = {k: cast(s.strip()) for s, (k, cast) in zip(parts, casters.items())}
content.append(entry)
return content