Source code for imod.mf6.uzf

import numpy as np
import xarray as xr
import xugrid as xu

from imod.logging import init_log_decorator
from imod.mf6.boundary_condition import AdvancedBoundaryCondition, BoundaryCondition
from imod.mf6.validation import BOUNDARY_DIMS_SCHEMA
from imod.prepare.layer import get_upper_active_grid_cells
from imod.schemata import (
    AllInsideNoDataSchema,
    AllNoDataSchema,
    AllValueSchema,
    CoordsSchema,
    DimsSchema,
    DTypeSchema,
    IdentityNoDataSchema,
    IndexesSchema,
    OtherCoordsSchema,
)


[docs] class UnsaturatedZoneFlow(AdvancedBoundaryCondition): """ Unsaturated Zone Flow (UZF) package. TODO: Support timeseries file? Observations? Water Mover? Parameters ---------- surface_depression_depth: array of floats (xr.DataArray) is the surface depression depth of the UZF cell. kv_sat: array of floats (xr.DataArray) is the vertical saturated hydraulic conductivity of the UZF cell. NOTE: the UZF package determines the location of inactive cells where kv_sat is np.nan theta_res: array of floats (xr.DataArray) is the residual (irreducible) water content of the UZF cell. theta_sat: array of floats (xr.DataArray) is the saturated water content of the UZF cell. theta_init: array of floats (xr.DataArray) is the initial water content of the UZF cell. epsilon: array of floats (xr.DataArray) is the epsilon exponent of the UZF cell. infiltration_rate: array of floats (xr.DataArray) defines the applied infiltration rate of the UZF cell (LT -1). et_pot: array of floats (xr.DataArray, optional) defines the potential evapotranspiration rate of the UZF cell and specified GWF cell. Evapotranspiration is first removed from the unsaturated zone and any remaining potential evapotranspiration is applied to the saturated zone. If IVERTCON is greater than zero then residual potential evapotranspiration not satisfied in the UZF cell is applied to the underlying UZF and GWF cells. extinction_depth: array of floats (xr.DataArray, optional) defines the evapotranspiration extinction depth of the UZF cell. If IVERTCON is greater than zero and EXTDP extends below the GWF cell bottom then remaining potential evapotranspiration is applied to the underlying UZF and GWF cells. EXTDP is always specified, but is only used if SIMULATE ET is specified in the OPTIONS block. extinction_theta: array of floats (xr.DataArray, optional) defines the evapotranspiration extinction water content of the UZF cell. If specified, ET in the unsaturated zone will be simulated either as a function of the specified PET rate while the water content (THETA) is greater than the ET extinction water content air_entry_potential: array of floats (xr.DataArray, optional) defines the air entry potential (head) of the UZF cell. If specified, ET will be simulated using a capillary pressure based formulation. Capillary pressure is calculated using the Brooks-Corey retention function ("air_entry") root_potential: array of floats (xr.DataArray, optional) defines the root potential (head) of the UZF cell. If specified, ET will be simulated using a capillary pressure based formulation. Capillary pressure is calculated using the Brooks-Corey retention function ("air_entry" root_activity: array of floats (xr.DataArray, optional) defines the root activity function of the UZF cell. ROOTACT is the length of roots in a given volume of soil divided by that volume. Values range from 0 to about 3 cm-2, depending on the plant community and its stage of development. If specified, ET will be simulated using a capillary pressure based formulation. Capillary pressure is calculated using the Brooks-Corey retention function ("air_entry" groundwater_ET_function: ({"linear", "square"}, optional) keyword specifying that groundwater evapotranspiration will be simulated using either the original ET formulation of MODFLOW-2005 ("linear"). Or by assuming a constant ET rate for groundwater levels between land surface (TOP) and land surface minus the ET extinction depth (TOP-EXTDP) ("square"). In the latter case, groundwater ET is smoothly reduced from the PET rate to zero over a nominal interval at TOP-EXTDP. simulate_seepage: ({True, False}, optional) keyword specifying that groundwater discharge (GWSEEP) to land surface will be simulated. Groundwater discharge is nonzero when groundwater head is greater than land surface. print_input: ({True, False}, optional) keyword to indicate that the list of UZF information will be written to the listing file immediately after it is read. Default is False. print_flows: ({True, False}, optional) keyword to indicate that the list of UZF flow rates will be printed to the listing file for every stress period time step in which "BUDGET PRINT" is specified in Output Control. If there is no Output Control option and "PRINT FLOWS" is specified, then flow rates are printed for the last time step of each stress period. Default is False. save_flows: ({True, False}, optional) keyword to indicate that UZF flow terms will be written to the file specified with "BUDGET FILEOUT" in Output Control. Default is False. budget_fileout: ({"str"}, optional) path to output cbc-file for UZF budgets budgetcsv_fileout: ({"str"}, optional) path to output csv-file for summed budgets water_content_file: ({"str"}, optional) path to output file for unsaturated zone water content observations: [Not yet supported.] Default is None. water_mover: [Not yet supported.] Default is None. timeseries: [Not yet supported.] Default is None. TODO: We could allow the user to either use xarray DataArrays to specify BCS or use a pd.DataFrame and use the MF6 timeseries files to read input. The latter could save memory for laterally large-scale models, through efficient use of the UZF cell identifiers. validate: {True, False} Flag to indicate whether the package should be validated upon initialization. This raises a ValidationError if package input is provided in the wrong manner. Defaults to True. """ _period_data = ( "infiltration_rate", "et_pot", "extinction_depth", "extinction_theta", "air_entry_potential", "root_potential", "root_activity", ) _init_schemata = { "surface_depression_depth": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA, ], "kv_sat": [ DTypeSchema(np.floating), IndexesSchema(), CoordsSchema(("layer",)), BOUNDARY_DIMS_SCHEMA, ], "theta_res": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA, ], "theta_sat": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA, ], "theta_init": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA, ], "epsilon": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA, ], "infiltration_rate": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA, ], "et_pot": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA | DimsSchema(), # optional var ], "extinction_depth": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA | DimsSchema(), # optional var ], "extinction_theta": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA | DimsSchema(), # optional var ], "root_potential": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA | DimsSchema(), # optional var ], "root_activity": [ DTypeSchema(np.floating), BOUNDARY_DIMS_SCHEMA | DimsSchema(), # optional var ], "print_flows": [DTypeSchema(np.bool_), DimsSchema()], "save_flows": [DTypeSchema(np.bool_), DimsSchema()], } _write_schemata = { "kv_sat": [ AllValueSchema(">", 0.0), OtherCoordsSchema("idomain"), AllNoDataSchema(), # Check for all nan, can occur while clipping AllInsideNoDataSchema(other="idomain", is_other_notnull=(">", 0)), ], "surface_depression_depth": [IdentityNoDataSchema("kv_sat")], "theta_res": [IdentityNoDataSchema("kv_sat"), AllValueSchema(">=", 0.0)], "theta_sat": [IdentityNoDataSchema("kv_sat"), AllValueSchema(">=", 0.0)], "theta_init": [IdentityNoDataSchema("kv_sat"), AllValueSchema(">=", 0.0)], "epsilon": [IdentityNoDataSchema("kv_sat")], "infiltration_rate": [IdentityNoDataSchema("stress_period_active")], "et_pot": [IdentityNoDataSchema("stress_period_active")], "extinction_depth": [IdentityNoDataSchema("stress_period_active")], "extinction_theta": [IdentityNoDataSchema("stress_period_active")], "root_potential": [IdentityNoDataSchema("stress_period_active")], "root_activity": [IdentityNoDataSchema("stress_period_active")], } _package_data = ( "surface_depression_depth", "kv_sat", "theta_res", "theta_sat", "theta_init", "epsilon", ) _pkg_id = "uzf" _template = BoundaryCondition._initialize_template(_pkg_id)
[docs] @init_log_decorator() def __init__( self, surface_depression_depth, kv_sat, theta_res, theta_sat, theta_init, epsilon, infiltration_rate, et_pot=None, extinction_depth=None, extinction_theta=None, air_entry_potential=None, root_potential=None, root_activity=None, ntrailwaves=7, # Recommended in manual nwavesets=40, groundwater_ET_function=None, simulate_groundwater_seepage=False, print_input=False, print_flows=False, save_flows=False, budget_fileout=None, budgetcsv_fileout=None, water_content_file=None, observations=None, water_mover=None, timeseries=None, validate: bool = True, ): landflag = self._determine_landflag(kv_sat) iuzno = self._create_uzf_numbers(landflag) ivertcon = self._determine_vertical_connection(iuzno) stress_period_active = landflag.where(landflag == 1) dict_dataset = { # Package data "surface_depression_depth": surface_depression_depth, "kv_sat": kv_sat, "theta_res": theta_res, "theta_sat": theta_sat, "theta_init": theta_init, "epsilon": epsilon, # Stress period data "stress_period_active": stress_period_active, "infiltration_rate": infiltration_rate, "et_pot": et_pot, "extinction_depth": extinction_depth, "extinction_theta": extinction_theta, "air_entry_potential": air_entry_potential, "root_potential": root_potential, "root_activity": root_activity, # Dimensions "ntrailwaves": ntrailwaves, "nwavesets": nwavesets, # Options "groundwater_ET_function": groundwater_ET_function, "simulate_gwseep": simulate_groundwater_seepage, "print_input": print_input, "print_flows": print_flows, "save_flows": save_flows, "budget_fileout": budget_fileout, "budgetcsv_fileout": budgetcsv_fileout, "water_content_file": water_content_file, "observations": observations, "water_mover": water_mover, "timeseries": timeseries, # Additonal indices for Packagedata "landflag": landflag, "iuzno": iuzno, "ivertcon": ivertcon, } super().__init__(dict_dataset) self.dataset["iuzno"].name = "uzf_number" self._check_options( groundwater_ET_function, et_pot, extinction_depth, extinction_theta, air_entry_potential, root_potential, root_activity, ) self._validate_init_schemata(validate)
def fill_stress_perioddata(self): """Modflow6 requires something to be filled in the stress perioddata, even though the data is not used in the current configuration. Only an infiltration rate is required, the rest can be filled with dummy values if not provided. """ for var in self._period_data: if self.dataset[var].size == 1: # Prevent loading large arrays in memory if self.dataset[var].values[()] is None: if isinstance(self["infiltration_rate"], xu.UgridDataArray): self.dataset[var] = xu.full_like(self["infiltration_rate"], 0) else: self.dataset[var] = xr.full_like(self["infiltration_rate"], 0.0) else: raise ValueError("{} cannot be a scalar".format(var)) def _check_options( self, groundwater_ET_function, et_pot, extinction_depth, extinction_theta, air_entry_potential, root_potential, root_activity, ): simulate_et = [x is not None for x in [et_pot, extinction_depth]] unsat_etae = [ x is not None for x in [air_entry_potential, root_potential, root_activity] ] if all(simulate_et): self.dataset["simulate_et"] = True elif any(simulate_et): raise ValueError("To simulate ET, set both et_pot and extinction_depth") if extinction_theta is not None: self.dataset["unsat_etwc"] = True if all(unsat_etae): self.dataset["unsat_etae"] = True elif any(unsat_etae): raise ValueError( "To simulate ET with a capillary based formulation, set air_entry_potential, root_potential, and root_activity" ) if all(unsat_etae) and (extinction_theta is not None): raise ValueError( """Both capillary based formulation and water content based formulation set based on provided input data. Please provide either only extinction_theta or (air_entry_potential, root_potential, and root_activity)""" ) if groundwater_ET_function == "linear": self.dataset["linear_gwet"] = True elif groundwater_ET_function == "square": self.dataset["square_gwet"] = True elif groundwater_ET_function is None: pass else: raise ValueError( "Groundwater ET function should be either 'linear','square' or None" ) def _create_uzf_numbers(self, landflag): """Create unique UZF ID's. Inactive cells equal 0""" active_nodes = landflag.notnull().astype(np.int8) return np.nancumsum(active_nodes).reshape(landflag.shape) * active_nodes def _determine_landflag(self, kv_sat): """returns the landflag for uzf-model. Landflag == 1 for top active UZF-nodes""" land_nodes = get_upper_active_grid_cells(kv_sat).astype(np.int32) return land_nodes.where(kv_sat.notnull()) def _determine_vertical_connection(self, uzf_number): return uzf_number.shift(layer=-1, fill_value=0) def _package_data_to_sparse(self): notnull = self.dataset["landflag"].notnull().to_numpy() iuzno = self.dataset["iuzno"].values[notnull] landflag = self.dataset["landflag"].values[notnull] ivertcon = self.dataset["ivertcon"].values[notnull] ds = self.dataset[list(self._package_data)] layer = ds["layer"].values arrdict = self._ds_to_arrdict(ds) recarr = super()._to_struct_array(arrdict, layer) field_spec = self._get_field_spec_from_dtype(recarr) field_names = [i[0] for i in field_spec] n = 3 if isinstance(self.dataset, xu.UgridDataset): n = 2 index_spec = [("iuzno", np.int32)] + field_spec[:n] field_spec = ( [("landflag", np.int32)] + [("ivertcon", np.int32)] + field_spec[n:] ) sparse_dtype = np.dtype(index_spec + field_spec) recarr_new = np.empty(recarr.shape, dtype=sparse_dtype) recarr_new["iuzno"] = iuzno recarr_new["landflag"] = landflag recarr_new["ivertcon"] = ivertcon recarr_new[field_names] = recarr return recarr_new def render(self, directory, pkgname, globaltimes, binary): """Render fills in the template only, doesn't write binary data""" d = {} bin_ds = self.dataset[list(self._period_data)] d["periods"] = self._period_paths( directory, pkgname, globaltimes, bin_ds, binary=False ) not_options = ( list(self._period_data) + list(self._package_data) + ["iuzno" + "ivertcon"] ) d = self._get_options(d, not_options=not_options) path = directory / pkgname / f"{self._pkg_id}-pkgdata.dat" d["packagedata"] = path.as_posix() # max uzf-cells for which time period data will be supplied d["nuzfcells"] = np.count_nonzero(np.isfinite(d["landflag"])) return self._template.render(d) def _to_struct_array(self, arrdict, layer): """Convert from dense arrays to list based input, since the perioddata does not require cellids but iuzno, we hgave to override""" # TODO add pkgcheck that period table aligns # Get the number of valid values notnull = self.dataset["landflag"].values == 1 iuzno = self.dataset["iuzno"].values nrow = notnull.sum() # Define the numpy structured array dtype index_spec = [("iuzno", np.int32)] field_spec = [(key, np.float64) for key in arrdict] sparse_dtype = np.dtype(index_spec + field_spec) # Initialize the structured array recarr = np.empty(nrow, dtype=sparse_dtype) # Fill in the indices recarr["iuzno"] = iuzno[notnull] # Fill in the data for key, arr in arrdict.items(): recarr[key] = arr[notnull].astype(np.float64) return recarr def _validate(self, schemata, **kwargs): # Insert additional kwargs kwargs["kv_sat"] = self["kv_sat"] kwargs["stress_period_active"] = self["stress_period_active"] errors = super()._validate(schemata, **kwargs) return errors