Source code for bin.main

#========================================================================
#=== SET THE PYTHON WORKING DIRECTORY TO THE SIMULATOR
#========================================================================

__version__ = 'R2019'
__release__ = 'R2019b'

from concurrent.futures import ProcessPoolExecutor
import gc
import os
from pathlib import Path
import pandas as pd
import traceback
import warnings

from simulator.utils.basic_utils import profileit, hdf5_store
from simulator.environments.DtnSimEnvironment import DtnSimEnviornment
from simulator.utils.DtnArgumentParser import get_argument_parser, process_arguments
from simulator.utils.DtnConfigParser import load_configuration_file, parse_configuration_dict
from simulator.utils.DtnIO import export_dtn_results
from simulator.utils.time_utils import sec2hms

#========================================================================
#=== MAIN FUNCTION TO SIMULATE
#========================================================================

[docs]def run_simulation(config=None, profile=False, return_env=False): """ **Main function to trigger a simulation**. :param dict config: Configuration structure from a YAML file. Use ``pyyaml`` to parse a configuration file. :param bool parallel: If False, the function returns 1) the simulation environment, 2) the results, 3) True/False if errors during the simulation validation process If True, then it only returns True or False (3). :param bool profile: If True, the simulation will be profiled. This is incompatible with parallel=True. .. Tip:: For parallel execution, the environment and results are not returned because they cannot be pickled. Therefore, the inter-process communication would fail. .. Tip:: If validate is true, then parallel and profile have no effect """ # Check inputs consistency if profile and return_env: raise ValueError("Parallel and profile cannot be both True") # If the configuration is provided, run the simulation if not config: # Otherwise, you need to use argument parser args = process_arguments() # Load configuration file config = load_configuration_file(args.configfile) # Run the simulation return _run_simulation(config, profile, return_env)
def _run_simulation(config, profile, return_env): """ Do not call directly. Use ``run_simulation`` """ # Ensure the configuration dictionary is ok config = parse_configuration_dict(config) # Create a simulation environment. From now on, ``config`` will be # a global variable available to everyone env = DtnSimEnviornment(config) # Initialize environment, create nodes/connections, start generators env.initialize() # Run simulation if profile: profileit(env.run) else: env.run() # Validate simulation ok = env.validate_simulation() # Create simulation outputs res = env.finalize_simulation() # Export the simulation results export_dtn_results(config, env) # Print the total simulation time print(env.end_msg.format(os.getpid(), env.sim_id, sec2hms(env.now))) # Delete environment to reset all static variables if not return_env: env.reset() del env gc.collect() # Return results return (env, res, ok) if return_env else ok #======================================================================== #=== MAIN FUNCTIONS TO DO A BATCH ANALYSIS #======================================================================== def _run_simulations(configs, ncpu=1): """ Run all simulations. Do not call directly. :param configs: List/tuple of dictionaries """ # If only one CPU, run in serial. Signal parallel execution to ensure # sim environment gets deleted between simulations and static variables # are not polluting your results if ncpu == 1: return [run_simulation(config=c) for c in configs] # Run in parallel with ProcessPoolExecutor(max_workers=ncpu) as p: futures = [p.submit(run_simulation, config=c) for c in configs] results = [f.result() for f in futures] return results
[docs]def run_simulations(input_file, build_inputs, ncpu=1, sheets=None, **kwargs): """ Run a batch of simulations. :param input_file: Initial config file path (as str or Path object) :param build_inputs: Function to build the inputs. It will be called internally as ``configs = build_inputs(d, **kwargs)`` where d is the dict obtained from ``load_configuration_file`` (i.e. load the YAML into dict and apply parsers to validate it). ``build_inputs`` **must** return a list/tuple of config dictionaries :param ncpu: Number of CPUs to use. Default is 1 (serial execution) :param sheets: See ``sheets`` parameter in ``merge_results`` :param **kwargs: Passed to ``build_inputs`` """ # Load basic configuration file d = load_configuration_file(input_file) # Build inputs configs = build_inputs(d, **kwargs) # Trigger computations _run_simulations(configs, ncpu=ncpu) # Get output directory outdir = Path(d['globals']['outdir']) outfile = outdir/Path('merged_' + d['globals']['outfile']) ext = outfile.suffix # Merge all results merge_results(outfile, outdir, ext=ext, sheets=sheets)
[docs]def merge_results(outfile, resdir, ext='.h5', sheets=None): """ Merge a set of simulation results into a single file containing all of them. The resulting file will be stored as an HDF5 file. :param outfile: File path (str or Path object) where the single file will be located :param resdir: Directory where the results to merge are located :param ext: Extension of the files in the ``resdir``. Options are '.h5' (default) or '.xlsx' :param sheets: List with the names of the tables to export. Defaults to ['sent', 'arrived']. Valid names can be found in the ``DtnAbstractReport`` and its subclasses. """ # Initialize variables outfile = Path(outfile) outfile = f'{outfile.parent/outfile.stem}.h5' # Get files to merge files = list(resdir.glob(f'./*{ext}')) # Select the function to read data if ext == '.xlsx': read_fun = lambda f, s: pd.read_excel(f, sheet_name=s) elif ext == '.h5': read_fun = lambda f, s: pd.read_hdf(f, key=s) else: print(f'Could not merge files. Extension {ext} is not valid') return # Get the default sheets to process if sheets is None: sheets = ['sent', 'arrived'] # Create HDF5 store with warnings.catch_warnings(): warnings.simplefilter("ignore") with hdf5_store(outfile, mode='w') as store: for sheet in sheets: print(f'[Merge Results] Processing Report "{sheet}"') try: store[sheet] = pd.concat({f.stem: read_fun(f, sheet).reset_index() for f in files}, names=['file']) except: traceback.print_exc()
#======================================================================== #=== CLI HELPER FUNCTIONS #======================================================================== def list_simulation_arguments(): # Print parser the options get_argument_parser().print_help() #======================================================================== #=== RUN SIMULATION USING COMMAND LINE ARGUMENTS #======================================================================== if __name__ == '__main__': run_simulation(profile=False)