"""DataCatalog module for HydroMT."""
from __future__ import annotations
import copy
import inspect
import itertools
import logging
import os
from datetime import datetime
from os.path import abspath, basename, dirname, exists, isfile, join, splitext
from pathlib import Path
from typing import (
Any,
Dict,
Iterator,
List,
Optional,
Set,
Tuple,
Union,
cast,
)
import dateutil.parser
import geopandas as gpd
import numpy as np
import pandas as pd
import pooch
import xarray as xr
import yaml
from packaging.specifiers import SpecifierSet
from packaging.version import Version
from pystac import Catalog as StacCatalog
from pystac import CatalogType, MediaType
from hydromt import __version__
from hydromt._io.readers import _yml_from_uri_or_path
from hydromt._typing import Bbox, SourceSpecDict, StrPath, TimeRange
from hydromt._typing.error import NoDataException, NoDataStrategy, exec_nodata_strat
from hydromt._utils import (
_deep_merge,
_partition_dictionaries,
_single_var_as_array,
)
from hydromt.config import SETTINGS
from hydromt.data_catalog.adapters import (
DataFrameAdapter,
DatasetAdapter,
GeoDataFrameAdapter,
GeoDatasetAdapter,
RasterDatasetAdapter,
)
from hydromt.data_catalog.predefined_catalog import (
PredefinedCatalog,
_copy_file,
)
from hydromt.data_catalog.sources import (
DataFrameSource,
DatasetSource,
DataSource,
GeoDataFrameSource,
GeoDatasetSource,
RasterDatasetSource,
create_source,
)
from hydromt.gis._gis_utils import _parse_geom_bbox_buffer
from hydromt.plugins import PLUGINS
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Public API of this module: only the DataCatalog class is exported.
__all__ = ["DataCatalog"]
# Message for the case that slicing leaves no data.
# NOTE(review): not referenced in the visible part of this file; presumably used
# by the data getters further down - confirm.
_NO_DATA_AFTER_SLICE_MSG = "No data was left after slicing."
[docs]
class DataCatalog(object):
"""Base class for the data catalog object."""
_format_version = "v1" # format version of the data catalog
_cache_dir = SETTINGS.cache_root
[docs]
def __init__(
self,
data_libs: Optional[Union[List, str]] = None,
fallback_lib: Optional[str] = "artifact_data",
cache: Optional[bool] = False,
cache_dir: Optional[str] = None,
) -> None:
"""Catalog of sources.
Helps to easily read from different files and keep track of
files which have been accessed.
Arguments
---------
data_libs: List[str], str, Path, optional
One or more paths to data catalog configuration files or names of predefined
data catalogs. By default the data catalog is initiated without data
entries. See :py:func:`~hydromt.data_catalog.DataCatalog.from_yml` for
accepted yaml format.
fallback_lib:
Name of pre-defined data catalog to read if no data_libs are provided,
by default 'artifact_data'.
If None, no default data catalog is used.
cache: bool, optional
Set to true to cache data locally before reading.
Currently only implemented for tiled rasterdatasets, by default False.
cache_dir: str, Path, optional
Folder root path to cach data to, by default ~/.hydromt_data
logger : logger object, optional
The logger object used for logging messages. If not provided, the default
logger will be used.
"""
if data_libs is None:
data_libs = []
elif not isinstance(data_libs, list): # make sure data_libs is a list
data_libs = np.atleast_1d(data_libs).tolist()
data_libs = cast(list, data_libs)
self._sources: Dict[str, DataSource] = {}
self._catalogs: Dict[str, PredefinedCatalog] = {}
self.root: Optional[StrPath] = None
self._fallback_lib = fallback_lib
# caching
self.cache = bool(cache)
if cache_dir is not None:
self._cache_dir = Path(cache_dir)
for name_or_path in data_libs:
if str(name_or_path).split(".")[-1] in ["yml", "yaml"]: # user defined
self.from_yml(name_or_path)
else: # predefined
self.from_predefined_catalogs(name_or_path)
@property
def sources(self) -> Dict[str, DataSource]:
"""Returns dictionary of DataSources."""
if len(self._sources) == 0 and self._fallback_lib is not None:
# read artifacts by default if no catalogs are provided
self.from_predefined_catalogs(self._fallback_lib)
return self._sources
def get_source_names(self) -> List[str]:
"""Return a list of all available data source names."""
return list(self._sources.keys())
def to_stac_catalog(
    self,
    root: Union[str, Path],
    source_names: Optional[List[str]] = None,
    meta: Optional[Dict[str, Any]] = None,
    catalog_name: str = "hydromt-stac-catalog",
    description: str = "The stac catalog of hydromt",
    used_only: bool = False,
    catalog_type: CatalogType = CatalogType.RELATIVE_PUBLISHED,
    handle_nodata: NoDataStrategy = NoDataStrategy.IGNORE,
):
    """Write the data catalog to disk in STAC format.

    Parameters
    ----------
    root: str, Path
        Output location handed to ``normalize_and_save`` of the STAC catalog.
    source_names: list, optional
        Names of the sources to export. By default (None or empty) all sources
        are exported. The selection is applied on top of ``used_only``.
    meta: dict, optional
        Accepted for signature compatibility with the other exporters;
        currently not used by this method.
    catalog_name: str, optional
        Identifier of the STAC catalog that is created.
    description: str, optional
        Description of the STAC catalog that is created.
    used_only: bool, optional
        If True, export only data entries marked as used, by default False.
    catalog_type: CatalogType, optional
        STAC catalog type passed to ``normalize_and_save``.
    handle_nodata: NoDataStrategy, optional
        Passed to ``to_stac_catalog`` of every source.

    Returns
    -------
    StacCatalog
        The catalog that was written to ``root``.
    """
    # an empty selection means "no filtering", which keeps the former behaviour
    selected = set(source_names) if source_names else None
    stac_catalog = StacCatalog(id=catalog_name, description=description)
    for name, source in self.list_sources(used_only):
        if selected is not None and name not in selected:
            continue
        stac_child_catalog = source.to_stac_catalog(handle_nodata)
        # sources may return nothing; those are left out
        if stac_child_catalog:
            stac_catalog.add_child(stac_child_catalog)
    stac_catalog.normalize_and_save(root, catalog_type=catalog_type)
    return stac_catalog
def from_stac_catalog(
    self,
    stac_like: Union[str, Path, StacCatalog, Dict[Any, Any]],
):
    """Add the assets of a STAC catalog to this data catalog as data sources.

    Parameters
    ----------
    stac_like: str, Path, StacCatalog, dict
        A path to a STAC catalog file, a dictionary representation of a STAC
        catalog or a STAC catalog object.

    Returns
    -------
    DataCatalog
        This data catalog, with one source added per supported asset. Assets
        with a media type that is not handled below are skipped.

    Raises
    ------
    ValueError
        If ``stac_like`` is of an unsupported type.
    """
    if isinstance(stac_like, (str, Path)):
        stac_catalog = StacCatalog.from_file(stac_like)
    elif isinstance(stac_like, dict):
        stac_catalog = StacCatalog.from_dict(stac_like)
    elif isinstance(stac_like, StacCatalog):
        stac_catalog = stac_like
    else:
        raise ValueError(
            f"Unsupported type for stac_like: {type(stac_like).__name__}"
        )

    raster_media = [MediaType.HDF, MediaType.HDF5, MediaType.COG, MediaType.TIFF]
    vector_media = [MediaType.GEOPACKAGE, MediaType.FLATGEOBUF]
    for item in stac_catalog.get_items(recursive=True):
        source_name = item.id
        for asset in item.get_assets().values():
            media_type = asset.media_type
            # pick the source class that matches the media type of the asset
            if media_type in raster_media:
                source_cls = RasterDatasetSource
            elif media_type in vector_media:
                source_cls = GeoDataFrameSource
            elif media_type == MediaType.GEOJSON:
                source_cls = GeoDatasetSource
            elif media_type == MediaType.JSON:
                source_cls = DataFrameSource
            else:
                continue
            new_source = source_cls(
                name=source_name,
                uri=asset.get_absolute_href(),
                driver=source_cls._fallback_driver_read,
            )
            self.add_source(source_name, new_source)
    return self
