Source code for simulator.nodes.DtnNode

# -*- coding: utf-8 -*-

from collections import defaultdict
from copy import deepcopy
import importlib
import pandas as pd
from simulator.core.DtnQueue import DtnQueue
from simulator.core.DtnCore import Simulable
from simulator.utils.DtnUtils import load_class_dynamically
from simulator.endpoints.DtnDefaultEndpoint import DtnDefaultEndpoint
from .DtnCgrNeighborManager import DtnCgrNeighborManager

[docs]class DtnNode(Simulable): def __init__(self, env, nid, props): super().__init__(env) # Initialize node properties self.nid = nid self.type = env.config['network'].nodes[nid].type self.alias = env.config['network'].nodes[nid].alias self.props = props # Initialize variables self.generators = {} # Map: {generator type: Generator class} self.queues = {} # Map: {neighbor id: DtnNeighborManager} self.neighbors = [] # List of neighbors as specified in config file self.radios = {} # Map: {radio id: Radio class} self.endpoints = {} # Map: {eid: Endpoint class} # Convergence layers. This is a map of map of maps of the following form # {neighbor id: duct id: induct/ouduct: DtnAbstractDuct subclass} self.ducts = defaultdict(lambda: defaultdict(dict)) # Queue to store all bundles that are waiting to be forwarded self.in_queue = DtnQueue(env) # Queue for the limbo self.limbo_queue = DtnQueue(env) # Create variables to store results self.dropped = [] def reset(self): # Reset node elements self.router.reset() self.selector.reset() for _, gen in self.generators.items(): gen.reset() for _, radio in self.radios.items(): radio.reset() @property def available_radios(self): return self.radios
[docs] def initialize(self): """ Initialize the node. Note that his can only be done after the connections have been created. """ self.mobility_model = self.env.mobility_models[self.props.mobility_model] # Initialize bundle generators for this node self.initialize_bundle_generators() # Initialize the band selector self.initialize_outduct_selector() # Initialize radios. Note: This must be done prior to initializing # the ducts since ducts require radios. Ducts are initialized in the # environment. self.initialize_radios() # Now that you have created everything, start the forward and limbo managers self.env.process(self.forward_manager()) self.env.process(self.limbo_manager())
def initialize_bundle_generators(self): # Initialize variables config = self.env.config # Get the list of generators for this node gens = config[self.type].generators # Instantiate generators dynamically based on class name for gen in gens: clazz = getattr(config[gen], 'class') module = importlib.import_module(f'simulator.generators.{clazz}') clazz = getattr(module, clazz) # Create the generator self.generators[gen] = clazz(self.env, self, config[gen]) # Initialize the generator self.generators[gen].initialize() def initialize_router(self): # Initialize variables config = self.env.config # Get type of router for this node router_type = config[self.type].router # Instantiate generators dynamically based on class name clazz = load_class_dynamically('simulator.routers', getattr(config[router_type], 'class')) self.router = clazz(self.env, self) # Initialize this router self.router.initialize() # If this router is not opportunistic, you are done if not self.router.opportunistic: return # If this is an opportunistic router, then a specialized queue # manager is needed. clazz = load_class_dynamically('simulator.nodes', config[router_type].manager) self.queues['opportunistic'] = clazz(self.env, self, config[router_type]) def initialize_outduct_selector(self): # Initialize variables config = self.env.config # Get type of router for this node selector_type = config[self.type].selector # Instantiate generators dynamically based on class name clazz = getattr(config[selector_type], 'class') module = importlib.import_module(f'simulator.selectors.{clazz}') clazz = getattr(module, clazz) self.selector = clazz(self.env, self) # Initialize this router self.selector.initialize() def initialize_radios(self): # Iterate over the radios for radio in self.props.radios: # Create the radio for this duct class_name = getattr(self.config[radio], 'class') clazz = load_class_dynamically('simulator.radios', class_name) # Get properties radio_props = dict(self.config[radio]) # Store the new radio self.radios[radio] = clazz(self.env, self) # Initialize the radio self.radios[radio].initialize(**radio_props) def initialize_neighbors_and_ducts(self): # Iterate over all neighbors for orig, neighbor in self.env.connections.keys(): # If this is not the right origin, continue if orig != self.nid: continue # Store this neighbor self.neighbors.append(neighbor) # Create and store the priority queue for this neighbor self.queues[neighbor] = DtnCgrNeighborManager(self.env, self, neighbor) # Get the neighbor node and the connection between them other = self.env.nodes[neighbor] conn = self.env.connections[self.nid, neighbor] # Iterate over defined ducts and create them for duct_id, duct_name in self.env.config[conn.type].ducts.items(): # Get the properties of this duct props = self.env.config[duct_name] # Initialize variables iduct, oduct = None, None # Create the induct and outduct for class_name in getattr(props, 'class'): # Load class type. Since you can't know if it is an induct or outduct, # try both. try: clazz = load_class_dynamically('simulator.ducts.inducts', class_name) except ModuleNotFoundError: clazz = load_class_dynamically('simulator.ducts.outducts', class_name) # Construct depending on whether it is an induct or outduct if clazz.duct_type == None: raise RuntimeError(f'{clazz} has no duct_type defined. Is it an induct or outduct?') elif clazz.duct_type == 'outduct': oduct = clazz(self.env, duct_id, self, neighbor) elif clazz.duct_type == 'induct': iduct = clazz(self.env, duct_id, other, orig) else: raise RuntimeError('f{clazz} has duct_type = {clazz.duct_type}. Valid options are "induct" and ' '"outduct"') # If either the ouduct or induct are not set, throw error if iduct == None or oduct == None: raise RuntimeError('Could not create duct f{duct_id}') # Store the newly created ducts self.ducts[neighbor][duct_id]['outduct'] = oduct other.ducts[orig][duct_id]['induct'] = iduct # Iterate over defined ducts and initialize them. This is done separately # since you need to have created all ducts to initialize them for ParallelLTP for duct_id, duct_name in self.env.config[conn.type].ducts.items(): # Get the properties of this duct props = self.env.config[duct_name] oduct = self.ducts[neighbor][duct_id]['outduct'] iduct = other.ducts[orig][duct_id]['induct'] # Initialize duct parameters. Initialization must happen after creating the ducts # since they must point to each other. oduct.initialize(iduct, **dict(props)) iduct.initialize(oduct, **dict(props)) # For now, assume that there is no need for an opportunistic queue. # If so, it will be initialized with the router. self.queues['opportunistic'] = None def initialize_endpoints(self): # Add any additional endpoints for eid, ept_class in self.props.endpoints.items(): # Handle special case for default endpoint if eid == 0: self.endpoints[eid] = DtnDefaultEndpoint(self.env, self) self.endpoints[eid].initialize() continue # Find the endpoint type clazz = load_class_dynamically('simulator.endpoints', ept_class) # Store the new endpoint self.endpoints[eid] = clazz(self.env, self) # Initialize endpoint if eid in self.config: self.endpoints[eid].initialize(**dict(self.config[eid])) else: self.endpoints[eid].initialize() def initialize_neighbor_managers(self): for neighbor, mgr in self.queues.items(): if neighbor.lower() == 'opportunistic': continue mgr.initialize()
[docs] def forward_manager(self): """ This agent pulls bundles from the node incoming queue for processing. It ensures that this happens one at a time following the order in which they are added to the queue. Note that both new bundles and bundles awaiting re-routers will be directed to the ``in_queue`` (see ``forward`` vs ``limbo``) """ # Iterate forever looking for new bundles to forward while self.is_alive: # Wait until there is a bundle to process item = yield from self.in_queue.get() # Depack item bundle, first_time = item[0], item[1] # Trigger forwarding mechanism. If you add a delay in the forwarding mechanism, # this delay will be preserved here. To have non-blocking behavior, use # ``self.env.process(self.process_bundle(item[0], first_time=item[1])`` self.process_bundle(bundle, first_time=first_time)
[docs] def process_bundle(self, bundle, first_time=True): """ Process this bundle in the node. This entails: 1) If this node is the destination, you are done 2) Otherwise, route the bundle 3) If no routes are available, drop 4) Otherwise, put the bundle into one or more neighbor queues to await transmission by the corresponding convergence layer :param bundle: The bundle to forward :param first_time: True if this is the first time this node sees this bundle """ # If this bundle has errors, drop immediately if bundle.has_errors: self.drop(bundle, 'error') return # It this bundle exceeds the TTL, drop if self.check_bundle_TTL(bundle): return # Add this node in the list of visited nodes (NOTE: must be done before ``find_routes``) if first_time: bundle.visited.append(self.nid) # Reset the list of excluded contacts if first_time: bundle.excluded = [] # If bundle has reached destination, simply store if bundle.dest == self.nid: self.arrive(bundle); return # Get contacts for this bundle records_to_fwd, cids_to_exclude = self.router.find_routes(bundle, first_time) # If router asks for limbo, send this bundle there if records_to_fwd == 'limbo': self.limbo(bundle, cids_to_exclude) return # If you can neither forward nor re-route, then drop if not records_to_fwd and not cids_to_exclude: self.drop(bundle, 'unroutable') return # If the router has indicated to drop, do it if records_to_fwd == 'drop': self.drop(bundle, 'router_drops') return for record in records_to_fwd: # Log this successful routers event self.disp('{} is routed towards {}', record.bundle, record.contact['dest']) # Get the record to forward. If critical and first time, deepcopy it to_fwd = deepcopy(record) if bundle.critical and first_time else record # Pass the bundle to the appropriate neighbor manager self.store_routed_bundle(to_fwd) # If you have forwarded at least once, you are done if records_to_fwd: return # Trigger re-routers excluding the appropriate contacts self.limbo(bundle, cids_to_exclude)
def store_routed_bundle(self, rt_record): # Get the neighbor node to send to neighbor = rt_record.neighbor # Put in the queue self.queues[neighbor].put(rt_record, rt_record.priority)
[docs] def forward_to_outduct(self, neighbor, rt_record): """ Called whenever a bundle successfully exits a DtnNeighborManager """ # Initialize variables bundle = rt_record.bundle # Get the outduct for this bundle duct = self.selector.select_duct(neighbor, bundle) # If bundle TTL is exceeded, do not forward if self.check_bundle_TTL(bundle): return # Put bundle in the convergence layer duct['outduct'].send(bundle)
[docs] def forward(self, bundle): """ Put a bundle in the queue of bundles to route. Forward should be used the first time that you route a bundle. For bundles that are being re-routed, use ``limbo`` instead .. Tip:: This function never blocks despite the ``yield from`` because the input queue has infinite capacity """ self.env.process(self.do_forward(bundle))
def do_forward(self, bundle): yield from self.in_queue.put((bundle, True))
[docs] def limbo(self, bundle, contact_ids): """ Put a bundle in the queue of bundles to route (this bundle is re-routed and thus this is equivalent to ION's limbo). Add the provided contacts as excluded since you already tried to send this bundle through them .. Tip:: This function never blocks despite the ``yield from`` because the input queue has infinite capacity """ # HACK: If contact_ids is None, try re-routing again if contact_ids is not None: if not isinstance(contact_ids, (list, tuple)): contact_ids = (contact_ids,) bundle.excluded.extend(contact_ids) self.env.process(self.do_limbo(bundle))
def do_limbo(self, bundle): # If you do not have a limbo wait finite, wait for a second here. # Otherwise you will try to re-route the bundle at the same instant # in time, thus creating an infinite loop if self.props.limbo_wait == float('inf'): yield self.env.timeout(1) # Add to the limbo queue yield from self.limbo_queue.put((bundle, False)) def limbo_manager(self): # Initialize variables dt = self.props.limbo_wait # if dt = inf, then you want to pause the limbo until there # is something in it check_empty = dt == float('inf') while self.is_alive: # Wait for a while. Only do this if you specified a rate at # which to pull from the queue if not check_empty: yield self.env.timeout(dt) # Get everything from the queue items = yield from self.limbo_queue.get_all(check_empty=check_empty) # Put all items in the input queue for item in items: yield from self.in_queue.put(item) def check_bundle_TTL(self, bundle): if self.t-bundle.creation_time < bundle.TTL: return False # Drop the bundle self.drop(bundle, f'TTL (t={self.t})') return True def arrive(self, bundle): # If node is not alive, delete if not self.is_alive: self.drop(bundle, 'dead_node') # Mark arrival self.disp('{} arrives at destination', bundle) bundle.arrived = True bundle.arrival_time = self.t bundle.latency = bundle.arrival_time - bundle.creation_time # Dispatch bundle to the appropriate endpoint depending on the bundle EID self.endpoints[bundle.eid].put(bundle) def drop(self, bundle, drop_reason): self.disp('{} is dropped at node {}', bundle, self.nid) bundle.dropped = True bundle.drop_reason = drop_reason self.dropped.append(bundle) def radio_error(self, message): self.disp('Error in radio') def __str__(self): return '<DtnNode {}>'.format(self.nid)