Source code for hydromt.data_catalog

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""DataCatalog module for HydroMT"""

import os
from os.path import join, isdir, dirname, basename, isfile, abspath, exists
import copy
from pathlib import Path
from typing import Dict, List, Tuple, Union
import warnings
import numpy as np
import pandas as pd
import geopandas as gpd
import xarray as xr
import yaml
import logging
import requests
from urllib.parse import urlparse
import shutil
from packaging.version import Version
import itertools

from .data_adapter import (
    DataAdapter,
    RasterDatasetAdapter,
    GeoDatasetAdapter,
    GeoDataFrameAdapter,
    DataFrameAdapter,
)

logger = logging.getLogger(__name__)

__all__ = [
    "DataCatalog",
]


[docs]class DataCatalog(object): # root URL with data_catalog file _url = r"https://raw.githubusercontent.com/Deltares/hydromt/main/data/predefined_catalogs.yml" _cache_dir = join(Path.home(), ".hydromt_data")
[docs] def __init__( self, data_libs: Union[List, str] = [], logger=logger, **artifact_keys ) -> None: """Catalog of DataAdapter sources to easily read from different files and keep track of files which have been accessed. Arguments --------- data_libs: (list of) str, Path, optional One or more paths to data catalog yaml files or names of predefined data catalogs. By default the data catalog is initiated without data entries. See :py:func:`~hydromt.data_adapter.DataCatalog.from_yml` for accepted yaml format. artifact_keys: Deprecated from version v0.5 """ if data_libs is None: # legacy code. to be removed data_libs = [] elif not isinstance(data_libs, list): # make sure data_libs is a list data_libs = np.atleast_1d(data_libs).tolist() self._sources = {} # dictionary of DataAdapter self._catalogs = {} # dictionary of predefined Catalogs self._used_data = [] self.logger = logger # legacy code. to be removed for lib, version in artifact_keys.items(): warnings.warn( f"{lib}={version} as key-word argument is deprecated, add the predefined data catalog as string to the data_libs argument instead", DeprecationWarning, ) if not version: # False or None continue elif isinstance(version, str): lib += f"={version}" data_libs = [lib] + data_libs # parse data catalogs; both user and pre-defined for name_or_path in data_libs: if str(name_or_path).split(".")[-1] in ["yml", "yaml"]: # user defined self.from_yml(name_or_path) else: # predefined self.from_predefined_catalogs(name_or_path)
@property def sources(self) -> Dict: """Returns dictionary of DataAdapter sources.""" if len(self._sources) == 0: # read artifacts by default if no catalogs are provided self.from_predefined_catalogs("artifact_data") return self._sources @property def keys(self) -> List: """Returns list of data source names.""" return list(self.sources.keys()) @property def predefined_catalogs(self) -> Dict: if not self._catalogs: self.set_predefined_catalogs() return self._catalogs def __getitem__(self, key: str) -> DataAdapter: return self.sources[key] def __setitem__(self, key: str, value: DataAdapter) -> None: if not isinstance(value, DataAdapter): raise ValueError(f"Value must be DataAdapter, not {type(key).__name__}.") if key in self._sources: self.logger.warning(f"Overwriting data source {key}.") return self._sources.__setitem__(key, value) def __iter__(self): return self.sources.__iter__() def __len__(self): return self.sources.__len__() def __repr__(self): return self.to_dataframe().__repr__() def _repr_html_(self): return self.to_dataframe()._repr_html_()
[docs] def update(self, **kwargs) -> None: """Add data sources to library.""" for k, v in kwargs.items(): self[k] = v
[docs] def set_predefined_catalogs(self, urlpath: Union[Path, str] = None) -> Dict: # get predefined_catalogs urlpath = self._url if urlpath is None else urlpath self._catalogs = _yml_from_uri_or_path(urlpath) return self._catalogs
def from_artifacts( self, name: str = "artifact_data", version: str = "latest" ) -> None: """Deprecated method. Use :py:func:`hydromt.data_catalog.DataCatalog.from_predefined_catalogs` instead Parameters ---------- name : str, optional Catalog name. If None (default) sample data is downloaded. version : str, optional Release version. By default it takes the latest known release. """ warnings.warn( f'"from_artifacts" is deprecated. Use "from_predefined_catalogs instead".', DeprecationWarning, ) self.from_predefined_catalogs(name, version)
[docs] def from_predefined_catalogs(self, name: str, version: str = "latest") -> None: if "=" in name: name, version = name.split("=")[0], name.split("=")[-1] if name not in self.predefined_catalogs: raise ValueError( f'Catalog with name "{name}" not found in predefined catalogs' ) urlpath = self.predefined_catalogs[name].get("urlpath") versions_dict = self.predefined_catalogs[name].get("versions") if version == "latest" or not isinstance(version, str): versions = list(versions_dict.keys()) if len(versions) > 1: version = versions[np.argmax([Version(v) for v in versions])] else: version = versions[0] urlpath = urlpath.format(version=versions_dict.get(version, version)) if urlpath.split(".")[-1] in ["gz", "zip"]: self.logger.info(f"Reading data catalog {name} {version} from archive") self.from_archive(urlpath, name=name, version=version) else: self.logger.info(f"Reading data catalog {name} {version}") self.from_yml(urlpath)
[docs] def from_archive( self, urlpath: Union[Path, str], version: str = None, name: str = None ) -> None: """Read a data archive including a data_catalog.yml file""" name = basename(urlpath).split(".")[0] if name is None else name root = join(self._cache_dir, name) if version is not None: root = join(root, version) archive_fn = join(root, basename(urlpath)) yml_fn = join(root, "data_catalog.yml") if not isdir(root): os.makedirs(root) # download data if url if _uri_validator(str(urlpath)) and not isfile(archive_fn): with requests.get(urlpath, stream=True) as r: if r.status_code != 200: self.logger.error(f"Data archive not found at {urlpath}") return r.status_code self.logger.info(f"Downloading data archive file to {archive_fn}") with open(archive_fn, "wb") as f: shutil.copyfileobj(r.raw, f) # unpack data if not isfile(yml_fn): self.logger.debug(f"Unpacking data from {archive_fn}") shutil.unpack_archive(archive_fn, root) # parse catalog self.from_yml(yml_fn)
[docs] def from_yml( self, urlpath: Union[Path, str], root: str = None, mark_used: bool = False ) -> None: """Add data sources based on yaml file. Parameters ---------- urlpath: str, Path Path or url to data source yaml files. root: str, Path, optional Global root for all relative paths in yaml file(s). mark_used: bool If True, append to used_data list. Examples -------- A yaml data entry is provided below, where all the text between <> should be filled by the user. Multiple data sources of the same data type should be grouped. Currently the following data types are supported: {'RasterDataset', 'GeoDataset', 'GeoDataFrame'}. See the specific data adapters for more information about the required and optional arguments. .. code-block:: console root: <path> category: <category> <name>: path: <path> data_type: <data_type> driver: <driver> kwargs: <key>: <value> crs: <crs> nodata: <hydromt_variable_name1>: <nodata> rename: <native_variable_name1>: <hydromt_variable_name1> <native_variable_name2>: <hydromt_variable_name2> unit_add: <hydromt_variable_name1>: <float/int> unit_mult: <hydromt_variable_name1>: <float/int> meta: source_url: <source_url> source_version: <source_version> source_licence: <source_licence> paper_ref: <paper_ref> paper_doi: <paper_doi> placeholders: <placeholder_name_1>: <list of names> <placeholder_name_2>: <list of names> """ self.logger.info(f"Parsing data catalog from {urlpath}") yml = _yml_from_uri_or_path(urlpath) # parse metadata meta = dict() # legacy code with root/category at highest yml level if "root" in yml: meta.update(root=yml.pop("root")) if "category" in yml: meta.update(category=yml.pop("category")) # read meta data meta = yml.pop("meta", meta) # TODO keep meta data!! Note only possible if yml files are not merged if root is None: root = meta.get("root", dirname(urlpath)) self.from_dict( yml, root=root, category=meta.get("category", None), mark_used=mark_used )
[docs] def from_dict( self, data_dict: Dict, root: Union[str, Path] = None, category: str = None, mark_used: bool = False, ) -> None: """Add data sources based on dictionary. Parameters ---------- data_dict: dict Dictionary of data_sources. root: str, Path, optional Global root for all relative paths in `data_dict`. category: str, optional Global category for all sources in `data_dict`. mark_used: bool If True, append to used_data list. Examples -------- A data dictionary with two entries is provided below, where all the text between <> should be filled by the user. See the specific data adapters for more information about the required and optional arguments. .. code-block:: text { <name1>: { "path": <path>, "data_type": <data_type>, "driver": <driver>, "kwargs": {<key>: <value>}, "crs": <crs>, "nodata": <nodata>, "rename": {<native_variable_name1>: <hydromt_variable_name1>}, "unit_add": {<hydromt_variable_name1>: <float/int>}, "unit_mult": {<hydromt_variable_name1>: <float/int>}, "meta": {...}, "placeholders": {<placeholder_name_1>: <list of names>}, } <name2>: { ... } } """ data_dict = _parse_data_dict(data_dict, root=root, category=category) self.update(**data_dict) if mark_used: self._used_data.extend(list(data_dict.keys()))
[docs] def to_yml( self, path: Union[str, Path], root: str = "auto", source_names: List = [], used_only: bool = False, ) -> None: """Write data catalog to yaml format. Parameters ---------- path: str, Path yaml output path. root: str, Path, optional Global root for all relative paths in yaml file. If "auto" the data source paths are relative to the yaml output ``path``. source_names: list, optional List of source names to export; ignored if `used_only=True` used_only: bool If True, export only data entries kept in used_data list, by default False. """ source_names = self._used_data if used_only else source_names yml_dir = os.path.dirname(path) if root == "auto": root = yml_dir d = self.to_dict(root=root, source_names=source_names) if str(root) == yml_dir: d.pop("root", None) # remove root if it equals the yml_dir with open(path, "w") as f: yaml.dump(d, f, default_flow_style=False)
[docs] def to_dict(self, source_names: List = [], root: Union[Path, str] = None) -> Dict: """Export the data catalog to a dictionary. Parameters ---------- source_names : list, optional List of source names to export root : str, Path, optional Global root for all relative paths in yml file. Returns ------- dict data catalog dictionary """ sources_out = dict() if root is not None: root = os.path.abspath(root) sources_out["root"] = root root_drive = os.path.splitdrive(root)[0] for name, source in self.sources.items(): if len(source_names) > 0 and name not in source_names: continue source_dict = source.to_dict() if root is not None: path = source_dict["path"] # is abspath source_drive = os.path.splitdrive(path)[0] if ( root_drive == source_drive and os.path.commonpath([path, root]) == root ): source_dict["path"] = os.path.relpath( source_dict["path"], root ).replace("\\", "/") # remove non serializable entries to prevent errors source_dict = _process_dict(source_dict, logger=self.logger) # TODO TEST sources_out.update({name: source_dict}) return sources_out
[docs] def to_dataframe(self, source_names: List = []) -> pd.DataFrame: """Return data catalog summary as DataFrame""" d = dict() for name, source in self.sources.items(): if len(source_names) > 0 and name not in source_names: continue d[name] = source.summary() return pd.DataFrame.from_dict(d, orient="index")
[docs] def export_data( self, data_root: Union[Path, str], bbox: List = None, time_tuple: Tuple = None, source_names: List = [], unit_conversion: bool = True, ) -> None: """Export a data slice of each dataset and a data_catalog.yml file to disk. Parameters ---------- data_root : str, Path Path to output folder bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest. time_tuple : tuple of str, datetime, optional Start and end date of period of interest. By default the entire time period of the dataset is returned. source_names: list, optional List of source names to export unit_conversion: boolean, optional If False skip unit conversion when parsing data from file, by default True. """ if not os.path.isdir(data_root): os.makedirs(data_root) # create copy of data with selected source names sources = copy.deepcopy(self.sources) if len(source_names) > 0: sources = {n: sources[n] for n in source_names} # export data and update sources sources_out = {} for key, source in sources.items(): try: # read slice of source and write to file self.logger.debug(f"Exporting {key}.") if not unit_conversion: unit_mult = source.unit_mult unit_add = source.unit_add source.unit_mult = {} source.unit_add = {} fn_out, driver = source.to_file( data_root=data_root, data_name=key, bbox=bbox, time_tuple=time_tuple, logger=self.logger, ) if fn_out is None: self.logger.warning(f"{key} file contains no data within domain") continue # update path & driver and remove kwargs and rename in output sources if unit_conversion: source.unit_mult = {} source.unit_add = {} else: source.unit_mult = unit_mult source.unit_add = unit_add source.path = fn_out source.driver = driver source.kwargs = {} source.rename = {} sources_out[key] = source except FileNotFoundError: self.logger.warning(f"{key} file not found at {source.path}") # write data catalog to yml data_catalog_out = DataCatalog() data_catalog_out._sources = sources_out fn = join(data_root, "data_catalog.yml") data_catalog_out.to_yml(fn, root="auto")
[docs] def get_rasterdataset( self, path_or_key: str, bbox: List = None, geom: gpd.GeoDataFrame = None, buffer: Union[float, int] = 0, align: bool = None, variables: Union[List, str] = None, time_tuple: Tuple = None, single_var_as_array: bool = True, **kwargs, ) -> xr.Dataset: """Returns a clipped, sliced and unified RasterDataset from the data catalog. To clip the data to the area of interest, provide a `bbox` or `geom`, with optional additional `buffer` and `align` arguments. To slice the data to the time period of interest, provide the `time_tuple` argument. To return only the dataset variables of interest and check their presence, provide the `variables` argument. NOTE: Unless `single_var_as_array` is set to False a single-variable data source will be returned as :py:class:`xarray.DataArray` rather than :py:class:`xarray.Dataset`. Arguments --------- path_or_key: str Data catalog key. If a path to a raster file is provided it will be added to the data_catalog with its based on the file basename without extension. bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest. geom : geopandas.GeoDataFrame/Series, A geometry defining the area of interest. buffer : int, optional Buffer around the `bbox` or `geom` area of interest in pixels. By default 0. align : float, optional Resolution to align the bounding box, by default None variables : str or list of str, optional. Names of RasterDataset variables to return. By default all dataset variables are returned. time_tuple : tuple of str, datetime, optional Start and end date of period of interest. By default the entire time period of the dataset is returned. single_var_as_array: bool, optional If True, return a DataArray if the dataset consists of a single variable. If False, always return a Dataset. By default True. Returns ------- obj: xarray.Dataset or xarray.DataArray RasterDataset """ if path_or_key not in self.sources and exists(abspath(path_or_key)): path = str(abspath(path_or_key)) name = basename(path_or_key).split(".")[0] self.update(**{name: RasterDatasetAdapter(path=path, **kwargs)}) elif path_or_key in self.sources: name = path_or_key else: raise FileNotFoundError(f"No such file or catalog key: {path_or_key}") self._used_data.append(name) source = self.sources[name] self.logger.info( f"DataCatalog: Getting {name} RasterDataset {source.driver} data from {source.path}" ) obj = self.sources[name].get_data( bbox=bbox, geom=geom, buffer=buffer, align=align, variables=variables, time_tuple=time_tuple, single_var_as_array=single_var_as_array, logger=self.logger, ) return obj
[docs] def get_geodataframe( self, path_or_key: Union[str, Path], bbox: List = None, geom: gpd.GeoDataFrame = None, buffer: Union[float, int] = 0, variables: Union[List, str] = None, predicate: str = "intersects", **kwargs, ): """Returns a clipped and unified GeoDataFrame (vector) from the data catalog. To clip the data to the area of interest, provide a `bbox` or `geom`, with optional additional `buffer` and `align` arguments. To return only the dataframe columns of interest and check their presence, provide the `variables` argument. Arguments --------- path_or_key: str Data catalog key. If a path to a vector file is provided it will be added to the data_catalog with its based on the file basename without extension. bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest. geom : geopandas.GeoDataFrame/Series, A geometry defining the area of interest. buffer : float, optional Buffer around the `bbox` or `geom` area of interest in meters. By default 0. predicate : {'intersects', 'within', 'contains', 'overlaps', 'crosses', 'touches'}, optional If predicate is provided, the GeoDataFrame is filtered by testing the predicate function against each item. Requires bbox or mask. By default 'intersects' align : float, optional Resolution to align the bounding box, by default None variables : str or list of str, optional. Names of GeoDataFrame columns to return. By default all columns are returned. Returns ------- gdf: geopandas.GeoDataFrame GeoDataFrame """ if path_or_key not in self.sources and exists(abspath(path_or_key)): path = str(abspath(path_or_key)) name = basename(path_or_key).split(".")[0] self.update(**{name: GeoDataFrameAdapter(path=path, **kwargs)}) elif path_or_key in self.sources: name = path_or_key else: raise FileNotFoundError(f"No such file or catalog key: {path_or_key}") self._used_data.append(name) source = self.sources[name] self.logger.info( f"DataCatalog: Getting {name} GeoDataFrame {source.driver} data from {source.path}" ) gdf = source.get_data( bbox=bbox, geom=geom, buffer=buffer, predicate=predicate, variables=variables, logger=self.logger, ) return gdf
[docs] def get_geodataset( self, path_or_key: Union[Path, str], bbox: List = None, geom: gpd.GeoDataFrame = None, buffer: Union[float, int] = 0, variables: List = None, time_tuple: Tuple = None, single_var_as_array: bool = True, **kwargs, ) -> xr.Dataset: """Returns a clipped, sliced and unified GeoDataset from the data catalog. To clip the data to the area of interest, provide a `bbox` or `geom`, with optional additional `buffer` and `align` arguments. To slice the data to the time period of interest, provide the `time_tuple` argument. To return only the dataset variables of interest and check their presence, provide the `variables` argument. NOTE: Unless `single_var_as_array` is set to False a single-variable data source will be returned as xarray.DataArray rather than Dataset. Arguments --------- path_or_key: str Data catalog key. If a path to a file is provided it will be added to the data_catalog with its based on the file basename without extension. bbox : array-like of floats (xmin, ymin, xmax, ymax) bounding box of area of interest. geom : geopandas.GeoDataFrame/Series, A geometry defining the area of interest. buffer : float, optional Buffer around the `bbox` or `geom` area of interest in meters. By default 0. align : float, optional Resolution to align the bounding box, by default None variables : str or list of str, optional. Names of GeoDataset variables to return. By default all dataset variables are returned. time_tuple : tuple of str, datetime, optional Start and end date of period of interest. By default the entire time period of the dataset is returned. single_var_as_array: bool, optional If True, return a DataArray if the dataset consists of a single variable. If False, always return a Dataset. By default True. Returns ------- obj: xarray.Dataset or xarray.DataArray GeoDataset """ if path_or_key not in self.sources and exists(abspath(path_or_key)): path = str(abspath(path_or_key)) name = basename(path_or_key).split(".")[0] self.update(**{name: GeoDatasetAdapter(path=path, **kwargs)}) elif path_or_key in self.sources: name = path_or_key else: raise FileNotFoundError(f"No such file or catalog key: {path_or_key}") self._used_data.append(name) source = self.sources[name] self.logger.info( f"DataCatalog: Getting {name} GeoDataset {source.driver} data from {source.path}" ) obj = source.get_data( bbox=bbox, geom=geom, buffer=buffer, variables=variables, time_tuple=time_tuple, single_var_as_array=single_var_as_array, logger=self.logger, ) return obj
def get_dataframe( self, path_or_key, variables=None, time_tuple=None, **kwargs, ): if path_or_key not in self.sources and exists(abspath(path_or_key)): path = str(abspath(path_or_key)) name = basename(path_or_key).split(".")[0] self.update(**{name: DataFrameAdapter(path=path, **kwargs)}) elif path_or_key in self.sources: name = path_or_key else: raise FileNotFoundError(f"No such file or catalog key: {path_or_key}") self._used_data.append(name) source = self.sources[name] self.logger.info( f"DataCatalog: Getting {name} DataFrame {source.driver} data from {source.path}" ) obj = source.get_data( variables=variables, time_tuple=time_tuple, logger=self.logger, ) return obj
def _parse_data_dict( data_dict: Dict, root: Union[Path, str] = None, category: str = None ) -> Dict: """Parse data source dictionary.""" # link yml keys to adapter classes ADAPTERS = { "RasterDataset": RasterDatasetAdapter, "GeoDataFrame": GeoDataFrameAdapter, "GeoDataset": GeoDatasetAdapter, "DataFrame": DataFrameAdapter, } # NOTE: shouldn't the kwarg overwrite the dict/yml ? if root is None: root = data_dict.pop("root", None) # parse data data = dict() for name, source in data_dict.items(): source = source.copy() # important as we modify with pop if "alias" in source: alias = source.pop("alias") if alias not in data_dict: raise ValueError(f"alias {alias} not found in data_dict.") # use alias source but overwrite any attributes with original source source_org = source.copy() source = data_dict[alias].copy() source.update(source_org) if "path" not in source: raise ValueError(f"{name}: Missing required path argument.") data_type = source.pop("data_type", None) if data_type is None: raise ValueError(f"{name}: Data type missing.") elif data_type not in ADAPTERS: raise ValueError(f"{name}: Data type {data_type} unknown") adapter = ADAPTERS.get(data_type) path = abs_path(root, source.pop("path")) meta = source.pop("meta", {}) if "category" not in meta and category is not None: meta.update(category=category) # lower kwargs for backwards compatability # FIXME this could be problamatic if driver kwargs conflict DataAdapter arguments source.update(**source.pop("kwargs", {})) for opt in source: if "fn" in opt: # get absolute paths for file names source.update({opt: abs_path(root, source[opt])}) if "placeholders" in source: options = source["placeholders"] for combination in itertools.product(*options.values()): path_n = path name_n = name for k, v in zip(options.keys(), combination): path_n = path_n.replace("{" + k + "}", v) name_n = name_n.replace("{" + k + "}", v) data[name_n] = adapter(path=path_n, meta=meta, **source) else: data[name] = adapter(path=path, meta=meta, **source) return data def _uri_validator(x: str) -> bool: try: result = urlparse(x) return all([result.scheme, result.netloc]) except: return False def _yml_from_uri_or_path(uri_or_path: Union[Path, str]) -> Dict: if _uri_validator(uri_or_path): with requests.get(uri_or_path, stream=True) as r: if r.status_code != 200: raise IOError(f"URL {r.content}: {uri_or_path}") yml = yaml.load(r.text, Loader=yaml.FullLoader) else: with open(uri_or_path, "r") as stream: yml = yaml.load(stream, Loader=yaml.FullLoader) return yml def _process_dict(d: Dict, logger=logger) -> Dict: """Recursively change dict values to keep only python literal structures.""" for k, v in d.items(): _check_key = isinstance(k, str) if _check_key and isinstance(v, dict): d[k] = _process_dict(v) elif _check_key and isinstance(v, Path): d[k] = str(v) # path to string # elif not _check_key or not isinstance(v, (list, str, int, float, bool)): # d.pop(k) # remove this entry # logger.debug(f'Removing non-serializable entry "{k}"') return d def abs_path(root: Union[Path, str], rel_path: Union[Path, str]) -> str: path = Path(str(rel_path)) if not path.is_absolute(): if root is not None: rel_path = join(root, rel_path) path = Path(abspath(rel_path)) return str(path)