Skip to content

IO

DHyDAMO IO API

common

ExtendedDataFrame

Bases: DataFrame

Source code in hydrolib/dhydamo/io/common.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
class ExtendedDataFrame(pd.DataFrame):
    _metadata = ["required_columns"] + pd.DataFrame._metadata

    def __init__(self, required_columns=None, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if required_columns is None:
            required_columns = []

        self.required_columns = (
            required_columns[:]
            if isinstance(required_columns, list)
            else [required_columns]
        )

    def delete_all(self):
        """
        Empty the dataframe
        """
        if not self.empty:
            self.iloc[:, 0] = np.nan
            self.dropna(inplace=True)

    def set_data(self, df, index_col):
        if not self.empty:
            self.delete_all()

        # Copy content
        for col, values in df.items():
            self[col] = values.to_numpy()

        if index_col is None:
            self.index = df.index
            self.index.name = df.index.name

        else:
            self.index = df[index_col]
            self.index.name = index_col

        # Check columns and types
        self._check_columns()

    def add_data(self, df):
        if not np.isin(df.columns, self.columns).all():
            raise KeyError(
                "The new df contains columns that are not present in the current df."
            )

        # Concatenate data
        current = pd.DataFrame(self.values, index=self.index, columns=self.columns)
        newdf = pd.concat([current, df], ignore_index=False, sort=False)

        # Empty current df
        self.delete_all()

        # Add values again
        self.set_data(newdf, index_col=self.index.name)

    def _check_columns(self):
        """
        Check presence of columns in geodataframe
        """
        present_columns = self.columns.tolist()
        for i, column in enumerate(self.required_columns):
            if column not in present_columns:
                raise KeyError(
                    'Column "{}" not found. Got {}, Expected at least {}'.format(
                        column,
                        ", ".join(present_columns),
                        ", ".join(self.required_columns),
                    )
                )

    def read_gpkg_layer(
        self,
        gpkg_path: str | Path,
        layer_name: str = None,
        column_mapping: dict = None,
        index_col: str = None,
    ):
        """
        Read GML file to GeoDataFrame.

        This function has the option to group Points into LineStrings. To do so,
        specify the groupby column (which the set has in common) and the order_column,
        the column which indicates the order of the grouping.

        A mask file can be specified to clip the selection.

        Parameters
        ----------
        gml_path : str
            Path to the GML file
        layer_name: str
            Layer name of the desired layer in the package
        index_col : str
            Optional, column to be set as index
        """

        if not Path(gpkg_path).exists():
            raise OSError(f'File not found: "{gpkg_path}"')

        if isinstance(gpkg_path, Path):
            gpkg_path = str(gpkg_path)

        if layer_name.lower() not in map(str.lower, gpd.list_layers(gpkg_path).name.tolist()):
            raise ValueError(f'Layer "{layer_name}" does not exist in: "{gpkg_path}"')

        layer = gpd.read_file(gpkg_path, layer=layer_name, engine='pyogrio')
        columns = [col.lower() for col in layer.columns]
        layer.columns = columns
        if 'geometry' in layer.columns:
            layer.drop("geometry", axis=1, inplace=True)

        df = layer
        if column_mapping is not None:
            df.rename(columns=column_mapping, inplace=True)

        # Add data to class GeoDataFrame
        self.set_data(df, index_col=index_col)
delete_all()

Empty the dataframe

Source code in hydrolib/dhydamo/io/common.py
446
447
448
449
450
451
452
def delete_all(self):
    """
    Empty the dataframe

    Sets the first column to NaN so every row contains a missing value,
    then drops all rows in place (columns and object identity are kept).
    """
    if not self.empty:
        self.iloc[:, 0] = np.nan
        self.dropna(inplace=True)
read_gpkg_layer(gpkg_path: str | Path, layer_name: str = None, column_mapping: dict = None, index_col: str = None)

Read a GeoPackage layer into the dataframe (the geometry column is dropped).

This function has the option to group Points into LineStrings. To do so, specify the groupby column (which the set has in common) and the order_column, the column which indicates the order of the grouping.

A mask file can be specified to clip the selection.

Parameters

gpkg_path : str | Path — path to the GeoPackage file; layer_name : str — layer name of the desired layer in the package; index_col : str — optional, column to be set as index

Source code in hydrolib/dhydamo/io/common.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
def read_gpkg_layer(
    self,
    gpkg_path: str | Path,
    layer_name: str = None,
    column_mapping: dict = None,
    index_col: str = None,
):
    """
    Read a GeoPackage layer into the dataframe (geometry is dropped).

    Column names are lower-cased and optionally renamed via
    ``column_mapping``; the result replaces the current content.

    Parameters
    ----------
    gpkg_path : str | Path
        Path to the GeoPackage file
    layer_name: str
        Layer name of the desired layer in the package
    column_mapping : dict
        Optional, mapping used to rename columns after loading
    index_col : str
        Optional, column to be set as index
    """

    if not Path(gpkg_path).exists():
        raise OSError(f'File not found: "{gpkg_path}"')

    if isinstance(gpkg_path, Path):
        gpkg_path = str(gpkg_path)

    # NOTE(review): layer_name defaults to None but .lower() is called
    # unconditionally — a missing layer_name raises AttributeError.
    if layer_name.lower() not in map(str.lower, gpd.list_layers(gpkg_path).name.tolist()):
        raise ValueError(f'Layer "{layer_name}" does not exist in: "{gpkg_path}"')

    layer = gpd.read_file(gpkg_path, layer=layer_name, engine='pyogrio')
    columns = [col.lower() for col in layer.columns]
    layer.columns = columns
    # Only the attribute table is kept; the geometry column is discarded.
    if 'geometry' in layer.columns:
        layer.drop("geometry", axis=1, inplace=True)

    df = layer
    if column_mapping is not None:
        df.rename(columns=column_mapping, inplace=True)

    # Add data to class GeoDataFrame
    self.set_data(df, index_col=index_col)

ExtendedGeoDataFrame

Bases: GeoDataFrame

Source code in hydrolib/dhydamo/io/common.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class ExtendedGeoDataFrame(gpd.GeoDataFrame):
    """GeoDataFrame subclass that enforces a set of required columns and a
    single geometry type, with helpers to read, clip and snap data."""

    # normal properties; listed in _metadata so geopandas propagates them
    _metadata = ["required_columns", "geotype", "related"] + gpd.GeoDataFrame._metadata

    def __init__(self, geotype, required_columns=None, related=None, logger=logging, *args, **kwargs):
        # Normalise required_columns to a list
        if required_columns is None:
            required_columns = []
        elif not isinstance(required_columns, list):
            required_columns = [required_columns]

        # Add required columns to column list
        if "columns" in kwargs.keys():
            kwargs["columns"] += required_columns
        else:
            kwargs["columns"] = required_columns

        super().__init__(*args, **kwargs)

        self.required_columns = required_columns[:]
        self.geotype = geotype
        self.related = deepcopy(related)

    def copy(self, deep=True):
        """
        Create a copy

        Parameters
        ----------
        deep : bool
            When True (default), index, columns and values are deep-copied.
        """
        index = self.index.tolist() if not deep else deepcopy(self.index.tolist())
        columns = self.columns.tolist() if not deep else deepcopy(self.columns.tolist())

        # Construct with an empty required-column list so __init__ does not
        # append the required columns to the (already complete) column list a
        # second time; restore them on the copy afterwards.
        edf = ExtendedGeoDataFrame(
            geotype=self.geotype,
            required_columns=[],
            index=index,
            columns=columns,
        )
        edf.required_columns.extend(self.required_columns[:])
        edf.loc[:, :] = self.values if not deep else deepcopy(self.values)

        return edf

    def delete_all(self):
        """
        Empty the dataframe (drop all rows in place, columns are kept).
        """
        if not self.empty:
            self.drop(self.index, inplace=True)

    def read_shp(
        self,
        path: str | Path,
        index_col: str = None,
        column_mapping: dict = None,
        check_columns: bool = True,
        proj_crs=None,
        clip: Polygon | MultiPolygon = None,
        check_geotype: bool = True,
        id_col: str = "code",
        filter_cols: bool = False,
        filter_rows=None,
    ):
        """
        Import function, extended with type checks. Does not destroy reference to object.

        Parameters
        ----------
        path : str | Path
            Path of the shapefile (or any file readable by geopandas).
        index_col : str
            Optional, column to be set as index.
        column_mapping : dict
            Optional, mapping used to rename columns after loading.
        check_columns : bool
            Verify the required columns are present.
        proj_crs
            Optional CRS to re-project to after loading.
        clip : Polygon | MultiPolygon
            Optional extent used to clip the selection.
        check_geotype : bool
            Verify all geometries match the expected geometry type.
        id_col : str
            Kept for backward compatibility; not used by this method.
        filter_cols : bool
            When True, drop all columns not listed in ``required_columns``.
        filter_rows : dict
            Optional column/value pairs; only matching rows are kept.
        """
        # Read GeoDataFrame
        gdf = gpd.read_file(path)

        # Only keep required columns
        if filter_cols:
            logger.info("Filtering required column keys")
            gdf.drop(
                columns=gdf.columns[~gdf.columns.isin(self.required_columns)],
                inplace=True,
            )

        # filter out rows on key/value pairs if required
        if filter_rows is not None:
            logger.info("Filter rows using key value pairs")
            filtered = (gdf[list(filter_rows)] == pd.Series(filter_rows)).all(axis=1)
            gdf = gdf[filtered]

        # Drop features without geometry. Count the missing geometries BEFORE
        # dropping them; the original computed the count after the drop, which
        # always reported 0.
        total_features = len(gdf)
        missing_features = int(gdf.geometry.isna().sum())
        gdf.drop(gdf.index[gdf.geometry.isna()], inplace=True)  # temporary fix
        logger.debug(
            f"{missing_features} out of {total_features} do not have a geometry"
        )

        # Rename columns:
        if column_mapping is not None:
            gdf.rename(columns=column_mapping, inplace=True)

        # Check number of entries
        if gdf.empty:
            raise OSError("Imported shapefile contains no rows.")

        # Add data to class GeoDataFrame
        self.set_data(
            gdf,
            index_col=index_col,
            check_columns=check_columns,
            check_geotype=check_geotype,
        )

        # Clip if extent is provided
        if clip is not None:
            self.clip(clip)

        # To re-project CRS system to projected CRS, first all empty geometries should be dropped
        if proj_crs is not None:
            self.check_projection(proj_crs)
        else:
            logger.debug("No projected CRS is given in ini-file")

    def set_data(self, gdf, index_col=None, check_columns=True, check_geotype=True):
        """
        Replace the current content with the data from ``gdf``.

        Parameters
        ----------
        gdf : geopandas.GeoDataFrame
            Source data; the geometry column is set via ``set_geometry``.
        index_col : str or None
            Column of ``gdf`` to use as index; when None, ``gdf``'s index is used.
        check_columns : bool
            Verify the required columns are present before copying.
        check_geotype : bool
            Verify all geometries match the expected geometry type afterwards.
        """
        if not self.empty:
            self.delete_all()

        # Check columns
        if check_columns:
            self._check_columns(gdf)

        # Copy content; geometry needs set_geometry so the active geometry
        # column is registered correctly.
        for col, values in gdf.items():
            if str(values.dtype) == "geometry":
                self.set_geometry(values.to_numpy(), inplace=True)
            else:
                self[col] = values.to_numpy()

        if index_col is None:
            self.index = gdf.index
            self.index.name = gdf.index.name

        else:
            self.index = gdf[index_col]
            self.index.name = index_col

        # Check geometry types
        if check_geotype:
            self._check_geotype()

    def _check_columns(self, gdf):
        """
        Check presence of required columns in the geodataframe.
        """
        present_columns = gdf.columns.tolist()
        for column in self.required_columns:
            # A single membership test suffices; the original repeated the
            # identical check in a redundant nested if.
            if column not in present_columns:
                raise KeyError(
                    'Column "{}" not found. Got {}, Expected at least {}'.format(
                        column,
                        ", ".join(present_columns),
                        ", ".join(self.required_columns),
                    )
                )

    def _check_geotype(self):
        """
        Check that every geometry is an instance of the expected geometry type.
        """
        if not all(isinstance(geo, self.geotype) for geo in self.geometry):
            raise TypeError(
                'Geometrytype "{}" required. The input shapefile has geometry type(s) {}.'.format(
                    re.findall("([A-Z].*)'", repr(self.geotype))[0],
                    self.geometry.type.unique().tolist(),
                )
            )

    def show_gpkg(self, gpkg_path: str | Path):
        """
        Log an overview (name, geometry type, feature and field counts) of all
        layers in a GeoPackage file.
        """
        if not Path(gpkg_path).exists():
            raise OSError(f'File not found: "{gpkg_path}"')

        if isinstance(gpkg_path, Path):
            gpkg_path = str(gpkg_path)

        layerlist = gpd.list_layers(gpkg_path).name.tolist()
        logger.info(
            "Content of gpkg-file %s, containing %d layers:",
            gpkg_path,
            len(layerlist),
        )
        logger.info(
            "\tINDEX\t|\tNAME                        \t|\tGEOM_TYPE      \t|\t NFEATURES\t|\t   NFIELDS"
        )
        for laynum, layer_name in enumerate(layerlist):
            layer = gpd.read_file(gpkg_path, layer=layer_name)
            if layer.empty:
                logger.warning('Layer "%s" is empty.', layer_name)
                continue

            nfields = len(layer.columns)
            nfeatures = layer.shape[0]

            # Attribute-only layers are returned as a plain DataFrame.
            if isinstance(layer, gpd.GeoDataFrame):
                geom_type = layer.geom_type.iloc[0]
                if geom_type is None:
                    geom_type = "None"
            else:
                geom_type = "None"

            logger.info(
                "\t%5d\t|\t%30s\t|\t%s\t|\t%10d\t|\t%10d",
                laynum,
                layer_name,
                geom_type,
                nfeatures,
                nfields,
            )

    def read_gpkg_layer(
        self,
        gpkg_path: str | Path,
        layer_name: str,
        index_col: str = None,
        groupby_column: str = None,
        order_column: str = None,
        id_col: str = "code",
        column_mapping: dict = None,
        check_columns: bool = True,
        check_geotype: bool = True,
        clip: Polygon | MultiPolygon = None,
        cliptype: str = "select",
        check_3d: bool = True
    ):
        """
        Read a GeoPackage layer into this GeoDataFrame.

        Optionally groups Points into LineStrings: ``groupby_column`` names
        the column the grouped points share, ``order_column`` gives their
        order within each group. A ``clip`` extent can be supplied to clip or
        select the loaded features.

        Parameters
        ----------
        gpkg_path : str | Path
            Path to the GeoPackage file.
        layer_name : str
            Layer name of the desired layer in the package.
        index_col : str
            Optional, column to be set as (unique) index.
        groupby_column : str
            Optional, column used to group Points into LineStrings.
        order_column : str
            Column giving the point order within each group (required when
            ``groupby_column`` is given).
        id_col : str
            Kept for backward compatibility; not used by this method.
        column_mapping : dict
            Optional, mapping used to rename columns after loading.
        check_columns, check_geotype : bool
            Passed through to ``set_data``.
        clip : Polygon | MultiPolygon
            Optional extent; see ``clip``.
        cliptype : str
            'select' or 'clip', see ``clip``.
        check_3d : bool
            When grouping, require every point to have a Z coordinate.
        """
        if not Path(gpkg_path).exists():
            raise OSError(f'File not found: "{gpkg_path}"')

        if isinstance(gpkg_path, Path):
            gpkg_path = str(gpkg_path)

        if layer_name.lower() not in map(str.lower, gpd.list_layers(gpkg_path).name.tolist()):
            raise ValueError(f'Layer "{layer_name}" does not exist in: "{gpkg_path}"')

        layer = gpd.read_file(gpkg_path, layer=layer_name, engine='pyogrio')
        columns = [col.lower() for col in layer.columns]
        layer.columns = columns

        # Get group by columns
        if groupby_column is not None:
            # Check if the group by column is found
            if groupby_column not in columns:
                raise ValueError("Groupby column not found in feature list.")

            if order_column is None or order_column not in columns:
                raise ValueError("Order column not found in feature list.")

            # Check if the geometry is as expected
            geom_types = layer.geometry.geom_type.unique()
            if len(geom_types) != 1 or geom_types[0] != "Point":
                raise ValueError("Can only group Points to LineString")
            if check_3d and np.isnan(layer.geometry.z.to_numpy()).any():
                raise ValueError("All geometries need to have a Z coordinate")

            # Group geometries to lines
            geometries = []
            fields = []
            for groupname, group in layer.groupby(groupby_column, sort=False):
                # Filter branches with too few points
                if len(group) < 2:
                    logger.warning(
                        'Ignoring %s "%s": contains less than two points.',
                        groupby_column,
                        groupname,
                    )
                    continue

                # Determine relative order of points in profile; the first
                # row's attributes represent the whole group.
                group = group.sort_values(order_column)
                geometries.append(LineString(group.geometry.tolist()))
                fields.append(group.iloc[0, :])

        else:
            fields = layer
            geometries = layer.geometry

        # Create geodataframe
        gdf = gpd.GeoDataFrame(fields, columns=columns, geometry=geometries)

        if column_mapping is not None:
            gdf.rename(columns=column_mapping, inplace=True)

        # Enforce a unique index column
        if index_col is not None:
            dupes = gdf[gdf.duplicated(subset=index_col, keep="first")].copy()
            if len(dupes) > 0:
                # Report the duplicated values from the index column itself;
                # the original hard-coded the 'code' attribute, which broke
                # for any other index_col.
                logger.warning(
                    "Index column '%s' contains duplicates (%s). Adding a suffix to make it unique.",
                    index_col,
                    list(dupes[index_col].unique()),
                )
                for dupe_id, group in dupes.groupby(by=index_col, sort=False):
                    gdf.loc[group.index, index_col] = [f"{dupe_id}_{i+1}" for i in range(len(group))]

        # Add data to class GeoDataFrame
        self.set_data(
            gdf,
            index_col=index_col,
            check_columns=check_columns,
            check_geotype=check_geotype,
        )

        if clip is not None:
            self.clip(geometry=clip, cliptype=cliptype)

    def clip(self, geometry: Polygon | MultiPolygon, cliptype: str = "select", clip_and_drop: bool = True):
        """
        Clip or select the content to an extent geometry.

        Parameters
        ----------
        geometry : Polygon | MultiPolygon
            Extent geometry.
        cliptype : str
            'clip': geometrically intersect features with the extent;
            'select': keep features intersecting the extent unchanged.
        clip_and_drop : bool
            For 'clip': drop clipped pieces whose geometry type changed and
            merge duplicate indices back into a single (multi)geometry.
        """
        if not isinstance(geometry, (Polygon, MultiPolygon)):
            raise TypeError("Expected geometry of type Polygon or MultiPolygon")

        # Clip if needed
        if cliptype == "clip":
            # Remember the geometry types present before clipping; treat the
            # single/multi variants of polygons as equivalent.
            pre_geomtypes = self.geom_type.unique().tolist()
            if "Polygon" in pre_geomtypes and "MultiPolygon" not in pre_geomtypes:
                pre_geomtypes.append("MultiPolygon")
            if "MultiPolygon" in pre_geomtypes and "Polygon" not in pre_geomtypes:
                pre_geomtypes.append("Polygon")
            gdf = gpd.clip(self, gpd.GeoDataFrame(geometry=[geometry], crs=self.crs))
            if clip_and_drop:
                # Reduce to allowed geometry types
                gdf = gpd.GeoDataFrame(gdf, crs=gdf.crs)
                rem = gdf[~gdf.geom_type.isin(pre_geomtypes)]
                # Explode to deal with geometry collections
                rem = rem.explode(ignore_index=False, index_parts=False)
                rem = rem[rem.geom_type.isin(pre_geomtypes)]
                gdf = pd.concat([gdf[gdf.geom_type.isin(pre_geomtypes)], rem], ignore_index=False)
                # Merge duplicate indices to single multigeometry
                keep = []
                for idx, group in gdf.groupby(level=0, sort=False):
                    if len(group) == 1:
                        keep.append(group.iloc[0].copy())
                    elif len(group) > 1:
                        row = group.iloc[0].copy()
                        geoms = group.geometry.tolist()
                        geom = shapely.union_all(geoms)
                        if geom.geom_type not in pre_geomtypes:
                            logger.warning(
                                "Cannot deduplicate index '%s': clipping resulted in split geometries, using the first geometry.",
                                idx,
                            )
                            geom = geoms[0]
                        row.at["geometry"] = geom
                        keep.append(row)
                gdf = gpd.GeoDataFrame(keep, crs=gdf.crs)
        elif cliptype == "select":
            gdf = self.loc[self.intersects(geometry).to_numpy()]
        else:
            raise ValueError(f"Cliptype {cliptype} not recognized. Use 'clip' or 'select'.")
        if gdf.empty:
            raise ValueError("Found no features within extent geometry.")

        self.set_data(gdf)

    def check_projection(self, crs_out):
        """
        Re-project in place when the current CRS differs from ``crs_out``.
        """
        if crs_out != self.crs:
            self.to_crs(crs_out, inplace=True)
        else:
            logger.info("OSM data has same projection as projected crs in ini-file")

    def branch_to_prof(
        self, offset=0.0, vertex_end=False, rename_col=None, prefix="", suffix=""
    ):
        """Create profiles on branches from branch data.

        Interpolates a point on every branch geometry at ``offset`` from its
        start (or from its end when ``vertex_end`` is True). When
        ``rename_col`` is given, its values are kept in 'branch_id' and
        rewritten as ``prefix`` + value + ``suffix``.
        """
        gdf_out = self.copy()

        # interpolate over feature geometries
        if vertex_end:
            chainage = self.length - offset
            p = self.interpolate(chainage)
        else:
            chainage = offset
            p = self.interpolate(chainage)
        gdf_out.geometry = p
        gdf_out["offset"] = chainage

        if rename_col is not None:
            try:
                gdf_out["branch_id"] = gdf_out[rename_col]
                gdf_out[rename_col] = [
                    f"{prefix}{g[1][rename_col]}{suffix}" for g in self.iterrows()
                ]
            except Exception:
                raise ValueError(f"Column rename with '{rename_col}' did not succeed.")

        return gdf_out

    def merge_columns(self, col1, col2, rename_col):
        """Sum columns ``col1`` and ``col2`` into ``rename_col``.

        The original condition ``col1 or col2 in ...`` parsed as
        ``col1 or (col2 in ...)`` and was true for any non-empty ``col1``;
        membership is now tested for both column names explicitly.
        """
        present = self.columns.to_numpy()
        if col1 in present or col2 in present:
            try:
                self[rename_col] = self[col1] + self[col2]
            except Exception:
                raise ValueError(
                    f"Merge of two profile columns'{col1}' and '{col2}' did not succeed."
                )

    def snap_to_branch(self, branches, snap_method, maxdist=5):
        """Snap the geometries to the branch (delegates to spatial.find_nearest_branch)."""
        spatial.find_nearest_branch(
            branches=branches, geometries=self, method=snap_method, maxdist=maxdist
        )
branch_to_prof(offset=0.0, vertex_end=False, rename_col=None, prefix='', suffix='')

Create profiles on branches from branch data

Source code in hydrolib/dhydamo/io/common.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
def branch_to_prof(
    self, offset=0.0, vertex_end=False, rename_col=None, prefix="", suffix=""
):
    """Create profiles on branches from branch data

    Interpolates a point on every branch geometry at ``offset`` from its
    start, or — when ``vertex_end`` is True — at ``offset`` before its end.
    When ``rename_col`` is given, its original values are kept in
    'branch_id' and rewritten as ``prefix`` + value + ``suffix``.
    """

    gdf_out = self.copy()

    # interpolate over feature geometries
    if vertex_end:
        chainage = self.length - offset
        p = self.interpolate(chainage)
    else:
        chainage = offset
        p = self.interpolate(chainage)
    gdf_out.geometry = p
    gdf_out["offset"] = chainage

    if rename_col is not None:
        try:
            gdf_out["branch_id"] = gdf_out[rename_col]
            gdf_out[rename_col] = [
                f"{prefix}{g[1][rename_col]}{suffix}" for g in self.iterrows()
            ]
        except Exception:
            raise ValueError(f"Column rename with '{rename_col}' did not succeed.")

    return gdf_out
check_projection(crs_out)

Check if reprojection is required

Source code in hydrolib/dhydamo/io/common.py
376
377
378
379
380
381
382
383
def check_projection(self, crs_out):
    """
    Check if reprojection is required

    Re-projects in place when the current CRS differs from ``crs_out``.
    """
    if crs_out != self.crs:
        self.to_crs(crs_out, inplace=True)
    else:
        logger.info("OSM data has same projection as projected crs in ini-file")
clip(geometry: Polygon | MultiPolygon, cliptype: str = 'select', clip_and_drop: bool = True)

Clip geometry

Source code in hydrolib/dhydamo/io/common.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def clip(self, geometry: Polygon | MultiPolygon, cliptype: str = "select", clip_and_drop: bool = True):
    """
    Clip geometry

    Parameters
    ----------
    geometry : Polygon | MultiPolygon
        Extent geometry.
    cliptype : str
        'clip': geometrically intersect features with the extent;
        'select': keep features that intersect the extent unchanged.
    clip_and_drop : bool
        For 'clip': drop clipped pieces whose geometry type changed and merge
        duplicate indices back into a single (multi)geometry.
    """
    if not isinstance(geometry, (Polygon, MultiPolygon)):
        raise TypeError("Expected geometry of type Polygon or MultiPolygon")

    # Clip if needed
    if cliptype == "clip":
        # Remember the geometry types present before clipping; single/multi
        # polygon variants are treated as equivalent.
        pre_geomtypes = self.geom_type.unique().tolist()
        if "Polygon" in pre_geomtypes and "MultiPolygon" not in pre_geomtypes:
            pre_geomtypes.append("MultiPolygon")
        if "MultiPolygon" in pre_geomtypes and "Polygon" not in pre_geomtypes:
            pre_geomtypes.append("Polygon")
        gdf = gpd.clip(self, gpd.GeoDataFrame(geometry=[geometry], crs=self.crs))
        if clip_and_drop:
            # Reduce to allowed geometry types
            gdf = gpd.GeoDataFrame(gdf, crs=gdf.crs)
            rem = gdf[~gdf.geom_type.isin(pre_geomtypes)]
            # Explode to deal with geometry collections
            rem = rem.explode(ignore_index=False, index_parts=False)
            rem = rem[rem.geom_type.isin(pre_geomtypes)]
            gdf = pd.concat([gdf[gdf.geom_type.isin(pre_geomtypes)], rem], ignore_index=False)
            # Merge duplicate indices to single multigeometry
            keep = []
            for idx, group in gdf.groupby(level=0, sort=False):
                if len(group) == 1:
                    keep.append(group.iloc[0].copy())
                elif len(group) > 1:
                    row = group.iloc[0].copy()
                    geoms = group.geometry.tolist()
                    geom = shapely.union_all(geoms)
                    if geom.geom_type not in pre_geomtypes:
                        logger.warning(
                            "Cannot deduplicate index '%s': clipping resulted in split geometries, using the first geometry.",
                            idx,
                        )
                        geom = geoms[0]
                    row.at["geometry"] = geom
                    keep.append(row)
            gdf = gpd.GeoDataFrame(keep, crs=gdf.crs)
    elif cliptype == "select":
        gdf = self.loc[self.intersects(geometry).to_numpy()]
    else:
        raise ValueError(f"Cliptype {cliptype} not recognized. Use 'clip' or 'select'.")
    if gdf.empty:
        raise ValueError("Found no features within extent geometry.")

    self.set_data(gdf)
copy(deep=True)

Create a copy

Source code in hydrolib/dhydamo/io/common.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def copy(self, deep=True):
    """
    Create a copy

    When ``deep`` is True (default) the index, columns and values are
    deep-copied.
    """

    index = self.index.tolist() if not deep else deepcopy(self.index.tolist())
    columns = self.columns.tolist() if not deep else deepcopy(self.columns.tolist())

    # Construct with an empty required-column list — presumably so __init__
    # does not append the required columns to the column list a second time —
    # and restore them on the copy afterwards.
    edf = ExtendedGeoDataFrame(
        geotype=self.geotype,
        required_columns=[],
        index=index,
        columns=columns,
    )
    edf.required_columns.extend(self.required_columns[:])
    edf.loc[:, :] = self.values if not deep else deepcopy(self.values)

    return edf
delete_all()

Empty the dataframe

Source code in hydrolib/dhydamo/io/common.py
59
60
61
62
63
64
def delete_all(self):
    """
    Empty the dataframe

    Drops all rows in place; columns and object identity are kept.
    """
    if not self.empty:
        self.drop(self.index, inplace=True)
merge_columns(col1, col2, rename_col)

merge columns

Source code in hydrolib/dhydamo/io/common.py
413
414
415
416
417
418
419
420
421
422
def merge_columns(self, col1, col2, rename_col):
    """merge columns

    Sums columns ``col1`` and ``col2`` into ``rename_col``.
    """

    # NOTE(review): `col1 or col2 in ...` parses as `col1 or (col2 in ...)`,
    # so any non-empty col1 makes this condition true regardless of which
    # columns are actually present — likely a precedence bug.
    if col1 or col2 in self.columns.to_numpy():
        try:
            self[rename_col] = self[col1] + self[col2]
        except Exception:
            raise ValueError(
                f"Merge of two profile columns'{col1}' and '{col2}' did not succeed."
            )
read_shp(path: str | Path, index_col: str = None, column_mapping: dict = None, check_columns: bool = True, proj_crs=None, clip: Polygon | MultiPolygon = None, check_geotype: bool = True, id_col: str = 'code', filter_cols: bool = False, filter_rows=None)

Import function, extended with type checks. Does not destroy reference to object.

Source code in hydrolib/dhydamo/io/common.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def read_shp(
    self,
    path: str | Path,
    index_col: str = None,
    column_mapping: dict = None,
    check_columns: bool = True,
    proj_crs=None,
    clip: Polygon | MultiPolygon = None,
    check_geotype: bool = True,
    id_col: str = "code",
    filter_cols: bool = False,
    filter_rows=None,
):
    """
    Import function, extended with type checks. Does not destroy reference to object.

    Reads the file at ``path``, optionally filters columns/rows, drops
    features without geometry, and replaces the current content via
    ``set_data``. ``id_col`` is not used in this method.
    """
    # Read GeoDataFrame
    gdf = gpd.read_file(path)

    # Only keep required columns
    if filter_cols:
        logger.info("Filtering required column keys")
        gdf.drop(
            columns=gdf.columns[~gdf.columns.isin(self.required_columns)],
            inplace=True,
        )

    # filter out rows on key/value pairs if required
    if filter_rows is not None:
        logger.info("Filter rows using key value pairs")
        filtered = (gdf[list(filter_rows)] == pd.Series(filter_rows)).all(axis=1)
        gdf = gdf[filtered]

    # Drop features without geometry
    total_features = len(gdf)
    gdf.drop(gdf.index[gdf.geometry.isna()], inplace=True)  # temporary fix
    # NOTE(review): this count is taken AFTER the drop above, so it is
    # always 0 — it should be computed before dropping.
    missing_features = len(gdf.index[gdf.geometry.isna()])
    logger.debug(
        f"{missing_features} out of {total_features} do not have a geometry"
    )

    # Rename columns:
    if column_mapping is not None:
        gdf.rename(columns=column_mapping, inplace=True)

    # Check number of entries
    if gdf.empty:
        raise OSError("Imported shapefile contains no rows.")

    # Add data to class GeoDataFrame
    self.set_data(
        gdf,
        index_col=index_col,
        check_columns=check_columns,
        check_geotype=check_geotype,
    )

    # Clip if extent is provided
    if clip is not None:
        self.clip(clip)

    # To re-project CRS system to projected CRS, first all empty geometries should be dropped
    if proj_crs is not None:
        self.check_projection(proj_crs)
    else:
        logger.debug("No projected CRS is given in ini-file")
snap_to_branch(branches, snap_method, maxdist=5)

Snap the geometries to the branch

Source code in hydrolib/dhydamo/io/common.py
424
425
426
427
428
def snap_to_branch(self, branches, snap_method, maxdist=5):
    """Project the geometries in this frame onto the nearest branch.

    Delegates to ``spatial.find_nearest_branch``, which annotates this
    object in place with the nearest-branch information.
    """
    spatial.find_nearest_branch(
        branches=branches,
        geometries=self,
        method=snap_method,
        maxdist=maxdist,
    )

dimrwriter

DIMRWriter

Temporary files for DIMR-configuration - to be superseded by Hydrolib-core functionality.

Source code in hydrolib/dhydamo/io/dimrwriter.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
class DIMRWriter:
    """Temporary files for DIMR-configuration - to be superseded by Hydrolib-core functionality."""

    @validate_arguments
    def __init__(
        self, dimr_path: str = None, output_path: str | Path = None
    ) -> None:
        """Class initialization for the DIMR writer. This is a temporary fix to include writers for RR and RTC, that are not yet fully included in Hydrolib-core, or have bugs. Eventually, this file should be redundant as this functionality is fully included in Hydrolib-core

        Args:
            dimr_path (str, optional): path to run_dimr.bat. Defaults to the
                standard D-HYDRO Suite 2022.04 installation location.
            output_path (str | Path, optional): folder where the model and DIMR
                files are written. Defaults to the current working directory.
        """
        if dimr_path is None:
            self.run_dimr = r"C:\Program Files\Deltares\D-HYDRO Suite 2022.04 1D2D\plugins\DeltaShell.Dimr\kernels\x64\dimr\scripts\run_dimr.bat"
        else:
            self.run_dimr = dimr_path

        if output_path is None:
            self.output_path = Path(os.path.abspath("."))
        else:
            # Coerce to Path: downstream code (e.g. add_crs, write_dimrconfig)
            # joins with the "/" operator, which fails on a plain string.
            self.output_path = Path(output_path)

        self.template_dir = Path(os.path.abspath("."))

    @validate_arguments
    def write_runbat(self, debuglevel=6, runlog=None) -> None:
        """Generates a run.bat to run the model in DIMR. The path to the executable is provided by the user in the class initialization, or set to the default D-Hydro installation folder."""
        bat_path = os.path.join(self.output_path, "run.bat")
        # Build the DIMR call line first; optionally redirect output to a log file.
        call_line = 'call "' + str(self.run_dimr) + '" -d ' + str(debuglevel)
        if runlog is not None:
            call_line += ' > ' + str(runlog)
        with open(bat_path, "w") as f:
            f.writelines(["@ echo off\n", "set OMP_NUM_THREADS=2\n"])
            f.write(call_line + '\n')

    @validate_arguments
    def add_crs(self, netcdf_path: str | Path = None) -> None:
        """Reads the Netcdf file and adds the required attributes for a valid CRS (EPSG:28992, Amersfoort / RD New).

        Args:
            netcdf_path (str | Path, optional): NetCDF file, or a directory containing one.
                Defaults to None, in which case <output_path>/dflowfm is searched.

        Raises:
            FileNotFoundError: when no NetCDF file can be located.
        """
        if netcdf_path is None:
            candidates = list((self.output_path / 'dflowfm').glob('*.nc'))
            # glob only yields existing files, so an empty result is the only
            # "not found" case; indexing [0] directly would raise IndexError.
            if not candidates:
                raise FileNotFoundError(f"Netcdf file not found in {self.output_path / 'dflowfm'}. Provide the correct path via the netcdf_path argument.")
            netfile = candidates[0]
        else:
            netcdf_path = Path(netcdf_path)  # argument may arrive as a plain str
            if netcdf_path.is_dir():
                candidates = list(netcdf_path.glob('*.nc'))
                if not candidates:
                    raise FileNotFoundError(f"Netcdf file not found at {netcdf_path}.")
                netfile = candidates[0]
            elif netcdf_path.is_file():
                netfile = netcdf_path
            else:
                raise FileNotFoundError(f"Netcdf file not found at {netcdf_path}.")
        netf = nc.Dataset(netfile, 'r+')
        netf.Conventions = 'CF-1.8 UGRID-1.0 Deltares-0.10'
        proj = netf.createVariable('projected_coordinate_system', 'i4')
        proj.epsg = 28992
        proj.grid_mapping_name = "Amersfoort / RD New"  # "Rijksdriehoeksstelsel"
        proj.longitude_of_prime_meridian = 0.0
        # Bessel 1841 ellipsoid parameters (doubles)
        proj.semi_major_axis = 6377397.155
        proj.semi_minor_axis = 6356078.962818189
        proj.inverse_flattening = 299.1528128
        # NOTE: fixed typo "+prdoj" -> "+proj"; an invalid key makes the string unusable for PROJ.
        proj.proj4_params = "+proj=sterea +lat_0=52.1561605555556 +lon_0=5.38763888888889 +k=0.9999079 +x_0=155000 +y_0=463000 +ellps=bessel +units=m +no_defs"
        proj.EPSG_code = "EPSG:28992"
        proj.value = "value is equal to EPSG code"
        proj.wkt = "PROJCS[\"Amersfoort / RD New\",\n    GEOGCS[\"Amersfoort\",\n        DATUM[\"Amersfoort\",\n            SPHEROID[\"Bessel 1841\",6377397.155,299.1528128,\n"
        netf.close()

    @staticmethod
    def _add_unique_coupler_item(
        coupler: ET.Element,
        source_name: str,
        target_name: str,
        seen_pairs: set[tuple[str, str]],
        gn_brackets: str,
    ) -> None:
        """Append an <item> element with source/target names to *coupler*.

        Incomplete pairs (either name is None) and pairs already present in
        *seen_pairs* are silently skipped; *seen_pairs* is updated in place.
        """
        if source_name is None or target_name is None:
            return

        key = (source_name, target_name)
        if key in seen_pairs:
            return
        seen_pairs.add(key)

        item_el = ET.SubElement(coupler, gn_brackets + "item")
        item_el.text = ""
        item_el.tail = "\n"

        # Emit the two child elements in the order DIMR expects.
        for tag, value in (("sourceName", source_name), ("targetName", target_name)):
            child = ET.SubElement(item_el, gn_brackets + tag)
            child.text = value
            child.tail = "\n"

    #@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def write_dimrconfig(
        self, fm: FMModel, rr_model: DRRModel = None, rtc_model: DRTCModel = None
    ) -> None:
        """Construct the contents of a DIMR-config file, using elements from FM, RTC and RR models.

        Args:
            fm (instance of class FM-model): Hydrolib-core model object. When only fm is set, only a 1d2d model is generated.
            rr_model (instance of a DRRModel object, optional): Class containing all information from the RR-model. Defaults to None.
            rtc_model (instance of a DRTCmodel, optional): Class containing all information from the RTC-model. Defaults to None.
        """
        # initiate flags for RR and RTC blocks
        RR = False
        RTC = False
        if rr_model is not None:
            RR = True
        if rtc_model is not None:
            RTC = True

        generalname = "http://schemas.deltares.nl/dimr"  # Config
        xsi_name = "http://www.w3.org/2001/XMLSchema-instance"
        gn_brackets = "{" + generalname + "}"

        # registering namespaces
        ET.register_namespace("", generalname)
        ET.register_namespace("xsi", xsi_name)

        # Parsing xml file
        configfile = ET.parse(self.template_dir / "dimr_config.xml")

        myroot = configfile.getroot()
        # template layout: element [1] holds the FM working dir and mdu file name
        myroot[1][1].text = fm.filepath.parts[-2]
        myroot[1][2].text = fm.filepath.name

        # control blocks
        control = ET.Element(gn_brackets + "control")
        control.text = ""
        control.tail = "\n"
        myroot.insert(1, control)

        # RR/RTC run in parallel with FM; FM alone runs directly under <control>
        if RR or RTC:
            parallel = ET.SubElement(control, gn_brackets + "parallel")
            parallel.tail = "\n"
            parallel.text = ""

        if RTC:
            startgrouprtc = ET.SubElement(parallel, gn_brackets + "startGroup")
            startgrouprtc.text = ""
            startgrouprtc.tail = "\n"

            timertc = ET.SubElement(startgrouprtc, gn_brackets + "time")
            timertc.text = f"{0} {rtc_model.time_settings['step']} {fm.time.tstop}"
            timertc.tail = "\n"
            # only PID/interval controllers need feedback from FM to RTC
            if bool(rtc_model.pid_controllers) or bool(rtc_model.interval_controllers):
                couplerrtcrr = ET.SubElement(startgrouprtc, gn_brackets + "coupler")
                couplerrtcrr.attrib = {"name": "flow_to_rtc"}
                couplerrtcrr.tail = "\n"

            startrtc = ET.SubElement(startgrouprtc, gn_brackets + "start")
            startrtc.attrib = {"name": "Real_Time_Control"}
            startrtc.tail = "\n"
            couplerrtc = ET.SubElement(startgrouprtc, gn_brackets + "coupler")
            couplerrtc.attrib = {"name": "rtc_to_flow"}
            couplerrtc.tail = "\n"

        if RR:
            startgrouprr = ET.SubElement(parallel, gn_brackets + "startGroup")
            startgrouprr.text = ""
            startgrouprr.tail = "\n"

            timerr = ET.SubElement(startgrouprr, gn_brackets + "time")
            timerr.text = (
                f"{0} {rr_model.d3b_parameters['Timestepsize']} {fm.time.tstop}"
            )
            timerr.tail = "\n"
            couplerrrf = ET.SubElement(startgrouprr, gn_brackets + "coupler")
            couplerrrf.attrib = {"name": "flow_to_rr"}
            couplerrrf.tail = "\n"
            startrr = ET.SubElement(startgrouprr, gn_brackets + "start")
            startrr.attrib = {"name": "RR"}
            startrr.tail = "\n"
            couplerfrr = ET.SubElement(startgrouprr, gn_brackets + "coupler")
            couplerfrr.attrib = {"name": "rr_to_flow"}
            couplerfrr.tail = "\n"
        if RR or RTC:
            startfm = ET.SubElement(parallel, gn_brackets + "start")
        else:
            startfm = ET.SubElement(control, gn_brackets + "start")
        startfm.attrib = {"name": "DFM"}
        startfm.tail = "\n"

        # component blocks
        if RTC:
            comprtc = ET.Element(gn_brackets + "component")
            comprtc.attrib = {"name": "Real_Time_Control"}
            comprtc.tail = "\n"
            librtc = ET.SubElement(comprtc, gn_brackets + "library")
            librtc.text = "FBCTools_BMI"
            librtc.tail = "\n"
            wdrtc = ET.SubElement(comprtc, gn_brackets + "workingDir")
            wdrtc.text = "rtc"
            wdrtc.tail = "\n"
            frtc = ET.SubElement(comprtc, gn_brackets + "inputFile")
            frtc.text = "."
            frtc.tail = "\n"
            myroot.insert(2, comprtc)

        if RR:
            comprr = ET.Element(gn_brackets + "component")
            comprr.attrib = {"name": "RR"}
            comprr.tail = "\n"
            librr = ET.SubElement(comprr, gn_brackets + "library")
            librr.text = "rr_dll"
            librr.tail = "\n"
            wdrr = ET.SubElement(comprr, gn_brackets + "workingDir")
            wdrr.text = "rr"
            wdrr.tail = "\n"
            frr = ET.SubElement(comprr, gn_brackets + "inputFile")
            frr.text = "Sobek_3b.fnm"
            frr.tail = "\n"
            myroot.insert(4, comprr)

            # FM to RR coupler
            couplerfmrr = ET.Element(gn_brackets + "coupler")
            couplerfmrr.attrib = {"name": "flow_to_rr"}
            couplerfmrr.tail = "\n"

            sourcefmrr = ET.SubElement(couplerfmrr, gn_brackets + "sourceComponent")
            sourcefmrr.text = "DFM"
            sourcefmrr.tail = "\n"

            targetfmrr = ET.SubElement(couplerfmrr, gn_brackets + "targetComponent")
            targetfmrr.text = "RR"
            targetfmrr.tail = "\n"

            # one item per RR boundary node: FM water level -> RR catchment
            for i in rr_model.external_forcings.boundary_nodes.keys():
                item = ET.SubElement(couplerfmrr, gn_brackets + "item")
                item.text = ""
                item.tail = "\n"

                source = ET.SubElement(item, gn_brackets + "sourceName")
                source.text = f"laterals/{i}/water_level"
                source.tail = "\n"

                target = ET.SubElement(item, gn_brackets + "targetName")
                target.text = f"catchments/{i}/water_level"
                target.tail = "\n"

            logfmrr = ET.SubElement(couplerfmrr, gn_brackets + "logger")
            logfmrr.text = ""
            logfmrr.tail = "\n"

            wdlogfmrr = ET.SubElement(logfmrr, gn_brackets + "workingDir")
            wdlogfmrr.text = "."
            wdlogfmrr.tail = "\n"

            flogfmrr = ET.SubElement(logfmrr, gn_brackets + "outputFile")
            flogfmrr.text = "dflowfm_to_rr.nc"
            flogfmrr.tail = "\n"
            myroot.append(couplerfmrr)

        if RTC:
            # RTC to FM coupler
            couplerrtcfm = ET.Element(gn_brackets + "coupler")
            couplerrtcfm.attrib = {"name": "rtc_to_flow"}
            couplerrtcfm.tail = "\n"

            sourcertcfm = ET.SubElement(couplerrtcfm, gn_brackets + "sourceComponent")
            sourcertcfm.text = "Real_Time_Control"
            sourcertcfm.tail = "\n"

            targetrtcfm = ET.SubElement(couplerrtcfm, gn_brackets + "targetComponent")
            targetrtcfm.text = "DFM"
            targetrtcfm.tail = "\n"

            # maps a controller steering variable to (FM structure type, FM quantity)
            rtcdict = {
                      "Crest level (s)": ["weirs","crestLevel"],
                      "Gate lower edge level (s)": ["orifices","gateLowerEdgeLevel"],
                      "Capacity (p)": ["pumps","capacity"]
                      }

            rtc_to_flow_seen_pairs = set()

            for i in rtc_model.all_controllers.keys():
                svar=rtc_model.all_controllers[i]['steering_variable']
                self._add_unique_coupler_item(
                    couplerrtcfm,
                    f"[Output]{i}/{svar}",
                    f"{rtcdict[svar][0]}/{i}/{rtcdict[svar][1]}",
                    rtc_to_flow_seen_pairs,
                    gn_brackets,
                )

            # check if there are user-specified controller that should be included
            if rtc_model.complex_controllers is not None:
                complex_config = rtc_model.complex_controllers["dimr_config"][0]
                for block in complex_config:
                    # strip a namespace prefix of the form "{uri}tag" if present
                    block_tag = block.tag.split("}", 1)[-1] if block.tag.startswith("{") else block.tag
                    if block_tag == "coupler":
                        if block.attrib["name"].lower().startswith("rtc_to_flow"):
                            for iblock in block:
                                iblock_tag = iblock.tag.split("}", 1)[-1] if iblock.tag.startswith("{") else iblock.tag
                                if iblock_tag == "item":
                                    source = iblock.find(".//{*}sourceName")
                                    target = iblock.find(".//{*}targetName")
                                    self._add_unique_coupler_item(
                                        couplerrtcfm,
                                        source.text if source is not None else None,
                                        target.text if target is not None else None,
                                        rtc_to_flow_seen_pairs,
                                        gn_brackets,
                                    )

            logrtcfm = ET.SubElement(couplerrtcfm, gn_brackets + "logger")
            logrtcfm.text = ""
            logrtcfm.tail = "\n"

            wdlogrtcfm = ET.SubElement(logrtcfm, gn_brackets + "workingDir")
            wdlogrtcfm.text = "."
            wdlogrtcfm.tail = "\n"

            flogrtcfm = ET.SubElement(logrtcfm, gn_brackets + "outputFile")
            flogrtcfm.text = "rtc_to_dflowfm.nc"
            flogrtcfm.tail = "\n"

            myroot.append(couplerrtcfm)

            # the Fm to RTC coupler is not always needed. It is for PID controllers.
            coupler_exists = False
            flow_to_rtc_seen_pairs = set()
            if bool(rtc_model.pid_controllers) or bool(rtc_model.interval_controllers):
                couplerfmrtc = ET.Element(gn_brackets + "coupler")
                couplerfmrtc.attrib = {"name": "flow_to_rtc"}
                couplerfmrtc.tail = "\n"

                sourcefmrtc = ET.SubElement(
                    couplerfmrtc, gn_brackets + "sourceComponent"
                )
                sourcefmrtc.text = "DFM"
                sourcefmrtc.tail = "\n"

                targetfmrtc = ET.SubElement(
                    couplerfmrtc, gn_brackets + "targetComponent"
                )
                targetfmrtc.text = "Real_Time_Control"
                targetfmrtc.tail = "\n"

                for i in rtc_model.pid_controllers.keys():
                    if rtc_model.pid_controllers[i]['target_variable'] == 'Discharge (op)':
                        source_name = f"observations/{rtc_model.pid_controllers[i]['observation_point']}/discharge"
                    elif rtc_model.pid_controllers[i]['target_variable'] == 'Water level (op)':
                        source_name = f"observations/{rtc_model.pid_controllers[i]['observation_point']}/water_level"
                    else:
                        # NOTE: message fixed ("bo" -> "be") for consistency with the interval-controller branch below
                        raise ValueError('Invalid target variable in controller: should be discharge or water level.')
                    target_name = f"[Input]{rtc_model.pid_controllers[i]['observation_point']}/{rtc_model.pid_controllers[i]['target_variable']}"
                    self._add_unique_coupler_item(
                        couplerfmrtc,
                        source_name,
                        target_name,
                        flow_to_rtc_seen_pairs,
                        gn_brackets,
                    )

                # Loop through all interval controllers
                for i in rtc_model.interval_controllers.keys():
                    if rtc_model.interval_controllers[i]["target_variable"] == "Discharge (op)":
                        source_name = f"observations/{rtc_model.interval_controllers[i]['observation_point']}/discharge"
                    elif rtc_model.interval_controllers[i]["target_variable"] == "Water level (op)":
                        source_name = f"observations/{rtc_model.interval_controllers[i]['observation_point']}/water_level"
                    else:
                        raise ValueError("Invalid target variable in controller: should be discharge or water level.")
                    target_name = f"[Input]{rtc_model.interval_controllers[i]['observation_point']}/{rtc_model.interval_controllers[i]['target_variable']}"
                    self._add_unique_coupler_item(
                        couplerfmrtc,
                        source_name,
                        target_name,
                        flow_to_rtc_seen_pairs,
                        gn_brackets,
                    )

                coupler_exists = True
            # it could be that are no PID controllers, but there are complex controllers
            if rtc_model.complex_controllers is not None:
                complex_config = rtc_model.complex_controllers["dimr_config"][0]
                for block in complex_config:
                    block_tag = block.tag.split("}", 1)[-1] if block.tag.startswith("{") else block.tag
                    if block_tag == "coupler":
                        if block.attrib["name"].lower().startswith("flow_to_rtc"):
                            if not coupler_exists:
                                couplerfmrtc = ET.Element(gn_brackets + "coupler")
                                couplerfmrtc.attrib = {"name": "flow_to_rtc"}
                                couplerfmrtc.tail = "\n"

                                sourcefmrtc = ET.SubElement(
                                    couplerfmrtc, gn_brackets + "sourceComponent"
                                )
                                sourcefmrtc.text = "DFM"
                                sourcefmrtc.tail = "\n"

                                targetfmrtc = ET.SubElement(
                                    couplerfmrtc, gn_brackets + "targetComponent"
                                )
                                targetfmrtc.text = "Real_Time_Control"
                                targetfmrtc.tail = "\n"

                                coupler_exists = True
                            for iblock in block:
                                iblock_tag = iblock.tag.split("}", 1)[-1] if iblock.tag.startswith("{") else iblock.tag
                                if iblock_tag == "item":
                                    source = iblock.find(".//{*}sourceName")
                                    target = iblock.find(".//{*}targetName")
                                    self._add_unique_coupler_item(
                                        couplerfmrtc,
                                        source.text if source is not None else None,
                                        target.text if target is not None else None,
                                        flow_to_rtc_seen_pairs,
                                        gn_brackets,
                                    )
            # in any of those two cases, add the controllers and the logger
            if coupler_exists:
                logrtc = ET.SubElement(couplerfmrtc, gn_brackets + "logger")
                logrtc.text = ""
                logrtc.tail = "\n"

                wdlogrtc = ET.SubElement(logrtc, gn_brackets + "workingDir")
                wdlogrtc.text = "."
                wdlogrtc.tail = "\n"

                flogrtc = ET.SubElement(logrtc, gn_brackets + "outputFile")
                flogrtc.text = "dflowfm_to_rtc.nc"
                flogrtc.tail = "\n"

                myroot.append(couplerfmrtc)

        if RR:
            # finally the RR to FM coupler
            couplerrrfm = ET.Element(gn_brackets + "coupler")
            couplerrrfm.attrib = {"name": "rr_to_flow"}
            couplerrrfm.tail = "\n"

            sourcerrfm = ET.SubElement(couplerrrfm, gn_brackets + "sourceComponent")
            sourcerrfm.text = "RR"
            sourcerrfm.tail = "\n"

            targetrrfm = ET.SubElement(couplerrrfm, gn_brackets + "targetComponent")
            targetrrfm.text = "DFM"
            targetrrfm.tail = "\n"

            # one item per RR boundary node: RR catchment discharge -> FM lateral
            for i in rr_model.external_forcings.boundary_nodes.keys():
                item = ET.SubElement(couplerrrfm, gn_brackets + "item")
                item.text = ""
                item.tail = "\n"

                source = ET.SubElement(item, gn_brackets + "sourceName")
                source.text = f"catchments/{i}/water_discharge"
                source.tail = "\n"

                target = ET.SubElement(item, gn_brackets + "targetName")
                target.text = f"laterals/{i}/water_discharge"
                target.tail = "\n"

            logrrfm = ET.SubElement(couplerrrfm, gn_brackets + "logger")
            logrrfm.text = ""
            logrrfm.tail = "\n"

            wdlogrrfm = ET.SubElement(logrrfm, gn_brackets + "workingDir")
            wdlogrrfm.text = "."
            wdlogrrfm.tail = "\n"

            flogrrfm = ET.SubElement(logrrfm, gn_brackets + "outputFile")
            flogrrfm.text = "rr_to_dflowfm.nc"
            flogrrfm.tail = "\n"

            myroot.append(couplerrrfm)
        # use the function from DRTC to finish the file with the correct namespace
        DRTCModel.finish_file(myroot, configfile, self.output_path / "dimr_config.xml")