@property
def predefined_catalogs(self) -> Dict:
"""Return all predefined catalogs."""
if not self._catalogs:
self._set_predefined_catalogs()
return self._catalogs
[docs]
def get_source_bbox(
self,
source: str,
provider: Optional[str] = None,
version: Optional[str] = None,
detect: bool = True,
strict: bool = False,
) -> Optional[Tuple[Bbox, int]]:
"""Retrieve the bounding box and crs of the source.
Parameters
----------
source: str,
the name of the data source.
provider: Optional[str]
the provider of the source to detect the bbox of, if None, the last one
added will be used.
version: Optional[str]
the version of the source to detect the bbox of, if None, the last one
added will be used.
detect: bool
Whether to detect the bbox of the source if it is not set.
strict: bool
Raise an error if the adapter does not support bbox detection (such as
dataframes). In that case, a warning will be logged instead.
Returns
-------
bbox: Tuple[np.float64,np.float64,np.float64,np.float64]
the bounding box coordinates of the data. coordinates are returned as
[xmin,ymin,xmax,ymax]
crs: int
The ESPG code of the CRS of the coordinates returned in bbox
"""
s: DataSource = self.get_source(source, provider, version)
try:
return s.get_bbox(detect=detect) # type: ignore
except TypeError as e:
if strict:
raise e
else:
logger.warning(
f"Source of type {type(s)} does not support detecting spatial"
"extents. skipping..."
)
[docs]
def get_source_time_range(
self,
source: str,
provider: Optional[str] = None,
version: Optional[str] = None,
detect: bool = True,
strict: bool = False,
) -> Optional[Tuple[datetime, datetime]]:
"""Detect the temporal range of the dataset.
Parameters
----------
source: str,
the name of the data source.
provider: Optional[str]
the provider of the source to detect the time range of, if None,
the last one added will be used.
version: Optional[str]
the version of the source to detect the time range of, if None, the last one
added will be used.
detect: bool
Whether to detect the time range of the source if it is not set.
strict: bool
Raise an error if the adapter does not support time range detection (such as
dataframes). In that case, a warning will be logged instead.
Returns
-------
range: Tuple[np.datetime64, np.datetime64]
A tuple containing the start and end of the time dimension. Range is
inclusive on both sides.
"""
s = self.get_source(source, provider, version)
try:
return s.get_time_range(detect=detect) # type: ignore
except TypeError as e:
if strict:
raise e
else:
logger.warning(
f"Source of type {type(s)} does not support detecting"
" temporalextents. skipping..."
)
[docs]
def get_source(
self,
source: str,
provider: Optional[str] = None,
version: Optional[str] = None,
) -> DataSource:
"""Return a data source.
Parameters
----------
source : str
Name of the data source.
provider : str, optional
Name of the data provider, by default None.
By default the last added provider is returned.
version : str, optional
Version of the data source, by default None.
By default the newest version of the requested provider is returned.
Returns
-------
DataSource
DataSource object.
"""
source = str(source)
if source not in self._sources:
available_sources = sorted(list(self._sources.keys()))
raise KeyError(
f"Requested unknown data source '{source}' "
f"available sources are: {available_sources}"
)
available_providers = self._sources[source]
# make sure all arguments are strings
provider = str(provider) if provider is not None else provider
version = str(version) if version is not None else version
# find provider matching requested version
if provider is None and version is not None:
providers = [p for p, v in available_providers.items() if version in v]
if len(providers) > 0: # error raised later if no provider found
provider = providers[-1]
# check if provider is available, otherwise use last added provider
if provider is None:
requested_provider = list(available_providers.keys())[-1]
else:
requested_provider = provider
if requested_provider not in available_providers:
providers = sorted(list(available_providers.keys()))
raise KeyError(
f"Requested unknown provider '{requested_provider}' for "
f"data source '{source}' available providers are: {providers}"
)
available_versions = available_providers[requested_provider]
# check if version is available, otherwise use last added version which is
# always the newest version
if version is None:
requested_version = list(available_versions.keys())[-1]
else:
requested_version = version
if requested_version not in available_versions:
versions = sorted(list(map(str, available_versions.keys())))
raise KeyError(
f"Requested unknown version '{requested_version}' for "
f"data source '{source}' and provider '{requested_provider}' "
f"available versions are {versions}"
)
return self._sources[source][requested_provider][requested_version]
[docs]
def add_source(self, name: str, source: DataSource) -> None:
    """Add a new data source to the data catalog.

    The data version and provider are extracted from the DataSource object.
    Sources without a version are stored under the key ``"_UNSPECIFIED_"``;
    sources without a provider are stored under the (first) protocol of the
    filesystem of their driver.

    Parameters
    ----------
    name: str
        Name of the data source.
    source: DataSource
        DataSource object.

    Raises
    ------
    ValueError
        If ``source`` is not a DataSource, or if a source with the same name but
        a different data type is already registered.
    """
    if not isinstance(source, DataSource):
        raise ValueError("Value must be DataSource")

    unspecified = "_UNSPECIFIED_"
    version = str(source.version) if source.version else unspecified

    if source.provider:
        provider = str(source.provider)
    else:
        protocol = source.driver.filesystem.protocol
        provider = protocol if isinstance(protocol, str) else protocol[0]

    if name not in self._sources:
        self._sources[name] = {}
    else:  # check if data type is the same as source with same name
        source0 = next(iter(next(iter(self._sources[name].values())).values()))
        if source0.data_type != source.data_type:
            raise ValueError(
                f"Data source '{name}' already exists with data type "
                f"'{source0.data_type}' but new data source has data type "
                f"'{source.data_type}'."
            )

    versions = dict(self._sources[name].get(provider, {}))
    if version in versions:
        logger.warning(
            f"overwriting data source '{name}' with "
            f"provider {provider} and version {version}.",
        )
    versions[version] = source

    # Sort so that the newest version is last. The unspecified version must
    # always come first; a plain string sort would put "_" after digits and
    # upper-case letters, so it is handled explicitly in the sort key.
    ordered_keys = sorted(versions, key=lambda key: (key != unspecified, key))
    self._sources[name][provider] = {key: versions[key] for key in ordered_keys}
def list_sources(self, used_only=False) -> List[Tuple[str, DataSource]]:
"""Return a flat list of all available data sources.
Parameters
----------
used_only: bool, optional
If True, return only data entries marked as used, by default False.
"""
sources = []
for source_name, available_providers in self._sources.items():
for _, available_versions in available_providers.items():
for _, source in available_versions.items():
if used_only and not source._used:
continue
sources.append((source_name, source))
return sources
def __iter__(self) -> Iterator[Tuple[str, DataSource]]:
"""Iterate over sources."""
return iter(self.list_sources())
def contains_source(
self,
source: str,
provider: Optional[str] = None,
version: Optional[str] = None,
permissive: bool = True,
) -> bool:
"""
Check if source is in catalog.
Parameters
----------
source : str
Name of the data source.
provider : str, optional
Name of the data provider, by default None.
By default the last added provider is returned.
version : str, optional
Version of the data source, by default None.
By default the newest version of the requested provider is returned.
permissive : bool, optional
Whether variant checking is necessary. If true, the name of the source
only is checked, if false, and at least one of version or provider is
not None, this will only return True if that variant specifically is
available.
Returns
-------
bool
whether the source (with specified variants if necessary) is available
"""
if permissive or (version is None and provider is None):
return source in self._sources
else:
if version:
if version not in self._sources[source]:
return False
else:
selected_version = version
else:
selected_version = next(iter(self._sources[source].keys()))
return provider not in self._sources[source][selected_version].keys()
def __len__(self):
"""Return number of sources."""
return len(self.list_sources())
def __repr__(self):
"""Prettyprint the sources."""
return self._to_dataframe().__repr__()
def __eq__(self, other) -> bool:
if type(other) is type(self):
if len(self) != len(other):
return False
for name, source in self.list_sources():
try:
other_source = other.get_source(
name, provider=source.provider, version=str(source.version)
)
except KeyError:
return False
if source != other_source:
return False
else:
return False
return True
def _repr_html_(self):
return self._to_dataframe()._repr_html_()
[docs]
def update(self, **kwargs) -> None:
"""Add data sources to library or update them."""
for k, v in kwargs.items():
self.add_source(k, v)
def update_sources(self, **kwargs) -> None:
"""Add data sources to library or update them."""
self.update(**kwargs)
def _set_predefined_catalogs(self) -> Dict:
    """Instantiate every catalog plugin and store it in ``self._catalogs``.

    Each plugin class is created with this catalog's format version and cache
    directory. The populated ``_catalogs`` dictionary is returned.
    """
    for plugin_name, catalog_cls in PLUGINS.catalog_plugins.items():
        predefined = catalog_cls(
            format_version=self._format_version, cache_dir=self._cache_dir
        )
        self._catalogs[plugin_name] = predefined
    return self._catalogs
