"""Soilgrid workflows for Wflow plugin."""
import logging
from typing import List, Union
import hydromt
import numpy as np
import pandas as pd
import xarray as xr
from scipy.optimize import curve_fit
from . import ptf, soilparams
logger = logging.getLogger(__name__)
__all__ = [
"soilgrids",
"soilgrids_sediment",
"soilgrids_brooks_corey",
"update_soil_with_paddy",
]
# Standard soil layer depths of the soilgrids (2017) product.
soildepth_cm = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
soildepth_mm = 10.0 * soildepth_cm
# for 2017 - midpoint and midpoint surface are equal to original depths
# (soilgrids_2020 overrides these locally with interval midpoints).
soildepth_cm_midpoint = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
soildepth_mm_midpoint = 10.0 * soildepth_cm
soildepth_mm_midpoint_surface = 10.0 * soildepth_cm
# Upper bound used when constraining the fitted M parameter [mm].
M_minmax = 10000.0
# Nodata value written to all output parameter maps.
nodata = -9999.0
# default z [cm] of soil layers wflow_sbm: 10, 40, 120, > 120
# mapping of wflow_sbm parameter c to soil layer SoilGrids
c_sl_index = [2, 4, 6, 7]  # v2017 direct mapping
# c_sl_index = [2, 3, 5, 6] #v2021 if direct mapping - not used,
# averages are taken instead.
def concat_layers(
    ds: xr.Dataset,
    soil_fn: str = "soilgrids",
    variables: Union[List[str], None] = None,
):
    """
    Preprocess functions to concat soilgrids along a layer dimension.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing soil properties with one variable per soil layer
        (e.g. "bd_sl1" ... "bd_sl7").
    soil_fn : str
        soilgrids version {'soilgrids', 'soilgrids_2020'}
    variables : list, optional
        List of soil properties to concat. Defaults to
        ["bd", "oc", "ph", "clyppt", "sltppt", "sndppt"].

    Returns
    -------
    ds : xarray.Dataset
        Dataset in which the per-layer variables are replaced by a single
        variable stacked along a new "sl" dimension.
    """
    # Resolve the default here to avoid a shared mutable default argument.
    if variables is None:
        variables = ["bd", "oc", "ph", "clyppt", "sltppt", "sndppt"]
    # soilgrids_2020 provides 6 depth intervals, soilgrids (2017) 7 depths.
    if soil_fn == "soilgrids_2020":
        nb_sl = 6
    else:
        nb_sl = 7
    ds = ds.assign_coords(sl=np.arange(1, nb_sl + 1))
    for var in variables:
        da_prop = []
        for i in range(1, nb_sl + 1):
            da_prop.append(ds[f"{var}_sl{i}"])
            # remove the per-layer variable from ds
            ds = ds.drop_vars(f"{var}_sl{i}")
        da = xr.concat(
            da_prop,
            pd.Index(np.arange(1, nb_sl + 1, dtype=int), name="sl"),
        ).transpose("sl", ...)
        da.name = var
        # add the stacked variable back to ds
        ds[var] = da
    return ds
def average_soillayers_block(ds, soilthickness):
    """
    Determine weighted average of soil property at different depths over soil thickness.

    Assuming that the properties are computed at the mid-point of the interval and
    are considered constant over the whole depth interval (Sousa et al., 2020).
    https://doi.org/10.5194/soil-2020-65
    This function is used for soilgrids_2020.

    Parameters
    ----------
    ds: xarray.Dataset
        Dataset containing soil property over each soil depth profile [sl1 - sl6].
    soilthickness : xarray.DataArray
        Dataset containing soil thickness [cm].

    Returns
    -------
    da_av : xarray.DataArray
        Dataset containing weighted average of soil property.
    """
    da_sum = soilthickness * 0.0
    # set NaN values to 0.0 (to avoid RuntimeWarning in comparison soildepth)
    d = soilthickness.fillna(0.0)
    # Each interval i (bounded by the module-level soildepth_cm) contributes:
    #  * its full interval thickness when the soil column reaches below it,
    #  * only the partial thickness (d - top) when the soil column bottom
    #    falls inside the interval,
    #  * nothing when the soil column ends above the interval.
    for i in ds.sl:
        da_sum = da_sum + (
            (soildepth_cm[i] - soildepth_cm[i - 1])
            * ds.sel(sl=i)
            * (d >= soildepth_cm[i])
            + (d - soildepth_cm[i - 1])
            * ds.sel(sl=i)
            * ((d < soildepth_cm[i]) & (d > soildepth_cm[i - 1]))
        )
    # Normalise the weighted sum by the (possibly NaN) soil thickness;
    # apply_ufunc keeps the division dask-friendly.
    da_av = xr.apply_ufunc(
        lambda x, y: x / y,
        da_sum,
        soilthickness,
        dask="parallelized",
        output_dtypes=[float],
        keep_attrs=True,
    )
    da_av.raster.set_nodata(np.nan)
    return da_av
