Source code for hydromt.model.processes.meteo

"""Implementaion of forcing workflows."""

import logging
import re
from typing import Dict, Literal, Optional, Union

import numpy as np
import pandas as pd
import xarray as xr

from hydromt._compat import HAS_PYET

if HAS_PYET:
    import pyet

logger = logging.getLogger(__name__)

__all__ = [
    "precip",
    "temp",
    "press",
    "wind",
    "pet",
    "press_correction",
    "temp_correction",
    "pet_debruin",
    "pet_makkink",
    "pm_fao56",
]

PET_METHODS = [
    "debruin",
    "makkink",
    "penman-monteith_rh_simple",
    "penman-monteith_tdew",
]

PetMethods = Literal[
    "debruin",
    "makkink",
    "penman-monteith_rh_simple",
    "penman-monteith_tdew",
]


[docs] def precip( precip: xr.DataArray, da_like: Union[xr.DataArray, xr.Dataset], clim: Optional[xr.DataArray] = None, freq: Optional[Union[str, pd.Timedelta]] = None, reproj_method: str = "nearest_index", resample_kwargs: Optional[Dict[str, str]] = None, ) -> xr.DataArray: """Return the lazy reprojection of precipitation to model. Applies the projection to the grid and resampling of time dimension to frequency. Parameters ---------- precip: xarray.DataArray DataArray of precipitation forcing [mm] da_like: xarray.DataArray or Dataset DataArray of the target resolution and projection. clim: xarray.DataArray DataArray of monthly precipitation climatology. If provided this is used to to correct the precip downscaling. freq: str, Timedelta output frequency of time dimension reproj_method: str, optional Method for spatital reprojection of precip, by default 'nearest_index' resample_kwargs: Additional key-word arguments (e.g. label, closed) for time resampling method kwargs: Additional arguments to pass through to underlying methods. Returns ------- p_out: xarray.DataArray (lazy) processed precipitation forcing """ resample_kwargs = resample_kwargs or {} if precip.raster.dim0 != "time": raise ValueError(f'First precip dim should be "time", not {precip.raster.dim0}') # downscale precip (lazy); global min of zero p_out = np.fmax(precip.raster.reproject_like(da_like, method=reproj_method), 0) # correct precip based on high-res monthly climatology if clim is not None: # make sure first dim is month if clim.raster.dim0 != "month": clim = clim.rename({clim.raster.dim0: "month"}) if clim["month"].size != 12: raise ValueError("Precip climatology does not contain 12 months.") # set missings to NaN clim = clim.raster.mask_nodata() assert clim is not None # calculate downscaling multiplication factor clim_coarse = clim.raster.reproject_like( precip, method="average" ).raster.reproject_like(da_like, method="average") clim_fine = clim.raster.reproject_like(da_like, method="average") p_mult = xr.where(clim_coarse > 0, clim_fine / clim_coarse, 1.0).fillna(1.0) # multiply with monthly multiplication factor p_out = p_out.groupby("time.month") * p_mult # resample time p_out.name = "precip" p_out.attrs.update(unit="mm") if freq is not None: resample_kwargs.update(upsampling="bfill", downsampling="sum") p_out = resample_time(p_out, freq, conserve_mass=True, **resample_kwargs) return p_out
# use dem_model (from staticmaps) and dem_forcing (meteo) in config file
[docs] def temp( temp, dem_model: xr.DataArray, dem_forcing: Optional[xr.DataArray] = None, lapse_correction: bool = True, freq: Optional[Union[str, pd.Timedelta]] = None, reproj_method: str = "nearest_index", lapse_rate: float = -0.0065, resample_kwargs: Optional[Dict[str, str]] = None, ) -> xr.DataArray: """Return lazy reprojection of temperature to model grid. Use lapse_rate for downscaling, and resampling of time dimension to frequency. Parameters ---------- temp: xarray.DataArray DataArray of temperature forcing [°C] dem_model: xarray.DataArray DataArray of the target resolution and projection, contains elevation data dem_forcing: xarray.DataArray, optional DataArray of elevation at forcing resolution. If provided this is used with `dem_model` to correct the temperature downscaling using a lapse rate lapse_correction : bool, optional If True, temperature is correctured based on lapse rate, by default True. freq: str, Timedelta, optional output frequency of timedimension reproj_method: str, optional Method for spatital reprojection of precip, by default 'nearest_index' lapse_rate: float, optional lapse rate of temperature [C m-1] (default: -0.0065) resample_kwargs: dict, optional Additional key-word arguments (e.g. label, closed) for time resampling method Returns ------- t_out: xarray.DataArray (lazy) processed temperature forcing """ resample_kwargs = resample_kwargs or {} if temp.raster.dim0 != "time": raise ValueError(f'First temp dim should be "time", not {temp.raster.dim0}') # apply lapse rate if lapse_correction: # if dem_forcing is not provided, reproject dem_model dem_model = dem_model.raster.mask_nodata() if dem_forcing is None: dem_forcing = dem_model.raster.reproject_like(temp, "average") if np.any(np.isnan(dem_forcing)): logger.warning( "Temperature lapse rate could be computed for some (edge) cells. " "Consider providing a full coverage dem_forcing." ) else: # assume nans in dem_forcing occur above the ocean only -> set to zero dem_forcing = dem_forcing.raster.mask_nodata().fillna(0) assert dem_forcing is not None dem_forcing = dem_forcing.raster.reproject_like(temp, "average") # compute temperature at quasi MSL assert dem_forcing is not None t_add_sea_level = temp_correction(dem_forcing, lapse_rate=lapse_rate) temp = temp - t_add_sea_level # downscale temperature (lazy) and add zeros with mask to mask areas outside AOI t_out = temp.raster.reproject_like(dem_model, method=reproj_method) if lapse_correction: # correct temperature based on high-res DEM # calculate downscaling addition t_add_elevation = temp_correction(dem_model, lapse_rate=lapse_rate) t_out = t_out + t_add_elevation # resample time t_out.name = "temp" t_out.attrs.update(unit="degree C.") if freq is not None: resample_kwargs.update(upsampling="bfill", downsampling="mean") t_out = resample_time( t_out, freq, conserve_mass=False, logger=logger, **resample_kwargs ) return t_out
[docs] def press( press: xr.DataArray, dem_model: xr.DataArray, lapse_correction: bool = True, freq: Optional[Union[str, pd.Timedelta]] = None, reproj_method: str = "nearest_index", lapse_rate: float = -0.0065, resample_kwargs: Optional[Dict[str, str]] = None, ) -> xr.DataArray: """Return lazy reprojection of pressure to model grid. Resample time dimension to frequency. Parameters ---------- press: xarray.DataArray DataArray of pressure forcing [hPa] dem_model: xarray.DataArray DataArray of the target resolution and projection, contains elevation data lapse_correction: str, optional If True 'dem_model` is used to correct the pressure with the `lapse_rate`. freq: str, Timedelta output frequency of timedimension reproj_method: str, optional Method for spatital reprojection of precip, by default 'nearest_index' lapse_rate: float, optional lapse rate of temperature [C m-1] (default: -0.0065) resample_kwargs: Additional key-word arguments (e.g. label, closed) for time resampling method Returns ------- press_out: xarray.DataArray (lazy) processed pressure forcing """ if resample_kwargs is None: resample_kwargs = {} if press.raster.dim0 != "time": raise ValueError(f'First press dim should be "time", not {press.raster.dim0}') # downscale pressure (lazy) press_out = press.raster.reproject_like(dem_model, method=reproj_method) # correct temperature based on high-res DEM if lapse_correction: # calculate downscaling addition press_factor = press_correction(dem_model, lapse_rate=lapse_rate) press_out = press_out * press_factor # resample time press_out.name = "press" press_out.attrs.update(unit="hPa") if freq is not None: resample_kwargs.update(upsampling="bfill", downsampling="mean") press_out = resample_time( press_out, freq, conserve_mass=False, logger=logger, **resample_kwargs ) return press_out
[docs] def wind( da_model: Union[xr.DataArray, xr.Dataset], wind: Optional[xr.DataArray] = None, wind_u: Optional[xr.DataArray] = None, wind_v: Optional[xr.DataArray] = None, altitude: float = 10, altitude_correction: bool = False, freq: Optional[pd.Timedelta] = None, reproj_method: str = "nearest_index", resample_kwargs: Optional[Dict[str, str]] = None, ) -> xr.DataArray: """Return lazy reprojection of wind speed to model grid. Resample time dimension to frequency. Either provides wind speed directly or both wind_u and wind_v components. Parameters ---------- da_model: xarray.DataArray DataArray of the target resolution and projection wind: xarray.DataArray DataArray of wind speed forcing [m s-1] wind_u: xarray.DataArray DataArray of U component of wind speed forcing [m s-1] wind_v: xarray.DataArray DataArray of V component of wind speed forcing [m s-1] altitude: float, optional Altitude of wind speed data. By default 10m. altitude_correction: str, optional If True wind speed is re-calculated to wind speed at 2 meters using original `altitude`. freq: str, Timedelta output frequency of timedimension reproj_method: str, optional Method for spatital reprojection of precip, by default 'nearest_index' resample_kwargs: Additional key-word arguments (e.g. label, closed) for time resampling method Returns ------- wind_out: xarray.DataArray (lazy) processed wind forcing """ resample_kwargs = resample_kwargs or {} if wind_u is not None and wind_v is not None: wind = np.sqrt(np.power(wind_u, 2) + np.power(wind_v, 2)) elif wind is None: raise ValueError("Either wind or wind_u and wind_v variables must be supplied.") assert wind is not None if wind.raster.dim0 != "time": raise ValueError(f'First wind dim should be "time", not {wind.raster.dim0}') # compute wind at 2 meters altitude if altitude_correction: wind = wind * (4.87 / np.log((67.8 * altitude) - 5.42)) # downscale wind (lazy) assert wind is not None wind_out = wind.raster.reproject_like(da_model, method=reproj_method) # resample time wind_out.name = "wind" wind_out.attrs.update(unit="m s-1") if freq is not None: resample_kwargs.update(upsampling="bfill", downsampling="mean") wind_out = resample_time(wind_out, freq, conserve_mass=False, **resample_kwargs) return wind_out
[docs] def pet( ds: xr.Dataset, temp: xr.DataArray, dem_model: xr.DataArray, method: PetMethods = "debruin", press_correction: bool = False, wind_correction: bool = True, wind_altitude: float = 10, reproj_method: str = "nearest_index", freq: Optional[str] = None, resample_kwargs: Optional[Dict[str, str]] = None, ) -> xr.DataArray: """Determine reference evapotranspiration. (lazy reprojection on model grid and resampling of time dimension to frequency). Parameters ---------- ds : xarray.Dataset Dataset with climate variables: pressure [hPa], global radiation [W m-2], TOA incident solar radiation [W m-2], wind [m s-1] * Required variables: {"temp", "press" or "press_msl", "kin"} * additional variables for debruin: {"kout"} * additional variables for penman-monteith_rh_simple: {"temp_min", "temp_max", "wind" or "wind_u"+"wind_v", "rh"} * additional variables for penman-monteith_tdew: {"temp_min", "temp_max", "wind" or "wind_u"+"wind_v", "temp_dew"} temp : xarray.DataArray DataArray with temperature on model grid resolution [°C] dem_model : xarray.DataArray DataArray of the target resolution and projection, contains elevation data method : {'debruin', 'makkink', "penman-monteith_rh_simple", "penman-monteith_tdew"} Potential evapotranspiration method. if penman-monteith is used, requires the installation of the pyet package. reproj_method: str, optional Method for spatital reprojection of precip, by default 'nearest_index' press_correction : bool, default False If True pressure is corrected, based on elevation data of `dem_model` wind_altitude: float, optional ALtitude of wind speed data. By default 10m. wind_correction: str, optional If True wind speed is re-calculated to wind speed at 2 meters using original `wind_altitude`. freq: str, Timedelta, default None output frequency of timedimension resample_kwargs: Additional key-word arguments (e.g. label, closed) for time resampling method Returns ------- pet_out : xarray.DataArray (lazy) reference evapotranspiration """ if method not in PET_METHODS: raise ValueError( f"Unknown pet method '{method}', select from {','.join(PET_METHODS)}" ) # resample in time resample_kwargs = resample_kwargs or {} if temp.raster.dim0 != "time" or ds.raster.dim0 != "time": raise ValueError('First dimension of input variables should be "time"') # make sure temp and ds align both temporally and spatially if not np.all(temp["time"].values == ds["time"].values): raise ValueError("All input variables have same time index.") if not temp.raster.identical_grid(dem_model): raise ValueError("Temp variable should be on model grid.") # resample input to model grid ds = ds.raster.reproject_like(dem_model, method=reproj_method) # Process bands like 'pressure' and 'wind' if press_correction: ds["press"] = press( ds["press_msl"], dem_model, lapse_correction=press_correction, freq=None, # do not change freq of press, put pet_out later reproj_method=reproj_method, ) else: if "press_msl" in ds: ds = ds.rename({"press_msl": "press"}) elif HAS_PYET: # calculate pressure from elevation [kPa] ds["press"] = pyet.calc_press(dem_model) # convert to hPa to be consistent with press function calculation: ds["press"] = ds["press"] * 10 else: raise ModuleNotFoundError( "If 'press' is supplied and 'press_correction' is not used," + " the pyet package must be installed." ) timestep = to_timedelta(ds).total_seconds() if method == "debruin": pet_out = pet_debruin( temp, ds["press"], ds["kin"], ds["kout"], timestep=timestep, ) elif method == "makkink": pet_out = pet_makkink(temp, ds["press"], ds["kin"], timestep=timestep) elif "penman-monteith" in method: logger.info("Calculating Penman-Monteith ref evaporation") # Add wind # compute wind from u and v components at 10m (for era5) if ("wind10_u" in ds.data_vars) & ("wind10_v" in ds.data_vars): ds["wind"] = wind( da_model=dem_model, wind_u=ds["wind10_u"], wind_v=ds["wind10_v"], altitude=wind_altitude, altitude_correction=wind_correction, ) else: ds["wind"] = wind( da_model=dem_model, wind=ds["wind"], altitude=wind_altitude, altitude_correction=wind_correction, ) if method == "penman-monteith_rh_simple": pet_out = pm_fao56( temp["temp"], temp["temp_max"], temp["temp_min"], ds["press"], ds["kin"], ds["wind"], ds["rh"], dem_model, "rh", ) elif method == "penman-monteith_tdew": pet_out = pm_fao56( temp["temp"], temp["temp_max"], temp["temp_min"], ds["press"], ds["kin"], ds["wind"], ds["temp_dew"], dem_model, "temp_dew", ) # resample in time pet_out.name = "pet" pet_out.attrs.update(unit="mm") if freq is not None: resample_kwargs.update(upsampling="bfill", downsampling="sum") pet_out = resample_time(pet_out, freq, conserve_mass=True, **resample_kwargs) return pet_out
[docs] def press_correction( dem_model: xr.DataArray, g: float = 9.80665, r_air: float = 8.3144621, mo: float = 0.0289644, lapse_rate: float = -0.0065, ) -> xr.DataArray: """Pressure correction based on elevation lapse_rate. Parameters ---------- dem_model : xarray.DataArray DataArray with high res lat/lon axis and elevation data g : float, default 9.80665 gravitational constant [m s-2] R_air : float, default 8.3144621 specific gas constant for dry air [J mol-1 K-1] Mo : float, default 0.0289644 molecular weight of gas [kg / mol] lapse_rate : float, deafult -0.0065 lapse rate of temperature [C m-1] Returns ------- press_fact : xarray.DataArray pressure correction factor """ # constant power = g * mo / (r_air * lapse_rate) press_fact = np.power(288.15 / (288.15 + lapse_rate * dem_model), power).fillna(1.0) return press_fact
[docs] def temp_correction(dem: xr.DataArray, lapse_rate: float = -0.0065) -> xr.DataArray: """Temperature correction based on elevation data. Parameters ---------- dem : xarray.DataArray DataArray with elevation lapse_rate : float, default -0.0065 lapse rate of temperature [°C m-1] Returns ------- temp_add : xarray.DataArray temperature addition """ temp_add = (dem * lapse_rate).fillna(0) return temp_add
[docs] def pet_debruin( temp: xr.DataArray, press: xr.DataArray, k_in: xr.DataArray, k_ext: xr.DataArray, timestep: int = 86400, cp: float = 1005.0, beta: float = 20.0, cs: float = 110.0, ) -> xr.DataArray: """Determine De Bruin (2016) reference evapotranspiration. Parameters ---------- temp : xarray.DataArray DataArray with temperature [°C] press : xarray.DataArray pressure at surface [hPa] k_in : xarray.DataArray global (=short wave incoming) radiation [W m-2] k_ext : xarray.DataArray TOA incident solar radiation [W m-2] timestep : int, default 86400 seconds per timestep cp : float, default 1005.0 standard cp [J kg-1 K-1] beta : float, default 20.0 correction constant [W m-2] Cs : float, default 110.0 empirical constant [W m-2] Returns ------- pet : xarray.DataArray reference evapotranspiration """ # saturation and actual vapour pressure at given temperature [hPa °C-1] esat = 6.112 * np.exp((17.67 * temp) / (temp + 243.5)) # slope of vapour pressure curve [hPa °C-1] slope = esat * (17.269 / (temp + 243.5)) * (1.0 - (temp / (temp + 243.5))) # compute latent heat of vapourization [J kg-1] lam = (2.502 * 10**6) - (2250.0 * temp) # psychometric constant [hPa °C-1] gamma = (cp * press) / (0.622 * lam) # compute ref. evaporation (with global radiation, therefore calling it potential) # in J m-2 over whole period ep_joule = ( (slope / (slope + gamma)) * (((1.0 - 0.23) * k_in) - (cs * (k_in / (k_ext + 0.00001)))) ) + beta ep_joule = xr.where(np.isclose(k_ext, 0.0), 0.0, ep_joule) pet = ((ep_joule / lam) * timestep).astype(np.float32) pet = xr.where(pet > 0.0, pet, 0.0) return pet
[docs] def pet_makkink( temp: xr.DataArray, press: xr.DataArray, k_in: xr.DataArray, timestep: int = 86400, cp: float = 1005.0, ) -> xr.DataArray: """Determnines Makkink reference evapotranspiration. Parameters ---------- temp : xarray.DataArray DataArray with temperature [°C] press : xarray.DataArray DataArray with pressure [hPa] k_in : xarray.DataArray DataArray with global radiation [W m-2] timestep : int, default 86400 seconds per timestep cp : float, default 1005.0 standard cp [J kg-1 K-1] Returns ------- pet : xarray.DataArray (lazy) reference evapotranspiration """ # saturation and actual vapour pressure at given temperature [hPa °C-1] esat = 6.112 * np.exp((17.67 * temp) / (temp + 243.5)) # slope of vapour pressure curve [hPa °C-1] slope = esat * (17.269 / (temp + 243.5)) * (1.0 - (temp / (temp + 243.5))) # compute latent heat of vapourization [J kg-1] lam = (2.502 * 10**6) - (2250.0 * temp) # psychometric constant [hPa °C-1] gamma = (cp * press) / (0.622 * lam) ep_joule = 0.65 * slope / (slope + gamma) * k_in pet = ((ep_joule / lam) * timestep).astype(np.float32) pet = xr.where(pet > 0.0, pet, 0.0) return pet
[docs] def pm_fao56( temp: xr.DataArray, temp_max: xr.DataArray, temp_min: xr.DataArray, press: xr.DataArray, kin: xr.DataArray, wind: xr.DataArray, temp_dew: xr.DataArray, dem: xr.DataArray, var: str = "temp_dew", ) -> xr.DataArray: """Estimate daily reference evapotranspiration (ETo). Based on a hypothetical short grass reference surface using the FAO-56 Penman-Monteith equation. Actual vapor pressure is derived either from relative humidity or dewpoint temperature (depending on var_for_avp_name). Based on equation 6 in Allen et al (1998) and using the functions provided by the pyet package () Parameters ---------- temp : xarray.DataArray DataArray with daily temperature [°C] temp_max : xarray.DataArray DataArray with maximum daily temperature [°C] temp_min : xarray.DataArray DataArray with minimum daily temperature [°C] press : xarray.DataArray DataArray with pressure [hPa] kin : xarray.DataArray DataArray with global radiation [W m-2] wind : xarray.DataArray DataArray with wind speed at 2m above the surface [m s-1] temp_dew : xarray.DataArray DataArray with either temp_dew (dewpoint temperature at 2m above surface [°C]) or rh (relative humidity [%]) to estimate actual vapor pressure dem : xarray.DataArray DataArray with elevation at model resolution [m] var : str, optional String with variable name used to estimate actual vapor pressure (chose from ["temp_dew", "rh"]) Returns ------- xarray.DataArray DataArray with the estimated daily reference evapotranspiration pet [mm d-1] Raises ------ ModuleNotFoundError In case the pyet module is not installed """ # Small check for libraries if not HAS_PYET: raise ModuleNotFoundError("Penman-Monteith FAO-56 requires the 'pyet' library") # Precalculated variables: lat = kin[kin.raster.y_dim] * (np.pi / 180) # latitude in radians # Vapor pressure svp = pyet.calc_e0(tmean=temp) if var == "temp_dew": avp = pyet.calc_e0(tmean=temp_dew) elif var == "rh": avp = pyet.calc_ea(tmax=temp_max, tmin=temp_min, rh=temp_dew) # Net radiation dates = pyet.utils.get_index(kin) er = pyet.extraterrestrial_r( dates, lat ) # Extraterrestrial daily radiation [MJ m-2 d-1] csr = pyet.calc_rso(er, dem) # Clear-sky solar radiation [MJ m-2 day-1] # Net shortwave radiation [MJ m-2 d-1] swr = pyet.calc_rad_short(kin * (86400 / 1e6)) # Net longwave radiation [MJ m-2 d-1] lwr = pyet.calc_rad_long( kin * (86400 / 1e6), tmax=temp_max, tmin=temp_min, rso=csr, ea=avp ) nr = swr - lwr # daily net radiation # Penman Monteith FAO-56 gamma = pyet.calc_psy(press / 10) # psychrometric constant dlt = pyet.calc_vpc( temp ) # Slope of saturation vapour pressure curve at air Temperature [kPa °C-1]. gamma1 = gamma * (1 + 0.34 * wind) den = dlt + gamma1 num1 = (0.408 * dlt * (nr - 0)) / den num2 = (gamma * (svp - avp) * 900 * wind / (temp + 273.15)) / den pet = num1 + num2 pet = pyet.utils.clip_zeros(pet, True) return pet.rename("pet")
[docs] def resample_time( da: xr.DataArray, freq: Union[str, pd.Timedelta], label: str = "right", closed: str = "right", upsampling: str = "bfill", downsampling: str = "mean", conserve_mass: bool = True, ) -> xr.DataArray: """Resample data to destination frequency. Skip if input data already at output frequency. Parameters ---------- da: xarray.DataArray Input data freq: str, pd.Timedelta Output frequency. label: {'left', 'right'}, optional Side of each interval to use for labeling. By default 'right'. closed: {'left', 'right'}, optional Side of each interval to treat as closed. By default 'right'. upsampling, downsampling: str, optional Resampling method if output frequency is higher, lower (resp.) compared to input frequency. conserve_mass: bool, optional If True multiply output with relative change in frequency to conserve mass Returns ------- pet : xarray.DataArray Resampled data. """ da_out = da dfreq = delta_freq(da, freq) if not np.isclose(dfreq, 1.0): resample = upsampling if dfreq < 1 else downsampling pre = "up" if dfreq < 1 else "down" logger.debug( f"{pre}sampling {da.name} using {resample}; conserve mass: {conserve_mass}" ) da_resampled = da.resample(time=freq, skipna=True, label=label, closed=closed) da_out = getattr(da_resampled, resample)() if conserve_mass: da_out = da_out * min(dfreq, 1) return da_out
[docs] def delta_freq( da_or_freq: Union[Union[str, pd.Timedelta], xr.DataArray], da_or_freq1: Union[Union[str, pd.Timedelta], xr.DataArray], ) -> pd.Timedelta: """Return relative difference between dataset mean timestep and destination freq. <1 : upsampling 1 : same >1 : downsampling. """ return to_timedelta(da_or_freq1) / to_timedelta(da_or_freq)
def to_timedelta( da_or_freq: Union[Union[str, pd.Timedelta], xr.DataArray], ) -> pd.Timedelta: """Convert time dimension or frequency to timedelta.""" if isinstance(da_or_freq, (xr.DataArray, xr.Dataset)): freq = da_to_timedelta(da_or_freq) else: freq = freq_to_timedelta(da_or_freq) return freq def da_to_timedelta(da: xr.DataArray) -> pd.Timedelta: """Convert time dimenstion in dataset to timedelta.""" return pd.to_timedelta(np.diff(da.time).mean()) def freq_to_timedelta(freq: Union[str, pd.Timedelta]) -> pd.Timedelta: """Convert frequency to timedelta.""" # Add '1' to freq that doesn't have any digit if isinstance(freq, str) and not bool(re.search(r"\d", freq)): freq = "1{}".format(freq) # Convert str to datetime.timedelta return pd.to_timedelta(freq)