Source code for hydromt.data_catalog

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""DataCatalog module for HydroMT."""

from __future__ import annotations

import copy
import itertools
import logging
import os
import warnings
from datetime import datetime
from os.path import abspath, basename, isfile, join
from pathlib import Path
from typing import (
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
    cast,
)

import geopandas as gpd
import numpy as np
import pandas as pd
import pooch
import requests
import xarray as xr
import yaml
from packaging.specifiers import SpecifierSet
from packaging.version import Version
from pystac import Catalog as StacCatalog
from pystac import CatalogType, MediaType

from hydromt.nodata import NoDataException, NoDataStrategy, _exec_nodata_strat
from hydromt.predefined_catalog import (
    PREDEFINED_CATALOGS,
    PredefinedCatalog,
    _copy_file,
)
from hydromt.typing import Bbox, ErrorHandleMethod, SourceSpecDict, TimeRange
from hydromt.utils import partition_dictionaries

from .data_adapter import (
    DataAdapter,
    DataFrameAdapter,
    DatasetAdapter,
    GeoDataFrameAdapter,
    GeoDatasetAdapter,
    RasterDatasetAdapter,
)
from .data_adapter.caching import HYDROMT_DATADIR, _uri_validator

logger = logging.getLogger(__name__)

__all__ = [
    "DataCatalog",
]

# just for typehints


[docs] class DataCatalog(object): """Base class for the data catalog object.""" _format_version = "v0" # format version of the data catalog _cache_dir = HYDROMT_DATADIR
[docs] def __init__( self, data_libs: Optional[Union[List, str]] = None, fallback_lib: Optional[str] = "artifact_data", logger=logger, cache: Optional[bool] = False, cache_dir: Optional[str] = None, **artifact_keys, ) -> None: """Catalog of DataAdapter sources. Helps to easily read from different files and keep track of files which have been accessed. Arguments --------- data_libs: (list of) str, Path, optional One or more paths to data catalog configuration files or names of predefined data catalogs. By default the data catalog is initiated without data entries. See :py:func:`~hydromt.data_adapter.DataCatalog.from_yml` for accepted yaml format. fallback_lib: Name of pre-defined data catalog to read if no data_libs are provided, by default 'artifact_data'. If None, no default data catalog is used. cache: bool, optional Set to true to cache data locally before reading. Currently only implemented for tiled rasterdatasets, by default False. cache_dir: str, Path, optional Folder root path to cach data to, by default ~/.hydromt_data artifact_keys: Deprecated from version v0.5 logger : logger object, optional The logger object used for logging messages. If not provided, the default logger will be used. """ if data_libs is None: data_libs = [] elif not isinstance(data_libs, list): # make sure data_libs is a list data_libs = np.atleast_1d(data_libs).tolist() self._sources = {} # dictionary of DataAdapter self._catalogs: Dict[str, PredefinedCatalog] = {} self._fallback_lib = fallback_lib self.logger = logger # caching self.cache = bool(cache) if cache_dir is not None: self._cache_dir = cache_dir # legacy code. to be removed for lib, version in artifact_keys.items(): warnings.warn( "Adding a predefined data catalog as key-word argument is deprecated, " f"add the catalog as '{lib}={version}'" " to the data_libs list instead.", DeprecationWarning, stacklevel=2, ) if not version: # False or None continue elif isinstance(version, str): lib += f"={version}" data_libs = [lib] + data_libs # parse data catalogs; both user and pre-defined for name_or_path in data_libs: if str(name_or_path).split(".")[-1] in ["yml", "yaml"]: # user defined self.from_yml(name_or_path) else: # predefined self.from_predefined_catalogs(name_or_path)
@property def sources(self) -> Dict: """Returns dictionary of DataAdapter sources.""" if len(self._sources) == 0 and self._fallback_lib is not None: # read artifacts by default if no catalogs are provided self.from_predefined_catalogs(self._fallback_lib) return self._sources @property def keys(self) -> List[str]: """Returns list of data source names.""" warnings.warn( "Using iterating over the DataCatalog directly is deprecated." "Please use cat.get_source()", DeprecationWarning, stacklevel=2, ) return list(self._sources.keys()) def get_source_names(self) -> List[str]: """Return a list of all available data source names.""" return list(self._sources.keys()) def to_stac_catalog( self, root: Union[str, Path], source_names: Optional[List] = None, meta: Optional[Dict] = None, catalog_name: str = "hydromt-stac-catalog", description: str = "The stac catalog of hydromt", used_only: bool = False, catalog_type: CatalogType = CatalogType.RELATIVE_PUBLISHED, on_error: ErrorHandleMethod = ErrorHandleMethod.COERCE, ): """Write data catalog to STAC format. Parameters ---------- path: str, Path stac output path. root: str, Path, optional Global root for all relative paths in yaml file. If "auto" (default) the data source paths are relative to the yaml output ``path``. source_names: list, optional List of source names to export, by default None in which case all sources are exported. This argument is ignored if `used_only=True`. used_only: bool, optional If True, export only data entries kept in used_data list, by default False. meta: dict, optional key-value pairs to add to the data catalog meta section, such as 'version', by default empty. """ meta = meta or {} stac_catalog = StacCatalog(id=catalog_name, description=description) for _name, source in self.iter_sources(used_only): stac_child_catalog = source.to_stac_catalog(on_error) if stac_child_catalog: stac_catalog.add_child(stac_child_catalog) stac_catalog.normalize_and_save(root, catalog_type=catalog_type) return stac_catalog def from_stac_catalog( self, stac_like: Union[str, Path, StacCatalog, dict], on_error: ErrorHandleMethod = ErrorHandleMethod.SKIP, ): """Write data catalog to STAC format. Parameters ---------- path: str, Path stac path. on_error: ErrorHandleMethod What to do on error when converting from STAC """ if isinstance(stac_like, (str, Path)): stac_catalog = StacCatalog.from_file(stac_like) elif isinstance(stac_like, dict): stac_catalog = StacCatalog.from_dict(stac_like) elif isinstance(stac_like, StacCatalog): stac_catalog = stac_like else: raise ValueError( f"Unsupported type for stac_like: {type(stac_like).__name__}" ) for item in stac_catalog.get_items(recursive=True): source_name = item.id for _asset_name, asset in item.get_assets().items(): if asset.media_type in [ MediaType.HDF, MediaType.HDF5, MediaType.COG, MediaType.TIFF, ]: adapter_kind = RasterDatasetAdapter elif asset.media_type in [MediaType.GEOPACKAGE, MediaType.FLATGEOBUF]: adapter_kind = GeoDataFrameAdapter elif asset.media_type == MediaType.GEOJSON: adapter_kind = GeoDatasetAdapter elif asset.media_type == MediaType.JSON: adapter_kind = DataFrameAdapter else: continue adapter = adapter_kind(str(asset.get_absolute_href())) self.add_source(source_name, adapter) return self @property def predefined_catalogs(self) -> Dict: """Return all predefined catalogs.""" if not self._catalogs: self._set_predefined_catalogs() return self._catalogs
[docs] def get_source_bbox( self, source: str, provider: Optional[str] = None, version: Optional[str] = None, detect: bool = True, strict: bool = False, ) -> Optional[Tuple[Tuple[float, float, float, float], int]]: """Retrieve the bounding box and crs of the source. Parameters ---------- source: str, the name of the data source. provider: Optional[str] the provider of the source to detect the bbox of, if None, the last one added will be used. version: Optional[str] the version of the source to detect the bbox of, if None, the last one added will be used. detect: bool Whether to detect the bbox of the source if it is not set. strict: bool Raise an error if the adapter does not support bbox detection (such as dataframes). In that case, a warning will be logged instead. Returns ------- bbox: Tuple[np.float64,np.float64,np.float64,np.float64] the bounding box coordinates of the data. coordinates are returned as [xmin,ymin,xmax,ymax] crs: int The ESPG code of the CRS of the coordinates returned in bbox """ s = self.get_source(source, provider, version) try: return s.get_bbox(detect=detect) # type: ignore except TypeError as e: if strict: raise e else: self.logger.warning( f"Source of type {type(s)} does not support detecting spatial" "extents. skipping..." )
[docs] def get_source_time_range( self, source: str, provider: Optional[str] = None, version: Optional[str] = None, detect: bool = True, strict: bool = False, ) -> Optional[Tuple[datetime, datetime]]: """Detect the temporal range of the dataset. Parameters ---------- source: str, the name of the data source. provider: Optional[str] the provider of the source to detect the time range of, if None, the last one added will be used. version: Optional[str] the version of the source to detect the time range of, if None, the last one added will be used. detect: bool Whether to detect the time range of the source if it is not set. strict: bool Raise an error if the adapter does not support time range detection (such as dataframes). In that case, a warning will be logged instead. Returns ------- range: Tuple[np.datetime64, np.datetime64] A tuple containing the start and end of the time dimension. Range is inclusive on both sides. """ s = self.get_source(source, provider, version) try: return s.get_time_range(detect=detect) # type: ignore except TypeError as e: if strict: raise e else: self.logger.warning( f"Source of type {type(s)} does not support detecting" " temporalextents. skipping..." )
[docs] def get_source( self, source: str, provider: Optional[str] = None, version: Optional[str] = None, ) -> DataAdapter: """Return a data source. Parameters ---------- source : str Name of the data source. provider : str, optional Name of the data provider, by default None. By default the last added provider is returned. version : str, optional Version of the data source, by default None. By default the newest version of the requested provider is returned. Returns ------- DataAdapter DataAdapter object. """ source = str(source) if source not in self._sources: available_sources = sorted(list(self._sources.keys())) raise KeyError( f"Requested unknown data source '{source}' " f"available sources are: {available_sources}" ) available_providers = self._sources[source] # make sure all arguments are strings provider = str(provider) if provider is not None else provider version = str(version) if version is not None else version # find provider matching requested version if provider is None and version is not None: providers = [p for p, v in available_providers.items() if version in v] if len(providers) > 0: # error raised later if no provider found provider = providers[-1] # check if provider is available, otherwise use last added provider if provider is None: requested_provider = list(available_providers.keys())[-1] else: requested_provider = provider if requested_provider not in available_providers: providers = sorted(list(available_providers.keys())) raise KeyError( f"Requested unknown provider '{requested_provider}' for " f"data source '{source}' available providers are: {providers}" ) available_versions = available_providers[requested_provider] # check if version is available, otherwise use last added version which is # always the newest version if version is None: requested_version = list(available_versions.keys())[-1] else: requested_version = version if requested_version not in available_versions: versions = sorted(list(map(str, available_versions.keys()))) raise KeyError( f"Requested unknown version '{requested_version}' for " f"data source '{source}' and provider '{requested_provider}' " f"available versions are {versions}" ) return self._sources[source][requested_provider][requested_version]
[docs] def add_source(self, source: str, adapter: DataAdapter) -> None: """Add a new data source to the data catalog. The data version and provider are extracted from the DataAdapter object. Parameters ---------- source : str Name of the data source. adapter : DataAdapter DataAdapter object. """ if not isinstance(adapter, DataAdapter): raise ValueError("Value must be DataAdapter") if hasattr(adapter, "version") and adapter.version is not None: version = adapter.version else: version = "_UNSPECIFIED_" # make sure this comes first in sorted list if hasattr(adapter, "provider") and adapter.provider is not None: provider = adapter.provider else: provider = adapter.catalog_name if source not in self._sources: self._sources[source] = {} else: # check if data type is the same as adapter with same name adapter0 = next(iter(next(iter(self._sources[source].values())).values())) if adapter0.data_type != adapter.data_type: raise ValueError( f"Data source '{source}' already exists with data type " f"'{adapter0.data_type}' but new data source has data type " f"'{adapter.data_type}'." ) if provider not in self._sources[source]: versions = {version: adapter} else: versions = self._sources[source][provider] if provider in self._sources[source] and version in versions: warnings.warn( f"overwriting data source '{source}' with " f"provider {provider} and version {version}.", UserWarning, stacklevel=2, ) # update and sort dictionary -> make sure newest version is last versions.update({version: adapter}) versions = {k: versions[k] for k in sorted(list(versions.keys()))} self._sources[source][provider] = versions
def __getitem__(self, key: str) -> DataAdapter: """Get the source.""" warnings.warn( 'Using iterating over the DataCatalog directly is deprecated."\ " Please use cat.get_source("name")', DeprecationWarning, stacklevel=2, ) return self.get_source(key) def __setitem__(self, key: str, value: DataAdapter) -> None: """Set or update adaptors.""" warnings.warn( "Using DataCatalog as a dictionary directly is deprecated." " Please use cat.add_source(adapter)", DeprecationWarning, stacklevel=2, ) self.add_source(key, value)
[docs] def iter_sources(self, used_only=False) -> List[Tuple[str, DataAdapter]]: """Return a flat list of all available data sources. Parameters ---------- used_only: bool, optional If True, return only data entries marked as used, by default False. """ ans = [] for source_name, available_providers in self._sources.items(): for _, available_versions in available_providers.items(): for _, adapter in available_versions.items(): if used_only and not adapter._used: continue ans.append((source_name, adapter)) return ans
def __iter__(self) -> Iterator[Tuple[str, DataAdapter]]: """Iterate over sources.""" return iter(self.iter_sources()) def __contains__(self, key: str) -> bool: """Check if source is in catalog.""" warnings.warn( "Directly checking for containement is deprecated. " " Use 'contains_source' instead.", DeprecationWarning, stacklevel=2, ) return self.contains_source(key) def contains_source( self, source: str, provider: Optional[str] = None, version: Optional[str] = None, permissive: bool = True, ) -> bool: """ Check if source is in catalog. Parameters ---------- source : str Name of the data source. provider : str, optional Name of the data provider, by default None. By default the last added provider is returned. version : str, optional Version of the data source, by default None. By default the newest version of the requested provider is returned. permissive : bool, optional Whether variant checking is necessary. If true, the name of the source only is checked, if false, and at least one of version or provider is not None, this will only return True if that variant specifically is available. Returns ------- bool whether the source (with specified variants if necessary) is available """ if permissive or (version is None and provider is None): return source in self._sources else: if version: if version not in self._sources[source]: return False else: selected_version = version else: selected_version = next(iter(self._sources[source].keys())) return provider not in self._sources[source][selected_version].keys() def __len__(self): """Return number of sources.""" return len(self.iter_sources()) def __repr__(self): """Prettyprint the sources.""" return self.to_dataframe().__repr__() def __eq__(self, other) -> bool: if type(other) is type(self): if len(self) != len(other): return False for name, source in self.iter_sources(): try: other_source = other.get_source( name, provider=source.provider, version=source.version ) except KeyError: return False if source != other_source: return False else: return False return True def _repr_html_(self): return self.to_dataframe()._repr_html_()
[docs] def update(self, **kwargs) -> None: """Add data sources to library or update them.""" for k, v in kwargs.items(): self.add_source(k, v)
def update_sources(self, **kwargs) -> None: """Add data sources to library or update them.""" self.update(**kwargs) def _set_predefined_catalogs(self) -> Dict: """Set initialized predefined catalogs to _catalogs attribute.""" for k, cat in PREDEFINED_CATALOGS.items(): self._catalogs[k] = cat( format_version=self._format_version, cache_dir=self._cache_dir ) return self._catalogs
[docs] def from_predefined_catalogs(self, name: str, version: str = "latest") -> None: """Add data sources from a predefined data catalog. Parameters ---------- name : str Catalog name. version : str, optional Catlog release version. By default it takes the latest known release. """ if "=" in name: name, version = name.split("=")[0], name.split("=")[-1] if name not in self.predefined_catalogs: raise ValueError( f'Catalog with name "{name}" not found in predefined catalogs' ) # cache and get path to data_datalog.yml file of the <name> catalog with <version> catalog_path = self.predefined_catalogs[name].get_catalog_file(version) # read catalog self.logger.info(f"Reading data catalog {name} {version}") self.from_yml(catalog_path, catalog_name=name)
def _cache_archive( self, archive_uri: str, name: str, version: str = "latest", sha256: Optional[str] = None, ) -> str: """Cache a data archive to the cache directory. The archive is unpacked and cached to <cache_dir>/<name>/<version> Parameters ---------- archive_uri : str uri to data archive. name : str Name of data catalog version : str, optional Version of data archive, by default 'latest'. sha256 : str, optional SHA256 hash of the archive, by default None. Returns ------- str Path to the datacatalog of the cached data archive """ root = Path(self._cache_dir, name, version) extract_dir = root / Path(archive_uri).stem # retrieve and unpack archive kwargs = {} if Path(archive_uri).suffix == ".zip": kwargs.update(processor=pooch.Unzip(extract_dir=extract_dir)) elif Path(archive_uri).suffix == ".gz": kwargs.update(processor=pooch.Untar(extract_dir=extract_dir)) if Path(archive_uri).exists(): # check if arhive is a local file kwargs.update(donwloader=_copy_file) pooch.retrieve( archive_uri, known_hash=sha256, path=str(root), fname=Path(archive_uri).name, **kwargs, ) return extract_dir
[docs] def from_yml( self, urlpath: Union[Path, str], root: Optional[str] = None, catalog_name: Optional[str] = None, mark_used: bool = False, ) -> DataCatalog: """Add data sources based on yaml file. Parameters ---------- urlpath: str, Path Path or url to data source yaml files. root: str, Path, optional Global root for all relative paths in yaml file(s). mark_used: bool If True, append to used_data list. Examples -------- A yaml data entry is provided below, where all the text between <> should be filled by the user. Multiple data sources of the same data type should be grouped. Currently the following data types are supported: {'RasterDataset', 'GeoDataset', 'GeoDataFrame'}. See the specific data adapters for more information about the required and optional arguments. .. code-block:: yaml meta: root: <path> category: <category> version: <version> name: <name> sha256: <sha256> # only if the root is an archive <name>: path: <path> data_type: <data_type> driver: <driver> filesystem: <filesystem> driver_kwargs: <key>: <value> nodata: <hydromt_variable_name1>: <nodata> rename: <native_variable_name1>: <hydromt_variable_name1> <native_variable_name2>: <hydromt_variable_name2> unit_add: <hydromt_variable_name1>: <float/int> unit_mult: <hydromt_variable_name1>: <float/int> meta: source_url: <source_url> source_version: <source_version> source_licence: <source_licence> paper_ref: <paper_ref> paper_doi: <paper_doi> placeholders: <placeholder_name_1>: <list of names> <placeholder_name_2>: <list of names> Returns ------- DataCatalog DataCatalog object with parsed yaml file added. """ self.logger.info(f"Parsing data catalog from {urlpath}") yml = _yml_from_uri_or_path(urlpath) # parse metadata meta = dict() # legacy code with root/category at highest yml level if "root" in yml: warnings.warn( "The 'root' key is deprecated, use 'meta: root' instead.", DeprecationWarning, stacklevel=2, ) meta.update(root=yml.pop("root")) if "category" in yml: warnings.warn( "The 'category' key is deprecated, use 'meta: category' instead.", DeprecationWarning, stacklevel=2, ) meta.update(category=yml.pop("category")) # read meta data meta = yml.pop("meta", meta) if catalog_name is None: catalog_name = cast( str, meta.get("name", "".join(basename(urlpath).split(".")[:-1])) ) version = meta.get("version", None) if root is None: root = meta.get("root", os.path.dirname(urlpath)) if root.split(".")[-1] in ["gz", "zip"]: # if root is an archive, unpack it at the cache dir root = self._cache_archive( archive_uri=root, name=catalog_name, version=version, sha256=meta.get("sha256", None), ) self.from_dict( yml, catalog_name=catalog_name, root=root, category=meta.get("category", None), mark_used=mark_used, ) return self
def _is_compatible( self, hydromt_version: str, requested_range: str, allow_prerelease=True ) -> bool: if requested_range is None: return True requested = SpecifierSet(requested_range) version = Version(hydromt_version) if allow_prerelease: return version in requested or Version(version.base_version) in requested else: return version in requested
[docs] def from_dict( self, data_dict: Dict, catalog_name: str = "", root: Optional[Union[str, Path]] = None, category: Optional[str] = None, mark_used: bool = False, ) -> DataCatalog: """Add data sources based on dictionary. Parameters ---------- data_dict: dict Dictionary of data_sources. catalog_name: str, optional Name of data catalog root: str, Path, optional Global root for all relative paths in `data_dict`. category: str, optional Global category for all sources in `data_dict`. mark_used: bool If True, append to used_data list. Examples -------- A data dictionary with two entries is provided below, where all the text between <> should be filled by the user. See the specific data adapters for more information about the required and optional arguments. .. code-block:: text { <name1>: { "path": <path>, "data_type": <data_type>, "driver": <driver>, "filesystem": <filesystem>, "driver_kwargs": {<key>: <value>}, "nodata": <nodata>, "rename": {<native_variable_name1>: <hydromt_variable_name1>}, "unit_add": {<hydromt_variable_name1>: <float/int>}, "unit_mult": {<hydromt_variable_name1>: <float/int>}, "meta": {...}, "placeholders": {<placeholder_name_1>: <list of names>}, } <name2>: { ... } } """ meta = data_dict.pop("meta", {}) if "root" in meta and root is None: root = meta.pop("root") if "category" in meta and category is None: category = meta.pop("category") if "name" in meta and catalog_name is None: catalog_name = meta.pop("name") for name, source_dict in _denormalise_data_dict(data_dict): adapter = _parse_data_source_dict( name, source_dict, catalog_name=catalog_name, root=root, category=category, ) if mark_used: adapter.mark_as_used() self.add_source(name, adapter) return self
[docs] def to_yml( self, path: Union[str, Path], root: str = "auto", source_names: Optional[List] = None, used_only: bool = False, meta: Optional[Dict] = None, ) -> None: """Write data catalog to yaml format. Parameters ---------- path: str, Path yaml output path. root: str, Path, optional Global root for all relative paths in yaml file. If "auto" (default) the data source paths are relative to the yaml output ``path``. source_names: list, optional List of source names to export, by default None in which case all sources are exported. This argument is ignored if `used_only=True`. used_only: bool, optional If True, export only data entries kept in used_data list, by default False. meta: dict, optional key-value pairs to add to the data catalog meta section, such as 'version', by default empty. """ meta = meta or [] yml_dir = os.path.dirname(abspath(path)) if root == "auto": root = yml_dir data_dict = self.to_dict( root=root, source_names=source_names, meta=meta, used_only=used_only ) if str(root) == yml_dir: data_dict.pop("root", None) # remove root if it equals the yml_dir if data_dict: with open(path, "w") as f: yaml.dump(data_dict, f, default_flow_style=False, sort_keys=False) else: self.logger.info("The data catalog is empty, no yml file is written.")
[docs] def to_dict( self, source_names: Optional[List] = None, root: Union[Path, str] = None, meta: Optional[dict] = None, used_only: bool = False, ) -> Dict: """Export the data catalog to a dictionary. Parameters ---------- source_names : list, optional List of source names to export, by default None in which case all sources are exported. root : str, Path, optional Global root for all relative paths in the file. meta: dict, optional key-value pairs to add to the data catalog meta section, such as 'version', by default empty. used_only: bool, optional If True, export only data entries marked as used, by default False. Returns ------- dict data catalog dictionary """ meta = meta or {} sources_out = dict() if root is not None: root = abspath(root) meta.update(**{"root": root}) root_drive = os.path.splitdrive(root)[0] sources = self.iter_sources(used_only=used_only) sorted_sources = sorted(sources, key=lambda x: x[0]) for name, source in sorted_sources: # alphabetical order if source_names is not None and name not in source_names: continue source_dict = source.to_dict() if root is not None: path = source_dict["path"] # is abspath source_drive = os.path.splitdrive(path)[0] if ( root_drive == source_drive and os.path.commonpath([path, root]) == root ): source_dict["path"] = os.path.relpath( source_dict["path"], root ).replace("\\", "/") # remove non serializable entries to prevent errors source_dict = _process_dict(source_dict, logger=self.logger) # TODO TEST if name in sources_out: existing = sources_out.pop(name) if existing == source_dict: sources_out.update({name: source_dict}) continue if "variants" in existing: variants = existing.pop("variants") _, variant, _ = partition_dictionaries(source_dict, existing) variants.append(variant) existing["variants"] = variants else: base, diff_existing, diff_new = partition_dictionaries( source_dict, existing ) # provider and version should always be in variants list provider = base.pop("provider", None) if provider is not None: diff_existing["provider"] = provider diff_new["provider"] = provider version = base.pop("version", None) if version is not None: diff_existing["version"] = version diff_new["version"] = version base["variants"] = [diff_new, diff_existing] sources_out[name] = base else: sources_out.update({name: source_dict}) if meta: sources_out = {"meta": meta, **sources_out} return sources_out
[docs] def to_dataframe(self, source_names: Optional[List] = None) -> pd.DataFrame: """Return data catalog summary as DataFrame.""" source_names = source_names or [] d = [] for name, source in self.iter_sources(): if len(source_names) > 0 and name not in source_names: continue d.append( { "name": name, "provider": source.provider, "version": source.version, **source.summary(), } ) return pd.DataFrame.from_records(d).set_index("name")
[docs] def export_data( self, data_root: Union[Path, str], bbox: Optional[Bbox] = None, time_tuple: Optional[TimeRange] = None, source_names: Optional[List] = None, unit_conversion: bool = True, meta: Optional[Dict] = None, append: bool = False, handle_nodata: NoDataStrategy = NoDataStrategy.IGNORE, ) -> None: """Export a data slice of each dataset and a data_catalog.yml file to disk. Parameters ---------- data_root : str, Path Path to output folder bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest. time_tuple : tuple of str, datetime, optional Start and end date of period of interest. By default the entire time period of the dataset is returned. source_names: list, optional List of source names to export, by default None in which case all sources are exported. Specific variables can be selected by appending them to the source name in square brackets. For example, to export all variables of 'source_name1' and only 'var1' and 'var2' of 'source_name' use source_names=['source_name1', 'source_name2[var1,var2]'] unit_conversion: boolean, optional If False skip unit conversion when parsing data from file, by default True. meta: dict, optional key-value pairs to add to the data catalog meta section, such as 'version', by default empty. append: bool, optional If True, append to existing data catalog, by default False. """ source_names = source_names or [] meta = meta or {} data_root = abspath(data_root) if not os.path.isdir(data_root): os.makedirs(data_root) # create copy of data with selected source names source_vars = {} if len(source_names) > 0: sources = {} for source in source_names: # support both strings and SourceSpecDicts here if isinstance(source, str): name = source elif isinstance(source, Dict): name = source["source"] else: raise RuntimeError( f"unknown source type: {source} of type {type(source).__name__}" ) # deduce variables from name if "[" in name: variables = name.split("[")[-1].split("]")[0].split(",") name = name.split("[")[0] source_vars[name] = variables source = self.get_source(name) provider = source.provider version = source.version if name not in sources: sources[name] = {} if provider not in sources[name]: sources[name][provider] = {} sources[name][provider][version] = copy.deepcopy(source) else: sources = copy.deepcopy(self.sources) # read existing data catalog if it exists fn = join(data_root, "data_catalog.yml") if isfile(fn) and append: self.logger.info(f"Appending existing data catalog {fn}") sources_out = DataCatalog(fn).sources else: sources_out = {} # export data and update sources for key, available_variants in sources.items(): for provider, available_versions in available_variants.items(): for version, source in available_versions.items(): try: # read slice of source and write to file self.logger.debug(f"Exporting {key}.") if not unit_conversion: unit_mult = source.unit_mult unit_add = source.unit_add source.unit_mult = {} source.unit_add = {} try: fn_out, driver, driver_kwargs = source.to_file( data_root=data_root, data_name=key, variables=source_vars.get(key, None), bbox=bbox, time_tuple=time_tuple, handle_nodata=NoDataStrategy.RAISE, logger=self.logger, ) except NoDataException as e: _exec_nodata_strat( f"{key} file contains no data: {e}", handle_nodata, logger, ) continue # update path & driver and remove kwargs # and rename in output sources if unit_conversion: source.unit_mult = {} source.unit_add = {} else: source.unit_mult = unit_mult source.unit_add = unit_add source.path = fn_out source.driver = driver source.filesystem = "local" source.driver_kwargs = {} if driver_kwargs is not None: source.driver_kwargs.update(driver_kwargs) source.rename = {} if key in sources_out: self.logger.warning( f"{key} already exists in data catalog, overwriting..." ) if key not in sources_out: sources_out[key] = {} if provider not in sources_out[key]: sources_out[key][provider] = {} sources_out[key][provider][version] = source except FileNotFoundError: self.logger.warning(f"{key} file not found at {source.path}") # write data catalog to yml data_catalog_out = DataCatalog() for key, available_variants in sources_out.items(): for _provider, available_versions in available_variants.items(): for _version, adapter in available_versions.items(): data_catalog_out.add_source(key, adapter) data_catalog_out.to_yml(fn, root="auto", meta=meta)
[docs] def get_rasterdataset( self, data_like: Union[str, SourceSpecDict, Path, xr.Dataset, xr.DataArray], bbox: Optional[List] = None, geom: Optional[gpd.GeoDataFrame] = None, zoom_level: Optional[Union[int, tuple]] = None, buffer: Union[float, int] = 0, handle_nodata: NoDataStrategy = NoDataStrategy.RAISE, align: Optional[bool] = None, variables: Optional[Union[List, str]] = None, time_tuple: Optional[Tuple] = None, single_var_as_array: Optional[bool] = True, provider: Optional[str] = None, version: Optional[str] = None, **kwargs, ) -> Optional[xr.Dataset]: """Return a clipped, sliced and unified RasterDataset. To clip the data to the area of interest, provide a `bbox` or `geom`, with optional additional `buffer` and `align` arguments. To slice the data to the time period of interest, provide the `time_tuple` argument. To return only the dataset variables of interest provide the `variables` argument. NOTE: Unless `single_var_as_array` is set to False a single-variable data source will be returned as :py:class:`xarray.DataArray` rather than :py:class:`xarray.Dataset`. Arguments --------- data_like: str, Path, Dict, xr.Dataset, xr.Datarray DataCatalog key, path to raster file or raster xarray data object. The catalog key can be a string or a dictionary with the following keys: {'name', 'provider', 'version'}. If a path to a raster file is provided it will be added to the catalog with its based on the file basename. bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest (in WGS84 coordinates). geom : geopandas.GeoDataFrame/Series, A geometry defining the area of interest. zoom_level : int, tuple, optional Zoom level of the xyz tile dataset (0 is base level) Using a tuple the zoom level can be specified as (<zoom_resolution>, <unit>), e.g., (1000, 'meter') buffer : int, optional Buffer around the `bbox` or `geom` area of interest in pixels. By default 0. handle_nodata: NoDataStrategy, optional What to do if no data can be found. align : float, optional Resolution to align the bounding box, by default None variables : str or list of str, optional. Names of RasterDataset variables to return. By default all dataset variables are returned. time_tuple : tuple of str, datetime, optional Start and end date of period of interest. By default the entire time period of the dataset is returned. single_var_as_array: bool, optional If True, return a DataArray if the dataset consists of a single variable. If False, always return a Dataset. By default True. provider: str, optional Data source provider. If None (default) the last added provider is used. version: str, optional Data source version. If None (default) the newest version is used. **kwargs: Additional keyword arguments that are passed to the `RasterDatasetAdapter` function. Only used if `data_like` is a path to a raster file. Returns ------- obj: xarray.Dataset or xarray.DataArray RasterDataset. If no data is found and handle_nodata is set to IGNORE None will be returned. if it is set to RAISE and exception will be raised in that situation """ if isinstance(data_like, dict): data_like, provider, version = _parse_data_like_dict( data_like, provider, version ) if isinstance(data_like, (str, Path)): if isinstance(data_like, str) and data_like in self.sources: name = data_like source = self.get_source(name, provider=provider, version=version) else: if "provider" not in kwargs: kwargs.update({"provider": "user"}) source = RasterDatasetAdapter(path=str(data_like), **kwargs) name = basename(data_like) self.add_source(name, source) elif isinstance(data_like, (xr.DataArray, xr.Dataset)): data_like = RasterDatasetAdapter._slice_data( data_like, variables, geom, bbox, buffer, align, time_tuple, logger=self.logger, ) if data_like is None: _exec_nodata_strat( "No data was left after slicing.", strategy=handle_nodata, logger=logger, ) ds = RasterDatasetAdapter._single_var_as_array( data_like, single_var_as_array, variables ) return ds else: raise ValueError(f'Unknown raster data type "{type(data_like).__name__}"') obj = source.get_data( bbox=bbox, geom=geom, buffer=buffer, zoom_level=zoom_level, align=align, variables=variables, time_tuple=time_tuple, single_var_as_array=single_var_as_array, cache_root=self._cache_dir if self.cache else None, handle_nodata=handle_nodata, logger=self.logger, ) return obj
[docs] def get_geodataframe( self, data_like: Union[str, SourceSpecDict, Path, xr.Dataset, xr.DataArray], bbox: Optional[List] = None, geom: Optional[gpd.GeoDataFrame] = None, buffer: Union[float, int] = 0, handle_nodata: NoDataStrategy = NoDataStrategy.RAISE, variables: Optional[Union[List, str]] = None, predicate: str = "intersects", provider: Optional[str] = None, version: Optional[str] = None, **kwargs, ) -> Optional[gpd.GeoDataFrame]: """Return a clipped and unified GeoDataFrame (vector). To clip the data to the area of interest, provide a `bbox` or `geom`, with optional additional `buffer` and `align` arguments. To return only the dataframe columns of interest provide the `variables` argument. Arguments --------- data_like: str, Path, gpd.GeoDataFrame Data catalog key, path to vector file or a vector geopandas object. The catalog key can be a string or a dictionary with the following keys: {'name', 'provider', 'version'}. If a path to a vector file is provided it will be added to the catalog with its based on the file basename. bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest (in WGS84 coordinates). geom : geopandas.GeoDataFrame/Series, A geometry defining the area of interest. buffer : float, optional Buffer around the `bbox` or `geom` area of interest in meters. By default 0. handle_nodata : NoDataStrategy, optional How to handle no data values, by default NoDataStrategy.RAISE predicate : optional If predicate is provided, the GeoDataFrame is filtered by testing the predicate function against each item. Requires bbox or mask. By default 'intersects' options are: {'intersects', 'within', 'contains', 'overlaps', 'crosses', 'touches'}, align : float, optional Resolution to align the bounding box, by default None variables : str or list of str, optional. Names of GeoDataFrame columns to return. By default all columns are returned. provider: str, optional Data source provider. If None (default) the last added provider is used. version: str, optional Data source version. If None (default) the newest version is used. **kwargs: Additional keyword arguments that are passed to the `GeoDataFrameAdapter` function. Only used if `data_like` is a path to a vector file. Returns ------- gdf: Optional[geopandas.GeoDataFrame] GeoDataFrame. If no data is found and handle_nodata is set to IGNORE None will be returned. if it is set to RAISE and exception will be raised in that situation """ if isinstance(data_like, dict): data_like, provider, version = _parse_data_like_dict( data_like, provider, version ) if isinstance(data_like, (str, Path)): if str(data_like) in self.sources: name = str(data_like) source = self.get_source(name, provider=provider, version=version) else: if "provider" not in kwargs: kwargs.update({"provider": "user"}) source = GeoDataFrameAdapter(path=str(data_like), **kwargs) name = basename(data_like) self.add_source(name, source) elif isinstance(data_like, gpd.GeoDataFrame): data_like = GeoDataFrameAdapter._slice_data( data_like, variables, geom, bbox, buffer, predicate, logger=self.logger, ) if data_like is None: _exec_nodata_strat( "No data was left after slicing.", strategy=handle_nodata, logger=logger, ) return data_like else: raise ValueError(f'Unknown vector data type "{type(data_like).__name__}"') gdf = source.get_data( bbox=bbox, geom=geom, handle_nodata=handle_nodata, buffer=buffer, predicate=predicate, variables=variables, logger=self.logger, ) return gdf
[docs] def get_geodataset( self, data_like: Union[str, SourceSpecDict, Path, xr.Dataset, xr.DataArray], bbox: Optional[List] = None, geom: Optional[gpd.GeoDataFrame] = None, buffer: Union[float, int] = 0, handle_nodata: NoDataStrategy = NoDataStrategy.RAISE, predicate: str = "intersects", variables: Optional[List] = None, time_tuple: Optional[Union[Tuple[str, str], Tuple[datetime, datetime]]] = None, single_var_as_array: bool = True, provider: Optional[str] = None, version: Optional[str] = None, **kwargs, ) -> xr.Dataset: """Return a clipped, sliced and unified GeoDataset. To clip the data to the area of interest, provide a `bbox` or `geom`, with optional additional `buffer` and `align` arguments. To slice the data to the time period of interest, provide the `time_tuple` argument. To return only the dataset variables of interest provide the `variables` argument. NOTE: Unless `single_var_as_array` is set to False a single-variable data source will be returned as xarray.DataArray rather than Dataset. Arguments --------- data_like: str, Path, xr.Dataset, xr.DataArray Data catalog key, path to geodataset file or geodataset xarray object. The catalog key can be a string or a dictionary with the following keys: {'name', 'provider', 'version'}. If a path to a file is provided it will be added to the catalog with its based on the file basename. bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest (in WGS84 coordinates). geom : geopandas.GeoDataFrame/Series, A geometry defining the area of interest. buffer : float, optional Buffer around the `bbox` or `geom` area of interest in meters. By default 0. handle_nodata: NoDataStrategy Optional what should happen if the requested data set is empty. RAISE by default predicate : optional If predicate is provided, the GeoDataFrame is filtered by testing the predicate function against each item. Requires bbox or mask. By default 'intersects' options are: {'intersects', 'within', 'contains', 'overlaps', 'crosses', 'touches'}, variables : str or list of str, optional. Names of GeoDataset variables to return. By default all dataset variables are returned. time_tuple : tuple of str, datetime, optional Start and end date of period of interest. By default the entire time period of the dataset is returned. single_var_as_array: bool, optional If True, return a DataArray if the dataset consists of a single variable. If False, always return a Dataset. By default True. **kwargs: Additional keyword arguments that are passed to the `GeoDatasetAdapter` function. Only used if `data_like` is a path to a geodataset file. Returns ------- obj: xarray.Dataset or xarray.DataArray GeoDataset """ if isinstance(data_like, dict): data_like, provider, version = _parse_data_like_dict( data_like, provider, version ) if isinstance(data_like, (str, Path)): if isinstance(data_like, str) and data_like in self.sources: name = data_like source = self.get_source(name, provider=provider, version=version) else: if "provider" not in kwargs: kwargs.update({"provider": "user"}) source = GeoDatasetAdapter(path=str(data_like), **kwargs) name = basename(data_like) self.add_source(name, source) elif isinstance(data_like, (xr.DataArray, xr.Dataset)): data_like = GeoDatasetAdapter._slice_data( data_like, variables, geom, bbox, buffer, predicate, time_tuple, logger=self.logger, ) if data_like is None: _exec_nodata_strat( "No data was left after slicing.", strategy=handle_nodata, logger=logger, ) return GeoDatasetAdapter._single_var_as_array( data_like, single_var_as_array, variables ) else: raise ValueError(f'Unknown geo data type "{type(data_like).__name__}"') obj = source.get_data( bbox=bbox, geom=geom, buffer=buffer, handle_nodata=handle_nodata, predicate=predicate, variables=variables, time_tuple=time_tuple, single_var_as_array=single_var_as_array, ) return obj
[docs] def get_dataset( self, data_like: Union[str, SourceSpecDict, Path, xr.Dataset, xr.DataArray], variables: Optional[List] = None, handle_nodata: NoDataStrategy = NoDataStrategy.RAISE, time_tuple: Optional[Union[Tuple[str, str], Tuple[datetime, datetime]]] = None, single_var_as_array: bool = True, provider: Optional[str] = None, version: Optional[str] = None, **kwargs, ) -> xr.Dataset: """Return a clipped, sliced and unified Dataset. To slice the data to the time period of interest, provide the `time_tuple` argument. To return only the dataset variables of interest provide the `variables` argument. NOTE: Unless `single_var_as_array` is set to False a single-variable data source will be returned as xarray.DataArray rather than a xarray.Dataset. Arguments --------- data_like: str, Path, xr.Dataset, xr.DataArray, SourceSpecDict Data catalog key, path to geodataset file or geodataset xarray object. The catalog key can be a string or a dictionary with the following keys: {'name', 'provider', 'version'}. If a path to a file is provided it will be added to the catalog with its based on the file basename. time_tuple : tuple of str, datetime, optional Start and end date of period of interest. By default the entire time period of the dataset is returned. single_var_as_array: bool, optional If True, return a DataArray if the dataset consists of a single variable. If False, always return a Dataset. By default True. **kwargs: Additional keyword arguments that are passed to the `DatasetAdapter` function. Only used if `data_like` is a path to a geodataset file. Returns ------- obj: xarray.Dataset or xarray.DataArray Dataset """ if isinstance(data_like, dict): data_like, provider, version = _parse_data_like_dict( data_like, provider, version ) if isinstance(data_like, (str, Path)): if isinstance(data_like, str) and data_like in self.sources: name = data_like source = self.get_source(name, provider=provider, version=version) else: if "provider" not in kwargs: kwargs.update({"provider": "local"}) source = DatasetAdapter(path=str(data_like), **kwargs) name = basename(data_like) self.add_source(name, source) elif isinstance(data_like, (xr.DataArray, xr.Dataset)): data_like = DatasetAdapter._slice_data( data_like, variables, time_tuple, logger=self.logger, ) if data_like is None: _exec_nodata_strat( "No data was left after slicing.", strategy=handle_nodata, logger=logger, ) return DatasetAdapter._single_var_as_array( data_like, single_var_as_array, variables ) else: raise ValueError(f'Unknown data type "{type(data_like).__name__}"') obj = source.get_data( variables=variables, time_tuple=time_tuple, single_var_as_array=single_var_as_array, handle_nodata=handle_nodata, ) return obj
[docs] def get_dataframe( self, data_like: Union[str, SourceSpecDict, Path, xr.Dataset, xr.DataArray], variables: Optional[list] = None, time_tuple: Optional[Tuple] = None, handle_nodata: NoDataStrategy = NoDataStrategy.RAISE, provider: Optional[str] = None, version: Optional[str] = None, **kwargs, ): """Return a unified and sliced DataFrame. Parameters ---------- data_like : str, Path, pd.DataFrame Data catalog key, path to tabular data file or tabular pandas dataframe. The catalog key can be a string or a dictionary with the following keys: {'name', 'provider', 'version'}. If a path to a tabular data file is provided it will be added to the catalog with its based on the file basename. variables : str or list of str, optional. Names of GeoDataset variables to return. By default all dataset variables are returned. time_tuple : tuple of str, datetime, optional Start and end date of period of interest. By default the entire time period of the dataset is returned. handle_nodata: NoDataStrategy Optional what should happen if the requested data set is empty. RAISE by default **kwargs: Additional keyword arguments that are passed to the `DataframeAdapter` function. Only used if `data_like` is a path to a tabular data file. Returns ------- pd.DataFrame Tabular data. If no data is found and handle_nodata is set to IGNORE None will be returned. if it is set to RAISE and exception will be raised in that situation """ if isinstance(data_like, dict): data_like, provider, version = _parse_data_like_dict( data_like, provider, version ) if isinstance(data_like, (str, Path)): if isinstance(data_like, str) and data_like in self.sources: name = data_like source = self.get_source(name, provider=provider, version=version) else: if "provider" not in kwargs: kwargs.update({"provider": "user"}) source = DataFrameAdapter(path=data_like, **kwargs) name = basename(data_like) self.add_source(name, source) elif isinstance(data_like, pd.DataFrame): df = DataFrameAdapter._slice_data( data_like, variables, time_tuple, logger=self.logger ) if df is None: _exec_nodata_strat( "No data was left after slicing.", strategy=handle_nodata, logger=logger, ) return df else: raise ValueError(f'Unknown tabular data type "{type(data_like).__name__}"') obj = source.get_data( variables=variables, time_tuple=time_tuple, handle_nodata=handle_nodata, logger=self.logger, ) return obj
def _parse_data_like_dict( data_like: SourceSpecDict, provider: Optional[str] = None, version: Optional[str] = None, ): if not SourceSpecDict.__required_keys__.issuperset(set(data_like.keys())): unknown_keys = set(data_like.keys()) - SourceSpecDict.__required_keys__ raise ValueError(f"Unknown keys in requested data source: {unknown_keys}") elif "source" not in data_like: raise ValueError("No source key found in requested data source") else: source = data_like.get("source") provider = data_like.get("provider", provider) version = data_like.get("version", version) return source, provider, version def _parse_data_source_dict( name: str, data_source_dict: Dict, catalog_name: str = "", root: Optional[Union[Path, str]] = None, category: Optional[str] = None, ) -> Dict: """Parse data source dictionary.""" # link yml keys to adapter classes ADAPTERS = { "RasterDataset": RasterDatasetAdapter, "GeoDataFrame": GeoDataFrameAdapter, "GeoDataset": GeoDatasetAdapter, "DataFrame": DataFrameAdapter, "Dataset": DatasetAdapter, } # parse data source = data_source_dict.copy() # important as we modify with pop # parse path if "path" not in source: raise ValueError(f"{name}: Missing required path argument.") # if remote path, keep as is else call abs_path method to solve local files path = source.pop("path") if not _uri_validator(str(path)): path = abs_path(root, path) # parse data type > adapter data_type = source.pop("data_type", None) if data_type is None: raise ValueError(f"{name}: Data type missing.") elif data_type not in ADAPTERS: raise ValueError(f"{name}: Data type {data_type} unknown") adapter = ADAPTERS.get(data_type) # source meta data meta = source.pop("meta", {}) if "category" not in meta and category is not None: meta.update(category=category) # driver arguments driver_kwargs = source.pop("driver_kwargs", source.pop("kwargs", {})) for driver_kwarg in driver_kwargs: # required for geodataset where driver_kwargs can be a path if "fn" in driver_kwarg: driver_kwargs.update( {driver_kwarg: abs_path(root, driver_kwargs[driver_kwarg])} ) return adapter( path=path, name=name, catalog_name=catalog_name, meta=meta, driver_kwargs=driver_kwargs, **source, ) def _yml_from_uri_or_path(uri_or_path: Union[Path, str]) -> Dict: if _uri_validator(str(uri_or_path)): with requests.get(uri_or_path, stream=True) as r: r.raise_for_status() yml = yaml.load(r.text, Loader=yaml.FullLoader) else: with open(uri_or_path, "r") as stream: yml = yaml.load(stream, Loader=yaml.FullLoader) return yml def _process_dict(d: Dict, logger=logger) -> Dict: """Recursively change dict values to keep only python literal structures.""" for k, v in d.items(): _check_key = isinstance(k, str) if _check_key and isinstance(v, dict): d[k] = _process_dict(v) elif _check_key and isinstance(v, Path): d[k] = str(v) # path to string return d def _denormalise_data_dict(data_dict) -> List[Tuple[str, Dict]]: """Return a flat list of with data name, dictionary of input data_dict. Expand possible versions, aliases and variants in data_dict. """ data_list = [] for name, source in data_dict.items(): source = copy.deepcopy(source) data_dicts = [] if "alias" in source: alias = source.pop("alias") warnings.warn( "The use of alias is deprecated, please add a version on the aliased" "catalog instead.", DeprecationWarning, stacklevel=2, ) if alias not in data_dict: raise ValueError(f"alias {alias} not found in data_dict.") # use alias source but overwrite any attributes with original source source_copy = data_dict[alias].copy() source_copy.update(source) data_dicts.append({name: source_copy}) elif "variants" in source: variants = source.pop("variants") for diff in variants: source_copy = copy.deepcopy(source) source_copy.update(**diff) data_dicts.append({name: source_copy}) elif "placeholders" in source: options = source.pop("placeholders") for combination in itertools.product(*options.values()): source_copy = copy.deepcopy(source) name_copy = name for k, v in zip(options.keys(), combination): name_copy = name_copy.replace("{" + k + "}", v) source_copy["path"] = source_copy["path"].replace("{" + k + "}", v) data_dicts.append({name_copy: source_copy}) else: data_list.append((name, source)) continue # recursively denormalise in case of multiple denormalise keys in source for item in data_dicts: data_list.extend(_denormalise_data_dict(item)) return data_list def abs_path(root: Union[Path, str], rel_path: Union[Path, str]) -> str: path = Path(str(rel_path)) if not path.is_absolute(): if root is not None: rel_path = join(root, rel_path) path = Path(abspath(rel_path)) return str(path)