Skip to content

data_access_layer

Module for DataAccessLayer class

!!! classes DataAccessLayer

DataAccessLayer (IDataAccessLayer)

Implementation of the data layer

Source code in entities/data_access_layer.py
class DataAccessLayer(IDataAccessLayer):
    """Implementation of the data layer.

    Reads yaml model configuration files (with ``!include`` support),
    reads UGrid (NetCDF) input datasets and writes output datasets.
    """

    def __init__(self, logger: ILogger):
        # Logger used for all info/error reporting done by this layer.
        self._logger = logger

    def retrieve_file_names(self, path: Path) -> dict:
        """
        Find all files according to the pattern in the path string
        If the user gives one filename, one file is returned. The user
        can give in a * in the filename and all files that correspond to
        that pattern will be retrieved.

        Args:
            path (Path): path to input file (with * for generic part)

        Returns:
            dict: mapping of the text captured by the * wildcard (joined
            with "_" if the wildcard matches multiple groups, or "" when
            no wildcard is used) to the corresponding file path

        Raises:
            FileExistsError: if no files match the pattern
        """
        name_list = list(path.parent.glob(path.name))
        # check if there is at least 1 file found.
        if len(name_list) == 0:
            message = f"""No files found for inputfilename {path.name}. \
                          Make sure the input file location is valid."""
            raise FileExistsError(message)

        names = {}
        for name in name_list:
            if "*" in path.name:
                # NOTE(review): only "*" is treated as a wildcard; other
                # regex metacharacters in the filename (e.g. ".") are
                # passed to re unescaped.
                part = re.findall(path.name.replace("*", "(.*)"), name.as_posix())
                names["_".join(part)] = name
            else:
                names[""] = name
        return names

    def read_input_file(self, path: Path) -> IModelData:
        """Reads input file from provided path

        Args:
            path (Path): path to input yaml file

        Returns:
            IModelData: Data regarding model

        Raises:
            FileExistsError: if file does not exist
            AttributeError: if yaml data is invalid
        """
        self._logger.log_info(f"Creating model data based on yaml file {path}")

        if not path.exists():
            msg = f"ERROR: The input file {path} does not exist."
            self._logger.log_error(msg)
            raise FileExistsError(msg)

        with open(path, "r", encoding="utf-8") as stream:
            contents: dict[Any, Any] = _yaml.load(
                stream, Loader=self.__create_yaml_loader()
            )
            model_data_builder = ModelDataBuilder(self._logger)
            try:
                yaml_data = model_data_builder.parse_yaml_data(contents)
            except AttributeError as exc:
                raise AttributeError(f"Error reading input file. {exc}") from exc
            return yaml_data

    def read_input_dataset(self, dataset_data: IDatasetData) -> _xr.Dataset:
        """Uses the provided dataset_data to create/read a xarray Dataset

        Args:
            dataset_data (IDatasetData): dataset data for creating an
                                         xarray dataset

        Returns:
            _xr.Dataset: Dataset based on provided dataset_data

        Raises:
            NotImplementedError: if the input file is not a .nc file
            ValueError: if the file cannot be opened or the time filter fails
        """
        # get start and end date from input file and convert to date format
        # if start or end date is not given (the string "None"), then use
        # None to slice the data
        date_format = "%d-%m-%Y"
        filter_start_date = None
        ds_start_date = dataset_data.start_date
        if ds_start_date != "None":
            filter_start_date = datetime.strptime(ds_start_date, date_format)
        filter_end_date = None
        ds_end_date = dataset_data.end_date
        if ds_end_date != "None":
            filter_end_date = datetime.strptime(ds_end_date, date_format)

        if dataset_data.path.suffix != ".nc":
            message = f"""The file {dataset_data.path} is not supported. \
                          Currently only UGrid (NetCDF) files are supported."""
            raise NotImplementedError(message)

        # open input dataset (from .nc file)
        try:
            dataset: _xr.Dataset = _xr.open_dataset(
                dataset_data.path, mask_and_scale=True
            )
            # mask_and_scale argument is needed to prevent inclusion of NaN's
            # in dataset for missing values. This inclusion converts integers
            # to floats
        except ValueError as exc:
            msg = "ERROR: Cannot open input .nc file -- " + str(dataset_data.path)
            raise ValueError(msg) from exc

        # apply time filter on input dataset
        try:
            if filter_start_date is not None or filter_end_date is not None:
                time_filter = f"({filter_start_date}, {filter_end_date})"
                self._logger.log_info(f"Applying time filter {time_filter} on dataset")
                dataset = dataset.sel(time=slice(filter_start_date, filter_end_date))
        except ValueError as exc:
            msg = "ERROR: error applying time filter on dataset"
            raise ValueError(msg) from exc
        return dataset

    def write_output_file(
        self, dataset: _xr.Dataset, path: Path, settings: OutputFileSettings
    ) -> None:
        """Write XArray dataset to specified path

        Args:
            dataset (_xr.Dataset): dataset to write
            path (Path): path to output file
            settings (OutputFileSettings): settings to use for saving output

        Returns:
            None

        Raises:
            FileExistsError: if output file location does not exist
            NotImplementedError: if the output file is not a .nc file
            OSError: if output file cannot be written
        """
        self._logger.log_info(f"Writing model output data to {path}")

        if not Path.exists(path.parent):
            # try to make intermediate folders
            Path(path.parent).mkdir(parents=True, exist_ok=True)

            if not Path.exists(path.parent):
                message = f"""The path {path.parent} is not found. \
                            Make sure the output file location is valid."""
                raise FileExistsError(message)

        if Path(path).suffix != ".nc":
            message = f"""The file {path} is not supported. \
                          Currently only UGrid (NetCDF) files are supported."""
            raise NotImplementedError(message)

        try:
            dataset.attrs["Version"] = settings.application_version
            dataset.attrs["Generated by"] = settings.application_name

            if settings.variables_to_save and len(settings.variables_to_save) > 0:
                dataset = reduce_dataset_for_writing(
                    dataset, settings.variables_to_save, self._logger
                )

            dataset.to_netcdf(path, format="NETCDF4")
            # D-Flow FM sometimes still uses netCDF3.
            # If necessary we can revert to "NETCDF4_CLASSIC"
            # (Data is stored in an HDF5 file, using only netCDF 3 compatible
            # API features.)

            # TO DO: write application_version to output file as a global attribute
        except OSError as exc:
            msg = f"ERROR: Cannot write output .nc file -- {path}"
            self._logger.log_error(msg)
            raise OSError(msg) from exc

    def yaml_include_constructor(self, loader: _yaml.Loader, node: _yaml.Node) -> Any:
        """constructor function to make !include (referencedfile) possible

        The referenced file is resolved relative to the including yaml file
        and loaded with the same loader type, so nested includes keep working.
        """

        file_path = Path(loader.name).parent
        file_path = file_path.joinpath(loader.construct_yaml_str(node)).resolve()
        with open(file=file_path, mode="r", encoding="utf-8") as incl_file:
            return _yaml.load(incl_file, type(loader))

    def __create_yaml_loader(self):
        """Create a yaml loader with !include and scientific-notation support.

        A dedicated FullLoader subclass is used so the extra constructor and
        the extra float resolver do not leak into the globally shared
        ``yaml.FullLoader`` class. The previous implementation mutated
        ``FullLoader`` itself, which affected every other ``yaml.load`` in the
        process and appended a duplicate float resolver on every call.
        """

        class _DataLayerLoader(_yaml.FullLoader):
            """FullLoader extended for this data-access layer only."""

        loader = _DataLayerLoader
        loader.add_constructor("!include", self.yaml_include_constructor)

        # Add support for scientific notation (example 1e5=100000)
        # Define the YAML float tag and regex pattern for scientific notation
        float_decimal = r"[-+]?(?:\d[\d_]*)\.[0-9_]*(?:[eE][-+]?\d+)?"
        float_exponent = r"[-+]?(?:\d[\d_]*)(?:[eE][-+]?\d+)"
        float_leading_dot = r"\.[\d_]+(?:[eE][-+]\d+)?"
        float_time = r"[-+]?\d[\d_]*(?::[0-5]?\d)+\.[\d_]*"
        float_inf = r"[-+]?\.(?:inf|Inf|INF)"
        float_nan = r"\.(?:nan|NaN|NAN)"

        float_regex_pattern = rf"""^(?:
            {float_decimal}
            |{float_exponent}
            |{float_leading_dot}
            |{float_time}
            |{float_inf}
            |{float_nan})$"""

        float_regex = re.compile(float_regex_pattern, re.X)

        loader.add_implicit_resolver(
            "tag:yaml.org,2002:float",
            float_regex,
            list("-+0123456789."),
        )

        return loader

