Source code for simulator.ducts.inducts.DtnInductMBLTP

from collections import defaultdict
from copy import deepcopy
import numpy as np
import pandas as pd
from simulator.core.DtnPriorityQueue import DtnPriorityQueue
from simulator.core.DtnSegments import LtpDataSegment, LtpReportSegment
from simulator.ducts.DtnAbstractDuctMBLTP import DtnAbstractDuctMBLTP
from simulator.utils.math_utils import union_intervals

[docs]class DtnInductMBLTP(DtnAbstractDuctMBLTP): duct_type = 'induct' def __init__(self, env, name, parent, neighbor): super(DtnInductMBLTP, self).__init__(env, name, parent, neighbor) # Counter for the report segment ids, one per LTP session self.report_counter = {} # Dictionary with report segments pending acknowledgement, one per LTP session # {session_id: rs.id: rs} self.pending_ack = defaultdict(dict) # Indicates the maximum session_id for which a session has been opened. It is # updated every time an LTP session is initialized self.last_sid = -1 # UNCOMMENT FOR TESTING #self.counter = 0 # Counts the num. of blocks delivered #self.delivered = {} # Records bundles delivered @property def stored(self): df = pd.concat({b: self.radio[b].stored for b in self.bands}) df['where'] = 'radio' d = {} for sid, q in self.ltp_queues.items(): dff = q.stored if dff.empty: continue dff['where'] = 'LTP session {}'.format(sid) d[sid] = dff return pd.concat(pd.concat(d), df) if d else df def initialize(self, peer, report_timer=1e10, **kwargs): # The timer that triggers re-tx of a report segment if you do not hear from peer self.report_timer = float(report_timer) # Call parent initialization super(DtnInductMBLTP, self).initialize(peer, **kwargs) def run(self): while self.is_alive: # Wait until you have received a segment segment = yield from self.in_queue.get() # Get the session id for this segment sid = segment.session_id # If a session is not already running and you have never opened a # session with this id, do it. The validity of this piece of code # hinges on the session ids being sequential and in increasing order # as defined in ``DtnAbstractDuctLTP`` if not self.is_session(sid) and sid > self.last_sid: self.initialize_ltp_session(sid) # If there is no session opened for this segment, then this segment # is for a session that has already ended. Just discard it. if not self.is_session(sid): continue # Direct the segment to the appropriate ltp_receive process # This is either a DS or RA, so no need to put it expedited yield from self.ltp_queues[sid].put(segment, 1) def initialize_ltp_session(self, session_id): # Create a new queue self.ltp_queues[session_id] = DtnPriorityQueue(self.env) # Initialize the report counter self.report_counter[session_id] = 0 # Increase the last_sid counter self.last_sid = max(self.last_sid, session_id) # Start the process for managing this LTP session # Note: This is a non-blocking call since you can have multiple LTP sessions # running at the same time self.env.process(self.run_ltp_session(session_id)) def finalize_ltp_session(self, session_id): # Delete global variables for this session self.report_counter.pop(session_id) self.pending_ack.pop(session_id) self.ltp_queues.pop(session_id)
[docs] def run_ltp_session(self, session_id): """ Wait for segments to reconstruct a block. If a checkpoint is created, respond with a Report Segment """ # Initialize variables first_checkpt = True to_receive = -1 received = set() rx_checkpoints = set() success = False # Ensures you only deliver the blocks once. rs = LtpReportSegment(session_id) last_rs = None # Define a function to assess whether you have succeded in transmitting # this block. Note that this function has to belong to each ``run_ltp_session`` # because it will use its variables def has_succeeded(success): # If you succeeded in the past, then you have not succeeded now # This ensures that you only deliver blocks once if success: return False # If you do not know how much data is in this block, you cannot # have succeeded as you have not seen the first checkpoint yet. if to_receive == -1: return False # You need to have only 1 record in received due to union_intervals if len(received) != 1: return False # Make sure that the total data volume in the block is the expected return sum(c[1] for c in received) >= to_receive # Run until all bits in this block have been acknowledged while self.is_alive: # Wait until you have received a segment segment = yield from self.ltp_queues[session_id].get() # If this segment has errors, discard it, you cannot understand its contents if segment.has_errors: continue # If this is a Cancel Segment report, then exit if segment.type == 'CS': break # If this is a report acknowledgement segment, process it separately if segment.type == 'RA': self.process_report_acknowledgement(session_id, segment) # If you have not received acknowledgement from all report segments, wait if self.pending_ack[session_id]: continue # If in the last RS you did not acknowledge all the data volume # in this block, you cannot exit still. Otherwise you will leave # the peer induct lingering forever. dv_last_rs = sum(c[1] for c in last_rs.claims) if dv_last_rs != to_receive: continue # At this point you are ready to exit this LTP session break # If you have succeed, skip these to reduce computational burden if not success: # Add a reception claim. NOTE: This is different from rfc 5350!!! The offset of # the claim is relative to the start of the block, whereas in the spec it is # relative to RS' lower bound rs.claims.add((segment.offset, segment.length)) # Compute the received data up until now received = self.compute_received_data(rs, received) # If you have received all data in the block, you are ready to exit. However, this # cannot happen before the first checkpoint because you do not know how long the # block is. if has_succeeded(success): self.deliver_block(session_id) success = True # If this segment is not a checkpoint, you are done if not segment.is_checkpoint: continue # If you have already seen this checkpoint, continue. This segment could be a # duplicate checkpoint sent by a timer or the second copy through the other band. if segment.checkpoint in rx_checkpoints: continue # At this point you know that this is a checkpoint and it is not a duplicate. # Therefore, add it to set of seen checkpoints rx_checkpoints.add(segment.checkpoint) # If this is the first time you see a checkpoint, it will tell you the block # size. This is only true because of the deferred-ack mode if first_checkpt: to_receive = segment.offset + segment.length first_checkpt = False # This segment is a checkpoint, you must issue a Report Segment in response rs.checkpoint = segment.checkpoint rs.lower_bnd = 0 rs.upper_bnd = to_receive rs.id = self.new_report_id(session_id) rs.claims = deepcopy(received) # A copy is necessary! # Enqueue the report to be sent self.send_through_all(rs) # Mark this report segment as pending acknowledgement self.pending_ack[session_id][rs.id] = rs # Start the timer for the report segment self.env.process(self.start_report_timer(session_id, rs.id)) # If this is the first checkpoint and you have succeeded, then deliver the # block because you haven't done so previously. In any other case, you have # already delivered the block if has_succeeded(success): self.deliver_block(session_id) success = True # Reset the Report Segment last_rs = rs rs = LtpReportSegment(session_id) # Tear down this LTP session self.finalize_ltp_session(session_id)
#print(self.ltp_sessions) def new_report_id(self, session_id): # Update the report counter self.report_counter[session_id] += 1 # Return the current value return self.report_counter[session_id] def compute_received_data(self, rs, received): # Append the new claims to the already received data received.update(rs.claims) # Prepare claims offset, length = zip(*received) # Transform to numpy arrays start = np.array(offset) end = start + np.array(length) # Compute the data intervals that these claims acknowledge start, end = union_intervals(start, end, stacked=False) # Return the new set of received data return set(zip(start, end-start)) def deliver_block(self, session_id): # If this block is not present in the peer, it was already delivered # This can happen because of the early delivery mechanism of this duct if session_id not in self.peer.block: return # Get actual block from peer outduct. This does not actually happen, # it is just a shortcut for the simulation. Also, you can do this # because all connections have propagation delays > 1 second and therefore # it is impossible that you have already received the corresponding report # acknowledgment block = self.peer.block[session_id] # UNCOMMENT FOR TESTING #self.counter += 1 #print(self.t, self.counter, 'block delivered for session', session_id) # Deliver bundles to the DTN node for bundle in block: # UNCOMMENT FOR TESTING #if bundle.mid in self.delivered: # t2, t1 = self.t, self.delivered[bundle.mid] # dt = t2-t1 # print(f'{t1} : Bundle {bundle.mid} already delivered at {t2} - dt={dt}') #else: self.delivered[bundle.mid] = self.t self.parent.forward(bundle) def process_report_acknowledgement(self, session_id, segment): # If this report ack does not point to a report in pending, it was # previously eliminated by another report ack. Skip if segment.report_id not in self.pending_ack[session_id]: return # Mark this report segment as no longer pending acknowledgment del self.pending_ack[session_id][segment.report_id] def start_report_timer(self, session_id, rid): # Wait until timer expires yield self.env.timeout(self.report_timer) # If this session no longer exists, return if not self.is_session(session_id): return # If this report segment has already been acknowledged, return if rid not in self.pending_ack[session_id]: return # Get the missing report rs = self.pending_ack[session_id][rid] # Reset the ``has_errors`` flag rs.has_errors = False # Re-send report segment self.send_through_all(rs) # Start the timer for the report segment self.env.process(self.start_report_timer(session_id, rs.id))
[docs] def send_through_all(self, segment): """ Send a copy of this segment through all the bands """ for i, b in enumerate(self.bands): s = segment if i == 0 else deepcopy(segment) self.radio[b].put(self.neighbor, s, self.peer, self.transmit_mode)
def __str__(self): return "<MBLtpInduct {}-{}>".format(self.parent.nid, self.neighbor)