Skip to content

rule_processor

Module for RuleProcessor class

Classes: RuleProcessor

RuleProcessor

Model class for processing models based on rules

Source code in entities/rule_processor.py
class RuleProcessor:
    """Model class for processing models based on rules.

    During ``initialize`` the rules are grouped into an ordered list of rule
    sets. Every rule in a set only needs variables/coordinates of the input
    dataset or outputs of rules in earlier sets. ``process_rules`` executes
    the sets in that order and writes each result into the output dataset.
    """

    def __init__(self, rules: List[IRule], dataset: _xr.Dataset) -> None:
        """Creates an instance of a rule processor using the provided
        rules and input dataset

        Args:
            rules (List[IRule]): rules to process
            dataset (_xr.Dataset): input dataset to use

        Raises:
            ValueError: if ``rules`` is empty or ``dataset`` is None
        """
        if len(rules) < 1:
            raise ValueError("No rules defined.")

        if dataset is None:
            raise ValueError("No datasets defined.")

        # keep a private copy so nothing done here can modify the caller's list
        self._rules = list(rules)
        self._input_dataset = dataset
        self._processing_list: List[List[IRule]] = []

    def initialize(self, logger: ILogger) -> bool:
        """Creates an ordered list of rule arrays, where every rule array
        contains rules that can be processed simultaneously.

        Can be called repeatedly; the stored rules are not consumed.

        Args:
            logger (ILogger): logger for reporting messages

        Returns:
            bool: A boolean to indicate if all the rules can be processed.
        """
        # all variable and coordinate names of the input dataset are available
        # to the first rule set
        inputs: List[str] = _lu.flatten_list(
            [_du.list_vars(self._input_dataset), _du.list_coords(self._input_dataset)]
        )

        tree, success = self._create_rule_sets(inputs, self._rules, [], logger)
        if success:
            self._processing_list = tree

        return success

    def process_rules(
        self, output_dataset: _xr.Dataset, logger: ILogger
    ) -> _xr.Dataset:
        """Processes the rules defined in the initialize method
        and adds the results to the provided output_dataset.

        Args:
            output_dataset (_xr.Dataset): Dataset to place the rule
                                          results into. Rule inputs are also
                                          looked up in this dataset.
            logger (ILogger): logger for reporting messages

        Returns:
            _xr.Dataset: the output dataset containing all rule results

        Raises:
            RuntimeError: if initialization is not correctly done
        """

        if len(self._processing_list) < 1:
            message = "Processor is not properly initialized, please initialize."
            raise RuntimeError(message)

        for rule_set in self._processing_list:
            for rule in rule_set:
                logger.log_info(f"Starting rule {rule.name}")

                rule_result = self._execute_rule(rule, output_dataset, logger)
                output_name = rule.output_variable_name

                # xarray reads a tuple as (dims, data[, attrs[, encoding]]).
                # Coordinates do not belong in that tuple (a 4th element
                # would become the variable encoding); they are added below.
                output_dataset[output_name] = (
                    rule_result.dims,
                    rule_result.values,
                    rule_result.attrs,
                )
                for coord_key in rule_result.coords:
                    # only add missing coordinates so that coordinates that
                    # already exist in the output dataset are not overwritten
                    if coord_key not in output_dataset.coords:
                        output_dataset = output_dataset.assign_coords(
                            {coord_key: rule_result[coord_key]}
                        )
        return output_dataset

    def _create_rule_sets(
        self,
        inputs: List[str],
        unprocessed_rules: List[IRule],
        current_tree: List[List[IRule]],
        logger: ILogger,
    ) -> Tuple[List[List[IRule]], bool]:
        """Creates an ordered list of rule-sets that can be processed in parallel.

        The argument lists are not modified; working copies are used.

        Args:
            inputs (List[str]): input names that are available to rules
            unprocessed_rules (List[IRule]): rules that still need to be handled
            current_tree (List[List[IRule]]): the current output list state
            logger (ILogger): logger for logging messages

        Returns:
            Tuple[List[List[IRule]], bool]: Ordered list of rule-sets and a flag
            telling if all rules could be resolved (empty list on failure)
        """
        available_inputs = list(inputs)
        remaining_rules = list(unprocessed_rules)
        tree = list(current_tree)

        while len(remaining_rules) > 0:
            solvable_rules = self._get_solvable_rules(
                available_inputs, remaining_rules
            )

            if len(solvable_rules) == 0:
                # dependencies missing or circular: nothing can progress
                rules_text = ", ".join(str(rule.name) for rule in remaining_rules)
                logger.log_warning(f"Some rules can not be resolved: {rules_text}")
                return [], False

            for rule in solvable_rules:
                remaining_rules.remove(rule)
                # outputs of this set become inputs for the next sets
                available_inputs.append(rule.output_variable_name)

            tree.append(solvable_rules)

        return tree, True

    def _get_solvable_rules(
        self, inputs: List[str], unprocessed_rules: List[IRule]
    ) -> List[IRule]:
        """Checks which rules can be resolved using the provided "inputs" list.

        Args:
            inputs (List[str]): available inputs to resolve rules with
            unprocessed_rules (List[IRule]): rules that need to be checked

        Returns:
            List[IRule]: list of rules that can be resolved with the provided inputs
        """
        # build the lookup once instead of scanning the list for every name
        available = set(inputs)

        return [
            rule
            for rule in unprocessed_rules
            if all(name in available for name in rule.input_variable_names)
        ]

    def _execute_rule(
        self, rule: IRule, output_dataset: _xr.Dataset, logger: ILogger
    ) -> _xr.DataArray:
        """Processes the rule with the provided dataset.

        Args:
            rule (IRule): rule to execute
            output_dataset (_xr.Dataset): dataset to take the rule inputs from
            logger (ILogger): logger for log messages

        Returns:
            _xr.DataArray: result data set

        Raises:
            NotImplementedError: if a single-input rule type gets more than one
                input, or the rule type is not supported
        """

        variable_lookup = dict(self._get_rule_input_variables(rule, output_dataset))
        variables = list(variable_lookup.values())

        if isinstance(rule, IMultiArrayBasedRule):
            result = rule.execute(variable_lookup, logger)

            # set output attributes, based on first array
            self._set_output_attributes(rule, result, variables[0])
            return result

        if isinstance(rule, IMultiCellBasedRule):
            result = self._process_by_multi_cell(rule, variable_lookup, logger)
            self._set_output_attributes(rule, result, variables[0])
            return result

        if len(variables) != 1:
            raise NotImplementedError("Array based rule only supports one input array.")

        input_variable = variables[0]
        if isinstance(rule, IArrayBasedRule):
            result = rule.execute(input_variable, logger)
            self._set_output_attributes(rule, result, input_variable)
            return result

        if isinstance(rule, ICellBasedRule):
            result = self._process_by_cell(rule, input_variable, logger)
            self._set_output_attributes(rule, result, input_variable)
            return result

        raise NotImplementedError(f"Can not execute rule {rule.name}.")

    def _set_output_attributes(
        self, rule: IRule, result: _xr.DataArray, input_variable: _xr.DataArray
    ):
        """Copies the definition attributes of input_variable onto result and
        sets its long_name/standard_name to the rule output variable name."""
        self._copy_definition_attributes(input_variable, result)

        result.attrs["long_name"] = rule.output_variable_name
        result.attrs["standard_name"] = rule.output_variable_name

    def _copy_definition_attributes(
        self, source_array: _xr.DataArray, target_array: _xr.DataArray
    ) -> None:
        """Copies the "location" and "mesh" attributes from source to target.

        NOTE(review): the value comes from get_dict_element(name, attrs, False);
        presumably None when the attribute is missing - confirm.
        """
        attributes_to_copy = ["location", "mesh"]

        for attribute_name in attributes_to_copy:
            target_array.attrs[attribute_name] = get_dict_element(
                attribute_name, source_array.attrs, False
            )

    def _process_by_cell(
        self, rule: ICellBasedRule, input_variable: _xr.DataArray, logger: ILogger
    ) -> _xr.DataArray:
        """Processes every value of the input_variable and creates a
        new one from it

        Args:
            rule (ICellBasedRule): rule to process
            input_variable (_xr.DataArray): input variable/data
            logger (ILogger): logger for log messages

        Returns:
            _xr.DataArray: array with the dims/coords of input_variable holding
            the per-cell rule results
        """
        np_array = input_variable.to_numpy()
        # NOTE(review): the result keeps the input dtype, so float results of
        # a rule applied to integer input would be truncated - confirm intent
        result_variable = _np.zeros_like(np_array)

        # define variables to count value exceedings (for some rules): min and max
        warning_counter_total = [0, 0]

        # execute rule and gather warnings for exceeded values (for some rules)
        for indices, value in _np.ndenumerate(np_array):
            result_variable[indices], warning_counter = rule.execute(value, logger)
            # update total counter for both min and max
            warning_counter_total[0] += warning_counter[0]
            warning_counter_total[1] += warning_counter[1]

        # show warnings values outside range (for some rules):
        if warning_counter_total[0] > 0:
            logger.log_warning(
                f"value less than min: {warning_counter_total[0]} occurence(s)"
            )
        if warning_counter_total[1] > 0:
            logger.log_warning(
                f"value greater than max: {warning_counter_total[1]} occurence(s)"
            )

        # use copy to get the same dimensions as the
        # original input variable
        return input_variable.copy(data=result_variable)

    def _process_by_multi_cell(
        self,
        rule: IMultiCellBasedRule,
        input_variables: Dict[str, _xr.DataArray],
        logger: ILogger,
    ) -> _xr.DataArray:
        """Processes every cell of the input variables and creates a
        new variable from it

        Variables with fewer dimensions are broadcast to the dimensions of the
        variable with the most dimensions before processing.

        Args:
            rule (IMultiCellBasedRule): rule to process
            input_variables (Dict[str, _xr.DataArray]): input variables by name
            logger (ILogger): logger for log messages

        Returns:
            _xr.DataArray: array with the dims/coords of the reference variable
            holding the per-cell rule results

        Raises:
            NotImplementedError: if there are no input variables or the
                variables cannot be brought to the same dimensions
        """
        if len(input_variables) < 1:
            raise NotImplementedError(
                f"Can not execute rule {rule.name} with no input variables."
            )

        names = list(input_variables.keys())
        value_arrays = list(input_variables.values())

        # Check the amount of dimensions of all variables
        len_dims = _np.array([len(vals.dims) for vals in value_arrays])

        # Use the variable with the most dimensions. Broadcast all other
        # variables to these dimensions
        most_dims_bool = len_dims == max(len_dims)

        ref_var = value_arrays[_np.argmax(len_dims)]
        for ind_vars, enough_dims in enumerate(most_dims_bool):
            if not enough_dims:
                value_arrays[ind_vars] = self._expand_dimensions_of_variable(
                    value_arrays[ind_vars], ref_var, logger
                )

        # Check if all variables now have the same dimensions
        self._check_variable_dimensions(value_arrays, rule)

        # convert to numpy once instead of once per cell
        ref_data = ref_var.to_numpy()
        value_data = [array.to_numpy() for array in value_arrays]
        result_variable = _np.zeros_like(ref_data)

        for indices in _np.ndindex(ref_data.shape):
            # keys are the rule's input variable names
            cell_values = {
                name: data[indices] for name, data in zip(names, value_data)
            }
            result_variable[indices] = rule.execute(cell_values, logger)

        # use copy to get the same dimensions as the
        # reference input variable
        return ref_var.copy(data=result_variable)

    def _get_rule_input_variables(
        self, rule: IRule, output_dataset: _xr.Dataset
    ) -> Iterable[Tuple[str, _xr.DataArray]]:
        """Yields (name, array) pairs for every input variable of the rule."""
        for input_variable_name in rule.input_variable_names:
            yield input_variable_name, self._get_variable_by_name(
                input_variable_name, output_dataset
            )

    def _get_variable_by_name(
        self, name: str, output_dataset: _xr.Dataset
    ) -> _xr.DataArray:
        """Returns the variable with the given name from output_dataset.

        Raises:
            KeyError: if the variable is not present in output_dataset
        """
        # search output dataset (generated output)
        if name in output_dataset:
            return output_dataset[name]

        raise KeyError(
            f"Key {name} was not found in input datasets or "
            "in calculated output dataset.",
        )

    def _check_variable_dimensions(
        self, value_arrays: List[_xr.DataArray], rule: IMultiCellBasedRule
    ):
        """Checks that consecutive variables share the same set of dimensions.

        Raises:
            NotImplementedError: if two variables have different dimensions
        """
        for var1, var2 in zip(value_arrays, value_arrays[1:]):
            diff = set(var1.dims) ^ set(var2.dims)

            # If the variables with the most dimensions have different dimensions,
            # stop the calculation
            if len(diff) != 0:
                raise NotImplementedError(
                    f"Can not execute rule {rule.name} with variables with different "
                    f"dimensions. Variable {var1.name} with dimensions:{var1.dims} is "
                    f"different than {var2.name} with dimensions:{var2.dims}"
                )

    def _expand_dimensions_of_variable(
        self, var_orig: _xr.DataArray, ref_var: _xr.DataArray, logger: ILogger
    ) -> _xr.DataArray:
        """Creates a new data-array with the values of the var_orig expanded to
        include all dimensions of the ref_var

        Args:
            var_orig (_xr.DataArray): variable to expand with extra dimensions
            ref_var (_xr.DataArray): reference variable to synchronize the
                                     dimensions with
            logger (ILogger): logger for logging messages

        Returns:
            _xr.DataArray: var_orig broadcast to the dims of ref_var, in the
            same dimension order as ref_var
        """
        # Let the user know which variables will be broadcast to all dimensions
        dims_diff = [str(x) for x in ref_var.dims if x not in var_orig.dims]
        str_dims_broadcasted = ",".join(dims_diff)
        logger.log_info(
            f"Variable {var_orig.name} will be expanded to the following "
            f"dimensions: {str_dims_broadcasted}"
        )

        # perform the broadcast
        var_broadcasted = _xr.broadcast(var_orig, ref_var)[0]

        # Make sure the dimensions are in the same order
        return var_broadcasted.transpose(*ref_var.dims)