read_input_dataset(self, dataset_data)

Uses the provided dataset_data to create/read a xarray Dataset

Parameters:

Name Type Description Default
dataset_data IDatasetData

dataset data for creating an xarray dataset

required

Returns:

Type Description
_xr.Dataset

Dataset based on provided dataset_data

Source code in entities/data_access_layer.py
def read_input_dataset(self, dataset_data: IDatasetData) -> _xr.Dataset:
    """Create/read an xarray Dataset from the provided dataset_data.

    Args:
        dataset_data (IDatasetData): dataset data for creating an
                                     xarray dataset

    Returns:
        _xr.Dataset: Dataset based on provided dataset_data
    """

    def _parse_filter_date(raw):
        # The literal string "None" means: do not filter on this side of
        # the time slice.
        if raw == "None":
            return None
        return datetime.strptime(raw, "%d-%m-%Y")

    filter_start_date = _parse_filter_date(dataset_data.start_date)
    filter_end_date = _parse_filter_date(dataset_data.end_date)

    if dataset_data.path.suffix != ".nc":
        message = f"""The file {dataset_data.path} is not supported. \
                      Currently only UGrid (NetCDF) files are supported."""
        raise NotImplementedError(message)

    # open input dataset (from .nc file)
    try:
        # mask_and_scale is needed to prevent inclusion of NaN's in the
        # dataset for missing values; that inclusion converts integers
        # to floats.
        dataset: _xr.Dataset = _xr.open_dataset(
            dataset_data.path, mask_and_scale=True
        )
    except ValueError as exc:
        msg = "ERROR: Cannot open input .nc file -- " + str(dataset_data.path)
        raise ValueError(msg) from exc

    # no filter requested -> return the dataset as-is
    if filter_start_date is None and filter_end_date is None:
        return dataset

    # apply time filter on input dataset
    try:
        time_filter = f"({filter_start_date}, {filter_end_date})"
        self._logger.log_info(f"Applying time filter {time_filter} on dataset")
        return dataset.sel(time=slice(filter_start_date, filter_end_date))
    except ValueError as exc:
        msg = "ERROR: error applying time filter on dataset"
        raise ValueError(msg) from exc