[docs]
def from_predefined_catalogs(self, name: str, version: str = "latest") -> None:
"""Add data sources from a predefined data catalog.
Parameters
----------
name : str
Catalog name.
version : str, optional
Catlog release version. By default it takes the latest known release.
"""
if "=" in name:
name, version = name.split("=")
if name not in self.predefined_catalogs:
raise ValueError(
f'Catalog with name "{name}" not found in predefined catalogs'
)
# cache and get path to data_datalog.yml file of the <name> catalog with <version>
catalog_path = self.predefined_catalogs[name].get_catalog_file(version)
# read catalog
logger.info(f"Reading data catalog {name} {version}")
self.from_yml(catalog_path, catalog_name=name, catalog_version=version)
def _cache_archive(
    self,
    archive_uri: str,
    name: str,
    version: str = "latest",
    sha256: Optional[str] = None,
) -> Path:
    """Cache a data archive to the cache directory.

    The archive is retrieved with pooch and unpacked to
    <cache_dir>/<name>/<version>.

    Parameters
    ----------
    archive_uri : str
        uri to data archive. ``.zip`` archives are unzipped and ``.gz``
        archives are untarred; if the uri points to an existing local file it
        is copied instead of downloaded.
    name : str
        Name of data catalog
    version : str, optional
        Version of data archive, by default 'latest'.
    sha256 : str, optional
        SHA256 hash of the archive, by default None.

    Returns
    -------
    Path
        Folder <cache_dir>/<name>/<version> that holds the cached archive.
    """
    root = Path(self._cache_dir, name, version)
    archive = Path(archive_uri)

    # retrieve and unpack archive
    kwargs: Dict[str, Any] = {}
    if archive.suffix == ".zip":
        kwargs["processor"] = pooch.Unzip(extract_dir=root)
    elif archive.suffix == ".gz":
        kwargs["processor"] = pooch.Untar(extract_dir=root)
    if archive.exists():  # the archive is a local file: copy, do not download
        kwargs["downloader"] = _copy_file
    pooch.retrieve(
        archive_uri,
        known_hash=sha256,
        path=str(root),
        fname=archive.name,
        **kwargs,
    )
    return root
[docs]
def from_yml(
    self,
    urlpath: Union[Path, str],
    root: Optional[StrPath] = None,
    catalog_name: Optional[str] = None,
    catalog_version: Optional[str] = None,
    mark_used: bool = False,
) -> DataCatalog:
    """Read a yaml data catalog and add its data sources.

    Parameters
    ----------
    urlpath: str, Path
        Path or url to data source yaml files.
    root: str, Path, optional
        Global root for all relative paths in yaml file(s). If not given, the
        ``root`` or ``roots`` entry of the meta section is used, and otherwise
        the folder of ``urlpath``.
    catalog_name: str, optional
        Name of the catalog. Defaults to the ``name`` entry of the meta section,
        or else to the file name of ``urlpath`` without its last extension.
    catalog_version: str, optional
        Version of the catalog. Defaults to the ``version`` entry of the meta
        section, or else to "latest".
    mark_used: bool
        If True, the sources that are added are marked as used.

    Examples
    --------
    A yaml data entry is provided below, where all the text between <>
    should be filled by the user. Multiple data sources of the same
    data type should be grouped. Currently the following data types are supported:
    {'RasterDataset', 'GeoDataset', 'GeoDataFrame', 'DataFrame', 'Dataset'}. See the
    specific data adapters
    for more information about the required and optional arguments.

    .. code-block:: yaml

        meta:
          root: <path>
          category: <category>
          version: <version>
          name: <name>
          sha256: <sha256> # only if the root is an archive
        <name>:
          uri: <uri>
          data_type: <data_type>
          driver: <driver>
          data_adapter: <data_adapter>
          uri_resolver: <uri_resolver>
          metadata:
            source_url: <source_url>
            source_version: <source_version>
            source_licence: <source_licence>
            paper_ref: <paper_ref>
            paper_doi: <paper_doi>
          placeholders:
            <placeholder_name_1>: <list of names>
            <placeholder_name_2>: <list of names>

    Returns
    -------
    DataCatalog
        DataCatalog object with parsed yaml file added.
    """
    logger.info(f"Parsing data catalog from {urlpath}")
    yml = _yml_from_uri_or_path(urlpath)

    # The meta section is split off here, so from_dict receives sources only.
    # NOTE(review): from_dict's own meta handling (e.g. the hydromt_version
    # check) therefore sees no meta for yaml catalogs - confirm this is intended.
    meta = yml.pop("meta", {})

    if catalog_name is None:
        name_from_file = "".join(basename(urlpath).split(".")[:-1])
        catalog_name = cast(str, meta.get("name", name_from_file))

    version = (
        catalog_version
        if catalog_version is not None
        else meta.pop("version", "latest")
    )

    # precedence for the root: argument, meta root, meta roots, yaml folder
    if root is not None:
        self.root = root
    elif "root" in meta:
        root = meta.pop("root")
    elif "roots" in meta:
        root = self._determine_catalog_root(meta)
    else:
        root = dirname(Path(urlpath))

    self.from_dict(
        yml,
        catalog_name=catalog_name,
        catalog_version=version,
        root=root,
        category=meta.get("category"),
        mark_used=mark_used,
    )
    return self
def _is_compatible(
    self, hydromt_version: str, requested_range: str, allow_prerelease=True
) -> bool:
    """Check whether ``hydromt_version`` satisfies the version specifier(s).

    Parameters
    ----------
    hydromt_version : str
        Version string to test.
    requested_range : str
        Version specifier set, e.g. ">=1.0"; None means no restriction.
    allow_prerelease : bool, optional
        If True, the version is also accepted when only its base version
        (``Version.base_version``) falls inside the requested range.

    Returns
    -------
    bool
        True if the version is compatible with the requested range.
    """
    if requested_range is None:
        return True
    specifiers = SpecifierSet(requested_range)
    current = Version(hydromt_version)
    if current in specifiers:
        return True
    if not allow_prerelease:
        return False
    return Version(current.base_version) in specifiers