__init__(self, rules, dataset) special

Creates an instance of a rule processor using the provided rules and input dataset.

Parameters:

Name Type Description Default
rules List[IRule]

rules to process

required
dataset _xr.Dataset

input dataset to use

required
Source code in entities/rule_processor.py
def __init__(self, rules: List[IRule], dataset: _xr.Dataset) -> None:
    """Creates an instance of a rule processor using the provided
    rules and input dataset

    Args:
        rules (List[IRule]): rules to process
        dataset (_xr.Dataset): input dataset to use

    Raises:
        ValueError: if ``rules`` is empty or ``dataset`` is None
    """
    if len(rules) < 1:
        raise ValueError("No rules defined.")

    if dataset is None:
        raise ValueError("No datasets defined.")

    # store a private copy so later processing can never modify the
    # list object owned by the caller
    self._rules = list(rules)
    self._input_dataset = dataset
    self._processing_list: List[List[IRule]] = []

initialize(self, logger)

Creates an ordered list of rule arrays, where every rule array contains rules that can be processed simultaneously.

Parameters:

Name Type Description Default
logger ILogger

logger for reporting messages

required

Returns:

Type Description
bool

A boolean to indicate if all the rules can be processed.

Source code in entities/rule_processor.py
def initialize(self, logger: ILogger) -> bool:
    """Creates an ordered list of rule arrays, where every rule array
    contains rules that can be processed simultaneously.

    Args:
        logger (ILogger): logger for reporting messages

    Returns:
        bool: A boolean to indicate if all the rules can be processed.
    """
    # all variable and coordinate names of the input dataset are available
    # to the first rule set
    inputs: List[str] = _lu.flatten_list(
        [_du.list_vars(self._input_dataset), _du.list_coords(self._input_dataset)]
    )

    # _create_rule_sets may remove rules from the list it receives, so hand
    # it a copy: self._rules stays intact and initialize can be called again
    tree, success = self._create_rule_sets(inputs, list(self._rules), [], logger)
    if success:
        self._processing_list = tree

    return success

