Source code for dpyverification.pipeline

"""Specification of a pipeline that will collect data and run verification functions on the data."""

import logging
import warnings
from collections.abc import Sequence
from pathlib import Path
from typing import TypeVar, cast

from cftime import CFWarning  # type:ignore[import-untyped]
from xarray import SerializationWarning

from dpyverification.configuration.config import Config
from dpyverification.configuration.file import ConfigFile, ConfigKind
from dpyverification.datamodel import InputDataset, OutputDataset
from dpyverification.datasinks import DEFAULT_DATASINKS
from dpyverification.datasinks.base import BaseDatasink
from dpyverification.datasources import DEFAULT_DATASOURCES
from dpyverification.datasources.base import BaseDatasource
from dpyverification.scores import DEFAULT_SCORES
from dpyverification.scores.base import BaseCategoricalScore, BaseScore

__all__ = ["run_pipeline"]

logger = logging.getLogger(__name__)


TItem = TypeVar("TItem", bound=BaseDatasource | BaseDatasink | BaseScore | BaseCategoricalScore)


def find_matching_kind_in_list(
    items: Sequence[type[TItem]],
    kind: str,
) -> type[TItem]:
    """Return a datasource, calculation or datasink of a given kind."""
    for item in items:
        if kind == item.kind:
            return item
    msg = f"No item with type {kind} exists."
    raise ValueError(msg)


def merge_user_and_default_items(
    default_items: Sequence[type[TItem]],
    user_items: Sequence[type[TItem]] | None,
) -> list[type[TItem]]:
    """Merge default and user-provided items."""
    if user_items is None:
        return list(default_items)
    return list(default_items) + list(user_items)
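# A minimal, illustrative sketch of how the two helpers above are used together.
# ``CsvDatasource`` is a hypothetical user-defined subclass of ``BaseDatasource``
# with ``kind = "csv"``; it is not part of this package.
#
#     available = merge_user_and_default_items(DEFAULT_DATASOURCES, [CsvDatasource])
#     csv_cls = find_matching_kind_in_list(available, kind="csv")  # -> CsvDatasource
#     find_matching_kind_in_list(available, kind="unknown")        # raises ValueError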


def run_pipeline(
    config: tuple[Path, ConfigKind] | Config,
    user_datasources: list[type[BaseDatasource]] | None = None,
    user_scores: list[type[BaseScore] | type[BaseCategoricalScore]] | None = None,
    user_datasinks: list[type[BaseDatasink]] | None = None,
) -> OutputDataset:
    """Execute a verification pipeline as defined in the configuration.

    Parameters
    ----------
    config : tuple[Path, ConfigKind] | Config
        When using a configuration file, provide a tuple with the path and kind of
        configuration file. For now, only 'yaml' is supported. Alternatively, pass a
        ``Config`` instance directly.
    user_datasources : list[type[BaseDatasource]] | None, optional
        Option to plug in a user implementation of a datasource, by default None.
    user_scores : list[type[BaseScore] | type[BaseCategoricalScore]] | None, optional
        Option to plug in a user implementation of a score, by default None.
    user_datasinks : list[type[BaseDatasink]] | None, optional
        Option to plug in a user implementation of a datasink, by default None.

    Returns
    -------
    OutputDataset
        The output dataset containing the results of the verification pipeline. In
        addition to the option of writing the output to a file or service, the output
        of the verification pipeline can also be assigned back to a Python variable
        for further inspection in an interactive Python environment.

    Examples
    --------
    Using a YAML file:

    .. code-block:: python

        from dpyverification import run_pipeline
        from dpyverification.configuration import Config
        from pathlib import Path

        path_to_config = Path("./config.yaml")
        output_dataset = run_pipeline((path_to_config, "yaml"))

    Using Python objects directly:

    .. code-block:: python

        from dpyverification import run_pipeline
        from dpyverification.configuration import Config, GeneralInfoConfig

        config = Config(
            general=GeneralInfoConfig(log_level="INFO"),
            # ... other sub-models here ...
        )
        output_dataset = run_pipeline(config)
    """
    # Get the available sources, scores and sinks
    available_datasources = merge_user_and_default_items(
        DEFAULT_DATASOURCES,
        user_datasources,
    )
    available_scores = merge_user_and_default_items(
        DEFAULT_SCORES,
        user_scores,
    )
    available_datasinks = merge_user_and_default_items(
        DEFAULT_DATASINKS,
        user_datasinks,
    )

    # Initialize the config instance from file when it's not directly provided
    if not isinstance(config, Config):
        config = ConfigFile(
            config_file=config[0],
            config_type=config[1],
        ).content

    # Log start message
    msg = (
        "Successfully initialized the configuration. \n\t verification_period_start = "
        f"{config.general.verification_period.start} \n\t verification_period_end = "
        f"{config.general.verification_period.end}"
    )
    logger.info(msg)

    # Collect and initialize all datasources
    datasources: list[BaseDatasource] = []
    for datasource_config in config.datasources:
        source_kind = find_matching_kind_in_list(
            items=available_datasources,
            kind=datasource_config.import_adapter,
        )
        datasource = source_kind.from_config(
            datasource_config.model_dump(),  # type: ignore[misc] # Allow Any
        )
        datasources.append(datasource)

    with warnings.catch_warnings():
        # Filter some known and harmless warnings
        warnings.filterwarnings(
            "ignore",
            category=RuntimeWarning,
            message="invalid value encountered in cast",
        )
        warnings.filterwarnings(
            "ignore",
            category=CFWarning,  # type:ignore[misc]
            message="this date/calendar/year zero convention is not supported by CF",
        )
        warnings.filterwarnings(
            "ignore",
            category=SerializationWarning,
            message="Unable to decode time axis into full numpy.datetime64 objects",
        )

        # Get data for each datasource
        for datasource in datasources:
            msg = f"Start getting data from {datasource.__class__.__name__}."
            logger.info(msg)
            datasource.get_data()
            msg = f"Successfully got data from {datasource.__class__.__name__}."
            logger.info(msg)

        # Initialize the input dataset
        input_dataset = InputDataset(
            [datasource.data_array for datasource in datasources],
        )
        msg = "Successfully loaded all data from sources."
        logger.info(msg)

        # Initialize the output dataset
        output_dataset = OutputDataset(input_dataset=input_dataset)

        # Add score results to the output dataset
        for score_config in config.scores:
            score_kind = find_matching_kind_in_list(
                items=available_scores,
                kind=score_config.score_adapter,
            )
            score = cast(
                "BaseScore | BaseCategoricalScore",
                score_kind.from_config(
                    score_config.model_dump(),  # type: ignore[misc] # Allow Any
                ),
            )
            for verification_pair in score.config.verification_pairs:
                obs, sim = input_dataset.get_pair(verification_pair)
                # Check if the score is a categorical score, because in that case we
                # need to provide the thresholds array as well. We do this runtime
                # check because the contract of the compute function in
                # BaseCategoricalScore is different from the one in BaseScore, and we
                # want to keep the compute function signature of BaseScore simple,
                # without optional arguments that are only required for categorical
                # scores.
                if isinstance(score, BaseCategoricalScore):
                    thresholds = input_dataset.get_thresholds_array()
                    result = score.validate_and_compute(obs=obs, sim=sim, thresholds=thresholds)
                else:
                    result = score.validate_and_compute(obs=obs, sim=sim)
                output_dataset.add_score(verification_pair_id=verification_pair.id, score=result)
                msg = (
                    f"Successfully computed {score.__class__.__name__} for verification pair "
                    f"{verification_pair.id}."
                )
                logger.info(msg)

        # Write data to each datasink, if any are configured
        if config.datasinks is not None:
            for datasink_config in config.datasinks:
                sink_kind = find_matching_kind_in_list(
                    items=available_datasinks,
                    kind=datasink_config.export_adapter,
                )
                datasink = sink_kind.from_config(datasink_config.model_dump())  # type: ignore[misc] # Allow Any
                # We write results for each verification pair separately to the
                # datasink. The datasink determines what the output will look like.
                for verification_pair in config.general.verification_pairs:
                    datasink.write_data(
                        output_dataset.get_output_dataset(verification_pair),
                    )
                    msg = (
                        f"Successfully wrote results of verification pair {verification_pair.id} "
                        f"to {datasink.__class__.__name__}."
                    )
                    logger.info(msg)

    msg = "Verification pipeline completed successfully."
    logger.info(msg)

    # Return the output dataset by default
    return output_dataset
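
# A hedged sketch of the plug-in mechanism: ``run_pipeline`` looks up user classes by
# their ``kind`` attribute, instantiates them via ``from_config`` with the dumped
# configuration, and then calls ``get_data`` and reads ``data_array`` on the instances.
# ``MyDatasource`` below is purely hypothetical and only meant to show the shape of a
# plug-in; consult ``BaseDatasource`` for the actual abstract interface.
#
#     class MyDatasource(BaseDatasource):
#         kind = "my-datasource"
#
#         ...  # implement the abstract methods of BaseDatasource
#
#     output = run_pipeline(
#         (Path("./config.yaml"), "yaml"),
#         user_datasources=[MyDatasource],
#     )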