# Source code for hydromt.data_adapter.geodataframe

# -*- coding: utf-8 -*-
from os.path import join
import numpy as np
import geopandas as gpd
from shapely.geometry import box
import logging
from typing import Union, NewType
from pathlib import Path

from .data_adapter import DataAdapter
from .. import io


# Module-level logger; it is also the default value of the ``logger``
# argument of the adapter methods defined in this module.
logger = logging.getLogger(__name__)

# Public names exported by ``from <module> import *``.
__all__ = ["GeoDataFrameAdapter", "GeoDataframeSource"]

# Named type for a vector data source location, declared over ``str`` or
# ``pathlib.Path``.
# NOTE(review): typing.NewType is documented for class-like base types;
# confirm that the static type checker in use accepts a ``Union`` here.
GeoDataframeSource = NewType("GeoDataframeSource", Union[str, Path])


class GeoDataFrameAdapter(DataAdapter):
    """Data adapter that reads vector data into a :py:class:`geopandas.GeoDataFrame`."""

    _DEFAULT_DRIVER = "vector"
    _DRIVERS = {
        "xy": "xy",
        "csv": "csv",
        "xls": "xls",
        "xlsx": "xlsx",
    }

    def __init__(
        self,
        path,
        driver=None,
        filesystem="local",
        crs=None,
        nodata=None,
        rename=None,
        unit_mult=None,
        unit_add=None,
        units=None,
        meta=None,
        **kwargs,
    ):
        """Initiates data adapter for geospatial vector data.

        This object contains all properties required to read supported files into
        a single unified :py:func:`geopandas.GeoDataFrame`.
        In addition it keeps meta data to be able to reproduce which data is used.

        Parameters
        ----------
        path: str, Path
            Path to data source.
        driver: {'vector', 'vector_table'}, optional
            Driver to read files with, for 'vector' :py:func:`~geopandas.read_file`,
            for {'vector_table'} :py:func:`hydromt.io.open_vector_from_table`
            By default the driver is inferred from the file extension and falls back to
            'vector' if unknown.
        filesystem: {'local', 'gcs', 's3'}, optional
            Filesystem where the data is stored (local, cloud, http etc.).
            By default, local.
        crs: int, dict, or str, optional
            Coordinate Reference System. Accepts EPSG codes (int or str); proj (str or dict)
            or wkt (str). Only used if the data has no native CRS.
        nodata: (dictionary) float, int, optional
            Missing value number. Only used if the data has no native missing value.
            Multiple nodata values can be provided in a list and differentiated between
            dataframe columns using a dictionary with variable (column) keys. The nodata
            values are only applied to columns with numeric data.
        rename: dict, optional
            Mapping of native data source variable to output source variable name as
            required by hydroMT. By default an empty mapping.
        unit_mult, unit_add: dict, optional
            Scaling multiplication and addition to change to map from the native data unit
            to the output data unit as required by hydroMT. By default empty mappings.
        units: dict, optional
            Accepted for backward compatibility of the signature only; it is not
            forwarded to the base class and not used by this adapter.
        meta: dict, optional
            Metadata information of dataset, preferably containing the following keys:
            {'source_version', 'source_url', 'source_license', 'paper_ref', 'paper_doi', 'category'}
        **kwargs
            Additional key-word arguments passed to the driver.
        """
        # ``None`` defaults are expanded to fresh dicts here so that instances
        # never share (and accidentally mutate) one default dictionary.
        super().__init__(
            path=path,
            driver=driver,
            filesystem=filesystem,
            crs=crs,
            nodata=nodata,
            rename={} if rename is None else rename,
            unit_mult={} if unit_mult is None else unit_mult,
            unit_add={} if unit_add is None else unit_add,
            meta={} if meta is None else meta,
            **kwargs,
        )

    def to_file(
        self,
        data_root,
        data_name,
        bbox=None,
        driver=None,
        variables=None,
        logger=logger,
        **kwargs,
    ):
        """Save a data slice to file.

        Parameters
        ----------
        data_root : str, Path
            Path to output folder
        data_name : str
            Name of output file without extension.
        bbox : array-like of floats
            (xmin, ymin, xmax, ymax) bounding box of area of interest.
        driver : str, optional
            Driver to write file, e.g.: 'GPKG', 'ESRI Shapefile' or any fiona data type.
            By default None, in which case 'csv' is used if the adapter reads a
            table-like source and 'GPKG' otherwise.
        variables : list of str, optional
            Names of GeoDataFrame columns to return. By default all columns are
            returned.
        logger : logging.Logger, optional
            Logger passed on to :py:meth:`get_data`.
        **kwargs
            Additional key-word arguments passed to the writer
            (``DataFrame.to_csv`` for 'csv', ``GeoDataFrame.to_file`` otherwise).

        Returns
        -------
        fn_out: str
            Path to output file (``data_root`` joined with the file name), or
            None if there is no data within the requested domain.
        driver: str
            Name of driver to read data with, see
            :py:func:`~hydromt.data_catalog.DataCatalog.get_geodataframe`;
            None if no file was written.

        Raises
        ------
        ValueError
            If 'csv' output is requested for data with non-Point geometries.
        """
        # drop time_tuple so it is not forwarded to the writer below
        kwargs.pop("time_tuple", None)
        gdf = self.get_data(bbox=bbox, variables=variables, logger=logger)
        if gdf.index.size == 0:
            return None, None

        if driver is None:
            table_drivers = ["csv", "xls", "xlsx", "xy", "vector_table"]
            driver = "csv" if self.driver in table_drivers else "GPKG"

        if driver == "csv":
            fn_out = join(data_root, f"{data_name}.csv")
            if not np.all(gdf.geometry.type == "Point"):
                raise ValueError(
                    f"{data_name} contains other geometries than 'Point' "
                    "which cannot be written to csv."
                )
            # a table has no geometry column: store point coordinates instead
            gdf["x"], gdf["y"] = gdf.geometry.x, gdf.geometry.y
            gdf.drop(columns="geometry").to_csv(fn_out, **kwargs)
        else:
            # Map driver names to file extensions (without the leading dot);
            # drivers that are not listed use their lower-cased name.
            driver_extensions = {
                "ESRI Shapefile": "shp",
            }
            ext = driver_extensions.get(driver, driver).lower().lstrip(".")
            fn_out = join(data_root, f"{data_name}.{ext}")
            gdf.to_file(fn_out, driver=driver, **kwargs)
            # files written this way are read back with the 'vector' driver
            driver = "vector"

        return fn_out, driver

    def get_data(
        self,
        bbox=None,
        geom=None,
        predicate="intersects",
        buffer=0,
        logger=logger,
        variables=None,
        **kwargs,  # this is not used, for testing only
    ):
        """Returns a clipped and unified GeoDataFrame (vector) based on the properties
        of this GeoDataFrameAdapter.

        For a detailed description see: :py:func:`~hydromt.data_catalog.DataCatalog.get_geodataframe`

        Parameters
        ----------
        bbox : array-like of floats, optional
            (xmin, ymin, xmax, ymax) bounding box, interpreted in EPSG:4326.
            Only used if ``geom`` is None.
        geom : geopandas.GeoDataFrame or geopandas.GeoSeries, optional
            Geometry with a CRS used to clip the data; takes precedence over ``bbox``.
        predicate : str, optional
            Spatial predicate passed to :py:func:`hydromt.io.open_vector`,
            by default 'intersects'.
        buffer : float, optional
            Buffer applied around ``geom`` / ``bbox``. A geographic geometry is
            reprojected to EPSG:3857 first so that the buffer is in meters.
        logger : logging.Logger, optional
            Logger used for progress and warning messages.
        variables : str or list of str, optional
            Column name(s) to return; the geometry column is always kept.
            By default all columns are returned.
        **kwargs
            Ignored; the reader options are taken from ``self.kwargs``.

        Returns
        -------
        gdf : geopandas.GeoDataFrame
            Data with renamed columns, nodata values set to NaN, unit conversion
            applied and ``attrs`` updated with the adapter meta data.

        Raises
        ------
        NotImplementedError
            If ``storage_options`` is among the adapter's reader options.
        ValueError
            If the driver is unknown or a requested variable is missing.
        """
        # If variable is string, convert to list
        if variables:
            variables = np.atleast_1d(variables).tolist()

        # Reader options always come from the adapter itself; the ``**kwargs``
        # of this method are intentionally not used.
        read_kwargs = self.kwargs.copy()
        if "storage_options" in read_kwargs:
            # not sure if storage options can be passed to fiona.open()
            # for now throw NotImplemented Error
            raise NotImplementedError(
                "Remote file storage_options not implemented for GeoDataFrame"
            )
        _ = self.resolve_paths()  # throw nice error if data not found

        # parse geom, bbox and buffer arguments
        clip_str = ""
        if geom is None and bbox is not None:
            # convert bbox to geom with crs EPGS:4326 to apply buffer later
            geom = gpd.GeoDataFrame(geometry=[box(*bbox)], crs=4326)
            clip_str = " and clip to bbox (epsg:4326)"
        elif geom is not None:
            # to_epsg() yields None for a CRS without a matching EPSG code;
            # formatting None with ':d' would raise a TypeError.
            epsg = geom.crs.to_epsg()
            crs_str = f"epsg:{epsg:d}" if epsg is not None else "unknown epsg"
            clip_str = f" and clip to geom ({crs_str})"
        if geom is not None:
            # make sure geom is projected > buffer in meters!
            if geom.crs.is_geographic and buffer > 0:
                geom = geom.to_crs(3857)
            geom = geom.buffer(buffer)  # a buffer with zero fixes some topology errors
            bbox_str = ", ".join([f"{c:.3f}" for c in geom.total_bounds])
            clip_str = f"{clip_str} [{bbox_str}]"
        if read_kwargs.pop("within", False):  # for backward compatibility
            predicate = "contains"

        # read and clip
        logger.info(f"GeoDataFrame: Read {self.driver} data{clip_str}.")
        if self.driver in ["csv", "xls", "xlsx", "xy", "vector", "vector_table"]:
            # "csv", "xls", "xlsx", "xy" deprecated use vector_table instead.
            # specific driver should be added to open_vector kwargs
            if "driver" not in read_kwargs and self.driver in [
                "csv",
                "xls",
                "xlsx",
                "xy",
            ]:
                read_kwargs.update(driver=self.driver)
            gdf = io.open_vector(
                self.path, crs=self.crs, geom=geom, predicate=predicate, **read_kwargs
            )
        else:
            raise ValueError(f"GeoDataFrame: driver {self.driver} unknown.")

        # rename and select columns
        if self.rename:
            rename = {k: v for k, v in self.rename.items() if k in gdf.columns}
            gdf = gdf.rename(columns=rename)
        if variables is not None:
            if any(var not in gdf.columns for var in variables):
                raise ValueError(f"GeoDataFrame: Not all variables found: {variables}")
            if "geometry" not in variables:  # always keep geometry column
                variables = variables + ["geometry"]
            gdf = gdf.loc[:, variables]

        # nodata and unit conversion for numeric data
        if gdf.index.size == 0:
            logger.warning(f"GeoDataFrame: No data within spatial domain {self.path}.")
        else:
            # parse nodata values
            cols = gdf.select_dtypes([np.number]).columns
            if self.nodata is not None and len(cols) > 0:
                if not isinstance(self.nodata, dict):
                    # a single value (or list of values) applies to all numeric columns
                    nodata = {c: self.nodata for c in cols}
                else:
                    nodata = self.nodata
                for c in cols:
                    mv = nodata.get(c, None)
                    if mv is not None:
                        is_nodata = np.isin(gdf[c], np.atleast_1d(mv))
                        gdf[c] = np.where(is_nodata, np.nan, gdf[c])

            # unit conversion
            unit_names = list(self.unit_mult.keys()) + list(self.unit_add.keys())
            unit_names = [k for k in unit_names if k in gdf.columns]
            if len(unit_names) > 0:
                logger.debug(
                    f"GeoDataFrame: Convert units for {len(unit_names)} columns."
                )
            for name in list(set(unit_names)):  # unique
                m = self.unit_mult.get(name, 1)
                a = self.unit_add.get(name, 0)
                gdf[name] = gdf[name] * m + a

        # set meta data
        gdf.attrs.update(self.meta)
        return gdf