"""Workflow for wflow model states."""
from typing import Dict, Tuple
import numpy as np
import pandas as pd
import xarray as xr
from hydromt.raster import full_like
from hydromt.workflows.grid import grid_from_constant
from ..utils import get_grid_from_config
__all__ = ["prepare_cold_states"]
[docs]
def prepare_cold_states(
ds_like: xr.Dataset,
config: dict,
timestamp: str = None,
) -> Tuple[xr.Dataset, Dict[str, str]]:
"""
Prepare cold states for Wflow.
Compute cold states variables:
* **satwaterdepth**: saturated store [mm]
* **snow**: snow storage [mm]
* **tsoil**: top soil temperature [°C]
* **ustorelayerdepth**: amount of water in the unsaturated store, per layer [mm]
* **snowwater**: liquid water content in the snow pack [mm]
* **canopystorage**: canopy storage [mm]
* **q_river**: river discharge [m3/s]
* **h_river**: river water level [m]
* **h_av_river**: river average water level [m]
* **ssf**: subsurface flow [m3/d]
* **h_land**: land water level [m]
* **h_av_land**: land average water level[m]
* **q_land** or **qx_land**+**qy_land**: overland flow for kinwave [m3/s] or
overland flow in x/y directions for local-inertial [m3/s]
If lakes, also adds:
* **waterlevel_lake**: lake water level [m]
If reservoirs, also adds:
* **volume_reservoir**: reservoir volume [m3]
If glaciers, also adds:
* **glacierstore**: water within the glacier [mm]
Parameters
----------
ds_like : xr.Dataset
Dataset containing the staticmaps grid and variables to prepare some of the
states.
* Required variables: wflow_subcatch, wflow_river
* Other required variables (exact name from the wflow config): c, soilthickness,
theta_s, theta_r, kv_0, f, slope, ksathorfrac
* Optional variables (exact name from the wflow config): reservoir.locs,
glacierstore, reservoir.maxvolume, reservoir.targetfullfrac,
lake.waterlevel
config : dict
Wflow configuration dictionary.
timestamp : str, optional
Timestamp of the cold states. By default uses the starttime
from the config.
Returns
-------
xr.Dataset
Dataset containing the cold states.
dict
Config dictionary with the cold states variable names.
"""
# Defaults
nodata = -9999.0
dtype = "float32"
# Times
if timestamp is None:
# states time = starttime for Wflow.jl > 0.7.0)
starttime = pd.to_datetime(config.get("starttime", None))
timestamp = starttime # - pd.Timedelta(seconds=timestepsecs)
if timestamp is None:
raise ValueError("No timestamp provided and no starttime in config.")
else:
timestamp = pd.to_datetime(timestamp)
timestepsecs = config.get("timestepsecs", 86400)
# Create empty dataset
ds_out = xr.Dataset()
# Base output config dict for states
states_config = {
"state.vertical.satwaterdepth": "satwaterdepth",
"state.vertical.snow": "snow",
"state.vertical.tsoil": "tsoil",
"state.vertical.ustorelayerdepth": "ustorelayerdepth",
"state.vertical.snowwater": "snowwater",
"state.vertical.canopystorage": "canopystorage",
"state.lateral.subsurface.ssf": "ssf",
"state.lateral.land.h": "h_land",
"state.lateral.land.h_av": "h_av_land",
"state.lateral.river.q": "q_river",
"state.lateral.river.h": "h_river",
"state.lateral.river.h_av": "h_av_river",
}
# Map with constant values or zeros for basin
zeromap = ["tsoil", "snow", "snowwater", "canopystorage", "h_land", "h_av_land"]
land_routing = config["model"].get("land_routing", "kinematic-wave")
if land_routing == "local-inertial":
zeromap.extend(["qx_land", "qy_land"])
states_config["state.lateral.land.qx"] = "qx_land"
states_config["state.lateral.land.qy"] = "qy_land"
else:
zeromap.extend(["q_land"])
states_config["state.lateral.land.q"] = "q_land"
for var in zeromap:
if var == "tsoil":
value = 10.0
else:
value = 0.0
da_param = grid_from_constant(
ds_like,
constant=value,
name=var,
dtype=dtype,
nodata=nodata,
mask_name="wflow_subcatch",
)
ds_out[var] = da_param
# ustorelayerdepth (zero per layer)
# layers are based on c parameter
c = get_grid_from_config(
"input.vertical.c", config=config, grid=ds_like, fallback="c"
)
usld = full_like(c, nodata=nodata)
for sl in usld["layer"]:
usld.loc[dict(layer=sl)] = xr.where(ds_like["wflow_subcatch"], 0.0, nodata)
ds_out["ustorelayerdepth"] = usld
# Compute other soil states
# Get required variables from config
st = get_grid_from_config(
"input.vertical.soilthickness",
config=config,
grid=ds_like,
fallback="SoilThickness",
)
ts = get_grid_from_config(
"input.vertical.theta_s", config=config, grid=ds_like, fallback="thetaS"
)
tr = get_grid_from_config(
"input.vertical.theta_r", config=config, grid=ds_like, fallback="thetaR"
)
ksh = get_grid_from_config(
"input.lateral.subsurface.ksathorfrac",
config=config,
grid=ds_like,
fallback="KsatHorFrac",
)
ksv = get_grid_from_config(
"input.vertical.kv_0", config=config, grid=ds_like, fallback="KsatVer"
)
f = get_grid_from_config(
"input.vertical.f", config=config, grid=ds_like, fallback="f"
)
sl = get_grid_from_config(
"input.vertical.slope", config=config, grid=ds_like, fallback="Slope"
)
# satwaterdepth
swd = 0.85 * st * (ts - tr)
swd = swd.where(ds_like["wflow_subcatch"] > 0, nodata)
swd.raster.set_nodata(nodata)
ds_out["satwaterdepth"] = swd
# ssf
zi = np.maximum(0.0, st - swd / (ts - tr))
kh0 = ksh * ksv * 0.001 * (86400 / timestepsecs)
ssf = (kh0 * np.maximum(0.00001, sl) / (f * 1000)) * (
np.exp(-f * 1000 * zi * 0.001)
) - (np.exp(-f * 1000 * st) * np.sqrt(ds_like.raster.area_grid()))
ssf = ssf.where(ds_like["wflow_subcatch"] > 0, nodata)
ssf.raster.set_nodata(nodata)
ds_out["ssf"] = ssf
# River
zeromap_riv = ["q_river", "h_river", "h_av_river"]
# 1D floodplain
if config["model"].get("floodplain_1d", False):
zeromap_riv.extend(["q_floodplain", "h_floodplain"])
states_config["state.lateral.floodplain.q"] = "q_floodplain"
states_config["state.lateral.floodplain.h"] = "h_floodplain"
for var in zeromap_riv:
value = 0.0
da_param = grid_from_constant(
ds_like,
constant=value,
name=var,
dtype=dtype,
nodata=nodata,
mask_name="wflow_river",
)
da_param = da_param.rename(var)
ds_out[var] = da_param
# reservoir
if config["model"].get("reservoirs", False):
tff = get_grid_from_config(
"input.lateral.river.reservoir.targetfullfrac",
config=config,
grid=ds_like,
fallback="ResTargetFullFrac",
)
mv = get_grid_from_config(
"input.lateral.river.reservoir.maxvolume",
config=config,
grid=ds_like,
fallback="ResMaxVolume",
)
locs = get_grid_from_config(
"input.lateral.river.reservoir.locs",
config=config,
grid=ds_like,
fallback="wflow_reservoirlocs",
)
resvol = tff * mv
resvol = xr.where(locs > 0, resvol, nodata)
resvol.raster.set_nodata(nodata)
ds_out["volume_reservoir"] = resvol
states_config["state.lateral.river.reservoir.volume"] = "volume_reservoir"
# lake
if config["model"].get("lakes", False):
ll = get_grid_from_config(
"input.lateral.river.lake.waterlevel",
config=config,
grid=ds_like,
fallback="LakeAvgLevel",
)
ll = ll.where(ll != ll.raster.nodata, nodata)
ll.raster.set_nodata(nodata)
ds_out["waterlevel_lake"] = ll
states_config["state.lateral.river.lake.waterlevel"] = "waterlevel_lake"
# glacier
if config["model"].get("glacier", False):
gs_vn = config["input"]["vertical"].get("glacierstore", "wflow_glacierstore")
if gs_vn in ds_like:
ds_out["glacierstore"] = ds_like[gs_vn]
else:
glacstore = grid_from_constant(
ds_like,
value=5500.0,
name="glacierstore",
nodata=nodata,
dtype=dtype,
mask="wflow_subcatch",
)
ds_out["glacierstore"] = glacstore
states_config["state.vertical.glacierstore"] = "glacierstore"
# Add time dimension
ds_out = ds_out.expand_dims(dim=dict(time=[timestamp]))
return ds_out, states_config