Skip to content

depth_average_rule

Module for DepthAverageRule class

!!! classes DepthAverageRule

DepthAverageRule (RuleBase, IMultiArrayBasedRule)

Implementation for the depth average rule

Source code in rules/depth_average_rule.py
class DepthAverageRule(RuleBase, IMultiArrayBasedRule):
    """Implementation for the depth average rule"""

    # pylint: disable=too-many-locals
    def execute(
        self, value_arrays: Dict[str, _xr.DataArray], logger: ILogger
    ) -> _xr.DataArray:
        """Calculate depth average of assumed z-layers.

        Args:
            value_array (DataArray): Values to average over the depth

        Returns:
            DataArray: Depth-averaged values
        """

        # The first DataArray in our value_arrays contains the values to be averaged
        # but the name of the key is given by the user, and is unknown here, so use
        # the ordering defined in the parser.
        values_list = list(value_arrays.values())

        variable = values_list[0]
        bed_level_values = values_list[1]
        water_level_values = values_list[2]
        depths_interfaces = values_list[3]

        # Get the dimension names for the interfaces and for the layers
        dim_interfaces_name = list(depths_interfaces.dims)[0]
        interfaces_len = depths_interfaces[dim_interfaces_name].size

        dim_layer_name = [
            d for d in variable.dims if d not in water_level_values.dims
        ][0]
        layer_len = variable[dim_layer_name].size

        # interface dimension should always be one larger than layer dimension
        # Otherwise give an error to the user
        if interfaces_len != layer_len + 1:
            logger.log_error(
                f"The number of interfaces should be number of layers + 1. Number of "
                f"interfaces = {interfaces_len}. Number of layers = {layer_len}."
            )
            return variable

        # Deal with open layer system at water level and bed level
        depths_interfaces.values[depths_interfaces.values.argmin()] = -100000
        depths_interfaces.values[depths_interfaces.values.argmax()] = 100000

        # Broadcast the depths to the dimensions of the bed levels. Then make a
        # correction for the depths to the bed level, in other words all depths lower
        # than the bed level will be corrected to the bed level.
        depths_interfaces_broadcasted = depths_interfaces.broadcast_like(
            bed_level_values
        )

        corrected_depth_bed = depths_interfaces_broadcasted.where(
            bed_level_values < depths_interfaces_broadcasted, bed_level_values
        )

        # Make a similar correction for the waterlevels (first broadcast to match
        # dimensions and then replace all values higher than waterlevel with
        # waterlevel)
        corrected_depth_bed = corrected_depth_bed.broadcast_like(water_level_values)
        corrected_depth_bed = corrected_depth_bed.where(
            water_level_values > corrected_depth_bed, water_level_values
        )

        # Calculate the layer heights between depths
        layer_heights = corrected_depth_bed.diff(dim=dim_interfaces_name)
        layer_heights = layer_heights.rename({dim_interfaces_name: dim_layer_name})

        # Use the NaN filtering of the variable to set the correct depth per column
        layer_heights = layer_heights.where(variable.notnull())

        # Calculate depth average using relative value
        relative_values = variable * layer_heights

        # Calculate average
        return relative_values.sum(dim=dim_layer_name) / layer_heights.sum(
            dim=dim_layer_name
        )

execute(self, value_arrays, logger)

Calculate depth average of assumed z-layers.

Parameters:

Name Type Description Default
value_array DataArray

Values to average over the depth

required

Returns:

Type Description
DataArray

Depth-averaged values

Source code in rules/depth_average_rule.py
def execute(
    self, value_arrays: Dict[str, _xr.DataArray], logger: ILogger
) -> _xr.DataArray:
    """Calculate depth average of assumed z-layers.

    Args:
        value_array (DataArray): Values to average over the depth

    Returns:
        DataArray: Depth-averaged values
    """

    # The first DataArray in our value_arrays contains the values to be averaged
    # but the name of the key is given by the user, and is unknown here, so use
    # the ordering defined in the parser.
    values_list = list(value_arrays.values())

    variable = values_list[0]
    bed_level_values = values_list[1]
    water_level_values = values_list[2]
    depths_interfaces = values_list[3]

    # Get the dimension names for the interfaces and for the layers
    dim_interfaces_name = list(depths_interfaces.dims)[0]
    interfaces_len = depths_interfaces[dim_interfaces_name].size

    dim_layer_name = [
        d for d in variable.dims if d not in water_level_values.dims
    ][0]
    layer_len = variable[dim_layer_name].size

    # interface dimension should always be one larger than layer dimension
    # Otherwise give an error to the user
    if interfaces_len != layer_len + 1:
        logger.log_error(
            f"The number of interfaces should be number of layers + 1. Number of "
            f"interfaces = {interfaces_len}. Number of layers = {layer_len}."
        )
        return variable

    # Deal with open layer system at water level and bed level
    depths_interfaces.values[depths_interfaces.values.argmin()] = -100000
    depths_interfaces.values[depths_interfaces.values.argmax()] = 100000

    # Broadcast the depths to the dimensions of the bed levels. Then make a
    # correction for the depths to the bed level, in other words all depths lower
    # than the bed level will be corrected to the bed level.
    depths_interfaces_broadcasted = depths_interfaces.broadcast_like(
        bed_level_values
    )

    corrected_depth_bed = depths_interfaces_broadcasted.where(
        bed_level_values < depths_interfaces_broadcasted, bed_level_values
    )

    # Make a similar correction for the waterlevels (first broadcast to match
    # dimensions and then replace all values higher than waterlevel with
    # waterlevel)
    corrected_depth_bed = corrected_depth_bed.broadcast_like(water_level_values)
    corrected_depth_bed = corrected_depth_bed.where(
        water_level_values > corrected_depth_bed, water_level_values
    )

    # Calculate the layer heights between depths
    layer_heights = corrected_depth_bed.diff(dim=dim_interfaces_name)
    layer_heights = layer_heights.rename({dim_interfaces_name: dim_layer_name})

    # Use the NaN filtering of the variable to set the correct depth per column
    layer_heights = layer_heights.where(variable.notnull())

    # Calculate depth average using relative value
    relative_values = variable * layer_heights

    # Calculate average
    return relative_values.sum(dim=dim_layer_name) / layer_heights.sum(
        dim=dim_layer_name
    )