process_rules(self, output_dataset, logger)

Processes the rules defined in the initialize method and adds the results to the provided output_dataset.

Parameters:

Name Type Description Default
output_dataset _xr.Dataset

Dataset to place the rule results into

required
logger ILogger

logger for reporting messages

required

Exceptions:

Type Description
RuntimeError

if initialization is not correctly done

Source code in entities/rule_processor.py
def process_rules(
    self, output_dataset: _xr.Dataset, logger: ILogger
) -> _xr.Dataset:
    """Processes the rules defined in the initialize method
    and adds the results to the provided output_dataset.

    Args:
        output_dataset (_xr.Dataset): Dataset to place the rule
                                      results into. Rule inputs are also
                                      looked up in this dataset.
        logger (ILogger): logger for reporting messages

    Returns:
        _xr.Dataset: the output dataset containing all rule results

    Raises:
        RuntimeError: if initialization is not correctly done
    """

    if len(self._processing_list) < 1:
        message = "Processor is not properly initialized, please initialize."
        raise RuntimeError(message)

    for rule_set in self._processing_list:
        for rule in rule_set:
            logger.log_info(f"Starting rule {rule.name}")

            rule_result = self._execute_rule(rule, output_dataset, logger)
            output_name = rule.output_variable_name

            # xarray reads a tuple as (dims, data[, attrs[, encoding]]).
            # Coordinates do not belong in that tuple (a 4th element would
            # become the variable encoding); they are added below instead.
            output_dataset[output_name] = (
                rule_result.dims,
                rule_result.values,
                rule_result.attrs,
            )
            for coord_key in rule_result.coords:
                # only add missing coordinates so that coordinates that
                # already exist in the output dataset are not overwritten
                if coord_key not in output_dataset.coords:
                    output_dataset = output_dataset.assign_coords(
                        {coord_key: rule_result[coord_key]}
                    )
    return output_dataset