Source code for hydromt.data_adapter.geodataset

"""Implementation for the geodataset DataAdapter."""
import warnings
from datetime import datetime
from logging import Logger, getLogger
from os.path import basename, splitext
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import pyproj
import xarray as xr
from pyproj.exceptions import CRSError
from pystac import Asset as StacAsset
from pystac import Catalog as StacCatalog
from pystac import Item as StacItem
from pystac import MediaType

from hydromt.nodata import NoDataException, NoDataStrategy, _exec_nodata_strat
from hydromt.typing import (
    Bbox,
    Data,
    ErrorHandleMethod,
    GeoDatasetSource,
    Geom,
    GeomBuffer,
    Predicate,
    StrPath,
    TimeRange,
    TotalBounds,
    Variables,
)
from hydromt.utils import has_no_data

from .. import gis_utils, io
from ..raster import GEO_MAP_COORD
from .data_adapter import DataAdapter
from .utils import netcdf_writer, shift_dataset_time, zarr_writer

logger = getLogger(__name__)

__all__ = ["GeoDatasetAdapter", "GeoDatasetSource"]


class GeoDatasetAdapter(DataAdapter):
    """DataAdapter for GeoDatasets."""

    _DEFAULT_DRIVER = "vector"
    _DRIVERS = {
        "nc": "netcdf",
    }
    def __init__(
        self,
        path: StrPath,
        driver: Optional[str] = None,
        filesystem: Optional[str] = None,
        crs: Optional[Union[int, str, dict]] = None,
        nodata: Optional[Union[dict, float, int]] = None,
        rename: Optional[dict] = None,
        unit_mult: Optional[dict] = None,
        unit_add: Optional[dict] = None,
        meta: Optional[dict] = None,
        attrs: Optional[dict] = None,
        extent: Optional[dict] = None,
        driver_kwargs: Optional[dict] = None,
        storage_options: Optional[dict] = None,
        name: str = "",
        catalog_name: str = "",
        provider: Optional[str] = None,
        version: Optional[str] = None,
        **kwargs,
    ):
        """Initiate data adapter for geospatial timeseries data.

        This object contains all properties required to read supported files into
        a single unified GeoDataset, i.e. :py:class:`xarray.Dataset` with
        geospatial point geometries. In addition it keeps metadata to be able to
        reproduce which data is used.

        Parameters
        ----------
        path: str, Path
            Path to data source. If the dataset consists of multiple files, the
            path may contain {variable}, {year}, {month} placeholders as well as
            a path search pattern using a ``*`` wildcard.
        driver: {'vector', 'netcdf', 'zarr'}, optional
            Driver to read files with, for 'vector'
            :py:func:`~hydromt.io.open_geodataset`, for 'netcdf'
            :py:func:`xarray.open_mfdataset`. By default the driver is inferred
            from the file extension and falls back to 'vector' if unknown.
        filesystem: str, optional
            Filesystem where the data is stored (local, cloud, http etc.).
            If None (default) the filesystem is inferred from the path.
            See :py:func:`fsspec.registry.known_implementations` for all options.
        crs: int, dict, or str, optional
            Coordinate Reference System. Accepts EPSG codes (int or str),
            proj (str or dict) or wkt (str). Only used if the data has no native
            CRS.
        nodata: float, int, optional
            Missing value number. Only used if the data has no native missing
            value. Nodata values can be differentiated between variables using a
            dictionary.
        rename: dict, optional
            Mapping of native data source variable to output source variable name
            as required by hydroMT.
        unit_mult, unit_add: dict, optional
            Scaling multiplication and addition to map from the native data unit
            to the output data unit as required by hydroMT.
        meta: dict, optional
            Metadata information of dataset, preferably containing the following
            keys:
            - 'source_version'
            - 'source_url'
            - 'source_license'
            - 'paper_ref'
            - 'paper_doi'
            - 'category'
        placeholders: dict, optional
            Placeholders to expand yaml entry to multiple entries (name and path)
            based on placeholder values.
        attrs: dict, optional
            Additional attributes relating to data variables. For instance unit
            or long name of the variable.
        extent: Extent (typed dict), optional
            Dictionary describing the spatial and time range the dataset covers.
            Should be of the form:
            - "bbox": [xmin, ymin, xmax, ymax],
            - "time_range": [start_datetime, end_datetime],
            Bbox coordinates should be in the same CRS as the data, and
            time_range should be inclusive on both sides.
        driver_kwargs: dict, optional
            Additional key-word arguments passed to the driver.
        storage_options: dict, optional
            Additional key-word arguments passed to the fsspec FileSystem object.
        name, catalog_name: str, optional
            Name of the dataset and catalog, optional.
        provider: str, optional
            A name to identify the specific provider of the dataset requested.
            If None is provided, the last added source will be used.
        version: str, optional
            A name to identify the specific version of the dataset requested.
            If None is provided, the last added source will be used.
""" driver_kwargs = driver_kwargs or {} extent = extent or {} if kwargs: warnings.warn( "Passing additional keyword arguments to be used by the " "GeoDatasetAdapter driver is deprecated and will be removed " "in a future version. Please use 'driver_kwargs' instead.", DeprecationWarning, stacklevel=2, ) driver_kwargs.update(kwargs) super().__init__( path=path, driver=driver, filesystem=filesystem, nodata=nodata, rename=rename, unit_mult=unit_mult, unit_add=unit_add, meta=meta, attrs=attrs, driver_kwargs=driver_kwargs, storage_options=storage_options, name=name, catalog_name=catalog_name, provider=provider, version=version, ) self.crs = crs self.extent = extent
    def to_file(
        self,
        data_root: StrPath,
        data_name: str,
        bbox: Optional[Bbox] = None,
        time_tuple: Optional[TimeRange] = None,
        variables: Optional[List[str]] = None,
        driver: Optional[str] = None,
        handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
        logger: Logger = logger,
        **kwargs,
    ) -> Optional[Tuple[StrPath, Optional[str], Dict[str, Any]]]:
        """Save a data slice to file.

        Parameters
        ----------
        data_root : str, Path
            Path to output folder.
        data_name : str
            Name of output file without extension.
        bbox : array-like of floats
            (xmin, ymin, xmax, ymax) bounding box of area of interest.
        time_tuple : tuple of str, datetime, optional
            Start and end date of period of interest. By default the entire time
            period of the dataset is returned.
        driver : str, optional
            Driver to write file, e.g.: 'netcdf', 'zarr', by default None.
        variables : list of str, optional
            Names of GeoDataset variables to return. By default all dataset
            variables are returned.
        **kwargs
            Additional keyword arguments that are passed to the `to_zarr`
            function.

        Returns
        -------
        fn_out: str
            Absolute path to output file.
        driver: str
            Name of driver to read data with, see
            :py:func:`~hydromt.data_catalog.DataCatalog.get_geodataset`.
        read_kwargs: dict
            Keyword arguments for reading the written file back; empty in the
            current implementation.
        """
        obj = self.get_data(
            bbox=bbox,
            time_tuple=time_tuple,
            variables=variables,
            handle_nodata=handle_nodata,
            logger=logger,
            single_var_as_array=variables is None,
        )
        # get_data already applied the nodata strategy; None means an empty slice
        if obj is None:
            return None

        read_kwargs = {}
        # much better for mem/storage/processing if dtypes are set correctly
        for name, coord in obj.coords.items():
            if coord.values.dtype != object:
                continue
            # not sure if coordinate values of different dtypes are possible,
            # but let's just hope users aren't that mean for now.
            if isinstance(coord.values[0], str):
                obj[name] = obj[name].astype(str)

        if driver is None or driver == "netcdf":
            # always write netcdf
            driver = "netcdf"
            fn_out = netcdf_writer(
                obj=obj,
                data_root=data_root,
                data_name=data_name,
                variables=variables,
            )
        elif driver == "zarr":
            fn_out = zarr_writer(
                obj=obj, data_root=data_root, data_name=data_name, **kwargs
            )
        else:
            raise ValueError(f"GeoDataset: Driver {driver} unknown.")

        return fn_out, driver, read_kwargs
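
A hedged sketch of exporting a slice with to_file, reusing the hypothetical adapter from the construction example above; the bbox and time range are assumptions.

# Hypothetical export: write a spatial and temporal subset to zarr.
fn_out, driver, read_kwargs = adapter.to_file(
    data_root="./export",
    data_name="waterlevel_subset",
    bbox=[4.0, 52.0, 5.0, 53.0],  # xmin, ymin, xmax, ymax in WGS84
    time_tuple=("2010-01-01", "2010-12-31"),
    driver="zarr",
)
print(fn_out, driver)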
    def get_data(
        self,
        bbox: Optional[Bbox] = None,
        geom: Optional[Geom] = None,
        buffer: Optional[GeomBuffer] = 0,
        predicate: Predicate = "intersects",
        variables: Optional[Variables] = None,
        time_tuple: Optional[TimeRange] = None,
        single_var_as_array: bool = True,
        handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
        logger: Logger = logger,
    ) -> Optional[Data]:
        """Return a clipped, sliced and unified GeoDataset.

        For a detailed description see:
        :py:func:`~hydromt.data_catalog.DataCatalog.get_geodataset`
        """
        try:
            # load data
            fns = self._resolve_paths(variables=variables, time_tuple=time_tuple)
            self.mark_as_used()  # mark used
            ds = self._read_data(fns, logger=logger)
            if ds is None:
                raise NoDataException()

            # rename variables and parse data and attrs
            ds = self._rename_vars(ds)
            ds = self._validate_spatial_coords(ds)
            ds = self._set_crs(ds, logger=logger)
            ds = self._set_nodata(ds)
            ds = self._shift_time(ds, logger=logger)

            # slice
            ds = GeoDatasetAdapter._slice_data(
                ds,
                variables,
                geom,
                bbox,
                buffer,
                predicate,
                time_tuple,
                logger=logger,
            )
            if ds is None:
                raise NoDataException()

            # uniformize
            ds = self._apply_unit_conversion(ds, logger=logger)
            ds = self._set_metadata(ds)

            # return array if single var and single_var_as_array
            return self._single_var_as_array(ds, single_var_as_array, variables)
        except NoDataException:
            _exec_nodata_strat(
                f"No data was read from source: {self.name}",
                strategy=handle_nodata,
                logger=logger,
            )
            return None
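
And a sketch of reading a clipped slice directly with get_data. The bbox and variable name are assumptions; NoDataStrategy.IGNORE is used so an empty slice yields None instead of an exception.

# Hypothetical read of a single variable over a bounding box.
from hydromt.nodata import NoDataStrategy

da = adapter.get_data(
    bbox=[4.0, 52.0, 5.0, 53.0],
    variables=["waterlevel"],
    time_tuple=("2010-01-01", "2010-06-30"),
    handle_nodata=NoDataStrategy.IGNORE,  # return None rather than raise
)
if da is not None:
    print(da)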
    def _read_data(
        self,
        fns: List[StrPath],
        logger: Logger = logger,
    ) -> Data:
        kwargs = self.driver_kwargs.copy()
        if len(fns) > 1 and self.driver in ["vector", "zarr"]:
            raise ValueError(
                f"GeoDataset: Reading multiple {self.driver} files is not supported."
            )
        logger.info(f"Reading {self.name} {self.driver} data from {self.path}")
        if self.driver in ["netcdf"]:
            ds = xr.open_mfdataset(fns, **kwargs)
        elif self.driver == "zarr":
            ds = xr.open_zarr(fns[0], **kwargs)
        elif self.driver == "vector":
            ds = io.open_geodataset(fn_locs=fns[0], crs=self.crs, **kwargs)
        else:
            raise ValueError(f"GeoDataset: Driver {self.driver} unknown")

        if has_no_data(ds):
            raise NoDataException()

        return ds

    def _rename_vars(self, ds: Data) -> Data:
        rm = {k: v for k, v in self.rename.items() if k in ds}
        ds = ds.rename(rm)
        return ds

    def _validate_spatial_coords(self, ds: Data) -> Data:
        if GEO_MAP_COORD in ds.data_vars:
            ds = ds.set_coords(GEO_MAP_COORD)
        try:
            ds.vector.set_spatial_dims()
            idim = ds.vector.index_dim
            if idim not in ds:  # set coordinates for index dimension if missing
                ds[idim] = xr.IndexVariable(idim, np.arange(ds.dims[idim]))
            coords = [ds.vector.x_name, ds.vector.y_name, idim]
            coords = [item for item in coords if item is not None]
            ds = ds.set_coords(coords)
        except ValueError:
            raise ValueError(
                f"GeoDataset: No spatial geometry dimension found in data {self.path}"
            )
        return ds

    def _set_crs(self, ds: Data, logger: Logger = logger) -> Data:
        # set crs
        if ds.vector.crs is None and self.crs is not None:
            ds.vector.set_crs(self.crs)
        elif ds.vector.crs is None:
            raise ValueError(
                f"GeoDataset {self.name}: CRS not defined in data catalog or data."
            )
        elif self.crs is not None and ds.vector.crs != pyproj.CRS.from_user_input(
            self.crs
        ):
            logger.warning(
                f"GeoDataset {self.name}: CRS from data catalog does not match CRS of"
                " data. The original CRS will be used. Please check your data catalog."
            )
        return ds

    @staticmethod
    def _slice_data(
        ds: Data,
        variables: Optional[Variables] = None,
        geom: Optional[Geom] = None,
        bbox: Optional[Bbox] = None,
        buffer: Optional[GeomBuffer] = 0,
        predicate: Predicate = "intersects",
        time_tuple: Optional[TimeRange] = None,
        logger: Logger = logger,
    ) -> Optional[Data]:
        """Slice the dataset in space and time.

        Arguments
        ---------
        ds : xarray.Dataset or xarray.DataArray
            The GeoDataset to slice.
        variables : str or list of str, optional
            Names of variables to return.
        geom : geopandas.GeoDataFrame/Series
            A geometry defining the area of interest.
        bbox : array-like of floats
            (xmin, ymin, xmax, ymax) bounding box of area of interest
            (in WGS84 coordinates).
        buffer : float, optional
            Buffer distance [m] applied to the geometry or bbox. By default 0 m.
        predicate : str, optional
            Predicate used to filter the GeoDataFrame, see
            :py:func:`hydromt.gis_utils.filter_gdf` for details.
        time_tuple : tuple of str, datetime, optional
            Start and end date of period of interest. By default the entire time
            period of the dataset is returned.

        Returns
        -------
        ds : xarray.Dataset
            The sliced GeoDataset.
""" if isinstance(ds, xr.DataArray): if ds.name is None: # dummy name, required to create dataset # renamed to variable in _single_var_as_array ds.name = "data" ds = ds.to_dataset() elif variables is not None: variables = np.atleast_1d(variables).tolist() if len(variables) > 1 or len(ds.data_vars) > 1: mvars = [var not in ds.data_vars for var in variables] if any(mvars): raise ValueError(f"GeoDataset: variables not found {mvars}") ds = ds[variables] if time_tuple is not None: ds = GeoDatasetAdapter._slice_temporal_dimension( ds, time_tuple, logger=logger ) if geom is not None or bbox is not None: ds = GeoDatasetAdapter._slice_spatial_dimension( ds, geom, bbox, buffer, predicate, logger=logger, ) if has_no_data(ds): return None else: return ds @staticmethod def _slice_spatial_dimension( ds: Data, geom: Optional[Geom], bbox: Optional[Bbox], buffer: Optional[GeomBuffer], predicate: Predicate, logger: Logger = logger, ) -> Optional[Data]: geom = gis_utils.parse_geom_bbox_buffer(geom, bbox, buffer) bbox_str = ", ".join([f"{c:.3f}" for c in geom.total_bounds]) epsg = geom.crs.to_epsg() logger.debug(f"Clip {predicate} [{bbox_str}] (EPSG:{epsg})") ds = ds.vector.clip_geom(geom, predicate=predicate) if has_no_data(ds): return None else: return ds def _shift_time(self, ds: Data, logger: Logger = logger) -> Data: dt = self.unit_add.get("time", 0) return shift_dataset_time(dt=dt, ds=ds, logger=logger) @staticmethod def _slice_temporal_dimension( ds: Data, time_tuple: Optional[TimeRange], logger: Logger = logger, ) -> Optional[Data]: if ( "time" in ds.dims and ds["time"].size > 1 and np.issubdtype(ds["time"].dtype, np.datetime64) ): logger.debug(f"Slicing time dim {time_tuple}") ds = ds.sel(time=slice(*time_tuple)) if has_no_data(ds): return None else: return ds def _set_metadata(self, ds: Data) -> Data: if self.attrs: if isinstance(ds, xr.DataArray): ds.attrs.update(self.attrs[ds.name]) else: for k in self.attrs: ds[k].attrs.update(self.attrs[k]) ds.attrs.update(self.meta) return ds def _set_nodata(self, ds: Data) -> Data: if self.nodata is not None: if not isinstance(self.nodata, dict): nodata = {k: self.nodata for k in ds.data_vars.keys()} else: nodata = self.nodata for k in ds.data_vars: mv = nodata.get(k, None) if mv is not None and ds[k].vector.nodata is None: ds[k].vector.set_nodata(mv) return ds def _apply_unit_conversion(self, ds: Data, logger: Logger = logger) -> Data: unit_names = list(self.unit_mult.keys()) + list(self.unit_add.keys()) unit_names = [k for k in unit_names if k in ds.data_vars] if len(unit_names) > 0: logger.debug(f"Convert units for {len(unit_names)} variables.") for name in list(set(unit_names)): # unique m = self.unit_mult.get(name, 1) a = self.unit_add.get(name, 0) da = ds[name] attrs = da.attrs.copy() nodata_isnan = da.vector.nodata is None or np.isnan(da.vector.nodata) # nodata value is explicitly set to NaN in case no nodata value is provided nodata = np.nan if nodata_isnan else da.vector.nodata data_bool = ~np.isnan(da) if nodata_isnan else da != nodata ds[name] = xr.where(data_bool, da * m + a, nodata) ds[name].attrs.update(attrs) # set original attributes return ds
    def get_bbox(
        self,
        detect: bool = True,
    ) -> Optional[TotalBounds]:
        """Return the bounding box and EPSG code of the dataset.

        If the bounding box is not set and detect is True,
        :py:meth:`hydromt.GeoDatasetAdapter.detect_bbox` will be used to detect it.

        Parameters
        ----------
        detect: bool, optional
            Whether to detect the bounding box if it is not set. If False, and
            it's not set, None will be returned.

        Returns
        -------
        bbox: Tuple[np.float64, np.float64, np.float64, np.float64]
            The bounding box coordinates of the data. Coordinates are returned
            as [xmin, ymin, xmax, ymax].
        crs: int
            The EPSG code of the CRS of the coordinates returned in bbox.
        """
        bbox = self.extent.get("bbox", None)
        crs = self.crs
        if bbox is None and detect:
            bbox, crs = self.detect_bbox()

        return bbox, crs
    def get_time_range(
        self,
        detect: bool = True,
    ) -> Optional[TimeRange]:
        """Return the time range of the dataset.

        If the time range is not set and detect is True,
        :py:meth:`hydromt.GeoDatasetAdapter.detect_time_range` will be used to
        detect it.

        Parameters
        ----------
        detect: bool, optional
            Whether to detect the time range if it is not set. If False, and
            it's not set, None will be returned.

        Returns
        -------
        range: Tuple[np.datetime64, np.datetime64]
            A tuple containing the start and end of the time dimension. Range
            is inclusive on both sides.
        """
        time_range = self.extent.get("time_range", None)
        if time_range is None and detect:
            time_range = self.detect_time_range()

        return time_range
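
Together, get_bbox and get_time_range give the full extent of a source; a sketch, again assuming the hypothetical adapter constructed above:

# Hypothetical extent queries; both fall back to inspecting the data itself
# when the catalog entry carries no precomputed "extent".
bbox, crs = adapter.get_bbox(detect=True)
start, end = adapter.get_time_range(detect=True)
print(f"bbox={bbox} (EPSG:{crs}), time range: {start} - {end}")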
    def detect_bbox(
        self,
        ds: Optional[Data] = None,
    ) -> Optional[TotalBounds]:
        """Detect the bounding box and CRS of the dataset.

        If no dataset is provided, it will be fetched according to the settings
        in the adapter. Also see :py:meth:`hydromt.GeoDatasetAdapter.get_data`.
        The coordinates are in the CRS of the dataset itself, which is also
        returned alongside the coordinates.

        Parameters
        ----------
        ds: xr.Dataset, xr.DataArray, optional
            The dataset to detect the bounding box of. If none is provided,
            :py:meth:`hydromt.GeoDatasetAdapter.get_data` will be used to fetch
            it before detecting.

        Returns
        -------
        bbox: Tuple[np.float64, np.float64, np.float64, np.float64]
            The bounding box coordinates of the data. Coordinates are returned
            as [xmin, ymin, xmax, ymax].
        crs: int
            The EPSG code of the CRS of the coordinates returned in bbox.
        """
        if ds is None:
            ds = self.get_data()
        crs = ds.vector.crs.to_epsg()
        bounds = ds.vector.bounds
        return bounds, crs
    def detect_time_range(
        self,
        ds: Optional[Data] = None,
    ) -> Optional[TimeRange]:
        """Detect the temporal range of the dataset.

        If no dataset is provided, it will be fetched according to the settings
        in the adapter. Also see :py:meth:`hydromt.GeoDatasetAdapter.get_data`.

        Parameters
        ----------
        ds: xr.Dataset, xr.DataArray, optional
            The dataset to detect the time range of. It must have a time
            dimension set. If none is provided,
            :py:meth:`hydromt.GeoDatasetAdapter.get_data` will be used to fetch
            it before detecting.

        Returns
        -------
        range: Tuple[np.datetime64, np.datetime64]
            A tuple containing the start and end of the time dimension. Range
            is inclusive on both sides.
        """
        if ds is None:
            ds = self.get_data()
        return (
            ds[ds.vector.time_dim].min().values,
            ds[ds.vector.time_dim].max().values,
        )
    def to_stac_catalog(
        self,
        on_error: ErrorHandleMethod = ErrorHandleMethod.COERCE,
    ) -> Optional[StacCatalog]:
        """
        Convert a geodataset into a STAC Catalog representation.

        The collection will contain an asset for each of the associated files.

        Parameters
        ----------
        - on_error (str, optional): The error handling strategy.
          Options are: "raise" to raise an error on failure, "skip" to skip
          the dataset on failure, and "coerce" (default) to set default
          values on failure.

        Returns
        -------
        - Optional[StacCatalog]: The STAC Catalog representation of the dataset,
          or None if the dataset was skipped.
        """
        try:
            bbox, crs = self.get_bbox(detect=True)
            bbox = list(bbox)
            start_dt, end_dt = self.get_time_range(detect=True)
            start_dt = pd.to_datetime(start_dt)
            end_dt = pd.to_datetime(end_dt)
            props = {**self.meta, "crs": crs}
            ext = splitext(self.path)[-1]
            if ext in [".nc", ".vrt"]:
                media_type = MediaType.HDF5
            else:
                raise RuntimeError(
                    f"Unknown extension: {ext} cannot determine media type"
                )
        except (IndexError, KeyError, CRSError) as e:
            if on_error == ErrorHandleMethod.SKIP:
                logger.warning(
                    f"Skipping {self.name} during stac conversion because "
                    "detecting spatial extent failed."
                )
                return None
            elif on_error == ErrorHandleMethod.COERCE:
                bbox = [0.0, 0.0, 0.0, 0.0]
                props = self.meta
                start_dt = datetime(1, 1, 1)
                end_dt = datetime(1, 1, 1)
                media_type = MediaType.JSON
            else:
                raise e

        stac_catalog = StacCatalog(
            self.name,
            description=self.name,
        )
        stac_item = StacItem(
            self.name,
            geometry=None,
            bbox=bbox,
            properties=props,
            datetime=None,
            start_datetime=start_dt,
            end_datetime=end_dt,
        )
        stac_asset = StacAsset(str(self.path), media_type=media_type)
        base_name = basename(self.path)
        stac_item.add_asset(base_name, stac_asset)
        stac_catalog.add_item(stac_item)
        return stac_catalog
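
Finally, a hedged sketch of exporting the source metadata as a STAC catalog and saving it with pystac; the output folder is an assumption, and the adapter is the hypothetical one from the earlier examples.

# Hypothetical STAC export: coerce failures to placeholder values, then save.
from hydromt.typing import ErrorHandleMethod
from pystac import CatalogType

stac = adapter.to_stac_catalog(on_error=ErrorHandleMethod.COERCE)
if stac is not None:
    stac.normalize_and_save("./stac", catalog_type=CatalogType.SELF_CONTAINED)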