Skip to content

rolling_statistics_rule

Module for RollingStatisticsRule class

!!! classes RollingStatisticsRule

RollingStatisticsRule (RuleBase, IArrayBasedRule)

Implementation for the rolling statistics rule

Source code in rules/rolling_statistics_rule.py
class RollingStatisticsRule(RuleBase, IArrayBasedRule):
    """Implementation for the rolling statistics rule

    For every time step that is followed by at least one full period of data,
    the configured statistic is computed over the window starting at that time
    step and the result is stored at the last time stamp inside that window.
    """

    def __init__(
        self,
        name: str,
        input_variable_names: List[str],
        operation_type: TimeOperationType,
    ):
        """Creates the rule with a default period of 1 and time scale "day"

        Args:
            name (str): name of the rule
            input_variable_names (List[str]): names of the input variables
            operation_type (TimeOperationType): statistic to apply per window
        """
        super().__init__(name, input_variable_names)
        # Maps the user-facing time scale names to the codes that
        # _perform_operation understands ("H" for hours, "D" for days).
        self._settings = TimeOperationSettings({"hour": "H", "day": "D"})
        self._settings.percentile_value = 0
        self._settings.operation_type = operation_type
        self._settings.time_scale = "day"
        self._period = 1

    @property
    def settings(self):
        """Time operation settings"""
        return self._settings

    @property
    def period(self) -> float:
        """Period property (window length in units of the time scale)"""
        return self._period

    @period.setter
    def period(self, period: float):
        self._period = period

    def validate(self, logger: ILogger) -> bool:
        """Validates if the rule is valid

        Args:
            logger (ILogger): logger handed to the settings validation

        Returns:
            bool: whether the rule is valid
        """

        return self.settings.validate(self.name, logger)

    def execute(self, value_array: _xr.DataArray, logger: ILogger) -> _xr.DataArray:
        """Calculating the rolling statistics for a given period

        Args:
            value_array (DataArray): value to aggregate
            logger (ILogger): logger passed on to the helper calls

        Returns:
            DataArray: Aggregated values
        """

        time_scale = get_dict_element(
            self.settings.time_scale, self.settings.time_scale_mapping
        )

        time_dim_name = get_time_dimension_name(value_array, logger)

        result = self._perform_operation(
            value_array,
            time_dim_name,
            time_scale,
            logger,
        )
        return result

    def _perform_operation(
        self,
        values: _xr.DataArray,
        time_dim_name: str,
        time_scale: str,
        logger: ILogger,
    ) -> _xr.DataArray:
        """Returns the values based on the operation type

        Args:
            values (_xr.DataArray): values
            time_dim_name (str): time dimension name
            time_scale (str): time scale code, "H" (hours) or "D" (days)
            logger (ILogger): logger

        Raises:
            ValueError: If the time scale is neither "H" nor "D"
            NotImplementedError: If operation type is not supported

        Returns:
            DataArray: Values of operation type
        """

        # Start from an all-NaN array shaped like the input; time stamps that
        # never close a full window keep NaN.
        result_array = _cp.deepcopy(values)
        result_array = result_array.where(False, _np.nan)

        if time_scale == "H":
            operation_time_delta = _dt.timedelta(hours=self._period)
        elif time_scale == "D":
            operation_time_delta = _dt.timedelta(days=self._period)
        else:
            error_message = f"Invalid time scale provided : '{time_scale}'."
            logger.log_error(error_message)
            raise ValueError(error_message)

        time_delta_ms = _np.array([operation_time_delta], dtype="timedelta64[ms]")[0]

        # Address the time axis through the detected dimension name instead of
        # a hard-coded "time", so arrays with another time dimension name work.
        time_stamps = values[time_dim_name]
        last_timestamp = time_stamps.isel({time_dim_name: -1}).values

        # NOTE: candidate for vectorization.
        # NOTE(review): the early break assumes the time axis is sorted in
        # ascending order - confirm against callers.
        for time_step in time_stamps.values:
            # Stop as soon as the remaining data no longer spans a full period.
            if last_timestamp - time_step < time_delta_ms:
                break

            # Label-based slice: both window boundaries are included.
            data = values.sel(
                {time_dim_name: slice(time_step, time_step + time_delta_ms)}
            )
            last_timestamp_data = data[time_dim_name].isel({time_dim_name: -1}).values
            result = self._apply_operation(data, time_dim_name)

            # Store the statistic at the last time stamp of the window.
            result_array.loc[{time_dim_name: last_timestamp_data}] = result

        return _xr.DataArray(result_array)

    def _apply_operation(
        self, data: _xr.DataArray, time_dim_name: str
    ) -> _xr.DataArray:
        """Reduces the data along the time dimension with the configured operation

        Args:
            data (_xr.DataArray): values of a single window
            time_dim_name (str): name of the dimension to reduce over

        Raises:
            NotImplementedError: If operation type is not supported

        Returns:
            DataArray: the reduced values
        """
        operation_type = self.settings.operation_type

        if operation_type is TimeOperationType.ADD:
            result = data.sum(dim=time_dim_name)

        elif operation_type is TimeOperationType.MIN:
            result = data.min(dim=time_dim_name)

        elif operation_type is TimeOperationType.MAX:
            result = data.max(dim=time_dim_name)

        elif operation_type is TimeOperationType.AVERAGE:
            result = data.mean(dim=time_dim_name)

        elif operation_type is TimeOperationType.MEDIAN:
            result = data.median(dim=time_dim_name)

        elif operation_type is TimeOperationType.STDEV:
            result = data.std(dim=time_dim_name)

        elif operation_type is TimeOperationType.PERCENTILE:
            # percentile_value is divided by 100 to obtain the quantile; the
            # "quantile" variable added by xarray is dropped again.
            result = data.quantile(
                self.settings.percentile_value / 100, dim=time_dim_name
            ).drop_vars("quantile")

        else:
            raise NotImplementedError(
                f"The operation type '{operation_type}' is currently not supported"
            )

        return result

period: float property writable

Period property: the length of the rolling window, expressed in units of the configured time scale (hours or days)

settings property readonly

Time operation settings

execute(self, value_array, logger)

Calculating the rolling statistics for a given period

Parameters:

Name Type Description Default
value_array DataArray

value to aggregate

required

Returns:

Type Description
DataArray

Aggregated values

Source code in rules/rolling_statistics_rule.py
def execute(self, value_array: _xr.DataArray, logger: ILogger) -> _xr.DataArray:
    """Compute the rolling statistic of the given values for the configured period

    Args:
        value_array (DataArray): values to aggregate
        logger (ILogger): logger passed on to the helper calls

    Returns:
        DataArray: the aggregated values produced by ``_perform_operation``
    """

    settings = self.settings

    # Translate the configured time scale through the settings' mapping.
    scale_code = get_dict_element(settings.time_scale, settings.time_scale_mapping)

    # Look up which dimension of the input holds the time axis.
    time_dimension = get_time_dimension_name(value_array, logger)

    return self._perform_operation(value_array, time_dimension, scale_code, logger)

validate(self, logger)

Validates if the rule is valid

Returns:

Type Description
bool

whether the rule is valid

Source code in rules/rolling_statistics_rule.py
def validate(self, logger: ILogger) -> bool:
    """Check this rule by delegating to the validation of its settings

    Args:
        logger (ILogger): logger handed to the settings validation

    Returns:
        bool: whether the rule is valid
    """

    rule_settings = self.settings
    is_valid = rule_settings.validate(self.name, logger)
    return is_valid