Source code for simulator.ducts.DtnAbstractDuct
import abc
from simulator.core.DtnCore import Simulable
from simulator.core.DtnQueue import DtnQueue
class DtnAbstractDuct(Simulable, metaclass=abc.ABCMeta):
    """ Abstract base class for a duct. It manages three queues:

        1) in_queue:      Messages handed to the duct through ``send`` wait
                          here.
        2) to_limbo:      Bundles that the convergence layer failed to send.
                          The ``fail_manager`` process of this class takes
                          them and puts them in the node's limbo for
                          re-routing.
        3) success_queue: Messages reported through ``notify_success``. The
                          default ``success_manager`` simply drains it.

        Subclasses must implement ``total_datarate``, ``radios``, ``run`` and
        ``radio_error``.
    """
    # Compared against 'outduct' in ``transmit_mode``; left unset here
    duct_type = None

    def __init__(self, env, name, parent, neighbor):
        """ Store the duct identity and create its three queues

            :param env: Simulation environment, forwarded to ``Simulable``
            :param name: Base name, see network architecture file (xml)
            :param parent: The DtnNode that owns this duct
            :param neighbor: A string with the name of the neighbor
        """
        super().__init__(env)

        # Identity of this duct
        self.base_name = name
        self.parent = parent
        self.neighbor = neighbor

        # Shortcut to the monitor exposed by the environment
        self.monitor = self.env.monitor

        # Connection through which data is sent (currently disabled)
        #self.conn = env.connections[parent.nid, neighbor]

        # Queue feeding the convergence layer. DTN does not control it and
        # therefore it is assumed to be plain FIFO
        self.in_queue = DtnQueue(env)

        # Messages the duct could not send; they end up in the node's limbo
        self.to_limbo = DtnQueue(self.env)

        # Messages the duct sent successfully
        self.success_queue = DtnQueue(self.env)

    def initialize(self, peer, **kwargs):
        """ Record the peer duct and start the duct's processes

            :param peer: Peer duct (for an outduct it is an induct and
                         vice versa)
            :param kwargs: Accepted but not used by this base implementation
        """
        self.peer = peer

        # The start order is kept on purpose: main process of the convergence
        # layer first, then the manager that returns failed bundles for
        # re-routing, and finally the success manager
        self.env.process(self.run())
        self.env.process(self.fail_manager())
        self.env.process(self.success_manager())

    @property
    def is_alive(self):
        """ Liveness of the duct, delegated to the parent """
        return self.parent.is_alive

    @abc.abstractmethod
    def total_datarate(self, dest):
        """ Must be implemented by subclasses.

            NOTE(review): presumably returns the aggregate data rate towards
            ``dest`` - confirm against the concrete ducts.
        """

    @property
    def transmit_mode(self):
        """ 'fwd' when ``duct_type`` is 'outduct', 'ack' in any other case """
        if self.duct_type == 'outduct':
            return 'fwd'
        return 'ack'

    @property
    def name(self):
        """ Readable label: class name, base name and the parent-neighbor pair """
        label_parts = (self.__class__.__name__, self.base_name,
                       self.parent.nid, self.neighbor)
        return '{} {} ({}-{})'.format(*label_parts)

    @property
    def stored(self):
        """ The ``stored`` value reported by the input queue """
        return self.in_queue.stored

    @property
    @abc.abstractmethod
    def radios(self):
        """ Returns a dictionary with the radios for this duct """

    def send(self, message):
        """ Pass a message to the duct for transmission

            This is a non-blocking call: the actual enqueueing is done by
            ``do_send``, which is scheduled as a separate process.
        """
        self.env.process(self.do_send(message))

    def do_send(self, message):
        """ Process that logs the hand-over and places ``message`` in ``in_queue`` """
        duct_class = self.__class__.__name__
        self.disp('{} delivered to {}', message, duct_class)

        # Add to the queue
        yield from self.in_queue.put(message)

    @abc.abstractmethod
    def run(self):
        """ Main process of the duct, to be provided by subclasses.

            ``initialize`` hands the result of calling this method to
            ``env.process``.
        """

    def ack(self, message):
        """ This is called by a DtnConnection if you transmit something and specify
            that the direction equals "ack". It is more general than LTP: it is a
            generic mechanism to communicate "backwards" between two ducts (from rx's
            induct to tx's outduct) without going through the in_queue (that takes
            messages from the upper layer)
        """
        self.env.process(self.do_ack(message))

    def do_ack(self, message):
        """ A duct, by default, should not be able to perform an ack. See DtnOutductLTP
            and DtnInductLTP for an example of how to re-implement it

            :raises RuntimeError: Always, unless a subclass overrides this method
        """
        # Fake yield so that this function is a coroutine
        yield self.env.timeout(0)

        # Reaching this point means the subclass did not re-implement the ack
        raise RuntimeError('You cannot ACK in this convergence layer')

    def notify_success(self, message):
        """ Non-blocking report that ``message`` was sent successfully """
        self.env.process(self.do_notify_success(message))

    def do_notify_success(self, message):
        """ Process that places ``message`` in the success queue """
        yield from self.success_queue.put(message)

    def success_manager(self):
        """ Process that drains the success queue while the duct is alive

            The default implementation discards what it gets. It is just here
            to prevent the queue from filling indefinitely.
        """
        while self.is_alive:
            # Wait for a block that was successfully transmitted
            yield from self.success_queue.get()

    @abc.abstractmethod
    def radio_error(self, message):
        """ This function signals an LTP session that it needs to terminate because an
            error has occurred in the radio. The ``fail_manager`` will then be
            responsible for putting the corresponding bundles in the node's limbo.
        """

    def fail_manager(self):
        """ Process that moves failed bundles from ``to_limbo`` to the node's limbo """
        while self.is_alive:
            # Wait for a block that was not successfully transmitted
            # by an LTP session
            failed_bundle = yield from self.to_limbo.get()

            # Get the cid that needs to be excluded. This is not very neat, but
            # essentially reaches into the current DtnNeighborManager for this
            # neighbor and pulls the contact id
            neighbor_manager = self.parent.queues[self.neighbor]
            excluded_cid = neighbor_manager.current_cid

            # Send to the node's limbo for re-routing
            self.parent.limbo(failed_bundle, excluded_cid)