def average_soillayers(ds, soilthickness):
    """
    Determine weighted average of soil property at different depths over soil thickness.

    Using the trapezoidal rule.
    See also: Hengl, T., Mendes de Jesus, J., Heuvelink, G. B. M., Ruiperez Gonzalez,
    M., Kilibarda, M., Blagotic, A., et al.: SoilGrids250m:
    Global gridded soil information based on machine learning,
    PLoS ONE, 12, https://doi.org/10.1371/journal.pone.0169748, 2017.
    This function is used for soilgrids (2017).

    Parameters
    ----------
    ds: xarray.Dataset
        Dataset containing soil property at each soil depth [sl1 - sl7].
    soilthickness : xarray.DataArray
        Dataset containing soil thickness [cm].

    Returns
    -------
    da_av : xarray.DataArray
        Dataset containing the weighted average of the soil property.
    """
    da_sum = soilthickness * 0.0
    # set NaN values to 0.0 (to avoid RuntimeWarning in comparison soildepth)
    d = soilthickness.fillna(0.0)
    # Trapezoidal rule: each depth interval contributes the interval width
    # times the sum of the property at its two bounding depths (divided by 2
    # below). Intervals cut by the soil column bottom only contribute the
    # partial width (d - top).
    for i in range(1, len(ds.sl)):  # range(1, 7):
        da_sum = da_sum + (
            (soildepth_cm[i] - soildepth_cm[i - 1])
            * (ds.sel(sl=i) + ds.sel(sl=i + 1))
            * (d >= soildepth_cm[i])
            + (d - soildepth_cm[i - 1])
            * (ds.sel(sl=i) + ds.sel(sl=i + 1))
            * ((d < soildepth_cm[i]) & (d > soildepth_cm[i - 1]))
        )
    # Divide by 2 * thickness: the trapezoid halves and the normalisation by
    # soil thickness combined in one dask-friendly operation.
    da_av = xr.apply_ufunc(
        lambda x, y: x * (1 / (y * 2)),
        da_sum,
        soilthickness,
        dask="parallelized",
        output_dtypes=[float],
        keep_attrs=True,
    )
    da_av.raster.set_nodata(np.nan)
    return da_av
def pore_size_distrution_index_layers(ds, thetas):
    """
    Determine pore size distribution index per soil layer depth based on PTF.

    NOTE: the function name keeps its historical spelling ("distrution") for
    backwards compatibility with existing callers.

    Parameters
    ----------
    ds: xarray.Dataset
        Dataset containing soil properties at each soil depth [sl1 - sl7].
    thetas: xarray.Dataset
        Dataset containing thetaS at each soil layer depth.

    Returns
    -------
    pore_size : xarray.Dataset
        Dataset containing pore size distribution index [-] for each soil
        layer depth.
    """
    # Apply the Brakensiek PTF lazily (dask-parallelized) on sand %, thetaS
    # and clay %.
    pore_size = xr.apply_ufunc(
        ptf.pore_size_index_brakensiek,
        ds["sndppt"],
        thetas,
        ds["clyppt"],
        dask="parallelized",
        output_dtypes=[float],
        keep_attrs=True,
    )
    pore_size.name = "pore_size"
    pore_size.raster.set_nodata(np.nan)
    return pore_size
