Source code for simulator.nodes.DtnOverbookeableQueue

from simulator.core.DtnLock import DtnLock
from simulator.core.DtnCore import Simulable
from simulator.core.DtnBundle import critical_priority, bulk_priority
from simulator.core.DtnPriorityQueue import DtnPriorityQueue

[docs]class DtnLockeablePriorityQueue(Simulable): """ Implements the locking mechanism for when a contact is not available """ def __init__(self, env, parent): super().__init__(env) self.monitor = self.env.monitor # Store the node that contains this manager, it is a node self.parent = parent # Create a priority queue for this manager self.queue = DtnPriorityQueue(env) # Create the critical and bulk priority levels self.queue.new_priority_level(critical_priority) # Critical self.queue.new_priority_level(bulk_priority) # Bulk # Create a lock. When closed, no elements can be taken from the queue self.lock = DtnLock(env) # Acquire the key initially, i.e. close the door self.close() self.disp('Gate is closed') # Total number of bits accumulated in the queue self.backlog = 0.0 # If no need to monitor, return if self.monitor == False: return # Initialize variables for monitoring the gate self.open_times = [] self.close_times = [] @property def is_alive(self): return self.parent.is_alive @property def stored(self): return self.queue.stored @property def items(self): return self.queue.items def put(self, rt_record, priority, where='left'): # Log the arrival self.disp('{} with priority {} is put into the manager {}-{}', rt_record.bundle, priority, self.parent.parent.nid, self.parent.neighbor) # Add this bundle data volume to the backlog self.backlog += rt_record.bundle.data_vol # Put in the queue yield from self.queue.put(rt_record, priority, where=where) def get(self): # Wait until there is something to take out rt_record = yield from self.queue.get() # Log the departure self.disp('{} is retrieved from the manager', rt_record.bundle) # Subtract this bundle data volume from the backlog self.backlog -= rt_record.bundle.data_vol return rt_record def close(self): return self.lock.acquire() def open(self): self.lock.release() def monitor_gate_open(self): if self.monitor == False: return self.open_times.append(self.t) def monitor_gate_close(self): if self.monitor == False: return self.close_times.append(self.t)
[docs]class DtnOverbookeableQueue(DtnLockeablePriorityQueue): """ Implements the overbooking mechanism """ def __init__(self, env, parent): super().__init__(env, parent) # Data rate of the output of this queue self.data_rate = None # Capacity counter self._capacity = 0 # Time of next closing self.next_close = None @property def capacity(self): # If the gate is already closed, just return counter if self.next_close == None: return self._capacity # The gate is opened, so capacity is equal to min between # the capacity that is left from the time the previous bundle # arrived, and the capacity left in the current contact return min(self._capacity, self.data_rate * (self.next_close - self.t)) @capacity.setter def capacity(self, value): self._capacity = value def put(self, rt_record, priority, where='left'): # Initialize variables bundle = rt_record.bundle # If there is enough capacity left in this contact, just add the bundle to the queue if self.capacity > bundle.data_vol: yield from self.put_in_queue(rt_record, priority, where=where) return () # If not enough capacity and this bundle is non-critical, then it needs # to be re-routed. Return False to indicate that it is not accepted here if not bundle.critical: return (rt_record,) # Figure out how many non-critical bundles you need to remove to make # room for this bundle. If you cannot make room for this bundle, removed = () removed = yield from self.make_room(bundle) # If success is indicated, put the bundle in if removed: yield from self.put_in_queue(rt_record, priority, where=where) return removed # If you reach this point, you where not able to make room for this bundle. # Therefore, trigger re-routing return (rt_record,) def put_in_queue(self, rt_record, priority, where='left'): # Subtract capacity from the contact self.capacity -= rt_record.bundle.data_vol # If capacity goes to negative, raise error if self.capacity < 0: raise RuntimeError('Capacity for DtnOverbookeableQueue is < 0') # Trigger the put operation yield from super().put(rt_record, priority, where=where) def make_room(self, bundle): # If the bulk priority queue is empty, there is no extra room if not self.queue.items[1]: return () # Get all items in the bulk priority queue blk_items = self.queue.items[1] # Initialize variables room, i, N = 0, 0, len(blk_items) # Iterate over bulk items to see how many do you need while self.is_alive: # Count room by taking one out (NOTE: An item contains a rt_record) room += blk_items[i].data.bundle.data_vol # If this is the last bulk bundle, break if i == N - 1: break # If you already have enough room, break if room >= bundle.data_vol: break # Increment the number of bundles to pop i += 1 # If not enough room is available, fail. if bundle.data_vol > room + self.capacity: return () # Gather the list of bundles to remove (always from the bulk queue) # NOTE 1: Popleft always from bulk queue # NOTE 2: DO NOT use list comprehension with yields from inside! removed = [] for _ in range(i + 1): rt_record = yield from self.queue.popleft(1) removed.append(rt_record) # Add capacity back to the queue for the room you just made self.capacity += sum(record.bundle.data_vol for record in removed) # Return list of bulk bundles to be removed return removed