Source code for imod.msw.model

import collections
from copy import copy
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Tuple, Union, cast

import jinja2
import numpy as np
import xarray as xr

from imod.mf6.dis import StructuredDiscretization
from imod.mf6.mf6_wel_adapter import Mf6Wel
from imod.mf6.utilities.regrid import RegridderWeightsCache
from imod.msw.copy_files import FileCopier
from imod.msw.coupler_mapping import CouplerMapping
from imod.msw.grid_data import GridData
from imod.msw.idf_mapping import IdfMapping
from imod.msw.infiltration import Infiltration
from imod.msw.initial_conditions import (
    InitialConditionsEquilibrium,
    InitialConditionsPercolation,
    InitialConditionsRootzonePressureHead,
    InitialConditionsSavedState,
)
from imod.msw.landuse import LanduseOptions
from imod.msw.meteo_grid import MeteoGrid, MeteoGridCopy
from imod.msw.meteo_mapping import EvapotranspirationMapping, PrecipitationMapping
from imod.msw.output_control import TimeOutputControl
from imod.msw.ponding import Ponding
from imod.msw.scaling_factors import ScalingFactors
from imod.msw.sprinkling import Sprinkling
from imod.msw.timeutil import to_metaswap_timeformat
from imod.msw.utilities.common import find_in_file_list
from imod.msw.utilities.imod5_converter import has_active_scaling_factor
from imod.msw.utilities.mask import MaskValues, mask_and_broadcast_cap_data
from imod.msw.utilities.parse import read_para_sim
from imod.msw.utilities.select import drop_layer_dim_cap_data
from imod.msw.vegetation import AnnualCropFactors
from imod.typing import Imod5DataDict
from imod.util.regrid_method_type import RegridderType

REQUIRED_PACKAGES = (
    GridData,
    CouplerMapping,
    Infiltration,
    LanduseOptions,
    MeteoGrid,
    EvapotranspirationMapping,
    PrecipitationMapping,
    IdfMapping,
    TimeOutputControl,
    AnnualCropFactors,
)

INITIAL_CONDITIONS_PACKAGES = (
    InitialConditionsEquilibrium,
    InitialConditionsPercolation,
    InitialConditionsRootzonePressureHead,
    InitialConditionsSavedState,
)

DEFAULT_SETTINGS = {
    "vegetation_mdl": 1,
    "evapotranspiration_mdl": 1,
    "saltstress_mdl": 0,
    "surfacewater_mdl": 0,
    "infilimsat_opt": 0,
    "netcdf_per": 0,
    "postmsw_opt": 0,
    "dtgw": 1.0,
    "dtsw": 1.0,
    "ipstep": 2,
    "nxlvage_dim": 366,
    "co2": 404.32,
    "fact_beta2": 1.0,
    "rcsoil": 0.15,
    "iterur1": 3,
    "iterur2": 5,
    "tdbgsm": 91.0,
    "tdedsm": 270.0,
    "clocktime": 0,
}


class Model(collections.UserDict):
    def __setitem__(self, key, value):
        # TODO: Add packagecheck
        super().__setitem__(key, value)

    def update(self, *args, **kwargs):
        for k, v in dict(*args, **kwargs).items():
            self[k] = v


