Source code for simulator.environments.DtnSimEnvironment

# -*- coding: utf-8 -*-

from simulator.environments.DtnAbstractSimEnvironment import SimEnvironment
import numpy as np
import pandas as pd
import os
from pathlib import Path
import random
from simulator.utils.DtnUtils import load_class_dynamically
from warnings import warn

[docs]class DtnSimEnviornment(SimEnvironment): def reset(self): for node in self.nodes.values(): node.reset(); del node for con in self.connections.values(): del con for mbm in self.mobility_models.values(): del mbm for rpt in self.all_results.values(): del rpt print(self.del_msg.format(os.getpid(), self.sim_id)) def initialize(self): # De-pack useful global arguments self.epoch = self.config['scenario'].epoch self.seed = self.config['scenario'].seed # Initialize logger self.initialize_logger() # Set the seed for the random numbers self.set_simulation_seed() # Variable to store all results self.all_results = {} # Create all nodes self.nodes = {} for nid, node in self.config['network'].nodes.items(): # Initialize variables props = self.config[node.type] clazz = load_class_dynamically('simulator.nodes', getattr(props, 'class')) # Create node object self.nodes[nid] = clazz(self, nid, props) # Create all connections self.connections = {} for cid, con in self.config['network'].connections.items(): # Initialize variables props = self.config[con.type] o, d = con.origin, con.destination clazz = load_class_dynamically('simulator.connections', getattr(props, 'class')) # Create connections self.connections[o, d] = clazz(self, cid, o, d, props) self.connections[d, o] = clazz(self, cid, d, o, props) # Create the mobility model self.create_mobility_models() # Initialize all nodes. This can only be done after initializing the connections # because otherwise you can't know which ducts to create. for node in self.nodes.values(): node.initialize() # Initialize all ducts. This must be done after the initializing the # node because the radios must be up and running for node in self.nodes.values(): node.initialize_neighbors_and_ducts() # Initialize all connections. This must be done after initializing the ducts for o, d in self.connections: self.connections[o, d].initialize() self.connections[d, o].initialize() # Initialize all mobility models. This can only be done after initializing # the connections, since the mobility model might inherit info from them. self.initialize_mobility_models() # Complete connection initialization using info from mobility models for c in self.connections.values(): c.initialize_contacts_and_ranges() # Initialize all routers using info from mobility models. Also initialize # all endpoints using information from routers. Finally, initialize neighbor # manager with info from the mobility model for node in self.nodes.values(): node.initialize_router() node.initialize_endpoints() node.initialize_neighbor_managers() # Flag the reports that need to be generated self.reports = self.config['reports'].reports # Show initialization message print(self.init_msg.format(os.getpid(), self.sim_id, self.config_file, self.seed)) def create_mobility_models(self): # Initialize variables self.mobility_models = {} # Gather all models defined in YAML file models = {node.props.mobility_model for node in self.nodes.values()} models.update({c.props.mobility_model for c in self.connections.values()}) models = models - {None} # Iterate over models to initialize for model in models: # Find properties of this mobility model props = self.config[model] # Initialize class clazz = load_class_dynamically('simulator.mobility_models', getattr(props, 'class')) self.mobility_models[model] = clazz(self, props) def initialize_mobility_models(self): for model in self.mobility_models.values(): model.initialize() def set_simulation_seed(self): if self.seed is None: return np.random.seed(self.seed) random.seed(self.seed) def finalize_simulation(self, close_logger=True): # If the results are already available, return them if self.all_results: return self.all_results # Initialize variables self.all_results = {} # Collect all the reports for report in self.reports: # Create the report clazz = load_class_dynamically('simulator.reports', report, report) report = clazz(self) # If this report already exists, raise error if report.alias in self.all_results: continue # Store the report self.all_results[report.alias] = report.data # Compose and return result return self.all_results def validate_simulation(self): # Format validation portion of log file self.new_log_section(title='VALIDATION RESULTS') # Get the simulation results self.finalize_simulation(close_logger=False) # Perform validation process error = self._validate_sent() | \ self._validate_arrived() | \ self._validate_dropped() | \ self._validate_stored() | \ self._validate_lost() | \ self._validate_arrived_bundles() | \ self._validate_standard_data_volume() | \ self._validate_critical_data_volume() | \ self._validate_expected_data_volume() | \ self._validate_num_bundles_end() # Display warning if error detected #if error: warn('Simulation did not pass all tests. See log file for details') return (not error) def _validate_sent(self): # If the required report was not collected, skip if 'DtnSentBundlesReport' not in self.reports: return False # Perform check error = self.all_results['sent'].empty # Display error if error: self.error("No bundles were ever sent.", header=False) else: self.log("Sent bundles test successfully passed.", header=False) self.new_line() return error def _validate_arrived(self): # If the required report was not collected, skip if 'DtnArrivedBundlesReport' not in self.reports: return False # Perform check error = self.all_results['arrived'].empty # Display error if error: self.error("No bundles were received at destination.", header=False) else: self.log("Received bundles test successfully passed.", header=False) self.new_line() return error def _validate_dropped(self): # If the required report was not collected, skip if 'DtnDroppedBundlesReport' not in self.reports: return False # Get report data dropped = self.all_results['dropped'] # If no bundles dropped, pass test if dropped.empty: self.log("Dropped bundles test successfully passed.", header=False) self.new_line() return False # Find the non-critical data that was dropped v = dropped.loc[dropped.critical == False, :] self.error("'dropped' should be empty but holds:\n{}", v, header=False) self.new_line() return True def _validate_stored(self): # If the required report was not collected, skip if 'DtnStoredBundlesReport' not in self.reports: return False # Get report data stored = self.all_results['stored'] # If stored is not empty, pass test if stored.empty: self.log("Stored bundles test successfully passed.", header=False) self.new_line() return False # If stored is not empty, then error self.error("Some bundles are still stored in DTN nodes:\n{}", stored, header=False) self.new_line() return False def _validate_lost(self): # If the required report was not collected, skip if 'DtnConnLostBundlesReport' not in self.reports: return False # Get report data lost = self.all_results['lost'] # If stored is not empty, pass test if lost.empty: self.log('Lost bundles test successfully passed.', header=False) self.new_line() return False # If stored is not empty, then error self.error("'lost' should be empty by holds:\n{}", lost, header=False) self.new_line() return False def _validate_arrived_bundles(self): # If the required report was not collected, skip if 'DtnArrivedBundlesReport' not in self.reports or \ 'DtnSentBundlesReport' not in self.reports: return False # Get report data arrived = self.all_results['arrived'] sent = self.all_results['sent'] # Get the list of bids that were transmitted s_bids = set(sent.index.get_level_values('bid')) if not sent.empty else set() # Get the list of bids that were received a_bids = set(arrived.bid) if not arrived.empty else set() # Perform check - See how many bids never arrived diff = s_bids - a_bids error = bool(diff) # Display log message if error: self.error('Bundles {} do not arrive.', diff, header=False) else: self.log('All bundle Ids where accounted for.', header=False) self.new_line() return error def _validate_standard_data_volume(self): # If the required report was not collected, skip if 'DtnArrivedBundlesReport' not in self.reports or \ 'DtnSentBundlesReport' not in self.reports: return False # Get report data arrived = self.all_results['arrived'] sent = self.all_results['sent'] # If nothing was sent or received, error if sent.empty or arrived.empty: self.error('Non-critical data volume test skipped') return False # Eliminate critical data sent = sent.loc[sent.critical == False, :] arrived = arrived.loc[arrived.critical == False, :] # Compute transmitted and received data volume per flow tx_dv = sent.groupby(by='fid').data_vol.sum() rx_dv = arrived.groupby(by='fid').data_vol.sum() # Perform check. Inspired by numpy's "allclose" function atol, rtol = 1e-8, 1e-3 ok = np.abs(tx_dv.subtract(rx_dv, fill_value=0.0).values) <= (atol + rtol * np.abs(tx_dv.values)) error = not ok.ravel().all() # Check whether some flows had different transmitted and received data volume. If so, data is being lost during the simulation if error: self.error('The following non-critical flows received a data volume that is ' 'different from the transmitted data volume:', header=False) dv = pd.concat([tx_dv, rx_dv], axis=1).fillna(value=0.0) dv.columns = ['TxDataVolume', 'RxDataVolume'] self.log('{}', dv.loc[~ok.ravel(), :], header=False) else: self.log('Non-critical data volume test successfully passed.', header=False) self.new_line() return error def _validate_critical_data_volume(self): # If the required report was not collected, skip if 'DtnArrivedBundlesReport' not in self.reports or \ 'DtnSentBundlesReport' not in self.reports: return False # Get report data arrived = self.all_results['arrived'] sent = self.all_results['sent'] # If nothing was sent or received, error if sent.empty or arrived.empty: self.error('Critical data volume test skipped'); return True # Eliminate non-critical data sent = sent.loc[sent.critical == True, :] arrived = arrived.loc[arrived.critical == True, :] # Compute transmitted and received data volume per flow tx_dv = sent.groupby(by='fid').data_vol.sum() rx_dv = arrived.groupby(by='fid').data_vol.sum() # Compute the ration of rx_data_vol/tx_data_vol mult = 1.0 / tx_dv.divide(rx_dv, axis=0, fill_value=0.0) # Perform check. error = (mult < 1.0).any() # Display error message if necessary if error: self.error('The following critical flows did not receive all received data volume:', header=False) dv = pd.concat([tx_dv, rx_dv], axis=1) dv.columns = ['TxDataVolume', 'RxDataVolume'] err_dv = dv.loc[mult < 1.0, :].fillna(value=0.0) self.log('{}', err_dv, header=False) else: self.log('Critical data volume test successfully passed.', header=False) # Display informational message about critical data volume self.log('Informational data on critical data flows. Data volume multiplier:', header=False) data = {k: 'x{:.1f}'.format(v) for k, v in mult.to_dict().items()} data = pd.DataFrame.from_dict(data, orient='index').rename(columns={0:'Multiplier'}) self.log('{}', data, header=False) self.new_line() return False def _validate_expected_data_volume(self): # If the required report was not collected, skip if 'DtnSentBundlesReport' not in self.reports: return False # Initialize variables dv1, dv2 = {}, {} # Iterate over all generators in the simulation for _, node in self.nodes.items(): for gid, gen in node.generators.items(): dv1[gid] = gen.predicted_data_vol() dv2[gid] = gen.generated_data_vol() # Compute the total data volume in [Tbit] tdv1 = sum(dv1.values())/1e12 tdv2 = sum(dv2.values())/1e12 # Perform check error = not np.isclose(tdv1, tdv2) # Display error message if necessary if error: self.error('The predicted data volume is {:.6f}Tbit') self.error('The generated data volume is {:.6f}Tbit') self.error('They should match. Differences in flows are as follows:') self.error('Predicted: {}'.format(dv1)) self.error('Predicted: {}'.format(dv2)) else: self.log('Expected data volume test successfully passed.', header=False) # Finish test self.new_line() return error def _validate_num_bundles_end(self): # If the required report was not collected, skip if 'DtnArrivedBundlesReport' not in self.reports or \ 'DtnSentBundlesReport' not in self.reports or \ 'DtnConnLostBundlesReport' not in self.reports or \ 'DtnDroppedBundlesReport' not in self.reports or \ 'DtnStoredBundlesReport' not in self.reports: return False # Get report data arrived = self.all_results['arrived'] dropped = self.all_results['dropped'] stored = self.all_results['stored'] sent = self.all_results['sent'] lost = self.all_results['lost'] # Compute number of bundles n1 = sent.shape[0] # Compute number of bundles that arrive, are dropped, lost or stored n2 = arrived.shape[0] + dropped.shape[0] + \ lost.shape[0] + stored.shape[0] # Perform check (because of bundle fragmentation and critical routers # policy, the number of bundles at end will almost certainly be larger # that the number of bundles sent) error = n1 > n2 # Display error message if necessary if error: self.error('{:.0f} bundles were sent, but {:.0f} bundles were either dropped,' 'arrived, lost or stored. Where are the rest?', n1, n2, header=False) else: self.log('"Num. bundles at end" test successfully passed.', header=False) self.new_line() return error def __str__(self): return f'<DtnSimEnvironment t={self.now}>'