def brooks_corey_layers(
    thetas_sl: xr.Dataset,
    ds: xr.Dataset,
    ds_like: xr.Dataset,
    soil_fn: str = "soilgrids",
    wflow_layers: Union[List[int], None] = None,
    soildepth_cm: np.ndarray = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0]),
):
    """
    Determine Brooks Corey coefficient per wflow soil layer depth.

    First pore size distribution index is computed based on theta_s and other soil
    parameters and scaled to the model resolution.
    Then the Brooks Corey coefficient is computed for each wflow soil layer depth
    by weighted averaging the pore size distribution index over the soil thickness.

    Parameters
    ----------
    thetas_sl: xarray.Dataset
        Dataset containing thetaS at each soil layer depth.
    ds: xarray.Dataset
        Dataset containing soil properties at each soil depth.
    ds_like: xarray.Dataset
        Dataset at model resolution for reprojection.
    soil_fn: str
        soilgrids version {'soilgrids', 'soilgrids_2020'}
    wflow_layers: list, optional
        List of wflow soil layer thicknesses [mm] for which c is calculated.
        Defaults to [100, 300, 800]; a last layer down to 2000 mm is added
        automatically.
    soildepth_cm: np.ndarray
        Depth of each soil layer of the source data [cm].

    Returns
    -------
    ds_c : xarray.Dataset
        Dataset containing c for the wflow_sbm soil layers.

    Raises
    ------
    ValueError
        If the cumulative depth of the wflow layers exceeds 2000 mm.
    """
    # Resolve the default here to avoid a shared mutable default argument.
    if wflow_layers is None:
        wflow_layers = [100, 300, 800]
    # Get pore size distribution index; reproject in log space because the
    # index is log-distributed, then transform back.
    lambda_sl_hr = pore_size_distrution_index_layers(ds, thetas_sl)
    lambda_sl = np.log(lambda_sl_hr)
    lambda_sl = lambda_sl.raster.reproject_like(ds_like, method="average")
    lambda_sl = np.exp(lambda_sl)
    # Brooks Corey coefficient
    c_sl = 3.0 + (2.0 / lambda_sl)
    c_sl.name = "c_sl"
    # Resample for the wflow layers; copy so the caller's list is not mutated.
    wflow_thickness = list(wflow_layers)
    # Go from wflow layer thickness to soil depths (cumulative) [mm]
    wflow_depths = np.cumsum(wflow_thickness)
    # Check if the last wflow depth is less than 2000 mm (soilgrids limit)
    if wflow_depths[-1] > 2000:
        raise ValueError(
            "The total depth of the wflow soil layers should be 2000 mm, "
            "as the soilgrids data does not go deeper than 2 m."
        )
    # Add a zero for the first depth and 2000 for the last depth
    wflow_depths = np.insert(wflow_depths, 0, 0)
    wflow_depths = np.append(wflow_depths, 2000)
    # Compute the thickness of the last layer
    wflow_thickness.append(2000 - wflow_depths[-2])
    # Soil data depth from [cm] to [mm]
    soildepth = soildepth_cm * 10
    # make empty dataarray for c for the wflow soil layers
    ds_c = hydromt.raster.full(
        coords=c_sl.raster.coords,
        nodata=np.nan,
        crs=c_sl.raster.crs,
        dtype="float32",
        name="c",
        attrs={"thickness": wflow_thickness, "soil_fn": soil_fn},
    )
    ds_c = ds_c.expand_dims(dim=dict(layer=np.arange(len(wflow_thickness)))).copy()
    # calc weighted average values of c over the sbm soil layers
    for nl in range(len(wflow_thickness)):
        top_depth = wflow_depths[nl]
        bottom_depth = wflow_depths[nl + 1]
        c_nl_sum = None
        for d in range(len(soildepth) - 1):
            if soil_fn == "soilgrids_2020":
                # soilgrids_2020 values are already interval averages
                c_av = c_sl.sel(sl=d + 1)
            else:
                # soilgrids 2017: average the two bounding point values
                c_av = (c_sl.sel(sl=d + 1) + c_sl.sel(sl=d + 2)) / 2
            # wflow layer fully within soilgrid layer
            if soildepth[d] <= top_depth and soildepth[d + 1] >= bottom_depth:
                c_nl = c_av * (bottom_depth - top_depth)
            # soilgrid layer fully within wflow layer
            elif soildepth[d] >= top_depth and soildepth[d + 1] <= bottom_depth:
                c_nl = c_av * (soildepth[d + 1] - soildepth[d])
            # bottom part of the soilgrid layer within wflow layer
            elif soildepth[d] <= bottom_depth and soildepth[d + 1] >= bottom_depth:
                c_nl = c_av * (bottom_depth - soildepth[d])
            # top part of the soilgrid layer within wflow layer
            elif soildepth[d] <= top_depth and soildepth[d + 1] >= top_depth:
                c_nl = c_av * (soildepth[d + 1] - top_depth)
            # soilgrid layer outside of wflow layer
            else:
                c_nl = None
            # Add the thickness-weighted contribution to the sum
            if c_nl is not None:
                if c_nl_sum is None:
                    c_nl_sum = c_nl
                else:
                    c_nl_sum = c_nl_sum + c_nl
        # Normalise by the wflow layer thickness -> weighted average c
        ds_c.loc[dict(layer=nl)] = c_nl_sum / wflow_thickness[nl]
    return ds_c
def kv_layers(ds, thetas, ptf_name):
    """
    Determine vertical saturated hydraulic conductivity (KsatVer) per soil layer depth.

    Based on PTF.

    Parameters
    ----------
    ds: xarray.Dataset
        Dataset containing soil properties at each soil depth [sl1 - sl7].
    thetas: xarray.Dataset
        Dataset containing thetaS at each soil layer depth.
    ptf_name : str
        PTF to use for calculation KsatVer {'brakensiek', 'cosby'}.

    Returns
    -------
    ds_out : xarray.Dataset
        Dataset containing KsatVer [mm/day] for each soil layer depth.

    Raises
    ------
    ValueError
        If ptf_name is not one of the supported PTFs.
    """
    if ptf_name == "brakensiek":
        ds_out = xr.apply_ufunc(
            ptf.kv_brakensiek,
            thetas,
            ds["clyppt"],
            ds["sndppt"],
            dask="parallelized",
            output_dtypes=[float],
            keep_attrs=True,
        )
    elif ptf_name == "cosby":
        ds_out = xr.apply_ufunc(
            ptf.kv_cosby,
            ds["clyppt"],
            ds["sndppt"],
            dask="parallelized",
            output_dtypes=[float],
            keep_attrs=True,
        )
    else:
        # Fail fast with a clear message instead of an UnboundLocalError on
        # ds_out below.
        raise ValueError(
            f"Unknown ptf_name {ptf_name}, select from ['brakensiek', 'cosby']."
        )
    ds_out.name = "kv"
    ds_out.raster.set_nodata(np.nan)
    return ds_out
