Source code for dfm_tools.modelbuilder

import os
import logging
import pandas as pd
import dfm_tools as dfmt
import datetime as dt
import xarray as xr
import platform
import hydrolib.core.dflowfm as hcdfm
from hydrolib.core.dimr.models import DIMR, FMComponent, Start
from dfm_tools.interpolate_grid2bnd import (ext_add_boundary_object_per_polyline,
                                            open_prepare_dataset,
                                            ds_apply_conversion_dict,
                                            _ds_sel_time_outside,
                                            )
from dfm_tools.xarray_helpers import interpolate_na_multidim

__all__ = [
    "constant_to_bc",
    "cmems_nc_to_bc",
    "cmems_nc_to_ini",
    "preprocess_merge_meteofiles_era5",
    "create_model_exec_files",
    "make_paths_relative",
    ]

logger = logging.getLogger(__name__)


def get_quantity_list(quantity):
    if quantity == 'uxuyadvectionvelocitybnd':  # T3Dvector
        quantity_list = ['ux','uy']
    elif isinstance(quantity, list):
        quantity_list = quantity
    else:
        quantity_list = [quantity]
    return quantity_list
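
# Illustration (not part of the module): the vector quantity expands into its
# components, everything else passes through as a (single-item) list:
#   get_quantity_list('uxuyadvectionvelocitybnd')  # -> ['ux', 'uy']
#   get_quantity_list('salinitybnd')               # -> ['salinitybnd']
#   get_quantity_list(['ux', 'uy'])                # -> ['ux', 'uy']
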


def get_ncvarname(quantity, conversion_dict):
    # check existence of requested keys in conversion_dict
    if quantity not in conversion_dict.keys():
        raise KeyError(f"quantity '{quantity}' not in conversion_dict, (case sensitive) options are"
                       f": {str(list(conversion_dict.keys()))}")
    
    ncvarname = conversion_dict[quantity]['ncvarname']
    return ncvarname
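
# Illustration (not part of the module): with the default conversion_dict from
# dfmt.get_conversion_dict(), a supported quantity resolves to its source
# variable name (for CMEMS physics data presumably e.g. 'so' for salinitybnd
# and 'thetao' for temperaturebnd); an unknown quantity raises a KeyError that
# lists the valid (case-sensitive) options.
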


def constant_to_bc(ext_new: hcdfm.ExtModel, file_pli, constant=0):
    """
    Generate a boundary conditions file (.bc) with a constant waterlevel.
    This can be used to enforce a known offset from zero, for instance to
    account for sea level rise.
    """
    quantity = "waterlevelbnd"
    
    # read polyfile and list the polyline names
    polyfile_obj = hcdfm.PolyFile(file_pli)
    plinames_list = [x.metadata.name for x in polyfile_obj.objects]
    
    # generate constant forcingmodel object
    ForcingModel_object = hcdfm.ForcingModel()
    for pliname in plinames_list:
        locationname = f"{pliname}_0001"
        qup = [hcdfm.QuantityUnitPair(quantity=quantity, unit="m")]
        Constant_object = hcdfm.Constant(
            name=locationname,
            quantityunitpair=qup,
            datablock=[[constant]],
            )
        ForcingModel_object.forcing.append(Constant_object)
    
    dir_output = os.path.dirname(file_pli)
    file_pli_name = polyfile_obj.filepath.stem
    file_bc_out = os.path.join(dir_output, f'{quantity}_constant_{file_pli_name}.bc')
    ForcingModel_object.save(filepath=file_bc_out)
    
    # generate hydrolib-core Boundary object to be appended to the ext file
    boundary_object = hcdfm.Boundary(quantity=quantity,
                                     locationfile=file_pli,
                                     forcingfile=ForcingModel_object)
    
    # add the boundary object to the ext file for each polyline in the polyfile
    ext_add_boundary_object_per_polyline(ext_new=ext_new, boundary_object=boundary_object)
    return ext_new
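
# Usage sketch for constant_to_bc (the polyfile name "sea.pli" is hypothetical;
# it should contain one or more named polylines):
#
#   import hydrolib.core.dflowfm as hcdfm
#   import dfm_tools as dfmt
#
#   ext_new = hcdfm.ExtModel()
#   ext_new = dfmt.constant_to_bc(ext_new=ext_new, file_pli="sea.pli", constant=0.1)
#   # this writes waterlevelbnd_constant_sea.bc next to the polyfile and
#   # appends a boundary block to ext_new for each polyline
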
def cmems_nc_to_bc(ext_new, list_quantities, tstart, tstop, file_pli, dir_pattern, dir_output,
                   conversion_dict=None,
                   refdate_str=None):
    if conversion_dict is None:
        conversion_dict = dfmt.get_conversion_dict()
    
    for quantity in list_quantities:  # loop over salinitybnd/uxuyadvectionvelocitybnd/etc
        print(f'processing quantity: {quantity}')
        
        quantity_list = get_quantity_list(quantity=quantity)
        
        for quantity_key in quantity_list:  # loop over ux/uy
            ncvarname = get_ncvarname(quantity=quantity_key, conversion_dict=conversion_dict)
            dir_pattern_one = str(dir_pattern).format(ncvarname=ncvarname)
            # open regulargridDataset and do some basic stuff (time selection, renaming
            # depth/lat/lon/varname, converting units, etc)
            data_xr_onevar = open_prepare_dataset(dir_pattern=dir_pattern_one,
                                                  quantity=quantity_key,
                                                  tstart=tstart, tstop=tstop,
                                                  conversion_dict=conversion_dict,
                                                  refdate_str=refdate_str)
            if quantity_key == quantity_list[0]:
                data_xr_vars = data_xr_onevar
            else:
                # only relevant in case of ux/uy, others all have only one quantity
                data_xr_vars[quantity_key] = data_xr_onevar[quantity_key]
        
        # interpolate regulargridDataset to plipointsDataset
        polyfile_obj = hcdfm.PolyFile(file_pli)
        gdf_points = dfmt.PolyFile_to_geodataframe_points(polyfile_object=polyfile_obj)
        data_interp = dfmt.interp_regularnc_to_plipointsDataset(data_xr_reg=data_xr_vars,
                                                                gdf_points=gdf_points,
                                                                load=True)
        
        # convert plipointsDataset to hydrolib ForcingModel
        ForcingModel_object = dfmt.plipointsDataset_to_ForcingModel(plipointsDataset=data_interp)
        
        # generate boundary object for the ext file (quantity, pli-filename, bc-filename)
        file_pli_name = polyfile_obj.filepath.stem
        file_bc_out = os.path.join(dir_output, f'{quantity}_CMEMS_{file_pli_name}.bc')
        ForcingModel_object.save(filepath=file_bc_out)
        boundary_object = hcdfm.Boundary(quantity=quantity,
                                         # locationfile is updated if multiple polylines in polyfile
                                         locationfile=file_pli,
                                         forcingfile=ForcingModel_object)
        
        # add the boundary object to the ext file for each polyline in the polyfile
        ext_add_boundary_object_per_polyline(ext_new=ext_new, boundary_object=boundary_object)
    
    return ext_new
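
# Usage sketch for cmems_nc_to_bc (paths are hypothetical; dir_pattern must
# contain an {ncvarname} placeholder, which is filled per quantity from the
# conversion_dict):
#
#   ext_new = hcdfm.ExtModel()
#   ext_new = dfmt.cmems_nc_to_bc(
#       ext_new=ext_new,
#       list_quantities=["salinitybnd", "uxuyadvectionvelocitybnd"],
#       tstart="2022-01-01", tstop="2022-02-01",
#       file_pli="sea.pli",
#       dir_pattern="cmems_data/cmems_{ncvarname}_*.nc",
#       dir_output="model_input",
#       )
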
def cmems_nc_to_ini(
        ext_old,
        dir_output,
        list_quantities,
        tstart,
        dir_pattern,
        conversion_dict=None,
        ):
    """
    This makes quite specific 3D initial input files based on CMEMS data.
    Delft3D FM is quite picky: this works with CMEMS files because they have a
    depth variable with standard_name='depth'. If this is not present, or if
    the dimensions are ordered differently, there will be an error or
    incorrect model results.
    """
    xr_kwargs = {"join": "exact", "data_vars": "minimal"}
    if conversion_dict is None:
        conversion_dict = dfmt.get_conversion_dict()
    
    tstart_pd = pd.Timestamp(tstart)
    tstart_str = tstart_pd.strftime("%Y-%m-%d_%H-%M-%S")
    # tstop_pd is slightly higher than tstart_pd to ensure >1 timesteps in all
    # cases
    tstop_pd = tstart_pd + pd.Timedelta(hours=1)
    
    for quan_bnd in list_quantities:
        if (quan_bnd != "salinitybnd") and not quan_bnd.startswith("tracer"):
            # quantities other than salinitybnd, temperaturebnd and tracer* are
            # silently skipped since they are also not supported by delft3dfm
            # temperaturebnd is handled with salinity
            # uxuyadvectionvelocitybnd is not supported
            print(f"quantity {quan_bnd} is not supported by cmems_nc_to_ini, "
                  "silently skipped")
            continue
        
        ncvarname = get_ncvarname(
            quantity=quan_bnd,
            conversion_dict=conversion_dict,
            )
        dir_pattern_one = dir_pattern.format(ncvarname=ncvarname)
        if quan_bnd == "salinitybnd":
            # this also handles temperaturebnd
            # 3D initialsalinity/initialtemperature fields are silently ignored
            # initial 3D conditions are only possible via nudging 1st timestep
            # via quantity=nudge_salinity_temperature
            data_xr = xr.open_mfdataset(dir_pattern_one, **xr_kwargs)
            ncvarname_tem = get_ncvarname(
                quantity="temperaturebnd",
                conversion_dict=conversion_dict,
                )
            dir_pattern_tem = dir_pattern.format(ncvarname=ncvarname_tem)
            data_xr_tem = xr.open_mfdataset(dir_pattern_tem, **xr_kwargs)
            data_xr["thetao"] = data_xr_tem["thetao"]
            quantity = "nudge_salinity_temperature"
            varname = None
        elif quan_bnd.startswith("tracer"):
            data_xr = xr.open_mfdataset(dir_pattern_one, **xr_kwargs)
            data_xr = ds_apply_conversion_dict(
                data_xr=data_xr,
                conversion_dict=conversion_dict,
                quantity=quan_bnd,
                )
            quantity = f'initial{quan_bnd.replace("bnd","")}'
            varname = quantity
            data_xr = data_xr.rename_vars({quan_bnd: quantity})
        
        # subset two timesteps. interp to tstart would be the proper way to do
        # it, but FM needs two timesteps for nudge_salinity_temperature and
        # initial waq vars
        data_xr = _ds_sel_time_outside(
            ds=data_xr,
            tstart=tstart_pd,
            tstop=tstop_pd,
            )
        # assert that there are at least two timesteps in the resulting dataset
        # delft3dfm will crash if there is only one timestep
        assert len(data_xr.time) >= 2
        
        # extrapolate data for all data_vars, first over depth, then over lat/lon
        for var in data_xr.data_vars:
            data_xr[var] = interpolate_na_multidim(data_xr[var], ["depth"])
            data_xr[var] = interpolate_na_multidim(data_xr[var], ["latitude", "longitude"])
        
        print('writing file')
        file_output = os.path.join(dir_output, f"{quantity}_{tstart_str}.nc")
        data_xr.to_netcdf(file_output)
        
        # append forcings to ext
        forcing_saltem = hcdfm.ExtOldForcing(
            quantity=quantity,
            varname=varname,
            filename=file_output,
            filetype=hcdfm.ExtOldFileType.NetCDFGridData,
            method=hcdfm.ExtOldMethod.InterpolateTimeAndSpaceSaveWeights,  # 3
            operand=hcdfm.Operand.override,  # O
            )
        ext_old.forcing.append(forcing_saltem)
    
    return ext_old
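
# Usage sketch for cmems_nc_to_ini (paths are hypothetical; for "salinitybnd"
# the files for both the salinity and temperature ncvarnames must match
# dir_pattern, since both are merged into nudge_salinity_temperature):
#
#   ext_old = hcdfm.ExtOldModel()
#   ext_old = dfmt.cmems_nc_to_ini(
#       ext_old=ext_old,
#       dir_output="model_input",
#       list_quantities=["salinitybnd"],
#       tstart="2022-01-01",
#       dir_pattern="cmems_data/cmems_{ncvarname}_*.nc",
#       )
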
def preprocess_merge_meteofiles_era5(
        ext_old,
        varkey_list,
        dir_data,
        dir_output,
        time_slice,
        ):
    """
    Merge ERA5 data per variable for the requested time period.
    """
    if not os.path.exists(dir_output):
        os.makedirs(dir_output)
    
    # TODO: align with variables_dict from dfmt.download_ERA5()
    dict_varkey_quantities = {
        'ssr': 'solarradiation',
        # 'sst': 'sea_surface_temperature',
        'strd': 'longwaveradiation',
        # 'slhf': 'surface_latent_heat_flux',
        # 'sshf': 'surface_sensible_heat_flux',
        # 'str': 'surface_net_thermal_radiation',
        'chnk': 'charnock',
        'd2m': 'dewpoint',
        't2m': 'airtemperature',
        'tcc': 'cloudiness',
        'msl': 'airpressure',
        'u10': 'windx',
        'u10n': 'windx',
        'v10': 'windy',
        'v10n': 'windy',
        'mer': 'rainfall_rate',
        'mtpr': 'rainfall_rate',
        'rhoao': 'airdensity',
        }
    
    for varkey in varkey_list:
        if isinstance(varkey, list):
            raise TypeError(
                "varkey_list should not contain lists, support was dropped in favour "
                "of supporting separate quantities. Provide a list of strings instead."
                )
        if varkey not in dict_varkey_quantities.keys():
            raise NotImplementedError(
                f"The varkey '{varkey}' is not supported yet by "
                "dfmt.preprocess_merge_meteofiles_era5(), please create a dfm_tools "
                "issue if you need this."
                )
        
        fn_match_pattern = f'era5_{varkey}_*.nc'
        file_nc = os.path.join(dir_data, fn_match_pattern)
        
        ds = dfmt.merge_meteofiles(
            file_nc=file_nc,
            time_slice=time_slice,
            preprocess=dfmt.preprocess_ERA5,
            )
        
        # check if the variable is present in the merged netcdf. This could go wrong for
        # avg_tprate and avg_ie that are renamed to mtpr and mer in preprocess_ERA5
        if varkey not in ds:
            raise KeyError(
                f"Requested variable ({varkey}) is not present in the "
                f"merged dataset ({list(ds.data_vars)})."
                )
        
        # write to netcdf file
        print('>> writing file (can take a while): ', end='')
        dtstart = dt.datetime.now()
        times_pd = ds['time'].to_series()
        time_start_str = times_pd.iloc[0].strftime("%Y%m%d")
        time_stop_str = times_pd.iloc[-1].strftime("%Y%m%d")
        file_out = os.path.join(
            dir_output,
            f'era5_{varkey}_{time_start_str}to{time_stop_str}_ERA5.nc'
            )
        ds.to_netcdf(file_out)
        print(f'{(dt.datetime.now()-dtstart).total_seconds():.2f} sec')
        
        quantity = dict_varkey_quantities[varkey]
        current_ext_quantities = [x.quantity for x in ext_old.forcing]
        operand = hcdfm.Operand.override  # O
        if quantity in current_ext_quantities:
            logger.info(
                f"quantity {quantity} already found in ext file, using operand=+"
                )
            operand = hcdfm.Operand.add  # +
        forcing_meteo = hcdfm.ExtOldForcing(
            quantity=quantity,
            filename=file_out,
            varname=varkey,
            filetype=hcdfm.ExtOldFileType.NetCDFGridData,  # 11
            method=hcdfm.ExtOldMethod.InterpolateTimeAndSpaceSaveWeights,  # 3
            operand=operand,
            )
        ext_old.forcing.append(forcing_meteo)
    
    return ext_old
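
# Usage sketch for preprocess_merge_meteofiles_era5 (paths are hypothetical;
# the files in dir_data are expected to match "era5_<varkey>_*.nc", e.g. as
# downloaded with dfmt.download_ERA5):
#
#   ext_old = hcdfm.ExtOldModel()
#   ext_old = dfmt.preprocess_merge_meteofiles_era5(
#       ext_old=ext_old,
#       varkey_list=["u10", "v10", "msl"],
#       dir_data="era5_data",
#       dir_output="model_input",
#       time_slice=slice("2022-01-01", "2022-02-01"),
#       )
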
def create_model_exec_files(file_mdu, nproc=1, dimrset_folder=None, path_style=None):
    """
    Creates a dimr_config.xml and, if desired, a bat/sh file to run the model.
    """
    if not os.path.isfile(file_mdu):
        raise FileNotFoundError(f"file_mdu not found: {file_mdu}")
    
    dirname = os.path.dirname(file_mdu)
    mdu_name = os.path.basename(file_mdu)
    file_dimr = os.path.join(dirname, 'dimr_config.xml')
    dimr_name = os.path.basename(file_dimr)
    
    # generate dimr_config.xml
    fm_modelname = "DFlowFM"
    control_comp = Start(name=fm_modelname)
    fm_comp = FMComponent(name=fm_modelname,
                          workingDir='.',
                          inputfile=mdu_name,
                          process=nproc,
                          mpiCommunicator="DFM_COMM_DFMWORLD")
    dimr_model = DIMR(control=control_comp, component=fm_comp)
    print(f"writing {dimr_name}")
    dimr_model.save(file_dimr)
    
    if dimrset_folder is None:
        print("no dimrset_folder provided, cannot write bat/sh file")
        return
    elif dimrset_folder == 'docker':
        generate_docker_file(dimr_model=dimr_model)
        return
    
    # continue with a dimrset_folder that is not None or 'docker'
    if path_style is None:
        # set the system name as the path_style (lowercase)
        path_style = platform.system().lower()
    
    if path_style == 'windows':
        generate_bat_file(dimr_model=dimr_model, dimrset_folder=dimrset_folder)
    else:
        logger.warning(f"path_style/os {path_style} not yet supported by "
                       "`dfmt.create_model_exec_files()`, no bat/sh file is written")
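
# Usage sketch for create_model_exec_files (paths are hypothetical): with a
# local dimrset a Windows bat file is written, with dimrset_folder='docker'
# a run_model.sh is written instead, and with dimrset_folder=None only
# dimr_config.xml is created:
#
#   dfmt.create_model_exec_files(
#       file_mdu="model_input/model.mdu",
#       nproc=4,
#       dimrset_folder=r"c:\dimrset_folder",  # or 'docker', or None
#       )
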
def generate_bat_file(dimr_model, dimrset_folder=None):
    """
    Generate a bat file for running on Windows.
    """
    if dimr_model.filepath is None:
        raise Exception('first save the dimr_model before passing it to generate_bat_file')
    dirname = os.path.dirname(dimr_model.filepath)
    file_bat = os.path.join(dirname, "run_parallel.bat")
    bat_name = os.path.basename(file_bat)
    dimr_name = os.path.basename(dimr_model.filepath)
    mdu_name = os.path.basename(dimr_model.component[0].inputFile)
    nproc = dimr_model.component[0].process
    
    if not os.path.exists(dimrset_folder):
        raise FileNotFoundError(f"dimrset_folder not found: {dimrset_folder}")
    
    bat_str = fr"""rem User input
set dimrset_folder="{dimrset_folder}"
set MDU_file="{mdu_name}"
set partitions={nproc}

rem Partition the network and mdu
call %dimrset_folder%\x64\bin\run_dflowfm.bat "--partition:ndomains=%partitions%:icgsolver=6" %MDU_file%

rem Execute the simulation
call %dimrset_folder%\x64\bin\run_dimr_parallel.bat %partitions% {dimr_name}

rem To prevent the DOS box from disappearing immediately: enable pause on the following line
pause
"""
    print(f"writing {bat_name}")
    with open(file_bat, 'w') as f:
        f.write(bat_str)


def generate_docker_file(dimr_model):
    """
    Generate a run_model.sh file for running on Windows or Unix with Docker.
    """
    if dimr_model.filepath is None:
        raise Exception('first save the dimr_model before passing it to generate_docker_file')
    dirname = os.path.dirname(dimr_model.filepath)
    file_docker = os.path.join(dirname, "run_model.sh")
    docker_name = os.path.basename(file_docker)
    dimr_name = os.path.basename(dimr_model.filepath)
    mdu_name = os.path.basename(dimr_model.component[0].inputFile)
    nproc = dimr_model.component[0].process
    
    docker_str = fr"""#!/bin/bash
# To start DIMR, execute this script

# HOW TO RUN A MODEL WITH DOCKER (from delft3dfm 2025.02)
# Create a MyDeltares account at containers.deltares.nl
# Request access to the Delft3D Docker repository on Harbor via black-ops@deltares.nl
# Get your CLI secret from your account settings at containers.deltares.nl
# `docker login containers.deltares.nl` with your MyDeltares email address and CLI secret as credentials
# `docker pull containers.deltares.nl/delft3d/delft3dfm:release-2025.02`
# Run this run_model.sh script with docker via:
# docker run -v "[absolute_path_to_model_folder]:/data" --shm-size 4G -it containers.deltares.nl/delft3d/delft3dfm:release-2025.02 /data/run_model.sh

# stop after an error occurred
set -e

# set number of partitions
nPart={nproc}

# DIMR input-file; must already exist!
dimrFile={dimr_name}

# Folder with the MDU file, relative to the location of this script
mduFolder=.

# Replace number of processes in DIMR file
PROCESSSTR="$(seq -s " " 0 $((nPart-1)))"
sed -i "s/\(<process.*>\)[^<>]*\(<\/process.*\)/\1$PROCESSSTR\2/" $dimrFile

# Read MDU file from DIMR-file
# mduFile="$(sed -n 's/\r//; s/<inputFile>\(.*\).mdu<\/inputFile>/\1/p' $dimrFile)".mdu
mduFile={mdu_name}

if [ "$nPart" == "1" ]; then
    run_dimr.sh -m $dimrFile
else
    pushd $mduFolder
    run_dflowfm.sh --partition:ndomains=$nPart:icgsolver=6 $mduFile
    popd
    run_dimr.sh -c $nPart -m $dimrFile
fi
"""
    print(f"writing {docker_name}")
    # run_model.sh requires unix file endings, so we use newline='\n'
    with open(file_docker, 'w', newline='\n') as f:
        f.write(docker_str)
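
# Illustration (not part of the module): for nproc=4, the generated
# run_model.sh builds PROCESSSTR with `seq -s " " 0 3`, yielding "0 1 2 3",
# and the sed command rewrites the <process> element in dimr_config.xml from
# e.g. "<process>0</process>" to "<process>0 1 2 3</process>".
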
def make_paths_relative(mdu_file: str):
    """
    Make paths on Windows and Unix relative by removing the dir_model prefix
    from paths in the mdu and ext files. This is a temporary workaround until
    implemented in https://github.com/Deltares/HYDROLIB-core/issues/532

    Parameters
    ----------
    mdu_file : str
        path to mdu_file.

    Returns
    -------
    None.

    """
    dir_model = os.path.dirname(mdu_file)
    
    mdu_existing = hcdfm.FMModel(mdu_file, recurse=False)
    
    file_list = [mdu_file]
    ext_old = mdu_existing.external_forcing.extforcefile
    if ext_old is not None:
        file_list.append(ext_old.filepath)
    ext_new = mdu_existing.external_forcing.extforcefilenew
    if ext_new is not None:
        file_list.append(ext_new.filepath)
    
    for filename in file_list:
        if filename is None:
            continue
        with open(filename, 'r') as file:
            filedata = file.read()
        filedata = filedata.replace(dir_model.replace('\\', '/') + '/', '')
        with open(filename, 'w') as file:
            file.write(filedata)
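
# Usage sketch for make_paths_relative (path is hypothetical): call it once
# after the mdu and all ext/forcing files have been written, so any absolute
# paths inside them are rewritten relative to the model folder:
#
#   dfmt.make_paths_relative(mdu_file="model_input/model.mdu")
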