def _determine_catalog_root(self, meta: Dict[str, Any]) -> Path:
"""Determine which of the roots provided in meta exists and should be used."""
root = None
to_check = meta.get("roots", [])
if "root" in meta:
to_check.append(meta["root"])
for r in to_check:
if exists(r):
root = r
break
if root is None:
raise ValueError("None of the specified roots were found")
else:
return Path(root)
[docs]
def from_dict(
    self,
    data_dict: Dict[str, Any],
    catalog_name: str = "",
    catalog_version: Optional[str] = None,
    root: Optional[StrPath] = None,
    category: Optional[str] = None,
    mark_used: bool = False,
) -> DataCatalog:
    """Add data sources based on dictionary.

    Parameters
    ----------
    data_dict: dict
        Dictionary of data_sources, optionally with a ``meta`` entry. The
        top level of the dictionary and its meta section are copied, so the
        caller's objects are not changed by the meta handling in this method.
    catalog_name: str, optional
        Name of data catalog
    catalog_version: str, optional
        Version of the data catalog. Defaults to the ``version`` entry of the
        meta section.
    root: str, Path, optional
        Global root for all relative paths in `data_dict`. If not given, the
        ``root`` or ``roots`` entry of the meta section is used. If the root
        is a ``.gz`` or ``.zip`` archive it is unpacked to the cache directory
        and a copy of the catalog is written next to the unpacked data.
    category: str, optional
        Global category for all sources in `data_dict`.
    mark_used: bool
        If True, the sources that are added are marked as used.

    Returns
    -------
    DataCatalog
        This data catalog with the sources of ``data_dict`` added.

    Raises
    ------
    RuntimeError
        If the meta section requests a hydromt version range that the running
        hydromt version does not satisfy.

    Examples
    --------
    A data dictionary with two entries is provided below, where all the text between
    <> should be filled by the user. See the specific data adapters
    for more information about the required and optional arguments.

    .. code-block:: text

        {
            <name1>: {
                "uri": <uri>,
                "data_type": <data_type>,
                "driver": <driver>,
                "data_adapter": <data_adapter>,
                "uri_resolver": <uri_resolver>,
                "metadata": {...},
                "placeholders": {<placeholder_name_1>: <list of names>},
            }
            <name2>: {
                ...
            }
        }

    """
    # shallow copies: popping entries below must not alter the caller's dicts
    data_dict = dict(data_dict)
    meta = dict(data_dict.pop("meta", None) or {})

    # check the hydromt version required by the catalog
    requested_version = meta.get("hydromt_version", None)
    if requested_version is not None:
        allow_dev = meta.get("allow_dev_version", True)
        if not self._is_compatible(__version__, requested_version, allow_dev):
            raise RuntimeError(
                f"Data catalog requires Hydromt Version {requested_version} which "
                f"is incompattible with current hydromt version {__version__}."
            )

    if category is None and "category" in meta:
        category = meta.pop("category")

    if catalog_version is not None:
        version = catalog_version
    else:
        version = meta.get("version", None)

    # precedence for the root: argument, meta root, meta roots, fallback
    if root is not None:
        self.root = root
    elif "root" in meta:
        self.root = meta.pop("root")
    elif "roots" in meta:
        self.root = self._determine_catalog_root(meta)
    else:
        self.root = dirname(Path("."))

    if self.root is not None and splitext(self.root)[-1] in [".gz", ".zip"]:
        # If root is an archive, unpack it at the cache dir. The cache folder
        # is built from the version, so an unknown version falls back to
        # "latest" (the default of _cache_archive) instead of passing None.
        archive_version = "latest" if version is None else str(version)
        self.root = self._cache_archive(
            self.root, name=catalog_name, version=archive_version
        )
        # save catalog to cache
        with open(join(self.root, f"{catalog_name}.yml"), "w") as f:
            d = {"meta": {k: v for k, v in meta.items() if k != "roots"}}
            d.update(data_dict)
            yaml.dump(d, f, default_flow_style=False, sort_keys=False)

    for name, source_dict in _denormalise_data_dict(data_dict):
        source = _parse_data_source_dict(
            name,
            source_dict,
            root=self.root,
            category=category,
        )
        if mark_used:
            source._mark_as_used()
        self.add_source(name, source)

    return self
[docs]
def to_yml(
    self,
    path: Union[str, Path],
    root: str = "auto",
    source_names: Optional[List[str]] = None,
    used_only: bool = False,
    meta: Optional[Dict[str, Any]] = None,
) -> None:
    """Write data catalog to yaml format.

    Parameters
    ----------
    path: str, Path
        yaml output path.
    root: str, Path, optional
        Global root for all relative paths in yaml file.
        If "auto" (default) the folder of the yaml output ``path`` is used.
        A root equal to that folder is not written to the meta section.
    source_names: list, optional
        List of source names to export, by default None in which case all sources
        are exported.
    used_only: bool, optional
        If True, export only data entries marked as used, by default False.
    meta: dict, optional
        key-value pairs to add to the data catalog meta section, such as 'version',
        by default empty. The dictionary that is passed in is not modified.
    """
    yml_dir = os.path.dirname(abspath(path))
    if root == "auto":
        root = yml_dir

    data_dict = self.to_dict(
        root=root,
        source_names=source_names,
        meta=dict(meta or {}),  # copy: to_dict adds the root to its meta
        used_only=used_only,
    )

    if str(root) == yml_dir:
        # The root is stored in the meta section, not at the top level of the
        # dictionary. Drop it if it equals the yml_dir, and drop the meta
        # section altogether when nothing else is left in it.
        catalog_meta = data_dict.get("meta", {})
        catalog_meta.pop("root", None)
        if not catalog_meta:
            data_dict.pop("meta", None)

    if data_dict:
        with open(path, "w") as f:
            yaml.dump(data_dict, f, default_flow_style=False, sort_keys=False)
    else:
        logger.info("The data catalog is empty, no yml file is written.")
[docs]
def to_dict(
    self,
    source_names: Optional[List[str]] = None,
    root: Optional[StrPath] = None,
    meta: Optional[Dict[str, Any]] = None,
    used_only: bool = False,
) -> Dict[str, Any]:
    """Export the data catalog to a dictionary.

    Sources that are registered more than once under the same name (different
    provider and/or version) are merged into one entry that holds the shared
    keys plus a ``variants`` list with the differences.

    Parameters
    ----------
    source_names : list, optional
        List of source names to export, by default None in which case all sources
        are exported.
    root : str, Path, optional
        Global root for all relative paths in the file. Defaults to the root of
        this catalog; if neither is known no root is written.
    meta: dict, optional
        key-value pairs to add to the data catalog meta section, such as 'version',
        by default empty. The dictionary that is passed in is not modified.
    used_only: bool, optional
        If True, export only data entries marked as used, by default False.

    Returns
    -------
    dict
        data catalog dictionary
    """
    meta = dict(meta or {})  # copy so the caller's dictionary stays untouched
    sources_out: Dict[str, Any] = dict()

    if root is None and self.root is not None:
        root = str(self.root)
    if root is not None:  # never write a literal "None" as root
        meta["root"] = root

    # alphabetical order; variants of one source end up next to each other
    sorted_sources = sorted(
        self.list_sources(used_only=used_only), key=lambda item: item[0]
    )
    for name, source in sorted_sources:
        if source_names is not None and name not in source_names:
            continue
        source_dict = source.model_dump(
            exclude_defaults=True,  # keeps catalog as clean as possible
            exclude={"name"},  # name is already in the key
            round_trip=True,
        )
        # remove non serializable entries to prevent errors
        source_dict = _process_dict(source_dict)
        if root is not None:
            source_dict["root"] = root

        if name not in sources_out:
            sources_out[name] = source_dict
            continue

        existing = sources_out.pop(name)
        if existing == source_dict:
            sources_out[name] = source_dict
        elif "variants" in existing:
            # third or later variant: append its differences to the entry
            variants = existing.pop("variants")
            _, variant, _ = _partition_dictionaries(source_dict, existing)
            variants.append(variant)
            existing["variants"] = variants
            sources_out[name] = existing
        else:
            # second variant: split into shared keys and per-variant keys
            base, diff_existing, diff_new = _partition_dictionaries(
                source_dict, existing
            )
            # provider and version should always be in variants list
            provider = base.pop("provider", None)
            if provider is not None:
                diff_existing["provider"] = provider
                diff_new["provider"] = provider
            version = base.pop("version", None)
            if version is not None:
                diff_existing["version"] = version
                diff_new["version"] = version
            base["variants"] = [diff_new, diff_existing]
            sources_out[name] = base

    if meta:
        sources_out = {"meta": meta, **sources_out}
    return sources_out
