Source code for hydromt.gis_utils

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""gis related convience functions. More in pyflwdir.gis_utils"""
from os.path import dirname, join, isfile
import os
import glob
import sys
import subprocess
import numpy as np
import xarray as xr
import rasterio
from pyproj import CRS
from rasterio.transform import Affine
import geopandas as gpd
from shapely.geometry.base import BaseGeometry
from shapely.geometry import box
import logging
from pyflwdir import core_conversion, core_d8, core_ldd
from pyflwdir import gis_utils as gis
from typing import Optional, Tuple
from . import _compat


__all__ = ["spread2d", "nearest", "nearest_merge"]

logger = logging.getLogger(__name__)

_R = 6371e3  # Radius of earth in m. Use 3956 for miles.
XATTRS = {
    "geographic": {
        "standard_name": "longitude",
        "long_name": "longitude coordinate",
        "short_name": "lon",
        "units": "degrees_east",
    },
    "projected": {
        "standard_name": "projection_x_coordinate",
        "long_name": "x coordinate of projection",
        "short_name": "x",
        "units": "m",
    },
}
YATTRS = {
    "geographic": {
        "standard_name": "latitude",
        "long_name": "latitude coordinate",
        "short_name": "lat",
        "units": "degrees_north",
    },
    "projected": {
        "standard_name": "projection_y_coordinate",
        "long_name": "y coordinate of projection",
        "short_name": "y",
        "units": "m",
    },
}
PCR_VS_MAP = {"ldd": "ldd"}
GDAL_DRIVER_CODE_MAP = {
    "asc": "AAIGrid",
    "blx": "BLX",
    "bmp": "BMP",
    "bt": "BT",
    "dat": "ZMap",
    "dem": "USGSDEM",
    "gen": "ADRG",
    "gif": "GIF",
    "gpkg": "GPKG",
    "grd": "NWT_GRD",
    "gsb": "NTv2",
    "gtx": "GTX",
    "hdr": "MFF",
    "hf2": "HF2",
    "hgt": "SRTMHGT",
    "img": "HFA",
    "jpg": "JPEG",
    "kro": "KRO",
    "lcp": "LCP",
    "map": "PCRaster",
    "mbtiles": "MBTiles",
    "mpr/mpl": "ILWIS",
    "ntf": "NITF",
    "pix": "PCIDSK",
    "png": "PNG",
    "pnm": "PNM",
    "rda": "R",
    "rgb": "SGI",
    "rst": "RST",
    "rsw": "RMF",
    "sdat": "SAGA",
    "sqlite": "Rasterlite",
    "ter": "Terragen",
    "tif": "GTiff",
    "vrt": "VRT",
    "xpm": "XPM",
    "xyz": "XYZ",
}
GDAL_EXT_CODE_MAP = {v: k for k, v in GDAL_DRIVER_CODE_MAP.items()}

## GEOM functions


def nearest_merge(
    gdf1: gpd.GeoDataFrame,
    gdf2: gpd.GeoDataFrame,
    columns: Optional[list] = None,
    max_dist: Optional[float] = None,
    overwrite: bool = False,
    inplace: bool = False,
    logger=logger,
) -> gpd.GeoDataFrame:
    """Merge attributes of the nearest feature in `gdf2` into `gdf1`, optionally
    bounded by a maximum distance `max_dist`. Unless `overwrite = True`, gdf2 values
    are only merged where gdf1 has missing values.

    Parameters
    ----------
    gdf1, gdf2: geopandas.GeoDataFrame
        Source `gdf1` and destination `gdf2` geometries.
    columns : list of str, optional
        Names of columns in `gdf2` to merge, by default None
    max_dist : float, optional
        Maximum distance threshold for merge, by default None, i.e.: no threshold.
    overwrite : bool, optional
        If False (default) gdf2 values are only merged where gdf1 has missing values,
        i.e. NaN values for existing columns or missing columns.

    Returns
    -------
    gpd.GeoDataFrame
        Merged GeoDataFrame
    """
    idx_nn, dst = nearest(gdf1, gdf2)
    if not inplace:
        gdf1 = gdf1.copy()
    valid = dst < max_dist if max_dist is not None else np.ones_like(idx_nn, dtype=bool)
    columns = gdf2.columns if columns is None else columns
    gdf1["distance_right"] = dst
    gdf1["index_right"] = -1
    gdf1.loc[valid, "index_right"] = idx_nn[valid]
    skip = ["geometry"]
    for col in columns:
        if col in skip or col not in gdf2:
            if col not in gdf2:
                logger.warning(f"Column {col} not found in gdf2 and skipped.")
            continue
        new_vals = gdf2.loc[idx_nn[valid], col].values
        if col in gdf1 and not overwrite:
            old_vals = gdf1.loc[valid, col]
            replace = np.logical_or(old_vals.isnull(), old_vals.eq(""))
            new_vals = np.where(replace, new_vals, old_vals)
        gdf1.loc[valid, col] = new_vals
    return gdf1


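# Example: a minimal sketch of `nearest_merge`, joining the "name" attribute of the
# nearest river line onto two station points. The data and the helper name below are
# illustrative and not part of the module; shapely >= 2.0 is required (see `nearest`).
def _example_nearest_merge():
    from shapely.geometry import LineString, Point

    stations = gpd.GeoDataFrame(
        {"station_id": [1, 2]},
        geometry=[Point(0, 0), Point(10, 10)],
        crs="EPSG:3857",
    )
    rivers = gpd.GeoDataFrame(
        {"name": ["Rhine", "Meuse"]},
        geometry=[LineString([(0, 1), (5, 1)]), LineString([(9, 9), (12, 9)])],
        crs="EPSG:3857",
    )
    # adds "name", "index_right" and "distance_right" [m] columns to the stations
    return nearest_merge(stations, rivers, columns=["name"], max_dist=1e4)

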
def nearest(
    gdf1: gpd.GeoDataFrame, gdf2: gpd.GeoDataFrame
) -> Tuple[np.ndarray, np.ndarray]:
    """Return the index of and distance [m] to the nearest geometry in `gdf2` for
    each geometry of `gdf1`. For Line geometries in `gdf1` the nearest geometry is
    based on the line center point and for polygons on its representative point.
    Mixed geometry types are not yet supported.

    Note: since geopandas v0.10.0 a very similar `sjoin_nearest` method is available.

    Parameters
    ----------
    gdf1, gdf2: geopandas.GeoDataFrame
        Source `gdf1` and destination `gdf2` geometries.

    Returns
    -------
    index: ndarray
        index of nearest `gdf2` geometry
    dst: ndarray of float
        distance to the nearest `gdf2` geometry
    """
    if np.all(gdf1.type == "Point"):
        pnts = gdf1.geometry.copy()
    elif np.all(np.isin(gdf1.type, ["LineString", "MultiLineString"])):
        pnts = gdf1.geometry.interpolate(0.5, normalized=True)  # mid point
    elif np.all(np.isin(gdf1.type, ["Polygon", "MultiPolygon"])):
        pnts = gdf1.geometry.representative_point()  # inside polygon
    else:
        raise NotImplementedError("Mixed geometry dataframes are not yet supported.")
    if gdf1.crs != gdf2.crs:
        pnts = pnts.to_crs(gdf2.crs)
    # find nearest
    # NOTE: requires shapely v2.0; changed in v0.6.1
    if not _compat.HAS_SHAPELY20:
        raise ImportError("Shapely >= 2.0.0 is required for execution")
    other = pnts.geometry.values
    idx = gdf2.sindex.nearest(other, return_all=False)[1]
    # get distance in meters
    gdf2_nearest = gdf2.iloc[idx]
    if gdf2_nearest.crs.is_geographic:
        pnts = pnts.to_crs(3857)  # web mercator
        gdf2_nearest = gdf2_nearest.to_crs(3857)
    dst = gdf2_nearest.distance(pnts, align=False).values
    return gdf2.index.values[idx], dst


def filter_gdf(gdf, geom=None, bbox=None, crs=None, predicate="intersects"):
    """Filter GeoDataFrame geometries based on geometry mask or bounding box."""
    gtypes = (gpd.GeoDataFrame, gpd.GeoSeries, BaseGeometry)
    if bbox is not None and geom is None:
        if crs is None:
            crs = gdf.crs
        geom = gpd.GeoSeries([box(*bbox)], crs=crs)
    elif geom is not None and not isinstance(geom, gtypes):
        raise ValueError(
            f"Unknown geometry mask type {type(geom).__name__}. "
            "Provide geopandas GeoDataFrame, GeoSeries or shapely geometry."
        )
    elif bbox is None and geom is None:
        raise ValueError("Either geom or bbox is required.")
    if not isinstance(geom, BaseGeometry):
        # reproject
        if gdf.crs is not None and geom.crs != gdf.crs:
            geom = geom.to_crs(gdf.crs)
        # convert geopandas to geometry
        geom = geom.unary_union
    idx = gdf.sindex.query(geom, predicate=predicate)
    return idx


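# Example: a minimal sketch of `filter_gdf`, selecting the point that falls within a
# bounding box. The coordinates and the helper name are illustrative only.
def _example_filter_gdf():
    from shapely.geometry import Point

    gdf = gpd.GeoDataFrame(geometry=[Point(0, 0), Point(5, 5)], crs="EPSG:4326")
    idx = filter_gdf(gdf, bbox=[-1, -1, 1, 1])  # integer indices of intersecting rows
    return gdf.iloc[idx]

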
# REPROJ
def utm_crs(bbox):
    """Return the CRS of the UTM zone closest to the center of the bounding box.

    Parameters
    ----------
    bbox : array-like of floats
        (xmin, ymin, xmax, ymax) bounding box in latlon WGS84 (EPSG:4326) coordinates

    Returns
    -------
    crs: pyproj.CRS
        CRS of UTM projection
    """
    left, bottom, right, top = bbox
    x = (left + right) / 2
    y = (top + bottom) / 2
    kwargs = dict(zone=int(np.ceil((x + 180) / 6)))
    # BUGFIX hydroMT v0.3.5: south=False doesn't work; only add south=True if y < 0
    if y < 0:
        kwargs.update(south=True)
    # BUGFIX hydroMT v0.4.6: add datum
    epsg = CRS(proj="utm", datum="WGS84", ellps="WGS84", **kwargs).to_epsg()
    return CRS.from_epsg(epsg)


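# Example: a bounding box over the Netherlands falls in UTM zone 31 north, so the
# nearest UTM CRS corresponds to EPSG:32631 (helper for illustration only).
def _example_utm_crs():
    return utm_crs((4.0, 51.5, 6.5, 53.5))  # CRS.from_epsg(32631)

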
def parse_crs(crs, bbox=None):
    if crs == "utm":
        if bbox is not None:
            crs = utm_crs(bbox)
        else:
            raise ValueError('CRS "utm" requires bbox')
    else:
        crs = CRS.from_user_input(crs)
    return crs


def axes_attrs(crs):
    """Provide CF-compliant variable names and metadata for axes.

    Parameters
    ----------
    crs: pyproj.CRS
        coordinate reference system

    Returns
    -------
    x_dim: str - variable name of x dimension (e.g. 'x')
    y_dim: str - variable name of y dimension (e.g. 'lat')
    x_attr: dict - attributes of variable x
    y_attr: dict - attributes of variable y
    """
    # check for type of crs
    crs_type = "geographic" if crs.is_geographic else "projected"
    y_dim = YATTRS[crs_type]["short_name"]
    x_dim = XATTRS[crs_type]["short_name"]
    y_attrs = YATTRS[crs_type]
    x_attrs = XATTRS[crs_type]
    return x_dim, y_dim, x_attrs, y_attrs


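# Example: for a geographic CRS `axes_attrs` returns the ("lon", "lat") dimension
# names together with the CF attributes defined in XATTRS and YATTRS above
# (helper for illustration only).
def _example_axes_attrs():
    x_dim, y_dim, x_attrs, y_attrs = axes_attrs(CRS.from_epsg(4326))
    assert (x_dim, y_dim) == ("lon", "lat")
    return x_attrs, y_attrs

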
def meridian_offset(ds, x_name="x", bbox=None):
    """Re-arrange data along the x (longitude) dimension."""
    if ds.raster.crs is None or ds.raster.crs.is_projected:
        raise ValueError("The method is only applicable to geographic CRS")
    lons = np.copy(ds[x_name].values)
    w, e = lons.min(), lons.max()
    if bbox is not None and bbox[0] < w and bbox[0] < -180:  # 180W-180E > 360W-0W
        lons = np.where(lons > 0, lons - 360, lons)
    elif bbox is not None and bbox[2] > e and bbox[2] > 180:  # 180W-180E > 0E-360E
        lons = np.where(lons < 0, lons + 360, lons)
    elif e > 180:  # 0E-360E > 180W-180E
        lons = np.where(lons > 180, lons - 360, lons)
    else:
        return ds
    ds = ds.copy(deep=False)  # make sure not to overwrite original ds
    ds[x_name] = xr.Variable(ds[x_name].dims, lons)
    return ds.sortby(x_name)


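# Example: a minimal sketch shifting a global 0E-360E grid to the -180E..180E
# convention; assumes the hydromt raster accessor is registered (import hydromt);
# helper for illustration only.
def _example_meridian_offset():
    lons = np.arange(0.5, 360, 1.0)
    lats = np.arange(89.5, -90, -1.0)
    da = xr.DataArray(
        np.zeros((lats.size, lons.size)),
        coords={"y": lats, "x": lons},
        dims=("y", "x"),
    )
    da.raster.set_crs(4326)
    return meridian_offset(da)  # x coordinate now runs from -179.5 to 179.5

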
# TRANSFORM
def affine_to_coords(transform, shape, x_dim="x", y_dim="y"):
    """Returns raster axes with pixel center coordinates based on the transform.

    Parameters
    ----------
    transform : affine transform
        Two dimensional affine transform for 2D linear mapping
    shape : tuple of int
        The height, width of the raster.
    x_dim, y_dim: str
        The name of the x and y dimensions

    Returns
    -------
    x, y coordinate arrays : dict of tuple with dims and coords
    """
    if not isinstance(transform, Affine):
        transform = Affine(*transform)
    height, width = shape
    if transform.b == 0:
        x_coords, _ = transform * (np.arange(width) + 0.5, np.zeros(width) + 0.5)
        _, y_coords = transform * (np.zeros(height) + 0.5, np.arange(height) + 0.5)
        coords = {
            y_dim: (y_dim, y_coords),
            x_dim: (x_dim, x_coords),
        }
    else:
        x_coords, y_coords = (
            transform
            * transform.translation(0.5, 0.5)
            * np.meshgrid(np.arange(width), np.arange(height))
        )
        coords = {
            "yc": ((y_dim, x_dim), y_coords),
            "xc": ((y_dim, x_dim), x_coords),
        }
    return coords


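# Example: a 10 x 20 raster with 0.5 degree cells and upper-left corner at (3.0, 54.0)
# yields pixel-center coordinates starting at x=3.25 and y=53.75
# (helper for illustration only).
def _example_affine_to_coords():
    transform = Affine(0.5, 0.0, 3.0, 0.0, -0.5, 54.0)
    coords = affine_to_coords(transform, shape=(10, 20))
    # coords["x"][1][0] == 3.25 and coords["y"][1][0] == 53.75
    return coords

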
def affine_to_meshgrid(transform, shape):
    """Returns a meshgrid of pixel center coordinates based on the transform.

    Parameters
    ----------
    transform : affine transform
        Two dimensional affine transform for 2D linear mapping
    shape : tuple of int
        The height, width of the raster.

    Returns
    -------
    x_coords, y_coords: ndarray
        2D arrays of x and y coordinates
    """
    if not isinstance(transform, Affine):
        transform = Affine(*transform)
    height, width = shape
    x_coords, y_coords = (
        transform
        * transform.translation(0.5, 0.5)
        * np.meshgrid(np.arange(width), np.arange(height))
    )
    return x_coords, y_coords


## CELLAREAS


def reggrid_area(lats, lons):
    """Returns the cell area [m2] for a regular grid based on its cell centres
    lat, lon coordinates."""
    xres = np.abs(np.mean(np.diff(lons)))
    yres = np.abs(np.mean(np.diff(lats)))
    area = np.ones((lats.size, lons.size), dtype=lats.dtype)
    return cellarea(lats, xres, yres)[:, None] * area


def cellarea(lat, xres=1.0, yres=1.0):
    """Return the area [m2] of a cell based on its cell center latitude and its
    resolution measured in degrees."""
    l1 = np.radians(lat - np.abs(yres) / 2.0)
    l2 = np.radians(lat + np.abs(yres) / 2.0)
    dx = np.radians(np.abs(xres))
    return _R**2 * dx * (np.sin(l2) - np.sin(l1))


def cellres(lat, xres=1.0, yres=1.0):
    """Return the cell (x, y) resolution [m] based on the cell center latitude and
    its resolution measured in degrees."""
    m1 = 111132.92  # latitude calculation term 1
    m2 = -559.82  # latitude calculation term 2
    m3 = 1.175  # latitude calculation term 3
    m4 = -0.0023  # latitude calculation term 4
    p1 = 111412.84  # longitude calculation term 1
    p2 = -93.5  # longitude calculation term 2
    p3 = 0.118  # longitude calculation term 3

    radlat = np.radians(lat)  # numpy cos works in radians!
    # Calculate the length of a degree of latitude and longitude in meters
    dy = (
        m1
        + (m2 * np.cos(2.0 * radlat))
        + (m3 * np.cos(4.0 * radlat))
        + (m4 * np.cos(6.0 * radlat))
    )
    dx = (
        (p1 * np.cos(radlat))
        + (p2 * np.cos(3.0 * radlat))
        + (p3 * np.cos(5.0 * radlat))
    )
    return dx * xres, dy * yres


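# Example: at 52 degrees north a 0.1 x 0.1 degree cell is roughly 6.9 km wide (x),
# 11.1 km tall (y) and covers about 76 km2 (helper for illustration only).
def _example_cell_metrics():
    dx, dy = cellres(52.0, xres=0.1, yres=0.1)  # ~6868 m, ~11127 m
    area = cellarea(52.0, xres=0.1, yres=0.1)  # ~7.6e7 m2
    return dx, dy, area

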
## SPREAD
def spread2d(
    da_obs: xr.DataArray,
    da_mask: Optional[xr.DataArray] = None,
    da_friction: Optional[xr.DataArray] = None,
    nodata: Optional[float] = None,
) -> xr.Dataset:
    """Returns values of `da_obs` spread to cells with `nodata` value within `da_mask`,
    powered by :py:meth:`pyflwdir.gis_utils.spread2d`.

    Parameters
    ----------
    da_obs : xarray.DataArray
        Input raster with observation values and background/nodata values which are
        filled by the spreading algorithm.
    da_mask : xarray.DataArray, optional
        Mask of cells to fill with the spreading algorithm, by default None
    da_friction : xarray.DataArray, optional
        Friction values used by the spreading algorithm to calculate the friction
        distance, by default None
    nodata : float, optional
        Nodata or background value. Must be a finite numeric value. If not given the
        raster nodata value is used.

    Returns
    -------
    ds_out: xarray.Dataset
        Dataset with spread source values, linear index of the source cell
        "source_idx" and friction distance to the source cell "source_dst".
    """
    nodata = da_obs.raster.nodata if nodata is None else nodata
    if nodata is None or np.isnan(nodata):
        raise ValueError(f'"nodata" must be a finite value, not {nodata}')
    msk, frc = None, None
    if da_mask is not None:
        assert da_obs.raster.identical_grid(da_mask)
        msk = da_mask.values
    if da_friction is not None:
        assert da_obs.raster.identical_grid(da_friction)
        frc = da_friction.values
    out, src, dst = gis.spread2d(
        obs=da_obs.values,
        msk=msk,
        frc=frc,
        nodata=nodata,
        latlon=da_obs.raster.crs.is_geographic,
        transform=da_obs.raster.transform,
    )
    # combine outputs and return as dataset
    dims = da_obs.raster.dims
    coords = da_obs.raster.coords
    name = da_obs.name if da_obs.name else "source_value"
    da_out = xr.DataArray(dims=dims, coords=coords, data=out, name=name)
    da_out.raster.attrs.update(**da_obs.attrs)  # keep attrs incl nodata and unit
    da_src = xr.DataArray(dims=dims, coords=coords, data=src, name="source_idx")
    da_src.raster.set_nodata(-1)
    da_dst = xr.DataArray(dims=dims, coords=coords, data=dst, name="source_dst")
    da_dst.raster.set_nodata(-1)
    da_dst.attrs.update(unit="m")
    ds_out = xr.merge([da_out, da_src, da_dst])
    ds_out.raster.set_crs(da_obs.raster.crs)
    return ds_out


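# Example: a minimal sketch spreading a single observation over a 3x3 grid; assumes
# the hydromt raster accessor is registered (import hydromt) and pyflwdir is installed;
# helper for illustration only.
def _example_spread2d():
    data = np.full((3, 3), -9999.0)
    data[1, 1] = 5.0  # single observation in the center cell
    da = xr.DataArray(
        data,
        coords={"y": [2.5, 1.5, 0.5], "x": [0.5, 1.5, 2.5]},
        dims=("y", "x"),
        name="obs",
    )
    da.raster.set_crs(32631)
    da.raster.set_nodata(-9999.0)
    # returns "obs" filled everywhere plus "source_idx" and "source_dst" variables
    return spread2d(da)

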
## PCRASTER
def write_clone(tmpdir, gdal_transform, wkt_projection, shape):
    """Write pcraster clone file to a tmpdir using gdal."""
    from osgeo import gdal

    gdal.AllRegister()
    driver1 = gdal.GetDriverByName("GTiff")
    driver2 = gdal.GetDriverByName("PCRaster")
    fn = join(tmpdir, "clone.map")
    # create temp tif file
    fn_temp = join(tmpdir, "clone.tif")
    TempDataset = driver1.Create(fn_temp, shape[1], shape[0], 1, gdal.GDT_Float32)
    TempDataset.SetGeoTransform(gdal_transform)
    if wkt_projection is not None:
        TempDataset.SetProjection(wkt_projection)
    # TODO set crs
    # copy to pcraster format
    outDataset = driver2.CreateCopy(fn, TempDataset, 0)
    # close and cleanup
    TempDataset = None
    outDataset = None
    return fn


def write_map(
    data,
    raster_path,
    nodata,
    transform,
    crs=None,
    clone_path=None,
    pcr_vs="scalar",
    **kwargs,
):
    """Write pcraster map files using pcr.report functionality.

    A PCRaster clone map is written to a temporary directory if not provided.
    For PCRaster types see https://www.gdal.org/frmt_various.html#PCRaster

    Parameters
    ----------
    data : ndarray
        Raster data
    raster_path : str
        Path to output map
    nodata : int, float
        no data value
    transform : affine transform
        Two dimensional affine transform for 2D linear mapping
    clone_path : str, optional
        Path to PCRaster clone map, by default None
    pcr_vs : str, optional
        pcraster type, by default "scalar"

    Raises
    ------
    ImportError
        pcraster package is required
    ValueError
        if invalid ldd
    """
    if not _compat.HAS_PCRASTER:
        raise ImportError("The pcraster package is required to write map files")
    import tempfile

    import pcraster as pcr

    with tempfile.TemporaryDirectory() as tmpdir:
        # deal with pcr clone map
        if clone_path is None:
            clone_path = write_clone(
                tmpdir,
                gdal_transform=transform.to_gdal(),
                wkt_projection=None if crs is None else CRS.from_user_input(crs).wkt,
                shape=data.shape,
            )
        elif not isfile(clone_path):
            raise IOError(f'clone_path: "{clone_path}" does not exist')
        pcr.setclone(clone_path)
        if nodata is None and pcr_vs != "ldd":
            raise ValueError("nodata value required to write PCR map")
        # write to pcrmap
        if pcr_vs == "ldd":
            # if d8 convert to ldd
            data = data.astype(np.uint8)  # force dtype
            if core_d8.isvalid(data):
                data = core_conversion.d8_to_ldd(data)
            elif not core_ldd.isvalid(data):
                raise ValueError("LDD data not understood")
            mv = int(core_ldd._mv)
            ldd = pcr.numpy2pcr(pcr.Ldd, data.astype(int), mv)
            # make sure it is pcr sound
            # NOTE this should not be necessary
            pcrmap = pcr.lddrepair(ldd)
        elif pcr_vs == "bool":
            pcrmap = pcr.numpy2pcr(pcr.Boolean, data.astype(bool), bool(nodata))
        elif pcr_vs == "scalar":
            pcrmap = pcr.numpy2pcr(pcr.Scalar, data.astype(float), float(nodata))
        elif pcr_vs == "ordinal":
            pcrmap = pcr.numpy2pcr(pcr.Ordinal, data.astype(int), int(nodata))
        elif pcr_vs == "nominal":
            pcrmap = pcr.numpy2pcr(pcr.Nominal, data.astype(int), int(nodata))
        pcr.report(pcrmap, raster_path)
        # set crs (pcrmap ignores this info from clone ??)
        if crs is not None:
            with rasterio.open(raster_path, "r+") as dst:
                dst.crs = crs


def create_vrt(
    vrt_path: str,
    file_list_path: str = None,
    files_path: str = None,
):
    """Create a .vrt file from a list of raster datasets by either passing the list
    directly (file_list_path) or by inferring it from a path containing wildcards
    (files_path) pointing to the location(s) of the raster datasets.

    Parameters
    ----------
    vrt_path : str
        Path of the output vrt
    file_list_path : str, optional
        Path to the text file containing the paths to the raster files
    files_path : str, optional
        Unix style path containing a pattern using wildcards (*),
        e.g. 'c:\\temp\\*\\*.tif' for all tif files in subfolders of 'c:\\temp'

    Raises
    ------
    ValueError
        A path is needed, either file_list_path or files_path
    """
    if file_list_path is None and files_path is None:
        raise ValueError(
            "Either 'file_list_path' or 'files_path' is required -> None was given"
        )

    outdir = dirname(vrt_path)
    if not os.path.isdir(outdir):
        os.makedirs(outdir)

    if file_list_path is None:
        files = glob.glob(files_path)
        if len(files) == 0:
            raise IOError(f"No files found at {files_path}")
        file_list_path = join(outdir, "filelist.txt")
        with open(file_list_path, "w") as w:
            for line in files:
                w.write(f"{line}\n")

    # TODO ability to pass more options
    # TODO find method to pass dir of gdalbuildvrt on different OS
    cmd = ["gdalbuildvrt", "-input_file_list", file_list_path, vrt_path]
    subprocess.run(cmd)
    return None


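# Example: a minimal sketch building a mosaic from all GeoTIFFs matching a pattern;
# assumes the GDAL gdalbuildvrt executable is available on the system PATH and the
# paths below are illustrative (helper for illustration only).
def _example_create_vrt():
    create_vrt("/tmp/mosaic/mosaic.vrt", files_path="/tmp/tiles/*.tif")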