def func(x, b):
    """Exponential decay ``exp(-b * x)``: model used by curve_fit."""
    return np.exp(x * -b)
def do_linalg(x, y):
    """
    Apply np.linalg.lstsq and return fitted parameter.

    Fits ``log(y) = b * x`` (no intercept) on the valid samples and returns b.

    Parameters
    ----------
    x : array_like (float)
        "Coefficient" matrix (independent variable).
    y : array_like (float)
        dependent variable.

    Returns
    -------
    popt_0 : float
        Optimal value for the parameter fit.
    """
    log_y = np.log(y)
    # Keep samples where log(y) is not infinite and y itself is not NaN.
    valid = ~(np.isinf(log_y) | np.isnan(y))
    coeffs, *_ = np.linalg.lstsq(x[valid, np.newaxis], log_y[valid], rcond=None)
    return coeffs[0]
def do_curve_fit(x, y):
    """
    Apply scipy.optimize.curve_fit and return fitted parameter.

    If least-squares minimization fails with an inital guess p0 of 1e-3,
    and 1e-4, np.linalg.lstsq is used for curve fitting.

    NOTE(review): the curve_fit branches return a length-1 ndarray (popt)
    while the NaN and lstsq branches return a scalar — downstream
    vectorized use appears to rely on both coercing to float; confirm
    before changing.

    Parameters
    ----------
    x : array_like of M length (float)
        independent variable.
    y : array_like of M length (float)
        dependent variable.

    Returns
    -------
    popt_0 : float
        Optimal value for the parameter fit.
    """
    # Keep samples where log(y) is not infinite and y is not NaN.
    idx = ((~np.isinf(np.log(y)))) & ((~np.isnan(y)))
    if len(y[idx]) == 0:
        # no valid samples at all: nothing to fit
        popt_0 = np.nan
    else:
        try:
            # try curve fitting with certain p0
            popt_0 = curve_fit(func, x[idx], y[idx], p0=(1e-3))[0]
        except RuntimeError:
            try:
                # try curve fitting with lower p0
                popt_0 = curve_fit(func, x[idx], y[idx], p0=(1e-4))[0]
            except RuntimeError:
                # do linalg regression instead
                popt_0 = np.linalg.lstsq(
                    x[idx, np.newaxis], np.log(y[idx]), rcond=None
                )[0][0]
    return popt_0
def constrain_M(M, popt_0, M_minmax):
    """Constrain M parameter with the value M_minmax.

    M is replaced by M_minmax where the fit is degenerate (M > 0 with a zero
    fitted decline rate) or where M falls outside the (0, M_minmax] range.
    """
    out_of_range = ((M > 0) & (popt_0 == 0)) | (M > M_minmax) | (M < 0)
    return xr.where(out_of_range, M_minmax, M)