__init__(dimr_path: str = None, output_path: str | Path = None) -> None

Class initialization for the DIMR writer. This is a temporary fix to include writers for RR and RTC, that are not yet fully included in Hydrolib-core, or have bugs. Eventually, this file should be redundant as this functionality is fully included in Hydrolib-core

Parameters:

Name Type Description Default
dimr_path Union[str, Path]

description. Defaults to None.

None
output_path Union[str, Path]

description. Defaults to None.

None
Source code in hydrolib/dhydamo/io/dimrwriter.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@validate_arguments
def __init__(
    self, dimr_path: str = None, output_path: str | Path = None
) -> None:
    """Class initialization for the DIMR writer. This is a temporary fix to include writers for RR and RTC, that are not yet fully included in Hydrolib-core, or have bugs. Eventually, this file should be redundant as this functionality is fully included in Hydrolib-core

    Args:
        dimr_path (str, optional): path to run_dimr.bat. Defaults to the
            standard D-HYDRO Suite 2022.04 installation location.
        output_path (str | Path, optional): folder where the model and DIMR
            files are written. Defaults to the current working directory.
    """
    if dimr_path is None:
        self.run_dimr = r"C:\Program Files\Deltares\D-HYDRO Suite 2022.04 1D2D\plugins\DeltaShell.Dimr\kernels\x64\dimr\scripts\run_dimr.bat"
    else:
        self.run_dimr = dimr_path

    if output_path is None:
        self.output_path = Path(os.path.abspath("."))
    else:
        # Coerce to Path: downstream code joins with the "/" operator,
        # which fails on a plain string.
        self.output_path = Path(output_path)

    self.template_dir = Path(os.path.abspath("."))
add_crs(netcdf_path: str | Path = None) -> None

Reads the Netcdf file and adds the required attributes for a valid CRS.

Source code in hydrolib/dhydamo/io/dimrwriter.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@validate_arguments
def add_crs(self, netcdf_path: str | Path = None) -> None:
    """Reads the Netcdf file and adds the required attributes for a valid CRS (EPSG:28992, Amersfoort / RD New).

    Args:
        netcdf_path (str | Path, optional): NetCDF file, or a directory containing one.
            Defaults to None, in which case <output_path>/dflowfm is searched.

    Raises:
        FileNotFoundError: when no NetCDF file can be located.
    """
    if netcdf_path is None:
        candidates = list((self.output_path / 'dflowfm').glob('*.nc'))
        # glob only yields existing files, so an empty result is the only
        # "not found" case; indexing [0] directly would raise IndexError.
        if not candidates:
            raise FileNotFoundError(f"Netcdf file not found in {self.output_path / 'dflowfm'}. Provide the correct path via the netcdf_path argument.")
        netfile = candidates[0]
    else:
        netcdf_path = Path(netcdf_path)  # argument may arrive as a plain str
        if netcdf_path.is_dir():
            candidates = list(netcdf_path.glob('*.nc'))
            if not candidates:
                raise FileNotFoundError(f"Netcdf file not found at {netcdf_path}.")
            netfile = candidates[0]
        elif netcdf_path.is_file():
            netfile = netcdf_path
        else:
            raise FileNotFoundError(f"Netcdf file not found at {netcdf_path}.")
    netf = nc.Dataset(netfile, 'r+')
    netf.Conventions = 'CF-1.8 UGRID-1.0 Deltares-0.10'
    proj = netf.createVariable('projected_coordinate_system', 'i4')
    proj.epsg = 28992
    proj.grid_mapping_name = "Amersfoort / RD New"  # "Rijksdriehoeksstelsel"
    proj.longitude_of_prime_meridian = 0.0
    # Bessel 1841 ellipsoid parameters (doubles)
    proj.semi_major_axis = 6377397.155
    proj.semi_minor_axis = 6356078.962818189
    proj.inverse_flattening = 299.1528128
    # NOTE: fixed typo "+prdoj" -> "+proj"; an invalid key makes the string unusable for PROJ.
    proj.proj4_params = "+proj=sterea +lat_0=52.1561605555556 +lon_0=5.38763888888889 +k=0.9999079 +x_0=155000 +y_0=463000 +ellps=bessel +units=m +no_defs"
    proj.EPSG_code = "EPSG:28992"
    proj.value = "value is equal to EPSG code"
    proj.wkt = "PROJCS[\"Amersfoort / RD New\",\n    GEOGCS[\"Amersfoort\",\n        DATUM[\"Amersfoort\",\n            SPHEROID[\"Bessel 1841\",6377397.155,299.1528128,\n"
    netf.close()
write_dimrconfig(fm: FMModel, rr_model: DRRModel = None, rtc_model: DRTCModel = None) -> None

Construct the contents of a DIMR-config file, using elements from FM, RTC and RR models.

Parameters:

Name Type Description Default
fm instance of class FM-model

Hydrolib-core model object. When only fm is set, only a 1d2d model is generated.

required
rr_model instance of a DRRModel object

Class containing all information from the RR-model. Defaults to None.

None
rtc_model instance of a DRTCmodel

Class containing all information from the RTC-model. Defaults to None.

