Source code for hydromt.data_adapter.dataframe

"""Implementation for the Pandas Dataframe adapter."""
import warnings
from datetime import datetime
from logging import Logger, getLogger
from os.path import join
from typing import Any, Dict, List, Optional, Tuple, Union, cast

import numpy as np
import pandas as pd
from pystac import Asset as StacAsset
from pystac import Catalog as StacCatalog
from pystac import Item as StacItem

from hydromt.nodata import NoDataException, NoDataStrategy, _exec_nodata_strat
from hydromt.typing import ErrorHandleMethod, StrPath, TimeRange, Variables

from .data_adapter import DataAdapter

logger = getLogger(__name__)

__all__ = [
    "DataFrameAdapter",
]


class DataFrameAdapter(DataAdapter):
    """DataAdapter implementation for Pandas Dataframes."""

    _DEFAULT_DRIVER = "csv"
    _DRIVERS = {"xlsx": "excel", "xls": "excel"}
    def __init__(
        self,
        path: StrPath,
        driver: Optional[str] = None,
        filesystem: Optional[str] = None,
        nodata: Optional[Union[dict, float, int]] = None,
        rename: Optional[dict] = None,
        unit_mult: Optional[dict] = None,
        unit_add: Optional[dict] = None,
        meta: Optional[dict] = None,
        attrs: Optional[dict] = None,
        driver_kwargs: Optional[dict] = None,
        storage_options: Optional[dict] = None,
        name: str = "",  # optional for now
        catalog_name: str = "",  # optional for now
        provider: Optional[str] = None,
        version: Optional[str] = None,
        **kwargs,
    ):
        """Initiate data adapter for 2D tabular data.

        This object contains all properties required to read supported files into
        a :py:func:`pandas.DataFrame`. In addition it keeps meta data to be able
        to reproduce which data is used.

        Parameters
        ----------
        path: str, Path
            Path to data source. If the dataset consists of multiple files, the path
            may contain {variable}, {year}, {month} placeholders as well as path
            search pattern using a '*' wildcard.
        driver: {'csv', 'parquet', 'xlsx', 'xls', 'fwf'}, optional
            Driver to read files with, for 'csv' :py:func:`~pandas.read_csv`,
            for 'parquet' :py:func:`~pandas.read_parquet`, for {'xlsx', 'xls'}
            :py:func:`~pandas.read_excel`, and for 'fwf' :py:func:`~pandas.read_fwf`.
            By default the driver is inferred from the file extension and falls back
            to 'csv' if unknown.
        filesystem: str, optional
            Filesystem where the data is stored (local, cloud, http etc.).
            If None (default) the filesystem is inferred from the path.
            See :py:func:`fsspec.registry.known_implementations` for all options.
        nodata: dict, float, int, optional
            Missing value number. Only used if the data has no native missing value.
            Nodata values can be differentiated between variables using a dictionary.
        rename: dict, optional
            Mapping of native data source variable to output source variable name as
            required by hydroMT.
        unit_mult, unit_add: dict, optional
            Scaling multiplication and addition to map from the native data unit
            to the output data unit as required by hydroMT.
        meta: dict, optional
            Metadata information of dataset, preferably containing the following keys:
            {'source_version', 'source_url', 'source_license',
            'paper_ref', 'paper_doi', 'category'}
        placeholders: dict, optional
            Placeholders to expand yaml entry to multiple entries (name and path)
            based on placeholder values.
        attrs: dict, optional
            Additional attributes relating to data variables. For instance unit
            or long name of the variable.
        extent: None
            Not used in this adapter. Only here for compatibility with other adapters.
        driver_kwargs: dict, optional
            Additional key-word arguments passed to the driver.
        storage_options: dict, optional
            Additional key-word arguments passed to the fsspec FileSystem object.
        name, catalog_name: str, optional
            Name of the dataset and catalog, optional for now.
        """
        driver_kwargs = driver_kwargs or {}
        attrs = attrs or {}
        meta = meta or {}
        unit_add = unit_add or {}
        unit_mult = unit_mult or {}
        rename = rename or {}
        storage_options = storage_options or {}
        if kwargs:
            warnings.warn(
                "Passing additional keyword arguments to be used by the "
                "DataFrameAdapter driver is deprecated and will be removed "
                "in a future version. Please use 'driver_kwargs' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            driver_kwargs.update(kwargs)
        super().__init__(
            path=path,
            driver=driver,
            filesystem=filesystem,
            nodata=nodata,
            rename=rename,
            unit_mult=unit_mult,
            unit_add=unit_add,
            meta=meta,
            attrs=attrs,
            driver_kwargs=driver_kwargs,
            storage_options=storage_options,
            name=name,
            catalog_name=catalog_name,
            provider=provider,
            version=version,
        )
    def to_file(
        self,
        data_root: StrPath,
        data_name: str,
        driver: Optional[str] = None,
        variables: Optional[Variables] = None,
        time_tuple: Optional[TimeRange] = None,
        handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
        logger: Logger = logger,
        **kwargs,
    ) -> Optional[Tuple[StrPath, str, Dict[str, Any]]]:
        """Save a dataframe slice to a file.

        Parameters
        ----------
        data_root : str or Path
            Path to the output folder.
        data_name : str
            Name of the output file without extension.
        driver : str, optional
            Driver to write the file, e.g., 'csv', 'parquet', 'excel'. If None,
            the default behavior is used.
        variables : list of str, optional
            Names of DataFrame columns to include in the output. By default,
            all columns are included.
        time_tuple : tuple of str or datetime, optional
            Start and end date of the period of interest. By default, the entire
            time period of the DataFrame is included.
        **kwargs : dict
            Additional keyword arguments to be passed to the file writing method.

        Returns
        -------
        fn_out : str
            Absolute path to the output file.
        driver : str
            Name of the driver used to read the data back.
            See :py:func:`~hydromt.data_catalog.DataCatalog.get_dataframe`.
        kwargs: dict
            The additional keyword arguments that were passed in.
        """
        kwargs.pop("bbox", None)
        obj = self.get_data(
            time_tuple=time_tuple,
            variables=variables,
            handle_nodata=handle_nodata,
            logger=logger,
        )
        if obj is None:
            return None

        read_kwargs = dict()
        if driver is None or driver == "csv":
            # always write as CSV
            driver = "csv"
            fn_out = join(data_root, f"{data_name}.csv")
            obj.to_csv(fn_out, **kwargs)
            # the index is written as the first column; record how to read it back
            read_kwargs["index_col"] = 0
        elif driver == "parquet":
            fn_out = join(data_root, f"{data_name}.parquet")
            obj.to_parquet(fn_out, **kwargs)
        elif driver == "excel":
            fn_out = join(data_root, f"{data_name}.xlsx")
            obj.to_excel(fn_out, **kwargs)
        else:
            raise ValueError(f"DataFrame: Driver {driver} is unknown.")

        return fn_out, driver, read_kwargs
    def get_data(
        self,
        variables: Optional[Variables] = None,
        time_tuple: Optional[TimeRange] = None,
        logger: Logger = logger,
        handle_nodata: NoDataStrategy = NoDataStrategy.RAISE,
    ) -> Optional[pd.DataFrame]:
        """Return a DataFrame.

        Returned data is optionally sliced by time and variables, based on the
        properties of this DataFrameAdapter. For a detailed description see:
        :py:func:`~hydromt.data_catalog.DataCatalog.get_dataframe`
        """
        try:
            # load data
            fns = self._resolve_paths(variables=variables)
            self.mark_as_used()  # mark source as used
            df = self._read_data(fns, logger=logger)
            # raise an exception so the nodata strategy is handled in one place
            # (the except clause below)
            if df is None:
                raise NoDataException()

            # rename variables and parse nodata
            df = self._rename_vars(df)
            df = self._set_nodata(df)
            # slice data
            df = DataFrameAdapter._slice_data(
                df,
                variables,
                time_tuple,
                logger=logger,
            )
            if df is None:
                raise NoDataException()

            # uniformize data
            df = self._apply_unit_conversion(df, logger=logger)
            df = self._set_metadata(df)
            return df
        except NoDataException:
            _exec_nodata_strat(
                f"No data was read from source: {self.name}",
                strategy=handle_nodata,
                logger=logger,
            )
            return None
    def _read_data(
        self,
        fns: List[StrPath],
        logger: Logger = logger,
    ) -> Optional[pd.DataFrame]:
        if len(fns) > 1:
            raise ValueError(
                f"DataFrame: Reading multiple {self.driver} files is not supported."
            )
        kwargs = self.driver_kwargs.copy()
        path = fns[0]
        logger.info(f"Reading {self.name} {self.driver} data from {self.path}")
        if self.driver in ["csv"]:
            df = pd.read_csv(path, **kwargs)
        elif self.driver == "parquet":
            # read_parquet does not accept index_col; drop it if provided
            _ = kwargs.pop("index_col", None)
            df = pd.read_parquet(path, **kwargs)
        elif self.driver in ["xls", "xlsx", "excel"]:
            df = pd.read_excel(path, engine="openpyxl", **kwargs)
        elif self.driver in ["fwf"]:
            df = pd.read_fwf(path, **kwargs)
        else:
            raise IOError(f"DataFrame: driver {self.driver} unknown.")

        # cast is just for the type checkers, it doesn't actually do anything
        df = cast(pd.DataFrame, df)
        if len(df) == 0:
            return None
        else:
            return df

    def _rename_vars(self, df: pd.DataFrame) -> pd.DataFrame:
        if self.rename:
            rename = {k: v for k, v in self.rename.items() if k in df.columns}
            df = df.rename(columns=rename)
        return df

    def _set_nodata(self, df: pd.DataFrame) -> pd.DataFrame:
        # parse nodata values
        cols = df.select_dtypes([np.number]).columns
        if self.nodata is not None and len(cols) > 0:
            if not isinstance(self.nodata, dict):
                nodata = {c: self.nodata for c in cols}
            else:
                nodata = self.nodata
            for c in cols:
                mv = nodata.get(c, None)
                if mv is not None:
                    is_nodata = np.isin(df[c], np.atleast_1d(mv))
                    df[c] = np.where(is_nodata, np.nan, df[c])
        return df

    @staticmethod
    def _slice_data(
        df: pd.DataFrame,
        variables: Optional[Variables] = None,
        time_tuple: Optional[TimeRange] = None,
        logger: Logger = logger,
    ) -> Optional[pd.DataFrame]:
        """Return a sliced DataFrame.

        Parameters
        ----------
        df : pd.DataFrame
            The dataframe to be sliced.
        variables : list of str, optional
            Names of DataFrame columns to include in the output.
            By default all columns are included.
        time_tuple : tuple of str, datetime, optional
            Start and end date of period of interest. By default the entire time
            period of the dataset is returned.

        Returns
        -------
        pd.DataFrame
            Tabular data
        """
        if variables is not None:
            variables = np.atleast_1d(variables).tolist()
            if np.any([var not in df.columns for var in variables]):
                raise ValueError(f"DataFrame: Not all variables found: {variables}")
            df = df.loc[:, variables]

        if time_tuple is not None and np.dtype(df.index).type == np.datetime64:
            logger.debug(f"Slicing time dim {time_tuple}")
            try:
                df = df[df.index.slice_indexer(*time_tuple)]
            except IndexError:
                df = pd.DataFrame()

        if len(df) == 0:
            return None
        else:
            return df

    def _apply_unit_conversion(
        self, df: pd.DataFrame, logger: Logger = logger
    ) -> pd.DataFrame:
        unit_names = list(self.unit_mult.keys()) + list(self.unit_add.keys())
        unit_names = [k for k in unit_names if k in df.columns]
        if len(unit_names) > 0:
            logger.debug(f"Convert units for {len(unit_names)} columns.")
        for name in list(set(unit_names)):  # unique
            m = self.unit_mult.get(name, 1)
            a = self.unit_add.get(name, 0)
            df[name] = df[name] * m + a
        return df

    def _set_metadata(self, df: pd.DataFrame) -> pd.DataFrame:
        df.attrs.update(self.meta)

        # set column attributes
        for col in self.attrs:
            if col in df.columns:
                df[col].attrs.update(**self.attrs[col])

        return df

    def to_stac_catalog(
        self,
        on_error: ErrorHandleMethod = ErrorHandleMethod.COERCE,
    ) -> Optional[StacCatalog]:
        """Convert a dataframe into a STAC Catalog representation.

        The collection will contain an asset for each of the associated files.

        Parameters
        ----------
        on_error : ErrorHandleMethod, optional
            The error handling strategy. Options are: "raise" to raise an error
            on failure, "skip" to skip the dataset on failure, and "coerce"
            (default) to set default values on failure.

        Returns
        -------
        Optional[StacCatalog]
            The STAC Catalog representation of the dataset, or None if the
            dataset was skipped.
        """
        if on_error == ErrorHandleMethod.SKIP:
            logger.warning(
                f"Skipping {self.name} during stac conversion "
                "because detecting temporal extent failed."
            )
            return
        elif on_error == ErrorHandleMethod.COERCE:
            stac_catalog = StacCatalog(
                self.name,
                description=self.name,
            )
            stac_item = StacItem(
                self.name,
                geometry=None,
                bbox=[0, 0, 0, 0],
                properties=self.meta,
                datetime=datetime(1, 1, 1),
            )
            stac_asset = StacAsset(str(self.path))
            stac_item.add_asset("hydromt_path", stac_asset)
            stac_catalog.add_item(stac_item)
            return stac_catalog
        else:
            raise NotImplementedError(
                "DataframeAdapter does not support full stac conversion as it lacks"
                " spatio-temporal dimensions"
            )
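

# -----------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the adapter implementation):
# shows how a DataFrameAdapter could be constructed directly and queried with
# get_data(). The file name "timeseries.csv", the column names, and the unit
# conversion below are made-up example values, assuming a local CSV with a
# parseable datetime index and a "precip" column.
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    adapter = DataFrameAdapter(
        path="timeseries.csv",  # hypothetical local file
        driver="csv",
        driver_kwargs={"index_col": 0, "parse_dates": True},
        rename={"precip": "precipitation"},  # native -> hydroMT variable name
        unit_mult={"precipitation": 1000.0},  # e.g. convert m to mm (example)
        meta={"category": "meteo"},
        name="local_timeseries",
    )
    # read, renaming and unit conversion applied; sliced to one variable
    # and a time range while reading
    df = adapter.get_data(
        variables=["precipitation"],
        time_tuple=("2010-01-01", "2010-12-31"),
    )
    if df is not None:
        print(df.head())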