Source code for hydromt_wflow.utils

"""Some utilities from the Wflow plugin."""
from os.path import abspath, join
from pathlib import Path
from typing import Dict, Optional, Union

import numpy as np
import xarray as xr
from hydromt.io import open_timeseries_from_table
from hydromt.vector import GeoDataArray
from hydromt.workflows.grid import grid_from_constant

__all__ = ["read_csv_results", "get_config", "get_grid_from_config"]


[docs] def read_csv_results(fn: Union[str, Path], config: Dict, maps: xr.Dataset) -> Dict: """Read wflow results csv timeseries and parse to dictionnary. Parses the wflow csv results file into different ``hydromt.GeoDataArrays``, one per column (csv section and csv.column sections of the TOML). The xy coordinates are the coordinates of the station or of the representative point of the subcatch/area. The variable name in the ``GeoDataArray`` corresponds to the csv header attribute or header_map when available. Parameters ---------- fn: str Path to the wflow csv results file. config: dict wflow.toml configuration. maps: xr.Dataset wflow staticmaps.nc dataset Returns ------- csv_dict: dict Dictionnary of hydromt.GeoDataArrays for the different csv.column section \ of the config. """ # Count items by csv.column count = 1 csv_dict = dict() # Loop over csv.column for col in config["csv"].get("column"): header = col["header"] # Column based on map if "map" in col.keys(): # Read the corresponding map and derive the different locations # The centroid of the geometry is used as coordinates for the timeseries map_name = config["input"].get(f"{col['map']}") da = maps[map_name] gdf = da.raster.vectorize() gdf.geometry = gdf.geometry.representative_point() gdf.index = gdf.value.astype(da.dtype) gdf.index.name = "index" # Read the timeseries usecols = [0] usecols = np.append(usecols, np.arange(count, count + len(gdf.index))) count += len(gdf.index) da_ts = open_timeseries_from_table( fn, name=f'{header}_{col["map"]}', usecols=usecols ) da = GeoDataArray.from_gdf(gdf, da_ts, index_dim="index") # Column based on xy coordinates or reducer for the full model domain domain else: # Read the timeseries usecols = [0] usecols = np.append(usecols, np.arange(count, count + 1)) count += 1 try: da_ts = open_timeseries_from_table(fn, name=header, usecols=usecols) except Exception: colnames = ["time", "0"] da_ts = open_timeseries_from_table( fn, name=header, usecols=usecols, header=0, names=colnames, ) # Add point coordinates # Column based on xy coordinates if "coordinate" in col.keys(): scoords = { "x": xr.IndexVariable("index", [col["coordinate"]["x"]]), "y": xr.IndexVariable("index", [col["coordinate"]["y"]]), } # Column based on index elif "index" in col.keys(): # x and y index, works on the full 2D grid if isinstance(col["index"], dict): # index in julia starts at 1 # coordinates are always ascending xi = maps.raster.xcoords.values[col["index"]["x"] - 1] yi = np.sort(maps.raster.ycoords.values)[col["index"]["y"] - 1] scoords = { "x": xr.IndexVariable("index", [xi]), "y": xr.IndexVariable("index", [yi]), } # index of the full array else: # Create grid with full 2D Julia indices # Dimensions are ascending and ordered as (x,y,layer,time) # Indices are created before ordering for compatibility with # raster.idx_to_xy full_index = maps[f'{config["input"].get("subcatchment")}'].copy() res_x, res_y = full_index.raster.res if res_y < 0: full_index = full_index.reindex( { full_index.raster.y_dim: full_index[ full_index.raster.y_dim ][::-1] } ) data = np.arange(0, np.size(full_index)).reshape( np.size(full_index, 0), np.size(full_index, 1) ) full_index[:, :] = data full_index = full_index.transpose( full_index.raster.x_dim, full_index.raster.y_dim ) # Index depends on the struct # For land uses the active subcatch IDs if ( "vertical" in col["parameter"] or "lateral.land" in col["parameter"] ): mask = maps[f'{config["input"].get("subcatchment")}'].copy() elif "reservoir" in col["parameter"]: mask = maps[ f'{config["input"]["lateral"]["river"]["reservoir"].get("locs")}' ].copy() elif "lake" in col["parameter"]: mask = maps[ f'{config["input"]["lateral"]["river"]["lake"].get("locs")}' ].copy() # Else lateral.river else: mask = maps[f'{config["input"].get("river_location")}'].copy() # Rearrange the mask res_x, res_y = mask.raster.res if res_y < 0: mask = mask.reindex( {mask.raster.y_dim: mask[mask.raster.y_dim][::-1]} ) mask = mask.transpose(mask.raster.x_dim, mask.raster.y_dim) # Filter and reduce full_index based on mask full_index = full_index.where(mask != mask.raster.nodata, 0) full_index.attrs.update(_FillValue=0) mask_index = full_index.values.flatten() mask_index = mask_index[mask_index != 0] # idx corresponding to the wflow index idx = mask_index[col["index"] - 1] # Reorder full_index as (y,x) to use raster.idx_to_xy method xi, yi = full_index.transpose( full_index.raster.y_dim, full_index.raster.x_dim ).raster.idx_to_xy(idx) scoords = { "x": xr.IndexVariable("index", xi), "y": xr.IndexVariable("index", yi), } # Based on model bbox center for column based on reducer for # the full model domain else: xmin, ymin, xmax, ymax = maps.raster.bounds scoords = { "x": xr.IndexVariable("index", [(xmax + xmin) / 2]), "y": xr.IndexVariable("index", [(ymax + ymin) / 2]), } da = da_ts.assign_coords(scoords) csv_dict[f"{da.name}"] = da return csv_dict
def get_config( *args, config: Dict = {}, fallback=None, root: Path = None, abs_path: Optional[bool] = False, ): """ Get a config value at key(s). Copy of hydromt.Model.get_config method to parse config outside of Model functions. See Also -------- hydromt.Model.get_config Parameters ---------- args : tuple or string keys can given by multiple args: ('key1', 'key2') or a string with '.' indicating a new level: ('key1.key2') config : dict, optional config dict to get the values from. fallback: any, optional fallback value if key(s) not found in config, by default None. abs_path: bool, optional If True return the absolute path relative to the model root, by deafult False. Returns ------- value : any type dictionary value """ args = list(args) if len(args) == 1 and "." in args[0]: args = args[0].split(".") + args[1:] branch = config.copy() # reads config at first call for key in args[:-1]: branch = branch.get(key, {}) if not isinstance(branch, dict): branch = dict() break value = branch.get(args[-1], fallback) if abs_path and isinstance(value, str): value = Path(value) if not value.is_absolute(): if root is None: raise ValueError( "root path is required to get absolute path from relative path" ) value = Path(abspath(join(root, value))) return value
[docs] def get_grid_from_config( *args, config: Dict = {}, grid: xr.Dataset = None, fallback=None, root: Path = None, abs_path: Optional[bool] = False, nodata: Union[int, float] = -9999, mask_name: Optional[str] = None, ) -> xr.DataArray: """ Get actual grid values from config including scale and offset. Calls get_config and applies value, scale and offset if available. Parameters ---------- args : tuple or string keys can given by multiple args: ('key1', 'key2') or a string with '.' indicating a new level: ('key1.key2') config : dict config dict to get the values from. grid : xr.Dataset grid dataset to get the values from. fallback: any, optional fallback value if key(s) not found in config, by default None. abs_path: bool, optional If True return the absolute path relative to the model root, by default False. nodata: int or float, optional nodata value to use for the DataArray, by default -9999. Used only if the variable in config is described using value only. mask_name: str, optional Name of the mask variable in grid. Used only if the variable in config is described using value only. Returns ------- da : xr.DataArray DataArray with actual grid values See Also -------- get_config hydromt.workflows.grid.grid_from_constant """ # get config value var = get_config( *args, config=config, fallback=fallback, root=root, abs_path=abs_path, ) # direct map in grid if isinstance(var, str): if var in grid: da = grid[var] else: raise ValueError(f"grid variable {var} not found in staticmaps.") else: # dict type # constant value in config if "value" in var: da = grid_from_constant( grid, constant=var["value"], name=args[-1], nodata=nodata, mask_name=mask_name, ) # else scale and offset else: var_name = get_config("netcdf.variable.name", config=var) scale = var.get("scale", 1.0) offset = var.get("offset", 0.0) # apply scale and offset if var_name not in grid: raise ValueError(f"grid variable {var_name} not found in staticmaps.") da = grid[var_name] * scale + offset return da