None
Source code in hydrolib/dhydamo/io/dimrwriter.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def write_dimrconfig(
    self, fm: FMModel, rr_model: DRRModel = None, rtc_model: DRTCModel = None
) -> None:
    """Construct the contents of a DIMR-config file, using elements from FM, RTC and RR models.

    The method parses the template ``dimr_config.xml``, inserts the FM model
    reference, and then adds control-, component- and coupler-blocks for the
    optional RR and RTC models. The file is finalized through
    ``DRTCModel.finish_file`` so the correct namespace is written.

    Args:
        fm (instance of class FM-model): Hydrolib-core model object. When only fm is set, only a 1d2d model is generated.
        rr_model (instance of a DRRModel object, optional): Class containing all information from the RR-model. Defaults to None.
        rtc_model (instance of a DRTCmodel, optional): Class containing all information from the RTC-model. Defaults to None.
    """
    # Flags controlling which optional blocks (RR and/or RTC) are written.
    RR = rr_model is not None
    RTC = rtc_model is not None

    generalname = "http://schemas.deltares.nl/dimr"  # Config
    xsi_name = "http://www.w3.org/2001/XMLSchema-instance"
    gn_brackets = "{" + generalname + "}"

    # registering namespaces, so ElementTree writes the DIMR default namespace
    ET.register_namespace("", generalname)
    ET.register_namespace("xsi", xsi_name)

    # Parsing the template xml file shipped with the package
    configfile = ET.parse(self.template_dir / "dimr_config.xml")

    myroot = configfile.getroot()
    # Fill in the FM working directory (parent folder name) and the MDU filename.
    myroot[1][1].text = fm.filepath.parts[-2]
    myroot[1][2].text = fm.filepath.name

    # control block: defines the order in which components are started
    control = ET.Element(gn_brackets + "control")
    control.text = ""
    control.tail = "\n"
    myroot.insert(1, control)

    if RR or RTC:
        # With multiple components they run in parallel within one control block.
        parallel = ET.SubElement(control, gn_brackets + "parallel")
        parallel.tail = "\n"
        parallel.text = ""

    if RTC:
        startgrouprtc = ET.SubElement(parallel, gn_brackets + "startGroup")
        startgrouprtc.text = ""
        startgrouprtc.tail = "\n"

        # time: start, step and stop for the RTC component
        timertc = ET.SubElement(startgrouprtc, gn_brackets + "time")
        timertc.text = f"{0} {rtc_model.time_settings['step']} {fm.time.tstop}"
        timertc.tail = "\n"
        # The flow_to_rtc coupler is only needed when there are PID or
        # interval controllers (they read observations from FM).
        if bool(rtc_model.pid_controllers) or bool(rtc_model.interval_controllers):
            couplerrtcrr = ET.SubElement(startgrouprtc, gn_brackets + "coupler")
            couplerrtcrr.attrib = {"name": "flow_to_rtc"}
            couplerrtcrr.tail = "\n"

        startrtc = ET.SubElement(startgrouprtc, gn_brackets + "start")
        startrtc.attrib = {"name": "Real_Time_Control"}
        startrtc.tail = "\n"
        couplerrtc = ET.SubElement(startgrouprtc, gn_brackets + "coupler")
        couplerrtc.attrib = {"name": "rtc_to_flow"}
        couplerrtc.tail = "\n"

    if RR:
        startgrouprr = ET.SubElement(parallel, gn_brackets + "startGroup")
        startgrouprr.text = ""
        startgrouprr.tail = "\n"

        # time: start, step and stop for the RR component
        timerr = ET.SubElement(startgrouprr, gn_brackets + "time")
        timerr.text = (
            f"{0} {rr_model.d3b_parameters['Timestepsize']} {fm.time.tstop}"
        )
        timerr.tail = "\n"
        couplerrrf = ET.SubElement(startgrouprr, gn_brackets + "coupler")
        couplerrrf.attrib = {"name": "flow_to_rr"}
        couplerrrf.tail = "\n"
        startrr = ET.SubElement(startgrouprr, gn_brackets + "start")
        startrr.attrib = {"name": "RR"}
        startrr.tail = "\n"
        couplerfrr = ET.SubElement(startgrouprr, gn_brackets + "coupler")
        couplerfrr.attrib = {"name": "rr_to_flow"}
        couplerfrr.tail = "\n"
    # FM itself starts inside the parallel block when coupled, otherwise directly.
    if RR or RTC:
        startfm = ET.SubElement(parallel, gn_brackets + "start")
    else:
        startfm = ET.SubElement(control, gn_brackets + "start")
    startfm.attrib = {"name": "DFM"}
    startfm.tail = "\n"

    # component blocks: library, working dir and input file per component
    if RTC:
        comprtc = ET.Element(gn_brackets + "component")
        comprtc.attrib = {"name": "Real_Time_Control"}
        comprtc.tail = "\n"
        librtc = ET.SubElement(comprtc, gn_brackets + "library")
        librtc.text = "FBCTools_BMI"
        librtc.tail = "\n"
        wdrtc = ET.SubElement(comprtc, gn_brackets + "workingDir")
        wdrtc.text = "rtc"
        wdrtc.tail = "\n"
        frtc = ET.SubElement(comprtc, gn_brackets + "inputFile")
        frtc.text = "."
        frtc.tail = "\n"
        myroot.insert(2, comprtc)

    if RR:
        comprr = ET.Element(gn_brackets + "component")
        comprr.attrib = {"name": "RR"}
        comprr.tail = "\n"
        librr = ET.SubElement(comprr, gn_brackets + "library")
        librr.text = "rr_dll"
        librr.tail = "\n"
        wdrr = ET.SubElement(comprr, gn_brackets + "workingDir")
        wdrr.text = "rr"
        wdrr.tail = "\n"
        frr = ET.SubElement(comprr, gn_brackets + "inputFile")
        frr.text = "Sobek_3b.fnm"
        frr.tail = "\n"
        myroot.insert(4, comprr)

        # FM to RR coupler: passes water levels at laterals to the catchments
        couplerfmrr = ET.Element(gn_brackets + "coupler")
        couplerfmrr.attrib = {"name": "flow_to_rr"}
        couplerfmrr.tail = "\n"

        sourcefmrr = ET.SubElement(couplerfmrr, gn_brackets + "sourceComponent")
        sourcefmrr.text = "DFM"
        sourcefmrr.tail = "\n"

        targetfmrr = ET.SubElement(couplerfmrr, gn_brackets + "targetComponent")
        targetfmrr.text = "RR"
        targetfmrr.tail = "\n"

        for i in rr_model.external_forcings.boundary_nodes.keys():
            item = ET.SubElement(couplerfmrr, gn_brackets + "item")
            item.text = ""
            item.tail = "\n"

            source = ET.SubElement(item, gn_brackets + "sourceName")
            source.text = f"laterals/{i}/water_level"
            source.tail = "\n"

            target = ET.SubElement(item, gn_brackets + "targetName")
            target.text = f"catchments/{i}/water_level"
            target.tail = "\n"

        logfmrr = ET.SubElement(couplerfmrr, gn_brackets + "logger")
        logfmrr.text = ""
        logfmrr.tail = "\n"

        wdlogfmrr = ET.SubElement(logfmrr, gn_brackets + "workingDir")
        wdlogfmrr.text = "."
        wdlogfmrr.tail = "\n"

        flogfmrr = ET.SubElement(logfmrr, gn_brackets + "outputFile")
        flogfmrr.text = "dflowfm_to_rr.nc"
        flogfmrr.tail = "\n"
        myroot.append(couplerfmrr)

    if RTC:
        # RTC to FM coupler: steers structure settings in FM
        couplerrtcfm = ET.Element(gn_brackets + "coupler")
        couplerrtcfm.attrib = {"name": "rtc_to_flow"}
        couplerrtcfm.tail = "\n"

        sourcertcfm = ET.SubElement(couplerrtcfm, gn_brackets + "sourceComponent")
        sourcertcfm.text = "Real_Time_Control"
        sourcertcfm.tail = "\n"

        targetrtcfm = ET.SubElement(couplerrtcfm, gn_brackets + "targetComponent")
        targetrtcfm.text = "DFM"
        targetrtcfm.tail = "\n"

        # Map an RTC steering variable to the FM structure category and quantity.
        rtcdict = {
            "Crest level (s)": ["weirs", "crestLevel"],
            "Gate lower edge level (s)": ["orifices", "gateLowerEdgeLevel"],
            "Capacity (p)": ["pumps", "capacity"],
        }

        # Track (source, target) pairs so duplicate items are not written.
        rtc_to_flow_seen_pairs = set()

        for i in rtc_model.all_controllers.keys():
            svar = rtc_model.all_controllers[i]["steering_variable"]
            self._add_unique_coupler_item(
                couplerrtcfm,
                f"[Output]{i}/{svar}",
                f"{rtcdict[svar][0]}/{i}/{rtcdict[svar][1]}",
                rtc_to_flow_seen_pairs,
                gn_brackets,
            )

        # check if there are user-specified controllers that should be included
        if rtc_model.complex_controllers is not None:
            complex_config = rtc_model.complex_controllers["dimr_config"][0]
            for block in complex_config:
                # Strip a possible namespace prefix from the tag name.
                block_tag = block.tag.split("}", 1)[-1] if block.tag.startswith("{") else block.tag
                if block_tag == "coupler":
                    if block.attrib["name"].lower().startswith("rtc_to_flow"):
                        for iblock in block:
                            iblock_tag = iblock.tag.split("}", 1)[-1] if iblock.tag.startswith("{") else iblock.tag
                            if iblock_tag == "item":
                                source = iblock.find(".//{*}sourceName")
                                target = iblock.find(".//{*}targetName")
                                self._add_unique_coupler_item(
                                    couplerrtcfm,
                                    source.text if source is not None else None,
                                    target.text if target is not None else None,
                                    rtc_to_flow_seen_pairs,
                                    gn_brackets,
                                )

        logrtcfm = ET.SubElement(couplerrtcfm, gn_brackets + "logger")
        logrtcfm.text = ""
        logrtcfm.tail = "\n"

        wdlogrtcfm = ET.SubElement(logrtcfm, gn_brackets + "workingDir")
        wdlogrtcfm.text = "."
        wdlogrtcfm.tail = "\n"

        flogrtcfm = ET.SubElement(logrtcfm, gn_brackets + "outputFile")
        flogrtcfm.text = "rtc_to_dflowfm.nc"
        flogrtcfm.tail = "\n"

        myroot.append(couplerrtcfm)

        # The FM to RTC coupler is not always needed. It is for PID/interval
        # controllers, which read observation values from FM.
        coupler_exists = False
        flow_to_rtc_seen_pairs = set()
        if bool(rtc_model.pid_controllers) or bool(rtc_model.interval_controllers):
            couplerfmrtc = ET.Element(gn_brackets + "coupler")
            couplerfmrtc.attrib = {"name": "flow_to_rtc"}
            couplerfmrtc.tail = "\n"

            sourcefmrtc = ET.SubElement(
                couplerfmrtc, gn_brackets + "sourceComponent"
            )
            sourcefmrtc.text = "DFM"
            sourcefmrtc.tail = "\n"

            targetfmrtc = ET.SubElement(
                couplerfmrtc, gn_brackets + "targetComponent"
            )
            targetfmrtc.text = "Real_Time_Control"
            targetfmrtc.tail = "\n"

            # PID and interval controllers produce identical coupler items,
            # so both collections are handled by the same loop (PID first,
            # preserving the original write order).
            for controllers in (
                rtc_model.pid_controllers,
                rtc_model.interval_controllers,
            ):
                for i in controllers.keys():
                    target_variable = controllers[i]["target_variable"]
                    obs_point = controllers[i]["observation_point"]
                    if target_variable == "Discharge (op)":
                        source_name = f"observations/{obs_point}/discharge"
                    elif target_variable == "Water level (op)":
                        source_name = f"observations/{obs_point}/water_level"
                    else:
                        # Fixed typo ("bo" -> "be") so both branches raise the same message.
                        raise ValueError(
                            "Invalid target variable in controller: should be discharge or water level."
                        )
                    target_name = f"[Input]{obs_point}/{target_variable}"
                    self._add_unique_coupler_item(
                        couplerfmrtc,
                        source_name,
                        target_name,
                        flow_to_rtc_seen_pairs,
                        gn_brackets,
                    )

            coupler_exists = True
        # It could be that there are no PID controllers, but there are complex controllers.
        if rtc_model.complex_controllers is not None:
            complex_config = rtc_model.complex_controllers["dimr_config"][0]
            for block in complex_config:
                block_tag = block.tag.split("}", 1)[-1] if block.tag.startswith("{") else block.tag
                if block_tag == "coupler":
                    if block.attrib["name"].lower().startswith("flow_to_rtc"):
                        if not coupler_exists:
                            # No PID/interval controllers created the coupler yet; create it here.
                            couplerfmrtc = ET.Element(gn_brackets + "coupler")
                            couplerfmrtc.attrib = {"name": "flow_to_rtc"}
                            couplerfmrtc.tail = "\n"

                            sourcefmrtc = ET.SubElement(
                                couplerfmrtc, gn_brackets + "sourceComponent"
                            )
                            sourcefmrtc.text = "DFM"
                            sourcefmrtc.tail = "\n"

                            targetfmrtc = ET.SubElement(
                                couplerfmrtc, gn_brackets + "targetComponent"
                            )
                            targetfmrtc.text = "Real_Time_Control"
                            targetfmrtc.tail = "\n"

                            coupler_exists = True
                        for iblock in block:
                            iblock_tag = iblock.tag.split("}", 1)[-1] if iblock.tag.startswith("{") else iblock.tag
                            if iblock_tag == "item":
                                source = iblock.find(".//{*}sourceName")
                                target = iblock.find(".//{*}targetName")
                                self._add_unique_coupler_item(
                                    couplerfmrtc,
                                    source.text if source is not None else None,
                                    target.text if target is not None else None,
                                    flow_to_rtc_seen_pairs,
                                    gn_brackets,
                                )
        # In any of those two cases, add the logger and append the coupler.
        if coupler_exists:
            logrtc = ET.SubElement(couplerfmrtc, gn_brackets + "logger")
            logrtc.text = ""
            logrtc.tail = "\n"

            wdlogrtc = ET.SubElement(logrtc, gn_brackets + "workingDir")
            wdlogrtc.text = "."
            wdlogrtc.tail = "\n"

            flogrtc = ET.SubElement(logrtc, gn_brackets + "outputFile")
            flogrtc.text = "dflowfm_to_rtc.nc"
            flogrtc.tail = "\n"

            myroot.append(couplerfmrtc)

    if RR:
        # finally the RR to FM coupler: passes catchment discharges to FM laterals
        couplerrrfm = ET.Element(gn_brackets + "coupler")
        couplerrrfm.attrib = {"name": "rr_to_flow"}
        couplerrrfm.tail = "\n"

        sourcerrfm = ET.SubElement(couplerrrfm, gn_brackets + "sourceComponent")
        sourcerrfm.text = "RR"
        sourcerrfm.tail = "\n"

        targetrrfm = ET.SubElement(couplerrrfm, gn_brackets + "targetComponent")
        targetrrfm.text = "DFM"
        targetrrfm.tail = "\n"

        for i in rr_model.external_forcings.boundary_nodes.keys():
            item = ET.SubElement(couplerrrfm, gn_brackets + "item")
            item.text = ""
            item.tail = "\n"

            source = ET.SubElement(item, gn_brackets + "sourceName")
            source.text = f"catchments/{i}/water_discharge"
            source.tail = "\n"

            target = ET.SubElement(item, gn_brackets + "targetName")
            target.text = f"laterals/{i}/water_discharge"
            target.tail = "\n"

        logrrfm = ET.SubElement(couplerrrfm, gn_brackets + "logger")
        logrrfm.text = ""
        logrrfm.tail = "\n"

        wdlogrrfm = ET.SubElement(logrrfm, gn_brackets + "workingDir")
        wdlogrrfm.text = "."
        wdlogrrfm.tail = "\n"

        flogrrfm = ET.SubElement(logrrfm, gn_brackets + "outputFile")
        flogrrfm.text = "rr_to_dflowfm.nc"
        flogrrfm.tail = "\n"

        myroot.append(couplerrrfm)
    # use the function from DRTC to finish the file with the correct namespace
    DRTCModel.finish_file(myroot, configfile, self.output_path / "dimr_config.xml")
write_runbat(debuglevel=6, runlog=None) -> None

Generates a run.bat to run the model in DIMR. The path to the executable is provided by the user in the class initialization, or set to the default D-Hydro installation folder.

Source code in hydrolib/dhydamo/io/dimrwriter.py
39
40
41
42
43
44
45
46
47
48
49
@validate_arguments
def write_runbat(self, debuglevel=6, runlog=None) -> None:
    """Generates a run.bat to run the model in DIMR. The path to the executable is provided by the user in the class initialization, or set to the default D-Hydro installation folder."""
    # Build the DIMR call first; append output redirection only when a log file is requested.
    dimr_call = 'call "' + str(self.run_dimr) + '" -d ' + str(debuglevel)
    if runlog is not None:
        dimr_call = dimr_call + ' > ' + str(runlog)

    batch_path = os.path.join(self.output_path, "run.bat")
    with open(batch_path, "w") as batch_file:
        batch_file.write("@ echo off\n")
        batch_file.write("set OMP_NUM_THREADS=2\n")
        batch_file.write(dimr_call + '\n')

drrreader

ExternalForcingsIO

Source code in hydrolib/dhydamo/io/drrreader.py
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
class ExternalForcingsIO:
    """Derive RR external forcings (seepage, precipitation, evaporation and
    boundary nodes) from raster and vector input and add them to an
    external-forcings container."""

    def __init__(self, external_forcings):
        # Container object to which forcings are added; also provides access
        # to .drrmodel (used here for raster reading).
        self.external_forcings = external_forcings

    @validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def seepage_from_input(
        self, catchments: ExtendedGeoDataFrame, seepage_folder: Path | str
    ) -> None:
        """Perform zonal statistics to derive seepage time series per catchment. Time steps are derived from the data

        Args:
            catchments (ExtendedGeoDataFrame): catchment areas
            seepage_folder (str): folder where the seepage rasters are stored
        """
        warnings.filterwarnings("ignore")
        file_list = os.listdir(seepage_folder)
        # NOTE(review): `file.lower()` is truthy for every non-empty filename,
        # so this filter is a no-op — an `.endswith(...)` test was probably
        # intended; confirm the expected raster extensions.
        file_list = [file for file in file_list if file.lower()]
        times = []
        convert_units=False
        # One row per raster (time step), one column per catchment.
        arr = np.zeros((len(file_list), len(catchments.code)))
        for ifile, file in tqdm(
            enumerate(file_list), total=len(file_list), desc="Reading seepage files"
        ):
            if file.endswith('.idf'):
                # NHI/iMOD IDF raster: read grid, build the affine transform
                # from the header, and remember that units must be converted.
                dataset = idfreader.open(os.path.join(seepage_folder, file))
                array = dataset.squeeze().to_numpy()
                header = idfreader.header(os.path.join(seepage_folder, file), pattern=None)
                # NOTE(review): dy is taken equal to dx (square cells assumed) — confirm.
                affine = from_origin(
                    header["xmin"], header["ymax"], header["dx"], header["dx"]
                )
                time = header['time']
                convert_units=True
            else:
                # Generic raster formats are read through the DRR model helper.
                array, affine, time = self.external_forcings.drrmodel.read_raster(
                    os.path.join(seepage_folder, file)
                )
            times.append(time)
            # Mean raster value per catchment polygon (all touched cells count).
            stats = zonal_stats(
                 gpd.GeoDataFrame(catchments), array, affine=affine, stats="mean", all_touched=True
            )
            arr[ifile, :] = [s["mean"] for s in stats]
        result = pd.DataFrame(
            arr, columns=["sep_" + str(cat) for cat in catchments.code]
        )
        result.index = times
        if convert_units:
            # if an NHI model (IDF files) is used, convert units from m3 to mm/d
            # NOTE(review): assumes at least three files and a constant time step
            # (uses times[2] - times[1]); verify for short/irregular series.
            result = (result / (1e-3 * (affine[0] * -affine[4]))) / (
                    (times[2] - times[1]).total_seconds() / 86400.0
            )
        # Register each catchment series with the forcings container.
        [self.external_forcings.add_seepage(*sep) for sep in result.items()]


    @validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def precip_from_input(
        self,
        areas: ExtendedGeoDataFrame,
        precip_folder: Path | str = None,
        precip_file: Path | str = None,
    ) -> None:
        """Create time series of precipitation for every meteo_area, based on zonal statistics from rasters.

        Args:
            areas (ExtendedGeoDataFrame): meteo areas for which time series are created
            precip_folder (str, optional): folder where precipitation rasters are stored. Only used if no precip_file is given. Defaults to None.
            precip_file (str, optional): existing meteo-file, which is used if available.
        """
        if precip_file is None:
            warnings.filterwarnings("ignore")
            file_list = os.listdir(precip_folder)
            times = []
            # One row per raster (time step), one column per meteo area.
            arr = np.zeros((len(file_list), len(areas.code)))
            for ifile, file in tqdm(
                enumerate(file_list),
                total=len(file_list),
                desc="Reading precipitation files",
            ):
                array, affine, time = self.external_forcings.drrmodel.read_raster(
                    os.path.join(precip_folder, file)
                )
                times.append(time)
                # Mean raster value per meteo-area polygon.
                stats = zonal_stats(
                     gpd.GeoDataFrame(areas), array, affine=affine, stats="mean", all_touched=True
                )
                arr[ifile, :] = [s["mean"] for s in stats]
            result = pd.DataFrame(
                arr, columns=["ms_" + str(area) for area in areas.code]
            )
            result.index = times
            [self.external_forcings.add_precip(*prec) for prec in result.items()]
        else:
            # An existing meteo file takes precedence over raster input.
            self.external_forcings.precip = str(precip_file)

    @validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def evap_from_input(
        self,
        areas: ExtendedGeoDataFrame,
        evap_folder: Path | str = None,
        evap_file: Path | str = None,
    ) -> None:
        """Create time series of evaporation for every meteo_area, based on zonal statistics from rasters.

        Args:
            areas (ExtendedGeoDataFrame): meteo areas for which time series are created
            evap_folder (str, optional): folder where evaporation rasters are stored. Only used if no evap_file is given. Defaults to None.
            evap_file (str, optional): existing meteo-file, which is used if available.
        """
        if evap_file is None:
            warnings.filterwarnings("ignore")
            file_list = os.listdir(evap_folder)
            # aggregated evap
            # areas['dissolve'] = 1
            # agg_areas = areas.iloc[0:len(areas),:].dissolve(by='dissolve',aggfunc='mean')
            times = []
            # NOTE(review): sized with len(areas) here but len(areas.code) in
            # precip_from_input — equivalent in practice, but inconsistent.
            arr = np.zeros((len(file_list), len(areas)))
            for ifile, file in tqdm(
                enumerate(file_list),
                total=len(file_list),
                desc="Reading evaporation files",
            ):
                array, affine, time = self.external_forcings.drrmodel.read_raster(
                    os.path.join(evap_folder, file)
                )
                times.append(time)
                # Mean raster value per meteo-area polygon.
                stats = zonal_stats(
                     gpd.GeoDataFrame(areas), array, affine=affine, stats="mean", all_touched=True
                )
                arr[ifile, :] = [s["mean"] for s in stats]
            result = pd.DataFrame(
                arr, columns=["ms_" + str(area) for area in areas.code]
            )
            result.index = times
            [self.external_forcings.add_evap(*evap) for evap in result.items()]
        else:
            # An existing meteo file takes precedence over raster input.
            self.external_forcings.evap = str(evap_file)

    @validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def boundary_from_input(
        self,
        boundary_nodes: ExtendedGeoDataFrame,
        catchments: ExtendedGeoDataFrame,
        drrmodel,
        overflows: ExtendedGeoDataFrame = None,
        greenhouse_laterals: ExtendedGeoDataFrame=None
    ) -> None:
        """Generate RR-boundary nodes to link to RR-nodes and to FM-laterals.

        Args:
            boundary_nodes (ExtendedGeoDataFrame): boundary nodes
            catchments (ExtendedGeoDataFrame): catchment areas associated with them
            drrmodel (_type_): drrmodel object
            overflows (ExtendedGeoDataFrame, optional): overflow locations, if applicable. Defaults to None.
            greenhouse_laterals (ExtendedGeoDataFrame, optional): greenhouse lateral locations, if applicable. Defaults to None.

        """
        # find the catchments that have no area attached and no nodes that will be attached to the boundary
        not_occurring = []
        for cat in catchments.itertuples():
            occurs = False
            # A boundary node "occurs" when any RR land-use node (unpaved,
            # paved, greenhouse, open water) with a positive area drains to it.
            if cat.boundary_node in [
                val["boundary_node"]
                for val in drrmodel.unpaved.unp_nodes.values()
                if np.sum([float(d) for d in val["ar"].split(" ")]) > 0.0
            ]:
                occurs = True
            if cat.boundary_node in [
                val["boundary_node"]
                for val in drrmodel.paved.pav_nodes.values()
                if float(val["ar"]) > 0.0
            ]:
                occurs = True
            if cat.boundary_node in [
                val["boundary_node"]
                for val in drrmodel.greenhouse.gh_nodes.values()
                if float(val["ar"]) > 0.0
            ]:
                occurs = True
            if cat.boundary_node in [
                val["boundary_node"]
                for val in drrmodel.openwater.ow_nodes.values()
                if float(val["ar"]) > 0.0
            ]:
                occurs = True
            if not occurs:
                not_occurring.append(cat.boundary_node)


        drop_idx = catchments[catchments.boundary_node.isin(not_occurring)].index.to_list()
        # NOTE(review): `any(drop_idx)` is False when drop_idx == [0] (index 0 is
        # falsy), so a single catchment at index 0 would not be dropped here;
        # `len(drop_idx) > 0` was probably intended — confirm.
        if any(drop_idx):
            logger.warning(
                "%d catchments removed because of an area of 0 m2.",
                len(drop_idx),
            )
            catchments.drop(drop_idx, inplace=True)

        # NOTE(review): the matching rows were (normally) already dropped above, so
        # this selection can be empty and `.iloc[0]` would raise IndexError; dropping
        # by `.code` also assumes the index holds code values — verify this loop.
        for i in not_occurring:
            catchments.drop(
                catchments[catchments.boundary_node == i].code.iloc[0],
                axis=0,
                inplace=True,
            )

        # Total number of laterals: catchments plus optional overflows and
        # greenhouse laterals.
        numlats = len(catchments)
        if overflows is not None:
            numlats = numlats + len(overflows)
        if greenhouse_laterals is not None:
            numlats = numlats + len(greenhouse_laterals)

        # Pre-allocate an id/px/py table; rows are filled in below.
        bnd_drr = ExtendedDataFrame(required_columns=["id"])
        bnd_drr.set_data(
            pd.DataFrame(
                np.zeros((numlats, 3)), columns=["id", "px", "py"], dtype="str"
            ),
            index_col="id",
        )
        index = catchments.code
        if overflows is not None:
            index = pd.concat([index, overflows.code], ignore_index=True)
        if greenhouse_laterals is not None:
            index = pd.concat([index, greenhouse_laterals.code], ignore_index=True)

        bnd_drr.index = index
        for num, cat in enumerate(catchments.itertuples()):
            # logger.info(num, cat.code)
            # Catchments without a linked boundary node are skipped with a warning.
            if boundary_nodes[boundary_nodes["globalid"] == cat.lateraleknoopid].empty:
                # raise IndexError(f'{cat.code} not connected to a boundary node. Skipping.')
                logger.warning(
                    f"{cat.code} not connected to a boundary node. Skipping."
                )
                continue
            bnd_drr.at[cat.code, "id"] = f'lat_{cat.code}'
            # Node coordinates come from the linked boundary-node geometry.
            bnd_drr.at[cat.code, "px"] = str(
                boundary_nodes[boundary_nodes["globalid"] == cat.lateraleknoopid][
                    "geometry"
                ].x.iloc[0]
            ).strip()
            bnd_drr.at[cat.code, "py"] = str(
                boundary_nodes[boundary_nodes["globalid"] == cat.lateraleknoopid][
                    "geometry"
                ].y.iloc[0]
            ).strip()
        if overflows is not None:
            logger.info("Adding overflows to the boundary nodes.")
            for num, ovf in enumerate(overflows.itertuples()):
                bnd_drr.at[ovf.code, "id"] = str(ovf.code)
                bnd_drr.at[ovf.code, "px"] = str(ovf.geometry.coords[0][0])
                bnd_drr.at[ovf.code, "py"] = str(ovf.geometry.coords[0][1])
        if greenhouse_laterals is not None:
            logger.info("Adding greenhouse_laterals to the boundary nodes.")
            for num, gh in enumerate(greenhouse_laterals.itertuples()):
                bnd_drr.at[gh.code, "id"] = str(gh.code)
                bnd_drr.at[gh.code, "px"] = str(gh.geometry.coords[0][0])
                bnd_drr.at[gh.code, "py"] = str(gh.geometry.coords[0][1])
        # Register every assembled boundary node with the forcings container.
        [
            self.external_forcings.add_boundary_node(**bnd)
            for bnd in bnd_drr.to_dict("records")
        ]
boundary_from_input(boundary_nodes: ExtendedGeoDataFrame, catchments: ExtendedGeoDataFrame, drrmodel, overflows: ExtendedGeoDataFrame = None, greenhouse_laterals: ExtendedGeoDataFrame = None) -> None

Generate RR-boundary nodes to link to RR-nodes and to FM-laterals.

Parameters:

Name Type Description Default
boundary_nodes ExtendedGeoDataFrame

boundary nodes

required
catchments ExtendedGeoDataFrame

catchment areas associated with them

required
drrmodel _type_

drrmodel object

required
overflows ExtendedGeoDataFrame

overflow locations, if applicable. Defaults to None.

None
greenhouse_laterals ExtendedGeoDataFrame

greenhouse lateral locations, if applicable. Defaults to None.

