"""Soilgrid workflows for Wflow plugin."""
import logging
from typing import List
import hydromt
import numpy as np
import pandas as pd
import xarray as xr
from scipy.optimize import curve_fit
from . import ptf, soilparams
logger = logging.getLogger(__name__)
__all__ = [
"soilgrids",
"soilgrids_sediment",
"soilgrids_brooks_corey",
"update_soil_with_paddy",
]
# soilgrids_2017
soildepth_cm = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
soildepth_mm = 10.0 * soildepth_cm
# for 2017 - midpoint and midpoint surface are equal to original
soildepth_cm_midpoint = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
soildepth_mm_midpoint = 10.0 * soildepth_cm
soildepth_mm_midpoint_surface = 10.0 * soildepth_cm
M_minmax = 10000.0
nodata = -9999.0
# default z [cm] of soil layers wflow_sbm: 10, 40, 120, > 120
# mapping of wflow_sbm parameter soil_brooks_corey_c to soil layer SoilGrids
c_sl_index = [2, 4, 6, 7] # v2017 direct mapping
# c_sl_index = [2, 3, 5, 6] #v2021 if direct mapping - not used,
# averages are taken instead.
def concat_layers(
ds: xr.Dataset,
soil_fn: str = "soilgrids",
variables: List[str] = ["bd", "oc", "ph", "clyppt", "sltppt", "sndppt"],
):
"""
Preprocess functions to concat soilgrids along a layer dimension.
Parameters
----------
ds : xarray.Dataset
Dataset containing soil properties.
soil_fn : str
soilgrids version {'soilgrids', 'soilgrids_2020'}
variables : list
List of soil properties to concat.
"""
if soil_fn == "soilgrids_2020":
nb_sl = 6
else:
nb_sl = 7
ds = ds.assign_coords(sl=np.arange(1, nb_sl + 1))
for var in variables:
da_prop = []
for i in np.arange(1, nb_sl + 1):
da_prop.append(ds[f"{var}_sl{i}"])
# remove layer from ds
ds = ds.drop_vars(f"{var}_sl{i}")
da = xr.concat(
da_prop,
pd.Index(np.arange(1, nb_sl + 1, dtype=int), name="sl"),
).transpose("sl", ...)
da.name = var
# add concat maps to ds
ds[f"{var}"] = da
return ds
def average_soillayers_block(ds, soilthickness):
"""
Determine weighted average of soil property at different depths over soil thickness.
Assuming that the properties are computed at the mid-point of the interval and
are considered constant over the whole depth interval (Sousa et al., 2020).
https://doi.org/10.5194/soil-2020-65
This function is used for soilgrids_2020.
Parameters
----------
ds: xarray.Dataset
Dataset containing soil property over each soil depth profile [sl1 - sl6].
soilthickness : xarray.DataArray
Dataset containing soil thickness [cm].
Returns
-------
da_av : xarray.DataArray
Dataset containing weighted average of soil property.
"""
da_sum = soilthickness * 0.0
# set NaN values to 0.0 (to avoid RuntimeWarning in comparison soildepth)
d = soilthickness.fillna(0.0)
for i in ds.sl:
da_sum = da_sum + (
(soildepth_cm[i] - soildepth_cm[i - 1])
* ds.sel(sl=i)
* (d >= soildepth_cm[i])
+ (d - soildepth_cm[i - 1])
* ds.sel(sl=i)
* ((d < soildepth_cm[i]) & (d > soildepth_cm[i - 1]))
)
da_av = xr.apply_ufunc(
lambda x, y: x / y,
da_sum,
soilthickness,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
da_av.raster.set_nodata(np.nan)
return da_av
def average_soillayers(ds, soilthickness):
"""
Determine weighted average of soil property at different depths over soil thickness.
Using the trapezoidal rule.
See also: Hengl, T., Mendes de Jesus, J., Heuvelink, G. B. M., Ruiperez Gonzalez,
M., Kilibarda, M., Blagotic, A., et al.: SoilGrids250m: \
Global gridded soil information based on machine learning,
PLoS ONE, 12, https://doi.org/10.1371/journal.pone.0169748, 2017.
This function is used for soilgrids (2017).
Parameters
----------
ds: xarray.Dataset
Dataset containing soil property at each soil depth [sl1 - sl7].
soilthickness : xarray.DataArray
Dataset containing soil thickness [cm].
Returns
-------
da_av : xarray.DataArray
Dataset containing the weighted average of the soil property.
"""
da_sum = soilthickness * 0.0
# set NaN values to 0.0 (to avoid RuntimeWarning in comparison soildepth)
d = soilthickness.fillna(0.0)
for i in range(1, len(ds.sl)): # range(1, 7):
da_sum = da_sum + (
(soildepth_cm[i] - soildepth_cm[i - 1])
* (ds.sel(sl=i) + ds.sel(sl=i + 1))
* (d >= soildepth_cm[i])
+ (d - soildepth_cm[i - 1])
* (ds.sel(sl=i) + ds.sel(sl=i + 1))
* ((d < soildepth_cm[i]) & (d > soildepth_cm[i - 1]))
)
da_av = xr.apply_ufunc(
lambda x, y: x * (1 / (y * 2)),
da_sum,
soilthickness,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
da_av.raster.set_nodata(np.nan)
return da_av
def pore_size_distribution_index_layers(ds, thetas):
"""
Determine pore size distribution index per soil layer depth based on PTF.
Parameters
----------
ds: xarray.Dataset
Dataset containing soil properties at each soil depth [sl1 - sl7].
thetas: xarray.Dataset
Dataset containing theta_s at each soil layer depth.
Returns
-------
ds_out : xarray.Dataset
Dataset containing pore size distribution index [-] for each soil layer
depth.
"""
ds_out = xr.apply_ufunc(
ptf.pore_size_index_brakensiek,
ds["sndppt"],
thetas,
ds["clyppt"],
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
ds_out.name = "pore_size"
ds_out.raster.set_nodata(np.nan)
# ds_out = ds_out.raster.interpolate_na("nearest")
return ds_out
def brooks_corey_layers(
thetas_sl: xr.Dataset,
ds: xr.Dataset,
ds_like: xr.Dataset,
soil_fn: str = "soilgrids",
wflow_layers: List[int] = [100, 300, 800],
soildepth_cm: np.array = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0]),
):
"""
Determine Brooks Corey coefficient per wflow soil layer depth.
First pore size distribution index is computed based on theta_s and other soil
parameters and scaled to the model resolution.
Then the Brooks Corey coefficient is computed for each wflow soil layer depth
by weighted averaging the pore size distribution index over the soil thickness.
Parameters
----------
thetas_sl: xarray.Dataset
Dataset containing theta_s at each soil layer depth.
ds: xarray.Dataset
Dataset containing soil properties at each soil depth.
ds_like: xarray.Dataset
Dataset at model resolution for reprojection.
soil_fn: str
soilgrids version {'soilgrids', 'soilgrids_2020'}
wflow_layers: list
List of soil layer depths [cm] for which soil_brooks_corey_c is calculated.
soildepth_cm: np.array
Depth of each soil layers [cm].
Returns
-------
ds_c : xarray.Dataset
Dataset containing soil_brooks_corey_c for the wflow_sbm soil layers.
"""
# Get pore size distribution index
lambda_sl_hr = pore_size_distribution_index_layers(ds, thetas_sl)
lambda_sl = np.log(lambda_sl_hr)
lambda_sl = lambda_sl.raster.reproject_like(ds_like, method="average")
lambda_sl = np.exp(lambda_sl)
# Brooks Corey coefficient
c_sl = 3.0 + (2.0 / lambda_sl)
c_sl.name = "c_sl"
# Resample for the wflow layers
wflow_thickness = wflow_layers.copy()
# Go from wflow layer thickness to soil depths (cumulative)
wflow_depths = np.cumsum(wflow_thickness)
# Check if the last wflow depth is less than 2000 mm (soilgrids limit)
if wflow_depths[-1] > 2000:
raise ValueError(
"The total depth of the wflow soil layers should be 2000 mm, as the \
soilgrids data does not go deeper than 2 m."
)
# Add a zero for the first depth and 2000 for the last depth
wflow_depths = np.insert(wflow_depths, 0, 0)
wflow_depths = np.append(wflow_depths, 2000)
# Compute the thickness of the last layer
wflow_thickness.append(2000 - wflow_depths[-2])
# Soil data depth
soildepth = soildepth_cm * 10
# make empty dataarray for soil_brooks_corey_c for the 4 sbm layers
ds_c = hydromt.raster.full(
coords=c_sl.raster.coords,
nodata=np.nan,
crs=c_sl.raster.crs,
dtype="float32",
name="soil_brooks_corey_c",
attrs={"thickness": wflow_thickness, "soil_fn": soil_fn},
)
ds_c = ds_c.expand_dims(dim=dict(layer=np.arange(len(wflow_thickness)))).copy()
# calc weighted average values of soil_brooks_corey_c over the sbm soil layers
for nl in range(len(wflow_thickness)):
top_depth = wflow_depths[nl]
bottom_depth = wflow_depths[nl + 1]
c_nl_sum = None
for d in range(len(soildepth) - 1):
if soil_fn == "soilgrids_2020":
c_av = c_sl.sel(sl=d + 1)
else:
c_av = (c_sl.sel(sl=d + 1) + c_sl.sel(sl=d + 2)) / 2
# wflow layer fully within soilgrid layer
if soildepth[d] <= top_depth and soildepth[d + 1] >= bottom_depth:
c_nl = c_av * (bottom_depth - top_depth)
# layer fully within wflow layer
elif soildepth[d] >= top_depth and soildepth[d + 1] <= bottom_depth:
c_nl = c_av * (soildepth[d + 1] - soildepth[d])
# bottom part of the layer within wflow
elif soildepth[d] <= bottom_depth and soildepth[d + 1] >= bottom_depth:
c_nl = c_av * (bottom_depth - soildepth[d])
# top part of the layer within wflow
elif soildepth[d] <= top_depth and soildepth[d + 1] >= top_depth:
c_nl = c_av * (soildepth[d + 1] - top_depth)
# layer outside of wflow layer
else:
c_nl = None
# Add to the sum
if c_nl is not None:
if c_nl_sum is None:
c_nl_sum = c_nl
else:
c_nl_sum = c_nl_sum + c_nl
ds_c.loc[dict(layer=nl)] = c_nl_sum / wflow_thickness[nl]
return ds_c
def kv_layers(ds, thetas, ptf_name):
"""
Determine vertical saturated hydraulic conductivity per soil layer depth.
Based on PTF.
Parameters
----------
ds: xarray.Dataset
Dataset containing soil properties at each soil depth [sl1 - sl7].
thetas: xarray.Dataset
Dataset containing theta_s at each soil layer depth.
ptf_name : str
PTF to use for calculation ksat_vertical .
Returns
-------
ds_out : xarray.Dataset
Dataset containing ksat_vertical [mm/day] for each soil layer depth.
"""
if ptf_name == "brakensiek":
ds_out = xr.apply_ufunc(
ptf.kv_brakensiek,
thetas,
ds["clyppt"],
ds["sndppt"],
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
elif ptf_name == "cosby":
ds_out = xr.apply_ufunc(
ptf.kv_cosby,
ds["clyppt"],
ds["sndppt"],
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
ds_out.name = "kv"
ds_out.raster.set_nodata(np.nan)
return ds_out
def func(x, b):
return np.exp(-b * x)
def do_linalg(x, y):
"""
Apply np.linalg.lstsq and return fitted parameter.
Parameters
----------
x : array_like (float)
“Coefficient” matrix.
y : array_like (float)
dependent variable.
Returns
-------
popt_0 : float
Optimal value for the parameter fit.
"""
idx = (~np.isinf(np.log(y))) & (~np.isnan(y))
return np.linalg.lstsq(x[idx, np.newaxis], np.log(y[idx]), rcond=None)[0][0]
def do_curve_fit(x, y):
"""
Apply scipy.optimize.curve_fit and return fitted parameter.
If least-squares minimization fails with an initial guess p0 of 1e-3,
and 1e-4, np.linalg.lstsq is used for curve fitting.
Parameters
----------
x : array_like of M length (float)
independent variable.
y : array_like of M length (float)
dependent variable.
Returns
-------
popt_0 : float
Optimal value for the parameter fit.
"""
idx = (~np.isinf(np.log(y))) & (~np.isnan(y))
if len(y[idx]) == 0:
popt_0 = np.nan
else:
try:
# try curve fitting with certain p0
popt_0 = curve_fit(func, x[idx], y[idx], p0=(1e-3))[0]
except RuntimeError:
try:
# try curve fitting with lower p0
popt_0 = curve_fit(func, x[idx], y[idx], p0=(1e-4))[0]
except RuntimeError:
# do linalg regression instead
popt_0 = np.linalg.lstsq(
x[idx, np.newaxis], np.log(y[idx]), rcond=None
)[0][0]
return popt_0
def constrain_M(M, popt_0, M_minmax):
"""Constrain M parameter with the value M_minmax."""
M = xr.where((M > 0) & (popt_0 == 0), M_minmax, M)
M = xr.where(M > M_minmax, M_minmax, M)
M = xr.where(M < 0, M_minmax, M)
return M
[docs]
def soilgrids(
ds: xr.Dataset,
ds_like: xr.Dataset,
ptfKsatVer: str = "brakensiek",
soil_fn: str = "soilgrids",
wflow_layers: List[int] = [100, 300, 800],
logger=logger,
):
"""
Return soil parameter maps at model resolution.
Based on soil properties from SoilGrids datasets.
Both soilgrids 2017 and 2020 are supported. Soilgrids 2017 provides soil properties
at 7 specific depths, while soilgrids_2020 provides soil properties averaged over
6 depth intervals.
Ref: Hengl, T., Mendes de Jesus, J., Heuvelink, G. B. M., Ruiperez Gonzalez,
M., Kilibarda, M., Blagotic, A., et al.: SoilGrids250m: \
Global gridded soil information based on machine learning,
PLoS ONE, 12, https://doi.org/10.1371/journal.pone.0169748, 2017.
Ref: de Sousa, L.M., Poggio, L., Batjes, N.H., Heuvelink, G., Kempen, B., Riberio,
E. and Rossiter, D., 2020. SoilGrids 2.0: \
producing quality-assessed soil information for the globe. SOIL Discussions, pp.1-37.
https://doi.org/10.5194/soil-2020-65.
A ``soil_mapping`` table can optionally be provided to derive parameters based
on soil texture classes. A default table *soil_mapping_default* is available
to derive the infiltration capacity of the soil.
The following soil parameter maps are calculated:
- **theta_s** : average saturated soil water content [m3/m3]
- **theta_r** : average residual water content [m3/m3]
- **ksat_vertical** : vertical saturated hydraulic conductivity at soil \
surface [mm/day]
- **soil_thickness** : soil thickness [mm]
- **f** : scaling parameter controlling the decline of ksat_vertical [mm-1] \
(fitted with curve_fit (scipy.optimize)), bounds are checked
- **soil_f_** : scaling parameter controlling the decline of ksat_vertical [mm-1] \
(fitted with numpy linalg regression), bounds are checked
- **soil_brooks_corey_c_** map: Brooks Corey coefficients [-] based on pore size \
distribution index for the wflow_sbm soil layers.
- **meta_{soil_fn}_ksat_vertical_[z]cm** : ksat vertical [mm/day] at soil depths \
[z] of SoilGrids data [0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0]
- **meta_soil_texture** : USDA Soil texture based on percentage clay, silt, \
sand mapping: [1:Clay, 2:Silty Clay, 3:Silty Clay-Loam, 4:Sandy Clay, \
5:Sandy Clay-Loam, 6:Clay-Loam, 7:Silt, 8:Silt-Loam, 9:Loam, 10:Sand, 11: Loamy Sand, \
12:Sandy Loam]
Parameters
----------
ds : xarray.Dataset
Dataset containing soil properties.
ds_like : xarray.DataArray
Dataset at model resolution.
ptfKsatVer : str
PTF to use for calculation ksat_vertical .
soil_fn : str
soilgrids version {'soilgrids', 'soilgrids_2020'}
wflow_layers : list
List of soil layer depths [cm] for which soil_brooks_corey_c is calculated.
Returns
-------
ds_out : xarray.Dataset
Dataset containing gridded soil parameters.
"""
if soil_fn == "soilgrids_2020":
# use midpoints of depth intervals for soilgrids_2020.
soildepth_cm_midpoint = np.array([2.5, 10.0, 22.5, 45.0, 80.0, 150.0])
soildepth_cm_midpoint_surface = np.array([0, 10.0, 22.5, 45.0, 80.0, 150.0])
soildepth_cm = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
soildepth_mm_midpoint_surface = 10.0 * soildepth_cm_midpoint_surface
else:
soildepth_cm = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
soildepth_cm_midpoint = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
soildepth_mm_midpoint_surface = 10.0 * soildepth_cm
ds_out = xr.Dataset(coords=ds_like.raster.coords)
# set nodata values in dataset to NaN (based on soil property SLTPPT at
# first soil layer)
# ds = xr.where(ds["sltppt_sl1"] == ds["sltppt_sl1"].raster.nodata, np.nan, ds)
ds = ds.raster.mask_nodata()
# concat along a sl dimension
ds = concat_layers(ds, soil_fn)
logger.info("calculate and resample theta_s")
thetas_sl = xr.apply_ufunc(
ptf.thetas_toth,
ds["ph"],
ds["bd"],
ds["clyppt"],
ds["sltppt"],
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
if soil_fn == "soilgrids_2020":
thetas = average_soillayers_block(thetas_sl, ds["soilthickness"])
else:
thetas = average_soillayers(thetas_sl, ds["soilthickness"])
thetas = thetas.raster.reproject_like(ds_like, method="average")
ds_out["theta_s"] = thetas.astype(np.float32)
logger.info("calculate and resample theta_r")
thetar_sl = xr.apply_ufunc(
ptf.thetar_rawls_brakensiek,
ds["sndppt"],
ds["clyppt"],
thetas_sl,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
if soil_fn == "soilgrids_2020":
thetar = average_soillayers_block(thetar_sl, ds["soilthickness"])
else:
thetar = average_soillayers(thetar_sl, ds["soilthickness"])
thetar = thetar.raster.reproject_like(ds_like, method="average")
ds_out["theta_r"] = thetar.astype(np.float32)
soilthickness_hr = ds["soilthickness"]
soilthickness = soilthickness_hr.raster.reproject_like(ds_like, method="average")
# wflow_sbm cannot handle (yet) zero soil thickness
soilthickness = soilthickness.where(soilthickness > 0.0, np.nan)
soilthickness.raster.set_nodata(np.nan)
soilthickness = soilthickness.astype(np.float32)
ds_out["soil_thickness"] = soilthickness * 10.0 # from [cm] to [mm]
logger.info("calculate and resample ksat_vertical ")
kv_sl_hr = kv_layers(ds, thetas_sl, ptfKsatVer)
kv_sl = np.log(kv_sl_hr)
kv_sl = kv_sl.raster.reproject_like(ds_like, method="average")
kv_sl = np.exp(kv_sl)
logger.info("calculate and resample pore size distribution index")
ds_c = brooks_corey_layers(
thetas_sl=thetas_sl,
ds=ds,
ds_like=ds_like,
soil_fn=soil_fn,
wflow_layers=wflow_layers,
soildepth_cm=soildepth_cm,
)
ds_out["soil_brooks_corey_c"] = ds_c
ds_out["ksat_vertical"] = kv_sl.sel(sl=1).astype(np.float32)
for i, sl in enumerate(kv_sl["sl"]):
kv = kv_sl.sel(sl=sl)
ds_out[
f"meta_{soil_fn}_ksat_vertical_" + str(soildepth_cm_midpoint[i]) + "cm"
] = kv.astype(np.float32)
kv = kv_sl / kv_sl.sel(sl=1)
logger.info(
"fit z - log(ksat_vertical ) with numpy linalg regression (y = b*x) -> M_"
)
popt_0_ = xr.apply_ufunc(
do_linalg,
soildepth_mm_midpoint_surface,
kv.compute(),
vectorize=True,
dask="parallelized",
input_core_dims=[["z"], ["sl"]],
output_dtypes=[float],
keep_attrs=True,
)
M_ = (thetas - thetar) / (-popt_0_)
M_ = constrain_M(M_, popt_0_, M_minmax)
ds_out["soil_f_"] = ((thetas - thetar) / M_).astype(np.float32)
logger.info("fit zi - Ksat with curve_fit (scipy.optimize) -> M")
popt_0 = xr.apply_ufunc(
do_curve_fit,
soildepth_mm_midpoint_surface,
kv.compute(),
vectorize=True,
dask="parallelized",
input_core_dims=[["z"], ["sl"]],
output_dtypes=[float],
keep_attrs=True,
)
M = (thetas - thetar) / (popt_0)
M = constrain_M(M, popt_0, M_minmax)
ds_out["f"] = ((thetas - thetar) / M).astype(np.float32)
# wflow soil map is based on USDA soil classification
# soilmap = ds["tax_usda"].raster.interpolate_na()
# soilmap = soilmap.raster.reproject_like(ds_like, method="mode")
# ds_out["wflow_soil"] = soilmap.astype(np.float32)
# wflow_soil map is based on soil texture calculated with percentage
# sand, silt, clay
# clay, silt percentages averaged over soil thickness
if soil_fn == "soilgrids_2020":
clay_av = average_soillayers_block(ds["clyppt"], ds["soilthickness"])
silt_av = average_soillayers_block(ds["sltppt"], ds["soilthickness"])
else:
clay_av = average_soillayers(ds["clyppt"], ds["soilthickness"])
silt_av = average_soillayers(ds["sltppt"], ds["soilthickness"])
# calc soil texture
soil_texture = xr.apply_ufunc(
ptf.soil_texture_usda,
clay_av,
silt_av,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
soil_texture_out = soil_texture.raster.reproject_like(ds_like, method="mode")
# np.nan is not a valid value for array with type integer
ds_out["meta_soil_texture"] = soil_texture_out
ds_out["meta_soil_texture"].raster.set_nodata(0)
dtypes = {"meta_soil_texture": np.int32}
for var in ds_out:
dtype = dtypes.get(var, np.float32)
logger.debug(f"Interpolate nodata (NaN) values for {var}")
ds_out[var] = ds_out[var].raster.interpolate_na("nearest")
ds_out[var] = ds_out[var].fillna(nodata).astype(dtype)
ds_out[var].raster.set_nodata(np.dtype(dtype).type(nodata))
return ds_out
[docs]
def soilgrids_brooks_corey(
ds: xr.Dataset,
ds_like: xr.Dataset,
soil_fn: str = "soilgrids",
wflow_layers: List[int] = [100, 300, 800],
logger=logger,
):
"""
Determine Brooks Corey coefficient per wflow soil layer depth.
First pore size distribution index is computed based on theta_s and other soil
parameters and scaled to the model resolution.
Then the Brooks Corey coefficient is computed for each wflow soil layer depth
by weighted averaging the pore size distribution index over the soil thickness.
Parameters
----------
ds : xarray.Dataset
Dataset containing soil properties.
ds_like : xarray.DataArray
Dataset at model resolution.
soil_fn : str
soilgrids version {'soilgrids', 'soilgrids_2020'}
wflow_layers : list
List of wflow soil layer depths [cm] over which soil_brooks_corey_c is
calculated.
Returns
-------
ds_out : xarray.Dataset
Dataset containing soil_brooks_corey_c for the wflow_sbm soil layers.
"""
if soil_fn == "soilgrids_2020" or soil_fn == "soilgrids":
soildepth_cm = np.array([0.0, 5.0, 15.0, 30.0, 60.0, 100.0, 200.0])
else:
raise ValueError("Only soilgrids_2020 and soilgrids are supported.")
ds_out = xr.Dataset(coords=ds_like.raster.coords)
ds = ds.raster.mask_nodata()
# concat along a sl dimension
ds = concat_layers(ds, soil_fn)
logger.info("calculate and resample theta_s")
thetas_sl = xr.apply_ufunc(
ptf.thetas_toth,
ds["ph"],
ds["bd"],
ds["clyppt"],
ds["sltppt"],
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
logger.info("calculate and resample pore size distribution index")
# Brooks Corey coefficient
ds_c = brooks_corey_layers(
thetas_sl=thetas_sl,
ds=ds,
ds_like=ds_like,
soil_fn=soil_fn,
wflow_layers=wflow_layers,
soildepth_cm=soildepth_cm,
)
ds_out["soil_brooks_corey_c"] = ds_c
return ds_out
[docs]
def soilgrids_sediment(
ds: xr.Dataset,
ds_like: xr.Dataset,
usle_k_method: str = "renard",
add_aggregates: bool = True,
logger=logger,
) -> xr.Dataset:
"""
Return soil parameter maps for sediment modelling at model resolution.
Based on soil properties from SoilGrids dataset.
Sediment size distribution and addition of small and large aggregates can be
estimated from primary particle size distribution with Foster et al. (1980).
USLE K factor can be computed from the soil data using Renard or EPIC methods.
Calculation of D50 and fraction of fine and very fine sand (fvfs) from
Fooladmand et al, 2006.
The following soil parameter maps are calculated:
* soil_clay_fraction: clay content of the topsoil [g/g]
* soil_silt_fraction: silt content of the topsoil [g/g]
* soil_sand_fraction: sand content of the topsoil [g/g]
* soil_sagg_fraction: small aggregate content of the topsoil [g/g]
* soil_lagg_fraction: large aggregate content of the topsoil [g/g]
* erosion_soil_detachability: mean detachability of the soil (Morgan et al., 1998)
[g/J]
* usle_k: soil erodibility factor from the USLE equation [-]
* soil_sediment_d50: median sediment diameter of the soil [mm]
* land_govers_c: Govers factor for overland flow transport capacity [-]
* land_govers_n: Govers exponent for overland flow transport capacity [-]
Parameters
----------
ds : xarray.Dataset
Dataset containing soil properties.
ds_like : xarray.DataArray
Dataset at model resolution.
usle_k_method : str
Method to use for calculation of USLE_K {"renard", "epic"}.
add_aggregates : bool
Add small and large aggregates to the soil properties. Default is True.
Returns
-------
ds_out : xarray.Dataset
Dataset containing gridded soil parameters for sediment modelling.
"""
ds_out = xr.Dataset(coords=ds_like.raster.coords)
# set nodata values in dataset to NaN (based on soil property SLTPPT at
# first soil layer)
# ds = xr.where(ds["sltppt_sl1"] == ds["sltppt_sl1"].raster.nodata, np.nan, ds)
ds = ds.raster.mask_nodata()
# soil properties
pclay = ds["clyppt_sl1"]
psilt = ds["sltppt_sl1"]
poc = ds["oc_sl1"]
# Detachability of the soil
erosK = xr.apply_ufunc(
ptf.ErosK_texture,
pclay,
psilt,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
erosK = erosK.raster.reproject_like(ds_like, method="average")
ds_out["erosion_soil_detachability"] = erosK.astype(np.float32)
# USLE K parameter
if usle_k_method == "renard":
usle_k = xr.apply_ufunc(
ptf.UsleK_Renard,
pclay,
psilt,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
elif usle_k_method == "epic":
usle_k = xr.apply_ufunc(
ptf.UsleK_EPIC,
pclay,
psilt,
poc,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
usle_k = usle_k.raster.reproject_like(ds_like, method="average")
ds_out["usle_k"] = usle_k.astype(np.float32)
# Mean diameter of the soil
d50 = xr.apply_ufunc(
ptf.mean_diameter_soil,
pclay,
psilt,
dask="parallelized",
output_dtypes=[float],
keep_attrs=True,
)
ds_out["soil_sediment_d50"] = d50.raster.reproject_like(
ds_like, method="average"
).astype(np.float32)
# Govers factor and exponent for overland flow transport capacity
c_govers = ((d50 * 1000 + 5) / 0.32) ** (-0.6)
n_govers = ((d50 * 1000 + 5) / 300) ** (0.25)
ds_out["land_govers_c"] = c_govers.raster.reproject_like(
ds_like, method="average"
).astype(np.float32)
ds_out["land_govers_n"] = n_govers.raster.reproject_like(
ds_like, method="average"
).astype(np.float32)
# Sediment size distribution
if add_aggregates:
fclay = 0.20 * pclay / 100
fsilt = 0.13 * psilt / 100
fsand = (1 - pclay / 100 - psilt / 100) * (1 - pclay / 100) ** 2.4
fsagg = 0.28 * (pclay / 100 - 0.25) + 0.5
fsagg = fsagg.where(pclay < 50, 0.57)
fsagg = fsagg.where(pclay > 25, 2 * pclay / 100)
else:
fclay = pclay / 100
fsilt = psilt / 100
fsagg = fclay * 0.0
# Reproject to model resolution
fclay = fclay.raster.reproject_like(ds_like, method="average")
ds_out["soil_clay_fraction"] = fclay.astype(np.float32)
fsilt = fsilt.raster.reproject_like(ds_like, method="average")
ds_out["soil_silt_fraction"] = fsilt.astype(np.float32)
fsagg = fsagg.raster.reproject_like(ds_like, method="average")
ds_out["soil_sagg_fraction"] = fsagg.astype(np.float32)
# Make sure that the sum of the percentages is 100
if add_aggregates:
fsand = fsand.raster.reproject_like(ds_like, method="average").astype(
np.float32
)
ds_out["soil_sand_fraction"] = fsand
ds_out["soil_lagg_fraction"] = (1 - fclay - fsilt - fsand - fsagg).astype(
np.float32
)
else:
ds_out["soil_sand_fraction"] = (1 - fclay - fsilt).astype(np.float32)
ds_out["soil_lagg_fraction"] = (fclay * 0.0).astype(np.float32)
for var in ds_out:
ds_out[var] = ds_out[var].raster.interpolate_na("nearest")
logger.info(f"Interpolate NAN values for {var}")
ds_out[var] = ds_out[var].fillna(nodata)
ds_out[var].raster.set_nodata(nodata)
return ds_out
[docs]
def update_soil_with_paddy(
ds: xr.Dataset,
ds_like: xr.Dataset,
paddy_mask: xr.DataArray,
soil_fn: str = "soilgrids",
update_c: bool = True,
wflow_layers: List[int] = [50, 100, 50, 200, 800],
target_conductivity: List[None | int | float] = [None, None, 5, None, None],
logger=logger,
):
"""
Update soil_brooks_corey_c and soil_ksat_vertical_factor for paddy fields.
Parameters
----------
ds : xarray.Dataset
Dataset containing soil properties.
ds_like : xarray.DataArray
Dataset at model resolution.
Required variables: basins, ksat_vertical, f, elevtn, soil_brooks_corey_c
paddy_mask : xarray.DataArray
Dataset containing paddy fields mask.
soil_fn : str
soilgrids version {'soilgrids', 'soilgrids_2020'}
update_c : bool
Update soil_brooks_corey_c based on change in wflow_layers.
wflow_layers : list
List of soil layer depths [cm] for which soil_brooks_corey_c is calculated.
target_conductivity : list
Target conductivity for each wflow layer.
Returns
-------
soil_out : xarray.Dataset
Dataset containing updated soil properties.
"""
if len(wflow_layers) != len(target_conductivity):
raise ValueError(
"Lengths of wflow_thicknesslayers and target_conductivity does not match"
)
# Set soil_ksat_vertical_factor maps
# Determine the fraction required to reach target_conductivity
# Using to wflow exponential decline to determine the conductivity at the
# required depth Find value at the bottom of the required layer and infer
# required correction factor for that layer Values are only set for locations
# with paddy irrigation, all other cells are set to be equal to 1
logger.info("Adding soil_ksat_vertical_factor map")
kv0 = ds_like["ksat_vertical"]
f = ds_like["f"]
kv0_mask = kv0.where(paddy_mask == 1)
f_mask = f.where(paddy_mask == 1)
# update soil_brooks_corey_c
if update_c:
ds_out = soilgrids_brooks_corey(
ds=ds,
ds_like=ds_like,
soil_fn=soil_fn,
wflow_layers=wflow_layers,
)
for var in ds_out:
dtype = np.float32
logger.debug(f"Interpolate nodata (NaN) values for {var}")
ds_out[var] = ds_out[var].raster.interpolate_na("nearest")
ds_out[var] = ds_out[var].where(
~ds_like["basins"].raster.mask_nodata().isnull()
)
ds_out[var] = ds_out[var].fillna(-9999).astype(dtype)
ds_out[var].raster.set_nodata(np.dtype(dtype).type(-9999))
# temporarily add the dem to the dataset
ds_out["elevtn"] = ds_like["elevtn"]
# Compute the kv_frac (should be done after updating soil_brooks_corey_c!)
da_kvfrac = soilparams.update_kvfrac(
ds_model=ds_like if not update_c else ds_out,
kv0_mask=kv0_mask,
f_mask=f_mask,
wflow_thicknesslayers=wflow_layers,
target_conductivity=target_conductivity,
)
if update_c:
ds_out["soil_ksat_vertical_factor"] = da_kvfrac
# Remove wflow_dem
ds_out = ds_out.drop_vars("elevtn")
else:
ds_out = da_kvfrac.to_dataset(name="soil_ksat_vertical_factor")
return ds_out