def _to_dataframe(self, source_names: Optional[List] = None) -> pd.DataFrame:
"""Return data catalog summary as DataFrame."""
source_names = source_names or []
d = []
for name, source in self.list_sources():
if len(source_names) > 0 and name not in source_names:
continue
d.append(
{
"name": name,
"provider": source.provider,
"version": source.version,
**source.summary(),
}
)
if d:
return pd.DataFrame.from_records(d).set_index("name")
else:
return pd.DataFrame()
[docs]
def export_data(
    self,
    new_root: Union[Path, str],
    bbox: Optional[Bbox] = None,
    time_range: Optional[TimeRange] = None,
    source_names: Optional[List[str]] = None,
    unit_conversion: bool = True,
    metadata: Optional[Dict[str, Any]] = None,
    force_overwrite: bool = False,
    append: bool = False,
    handle_nodata: NoDataStrategy = NoDataStrategy.IGNORE,
) -> None:
    """Export a data slice of each dataset and a data_catalog.yml file to disk.

    Parameters
    ----------
    new_root : str, Path
        Path to output folder; it is created (including parents) if needed.
    bbox : array-like of floats
        (xmin, ymin, xmax, ymax) bounding box of area of interest. Only applied
        to sources whose ``to_file`` accepts a ``bbox`` keyword.
    time_range: tuple of str, datetime, optional
        Start and end date of period of interest. By default the entire time period
        of the dataset is returned.
    source_names: list, optional
        List of source names to export, by default None in which case all sources
        are exported. Specific variables can be selected by appending them to the
        source name in square brackets. For example, to export all variables of
        'source_name1' and only 'var1' and 'var2' of 'source_name2'
        use source_names=['source_name1', 'source_name2[var1,var2]'].
        Entries may also be dictionaries with a 'source' key and optional
        'provider' and 'version' keys.
    unit_conversion: boolean, optional
        If False skip unit conversion when parsing data from file, by default True.
    metadata: dict, optional
        key-value pairs to add to the data catalog meta section, such as 'version',
        by default empty.
    force_overwrite: bool
        override any existing files if True. False by default.
    append: bool, optional
        If True, append to existing data catalog, by default False.
    handle_nodata: NoDataStrategy, optional
        What to do when a source has no data for the requested slice,
        by default NoDataStrategy.IGNORE.
    """
    source_names = source_names or []
    metadata = metadata or {}

    # create new root
    new_root = Path(new_root).absolute()
    new_root.mkdir(parents=True, exist_ok=True)

    if time_range:
        time_range = _parse_time_range(time_range)

    # create copy of data with selected source names
    source_vars: Dict[str, List[str]] = {}
    sources: Dict[str, Dict[str, Dict[Any, DataSource]]]
    if len(source_names) > 0:
        sources = {}
        for spec in source_names:
            # support both strings and SourceSpecDicts here
            requested_provider = None
            requested_version = None
            if isinstance(spec, str):
                name = spec
            elif isinstance(spec, dict):
                name = spec["source"]
                requested_provider = spec.get("provider")
                requested_version = spec.get("version")
            else:
                raise RuntimeError(
                    f"unknown source type: {spec} of type {type(spec).__name__}"
                )
            # deduce variables from name
            if "[" in name:
                variables = name.split("[")[-1].split("]")[0].split(",")
                name = name.split("[")[0]
                source_vars[name] = variables

            selected = self.get_source(name, requested_provider, requested_version)
            variants = sources.setdefault(name, {})
            versions = variants.setdefault(selected.provider, {})
            versions[selected.version] = copy.deepcopy(selected)
    else:
        sources = copy.deepcopy(self.sources)

    # read existing data catalog if it exists
    path = join(new_root, "data_catalog.yml")
    if isfile(path) and append:
        logger.info(f"Appending existing data catalog {path}")
        sources_out = DataCatalog(path).sources
    else:
        sources_out = {}

    # export data and update sources
    for key, available_variants in sources.items():
        for provider, available_versions in available_variants.items():
            for version, source in available_versions.items():
                try:
                    # read slice of source and write to file
                    logger.debug(f"Exporting {key}.")
                    if not unit_conversion:
                        source.data_adapter.unit_mult = {}
                        source.data_adapter.unit_add = {}

                    # names of the keyword-only parameters of to_file
                    to_file_params = inspect.signature(source.to_file).parameters
                    kw_only_params: Set[str] = {
                        param_name
                        for param_name, param in to_file_params.items()
                        if param.kind is inspect.Parameter.KEYWORD_ONLY
                    }
                    # keep only the query arguments this source supports
                    candidate_kwargs: Dict[str, Any] = {
                        "variables": source_vars.get(key, None),
                        "bbox": bbox,
                        "time_range": time_range,
                    }
                    query_kwargs: Dict[str, Any] = {
                        k: v
                        for k, v in candidate_kwargs.items()
                        if k in kw_only_params
                    }

                    # Use a local name: assigning to ``bbox`` here would drop
                    # the requested bbox for all following sources as soon as
                    # one source does not support it.
                    source_bbox: Optional[Bbox] = query_kwargs.get("bbox")
                    if source_bbox is not None:
                        mask = _parse_geom_bbox_buffer(bbox=source_bbox)
                    else:
                        mask = None

                    # the output file name is derived with a mask, not a bbox
                    source_kwargs: Dict[str, Any] = copy.deepcopy(query_kwargs)
                    source_kwargs.pop("bbox", None)
                    source_kwargs["mask"] = mask
                    file_name: str = source._get_uri_basename(
                        handle_nodata, **source_kwargs
                    )

                    p = cast(Path, Path(new_root) / file_name)
                    if not force_overwrite and isfile(p):
                        logger.warning(
                            f"File {p} already exists and not in forced overwrite mode. skipping..."
                        )
                        continue

                    new_source: DataSource = source.to_file(
                        file_path=p,
                        handle_nodata=NoDataStrategy.RAISE,
                        **query_kwargs,
                    )
                except NoDataException as e:
                    exec_nodata_strat(
                        f"{key} file contains no data: {e}",
                        handle_nodata,
                    )
                    continue
                except FileNotFoundError:
                    # report the uri of the source that was being read; no new
                    # source exists when reading fails
                    logger.warning(f"{key} file not found at {source.uri}")
                    continue

                if key in sources_out:
                    logger.warning(
                        f"{key} already exists in data catalog, overwriting..."
                    )
                exported_variants = sources_out.setdefault(key, {})
                exported_versions = exported_variants.setdefault(provider, {})
                exported_versions[version] = new_source

    # write data catalog to yml
    data_catalog_out = DataCatalog()
    for key, available_variants in sources_out.items():
        for available_versions in available_variants.values():
            for exported_source in available_versions.values():
                data_catalog_out.add_source(key, exported_source)

    data_catalog_out.to_yml(path, root="auto", meta=metadata)