None
Source code in hydrolib/dhydamo/io/drrreader.py
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
def boundary_from_input(
    self,
    boundary_nodes: ExtendedGeoDataFrame,
    catchments: ExtendedGeoDataFrame,
    drrmodel,
    overflows: ExtendedGeoDataFrame = None,
    greenhouse_laterals: ExtendedGeoDataFrame = None,
) -> None:
    """Generate RR-boundary nodes to link to RR-nodes and to FM-laterals.

    Args:
        boundary_nodes (ExtendedGeoDataFrame): boundary nodes
        catchments (ExtendedGeoDataFrame): catchment areas associated with them
        drrmodel (_type_): drrmodel object
        overflows (ExtendedGeoDataFrame, optional): overflow locations, if applicable. Defaults to None.
        greenhouse_laterals (ExtendedGeoDataFrame, optional): greenhouse lateral locations, if applicable. Defaults to None.

    """
    # Find the catchments whose boundary node is not referenced by any RR node
    # (unpaved/paved/greenhouse/openwater) with a non-zero area; these would
    # end up as dangling boundary nodes.
    not_occurring = []
    for cat in catchments.itertuples():
        occurs = False
        if cat.boundary_node in [
            val["boundary_node"]
            for val in drrmodel.unpaved.unp_nodes.values()
            if np.sum([float(d) for d in val["ar"].split(" ")]) > 0.0
        ]:
            occurs = True
        if cat.boundary_node in [
            val["boundary_node"]
            for val in drrmodel.paved.pav_nodes.values()
            if float(val["ar"]) > 0.0
        ]:
            occurs = True
        if cat.boundary_node in [
            val["boundary_node"]
            for val in drrmodel.greenhouse.gh_nodes.values()
            if float(val["ar"]) > 0.0
        ]:
            occurs = True
        if cat.boundary_node in [
            val["boundary_node"]
            for val in drrmodel.openwater.ow_nodes.values()
            if float(val["ar"]) > 0.0
        ]:
            occurs = True
        if not occurs:
            not_occurring.append(cat.boundary_node)

    drop_idx = catchments[catchments.boundary_node.isin(not_occurring)].index.to_list()
    # Use an explicit length check: any() misbehaves when index labels are
    # falsy values such as 0 or "", silently skipping the drop.
    if len(drop_idx) > 0:
        logger.warning(
            "%d catchments removed because of an area of 0 m2.",
            len(drop_idx),
        )
        catchments.drop(drop_idx, inplace=True)

    # Second pass, dropping by code label. Guard against empty selections:
    # the rows may already have been removed above, in which case .iloc[0]
    # would raise an IndexError.
    for i in not_occurring:
        remaining = catchments[catchments.boundary_node == i]
        if not remaining.empty:
            catchments.drop(
                remaining.code.iloc[0],
                axis=0,
                inplace=True,
            )

    # One boundary node per catchment, plus one per overflow and per
    # greenhouse lateral if supplied.
    numlats = len(catchments)
    if overflows is not None:
        numlats = numlats + len(overflows)
    if greenhouse_laterals is not None:
        numlats = numlats + len(greenhouse_laterals)

    bnd_drr = ExtendedDataFrame(required_columns=["id"])
    bnd_drr.set_data(
        pd.DataFrame(
            np.zeros((numlats, 3)), columns=["id", "px", "py"], dtype="str"
        ),
        index_col="id",
    )
    index = catchments.code
    if overflows is not None:
        index = pd.concat([index, overflows.code], ignore_index=True)
    if greenhouse_laterals is not None:
        index = pd.concat([index, greenhouse_laterals.code], ignore_index=True)

    bnd_drr.index = index
    for num, cat in enumerate(catchments.itertuples()):
        if boundary_nodes[boundary_nodes["globalid"] == cat.lateraleknoopid].empty:
            logger.warning(
                f"{cat.code} not connected to a boundary node. Skipping."
            )
            continue
        bnd_drr.at[cat.code, "id"] = f'lat_{cat.code}'
        bnd_drr.at[cat.code, "px"] = str(
            boundary_nodes[boundary_nodes["globalid"] == cat.lateraleknoopid][
                "geometry"
            ].x.iloc[0]
        ).strip()
        bnd_drr.at[cat.code, "py"] = str(
            boundary_nodes[boundary_nodes["globalid"] == cat.lateraleknoopid][
                "geometry"
            ].y.iloc[0]
        ).strip()
    if overflows is not None:
        logger.info("Adding overflows to the boundary nodes.")
        for num, ovf in enumerate(overflows.itertuples()):
            bnd_drr.at[ovf.code, "id"] = str(ovf.code)
            bnd_drr.at[ovf.code, "px"] = str(ovf.geometry.coords[0][0])
            bnd_drr.at[ovf.code, "py"] = str(ovf.geometry.coords[0][1])
    if greenhouse_laterals is not None:
        logger.info("Adding greenhouse_laterals to the boundary nodes.")
        for num, gh in enumerate(greenhouse_laterals.itertuples()):
            bnd_drr.at[gh.code, "id"] = str(gh.code)
            bnd_drr.at[gh.code, "px"] = str(gh.geometry.coords[0][0])
            bnd_drr.at[gh.code, "py"] = str(gh.geometry.coords[0][1])
    # Registration has side effects, so use an explicit loop instead of a
    # throwaway list comprehension.
    for bnd in bnd_drr.to_dict("records"):
        self.external_forcings.add_boundary_node(**bnd)
evap_from_input(areas: ExtendedGeoDataFrame, evap_folder: Path | str = None, evap_file: Path | str = None) -> None

Create time series of evaporation for every meteo_area, based on zonal statistics from rasters.

Parameters:

Name Type Description Default
areas ExtendedGeoDataFrame

meteo areas for which time series are created

required
evap_folder str

folder where evaporation rasters are stored. Only used if no evap_file is given. Defaults to None.

None
evap_file str

existing meteo-file, which is used if available.

None
Source code in hydrolib/dhydamo/io/drrreader.py
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
def evap_from_input(
    self,
    areas: ExtendedGeoDataFrame,
    evap_folder: Path | str = None,
    evap_file: Path | str = None,
) -> None:
    """Create time series of evaporation for every meteo_area, based on zonal statistics from rasters.

    Args:
        areas (ExtendedGeoDataFrame): meteo areas for which time series are created
        evap_folder (str, optional): folder where evaporation rasters are stored. Only used if no evap_file is given. Defaults to None.
        evap_file (str, optional): existing meteo-file, which is used if available.
    """
    if evap_file is None:
        warnings.filterwarnings("ignore")
        # NOTE(review): os.listdir order is OS-dependent; each raster carries
        # its own timestamp, so the (time, value) pairing stays correct.
        file_list = os.listdir(evap_folder)
        times = []
        arr = np.zeros((len(file_list), len(areas)))
        for ifile, file in tqdm(
            enumerate(file_list),
            total=len(file_list),
            desc="Reading evaporation files",
        ):
            # Read the raster plus its timestamp, then take the zonal mean
            # per meteo area.
            array, affine, time = self.external_forcings.drrmodel.read_raster(
                os.path.join(evap_folder, file)
            )
            times.append(time)
            stats = zonal_stats(
                gpd.GeoDataFrame(areas), array, affine=affine, stats="mean", all_touched=True
            )
            arr[ifile, :] = [s["mean"] for s in stats]
        result = pd.DataFrame(
            arr, columns=["ms_" + str(area) for area in areas.code]
        )
        result.index = times
        # Registration has side effects, so loop explicitly rather than
        # building a throwaway list.
        for evap in result.items():
            self.external_forcings.add_evap(*evap)
    else:
        # An existing meteo file takes precedence over raster processing.
        self.external_forcings.evap = str(evap_file)
precip_from_input(areas: ExtendedGeoDataFrame, precip_folder: Path | str = None, precip_file: Path | str = None) -> None

Create time series of precipitation for every meteo_area, based on zonal statistics from rasters.

Parameters:

Name Type Description Default
areas ExtendedGeoDataFrame

meteo areas for which time series are created

required
precip_folder str

folder where precipitation rasters are stored. Only used if no precip_file is given. Defaults to None.

None
precip_file str

existing meteo-file, which is used if available.

None
Source code in hydrolib/dhydamo/io/drrreader.py
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
def precip_from_input(
    self,
    areas: ExtendedGeoDataFrame,
    precip_folder: Path | str = None,
    precip_file: Path | str = None,
) -> None:
    """Create time series of precipitation for every meteo_area, based on zonal statistics from rasters.

    Args:
        areas (ExtendedGeoDataFrame): meteo areas for which time series are created
        precip_folder (str, optional): folder where precipitation rasters are stored. Only used if no precip_file is given. Defaults to None.
        precip_file (str, optional): existing meteo-file, which is used if available.
    """
    if precip_file is None:
        warnings.filterwarnings("ignore")
        # NOTE(review): os.listdir order is OS-dependent; each raster carries
        # its own timestamp, so the (time, value) pairing stays correct.
        file_list = os.listdir(precip_folder)
        times = []
        arr = np.zeros((len(file_list), len(areas.code)))
        for ifile, file in tqdm(
            enumerate(file_list),
            total=len(file_list),
            desc="Reading precipitation files",
        ):
            # Read the raster plus its timestamp, then take the zonal mean
            # per meteo area.
            array, affine, time = self.external_forcings.drrmodel.read_raster(
                os.path.join(precip_folder, file)
            )
            times.append(time)
            stats = zonal_stats(
                gpd.GeoDataFrame(areas), array, affine=affine, stats="mean", all_touched=True
            )
            arr[ifile, :] = [s["mean"] for s in stats]
        result = pd.DataFrame(
            arr, columns=["ms_" + str(area) for area in areas.code]
        )
        result.index = times
        # Registration has side effects, so loop explicitly rather than
        # building a throwaway list.
        for prec in result.items():
            self.external_forcings.add_precip(*prec)
    else:
        # An existing meteo file takes precedence over raster processing.
        self.external_forcings.precip = str(precip_file)
seepage_from_input(catchments: ExtendedGeoDataFrame, seepage_folder: Path | str) -> None

Perform zonal statistics to derive seepage time series per catchment. Time steps are derived from the data

Parameters:

Name Type Description Default
catchments ExtendedGeoDataFrame

catchment areas

required
seepage_folder str

folder where the seepage rasters are stored

required
Source code in hydrolib/dhydamo/io/drrreader.py
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
def seepage_from_input(
    self, catchments: ExtendedGeoDataFrame, seepage_folder: Path | str
) -> None:
    """Perform zonal statistics to derive seepage time series per catchment. Time steps are derived from the data.

    Args:
        catchments (ExtendedGeoDataFrame): catchment areas
        seepage_folder (str): folder where the seepage rasters are stored
    """
    warnings.filterwarnings("ignore")
    # NOTE(review): the previous filter 'if file.lower()' was a no-op (truthy
    # for every non-empty name), so it was removed. Presumably an extension
    # filter (e.g. '.idf') was intended — TODO confirm.
    file_list = os.listdir(seepage_folder)
    times = []
    convert_units = False
    arr = np.zeros((len(file_list), len(catchments.code)))
    for ifile, file in tqdm(
        enumerate(file_list), total=len(file_list), desc="Reading seepage files"
    ):
        if file.endswith('.idf'):
            # NHI (iMOD) IDF raster: read the values and reconstruct the
            # georeference from the file header.
            dataset = idfreader.open(os.path.join(seepage_folder, file))
            array = dataset.squeeze().to_numpy()
            header = idfreader.header(os.path.join(seepage_folder, file), pattern=None)
            affine = from_origin(
                header["xmin"], header["ymax"], header["dx"], header["dx"]
            )
            time = header['time']
            convert_units = True
        else:
            array, affine, time = self.external_forcings.drrmodel.read_raster(
                os.path.join(seepage_folder, file)
            )
        times.append(time)
        stats = zonal_stats(
            gpd.GeoDataFrame(catchments), array, affine=affine, stats="mean", all_touched=True
        )
        arr[ifile, :] = [s["mean"] for s in stats]
    result = pd.DataFrame(
        arr, columns=["sep_" + str(cat) for cat in catchments.code]
    )
    result.index = times
    if convert_units:
        # if an NHI model (IDF files) is used, convert units from m3 to mm/d
        # NOTE(review): assumes at least three rasters and equidistant time
        # steps (uses times[2] - times[1]) — TODO confirm.
        result = (result / (1e-3 * (affine[0] * -affine[4]))) / (
                (times[2] - times[1]).total_seconds() / 86400.0
        )
    # Registration has side effects, so loop explicitly rather than building
    # a throwaway list.
    for sep in result.items():
        self.external_forcings.add_seepage(*sep)

GreenhouseIO

Source code in hydrolib/dhydamo/io/drrreader.py
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
class GreenhouseIO:
    """Build RR greenhouse nodes from catchments, rasters and optional known greenhouse areas."""

    def __init__(self, greenhouse):
        # Greenhouse container; provides drrmodel (raster IO) and collects nodes.
        self.greenhouse = greenhouse

    @validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def greenhouse_from_input(
        self,
        catchments: ExtendedGeoDataFrame,
        landuse: Path | str,
        surface_level: Path | str,
        roof_storage: StrictStr | float,
        meteo_areas: ExtendedGeoDataFrame,
        zonalstats_alltouched: bool = None,
        greenhouse_areas: ExtendedGeoDataFrame = None,
        greenhouse_laterals: ExtendedGeoDataFrame = None,
        basin_storage_class: int = 2,
    ) -> None:
        """Generate contents of an RR greenhouse node.

        Args:
            catchments (ExtendedGeoDataFrame): catchment areas
            landuse (str): filename of land use raster
            surface_level (str): filename of surface level raster
            roof_storage (Union): float for spatially uniform sewer storage (mm), or raster for distributed values
            meteo_areas (ExtendedGeoDataFrame): meteo areas, for each station a meteo time series is assigned
            zonalstats_alltouched (bool, optional): method to carry out zonal statistics, see the rasterstats documentation. Defaults to False.
            greenhouse_areas (ExtendedGeoDataFrame, optional): known set of greenhouse areas with attributes. Defaults to None.
            greenhouse_laterals (ExtendedGeoDataFrame, optional): known set of outlet points for greenhouses. Defaults to None.
            basin_storage_class (int, optional): default storage class for greenhouse basins. Defaults to 2.
        """
        all_touched = False if zonalstats_alltouched is None else zonalstats_alltouched

        # Land-use class counts per catchment; class 15 is greenhouse.
        lu_rast, lu_affine = self.greenhouse.drrmodel.read_raster(landuse, static=True)
        lu_counts = zonal_stats(
            gpd.GeoDataFrame(catchments),
            lu_rast.astype(int),
            affine=lu_affine,
            categorical=True,
            all_touched=all_touched,
        )
        # Median surface level per catchment (and per known greenhouse area).
        rast, affine = self.greenhouse.drrmodel.read_raster(surface_level, static=True)
        mean_elev = zonal_stats(
            gpd.GeoDataFrame(catchments), rast.astype(float), affine=affine, stats="median", all_touched=all_touched
        )
        if greenhouse_areas is not None:
            mean_elev_gh = zonal_stats(
                 gpd.GeoDataFrame(greenhouse_areas), rast.astype(float), affine=affine, stats="median", all_touched=all_touched
            )

        # Optional roof-storage raster (spatially distributed values).
        if isinstance(roof_storage, (Path, str)):
            rast, affine = self.greenhouse.drrmodel.read_raster(
                roof_storage, static=True
            )
            roofstors = zonal_stats(
                 gpd.GeoDataFrame(catchments), rast.astype(float), affine=affine, stats="mean", all_touched=True
            )
            if greenhouse_areas is not None:
                roofstors_gh = zonal_stats(
                     gpd.GeoDataFrame(greenhouse_areas), rast.astype(float), affine=affine, stats="mean", all_touched=True
                )

        # get raster cellsize
        px_area = lu_affine[0] * -lu_affine[4]

        numgh = catchments.shape[0]
        indexgh = catchments.code
        if greenhouse_areas is not None:
            numgh = numgh + greenhouse_areas.shape[0]
            # Append the greenhouse codes. The "+" operator on two Series
            # performs index-aligned element-wise addition (string concat /
            # NaN), not concatenation, so pd.concat is required here —
            # consistent with boundary_from_input.
            indexgh = pd.concat([indexgh, greenhouse_areas.code], ignore_index=True)

        gh_drr = ExtendedDataFrame(required_columns=["id"])
        gh_drr.set_data(
            pd.DataFrame(
                np.zeros((numgh, 8)),
                columns=[
                    "id",
                    "area",
                    "surface_level",
                    "roof_storage",
                    "meteo_area",
                    "px",
                    "py",
                    "boundary_node",
                ],
                dtype="str",
            ),
            index_col="id",
        )
        gh_drr.index = indexgh
        if greenhouse_areas is not None:
            for num, gh in enumerate(greenhouse_areas.itertuples()):
                # Skip areas without raster data.
                if mean_elev_gh[num]["median"] is None:
                    logger.warning(NO_RASTERDATA_WARNING, gh.code)
                    continue
                # Find the corresponding meteo station; fall back to the
                # first one when the centroid lies in no meteo area.
                tm = [
                    m
                    for m in meteo_areas.itertuples()
                    if m.geometry.contains(gh.geometry.centroid)
                ]
                ms = meteo_areas.iloc[0, :][0] if tm == [] else tm[0].code

                elev = mean_elev_gh[num]["median"]
                gh_drr.at[gh.code, "id"] = str(gh.code)
                gh_drr.at[gh.code, "area"] = gh.geometry.area
                gh_drr.at[gh.code, "surface_level"] = f"{elev:.2f}"
                # Precedence: per-feature attribute, then a uniform float,
                # then the raster value. Use short-circuiting "and not":
                # the original "&"/"~" evaluated both operands, raising
                # AttributeError when the column is absent, and the attribute
                # value was unconditionally overwritten afterwards.
                if hasattr(gh, 'roof_storage_mm') and not np.isnan(gh.roof_storage_mm):
                    gh_drr.at[gh.code, "roof_storage"] = f"{gh.roof_storage_mm:.2f}"
                elif isinstance(roof_storage, float):
                    gh_drr.at[gh.code, "roof_storage"] = f"{roof_storage:.2f}"
                else:
                    gh_drr.at[gh.code, "roof_storage"] = f'{roofstors_gh[num]["mean"]:.2f}'
                if hasattr(gh, 'basin_storage_class') and not np.isnan(gh.basin_storage_class):
                    gh_drr.at[gh.code, 'basin_storage_class'] = f"{gh.basin_storage_class:g}"
                else:
                    gh_drr.at[gh.code, "basin_storage_class"] = f'{basin_storage_class:g}'
                gh_drr.at[gh.code, "meteo_area"] = str(ms)
                gh_drr.at[gh.code, "px"] = f"{gh.geometry.centroid.coords[0][0]:.0f}"
                gh_drr.at[gh.code, "py"] = f"{gh.geometry.centroid.coords[0][1]:.0f}"
                latcode = greenhouse_laterals[greenhouse_laterals.codegerelateerdobject == gh.code].code.to_numpy()[0]
                gh_drr.at[gh.code, "boundary_node"] = str(latcode)
            # NOTE(review): all records are registered again after the
            # catchment loop below; presumably add_greenhouse overwrites
            # existing ids — TODO confirm no duplicates are created.
            for rec in gh_drr.to_dict("records"):
                self.greenhouse.add_greenhouse(**rec)

        for num, cat in enumerate(catchments.itertuples()):
            # if no rasterdata could be obtained for this catchment, skip it.
            if mean_elev[num]["median"] is None:
                logger.warning(NO_RASTERDATA_WARNING, cat.code)
                continue

            # Find the corresponding meteo station; fall back to the first
            # one when the centroid lies in no meteo area.
            tm = [
                m
                for m in meteo_areas.itertuples()
                if m.geometry.contains(cat.geometry.centroid)
            ]
            ms = meteo_areas.iloc[0, :][0] if tm == [] else tm[0].code

            if greenhouse_areas is not None:
                # Subtract explicitly supplied greenhouse areas from the
                # greenhouse land-use class, to avoid counting them twice.
                if cat.geometry.intersects(greenhouse_areas.geometry).any():
                    intersection_area = cat.geometry.intersection(greenhouse_areas.geometry).area
                    intersection_area = intersection_area[intersection_area > 0.].to_numpy()[0]
                    if 15 in lu_counts[num]:
                        logger.info(
                            "Catchment: %s: subtracting %s m2 from greenhouse area in landuse map.",
                            cat.code,
                            np.min([(lu_counts[num][15] * px_area, intersection_area)]),
                        )
                        lu_counts[num][15] = np.max([0., (lu_counts[num][15] - np.round(intersection_area/px_area))])

            elev = mean_elev[num]["median"]
            gh_drr.at[cat.code, "id"] = str(cat.code)
            gh_drr.at[cat.code, "area"] = (
                str(lu_counts[num][15] * px_area) if 15 in lu_counts[num] else "0"
            )
            gh_drr.at[cat.code, "surface_level"] = f"{elev:.2f}"
            if isinstance(roof_storage, float):
                gh_drr.at[cat.code, "roof_storage"] = f"{roof_storage:.2f}"
            else:
                gh_drr.at[cat.code, "roof_storage"] = f'{roofstors[num]["mean"]:.2f}'
            gh_drr.at[cat.code, "basin_storage_class"] = f'{basin_storage_class:g}'
            gh_drr.at[cat.code, "meteo_area"] = str(ms)
            # Offset the node 20 m east of the centroid.
            gh_drr.at[cat.code, "px"] = f"{cat.geometry.centroid.coords[0][0]+20:.0f}"
            gh_drr.at[cat.code, "py"] = f"{cat.geometry.centroid.coords[0][1]:.0f}"
            gh_drr.at[cat.code, "boundary_node"] = cat.boundary_node
        # Registration has side effects, so loop explicitly rather than
        # building a throwaway list.
        for rec in gh_drr.to_dict("records"):
            self.greenhouse.add_greenhouse(**rec)
greenhouse_from_input(catchments: ExtendedGeoDataFrame, landuse: Path | str, surface_level: Path | str, roof_storage: StrictStr | float, meteo_areas: ExtendedGeoDataFrame, zonalstats_alltouched: bool = None, greenhouse_areas: ExtendedGeoDataFrame = None, greenhouse_laterals: ExtendedGeoDataFrame = None, basin_storage_class: int = 2) -> None

Generate contents of an RR greenhouse node

Parameters:

Name Type Description Default
catchments ExtendedGeoDataFrame

catchment areas

required
greenhouse_areas ExtendedGeoDataFrame

known set of greenhouse areas with attributes

required
greenhouse_laterals (ExtendedGeoDataFrame)

known set of outlet points for greenhouses

required
landuse str

filename of land use raster

required
surface_level str

filename of surface level raster

required
roofstorage Union

float for spatially uniform sewer storage (mm), or raster for distributed values

required
meteo_areas ExtendedGeoDataFrame

meteo areas, for each station a meteo time series is assigned

required
zonalstats_alltouched bool

method to carry out zonal statistics, see the rasterstats documentation. Defaults to False.

None
Source code in hydrolib/dhydamo/io/drrreader.py
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
def greenhouse_from_input(
    self,
    catchments: ExtendedGeoDataFrame,
    landuse: Path | str,
    surface_level: Path | str,
    roof_storage: StrictStr | float,
    meteo_areas: ExtendedGeoDataFrame,
    zonalstats_alltouched: bool = None,
    greenhouse_areas: ExtendedGeoDataFrame = None,
    greenhouse_laterals: ExtendedGeoDataFrame = None,
    basin_storage_class: int = 2,
) -> None:
    """Generate contents of an RR greenhouse node.

    Args:
        catchments (ExtendedGeoDataFrame): catchment areas
        landuse (str): filename of land use raster
        surface_level (str): filename of surface level raster
        roof_storage (Union): float for spatially uniform sewer storage (mm), or raster for distributed values
        meteo_areas (ExtendedGeoDataFrame): meteo areas, for each station a meteo time series is assigned
        zonalstats_alltouched (bool, optional): method to carry out zonal statistics, see the rasterstats documentation. Defaults to False.
        greenhouse_areas (ExtendedGeoDataFrame, optional): known set of greenhouse areas with attributes. Defaults to None.
        greenhouse_laterals (ExtendedGeoDataFrame, optional): known set of outlet points for greenhouses. Defaults to None.
        basin_storage_class (int, optional): default storage class for greenhouse basins. Defaults to 2.
    """
    all_touched = False if zonalstats_alltouched is None else zonalstats_alltouched

    # Land-use class counts per catchment; class 15 is greenhouse.
    lu_rast, lu_affine = self.greenhouse.drrmodel.read_raster(landuse, static=True)
    lu_counts = zonal_stats(
        gpd.GeoDataFrame(catchments),
        lu_rast.astype(int),
        affine=lu_affine,
        categorical=True,
        all_touched=all_touched,
    )
    # Median surface level per catchment (and per known greenhouse area).
    rast, affine = self.greenhouse.drrmodel.read_raster(surface_level, static=True)
    mean_elev = zonal_stats(
        gpd.GeoDataFrame(catchments), rast.astype(float), affine=affine, stats="median", all_touched=all_touched
    )
    if greenhouse_areas is not None:
        mean_elev_gh = zonal_stats(
             gpd.GeoDataFrame(greenhouse_areas), rast.astype(float), affine=affine, stats="median", all_touched=all_touched
        )

    # Optional roof-storage raster (spatially distributed values).
    if isinstance(roof_storage, (Path, str)):
        rast, affine = self.greenhouse.drrmodel.read_raster(
            roof_storage, static=True
        )
        roofstors = zonal_stats(
             gpd.GeoDataFrame(catchments), rast.astype(float), affine=affine, stats="mean", all_touched=True
        )
        if greenhouse_areas is not None:
            roofstors_gh = zonal_stats(
                 gpd.GeoDataFrame(greenhouse_areas), rast.astype(float), affine=affine, stats="mean", all_touched=True
            )

    # get raster cellsize
    px_area = lu_affine[0] * -lu_affine[4]

    numgh = catchments.shape[0]
    indexgh = catchments.code
    if greenhouse_areas is not None:
        numgh = numgh + greenhouse_areas.shape[0]
        # Append the greenhouse codes. The "+" operator on two Series
        # performs index-aligned element-wise addition (string concat / NaN),
        # not concatenation, so pd.concat is required here — consistent with
        # boundary_from_input.
        indexgh = pd.concat([indexgh, greenhouse_areas.code], ignore_index=True)

    gh_drr = ExtendedDataFrame(required_columns=["id"])
    gh_drr.set_data(
        pd.DataFrame(
            np.zeros((numgh, 8)),
            columns=[
                "id",
                "area",
                "surface_level",
                "roof_storage",
                "meteo_area",
                "px",
                "py",
                "boundary_node",
            ],
            dtype="str",
        ),
        index_col="id",
    )
    gh_drr.index = indexgh
    if greenhouse_areas is not None:
        for num, gh in enumerate(greenhouse_areas.itertuples()):
            # Skip areas without raster data.
            if mean_elev_gh[num]["median"] is None:
                logger.warning(NO_RASTERDATA_WARNING, gh.code)
                continue
            # Find the corresponding meteo station; fall back to the first
            # one when the centroid lies in no meteo area.
            tm = [
                m
                for m in meteo_areas.itertuples()
                if m.geometry.contains(gh.geometry.centroid)
            ]
            ms = meteo_areas.iloc[0, :][0] if tm == [] else tm[0].code

            elev = mean_elev_gh[num]["median"]
            gh_drr.at[gh.code, "id"] = str(gh.code)
            gh_drr.at[gh.code, "area"] = gh.geometry.area
            gh_drr.at[gh.code, "surface_level"] = f"{elev:.2f}"
            # Precedence: per-feature attribute, then a uniform float, then
            # the raster value. Use short-circuiting "and not": the original
            # "&"/"~" evaluated both operands, raising AttributeError when
            # the column is absent, and the attribute value was
            # unconditionally overwritten afterwards.
            if hasattr(gh, 'roof_storage_mm') and not np.isnan(gh.roof_storage_mm):
                gh_drr.at[gh.code, "roof_storage"] = f"{gh.roof_storage_mm:.2f}"
            elif isinstance(roof_storage, float):
                gh_drr.at[gh.code, "roof_storage"] = f"{roof_storage:.2f}"
            else:
                gh_drr.at[gh.code, "roof_storage"] = f'{roofstors_gh[num]["mean"]:.2f}'
            if hasattr(gh, 'basin_storage_class') and not np.isnan(gh.basin_storage_class):
                gh_drr.at[gh.code, 'basin_storage_class'] = f"{gh.basin_storage_class:g}"
            else:
                gh_drr.at[gh.code, "basin_storage_class"] = f'{basin_storage_class:g}'
            gh_drr.at[gh.code, "meteo_area"] = str(ms)
            gh_drr.at[gh.code, "px"] = f"{gh.geometry.centroid.coords[0][0]:.0f}"
            gh_drr.at[gh.code, "py"] = f"{gh.geometry.centroid.coords[0][1]:.0f}"
            latcode = greenhouse_laterals[greenhouse_laterals.codegerelateerdobject == gh.code].code.to_numpy()[0]
            gh_drr.at[gh.code, "boundary_node"] = str(latcode)
        # NOTE(review): all records are registered again after the catchment
        # loop below; presumably add_greenhouse overwrites existing ids —
        # TODO confirm no duplicates are created.
        for rec in gh_drr.to_dict("records"):
            self.greenhouse.add_greenhouse(**rec)

    for num, cat in enumerate(catchments.itertuples()):
        # if no rasterdata could be obtained for this catchment, skip it.
        if mean_elev[num]["median"] is None:
            logger.warning(NO_RASTERDATA_WARNING, cat.code)
            continue

        # Find the corresponding meteo station; fall back to the first one
        # when the centroid lies in no meteo area.
        tm = [
            m
            for m in meteo_areas.itertuples()
            if m.geometry.contains(cat.geometry.centroid)
        ]
        ms = meteo_areas.iloc[0, :][0] if tm == [] else tm[0].code

        if greenhouse_areas is not None:
            # Subtract explicitly supplied greenhouse areas from the
            # greenhouse land-use class, to avoid counting them twice.
            if cat.geometry.intersects(greenhouse_areas.geometry).any():
                intersection_area = cat.geometry.intersection(greenhouse_areas.geometry).area
                intersection_area = intersection_area[intersection_area > 0.].to_numpy()[0]
                if 15 in lu_counts[num]:
                    logger.info(
                        "Catchment: %s: subtracting %s m2 from greenhouse area in landuse map.",
                        cat.code,
                        np.min([(lu_counts[num][15] * px_area, intersection_area)]),
                    )
                    lu_counts[num][15] = np.max([0., (lu_counts[num][15] - np.round(intersection_area/px_area))])

    elev = mean_elev[num]["median"]
    gh_drr.at[cat.code, "id"] = str(cat.code)
    gh_drr.at[cat.code, "area"] = (
        str(lu_counts[num][15] * px_area) if 15 in lu_counts[num] else "0"
    )
    gh_drr.at[cat.code, "surface_level"] = f"{elev:.2f}"
    if isinstance(roof_storage, float):
        gh_drr.at[cat.code, "roof_storage"] = f"{roof_storage:.2f}"
    else:
        gh_drr.at[cat.code, "roof_storage"] = f'{roofstors[num]["mean"]:.2f}'
    gh_drr.at[cat.code, "basin_storage_class"] = f'{basin_storage_class:g}'
    gh_drr.at[cat.code, "meteo_area"] = str(ms)
    # Offset the node 20 m east of the centroid.
    gh_drr.at[cat.code, "px"] = f"{cat.geometry.centroid.coords[0][0]+20:.0f}"
    gh_drr.at[cat.code, "py"] = f"{cat.geometry.centroid.coords[0][1]:.0f}"
    gh_drr.at[cat.code, "boundary_node"] = cat.boundary_node
    # Registration has side effects, so loop explicitly rather than building
    # a throwaway list.
    for rec in gh_drr.to_dict("records"):
        self.greenhouse.add_greenhouse(**rec)

OpenwaterIO

