
time_aggregation_rule

Module for the TimeAggregationRule class

TimeAggregationRule (RuleBase, IArrayBasedRule)

Implementation for the time aggregation rule

Source code in rules/time_aggregation_rule.py
class TimeAggregationRule(RuleBase, IArrayBasedRule):
    """Implementation for the time aggregation rule"""

    def __init__(
        self,
        name: str,
        input_variable_names: List[str],
        operation_type: TimeOperationType,
    ):
        super().__init__(name, input_variable_names)
        self._settings = TimeOperationSettings({"month": "ME", "year": "YE"})
        self._settings.percentile_value = 0
        self._settings.operation_type = operation_type
        self._settings.time_scale = "year"

    @property
    def settings(self):
        """Time operation settings"""
        return self._settings

    def validate(self, logger: ILogger) -> bool:
        """Validates if the rule is valid

        Returns:
            bool: whether the rule is valid
        """
        return self.settings.validate(self.name, logger)

    def execute(self, value_array: _xr.DataArray, logger: ILogger) -> _xr.DataArray:
        """Aggregates the values for the specified start and end date

        Args:
            value_array (DataArray): value to aggregate

        Returns:
            DataArray: Aggregated values
        """
        settings = self._settings
        if settings.operation_type is TimeOperationType.COUNT_PERIODS:
            # Check if all values in a COUNT_PERIODS value array
            #  are either 0 or 1 or NaN
            compare_values = (
                (value_array == 0) | (value_array == 1) | _np.isnan(value_array)
            )
            check_values = _xr.where(compare_values, True, False)
            if False in check_values:
                raise ValueError(
                    "The value array for the time aggregation rule with operation type"
                    " COUNT_PERIODS should only contain the values 0 and 1 (or NaN)."
                )

        dim_name = get_dict_element(settings.time_scale, settings.time_scale_mapping)

        time_dim_name = get_time_dimension_name(value_array, logger)
        aggregated_values = value_array.resample({time_dim_name: dim_name}, skipna=True)

        result = self._perform_operation(aggregated_values)
        # create a new aggregated time dimension based on original time dimension

        result_time_dim_name = f"{time_dim_name}_{settings.time_scale}"
        result = result.rename({time_dim_name: result_time_dim_name})

        for key, value in value_array[time_dim_name].attrs.items():
            if value:
                result[result_time_dim_name].attrs[key] = value

        result = result.assign_coords(
            {result_time_dim_name: result[result_time_dim_name]}
        )
        result[result_time_dim_name].attrs["long_name"] = result_time_dim_name
        result[result_time_dim_name].attrs["standard_name"] = result_time_dim_name

        return result

    def _perform_operation(self, aggregated_values: DataArrayResample) -> _xr.DataArray:
        """Returns the values based on the operation type

        Args:
            aggregated_values (DataArrayResample): the resampled values to aggregate

        Raises:
            NotImplementedError: If operation type is not supported

        Returns:
            DataArray: result of the selected operation
        """
        period_operations = [
            TimeOperationType.COUNT_PERIODS,
            TimeOperationType.MAX_DURATION_PERIODS,
            TimeOperationType.AVG_DURATION_PERIODS,
        ]

        operation_type = self.settings.operation_type

        if operation_type is TimeOperationType.ADD:
            result = aggregated_values.sum()

        elif operation_type is TimeOperationType.MIN:
            result = aggregated_values.min()

        elif operation_type is TimeOperationType.MAX:
            result = aggregated_values.max()

        elif operation_type is TimeOperationType.AVERAGE:
            result = aggregated_values.mean()

        elif operation_type is TimeOperationType.MEDIAN:
            result = aggregated_values.median()

        elif operation_type in period_operations:
            result = aggregated_values.reduce(self.analyze_groups, dim="time")

        elif operation_type is TimeOperationType.STDEV:
            result = aggregated_values.std()

        elif operation_type is TimeOperationType.PERCENTILE:
            result = aggregated_values.quantile(
                self.settings.percentile_value / 100
            ).drop_vars("quantile")

        else:
            raise NotImplementedError(
                f"The operation type '{operation_type}' " "is currently not supported"
            )

        return _xr.DataArray(result)

    def count_groups(self, elem):
        """
        Count the number of times a group of consecutive 1-values occurs.

        Args:
            elem (Array): the data array in N-dimensions

        Returns:
            int: the number of counted periods
        """
        # in case of an example array with 5 values [1,1,0,1,0]:
        # subtract the first 4 values from the last 4 values: [1,0,1,0] - [1,1,0,1]
        # (the differences for this example: [0,-1,1,-1])
        differences = _np.diff(elem)
        # Append the first element of the array to the difference array, as it can
        # also mark the start of a group and np.diff only starts from the second
        # element.
        # When the difference of two neighbouring elements is 1, this indicates the
        # start of a group. To count the number of groups, count the occurrences of
        # difference == 1 (the result for this example: 1 + 1 = 2)
        differences = _np.append(differences, elem[0])
        return _np.count_nonzero(differences == 1)

    def duration_groups(self, elem):
        """
        Create an array that cumulatively sums the values of the groups in the array,
        but restarts when a 0 occurs. For example: [0, 1, 1, 0, 1, 1, 1, 0, 1]
        This function will return: [0, 1, 2, 0, 1, 2, 3, 0, 1]

        Args:
            elem (List): the data array in N-dimensions

        Returns:
            List: List with the duration of the periods
        """
        # Function to create a cumsum over the groups (where the elements in elem are 1)
        cumsum_groups = _np.frompyfunc(lambda a, b: a + b if b == 1 else 0, 2, 1)
        return cumsum_groups.accumulate(elem)

    def analyze_groups(self, elem, axis):
        """This function analyzes the input array (N-dimensional array containing 0
        and 1) The function will reduce the array over the time axis, depending on a
        certain time operation type. Below are the operation types with what this
        function will do to this example input array: [0, 1, 1, 0, 1, 0]. A period
        is all consecutive 1 values.
            - COUNT_PERIODS: count the amount of periods (result: 2)
            - MAX_DURATION_PERIODS: gives the longest period (result: 2)
            - AVG_DURATION_PERIODS: gives the average of periods (result: 1.5)

        Args:
            elem (Array): the data array in N-dimensions
            axis (integer): the value describing the time axis

        Returns:
            array: array with the analyzed periods, reduced over the time axis
        """
        # Determine the number of axes in the array
        no_axis = len(_np.shape(elem))

        # The reduce function that calls this analyze_groups function reduces over
        # the time axis. The axis argument of this function gives the index of the
        # axis that is in fact the time axis. This axis needs to move to the last
        # position, because we need to reduce the N-dimensional array to a 1D array
        # with all the values in time for a specific cell in order to do the
        # calculation for that cell. Because we are looping over the N-dimensional
        # array recursively, we should only move the time axis the first time this
        # function is called (so when the axis is not yet set to -1!)
        if axis != -1:
            elem = _np.moveaxis(elem, axis, -1)
            axis = -1

        #  in case of 1 dimension:
        if no_axis == 1:
            # remove NaN values from the array (these are to be ignored)
            elem = elem[~_np.isnan(elem)]
            if len(elem) == 0:
                return 0
            if self.settings.operation_type is TimeOperationType.COUNT_PERIODS:
                group_result = self.count_groups(elem)
            elif self.settings.operation_type is TimeOperationType.MAX_DURATION_PERIODS:
                group_result = _np.max((self.duration_groups(elem)))
            elif self.settings.operation_type is TimeOperationType.AVG_DURATION_PERIODS:
                period = float(_np.sum(elem))
                group_count = float(self.count_groups(elem))
                group_result = _np.divide(
                    period,
                    group_count,
                    out=_np.zeros_like(period),
                    where=group_count != 0,
                )

        # in case of multiple dimensions:
        else:
            group_result = []
            for sub_elem in elem:
                # loop through this recursive function, determine output per axis:
                group_result_row = self.analyze_groups(sub_elem, axis)
                # add the result to the list of results, per axis:
                group_result.append(group_result_row)

        return group_result
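Example usage (a minimal sketch; the import paths below are assumptions for illustration and may differ from the actual package layout, and no logger or dataset handling is shown):

from rules.time_aggregation_rule import TimeAggregationRule
# Hypothetical import location for the operation type enum; adjust as needed.
from rules.time_operation_type import TimeOperationType

# Configure a rule that aggregates the yearly 90th percentile of one input variable.
rule = TimeAggregationRule(
    name="yearly_p90_water_level",
    input_variable_names=["water_level"],
    operation_type=TimeOperationType.PERCENTILE,
)
rule.settings.percentile_value = 90   # __init__ defaults this to 0
# rule.settings.time_scale = "month"  # switch from the default "year" to monthly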

settings property readonly

Time operation settings

analyze_groups(self, elem, axis)

This function analyzes the input array (an N-dimensional array containing 0 and 1) and reduces it over the time axis, depending on the configured time operation type. Below are the operation types and what this function does to the example input array [0, 1, 1, 0, 1, 0], where a period is a run of consecutive 1-values:

    - COUNT_PERIODS: counts the number of periods (result: 2)
    - MAX_DURATION_PERIODS: gives the duration of the longest period (result: 2)
    - AVG_DURATION_PERIODS: gives the average duration of the periods (result: 1.5)

Parameters:

    elem (Array): the data array in N-dimensions. Required.
    axis (integer): the value describing the time axis. Required.

Returns:

    array: array with the analyzed periods, reduced over the time axis

Source code in rules/time_aggregation_rule.py
def analyze_groups(self, elem, axis):
    """This function analyzes the input array (N-dimensional array containing 0
    and 1) The function will reduce the array over the time axis, depending on a
    certain time operation type. Below are the operation types with what this
    function will do to this example input array: [0, 1, 1, 0, 1, 0]. A period
    is all consecutive 1 values.
        - COUNT_PERIODS: count the amount of periods (result: 2)
        - MAX_DURATION_PERIODS: gives the longest period (result: 2)
        - AVG_DURATION_PERIODS: gives the average of periods (result: 1.5)

    Args:
        elem (Array): the data array in N-dimensions
        axis (integer): the value describing the time axis

    Returns:
        array: array with the analyzed periods, reduced over the time axis
    """
    # Determine the number of axes in the array
    no_axis = len(_np.shape(elem))

    # The reduce function that calls this analyze_groups function reduces over
    # the time axis. The axis argument of this function gives the index of the
    # axis that is in fact the time axis. This axis needs to move to the last
    # position, because we need to reduce the N-dimensional array to a 1D array
    # with all the values in time for a specific cell in order to do the
    # calculation for that cell. Because we are looping over the N-dimensional
    # array recursively, we should only move the time axis the first time this
    # function is called (so when the axis is not yet set to -1!)
    if axis != -1:
        elem = _np.moveaxis(elem, axis, -1)
        axis = -1

    #  in case of 1 dimension:
    if no_axis == 1:
        # remove NaN values from the array (these are to be ignored)
        elem = elem[~_np.isnan(elem)]
        if len(elem) == 0:
            return 0
        if self.settings.operation_type is TimeOperationType.COUNT_PERIODS:
            group_result = self.count_groups(elem)
        elif self.settings.operation_type is TimeOperationType.MAX_DURATION_PERIODS:
            group_result = _np.max((self.duration_groups(elem)))
        elif self.settings.operation_type is TimeOperationType.AVG_DURATION_PERIODS:
            period = float(_np.sum(elem))
            group_count = float(self.count_groups(elem))
            group_result = _np.divide(
                period,
                group_count,
                out=_np.zeros_like(period),
                where=group_count != 0,
            )

    # in case of multiple dimensions:
    else:
        group_result = []
        for sub_elem in elem:
            # loop through this recursive function, determine output per axis:
            group_result_row = self.analyze_groups(sub_elem, axis)
            # add the result to the list of results, per axis:
            group_result.append(group_result_row)

    return group_result
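For context, the standalone sketch below (plain xarray and NumPy, no rule instance) shows how resample(...).reduce(...) hands each group to a reducer such as analyze_groups: an N-dimensional block of values plus the index of the time axis. The "YE" frequency alias matches the year mapping set in __init__ and needs a reasonably recent pandas.

import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2020-01-01", periods=6, freq="MS")
data = xr.DataArray(
    [[0.0, 1.0], [1.0, 1.0], [1.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 0.0]],
    coords={"time": time, "cell": [0, 1]},
    dims=("time", "cell"),
)

def report(block, axis):
    # Each resample group arrives as a plain N-dimensional array together with
    # the index of the time axis; here we simply sum over that axis as a stand-in.
    print(block.shape, axis)  # expected: (6, 2) 0 for this layout
    return block.sum(axis=axis)

data.resample(time="YE").reduce(report, dim="time")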

count_groups(self, elem)

Count the number of times a group of consecutive 1-values occurs.

Parameters:

    elem (Array): the data array in N-dimensions. Required.

Returns:

    int: the number of counted periods

Source code in rules/time_aggregation_rule.py
def count_groups(self, elem):
    """
    Count the number of times a group of consecutive 1-values occurs.

    Args:
        elem (Array): the data array in N-dimensions

    Returns:
        int: the number of counted periods
    """
    # in case of an example array with 5 values [1,1,0,1,0]:
    # subtract the first 4 values from the last 4 values: [1,0,1,0] - [1,1,0,1]
    # (the differences for this example: [0,-1,1,-1])
    differences = _np.diff(elem)
    # Append the first element of the array to the difference array, as it can
    # also mark the start of a group and np.diff only starts from the second
    # element.
    # When the difference of two neighbouring elements is 1, this indicates the
    # start of a group. To count the number of groups, count the occurrences of
    # difference == 1 (the result for this example: 1 + 1 = 2)
    differences = _np.append(differences, elem[0])
    return _np.count_nonzero(differences == 1)
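The diff-and-append trick can be checked in isolation with plain NumPy (a standalone sketch, no rule instance needed):

import numpy as np

elem = np.array([1, 1, 0, 1, 0])
differences = np.diff(elem)                    # [ 0 -1  1 -1]
differences = np.append(differences, elem[0])  # [ 0 -1  1 -1  1]
print(np.count_nonzero(differences == 1))      # 2 -> the periods [1, 1] and [1]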

duration_groups(self, elem)

Create an array that cumulatively sums the values of the groups in the array, but restarts when a 0 occurs. For example, for [0, 1, 1, 0, 1, 1, 1, 0, 1] this function will return [0, 1, 2, 0, 1, 2, 3, 0, 1].

Parameters:

    elem (List): the data array in N-dimensions. Required.

Returns:

    List: list with the duration of the periods

Source code in rules/time_aggregation_rule.py
def duration_groups(self, elem):
    """
    Create an array that cumulatively sums the values of the groups in the array,
    but restarts when a 0 occurs. For example: [0, 1, 1, 0, 1, 1, 1, 0, 1]
    This function will return: [0, 1, 2, 0, 1, 2, 3, 0, 1]

    Args:
        elem (List): the data array in N-dimensions

    Returns:
        List: List with the duration of the periods
    """
    # Function to create a cumsum over the groups (where the elements in elem are 1)
    cumsum_groups = _np.frompyfunc(lambda a, b: a + b if b == 1 else 0, 2, 1)
    return cumsum_groups.accumulate(elem)
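The cumulative-sum-with-reset behaviour can likewise be checked in isolation (a standalone sketch; the object dtype is used so that accumulate on a Python-level ufunc created with frompyfunc finds a matching loop):

import numpy as np

elem = np.array([0, 1, 1, 0, 1, 1, 1, 0, 1], dtype=object)
cumsum_groups = np.frompyfunc(lambda a, b: a + b if b == 1 else 0, 2, 1)
durations = cumsum_groups.accumulate(elem)
print(durations)        # [0 1 2 0 1 2 3 0 1] -> running duration of each period
print(durations.max())  # 3 -> length of the longest period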

execute(self, value_array, logger)

Aggregates the values per period of the configured time scale (month or year)

Parameters:

    value_array (DataArray): values to aggregate. Required.

Returns:

    DataArray: Aggregated values

Source code in rules/time_aggregation_rule.py
def execute(self, value_array: _xr.DataArray, logger: ILogger) -> _xr.DataArray:
    """Aggregates the values for the specified start and end date

    Args:
        value_array (DataArray): value to aggregate

    Returns:
        DataArray: Aggregated values
    """
    settings = self._settings
    if settings.operation_type is TimeOperationType.COUNT_PERIODS:
        # Check if all values in a COUNT_PERIODS value array
        #  are either 0 or 1 or NaN
        compare_values = (
            (value_array == 0) | (value_array == 1) | _np.isnan(value_array)
        )
        check_values = _xr.where(compare_values, True, False)
        if False in check_values:
            raise ValueError(
                "The value array for the time aggregation rule with operation type"
                " COUNT_PERIODS should only contain the values 0 and 1 (or NaN)."
            )

    dim_name = get_dict_element(settings.time_scale, settings.time_scale_mapping)

    time_dim_name = get_time_dimension_name(value_array, logger)
    aggregated_values = value_array.resample({time_dim_name: dim_name}, skipna=True)

    result = self._perform_operation(aggregated_values)
    # create a new aggregated time dimension based on original time dimension

    result_time_dim_name = f"{time_dim_name}_{settings.time_scale}"
    result = result.rename({time_dim_name: result_time_dim_name})

    for key, value in value_array[time_dim_name].attrs.items():
        if value:
            result[result_time_dim_name].attrs[key] = value

    result = result.assign_coords(
        {result_time_dim_name: result[result_time_dim_name]}
    )
    result[result_time_dim_name].attrs["long_name"] = result_time_dim_name
    result[result_time_dim_name].attrs["standard_name"] = result_time_dim_name

    return result
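As a rough standalone illustration of the resample step that execute performs (here for a yearly AVERAGE over a monthly series; the "YE" alias corresponds to the mapping set in __init__ and requires a reasonably recent pandas):

import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2020-01-01", periods=24, freq="MS")
values = xr.DataArray(np.arange(24.0), coords={"time": time}, dims="time")

yearly_mean = values.resample(time="YE").mean()  # one value per calendar year
print(yearly_mean.values)                        # [ 5.5 17.5]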

validate(self, logger)

Validates if the rule is valid

Returns:

    bool: whether the rule is valid

Source code in rules/time_aggregation_rule.py
def validate(self, logger: ILogger) -> bool:
    """Validates if the rule is valid

    Returns:
        bool: whether the rule is valid
    """
    return self.settings.validate(self.name, logger)