# Source code for hydromt.predefined_catalog
"""Implementation of the predefined data catalogs entry points."""
import logging
import shutil
import sys
from pathlib import Path
from typing import Callable, ClassVar, Optional
import packaging.version
import pooch
from hydromt.data_adapter.caching import HYDROMT_DATADIR, _copyfile, _uri_validator
# module-level logger, named after this module
logger = logging.getLogger(__name__)

# Default location of the predefined catalogs; the subclasses below build their
# base_url by appending a catalog folder name to it.
# In the test environment this is set to the local data/catalogs directory using a global fixture.
GIT_ROOT = r"https://raw.githubusercontent.com/Deltares/hydromt/main/data/catalogs"

# names exported by ``from hydromt.predefined_catalog import *``
__all__ = [
    "PredefinedCatalog",
    "DeltaresDataCatalog",
    "ArtifactDataCatalog",
    "AWSDataCatalog",
    "GCSCMIP6DataCatalog",
    "create_registry_file",
]
# [docs] (link marker left over from the rendered documentation page; not part of the module)
def create_registry_file(root: Path, registry_path: Optional[Path] = None) -> None:
    """Create a registry file for all catalog files in the root directory.

    The root directory should contain a <version>/data_catalog.yml file per version.
    Every catalog file found contributes one line ``<version>/data_catalog.yml <hash>``
    to the registry. The lines are written in sorted path order, UTF-8 encoded and
    with "\\n" line endings, so the result does not depend on the platform or on the
    directory traversal order.

    Parameters
    ----------
    root: Path
        root directory to search for data_catalog.yml files
    registry_path: Path, optional
        file to write the registry to, by default ``<root>/registry.txt``

    Raises
    ------
    ValueError
        if a data_catalog.yml file is found whose relative path does not start
        with a valid version
    FileNotFoundError
        if no data_catalog.yml file is found below ``root``
    """
    # we don't use pooch.create_registry here as we want to only include valid data_catalog.yml files
    registry = {}
    for path in sorted(root.glob("**/data_catalog.yml")):
        key = path.relative_to(root).as_posix()
        if not _valid_key(key):
            raise ValueError(f"No valid version found in {key}")
        if sys.platform == "win32":
            # The line endings need to be replaced when operating from windows
            # in order to maintain equality of hashes
            _replace_line_endings(path)
        registry[key] = pooch.file_hash(path)

    if not registry:
        raise FileNotFoundError(f"No data_catalog.yml files found in {root}")

    if registry_path is None:
        registry_path = root / "registry.txt"

    # explicit encoding and newline keep the registry file byte-identical across platforms
    with open(registry_path, "w", encoding="utf-8", newline="\n") as f:
        f.writelines(
            f"{fname} {file_hash}\n" for fname, file_hash in registry.items()
        )
# [docs] (link marker left over from the rendered documentation page; not part of the module)
class PredefinedCatalog:
    """Predefined data catalog.

    A predefined data catalog is a set of versioned data_catalog.yml files laid out
    in a fixed directory structure below ``base_url`` and identified by ``name``.
    It is used to look up, download and cache the data_catalog.yml file that
    belongs to a specific version.

    Directory structure:

    - <base_url>/registry.txt
    - <base_url>/<version>/data_catalog.yml

    Cached directory structure:

    - <cache_dir>/<name>/registry.txt
    - <cache_dir>/<name>/<version>/data_catalog.yml
    """

    # required class variables to be defined in subclasses
    base_url: ClassVar[str] = GIT_ROOT
    name: ClassVar[str] = "predefined_catalog"

    def __init__(self, format_version: str = "v0", cache_dir=HYDROMT_DATADIR) -> None:
        """Store the catalog settings; the registry is only loaded on first use.

        Parameters
        ----------
        format_version: str
            prefix a registry key must start with to count as a compatible version
        cache_dir: path-like
            directory below which ``<name>/`` holds the cached registry and catalog files
        """
        # placeholders, filled lazily by the ``pooch`` and ``versions`` properties
        self._pooch: Optional[pooch.Pooch] = None
        self._versions: Optional[list[str]] = None
        # init arguments passed by DataCatalog
        self._cache_dir: Path = Path(cache_dir)
        self._format_version = format_version

    @property
    def registry(self) -> dict:
        """Return the registry of the underlying pooch instance."""
        return self.pooch.registry

    @property
    def pooch(self) -> pooch.Pooch:
        """Return a pooch instance with all data catalog files in registry."""
        if self._pooch is None:
            # first access: build the pooch and populate its registry
            self._create_pooch()
            self._load_registry_file()
        return self._pooch

    @property
    def versions(self) -> list[str]:
        """Return the sorted, compatible versions of the catalog."""
        known = self._versions
        if known:
            return known
        self._versions = self._get_versions()
        return self._versions

    def _create_pooch(self) -> None:
        """Create the pooch instance that caches files in ``<cache_dir>/<name>``."""
        cache_path = self._cache_dir / self.name
        self._pooch = pooch.create(
            path=cache_path,
            base_url=self.base_url,
            retry_if_failed=3,
        )

    def _get_versions(self) -> list[str]:
        """Collect, store and return the catalog versions matching the format version.

        Raises
        ------
        RuntimeError
            if the registry holds no key that is valid for the format version.
        """
        # registry keys are assumed to look like <version>/data_catalog.yml;
        # only keys accepted for this format_version are kept
        compatible = []
        for key in self.registry:
            if _valid_key(key, self._format_version):
                compatible.append(key.split("/")[0])
        if not compatible:
            raise RuntimeError(
                f"No compatible catalog version could be found for {self.name}."
            )
        self._versions = sorted(compatible, key=packaging.version.parse)
        return self._versions

    def _load_registry_file(self, overwrite: bool = False) -> None:
        """Fetch registry.txt into the cache and load it into the pooch registry.

        Nothing is done when the registry is already populated, unless ``overwrite``
        is set. If the registry file cannot be retrieved from ``base_url`` it is
        rebuilt from the catalog files that are already cached.

        Raises
        ------
        FileNotFoundError
            if no registry file exists in the cache afterwards.
        """
        if self._pooch is None:
            self._create_pooch()
        registry_loaded = bool(self.registry)
        if registry_loaded and not overwrite:
            return

        registry_path = self._cache_dir / self.name / "registry.txt"
        # drop a stale copy before trying to retrieve a fresh one
        if registry_path.exists():
            registry_path.unlink()
        try:  # try to retrieve and cache the registry file
            _copyfile(f"{self.base_url}/registry.txt", registry_path)
        except (ConnectionError, FileNotFoundError):
            logger.warning(
                f"Failed to retrieve {self.name} versions file from {self.base_url}."
                " Creating registry file from cached catalog files."
            )
            create_registry_file(registry_path.parent)

        if not registry_path.exists():
            raise FileNotFoundError(
                f"No cached file found. Failed to retrieve {self.name} versions file"
            )
        self.pooch.load_registry(registry_path)

    def get_catalog_file(self, version: Optional[str] = None) -> Optional[Path]:
        """Get the cached catalog file path for a specific version.

        Parameters
        ----------
        version: str, optional
            The version of the catalog to retrieve. If None or "latest", the last
            entry of the sorted versions is retrieved.

        Returns
        -------
        Path
            The path to the cached catalog file.

        Raises
        ------
        ValueError
            if the requested version is not one of the catalog versions.
        """
        available = self.versions
        use_latest = version is None or version == "latest"
        if use_latest:
            version = available[-1]
        if version not in available:
            raise ValueError(f"Version {version} not found in {self.name} catalog")

        # fetch the catalog file (download if not cached)
        key = f"{version}/data_catalog.yml"
        fetched = self.pooch.fetch(key, downloader=self._downloader)
        if not fetched:
            return None
        return Path(fetched)

    @property
    def _downloader(self) -> Optional[Callable]:
        """Return the local-copy downloader when ``base_url`` is not a valid URI."""
        if _uri_validator(self.base_url):
            # a real URI: leave the choice of downloader to pooch
            return None
        return _copy_file
