Source code for simulator.ducts.inducts.DtnInductLTP


from collections import defaultdict
from simulator.ducts.DtnAbstractDuctLTP import DtnAbstractDuctLTP
from simulator.core.DtnSegments import LtpReportSegment
from simulator.core.DtnPriorityQueue import DtnPriorityQueue
import pandas as pd

[docs]class DtnInductLTP(DtnAbstractDuctLTP): """ An LTP induct """ duct_type = 'induct' def __init__(self, env, name, parent, neighbor): # Call parent constructor super(DtnInductLTP, self).__init__(env, name, parent, neighbor) # Counter for the report segment ids, one per LTP session self.report_counter = {} # Dictionary with report segments pending acknowledgement, one per LTP session # {session_id: rs.id: rs} self.pending_ack = defaultdict(dict) # UNCOMMENT FOR TESTING # self.counter = 0 # Counts num of bundles delivered @property def stored(self): df = self.radio.stored d = {} for sid, q in self.ltp_queues.items(): dff = q.stored if dff.empty: continue dff['where'] = 'LTP session {}'.format(sid) d[sid] = dff return pd.concat(pd.concat(d), df) if d else df def initialize(self, peer, report_timer=1e10, **kwargs): # The timer that triggers re-tx of a report segment if you do not hear from peer self.report_timer = float(report_timer) # Call parent initialization super(DtnInductLTP, self).initialize(peer, **kwargs) def run(self): while self.is_alive: # Wait until you have received a segment segment = yield from self.in_queue.get() # Get the session id for this segment sid = segment.session_id # If a session is not already running, create it if not self.is_session(sid): self.initialize_ltp_session(sid) # Direct the segment to the appropriate ltp_receive process # This is either a DS or RA, so no need to put it expedited yield from self.ltp_queues[sid].put(segment, 1) def initialize_ltp_session(self, session_id): # Create a new queue self.ltp_queues[session_id] = DtnPriorityQueue(self.env) # Initialize the report counter self.report_counter[session_id] = 0 # Start the process for managing this LTP session # Note: This is a non-blocking call since you can have multiple LTP sessions # running at the same time self.env.process(self.run_ltp_session(session_id)) def finalize_ltp_session(self, session_id): # Delete global variables for this session self.report_counter.pop(session_id) self.pending_ack.pop(session_id) self.ltp_queues.pop(session_id)
[docs] def run_ltp_session(self, session_id): """ Wait for segments to reconstruct a block. If a checkpoint is created, respond with a Report Segment """ # Initialize variables first_checkpt = True to_receive = 0.0 received = 0.0 rx_checkpoints = set() success = False # If True, you have received all segments for this block rs = LtpReportSegment(session_id) # Run until all bits in this block have been acknowledged while self.is_alive: # Wait until you have received a segment segment = yield from self.ltp_queues[session_id].get() # If this segment has errors, discard it, you cannot understand its contents if segment.has_errors: continue # If this is a Cancel Segment report, then exit if segment.type == 'CS': break # If this is a report acknowledgement segment, process it separately if segment.type == 'RA': self.process_report_acknowledgement(session_id, segment) # If you have not received acknowledgement from all report segments, wait if self.pending_ack[session_id]: continue # If you are not ready to exit because you are missing data, wait if not success: continue # At this point you are ready to exit this LTP session break # If you have succeed already, just keep waiting for a RA. That is all left # in this session. if success: continue # If you have already seen this checkpoint, continue. You need to check # this here because this segment could be a duplicate checkpoint sent # by a timer. if segment in rx_checkpoints: continue # If this segment has an offset that is lower than the current report's lower # bound, use that one. It means that data segments for a given checkpoint have # been received out of order if segment.offset < rs.lower_bnd: rs.lower_bnd = segment.offset # Add a reception claim. NOTE: This is different from rfc 5350!!! The offset of # the claim is relative to the start of the block, whereas in the spec it is # relative to RS' lower bound rs.claims.add((segment.offset, segment.length)) # Add data volume received received += segment.length # If this segment is not a checkpoint, you are done if not segment.is_checkpoint: continue # At this point you know that this is a checkpoint and it is not a duplicate. # Therefore, add it to set of seen checkpoints rx_checkpoints.add(segment) # If this is the first time you see a checkpoint, it will tell you the block # size. This is only true because of the deferred-ack mode if first_checkpt: to_receive = segment.offset + segment.length first_checkpt = False # This segment is a checkpoint, you must issue a Report Segment in response rs.checkpoint = segment.checkpoint rs.upper_bnd = segment.offset + segment.length rs.id = self.new_report_id(session_id) # Enqueue the report to be sent self.radio.put(self.neighbor, rs, self.peer, self.transmit_mode) # Mark this report segment as pending acknowledgement self.pending_ack[session_id][rs.id] = rs # Start the timer for the report segment self.env.process(self.start_report_timer(session_id, rs.id)) # If you have received all data in the block, you are ready to exit success = received >= to_receive # If you are ready to exit, deliver the block to the DTN node. Do not wait # for the last report acknowledgement since that would be a waste of time # in the presence of long RTTs if success: self.deliver_block(session_id) # Reset the Report Segment rs = LtpReportSegment(session_id) # Tear down this LTP session self.finalize_ltp_session(session_id)
def new_report_id(self, session_id): # Update the report counter self.report_counter[session_id] += 1 # Return the current value return self.report_counter[session_id] def deliver_block(self, session_id): # Get actual block from peer outduct. This does not actually happen, # it is just a shortcut for the simulation. Also, you can do this # because all connections have propagation delays > 1 second and therefore # it is impossible that you have already received the corresponding report # acknowledgment block = self.peer.block[session_id] # UNCOMMENT FOR TESTING #self.counter += 1 #print(self.t, self.counter, 'block delivered') # Deliver bundles to the DTN node for bundle in block: self.parent.forward(bundle) def process_report_acknowledgement(self, session_id, segment): # If this report ack does not point to a report in pending, it was # previously eliminated by another report ack. Skip if segment.report_id not in self.pending_ack[session_id]: return # Mark this report segment as no longer pending acknowledgment del self.pending_ack[session_id][segment.report_id] def start_report_timer(self, session_id, rid): # Wait until timer expires yield self.env.timeout(self.report_timer) # If this session no longer exists, return if not self.is_session(session_id): return # If this report segment has already been acknowledged, return if rid not in self.pending_ack[session_id]: return # Get the missing report rs = self.pending_ack[session_id][rid] # Reset the ``has_errors`` flag rs.has_errors = False # Re-send report segment self.radio.put(self.neighbor, rs, self.peer, self.transmit_mode) # Start the timer for the report segment self.env.process(self.start_report_timer(session_id, rs.id)) def __str__(self): return "<LtpInduct {}-{}>".format(self.neighbor, self.parent.nid)