[docs]
def get_rasterdataset(
    self,
    data_like: Union[
        str, SourceSpecDict, Path, xr.Dataset, xr.DataArray, RasterDatasetSource
    ],
    bbox: Optional[Bbox] = None,
    geom: Optional[gpd.GeoDataFrame] = None,
    zoom: Optional[Union[int, tuple]] = None,
    buffer: Union[float, int] = 0,
    handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
    variables: Optional[Union[List, str]] = None,
    time_range: Optional[TimeRange] = None,
    single_var_as_array: Optional[bool] = True,
    provider: Optional[str] = None,
    version: Optional[str] = None,
    **kwargs,
) -> Optional[Union[xr.Dataset, xr.DataArray]]:
    """Return a clipped, sliced and unified RasterDataset.

    To clip the data to the area of interest, provide a `bbox` or `geom`,
    with optional additional `buffer` argument.
    To slice the data to the time period of interest, provide the `time_range`
    argument. To return only the dataset variables of interest provide the
    `variables` argument.

    NOTE: Unless `single_var_as_array` is set to False a single-variable data source
    will be returned as :py:class:`xarray.DataArray` rather than
    :py:class:`xarray.Dataset`.

    Parameters
    ----------
    data_like : Union[ str, SourceSpecDict, Path, xr.Dataset, xr.DataArray, RasterDatasetSource ]
        Data catalog key, path to RasterDataset file, a RasterDatasetSource object,
        or RasterDataset xarray object.
        The catalog key can be a string or a dictionary with the following keys:
        {'source', 'provider', 'version'}.
        A string that is not a catalog key, or a Path, is wrapped in a new
        RasterDatasetSource which is added to the catalog under a name based
        on the file basename.
    bbox : Optional[List], optional
        (xmin, ymin, xmax, ymax) bounding box of area of interest
        (in WGS84 coordinates), by default None
    geom : Optional[gpd.GeoDataFrame], optional
        A geometry defining the area of interest, by default None
    zoom : Optional[Zoom], optional
        Either an overview_level, or a tuple with the resolution and unit of
        the resolution. Only forwarded when data is read through a source.
    buffer : Union[float, int], optional
        Buffer around the `bbox` or `geom` area of interest in meters, by default 0
    handle_nodata : NoDataStrategy, optional
        How to react when no data is found, by default NoDataStrategy.RAISE
    variables : Optional[List], optional
        Names of RasterDataset variables to return, or all if None, by default None.
        A single name given as a string is wrapped in a list.
    time_range : Optional[TimeRange], optional
        Start and end date of period of interest, or entire period if None, by default None
    single_var_as_array : bool, optional
        Whether to return a xr.DataArray if the dataset consists of a single variable,
        by default True
    provider : Optional[str], optional
        Specifies a data provider, by default None
    version : Optional[str], optional
        Specifies a data version, by default None
    **kwargs:
        Extra keyword arguments passed to the RasterDatasetSource construction.
        Only used when a new source is created from a path; `driver` selects
        the driver and `provider` defaults to "user".

    Returns
    -------
    xr.Dataset or xr.DataArray
        a clipped, sliced and unified RasterDataset

    Raises
    ------
    ValueError
        If `data_like` is of an unknown type
    NoDataException
        If no data is found and handle_nodata is NoDataStrategy.RAISE
    """
    # Normalise the query arguments before inspecting `data_like`.
    if isinstance(variables, str):
        variables = [variables]
    if time_range:
        time_range = _parse_time_range(time_range)
    if isinstance(data_like, dict):
        data_like, provider, version = _parse_data_like_dict(
            data_like, provider, version
        )

    # In-memory xarray objects are sliced directly; no source is involved.
    if isinstance(data_like, (xr.DataArray, xr.Dataset)):
        has_area = geom is not None or bbox is not None
        clip_mask = _parse_geom_bbox_buffer(geom, bbox, buffer) if has_area else None
        sliced = RasterDatasetAdapter._slice_data(
            ds=data_like,
            variables=variables,
            mask=clip_mask,
            time_range=time_range,
        )
        if sliced is None:
            exec_nodata_strat(_NO_DATA_AFTER_SLICE_MSG, strategy=handle_nodata)
        return _single_var_as_array(
            maybe_ds=sliced,
            single_var_as_array=single_var_as_array,
            variable_name=variables,
        )

    # Everything else has to resolve to a source object that can be read.
    if isinstance(data_like, RasterDatasetSource):
        source = data_like
    elif not isinstance(data_like, (str, Path)):
        raise ValueError(f'Unknown raster data type "{type(data_like).__name__}"')
    elif isinstance(data_like, str) and data_like in self.sources:
        source = self.get_source(data_like, provider=provider, version=version)
    else:
        # Not a catalog key: treat it as a file location and register it.
        kwargs.setdefault("provider", "user")
        driver_name: str = kwargs.pop(
            "driver", RasterDatasetSource._fallback_driver_read
        )
        source_name = basename(data_like)
        source = RasterDatasetSource(
            name=source_name, uri=str(data_like), driver=driver_name, **kwargs
        )
        self.add_source(source_name, source)

    return source.read_data(
        bbox=bbox,
        mask=geom,
        buffer=buffer,
        zoom=zoom,
        variables=variables,
        time_range=time_range,
        handle_nodata=handle_nodata,
        single_var_as_array=single_var_as_array,
    )
[docs]
def get_geodataframe(
    self,
    data_like: Union[
        str, SourceSpecDict, Path, gpd.GeoDataFrame, GeoDataFrameSource
    ],
    bbox: Optional[List] = None,
    geom: Optional[gpd.GeoDataFrame] = None,
    buffer: Union[float, int] = 0,
    handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
    variables: Optional[Union[List, str]] = None,
    predicate: str = "intersects",
    provider: Optional[str] = None,
    version: Optional[str] = None,
    **kwargs,
) -> Optional[gpd.GeoDataFrame]:
    """Return a clipped and unified GeoDataFrame (vector).

    To clip the data to the area of interest, provide a `bbox` or `geom`,
    with optional additional `buffer` argument.
    To return only the GeoDataFrame columns of interest provide the
    `variables` argument.

    Parameters
    ----------
    data_like : Union[ str, SourceSpecDict, Path, gpd.GeoDataFrame, GeoDataFrameSource ]
        Data catalog key, path to vector file, a GeoDataFrameSource object,
        or a vector geopandas object.
        The catalog key can be a string or a dictionary with the following keys:
        {'source', 'provider', 'version'}.
        If a path to a vector file is provided it is wrapped in a new
        GeoDataFrameSource which is added to the catalog under a name based
        on the file basename.
    bbox : Optional[List], optional
        (xmin, ymin, xmax, ymax) bounding box of area of interest
        (in WGS84 coordinates), by default None
    geom : Optional[gpd.GeoDataFrame], optional
        A geometry defining the area of interest, by default None
    buffer : Union[float, int], optional
        Buffer around the `bbox` or `geom` area of interest in meters, by default 0
    handle_nodata : NoDataStrategy, optional
        How to react when no data is found, by default NoDataStrategy.RAISE
    variables : Optional[List], optional
        Names of GeoDataFrame variables to return, or all if None, by default None
    predicate : str, optional
        If predicate is provided, the GeoDataFrame is filtered by testing
        the predicate function against each item. Requires `bbox` or `geom`.
        options are:
        {'intersects', 'within', 'contains', 'overlaps', 'crosses', 'touches'},
        by default 'intersects'
    provider : Optional[str], optional
        Specifies a data provider, by default None
    version : Optional[str], optional
        Specifies a data version, by default None
    **kwargs:
        Extra keyword arguments passed to the GeoDataFrameSource construction.
        Only used when a new source is created from a path; `driver` selects
        the driver and `provider` defaults to "user".

    Returns
    -------
    gpd.GeoDataFrame
        a clipped, sliced and unified GeoDataFrame

    Raises
    ------
    ValueError
        If `data_like` is of an unknown type
    NoDataException
        If no data is found and handle_nodata is NoDataStrategy.RAISE
    """
    # The clipping mask is shared by the in-memory and the source code path.
    if geom is not None or bbox is not None:
        mask = _parse_geom_bbox_buffer(geom=geom, bbox=bbox, buffer=buffer)
    else:
        mask = None

    if isinstance(data_like, dict):
        data_like, provider, version = _parse_data_like_dict(
            data_like, provider, version
        )

    if isinstance(data_like, (str, Path)):
        if str(data_like) in self.sources:
            name = str(data_like)
            source = self.get_source(name, provider=provider, version=version)
        else:
            # Not a catalog key: treat it as a file location and register it.
            if "provider" not in kwargs:
                kwargs.update({"provider": "user"})
            driver: str = kwargs.pop(
                "driver", GeoDataFrameSource._fallback_driver_read
            )
            name = basename(data_like)
            source = GeoDataFrameSource(
                name=name, uri=str(data_like), driver=driver, **kwargs
            )
            self.add_source(name, source)
    elif isinstance(data_like, gpd.GeoDataFrame):
        # In-memory data is sliced directly; no source is involved.
        data_like = GeoDataFrameAdapter._slice_data(
            data_like,
            variables=variables,
            mask=mask,
            predicate=predicate,
        )
        if data_like is None:
            exec_nodata_strat(
                _NO_DATA_AFTER_SLICE_MSG,
                strategy=handle_nodata,
            )
        return data_like
    elif isinstance(data_like, GeoDataFrameSource):
        source = data_like
    else:
        raise ValueError(f'Unknown vector data type "{type(data_like).__name__}"')

    gdf = source.read_data(
        mask=mask,
        handle_nodata=handle_nodata,
        predicate=predicate,
        variables=variables,
    )
    return gdf
