Source code for imod.formats.gen.gen

import io
import warnings
from pathlib import Path
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from scipy.io import FortranFile, FortranFormattingError

from imod.formats.ipf import _infer_delimwhitespace
from imod.util.imports import MissingOptionalModule

try:
    import shapely
    import shapely.geometry as sg
except ImportError:
    sg = MissingOptionalModule("shapely")
    shapely = MissingOptionalModule("shapely")

try:
    import geopandas as gpd
except ImportError:
    gpd = MissingOptionalModule("geopandas")


# ASCII text format:
# ------------------
def parse_ascii_points(lines: List[str]) -> "geopandas.GeoDataFrame":  # type: ignore # noqa
    n = len(lines) - 1
    fid = np.empty(n, dtype=int)
    x = np.empty(n, dtype=float)
    y = np.empty(n, dtype=float)
    for i, line in enumerate(lines[:-1]):
        fid[i], x[i], y[i] = line.split(",")

    points = gpd.points_from_xy(x=x, y=y)
    return gpd.GeoDataFrame(data={"id": fid}, geometry=points)


def parse_ascii_segments(lines: List[str]):
    end = [i for i, line in enumerate(lines) if line == "end"][:-1]
    start = [-1] + end[:-1]
    features = [lines[s + 1 : e] for s, e in zip(start, end)]

    n_feature = len(features)
    n_vertex = [len(f) - 1 for f in features]

    # Process the feature id as a string. This adds support for id's containing
    # commas and spaces
    fid = np.full(n_feature, "", dtype=object)
    is_polygon = np.empty(n_feature, dtype=bool)
    indices = np.repeat(np.arange(n_feature), n_vertex)

    first_coord = features[0][1:][0]
    has_whitespace = _infer_delimwhitespace(first_coord, 2)

    vertex_coords = []
    for i, feature in enumerate(features):
        fid[i] = feature[0]
        # Use pd.read_csv instead of np.loadtxt, since it supports files
        # delimited with multiple whitespace characters.
        feature_buffer = io.StringIO(initial_value="\n".join(feature[1:]))
        coords_df = pd.read_csv(
            feature_buffer, delim_whitespace=has_whitespace, header=None
        )
        coords = coords_df.values
        is_polygon[i] = (coords[0] == coords[-1]).all()
        vertex_coords.append(coords)

    coordinates = np.vstack(vertex_coords)
    polygon = is_polygon.any()
    if polygon != is_polygon.all():
        raise ValueError("Some features are linestrings, some are polygons")

    if polygon:
        rings = shapely.linearrings(coordinates, indices=indices)
        geometry = shapely.polygons(rings)
    else:
        geometry = shapely.linestrings(coordinates, indices=indices)

    return gpd.GeoDataFrame(data={"id": fid}, geometry=geometry)