def soilgrids(
    ds: xr.Dataset,
    ds_like: xr.Dataset,
    ptfKsatVer: str = "brakensiek",
    soil_fn: str = "soilgrids",
    wflow_layers: List[int] = [100, 300, 800],
    logger=logger,
):
    """
    Return soil parameter maps at model resolution.

    Based on soil properties from SoilGrids datasets.
    Both soilgrids 2017 and 2020 are supported. Soilgrids 2017 provides soil
    properties at 7 specific depths, while soilgrids_2020 provides soil
    properties averaged over 6 depth intervals.

    Ref: Hengl, T., Mendes de Jesus, J., Heuvelink, G. B. M., Ruiperez Gonzalez,
    M., Kilibarda, M., Blagotic, A., et al.: SoilGrids250m:
    Global gridded soil information based on machine learning,
    PLoS ONE, 12, https://doi.org/10.1371/journal.pone.0169748, 2017.
    Ref: de Sousa, L.M., Poggio, L., Batjes, N.H., Heuvelink, G., Kempen, B.,
    Riberio, E. and Rossiter, D., 2020. SoilGrids 2.0:
    producing quality-assessed soil information for the globe.
    SOIL Discussions, pp.1-37. https://doi.org/10.5194/soil-2020-65.

    The following soil parameter maps are calculated:

    - **thetaS** : average saturated soil water content [m3/m3]
    - **thetaR** : average residual water content [m3/m3]
    - **KsatVer** : vertical saturated hydraulic conductivity at soil surface [mm/day]
    - **SoilThickness** : soil thickness [mm]
    - **SoilMinThickness** : minimum soil thickness [mm] (equal to SoilThickness)
    - **M** : model parameter [mm] that controls exponential decline of KsatVer with
      soil depth (fitted with curve_fit (scipy.optimize)), bounds of **M** are checked
    - **M_** : model parameter [mm] that controls exponential decline of KsatVer with
      soil depth (fitted with numpy linalg regression), bounds of **M_** are checked
    - **M_original** : **M** without checking bounds
    - **M_original_** : **M_** without checking bounds
    - **f** : scaling parameter controlling the decline of KsatVer [mm-1]
      (fitted with curve_fit (scipy.optimize)), bounds are checked
    - **f_** : scaling parameter controlling the decline of KsatVer [mm-1]
      (fitted with numpy linalg regression), bounds are checked
    - **c_** map: Brooks Corey coefficients [-] based on pore size distribution
      index for the wflow_sbm soil layers.
    - **KsatVer_[z]cm** : KsatVer [mm/day] at soil depths [z] of SoilGrids data
      [0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0]
    - **wflow_soil** : USDA Soil texture based on percentage clay, silt, sand mapping:
      [1:Clay, 2:Silty Clay, 3:Silty Clay-Loam, 4:Sandy Clay, 5:Sandy Clay-Loam,
      6:Clay-Loam, 7:Silt, 8:Silt-Loam, 9:Loam, 10:Sand, 11: Loamy Sand, 12:Sandy Loam]

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing soil properties.
    ds_like : xarray.DataArray
        Dataset at model resolution.
    ptfKsatVer : str
        PTF to use for calculcation KsatVer.
    soil_fn : str
        soilgrids version {'soilgrids', 'soilgrids_2020'}
    wflow_layers : list
        List of wflow soil layer thicknesses [mm] for which c is calculated.

    Returns
    -------
    ds_out : xarray.Dataset
        Dataset containing gridded soil parameters.
    """
    # Local depth arrays shadow the module-level constants: soilgrids_2020
    # reports interval averages, so its representative depths are midpoints.
    if soil_fn == "soilgrids_2020":
        # use midpoints of depth intervals for soilgrids_2020.
        soildepth_cm_midpoint = np.array([2.5, 10.0, 22.5, 45.0, 80.0, 150.0])
        soildepth_cm_midpoint_surface = np.array([0, 10.0, 22.5, 45.0, 80.0, 150.0])
        soildepth_cm = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
        soildepth_mm_midpoint_surface = 10.0 * soildepth_cm_midpoint_surface
    else:
        soildepth_cm = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
        soildepth_cm_midpoint = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
        soildepth_mm_midpoint_surface = 10.0 * soildepth_cm
    ds_out = xr.Dataset(coords=ds_like.raster.coords)
    # set nodata values in dataset to NaN (based on soil property SLTPPT at
    # first soil layer)
    # ds = xr.where(ds["sltppt_sl1"] == ds["sltppt_sl1"].raster.nodata, np.nan, ds)
    ds = ds.raster.mask_nodata()
    # concat along a sl dimension
    ds = concat_layers(ds, soil_fn)
    logger.info("calculate and resample thetaS")
    # thetaS per source layer from the Toth PTF, then averaged over the soil
    # column and reprojected to the model grid.
    thetas_sl = xr.apply_ufunc(
        ptf.thetas_toth,
        ds["ph"],
        ds["bd"],
        ds["clyppt"],
        ds["sltppt"],
        dask="parallelized",
        output_dtypes=[float],
        keep_attrs=True,
    )
    if soil_fn == "soilgrids_2020":
        thetas = average_soillayers_block(thetas_sl, ds["soilthickness"])
    else:
        thetas = average_soillayers(thetas_sl, ds["soilthickness"])
    thetas = thetas.raster.reproject_like(ds_like, method="average")
    ds_out["thetaS"] = thetas.astype(np.float32)
    logger.info("calculate and resample thetaR")
    # thetaR per source layer from the Rawls & Brakensiek PTF.
    thetar_sl = xr.apply_ufunc(
        ptf.thetar_rawls_brakensiek,
        ds["sndppt"],
        ds["clyppt"],
        thetas_sl,
        dask="parallelized",
        output_dtypes=[float],
        keep_attrs=True,
    )
    if soil_fn == "soilgrids_2020":
        thetar = average_soillayers_block(thetar_sl, ds["soilthickness"])
    else:
        thetar = average_soillayers(thetar_sl, ds["soilthickness"])
    thetar = thetar.raster.reproject_like(ds_like, method="average")
    ds_out["thetaR"] = thetar.astype(np.float32)
    soilthickness_hr = ds["soilthickness"]
    soilthickness = soilthickness_hr.raster.reproject_like(ds_like, method="average")
    # wflow_sbm cannot handle (yet) zero soil thickness
    soilthickness = soilthickness.where(soilthickness > 0.0, np.nan)
    soilthickness.raster.set_nodata(np.nan)
    soilthickness = soilthickness.astype(np.float32)
    ds_out["SoilThickness"] = soilthickness * 10.0  # from [cm] to [mm]
    ds_out["SoilMinThickness"] = xr.DataArray.copy(ds_out["SoilThickness"], deep=False)
    logger.info("calculate and resample KsatVer")
    # KsatVer is log-distributed: reproject the log values and transform back.
    kv_sl_hr = kv_layers(ds, thetas_sl, ptfKsatVer)
    kv_sl = np.log(kv_sl_hr)
    kv_sl = kv_sl.raster.reproject_like(ds_like, method="average")
    kv_sl = np.exp(kv_sl)
    logger.info("calculate and resample pore size distribution index")
    ds_c = brooks_corey_layers(
        thetas_sl=thetas_sl,
        ds=ds,
        ds_like=ds_like,
        soil_fn=soil_fn,
        wflow_layers=wflow_layers,
        soildepth_cm=soildepth_cm,
    )
    ds_out["c"] = ds_c
    # Surface KsatVer plus one map per source depth (named by midpoint depth).
    ds_out["KsatVer"] = kv_sl.sel(sl=1).astype(np.float32)
    for i, sl in enumerate(kv_sl["sl"]):
        kv = kv_sl.sel(sl=sl)
        ds_out["KsatVer_" + str(soildepth_cm_midpoint[i]) + "cm"] = kv.astype(
            np.float32
        )
    # Normalise by the surface value so the fits below estimate only the
    # exponential decline rate of KsatVer with depth.
    kv = kv_sl / kv_sl.sel(sl=1)
    logger.info("fit z - log(KsatVer) with numpy linalg regression (y = b*x) -> M_")
    # do_linalg fits log(kv) = b*z, so the fitted slope is negative; hence
    # the sign flip when computing M_ below.
    popt_0_ = xr.apply_ufunc(
        do_linalg,
        soildepth_mm_midpoint_surface,
        kv.compute(),
        vectorize=True,
        dask="parallelized",
        input_core_dims=[["z"], ["sl"]],
        output_dtypes=[float],
        keep_attrs=True,
    )
    M_ = (thetas - thetar) / (-popt_0_)
    ds_out["M_original_"] = M_.astype(np.float32)
    M_ = constrain_M(M_, popt_0_, M_minmax)
    ds_out["M_"] = M_.astype(np.float32)
    ds_out["f_"] = ((thetas - thetar) / M_).astype(np.float32)
    logger.info("fit zi - Ksat with curve_fit (scipy.optimize) -> M")
    # do_curve_fit fits kv = exp(-b*z), so here the fitted b is positive.
    popt_0 = xr.apply_ufunc(
        do_curve_fit,
        soildepth_mm_midpoint_surface,
        kv.compute(),
        vectorize=True,
        dask="parallelized",
        input_core_dims=[["z"], ["sl"]],
        output_dtypes=[float],
        keep_attrs=True,
    )
    M = (thetas - thetar) / (popt_0)
    ds_out["M_original"] = M.astype(np.float32)
    M = constrain_M(M, popt_0, M_minmax)
    ds_out["M"] = M.astype(np.float32)
    ds_out["f"] = ((thetas - thetar) / M).astype(np.float32)
    # wflow soil map is based on USDA soil classification
    # soilmap = ds["tax_usda"].raster.interpolate_na()
    # soilmap = soilmap.raster.reproject_like(ds_like, method="mode")
    # ds_out["wflow_soil"] = soilmap.astype(np.float32)
    # wflow_soil map is based on soil texture calculated with percentage
    # sand, silt, clay
    # clay, silt percentages averaged over soil thickness
    if soil_fn == "soilgrids_2020":
        clay_av = average_soillayers_block(ds["clyppt"], ds["soilthickness"])
        silt_av = average_soillayers_block(ds["sltppt"], ds["soilthickness"])
    else:
        clay_av = average_soillayers(ds["clyppt"], ds["soilthickness"])
        silt_av = average_soillayers(ds["sltppt"], ds["soilthickness"])
    # calc soil texture
    soil_texture = xr.apply_ufunc(
        ptf.soil_texture_usda,
        clay_av,
        silt_av,
        dask="parallelized",
        output_dtypes=[float],
        keep_attrs=True,
    )
    soil_texture = soil_texture.raster.reproject_like(ds_like, method="mode")
    # np.nan is not a valid value for array with type integer
    ds_out["wflow_soil"] = soil_texture
    ds_out["wflow_soil"].raster.set_nodata(0)
    # for writing pcraster map files a scalar nodata value is required
    dtypes = {"wflow_soil": np.int32}
    for var in ds_out:
        dtype = dtypes.get(var, np.float32)
        logger.debug(f"Interpolate nodata (NaN) values for {var}")
        ds_out[var] = ds_out[var].raster.interpolate_na("nearest")
        ds_out[var] = ds_out[var].fillna(nodata).astype(dtype)
        ds_out[var].raster.set_nodata(np.dtype(dtype).type(nodata))
    return ds_out