[docs]
def get_geodataset(
    self,
    data_like: Union[
        str, SourceSpecDict, Path, xr.Dataset, xr.DataArray, GeoDatasetSource
    ],
    bbox: Optional[Bbox] = None,
    geom: Optional[gpd.GeoDataFrame] = None,
    buffer: Union[float, int] = 0,
    handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
    predicate: str = "intersects",
    variables: Optional[List[str]] = None,
    time_range: Optional[Union[Tuple[str, str], Tuple[datetime, datetime]]] = None,
    single_var_as_array: bool = True,
    provider: Optional[str] = None,
    version: Optional[str] = None,
    **kwargs,
) -> xr.Dataset:
    """Return a clipped, sliced and unified GeoDataset.

    To clip the data to the area of interest, provide a `bbox` or `geom`,
    with optional additional `buffer` argument.
    To slice the data to the time period of interest, provide the
    `time_range` argument. To return only the dataset variables
    of interest provide the `variables` argument.

    NOTE: Unless `single_var_as_array` is set to False a single-variable data source
    will be returned as xarray.DataArray rather than Dataset.

    Parameters
    ----------
    data_like : Union[ str, SourceSpecDict, Path, xr.Dataset, xr.DataArray, GeoDatasetSource ]
        Data catalog key, path to GeoDataset file, a GeoDatasetSource object,
        or GeoDataset xarray object.
        The catalog key can be a string or a dictionary with the following keys:
        {'source', 'provider', 'version'}.
        A string that is not a catalog key, or a Path, is wrapped in a new
        GeoDatasetSource which is added to the catalog under a name based
        on the file basename.
    bbox : Optional[List], optional
        (xmin, ymin, xmax, ymax) bounding box of area of interest
        (in WGS84 coordinates), by default None
    geom : Optional[gpd.GeoDataFrame], optional
        A geometry defining the area of interest, by default None
    buffer : Union[float, int], optional
        Buffer around the `bbox` or `geom` area of interest in meters, by default 0
    handle_nodata : NoDataStrategy, optional
        How to react when no data is found, by default NoDataStrategy.RAISE
    predicate : str, optional
        If predicate is provided, the GeoDataset is filtered by testing
        the predicate function against each item. Requires `bbox` or `geom`.
        options are:
        {'intersects', 'within', 'contains', 'overlaps', 'crosses', 'touches'},
        by default 'intersects'
    variables : Optional[List], optional
        Names of GeoDataset variables to return, or all if None, by default None
    time_range : Optional[TimeRange], optional
        Start and end date of period of interest, or entire period if None, by default None
    single_var_as_array : bool, optional
        Whether to return a xr.DataArray if the dataset consists of a single variable,
        by default True
    provider : Optional[str], optional
        Specifies a data provider, by default None
    version : Optional[str], optional
        Specifies a data version, by default None
    **kwargs:
        Extra keyword arguments passed to the GeoDatasetSource construction.
        Only used when a new source is created from a path; `driver` selects
        the driver and `provider` defaults to "user".

    Returns
    -------
    xr.Dataset
        a clipped, sliced and unified GeoDataset

    Raises
    ------
    ValueError
        If `data_like` is of an unknown type
    NoDataException
        If no data is found and handle_nodata is NoDataStrategy.RAISE
    """
    # The clipping mask is shared by the in-memory and the source code path.
    has_area = geom is not None or bbox is not None
    mask = (
        _parse_geom_bbox_buffer(geom=geom, bbox=bbox, buffer=buffer)
        if has_area
        else None
    )
    if time_range:
        time_range = _parse_time_range(time_range)
    if isinstance(data_like, dict):
        data_like, provider, version = _parse_data_like_dict(
            data_like, provider, version
        )

    # In-memory xarray objects are sliced directly; no source is involved.
    if isinstance(data_like, (xr.DataArray, xr.Dataset)):
        sliced = GeoDatasetAdapter._slice_data(
            data_like,
            variables=variables,
            mask=mask,
            predicate=predicate,
            time_range=time_range,
        )
        if sliced is None:
            exec_nodata_strat(_NO_DATA_AFTER_SLICE_MSG, strategy=handle_nodata)
        return _single_var_as_array(sliced, single_var_as_array, variables)

    # Everything else has to resolve to a source object that can be read.
    if isinstance(data_like, GeoDatasetSource):
        source = data_like
    elif not isinstance(data_like, (str, Path)):
        raise ValueError(f'Unknown geo data type "{type(data_like).__name__}"')
    elif isinstance(data_like, str) and data_like in self.sources:
        source = self.get_source(data_like, provider=provider, version=version)
    else:
        # Not a catalog key: treat it as a file location and register it.
        kwargs.setdefault("provider", "user")
        driver_name: str = kwargs.pop(
            "driver", GeoDatasetSource._fallback_driver_read
        )
        source_name = basename(data_like)
        source = GeoDatasetSource(
            name=source_name,
            uri=str(data_like),
            driver=driver_name,
            **kwargs,
        )
        self.add_source(source_name, source)

    return source.read_data(
        mask=mask,
        handle_nodata=handle_nodata,
        predicate=predicate,
        variables=variables,
        time_range=time_range,
        single_var_as_array=single_var_as_array,
    )
[docs]
def get_dataset(
    self,
    data_like: Union[
        str, SourceSpecDict, Path, xr.Dataset, xr.DataArray, DatasetSource
    ],
    variables: Optional[List] = None,
    handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
    time_range: Optional[TimeRange] = None,
    single_var_as_array: bool = True,
    provider: Optional[str] = None,
    version: Optional[str] = None,
    **kwargs,
) -> xr.Dataset:
    """Return a sliced and unified Dataset.

    To slice the data to the time period of interest, provide the
    `time_range` argument. To return only the dataset variables
    of interest provide the `variables` argument.

    NOTE: Unless `single_var_as_array` is set to False a single-variable data source
    will be returned as xarray.DataArray rather than a xarray.Dataset.

    Parameters
    ----------
    data_like : Union[ str, SourceSpecDict, Path, xr.Dataset, xr.DataArray, DatasetSource ]
        Data catalog key, path to Dataset file, DatasetSource or Dataset
        xarray object.
        The catalog key can be a string or a dictionary with the following keys:
        {'source', 'provider', 'version'}.
        A string that is not a catalog key, or a Path, is wrapped in a new
        DatasetSource which is added to the catalog under a name based
        on the file basename. A DatasetSource object is read as-is and is
        not added to the catalog.
    variables : Optional[List], optional
        Names of Dataset variables to return, or all if None, by default None
    handle_nodata : NoDataStrategy, optional
        How to react when no data is found, by default NoDataStrategy.RAISE
    time_range : Optional[TimeRange], optional
        Start and end date of period of interest, or entire period if None, by default None
    single_var_as_array : bool, optional
        Whether to return a xr.DataArray if the dataset consists of a single variable,
        by default True
    provider : Optional[str], optional
        Specifies a data provider, by default None
    version : Optional[str], optional
        Specifies a data version, by default None
    **kwargs:
        Extra keyword arguments passed to the DatasetSource construction.
        Only used when a new source is created from a path; `provider`
        defaults to "user".

    Returns
    -------
    xr.Dataset
        a sliced and unified Dataset

    Raises
    ------
    ValueError
        If `data_like` is of an unknown type
    NoDataException
        If no data is found and handle_nodata is NoDataStrategy.RAISE
    """
    if isinstance(data_like, dict):
        data_like, provider, version = _parse_data_like_dict(
            data_like, provider, version
        )
    if time_range:
        time_range = _parse_time_range(time_range)

    if isinstance(data_like, (str, Path)):
        if isinstance(data_like, str) and data_like in self.sources:
            name = data_like
            source = self.get_source(name, provider=provider, version=version)
        else:
            # Not a catalog key: treat it as a file location and register it.
            if "provider" not in kwargs:
                kwargs.update({"provider": "user"})
            name = basename(data_like)
            source = DatasetSource(uri=str(data_like), name=name, **kwargs)
            self.add_source(name, source)
    elif isinstance(data_like, (xr.DataArray, xr.Dataset)):
        # In-memory data is sliced directly; no source is involved.
        data_like = DatasetAdapter._slice_data(
            data_like,
            variables,
            time_range,
        )
        if data_like is None:
            exec_nodata_strat(
                _NO_DATA_AFTER_SLICE_MSG,
                strategy=handle_nodata,
            )
        return _single_var_as_array(data_like, single_var_as_array, variables)
    elif isinstance(data_like, DatasetSource):
        # The signature advertises DatasetSource objects; read them directly
        # instead of rejecting them as an unknown type.
        source = data_like
    else:
        raise ValueError(f'Unknown data type "{type(data_like).__name__}"')

    return source.read_data(
        variables=variables,
        time_range=time_range,
        single_var_as_array=single_var_as_array,
        handle_nodata=handle_nodata,
    )
