"""
Functions for reading and writing iMOD Point Files (IPFs) to and from ``pandas.DataFrame``.
The primary functions to use are :func:`imod.ipf.read` and
:func:`imod.ipf.save`, though lower level functions are also available.
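
For example, a minimal round trip (file names are illustrative):

>>> import imod
>>> df = imod.ipf.read("wells.ipf")
>>> imod.ipf.save("wells-out.ipf", df)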
"""
import collections
import csv
import glob
import io
import pathlib
import warnings
from typing import Tuple
import numpy as np
import pandas as pd
import imod
from imod.util.time import to_pandas_datetime_series
def _infer_delimwhitespace(line, ncol):
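    """
    Sniff a single data line to decide whether it is whitespace delimited:
    if splitting on commas yields a single field, assume whitespace; if it
    yields ``ncol`` fields, assume commas. Warns and falls back to comma
    delimited when the field count matches neither.
    """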
n_elem = len(next(csv.reader([line])))
if n_elem == 1:
return True
elif n_elem == ncol:
return False
else:
warnings.warn(
f"Inconsistent IPF: header states {ncol} columns, first line contains {n_elem}"
)
return False
def _read_ipf(path, kwargs=None) -> Tuple[pd.DataFrame, int, str]:
path = pathlib.Path(path)
if kwargs is None:
kwargs = {}
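    # An IPF file starts with a small header followed by the point records.
    # For example (values illustrative):
    #
    #   2           <- number of records (nrow)
    #   3           <- number of columns (ncol)
    #   x
    #   y
    #   id
    #   3,txt       <- index (1-based) of the id column, extension of associated files
    #   100.0,200.0,well-1
    #   150.0,250.0,well-2
    #
    # An index of 0 means there are no associated files. Both comma and
    # whitespace delimited data lines occur; the delimiter is sniffed below.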
with open(path) as f:
nrow = int(f.readline().strip())
ncol = int(f.readline().strip())
colnames = [f.readline().strip().strip("'").strip('"') for _ in range(ncol)]
line = f.readline()
try:
            # csv.reader parses one line
# this catches commas in quotes
indexcol, ext = map(str.strip, next(csv.reader([line])))
except ValueError: # then try whitespace delimited
indexcol, ext = map(str.strip, next(csv.reader([line], delimiter=" ")))
position = f.tell()
line = f.readline()
delim_whitespace = _infer_delimwhitespace(line, ncol)
f.seek(position)
ipf_kwargs = {
"delim_whitespace": delim_whitespace,
"header": None,
"names": colnames,
"nrows": nrow,
"skipinitialspace": True,
}
ipf_kwargs.update(kwargs)
df = pd.read_csv(f, **ipf_kwargs)
return df, int(indexcol), ext
def _read(path, kwargs=None, assoc_kwargs=None):
"""
Read one IPF file to a single pandas.DataFrame, including associated (TXT) files.
Parameters
----------
path: pathlib.Path or str
        Path to the IPF file to read.
kwargs : dict
Dictionary containing the ``pandas.read_csv()`` keyword arguments for the
IPF files (e.g. `{"delim_whitespace": True}`)
assoc_kwargs: dict
Dictionary containing the ``pandas.read_csv()`` keyword arguments for the
associated (TXT) files (e.g. `{"delim_whitespace": True}`)
Returns
-------
pandas.DataFrame
"""
df, indexcol, ext = _read_ipf(path, kwargs)
if assoc_kwargs is None:
assoc_kwargs = {}
# See if reading associated files is necessary
if indexcol > 1:
colnames = df.columns
# df = pd.read_csv(f, header=None, names=colnames, nrows=nrow, **kwargs)
dfs = []
for row in df.itertuples():
filename = row[indexcol]
            # associated paths are relative to the ipf
path_assoc = path.parent.joinpath(f"{filename}.{ext}")
# Note that these kwargs handle all associated files, which might differ
# within an IPF. If this happens we could consider supporting a dict
# or function that maps assoc filenames to different kwargs.
try: # Capture the error and print the offending path
df_assoc = read_associated(path_assoc, assoc_kwargs)
except Exception as e:
raise type(e)(
f'{e}\nWhile reading associated file "{path_assoc}" of IPF file "{path}"'
) from e
# Include records of the "mother" ipf file.
for name, value in zip(colnames, row[1:]): # ignores df.index in row
df_assoc[name] = value
# Append to list
dfs.append(df_assoc)
# Merge into a single whole
df = pd.concat(dfs, ignore_index=True, sort=False)
return df
def read_associated(path, kwargs={}):
"""
Read an IPF associated file (TXT).
Parameters
----------
path : pathlib.Path or str
Path to associated file.
kwargs : dict
Dictionary containing the ``pandas.read_csv()`` keyword arguments for the
associated (TXT) file (e.g. `{"delim_whitespace": True}`).
Returns
-------
pandas.DataFrame
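
    Examples
    --------
    Read a single associated timeseries file (file name is illustrative):

    >>> df = imod.ipf.read_associated("well-1.txt")

    Force whitespace delimited parsing:

    >>> df = imod.ipf.read_associated("well-1.txt", {"delim_whitespace": True})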
"""
# deal with e.g. incorrect capitalization
path = pathlib.Path(path).resolve()
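    # An associated file consists of a small header followed by the records.
    # For a timeseries (itype 1) it looks like, for example (values illustrative):
    #
    #   2                <- number of records (nrow)
    #   2,1              <- number of columns (ncol), itype
    #   time,-9999.0     <- column name, nodata value
    #   head,-9999.0
    #   20200101,1.5
    #   20200102,-9999.0
    #
    # The itype may be omitted from the second line, in which case a
    # timeseries (itype 1) is assumed.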
with open(path) as f:
nrow = int(f.readline().strip())
line = f.readline()
try:
            # csv.reader parses one line
# this catches commas in quotes
ncol, itype = map(int, map(str.strip, next(csv.reader([line]))))
# itype can be implicit, in which case it's a timeseries
except ValueError:
try:
ncol = int(line.strip())
itype = 1
except ValueError: # then try whitespace delimited
ncol, itype = map(
int, map(str.strip, next(csv.reader([line], delimiter=" ")))
)
# use pandas for csv parsing: stuff like commas within quotes
        # this is a workaround for a pandas bug; a probably related issue:
# https://github.com/pandas-dev/pandas/issues/19827#issuecomment-398649163
lines = [f.readline() for _ in range(ncol)]
delim_whitespace = _infer_delimwhitespace(lines[0], 2)
# Normally, this ought to work:
# metadata = pd.read_csv(f, header=None, nrows=ncol).values
# TODO: replace when bugfix is released
        # try both comma and whitespace delimited; everything can be mixed
# in a single file...
lines = "".join(lines)
        # TODO: find out whether this can be replaced by csv.reader;
        # the challenge lies in replacing pd.notnull for nodata values.
        # csv.reader is otherwise quite a bit faster for such a header block.
metadata_kwargs = {
"delim_whitespace": delim_whitespace,
"header": None,
"nrows": ncol,
"skipinitialspace": True,
}
metadata_kwargs.update(kwargs)
metadata = pd.read_csv(io.StringIO(lines), **metadata_kwargs)
# header description possibly includes nodata
usecols = np.arange(ncol)[pd.notnull(metadata[0])]
metadata = metadata.iloc[usecols, :]
# Collect column names and nodata values
colnames = []
na_values = collections.OrderedDict()
for colname, nodata in metadata.values:
na_values[colname] = [nodata, "-"] # "-" seems common enough to ignore
if isinstance(colname, str):
colnames.append(colname.strip())
else:
colnames.append(colname)
# Sniff the first line of the data block
position = f.tell()
line = f.readline()
f.seek(position)
delim_whitespace = _infer_delimwhitespace(line, ncol)
itype_kwargs = {
"delim_whitespace": delim_whitespace,
"header": None,
"names": colnames,
"usecols": usecols,
"nrows": nrow,
"na_values": na_values,
"skipinitialspace": True,
}
if itype == 1: # Timevariant information: timeseries
# check if first column is time in [yyyymmdd] or [yyyymmddhhmmss]
itype_kwargs["dtype"] = {colnames[0]: str}
elif itype == 2: # 1D borehole
# enforce first column is a float
itype_kwargs["dtype"] = {colnames[0]: np.float64}
elif itype == 3: # cpt
# all columns must be numeric
itype_kwargs["dtype"] = {colname: np.float64 for colname in colnames}
elif itype == 4: # 3D borehole
# enforce first 3 columns are float
itype_kwargs["dtype"] = {
colnames[0]: np.float64,
colnames[1]: np.float64,
colnames[2]: np.float64,
}
itype_kwargs.update(kwargs)
df = pd.read_csv(f, **itype_kwargs)
if nrow > 0 and itype == 1:
time_column = colnames[0]
df[time_column] = to_pandas_datetime_series(df[time_column])
return df
def read(path, kwargs={}, assoc_kwargs={}):
"""
Read one or more IPF files to a single pandas.DataFrame, including associated
(TXT) files.
The different IPF files can be from different model layers,
and column names may differ between them.
Note that this function always returns a ``pandas.DataFrame``. IPF files
always contain spatial information, for which ``geopandas.GeoDataFrame``
is a better fit, in principle. However, GeoDataFrames are not the best fit
for the associated data.
To perform spatial operations on the points, you're likely best served by
(temporarily) creating a GeoDataFrame, doing the spatial operation, and
then using the output to select values in the original DataFrame. Please
refer to the examples.
Parameters
----------
path: str, Path or list
This can be a single file, 'wells_l1.ipf', a glob pattern expansion,
'wells_l*.ipf', or a list of files, ['wells_l1.ipf', 'wells_l2.ipf'].
Note that each file needs to have the same columns, such that they can
be combined in a single pd.DataFrame.
kwargs : dict
Dictionary containing the ``pandas.read_csv()`` keyword arguments for the
IPF files (e.g. `{"delim_whitespace": True}`)
assoc_kwargs: dict
Dictionary containing the ``pandas.read_csv()`` keyword arguments for the
associated (TXT) files (e.g. `{"delim_whitespace": True}`)
Returns
-------
pandas.DataFrame
Examples
--------
Read an IPF file into a dataframe:
>>> import imod
>>> df = imod.ipf.read("example.ipf")
Convert the x and y data into a GeoDataFrame, do a spatial operation, and
use it to select points within a polygon.
Note: ``gpd.points_from_xy()`` requires a geopandas version >= 0.5.
>>> import geopandas as gpd
>>> polygon = gpd.read_file("polygon.shp").geometry[0]
>>> ipf_points = gpd.GeoDataFrame(geometry=gpd.points_from_xy(df["x"], df["y"]))
>>> within_polygon = ipf_points.within(polygon)
>>> selection = df[within_polygon]
The same exercise is a little more complicated when associated files (like
timeseries) are involved, since many duplicate values of x and y will exist.
    The easiest way to isolate these is by applying a groupby, and then taking
    the first x and y of every group:
>>> df = imod.ipf.read("example_with_time.ipf")
>>> first = df.groupby("id").first() # replace "id" by what your ID column is called
>>> x = first["x"]
>>> y = first["y"]
    >>> id_code = first.index  # "id" shadows a Python builtin
>>> ipf_points = gpd.GeoDataFrame(geometry=gpd.points_from_xy(x, y))
>>> within_polygon = ipf_points.within(polygon)
Using the result is a little more complicated as well, since it has to be
mapped back to many duplicate values of the original dataframe.
There are two options. First, by using the index:
>>> within_polygon.index = id_code
>>> df = df.set_index("id")
>>> selection = df[within_polygon]
If you do not wish to change index on the original dataframe, use
``pandas.DataFrame.merge()`` instead.
>>> import pandas as pd
>>> within_polygon = pd.DataFrame({"within": within_polygon})
>>> within_polygon["id"] = id_code
>>> df = df.merge(within_polygon, on="id")
>>> df = df[df["within"]]
"""
if isinstance(path, list):
paths = path
elif isinstance(path, (str, pathlib.Path)):
        # convert to str, since Path.glob does not support non-relative patterns
path = str(path)
paths = [pathlib.Path(p) for p in glob.glob(path)]
else:
raise ValueError("Path should be either a list, str or pathlib.Path")
n = len(paths)
if n == 0:
raise FileNotFoundError(f"Could not find any files matching {path}")
elif n == 1:
bigdf = _read(paths[0], kwargs, assoc_kwargs)
else:
dfs = []
for p in paths:
layer = imod.util.path.decompose(p).get("layer")
try:
df = _read(p, kwargs, assoc_kwargs)
except Exception as e:
raise type(e)(f'{e}\nWhile reading IPF file "{p}"') from e
if layer is not None:
df["layer"] = layer
dfs.append(df)
bigdf = pd.concat(
dfs, ignore_index=True, sort=False
) # this sorts in pandas < 0.23
return bigdf
def _coerce_itype(itype):
"""Changes string itype to int"""
if itype in [None, 1, 2, 3, 4]:
pass
elif itype.lower() == "timeseries":
itype = 1
elif itype.lower() == "borehole1d":
itype = 2
elif itype.lower() == "cpt":
itype = 3
elif itype.lower() == "borehole3d":
itype = 4
else:
raise ValueError("Invalid IPF itype")
return itype
def _lower(colnames):
"""Lowers colnames, checking for uniqueness"""
lowered_colnames = [s.lower() for s in colnames]
if len(set(lowered_colnames)) != len(colnames):
seen = set()
for name in lowered_colnames:
if name in seen:
raise ValueError(f'Column name "{name}" is not unique, after lowering.')
else:
seen.add(name)
return lowered_colnames
def write_assoc(path, df, itype=1, nodata=1.0e20, assoc_columns=None):
"""
Writes a single IPF associated (TXT) file.
Parameters
----------
path : pathlib.Path or str
Path for the written associated file.
df : pandas.DataFrame
DataFrame containing the data to write.
itype : int or str
IPF type.
        Possible values, either integer or string:

        * ``1`` or ``"timeseries"``
        * ``2`` or ``"borehole1d"``
        * ``3`` or ``"cpt"``
        * ``4`` or ``"borehole3d"``
nodata : float
        The value given to nodata values. These are generally NaN (Not-a-Number)
        in pandas, but NaN values lead to errors in iMOD(FLOW).
        Defaults to a value of 1.0e20 instead.
assoc_columns : optional, list or dict
Columns to store in the associated file. In case of a dictionary, the
columns will be renamed according to the mapping in the dictionary.
Defaults to None.
Returns
-------
None
Writes a file.
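
    Examples
    --------
    Write a timeseries DataFrame; a "time" column is required for itype 1
    (file name is illustrative):

    >>> imod.ipf.write_assoc("well-1.txt", df, itype="timeseries", nodata=-9999.0)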
"""
itype = _coerce_itype(itype)
required_columns = {
1: ["time"],
2: ["top"],
3: ["top"],
4: ["x_offset", "y_offset", "top"],
}
# Ensure columns are in the right order for the itype
colnames = _lower(list(df))
df.columns = colnames
columnorder = []
for colname in required_columns[itype]:
if colname not in colnames:
raise ValueError(f'given itype requires column "{colname}"')
colnames.remove(colname)
columnorder.append(colname)
columnorder += colnames
# Check if columns have to be renamed
if isinstance(assoc_columns, dict):
columnorder = [assoc_columns[col] for col in columnorder]
df = df.rename(columns=assoc_columns)
nrecords, nfields = df.shape
with open(path, "w") as f:
f.write(f"{nrecords}\n{nfields},{itype}\n")
for colname in columnorder:
if "," in colname or " " in colname:
colname = '"' + colname + '"'
f.write(f"{colname},{nodata}\n")
# workaround pandas issue by closing the file first, see
# https://github.com/pandas-dev/pandas/issues/19827#issuecomment-398649163
df = df.fillna(nodata)
df = df[columnorder]
# We cannot rely on the quoting=QUOTE_NONNUMERIC policy
# The reason is that datetime columns are converted to string as well
# and then quoted. This causes trouble with some iMOD(batch) functions.
for column in df.columns:
if df.loc[:, column].dtype == np.dtype("O"):
df.loc[:, column] = df.loc[:, column].astype(str)
df.loc[:, column] = '"' + df.loc[:, column] + '"'
df.to_csv(
path,
index=False,
header=False,
mode="a",
date_format="%Y%m%d%H%M%S",
quoting=csv.QUOTE_NONE,
)
def write(path, df, indexcolumn=0, assoc_ext="txt", nodata=1.0e20):
"""
Writes a single IPF file.
Parameters
----------
path : pathlib.Path or str
path of the written IPF file.
Any associated files are written relative to this path, based on the ID
column.
df : pandas.DataFrame
DataFrame containing the data to write.
indexcolumn : integer
        number of the column containing the paths to the associated (TXT) files.
Defaults to a value of 0 (no associated files).
assoc_ext : str
Extension of the associated files. Defaults to "txt".
Returns
-------
None
Writes a file.
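
    Examples
    --------
    Write a DataFrame of point data without associated files (file name is
    illustrative):

    >>> imod.ipf.write("wells.ipf", df)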
"""
df = df.fillna(nodata)
nrecords, nfields = df.shape
with open(path, "w") as f:
f.write(f"{nrecords}\n{nfields}\n")
for colname in df.columns:
if "," in colname or " " in colname:
colname = '"' + colname + '"'
f.write(f"{colname}\n")
f.write(f"{indexcolumn},{assoc_ext}\n")
# We cannot rely on the quoting=QUOTE_NONNUMERIC policy
# The reason is that datetime columns are converted to string as well
# and then quoted. This causes trouble with some iMOD(batch) functions.
for column in df.columns:
if df.loc[:, column].dtype == np.dtype("O"):
df.loc[:, column] = df.loc[:, column].astype(str)
df.loc[:, column] = '"' + df.loc[:, column] + '"'
# workaround pandas issue by closing the file first, see
# https://github.com/pandas-dev/pandas/issues/19827#issuecomment-398649163
df.to_csv(path, index=False, header=False, mode="a", quoting=csv.QUOTE_NONE)
def _is_single_value(group):
return len(pd.unique(group)) == 1
def _compose_ipf(path, df, itype, assoc_ext, nodata=1.0e20, assoc_columns=None):
"""
When itype is not None, breaks down the pandas DataFrame into its IPF part
and its associated TXT files, creating the IPF data structure.
Parameters
----------
path : pathlib.Path or str
path of the written IPF file.
Any associated files are written relative to this path, based on the ID
column.
df : pandas.DataFrame
DataFrame containing the data to write.
itype : int or str or None
If ``None`` no associated files are written.
Other possible values, either integer or string:
* ``1`` or ``"timeseries"``
* ``2`` or ``"borehole1d"``
* ``3`` or ``"cpt"``
* ``4`` or ``"borehole3d"``
assoc_ext : str
Extension of the associated files. Normally ".txt".
nodata : float
        The value given to nodata values. These are generally NaN (Not-a-Number)
        in pandas, but NaN values lead to errors in iMOD(FLOW).
        Defaults to a value of 1.0e20 instead.
assoc_columns : optional, list or dict
Columns to store in the associated file. In case of a dictionary, the
columns will be renamed according to the mapping in the dictionary.
Defaults to None.
Returns
-------
None
Writes files.
"""
if itype is None:
write(path, df, nodata=nodata)
else:
itype = _coerce_itype(itype)
colnames = _lower(list(df))
df.columns = colnames
for refname in ["x", "y", "id"]:
if refname not in colnames:
raise ValueError(f'given itype requires column "{refname}"')
colnames.remove(refname)
grouped = df.groupby("id")
if not grouped["x"].apply(_is_single_value).all():
raise ValueError("column x contains more than one value per id")
if not grouped["y"].apply(_is_single_value).all():
raise ValueError("column y contains more than one value per id")
# get columns that have only one value within a group, to save them in ipf
ipf_columns = [
(colname, "first")
for colname in colnames
if grouped[colname].apply(_is_single_value).all()
]
for idcode, group in grouped:
assoc_path = path.parent.joinpath(str(idcode) + "." + str(assoc_ext))
assoc_path.parent.mkdir(parents=True, exist_ok=True)
if isinstance(assoc_columns, list):
selection = assoc_columns
elif isinstance(assoc_columns, dict):
selection = list(assoc_columns.keys())
else:
selection = [
colname for colname in colnames if colname not in ipf_columns
]
out_df = group[selection]
write_assoc(assoc_path, out_df, itype, nodata, assoc_columns)
# ensures right order for x, y, id; so that also indexcolumn == 3
agg_kwargs = collections.OrderedDict(
[("x", "first"), ("y", "first"), ("id", "first")]
)
agg_kwargs.update(ipf_columns)
agg_df = grouped.agg(agg_kwargs)
write(path, agg_df, 3, assoc_ext, nodata=nodata)
def save(path, df, itype=None, assoc_ext="txt", nodata=1.0e20, assoc_columns=None):
"""
Saves the contents of a pandas DataFrame to one or more IPF files, and
associated (TXT) files.
Can write multiple IPF files if one of the columns is named "layer". In
    turn, multiple associated (TXT) files may be written for each of these IPF
files. Note that the ID must be unique for each layer. See the examples.
Parameters
----------
path : pathlib.Path or str
path of the written IPF file.
Any associated files are written relative to this path, based on the ID
column.
df : pandas.DataFrame
DataFrame containing the data to write.
itype : int or str or None
IPF type. Defaults to ``None``, in which case no associated files are
created. Possible other values, either integer or string:
* ``1`` or ``"timeseries"``
* ``2`` or ``"borehole1d"``
* ``3`` or ``"cpt"``
* ``4`` or ``"borehole3d"``
assoc_ext : str
Extension of the associated files. Defaults to "txt".
nodata : float
        The value given to nodata values. These are generally NaN (Not-a-Number)
        in pandas, but NaN values lead to errors in iMOD(FLOW).
        Defaults to a value of 1.0e20 instead.
assoc_columns : optional, list or dict
Columns to store in the associated file. In case of a dictionary, the
columns will be renamed according to the mapping in the dictionary.
Defaults to None.
Returns
-------
None
Writes files.
Examples
--------
To write a single IPF without associated timeseries or boreholes:
>>> imod.ipf.save("static-data.ipf", df)
To write timeseries data:
>>> imod.ipf.save("transient-data.ipf", df, itype="timeseries")
If a ``"layer"`` column is present, make sure the ID is unique per layer:
>>> df["id"] = df["id"].str.cat(df["layer"], sep="_")
>>> imod.ipf.save("layered.ipf", df, itype="timeseries")
An error will be raised otherwise.
"""
path = pathlib.Path(path)
d = {"extension": ".ipf", "name": path.stem, "directory": path.parent}
d["directory"].mkdir(exist_ok=True, parents=True)
colnames = _lower(list(df))
# Lower assoc_columns as well if available
if isinstance(assoc_columns, list):
assoc_columns = _lower(assoc_columns)
elif isinstance(assoc_columns, dict):
keys = _lower(assoc_columns.keys())
values = _lower(assoc_columns.values())
assoc_columns = dict(zip(keys, values))
df.columns = colnames
if "layer" in colnames:
if "time" in colnames:
groupcols = ["time", "id"]
else:
groupcols = "id"
n_layer_per_id = df.groupby(groupcols)["layer"].nunique()
if (n_layer_per_id > 1).any():
raise ValueError(
"Multiple layer values for a single ID detected. "
"Unique IDs are required for each layer."
)
for layer, group in df.groupby("layer"):
d["layer"] = layer
fn = imod.util.path.compose(d)
_compose_ipf(fn, group, itype, assoc_ext, nodata, assoc_columns)
else:
fn = imod.util.path.compose(d)
_compose_ipf(fn, df, itype, assoc_ext, nodata, assoc_columns)