def soilgrids_brooks_corey(
    ds: xr.Dataset,
    ds_like: xr.Dataset,
    soil_fn: str = "soilgrids",
    wflow_layers: Union[List[int], None] = None,
    logger=logger,
):
    """
    Determine Brooks Corey coefficient per wflow soil layer depth.

    First pore size distribution index is computed based on theta_s and other soil
    parameters and scaled to the model resolution.
    Then the Brooks Corey coefficient is computed for each wflow soil layer depth
    by weighted averaging the pore size distribution index over the soil thickness.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing soil properties.
    ds_like : xarray.DataArray
        Dataset at model resolution.
    soil_fn : str
        soilgrids version {'soilgrids', 'soilgrids_2020'}
    wflow_layers : list, optional
        List of wflow soil layer thicknesses [mm] over which c is calculated.
        Defaults to [100, 300, 800].

    Returns
    -------
    ds_out : xarray.Dataset
        Dataset containing c for the wflow_sbm soil layers.

    Raises
    ------
    ValueError
        If soil_fn is not 'soilgrids' or 'soilgrids_2020'.
    """
    # Resolve the default here to avoid a shared mutable default argument.
    if wflow_layers is None:
        wflow_layers = [100, 300, 800]
    # Both supported products use the same source layer depths [cm].
    if soil_fn in ("soilgrids", "soilgrids_2020"):
        soildepth_cm = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
    else:
        raise ValueError("Only soilgrids_2020 and soilgrids are supported.")
    ds_out = xr.Dataset(coords=ds_like.raster.coords)
    # set nodata values to NaN
    ds = ds.raster.mask_nodata()
    # concat along a sl dimension
    ds = concat_layers(ds, soil_fn)
    logger.info("calculate and resample thetaS")
    thetas_sl = xr.apply_ufunc(
        ptf.thetas_toth,
        ds["ph"],
        ds["bd"],
        ds["clyppt"],
        ds["sltppt"],
        dask="parallelized",
        output_dtypes=[float],
        keep_attrs=True,
    )
    logger.info("calculate and resample pore size distribution index")
    # Brooks Corey coefficient
    ds_c = brooks_corey_layers(
        thetas_sl=thetas_sl,
        ds=ds,
        ds_like=ds_like,
        soil_fn=soil_fn,
        wflow_layers=wflow_layers,
        soildepth_cm=soildepth_cm,
    )
    ds_out["c"] = ds_c
    return ds_out
