Source code for hydromt.workflows.forcing

import pandas as pd
import xarray as xr
import xarray.core.resample
import numpy as np
import re
import logging

logger = logging.getLogger(__name__)


[docs]def precip( precip, da_like, clim=None, freq=None, reproj_method="nearest_index", resample_kwargs={}, logger=logger, ): """Lazy reprojection of precipitation to model grid and resampling of time dimension to frequency. Parameters ---------- precip: xarray.DataArray DataArray of precipitation forcing [mm] da_like: xarray.DataArray or Dataset DataArray of the target resolution and projection. clim: xarray.DataArray DataArray of monthly precipitation climatology. If provided this is used to to correct the precip downscaling. freq: str, Timedelta output frequency of time dimension reproj_method: str, optional Method for spatital reprojection of precip, by default 'nearest_index' resample_kwargs: Additional key-word arguments (e.g. label, closed) for time resampling method Returns -------- p_out: xarray.DataArray (lazy) processed precipitation forcing """ if precip.raster.dim0 != "time": raise ValueError(f'First precip dim should be "time", not {precip.raster.dim0}') # downscale precip (lazy); global min of zero p_out = np.fmax(precip.raster.reproject_like(da_like, method=reproj_method), 0) # correct precip based on high-res monthly climatology if clim is not None: # make sure first dim is month clim = clim.rename({clim.raster.dim0: "month"}) if not clim["month"].size == 12: raise ValueError("Precip climatology does not contain 12 months.") # set missings to NaN clim = clim.raster.mask_nodata() # calculate downscaling multiplication factor clim_coarse = clim.raster.reproject_like( precip, method="average" ).raster.reproject_like(da_like, method="average") clim_fine = clim.raster.reproject_like(da_like, method="average") p_mult = xr.where(clim_coarse > 0, clim_fine / clim_coarse, 1.0).fillna(1.0) # multiply with monthly multiplication factor p_out = p_out.groupby("time.month") * p_mult # resample time p_out.name = "precip" p_out.attrs.update(unit="mm") if freq is not None: resample_kwargs.update(upsampling="bfill", downsampling="sum", logger=logger) p_out = resample_time(p_out, freq, conserve_mass=True, **resample_kwargs) return p_out
# use dem_model (from staticmaps) and dem_forcing (meteo) in ini file
[docs]def temp( temp, dem_model, dem_forcing=None, lapse_correction=True, freq=None, reproj_method="nearest_index", lapse_rate=-0.0065, resample_kwargs={}, logger=logger, ): """Lazy reprojection of temperature to model grid using lapse_rate for downscaling, and resampling of time dimension to frequency. Parameters ---------- temp: xarray.DataArray DataArray of temperature forcing [°C] dem_model: xarray.DataArray DataArray of the target resolution and projection, contains elevation data dem_forcing: xarray.DataArray DataArray of elevation at forcing resolution. If provided this is used with `dem_model` to correct the temperature downscaling using a lapse rate lapse_correction : bool, optional If True, temperature is correctured based on lapse rate, by default True. freq: str, Timedelta output frequency of timedimension reproj_method: str, optional Method for spatital reprojection of precip, by default 'nearest_index' lapse_rate: float, optional lapse rate of temperature [C m-1] (default: -0.0065) resample_kwargs: Additional key-word arguments (e.g. label, closed) for time resampling method Returns -------- t_out: xarray.DataArray (lazy) processed temperature forcing """ if temp.raster.dim0 != "time": raise ValueError(f'First temp dim should be "time", not {temp.raster.dim0}') # apply lapse rate if lapse_correction: # if dem_forcing is not provided, reproject dem_model dem_model = dem_model.raster.mask_nodata() if dem_forcing is None: dem_forcing = dem_model.raster.reproject_like(temp, "average") if np.any(np.isnan(dem_forcing)): logger.warning( "Temperature lapse rate could be computed for some (edge) cells. " "Consider providing a full coverage dem_forcing." ) else: # assume nans in dem_forcing occur above the ocean only -> set to zero dem_forcing = dem_forcing.raster.mask_nodata().fillna(0) dem_forcing = dem_forcing.raster.reproject_like(temp, "average") # compute temperature at quasi MSL t_add_sea_level = temp_correction(dem_forcing, lapse_rate=lapse_rate) temp = temp - t_add_sea_level # downscale temperature (lazy) and add zeros with mask to mask areas outside AOI t_out = temp.raster.reproject_like(dem_model, method=reproj_method) if lapse_correction: # correct temperature based on high-res DEM # calculate downscaling addition t_add_elevation = temp_correction(dem_model, lapse_rate=lapse_rate) t_out = t_out + t_add_elevation # resample time t_out.name = "temp" t_out.attrs.update(unit="degree C.") if freq is not None: resample_kwargs.update(upsampling="bfill", downsampling="mean", logger=logger) t_out = resample_time(t_out, freq, conserve_mass=False, **resample_kwargs) return t_out
[docs]def press( press, dem_model, lapse_correction=True, freq=None, reproj_method="nearest_index", lapse_rate=-0.0065, resample_kwargs={}, logger=logger, ): """Lazy reprojection of pressure to model grid and resampling of time dimension to frequency. Parameters ---------- press: xarray.DataArray DataArray of pressure forcing [hPa] dem_model: xarray.DataArray DataArray of the target resolution and projection, contains elevation data lapse_correction: str, optional If True 'dem_model` is used to correct the pressure with the `lapse_rate`. freq: str, Timedelta output frequency of timedimension reproj_method: str, optional Method for spatital reprojection of precip, by default 'nearest_index' lapse_rate: float, optional lapse rate of temperature [C m-1] (default: -0.0065) resample_kwargs: Additional key-word arguments (e.g. label, closed) for time resampling method Returns -------- press_out: xarray.DataArray (lazy) processed pressure forcing """ if press.raster.dim0 != "time": raise ValueError(f'First press dim should be "time", not {press.raster.dim0}') # downscale pressure (lazy) press_out = press.raster.reproject_like(dem_model, method=reproj_method) # correct temperature based on high-res DEM if lapse_correction: # calculate downscaling addition press_factor = press_correction(dem_model, lapse_rate=lapse_rate) press_out = press_out * press_factor # resample time press_out.name = "press" press_out.attrs.update(unit="hPa") if freq is not None: resample_kwargs.update(upsampling="bfill", downsampling="mean", logger=logger) press_out = resample_time( press_out, freq, conserve_mass=False, **resample_kwargs ) return press_out
[docs]def pet( ds, temp, dem_model, method="debruin", press_correction=False, reproj_method="nearest_index", lapse_rate=-0.0065, freq=None, resample_kwargs={}, logger=logger, ): """Determines reference evapotranspiration (lazy reprojection on model grid and resampling of time dimension to frequency). Parameters ---------- ds : xarray.Dataset Dataset with pressure [hPa], global radiation [W m-2], TOA incident solar radiation [W m-2] temp : xarray.DataArray DataArray with temperature [°C] dem_model : xarray.DataArray DataArray of the target resolution and projection, contains elevation data method : {'debruin', 'makkink'} Potential evapotranspiration method. press_correction : bool, default False If True pressure is corrected, based on elevation data of `dem_model` freq : str, Timedelta, default None output frequency of timedimension resample_kwargs: Additional key-word arguments (e.g. label, closed) for time resampling method Returns -------- pet_out : xarray.DataArray (lazy) reference evapotranspiration """ # # resample in time if temp.raster.dim0 != "time" or ds.raster.dim0 != "time": raise ValueError(f'First dimension of input variables should be "time"') # make sure temp and ds align both temporally and spatially if not np.all(temp["time"].values == ds["time"].values): raise ValueError("All input variables have same time index.") if not temp.raster.identical_grid(dem_model): raise ValueError("Temp variable should be on model grid.") # resample input to model grid ds_out = ds.raster.reproject_like(dem_model, method=reproj_method) if press_correction: ds_out["press"] = press( ds["press_msl"], dem_model, lapse_correction=press_correction, freq=None, # do not change freq of press, put pet_out later reproj_method=reproj_method, ) else: ds_out = ds_out.rename({"press_msl": "press"}) timestep = to_timedelta(ds).total_seconds() if method == "debruin": pet_out = pet_debruin( temp, ds_out["press"], ds_out["kin"], ds_out["kout"], timestep=timestep, ) if method == "makkink": pet_out = pet_makkink(temp, ds_out["press"], ds_out["kin"], timestep=timestep) # resample in time pet_out.name = "pet" pet_out.attrs.update(unit="mm") if freq is not None: resample_kwargs.update(upsampling="bfill", downsampling="sum", logger=logger) pet_out = resample_time(pet_out, freq, conserve_mass=True, **resample_kwargs) return pet_out
[docs]def press_correction( dem_model, g=9.80665, R_air=8.3144621, Mo=0.0289644, lapse_rate=-0.0065 ): """Pressure correction based on elevation lapse_rate. Parameters ---------- dem_model : xarray.DataArray DataArray with high res lat/lon axis and elevation data g : float, default 9.80665 gravitational constant [m s-2] R_air : float, default 8.3144621 specific gas constant for dry air [J mol-1 K-1] Mo : float, default 0.0289644 molecular weight of gas [g / mol] LapseRate : float, deafult -0.0065 lapse rate of temperature [C m-1] Returns ------- press_fact : xarray.DataArray pressure correction factor """ # constant pow = g * Mo / (R_air * lapse_rate) press_fact = np.power(288.15 / (288.15 + lapse_rate * dem_model), pow).fillna(1.0) return press_fact
[docs]def temp_correction(dem, lapse_rate=-0.0065): """Temperature correction based on elevation data. Parameters ---------- dem : xarray.DataArray DataArray with elevation lapse_rate : float, default -0.0065 lapse rate of temperature [°C m-1] Returns ------- temp_add : xarray.DataArray temperature addition """ temp_add = (dem * lapse_rate).fillna(0) return temp_add
[docs]def pet_debruin( temp, press, k_in, k_ext, timestep=86400, cp=1005.0, beta=20.0, Cs=110.0 ): """Determines De Bruin (2016) reference evapotranspiration. Parameters ---------- temp : xarray.DataArray DataArray with temperature [°C] press : xarray.DataArray pressure at surface [hPa] k_in : xarray.DataArray global (=short wave incoming) radiation [W m-2] k_ext : xarray.DataArray} TOA incident solar radiation [W m-2] timestep : int, default 86400 seconds per timestep cp : float, default 1005.0 standard cp [J kg-1 K-1] beta : float, default 20.0 correction constant [W m-2] Cs : float, default 110.0 emperical constant [W m-2] Returns ------- pet : xarray.DataArray reference evapotranspiration """ # saturation and actual vapour pressure at given temperature [Pa] esat = 6.112 * np.exp((17.67 * temp) / (temp + 243.5)) # slope of vapour pressure curve slope = esat * (17.269 / (temp + 243.5)) * (1.0 - (temp / (temp + 243.5))) # compute latent heat of vapourization [J kg-1] lam = (2.502 * 10**6) - (2250.0 * temp) gamma = (cp * press) / (0.622 * lam) # compute ref. evaporation (with global radiation, therefore calling it potential) # in J m-2 over whole period ep_joule = ( (slope / (slope + gamma)) * (((1.0 - 0.23) * k_in) - (Cs * (k_in / (k_ext + 0.00001)))) ) + beta ep_joule = xr.where(k_ext == 0.0, 0.0, ep_joule) pet = ((ep_joule / lam) * timestep).astype(np.float32) pet = xr.where(pet > 0.0, pet, 0.0) return pet
[docs]def pet_makkink(temp, press, k_in, timestep=86400, cp=1005.0): """Determnines Makkink reference evapotranspiration. Parameters ---------- temp : xarray.DataArray DataArray with temperature [°C] press : xarray.DataArray DataArray with pressure [hPa] k_in : xarray.DataArray DataArray with global radiation [W m-2] timestep : int, default 86400 seconds per timestep cp : float, default 1005.0 standard cp [J kg-1 K-1] Returns -------- pet : xarray.DataArray (lazy) reference evapotranspiration """ # saturation and actual vapour pressure at given temperature [Pa] esat = 6.112 * np.exp((17.67 * temp) / (temp + 243.5)) # slope of vapour pressure curve slope = esat * (17.269 / (temp + 243.5)) * (1.0 - (temp / (temp + 243.5))) # compute latent heat of vapourization [J kg-1] lam = (2.502 * 10**6) - (2250.0 * temp) gamma = (cp * press) / (0.622 * lam) ep_joule = 0.65 * slope / (slope + gamma) * k_in pet = ((ep_joule / lam) * timestep).astype(np.float32) pet = xr.where(pet > 0.0, pet, 0.0) return pet
[docs]def resample_time( da, freq, label="right", closed="right", upsampling="bfill", downsampling="mean", conserve_mass=True, logger=logger, ): """Resample data to destination frequency. Skip if input data already at output frequency. Parameters ---------- da: xarray.DataArray Input data freq: str, pd.timedelta Output frequency. label: {'left', 'right'}, optional Side of each interval to use for labeling. By default 'right'. closed: {'left', 'right'}, optional Side of each interval to treat as closed. By default 'right'. upsampling, downsampling: str, optional Resampling method if output frequency is higher, lower (resp.) compared to input frequency. conserve_mass: bool, optional If True multiply output with relative change in frequency to conserve mass Returns -------- pet : xarray.DataArray Resampled data. """ da_out = da dfreq = delta_freq(da, freq) if not np.isclose(dfreq, 1.0): resample = upsampling if dfreq < 1 else downsampling pre = "up" if dfreq < 1 else "down" logger.debug( f"{pre}sampling {da.name} using {resample}; conserve mass: {conserve_mass}" ) if not hasattr(xr.core.resample.DataArrayResample, resample): raise ValueError(f"unknown resampling option {resample}") da_resampled = da.resample(time=freq, skipna=True, label=label, closed=closed) da_out = getattr(da_resampled, resample)() if conserve_mass: da_out = da_out * min(dfreq, 1) return da_out
[docs]def delta_freq(da_or_freq, da_or_freq1): """Returns the relative difference between the dataset mean timestep and destination freq <1 : upsampling 1 : same >1 : downsampling """ return to_timedelta(da_or_freq1) / to_timedelta(da_or_freq)
def to_timedelta(da_or_freq): if isinstance(da_or_freq, (xr.DataArray, xr.Dataset)): freq = da_to_timedelta(da_or_freq) else: freq = freq_to_timedelta(da_or_freq) return freq def da_to_timedelta(da): return pd.to_timedelta(np.diff(da.time).mean()) def freq_to_timedelta(freq): # Add '1' to freq that doesn't have any digit if isinstance(freq, str) and not bool(re.search(r"\d", freq)): freq = "1{}".format(freq) # Convert str to datetime.timedelta return pd.to_timedelta(freq)