[docs]
def get_dataframe(
    self,
    data_like: Union[str, SourceSpecDict, Path, pd.DataFrame, DataFrameSource],
    variables: Optional[List] = None,
    time_range: Optional[TimeRange] = None,
    handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
    provider: Optional[str] = None,
    version: Optional[str] = None,
    **kwargs,
) -> pd.DataFrame:
    """Return a sliced and unified DataFrame.

    Parameters
    ----------
    data_like : Union[str, SourceSpecDict, Path, pd.DataFrame, DataFrameSource]
        Data catalog key, path to tabular data file, DataFrameSource object
        or tabular pandas dataframe.
        The catalog key can be a string or a dictionary with the following keys:
        {'source', 'provider', 'version'}.
        If a path to a tabular data file is provided it is wrapped in a new
        DataFrameSource which is added to the catalog under a name based
        on the file basename. A DataFrameSource object is read as-is and is
        not added to the catalog.
    variables : Optional[List], optional
        Names of DataFrame variables to return, or all if None, by default None
    time_range : Optional[TimeRange], optional
        Start and end date of period of interest, or entire period if None, by default None
    handle_nodata : NoDataStrategy, optional
        How to react when no data is found, by default NoDataStrategy.RAISE
    provider : Optional[str], optional
        Specifies a data provider, by default None
    version : Optional[str], optional
        Specifies a data version, by default None
    **kwargs:
        Extra keyword arguments passed to the DataFrameSource construction.
        Only used when a new source is created from a path; `driver` selects
        the driver and `provider` defaults to "user".

    Returns
    -------
    pd.DataFrame
        A unified and sliced DataFrame

    Raises
    ------
    ValueError
        On unknown type of `data_like`, or when the catalog key resolves to
        a source that is not a DataFrameSource
    NoDataException
        If no data is found and handle_nodata is NoDataStrategy.RAISE
    """
    if isinstance(data_like, dict):
        data_like, provider, version = _parse_data_like_dict(
            data_like, provider, version
        )
    if time_range:
        time_range = _parse_time_range(time_range)

    if isinstance(data_like, (str, Path)):
        if isinstance(data_like, str) and data_like in self.sources:
            name = data_like
            source: DataSource = self.get_source(
                name, provider=provider, version=version
            )
            if not isinstance(source, DataFrameSource):
                raise ValueError(f"Source '{source.name}' is not a DataFrame.")
        else:
            # Not a catalog key: treat it as a file location and register it.
            if "provider" not in kwargs:
                kwargs.update({"provider": "user"})
            driver: str = kwargs.pop(
                "driver", DataFrameSource._fallback_driver_read
            )
            name = basename(data_like)
            source = DataFrameSource(
                uri=str(data_like), name=name, driver=driver, **kwargs
            )
            self.add_source(name, source)
    elif isinstance(data_like, pd.DataFrame):
        # In-memory data is sliced directly; no source is involved.
        df = DataFrameAdapter._slice_data(data_like, variables, time_range)
        if df is None:
            exec_nodata_strat(
                _NO_DATA_AFTER_SLICE_MSG,
                strategy=handle_nodata,
            )
        return df
    elif isinstance(data_like, DataFrameSource):
        # The signature advertises DataFrameSource objects; read them directly
        # instead of rejecting them as an unknown type.
        source = data_like
    else:
        raise ValueError(f'Unknown tabular data type "{type(data_like).__name__}"')

    obj = source.read_data(
        variables=variables,
        time_range=time_range,
        handle_nodata=handle_nodata,
    )
    return obj
def _parse_data_like_dict(
    data_like: SourceSpecDict,
    provider: Optional[str] = None,
    version: Optional[str] = None,
):
    """Split a source specification dict into (source, provider, version).

    Parameters
    ----------
    data_like : SourceSpecDict
        Dictionary that must hold a "source" key and may hold "provider"
        and "version" keys.
    provider : Optional[str], optional
        Fallback used when `data_like` has no "provider" key, by default None
    version : Optional[str], optional
        Fallback used when `data_like` has no "version" key, by default None

    Returns
    -------
    tuple
        The source, provider and version, in that order.

    Raises
    ------
    ValueError
        If `data_like` holds keys that are not part of SourceSpecDict, or if
        the "source" key is missing.
    """
    allowed_keys = SourceSpecDict.__required_keys__
    unknown_keys = set(data_like.keys()) - allowed_keys
    if unknown_keys:
        raise ValueError(f"Unknown keys in requested data source: {unknown_keys}")
    if "source" not in data_like:
        raise ValueError("No source key found in requested data source")

    # Values in the dict take precedence over the separately passed arguments.
    return (
        data_like["source"],
        data_like.get("provider", provider),
        data_like.get("version", version),
    )
def _parse_data_source_dict(
    name: str,
    data_source_dict: Dict,
    root: Optional[Union[Path, str]] = None,
    category: Optional[str] = None,
) -> DataSource:
    """Parse data source dictionary.

    Parameters
    ----------
    name : str
        Name stored under the "name" key of the source definition.
    data_source_dict : Dict
        Source definition. It is not modified by this function.
    root : Optional[Union[Path, str]], optional
        If truthy, stored as a string under the "root" key, by default None
    category : Optional[str], optional
        Added to the source metadata when the metadata has no "category"
        entry yet, by default None

    Returns
    -------
    DataSource
        The result of `create_source` for the completed definition.
    """
    # Work on a shallow copy so the top-level keys set below do not leak
    # into the caller's dictionary.
    source = data_source_dict.copy()
    source["name"] = name
    # add root
    if root:
        source.update({"root": str(root)})
    # source meta data
    meta: Dict[str, str] = source.get("metadata", {})
    if "category" not in meta and category is not None:
        # The shallow copy still shares the nested metadata dict with the
        # caller, so build a new mapping instead of updating it in place.
        meta = {**meta, "category": category}
    source["metadata"] = meta
    return create_source(source)
def _process_dict(d: Dict) -> Dict:
"""Recursively change dict values to keep only python literal structures."""
for k, v in d.items():
_check_key = isinstance(k, str)
if _check_key and isinstance(v, dict):
d[k] = _process_dict(v)
elif _check_key and isinstance(v, Path):
d[k] = str(v) # path to string
return d
def _denormalise_data_dict(data_dict) -> List[Tuple[str, Dict]]:
"""Return a flat list of with data name, dictionary of input data_dict.
Expand possible versions and variants in data_dict.
"""
data_list = []
for name, source in data_dict.items():
source = copy.deepcopy(source)
data_dicts = []
if "variants" in source:
variants = source.pop("variants")
for diff in variants:
source_copy = copy.deepcopy(source)
source_copy = {
str(k): v for (k, v) in _deep_merge(source_copy, diff).items()
}
data_dicts.append({name: source_copy})
elif "placeholders" in source:
options = source.pop("placeholders")
for combination in itertools.product(*options.values()):
source_copy = copy.deepcopy(source)
name_copy = name
for k, v in zip(options.keys(), combination):
name_copy = name_copy.replace("{" + k + "}", v)
source_copy["uri"] = source_copy["uri"].replace("{" + k + "}", v)
data_dicts.append({name_copy: source_copy})
else:
for k, v in source.items():
if isinstance(v, (int, float)):
# numbers are pretty much always a version here,
# and we need strings, so just cast to string when
# we encoutner a number. not the pretties,
# but it will have to do for now.
source[k] = str(v)
data_list.append((name, source))
continue
# recursively denormalise in case of multiple denormalise keys in source
for item in data_dicts:
data_list.extend(_denormalise_data_dict(item))
return data_list
def _parse_time_range(
    time_range: Tuple[Union[str, datetime], Union[str, datetime]],
) -> TimeRange:
    """Parse timerange with strings to datetime.

    Entries that already are `datetime` objects are kept unchanged; every
    other entry is parsed with `dateutil.parser.parse`. When all entries are
    `datetime` objects the input is returned as-is.
    """
    if any(not isinstance(t, datetime) for t in time_range):
        # Parse only the non-datetime entries: the dateutil parser expects a
        # string or stream, so a mixed (str, datetime) range must not send
        # its datetime entry through the parser.
        time_range = tuple(
            t if isinstance(t, datetime) else dateutil.parser.parse(t)
            for t in time_range
        )
    return cast(TimeRange, time_range)