Source code in hydrolib/dhydamo/io/drrreader.py
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
class OpenwaterIO:
    """Reader that fills an RR open water node from spatial input data."""

    def __init__(self, openwater):
        # container object that collects the generated open water nodes
        self.openwater = openwater

    @validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def openwater_from_input(
        self,
        catchments: ExtendedGeoDataFrame,
        landuse: Path | str,
        meteo_areas: ExtendedGeoDataFrame,
        zonalstats_alltouched: bool | None = None,
    ) -> None:
        """Generate contents of an RR open water node.

        The open water area per catchment is taken from landuse class 13 of
        the landuse raster. Every catchment is assigned the meteo station
        whose area contains the catchment centroid, falling back to the
        first meteo area when no area contains it.

        Args:
            catchments (ExtendedGeoDataFrame): catchment areas
            landuse (Path | str): filename of landuse raster
            meteo_areas (ExtendedGeoDataFrame): meteo areas, for each station a meteo time series is assigned
            zonalstats_alltouched (bool, optional): include all touched raster
                cells in the zonal statistics (see rasterstats docs).
                Defaults to False.
        """
        all_touched = False if zonalstats_alltouched is None else zonalstats_alltouched

        lu_rast, lu_affine = self.openwater.drrmodel.read_raster(landuse, static=True)
        # per-catchment histogram of landuse classes
        lu_counts = zonal_stats(
            gpd.GeoDataFrame(catchments),
            lu_rast.astype(int),
            affine=lu_affine,
            categorical=True,
            all_touched=all_touched,
        )

        # get raster cellsize; the y-resolution in the affine is negative
        px_area = lu_affine[0] * -lu_affine[4]

        ow_drr = ExtendedDataFrame(required_columns=["id"])
        ow_drr.set_data(
            pd.DataFrame(
                np.zeros((len(catchments), 6)),
                columns=["id", "area", "meteo_area", "px", "py", "boundary_node"],
                dtype="str",
            ),
            index_col="id",
        )
        ow_drr.index = catchments.code
        for num, cat in enumerate(catchments.itertuples()):
            # find corresponding meteo-station
            tm = [
                m
                for m in meteo_areas.itertuples()
                if m.geometry.contains(cat.geometry.centroid)
            ]
            # fall back to the first meteo area when no area contains the centroid.
            # .iloc[0, 0] replaces the deprecated chained positional lookup .iloc[0, :][0]
            ms = meteo_areas.iloc[0, 0] if tm == [] else tm[0].code

            ow_drr.at[cat.code, "id"] = str(cat.code)
            # landuse class 13 = open water
            ow_drr.at[cat.code, "area"] = (
                str(lu_counts[num][13] * px_area) if 13 in lu_counts[num] else "0"
            )
            ow_drr.at[cat.code, "meteo_area"] = str(ms)
            # offset the node 20 map units from the centroid x so nodes do not coincide
            ow_drr.at[cat.code, "px"] = f"{cat.geometry.centroid.coords[0][0]-20:.0f}"
            ow_drr.at[cat.code, "py"] = f"{cat.geometry.centroid.coords[0][1]:.0f}"
            ow_drr.at[cat.code, "boundary_node"] = cat.boundary_node
        # register every node; a plain loop replaces the side-effect list comprehension
        for ow in ow_drr.to_dict("records"):
            self.openwater.add_openwater(**ow)
openwater_from_input(catchments: ExtendedGeoDataFrame, landuse: Path | str, meteo_areas: ExtendedGeoDataFrame, zonalstats_alltouched: bool = None) -> None

Generate contents of an RR open water node.

Parameters:

Name Type Description Default
catchments ExtendedGeoDataFrame

catchment areas

required
landuse str

filename of landuse raster

required
meteo_areas ExtendedGeoDataFrame

meteo areas, for each station a meteo time series is assigned

required
zonalstats_alltouched bool

method to carry out zonal statistics; include all touched raster cells (see the rasterstats docs). Defaults to False.

None

Returns:

Name Type Description
_type_ None

description

Source code in hydrolib/dhydamo/io/drrreader.py
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
def openwater_from_input(
    self,
    catchments: ExtendedGeoDataFrame,
    landuse: Path | str,
    meteo_areas: ExtendedGeoDataFrame,
    zonalstats_alltouched: bool | None = None,
) -> None:
    """Generate contents of an RR open water node.

    The open water area per catchment is taken from landuse class 13 of
    the landuse raster. Every catchment is assigned the meteo station
    whose area contains the catchment centroid, falling back to the
    first meteo area when no area contains it.

    Args:
        catchments (ExtendedGeoDataFrame): catchment areas
        landuse (Path | str): filename of landuse raster
        meteo_areas (ExtendedGeoDataFrame): meteo areas, for each station a meteo time series is assigned
        zonalstats_alltouched (bool, optional): include all touched raster
            cells in the zonal statistics (see rasterstats docs).
            Defaults to False.
    """
    all_touched = False if zonalstats_alltouched is None else zonalstats_alltouched

    lu_rast, lu_affine = self.openwater.drrmodel.read_raster(landuse, static=True)
    # per-catchment histogram of landuse classes
    lu_counts = zonal_stats(
        gpd.GeoDataFrame(catchments),
        lu_rast.astype(int),
        affine=lu_affine,
        categorical=True,
        all_touched=all_touched,
    )

    # get raster cellsize; the y-resolution in the affine is negative
    px_area = lu_affine[0] * -lu_affine[4]

    ow_drr = ExtendedDataFrame(required_columns=["id"])
    ow_drr.set_data(
        pd.DataFrame(
            np.zeros((len(catchments), 6)),
            columns=["id", "area", "meteo_area", "px", "py", "boundary_node"],
            dtype="str",
        ),
        index_col="id",
    )
    ow_drr.index = catchments.code
    for num, cat in enumerate(catchments.itertuples()):
        # find corresponding meteo-station
        tm = [
            m
            for m in meteo_areas.itertuples()
            if m.geometry.contains(cat.geometry.centroid)
        ]
        # fall back to the first meteo area when no area contains the centroid.
        # .iloc[0, 0] replaces the deprecated chained positional lookup .iloc[0, :][0]
        ms = meteo_areas.iloc[0, 0] if tm == [] else tm[0].code

        ow_drr.at[cat.code, "id"] = str(cat.code)
        # landuse class 13 = open water
        ow_drr.at[cat.code, "area"] = (
            str(lu_counts[num][13] * px_area) if 13 in lu_counts[num] else "0"
        )
        ow_drr.at[cat.code, "meteo_area"] = str(ms)
        # offset the node 20 map units from the centroid x so nodes do not coincide
        ow_drr.at[cat.code, "px"] = f"{cat.geometry.centroid.coords[0][0]-20:.0f}"
        ow_drr.at[cat.code, "py"] = f"{cat.geometry.centroid.coords[0][1]:.0f}"
        ow_drr.at[cat.code, "boundary_node"] = cat.boundary_node
    # register every node; a plain loop replaces the side-effect list comprehension
    for ow in ow_drr.to_dict("records"):
        self.openwater.add_openwater(**ow)

PavedIO

Source code in hydrolib/dhydamo/io/drrreader.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
class PavedIO:
    """Reader that fills RR paved nodes from spatial input data."""

    def __init__(self, paved):
        # container object that collects the generated paved nodes
        self.paved = paved

    @validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def paved_from_input(
        self,
        catchments: ExtendedGeoDataFrame,
        landuse: StrictStr | Path,
        surface_level: StrictStr | Path,
        street_storage: StrictStr | Path | float,
        sewer_storage: StrictStr | Path | float,
        pump_capacity: StrictStr | Path | float,
        meteo_areas: ExtendedGeoDataFrame,
        overflows: ExtendedGeoDataFrame = None,
        sewer_areas: ExtendedGeoDataFrame = None,
        zonalstats_alltouched: bool | None = None,
    ) -> None:
        """Generate contents of RR paved nodes.

        Paved area per catchment is taken from landuse class 14 of the
        landuse raster. When sewer areas are supplied, the paved area inside
        a sewer area is attributed to that area's overflows (weighted by the
        overflow fraction) and only the paved area outside sewer areas stays
        attached to the catchment node.

        Args:
            catchments (ExtendedGeoDataFrame): catchment areas
            landuse (StrictStr | Path): filename of landuse raster
            surface_level (StrictStr | Path): filename of surface level raster
            street_storage (StrictStr | Path | float): numeric for spatially uniform street storage (mm), or raster filename for distributed values
            sewer_storage (StrictStr | Path | float): numeric for spatially uniform sewer storage (mm), or raster filename for distributed values
            pump_capacity (StrictStr | Path | float): numeric for spatially uniform pump capacity (mm/h), or raster filename for distributed values
            meteo_areas (ExtendedGeoDataFrame): meteo areas, for each station a meteo time series is assigned
            overflows (ExtendedGeoDataFrame, optional): overflow locations; required when sewer_areas is provided. Defaults to None.
            sewer_areas (ExtendedGeoDataFrame, optional): sewer area polygons. Defaults to None.
            zonalstats_alltouched (bool, optional): include all touched raster cells in the zonal statistics (see rasterstats docs). Defaults to False.
        """
        all_touched = False if zonalstats_alltouched is None else zonalstats_alltouched

        lu_rast, lu_affine = self.paved.drrmodel.read_raster(landuse, static=True)
        # per-catchment histogram of landuse classes
        lu_counts = zonal_stats(
            gpd.GeoDataFrame(catchments),
            lu_rast.astype(int),
            affine=lu_affine,
            categorical=True,
            all_touched=all_touched,
        )
        sl_rast, sl_affine = self.paved.drrmodel.read_raster(surface_level, static=True)
        mean_elev = zonal_stats(
            gpd.GeoDataFrame(catchments),
            sl_rast.astype(float),
            affine=sl_affine,
            stats="median",
            all_touched=all_touched,
        )

        # if the parameters are rasters, compute the per-catchment mean values
        if isinstance(street_storage, (Path, str)):
            strs_rast, strs_affine = self.paved.drrmodel.read_raster(
                street_storage, static=True
            )
            str_stors = zonal_stats(
                gpd.GeoDataFrame(catchments),
                strs_rast.astype(float),
                affine=strs_affine,
                stats="mean",
                all_touched=True,
            )

        if isinstance(sewer_storage, (Path, str)):
            sews_rast, sews_affine = self.paved.drrmodel.read_raster(
                sewer_storage, static=True
            )
            sew_stors = zonal_stats(
                gpd.GeoDataFrame(catchments),
                sews_rast.astype(float),
                affine=sews_affine,
                stats="mean",
                all_touched=True,
            )

        if isinstance(pump_capacity, (Path, str)):
            # raster of POC in mm/h
            pump_rast, pump_affine = self.paved.drrmodel.read_raster(
                pump_capacity, static=True
            )
            pump_caps = zonal_stats(
                gpd.GeoDataFrame(catchments),
                pump_rast.astype(float),
                affine=pump_affine,
                stats="mean",
                all_touched=True,
            )

        # NOTE(review): currently unused in this method; kept for the commented-out
        # paved-area correction further down.
        def update_dict(dict1, dict2):
            # merge dict2 into dict1, summing values of shared keys
            for i in dict2.keys():
                if i in dict1:
                    dict1[i] += dict2[i]
                else:
                    dict1[i] = dict2[i]
            return dict1

        # get raster cellsize; the y-resolution in the affine is negative
        px_area = lu_affine[0] * -lu_affine[4]
        paved_drr = ExtendedDataFrame(required_columns=["id"])
        if sewer_areas is not None:
            # if the parameters are rasters, do the zonal statistics per sewer area as well.
            if isinstance(street_storage, (Path, str)):
                str_stors_sa = zonal_stats(
                    sewer_areas,
                    strs_rast.astype(float),
                    affine=strs_affine,
                    stats="mean",
                    all_touched=True,
                )
            if isinstance(sewer_storage, (Path, str)):
                sew_stors_sa = zonal_stats(
                    sewer_areas,
                    sews_rast.astype(float),
                    affine=sews_affine,
                    stats="mean",
                    all_touched=True,
                )
            if isinstance(pump_capacity, (Path, str)):
                pump_caps_sa = zonal_stats(
                    gpd.GeoDataFrame(sewer_areas),
                    pump_rast.astype(float),
                    affine=pump_affine,
                    stats="mean",
                    all_touched=True,
                )
            mean_sa_elev = zonal_stats(
                gpd.GeoDataFrame(sewer_areas),
                sl_rast,
                affine=sl_affine,
                stats="median",
                all_touched=True,
            )

            # initialize the array of paved nodes, which should contain a node for all catchments and all overflows
            paved_drr.set_data(
                pd.DataFrame(
                    np.zeros((len(catchments) + len(overflows), 10)),
                    columns=[
                        "id",
                        "area",
                        "surface_level",
                        "street_storage",
                        "sewer_storage",
                        "pump_capacity",
                        "meteo_area",
                        "px",
                        "py",
                        "boundary_node",
                    ],
                    dtype="str",
                ),
                index_col="id",
            )
            paved_drr.index = pd.concat(
                [catchments.code, overflows.code], ignore_index=True
            )

            # find the paved area in the sewer areas
            for isew, sew in enumerate(sewer_areas.itertuples()):
                pav_area = 0
                pixels = zonal_stats(
                    sew.geometry,
                    lu_rast.astype(int),
                    affine=lu_affine,
                    categorical=True,
                    all_touched=all_touched,
                )[0]
                if 14.0 not in pixels:
                    logger.warning("No paved area in sewer area %s.", sew.code)
                    continue
                pav_pixels = pixels[14.0]
                pav_area += pav_pixels * px_area

                # subtract it from the total paved area in this catchment, make sure at least 0 remains
                # lu_counts[cat_ind][14.0] -=  pav_pixels
                # if lu_counts[cat_ind][14.0] < 0: lu_counts[cat_ind][14.0]  = 0

                elev = mean_sa_elev[isew]["median"]
                # find overflows related to this sewer area
                ovf = overflows[overflows.codegerelateerdobject == sew.code]
                for ov in ovf.itertuples():
                    # find corresponding meteo-station
                    tm = [
                        m
                        for m in meteo_areas.itertuples()
                        if m.geometry.contains(sew.geometry.centroid)
                    ]
                    # fall back to the first meteo area when no area contains the centroid.
                    # .iloc[0, 0] replaces the deprecated chained lookup .iloc[0, :][0]
                    ms = meteo_areas.iloc[0, 0] if tm == [] else tm[0].code

                    # add prefix to the overflow id to create the paved-node id
                    paved_drr.at[ov.code, "id"] = str(ov.code)
                    paved_drr.at[ov.code, "area"] = str(pav_area * ov.fractie)
                    paved_drr.at[ov.code, "surface_level"] = f"{elev:.2f}"

                    # if a float is given, a uniform value is used. If a filename is given, zonal statistics are applied.
                    if isinstance(street_storage, float):
                        paved_drr.at[
                            ov.code, "street_storage"
                        ] = f"{street_storage:.2f}"
                    elif isinstance(street_storage, (Path, str)):
                        paved_drr.at[
                            ov.code, "street_storage"
                        ] = f'{str_stors_sa[isew]["mean"]:.2f}'
                    else:
                        raise ValueError('Street_storage has the wrong datatype. It should be a filename (Path or string) or number (float or int).')

                    # three options: it can be an attribute of a sewer area, a uniform value or a raster.
                    # BUGFIX: check isinstance before np.isnan, so a non-numeric attribute
                    # falls through to the defaults instead of raising a TypeError.
                    if sew.riool_berging_mm is None or not isinstance(sew.riool_berging_mm, float) or np.isnan(sew.riool_berging_mm):
                        if isinstance(sewer_storage, float):
                            paved_drr.at[ov.code, "sewer_storage"] = f"{sewer_storage:.2f}"
                        elif isinstance(sewer_storage, (Path, str)):
                            paved_drr.at[
                                ov.code, "sewer_storage"
                            ] = f'{sew_stors_sa[isew]["mean"]:.2f}'
                        else:
                            raise ValueError('Sewer_storage has the wrong datatype. It should be a filename (Path or string) or number (float or int).')
                    else:
                        paved_drr.at[ov.code, "sewer_storage"] = f'{sew.riool_berging_mm:.2f}'

                    # three options: it can be an attribute of a sewer area, a uniform value or a raster.
                    # Same guard ordering fix as above.
                    if sew.riool_poc_m3s is None or not isinstance(sew.riool_poc_m3s, float) or np.isnan(sew.riool_poc_m3s):
                        if isinstance(pump_capacity, float):
                            # convert the value from mm/h to m3/s
                            paved_drr.at[ov.code, "pump_capacity"] = f"{pump_capacity * (float(pav_area) * ov.fractie) / (1000. * 3600.):.8f}"
                        elif isinstance(pump_capacity, (Path, str)):
                            # convert the value (extracted from the raster) from mm/h to m3/s
                            paved_drr.at[
                                ov.code, "pump_capacity"
                            ] = f'{pump_caps_sa[isew]["mean"] * (float(pav_area) * ov.fractie) / (1000. * 3600.):.8f}'
                        else:
                            raise ValueError('Pump_capacity has the wrong datatype. It should be a filename (Path or string) or number (float or int).')
                    else:
                        # use the attribute value
                        paved_drr.at[ov.code, "pump_capacity"] = f'{sew.riool_poc_m3s * ov.fractie:.8f}'

                    paved_drr.at[ov.code, "meteo_area"] = str(ms)
                    # offset the node 10 map units from the overflow x so nodes do not coincide
                    paved_drr.at[ov.code, "px"] = f"{ov.geometry.coords[0][0]+10:.0f}"
                    paved_drr.at[ov.code, "py"] = f"{ov.geometry.coords[0][1]:.0f}"
                    paved_drr.at[ov.code, "boundary_node"] = ov.code
        else:
            # in this case only the catchments are taken into account. A node is created for every catchment nonetheless, but only nodes with a remaining area >0 are written.
            paved_drr.set_data(
                pd.DataFrame(
                    np.zeros((len(catchments), 10)),
                    columns=[
                        "id",
                        "area",
                        "surface_level",
                        "street_storage",
                        "sewer_storage",
                        "pump_capacity",
                        "meteo_area",
                        "px",
                        "py",
                        "boundary_node",
                    ],
                    dtype="str",
                ),
                index_col="id",
            )
            paved_drr.index = catchments.code

        for num, cat in enumerate(catchments.itertuples()):
            # if no rasterdata could be obtained for this catchment, skip it.
            if mean_elev[num]["median"] is None:
                logger.warning(NO_RASTERDATA_WARNING, cat.code)
                continue
            if sewer_areas is not None:
                # part of the catchment that is also in a sewer area
                if cat.geometry.intersects(sewer_areas.union_all()):
                    # the part of the catchment OUTSIDE the sewer area
                    area_outside_sewer = cat.geometry.difference(
                        sewer_areas.union_all()
                    )
                    if area_outside_sewer.area == 0:
                        logger.info(
                            f"No paved area outside sewer area in catchments {cat.code}."
                        )
                        pav_area = 0.0
                    else:
                        # the paved area in the catchment OUTSIDE the sewer area
                        pixels = zonal_stats(
                            area_outside_sewer,
                            lu_rast.astype(int),
                            affine=lu_affine,
                            categorical=True,
                            all_touched=all_touched,
                        )[0]
                        if 14.0 in pixels:
                            pav_area = str(pixels[14.0] * px_area)
                        else:
                            pav_area = 0.0
                else:
                    # all of the catchment is outside the sewer area
                    pixels = zonal_stats(
                        cat.geometry,
                        lu_rast.astype(int),
                        affine=lu_affine,
                        categorical=True,
                        all_touched=all_touched,
                    )[0]
                    if 14.0 in pixels:
                        pav_area = str(pixels[14.0] * px_area)
                    else:
                        pav_area = 0.0
            else:
                # there is no sewer area at all
                pav_area = (
                    str(lu_counts[num][14.0] * px_area)
                    if 14.0 in lu_counts[num]
                    else "0"
                )

            # find corresponding meteo-station
            tm = [
                m
                for m in meteo_areas.itertuples()
                if m.geometry.contains(cat.geometry.centroid)
            ]
            # fall back to the first meteo area when no area contains the centroid.
            # .iloc[0, 0] replaces the deprecated chained lookup .iloc[0, :][0]
            ms = meteo_areas.iloc[0, 0] if tm == [] else tm[0].code

            elev = mean_elev[num]["median"]
            paved_drr.at[cat.code, "id"] = str(cat.code)
            paved_drr.at[cat.code, "area"] = str(pav_area)  #
            paved_drr.at[cat.code, "surface_level"] = f"{elev:.2f}"
            # if a float is given, a uniform value is used. If a filename is given, zonal statistics are applied.
            if isinstance(street_storage, float):
                paved_drr.at[cat.code, "street_storage"] = f"{street_storage:.2f}"
            else:
                paved_drr.at[
                    cat.code, "street_storage"
                ] = f'{str_stors[num]["mean"]:.2f}'
            if isinstance(sewer_storage, float):
                paved_drr.at[cat.code, "sewer_storage"] = f"{sewer_storage:.2f}"
            else:
                paved_drr.at[
                    cat.code, "sewer_storage"
                ] = f'{sew_stors[num]["mean"]:.2f}'
            # pump capacity is converted from mm/h over the paved area to m3/s
            if isinstance(pump_capacity, float):
                paved_drr.at[cat.code, "pump_capacity"] = f'{(pump_capacity * float(pav_area)) / (1000. * 3600.):.8f}'
            else:
                paved_drr.at[
                    cat.code, "pump_capacity"
                ] = f'{pump_caps[num]["mean"] * (float(pav_area)) / (1000. * 3600.):.8f}'

            paved_drr.at[cat.code, "meteo_area"] = str(ms)
            # offset the node 10 map units from the centroid x so nodes do not coincide
            paved_drr.at[
                cat.code, "px"
            ] = f"{cat.geometry.centroid.coords[0][0]+10:.0f}"
            paved_drr.at[cat.code, "py"] = f"{cat.geometry.centroid.coords[0][1]:.0f}"
            paved_drr.at[cat.code, "boundary_node"] = cat.boundary_node
        # register every node; a plain loop replaces the side-effect list comprehension
        for pav in paved_drr.to_dict("records"):
            self.paved.add_paved(**pav)
paved_from_input(catchments: ExtendedGeoDataFrame, landuse: StrictStr | Path, surface_level: StrictStr | Path, street_storage: StrictStr | Path | float, sewer_storage: StrictStr | Path | float, pump_capacity: StrictStr | Path | float, meteo_areas: ExtendedGeoDataFrame, overflows: ExtendedGeoDataFrame = None, sewer_areas: ExtendedGeoDataFrame = None, zonalstats_alltouched: bool = None) -> None

Generate contents of RR paved nodes

Parameters:

Name Type Description Default
catchments ExtendedGeoDataFrame

catchment areas

required
landuse str

filename of landuse raster

required
surface_level str

file name of surface level raster

required
street_storage Union

numeric for spatially uniform street storage (mm), or raster for distributed values

required
sewer_storage Union

numeric for spatially uniform sewer storage (mm), or raster for distributed values

required
pump_capacity Union

numeric for spatially uniform pump capacities (mm/h), or raster for distributed values

required
meteo_areas ExtendedGeoDataFrame

meteo areas, for each station a meteo time series is assigned

required
overflows ExtendedGeoDataFrame

overflow locations. Defaults to None.

None
sewer_areas ExtendedGeoDataFrame

sewer area locations. Defaults to None.

None
zonalstats_alltouched bool

method to carry out zonal statistics; include all touched raster cells (see the rasterstats docs). Defaults to False.

None

Returns:

Name Type Description
_type_ None

description

Source code in hydrolib/dhydamo/io/drrreader.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
def paved_from_input(
    self,
    catchments: ExtendedGeoDataFrame,
    landuse: StrictStr | Path,
    surface_level: StrictStr | Path,
    street_storage:StrictStr | Path | float,
    sewer_storage:StrictStr | Path | float,
    pump_capacity:StrictStr | Path | float,
    meteo_areas: ExtendedGeoDataFrame,
    overflows: ExtendedGeoDataFrame = None,
    sewer_areas: ExtendedGeoDataFrame = None,
    zonalstats_alltouched: bool = None

) -> None:
    """Generate contents of RR paved nodes.

    Args:
        catchments (ExtendedGeoDataFrame): catchment areas
        landuse (str or Path): filename of landuse raster
        surface_level (str or Path): file name of surface level raster
        street_storage (str, Path or float): numeric for spatially uniform street storage (mm), or raster for distributed values
        sewer_storage (str, Path or float): numeric for spatially uniform sewer storage (mm), or raster for distributed values
        pump_capacity (str, Path or float): numeric for spatially uniform pump capacities (mm/h), or raster for distributed values
        meteo_areas (ExtendedGeoDataFrame): meteo areas, for each station a meteo time series is assigned
        overflows (ExtendedGeoDataFrame, optional): overflow locations; required
            when sewer_areas is supplied. Defaults to None.
        sewer_areas (ExtendedGeoDataFrame, optional): sewer area locations. Defaults to None.
        zonalstats_alltouched (bool, optional): whether zonal statistics include
            all touched raster cells, see the rasterstats docs. Defaults to False.
    """
    all_touched = False if zonalstats_alltouched is None else zonalstats_alltouched

    # Required rasters: per-catchment land-use class counts and median surface level.
    lu_rast, lu_affine = self.paved.drrmodel.read_raster(landuse, static=True)
    lu_counts = zonal_stats(
        gpd.GeoDataFrame(catchments),
        lu_rast.astype(int),
        affine=lu_affine,
        categorical=True,
        all_touched=all_touched,
    )
    sl_rast, sl_affine = self.paved.drrmodel.read_raster(surface_level, static=True)
    mean_elev = zonal_stats(
        gpd.GeoDataFrame(catchments),
        sl_rast.astype(float),
        affine=sl_affine,
        stats="median",
        all_touched=all_touched,
    )

    # Optional rasters: if storage/capacity parameters are file references,
    # compute per-catchment mean values from the rasters.
    # NOTE(review): these calls hard-code all_touched=True instead of using the
    # zonalstats_alltouched argument — confirm whether that is intentional.
    if isinstance(street_storage, (Path, str)):
        strs_rast, strs_affine = self.paved.drrmodel.read_raster(
            street_storage, static=True
        )
        str_stors = zonal_stats(
            gpd.GeoDataFrame(catchments),
            strs_rast.astype(float),
            affine=strs_affine,
            stats="mean",
            all_touched=True,
        )

    if isinstance(sewer_storage, (Path, str)):
        sews_rast, sews_affine = self.paved.drrmodel.read_raster(
            sewer_storage, static=True
        )
        sew_stors = zonal_stats(
            gpd.GeoDataFrame(catchments),
            sews_rast.astype(float),
            affine=sews_affine,
            stats="mean",
            all_touched=True,
        )

    if isinstance(pump_capacity,  (Path, str)):
        # raster of POC in mm/h
        pump_rast, pump_affine = self.paved.drrmodel.read_raster(
            pump_capacity, static=True
        )
        pump_caps = zonal_stats(
            gpd.GeoDataFrame(catchments),
            pump_rast.astype(float),
            affine=pump_affine,
            stats="mean",
            all_touched=True,
        )

    # Merge dict2 into dict1, summing values for keys present in both.
    # NOTE(review): not referenced anywhere in this method — possibly dead code.
    def update_dict(dict1, dict2):
        for i in dict2.keys():
            if i in dict1:
                dict1[i] += dict2[i]
            else:
                dict1[i] = dict2[i]
        return dict1

    # get raster cellsize
    px_area = lu_affine[0] * -lu_affine[4]
    paved_drr = ExtendedDataFrame(required_columns=["id"])
    if sewer_areas is not None:
        # if the parameters are rasters, do the zonal statistics per sewer area as well.
        if isinstance(street_storage,(Path, str)):
            str_stors_sa = zonal_stats(
                sewer_areas,
                strs_rast.astype(float),
                affine=strs_affine,
                stats="mean",
                all_touched=True,
            )
        if isinstance(sewer_storage, (Path, str)):
            sew_stors_sa = zonal_stats(
                sewer_areas,
                sews_rast.astype(float),
                affine=sews_affine,
                stats="mean",
                all_touched=True,
            )
        if isinstance(pump_capacity, (Path, str)):
            pump_caps_sa = zonal_stats(
                gpd.GeoDataFrame(sewer_areas),
                pump_rast.astype(float),
                affine=pump_affine,
                stats="mean",
                all_touched=True,
            )
        mean_sa_elev = zonal_stats(
            gpd.GeoDataFrame(sewer_areas), sl_rast, affine=sl_affine, stats="median", all_touched=True
        )

        # initialize the array of paved nodes, which should contain a node for all catchments and all overflows
        # NOTE(review): overflows must not be None when sewer_areas is given —
        # len(overflows) and overflows.code below would raise otherwise.
        paved_drr.set_data(
            pd.DataFrame(
                np.zeros((len(catchments) + len(overflows), 10)),
                columns=[
                    "id",
                    "area",
                    "surface_level",
                    "street_storage",
                    "sewer_storage",
                    "pump_capacity",
                    "meteo_area",
                    "px",
                    "py",
                    "boundary_node",
                ],
                dtype="str",
            ),
            index_col="id",
        )
        paved_drr.index = pd.concat([catchments.code, overflows.code], ignore_index=True)


        # find the paved area in the sewer areas
        for isew, sew in enumerate(sewer_areas.itertuples()):
            pav_area = 0
            pixels = zonal_stats(
                sew.geometry,
                lu_rast.astype(int),
                affine=lu_affine,
                categorical=True,
                all_touched=all_touched,
            )[0]
            # land-use class 14 represents paved area
            if 14.0 not in pixels:
                logger.warning("No paved area in sewer area %s.", sew.code)
                continue
            pav_pixels = pixels[14.0]
            pav_area += pav_pixels * px_area

            # subtract it from the total paved area in this catchment, make sure at least 0 remains
            # lu_counts[cat_ind][14.0] -=  pav_pixels
            # if lu_counts[cat_ind][14.0] < 0: lu_counts[cat_ind][14.0]  = 0

            elev = mean_sa_elev[isew]["median"]
            # find overflows related to this sewer area
            ovf = overflows[overflows.codegerelateerdobject == sew.code]
            for ov in ovf.itertuples():
                # find corresponding meteo-station; fall back to the first
                # meteo area when none contains the sewer-area centroid
                tm = [
                    m
                    for m in meteo_areas.itertuples()
                    if m.geometry.contains(sew.geometry.centroid)
                ]
                ms = meteo_areas.iloc[0, :][0] if tm == [] else tm[0].code

                # add prefix to the overflow id to create the paved-node id
                paved_drr.at[ov.code, "id"] = str(ov.code)
                paved_drr.at[ov.code, "area"] = str(pav_area * ov.fractie)
                paved_drr.at[ov.code, "surface_level"] = f"{elev:.2f}"

                # if a float is given, a standard value is passed. If a string is given, a rastername is assumed and zonal statistics are applied.
                if isinstance(street_storage,  float):
                    paved_drr.at[
                        ov.code, "street_storage"
                    ] = f"{street_storage:.2f}"
                elif isinstance(street_storage, (Path, str)):
                    paved_drr.at[
                        ov.code, "street_storage"
                    ] = f'{str_stors_sa[isew]["mean"]:.2f}'
                else:
                    raise ValueError('Street_storage has the wrong datatype. It should be a filename (Path or string) or number (float or int).')

                # three options: it can be an attribute of a sewer area, a uniform value or a raster
                if sew.riool_berging_mm is None or np.isnan(sew.riool_berging_mm) or not isinstance(sew.riool_berging_mm, float):
                    if isinstance(sewer_storage, float):
                        paved_drr.at[ov.code, "sewer_storage"] = f"{sewer_storage:.2f}"
                    elif isinstance(sewer_storage, (Path, str)):
                        paved_drr.at[
                        ov.code, "sewer_storage"
                        ] = f'{sew_stors_sa[isew]["mean"]:.2f}'
                    else:
                        raise ValueError('Sewer_storage has the wrong datatype. It should be a filename (Path or string) or number (float or int).')
                else:
                    paved_drr.at[ov.code, "sewer_storage"] = f'{sew.riool_berging_mm:.2f}'

                # three options: it can be an attribute of a sewer area, a uniform value or a raster
                if sew.riool_poc_m3s is None or np.isnan(sew.riool_poc_m3s) or not isinstance(sew.riool_poc_m3s, float):
                     if isinstance(pump_capacity, float):
                        # convert the value from mm/h to m3/s
                        paved_drr.at[ov.code, "pump_capacity"] = f"{pump_capacity * (float(pav_area) * ov.fractie) / (1000. * 3600.):.8f}"
                     elif isinstance(pump_capacity, (Path, str)):
                        # convert the value (extracted from the raster) from mm/h to m3/s
                        paved_drr.at[
                        ov.code, "pump_capacity"
                        ] = f'{pump_caps_sa[isew]["mean"] * (float(pav_area) * ov.fractie) / (1000. * 3600.):.8f}'
                     else:
                        raise ValueError('Pump_capacity has the wrong datatype. It should be a filename (Path or string) or number (float or int).')
                else:
                    # use the attribute value
                    paved_drr.at[ov.code, "pump_capacity"] = f'{sew.riool_poc_m3s * ov.fractie:.8f}'

                paved_drr.at[ov.code, "meteo_area"] = str(ms)
                paved_drr.at[ov.code, "px"] = f"{ov.geometry.coords[0][0]+10:.0f}"
                paved_drr.at[ov.code, "py"] = f"{ov.geometry.coords[0][1]:.0f}"
                paved_drr.at[ov.code, "boundary_node"] = ov.code
    else:
        # in this case only the catchments are taken into account. A node is created for every catchment nonetheless, but only nodes with a remaining area >0 are written.
        paved_drr.set_data(
            pd.DataFrame(
                np.zeros((len(catchments), 10)),
                columns=[
                    "id",
                    "area",
                    "surface_level",
                    "street_storage",
                    "sewer_storage",
                    "pump_capacity",
                    "meteo_area",
                    "px",
                    "py",
                    "boundary_node",
                ],
                dtype="str",
            ),
            index_col="id",
        )
        paved_drr.index = catchments.code

    for num, cat in enumerate(catchments.itertuples()):
        # if no rasterdata could be obtained for this catchment, skip it.
        if mean_elev[num]["median"] is None:
            logger.warning(NO_RASTERDATA_WARNING, cat.code)
            continue
        if sewer_areas is not None:
            # part of the catchment that is also in a sewer area
            if cat.geometry.intersects(sewer_areas.union_all()):
                # the part of the catchment OUTSIDE the sewer area
                area_outside_sewer = cat.geometry.difference(
                    sewer_areas.union_all()
                )
                if area_outside_sewer.area == 0:
                    logger.info(
                        f"No paved area outside sewer area in catchments {cat.code}."
                    )
                    pav_area = 0.0
                else:
                    # the paved area in the catchment OUTSIDE the sewer area
                    pixels = zonal_stats(
                        area_outside_sewer,
                        lu_rast.astype(int),
                        affine=lu_affine,
                        categorical=True,
                        all_touched=all_touched,
                    )[0]
                    if 14.0 in pixels:
                        pav_area = str(pixels[14.0] * px_area)
                    else:
                        pav_area = 0.0
            else:
                # all of the catchment is outside the sewer area
                pixels = zonal_stats(
                            cat.geometry,
                            lu_rast.astype(int),
                            affine=lu_affine,
                            categorical=True,
                            all_touched=all_touched,
                        )[0]
                if 14.0 in pixels:
                    pav_area = str(pixels[14.0] * px_area)
                else:
                    pav_area = 0.0
        else:
            # there is no sewer area at all
            pav_area = (
                str(lu_counts[num][14.0] * px_area)
                if 14.0 in lu_counts[num]
                else "0"
            )

        # find corresponding meteo-station
        tm = [
            m
            for m in meteo_areas.itertuples()
            if m.geometry.contains(cat.geometry.centroid)
        ]
        ms = meteo_areas.iloc[0, :][0] if tm == [] else tm[0].code

        elev = mean_elev[num]["median"]
        paved_drr.at[cat.code, "id"] = str(cat.code)
        paved_drr.at[cat.code, "area"] = str(pav_area)  #
        paved_drr.at[cat.code, "surface_level"] = f"{elev:.2f}"
        # if a float is given, a standard value is passed. If a string is given, a rastername is assumed and zonal statistics are applied.
        if isinstance(street_storage, float):
            paved_drr.at[cat.code, "street_storage"] = f"{street_storage:.2f}"
        else:
            paved_drr.at[
                cat.code, "street_storage"
            ] = f'{str_stors[num]["mean"]:.2f}'
        if isinstance(sewer_storage, float):
            paved_drr.at[cat.code, "sewer_storage"] = f"{sewer_storage:.2f}"
        else:
            paved_drr.at[
                cat.code, "sewer_storage"
            ] = f'{sew_stors[num]["mean"]:.2f}'
        if isinstance(pump_capacity, float):
            paved_drr.at[cat.code, "pump_capacity"] = f'{(pump_capacity * float(pav_area)) / (1000. * 3600.):.8f}'
        else:
            paved_drr.at[
                cat.code, "pump_capacity"
            ] = f'{pump_caps[num]["mean"] * (float(pav_area)) / (1000. * 3600.):.8f}'

        paved_drr.at[cat.code, "meteo_area"] = str(ms)
        paved_drr.at[
            cat.code, "px"
        ] = f"{cat.geometry.centroid.coords[0][0]+10:.0f}"
        paved_drr.at[cat.code, "py"] = f"{cat.geometry.centroid.coords[0][1]:.0f}"
        paved_drr.at[cat.code, "boundary_node"] = cat.boundary_node
    [self.paved.add_paved(**pav) for pav in paved_drr.to_dict("records")]