def _valid_key(v: str, format_version: Optional[str] = None) -> bool:
    """Check if a registry key starts with a valid (and compatible) version.

    Parameters
    ----------
    v: str
        key to check; the part before the first "/" is parsed as a version.
    format_version: str, optional
        if given, the key must additionally start with this prefix.

    Returns
    -------
    bool
        False if the version part cannot be parsed or ``v`` has no ``split``
        attribute; otherwise the outcome of the prefix check (True when no
        ``format_version`` is given).
    """
    try:
        version_part = v.split("/")[0]
        packaging.version.parse(version_part)
        if not format_version:
            return True
        return v.startswith(format_version)
    except (packaging.version.InvalidVersion, AttributeError):
        return False
def _copy_file(
url: str,
output_file: str,
pooch: Optional[pooch.Pooch] = None,
check_only: bool = False,
):
"""Copy a local file to the cache directory for testing purposes.
for more info, see: https://www.fatiando.org/pooch/latest/downloaders.html
"""
url = Path(url)
output_file = Path(output_file)
file_exists = url.is_file()
if check_only:
return file_exists
if not file_exists:
raise FileNotFoundError(f"Local file {url} does not exist.")
output_file.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(url, output_file)
return output_file
class DeltaresDataCatalog(PredefinedCatalog):
    """Deltares data catalog, located in the ``deltares_data`` folder below ``GIT_ROOT``."""

    name: ClassVar[str] = "deltares_data"
    base_url: ClassVar[str] = f"{GIT_ROOT}/deltares_data"
class ArtifactDataCatalog(PredefinedCatalog):
    """Artifact data catalog, located in the ``artifact_data`` folder below ``GIT_ROOT``."""

    name: ClassVar[str] = "artifact_data"
    base_url: ClassVar[str] = f"{GIT_ROOT}/artifact_data"
class AWSDataCatalog(PredefinedCatalog):
    """AWS data catalog, located in the ``aws_data`` folder below ``GIT_ROOT``."""

    name: ClassVar[str] = "aws_data"
    base_url: ClassVar[str] = f"{GIT_ROOT}/aws_data"
class GCSCMIP6DataCatalog(PredefinedCatalog):
    """GCS CMIP6 data catalog, located in the ``gcs_cmip6_data`` folder below ``GIT_ROOT``."""

    name: ClassVar[str] = "gcs_cmip6_data"
    base_url: ClassVar[str] = f"{GIT_ROOT}/gcs_cmip6_data"
# Lookup table from catalog name to the class implementing it; every key
# equals the ``name`` attribute of the class it maps to.
# TODO: replace with an entrypoint plugin structure in v1
PREDEFINED_CATALOGS: dict[str, type[PredefinedCatalog]] = {
    "artifact_data": ArtifactDataCatalog,
    "deltares_data": DeltaresDataCatalog,
    "aws_data": AWSDataCatalog,
    "gcs_cmip6_data": GCSCMIP6DataCatalog,
}
def _replace_line_endings(file_path: Path):
WINDOWS_LINE_ENDING = b"\r\n"
UNIX_LINE_ENDING = b"\n"
with open(file_path, "rb") as open_file:
content = open_file.read()
content = content.replace(WINDOWS_LINE_ENDING, UNIX_LINE_ENDING)
with open(file_path, "wb") as open_file:
open_file.write(content)