def soilgrids_sediment(ds, ds_like, usleK_method, logger=logger):
    """
    Return soil parameter maps for sediment modelling at model resolution.

    Based on soil properties from SoilGrids dataset.
    The following soil parameter maps are calculated:

    * PercentClay: clay content of the topsoil [%]
    * PercentSilt: silt content of the topsoil [%]
    * PercentOC: organic carbon in the topsoil [%]
    * ErosK: mean detachability of the soil (Morgan et al., 1998) [g/J]
    * USLE_K: soil erodibility factor from the USLE equation [-]

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing soil properties.
    ds_like : xarray.DataArray
        Dataset at model resolution.
    usleK_method : str
        Method to use for calculation of USLE_K {"renard", "epic"}.

    Returns
    -------
    ds_out : xarray.Dataset
        Dataset containing gridded soil parameters for sediment modelling.

    Raises
    ------
    ValueError
        If usleK_method is not 'renard' or 'epic'.
    """
    # Validate early: an unknown method would otherwise only surface later as
    # an UnboundLocalError on usleK, after all the resampling work.
    if usleK_method not in ("renard", "epic"):
        raise ValueError(
            f"Unknown usleK_method {usleK_method}, select from ['renard', 'epic']."
        )
    ds_out = xr.Dataset(coords=ds_like.raster.coords)
    # set nodata values in dataset to NaN (based on soil property SLTPPT at
    # first soil layer)
    # ds = xr.where(ds["sltppt_sl1"] == ds["sltppt_sl1"].raster.nodata, np.nan, ds)
    ds = ds.raster.mask_nodata()
    # topsoil (sl1) properties, reprojected to the model grid
    pclay = ds["clyppt_sl1"]
    percentclay = pclay.raster.reproject_like(ds_like, method="average")
    ds_out["PercentClay"] = percentclay.astype(np.float32)
    psilt = ds["sltppt_sl1"]
    percentsilt = psilt.raster.reproject_like(ds_like, method="average")
    ds_out["PercentSilt"] = percentsilt.astype(np.float32)
    poc = ds["oc_sl1"]
    percentoc = poc.raster.reproject_like(ds_like, method="average")
    ds_out["PercentOC"] = percentoc.astype(np.float32)
    # Detachability of the soil
    erosK = xr.apply_ufunc(
        ptf.ErosK_texture,
        pclay,
        psilt,
        dask="parallelized",
        output_dtypes=[float],
        keep_attrs=True,
    )
    erosK = erosK.raster.reproject_like(ds_like, method="average")
    ds_out["ErosK"] = erosK.astype(np.float32)
    # USLE K parameter
    if usleK_method == "renard":
        usleK = xr.apply_ufunc(
            ptf.UsleK_Renard,
            pclay,
            psilt,
            dask="parallelized",
            output_dtypes=[float],
            keep_attrs=True,
        )
    else:  # "epic" (validated above)
        usleK = xr.apply_ufunc(
            ptf.UsleK_EPIC,
            pclay,
            psilt,
            poc,
            dask="parallelized",
            output_dtypes=[float],
            keep_attrs=True,
        )
    usleK = usleK.raster.reproject_like(ds_like, method="average")
    ds_out["USLE_K"] = usleK.astype(np.float32)
    # for writing pcraster map files a scalar nodata value is required
    for var in ds_out:
        logger.info(f"Interpolate NAN values for {var}")
        ds_out[var] = ds_out[var].raster.interpolate_na("nearest")
        ds_out[var] = ds_out[var].fillna(nodata)
        ds_out[var].raster.set_nodata(nodata)
    return ds_out
