Source code for hydromt.data_catalog.sources.rasterdataset

"""DataSource class for the RasterDataset type."""

from logging import Logger, getLogger
from os.path import basename, splitext
from typing import Any, ClassVar, Dict, List, Literal, Optional, Union, cast

import pandas as pd
import xarray as xr
from fsspec import filesystem
from pydantic import Field
from pyproj import CRS
from pyproj.exceptions import CRSError
from pystac import Asset as StacAsset
from pystac import Catalog as StacCatalog
from pystac import Item as StacItem
from pystac import MediaType

from hydromt._typing import (
    Bbox,
    Geom,
    NoDataStrategy,
    StrPath,
    TimeRange,
    TotalBounds,
    Zoom,
)
from hydromt.data_catalog.adapters.rasterdataset import RasterDatasetAdapter
from hydromt.data_catalog.drivers import RasterDatasetDriver
from hydromt.data_catalog.sources.data_source import DataSource
from hydromt.gis._gis_utils import _parse_geom_bbox_buffer

logger: Logger = getLogger(__name__)


[docs] class RasterDatasetSource(DataSource): """DataSource class for the RasterDataset type.""" data_type: ClassVar[Literal["RasterDataset"]] = "RasterDataset" _fallback_driver_read: ClassVar[str] = "rasterio" _fallback_driver_write: ClassVar[str] = "raster_xarray" driver: RasterDatasetDriver data_adapter: RasterDatasetAdapter = Field(default_factory=RasterDatasetAdapter)
[docs] def read_data( self, *, bbox: Optional[Bbox] = None, mask: Optional[Geom] = None, buffer: float = 0, variables: Optional[List[str]] = None, time_range: Optional[TimeRange] = None, zoom: Optional[Zoom] = None, single_var_as_array: bool = True, handle_nodata: NoDataStrategy = NoDataStrategy.RAISE, ) -> Union[xr.Dataset, xr.DataArray]: """ Read data from this source. Args: """ self._used = True if bbox is not None or (mask is not None and buffer > 0): mask = _parse_geom_bbox_buffer(mask, bbox, buffer) # Transform time_range and variables to match the data source tr = self.data_adapter._to_source_timerange(time_range) vrs = self.data_adapter._to_source_variables(variables) uris: List[str] = self.uri_resolver.resolve( self.full_uri, time_range=tr, mask=mask, variables=vrs, zoom=zoom, metadata=self.metadata, handle_nodata=handle_nodata, ) ds: xr.Dataset = self.driver.read( uris, mask=mask, time_range=tr, variables=vrs, zoom=zoom, metadata=self.metadata, handle_nodata=handle_nodata, ) return self.data_adapter.transform( ds, self.metadata, mask=mask, variables=variables, time_range=time_range, single_var_as_array=single_var_as_array, )
def to_file( self, file_path: StrPath, *, driver_override: Optional[RasterDatasetDriver] = None, bbox: Optional[Bbox] = None, mask: Optional[Geom] = None, buffer: float = 0.0, time_range: Optional[TimeRange] = None, zoom: Optional[Zoom] = None, handle_nodata: NoDataStrategy = NoDataStrategy.RAISE, **kwargs, ) -> "RasterDatasetSource": """ Write the RasterDatasetSource to a local file. args: """ if driver_override is None and not self.driver.supports_writing: # default to fallback driver driver: RasterDatasetDriver = RasterDatasetDriver.model_validate( self._fallback_driver_write ) elif driver_override: if not driver_override.supports_writing: raise RuntimeError( f"driver: '{driver_override.name}' does not support writing data." ) driver: RasterDatasetDriver = driver_override else: # use local filesystem driver: RasterDatasetDriver = self.driver.model_copy( update={"filesystem": filesystem("local")} ) ds: Optional[xr.Dataset] = self.read_data( bbox=bbox, mask=mask, buffer=buffer, time_range=time_range, zoom=zoom, handle_nodata=handle_nodata, ) if ds is None: # handle_nodata == ignore return None dest_path: str = driver.write( file_path, ds, **kwargs, ) # update driver based on local path update: Dict[str, Any] = {"uri": dest_path, "root": None, "driver": driver} return self.model_copy(update=update)
[docs] def get_bbox(self, crs: Optional[CRS] = None, detect: bool = True) -> TotalBounds: """Return the bounding box and espg code of the dataset. if the bounding box is not set and detect is True, :py:meth:`hydromt.RasterdatasetAdapter.detect_bbox` will be used to detect it. Parameters ---------- detect: bool, Optional whether to detect the bounding box if it is not set. If False, and it's not set None will be returned. Returns ------- bbox: Tuple[np.float64,np.float64,np.float64,np.float64] the bounding box coordinates of the data. coordinates are returned as [xmin,ymin,xmax,ymax] crs: int The ESPG code of the CRS of the coordinates returned in bbox """ bbox = self.metadata.extent.get("bbox", None) crs = cast(int, crs) if bbox is None and detect: bbox, crs = self.detect_bbox() return bbox, crs
[docs] def get_time_range( self, detect: bool = True, ) -> TimeRange: """Detect the time range of the dataset. if the time range is not set and detect is True, :py:meth:`hydromt.RasterdatasetAdapter.detect_time_range` will be used to detect it. Parameters ---------- detect: bool, Optional whether to detect the time range if it is not set. If False, and it's not set None will be returned. Returns ------- range: Tuple[np.datetime64, np.datetime64] A tuple containing the start and end of the time dimension. Range is inclusive on both sides. """ time_range = self.metadata.extent.get("time_range", None) if time_range is None and detect: time_range = self.detect_time_range() return time_range
[docs] def detect_bbox( self, ds: Optional[xr.Dataset] = None, ) -> TotalBounds: """Detect the bounding box and crs of the dataset. If no dataset is provided, it will be fetched according to the settings in the adapter. also see :py:meth:`hydromt.RasterdatasetAdapter.get_data`. the coordinates are in the CRS of the dataset itself, which is also returned alongside the coordinates. Parameters ---------- ds: xr.Dataset, xr.DataArray, Optional the dataset to detect the bounding box of. If none is provided, :py:meth:`hydromt.RasterdatasetAdapter.get_data` will be used to fetch the it before detecting. Returns ------- bbox: Tuple[np.float64,np.float64,np.float64,np.float64] the bounding box coordinates of the data. coordinates are returned as [xmin,ymin,xmax,ymax] crs: int The ESPG code of the CRS of the coordinates returned in bbox """ if ds is None: ds = self.read_data() crs = ds.raster.crs.to_epsg() bounds = ds.raster.bounds return bounds, crs
[docs] def detect_time_range( self, ds: Optional[xr.Dataset] = None, ) -> TimeRange: """Detect the temporal range of the dataset. If no dataset is provided, it will be fetched accodring to the settings in the addapter. also see :py:meth:`hydromt.RasterdatasetAdapter.get_data`. Parameters ---------- ds: xr.Dataset, xr.DataArray, Optional the dataset to detect the time range of. It must have a time dimentsion set. If none is provided, :py:meth:`hydromt.RasterdatasetAdapter.get_data` will be used to fetch the it before detecting. Returns ------- range: Tuple[np.datetime64, np.datetime64] A tuple containing the start and end of the time dimension. Range is inclusive on both sides. """ if ds is None: ds = self.read_data() return ( ds[ds.raster.time_dim].min().values, ds[ds.raster.time_dim].max().values, )
[docs] def to_stac_catalog( self, handle_nodata: NoDataStrategy = NoDataStrategy.IGNORE, ) -> Optional[StacCatalog]: """ Convert a rasterdataset into a STAC Catalog representation. The collection will contain an asset for each of the associated files. Parameters ---------- - handle_nodata (str, optional): The error handling strategy. Options are: "raise" to raise an error on failure, "ignore" to skip the dataset on failure Returns ------- - Optional[StacCatalog]: The STAC Catalog representation of the dataset, or None if the dataset was skipped. """ try: bbox, crs = self.get_bbox(detect=True) bbox = list(bbox) start_dt, end_dt = self.get_time_range(detect=True) start_dt = pd.to_datetime(start_dt) end_dt = pd.to_datetime(end_dt) props = {**self.metadata.model_dump(exclude_none=True), "crs": crs} ext = splitext(self.full_uri)[-1] if ext == ".nc" or ext == ".vrt": media_type = MediaType.HDF5 elif ext == ".tiff": media_type = MediaType.TIFF elif ext == ".cog": media_type = MediaType.COG elif ext == ".png": media_type = MediaType.PNG else: raise RuntimeError( f"Unknown extension: {ext} cannot determine media type" ) except (IndexError, KeyError, CRSError) as e: if handle_nodata == NoDataStrategy.IGNORE: logger.warning( "Skipping {name} during stac conversion because" "because detecting spacial extent failed." ) return else: raise e else: # else makes type checkers a bit happier stac_catalog = StacCatalog( self.name, description=self.name, ) stac_item = StacItem( self.name, geometry=None, bbox=list(bbox), properties=props, datetime=None, start_datetime=start_dt, end_datetime=end_dt, ) stac_asset = StacAsset(str(self.full_uri), media_type=media_type) base_name = basename(self.full_uri) stac_item.add_asset(base_name, stac_asset) stac_catalog.add_item(stac_item) return stac_catalog