Source code for hydromt_wflow.pcrm

"""Some pcraster functions to support older models."""
import glob
import logging
import os
import tempfile
from os.path import basename, dirname, isdir, isfile, join
from pathlib import Path

import numpy as np
import pandas as pd
import rasterio
import xarray as xr
from hydromt.io import open_mfraster
from pyflwdir import core_conversion, core_d8, core_ldd
from pyproj import CRS

try:
    import pcraster as pcr

    HAS_PCRASTER = True
except ImportError:
    HAS_PCRASTER = False

logger = logging.getLogger(__name__)

# specify pcraster map types
# NOTE non scalar (float) data types only
PCR_VS_MAP = {
    "wflow_ldd": "ldd",
    "wflow_river": "bool",
    "wflow_streamorder": "ordinal",
    "wflow_gauges": "nominal",  # to avoid large memory usage in pcraster.aguila
    "wflow_subcatch": "nominal",  # idem.
    "wflow_landuse": "nominal",
    "wflow_soil": "nominal",
    "wflow_reservoirareas": "nominal",
    "wflow_reservoirlocs": "nominal",
    "wflow_lakeareas": "nominal",
    "wflow_lakelocs": "nominal",
    "wflow_glacierareas": "nominal",
}

GDAL_DRIVER_CODE_MAP = {
    "asc": "AAIGrid",
    "blx": "BLX",
    "bmp": "BMP",
    "bt": "BT",
    "dat": "ZMap",
    "dem": "USGSDEM",
    "gen": "ADRG",
    "gif": "GIF",
    "gpkg": "GPKG",
    "grd": "NWT_GRD",
    "gsb": "NTv2",
    "gtx": "GTX",
    "hdr": "MFF",
    "hf2": "HF2",
    "hgt": "SRTMHGT",
    "img": "HFA",
    "jpg": "JPEG",
    "kro": "KRO",
    "lcp": "LCP",
    "map": "PCRaster",
    "mbtiles": "MBTiles",
    "mpr/mpl": "ILWIS",
    "ntf": "NITF",
    "pix": "PCIDSK",
    "png": "PNG",
    "pnm": "PNM",
    "rda": "R",
    "rgb": "SGI",
    "rst": "RST",
    "rsw": "RMF",
    "sdat": "SAGA",
    "sqlite": "Rasterlite",
    "ter": "Terragen",
    "tif": "GTiff",
    "vrt": "VRT",
    "xpm": "XPM",
    "xyz": "XYZ",
}
GDAL_EXT_CODE_MAP = {v: k for k, v in GDAL_DRIVER_CODE_MAP.items()}


def write_clone(tmpdir, gdal_transform, wkt_projection, shape):
    """Write pcraster clone file to a tmpdir using gdal."""
    from osgeo import gdal

    gdal.AllRegister()
    driver1 = gdal.GetDriverByName("GTiff")
    driver2 = gdal.GetDriverByName("PCRaster")
    fn = join(tmpdir, "clone.map")
    # create temp tif file
    fn_temp = join(tmpdir, "clone.tif")
    TempDataset = driver1.Create(fn_temp, shape[1], shape[0], 1, gdal.GDT_Float32)
    TempDataset.SetGeoTransform(gdal_transform)
    if wkt_projection is not None:
        TempDataset.SetProjection(wkt_projection)
    # TODO set csr
    # copy to pcraster format
    driver2.CreateCopy(fn, TempDataset, 0)
    # close and cleanup
    TempDataset = None
    return fn


def write_map(
    data,
    raster_path,
    nodata,
    transform,
    crs=None,
    clone_path=None,
    pcr_vs="scalar",
    **kwargs,
):
    """Write pcraster map files using pcr.report functionality.

    A PCRaster clone map is written to a temporary directory if not provided.
    For PCRaster types see https://www.gdal.org/frmt_various.html#PCRaster

    Parameters
    ----------
    data : ndarray
        Raster data
    raster_path : str
        Path to output map
    nodata : int, float
        no data value
    transform : affine transform
        Two dimensional affine transform for 2D linear mapping
    clone_path : str, optional
        Path to PCRaster clone map, by default None
    pcr_vs : str, optional
        pcraster type, by default "scalar"
    **kwargs:
        not used in this function, mainly here for compatability reasons.
    crs:
        The coordinate reference system of the data.


    Raises
    ------
    ImportError
        pcraster package is required
    ValueError
        if invalid ldd
    """
    import tempfile

    with tempfile.TemporaryDirectory() as tmpdir:
        # deal with pcr clone map
        if clone_path is None:
            clone_path = write_clone(
                tmpdir,
                gdal_transform=transform.to_gdal(),
                wkt_projection=None if crs is None else CRS.from_user_input(crs).wkt,
                shape=data.shape,
            )
        elif not isfile(clone_path):
            raise IOError(f'clone_path: "{clone_path}" does not exist')
        pcr.setclone(clone_path)
        if nodata is None and pcr_vs != "ldd":
            raise ValueError("nodata value required to write PCR map")
        # write to pcrmap
        if pcr_vs == "ldd":
            # if d8 convert to ldd
            data = data.astype(np.uint8)  # force dtype
            if core_d8.isvalid(data):
                data = core_conversion.d8_to_ldd(data)
            elif not core_ldd.isvalid(data):
                raise ValueError("LDD data not understood")
            mv = int(core_ldd._mv)
            ldd = pcr.numpy2pcr(pcr.Ldd, data.astype(int), mv)
            # make sure it is pcr sound
            # NOTE this should not be necessary
            pcrmap = pcr.lddrepair(ldd)
        elif pcr_vs == "bool":
            pcrmap = pcr.numpy2pcr(pcr.Boolean, data.astype(bool), np.bool_(nodata))
        elif pcr_vs == "scalar":
            pcrmap = pcr.numpy2pcr(pcr.Scalar, data.astype(float), float(nodata))
        elif pcr_vs == "ordinal":
            pcrmap = pcr.numpy2pcr(pcr.Ordinal, data.astype(int), int(nodata))
        elif pcr_vs == "nominal":
            pcrmap = pcr.numpy2pcr(pcr.Nominal, data.astype(int), int(nodata))
        pcr.report(pcrmap, raster_path)
        # set crs (pcrmap ignores this info from clone ??)
        if crs is not None:
            with rasterio.open(raster_path, "r+") as dst:
                dst.crs = crs


