"""DataAdapter class."""
from __future__ import annotations

import logging
import os
import pathlib
import warnings
from abc import ABCMeta, abstractmethod
from itertools import product
from string import Formatter
from typing import Optional, Union

import fsspec
import numpy as np
import pandas as pd
import xarray as xr
import yaml

# from pystac import Item as StacItem
from pystac import Catalog as StacCatalog
from upath import UPath

from hydromt.nodata import NoDataStrategy
from hydromt.typing import (
    Bbox,
    Data,
    ErrorHandleMethod,
    Geom,
    GeomBuffer,
    StrPath,
    TimeRange,
    Variables,
)

logger = logging.getLogger(__name__)


__all__ = ["DataAdapter"]


def round_latlon(ds, decimals=5):
    """Round the x and y coordinates of a dataset to the given number of decimals."""
    x_dim = ds.raster.x_dim
    y_dim = ds.raster.y_dim
    ds[x_dim] = np.round(ds[x_dim], decimals=decimals)
    ds[y_dim] = np.round(ds[y_dim], decimals=decimals)
    return ds


def to_datetimeindex(ds):
    """Convert a non-standard (cftime) time index to a pandas DatetimeIndex."""
    if ds.indexes["time"].dtype == "O":
        ds["time"] = ds.indexes["time"].to_datetimeindex()
    return ds


def remove_duplicates(ds):
    """Drop duplicate time steps, keeping the first occurrence."""
    return ds.sel(time=~ds.get_index("time").duplicated())


def harmonise_dims(ds):
    """Harmonise lon-lat-time dimensions.

    Where needed:
        - lon: Convert longitude coordinates from 0-360 to -180-180
        - lat: Flip the latitude orientation from S->N to N->S
        - time: Convert to datetimeindex.

    Parameters
    ----------
    ds: xr.DataSet
        DataSet with dims to harmonise

    Returns
    -------
    ds: xr.DataSet
        DataSet with harmonised longitude-latitude-time dimensions
    """
    # Longitude
    x_dim = ds.raster.x_dim
    lons = ds[x_dim].values
    if np.any(lons > 180):
        ds[x_dim] = xr.Variable(x_dim, np.where(lons > 180, lons - 360, lons))
        ds = ds.sortby(x_dim)
    # Latitude
    y_dim = ds.raster.y_dim
    if np.diff(ds[y_dim].values)[0] > 0:
        ds = ds.reindex({y_dim: ds[y_dim][::-1]})
    # Final check for lat-lon
    assert (
        np.diff(ds[y_dim].values)[0] < 0
    ), "orientation not N->S after harmonise_dims preprocessing"
    assert (
        np.diff(ds[x_dim].values)[0] > 0
    ), "orientation not W->E after harmonise_dims preprocessing"
    # Time
    if ds.indexes["time"].dtype == "O":
        ds = to_datetimeindex(ds)

    return ds
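
# Illustrative sketch (not part of the original module) of what ``harmonise_dims``
# does to a toy dataset. It assumes the hydromt raster accessor can infer the
# spatial dimensions from coordinates named "lon" and "lat":
#
#   import numpy as np
#   import pandas as pd
#   import xarray as xr
#   import hydromt  # noqa: F401  (registers the .raster accessor)
#
#   ds = xr.Dataset(
#       {"temp": (("time", "lat", "lon"), np.zeros((2, 3, 4)))},
#       coords={
#           "time": pd.date_range("2000-01-01", periods=2),
#           "lat": [10.0, 20.0, 30.0],  # S->N, flipped to N->S
#           "lon": [0.0, 90.0, 180.0, 270.0],  # 0-360, remapped to -180..180
#       },
#   )
#   ds = harmonise_dims(ds)
#   # ds["lat"] is now descending and ds["lon"] is sorted W->E within [-180, 180]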


PREPROCESSORS = {
    "round_latlon": round_latlon,
    "to_datetimeindex": to_datetimeindex,
    "remove_duplicates": remove_duplicates,
    "harmonise_dims": harmonise_dims,
}
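
# These preprocessor functions are referenced by name from a data catalog entry.
# A minimal sketch of such an entry (the source name and keys below are
# hypothetical; the exact layout depends on the data type and driver):
#
#   era5_hourly:
#     path: era5/{variable}_{year}.nc
#     data_type: RasterDataset
#     driver: netcdf
#     driver_kwargs:
#       preprocess: harmonise_dims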


class DataAdapter(object, metaclass=ABCMeta):
    """General Interface to data source for HydroMT."""

    _DEFAULT_DRIVER = None  # placeholder
    _DRIVERS = {}
    def __init__(
        self,
        path: StrPath,
        driver: Optional[str] = None,
        filesystem: Optional[str] = None,
        nodata: Optional[Union[dict, float, int]] = None,
        rename: Optional[dict] = None,
        unit_mult: Optional[dict] = None,
        unit_add: Optional[dict] = None,
        meta: Optional[dict] = None,
        attrs: Optional[dict] = None,
        driver_kwargs: Optional[dict] = None,
        storage_options: Optional[dict] = None,
        name: str = "",
        catalog_name: str = "",
        provider: Optional[str] = None,
        version: Optional[str] = None,
    ):
        """General Interface to data source for HydroMT.

        Parameters
        ----------
        path: str, Path
            Path to data source. If the dataset consists of multiple files, the path
            may contain {variable}, {year} and {month} placeholders as well as a path
            search pattern using a '*' wildcard.
        driver: {'vector', 'netcdf', 'zarr'}, optional
            Driver to read files with, for 'vector'
            :py:func:`~hydromt.io.open_geodataset`, for 'netcdf'
            :py:func:`xarray.open_mfdataset`. By default the driver is inferred from
            the file extension and falls back to 'vector' if unknown.
        filesystem: str, optional
            Filesystem where the data is stored (local, cloud, http etc.). If None
            (default) the filesystem is inferred from the path.
            See :py:func:`fsspec.registry.known_implementations` for all options.
        nodata: float, int, optional
            Missing value number. Only used if the data has no native missing value.
            Nodata values can be differentiated between variables using a dictionary.
        rename: dict, optional
            Mapping of native data source variable to output source variable name as
            required by HydroMT.
        unit_mult, unit_add: dict, optional
            Scaling multiplication and addition to convert from the native data unit
            to the output data unit as required by HydroMT.
        meta: dict, optional
            Metadata information of dataset, preferably containing the following keys:
            {'source_version', 'source_url', 'source_license',
            'paper_ref', 'paper_doi', 'category'}
        placeholders: dict, optional
            Placeholders to expand a yaml entry to multiple entries (name and path)
            based on placeholder values.
        attrs: dict, optional
            Additional attributes relating to data variables. For instance unit
            or long name of the variable.
        driver_kwargs: dict, optional
            Additional key-word arguments passed to the driver.
        storage_options: dict, optional
            Additional key-word arguments passed to the fsspec FileSystem object.
        name, catalog_name: str, optional
            Name of the dataset and catalog, optional for now.
        """
        unit_mult = unit_mult or {}
        unit_add = unit_add or {}
        meta = meta or {}
        attrs = attrs or {}
        driver_kwargs = driver_kwargs or {}
        storage_options = storage_options or {}
        rename = rename or {}
        self.name = name
        self.catalog_name = catalog_name
        self.provider = provider
        self.version = str(version) if version is not None else None  # version as str
        # general arguments
        self.path = path
        # driver and driver keyword-arguments
        # check for non default driver based on extension
        if driver is None:
            driver = self._DRIVERS.get(
                str(path).split(".")[-1].lower(), self._DEFAULT_DRIVER
            )
        self.driver = driver
        if "storage_options" in driver_kwargs:
            warnings.warn(
                "storage_options should be provided as a separate argument, "
                "not as part of driver_kwargs",
                DeprecationWarning,
                stacklevel=2,
            )
            storage_options.update(driver_kwargs.pop("storage_options"))
        self.driver_kwargs = driver_kwargs
        self.storage_options = storage_options
        # get filesystem
        self.filesystem = filesystem
        self._fs = None  # placeholder for fsspec filesystem object
        # data adapter arguments
        self.nodata = nodata
        self.rename = rename
        self.unit_mult = unit_mult
        self.unit_add = unit_add
        # meta data
        self.meta = {k: v for k, v in meta.items() if v is not None}
        # variable attributes
        self.attrs = {k: v for k, v in attrs.items() if v is not None}
        # keep track of whether the data is used
        self._used = False
    @property
    def data_type(self):
        """Return the datatype of the adapter."""
        return type(self).__name__.replace("Adapter", "")

    def mark_as_used(self):
        """Mark the data adapter as used."""
        self._used = True
    def summary(self):
        """Return a dictionary summary of the data adapter."""
        return dict(
            path=self.path,
            data_type=self.data_type,
            driver=self.driver,
            **self.meta,
        )
    def to_dict(self):
        """Return a dictionary view of the data source.

        Can be used to initialize the data adapter.
        """
        source = dict(data_type=self.data_type)
        for k, v in vars(self).items():
            if k in ["name", "catalog_name"] or k.startswith("_"):
                continue  # do not add these identifiers
            if v is not None and (not isinstance(v, dict) or len(v) > 0):
                source.update({k: v})
        return source
    def __str__(self):
        """Return string representation of self in yaml."""
        return yaml.dump(self.to_dict())

    def __repr__(self):
        """Pretty print string representation of self."""
        return self.__str__()

    def __eq__(self, other: object) -> bool:
        """Return True if self and other are equal."""
        if type(other) is type(self):
            return self.to_dict() == other.to_dict()
        else:
            return False

    @property
    def fs(self):
        """Return the filesystem object."""
        if self._fs is not None:
            return self._fs
        fs = None
        if self.filesystem:  # use filesystem property
            protocols = {"local": "file"}
            protocol = protocols.get(self.filesystem, self.filesystem)
        else:  # infer filesystem from path
            upath = UPath(self.path)
            if hasattr(upath, "fs"):
                fs = upath.fs
            else:
                protocol = "file"
                if hasattr(upath, "protocol"):
                    protocol = upath.protocol
                elif not isinstance(upath, (pathlib.PosixPath, pathlib.WindowsPath)):
                    warnings.warn(
                        "No filesystem found for path, using default local filesystem",
                        stacklevel=2,
                    )
        if fs is None:
            fs = fsspec.filesystem(protocol, **self.storage_options)
        self._fs = fs
        return fs

    def _resolve_paths(
        self,
        time_tuple: Optional[TimeRange] = None,
        variables: Optional[Variables] = None,
        zoom_level: Optional[int] = 0,
    ):
        """Resolve {year}, {month} and {variable} keywords in self.path.

        Keywords are based on 'time_tuple' and 'variables'.

        Parameters
        ----------
        time_tuple : tuple of str, optional
            Start and end date in string format understood by
            :py:func:`pandas.to_datetime`, by default None
        variables : list of str, optional
            List of variable names, by default None
        zoom_level : int
            Parsed zoom level to use, by default 0.
            See :py:meth:`RasterDataAdapter._parse_zoom_level` for more info.
        logger:
            The logger to use. If none is provided, the default logger will be used.

        Returns
        -------
        fns: list of str
            List of filenames matching the path pattern given the date range and
            variables.
        """
        known_keys = ["year", "month", "zoom_level", "variable"]
        fns = []
        keys = []
        # rebuild path based on arguments and escape unknown keys
        if "{" in str(self.path):
            path = ""
            for literal_text, key, fmt, _ in Formatter().parse(str(self.path)):
                path += literal_text
                if key is None:
                    continue
                key_str = "{" + f"{key}:{fmt}" + "}" if fmt else "{" + key + "}"
                # remove unused fields
                if key in ["year", "month"] and time_tuple is None:
                    path += "*"
                elif key == "variable" and variables is None:
                    path += "*"
                # escape unknown fields
                elif key is not None and key not in known_keys:
                    path = path + "{" + key_str + "}"
                else:
                    path = path + key_str
                    keys.append(key)
        else:
            path = self.path

        # expand path based on keys for dates, variables and zoomlevel
        dates, vrs, postfix = [None], [None], ""
        fmt = {}
        if len(keys) > 0:
            # resolve dates: month & year keys
            if time_tuple is not None:
                dt = pd.to_timedelta(self.unit_add.get("time", 0), unit="s")
                t_range = pd.to_datetime(list(time_tuple)) - dt
                freq, strf = ("m", "%Y-%m") if "month" in keys else ("a", "%Y")
                dates = pd.period_range(*t_range, freq=freq)
                t_range_str = [t.strftime(strf) for t in t_range]
                postfix += "; date range: " + " - ".join(t_range_str)
            # resolve variables
            if variables is not None:
                variables = np.atleast_1d(variables).tolist()
                mv_inv = {v: k for k, v in self.rename.items()}
                vrs = [mv_inv.get(var, var) for var in variables]  # type: ignore
                postfix += f"; variables: {variables}"
            # update based on zoomlevel (size = 1)
            if "zoom_level" in keys:
                zoom_level = zoom_level or 0
                fmt.update(zoom_level=zoom_level)
            # update based on dates and variables (size >= 1)
            for date, var in product(dates, vrs):
                if date is not None:
                    fmt.update(year=date.year, month=date.month)
                if var is not None:
                    fmt.update(variable=var)
                fns.append(str(path).format(**fmt))
        else:
            fns.append(path)

        protocol = str(path).split("://")[0] if "://" in str(path) else ""
        if protocol and self.filesystem is None:
            self.filesystem = protocol
        # For s3, an anonymous connection still requires the --no-sign-request
        # profile to read the data; setting the environment variable achieves this.
        # Assume an anonymous connection if storage_options is not set.
        if protocol in ["s3"]:
            if not self.storage_options:
                self.storage_options = {"anon": True}
            if self.storage_options.get("anon", False):
                os.environ["AWS_NO_SIGN_REQUEST"] = "YES"
            else:
                os.environ["AWS_NO_SIGN_REQUEST"] = "NO"

        # expand path with glob and check if files exist
        fns_out = []
        for fn in list(set(fns)):
            if "*" in str(fn):
                for fn0 in self.fs.glob(fn):
                    if protocol and not str(fn0).startswith(protocol):
                        fn0 = f"{protocol}://{fn0}"
                    fns_out.append(UPath(fn0, **self.storage_options))
            elif self.fs.exists(fn):
                fns_out.append(UPath(fn, **self.storage_options))

        if len(fns_out) == 0:
            raise FileNotFoundError(f"No such file found: {path}{postfix}")

        return fns_out

    @abstractmethod
    def get_data(
        self,
        bbox: Optional[Bbox],
        geom: Optional[Geom],
        buffer: Optional[GeomBuffer],
        handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
    ):
        """Return a view (lazy if possible) of the data with standardized field names.

        If bbox or mask are given, clip data to that extent.
        """

    @staticmethod
    def _single_var_as_array(
        ds: Data, single_var_as_array: bool, variable_name: Optional[str] = None
    ) -> Data:
        # return data array if single variable dataset
        dvars = list(ds.data_vars.keys())
        if single_var_as_array and len(dvars) == 1:
            da = ds[dvars[0]]
            if isinstance(variable_name, list) and len(variable_name) == 1:
                da.name = variable_name[0]
            elif isinstance(variable_name, str):
                da.name = variable_name
            return da
        else:
            return ds

    @abstractmethod
    def to_stac_catalog(
        self,
        on_error: ErrorHandleMethod = ErrorHandleMethod.COERCE,
    ) -> Optional[StacCatalog]:
        """Create a stac item from the data adapter to be added to a stac catalog."""
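
# Illustrative note on path resolution (sketch only; the adapter and filenames are
# hypothetical): for an adapter created with
#
#   path = "data/{variable}_{year}.nc"
#
# a call such as
#
#   adapter._resolve_paths(
#       time_tuple=("2010-01-01", "2011-12-31"), variables=["precip"]
#   )
#
# formats the pattern into the candidate filenames data/precip_2010.nc and
# data/precip_2011.nc (after mapping the requested variable names back to the
# native names via the inverse of `rename`), then checks which candidates exist
# on the resolved filesystem (patterns containing '*' are expanded with glob) and
# raises FileNotFoundError if none are found.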