Source code for dpyverification.configuration.default.datasources

"""A module for default implementation of datasources."""

from enum import StrEnum
from typing import Annotated, Literal

from pydantic import BaseModel, Field, StringConstraints, model_validator

from dpyverification.configuration.base import BaseDatasourceConfig
from dpyverification.configuration.utils import (
    FewsWebserviceAuthConfig,
    LocalFile,
    LocalFiles,
)
from dpyverification.constants import DataSourceKind, DataType


class ArchiveKind(StrEnum):
    """Archive kind."""

    open_archive = "open_archive"
    external_storage_archive = "external_storage_archive"


class FewsNetCDFKind(StrEnum):
    """FEWS NetCDF kind."""

    observation = "observation"
    simulated_forecast_per_forecast_reference_time = (
        "simulated_forecast_per_forecast_reference_time"
    )
    simulated_forecast_per_forecast_period = "simulated_forecast_per_forecast_period"


class ForecastRetrievalMethod(StrEnum):
    """Retrieval methods for simulations."""

    retrieve_all_forecast_data = "retrieve_all_forecast_data"
    retrieve_forecast_data_per_lead_time = "retrieve_forecast_data_per_lead_time"


# A FEWS webservice version string: a four-digit year, a dot, and the
# subversion "01" or "02" (e.g. "2025.01"). Enforced by the regex below.
FewsWebserviceVersionString = Annotated[
    str,
    StringConstraints(pattern=r"^\d{4}\.(0[1-2])$"),
    Field(description="Please specify the version as 'YYYY.01' or 'YYYY.02'"),
]


class FewsWebserviceVersion(BaseModel):
    """Configuration of a FEWS Webservice version, split into year and subversion.

    A version written as ``'YYYY.01'`` or ``'YYYY.02'`` corresponds to
    ``year=YYYY`` and ``subversion`` 1 or 2.
    """

    # Only the numeric bounds are constraints; the ``int`` annotation already
    # fixes the type (an extra ``type=`` kwarg on ``Field`` would leak into the
    # JSON schema as a non-serializable Python class).
    year: Annotated[int, Field(gt=2012, lt=2100)]
    subversion: Literal[1, 2]

    @property
    def supports_lead_time(self) -> bool:
        """Return True if lead time parameter is available in the webservice.

        Availability depends only on the year: versions from 2025 onwards
        support it, regardless of the subversion.
        """
        year_of_implementation = 2025
        return self.year >= year_of_implementation


class FewsWebserviceConfig(BaseDatasourceConfig):
    """A FEWS webservice input config element.

    Describes which time series to request from a Delft-FEWS webservice
    (locations, parameters, module instance and optional ensemble/qualifier
    filters) and how forecast data is retrieved.
    """

    import_adapter: Literal[DataSourceKind.FEWSWEBSERVICE]
    auth_config: FewsWebserviceAuthConfig = Field(
        default_factory=FewsWebserviceAuthConfig,  # type:ignore[misc]
    )
    location_ids: Annotated[list[str], Field(min_length=1)]
    parameter_ids: Annotated[list[str], Field(min_length=1)]
    module_instance_id: Annotated[str, Field(min_length=1)]
    ensemble_id: Annotated[str, Field(min_length=1)] | None = None
    qualifier_ids: Annotated[list[str], Field(min_length=1)] | None = None
    export_id_map: Annotated[str, Field(min_length=1)] | None = None
    webservice_version: FewsWebserviceVersionString
    archive_kind: Annotated[
        ArchiveKind,
        Field(
            description="Archive kind. Defaults to a Delft-FEWS Open Archive, "
            "which is the Delft-FEWS standard.",
        ),
    ] = ArchiveKind.open_archive
    forecast_retrieval_method: Annotated[
        ForecastRetrievalMethod,
        Field(
            description="Since Delft-FEWS 2025.01, the Delft-FEWS Webservice can "
            "retrieve forecasts for specific forecast periods (lead times). This avoids "
            "having to retrieve all forecast data outside of the configured forecast "
            "periods (lead times) for the verification pipeline. If not provided, all "
            "forecast data is retrieved. Retrieving forecast data per lead time requires "
            "a webservice version of 2025.01 or later.",
        ),
    ] = ForecastRetrievalMethod.retrieve_all_forecast_data
    max_workers_in_thread_pool: Annotated[
        int,
        Field(
            # A thread pool needs at least one worker; reject 0/negatives at
            # config-validation time instead of failing later at runtime.
            ge=1,
            description="This datasource asynchronously retrieves data from the "
            "Delft-FEWS webservice. Define here the maximum workers it can use. "
            "Use 5-10 for gentle load on the server-side and keep below 30 "
            "to avoid instability and minimize the risk of internal server errors.",
        ),
    ] = 2

    @property
    def webservice_supports_lead_time_in_get_timeseries(self) -> bool:
        """Whether the configured webservice version supports the leadTime parameter.

        Versions whose year is 2025 or later are considered to support it. This
        determines which forecast retrieval methods are allowed.
        """
        implementation_year = 2025
        # ``webservice_version`` is validated as 'YYYY.0N', so the part before
        # the dot is always a four-digit year.
        webservice_version_year = int(self.webservice_version.split(".")[0])
        return webservice_version_year >= implementation_year

    @model_validator(mode="after")
    def validate_forecast_retrieval_method(self) -> "FewsWebserviceConfig":
        """Validate that the configured retrieval method is compatible with the webservice.

        Returns:
            The validated config, unchanged.

        Raises:
            ValueError: If retrieval per lead time is configured while the
                webservice version does not support the leadTime parameter.
        """
        if (
            not self.webservice_supports_lead_time_in_get_timeseries
            and self.forecast_retrieval_method
            == ForecastRetrievalMethod.retrieve_forecast_data_per_lead_time
        ):
            msg = (
                f"Configured forecast retrieval method {self.forecast_retrieval_method} is not "
                f"compatible with the configured webservice version {self.webservice_version}. "
                "Retrieving forecast data per lead time requires webservice version "
                "2025.01 or later."
            )
            raise ValueError(msg)
        return self
class FewsNetCDFConfig(BaseDatasourceConfig, LocalFiles):
    """Configuration element for a FEWS NetCDF datasource.

    ``station_ids`` and ``parameter_ids`` are optional; when given, each list
    must contain at least one entry.
    """

    import_adapter: Literal[DataSourceKind.FEWSNETCDF]
    # Layout of the FEWS NetCDF content (see ``FewsNetCDFKind``).
    netcdf_kind: FewsNetCDFKind
    station_ids: Annotated[list[str], Field(min_length=1)] | None = None
    parameter_ids: Annotated[list[str], Field(min_length=1)] | None = None
class NetCDFConfig(BaseDatasourceConfig, LocalFiles):
    """Configuration element for a generic NetCDF datasource.

    Adds no fields of its own beyond the import adapter discriminator; all
    other settings come from ``BaseDatasourceConfig`` and ``LocalFiles``.
    """

    import_adapter: Literal[DataSourceKind.NETCDF]
class CsvConfig(LocalFile, BaseDatasourceConfig):
    """Configuration element for a CSV input file.

    Only threshold data is accepted (``data_type`` is fixed to
    ``DataType.threshold``). The ``stations``, ``variables`` and
    ``thresholds`` lists must each contain at least one entry.
    """

    import_adapter: Literal[DataSourceKind.CSV]
    data_type: Literal[DataType.threshold]
    stations: Annotated[list[str], Field(min_length=1)]
    variables: Annotated[list[str], Field(min_length=1)]
    thresholds: Annotated[list[str], Field(min_length=1)]