Source code for hydromt.models.model_api

# -*- coding: utf-8 -*-
"""General and basic API for models in HydroMT."""

import glob
import inspect
import logging
import os
import typing
import warnings
from abc import ABCMeta
from os.path import abspath, basename, dirname, isabs, isdir, isfile, join
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from geopandas.testing import assert_geodataframe_equal
from pyproj import CRS
from shapely.geometry import box

from .. import config, log, workflows
from ..data_catalog import DataCatalog
from ..raster import GEO_MAP_COORD

__all__ = ["Model"]

logger = logging.getLogger(__name__)


class Model(object, metaclass=ABCMeta):
    """General and basic API for models in HydroMT."""

    # FIXME
    _DATADIR = ""  # path to the model data folder
    _NAME = "modelname"
    _CONF = "model.ini"
    _CF = dict()  # configreader kwargs
    _GEOMS = {"<general_hydromt_name>": "<model_name>"}
    _MAPS = {"<general_hydromt_name>": "<model_name>"}
    _FOLDERS = [""]
    # tell hydroMT which methods should receive the res and region arguments
    # TODO: change it back to setup_region and no res --> deprecation
    _CLI_ARGS = {"region": "setup_basemaps", "res": "setup_basemaps"}
    _API = {
        "crs": CRS,
        "config": Dict[str, Any],
        "geoms": Dict[str, gpd.GeoDataFrame],
        "maps": Dict[str, Union[xr.DataArray, xr.Dataset]],
        "forcing": Dict[str, Union[xr.DataArray, xr.Dataset]],
        "region": gpd.GeoDataFrame,
        "results": Dict[str, Union[xr.DataArray, xr.Dataset]],
        "states": Dict[str, Union[xr.DataArray, xr.Dataset]],
    }
    def __init__(
        self,
        root: Optional[str] = None,
        mode: Optional[str] = "w",
        config_fn: Optional[str] = None,
        data_libs: Union[List, str] = [],
        logger=logger,
        **artifact_keys,
    ):
        """Initialize a model.

        Parameters
        ----------
        root : str, optional
            Model root, by default None
        mode : {'r','r+','w'}, optional
            read/append/write mode, by default "w"
        config_fn : str, optional
            Model simulation configuration file, by default None.
            Note that this is not the HydroMT model setup configuration file!
        data_libs : List[str], optional
            List of data catalog yaml files, by default None
        logger:
            The logger to be used.
        **artifact_keys:
            Additional keyword arguments to be passed down.
        """
        from . import MODELS  # avoid circular import

        self.logger = logger
        dist, version = "unknown", "NA"
        if self._NAME in MODELS:
            ep = MODELS[self._NAME]
            dist, version = ep.distro.name, ep.distro.version

        # link to data
        self.data_catalog = DataCatalog(
            data_libs=data_libs, logger=self.logger, **artifact_keys
        )

        # placeholders
        # metadata maps that can be at different resolutions
        # TODO do we want read/write maps?
        self._config = dict()  # nested dictionary
        self._maps = dict()  # dictionary of xr.DataArray and/or xr.Dataset
        # NOTE was staticgeoms in <=v0.5
        self._geoms = dict()  # dictionary of gpd.GeoDataFrame
        self._forcing = dict()  # dictionary of xr.DataArray and/or xr.Dataset
        self._states = dict()  # dictionary of xr.DataArray and/or xr.Dataset
        self._results = dict()  # dictionary of xr.DataArray and/or xr.Dataset
        # To be deprecated in future versions!
        self._staticmaps = xr.Dataset()
        self._staticgeoms = dict()

        # file system
        self._root = ""
        self._read = True
        self._write = False

        # model paths
        self._config_fn = self._CONF if config_fn is None else config_fn
        self.set_root(root, mode)  # also creates hydromt.log file
        self.logger.info(f"Initializing {self._NAME} model from {dist} (v{version}).")
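    # Example usage (illustrative sketch, shown as a comment so this listing stays
    # valid Python): instantiating a generic Model with a data catalog. The catalog
    # name "artifact_data" is an assumption; substitute any catalog you have.
    #
    #   from hydromt import Model
    #   model = Model(root="./my_model", mode="w", data_libs=["artifact_data"])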
    @property
    def api(self) -> Dict:
        """Return all model components and their data types."""
        _api = self._API.copy()
        # loop over parent and mixin classes and update API
        for base_cls in self.__class__.__bases__:
            _api.update(getattr(base_cls, "_API", {}))
        return _api

    def _check_get_opt(self, opt):
        """Check all opt keys and raise sensible error messages if unknown."""
        for method in opt.keys():
            m = method.strip("0123456789")
            if not callable(getattr(self, m, None)):
                raise ValueError(f'Model {self._NAME} has no method "{method}"')
        return opt

    def _run_log_method(self, method, *args, **kwargs):
        """Log method parameters before running a method."""
        method = method.strip("0123456789")
        func = getattr(self, method)
        signature = inspect.signature(func)
        # combine user and default options
        params = {}
        for i, (k, v) in enumerate(signature.parameters.items()):
            if k in ["args", "kwargs"]:
                if k == "args":
                    params[k] = args[i:]
                else:
                    params.update(**kwargs)
            else:
                v = kwargs.get(k, v.default)
                if len(args) > i:
                    v = args[i]
                params[k] = v
        # log options
        for k, v in params.items():
            if v is not inspect._empty:
                self.logger.info(f"{method}.{k}: {v}")
        return func(*args, **kwargs)
    def build(
        self,
        region: Optional[dict] = None,
        write: Optional[bool] = True,
        opt: Optional[dict] = {},
    ):
        """Single method to build a model from scratch based on the settings in `opt`.

        Methods will be run one by one based on the order of appearance in `opt`
        (.ini configuration file). All model methods are supported including
        setup_*, read_* and write_* methods.

        If a write_* option is listed in `opt` (ini file) the full writing of the
        model at the end of the build process is skipped.

        Parameters
        ----------
        region: dict
            Description of model region. See :py:func:`~hydromt.workflows.parse_region`
            for all options.
        write: bool, optional
            Write complete model after executing all methods in opt, by default True.
        opt: dict, optional
            Model build configuration. The configuration can be parsed from a
            .ini file using :py:meth:`~hydromt.config.configread`.
            This is a nested dictionary where the first-level keys are the names
            of model specific (setup) methods and the second-level contain
            argument-value pairs of the method.

            .. code-block:: text

                {
                    <name of method1>: {
                        <argument1>: <value1>, <argument2>: <value2>
                    },
                    <name of method2>: {
                        ...
                    }
                }

        """
        opt = self._check_get_opt(opt)

        # merge cli region and res arguments with opt
        if region is not None:
            if self._CLI_ARGS["region"] not in opt:
                opt = {self._CLI_ARGS["region"]: {}, **opt}
            opt[self._CLI_ARGS["region"]].update(region=region)

        # then loop over other methods
        for method in opt:
            # if any write_* functions are present in opt, skip the final self.write()
            if method.startswith("write_"):
                write = False
            kwargs = {} if opt[method] is None else opt[method]
            self._run_log_method(method, **kwargs)

        # write
        if write:
            self.write()
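    # Example usage (illustrative sketch): building a model from a nested `opt`
    # dictionary, equivalent to what `hydromt.config.configread` parses from an
    # .ini file. The method names and values shown are hypothetical placeholders.
    #
    #   opt = {
    #       "setup_config": {"starttime": "2010-01-01"},
    #       "write_geoms": {},  # listing a write_* step skips the final full write
    #   }
    #   model.build(region={"bbox": [11.70, 45.35, 12.95, 46.70]}, opt=opt)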
    def update(
        self,
        model_out: Optional[Union[str, Path]] = None,
        write: Optional[bool] = True,
        opt: Dict = {},
    ):
        """Single method to update a model based on the settings in `opt`.

        Methods will be run one by one based on the order of appearance in `opt`
        (ini configuration file). All model methods are supported including
        setup_*, read_* and write_* methods.

        If a write_* option is listed in `opt` (ini file) the full writing of the
        model at the end of the update process is skipped.

        Parameters
        ----------
        model_out: str, path, optional
            Destination folder to write the model schematization after updating
            the model. If None the updated model components are overwritten in the
            current model schematization if these exist. By default None.
        write: bool, optional
            Write the updated model schematization to disk. By default True.
        opt: dict, optional
            Model update configuration. The configuration can be parsed from a
            .ini file using :py:meth:`~hydromt.config.configread`.
            This is a nested dictionary where the first-level keys are the names
            of model specific (setup) methods and the second-level contain
            argument-value pairs of the method.

            .. code-block:: text

                {
                    <name of method1>: {
                        <argument1>: <value1>, <argument2>: <value2>
                    },
                    <name of method2>: {
                        ...
                    }
                }

        """
        opt = self._check_get_opt(opt)

        # read current model
        if not self._write:
            if model_out is None:
                raise ValueError(
                    '"model_out" directory required when updating in "read-only" mode'
                )
            self.read()
            self.set_root(model_out, mode="w")

        # check if model has a region
        if self.region is None:
            raise ValueError("Model region not found, setup model using `build` first.")

        # remove setup_basemaps from options and throw warning
        method = self._CLI_ARGS["region"]
        if method in opt:
            opt.pop(method)  # remove from opt
            self.logger.warning(f'"{method}" can only be called when building a model.')

        # loop over other methods from ini file
        for method in opt:
            # if any write_* functions are present in opt, skip the final self.write()
            if method.startswith("write_"):
                write = False
            kwargs = {} if opt[method] is None else opt[method]
            self._run_log_method(method, **kwargs)

        # write
        if write:
            self.write()
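    # Example usage (illustrative sketch): updating an existing model in read mode
    # and writing the result to a new folder. `setup_config` stands in for any
    # model-specific setup method; the paths are placeholders.
    #
    #   model = Model(root="./my_model", mode="r")
    #   model.update(
    #       model_out="./my_model_v2",
    #       opt={"setup_config": {"endtime": "2011-01-01"}},
    #   )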
    ## general setup methods
    def setup_region(
        self,
        region: dict,
        hydrography_fn: str = "merit_hydro",
        basin_index_fn: str = "merit_hydro_index",
    ) -> dict:
        """Set the `region` of interest of the model.

        Adds model layer:

        * **region** geom: region boundary vector

        Parameters
        ----------
        region : dict
            Dictionary describing region of interest, e.g.:

            * {'bbox': [xmin, ymin, xmax, ymax]}
            * {'geom': 'path/to/polygon_geometry'}
            * {'basin': [xmin, ymin, xmax, ymax]}
            * {'subbasin': [x, y], '<variable>': threshold}

            For a complete overview of all region options,
            see :py:func:`~hydromt.workflows.basin_mask.parse_region`
        hydrography_fn : str
            Name of data source for hydrography data.
            FIXME describe data requirements
        basin_index_fn : str
            Name of data source with basin (bounding box) geometries associated with
            the 'basins' layer of `hydrography_fn`. Only required if the `region` is
            based on a (sub)(inter)basin without a 'bounds' argument.

        Returns
        -------
        region: dict
            Parsed region dictionary

        See Also
        --------
        hydromt.workflows.basin_mask.parse_region
        """
        kind, region = workflows.parse_region(region, logger=self.logger)
        # NOTE: kind=outlet is deprecated!
        if kind in ["basin", "subbasin", "interbasin", "outlet"]:
            # retrieve global hydrography data (lazy!)
            ds_org = self.data_catalog.get_rasterdataset(hydrography_fn)
            if "bounds" not in region:
                region.update(basin_index=self.data_catalog[basin_index_fn])
            # get basin geometry
            geom, xy = workflows.get_basin_geometry(
                ds=ds_org,
                kind=kind,
                logger=self.logger,
                **region,
            )
            region.update(xy=xy)
        elif "bbox" in region:
            bbox = region["bbox"]
            geom = gpd.GeoDataFrame(geometry=[box(*bbox)], crs=4326)
        elif "geom" in region:
            geom = region["geom"]
            if geom.crs is None:
                raise ValueError('Model region "geom" has no CRS')
        elif "grid" in region:  # Grid specific - should be removed in the future
            geom = region["grid"].raster.box
        elif "model" in region:
            geom = region["model"].region
        else:
            raise ValueError(f"model region argument not understood: {region}")

        self.set_geoms(geom, name="region")
        # This setup method returns region so that it can be wrapped for models
        # which require more information, e.g. grid RasterDataArray or xy coordinates.
        return region
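    # Example usage (illustrative sketch): the region dictionary flavours accepted
    # above. Coordinates and the 'uparea' threshold are arbitrary placeholders.
    #
    #   model.setup_region({"bbox": [11.70, 45.35, 12.95, 46.70]})
    #   model.setup_region({"subbasin": [12.2051, 45.8331], "uparea": 50})
    #   model.setup_region({"geom": "path/to/polygon_geometry.geojson"})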
    # TODO remove placeholder to make sure
    # build with the current _CLI_ARGS does not raise an error
    def setup_basemaps(self, *args, **kwargs):  # noqa: D102
        warnings.warn(
            "The setup_basemaps method is not implemented.",
            UserWarning,
        )

    ## file system

    @property
    def root(self):
        """Path to model folder."""
        if self._root is None:
            raise ValueError("Root unknown, use set_root method")
        return self._root

    @property
    def _assert_write_mode(self):
        if not self._write:
            raise IOError("Model opened in read-only mode")

    @property
    def _assert_read_mode(self):
        if not self._read:
            raise IOError("Model opened in write-only mode")
    def set_root(self, root: Optional[str], mode: Optional[str] = "w"):
        """Initialize the model root.

        In read/append mode a check is done if the root exists.
        In write mode the required model folder structure is created.

        Parameters
        ----------
        root : str, optional
            path to model root
        mode : {"r", "r+", "w", "w+"}, optional
            read/append/write mode for model files
        """
        ignore_ext = set([".log", ".yml"])
        if mode not in ["r", "r+", "w", "w+"]:
            raise ValueError(
                f'mode "{mode}" unknown, select from "r", "r+", "w" or "w+"'
            )
        self._root = root if root is None else abspath(root)
        self._read = mode.startswith("r")
        self._write = mode != "r"
        self._overwrite = mode == "w+"
        if root is not None:
            if self._write:
                for name in self._FOLDERS:
                    path = join(self._root, name)
                    if not isdir(path):
                        os.makedirs(path)
                        continue
                    # path already exists check files
                    fns = glob.glob(join(path, "*.*"))
                    exts = set([os.path.splitext(fn)[1] for fn in fns])
                    exts -= ignore_ext
                    if len(exts) != 0:
                        if mode.endswith("+"):
                            self.logger.warning(
                                "Model dir already exists and "
                                f"files might be overwritten: {path}."
                            )
                        else:
                            msg = (
                                "Model dir already exists and cannot be "
                                f"overwritten: {path}. Use 'mode=w+' to force "
                                "overwrite existing files."
                            )
                            self.logger.error(msg)
                            raise IOError(msg)
            # check directory
            elif not isdir(self._root):
                raise IOError(f'model root not found at "{self._root}"')
            # remove old logging file handler and add new filehandler
            # in root if it does not exist
            has_log_file = False
            log_level = 20  # default, but overwritten by the level of active loggers
            for i, h in enumerate(self.logger.handlers):
                log_level = h.level
                if hasattr(h, "baseFilename"):
                    if dirname(h.baseFilename) != self._root:
                        # remove handler and close file
                        self.logger.handlers.pop(i).close()
                    else:
                        has_log_file = True
                    break
            if not has_log_file:
                new_path = join(self._root, "hydromt.log")
                log.add_filehandler(self.logger, new_path, log_level)
    # I/O
    def read(
        self,
        components: List = [
            "config",
            "staticmaps",
            "maps",
            "geoms",
            "forcing",
            "states",
            "results",
        ],
    ) -> None:
        """Read the complete model schematization and configuration from model files.

        Parameters
        ----------
        components : List, optional
            List of model components to read, each should have an associated
            read_<component> method. By default ['config', 'staticmaps', 'maps',
            'geoms', 'forcing', 'states', 'results']
        """
        self.logger.info(f"Reading model data from {self.root}")
        for component in components:
            if not hasattr(self, f"read_{component}"):
                raise AttributeError(
                    f"{type(self).__name__} does not have read_{component}"
                )
            getattr(self, f"read_{component}")()
    def write(
        self,
        components: List = [
            "staticmaps",
            "maps",
            "geoms",
            "forcing",
            "states",
            "config",
        ],
    ) -> None:
        """Write the complete model schematization and configuration to model files.

        Parameters
        ----------
        components : List, optional
            List of model components to write, each should have an associated
            write_<component> method. By default ['staticmaps', 'maps', 'geoms',
            'forcing', 'states', 'config']
        """
        self.logger.info(f"Writing model data to {self.root}")
        for component in components:
            if not hasattr(self, f"write_{component}"):
                raise AttributeError(
                    f"{type(self).__name__} does not have write_{component}"
                )
            getattr(self, f"write_{component}")()
    def write_data_catalog(
        self,
        root: Optional[Union[str, Path]] = None,
        data_lib_fn: Union[str, Path] = "hydromt_data.yml",
        used_only: bool = True,
        append: bool = True,
    ):
        """Write the data catalog to data_lib_fn.

        Parameters
        ----------
        root: str, Path, optional
            Global root for all relative paths in the yaml file.
            If "auto" the data source paths are relative to the yaml output ``path``.
        data_lib_fn: str, Path, optional
            Path of output yml file, absolute or relative to the model root,
            by default "hydromt_data.yml".
        used_only: bool, optional
            If True, export only data entries kept in the used_data list,
            by default True.
        append: bool, optional
            If True, append to an existing data catalog file, by default True.
        """
        path = data_lib_fn if isabs(data_lib_fn) else join(self.root, data_lib_fn)
        cat = DataCatalog(logger=self.logger, fallback_lib=None)
        # read hydromt_data yaml file and add to data catalog
        if self._read and isfile(path) and append:
            cat.from_yml(path)
        # update data catalog with new used sources
        source_names = (
            self.data_catalog._used_data
            if used_only
            else list(self.data_catalog.sources.keys())
        )
        if len(source_names) > 0:
            cat.from_dict(self.data_catalog.to_dict(source_names=source_names))
        if cat.sources:
            self._assert_write_mode
            cat.to_yml(path, root=root)
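    # Example usage (illustrative sketch): exporting only the sources used during
    # the build to a catalog file next to the model, with data paths written
    # relative to the yaml file.
    #
    #   model.write_data_catalog(root="auto", used_only=True)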
    # model configuration
    @property
    def config(self) -> Dict[str, Union[Dict, str]]:
        """Model configuration. Returns a (nested) dictionary."""
        # initialize default config if in write-mode
        if not self._config:
            self.read_config()
        return self._config
    def set_config(self, *args):
        """Update the config dictionary at key(s) with values.

        Parameters
        ----------
        args : key(s), value tuple, with minimal length of two
            keys can be given by multiple args: ('key1', 'key2', 'value')
            or a string with '.' indicating a new level: ('key1.key2', 'value')

        Examples
        --------
        >> # self.config = {'a': 1, 'b': {'c': {'d': 2}}}

        >> set_config('a', 99)
        >> {'a': 99, 'b': {'c': {'d': 2}}}

        >> set_config('b', 'c', 'd', 99)  # identical to set_config('b.c.d', 99)
        >> {'a': 1, 'b': {'c': {'d': 99}}}
        """
        if len(args) < 2:
            raise TypeError("set_config() requires at least one key and one value.")
        args = list(args)
        value = args.pop(-1)
        if len(args) == 1 and "." in args[0]:
            args = args[0].split(".") + args[1:]
        branch = self.config  # reads config at first call
        for key in args[:-1]:
            if key not in branch or not isinstance(branch[key], dict):
                branch[key] = {}
            branch = branch[key]
        branch[args[-1]] = value
    def setup_config(self, **cfdict):
        """Update config with a dictionary."""
        # TODO rename to update_config
        if len(cfdict) > 0:
            self.logger.debug("Setting model config options.")
        for key, value in cfdict.items():
            self.set_config(key, value)
    def get_config(self, *args, fallback=None, abs_path: Optional[bool] = False):
        """Get a config value at key(s).

        Parameters
        ----------
        args : tuple or string
            keys can be given by multiple args: ('key1', 'key2')
            or a string with '.' indicating a new level: ('key1.key2')
        fallback: any, optional
            fallback value if key(s) not found in config, by default None.
        abs_path: bool, optional
            If True return the absolute path relative to the model root,
            by default False. NOTE: this assumes the config is located in model root!

        Returns
        -------
        value : any type
            dictionary value

        Examples
        --------
        >> # self.config = {'a': 1, 'b': {'c': {'d': 2}}}

        >> get_config('a')
        >> 1

        >> get_config('b', 'c', 'd')  # identical to get_config('b.c.d')
        >> 2

        >> get_config('b.c')  # identical to get_config('b', 'c')
        >> {'d': 2}
        """
        args = list(args)
        if len(args) == 1 and "." in args[0]:
            args = args[0].split(".") + args[1:]
        branch = self.config  # reads config at first call
        for key in args[:-1]:
            branch = branch.get(key, {})
            if not isinstance(branch, dict):
                branch = dict()
                break
        value = branch.get(args[-1], fallback)
        if abs_path and isinstance(value, str):
            value = Path(value)
            if not value.is_absolute():
                value = Path(abspath(join(self.root, value)))
        return value
    def _configread(self, fn: str):
        return config.configread(fn, abs_path=False)

    def _configwrite(self, fn: str):
        return config.configwrite(fn, self.config)
    def read_config(self, config_fn: Optional[str] = None):
        """Parse config from file.

        If no config file found a default config file is read in writing mode.
        """
        prefix = "User defined"
        if config_fn is None:  # prioritize user defined config path (new v0.4.1)
            if not self._read:  # write-only mode > read default config
                config_fn = join(self._DATADIR, self._NAME, self._CONF)
                prefix = "Default"
            elif self.root is not None:  # append or write mode > read model config
                config_fn = join(self.root, self._config_fn)
                prefix = "Model"
        cfdict = dict()
        if config_fn is not None:
            if isfile(config_fn):
                cfdict = self._configread(config_fn)
                self.logger.debug(f"{prefix} config read from {config_fn}")
            elif (
                self._root is not None
                and not isabs(config_fn)
                and isfile(join(self._root, config_fn))
            ):
                cfdict = self._configread(join(self.root, config_fn))
                self.logger.debug(
                    f"{prefix} config read from {join(self.root, config_fn)}"
                )
            elif isfile(abspath(config_fn)):
                cfdict = self._configread(abspath(config_fn))
                self.logger.debug(f"{prefix} config read from {abspath(config_fn)}")
            else:  # skip for missing default
                self.logger.error(f"{prefix} config file not found at {config_fn}")
        self._config = cfdict
    def write_config(
        self, config_name: Optional[str] = None, config_root: Optional[str] = None
    ):
        """Write config to <root/config_fn>."""
        self._assert_write_mode
        if config_name is not None:
            self._config_fn = config_name
        elif self._config_fn is None:
            self._config_fn = self._CONF
        if config_root is None:
            config_root = self.root
        fn = join(config_root, self._config_fn)
        self.logger.info(f"Writing model config to {fn}")
        self._configwrite(fn)
    # model static maps
    @property
    def staticmaps(self):
        """Model static maps.

        Returns xarray.Dataset.
        ..NOTE: will be deprecated in future versions and replaced by `grid`.
        """
        warnings.warn(
            "The staticmaps property of the Model class will be deprecated in "
            "future versions. Use the grid property of the GridModel class instead.",
            DeprecationWarning,
        )
        if len(self._staticmaps) == 0 and self._read:
            self.read_staticmaps()
        return self._staticmaps

    def set_staticmaps(
        self, data: Union[xr.DataArray, xr.Dataset], name: Optional[str] = None
    ):
        """Add data to staticmaps.

        All layers of staticmaps must have identical spatial coordinates.
        This method will be deprecated in future versions.
        See :py:meth:`~hydromt.models.GridModel.set_grid`.

        Parameters
        ----------
        data: xarray.DataArray or xarray.Dataset
            new map layer to add to staticmaps
        name: str, optional
            Name of new map layer, this is used to overwrite the name of
            a DataArray or to select a variable from a Dataset.
        """
        warnings.warn(
            "The set_staticmaps method will be deprecated in future versions, "
            "use set_grid instead.",
            DeprecationWarning,
        )
        if name is None:
            if isinstance(data, xr.DataArray) and data.name is not None:
                name = data.name
            elif not isinstance(data, xr.Dataset):
                raise ValueError("Setting a map requires a name")
        elif name is not None and isinstance(data, xr.Dataset):
            data_vars = list(data.data_vars)
            if len(data_vars) == 1 and name not in data_vars:
                data = data.rename_vars({data_vars[0]: name})
            elif name not in data_vars:
                raise ValueError("Name not found in Dataset")
            else:
                data = data[[name]]
        if isinstance(data, xr.DataArray):
            data.name = name
            data = data.to_dataset()
        if len(self._staticmaps) == 0:  # new data
            self._staticmaps = data
        else:
            if isinstance(data, np.ndarray):
                if data.shape != self.shape:
                    raise ValueError("Shape of data and staticmaps do not match")
                data = xr.DataArray(dims=self.dims, data=data, name=name).to_dataset()
            for dvar in data.data_vars.keys():
                if dvar in self._staticmaps:
                    self.logger.warning(f"Replacing staticmap: {dvar}")
                self._staticmaps[dvar] = data[dvar]

    def read_staticmaps(self, fn: str = "staticmaps/staticmaps.nc", **kwargs) -> None:
        """Read static model maps at <root>/<fn> and add to staticmaps property.

        key-word arguments are passed to :py:func:`xarray.open_dataset`

        .. NOTE: this method is deprecated. Use the grid property of the GridMixin
        instead.

        Parameters
        ----------
        fn : str, optional
            filename relative to model root, by default "staticmaps/staticmaps.nc"
        **kwargs:
            Additional keyword arguments that are passed to the `_read_nc` function.
        """
        self._assert_read_mode
        for ds in self._read_nc(fn, **kwargs).values():
            self.set_staticmaps(ds)

    def write_staticmaps(self, fn: str = "staticmaps/staticmaps.nc", **kwargs) -> None:
        """Write static model maps to netcdf file at <root>/<fn>.

        key-word arguments are passed to :py:meth:`xarray.Dataset.to_netcdf`

        .. NOTE: this method is deprecated. Use the grid property of the GridMixin
        instead.

        Parameters
        ----------
        fn : str, optional
            filename relative to model root, by default 'staticmaps/staticmaps.nc'
        **kwargs:
            Additional keyword arguments that are passed to the `_write_nc` function.
        """
        if len(self._staticmaps) == 0:
            self.logger.debug("No staticmaps data found, skip writing.")
        else:
            self._assert_write_mode
            # _write_nc requires dict - use dummy 'staticmaps' key
            nc_dict = {"staticmaps": self._staticmaps}
            self._write_nc(nc_dict, fn, **kwargs)

    # map files setup methods
    def setup_maps_from_rasterdataset(
        self,
        raster_fn: Union[str, Path, xr.Dataset],
        variables: Optional[List] = None,
        fill_method: Optional[str] = None,
        name: Optional[str] = None,
        reproject_method: Optional[str] = None,
        split_dataset: Optional[bool] = True,
        rename: Optional[Dict] = dict(),
    ) -> List[str]:
        """HYDROMT CORE METHOD: Add data variable(s) from ``raster_fn`` to maps object.

        If raster is a dataset, all variables will be added unless ``variables``
        list is specified.

        Adds model layers:

        * **raster.name** maps: data from raster_fn

        Parameters
        ----------
        raster_fn: str, Path, xr.Dataset
            Data catalog key, path to raster file or raster xarray data object.
        variables: list, optional
            List of variables to add to maps from raster_fn. By default all.
        fill_method : str, optional
            If specified, fills nodata values using fill_nodata method.
            Available methods are {'linear', 'nearest', 'cubic', 'rio_idw'}.
        name: str, optional
            Name of new dataset in self.maps dictionary,
            only in case split_dataset=False.
        reproject_method: str, optional
            See rasterio.warp.reproject for existing methods, by default the data is
            not reprojected (None).
        split_dataset: bool, optional
            If data is a xarray.Dataset split it into several xarray.DataArrays.
        rename: dict, optional
            Dictionary to rename variable names in raster_fn before adding to maps
            {'name_in_raster_fn': 'name_in_maps'}. By default empty.

        Returns
        -------
        list
            Names of added model map layers
        """
        self.logger.info(f"Preparing maps data from raster source {raster_fn}")
        # Read raster data and select variables
        ds = self.data_catalog.get_rasterdataset(
            raster_fn,
            geom=self.region,
            buffer=2,
            variables=variables,
            single_var_as_array=False,
        )
        # Fill nodata
        if fill_method is not None:
            ds = ds.raster.interpolate_na(method=fill_method)
        # Reprojection
        if ds.rio.crs != self.crs and reproject_method is not None:
            ds = ds.raster.reproject(dst_crs=self.crs, method=reproject_method)
        # Rename and add to maps
        self.set_maps(ds.rename(rename), name=name, split_dataset=split_dataset)

        return list(ds.data_vars.keys())
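    # Example usage (illustrative sketch): adding reprojected elevation data to the
    # maps component. "merit_hydro" and the variable name "elevtn" are assumptions
    # based on common HydroMT catalog entries; substitute your own source.
    #
    #   model.setup_maps_from_rasterdataset(
    #       raster_fn="merit_hydro",
    #       variables=["elevtn"],
    #       reproject_method="bilinear",
    #       rename={"elevtn": "dem"},
    #   )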
    def setup_maps_from_raster_reclass(
        self,
        raster_fn: Union[str, Path, xr.DataArray],
        reclass_table_fn: Union[str, Path, pd.DataFrame],
        reclass_variables: List,
        variable: Optional[str] = None,
        fill_method: Optional[str] = None,
        reproject_method: Optional[str] = None,
        name: Optional[str] = None,
        split_dataset: Optional[bool] = True,
        rename: Optional[Dict] = dict(),
        **kwargs,
    ) -> List[str]:
        """HYDROMT CORE METHOD: Add data variable(s) to maps object by reclassifying the data in ``raster_fn`` based on ``reclass_table_fn``.

        Adds model layers:

        * **reclass_variables** maps: reclassified raster data

        Parameters
        ----------
        raster_fn: str, Path, xr.DataArray
            Data catalog key, path to raster file or raster xarray data object.
            Should be a DataArray. Else use `variable` argument for selection.
        reclass_table_fn: str, Path, pd.DataFrame
            Data catalog key, path to tabular data file or tabular pandas dataframe
            object for the reclassification table of `raster_fn`.
        reclass_variables: list
            List of reclass_variables from reclass_table_fn table to add to maps.
            Index column should match values in `raster_fn`.
        variable: str, optional
            Name of raster dataset variable to use. This is only required when
            reading datasets with multiple variables. By default None.
        fill_method : str, optional
            If specified, fills nodata values in `raster_fn` using fill_nodata method
            before reclassifying. Available methods are
            {'linear', 'nearest', 'cubic', 'rio_idw'}.
        reproject_method: str, optional
            See rasterio.warp.reproject for existing methods, by default the data is
            not reprojected (None).
        name: str, optional
            Name of new maps variable, only in case split_dataset=False.
        split_dataset: bool, optional
            If data is a xarray.Dataset split it into several xarray.DataArrays.
        rename: dict, optional
            Dictionary to rename variable names in reclass_variables before adding to
            grid {'name_in_reclass_table': 'name_in_grid'}. By default empty.
        **kwargs:
            Additional keyword arguments that are passed to the
            `data_catalog.get_rasterdataset` function.

        Returns
        -------
        list
            Names of added model map layers
        """  # noqa: E501
        self.logger.info(
            f"Preparing map data by reclassifying the data in {raster_fn} based"
            f" on {reclass_table_fn}"
        )
        # Read raster data and remapping table
        da = self.data_catalog.get_rasterdataset(
            raster_fn, geom=self.region, buffer=2, variables=variable, **kwargs
        )
        if not isinstance(da, xr.DataArray):
            raise ValueError(
                f"raster_fn {raster_fn} should be a single variable. "
                "Please select one using the 'variable' argument"
            )
        df_vars = self.data_catalog.get_dataframe(
            reclass_table_fn, variables=reclass_variables
        )
        # Fill nodata
        if fill_method is not None:
            da = da.raster.interpolate_na(method=fill_method)
        # Mapping function
        ds_vars = da.raster.reclassify(reclass_table=df_vars, method="exact")
        # Reprojection
        if ds_vars.rio.crs != self.crs and reproject_method is not None:
            ds_vars = ds_vars.raster.reproject(
                dst_crs=self.crs, method=reproject_method
            )
        # Add to maps
        self.set_maps(ds_vars.rename(rename), name=name, split_dataset=split_dataset)

        return list(ds_vars.data_vars.keys())
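    # Example usage (illustrative sketch): deriving a roughness map from a land-use
    # grid via a reclassification table. The source names "vito", "vito_mapping"
    # and the "manning" column are assumptions; the table index must match the
    # class values in the raster.
    #
    #   model.setup_maps_from_raster_reclass(
    #       raster_fn="vito",
    #       reclass_table_fn="vito_mapping",
    #       reclass_variables=["manning"],
    #   )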
    # model map
    @property
    def maps(self) -> Dict[str, Union[xr.Dataset, xr.DataArray]]:
        """Model maps. Returns dict of xarray.DataArray or xarray.Dataset."""
        if len(self._maps) == 0 and self._read:
            self.read_maps()
        return self._maps
    def set_maps(
        self,
        data: Union[xr.DataArray, xr.Dataset],
        name: Optional[str] = None,
        split_dataset: Optional[bool] = True,
    ) -> None:
        """Add raster data to the maps component.

        A Dataset is split into several DataArrays by default (split_dataset=True)
        or can be added as is (split_dataset=False).

        Parameters
        ----------
        data: xarray.Dataset or xarray.DataArray
            New map data to add
        name: str, optional
            Variable name, only in case data is of type DataArray or if a Dataset is
            added as is (split_dataset=False).
        split_dataset: bool, optional
            If data is a xarray.Dataset split it into several xarray.DataArrays.
        """
        data_dict = _check_data(data, name, split_dataset)
        for name in data_dict:
            if name in self._maps:
                self.logger.warning(f"Replacing map: {name}")
            self._maps[name] = data_dict[name]
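    # Example usage (illustrative sketch): a multi-variable Dataset `ds` is split
    # into separate DataArray entries by default; pass split_dataset=False and a
    # name to store it as a single Dataset instead.
    #
    #   model.set_maps(ds)                                    # keys: ds.data_vars
    #   model.set_maps(ds, name="meteo", split_dataset=False)  # key: "meteo"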
    def read_maps(self, fn: str = "maps/*.nc", **kwargs) -> None:
        """Read model map at <root>/<fn> and add to maps component.

        key-word arguments are passed to :py:func:`xarray.open_dataset`

        Parameters
        ----------
        fn : str, optional
            filename relative to model root, may contain wildcards,
            by default "maps/*.nc"
        **kwargs:
            Additional keyword arguments that are passed to the `_read_nc` function.
        """
        self._assert_read_mode
        ncs = self._read_nc(fn, **kwargs)
        for name, ds in ncs.items():
            self.set_maps(ds, name=name)
    def write_maps(self, fn="maps/{name}.nc", **kwargs) -> None:
        """Write maps to netcdf file at <root>/<fn>.

        key-word arguments are passed to :py:meth:`xarray.Dataset.to_netcdf`

        Parameters
        ----------
        fn : str, optional
            filename relative to model root; should contain a {name} placeholder,
            by default 'maps/{name}.nc'
        **kwargs:
            Additional keyword arguments that are passed to the `_write_nc` function.
        """
        if len(self._maps) == 0:
            self.logger.debug("No maps data found, skip writing.")
        else:
            self._assert_write_mode
            self._write_nc(self._maps, fn, **kwargs)
    # model geometry files
    @property
    def geoms(self) -> Dict[str, Union[gpd.GeoDataFrame, gpd.GeoSeries]]:
        """Model geometries.

        Returns dict of geopandas.GeoDataFrame or geopandas.GeoSeries
        ..NOTE: previously called staticgeoms.
        """
        if not self._geoms and self._read:
            self.read_geoms()
        return self._geoms
    def set_geoms(self, geom: Union[gpd.GeoDataFrame, gpd.GeoSeries], name: str):
        """Add data to the geoms attribute.

        Parameters
        ----------
        geom: geopandas.GeoDataFrame or geopandas.GeoSeries
            New geometry data to add
        name: str
            Geometry name.
        """
        gtypes = [gpd.GeoDataFrame, gpd.GeoSeries]
        if not np.any([isinstance(geom, t) for t in gtypes]):
            raise ValueError(
                "First parameter map(s) should be geopandas.GeoDataFrame"
                " or geopandas.GeoSeries"
            )
        if name in self._geoms:
            self.logger.warning(f"Replacing geom: {name}")
        self._geoms[name] = geom
    def read_geoms(self, fn: str = "geoms/*.geojson", **kwargs) -> None:
        """Read model geometries files at <root>/<fn> and add to geoms property.

        key-word arguments are passed to :py:func:`geopandas.read_file`

        Parameters
        ----------
        fn : str, optional
            filename relative to model root, may contain wildcards,
            by default "geoms/*.geojson"
        **kwargs:
            Additional keyword arguments that are passed to the
            `geopandas.read_file` function.
        """
        self._assert_read_mode
        fns = glob.glob(join(self.root, fn))
        for fn in fns:
            name = basename(fn).split(".")[0]
            self.logger.debug(f"Reading model file {name}.")
            self.set_geoms(gpd.read_file(fn, **kwargs), name=name)
    def write_geoms(self, fn: str = "geoms/{name}.geojson", **kwargs) -> None:
        """Write model geometries to a vector file (by default GeoJSON) at <root>/<fn>.

        key-word arguments are passed to :py:meth:`geopandas.GeoDataFrame.to_file`

        Parameters
        ----------
        fn : str, optional
            filename relative to model root; should contain a {name} placeholder,
            by default 'geoms/{name}.geojson'
        **kwargs:
            Additional keyword arguments that are passed to the
            `geopandas.GeoDataFrame.to_file` function.
        """
        if len(self._geoms) == 0:
            self.logger.debug("No geoms data found, skip writing.")
            return
        self._assert_write_mode
        if "driver" not in kwargs:
            kwargs.update(driver="GeoJSON")  # default
        for name, gdf in self._geoms.items():
            if not isinstance(gdf, (gpd.GeoDataFrame, gpd.GeoSeries)) or len(gdf) == 0:
                self.logger.warning(
                    f"{name} object of type {type(gdf).__name__} not recognized"
                )
                continue
            self.logger.debug(f"Writing file {fn.format(name=name)}")
            _fn = join(self.root, fn.format(name=name))
            if not isdir(dirname(_fn)):
                os.makedirs(dirname(_fn))
            gdf.to_file(_fn, **kwargs)
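    # Example usage (illustrative sketch): writing geometries as GeoPackage instead
    # of the default GeoJSON by forwarding a driver keyword to `to_file`.
    #
    #   model.write_geoms(fn="geoms/{name}.gpkg", driver="GPKG")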
    @property
    def staticgeoms(self):
        """Access the geometries.

        This property will be deprecated in future versions,
        use :py:meth:`~hydromt.Model.geoms`.
        """
        warnings.warn(
            "The staticgeoms property will be deprecated in future versions, "
            "use geoms instead.",
            DeprecationWarning,
        )
        if not self._geoms and self._read:
            self.read_staticgeoms()
        self._staticgeoms = self._geoms
        return self._staticgeoms

    def set_staticgeoms(self, geom: Union[gpd.GeoDataFrame, gpd.GeoSeries], name: str):
        """Set the geometries.

        This method will be deprecated in future versions,
        use :py:meth:`~hydromt.Model.set_geoms`.
        """
        warnings.warn(
            "The set_staticgeoms method will be deprecated in future versions, "
            "use set_geoms instead.",
            DeprecationWarning,
        )
        return self.set_geoms(geom, name)

    def read_staticgeoms(self):
        """Read geometries from disk.

        This method will be deprecated in future versions,
        use :py:meth:`~hydromt.Model.read_geoms`.
        """
        warnings.warn(
            "The read_staticgeoms method will be deprecated in future versions, "
            "use read_geoms instead.",
            DeprecationWarning,
        )
        return self.read_geoms(fn="staticgeoms/*.geojson")

    def write_staticgeoms(self):
        """Write the geometries to disk.

        This method will be deprecated in future versions,
        use :py:meth:`~hydromt.Model.write_geoms`.
        """
        warnings.warn(
            "The write_staticgeoms method will be deprecated in future versions, "
            "use write_geoms instead.",
            DeprecationWarning,
        )
        return self.write_geoms(fn="staticgeoms/{name}.geojson")

    # model forcing files
    @property
    def forcing(self) -> Dict[str, Union[xr.Dataset, xr.DataArray]]:
        """Model forcing. Returns dict of xarray.DataArray or xarray.Dataset."""
        if not self._forcing and self._read:
            self.read_forcing()
        return self._forcing
    def set_forcing(
        self,
        data: Union[xr.DataArray, xr.Dataset],
        name: Optional[str] = None,
        split_dataset: Optional[bool] = True,
    ):
        """Add data to forcing attribute.

        Parameters
        ----------
        data: xarray.Dataset or xarray.DataArray
            New forcing data to add
        name: str, optional
            Forcing name, required if data is an xarray.Dataset and
            split_dataset=False.
        split_dataset: bool, optional
            If True (default), split a Dataset to store each variable as a DataArray.
        """
        data_dict = _check_data(data, name, split_dataset)
        for name in data_dict:
            if name in self._forcing:
                self.logger.warning(f"Replacing forcing: {name}")
            self._forcing[name] = data_dict[name]
    def read_forcing(self, fn: str = "forcing/*.nc", **kwargs) -> None:
        """Read forcing at <root>/<fn> and add to forcing property.

        key-word arguments are passed to :py:func:`xarray.open_dataset`

        Parameters
        ----------
        fn : str, optional
            filename relative to model root, may contain wildcards,
            by default "forcing/*.nc"
        **kwargs:
            Additional keyword arguments that are passed to the `_read_nc` function.
        """
        self._assert_read_mode
        ncs = self._read_nc(fn, **kwargs)
        for name, ds in ncs.items():
            self.set_forcing(ds, name=name)
    def write_forcing(self, fn="forcing/{name}.nc", **kwargs) -> None:
        """Write forcing to netcdf file at <root>/<fn>.

        key-word arguments are passed to :py:meth:`xarray.Dataset.to_netcdf`

        Parameters
        ----------
        fn : str, optional
            filename relative to model root; should contain a {name} placeholder,
            by default 'forcing/{name}.nc'
        **kwargs:
            Additional keyword arguments that are passed to the `_write_nc` function.
        """
        if len(self._forcing) == 0:
            self.logger.debug("No forcing data found, skip writing.")
        else:
            self._assert_write_mode
            self._write_nc(self._forcing, fn, **kwargs)
    # model state files
    @property
    def states(self) -> Dict[str, Union[xr.Dataset, xr.DataArray]]:
        """Model states. Returns dict of xarray.DataArray or xarray.Dataset."""
        if not self._states and self._read:
            self.read_states()
        return self._states
    def set_states(
        self,
        data: Union[xr.DataArray, xr.Dataset],
        name: Optional[str] = None,
        split_dataset: Optional[bool] = True,
    ):
        """Add data to states attribute.

        Parameters
        ----------
        data: xarray.Dataset or xarray.DataArray
            New state data to add
        name: str, optional
            State name, required if data is an xarray.Dataset and
            split_dataset=False.
        split_dataset: bool, optional
            If True (default), split a Dataset to store each variable as a DataArray.
        """
        data_dict = _check_data(data, name, split_dataset)
        for name in data_dict:
            if name in self._states:
                self.logger.warning(f"Replacing state: {name}")
            self._states[name] = data_dict[name]
    def read_states(self, fn: str = "states/*.nc", **kwargs) -> None:
        """Read states at <root>/<fn> and add to states property.

        key-word arguments are passed to :py:func:`xarray.open_dataset`

        Parameters
        ----------
        fn : str, optional
            filename relative to model root, may contain wildcards,
            by default "states/*.nc"
        **kwargs:
            Additional keyword arguments that are passed to the `_read_nc` function.
        """
        self._assert_read_mode
        ncs = self._read_nc(fn, **kwargs)
        for name, ds in ncs.items():
            self.set_states(ds, name=name)
    def write_states(self, fn="states/{name}.nc", **kwargs) -> None:
        """Write states to netcdf file at <root>/<fn>.

        key-word arguments are passed to :py:meth:`xarray.Dataset.to_netcdf`

        Parameters
        ----------
        fn : str, optional
            filename relative to model root; should contain a {name} placeholder,
            by default 'states/{name}.nc'
        **kwargs:
            Additional keyword arguments that are passed to the `_write_nc` function.
        """
        if len(self._states) == 0:
            self.logger.debug("No states data found, skip writing.")
        else:
            self._assert_write_mode
            self._write_nc(self._states, fn, **kwargs)
    # model results files; NOTE we don't have a write_results method
    # (that's up to the model kernel)
    @property
    def results(self) -> Dict[str, Union[xr.Dataset, xr.DataArray]]:
        """Model results. Returns dict of xarray.DataArray or xarray.Dataset."""
        if not self._results and self._read:
            self.read_results()
        return self._results
    def set_results(
        self,
        data: Union[xr.DataArray, xr.Dataset],
        name: Optional[str] = None,
        split_dataset: Optional[bool] = False,
    ):
        """Add data to results attribute.

        A Dataset can either be added as is (default) or split into several
        DataArrays using the split_dataset argument.

        Parameters
        ----------
        data: xarray.Dataset or xarray.DataArray
            New results data to add
        name: str, optional
            Results name, required if data is an xarray.Dataset and
            split_dataset=False.
        split_dataset: bool, optional
            If True (False by default), split a Dataset to store each variable
            as a DataArray.
        """
        data_dict = _check_data(data, name, split_dataset)
        for name in data_dict:
            if name in self._results:
                self.logger.warning(f"Replacing result: {name}")
            self._results[name] = data_dict[name]
    def read_results(self, fn: str = "results/*.nc", **kwargs) -> None:
        """Read results at <root>/<fn> and add to results property.

        key-word arguments are passed to :py:func:`xarray.open_dataset`

        Parameters
        ----------
        fn : str, optional
            filename relative to model root, may contain wildcards,
            by default "results/*.nc"
        **kwargs:
            Additional keyword arguments that are passed to the `_read_nc` function.
        """
        self._assert_read_mode
        ncs = self._read_nc(fn, **kwargs)
        for name, ds in ncs.items():
            self.set_results(ds, name=name)
    def _write_nc(
        self,
        nc_dict: Dict[str, Union[xr.DataArray, xr.Dataset]],
        fn: str,
        gdal_compliant: bool = False,
        rename_dims: bool = False,
        force_sn: bool = False,
        **kwargs,
    ) -> None:
        for name, ds in nc_dict.items():
            if not isinstance(ds, (xr.Dataset, xr.DataArray)) or len(ds) == 0:
                self.logger.error(
                    f"{name} object of type {type(ds).__name__} not recognized"
                )
                continue
            self.logger.debug(f"Writing file {fn.format(name=name)}")
            _fn = join(self.root, fn.format(name=name))
            if not isdir(dirname(_fn)):
                os.makedirs(dirname(_fn))
            if gdal_compliant:
                ds = ds.raster.gdal_compliant(
                    rename_dims=rename_dims, force_sn=force_sn
                )
            ds.to_netcdf(_fn, **kwargs)

    # general reader & writer
    def _read_nc(
        self, fn: str, mask_and_scale=False, single_var_as_array=True, **kwargs
    ) -> Dict[str, xr.Dataset]:
        ncs = dict()
        fns = glob.glob(join(self.root, fn))
        if "chunks" not in kwargs:  # read lazy by default
            kwargs.update(chunks="auto")
        for fn in fns:
            name = basename(fn).split(".")[0]
            self.logger.debug(f"Reading model file {name}.")
            ds = xr.open_dataset(fn, mask_and_scale=mask_and_scale, **kwargs)
            # set geo coord if present as coordinate of dataset
            if GEO_MAP_COORD in ds.data_vars:
                ds = ds.set_coords(GEO_MAP_COORD)
            # single-variable Dataset to DataArray
            if single_var_as_array and len(ds.data_vars) == 1:
                (ds,) = ds.data_vars.values()
            ncs.update({name: ds})
        return ncs

    ## properties / methods below can be used directly in actual class
    @property
    def crs(self) -> CRS:
        """Returns coordinate reference system embedded in region."""
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.crs
        else:
            return self.region.crs

    def set_crs(self, crs) -> None:
        """Set the coordinate reference system."""
        warnings.warn(
            '"set_crs" is deprecated. Please set the crs of all model'
            " components instead.",
            DeprecationWarning,
        )
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.set_crs(crs)

    @property
    def dims(self) -> Tuple:
        """Returns spatial dimension names of staticmaps.

        ..NOTE: will be deprecated in future versions.
        """
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.dims

    @property
    def coords(self) -> Dict:
        """Returns the coordinates of model staticmaps.

        ..NOTE: will be deprecated in future versions.
        """
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.coords

    @property
    def res(self) -> Tuple:
        """Returns the resolution of the model staticmaps.

        ..NOTE: will be deprecated in future versions.
        """
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.res

    @property
    def transform(self):
        """Returns the geospatial transform of the model staticmaps.

        ..NOTE: will be deprecated in future versions.
        """
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.transform

    @property
    def width(self):
        """Returns the width of the model staticmaps.

        ..NOTE: will be deprecated in future versions.
        """
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.width

    @property
    def height(self):
        """Returns the height of the model staticmaps.

        ..NOTE: will be deprecated in future versions.
        """
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.height

    @property
    def shape(self) -> Tuple:
        """Returns the shape of the model staticmaps.

        ..NOTE: will be deprecated in future versions.
        """
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.shape

    @property
    def bounds(self) -> Tuple:
        """Returns the bounding box of the model region."""
        if len(self._staticmaps) > 0:
            return self.staticmaps.raster.bounds
        else:
            return self.region.total_bounds

    @property
    def region(self) -> gpd.GeoDataFrame:
        """Returns the geometry of the model area of interest."""
        region = gpd.GeoDataFrame()
        if "region" in self.geoms:
            region = self.geoms["region"]
        # TODO: For now stays here but move to grid in GridModel and delete
        elif len(self.staticmaps) > 0:
            warnings.warn(
                'Defining "region" based on staticmaps will be deprecated. Either use'
                " region from GridModel or define your own method.",
                DeprecationWarning,
            )
            crs = self.staticmaps.raster.crs
            if crs is not None and hasattr(crs, "to_epsg"):
                crs = crs.to_epsg()  # not all CRS have an EPSG code
            region = gpd.GeoDataFrame(
                geometry=[box(*self.staticmaps.raster.bounds)], crs=crs
            )
        return region

    # test methods
    def test_model_api(self):
        """Test compliance with HydroMT Model API."""
        warnings.warn(
            '"test_model_api" is now part of the internal API, use "_test_model_api"'
            " instead.",
            DeprecationWarning,
        )
        return self._test_model_api()

    def _test_model_api(self) -> List:
        """Test compliance with HydroMT Model API.

        Returns
        -------
        non_compliant: list
            List of model components that are non-compliant with the model API
            structure.
        """
        non_compliant = []
        for component, dtype in self.api.items():
            obj = getattr(self, component, None)
            try:
                assert obj is not None, component
                _assert_isinstance(obj, dtype, component)
            except AssertionError as err:
                non_compliant.append(str(err))
        return non_compliant

    def _test_equal(self, other, skip_component=["root"]) -> Tuple[bool, Dict]:
        """Test if two models including their data components are equal.

        Parameters
        ----------
        other : Model (or subclass)
            Model to compare against
        skip_component: list
            List of components to skip when testing equality. By default root.

        Returns
        -------
        equal: bool
            True if equal
        errors: dict
            Dictionary with errors per model component which is not equal
        """
        assert isinstance(other, type(self))
        components = list(self.api.keys())
        components_other = list(other.api.keys())
        assert components == components_other
        for cp in skip_component:
            if cp in components:
                components.remove(cp)
        errors = {}
        for prop in components:
            errors.update(
                **_check_equal(getattr(self, prop), getattr(other, prop), prop)
            )
        return len(errors) == 0, errors
def _check_data(
    data: Union[xr.DataArray, xr.Dataset],
    name: Optional[str] = None,
    split_dataset=True,
) -> Dict:
    if isinstance(data, xr.DataArray):
        # NOTE name can be different from data.name !
        if data.name is None and name is not None:
            data.name = name
        elif name is None and data.name is not None:
            name = data.name
        elif data.name is None and name is None:
            raise ValueError("Name required for DataArray.")
        data = {name: data}
    elif isinstance(data, xr.Dataset):  # return dict for consistency
        if split_dataset:
            data = {name: data[name] for name in data.data_vars}
        elif name is None:
            raise ValueError("Name required for Dataset.")
        else:
            data = {name: data}
    else:
        raise ValueError(f'Data type "{type(data).__name__}" not recognized')
    return data


def _assert_isinstance(obj: Any, dtype: Any, name: str = ""):
    """Check if obj matches typing or class (dtype)."""
    args = typing.get_args(dtype)
    _cls = typing.get_origin(dtype)
    if len(args) == 0 and dtype != Any and dtype is not None:
        assert isinstance(obj, dtype), name
    elif _cls == Union:
        assert isinstance(obj, args), name
    elif _cls is not None:
        assert isinstance(obj, _cls), name
    # recursive check of dtype dict keys and values
    if len(args) > 0 and _cls is dict:
        for key, val in obj.items():
            _assert_isinstance(key, args[0], f"{name}.{str(key)}")
            _assert_isinstance(val, args[1], f"{name}.{str(key)}")


def _check_equal(a, b, name="") -> Dict[str, str]:
    """Recursive test of model components.

    Returns dict with component name and associated error message.
    """
    errors = {}
    try:
        assert isinstance(b, type(a)), "property types do not match"
        if isinstance(a, dict):
            for key in a:
                assert key in b, f"{key} missing"
                errors.update(**_check_equal(a[key], b[key], f"{name}.{key}"))
        elif isinstance(a, (xr.DataArray, xr.Dataset)):
            xr.testing.assert_allclose(a, b)
        elif isinstance(a, gpd.GeoDataFrame):
            assert_geodataframe_equal(a, b, check_like=True, check_less_precise=True)
        elif isinstance(a, np.ndarray):
            np.testing.assert_allclose(a, b)
        else:
            assert a == b, "values not equal"
    except AssertionError as e:
        errors.update({name: str(e)})
    return errors
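# Example usage (illustrative sketch) of the module-level helpers above:
# _check_data normalizes inputs to a {name: data} dict.
#
#   da = xr.DataArray(np.zeros((2, 2)), name="mask")
#   _check_data(da)                                    # -> {"mask": <DataArray>}
#   ds = xr.Dataset({"a": da, "b": da})
#   _check_data(ds)                                    # -> {"a": ..., "b": ...}
#   _check_data(ds, name="all", split_dataset=False)   # -> {"all": <Dataset>}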