def update_soil_with_paddy(
    ds: xr.Dataset,
    ds_like: xr.Dataset,
    paddy_mask: xr.DataArray,
    soil_fn: str = "soilgrids",
    update_c: bool = True,
    wflow_layers: Union[List[int], None] = None,
    target_conductivity: Union[List[Union[None, int, float]], None] = None,
    logger=logger,
):
    """
    Update c and kvfrac soil properties for paddy fields.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset containing soil properties.
    ds_like : xarray.DataArray
        Dataset at model resolution.

        Required variables: wflow_subcatch, KsatVer, f
    paddy_mask : xarray.DataArray
        Dataset containing paddy fields mask.
    soil_fn : str
        soilgrids version {'soilgrids', 'soilgrids_2020'}
    update_c : bool
        Update c based on change in wflow_layers.
    wflow_layers : list, optional
        List of wflow soil layer thicknesses [mm] for which c is calculated.
        Defaults to [50, 100, 50, 200, 800].
    target_conductivity : list, optional
        Target conductivity for each wflow layer (None leaves a layer
        unchanged). Defaults to [None, None, 5, None, None].

    Returns
    -------
    ds_out : xarray.Dataset
        Dataset containing updated soil properties.

    Raises
    ------
    ValueError
        If the lengths of wflow_layers and target_conductivity differ.
    """
    # Resolve defaults here to avoid shared mutable default arguments.
    if wflow_layers is None:
        wflow_layers = [50, 100, 50, 200, 800]
    if target_conductivity is None:
        target_conductivity = [None, None, 5, None, None]
    if len(wflow_layers) != len(target_conductivity):
        raise ValueError(
            "Lengths of wflow_thicknesslayers and target_conductivity do not match"
        )
    # Set kvfrac maps, determine the fraction required to reach target_conductivity
    # Using to wflow exponential decline to determine the conductivity at the
    # required depth Find value at the bottom of the required layer and infer
    # required correction factor for that layer Values are only set for locations
    # with paddy irrigation, all other cells are set to be equal to 1
    logger.info("Adding kvfrac map")
    kv0 = ds_like["KsatVer"]
    f = ds_like["f"]
    # Only paddy cells get a correction factor; the rest stay NaN here and
    # default to 1 downstream.
    kv0_mask = kv0.where(paddy_mask == 1)
    f_mask = f.where(paddy_mask == 1)
    # update c
    if update_c:
        ds_out = soilgrids_brooks_corey(
            ds=ds,
            ds_like=ds_like,
            soil_fn=soil_fn,
            wflow_layers=wflow_layers,
        )
        for var in ds_out:
            dtype = np.float32
            logger.debug(f"Interpolate nodata (NaN) values for {var}")
            ds_out[var] = ds_out[var].raster.interpolate_na("nearest")
            # restrict to the model domain (active subcatchments)
            ds_out[var] = ds_out[var].where(
                ~ds_like["wflow_subcatch"].raster.mask_nodata().isnull()
            )
            ds_out[var] = ds_out[var].fillna(-9999).astype(dtype)
            ds_out[var].raster.set_nodata(np.dtype(dtype).type(-9999))
        # temporarily add the dem to the dataset
        ds_out["wflow_dem"] = ds_like["wflow_dem"]
    # Compute the kv_frac (should be done after updating c!)
    da_kvfrac = soilparams.update_kvfrac(
        ds_model=ds_like if not update_c else ds_out,
        kv0_mask=kv0_mask,
        f_mask=f_mask,
        wflow_thicknesslayers=wflow_layers,
        target_conductivity=target_conductivity,
    )
    if update_c:
        ds_out["kvfrac"] = da_kvfrac
        # Remove wflow_dem
        ds_out = ds_out.drop_vars("wflow_dem")
    else:
        ds_out = da_kvfrac.to_dataset(name="kvfrac")
    return ds_out