Source code for simulator.connections.DtnScheduledBroadcastConnection

"""
# ==================================================================================
# Author: Marc Sanchez Net
# Date:   03/14/2019
# Copyright (c) 2019, Jet Propulsion Laboratory.
# ==================================================================================
"""

from collections import defaultdict
from copy import deepcopy
import numpy as np
from simulator.connections.DtnAbstractConnection import DtnAbstractConnection, TransmissionError
from uuid import uuid1
from warnings import warn

from simulator.core.DtnSemaphore import DtnSemaphore

# Dictionary with all broadcast connection instances
# {origin: DtnScheduledBroadcastConnection}
_instances = {}

[docs]class DtnScheduledBroadcastConnection(DtnAbstractConnection): """ A scheduled connection that, if in view at the same time, propagates data to one or multiple destinations. This class manages internally all connections from an origin to one or multiple destinations. Therefore, it re-implements __new__ to return an already existing instance if you request an already existing broadcast connection """ def __new__(cls, env, cid, orig, *args): # If you have an instance for this origin already, # return it if orig in _instances: return _instances[orig] # Create new instance instance = super().__new__(cls) # Store new instance _instances[orig] = instance return instance def __init__(self, env, cid, orig, dest, props): # Call parent constructor super(DtnScheduledBroadcastConnection, self).__init__(env, cid, orig, dest, props) # You need a semaphore per potential neighbor self.active = {n: False for n in env.nodes} # Map of messages in transit {dest: set(uuid1, uuid2, ...)} self.in_transit = defaultdict(set) # Flags to only initialize once self.initialized = False self.initialized_contacts = False def initialize(self, **kwargs): # If this connection is already running, skip if self.initialized: return # Call parent initializer super(DtnScheduledBroadcastConnection, self).initialize() # Mark as initialized self.initialized = True def initialize_contacts_and_ranges(self): # If this connection is already running, skip if self.initialized_contacts: return # Call parent super(DtnScheduledBroadcastConnection, self).initialize_contacts_and_ranges() # Initialize variables db = self.mobility_model.contacts_df # Get a copy of the contact plan for this contact cp = db.loc[db.orig == self.orig.nid].copy() # If no range intervals are found, invert orig, dest since connection is assumed # to be symmetric if cp.empty: cp = db.loc[db.dest == self.orig.nid] cp.rename({'orig':'dest', 'dest':'orig'}, axis=1, inplace=True) # If no range intervals are available at this point, exit if cp.empty: return # Sort range intervals cp = cp.sort_values(by=['tstart', 'tend']) # Drop rows with no duration self.contact_plan = cp[cp.duration != 0.0] # Create the broadcast intervals self.process_broacast_opportunities() # Mark the connection as initialized self.initialized_contacts = True def process_broacast_opportunities(self): # Initialize variables time = [] contacts = [] # Transform for fast processing df = self.contact_plan.copy().to_dict(orient='list') df['index'] = self.contact_plan.index.tolist() # Create list of with the cids active at any point in time while not np.isnan(df['tstart']).all() or not np.isnan(df['tend']).all(): # Get the index for the next contact start. Catch the case where # you are just processing contact ends try: ts_idx = np.nanargmin(df['tstart']) except ValueError: ts_idx = np.nan # Get the index for the next contact end te_idx = np.nanargmin(df['tend']) # Grab the list of current contacts in view inview = set() if len(contacts) == 0 else deepcopy(contacts[-1]) # Next event is contact end if np.isnan(ts_idx) or df['tstart'][ts_idx] > df['tend'][te_idx]: # Next event is a contact end time.append(df['tend'][te_idx]) # Remove the contact that ends contacts.append(inview - {df['index'][te_idx]}) # Set this event to NaN df['tend'][te_idx] = np.nan continue # Record the event time time.append(df['tstart'][ts_idx]) # Add a new entry contacts.append(inview | {df['index'][ts_idx]}) # Set this event to NaN df['tstart'][ts_idx] = np.nan # Initialize variables prev_t = -1 to_remove = np.zeros(len(time), dtype=bool) # Figure out where duplicates are located for i, t in enumerate(time): if t > prev_t: prev_t = t continue to_remove[i - 1] = True # Eliminate duplicates self.time = np.array(time)[~to_remove] self.contacts = np.array(contacts)[~to_remove] # If empty, issue warnings if len(self.time) != len(self.contacts): warn(f'Broadcast connection for {self.orig.nid} is wrong') if len(self.time) == 0: warn(f'Broadcast connection for {self.orig.nid} is empty') def run(self): # If you do not have a destination, return if len(self.time) == 0: yield self.env.exit() # If no contact information is available, return if len(self.contacts) == 0: yield self.env.exit() # Iterate over list of active contacts for t, cts in zip(self.time, self.contacts): # Wait until the next event yield self.env.timeout(max(0.0, t-self.t)) # If at this point in time no one is in view, close if len(cts) == 0: self.current_contacts = {} self.current_dests = {} self.in_transit = defaultdict(set) self.close_connection() continue # Store current contacts self.current_contacts = cts self.current_dests = {self.contact_plan.dest[cid] for cid in cts} # Eliminate all messages that were still in transit. This happens because # a message might not be completely delivered before the state of the # connection changes. See ``self.tx_to_neighbor`` ended = set(self.in_transit.keys()) - self.current_dests for dest in ended: self.in_transit[dest] = set() # Open the connection self.open_connection() def set_contact_properties(self): # Initialize variables cp = self.contact_plan # Save connection properties self.prop_delay = {cp.dest[cid]: cp.range[cid] for cid in self.current_contacts} def do_transmit(self, peer_duct, message, BER, direction): # Hack to transform this function to a generator yield self.env.timeout(0) # Initialize variables valid_ducts = [] # If the peer duct's parent is not the list of current destinations, then this message # is effectively lost since all routers will discard it. if peer_duct.parent.nid not in self.current_dests: self.lost.append(message) # Find all ducts where this message should be delivered for dest in self.current_dests: # Get the ducts for this destination towards this node ducts = self.env.nodes[dest].ducts[self.orig.nid] # If more than one duct, throw error. This is not allowed because you # don't have a criteria to choose between them if len(ducts) > 1: raise RuntimeError('Only one duct allowed in DtnScheduledBroadcastConnection') # Get the duct id duct_id = list(ducts.keys())[0] # Get another peer duct peer_duct2 = ducts[duct_id]['induct'] if direction == 'fwd' else ducts[duct_id]['outduct'] # Add duct to list of valid ducts valid_ducts.append(peer_duct2) # Log start of transmission self.disp('{} starts being propagated', message) # Monitor the start of transmission self.monitor_tx_start(message) # Create a new UUID for this message m_uuid = uuid1() # Put the messages in transit for duct in valid_ducts: self.in_transit[duct.parent.nid].add(m_uuid) self.env.process(self.tx_to_neighbor(m_uuid, message, duct, BER, direction)) def tx_to_neighbor(self, m_uuid, message, duct, BER, direction): # The duct's parent is the destination of this connection. # (duct.neighbor == self.orig) dest = duct.parent.nid # Do the actual transmission (This is a blocking call) try: yield from self.propagate(message, dest=dest) except TransmissionError: pass # If the uuid is not in the in-transit map, this message got lost if m_uuid not in self.in_transit[dest]: self.lost.append(message) return # Monitor end of transmission self.monitor_tx_end(message) # Remove the record for this message self.in_transit[dest].remove(m_uuid) # Create copy message = deepcopy(message) # Decide if message has error MER = (1 - (1 - BER) ** message.num_bits) if MER > 0: message.has_errors = (np.random.random() < MER) # Put the message in the destination node # Note: This is a non-blocking call since que in_queue # of a duct has infinite capacity if direction == 'fwd': duct.send(message) elif direction == 'ack': duct.ack(message) else: raise ValueError('Direction can only be "fwd" or "ack"') def transmission_error(self, message): err = '\n****** Cannot send message while connection is closed ******\n' err += 't={:.3f}:\tPropagation delay for connection ({}, {}) is None\n'.format(self.t, self.orig, self.dest) err += '\n' + repr(message) + '\n' err += 'Range interval table:\n' err += str(self.contact_plan) raise TransmissionError(err)