UnpavedIO

Source code in hydrolib/dhydamo/io/drrreader.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class UnpavedIO:
    """IO helper that fills the RR unpaved-node administration from raster input."""

    def __init__(self, unpaved):
        # container object holding the drrmodel and the add_* methods
        self.unpaved = unpaved

    @validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def unpaved_from_input(
        self,
        catchments: ExtendedGeoDataFrame,
        landuse: StrictStr | Path,
        surface_level: StrictStr | Path,
        soiltype: StrictStr | Path,
        surface_storage: StrictStr | Path | float,
        infiltration_capacity: StrictStr | Path | float,
        initial_gwd: StrictStr | Path | float,
        meteo_areas: ExtendedGeoDataFrame,
        zonalstats_alltouched: bool = None,
        greenhouse_areas: ExtendedGeoDataFrame = None
    ):
        """Generate contents of an unpaved node from raster data

        Args:
            catchments (ExtendedGeoDataFrame): catchment areas
            landuse (str or Path): filename of land use raster
            surface_level (str or Path): file name of surface level raster
            soiltype (str or Path): file name of soiltype raster
            surface_storage (str, Path or float): numeric for spatially uniform surface storage (mm), or raster for distributed values
            infiltration_capacity (str, Path or float): numeric for spatially uniform infiltration capacity (mm/d), or raster for distributed values
            initial_gwd (str, Path or float): numeric for spatially uniform initial groundwater levels (m), or raster for distributed values
            meteo_areas (ExtendedGeoDataFrame): meteo areas, for each station a meteo time series is assigned
            zonalstats_alltouched (bool, optional): method to carry out zonal statistics, see rasterstats docs. Defaults to False.
            greenhouse_areas (ExtendedGeoDataFrame, optional): greenhouse areas whose
                surface is subtracted from the dominant land-use class. Defaults to None.
        """
        all_touched = False if zonalstats_alltouched is None else zonalstats_alltouched

        # required rasters
        warnings.filterwarnings("ignore")
        lu_rast, lu_affine = self.unpaved.drrmodel.read_raster(landuse, static=True)
        lu_counts = zonal_stats(
            gpd.GeoDataFrame(catchments),
            lu_rast.astype(int),
            affine=lu_affine,
            categorical=True,
            all_touched=all_touched,
        )

        soil_rast, affine = self.unpaved.drrmodel.read_raster(soiltype, static=True)
        soiltypes = zonal_stats(
            gpd.GeoDataFrame(catchments),
            soil_rast.astype(int),
            affine=affine,
            stats="majority",
            all_touched=all_touched,
        )

        rast, affine = self.unpaved.drrmodel.read_raster(surface_level, static=True)
        mean_elev = zonal_stats(
            gpd.GeoDataFrame(catchments), rast.astype(float), affine=affine, stats="median", all_touched=all_touched
        )

        # optional rasters. Accept both str and Path file references — the
        # signature admits either; checking only str would leave the
        # zonal-statistics results undefined for Path inputs and raise a
        # NameError further down (PavedIO already checks (Path, str)).
        if isinstance(surface_storage, (Path, str)):
            rast, affine = self.unpaved.drrmodel.read_raster(
                surface_storage, static=True
            )
            sstores = zonal_stats(
                gpd.GeoDataFrame(catchments), rast.astype(float), affine=affine, stats="mean", all_touched=True
            )
        elif isinstance(surface_storage, int):
            surface_storage = float(surface_storage)
        if isinstance(infiltration_capacity, (Path, str)):
            rast, affine = self.unpaved.drrmodel.read_raster(
                infiltration_capacity, static=True
            )
            infcaps = zonal_stats(
                gpd.GeoDataFrame(catchments), rast.astype(float), affine=affine, stats="mean", all_touched=True
            )
        elif isinstance(infiltration_capacity, int):
            infiltration_capacity = float(infiltration_capacity)
        if isinstance(initial_gwd, (Path, str)):
            rast, affine = self.unpaved.drrmodel.read_raster(initial_gwd, static=True)
            ini_gwds = zonal_stats(
                gpd.GeoDataFrame(catchments), rast.astype(float), affine=affine, stats="mean", all_touched=True
            )
        elif isinstance(initial_gwd, int):
            initial_gwd = float(initial_gwd)

        # get raster cellsize
        px_area = lu_affine[0] * -lu_affine[4]

        unpaved_drr = ExtendedDataFrame(required_columns=["id"])
        unpaved_drr.set_data(
            pd.DataFrame(
                np.zeros((len(catchments), 12)),
                columns=[
                    "id",
                    "total_area",
                    "lu_areas",
                    "surface_level",
                    "soiltype",
                    "surface_storage",
                    "infiltration_capacity",
                    "initial_gwd",
                    "meteo_area",
                    "px",
                    "py",
                    "boundary_node",
                ],
                dtype="str",
            ),
            index_col="id",
        )

        unpaved_drr.index = catchments.code
        # HyDAMO Crop code; hydamo name, sobek index, sobek name:
        # 1 aardappelen   3 potatoes
        # 2 graan         5 grain
        # 3 suikerbiet    4 sugarbeet
        # 4 mais          2 corn
        # 5 overige gew. 15 vegetables
        # 6 bloembollen  10 bulbous plants
        # 7 boomgaard     9 orchard
        # 8 gras          1 grass
        # 9 loofbos      11 dediduous
        # 10 naaldbos    12 conferous
        # 11 natuuur     13 nature
        # 12 braak       14 fallow
        sobek_indices = [3, 5, 4, 2, 15, 10, 9, 1, 11, 12, 13, 14]
        for num, cat in enumerate(catchments.itertuples()):
            # if no rasterdata could be obtained for this catchment, skip it.
            if mean_elev[num]["median"] is None:
                logger.warning(NO_RASTERDATA_WARNING, cat.code)
                continue
            # find corresponding meteo-station; fall back to the first meteo
            # area when none contains the catchment centroid
            tm = [
                m
                for m in meteo_areas.itertuples()
                if m.geometry.contains(cat.geometry.centroid)
            ]
            ms = meteo_areas.iloc[0, :][0] if tm == [] else tm[0].code
            mapping = np.zeros(16, dtype=int)

            # subtract greenhouse area from most occurring land use if no greenhouse area is in the landuse map
            if greenhouse_areas is not None:
                if cat.geometry.intersects(greenhouse_areas.geometry).any():
                    intersection_area = cat.geometry.intersection(greenhouse_areas.geometry).area
                    intersection_area = intersection_area[intersection_area > 0.].to_numpy()[0]
                    if 15 in lu_counts[num]:
                        # divide area to subtract between greenhouses and the most occurring area
                        remainder = np.max([0., intersection_area - float(lu_counts[num][15]*px_area)])
                    else:
                        remainder = intersection_area
                    maxind = np.argmax(list(lu_counts[num].values()))
                    logger.info(
                        "Catchment %s: subtracting %s m2 from class %s for supplied greenhouse area.",
                        cat.code,
                        remainder,
                        maxind,
                    )
                    lu_counts[num][list(lu_counts[num].keys())[maxind]] = np.max([0., (lu_counts[num][list(lu_counts[num].keys())[maxind]] - np.round(remainder/px_area))])

            # map HyDAMO crop classes onto SOBEK land-use slots (see table above)
            for i in range(1, 13):
                if i in lu_counts[num]:
                    mapping[sobek_indices[i - 1] - 1] = lu_counts[num][i] * px_area
            lu_map = " ".join(map(str, mapping))
            elev = mean_elev[num]["median"]
            unpaved_drr.at[cat.code, "id"] = str(cat.code)
            unpaved_drr.at[cat.code, "total_area"] = f"{cat.geometry.area:.0f}"
            unpaved_drr.at[cat.code, "lu_areas"] = lu_map
            unpaved_drr.at[cat.code, "surface_level"] = f"{elev:.2f}"
            unpaved_drr.at[
                cat.code, "soiltype"
            ] = f'{soiltypes[num]["majority"]+100.:.0f}'
            # floats are uniform values; otherwise the per-catchment raster means apply
            if isinstance(surface_storage, float):
                unpaved_drr.at[cat.code, "surface_storage"] = f"{surface_storage:.3f}"
            else:
                unpaved_drr.at[
                    cat.code, "surface_storage"
                ] = f'{sstores[num]["mean"]:.3f}'
            if isinstance(infiltration_capacity, float):
                unpaved_drr.at[
                    cat.code, "infiltration_capacity"
                ] = f"{infiltration_capacity:.3f}"
            else:
                unpaved_drr.at[
                    cat.code, "infiltration_capacity"
                ] = f'{infcaps[num]["mean"]:.3f}'
            if isinstance(initial_gwd, float):
                unpaved_drr.at[cat.code, "initial_gwd"] = f"{initial_gwd:.2f}"
            else:
                unpaved_drr.at[cat.code, "initial_gwd"] = f'{ini_gwds[num]["mean"]:.2f}'
            unpaved_drr.at[cat.code, "meteo_area"] = str(ms)
            unpaved_drr.at[
                cat.code, "px"
            ] = f"{cat.geometry.centroid.coords[0][0]-10:.0f}"
            unpaved_drr.at[cat.code, "py"] = f"{cat.geometry.centroid.coords[0][1]:.0f}"
            unpaved_drr.at[cat.code, "boundary_node"] = cat.boundary_node

        [
            self.unpaved.add_unpaved(**unpaved)
            for unpaved in unpaved_drr.to_dict("records")
        ]

    @validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
    def ernst_from_input(
        self,
        catchments: ExtendedGeoDataFrame,
        depths: list,
        resistance: list,
        infiltration_resistance: float = None,
        runoff_resistance: float = None,
    ) -> None:
        """Generate an Ernst definition for an unpaved node.

        Args:
            catchments (ExtendedGeoDataFrame): catchment areas
            depths (list): list of layer depths (m)
            resistance (list): list of layer Ernst resistances (d-1)
            infiltration_resistance (float, optional): resistance to infiltration. Defaults to 300 d-1.
            runoff_resistance (float, optional): resistance to surface runoff. Defaults to 1 d-1.
        """
        if infiltration_resistance is None:
            infiltration_resistance = 300.0
        if runoff_resistance is None:
            runoff_resistance = 1.0

        ernst_drr = ExtendedDataFrame(required_columns=["id"])
        ernst_drr.set_data(
            pd.DataFrame(
                np.zeros((len(catchments), 5)),
                columns=["id", "cvo", "lv", "cvi", "cvs"],
                dtype="str",
            ),
            index_col="id",
        )
        ernst_drr.index = catchments.code
        # the same definition is written for every catchment
        for cat in catchments.itertuples():
            ernst_drr.at[cat.code, "id"] = str(cat.code)
            ernst_drr.at[cat.code, "cvo"] = " ".join([str(res) for res in resistance])
            ernst_drr.at[cat.code, "lv"] = " ".join([str(depth) for depth in depths])
            ernst_drr.at[cat.code, "cvi"] = f'{infiltration_resistance:.2f}'
            ernst_drr.at[cat.code, "cvs"] = f'{runoff_resistance:.2f}'

        [self.unpaved.add_ernst_def(**ernst) for ernst in ernst_drr.to_dict("records")]
ernst_from_input(catchments: ExtendedGeoDataFrame, depths: list, resistance: list, infiltration_resistance: float = None, runoff_resistance: float = None) -> None

Generate an Ernst definition for an unpaved node.

Parameters:

Name Type Description Default
catchments ExtendedGeoDataFrame

Catchment areas

required
depths list

list of layer depths (m)

required
resistance list

list of layer Ernst resistances (d-1)

required
infiltration_resistance int or float

resistance to infiltration. Defaults to 300 d-1.

None
runoff_resistance int or float

resistance to surface runoff. Defaults to 1 d-1.

None
Source code in hydrolib/dhydamo/io/drrreader.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
def ernst_from_input(
    self,
    catchments: ExtendedGeoDataFrame,
    depths: list,
    resistance: list,
    infiltration_resistance: float = None,
    runoff_resistance: float = None,
) -> None:
    """Generate an Ernst definition for an unpaved node.

    Args:
        catchments (ExtendedGeoDataFrame): catchment areas
        depths (list): list of layer depths (m)
        resistance (list): list of layer Ernst resistances (d-1)
        infiltration_resistance (float, optional): resistance to infiltration. Defaults to 300 d-1.
        runoff_resistance (float, optional): resistance to surface runoff. Defaults to 1 d-1.
    """
    # fall back to the documented defaults when no value is supplied
    if infiltration_resistance is None:
        infiltration_resistance = 300.0
    if runoff_resistance is None:
        runoff_resistance = 1.0

    ernst_drr = ExtendedDataFrame(required_columns=["id"])
    ernst_drr.set_data(
        pd.DataFrame(
            np.zeros((len(catchments), 5)),
            columns=["id", "cvo", "lv", "cvi", "cvs"],
            dtype="str",
        ),
        index_col="id",
    )
    ernst_drr.index = catchments.code

    # the definition is identical for every catchment: format the
    # loop-invariant fields once instead of per catchment
    cvo = " ".join(str(res) for res in resistance)
    lv = " ".join(str(depth) for depth in depths)
    cvi = f'{infiltration_resistance:.2f}'
    cvs = f'{runoff_resistance:.2f}'
    for cat in catchments.itertuples():
        ernst_drr.at[cat.code, "id"] = str(cat.code)
        ernst_drr.at[cat.code, "cvo"] = cvo
        ernst_drr.at[cat.code, "lv"] = lv
        ernst_drr.at[cat.code, "cvi"] = cvi
        ernst_drr.at[cat.code, "cvs"] = cvs

    [self.unpaved.add_ernst_def(**ernst) for ernst in ernst_drr.to_dict("records")]
unpaved_from_input(catchments: ExtendedGeoDataFrame, landuse: StrictStr | Path, surface_level: StrictStr | Path, soiltype: StrictStr | Path, surface_storage: StrictStr | Path | float, infiltration_capacity: StrictStr | Path | float, initial_gwd: StrictStr | Path | float, meteo_areas: ExtendedGeoDataFrame, zonalstats_alltouched: bool = None, greenhouse_areas: ExtendedGeoDataFrame = None)

Generate contents of an unpaved node from raster data

Parameters:

Name Type Description Default
catchments ExtendedGeoDataFrame

catchment areas

required
landuse str

filename of land use raster

required
surface_level str

file name of surface level raster

required
soiltype str

file name of soiltype raster

required
surface_storage Union

numeric for spatially uniform surface storage (mm), or raster for distributed values

required
infiltration_capacity Union

numeric for spatially uniform infiltration capacity (mm/d), or raster for distributed values

required
initial_gwd Union

numeric for spatially uniform initial groundwater levels (m), or raster for distributed values

required
meteo_areas ExtendedGeoDataFrame

meteo areas, for each station a meteo time series is assigned

required
zonalstats_alltouched bool

method to carry out zonal statistics, see the rasterstats docs. Defaults to False.

None
Source code in hydrolib/dhydamo/io/drrreader.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@validate_arguments(config=ConfigDict(arbitrary_types_allowed=True))
def unpaved_from_input(
    self,
    catchments: ExtendedGeoDataFrame,
    landuse: StrictStr | Path,
    surface_level: StrictStr | Path,
    soiltype: StrictStr | Path,
    surface_storage: StrictStr | Path | float,
    infiltration_capacity: StrictStr | Path | float,
    initial_gwd: StrictStr | Path | float,
    meteo_areas: ExtendedGeoDataFrame,
    zonalstats_alltouched: bool = None,
    greenhouse_areas: ExtendedGeoDataFrame = None,
):
    """Generate contents of an unpaved node from raster data.

    Args:
        catchments (ExtendedGeoDataFrame): catchment areas
        landuse (str | Path): filename of land use raster
        surface_level (str | Path): file name of surface level raster
        soiltype (str | Path): file name of soiltype raster
        surface_storage (str | Path | float): numeric for spatially uniform surface storage (mm), or raster for distributed values
        infiltration_capacity (str | Path | float): numeric for spatially uniform infiltration capacity (mm/d), or raster for distributed values
        initial_gwd (str | Path | float): numeric for spatially uniform initial groundwater levels (m), or raster for distributed values
        meteo_areas (ExtendedGeoDataFrame): meteo areas, for each station a meteo time series is assigned
        zonalstats_alltouched (bool, optional): method to carry out zonal statistics, see the rasterstats docs. Defaults to False.
        greenhouse_areas (ExtendedGeoDataFrame, optional): greenhouse polygons whose
            area is subtracted from the most occurring land-use class of the
            intersecting catchment. Defaults to None.
    """
    all_touched = False if zonalstats_alltouched is None else zonalstats_alltouched

    # required rasters
    warnings.filterwarnings("ignore")
    lu_rast, lu_affine = self.unpaved.drrmodel.read_raster(landuse, static=True)
    lu_counts = zonal_stats(
        gpd.GeoDataFrame(catchments),
        lu_rast.astype(int),
        affine=lu_affine,
        categorical=True,
        all_touched=all_touched,
    )

    soil_rast, affine = self.unpaved.drrmodel.read_raster(soiltype, static=True)
    soiltypes = zonal_stats(
        gpd.GeoDataFrame(catchments),
        soil_rast.astype(int),
        affine=affine,
        stats="majority",
        all_touched=all_touched,
    )

    rast, affine = self.unpaved.drrmodel.read_raster(surface_level, static=True)
    mean_elev = zonal_stats(
        gpd.GeoDataFrame(catchments),
        rast.astype(float),
        affine=affine,
        stats="median",
        all_touched=all_touched,
    )

    # optional rasters: each parameter is either a raster filename (str/Path) or a
    # spatially uniform value. FIX: the signature allows Path, but previously only
    # str was recognised here, leaving sstores/infcaps/ini_gwds undefined for Path
    # inputs and raising a NameError further down.
    if isinstance(surface_storage, (str, Path)):
        rast, affine = self.unpaved.drrmodel.read_raster(
            surface_storage, static=True
        )
        sstores = zonal_stats(
            gpd.GeoDataFrame(catchments),
            rast.astype(float),
            affine=affine,
            stats="mean",
            all_touched=True,
        )
    elif isinstance(surface_storage, int):
        # allow a plain int as a uniform value; normalise to float so the
        # isinstance(..., float) branch in the loop below is taken
        surface_storage = float(surface_storage)
    if isinstance(infiltration_capacity, (str, Path)):
        rast, affine = self.unpaved.drrmodel.read_raster(
            infiltration_capacity, static=True
        )
        infcaps = zonal_stats(
            gpd.GeoDataFrame(catchments),
            rast.astype(float),
            affine=affine,
            stats="mean",
            all_touched=True,
        )
    elif isinstance(infiltration_capacity, int):
        infiltration_capacity = float(infiltration_capacity)
    if isinstance(initial_gwd, (str, Path)):
        rast, affine = self.unpaved.drrmodel.read_raster(initial_gwd, static=True)
        ini_gwds = zonal_stats(
            gpd.GeoDataFrame(catchments),
            rast.astype(float),
            affine=affine,
            stats="mean",
            all_touched=True,
        )
    elif isinstance(initial_gwd, int):
        initial_gwd = float(initial_gwd)

    # get raster cellsize (affine[4] is negative for north-up rasters)
    px_area = lu_affine[0] * -lu_affine[4]

    unpaved_drr = ExtendedDataFrame(required_columns=["id"])
    unpaved_drr.set_data(
        pd.DataFrame(
            np.zeros((len(catchments), 12)),
            columns=[
                "id",
                "total_area",
                "lu_areas",
                "surface_level",
                "soiltype",
                "surface_storage",
                "infiltration_capacity",
                "initial_gwd",
                "meteo_area",
                "px",
                "py",
                "boundary_node",
            ],
            dtype="str",
        ),
        index_col="id",
    )

    unpaved_drr.index = catchments.code
    # HyDAMO Crop code; hydamo name, sobek index, sobek name:
    # 1 aardappelen   3 potatoes
    # 2 graan         5 grain
    # 3 suikerbiet    4 sugarbeet
    # 4 mais          2 corn
    # 5 overige gew. 15 vegetables
    # 6 bloembollen  10 bulbous plants
    # 7 boomgaard     9 orchard
    # 8 gras          1 grass
    # 9 loofbos      11 dediduous
    # 10 naaldbos    12 conferous
    # 11 natuuur     13 nature
    # 12 braak       14 fallow
    sobek_indices = [3, 5, 4, 2, 15, 10, 9, 1, 11, 12, 13, 14]
    for num, cat in enumerate(catchments.itertuples()):
        # if no rasterdata could be obtained for this catchment, skip it.
        if mean_elev[num]["median"] is None:
            logger.warning(NO_RASTERDATA_WARNING, cat.code)
            continue
        # assign the meteo area that contains the catchment centroid; fall back
        # to the first meteo area when none contains it
        tm = [
            m
            for m in meteo_areas.itertuples()
            if m.geometry.contains(cat.geometry.centroid)
        ]
        ms = meteo_areas.iloc[0, :][0] if tm == [] else tm[0].code
        mapping = np.zeros(16, dtype=int)

        # subtract greenhouse area from most occurring land use if no greenhouse area is in the landuse map
        if greenhouse_areas is not None:
            if cat.geometry.intersects(greenhouse_areas.geometry).any():
                intersection_area = cat.geometry.intersection(greenhouse_areas.geometry).area
                intersection_area = intersection_area[intersection_area > 0.].to_numpy()[0]
                if 15 in lu_counts[num]:
                    # divide area to subtract between greenhouses (class 15) and the most occurring class
                    remainder = np.max([0., intersection_area - float(lu_counts[num][15] * px_area)])
                else:
                    remainder = intersection_area
                maxind = np.argmax(list(lu_counts[num].values()))
                logger.info(
                    "Catchment %s: subtracting %s m2 from class %s for supplied greenhouse area.",
                    cat.code,
                    remainder,
                    maxind,
                )
                lu_counts[num][list(lu_counts[num].keys())[maxind]] = np.max([0., (lu_counts[num][list(lu_counts[num].keys())[maxind]] - np.round(remainder / px_area))])

        # map HyDAMO land-use classes 1..12 onto SOBEK class positions (areas in m2)
        for i in range(1, 13):
            if i in lu_counts[num]:
                mapping[sobek_indices[i - 1] - 1] = lu_counts[num][i] * px_area
        lu_map = " ".join(map(str, mapping))
        elev = mean_elev[num]["median"]
        unpaved_drr.at[cat.code, "id"] = str(cat.code)
        unpaved_drr.at[cat.code, "total_area"] = f"{cat.geometry.area:.0f}"
        unpaved_drr.at[cat.code, "lu_areas"] = lu_map
        unpaved_drr.at[cat.code, "surface_level"] = f"{elev:.2f}"
        # soil type codes are offset by 100 in the RR schematisation
        unpaved_drr.at[
            cat.code, "soiltype"
        ] = f'{soiltypes[num]["majority"]+100.:.0f}'
        if isinstance(surface_storage, float):
            unpaved_drr.at[cat.code, "surface_storage"] = f"{surface_storage:.3f}"
        else:
            unpaved_drr.at[
                cat.code, "surface_storage"
            ] = f'{sstores[num]["mean"]:.3f}'
        if isinstance(infiltration_capacity, float):
            unpaved_drr.at[
                cat.code, "infiltration_capacity"
            ] = f"{infiltration_capacity:.3f}"
        else:
            unpaved_drr.at[
                cat.code, "infiltration_capacity"
            ] = f'{infcaps[num]["mean"]:.3f}'
        if isinstance(initial_gwd, float):
            unpaved_drr.at[cat.code, "initial_gwd"] = f"{initial_gwd:.2f}"
        else:
            unpaved_drr.at[cat.code, "initial_gwd"] = f'{ini_gwds[num]["mean"]:.2f}'
        unpaved_drr.at[cat.code, "meteo_area"] = str(ms)
        # node is placed 10 m west of the catchment centroid
        unpaved_drr.at[
            cat.code, "px"
        ] = f"{cat.geometry.centroid.coords[0][0]-10:.0f}"
        unpaved_drr.at[cat.code, "py"] = f"{cat.geometry.centroid.coords[0][1]:.0f}"
        unpaved_drr.at[cat.code, "boundary_node"] = cat.boundary_node

    [
        self.unpaved.add_unpaved(**unpaved)
        for unpaved in unpaved_drr.to_dict("records")
    ]

drrwriter

DRRWriter

Writer for RR files

