Source code for hydromt_wflow.utils

"""Some utilities from the Wflow plugin."""
from pathlib import Path
from typing import Dict, Union

import numpy as np
import xarray as xr
from hydromt.io import open_timeseries_from_table
from hydromt.vector import GeoDataArray

__all__ = ["read_csv_results"]


def read_csv_results(fn: Union[str, Path], config: Dict, maps: xr.Dataset) -> Dict:
    """Read wflow results csv timeseries and parse them to a dictionary.

    Parses the wflow csv results file into different ``hydromt.GeoDataArrays``,
    one per column (csv section and csv.column sections of the TOML). The xy
    coordinates are the coordinates of the station or of the representative point
    of the subcatch/area. The variable name in the ``GeoDataArray`` corresponds
    to the csv header attribute or header_map when available.

    Parameters
    ----------
    fn: str or Path
        Path to the wflow csv results file.
    config: dict
        wflow.toml configuration.
    maps: xr.Dataset
        wflow staticmaps.nc dataset.

    Returns
    -------
    csv_dict: dict
        Dictionary of hydromt.GeoDataArrays for the different csv.column sections
        of the config.
    """
    # Count items by csv.column
    count = 1
    csv_dict = dict()
    # Loop over csv.column
    for col in config["csv"].get("column"):
        header = col["header"]
        # Column based on map
        if "map" in col.keys():
            # Read the corresponding map and derive the different locations
            # The representative point of the geometry is used as coordinates
            # for the timeseries
            map_name = config["input"].get(f"{col['map']}")
            da = maps[map_name]
            gdf = da.raster.vectorize()
            gdf.geometry = gdf.geometry.representative_point()
            gdf.index = gdf.value.astype(da.dtype)
            gdf.index.name = "index"
            # Read the timeseries
            usecols = [0]
            usecols = np.append(usecols, np.arange(count, count + len(gdf.index)))
            count += len(gdf.index)
            da_ts = open_timeseries_from_table(
                fn, name=f'{header}_{col["map"]}', usecols=usecols
            )
            da = GeoDataArray.from_gdf(gdf, da_ts, index_dim="index")
        # Column based on xy coordinates or reducer for the full model domain
        else:
            # Read the timeseries
            usecols = [0]
            usecols = np.append(usecols, np.arange(count, count + 1))
            count += 1
            try:
                da_ts = open_timeseries_from_table(fn, name=header, usecols=usecols)
            except Exception:
                colnames = ["time", "0"]
                da_ts = open_timeseries_from_table(
                    fn,
                    name=header,
                    usecols=usecols,
                    header=0,
                    names=colnames,
                )
            # Add point coordinates
            # Column based on xy coordinates
            if "coordinate" in col.keys():
                scoords = {
                    "x": xr.IndexVariable("index", [col["coordinate"]["x"]]),
                    "y": xr.IndexVariable("index", [col["coordinate"]["y"]]),
                }
            # Column based on index
            elif "index" in col.keys():
                # x and y index, works on the full 2D grid
                if isinstance(col["index"], dict):
                    # Indices in Julia start at 1
                    # Coordinates are always ascending
                    xi = maps.raster.xcoords.values[col["index"]["x"] - 1]
                    yi = np.sort(maps.raster.ycoords.values)[col["index"]["y"] - 1]
                    scoords = {
                        "x": xr.IndexVariable("index", [xi]),
                        "y": xr.IndexVariable("index", [yi]),
                    }
                # index of the full array
                else:
                    # Create grid with full 2D Julia indices
                    # Dimensions are ascending and ordered as (x,y,layer,time)
                    # Indices are created before ordering for compatibility with
                    # raster.idx_to_xy
                    full_index = maps[f'{config["input"].get("subcatchment")}'].copy()
                    res_x, res_y = full_index.raster.res
                    if res_y < 0:
                        full_index = full_index.reindex(
                            {
                                full_index.raster.y_dim: full_index[
                                    full_index.raster.y_dim
                                ][::-1]
                            }
                        )
                    data = np.arange(0, np.size(full_index)).reshape(
                        np.size(full_index, 0), np.size(full_index, 1)
                    )
                    full_index[:, :] = data
                    full_index = full_index.transpose(
                        full_index.raster.x_dim, full_index.raster.y_dim
                    )
                    # Index depends on the struct
                    # For land uses the active subcatch IDs
                    if (
                        "vertical" in col["parameter"]
                        or "lateral.land" in col["parameter"]
                    ):
                        mask = maps[f'{config["input"].get("subcatchment")}'].copy()
                    elif "reservoir" in col["parameter"]:
                        mask = maps[
                            f'{config["input"]["lateral"]["river"]["reservoir"].get("locs")}'
                        ].copy()
                    elif "lake" in col["parameter"]:
                        mask = maps[
                            f'{config["input"]["lateral"]["river"]["lake"].get("locs")}'
                        ].copy()
                    # Else lateral.river
                    else:
                        mask = maps[f'{config["input"].get("river_location")}'].copy()
                    # Rearrange the mask
                    res_x, res_y = mask.raster.res
                    if res_y < 0:
                        mask = mask.reindex(
                            {mask.raster.y_dim: mask[mask.raster.y_dim][::-1]}
                        )
                    mask = mask.transpose(mask.raster.x_dim, mask.raster.y_dim)
                    # Filter and reduce full_index based on mask
                    full_index = full_index.where(mask != mask.raster.nodata, 0)
                    full_index.attrs.update(_FillValue=0)
                    mask_index = full_index.values.flatten()
                    mask_index = mask_index[mask_index != 0]
                    # idx corresponding to the wflow index
                    idx = mask_index[col["index"] - 1]
                    # Reorder full_index as (y,x) to use raster.idx_to_xy method
                    xi, yi = full_index.transpose(
                        full_index.raster.y_dim, full_index.raster.x_dim
                    ).raster.idx_to_xy(idx)
                    scoords = {
                        "x": xr.IndexVariable("index", xi),
                        "y": xr.IndexVariable("index", yi),
                    }
            # Based on model bbox center for column based on reducer for
            # the full model domain
            else:
                xmin, ymin, xmax, ymax = maps.raster.bounds
                scoords = {
                    "x": xr.IndexVariable("index", [(xmax + xmin) / 2]),
                    "y": xr.IndexVariable("index", [(ymax + ymin) / 2]),
                }
            da = da_ts.assign_coords(scoords)

        csv_dict[f"{da.name}"] = da

    return csv_dict