[docs] def read_ascii(path: Union[Path, str]) -> "geopandas.GeoDataFrame": # type: ignore # noqa """ Read a text ASCII GEN file to a geopandas GeoDataFrame. Parameters ---------- path: Union[str, Path] Returns ------- geodataframe: gpd.GeoDataFrame """ # From gen itype to shapely geometry: with open(path, "r") as f: lines = [line.lower().strip() for line in f.readlines() if line.strip() != ""] if len(lines[0].split(",")) == 3: return parse_ascii_points(lines) return parse_ascii_segments(lines)
# Binary format: # -------------- # # From the iMOD User Manual FLOAT_TYPE = np.float64 INT_TYPE = np.int32 HEADER_TYPE = np.int32 CIRCLE = 1024 POLYGON = 1025 RECTANGLE = 1026 POINT = 1027 LINE = 1028 MAX_NAME_WIDTH = 11 # Map integer enumerators to strings GENTYPE_TO_NAME = { CIRCLE: "circle", POLYGON: "polygon", RECTANGLE: "rectangle", POINT: "point", LINE: "line", } NAME_TO_GENTYPE = {v: k for k, v in GENTYPE_TO_NAME.items()} # Forward references, since shapely is optional Point = "shapely.geometry.Point" Polygon = "shapely.geometry.Polygon" LineString = "shapely.geometry.LineString" class MyFortranFile(FortranFile): """ Unfortunately, the binary GEN files are written as Fortran Record files, so they cannot be read directly with e.g. numpy.fromfile (like direct access) The scipy FortranFile is mostly adequate, it just misses a method to read char records (always ascii encoded; note all ascii is valid utf-8, but not vice versa). Those are added here. """ def read_char_record(self): first_size = self._read_size(eof_ok=True) string = self._fp.read(first_size).decode("utf-8") if len(string) != first_size: raise FortranFormattingError("End of file in the middle of a record") second_size = self._read_size(eof_ok=True) if first_size != second_size: raise IOError("Sizes do not agree in the header and footer for this record") return string def write_char_record(self, string: str): total_size = len(string) bytes_string = string.encode("ascii") nb = np.array([total_size], dtype=self._header_dtype) nb.tofile(self._fp) self._fp.write(bytes_string) nb.tofile(self._fp) # The binary GEN file has some idiosyncratic geometries: # * A separate circle geometry # * A seperate rectangle geometry # In OGC (WKT) terms, these are polygons. # # Both are defined by two vertices: # * Circle: center and any other outside point (from which we can infer the radius) # * Rectangle: (left, lower), (right, upper); may also be (left, upper), (right, lower) # # The other shapely geometries can be generated directly from the vertices. def from_circle(xy: np.ndarray) -> Polygon: radius = np.sqrt(np.sum((xy[1] - xy[0]) ** 2)) return sg.Point(xy[0]).buffer(radius) def from_point(xy: np.ndarray) -> Polygon: return sg.Point(xy[0]) def from_rectangle(xy: np.ndarray) -> Polygon: return sg.box(xy[0, 0], xy[0, 1], xy[1, 0], xy[1, 1]) def to_circle(geometry: Polygon) -> Tuple[np.ndarray, int]: xy = np.array([geometry.centroid.coords[0], geometry.exterior.coords[0]]) return xy, 2 def to_rectangle(geometry: Polygon) -> Tuple[np.ndarray, int]: xy = np.array(geometry.exterior.coords) if (geometry.area / geometry.minimum_rotated_rectangle.area) < 0.999: raise ValueError("Feature_type is rectangle, but geometry is not a rectangular") # First and third vertex will give (left, right) and (lower, upper) return xy[[0, 2]], 2 def to_polygon(geometry: Polygon) -> Tuple[np.ndarray, int]: xy = np.array(geometry.exterior.coords) return xy, xy.shape[0] def to_point(geometry: Point) -> Tuple[np.ndarray, int]: return np.array(geometry.coords), 1 def to_line(geometry: LineString) -> Tuple[np.ndarray, int]: xy = np.array(geometry.coords) return xy, xy.shape[0] def read_binary(path: Union[str, Path]) -> "geopandas.GeoDataFrame": # type: ignore # noqa """ Read a binary GEN file to a geopandas GeoDataFrame. Parameters ---------- path: Union[str, Path] Returns ------- geodataframe: gpd.GeoDataFrame """ # From gen itype to shapely geometry: GENTYPE_TO_GEOM = { CIRCLE: from_circle, POLYGON: sg.Polygon, RECTANGLE: from_rectangle, POINT: from_point, LINE: sg.LineString, } with warnings.catch_warnings(record=True): warnings.filterwarnings( "ignore", message="Given a dtype which is not unsigned." ) with MyFortranFile(path, mode="r", header_dtype=HEADER_TYPE) as f: f.read_reals(dtype=FLOAT_TYPE) # Skip the bounding box n_feature, n_column = f.read_ints(dtype=INT_TYPE) if n_column > 0: widths = f.read_ints(dtype=INT_TYPE) indices = range(0, (n_column + 1) * MAX_NAME_WIDTH, MAX_NAME_WIDTH) string = f.read_char_record() # pylint:disable=no-member names = [string[i:j].strip() for i, j in zip(indices[:-1], indices[1:])] xy = [] rows = [] feature_type = np.empty(n_feature, dtype=INT_TYPE) for i in range(n_feature): _, ftype = f.read_ints(dtype=INT_TYPE) feature_type[i] = ftype if n_column > 0: rows.append(f.read_char_record()) # pylint:disable=no-member f.read_reals(dtype=FLOAT_TYPE) # skip the bounding box xy.append(f.read_reals(dtype=FLOAT_TYPE).reshape((-1, 2))) if n_column > 0: df = pd.read_fwf( io.StringIO("\n".join(rows)), widths=widths, names=names, ) else: df = pd.DataFrame() df["feature_type"] = feature_type df["feature_type"] = df["feature_type"].replace(GENTYPE_TO_NAME) geometry = [] for ftype, geom in zip(feature_type, xy): geometry.append(GENTYPE_TO_GEOM[ftype](geom)) return gpd.GeoDataFrame(df, geometry=geometry) def vertices( geometry: Union[Point, Polygon, LineString], ftype: str ) -> Tuple[int, np.ndarray, int]: """ Infer from geometry, or convert from string, the feature type to the GEN expected Enum (int). Convert the geometry to the GEN expected vertices, and the number of vertices. """ # Checking names with actual geometry types NAME_TO_GEOM = { "circle": sg.Polygon, "rectangle": sg.Polygon, "polygon": sg.Polygon, "point": sg.Point, "line": sg.LineString, } GENTYPE_TO_VERTICES = { CIRCLE: to_circle, RECTANGLE: to_rectangle, POLYGON: to_polygon, POINT: to_point, LINE: to_line, } # Infer gentype on the basis of shapely type GEOM_TO_GENTYPE = { sg.Polygon: POLYGON, sg.Point: POINT, sg.LineString: LINE, } if ftype != "": # Start by checking whether the feature type matches the geometry try: expected = NAME_TO_GEOM[ftype] except KeyError as e: raise ValueError( f"feature_type should be one of {set(NAME_TO_GEOM.keys())}. Got instead {ftype}" ) from e if not isinstance(geometry, expected): raise ValueError( f"Feature type is {ftype}, expected {expected}. Got instead: {type(geometry)}" ) ftype: int = NAME_TO_GENTYPE[ftype] else: try: ftype: int = GEOM_TO_GENTYPE[type(geometry)] except KeyError as e: raise TypeError( "Geometry type not allowed. Should be Polygon, Linestring, or Point." f" Got {type(geometry)} instead." ) from e xy, n_vertex = GENTYPE_TO_VERTICES[ftype](geometry) return ftype, xy, n_vertex
[docs] def read(path): """ Read a GEN file to a geopandas GeoDataFrame. The function first tries to read as binary, if this fails, it tries to read the gen file as ASCII. If certain that a GEN file is ascii or binary, the user is adviced to use the respective functions :func:`imod.gen.gen.read_ascii` or :func:`imod.gen.gen.read_binary`. Parameters ---------- path: Union[str, Path] Returns ------- geodataframe: gpd.GeoDataFrame """ try: return read_binary(path) except Exception: try: return read_ascii(path) except Exception as e: raise type(e)(f'{e}\nWhile reading GEN file "{path}"')
[docs] def write( path: Union[str, Path], geodataframe: "geopandas.GeoDataFrame", # type: ignore # noqa feature_type: Optional[str] = None, ) -> None: """ Write a GeoDataFrame to a binary GEN file. Note that the binary GEN file has two geometry types, circles and rectangles, which cannot be mapped directly from a shapely type. Points, lines, and polygons can be converted automatically. In shapely, circles and rectangles will also be represented by polygons. To specifically write circles and rectangles to a binary GEN file, an additional column of strings is required which specifies the geometry type. Parameters ---------- path : Union[str, Path] geodataframe : gpd.GeoDataFrame feature_type : Optional[str] Which column to interpret as geometry type, one of: point, line, polygon, circle, rectangle. Default value is ``None``. Returns ------- None Writes file. """ df = pd.DataFrame(geodataframe.drop(columns="geometry")).astype("string").fillna("") if feature_type is not None: types = df.pop(feature_type).str.lower() else: # Create a dummy iterator types = ("" for _ in range(len(df))) n_feature, n_column = df.shape # Truncate column names to 11 chars, then make everything at least 11 chars column_names = "".join([c[:11].ljust(11) for c in df]) # Get the widths of the columns. Make room for at least 11 widths = [] for column in df: width = max(11, df[column].str.len().max()) df[column] = df[column].str.pad(width, side="right") widths.append(width) with warnings.catch_warnings(record=True): warnings.filterwarnings( "ignore", message="Given a dtype which is not unsigned." ) with MyFortranFile(path, mode="w", header_dtype=HEADER_TYPE) as f: f.write_record(geodataframe.total_bounds.astype(FLOAT_TYPE)) f.write_record(np.array([n_feature, n_column], dtype=INT_TYPE)) if n_column > 0: f.write_record(np.array(widths).astype(INT_TYPE)) f.write_char_record(column_names) # pylint:disable=no-member for geometry, (_, row), ftype in zip( geodataframe.geometry, df.iterrows(), types ): ftype, xy, n_vertex = vertices(geometry, ftype) f.write_record(np.array([n_vertex, ftype], dtype=INT_TYPE)) if n_column > 0: f.write_char_record("".join(row.values)) # pylint:disable=no-member f.write_record(np.array(geometry.bounds).astype(FLOAT_TYPE)) f.write_record(xy.astype(FLOAT_TYPE))