Source code for simulator.ducts.outducts.DtnOutductMBLTP

from copy import deepcopy
import numpy as np
import pandas as pd
from simulator.core.DtnPriorityQueue import DtnPriorityQueue
from simulator.core.DtnSegments import LtpDataSegment
from simulator.core.DtnSegments import LtpReportAcknowledgementSegment
from simulator.ducts.DtnAbstractDuctMBLTP import DtnAbstractDuctMBLTP
from simulator.utils.math_utils import union_intervals, xor_intervals

class DtnOutductMBLTP(DtnAbstractDuctMBLTP):
    """ Multi-band LTP outduct.

    Bundles are aggregated into LTP blocks, each block is handled by its own
    LTP session, and every segment produced by a session is sent through all
    the frequency bands listed in ``self.bands`` (see ``send_through_all``).
    """
    duct_type = 'outduct'

    def __init__(self, env, name, parent, neighbor):
        # Call parent constructor
        super().__init__(env, name, parent, neighbor)

        # Checkpoint counter to create unique checkpoints per block.
        # Keyed by session id
        self.checkpoint_counter = {}

        # Stores current block and checkpoint under transmission.
        # Both are keyed by session id
        self.block = {}
        self.checkpoint = {}

    @property
    def stored(self):
        """ Data frame with everything stored in this duct.

        Concatenates the ``stored`` frame of the radio of every band (tagged
        with ``where='radio'``) with the ``stored`` frame of every non-empty
        LTP session queue (tagged with ``where='LTP session <sid>'``).

        NOTE(review): assumes ``radio[b].stored`` and ``queue.stored`` are
        pandas DataFrames - confirm against their definitions.
        """
        df = pd.concat({b: self.radio[b].stored for b in self.bands})
        df['where'] = 'radio'

        d = {}
        for sid, q in self.ltp_queues.items():
            dff = q.stored
            if dff.empty:
                continue
            dff['where'] = 'LTP session {}'.format(sid)
            d[sid] = dff

        # No LTP session holds anything, only the radios contribute
        if not d:
            return df

        # ``pd.concat`` takes the objects to concatenate as ONE iterable.
        # Its second positional argument is ``axis``, not another frame
        return pd.concat([pd.concat(d), df])

    def initialize(self, peer, bands=None, agg_size_limit=1e9, agg_time_limit=1e9,
                   segment_size=8e6, checkpoint_timer=1e10, **kwargs):
        """ Configure this outduct.

        :param peer: Forwarded to the parent ``initialize``
        :param bands: Forwarded to the parent ``initialize``
        :param agg_size_limit: Block size at which aggregation stops. Compared
                               against the sum of ``bundle.data_vol``
        :param agg_time_limit: Time since the last block at which aggregation stops
        :param segment_size: Length given to every LTP data segment
        :param checkpoint_timer: Time to wait before re-sending a checkpoint
        :param kwargs: Forwarded to the parent ``initialize``
        """
        # Call parent initialization
        super().initialize(peer, bands=bands, **kwargs)

        # The aggregation size and time limits to construct a block
        self.agg_size_limit = float(agg_size_limit)
        self.agg_time_limit = float(agg_time_limit)

        # The LTP segment size
        self.segment_size = float(segment_size)

        # Checkpoint timer. How long to wait until you resend the checkpoint segment
        self.checkpoint_timer = float(checkpoint_timer)

    def run(self):
        """ Creates an LTP block from a set of bundles and sends them.
            This is the same as for a normal LTP outduct

            Bundles are pulled from ``self.in_queue`` and accumulated until
            either the aggregation size limit or the aggregation time limit is
            reached. Then an LTP session is started for the block.
        """
        # Initialize variables
        cur_block_size = 0.0
        last_block_time = 0.0
        cur_block = []

        while self.is_alive:
            # Wait until there is something to transmit
            bundle = yield from self.in_queue.get()

            # Add bundle to block
            cur_block.append(bundle)

            # Compute total size of this block and the time since last block was issued
            cur_block_size += bundle.data_vol  # bits
            delta_t = self.env.now - last_block_time

            # If neither aggregation limit has been reached, keep aggregating
            if cur_block_size < self.agg_size_limit and delta_t < self.agg_time_limit:
                continue

            # Transform block to tuple to make it hashable. Also, from now on it should not
            # change anymore
            cur_block = tuple(cur_block)
            session_id = self.get_session_id(cur_block)

            # Initialize an LTP session to transmit this block
            self.initialize_ltp_session(session_id, cur_block, cur_block_size)

            # Reset block counters. The time reference is the instant this block
            # was issued, so that ``delta_t`` really is the time since the last block
            cur_block_size = 0.0
            last_block_time = self.env.now
            cur_block = []

    def initialize_ltp_session(self, session_id, block, size):
        """ Register a new LTP session and start the process that runs it.

        :param session_id: Identifier of the new session
        :param block: Tuple of bundles transmitted by this session
        :param size: Total size of the block
        """
        # Store the block and create an acknowledgement queue for that session
        self.block[session_id] = block
        self.ltp_queues[session_id] = DtnPriorityQueue(self.env)

        # Initialize the checkpoint counter for this session. All checkpoints
        # must have a unique value
        self.checkpoint_counter[session_id] = 0

        # Start the process for managing this LTP session
        # Note: This is a non-blocking call since you can have multiple LTP sessions
        #       running at the same time
        self.env.process(self.run_ltp_session(session_id, size))

    def finalize_ltp_session(self, session_id):
        """ Remove all the state kept for a session.

        :param session_id: Identifier of the session to tear down
        :return: The block (tuple of bundles) that the session was transmitting
        """
        # Delete global variables for this session
        block = self.block.pop(session_id)
        self.checkpoint.pop(session_id)
        self.checkpoint_counter.pop(session_id)
        self.ltp_queues.pop(session_id)

        # Return the block in case you have to put it to the limbo
        # because this session failed
        return block

    def run_ltp_session(self, session_id, size):
        """ Process that transmits one block and handles its report segments.

        On success the bundles of the block are put in ``self.success_queue``.
        If the session ends otherwise (a ``'CS'`` report arrives or the duct is
        no longer alive) they are put in ``self.to_limbo``.

        :param session_id: Identifier of the session
        :param size: Total size of the block
        """
        # Initialize variables
        acked = 0.0        # Number of bits in this block acknowledged
        reports = set()    # Report segments seen during this LTP session
        success = False    # If True, then LTP succeeded in sending the entire block

        # If this flag is true, then proceed with sending the segments. The first time around
        # this is always true
        do_send = True

        # Create initial list of segment to transmit
        segments, checkpoint = self.get_new_block_segment(session_id, size)

        # Store the current checkpoint
        self.checkpoint[session_id] = checkpoint

        # Run until all bits in this block have been acknowledged
        while self.is_alive:
            # If you have permission to send, go ahead and do it
            if do_send:
                # Start transmitting all segments
                for s in segments:
                    self.send_through_all(s)

                # Start the timer for the checkpoint report receive
                self.env.process(self.start_checkpoint_timer(session_id, checkpoint))

                # Mark do_send as false, to avoid re-sending these segments if the report
                # segment has errors
                do_send = False

            # Wait until you get a report segment. Note that this implementation, waiting
            # for the RS here is representative **only** of the deferred-ack mode
            report = yield from self.ltp_queues[session_id].get()

            # Check if the report received is correct. If it is not, then go back to waiting for
            # report segment but do not re-send the segments (you have already done it)
            if report.has_errors:
                continue

            # If this is a Cancel Segment report, then exit
            if report.type == 'CS':
                break

            # Acknowledge report before checking if you have seen it since this can indicate
            # that the previous report acknowledgement was lost
            self.acknowledge_report(session_id, report)

            # If this report segment was already received, you are done. Otherwise, mark it
            # as already seen
            if report.id in reports:
                continue
            reports.add(report.id)

            # Compute the total data volume acknowledged by this report. Note that you could
            # receive a report that acks less data than another that arrived before
            acked = max(acked, self.process_report(report))

            # If the total volume acknowledged covers the block size, you are done
            if acked >= size:
                success = True
                break

            # Create segments to transmit because of this report
            segments, checkpoint = self.get_missing_block_segments(session_id, report)

            # Update the current checkpoint
            self.checkpoint[session_id] = checkpoint

            # Mark do_send as True since you have new segments to send
            do_send = True

        # Tear down this LTP session
        block = self.finalize_ltp_session(session_id)

        # If you succeeded, you are done
        if success:
            for bundle in block:
                yield from self.success_queue.put(bundle)
            return

        # If this LTP session did not succeed, send bundles to node's limbo
        for bundle in block:
            yield from self.to_limbo.put(bundle)

    def acknowledge_report(self, session_id, report):
        """ Send a report acknowledgement for ``report`` through all bands.

        :param session_id: Identifier of the session the report belongs to
        :param report: Report segment being acknowledged. Only ``report.id`` is used
        """
        # Create the acknowledge report
        segment = LtpReportAcknowledgementSegment(session_id, report.id)

        # Send for transmission
        self.send_through_all(segment)

    def process_report(self, report):
        """ Return the data volume acknowledged by a report.

        :param report: Report segment. ``report.claims`` is iterated and the
                       second element of every claim (its length) is summed
        :return: Sum of the claim lengths
        """
        # Claims have already been run through the union_set_intervals
        # at the induct. Just return the data volume
        return sum(claim[1] for claim in report.claims)

    def get_new_block_segment(self, session_id, size):
        """ Create the data segments for the first transmission of a block.

        Every segment has length ``self.segment_size``, including the last one,
        so the segments may cover more than ``size``.

        :param session_id: Identifier of the session
        :param size: Total size of the block
        :return: Tuple (list of segments, checkpoint segment). The checkpoint
                 is the last segment of the list
        :raises IndexError: If ``size`` yields zero segments
        """
        # Compute number of segments to send block
        N = int(np.ceil(size/self.segment_size))

        # Create new segments
        to_tx = [LtpDataSegment(session_id, i*self.segment_size, self.segment_size)
                 for i in range(N)]

        # Mark the last one as a checkpoint
        to_tx[-1].checkpoint = self.new_checkpoint_id(session_id)

        # Return segments and checkpoint
        return to_tx, to_tx[-1]

    def get_missing_block_segments(self, session_id, report):
        """ Create the data segments to re-transmit in response to a report.

        :param session_id: Identifier of the session
        :param report: Report segment. Its ``claims`` (offset, length pairs),
                       ``lower_bnd``, ``upper_bnd`` and ``id`` are used
        :return: Tuple (list of segments, checkpoint segment). The checkpoint
                 is the last segment of the list
        :raises ValueError: If ``report.claims`` is empty (nothing to unpack)
        :raises IndexError: If no missing interval is found
        """
        # Compute claims intervals
        offset, length = zip(*report.claims)

        # Compute start and end of intervals
        start = np.array(offset)
        end = start + np.array(length)

        # Compute which parts of the block are missing
        missing = xor_intervals(report.lower_bnd, report.upper_bnd, start, end,
                                do_union=False, sort=True)

        # Initialize variables
        to_tx = []

        # Create new segments
        for int_s, int_e in missing:
            # Calculate the number of data segments to generate for this piece of
            # missing data. Round up, as ``get_new_block_segment`` does, so that a
            # piece that is not a multiple of the segment size is fully covered
            N = int(np.ceil((int_e-int_s)/self.segment_size))

            # Create the data segments and save them
            to_tx.extend(LtpDataSegment(session_id, int_s + i*self.segment_size,
                                        self.segment_size, report=report.id)
                         for i in range(N))

        # Mark the last one as a checkpoint
        to_tx[-1].checkpoint = self.new_checkpoint_id(session_id)

        # Return segments and checkpoint
        return to_tx, to_tx[-1]

    def new_checkpoint_id(self, session_id):
        """ Increment and return the checkpoint counter of a session.

        :param session_id: Identifier of the session
        :return: New value of the counter
        """
        # Update the checkpoint counter
        self.checkpoint_counter[session_id] += 1
        return self.checkpoint_counter[session_id]

    def start_checkpoint_timer(self, session_id, old_checkpoint):
        """ Process that re-sends a checkpoint if it is still current when
        ``self.checkpoint_timer`` expires.

        :param session_id: Identifier of the session
        :param old_checkpoint: Checkpoint segment that triggered this timer
        """
        # Wait until timer expires
        yield self.env.timeout(self.checkpoint_timer)

        # If this session id is no longer present, then this LTP session has
        # already ended. Just return
        if not self.is_session(session_id):
            return

        # Get the current checkpoint being processed
        cur_checkpoint = self.checkpoint[session_id]

        # If the checkpoint that triggered the timer is not the current
        # checkpoint, then this timer is not needed
        if cur_checkpoint.checkpoint != old_checkpoint.checkpoint:
            return

        # Reset the ``has_errors`` flag
        old_checkpoint.has_errors = False

        # If the checkpoint that triggered the timer is still the current
        # checkpoint, then re-transmit the checkpoint segment (i.e. the last one)
        self.send_through_all(old_checkpoint)

        # Start the timer for the checkpoint report receive
        self.env.process(self.start_checkpoint_timer(session_id, old_checkpoint))

    def do_ack(self, segment):
        """ Re-implement to enable reception of Report Segments

            :param segment: Report segment received. Its ``session_id`` selects
                            the LTP queue it is delivered to
        """
        self.disp('{} delivered to {} through ACK', segment, self.__class__.__name__)

        # Get session id
        sid = segment.session_id

        # If there is no ltp_queue for this message's session, then the outduct
        # thinks that this transmission was already successful. Therefore, just
        # send a report acknowledgement automatically.
        # This can happen if the RA for the last RS is lost, and the LTP induct
        # re-sends the last RS after a long enough timeout.
        if sid not in self.ltp_queues:
            self.acknowledge_report(sid, segment)
            return

        # Process the message normally by adding to the LTP queue of acknowledgements
        # Since this is a normal report segment, use priority 1.
        yield from self.ltp_queues[sid].put(segment, 1)

    def send_through_all(self, segment):
        """ Send a copy of this segment through all the frequency bands available

            .. Warning:: The first band gets ``segment`` itself. A new deep copy
                         of the segment is created for every other band.

            :param segment: Segment to transmit
        """
        for i, b in enumerate(self.bands):
            s = segment if i == 0 else deepcopy(segment)
            self.radio[b].put(self.neighbor, s, self.peer, self.transmit_mode)

    def __str__(self):
        return "<MBLtpOutduct {}-{}>".format(self.parent.nid, self.neighbor)