[docs] class MetaSwapModel(Model): """ Contains data and writes consistent model input files Parameters ---------- unsaturated_database: Path-like or str Path to the MetaSWAP soil physical database folder. settings: dict """ _pkg_id = "model" _file_name = "para_sim.inp" _template = jinja2.Template( "{%for setting, value in settings.items()%}" "{{setting}} = {{value}}\n" "{%endfor%}" )
[docs] def __init__( self, unsaturated_database: Path | str, settings: Optional[dict[str, Any]] = None, ): super().__init__() if settings is None: self.simulation_settings = copy(DEFAULT_SETTINGS) else: self.simulation_settings = settings self.simulation_settings["unsa_svat_path"] = ( self._render_unsaturated_database_path(unsaturated_database) )
def _render_unsaturated_database_path(self, unsaturated_database: Union[str, Path]): # Force to Path object unsaturated_database = Path(unsaturated_database) # Render to string for MetaSWAP if unsaturated_database.is_absolute(): return f'"{unsaturated_database}\\"' else: # TODO: Test if this is how MetaSWAP accepts relative paths return f'"${unsaturated_database}\\"' def _check_required_packages(self): pkg_types_included = {type(pkg) for pkg in self.values()} missing_packages = set(REQUIRED_PACKAGES) - pkg_types_included if len(missing_packages) > 0: raise ValueError( f"Missing the following required packages: {missing_packages}" ) initial_condition_set = pkg_types_included & set(INITIAL_CONDITIONS_PACKAGES) if len(initial_condition_set) < 1: raise ValueError( "Missing InitialCondition package, assign one of " f"{INITIAL_CONDITIONS_PACKAGES}" ) elif len(initial_condition_set) > 1: raise ValueError( "Multiple InitialConditions assigned, choose one of " f"{initial_condition_set}" ) def _check_landuse_indices_in_lookup_options(self): grid_key = self._get_pkg_key(GridData) landuse_options_key = self._get_pkg_key(LanduseOptions) indices_in_grid = set(self[grid_key]["landuse"].values.ravel()) indices_in_options = set( self[landuse_options_key].dataset.coords["landuse_index"].values ) missing_indices = indices_in_grid - indices_in_options if len(missing_indices) > 0: raise ValueError( "Found the following landuse indices in GridData which " f"were not in LanduseOptions: {missing_indices}" ) def _check_vegetation_indices_in_annual_crop_factors(self): landuse_options_key = self._get_pkg_key(LanduseOptions) annual_crop_factors_key = self._get_pkg_key(AnnualCropFactors) indices_in_options = set( np.unique(self[landuse_options_key]["vegetation_index"]) ) indices_in_crop_factors = set( self[annual_crop_factors_key].dataset.coords["vegetation_index"].values ) missing_indices = indices_in_options - indices_in_crop_factors if len(missing_indices) > 0: raise ValueError( "Found the following vegetation indices in LanduseOptions " f"which were not in AnnualCropGrowth: {missing_indices}" ) def _get_starttime(self): """ Loop over all packages to get the minimum time. MetaSWAP requires a starttime in its simulation settings (para_sim.inp) """ starttimes = [] for pkgname in self: ds = self[pkgname].dataset if "time" in ds.coords: starttimes.append(ds["time"].min().values) starttime = min(starttimes) year, time_since_start_year = to_metaswap_timeformat([starttime]) year = int(year.values) time_since_start_year = float(time_since_start_year.values) return year, time_since_start_year def _get_pkg_key(self, pkg_type: type, optional_package: bool = False): for pkg_key, pkg in self.items(): if isinstance(pkg, pkg_type): return pkg_key if not optional_package: raise KeyError(f"Could not find package of type: {pkg_type}")
[docs] def write( self, directory: Union[str, Path], mf6_dis: StructuredDiscretization, mf6_wel: Mf6Wel, validate: bool = True, ): """ Write packages and simulation settings (para_sim.inp). Parameters ---------- directory: Path or str directory to write model in. """ # Model checks if validate: self._check_required_packages() self._check_vegetation_indices_in_annual_crop_factors() self._check_landuse_indices_in_lookup_options() # Force to Path directory = Path(directory) directory.mkdir(exist_ok=True, parents=True) # Add time settings year, time_since_start_year = self._get_starttime() self.simulation_settings["iybg"] = year self.simulation_settings["tdbg"] = time_since_start_year # Add IdfMapping settings idf_key = self._get_pkg_key(IdfMapping) self.simulation_settings.update(self[idf_key].get_output_settings()) filename = directory / self._file_name with open(filename, "w") as f: rendered = self._template.render(settings=self.simulation_settings) f.write(rendered) # Get index and svat grid_key = self._get_pkg_key(GridData) index, svat = self[grid_key].generate_index_array() # write package contents for pkgname in self: self[pkgname].write(directory, index, svat, mf6_dis, mf6_wel)
[docs] def regrid_like( self, mf6_regridded_dis: StructuredDiscretization, regrid_context: Optional[RegridderWeightsCache] = None, regridder_types: Optional[dict[str, Tuple[RegridderType, str]]] = None, ) -> "MetaSwapModel": unsat_database = cast(str, self.simulation_settings["unsa_svat_path"]) regridded_model = MetaSwapModel(unsat_database) target_grid = mf6_regridded_dis["idomain"] mod2svat_name = None for pkgname in self: msw_package = self[pkgname] if isinstance(msw_package, CouplerMapping): # there can be only one couplermapping mod2svat_name = pkgname elif msw_package.is_regridding_supported(): regridded_package = msw_package.regrid_like( target_grid, regrid_context, regridder_types ) else: raise ValueError(f"package {pkgname} cannot be regridded") regridded_model[pkgname] = regridded_package if mod2svat_name is not None: regridded_model[mod2svat_name] = CouplerMapping() return regridded_model
[docs] @classmethod def from_imod5_data( cls, imod5_data: Imod5DataDict, target_dis: StructuredDiscretization, times: list[datetime], ): """ Construct a MetaSWAP model from iMOD5 data in the CAP package, loaded with the :func:`imod.formats.prj.open_projectfile_data` function. Parameters ---------- imod5_data: dict Dictionary with iMOD5 data. This can be constructed from the :func:`imod.formats.prj.open_projectfile_data` method. target_dis: imod.mf6.StructuredDiscretization Target discretization, cells where MODLOW6 is inactive will be inactive in MetaSWAP as well. times: list[datetime] List of datetimes, will be used to set the output control times. Is also used to infer the starttime of the simulation. Returns ------- MetaSwapModel MetaSWAP model imported from imod5 data. """ extra_paths = imod5_data["extra"]["paths"] path_to_parasim = find_in_file_list("para_sim.inp", extra_paths) parasim_settings = read_para_sim(path_to_parasim) unsa_svat_path = cast(str, parasim_settings["unsa_svat_path"]) # Drop layer coord imod5_cap_no_layer = drop_layer_dim_cap_data(imod5_data) model = cls(unsa_svat_path, parasim_settings) model["grid"], msw_active = GridData.from_imod5_data( imod5_cap_no_layer, target_dis ) cap_data_masked = mask_and_broadcast_cap_data( imod5_cap_no_layer["cap"], msw_active ) imod5_masked: Imod5DataDict = { "cap": cap_data_masked, "extra": {"paths": extra_paths}, } model["infiltration"] = Infiltration.from_imod5_data(imod5_masked) model["ponding"] = Ponding.from_imod5_data(imod5_masked) model["sprinkling"] = Sprinkling.from_imod5_data(imod5_masked) model["meteo_grid"] = MeteoGridCopy.from_imod5_data(imod5_masked) model["prec_mapping"] = PrecipitationMapping.from_imod5_data(imod5_masked) model["evt_mapping"] = EvapotranspirationMapping.from_imod5_data(imod5_masked) if has_active_scaling_factor(imod5_cap_no_layer["cap"]): model["scaling_factor"] = ScalingFactors.from_imod5_data(imod5_masked) area = model["grid"]["area"].isel(subunit=0, drop=True) model["idf_mapping"] = IdfMapping(area, MaskValues.default) model["coupling"] = CouplerMapping() model["extra_files"] = FileCopier.from_imod5_data(imod5_masked) times_da = xr.DataArray(times, coords={"time": times}, dims=("time",)) model["time_oc"] = TimeOutputControl(times_da) return model