read_input_file(self, path)

Reads input file from provided path

Parameters:

Name Type Description Default
path Path

path to input file

required

Returns:

Type Description
IModelData

Data regarding model

Exceptions:

Type Description
FileExistsError

if file does not exist

AttributeError

if yaml data is invalid

Source code in entities/data_access_layer.py
def read_input_file(self, path: Path) -> IModelData:
    """Read the yaml input file at the provided path into model data.

    Args:
        path (Path): path to input yaml file

    Returns:
        IModelData: Data regarding model

    Raises:
        FileExistsError: if file does not exist
        AttributeError: if yaml data is invalid
    """
    self._logger.log_info(f"Creating model data based on yaml file {path}")

    if not path.exists():
        msg = f"ERROR: The input file {path} does not exist."
        self._logger.log_error(msg)
        raise FileExistsError(msg)

    with open(path, "r", encoding="utf-8") as stream:
        # The custom loader adds !include and scientific-notation support.
        loader = self.__create_yaml_loader()
        contents: dict[Any, Any] = _yaml.load(stream, Loader=loader)
        builder = ModelDataBuilder(self._logger)
        try:
            return builder.parse_yaml_data(contents)
        except AttributeError as exc:
            raise AttributeError(f"Error reading input file. {exc}") from exc

retrieve_file_names(self, path)

Find all files according to the pattern in the path string If the user gives one filename, one file is returned. The user can give in a * in the filename and all files that correspond to that pattern will be retrieved.