[docs] def read_staticmaps_pcr( root: Path | str, crs: int = 4326, obj: object = None, **kwargs ): """Read and staticmaps at <root/staticmaps> and parse to xarray.""" da = None fn = join(root, "staticmaps", "*.map") fns = glob.glob(fn) if len(fns) == 0: logger.warning(f"No staticmaps found at {fn}") return _staticmaps = open_mfraster(fns, **kwargs) path = join(root, "staticmaps", "clim", "LAI*") if len(glob.glob(path)) > 0: ds_lai = open_mfraster( path, concat=True, concat_dim="time", logger=logger, **kwargs ) lai_key = list(ds_lai.data_vars)[0] _staticmaps["LAI"] = ds_lai[lai_key] # reorganize c_0 etc maps da_c = [] list_c = [v for v in _staticmaps if str(v).startswith("c_")] if len(list_c) > 0: for i, v in enumerate(list_c): da_c.append(_staticmaps[f"c_{i:d}"]) da = xr.concat( da_c, pd.Index(np.arange(len(list_c), dtype=int), name="layer") ).transpose("layer", ...) _staticmaps = _staticmaps.drop_vars(list_c) _staticmaps["c"] = da _staticmaps = _staticmaps.rename({"x": "lon", "y": "lat"}) # add maps to staticmaps if obj is not None: obj.set_grid(_staticmaps) if obj.crs is None: if crs is None: crs = 4326 # default to 4326 obj.set_crs(crs) return _staticmaps
[docs] def write_staticmaps_pcr( staticmaps: xr.Dataset, root: Path | str, ): """Write staticmaps at <root/staticmaps> in PCRaster maps format.""" root = os.path.join(root, "staticmaps") if not isdir(root): os.makedirs(root) profile_kwargs = {} mask = True ds_out = staticmaps if "LAI" in ds_out.data_vars: ds_out = ds_out.rename_vars({"LAI": "clim/LAI"}) if "c" in ds_out.data_vars: for layer in ds_out["layer"]: ds_out[f"c_{layer.item():d}"] = ds_out["c"].sel(layer=layer) ds_out[f"c_{layer.item():d}"].raster.set_nodata(ds_out["c"].raster.nodata) ds_out = ds_out.drop_vars(["c", "layer"]) logger.info("Writing (updated) staticmap files.") # add datatypes for maps with same basenames, e.g. wflow_gauges_grdc pcr_vs_map = PCR_VS_MAP.copy() for var_name in ds_out.raster.vars: base_name = "_".join(var_name.split("_")[:-1]) # clip _<postfix> if base_name in PCR_VS_MAP: pcr_vs_map.update({var_name: PCR_VS_MAP[base_name]}) # ds_out.raster.to_mapstack( # root=join(root, "staticmaps"), # mask=True, # driver="PCRaster", # pcr_vs_map=pcr_vs_map, # logger=logger, # ) ext = GDAL_EXT_CODE_MAP.get("PCRaster") with tempfile.TemporaryDirectory() as tmpdir: clone_path = write_clone( tmpdir, gdal_transform=ds_out.raster.transform.to_gdal(), wkt_projection=None if ds_out.raster.crs is None else ds_out.raster.crs.to_wkt(), shape=ds_out.raster.shape, ) profile_kwargs.update({"clone_path": clone_path}) for var in ds_out.raster.vars: if "/" in var: # variables with in subfolders folders = "/".join(var.split("/")[:-1]) if not isdir(join(root, folders)): os.makedirs(join(root, folders)) var0 = var.split("/")[-1] raster_path = join(root, folders, f"{var0}.{ext}") else: raster_path = join(root, f"{var}.{ext}") profile_kwargs.update({"pcr_vs": pcr_vs_map.get(var, "scalar")}) da_out = ds_out[var].copy() for k in ["height", "width", "count", "transform"]: if k in profile_kwargs: msg = f"{k} will be set based on the DataArray, remove the argument" raise ValueError(msg) if "nodata" in profile_kwargs: da_out.raster.set_nodata(profile_kwargs.pop("nodata")) nodata = da_out.raster.nodata if nodata is not None and not np.isnan(nodata): da_out = da_out.fillna(nodata) elif nodata is None: logger.warning(f"nodata value missing for {raster_path}") if mask and "mask" in da_out.coords and nodata is not None: da_out = da_out.where(da_out.coords["mask"] != 0, nodata) if "crs" in profile_kwargs: da_out.raster.set_crs(profile_kwargs.pop("crs")) # check dimensionality dim0 = da_out.raster.dim0 count = 1 if dim0 is not None: count = da_out[dim0].size da_out = da_out.sortby(dim0) # write for i in range(count): if dim0: bname = basename(raster_path).split(".")[0] bname = f"{bname[:8]:8s}".replace(" ", "0") raster_path = join(dirname(raster_path), f"{bname}.{i+1:03d}") data = da_out.isel({dim0: i}).load().squeeze().data else: data = da_out.load().data write_map( data, raster_path, crs=da_out.raster.crs, transform=da_out.raster.transform, nodata=nodata, **profile_kwargs, )