Source code in hydrolib/dhydamo/io/drrwriter.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
class DRRWriter:
    """Writer for RR files"""

    def __init__(self, rrmodel, output_dir, name=None, wwtp=None):
        """Set up the RR writer: output directory, model reference and WWTP location."""
        self.rrmodel = rrmodel
        self.output_dir = os.path.join(output_dir, "rr")

        if name is not None:
            logger.warning(
                "The 'name' parameter is deprecated and will be removed in a future version.",
            )

        # version string is taken from a fresh HyDAMO instance
        self.version = HyDAMO().version

        # always start from a clean output directory
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)

        self.d3b_parameters = rrmodel.d3b_parameters
        self.precip_df = {}
        self.run_dimrpad = rrmodel.dimr_path

        # default WWTP location lies far outside any realistic model domain
        coords = (1e5, 5e5) if wwtp is None else (wwtp[0], wwtp[1])
        self.wwtp = Point(coords)

    def write_all(self):  # write all RR files
        """
        Wrapper method to write all components
        """
        # copy all 'standard' files from the resources folder first
        self.copyRRFiles()

        # then write each model component in order
        for component_writer in (
            self.write_topology,
            self.write_unpaved,
            self.write_paved,
            self.write_greenhouse,
            self.write_openwater,
            self.write_meteo,
        ):
            component_writer()

        # finally, push the configured d3b parameters into the settings file
        for parameter, value in self.d3b_parameters.items():
            self.change_d3b_parameter(parameter, value)

    def copyRRFiles(self):
        """
        Copy the standard RR files from the package resources into the output directory.

        Returns
        -------
        bool
            Successful or not.

        """
        resource_dir = os.path.join(os.path.dirname(__file__), "..", "resources", "RR")
        shutil.copytree(resource_dir, os.path.join(self.output_dir))
        return True

    def write_topology(self):
        """
        Wrapper to write the topology files for RR. The following files are written:
            3B_NOD.TP: IDs and coordinates of all nodes: unpaved, paved, greenhouse, open water and boundary. Only nodes with a total area of >0 are written. Nodes are placed up to 20 m to the east and west of the catchment centroid.
            3B_LINK.TP: Links between all nodes and the correct boundaries.
            BOUND3B.3B: coupling information for every RR boundary.
            BoundaryConditions.bc: see bound3b.3b
        Returns
        -------
        None.

        """
        filepath = os.path.join(self.output_dir, "3B_NOD.TP")
        with open(filepath, "w") as f:

            # Unpaved nodes
            # 'ar' holds space-separated areas per land-use class; only write the
            # node when the summed area is positive.
            if any(self.rrmodel.unpaved.unp_nodes):
                for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                    if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                        f.write(
                            "NODE id '"
                            + dct["id"]
                            + "' nm '"
                            + dct["id"]
                            + "' ri '-1' mt 1 '2' nt 44 ObID '3B_UNPAVED' px "
                            + dct["px"]
                            + " py "
                            + dct["py"]
                            + " node\n"
                        )

            # Paved nodes
            if any(self.rrmodel.paved.pav_nodes):
                for _, dct in self.rrmodel.paved.pav_nodes.items():
                    if float(dct["ar"]) > 0.0:
                        f.write(
                            "NODE id '"
                            + dct["id"]
                            + "' nm '"
                            + dct["id"]
                            + "' ri '-1' mt 1 '1' nt 43 ObID '3B_PAVED' px "
                            + dct["px"]
                            + " py "
                            + dct["py"]
                            + " node\n"
                        )

                # one WWTP node plus its boundary (offset 50 m) whenever any paved
                # node exists
                f.write(
                    "NODE id 'WWTP' nm 'WWTP' ri '-1' mt 1 '14' nt 56 ObID '3B_WWTP' px "
                    + str(self.wwtp.coords[0][0])
                    + " py "
                    + str(self.wwtp.coords[0][1])
                    + " node\n"
                )
                f.write(
                    "NODE id 'WWTP_BND' nm 'WWTP_BND' ri '-1' mt 1 '6' nt 47 ObID '3B_BOUNDARY' px "
                    + str(self.wwtp.coords[0][0] + 50.0)
                    + " py "
                    + str(self.wwtp.coords[0][1] + 50.0)
                    + " node\n"
                )

            # Greenhouse nodes
            if any(self.rrmodel.greenhouse.gh_nodes):
                for _, dct in self.rrmodel.greenhouse.gh_nodes.items():
                    if float(dct["ar"]) > 0.0:
                        f.write(
                            "NODE id '"
                            + dct["id"]
                            + "' nm '"
                            + dct["id"]
                            + "' ri '-1' mt 1 '3' nt 45 ObID '3B_GREENHOUSE' px "
                            + dct["px"]
                            + " py "
                            + dct["py"]
                            + " node\n"
                        )

            # Openwater nodes
            if any(self.rrmodel.openwater.ow_nodes):
                for _, dct in self.rrmodel.openwater.ow_nodes.items():
                    if float(dct["ar"]) > 0.0:
                        f.write(
                            "NODE id '"
                            + dct["id"]
                            + "' nm '"
                            + dct["id"]
                            + "' ri '-1' mt 1 '21' nt 46 ObID 'OW_PRECIP' px "
                            + dct["px"]
                            + " py "
                            + dct["py"]
                            + " node\n"
                        )

            # Boundary_nodes (always written, no area filter)
            if any(self.rrmodel.external_forcings.boundary_nodes):
                for _, dct in self.rrmodel.external_forcings.boundary_nodes.items():
                    f.write(
                        "NODE id '"
                        + dct["id"]
                        + "' nm '"
                        + dct["id"]
                        + "' ri '-1' mt 1 '6' nt 78 ObID 'SBK_SBK-3B-NODE' px "
                        + dct["px"]
                        + " py "
                        + dct["py"]
                        + " node\n"
                    )

        # links between nodes and their boundaries; 'cnt' gives every link a
        # unique id across all node types
        filepath = os.path.join(self.output_dir, "3B_LINK.TP")
        with open(filepath, "w") as f:

            cnt = 0
            if any(self.rrmodel.unpaved.unp_nodes):
                for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                    if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                        cnt += 1
                        f.write(
                            "BRCH id 'link_"
                            + str(cnt)
                            + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn '"
                            + dct["id"]
                            + "' en '"
                            + dct["boundary_node"]
                            + "' brch\n"
                        )
            if any(self.rrmodel.paved.pav_nodes):
                # each paved node gets two links: one to its boundary, one to the WWTP
                for _, dct in self.rrmodel.paved.pav_nodes.items():
                    if float(dct["ar"]) > 0.0:
                        cnt += 1
                        f.write(
                            "BRCH id 'link_"
                            + str(cnt)
                            + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn '"
                            + dct["id"]
                            + "' en '"
                            + dct["boundary_node"]
                            + "' brch\n"
                        )
                        cnt += 1
                        f.write(
                            "BRCH id 'link_"
                            + str(cnt)
                            + "' ri '-1' mt 1 '1' bt 18 ObID '3B_LINK_RWZI' bn '"
                            + dct["id"]
                            + "' en 'WWTP' brch\n"
                        )
                # plus a single link from the WWTP to its boundary
                cnt += 1
                f.write(
                    "BRCH id 'link_"
                    + str(cnt)
                    + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn 'WWTP' en 'WWTP_BND' brch\n"
                )

            if any(self.rrmodel.greenhouse.gh_nodes):
                for _, dct in self.rrmodel.greenhouse.gh_nodes.items():
                    if float(dct["ar"]) > 0.0:
                        cnt += 1
                        f.write(
                            "BRCH id 'link_"
                            + str(cnt)
                            + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn '"
                            + dct["id"]
                            + "' en '"
                            + dct["boundary_node"]
                            + "' brch\n"
                        )
            if any(self.rrmodel.openwater.ow_nodes):
                for _, dct in self.rrmodel.openwater.ow_nodes.items():
                    if float(dct["ar"]) > 0.0:
                        cnt += 1
                        f.write(
                            "BRCH id 'link_"
                            + str(cnt)
                            + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn '"
                            + dct["id"]
                            + "' en '"
                            + dct["boundary_node"]
                            + "' brch\n"
                        )

        # bound3b.3b
        filepath = os.path.join(self.output_dir, "Bound3B.3B")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.external_forcings.boundary_nodes.items():
                f.write("BOUN id '" + dct["id"] + "' bl 2 '0' is 0 boun\n")
            if any(self.rrmodel.paved.pav_nodes):
                f.write("BOUN id 'WWTP_BND' bl 0 -3 is 0 boun\n")

        # BoundaryConditions.bc
        filepath = os.path.join(self.output_dir, "BoundaryConditions.bc")
        header = {"fileVersion": "1.01", "fileType": "boundConds"}
        with open(filepath, "w") as f:
            self._write_dict(f, header, "General", "\n")
            # a constant water-level boundary (value 0) for every boundary node
            for _, dct in self.rrmodel.external_forcings.boundary_nodes.items():
                temp = {
                    "name": "" + dct["id"],
                    "function": "constant",
                    "quantity": "water_level",
                    "unit": "m",
                }
                self._write_dict(f, temp, "Boundary", "    0\n\n")
            if any(self.rrmodel.paved.pav_nodes):
                temp = {
                    "name": "WWTP_BND",
                    "function": "constant",
                    "quantity": "water_level",
                    "unit": "m",
                }
                self._write_dict(f, temp, "Boundary", "    0\n\n")

    def write_unpaved(self):
        """
        Method to write all files associated with unpaved nodes: UNPAVED.3B, UNPAVED.ALF, UNPAVED.STO, UNPAVED.INF and UNPAVED.SEP. All files contain a definition for every node because they may or may not be spatially distributed.

        Returns
        -------
        None.

        """
        if any(self.rrmodel.unpaved.unp_nodes):
            # main node definitions; 'ar' is a space-separated list of areas per
            # land-use class, nodes with zero total area are skipped everywhere below
            filepath = os.path.join(self.output_dir, "UNPAVED.3B")
            with open(filepath, "w") as f:
                for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                    if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                        f.write(
                            "UNPV id '"
                            + dct["id"]
                            + "' na "
                            + dct["na"]
                            + " ga "
                            + dct["ga"]
                            + " ar "
                            + dct["ar"]
                            + " lv "
                            + dct["lv"]
                            + " co "
                            + dct["co"]
                            + " su "
                            + dct["su"]
                            + " sd 'sto_"
                            + dct["id"]
                            + "' ic 'inf_"
                            + dct["id"]
                            + "' bt "
                            + dct["bt"]
                            + " ed '"
                            + dct["ed"]
                            + "' sp '"
                            + dct["sp"]
                            + "' ig 0 "
                            + dct["ig"]
                            + " mg "
                            + dct["mg"]
                            + " gl "
                            + dct["gl"]
                            + " is "
                            + dct["is"]
                            + " ms '"
                            + dct["ms"]
                            + "' unpv\n"
                        )

            # storage definitions, referenced from UNPAVED.3B as 'sto_<id>'
            filepath = os.path.join(self.output_dir, "UNPAVED.STO")
            with open(filepath, "w") as f:
                for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                    if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                        f.write(
                            "STDF id 'sto_"
                            + dct["id"]
                            + "' nm 'sto_"
                            + dct["id"]
                            + "' ml "
                            + dct["sd"]
                            + " il 0 stdf\n"
                        )

            # infiltration definitions, referenced from UNPAVED.3B as 'inf_<id>'
            filepath = os.path.join(self.output_dir, "UNPAVED.INF")
            with open(filepath, "w") as f:
                for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                    if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                        f.write(
                            "INFC id 'inf_"
                            + dct["id"]
                            + "' nm 'inf_"
                            + dct["id"]
                            + "' ic "
                            + dct["ic"]
                            + " infc\n"
                        )

            # Ernst drainage definitions; '_' is the node id key, used to look up
            # the matching node's area string for the zero-area filter
            filepath = os.path.join(self.output_dir, "UNPAVED.ALF")
            with open(filepath, "w") as f:
                for _, dct in self.rrmodel.unpaved.ernst_defs.items():
                    if (
                        np.sum(
                            [
                                float(d)
                                for d in self.rrmodel.unpaved.unp_nodes[_]["ar"].split(
                                    " "
                                )
                            ]
                        )
                        > 0.0
                    ):
                        f.write(
                            "ERNS id '"
                            + dct["id"]
                            + "' nm '"
                            + dct["id"]
                            + "' cvi "
                            + dct["cvi"]
                            + " cvo 0 "
                            + dct["cvo"]
                            + " cvs "
                            + dct["cvs"]
                            + " lv "
                            + dct["lv"]
                            + " erns\n"
                        )

            # seepage definitions with a time table per node, filled from the
            # external forcings keyed by the node's 'sp' id
            filepath = os.path.join(self.output_dir, "UNPAVED.SEP")
            with open(filepath, "w") as f:
                for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                    if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                        f.write(
                            "SEEP id '"
                            + dct["sp"]
                            + "' nm '"
                            + dct["sp"]
                            + "' co 4 PDIN 0 0 pdin ss 0\n"
                        )
                        f.write("TBLE\n")
                        for row in self.rrmodel.external_forcings.seepage[dct["sp"]][
                            "seepage"
                        ].items():
                            f.write(
                                "'"
                                + row[0].strftime("%Y/%m/%d;%H:%M:%S")
                                + "' "
                                + f"{row[1]:.5f} <\n"
                            )
                        f.write("tble\nseep\n")

    def write_paved(self):
        """
        Method to write all files associated with paved nodes: PAVED.3B, PAVED.STO and PAVED.DWA (the latter contains only one definition), plus the WWTP.3B and WWTP.tbl stubs.

        Returns
        -------
        None.

        """
        if any(self.rrmodel.paved.pav_nodes):
            # main paved node definitions; zero-area nodes are skipped
            filepath = os.path.join(self.output_dir, "PAVED.3B")
            with open(filepath, "w") as f:
                for _, dct in self.rrmodel.paved.pav_nodes.items():
                    if float(dct["ar"]) > 0.0:
                        f.write(
                            "PAVE id '"
                            + dct["id"]
                            + "' ar "
                            + dct["ar"]
                            + " lv "
                            + dct["lv"]
                            + " sd 'sto_"
                            + dct["id"]
                            + "' ss 0 qc 0 "
                            + dct["qc"]
                            + " 0 qo 2 2 ms '"
                            + dct["ms"]
                            + "' is "
                            + dct["is"]
                            + " np "
                            + dct["np"]
                            + " dw 'Def_DWA' ro "
                            + dct["ro"]
                            + " ru "
                            + dct["ru"]
                            + " qh '' pave\n"
                        )

            # street/sewer storage definitions, referenced as 'sto_<id>'
            filepath = os.path.join(self.output_dir, "PAVED.STO")
            with open(filepath, "w") as f:
                for _, dct in self.rrmodel.paved.pav_nodes.items():
                    if float(dct["ar"]) > 0.0:
                        f.write(
                            "STDF id 'sto_"
                            + dct["id"]
                            + "' nm 'sto_"
                            + dct["id"]
                            + "' ms "
                            + dct["strs"]
                            + " is 0 mr "
                            + dct["sews"]
                            + " 0.0 ir 0.0 0.0 stdf\n"
                        )

            # single dry-weather-flow definition shared by all paved nodes
            filepath = os.path.join(self.output_dir, "PAVED.DWA")
            with open(filepath, "w") as f:
                f.write(
                    "DWA id 'Def_DWA' nm 'Def_DWA' do 1 wc 0 wd 0 wh 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 dwa\n"
                )

            # minimal WWTP definition plus an empty table file
            filepath = os.path.join(self.output_dir, "WWTP.3B")
            with open(filepath, "w") as f:
                f.write("WWTP id 'WWTP' tb 0  wwtp\n")

            filepath = os.path.join(self.output_dir, "WWTP.tbl")
            with open(filepath, "w") as f:
                f.write("\n")

    def write_greenhouse(self):
        """
        Method to write all files associated with greenhouse nodes: GREENHSE.3B, GREENHSE.RF and GREENHSE.SIL. The latter contains only one definition.

        Returns
        -------
        None.

        """
        if any(self.rrmodel.greenhouse.gh_nodes):
            filepath = os.path.join(self.output_dir, "GREENHSE.3B")
            with open(filepath, "w") as f:
                for _, dct in self.rrmodel.greenhouse.gh_nodes.items():                    
                    if float(dct["ar"]) > 0.0:
                        # the 'ar' field of a GRHS record lists 10 areas, one per
                        # basin storage class; place the node's area in its class
                        # slot (1-based) and zero-fill the other nine slots
                        area_string = ' '.join(['0' for _ in range(int(dct['basin_storage_class'])-1)]+ [dct['ar']]+['0' for _ in range(10-(int(dct['basin_storage_class'])))])
                        f.write(
                            "GRHS id '"
                            + dct["id"]
                            + "' na 10 ar "
                            + area_string
                            + " sl "
                            + dct["sl"]
                            + " as 0 si 'Def_silo' sd 'sto_"
                            + dct["id"]
                            + "' ms '"
                            + dct["ms"]
                            + "' is "
                            + dct["is"]
                            + " grhs\n"
                        )

            # single shared silo definition referenced as 'Def_silo'
            filepath = os.path.join(self.output_dir, "GREENHSE.SIL")
            with open(filepath, "w") as f:
                f.write("SILO id 'Def_silo' nm 'Def_silo' sc 0.0 pc 0.0 silo\n")

            # roof storage definitions, referenced as 'sto_<id>'
            filepath = os.path.join(self.output_dir, "GREENHSE.RF")
            with open(filepath, "w") as f:
                for _, dct in self.rrmodel.greenhouse.gh_nodes.items():
                    if float(dct["ar"]) > 0.0:
                        f.write(
                            "STDF id 'sto_"
                            + dct["id"]
                            + "' nm 'sto_"
                            + dct["id"]
                            + "' mk "
                            + dct["sd"]
                            + " ik 0 stdf\n"
                        )

    def write_openwater(self):
        """
        Write the OPENWATE.3B file with an OWRR record for every
        open-water node that has a positive area.

        Returns
        -------
        None.

        """
        nodes = self.rrmodel.openwater.ow_nodes
        if not any(nodes):
            return

        filepath = os.path.join(self.output_dir, "OPENWATE.3B")
        with open(filepath, "w") as f:
            for dct in nodes.values():
                if float(dct["ar"]) > 0.0:
                    f.write(
                        f"OWRR id '{dct['id']}' ar {dct['ar']}"
                        f" ms '{dct['ms']}' owrr\n"
                    )

    def write_meteo(self):
        """
        Write the meteo files (DEFAULT.BUI and DEFAULT.EVP) based on values
        per catchment. If a forcing is given as a string, it is treated as a
        path to an existing file and copied as-is.

        Returns
        -------
        None.

        """
        # precipitation
        filepath = os.path.join(self.output_dir, "DEFAULT.BUI")
        if isinstance(self.rrmodel.external_forcings.precip, str):
            shutil.copy(self.rrmodel.external_forcings.precip, filepath)
        else:
            # Series of the last station; the timestep and duration are taken
            # from its index. Assumes all stations share the same equidistant
            # time index — TODO confirm against the callers.
            last = list(self.rrmodel.external_forcings.precip.values())[-1]["precip"]
            timestep = (last.index[1] - last.index[0]).total_seconds()
            duration = (last.index[-1] - last.index[0]).total_seconds()

            with open(filepath, "w") as f:
                f.write("*Name of this file: c:\\Result\\1058\\DEFAULT.BUI\n")
                f.write("*Date and time of construction: 00/00/2000 00:00:00.\n")
                f.write("1\n")
                f.write("*Aantal stations\n")
                f.write(str(len(self.rrmodel.external_forcings.precip)) + "\n")
                f.write("*Namen van stations\n")
                for name in self.rrmodel.external_forcings.precip:
                    f.write("'" + name + "'" + "\n")
                f.write(
                    "*Aantal gebeurtenissen (omdat het 1 bui betreft is dit altijd 1)\n"
                )
                f.write("*en het aantal seconden per waarnemingstijdstap\n")
                f.write(f"1 {timestep:.0f}\n")
                f.write("*Elke commentaarregel wordt begonnen met een * (asterisk).\n")
                f.write(
                    "*Eerste record bevat startdatum en -tijd, lengte van de gebeurtenis in dd hh mm ss\n"
                )
                # BUG FIX: the original omitted this trailing newline, which
                # merged this comment line with the next one.
                f.write("*Het format is: yyyymmdd:hhmmss:ddhhmmss\n")
                f.write("*Daarna voor elk station de neerslag in mm per tijdstap.\n")
                # NOTE(review): '%#m' etc. are Windows strftime flags
                # (suppress zero padding); output differs on other platforms.
                f.write(
                    last.index[0].strftime("%Y %#m %#d %#H %#M %#S ")
                    + str(int(duration / 86400.0))
                    + " "
                    + str(int(np.remainder(duration, 86400.0) / 3600.0))
                    + " 0 0\n"
                )
            # Append the precipitation values, one column per station
            self._dict_to_df()
            self.precip_df.to_csv(
                filepath,
                header=False,
                index=False,
                sep=" ",
                float_format="%.3f",
                mode="a",
            )

        # evaporation
        filepath = os.path.join(self.output_dir, "DEFAULT.EVP")
        if isinstance(self.rrmodel.external_forcings.evap, str):
            shutil.copy(self.rrmodel.external_forcings.evap, filepath)
        else:
            # Only the first station's series is written; evaporation is
            # presumably uniform over the stations — TODO confirm.
            table = list(self.rrmodel.external_forcings.evap.values())[0]["evap"]
            table = table.sort_index()
            date_format = "%#Y  %#m  %#d "
            float_format = "%.3f"
            sep = " "
            with open(filepath, "w") as f:
                f.write("*Verdampingsfile\n")
                f.write("*Meteo data: evaporation intensity in mm/day\n")
                f.write("*First record: start date, data in mm/day\n")
                f.write(
                    "*Datum (year month day), verdamping (mm/dag) voor elk weerstation\n"
                )
                f.write("*jaar maand dag verdamping[mm]\n")
                # one record per timestamp: date followed by the value
                for ts, val in table.items():
                    date = ts.strftime(date_format)
                    f.write(f"{date}{sep}{float_format % val}\n")

    def _dict_to_df(self):
        """
        Method to convert the precip dictionary to a dataframe
        Returns
        -------
        None.

        """
        df = {}
        for ii, stat in enumerate(self.rrmodel.external_forcings.precip.items()):
            df[stat[0]] = pd.Series(stat[1]["precip"])
        self.precip_df = pd.DataFrame(df)

    def _write_dict(self, f, dct, header, endline):
        """
        Write a dictionary to file, without transforming with mapping
        """
        # Write header
        f.write(f"[{header}]\n")
        for key, value in dct.items():
            f.write(f"    {key:20s} = {value:20s}\n")
        f.write(endline)

    def change_d3b_parameter(self, parameter, value):
        """
        Change a parameter value in the DELFT_3B.INI file.

        Parameters
        ----------
        parameter : str
            Name of the setting to change (matched case-insensitively
            against the text before the first '=').
        value : str or number
            New value written after the '=' sign.

        Returns
        -------
        None.

        """
        inifile = os.path.join(self.output_dir, "DELFT_3B.INI")
        with open(inifile) as f:
            lines = f.readlines()

        target = parameter.lower()
        for i, line in enumerate(lines):
            # cheap pre-filter before the exact key comparison
            if not line.lower().strip().startswith(target):
                continue
            # BUG FIX: split on the first '=' only, so values that contain
            # '=' stay intact and '='-less lines do not raise IndexError
            items = line.split("=", 1)
            if len(items) < 2:
                continue
            key = items[0]
            if key.strip().lower() != target:
                continue
            # keep the key text (incl. its whitespace) exactly as it was
            lines[i] = f"{key}={value}\n"
            break

        with open(inifile, "w") as f:
            f.write("".join(lines))
change_d3b_parameter(parameter, value)

Change parameter value in delft3b.ini file

Source code in hydrolib/dhydamo/io/drrwriter.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
def change_d3b_parameter(self, parameter, value):
    """
    Change parameter value in delft3b.ini file
    """
    with open(os.path.join(self.output_dir, "DELFT_3B.INI")) as f:
        lines = f.readlines()

    for i, line in enumerate(lines):
        if line.lower().strip().startswith(parameter.lower()):
            items = re.split("=", line)
            key, oldvalue = items[0], items[1]
            if not key.strip().lower() == parameter.lower():
                continue
            lines[i] = line.replace(
                "=" + oldvalue, f"={value}\n"
            )
            break

    with open(os.path.join(self.output_dir, "DELFT_3B.INI"), "w") as f:
        f.write("".join(lines))
copyRRFiles()

Method to copy the RR-files from resources.

Returns

bool Successful or not.

Source code in hydrolib/dhydamo/io/drrwriter.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def copyRRFiles(self):
    """
    Method to copy the RR-files from resources.

    Returns
    -------
    bool
        Succesfull or not.

    """
    srcRR = os.path.join(os.path.dirname(__file__), "..", "resources", "RR")
    targetRR = os.path.join(self.output_dir)
    shutil.copytree(srcRR, targetRR)
    return True
write_all()

Wrapper method to write all components

Source code in hydrolib/dhydamo/io/drrwriter.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def write_all(self):  # write all RR files
    """
    Wrapper method to write all components
    """
    # copy all 'standard' files from the resources folder
    self.copyRRFiles()

    self.write_topology()

    self.write_unpaved()

    self.write_paved()

    self.write_greenhouse()

    self.write_openwater()

    self.write_meteo()

    # Change d3b parameters
    for parameter, value in self.d3b_parameters.items():
        self.change_d3b_parameter(parameter, value)
write_greenhouse()

Method to write all files associated with greenhouse nodes: GREENHSE.3B, GREENHSE.RF and GREENHSE.SIL. The latter contains only one definition.

Returns

None.

Source code in hydrolib/dhydamo/io/drrwriter.py
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
def write_greenhouse(self):
    """
    Method to write all files associated with greenhouse nodes: GREENHSE.3B, GREENHSE.RF and GREENHSE.SIL. The latter contains only one definition.

    Returns
    -------
    None.

    """
    if any(self.rrmodel.greenhouse.gh_nodes):
        filepath = os.path.join(self.output_dir, "GREENHSE.3B")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.greenhouse.gh_nodes.items():                    
                if float(dct["ar"]) > 0.0:
                    area_string = ' '.join(['0' for _ in range(int(dct['basin_storage_class'])-1)]+ [dct['ar']]+['0' for _ in range(10-(int(dct['basin_storage_class'])))])
                    f.write(
                        "GRHS id '"
                        + dct["id"]
                        + "' na 10 ar "
                        + area_string
                        + " sl "
                        + dct["sl"]
                        + " as 0 si 'Def_silo' sd 'sto_"
                        + dct["id"]
                        + "' ms '"
                        + dct["ms"]
                        + "' is "
                        + dct["is"]
                        + " grhs\n"
                    )

        filepath = os.path.join(self.output_dir, "GREENHSE.SIL")
        with open(filepath, "w") as f:
            f.write("SILO id 'Def_silo' nm 'Def_silo' sc 0.0 pc 0.0 silo\n")

        filepath = os.path.join(self.output_dir, "GREENHSE.RF")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.greenhouse.gh_nodes.items():
                if float(dct["ar"]) > 0.0:
                    f.write(
                        "STDF id 'sto_"
                        + dct["id"]
                        + "' nm 'sto_"
                        + dct["id"]
                        + "' mk "
                        + dct["sd"]
                        + " ik 0 stdf\n"
                    )
write_meteo()

Method to write meteofiles (DEFAULT.BUI and DEFAULT.EVP) based on values per catchment.

Returns

None.

Source code in hydrolib/dhydamo/io/drrwriter.py
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
def write_meteo(self):
    """
    Method to write meteofiles (DEFAULT.BUI and DEFAULT.EVP) based on values per catchment.

    Returns
    -------
    None.

    """
    # precip
    filepath = os.path.join(self.output_dir, "DEFAULT.BUI")
    if isinstance(self.rrmodel.external_forcings.precip, str):
        shutil.copy(self.rrmodel.external_forcings.precip, filepath)
    else:
        df = {}
        for ii, ms in enumerate(self.rrmodel.external_forcings.precip.items()):
            df[ms[0]] = pd.Series(ms[1]["precip"])

        f = open(filepath, "w")
        f.write("*Name of this file: c:\\Result\\1058\\DEFAULT.BUI\n")
        f.close()
        f = open(filepath, "a")
        f.write("*Date and time of construction: 00/00/2000 00:00:00.\n")
        f.write("1\n")
        f.write("*Aantal stations\n")
        f.write(str(len(self.rrmodel.external_forcings.precip)) + "\n")
        f.write("*Namen van stations\n")
        for ms in self.rrmodel.external_forcings.precip.items():
            f.write("'" + ms[0] + "'" + "\n")
        f.write(
            "*Aantal gebeurtenissen (omdat het 1 bui betreft is dit altijd 1)\n"
        )
        f.write("*en het aantal seconden per waarnemingstijdstap\n")
        f.write(
            f'1 {(ms[1]["precip"].index[1]-ms[1]["precip"].index[0]).total_seconds():.0f}\n'
        )
        f.write("*Elke commentaarregel wordt begonnen met een * (asterisk).\n")
        f.write(
            "*Eerste record bevat startdatum en -tijd, lengte van de gebeurtenis in dd hh mm ss\n"
        )
        f.write("*Het format is: yyyymmdd:hhmmss:ddhhmmss")
        f.write("*Daarna voor elk station de neerslag in mm per tijdstap.\n")
        duration = (
            ms[1]["precip"].index[-1] - ms[1]["precip"].index[0]
        ).total_seconds()
        f.write(
            ms[1]["precip"].index[0].strftime("%Y %#m %#d %#H %#M %#S ")
            + str(int(duration / 86400.0))
            + " "
            + str(int(np.remainder(duration, 86400.0) / 3600.0))
            + " 0 0\n"
        )
        f.close()
        self._dict_to_df()
        self.precip_df.to_csv(
            filepath,
            header=False,
            index=False,
            sep=" ",
            float_format="%.3f",
            mode="a",
        )
    #%%

    #%%%
    # evaporation
    filepath = os.path.join(self.output_dir, "DEFAULT.EVP")
    if isinstance(self.rrmodel.external_forcings.evap, str):
        shutil.copy(self.rrmodel.external_forcings.evap, filepath)
    else:
        f = open(filepath, "w")
        f.write("*Verdampingsfile\n")
        f.write("*Meteo data: evaporation intensity in mm/day\n")
        f.write("*First record: start date, data in mm/day\n")
        f.write(
            "*Datum (year month day), verdamping (mm/dag) voor elk weerstation\n"
        )
        f.write("*jaar maand dag verdamping[mm]\n")
        f.close()
        table = list(self.rrmodel.external_forcings.evap.values())[0]["evap"]
        table = table.sort_index()
        date_format = "%#Y  %#m  %#d "
        float_format = "%.3f"
        sep = " "
        with open(filepath, "a") as f:
            for ts, val in table.items():
                date = ts.strftime(date_format)
                f.write(f"{date}{sep}{float_format % val}\n")