Parameters:

Name Type Description Default
path Path

path to input file (with * for generic part)

required

Returns:

Type Description
dict

Dictionary mapping each wildcard match (or "" when no wildcard is used) to the corresponding file path

Source code in entities/data_access_layer.py
def retrieve_file_names(self, path: Path) -> dict:
    """Find all files that match the pattern in the path string.

    If the user gives one filename, one file is returned. A * in the
    filename acts as a wildcard: every file that matches the pattern is
    retrieved.

    Args:
        path (Path): path to input file (with * for generic part)

    Returns:
        dict: text captured by the * wildcard (or "" when no wildcard is
        used) mapped to the matching file path

    Raises:
        FileExistsError: if no files match the pattern
    """
    matches = list(path.parent.glob(path.name))
    # at least one file must match the pattern
    if not matches:
        message = f"""No files found for inputfilename {path.name}. \
                      Make sure the input file location is valid."""
        raise FileExistsError(message)

    has_wildcard = "*" in path.name
    capture_pattern = path.name.replace("*", "(.*)")

    names = {}
    for match in matches:
        if has_wildcard:
            captured = re.findall(capture_pattern, match.as_posix())
            names["_".join(captured)] = match
        else:
            names[""] = match
    return names

write_output_file(self, dataset, path, settings)

Write XArray dataset to specified path

Parameters:

Name Type Description Default
dataset XArray dataset

dataset to write

required
path Path

path to output file

required
settings OutputFileSettings

settings to use for saving output

required

Returns:

Type Description
None

None

Exceptions:

Type Description
FileExistsError

if output file location does not exist

OSError

if output file cannot be written

Source code in entities/data_access_layer.py
def write_output_file(
    self, dataset: _xr.Dataset, path: Path, settings: OutputFileSettings
) -> None:
    """Write an XArray dataset to the specified path.

    Args:
        dataset (_xr.Dataset): dataset to write
        path (Path): path to output file
        settings (OutputFileSettings): settings to use for saving output

    Returns:
        None

    Raises:
        FileExistsError: if output file location does not exist
        OSError: if output file cannot be written
    """
    self._logger.log_info(f"Writing model output data to {path}")

    if not Path.exists(path.parent):
        # try to create the intermediate folders first
        Path(path.parent).mkdir(parents=True, exist_ok=True)

        if not Path.exists(path.parent):
            message = f"""The path {path.parent} is not found. \
                        Make sure the output file location is valid."""
            raise FileExistsError(message)

    if Path(path).suffix != ".nc":
        message = f"""The file {path} is not supported. \
                      Currently only UGrid (NetCDF) files are supported."""
        raise NotImplementedError(message)

    try:
        dataset.attrs["Version"] = settings.application_version
        dataset.attrs["Generated by"] = settings.application_name

        variables = settings.variables_to_save
        if variables and len(variables) > 0:
            dataset = reduce_dataset_for_writing(dataset, variables, self._logger)

        # D-Flow FM sometimes still uses netCDF3. If necessary we can
        # revert to "NETCDF4_CLASSIC" (data stored in an HDF5 file, using
        # only netCDF 3 compatible API features).
        dataset.to_netcdf(path, format="NETCDF4")

        # TO DO: write application_version to output file as a global attribute
    except OSError as exc:
        msg = f"ERROR: Cannot write output .nc file -- {path}"
        self._logger.log_error(msg)
        raise OSError(msg) from exc

yaml_include_constructor(self, loader, node)

constructor function to make !include (referencedfile) possible

Source code in entities/data_access_layer.py
def yaml_include_constructor(self, loader: _yaml.Loader, node: _yaml.Node) -> Any:
    """Constructor function that makes ``!include (referencedfile)`` possible.

    The referenced file is resolved relative to the including yaml file and
    loaded with the same loader type, so nested includes keep working.
    """

    base_dir = Path(loader.name).parent
    include_path = base_dir.joinpath(loader.construct_yaml_str(node)).resolve()
    with open(file=include_path, mode="r", encoding="utf-8") as incl_file:
        return _yaml.load(incl_file, type(loader))