#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""DataCatalog module for HydroMT"""
from __future__ import annotations
import os
from os.path import join, isdir, dirname, basename, isfile, abspath, exists
import copy
from pathlib import Path
from typing import Dict, List, Tuple, Union, Optional
import warnings
import numpy as np
import pandas as pd
import geopandas as gpd
import xarray as xr
import yaml
import logging
import requests
import shutil
from packaging.version import Version
import itertools
from .data_adapter import (
DataAdapter,
RasterDatasetAdapter,
GeoDatasetAdapter,
GeoDataFrameAdapter,
DataFrameAdapter,
)
from .data_adapter.caching import _uri_validator, _copyfile, HYDROMT_DATADIR
logger = logging.getLogger(__name__)
__all__ = [
"DataCatalog",
]
[docs]class DataCatalog(object):
# root URL with data_catalog file
_url = r"https://raw.githubusercontent.com/Deltares/hydromt/main/data/predefined_catalogs.yml"
_cache_dir = HYDROMT_DATADIR
[docs] def __init__(
self,
data_libs: Union[List, str] = [],
fallback_lib: Optional[str] = "artifact_data",
logger=logger,
cache: bool = False,
cache_dir: str = None,
**artifact_keys,
) -> None:
"""Catalog of DataAdapter sources to easily read from different files
and keep track of files which have been accessed.
Arguments
---------
data_libs: (list of) str, Path, optional
One or more paths to data catalog yaml files or names of predefined data catalogs.
By default the data catalog is initiated without data entries.
See :py:func:`~hydromt.data_adapter.DataCatalog.from_yml` for accepted yaml format.
fallback_lib:
Name of pre-defined data catalog to read if no data_libs are provided, by default 'artifact_data'.
If None, no default data catalog is used.
cache: bool, optional
Set to true to cache data locally before reading.
Currently only implemented for tiled rasterdatasets, by default False.
cache_dir: str, Path, optional
Folder root path to cach data to, by default ~/.hydromt_data
artifact_keys:
Deprecated from version v0.5
"""
if data_libs is None: # legacy code. to be removed
data_libs = []
elif not isinstance(data_libs, list): # make sure data_libs is a list
data_libs = np.atleast_1d(data_libs).tolist()
self._sources = {} # dictionary of DataAdapter
self._catalogs = {} # dictionary of predefined Catalogs
self._used_data = []
self._fallback_lib = fallback_lib
self.logger = logger
# caching
self.cache = bool(cache)
if cache_dir is not None:
self._cache_dir = cache_dir
# legacy code. to be removed
for lib, version in artifact_keys.items():
warnings.warn(
"Adding a predefined data catalog as key-word argument is deprecated, "
f"add the catalog as '{lib}={version}' to the data_libs list instead.",
DeprecationWarning,
)
if not version: # False or None
continue
elif isinstance(version, str):
lib += f"={version}"
data_libs = [lib] + data_libs
# parse data catalogs; both user and pre-defined
for name_or_path in data_libs:
if str(name_or_path).split(".")[-1] in ["yml", "yaml"]: # user defined
self.from_yml(name_or_path)
else: # predefined
self.from_predefined_catalogs(name_or_path)
@property
def sources(self) -> Dict:
"""Returns dictionary of DataAdapter sources."""
if len(self._sources) == 0 and self._fallback_lib is not None:
# read artifacts by default if no catalogs are provided
self.from_predefined_catalogs(self._fallback_lib)
return self._sources
@property
def keys(self) -> List:
"""Returns list of data source names."""
return list(self._sources.keys())
@property
def predefined_catalogs(self) -> Dict:
if not self._catalogs:
self.set_predefined_catalogs()
return self._catalogs
def __getitem__(self, key: str) -> DataAdapter:
return self._sources[key]
def __setitem__(self, key: str, value: DataAdapter) -> None:
if not isinstance(value, DataAdapter):
raise ValueError(f"Value must be DataAdapter, not {type(key).__name__}.")
if key in self._sources:
self.logger.warning(f"Overwriting data source {key}.")
return self._sources.__setitem__(key, value)
def __iter__(self):
return self._sources.__iter__()
def __len__(self):
return self._sources.__len__()
def __repr__(self):
return self.to_dataframe().__repr__()
def _repr_html_(self):
return self.to_dataframe()._repr_html_()
[docs] def update(self, **kwargs) -> None:
"""Add data sources to library."""
for k, v in kwargs.items():
self[k] = v
[docs] def set_predefined_catalogs(self, urlpath: Union[Path, str] = None) -> Dict:
# get predefined_catalogs
urlpath = self._url if urlpath is None else urlpath
# TODO cache predefined catalogs
self._catalogs = _yml_from_uri_or_path(urlpath)
return self._catalogs
def from_artifacts(
self, name: str = "artifact_data", version: str = "latest"
) -> None:
"""Deprecated method. Use :py:func:`hydromt.data_catalog.DataCatalog.from_predefined_catalogs` instead
Parameters
----------
name : str, optional
Catalog name. If None (default) sample data is downloaded.
version : str, optional
Release version. By default it takes the latest known release.
"""
warnings.warn(
f'"from_artifacts" is deprecated. Use "from_predefined_catalogs instead".',
DeprecationWarning,
)
self.from_predefined_catalogs(name, version)
[docs] def from_predefined_catalogs(self, name: str, version: str = "latest") -> None:
if "=" in name:
name, version = name.split("=")[0], name.split("=")[-1]
if name not in self.predefined_catalogs:
raise ValueError(
f'Catalog with name "{name}" not found in predefined catalogs'
)
urlpath = self.predefined_catalogs[name].get("urlpath")
versions_dict = self.predefined_catalogs[name].get("versions")
if version == "latest" or not isinstance(version, str):
versions = list(versions_dict.keys())
if len(versions) > 1:
version = versions[np.argmax([Version(v) for v in versions])]
else:
version = versions[0]
urlpath = urlpath.format(version=versions_dict.get(version, version))
if urlpath.split(".")[-1] in ["gz", "zip"]:
self.logger.info(f"Reading data catalog {name} {version} from archive")
self.from_archive(urlpath, name=name, version=version)
else:
self.logger.info(f"Reading data catalog {name} {version}")
self.from_yml(urlpath)
[docs] def from_archive(
self, urlpath: Union[Path, str], version: str = None, name: str = None
) -> None:
"""Read a data archive including a data_catalog.yml file"""
name = basename(urlpath).split(".")[0] if name is None else name
root = join(self._cache_dir, name)
if version is not None:
root = join(root, version)
archive_fn = join(root, basename(urlpath))
yml_fn = join(root, "data_catalog.yml")
if not isdir(root):
os.makedirs(root)
# download data if url
if _uri_validator(str(urlpath)) and not isfile(archive_fn):
_copyfile(urlpath, archive_fn)
# unpack data
if not isfile(yml_fn):
self.logger.debug(f"Unpacking data from {archive_fn}")
shutil.unpack_archive(archive_fn, root)
# parse catalog
self.from_yml(yml_fn)
[docs] def from_yml(
self, urlpath: Union[Path, str], root: str = None, mark_used: bool = False
) -> None:
"""Add data sources based on yaml file.
Parameters
----------
urlpath: str, Path
Path or url to data source yaml files.
root: str, Path, optional
Global root for all relative paths in yaml file(s).
mark_used: bool
If True, append to used_data list.
Examples
--------
A yaml data entry is provided below, where all the text between <>
should be filled by the user. Multiple data sources of the same
data type should be grouped. Currently the following data types are supported:
{'RasterDataset', 'GeoDataset', 'GeoDataFrame'}. See the specific data adapters
for more information about the required and optional arguments.
.. code-block:: console
meta:
root: <path>
category: <category>
version: <version>
<name>:
path: <path>
data_type: <data_type>
driver: <driver>
filesystem: <filesystem>
kwargs:
<key>: <value>
crs: <crs>
nodata:
<hydromt_variable_name1>: <nodata>
rename:
<native_variable_name1>: <hydromt_variable_name1>
<native_variable_name2>: <hydromt_variable_name2>
unit_add:
<hydromt_variable_name1>: <float/int>
unit_mult:
<hydromt_variable_name1>: <float/int>
meta:
source_url: <source_url>
source_version: <source_version>
source_licence: <source_licence>
paper_ref: <paper_ref>
paper_doi: <paper_doi>
placeholders:
<placeholder_name_1>: <list of names>
<placeholder_name_2>: <list of names>
zoom_levels:
<zoom_level_1>: <resolution_1>
<zoom_level_2>: <resolution_2>
"""
self.logger.info(f"Parsing data catalog from {urlpath}")
yml = _yml_from_uri_or_path(urlpath)
# parse metadata
meta = dict()
# legacy code with root/category at highest yml level
if "root" in yml:
meta.update(root=yml.pop("root"))
if "category" in yml:
meta.update(category=yml.pop("category"))
# read meta data
meta = yml.pop("meta", meta)
catalog_name = meta.get("name", "".join(basename(urlpath).split(".")[:-1]))
# TODO keep meta data!! Note only possible if yml files are not merged
if root is None:
root = meta.get("root", os.path.dirname(urlpath))
self.from_dict(
yml,
catalog_name=catalog_name,
root=root,
category=meta.get("category", None),
mark_used=mark_used,
)
[docs] def from_dict(
self,
data_dict: Dict,
catalog_name: str = "",
root: Union[str, Path] = None,
category: str = None,
mark_used: bool = False,
) -> None:
"""Add data sources based on dictionary.
Parameters
----------
data_dict: dict
Dictionary of data_sources.
catalog_name: str, optional
Name of data catalog
root: str, Path, optional
Global root for all relative paths in `data_dict`.
category: str, optional
Global category for all sources in `data_dict`.
mark_used: bool
If True, append to used_data list.
Examples
--------
A data dictionary with two entries is provided below, where all the text between <>
should be filled by the user. See the specific data adapters
for more information about the required and optional arguments.
.. code-block:: text
{
<name1>: {
"path": <path>,
"data_type": <data_type>,
"driver": <driver>,
"filesystem": <filesystem>,
"kwargs": {<key>: <value>},
"crs": <crs>,
"nodata": <nodata>,
"rename": {<native_variable_name1>: <hydromt_variable_name1>},
"unit_add": {<hydromt_variable_name1>: <float/int>},
"unit_mult": {<hydromt_variable_name1>: <float/int>},
"meta": {...},
"placeholders": {<placeholder_name_1>: <list of names>},
"zoom_levels": {<zoom_level_1>: <resolution_1>},
}
<name2>: {
...
}
}
"""
data_dict = _parse_data_dict(
data_dict,
catalog_name=catalog_name,
root=root,
category=category,
)
self.update(**data_dict)
if mark_used:
self._used_data.extend(list(data_dict.keys()))
[docs] def to_yml(
self,
path: Union[str, Path],
root: str = "auto",
source_names: Optional[List] = None,
used_only: bool = False,
meta: Dict = {},
) -> None:
"""Write data catalog to yaml format.
Parameters
----------
path: str, Path
yaml output path.
root: str, Path, optional
Global root for all relative paths in yaml file.
If "auto" (default) the data source paths are relative to the yaml output ``path``.
source_names: list, optional
List of source names to export, by default None in which case all sources are exported.
This argument is ignored if `used_only=True`.
used_only: bool, optional
If True, export only data entries kept in used_data list, by default False.
meta: dict, optional
key-value pairs to add to the data catalog meta section, such as 'version', by default empty.
"""
source_names = self._used_data if used_only else source_names
yml_dir = os.path.dirname(abspath(path))
if root == "auto":
root = yml_dir
data_dict = self.to_dict(root=root, source_names=source_names, meta=meta)
if str(root) == yml_dir:
data_dict.pop("root", None) # remove root if it equals the yml_dir
if data_dict:
with open(path, "w") as f:
yaml.dump(data_dict, f, default_flow_style=False, sort_keys=False)
else:
self.logger.info("The data catalog is empty, no yml file is written.")
[docs] def to_dict(
self,
source_names: Optional[List] = None,
root: Union[Path, str] = None,
meta: dict = {},
) -> Dict:
"""Export the data catalog to a dictionary.
Parameters
----------
source_names : list, optional
List of source names to export, by default None in which case all sources are exported.
root : str, Path, optional
Global root for all relative paths in yml file.
meta: dict, optional
key-value pairs to add to the data catalog meta section, such as 'version', by default empty.
Returns
-------
dict
data catalog dictionary
"""
sources_out = dict()
if root is not None:
root = abspath(root)
meta.update(**{"root": root})
root_drive = os.path.splitdrive(root)[0]
for name, source in sorted(self._sources.items()): # alphabetical order
if source_names is not None and name not in source_names:
continue
source_dict = source.to_dict()
if root is not None:
path = source_dict["path"] # is abspath
source_drive = os.path.splitdrive(path)[0]
if (
root_drive == source_drive
and os.path.commonpath([path, root]) == root
):
source_dict["path"] = os.path.relpath(
source_dict["path"], root
).replace("\\", "/")
# remove non serializable entries to prevent errors
source_dict = _process_dict(source_dict, logger=self.logger) # TODO TEST
sources_out.update({name: source_dict})
if meta:
sources_out = {"meta": meta, **sources_out}
return sources_out
[docs] def to_dataframe(self, source_names: List = []) -> pd.DataFrame:
"""Return data catalog summary as DataFrame"""
d = dict()
for name, source in self._sources.items():
if len(source_names) > 0 and name not in source_names:
continue
d[name] = source.summary()
return pd.DataFrame.from_dict(d, orient="index")
[docs] def export_data(
self,
data_root: Union[Path, str],
bbox: List = None,
time_tuple: Tuple = None,
source_names: List = [],
unit_conversion: bool = True,
meta: Dict = {},
append: bool = False,
) -> None:
"""Export a data slice of each dataset and a data_catalog.yml file to disk.
Parameters
----------
data_root : str, Path
Path to output folder
bbox : array-like of floats
(xmin, ymin, xmax, ymax) bounding box of area of interest.
time_tuple : tuple of str, datetime, optional
Start and end date of period of interest. By default the entire time period
of the dataset is returned.
source_names: list, optional
List of source names to export, by default None in which case all sources are exported.
Specific variables can be selected by appending them to the source name in square brackets.
For example, to export all variables of 'source_name1' and only 'var1' and 'var2' of 'source_name'
use source_names=['source_name1', 'source_name2[var1,var2]']
unit_conversion: boolean, optional
If False skip unit conversion when parsing data from file, by default True.
meta: dict, optional
key-value pairs to add to the data catalog meta section, such as 'version', by default empty.
append: bool, optional
If True, append to existing data catalog, by default False.
"""
data_root = abspath(data_root)
if not os.path.isdir(data_root):
os.makedirs(data_root)
# create copy of data with selected source names
source_vars = {}
if len(source_names) > 0:
sources = {}
for name in source_names:
# deduce variables from name
if "[" in name:
variables = name.split("[")[-1].split("]")[0].split(",")
name = name.split("[")[0]
source_vars[name] = variables
sources[name] = copy.deepcopy(self.sources[name])
else:
sources = copy.deepcopy(self.sources)
# read existing data catalog if it exists
fn = join(data_root, "data_catalog.yml")
if isfile(fn) and append:
self.logger.info(f"Appending existing data catalog {fn}")
sources_out = DataCatalog(fn).sources
else:
sources_out = {}
# export data and update sources
for key, source in sources.items():
try:
# read slice of source and write to file
self.logger.debug(f"Exporting {key}.")
if not unit_conversion:
unit_mult = source.unit_mult
unit_add = source.unit_add
source.unit_mult = {}
source.unit_add = {}
fn_out, driver = source.to_file(
data_root=data_root,
data_name=key,
variables=source_vars.get(key, None),
bbox=bbox,
time_tuple=time_tuple,
logger=self.logger,
)
if fn_out is None:
self.logger.warning(f"{key} file contains no data within domain")
continue
# update path & driver and remove kwargs and rename in output sources
if unit_conversion:
source.unit_mult = {}
source.unit_add = {}
else:
source.unit_mult = unit_mult
source.unit_add = unit_add
source.path = fn_out
source.driver = driver
source.filesystem = "local"
source.kwargs = {}
source.rename = {}
if key in sources_out:
self.logger.warning(
f"{key} already exists in data catalog and is overwritten."
)
sources_out[key] = source
except FileNotFoundError:
self.logger.warning(f"{key} file not found at {source.path}")
# write data catalog to yml
data_catalog_out = DataCatalog()
data_catalog_out._sources = sources_out
data_catalog_out.to_yml(fn, root="auto", meta=meta)
[docs] def get_rasterdataset(
self,
data_like: Union[str, Path, xr.Dataset, xr.DataArray],
bbox: List = None,
geom: gpd.GeoDataFrame = None,
zoom_level: int | tuple = None,
buffer: Union[float, int] = 0,
align: bool = None,
variables: Union[List, str] = None,
time_tuple: Tuple = None,
single_var_as_array: bool = True,
**kwargs,
) -> xr.Dataset:
"""Returns a clipped, sliced and unified RasterDataset.
To clip the data to the area of interest, provide a `bbox` or `geom`,
with optional additional `buffer` and `align` arguments.
To slice the data to the time period of interest, provide the `time_tuple` argument.
To return only the dataset variables of interest provide the `variables` argument.
NOTE: Unless `single_var_as_array` is set to False a single-variable data source
will be returned as :py:class:`xarray.DataArray` rather than :py:class:`xarray.Dataset`.
Arguments
---------
data_like: str, Path, xr.Dataset, xr.Datarray
Data catalog key, path to raster file or raster xarray data object.
If a path to a raster file is provided it will be added
to the data_catalog with its based on the file basename without extension.
bbox : array-like of floats
(xmin, ymin, xmax, ymax) bounding box of area of interest (in WGS84 coordinates).
geom : geopandas.GeoDataFrame/Series,
A geometry defining the area of interest.
zoom_level : int, tuple, optional
Zoom level of the xyz tile dataset (0 is base level)
Using a tuple the zoom level can be specified as (<zoom_resolution>, <unit>),
e.g., (1000, 'meter')
buffer : int, optional
Buffer around the `bbox` or `geom` area of interest in pixels. By default 0.
align : float, optional
Resolution to align the bounding box, by default None
variables : str or list of str, optional.
Names of RasterDataset variables to return. By default all dataset variables
are returned.
time_tuple : tuple of str, datetime, optional
Start and end date of period of interest. By default the entire time period
of the dataset is returned.
single_var_as_array: bool, optional
If True, return a DataArray if the dataset consists of a single variable.
If False, always return a Dataset. By default True.
Returns
-------
obj: xarray.Dataset or xarray.DataArray
RasterDataset
"""
if isinstance(data_like, (xr.DataArray, xr.Dataset)):
return data_like
elif not isinstance(data_like, (str, Path)):
raise ValueError(f'Unknown raster data type "{type(data_like).__name__}"')
if data_like not in self.sources and exists(abspath(data_like)):
path = str(abspath(data_like))
name = basename(data_like).split(".")[0]
self.update(**{name: RasterDatasetAdapter(path=path, **kwargs)})
elif data_like in self.sources:
name = data_like
else:
raise FileNotFoundError(f"No such file or catalog key: {data_like}")
self._used_data.append(name)
source = self.sources[name]
self.logger.info(
f"DataCatalog: Getting {name} RasterDataset {source.driver} data from {source.path}"
)
obj = self.sources[name].get_data(
bbox=bbox,
geom=geom,
buffer=buffer,
zoom_level=zoom_level,
align=align,
variables=variables,
time_tuple=time_tuple,
single_var_as_array=single_var_as_array,
cache_root=self._cache_dir if self.cache else None,
logger=self.logger,
)
return obj
[docs] def get_geodataframe(
self,
data_like: Union[str, Path, gpd.GeoDataFrame],
bbox: List = None,
geom: gpd.GeoDataFrame = None,
buffer: Union[float, int] = 0,
variables: Union[List, str] = None,
predicate: str = "intersects",
**kwargs,
):
"""Returns a clipped and unified GeoDataFrame (vector).
To clip the data to the area of interest, provide a `bbox` or `geom`,
with optional additional `buffer` and `align` arguments.
To return only the dataframe columns of interest provide the `variables` argument.
Arguments
---------
data_like: str, Path, gpd.GeoDataFrame
Data catalog key, path to vector file or a vector geopandas object.
If a path to a vector file is provided it will be added
to the data_catalog with its based on the file basename without extension.
bbox : array-like of floats
(xmin, ymin, xmax, ymax) bounding box of area of interest (in WGS84 coordinates).
geom : geopandas.GeoDataFrame/Series,
A geometry defining the area of interest.
buffer : float, optional
Buffer around the `bbox` or `geom` area of interest in meters. By default 0.
predicate : {'intersects', 'within', 'contains', 'overlaps', 'crosses', 'touches'}, optional
If predicate is provided, the GeoDataFrame is filtered by testing
the predicate function against each item. Requires bbox or mask.
By default 'intersects'
align : float, optional
Resolution to align the bounding box, by default None
variables : str or list of str, optional.
Names of GeoDataFrame columns to return. By default all columns are returned.
Returns
-------
gdf: geopandas.GeoDataFrame
GeoDataFrame
"""
if isinstance(data_like, gpd.GeoDataFrame):
return data_like
elif not isinstance(data_like, (str, Path)):
raise ValueError(f'Unknown vector data type "{type(data_like).__name__}"')
if data_like not in self.sources and exists(abspath(data_like)):
path = str(abspath(data_like))
name = basename(data_like).split(".")[0]
self.update(**{name: GeoDataFrameAdapter(path=path, **kwargs)})
elif data_like in self.sources:
name = data_like
else:
raise FileNotFoundError(f"No such file or catalog key: {data_like}")
self._used_data.append(name)
source = self.sources[name]
self.logger.info(
f"DataCatalog: Getting {name} GeoDataFrame {source.driver} data from {source.path}"
)
gdf = source.get_data(
bbox=bbox,
geom=geom,
buffer=buffer,
predicate=predicate,
variables=variables,
logger=self.logger,
)
return gdf
[docs] def get_geodataset(
self,
data_like: Union[Path, str, xr.DataArray, xr.Dataset],
bbox: List = None,
geom: gpd.GeoDataFrame = None,
buffer: Union[float, int] = 0,
variables: List = None,
time_tuple: Tuple = None,
single_var_as_array: bool = True,
**kwargs,
) -> xr.Dataset:
"""Returns a clipped, sliced and unified GeoDataset.
To clip the data to the area of interest, provide a `bbox` or `geom`,
with optional additional `buffer` and `align` arguments.
To slice the data to the time period of interest, provide the `time_tuple` argument.
To return only the dataset variables of interest provide the `variables` argument.
NOTE: Unless `single_var_as_array` is set to False a single-variable data source
will be returned as xarray.DataArray rather than Dataset.
Arguments
---------
data_like: str, Path, xr.Dataset, xr.DataArray
Data catalog key, path to geodataset file or geodataset xarray object.
If a path to a file is provided it will be added
to the data_catalog with its based on the file basename without extension.
bbox : array-like of floats
(xmin, ymin, xmax, ymax) bounding box of area of interest (in WGS84 coordinates).
geom : geopandas.GeoDataFrame/Series,
A geometry defining the area of interest.
buffer : float, optional
Buffer around the `bbox` or `geom` area of interest in meters. By default 0.
align : float, optional
Resolution to align the bounding box, by default None
variables : str or list of str, optional.
Names of GeoDataset variables to return. By default all dataset variables
are returned.
time_tuple : tuple of str, datetime, optional
Start and end date of period of interest. By default the entire time period
of the dataset is returned.
single_var_as_array: bool, optional
If True, return a DataArray if the dataset consists of a single variable.
If False, always return a Dataset. By default True.
Returns
-------
obj: xarray.Dataset or xarray.DataArray
GeoDataset
"""
if isinstance(data_like, (xr.DataArray, xr.Dataset)):
return data_like
elif not isinstance(data_like, (str, Path)):
raise ValueError(f'Unknown geo data type "{type(data_like).__name__}"')
if data_like not in self.sources and exists(abspath(data_like)):
path = str(abspath(data_like))
name = basename(data_like).split(".")[0]
self.update(**{name: GeoDatasetAdapter(path=path, **kwargs)})
elif data_like in self.sources:
name = data_like
else:
raise FileNotFoundError(f"No such file or catalog key: {data_like}")
self._used_data.append(name)
source = self.sources[name]
self.logger.info(
f"DataCatalog: Getting {name} GeoDataset {source.driver} data from {source.path}"
)
obj = source.get_data(
bbox=bbox,
geom=geom,
buffer=buffer,
variables=variables,
time_tuple=time_tuple,
single_var_as_array=single_var_as_array,
logger=self.logger,
)
return obj
def get_dataframe(
self,
data_like: Union[str, Path, pd.DataFrame],
variables: list = None,
time_tuple: tuple = None,
**kwargs,
):
"""Returns a unified and sliced DataFrame.
Parameters
----------
data_like : str, Path, pd.DataFrame
Data catalog key, path to tabular data file or tabular pandas dataframe object.
If a path to a tabular data file is provided it will be added
to the data_catalog with its based on the file basename without extension.
variables : str or list of str, optional.
Names of GeoDataset variables to return. By default all dataset variables
are returned.
time_tuple : tuple of str, datetime, optional
Start and end date of period of interest. By default the entire time period
of the dataset is returned.
Returns
-------
pd.DataFrame
Tabular data
"""
if isinstance(data_like, pd.DataFrame):
return data_like
elif not isinstance(data_like, (str, Path)):
raise ValueError(f'Unknown tabular data type "{type(data_like).__name__}"')
if data_like not in self.sources and exists(abspath(data_like)):
path = str(abspath(data_like))
name = basename(data_like).split(".")[0]
self.update(**{name: DataFrameAdapter(path=path, **kwargs)})
elif data_like in self.sources:
name = data_like
else:
raise FileNotFoundError(f"No such file or catalog key: {data_like}")
self._used_data.append(name)
source = self.sources[name]
self.logger.info(
f"DataCatalog: Getting {name} DataFrame {source.driver} data from {source.path}"
)
obj = source.get_data(
variables=variables,
time_tuple=time_tuple,
logger=self.logger,
)
return obj
def _parse_data_dict(
data_dict: Dict,
catalog_name: str = "",
root: Union[Path, str] = None,
category: str = None,
) -> Dict:
"""Parse data source dictionary."""
# link yml keys to adapter classes
ADAPTERS = {
"RasterDataset": RasterDatasetAdapter,
"GeoDataFrame": GeoDataFrameAdapter,
"GeoDataset": GeoDatasetAdapter,
"DataFrame": DataFrameAdapter,
}
# NOTE: shouldn't the kwarg overwrite the dict/yml ?
if root is None:
root = data_dict.pop("root", None)
# parse data
data = dict()
for name, source in data_dict.items():
source = source.copy() # important as we modify with pop
if "alias" in source:
alias = source.pop("alias")
if alias not in data_dict:
raise ValueError(f"alias {alias} not found in data_dict.")
# use alias source but overwrite any attributes with original source
source_org = source.copy()
source = data_dict[alias].copy()
source.update(source_org)
if "path" not in source:
raise ValueError(f"{name}: Missing required path argument.")
data_type = source.pop("data_type", None)
if data_type is None:
raise ValueError(f"{name}: Data type missing.")
elif data_type not in ADAPTERS:
raise ValueError(f"{name}: Data type {data_type} unknown")
adapter = ADAPTERS.get(data_type)
# Only for local files
path = source.pop("path")
# if remote path, keep as is else call abs_path method to solve local files
if not _uri_validator(path):
path = abs_path(root, path)
meta = source.pop("meta", {})
if "category" not in meta and category is not None:
meta.update(category=category)
# lower kwargs for backwards compatability
# FIXME this could be problamatic if driver kwargs conflict DataAdapter arguments
source.update(**source.pop("kwargs", {}))
for opt in source:
if "fn" in opt: # get absolute paths for file names
source.update({opt: abs_path(root, source[opt])})
if "placeholders" in source:
# pop avoid placeholders being passed to adapter
options = source.pop("placeholders")
for combination in itertools.product(*options.values()):
path_n = path
name_n = name
for k, v in zip(options.keys(), combination):
path_n = path_n.replace("{" + k + "}", v)
name_n = name_n.replace("{" + k + "}", v)
data[name_n] = adapter(
path=path_n,
name=name_n,
catalog_name=catalog_name,
meta=meta,
**source,
)
else:
data[name] = adapter(
path=path, name=name, catalog_name=catalog_name, meta=meta, **source
)
return data
def _yml_from_uri_or_path(uri_or_path: Union[Path, str]) -> Dict:
if _uri_validator(uri_or_path):
with requests.get(uri_or_path, stream=True) as r:
if r.status_code != 200:
raise IOError(f"URL {r.content}: {uri_or_path}")
yml = yaml.load(r.text, Loader=yaml.FullLoader)
else:
with open(uri_or_path, "r") as stream:
yml = yaml.load(stream, Loader=yaml.FullLoader)
return yml
def _process_dict(d: Dict, logger=logger) -> Dict:
"""Recursively change dict values to keep only python literal structures."""
for k, v in d.items():
_check_key = isinstance(k, str)
if _check_key and isinstance(v, dict):
d[k] = _process_dict(v)
elif _check_key and isinstance(v, Path):
d[k] = str(v) # path to string
# elif not _check_key or not isinstance(v, (list, str, int, float, bool)):
# d.pop(k) # remove this entry
# logger.debug(f'Removing non-serializable entry "{k}"')
return d
def abs_path(root: Union[Path, str], rel_path: Union[Path, str]) -> str:
path = Path(str(rel_path))
if not path.is_absolute():
if root is not None:
rel_path = join(root, rel_path)
path = Path(abspath(rel_path))
return str(path)