write_openwater()

Method to write OPENWATE.3B file.

Returns

None.

Source code in hydrolib/dhydamo/io/drrwriter.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
def write_openwater(self):
    """
     Method to write OPENWATE.3B file.

    Returns
    -------
    None.

    """
    # write openwater.3b
    if any(self.rrmodel.openwater.ow_nodes):
        filepath = os.path.join(self.output_dir, "OPENWATE.3B")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.openwater.ow_nodes.items():
                if float(dct["ar"]) > 0.0:
                    f.write(
                        "OWRR id '"
                        + dct["id"]
                        + "' ar "
                        + dct["ar"]
                        + " ms '"
                        + dct["ms"]
                        + "' owrr\n"
                    )
write_paved()

Method to write all files associated with paved nodes: PAVED.3B, PAVED.STO and PAVED.DWA. The latter contains only one definition.

Returns

None.

Source code in hydrolib/dhydamo/io/drrwriter.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
def write_paved(self):
    """
    Method to write all files associated with paved nodes: PAVED.3B, PAVED.STO and PAVED.DWA. The latter contains only one definition.

    Returns
    -------
    None.

    """
    if any(self.rrmodel.paved.pav_nodes):
        filepath = os.path.join(self.output_dir, "PAVED.3B")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.paved.pav_nodes.items():
                if float(dct["ar"]) > 0.0:
                    f.write(
                        "PAVE id '"
                        + dct["id"]
                        + "' ar "
                        + dct["ar"]
                        + " lv "
                        + dct["lv"]
                        + " sd 'sto_"
                        + dct["id"]
                        + "' ss 0 qc 0 "
                        + dct["qc"]
                        + " 0 qo 2 2 ms '"
                        + dct["ms"]
                        + "' is "
                        + dct["is"]
                        + " np "
                        + dct["np"]
                        + " dw 'Def_DWA' ro "
                        + dct["ro"]
                        + " ru "
                        + dct["ru"]
                        + " qh '' pave\n"
                    )

        filepath = os.path.join(self.output_dir, "PAVED.STO")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.paved.pav_nodes.items():
                if float(dct["ar"]) > 0.0:
                    f.write(
                        "STDF id 'sto_"
                        + dct["id"]
                        + "' nm 'sto_"
                        + dct["id"]
                        + "' ms "
                        + dct["strs"]
                        + " is 0 mr "
                        + dct["sews"]
                        + " 0.0 ir 0.0 0.0 stdf\n"
                    )

        filepath = os.path.join(self.output_dir, "PAVED.DWA")
        with open(filepath, "w") as f:
            f.write(
                "DWA id 'Def_DWA' nm 'Def_DWA' do 1 wc 0 wd 0 wh 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 dwa\n"
            )

        filepath = os.path.join(self.output_dir, "WWTP.3B")
        with open(filepath, "w") as f:
            f.write("WWTP id 'WWTP' tb 0  wwtp\n")

        filepath = os.path.join(self.output_dir, "WWTP.tbl")
        with open(filepath, "w") as f:
            f.write("\n")
write_topology()

Wrapper to write the topology files for RR. The following files are written: 3B_NOD.TP: IDs and coordinates of all nodes: unpaved, paved, greenhouse, open water and boundary. Only nodes with a total area of >0 are written. Nodes are placed up to 20 m to the east and west of the catchment centroid. 3B_LINK.TP: Links between all nodes and the correct boundaries. BOUND3B.3B: coupling information for every RR boundary. BoundaryConditions.bc: see bound3b.3b Returns


None.

Source code in hydrolib/dhydamo/io/drrwriter.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def write_topology(self):
    """
    Wrapper to write the topolgy files for RR. The following files are written:
        3B_NOD.TP: IDs and coordinates of all nodes: unpaved, paved, greenhouse, open water and boundary. Only nodes with a total area of >0 are written. Nodes are placed up to 20 m to the east and west of the catchment centroid.
        3B_LINK.TP: Links between all nodes and the correct boundaries.
        BOUND3B.3B: coupling information for every RR boundary.
        BoundaryConditions.bc: see bound3b.3b
    Returns
    -------
    None.

    """
    filepath = os.path.join(self.output_dir, "3B_NOD.TP")
    with open(filepath, "w") as f:

        # Unpaved nodes
        if any(self.rrmodel.unpaved.unp_nodes):
            for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                    f.write(
                        "NODE id '"
                        + dct["id"]
                        + "' nm '"
                        + dct["id"]
                        + "' ri '-1' mt 1 '2' nt 44 ObID '3B_UNPAVED' px "
                        + dct["px"]
                        + " py "
                        + dct["py"]
                        + " node\n"
                    )

        # Paved nodes
        if any(self.rrmodel.paved.pav_nodes):
            for _, dct in self.rrmodel.paved.pav_nodes.items():
                if float(dct["ar"]) > 0.0:
                    f.write(
                        "NODE id '"
                        + dct["id"]
                        + "' nm '"
                        + dct["id"]
                        + "' ri '-1' mt 1 '1' nt 43 ObID '3B_PAVED' px "
                        + dct["px"]
                        + " py "
                        + dct["py"]
                        + " node\n"
                    )

            f.write(
                "NODE id 'WWTP' nm 'WWTP' ri '-1' mt 1 '14' nt 56 ObID '3B_WWTP' px "
                + str(self.wwtp.coords[0][0])
                + " py "
                + str(self.wwtp.coords[0][1])
                + " node\n"
            )
            f.write(
                "NODE id 'WWTP_BND' nm 'WWTP_BND' ri '-1' mt 1 '6' nt 47 ObID '3B_BOUNDARY' px "
                + str(self.wwtp.coords[0][0] + 50.0)
                + " py "
                + str(self.wwtp.coords[0][1] + 50.0)
                + " node\n"
            )

        # Greenhouse nodes
        if any(self.rrmodel.greenhouse.gh_nodes):
            for _, dct in self.rrmodel.greenhouse.gh_nodes.items():
                if float(dct["ar"]) > 0.0:
                    f.write(
                        "NODE id '"
                        + dct["id"]
                        + "' nm '"
                        + dct["id"]
                        + "' ri '-1' mt 1 '3' nt 45 ObID '3B_GREENHOUSE' px "
                        + dct["px"]
                        + " py "
                        + dct["py"]
                        + " node\n"
                    )

        # Openwater nodes
        if any(self.rrmodel.openwater.ow_nodes):
            for _, dct in self.rrmodel.openwater.ow_nodes.items():
                if float(dct["ar"]) > 0.0:
                    f.write(
                        "NODE id '"
                        + dct["id"]
                        + "' nm '"
                        + dct["id"]
                        + "' ri '-1' mt 1 '21' nt 46 ObID 'OW_PRECIP' px "
                        + dct["px"]
                        + " py "
                        + dct["py"]
                        + " node\n"
                    )

        # Boundary_nodes
        if any(self.rrmodel.external_forcings.boundary_nodes):
            for _, dct in self.rrmodel.external_forcings.boundary_nodes.items():
                f.write(
                    "NODE id '"
                    + dct["id"]
                    + "' nm '"
                    + dct["id"]
                    + "' ri '-1' mt 1 '6' nt 78 ObID 'SBK_SBK-3B-NODE' px "
                    + dct["px"]
                    + " py "
                    + dct["py"]
                    + " node\n"
                )

    filepath = os.path.join(self.output_dir, "3B_LINK.TP")
    with open(filepath, "w") as f:

        cnt = 0
        if any(self.rrmodel.unpaved.unp_nodes):
            for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                    cnt += 1
                    f.write(
                        "BRCH id 'link_"
                        + str(cnt)
                        + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn '"
                        + dct["id"]
                        + "' en '"
                        + dct["boundary_node"]
                        + "' brch\n"
                    )
        if any(self.rrmodel.paved.pav_nodes):
            for _, dct in self.rrmodel.paved.pav_nodes.items():
                if float(dct["ar"]) > 0.0:
                    cnt += 1
                    f.write(
                        "BRCH id 'link_"
                        + str(cnt)
                        + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn '"
                        + dct["id"]
                        + "' en '"
                        + dct["boundary_node"]
                        + "' brch\n"
                    )
                    cnt += 1
                    f.write(
                        "BRCH id 'link_"
                        + str(cnt)
                        + "' ri '-1' mt 1 '1' bt 18 ObID '3B_LINK_RWZI' bn '"
                        + dct["id"]
                        + "' en 'WWTP' brch\n"
                    )
            cnt += 1
            f.write(
                "BRCH id 'link_"
                + str(cnt)
                + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn 'WWTP' en 'WWTP_BND' brch\n"
            )

        if any(self.rrmodel.greenhouse.gh_nodes):
            for _, dct in self.rrmodel.greenhouse.gh_nodes.items():
                if float(dct["ar"]) > 0.0:
                    cnt += 1
                    f.write(
                        "BRCH id 'link_"
                        + str(cnt)
                        + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn '"
                        + dct["id"]
                        + "' en '"
                        + dct["boundary_node"]
                        + "' brch\n"
                    )
        if any(self.rrmodel.openwater.ow_nodes):
            for _, dct in self.rrmodel.openwater.ow_nodes.items():
                if float(dct["ar"]) > 0.0:
                    cnt += 1
                    f.write(
                        "BRCH id 'link_"
                        + str(cnt)
                        + "' ri '-1' mt 1 '0' bt 17 ObID '3B_LINK' bn '"
                        + dct["id"]
                        + "' en '"
                        + dct["boundary_node"]
                        + "' brch\n"
                    )

    # bound3b.3b
    filepath = os.path.join(self.output_dir, "Bound3B.3B")
    with open(filepath, "w") as f:
        for _, dct in self.rrmodel.external_forcings.boundary_nodes.items():
            f.write("BOUN id '" + dct["id"] + "' bl 2 '0' is 0 boun\n")
        if any(self.rrmodel.paved.pav_nodes):
            f.write("BOUN id 'WWTP_BND' bl 0 -3 is 0 boun\n")

    # BoundaryConditions.bc
    filepath = os.path.join(self.output_dir, "BoundaryConditions.bc")
    header = {"fileVersion": "1.01", "fileType": "boundConds"}
    with open(filepath, "w") as f:
        self._write_dict(f, header, "General", "\n")
        for _, dct in self.rrmodel.external_forcings.boundary_nodes.items():
            temp = {
                "name": "" + dct["id"],
                "function": "constant",
                "quantity": "water_level",
                "unit": "m",
            }
            self._write_dict(f, temp, "Boundary", "    0\n\n")
        if any(self.rrmodel.paved.pav_nodes):
            temp = {
                "name": "WWTP_BND",
                "function": "constant",
                "quantity": "water_level",
                "unit": "m",
            }
            self._write_dict(f, temp, "Boundary", "    0\n\n")
write_unpaved()

Method to write all files associated with unpaved nodes: UNPAVED.3B, UNPAVED.ALF, UNPAVED.STO, UNPAVED.INF and UNPAVED.SEP. All files contain a definition for every node because they may or may not be spatially distributed.

Returns

None.

Source code in hydrolib/dhydamo/io/drrwriter.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def write_unpaved(self):
    """
    Method to write all files associated with unpaved nodes: UNPAVED.3B, UNPAVED.ALF, UNPAVED.STO, UNPAVED.INF and UNPAVED.SEP. All files contain a definition for every node  because they may or may not be spatially distributed.

    Returns
    -------
    None.

    """
    if any(self.rrmodel.unpaved.unp_nodes):
        filepath = os.path.join(self.output_dir, "UNPAVED.3B")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                    f.write(
                        "UNPV id '"
                        + dct["id"]
                        + "' na "
                        + dct["na"]
                        + " ga "
                        + dct["ga"]
                        + " ar "
                        + dct["ar"]
                        + " lv "
                        + dct["lv"]
                        + " co "
                        + dct["co"]
                        + " su "
                        + dct["su"]
                        + " sd 'sto_"
                        + dct["id"]
                        + "' ic 'inf_"
                        + dct["id"]
                        + "' bt "
                        + dct["bt"]
                        + " ed '"
                        + dct["ed"]
                        + "' sp '"
                        + dct["sp"]
                        + "' ig 0 "
                        + dct["ig"]
                        + " mg "
                        + dct["mg"]
                        + " gl "
                        + dct["gl"]
                        + " is "
                        + dct["is"]
                        + " ms '"
                        + dct["ms"]
                        + "' unpv\n"
                    )

        filepath = os.path.join(self.output_dir, "UNPAVED.STO")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                    f.write(
                        "STDF id 'sto_"
                        + dct["id"]
                        + "' nm 'sto_"
                        + dct["id"]
                        + "' ml "
                        + dct["sd"]
                        + " il 0 stdf\n"
                    )

        filepath = os.path.join(self.output_dir, "UNPAVED.INF")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                    f.write(
                        "INFC id 'inf_"
                        + dct["id"]
                        + "' nm 'inf_"
                        + dct["id"]
                        + "' ic "
                        + dct["ic"]
                        + " infc\n"
                    )

        filepath = os.path.join(self.output_dir, "UNPAVED.ALF")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.unpaved.ernst_defs.items():
                if (
                    np.sum(
                        [
                            float(d)
                            for d in self.rrmodel.unpaved.unp_nodes[_]["ar"].split(
                                " "
                            )
                        ]
                    )
                    > 0.0
                ):
                    f.write(
                        "ERNS id '"
                        + dct["id"]
                        + "' nm '"
                        + dct["id"]
                        + "' cvi "
                        + dct["cvi"]
                        + " cvo 0 "
                        + dct["cvo"]
                        + " cvs "
                        + dct["cvs"]
                        + " lv "
                        + dct["lv"]
                        + " erns\n"
                    )

        filepath = os.path.join(self.output_dir, "UNPAVED.SEP")
        with open(filepath, "w") as f:
            for _, dct in self.rrmodel.unpaved.unp_nodes.items():
                if np.sum([float(d) for d in dct["ar"].split(" ")]) > 0.0:
                    f.write(
                        "SEEP id '"
                        + dct["sp"]
                        + "' nm '"
                        + dct["sp"]
                        + "' co 4 PDIN 0 0 pdin ss 0\n"
                    )
                    f.write("TBLE\n")
                    for row in self.rrmodel.external_forcings.seepage[dct["sp"]][
                        "seepage"
                    ].items():
                        f.write(
                            "'"
                            + row[0].strftime("%Y/%m/%d;%H:%M:%S")
                            + "' "
                            + f"{row[1]:.5f} <\n"
                        )
                    f.write("tble\nseep\n")

idfreader

Functions for reading and writing iMOD Data Files (IDFs) to xarray objects.

The primary functions to use are :func:imod.idf.open and :func:imod.idf.save, though lower level functions are also available.

The function in this file are taken from the imod-python package version 0.10.1.

The only functionality needed in dhydamo is the capability to read IDF files.

decompose(path, pattern=None)

Parse a path, returning a dict of the parts, following the iMOD conventions.

Parameters

path : str or pathlib.Path Path to the file. Upper case is ignored. pattern : str, regex pattern, optional If the path is not made up of standard paths, and the default decompose does not produce the right result, specify the used pattern here. See the examples below.

Returns

d : dict Dictionary with name of variable and dimensions

Examples

Decompose a path, relying on default conventions:

decompose("head_20010101_l1.idf")

Do the same, by specifying a format string pattern, excluding extension:

decompose("head_20010101_l1.idf", pattern="{name}_{time}_l{layer}")

This supports an arbitrary number of variables:

decompose("head_slr_20010101_l1.idf", pattern="{name}_{scenario}_{time}_l{layer}")

The format string pattern will only work on tidy paths, where variables are separated by underscores. You can also pass a compiled regex pattern. Make sure to include the re.IGNORECASE flag since all paths are lowered.

import re pattern = re.compile(r"(?P<name>[\w]+)L(?P<layer>[\d+]*)") decompose("headL11", pattern=pattern)

However, this requires constructing regular expressions, which is generally a fiddly process. The website https://regex101.com is a nice help. Alternatively, the most pragmatic solution may be to just rename your files.

Source code in hydrolib/dhydamo/io/idfreader.py
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
def decompose(path, pattern=None):
    r"""
    Parse a path, returning a dict of the parts, following the iMOD conventions.

    Parameters
    ----------
    path : str or pathlib.Path
        Path to the file. Upper case is ignored.
    pattern : str, regex pattern, optional
        If the path is not made up of standard paths, and the default decompose
        does not produce the right result, specify the used pattern here. See
        the examples below.

    Returns
    -------
    d : dict
        Dictionary with name of variable and dimensions

    Examples
    --------
    Decompose a path, relying on default conventions:

    >>> decompose("head_20010101_l1.idf")

    Do the same, by specifying a format string pattern, excluding extension:

    >>> decompose("head_20010101_l1.idf", pattern="{name}_{time}_l{layer}")

    This supports an arbitrary number of variables:

    >>> decompose("head_slr_20010101_l1.idf", pattern="{name}_{scenario}_{time}_l{layer}")

    The format string pattern will only work on tidy paths, where variables are
    separated by underscores. You can also pass a compiled regex pattern.
    Make sure to include the ``re.IGNORECASE`` flag since all paths are lowered.

    >>> import re
    >>> pattern = re.compile(r"(?P<name>[\w]+)L(?P<layer>[\d+]*)")
    >>> decompose("headL11", pattern=pattern)

    However, this requires constructing regular expressions, which is generally
    a fiddly process. The website https://regex101.com is a nice help.
    Alternatively, the most pragmatic solution may be to just rename your files.
    """
    path = pathlib.Path(path)
    # Case is ignored per iMOD convention: work on the lowered stem only.
    lowered_stem = path.stem.lower()

    parts = _groupdict(lowered_stem, pattern)
    # Every captured field except "name" becomes a dimension.
    dims = [key for key in parts if key != "name"]
    if "name" not in parts:
        # No explicit name was captured: synthesize one from the other fields.
        parts["name"] = "_".join(parts.values())

    # Convert captured strings to their proper types where applicable.
    if "layer" in parts:
        parts["layer"] = int(parts["layer"])
    if "species" in parts:
        parts["species"] = int(parts["species"])
    if "time" in parts:
        parts["time"] = to_datetime(parts["time"])
    if "steady-state" in parts["name"]:
        # "steady-state" as a time identifier is not picked up by the
        # <time>[0-9] regex, so strip it from the name and register it as the
        # time coordinate instead.
        parts["name"] = parts["name"].replace("_steady-state", "")
        parts["time"] = "steady-state"
        dims.append("time")

    parts["extension"] = path.suffix
    parts["directory"] = path.parent
    parts["dims"] = dims
    return parts

header(path, pattern)

Read the IDF header information into a dictionary

Source code in hydrolib/dhydamo/io/idfreader.py
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
def header(path, pattern):
    """Read the IDF header information into a dictionary

    Parameters
    ----------
    path : str or pathlib.Path
        Path to the IDF file whose header should be read.
    pattern : str, regex pattern, or None
        Forwarded to ``decompose`` to parse name/time/layer from the filename.

    Returns
    -------
    dict
        Filename parts from ``decompose`` plus grid metadata: xmin/xmax/
        ymin/ymax, nodata, dx/dy (and top/bot for voxel IDFs), and the
        derived keys headersize, ncol, nrow and dtype.
    """
    # Filename-derived attributes (name, time, layer, dims, ...).
    attrs = decompose(path, pattern)
    # NOTE: the reads below are strictly sequential; the byte offsets of each
    # field depend on all preceding reads.
    with f_open(path, "rb") as f:
        reclen_id = struct.unpack("i", f.read(4))[0]  # Lahey RecordLength Ident.
        if reclen_id == 1271:
            # Single precision IDF: 4-byte floats and ints.
            floatsize = intsize = 4
            floatformat = "f"
            intformat = "i"
            dtype = "float32"
            doubleprecision = False
        # 2296 was a typo in the iMOD manual. Keep 2296 around in case some IDFs
        # were written with this identifier to avoid possible incompatibility
        # issues.
        elif reclen_id == 2295 or reclen_id == 2296:
            # Double precision IDF: 8-byte floats; integers are widened too.
            floatsize = intsize = 8
            floatformat = "d"
            intformat = "q"
            dtype = "float64"
            doubleprecision = True
        else:
            raise ValueError(
                f"Not a supported IDF file: {path}\n"
                "Record length identifier should be 1271 or 2295, "
                f"received {reclen_id} instead."
            )

        # Header is fully doubled in size in case of double precision ...
        # This means integers are also turned into 8 bytes
        # and requires padding with some additional bytes
        if doubleprecision:
            f.read(4)  # not used

        ncol = struct.unpack(intformat, f.read(intsize))[0]
        nrow = struct.unpack(intformat, f.read(intsize))[0]
        attrs["xmin"] = struct.unpack(floatformat, f.read(floatsize))[0]
        attrs["xmax"] = struct.unpack(floatformat, f.read(floatsize))[0]
        attrs["ymin"] = struct.unpack(floatformat, f.read(floatsize))[0]
        attrs["ymax"] = struct.unpack(floatformat, f.read(floatsize))[0]
        # dmin and dmax are recomputed during writing
        f.read(floatsize)  # dmin, minimum data value present
        f.read(floatsize)  # dmax, maximum data value present
        nodata = struct.unpack(floatformat, f.read(floatsize))[0]
        attrs["nodata"] = nodata
        # flip definition here such that True means equidistant
        # equidistant IDFs
        ieq = not struct.unpack("?", f.read(1))[0]
        # itb: True when the IDF stores top/bottom elevations (voxel IDF).
        itb = struct.unpack("?", f.read(1))[0]

        f.read(2)  # not used
        if doubleprecision:
            f.read(4)  # not used

        if ieq:
            # dx and dy are stored positively in the IDF
            # dy is made negative here to be consistent with the nonequidistant case
            attrs["dx"] = struct.unpack(floatformat, f.read(floatsize))[0]
            attrs["dy"] = -struct.unpack(floatformat, f.read(floatsize))[0]

        if itb:
            attrs["top"] = struct.unpack(floatformat, f.read(floatsize))[0]
            attrs["bot"] = struct.unpack(floatformat, f.read(floatsize))[0]

        if not ieq:
            # dx and dy are stored positive in the IDF, but since the difference between
            # successive y coordinates is negative, it is made negative here
            attrs["dx"] = np.fromfile(f, dtype, ncol)
            attrs["dy"] = -np.fromfile(f, dtype, nrow)

        # These are derived, remove after using them downstream
        # headersize is the offset at which the actual data values begin.
        attrs["headersize"] = f.tell()
        attrs["ncol"] = ncol
        attrs["nrow"] = nrow
        attrs["dtype"] = dtype

    return attrs

open(path, use_cftime=False, pattern=None)

Open one or more IDF files as an xarray.DataArray.

In accordance with xarray's design, open loads the data of IDF files lazily. This means the data of the IDFs are not loaded into memory until the data is needed. This allows for easier handling of large datasets, and more efficient computations.

Parameters

path : str, Path or list This can be a single file, 'head_l1.idf', a glob pattern expansion, 'head_l*.idf', or a list of files, ['head_l1.idf', 'head_l2.idf']. Note that each file needs to be of the same name (part before the first underscore) but have a different layer and/or timestamp, such that they can be combined in a single xarray.DataArray. use_cftime : bool, optional Use cftime.DatetimeProlepticGregorian instead of np.datetime64[ns] for the time axis.

Dates are normally encoded as ``np.datetime64[ns]``; however, if dates
fall before 1678 or after 2261, they are automatically encoded as
``cftime.DatetimeProlepticGregorian`` objects rather than
``np.datetime64[ns]``.

pattern : str, regex pattern, optional If the filenames do not match default naming conventions of {name}_{time}_l{layer}, a custom pattern can be defined here either as a string, or as a compiled regular expression pattern. See the examples below.

Returns

xarray.DataArray A float xarray.DataArray of the values in the IDF file(s). All metadata needed for writing the file to IDF or other formats using imod.rasterio are included in the xarray.DataArray.attrs.

Examples

Open an IDF file:

da = imod.idf.open("example.idf")

Open an IDF file, relying on default naming conventions to identify layer:

da = imod.idf.open("example_l1.idf")

Open an IDF file, relying on default naming conventions to identify layer and time:

head = imod.idf.open("head_20010101_l1.idf")

Open multiple IDF files, in this case files for the year 2001 for all layers, again relying on default conventions for naming:

head = imod.idf.open("head_2001*_l*.idf")

The same, this time explicitly specifying name, time, and layer:

head = imod.idf.open("head_2001*_l*.idf", pattern="{name}_{time}_l{layer}")

The format string pattern will only work on tidy paths, where variables are separated by underscores. You can also pass a compiled regex pattern. Make sure to include the re.IGNORECASE flag since all paths are lowered.

import re pattern = re.compile(r"(?P<name>[\w]+)L(?P<layer>[\d+]*)", re.IGNORECASE) head = imod.idf.open("headL11", pattern=pattern)

However, this requires constructing regular expressions, which is generally a fiddly process. Regex notation is also impossible to remember. The website https://regex101.com is a nice help. Alternatively, the most pragmatic solution may be to just rename your files.

Source code in hydrolib/dhydamo/io/idfreader.py
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
def open(path, use_cftime=False, pattern=None):
    r"""
    Open one or more IDF files as an xarray.DataArray.

    In accordance with xarray's design, ``open`` loads the data of IDF files
    lazily. This means the data of the IDFs are not loaded into memory until the
    data is needed. This allows for easier handling of large datasets, and
    more efficient computations.

    Parameters
    ----------
    path : str, Path or list
        This can be a single file, 'head_l1.idf', a glob pattern expansion,
        'head_l*.idf', or a list of files, ['head_l1.idf', 'head_l2.idf'].
        Note that each file needs to be of the same name (part before the
        first underscore) but have a different layer and/or timestamp,
        such that they can be combined in a single xarray.DataArray.
    use_cftime : bool, optional
        Use ``cftime.DatetimeProlepticGregorian`` instead of `np.datetime64[ns]`
        for the time axis.

        Dates are normally encoded as ``np.datetime64[ns]``; however, if dates
        fall before 1678 or after 2261, they are automatically encoded as
        ``cftime.DatetimeProlepticGregorian`` objects rather than
        ``np.datetime64[ns]``.
    pattern : str, regex pattern, optional
        If the filenames do not match the default naming conventions of
        {name}_{time}_l{layer}, a custom pattern can be defined here either
        as a string, or as a compiled regular expression pattern. See the
        examples below.

    Returns
    -------
    xarray.DataArray
        A float xarray.DataArray of the values in the IDF file(s).
        All metadata needed for writing the file to IDF or other formats
        using imod.rasterio are included in the xarray.DataArray.attrs.

    Examples
    --------
    Open an IDF file:

    >>> da = imod.idf.open("example.idf")

    Open an IDF file, relying on default naming conventions to identify
    layer:

    >>> da = imod.idf.open("example_l1.idf")

    Open an IDF file, relying on default naming conventions to identify layer
    and time:

    >>> head = imod.idf.open("head_20010101_l1.idf")

    Open multiple IDF files, in this case files for the year 2001 for all
    layers, again relying on default conventions for naming:

    >>> head = imod.idf.open("head_2001*_l*.idf")

    The same, this time explicitly specifying ``name``, ``time``, and ``layer``:

    >>> head = imod.idf.open("head_2001*_l*.idf", pattern="{name}_{time}_l{layer}")

    The format string pattern will only work on tidy paths, where variables are
    separated by underscores. You can also pass a compiled regex pattern.
    Make sure to include the ``re.IGNORECASE`` flag since all paths are lowered.

    >>> import re
    >>> pattern = re.compile(r"(?P<name>[\w]+)L(?P<layer>[\d+]*)", re.IGNORECASE)
    >>> head = imod.idf.open("headL11", pattern=pattern)

    However, this requires constructing regular expressions, which is
    generally a fiddly process. Regex notation is also impossible to
    remember. The website https://regex101.com is a nice help. Alternatively,
    the most pragmatic solution may be to just rename your files.
    """

    # An explicit list of paths needs no glob expansion.
    if isinstance(path, list):
        return _load(path, use_cftime, pattern, _read, header)

    if isinstance(path, pathlib.Path):
        path = str(path)

    # Expand the (possibly wildcarded) path into concrete file paths.
    matches = [pathlib.Path(p) for p in glob.glob(path)]
    if not matches:
        raise FileNotFoundError(f"Could not find any files matching {path}")
    return _load(matches, use_cftime, pattern, _read, header)

read(path, pattern=None)

Read a single IDF file to a numpy.ndarray

Parameters

path : str or Path Path to the IDF file to be read pattern : str, regex pattern, optional If the filenames do not match default naming conventions of {name}_{time}_l{layer}, a custom pattern can be defined here either as a string, or as a compiled regular expression pattern. Please refer to the examples for imod.idf.open.

Returns

numpy.ndarray A float numpy.ndarray with shape (nrow, ncol) of the values in the IDF file. On opening all nodata values are changed to NaN in the numpy.ndarray. dict A dict with all metadata.

Source code in hydrolib/dhydamo/io/idfreader.py
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
def read(path, pattern=None):
    """
    Read a single IDF file to a numpy.ndarray

    .. deprecated::
        Use ``imod.idf.open(path).values`` instead; calling this function
        emits a ``FutureWarning``.

    Parameters
    ----------
    path : str or Path
        Path to the IDF file to be read
    pattern : str, regex pattern, optional
        If the filenames do not match the default naming conventions of
        {name}_{time}_l{layer}, a custom pattern can be defined here either
        as a string, or as a compiled regular expression pattern. Please refer
        to the examples for ``imod.idf.open``.

    Returns
    -------
    numpy.ndarray
        A float numpy.ndarray with shape (nrow, ncol) of the values
        in the IDF file. On opening all nodata values are changed
        to NaN in the numpy.ndarray.
    dict
        A dict with all metadata.
    """
    warnings.warn(
        "The idf.read() function is deprecated. To get a numpy array of an IDF, "
        "use instead: imod.idf.open(path).values",
        FutureWarning,
    )
    meta = header(path, pattern)
    # Pop the derived keys so only true metadata remains in the returned dict;
    # they parameterize the raw data read below.
    headersize = meta.pop("headersize")
    nrow = meta.pop("nrow")
    ncol = meta.pop("ncol")
    nodata = meta.pop("nodata")
    dtype = meta.pop("dtype")
    values = _read(path, headersize, nrow, ncol, nodata, dtype)
    return values, meta