Source code for simulator.core.DtnPriorityQueue
# -*- coding: utf-8 -*-
from collections import defaultdict, deque
from heapq import heappush
import pandas as pd
import simpy
from simulator.core.DtnCore import LoadMonitor, Simulable, TimeCounter
[docs]class DtnPriorityQueue(Simulable):
""" New FIFO queue with priority (least is more priority) and (if needed) max capacity. To use it:
1) Create the queue: ``q = DtnPriorityQueue(env, capacity=10)``
2) Put a new element: ``yield from q.put(item, priority)`` or ``yield env.process(q.put(item, priority))``
3) Get an element: ``yield from q.get(item)`` or ``yield env.process(q.get(item))``
.. Tip:: Starting in Python 3.3, you can use the ``yield from`` construct. Prior to that,
you must use the ``yield env.process(...)`` form
.. Tip:: The specified capacity is for the sum over all packets regardless of their
priority.
.. Tip:: You can also explicitly way for the non-empty event. This is useful because it
allows you to pull an element from the queue if multiple conditions occur. Consider
the following code:
.. code-block:: python
:linenos:
# Wait until the queue is not empty or a timeout of 2 seconds expires, whichever
# happens first
yield queue.is_empty() | env.timeout(2)
# If the queue is still empty, you are done
if not queue: return
# If queue is not empty, get data
data = yield from queue.get()
.. Danger:: When putting/getting an element in the queue, if ``yield from`` is not used, then nothing
will happen, even if the capacity for the queue is set to infinity. If that is the case,
the ``put`` method will never block, but you still need to ``yield from`` it.
"""
def __init__(self, env, capacity=float('inf')):
# Call parent constructor
super().__init__(env)
# Store items in deques. They key to the dictionary is the priority level
self.items = {}
# List of priority levels (least is better). It always stays sorted by using
# a heappush operation when a new priority level is registered
self.priorities = []
# Monitor for the number of elements in the queue. If no elements are
# present in the queue, it will stop the get method.
self.stop = simpy.Container(env, init=0, capacity=capacity)
def __len__(self):
""" Returns the total number of elements in this queue across
all priorities
"""
return sum(len(q) for q in self.items.values())
def __bool__(self):
""" Returns true if at least there is one element in any priority level """
return any(self.items.values())
@property
def stored(self):
d = {p: pd.DataFrame([b.to_dict() for b in self.items[p]]) for p in self.priorities}
df = pd.DataFrame() if not d else pd.concat(d.values())
return df
def put(self, item, priority, where='left'):
# Count the new addition. If there is not enough capacity, this will block
yield self.stop.put(1)
# If this priority level is not known, add create new queue
if priority not in self.items: self.new_priority_level(priority)
# Add the element to the appropriate queue
self.add_to_queue(item, priority, where)
def get(self, where='right', check_empty=True):
# Wait until there is at least one element in the queue. Only do it if the queue is
# empty. This allows the calling function to either ``data = yield from queue.get()``
# or to (1) ``yield queue.is_empty(); data = yield from queue.get()``
if check_empty: yield self.is_empty()
# Iterate over the queues in priority order. Recall that self.priorities
# is always sorted
for priority in self.priorities:
# If this priority level is empty, continue
if not any(self.items[priority]): continue
# Get the next element in this priority level
return self.get_from_queue(priority, where=where)
def new_priority_level(self, priority):
# Register the new priority level
heappush(self.priorities, priority)
# Create a new queue for this priority level
self.items[priority] = deque()
[docs] def popleft(self, priority, check_empty=True):
""" Pop from the beginning of the queue.
NOTE: This pops from the left, ``get`` pops from the right
"""
# Wait until there is at least one element in the queue
if check_empty: yield self.is_empty()
# Return the item in this priority level
return self.items[priority].popleft()
def is_empty(self):
return self.stop.get(1)
def add_to_queue(self, item, priority, where):
if where == 'left': self.items[priority].appendleft(item)
elif where == 'right': self.items[priority].append(item)
else: raise RuntimeError('"where" can only be "left" or "right"')
def get_from_queue(self, priority, where):
if where == 'left': return self.items[priority].popleft()
if where == 'right': return self.items[priority].pop()
raise RuntimeError('"where" can only be "left" or "right"')