Source code for hydromt.data_adapter.rasterdataset

"""Implementation for the RasterDatasetAdapter."""
from __future__ import annotations

import logging
import os
import warnings
from datetime import datetime
from os.path import basename, join, splitext
from typing import Dict, Optional, Tuple, Union, cast

import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import rasterio
import xarray as xr
from pyproj.exceptions import CRSError
from pystac import Asset as StacAsset
from pystac import Catalog as StacCatalog
from pystac import Item as StacItem
from pystac import MediaType
from rasterio.errors import RasterioIOError

from hydromt.typing import (
    ErrorHandleMethod,
    RasterDatasetSource,
    TimeRange,
    TotalBounds,
)

from .. import gis_utils, io
from ..nodata import NoDataStrategy, _exec_nodata_strat
from ..raster import GEO_MAP_COORD
from .caching import cache_vrt_tiles
from .data_adapter import PREPROCESSORS, DataAdapter

logger = logging.getLogger(__name__)

__all__ = ["RasterDatasetAdapter", "RasterDatasetSource"]


class RasterDatasetAdapter(DataAdapter):
    """Implementation for the RasterDatasetAdapter."""

    _DEFAULT_DRIVER = "raster"
    _DRIVERS = {
        "nc": "netcdf",
        "zarr": "zarr",
    }
    def __init__(
        self,
        path: str,
        driver: Optional[str] = None,
        filesystem: Optional[str] = None,
        crs: Optional[Union[int, str, dict]] = None,
        nodata: Optional[Union[dict, float, int]] = None,
        rename: Optional[dict] = None,
        unit_mult: Optional[dict] = None,
        unit_add: Optional[dict] = None,
        meta: Optional[dict] = None,
        attrs: Optional[dict] = None,
        extent: Optional[dict] = None,
        driver_kwargs: Optional[dict] = None,
        storage_options: Optional[dict] = None,
        zoom_levels: Optional[dict] = None,
        name: str = "",  # optional for now
        catalog_name: str = "",  # optional for now
        provider: Optional[str] = None,
        version: Optional[str] = None,
        **kwargs,
    ):
        """Initiate data adapter for geospatial raster data.

        This object contains all properties required to read supported raster files
        into a single unified RasterDataset, i.e. :py:class:`xarray.Dataset` with
        geospatial attributes. In addition it keeps metadata to be able to reproduce
        which data is used.

        Parameters
        ----------
        path: str, Path
            Path to data source. If the dataset consists of multiple files, the path
            may contain {variable}, {year}, {month} placeholders as well as path
            search pattern using a ``*`` wildcard.
        driver: {'raster', 'netcdf', 'zarr', 'raster_tindex'}, optional
            Driver to read files with, for 'raster' :py:func:`~hydromt.io.open_mfraster`,
            for 'netcdf' :py:func:`xarray.open_mfdataset`, and for 'zarr'
            :py:func:`xarray.open_zarr`. By default the driver is inferred from the
            file extension and falls back to 'raster' if unknown.
        filesystem: str, optional
            Filesystem where the data is stored (local, cloud, http etc.).
            If None (default) the filesystem is inferred from the path.
            See :py:func:`fsspec.registry.known_implementations` for all options.
        crs: int, dict, or str, optional
            Coordinate Reference System. Accepts EPSG codes (int or str);
            proj (str or dict) or wkt (str). Only used if the data has no native CRS.
        nodata: float, int, optional
            Missing value number. Only used if the data has no native missing value.
            Nodata values can be differentiated between variables using a dictionary.
        rename: dict, optional
            Mapping of native data source variable to output source variable name as
            required by hydroMT.
        unit_mult, unit_add: dict, optional
            Scaling multiplication and addition to change to map from the native
            data unit to the output data unit as required by hydroMT.
        meta: dict, optional
            Metadata information of dataset, preferably containing the following keys:
            - 'source_version'
            - 'source_url'
            - 'source_license'
            - 'paper_ref'
            - 'paper_doi'
            - 'category'
        placeholders: dict, optional
            Placeholders to expand yaml entry to multiple entries (name and path)
            based on placeholder values.
        attrs: dict, optional
            Additional attributes relating to data variables. For instance unit
            or long name of the variable.
        extent: Extent (typed dict), optional
            Dictionary describing the spatial and time range the dataset covers.
            Should be of the form:
            - "bbox": [xmin, ymin, xmax, ymax],
            - "time_range": [start_datetime, end_datetime],
            bbox and time_range should be inclusive on both sides.
        driver_kwargs: dict, optional
            Additional key-word arguments passed to the driver.
        storage_options: dict, optional
            Additional key-word arguments passed to the fsspec FileSystem object.
        zoom_levels: dict, optional
            Dictionary with zoom levels and associated resolution in the unit of the
            data CRS.
        name, catalog_name: str, optional
            Name of the dataset and catalog, optional.
        provider: str, optional
            A name to identify the specific provider of the dataset requested.
            If None is provided, the last added source will be used.
        version: str, optional
            A name to identify the specific version of the dataset requested.
            If None is provided, the last added source will be used.
        """
        driver_kwargs = driver_kwargs or {}
        extent = extent or {}
        if kwargs:
            warnings.warn(
                "Passing additional keyword arguments to be used by the "
                "RasterDatasetAdapter driver is deprecated and will be removed "
                "in a future version. Please use 'driver_kwargs' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            driver_kwargs.update(kwargs)
        super().__init__(
            path=path,
            driver=driver,
            filesystem=filesystem,
            nodata=nodata,
            rename=rename,
            unit_mult=unit_mult,
            unit_add=unit_add,
            meta=meta,
            attrs=attrs,
            driver_kwargs=driver_kwargs,
            storage_options=storage_options,
            name=name,
            catalog_name=catalog_name,
            provider=provider,
            version=version,
        )
        self.crs = crs
        # should be None or non-empty dict when initialized
        self.zoom_levels = zoom_levels
        self.extent = extent
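    # Usage sketch (illustrative, not part of the module): constructing an
    # adapter for a hypothetical multi-file netcdf source with a rename and a
    # Kelvin-to-Celsius unit conversion. The path pattern and variable names
    # below are assumptions made for the example.
    #
    #   adapter = RasterDatasetAdapter(
    #       path="data/era5_{year}_{variable}.nc",  # hypothetical path pattern
    #       driver="netcdf",
    #       rename={"t2m": "temp"},         # native name -> hydroMT name
    #       unit_add={"temp": -273.15},     # applied as da * mult + add
    #       meta={"source_url": "https://example.com", "category": "meteo"},
    #   )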
    def to_file(
        self,
        data_root,
        data_name,
        bbox=None,
        time_tuple=None,
        driver=None,
        variables=None,
        logger=logger,
        **kwargs,
    ):
        """Save a data slice to file.

        Parameters
        ----------
        data_root : str, Path
            Path to output folder
        data_name : str
            Name of output file without extension.
        bbox : array-like of floats
            (xmin, ymin, xmax, ymax) bounding box of area of interest.
        time_tuple : tuple of str, datetime, optional
            Start and end date of period of interest. By default the entire time
            period of the dataset is returned.
        driver : str, optional
            Driver to write file, e.g.: 'netcdf', 'zarr' or any gdal data type,
            by default None
        variables : list of str, optional
            Names of RasterDataset variables to return. By default all dataset
            variables are returned.
        **kwargs
            Additional keyword arguments that are passed to the `to_netcdf`
            function.

        Returns
        -------
        fn_out: str
            Absolute path to output file
        driver: str
            Name of driver to read data with, see
            :py:func:`~hydromt.data_catalog.DataCatalog.get_rasterdataset`
        kwargs: dict
            The additional keyword arguments that were passed to `to_netcdf`
        """
        obj = self.get_data(
            bbox=bbox,
            time_tuple=time_tuple,
            variables=variables,
            logger=logger,
            single_var_as_array=variables is None,
        )
        read_kwargs = {}
        if driver is None:
            # by default write 2D raster data to GeoTiff and 3D raster data to netcdf
            driver = "netcdf" if len(obj.dims) == 3 else "GTiff"
        # write using various writers
        if driver == "netcdf":
            dvars = [obj.name] if isinstance(obj, xr.DataArray) else obj.raster.vars
            if variables is None:
                encoding = {k: {"zlib": True} for k in dvars}
                fn_out = join(data_root, f"{data_name}.nc")
                obj.to_netcdf(fn_out, encoding=encoding, **kwargs)
            else:  # save per variable
                if not os.path.isdir(join(data_root, data_name)):
                    os.makedirs(join(data_root, data_name))
                for var in dvars:
                    fn_out = join(data_root, data_name, f"{var}.nc")
                    obj[var].to_netcdf(fn_out, encoding={var: {"zlib": True}}, **kwargs)
                fn_out = join(data_root, data_name, "{variable}.nc")
        elif driver == "zarr":
            fn_out = join(data_root, f"{data_name}.zarr")
            obj.to_zarr(fn_out, **kwargs)
        elif driver not in gis_utils.GDAL_DRIVER_CODE_MAP.values():
            raise ValueError(f"RasterDataset: Driver {driver} unknown.")
        else:
            ext = gis_utils.GDAL_EXT_CODE_MAP.get(driver)
            if driver == "GTiff" and "compress" not in kwargs:
                kwargs.update(compress="lzw")  # default lzw compression
            if isinstance(obj, xr.DataArray):
                fn_out = join(data_root, f"{data_name}.{ext}")
                obj.raster.to_raster(fn_out, driver=driver, **kwargs)
            else:
                fn_out = join(data_root, data_name, "{variable}" + f".{ext}")
                obj.raster.to_mapstack(
                    join(data_root, data_name), driver=driver, **kwargs
                )
            driver = "raster"
        return fn_out, driver, read_kwargs
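    # Usage sketch (illustrative): exporting a spatial and temporal slice of
    # the source to netcdf. The bbox and dates below are assumptions.
    #
    #   fn_out, driver, read_kwargs = adapter.to_file(
    #       data_root="./export",
    #       data_name="precip_clip",
    #       bbox=[4.0, 51.5, 6.5, 53.0],              # xmin, ymin, xmax, ymax (WGS84)
    #       time_tuple=("2010-01-01", "2010-12-31"),
    #       driver="netcdf",
    #   )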
    def get_data(
        self,
        bbox=None,
        geom=None,
        buffer=0,
        zoom_level=None,
        align=None,
        variables=None,
        time_tuple=None,
        single_var_as_array=True,
        cache_root=None,
        logger=logger,
    ):
        """Return a clipped, sliced and unified RasterDataset.

        For a detailed description see:
        :py:func:`~hydromt.data_catalog.DataCatalog.get_rasterdataset`
        """
        # load data
        fns = self._resolve_paths(time_tuple, variables, zoom_level, geom, bbox, logger)
        self.mark_as_used()  # mark used
        ds = self._read_data(fns, geom, bbox, cache_root, zoom_level, logger)
        # rename variables and parse data and attrs
        ds = self._rename_vars(ds)
        ds = self._validate_spatial_dims(ds)
        ds = self._set_crs(ds, logger)
        ds = self._set_nodata(ds)
        ds = self._shift_time(ds, logger)
        # slice data
        ds = RasterDatasetAdapter._slice_data(
            ds, variables, geom, bbox, buffer, align, time_tuple, logger=logger
        )
        # uniformize data
        ds = self._apply_unit_conversions(ds, logger)
        ds = self._set_metadata(ds)
        # return array if single var and single_var_as_array
        return self._single_var_as_array(ds, single_var_as_array, variables)
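    # Usage sketch (illustrative): reading a clipped, unified dataset. In
    # practice this method is typically called through
    # DataCatalog.get_rasterdataset rather than directly; the bbox, dates and
    # variable name below are assumptions.
    #
    #   ds = adapter.get_data(
    #       bbox=[11.70, 45.35, 12.95, 46.70],
    #       buffer=2,                                 # pixels around the bbox
    #       time_tuple=("2010-02-01", "2010-02-14"),
    #       variables=["precip"],                     # hypothetical variable
    #   )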
    def _resolve_paths(
        self,
        time_tuple: Optional[tuple] = None,
        variables: Optional[list] = None,
        zoom_level: Optional[int] = 0,
        geom: Optional[gpd.GeoSeries] = None,
        bbox: Optional[list] = None,
        logger=logger,
    ):
        if zoom_level is not None and "{zoom_level}" in self.path:
            zoom_level = self._parse_zoom_level(zoom_level, geom, bbox, logger=logger)
        # resolve path based on time, zoom level and/or variables
        fns = super()._resolve_paths(
            time_tuple=time_tuple,
            variables=variables,
            zoom_level=zoom_level,
        )
        return fns

    def _read_data(self, fns, geom, bbox, cache_root, zoom_level=None, logger=logger):
        kwargs = self.driver_kwargs.copy()
        # read using various readers
        logger.info(f"Reading {self.name} {self.driver} data from {self.path}")
        if self.driver == "netcdf":
            if "preprocess" in kwargs:
                preprocess = PREPROCESSORS.get(kwargs["preprocess"], None)
                kwargs.update(preprocess=preprocess)
            ds = xr.open_mfdataset(fns, decode_coords="all", **kwargs)
        elif self.driver == "zarr":
            preprocess = None
            if "preprocess" in kwargs:  # for zarr preprocess is done after reading
                preprocess = PREPROCESSORS.get(kwargs.pop("preprocess"), None)
            ds_lst = []
            for fn in fns:
                ds = xr.open_zarr(fn, **kwargs)
                if preprocess:
                    ds = preprocess(ds)  # type: ignore
                ds_lst.append(ds)
            ds = xr.merge(ds_lst)
        elif self.driver == "raster_tindex":
            if np.issubdtype(type(self.nodata), np.number):
                kwargs.update(nodata=self.nodata)
            ds = io.open_raster_from_tindex(fns[0], bbox=bbox, geom=geom, **kwargs)
        elif self.driver == "raster":  # rasterio files
            if cache_root is not None and all([str(fn).endswith(".vrt") for fn in fns]):
                cache_dir = join(cache_root, self.catalog_name, self.name)
                fns_cached = []
                for fn in fns:
                    fn1 = cache_vrt_tiles(
                        fn, geom=geom, cache_dir=cache_dir, logger=logger
                    )
                    fns_cached.append(fn1)
                fns = fns_cached
            if np.issubdtype(type(self.nodata), np.number):
                kwargs.update(nodata=self.nodata)
            if zoom_level is not None and "{zoom_level}" not in self.path:
                zls_dict, crs = self._get_zoom_levels_and_crs(fns[0], logger=logger)
                zoom_level = self._parse_zoom_level(
                    zoom_level, geom, bbox, zls_dict, crs, logger=logger
                )
                if isinstance(zoom_level, int):
                    kwargs.update(overview_level=zoom_level)
            ds = io.open_mfraster(fns, logger=logger, **kwargs)
        else:
            raise ValueError(f"RasterDataset: Driver {self.driver} unknown")
        return ds

    def _rename_vars(self, ds):
        rm = {k: v for k, v in self.rename.items() if k in ds}
        ds = ds.rename(rm)
        return ds

    def _validate_spatial_dims(self, ds):
        if GEO_MAP_COORD in ds.data_vars:
            ds = ds.set_coords(GEO_MAP_COORD)
        try:
            ds.raster.set_spatial_dims()
            # transpose dims to get y and x dim last
            x_dim = ds.raster.x_dim
            y_dim = ds.raster.y_dim
            ds = ds.transpose(..., y_dim, x_dim)
        except ValueError:
            raise ValueError(
                f"RasterDataset: No valid spatial coords found in data {self.path}"
            )
        return ds

    def _set_crs(self, ds, logger=logger):
        # set crs
        if ds.raster.crs is None and self.crs is not None:
            ds.raster.set_crs(self.crs)
        elif ds.raster.crs is None:
            raise ValueError(
                f"RasterDataset {self.name}: CRS not defined in data catalog or data."
            )
        elif self.crs is not None and ds.raster.crs != pyproj.CRS.from_user_input(
            self.crs
        ):
            logger.warning(
                f"RasterDataset {self.name}: CRS from data catalog does not match CRS "
                "of data. The original CRS will be used. Please check your catalog."
            )
        return ds

    @staticmethod
    def _slice_data(
        ds,
        variables=None,
        geom=None,
        bbox=None,
        buffer=0,
        align=None,
        time_tuple=None,
        handle_nodata=NoDataStrategy.RAISE,
        logger=logger,
    ):
        """Return a RasterDataset sliced in both spatial and temporal dimensions.

        Arguments
        ---------
        ds : xarray.Dataset or xarray.DataArray
            The RasterDataset to slice.
        variables : list of str, optional
            Names of variables to return. By default all dataset variables.
        geom : geopandas.GeoDataFrame/Series, optional
            A geometry defining the area of interest.
        bbox : array-like of floats, optional
            (xmin, ymin, xmax, ymax) bounding box of area of interest
            (in WGS84 coordinates).
        buffer : int, optional
            Buffer around the `bbox` or `geom` area of interest in pixels.
            By default 0.
        align : float, optional
            Resolution to align the bounding box, by default None
        time_tuple : tuple of datetime, optional
            A tuple consisting of the lower and upper bounds of time that the
            result should contain.
        handle_nodata : NoDataStrategy, optional
            How to handle no data values, by default NoDataStrategy.RAISE

        Returns
        -------
        ds : xarray.Dataset
            The sliced RasterDataset.
        """
        if isinstance(ds, xr.DataArray):
            if ds.name is None:
                # dummy name, required to create dataset
                # renamed to variable in _single_var_as_array
                ds.name = "data"
            ds = ds.to_dataset()
        elif variables is not None:
            variables = np.atleast_1d(variables).tolist()
            if len(variables) > 1 or len(ds.data_vars) > 1:
                mvars = [var not in ds.data_vars for var in variables]
                if any(mvars):
                    raise ValueError(f"RasterDataset: variables not found {mvars}")
                ds = ds[variables]
        if time_tuple is not None:
            ds = RasterDatasetAdapter._slice_temporal_dimension(
                ds,
                time_tuple,
                handle_nodata,
                logger=logger,
            )
        if geom is not None or bbox is not None:
            ds = RasterDatasetAdapter._slice_spatial_dimensions(
                ds,
                geom,
                bbox,
                buffer,
                align,
                handle_nodata,
                logger=logger,
            )
        return ds

    def _shift_time(self, ds, logger=logger):
        dt = self.unit_add.get("time", 0)
        if (
            dt != 0
            and "time" in ds.dims
            and ds["time"].size > 1
            and np.issubdtype(ds["time"].dtype, np.datetime64)
        ):
            logger.debug(f"Shifting time labels with {dt} sec.")
            ds["time"] = ds["time"] + pd.to_timedelta(dt, unit="s")
        elif dt != 0:
            logger.warning("Time shift not applied, time dimension not found.")
        return ds

    @staticmethod
    def _slice_temporal_dimension(
        ds, time_tuple, handle_nodata=NoDataStrategy.RAISE, logger=logger
    ):
        if (
            "time" in ds.dims
            and ds["time"].size > 1
            and np.issubdtype(ds["time"].dtype, np.datetime64)
        ):
            if time_tuple is not None:
                logger.debug(f"Slicing time dim {time_tuple}")
                ds = ds.sel({"time": slice(*time_tuple)})
            if ds.time.size == 0:
                _exec_nodata_strat(
                    "Time slice out of range.", handle_nodata, logger
                )
        return ds

    @staticmethod
    def _slice_spatial_dimensions(
        ds,
        geom,
        bbox,
        buffer,
        align,
        handle_nodata=NoDataStrategy.RAISE,
        logger=logger,
    ):
        # make sure bbox is in data crs
        crs = ds.raster.crs
        epsg = crs.to_epsg()  # this could return None
        if geom is not None:
            bbox = geom.to_crs(crs).total_bounds
        elif epsg != 4326 and bbox is not None:
            crs4326 = pyproj.CRS.from_epsg(4326)
            bbox = rasterio.warp.transform_bounds(crs4326, crs, *bbox)
        # work with 4326 data that is defined at 0-360 degrees longitude
        w, _, e, _ = ds.raster.bounds
        if epsg == 4326 and np.isclose(e - w, 360):  # allow for rounding errors
            ds = gis_utils.meridian_offset(ds, bbox)
        # clip with bbox
        if bbox is not None:
            bbox_str = ", ".join([f"{c:.3f}" for c in bbox])
            logger.debug(f"Clip to [{bbox_str}] (epsg:{epsg})")
            ds = ds.raster.clip_bbox(bbox, buffer=buffer, align=align)
        if np.any(np.array(ds.raster.shape) < 2):
            _exec_nodata_strat(
                "RasterDataset: No data within spatial domain",
                handle_nodata,
                logger,
            )
        return ds

    def _apply_unit_conversions(self, ds, logger=logger):
        unit_names = list(self.unit_mult.keys()) + list(self.unit_add.keys())
        unit_names = [k for k in unit_names if k in ds.data_vars]
        if len(unit_names) > 0:
            logger.debug(f"Convert units for {len(unit_names)} variables.")
        for name in list(set(unit_names)):  # unique
            m = self.unit_mult.get(name, 1)
            a = self.unit_add.get(name, 0)
            da = ds[name]
            attrs = da.attrs.copy()
            nodata_isnan = da.raster.nodata is None or np.isnan(da.raster.nodata)
            # nodata value is explicitly set to NaN in case no nodata value is provided
            nodata = np.nan if nodata_isnan else da.raster.nodata
            data_bool = ~np.isnan(da) if nodata_isnan else da != nodata
            ds[name] = xr.where(data_bool, da * m + a, nodata)
            ds[name].attrs.update(attrs)  # set original attributes
            ds[name].raster.set_nodata(nodata)  # reset nodata in case of change
        return ds
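    # The conversion above is a linear transform applied only to valid cells;
    # a minimal standalone sketch of the same logic (names are illustrative):
    #
    #   m, a = 1.0, -273.15                        # unit_mult, unit_add
    #   nodata = da.raster.nodata                  # e.g. -9999
    #   valid = da != nodata
    #   da_converted = xr.where(valid, da * m + a, nodata)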
    def _set_nodata(self, ds):
        # set nodata value
        if self.nodata is not None:
            if not isinstance(self.nodata, dict):
                nodata = {k: self.nodata for k in ds.data_vars.keys()}
            else:
                nodata = self.nodata
            for k in ds.data_vars:
                mv = nodata.get(k, None)
                if mv is not None and ds[k].raster.nodata is None:
                    ds[k].raster.set_nodata(mv)
        return ds

    def _set_metadata(self, ds):
        # unit attributes
        for k in self.attrs:
            ds[k].attrs.update(self.attrs[k])
        # set meta data
        ds.attrs.update(self.meta)
        return ds

    def _get_zoom_levels_and_crs(self, fn=None, logger=logger):
        """Get zoom levels and crs from adapter or detect from tif file if missing."""
        if self.zoom_levels is not None and self.crs is not None:
            return self.zoom_levels, self.crs
        zoom_levels = {}
        crs = None
        if fn is None:
            fn = self.path
        try:
            with rasterio.open(fn) as src:
                res = abs(src.res[0])
                crs = src.crs
                overviews = [src.overviews(i) for i in src.indexes]
                if len(overviews[0]) > 0:  # check overviews for band 0
                    # check if identical
                    if not all([o == overviews[0] for o in overviews]):
                        raise ValueError("Overviews are not identical across bands")
                    # dict with overview level and corresponding resolution
                    zls = [1] + overviews[0]
                    zoom_levels = {i: res * zl for i, zl in enumerate(zls)}
        except RasterioIOError as e:
            logger.warning(f"IO error while detecting zoom levels: {e}")
        self.zoom_levels = zoom_levels
        if self.crs is None:
            self.crs = crs
        return zoom_levels, crs

    def _parse_zoom_level(
        self,
        zoom_level: Union[int, Tuple[Union[int, float], str]],
        geom: Optional[gpd.GeoSeries] = None,
        bbox: Optional[list] = None,
        zls_dict: Optional[Dict[int, float]] = None,
        dst_crs: pyproj.CRS = None,
        logger=logger,
    ) -> Optional[int]:
        """Return overview level of data corresponding to zoom level.

        Parameters
        ----------
        zoom_level: int or tuple
            overview level or tuple with resolution and unit
        geom: gpd.GeoSeries, optional
            geometry to determine res if zoom_level or source in degree
        bbox: list, optional
            bbox to determine res if zoom_level or source in degree
        zls_dict: dict, optional
            dictionary with overview levels and corresponding resolution
        dst_crs: pyproj.CRS, optional
            destination crs to determine res if zoom_level tuple is provided
            with different unit than dst_crs
        """
        # check zoom level
        zls_dict = self.zoom_levels if zls_dict is None else zls_dict
        dst_crs = self.crs if dst_crs is None else dst_crs
        if zls_dict is None or len(zls_dict) == 0 or zoom_level is None:
            return None
        elif isinstance(zoom_level, int):
            if zoom_level not in zls_dict:
                raise ValueError(
                    f"Zoom level {zoom_level} not defined. "
                    f"Select from {list(zls_dict.keys())}."
                )
            zl = zoom_level
            dst_res = zls_dict[zoom_level]
        elif (
            isinstance(zoom_level, tuple)
            and isinstance(zoom_level[0], (int, float))
            and isinstance(zoom_level[1], str)
            and len(zoom_level) == 2
            and dst_crs is not None
        ):
            src_res, src_res_unit = zoom_level
            # convert res if different unit than crs
            dst_crs = pyproj.CRS.from_user_input(dst_crs)
            dst_crs_unit = dst_crs.axis_info[0].unit_name
            dst_res = src_res
            if dst_crs_unit != src_res_unit:
                known_units = ["degree", "metre", "US survey foot", "meter", "foot"]
                if src_res_unit not in known_units:
                    raise TypeError(
                        f"zoom_level unit {src_res_unit} not understood;"
                        f" should be one of {known_units}"
                    )
                if dst_crs_unit not in known_units:
                    raise NotImplementedError(
                        f"no conversion available for {src_res_unit} to {dst_crs_unit}"
                    )
                conversions = {
                    "foot": 0.3048,
                    "metre": 1,  # official pyproj units
                    "US survey foot": 0.3048,  # official pyproj units
                }  # to meter
                if src_res_unit == "degree" or dst_crs_unit == "degree":
                    lat = 0
                    if bbox is not None:
                        lat = (bbox[1] + bbox[3]) / 2
                    elif geom is not None:
                        lat = geom.to_crs(4326).centroid.y.item()
                    conversions["degree"] = gis_utils.cellres(lat=lat)[1]
                fsrc = conversions.get(src_res_unit, 1)
                fdst = conversions.get(dst_crs_unit, 1)
                dst_res = src_res * fsrc / fdst
            # find nearest zoom level
            eps = 1e-5  # allow for rounding errors
            zls = list(zls_dict.keys())
            smaller = [x < (dst_res + eps) for x in zls_dict.values()]
            zl = zls[-1] if all(smaller) else zls[max(smaller.index(False) - 1, 0)]
        elif dst_crs is None:
            raise ValueError("No CRS defined, hence no zoom level can be determined.")
        else:
            raise TypeError(f"zoom_level not understood: {type(zoom_level)}")
        logger.debug(f"Parsed zoom_level {zl} ({dst_res:.2f})")
        return zl
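    # Worked example (illustrative) of the lookup above: given overview
    # resolutions {0: 0.1, 1: 0.2, 2: 0.4} in the unit of the data CRS, a
    # request of (0.3, "degree") against an EPSG:4326 source selects the last
    # level whose resolution is still smaller than the requested one:
    #
    #   zl = adapter._parse_zoom_level(
    #       (0.3, "degree"), zls_dict={0: 0.1, 1: 0.2, 2: 0.4}, dst_crs=4326
    #   )  # -> 1 (resolution 0.2)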
    def get_bbox(self, detect=True) -> TotalBounds:
        """Return the bounding box and EPSG code of the dataset.

        If the bounding box is not set and detect is True,
        :py:meth:`hydromt.RasterdatasetAdapter.detect_bbox` will be used to
        detect it.

        Parameters
        ----------
        detect: bool, optional
            Whether to detect the bounding box if it is not set. If False, and
            it's not set, None will be returned.

        Returns
        -------
        bbox: Tuple[np.float64, np.float64, np.float64, np.float64]
            The bounding box coordinates of the data. Coordinates are returned
            as [xmin, ymin, xmax, ymax].
        crs: int
            The EPSG code of the CRS of the coordinates returned in bbox.
        """
        bbox = self.extent.get("bbox", None)
        crs = cast(int, self.crs)
        if bbox is None and detect:
            bbox, crs = self.detect_bbox()
        return bbox, crs
    def get_time_range(self, detect=True) -> TimeRange:
        """Return the time range of the dataset.

        If the time range is not set and detect is True,
        :py:meth:`hydromt.RasterdatasetAdapter.detect_time_range` will be used
        to detect it.

        Parameters
        ----------
        detect: bool, optional
            Whether to detect the time range if it is not set. If False, and
            it's not set, None will be returned.

        Returns
        -------
        range: Tuple[np.datetime64, np.datetime64]
            A tuple containing the start and end of the time dimension. Range
            is inclusive on both sides.
        """
        time_range = self.extent.get("time_range", None)
        if time_range is None and detect:
            time_range = self.detect_time_range()
        return time_range
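    # Usage sketch (illustrative): querying the spatial and temporal extent,
    # falling back to detection from the data when the catalog entry does not
    # define an extent.
    #
    #   bbox, crs = adapter.get_bbox(detect=True)     # ([xmin, ymin, xmax, ymax], EPSG code)
    #   t0, t1 = adapter.get_time_range(detect=True)  # (np.datetime64, np.datetime64)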
    def detect_bbox(
        self,
        ds=None,
    ) -> TotalBounds:
        """Detect the bounding box and crs of the dataset.

        If no dataset is provided, it will be fetched according to the settings
        in the adapter. Also see
        :py:meth:`hydromt.RasterdatasetAdapter.get_data`. The coordinates are
        in the CRS of the dataset itself, which is also returned alongside the
        coordinates.

        Parameters
        ----------
        ds: xr.Dataset, xr.DataArray, optional
            The dataset to detect the bounding box of. If none is provided,
            :py:meth:`hydromt.RasterdatasetAdapter.get_data` will be used to
            fetch it before detecting.

        Returns
        -------
        bbox: Tuple[np.float64, np.float64, np.float64, np.float64]
            The bounding box coordinates of the data. Coordinates are returned
            as [xmin, ymin, xmax, ymax].
        crs: int
            The EPSG code of the CRS of the coordinates returned in bbox.
        """
        if ds is None:
            ds = self.get_data()
        crs = ds.raster.crs.to_epsg()
        bounds = ds.raster.bounds
        return bounds, crs
    def detect_time_range(self, ds=None) -> TimeRange:
        """Detect the temporal range of the dataset.

        If no dataset is provided, it will be fetched according to the settings
        in the adapter. Also see
        :py:meth:`hydromt.RasterdatasetAdapter.get_data`.

        Parameters
        ----------
        ds: xr.Dataset, xr.DataArray, optional
            The dataset to detect the time range of. It must have a time
            dimension set. If none is provided,
            :py:meth:`hydromt.RasterdatasetAdapter.get_data` will be used to
            fetch it before detecting.

        Returns
        -------
        range: Tuple[np.datetime64, np.datetime64]
            A tuple containing the start and end of the time dimension. Range
            is inclusive on both sides.
        """
        if ds is None:
            ds = self.get_data()
        return (
            ds[ds.raster.time_dim].min().values,
            ds[ds.raster.time_dim].max().values,
        )
    def to_stac_catalog(
        self,
        on_error: ErrorHandleMethod = ErrorHandleMethod.COERCE,
    ) -> Optional[StacCatalog]:
        """
        Convert a rasterdataset into a STAC Catalog representation.

        The collection will contain an asset for each of the associated files.

        Parameters
        ----------
        - on_error (str, optional): The error handling strategy. Options are:
          "raise" to raise an error on failure, "skip" to skip the dataset on
          failure, and "coerce" (default) to set default values on failure.

        Returns
        -------
        - Optional[StacCatalog]: The STAC Catalog representation of the dataset, or
          None if the dataset was skipped.
        """
        try:
            bbox, crs = self.get_bbox(detect=True)
            bbox = list(bbox)
            start_dt, end_dt = self.get_time_range(detect=True)
            start_dt = pd.to_datetime(start_dt)
            end_dt = pd.to_datetime(end_dt)
            props = {**self.meta, "crs": crs}
            ext = splitext(self.path)[-1]
            match ext:
                case ".nc" | ".vrt":
                    media_type = MediaType.HDF5
                case ".tiff":
                    media_type = MediaType.TIFF
                case ".cog":
                    media_type = MediaType.COG
                case ".png":
                    media_type = MediaType.PNG
                case _:
                    raise RuntimeError(
                        f"Unknown extension: {ext}; cannot determine media type"
                    )
        except (IndexError, KeyError, CRSError) as e:
            if on_error == ErrorHandleMethod.SKIP:
                logger.warning(
                    f"Skipping {self.name} during STAC conversion because "
                    "detecting spatial extent failed."
                )
                return
            elif on_error == ErrorHandleMethod.COERCE:
                bbox = [0.0, 0.0, 0.0, 0.0]
                props = self.meta
                start_dt = datetime(1, 1, 1)
                end_dt = datetime(1, 1, 1)
                media_type = MediaType.JSON
            else:
                raise e
        else:
            # else makes type checkers a bit happier
            stac_catalog = StacCatalog(
                self.name,
                description=self.name,
            )
            stac_item = StacItem(
                self.name,
                geometry=None,
                bbox=list(bbox),
                properties=props,
                datetime=None,
                start_datetime=start_dt,
                end_datetime=end_dt,
            )
            stac_asset = StacAsset(str(self.path), media_type=media_type)
            base_name = basename(self.path)
            stac_item.add_asset(base_name, stac_asset)
            stac_catalog.add_item(stac_item)
            return stac_catalog
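# Usage sketch (illustrative): exporting the source as a STAC catalog and
# writing it to disk with pystac. The output directory is an assumption.
#
#   from pystac import CatalogType
#
#   catalog = adapter.to_stac_catalog(on_error=ErrorHandleMethod.COERCE)
#   if catalog is not None:
#       catalog.normalize_and_save("./stac", catalog_type=CatalogType.SELF_CONTAINED)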