import numpy as np
import pandas as pd
from simulator.utils.DtnIO import load_route_schedule_file
from simulator.routers import build_route_list
from simulator.routers.DtnAbstractRouter import DtnAbstractRouter, RtRecord
[docs]class DtnLookupRouter(DtnAbstractRouter):
""" Class that implements a router that looks up the route to use based on a
pre-computed route schedule. This route schedule is provided as an excel
file typically and can be computed using different methods (BFS, CGR+anchoring,
CGR+Yen K, etc.)
"""
_contacts_df = None # Dataframe
_ranges_df = None # Dataframe
_all_routes = None # Dictionary
_all_contacts = None # Dictionary indexed by contact id
_all_ranges = None # Dictionary indexed by contact id
def __init__(self, env, parent):
super().__init__(env, parent)
# Initialize variables
indir = self.config['globals'].indir
self.routes_file = indir / self.props.routes
self.max_crit = self.props.max_crit
def reset(self):
# Reset static variables
self.__class__._contacts_df = None
self.__class__._ranges_df = None
self.__class__._all_routes = None
self.__class__._all_contacts = None
self.__class__._all_ranges = None
def initialize(self):
# Get list of relays
self.relays = set(self.env.nodes.keys()) if self.props.relays == 'all' else self.props.relays
self.non_relays = set(self.env.nodes.keys()) - self.relays
# Get the mobility model for this router
self.mobility_model = self.env.mobility_models[self.parent.props.mobility_model]
# Initialize contacts and ranges
self.initialize_contacts_and_ranges()
# Initialize routes
self.initialize_routes()
def initialize_contacts_and_ranges(self):
# Working on static properties of the class, do it once only
if self.__class__._all_contacts is not None: return
# Get the contacts and ranges from the contact
cp = self.mobility_model.contacts_df
ri = self.mobility_model.ranges_df
# Check that the contact plan was properly imported
assert cp is not None, 'Contact plan is None'
assert ri is not None, 'Range intervals is None'
# Store contacts and ranges df
self.__class__._contacts_df = cp
self.__class__._ranges_df = ri
# Depack dataframe to a dictionary to improve performance.
self.__class__._all_contacts = cp.to_dict(orient='index')
self.__class__._all_ranges = ri.to_dict(orient='index')
def initialize_routes(self):
# Working on static properties of the class, do it once only
if self.__class__._all_routes is not None: return
# If routes file is provided, and re-computation is not forced, just load the file
if self.props.routes != None and self.props.recompute_routes == False:
print('Loading route schedule')
routes = load_route_schedule_file(self.routes_file, self.epoch)
else:
routes = self.build_route_schedule(self.env.do_track)
# Validate the route schedule
routes = self.validate_route_list(routes)
# Save route schedule if necessary
if self.props.recompute_routes:
path = self.config['globals'].indir / self.props.routes
routes.to_excel(path)
# If mode is slow, run translation mechanism
routes = self.prepare_route_list(routes)
# Store the routes
self.__class__._all_routes = routes
[docs] def find_routes(self, bundle, first_time, **kwargs):
""" Find a route for a bundle. This is comprised of four steps:
1) Building a route list
2) Down-selecting the route list to find the best option for each neighbor
3) Selecting the neighbor(s) depending on criticality
4) Try the route to make sure that you will not miss any future contacts given the
current time and backlog for this neighbor. If so, try a re-routers event
"""
# Increase counter
self.counter += 1
# Step 1) Get route list
try:
options = self.build_route_list(bundle)
except Exception as e:
raise type(e)('t={}: Error creating route list for bundle {}.'.format(self.t, bundle))
# If no options found, then tell the DtnNode forward function to drop this bundle
# In reality, in ION you would probably check for alternate routing procedures
if not options: return None, None
# Step 2) Find proximate node list in the form of CgrRecord list
try:
rt_records = self.find_prox_node_list(bundle, options)
except Exception as e:
raise type(e)('t={}: Error creating proximate node list for bundle {}.'.format(self.t, bundle))
# The proximate node list is empty. Return ([],[]), which will mark this bundle as dropped
if not rt_records: return [], []
# Step 3) Find next hop based on proximate node list
if bundle.critical == True and first_time == True:
next = self.select_neighbor_critical_bundle(rt_records)
else:
next = self.select_neighbor_standard_bundle(rt_records)
# Step 4) Try the selected routes to see if they actually works.
# NOTE: returns empty lists
try:
to_keep, cids_to_exclude = self.try_route_list(next)
except Exception as e:
raise type(e)('t={}: Error trying routes for bundle {}.'.format(self.t, bundle))
# Return routes
return to_keep, cids_to_exclude
def build_route_list(self, bundle):
# If the route schedule is available, use it
if self.__class__._all_routes:
try: return self.__class__._all_routes[self.parent.nid, bundle.dest]
except KeyError: return
# If the route schedule is not available, trigger cgr
routes = build_route_list(self.parent.nid, bundle.dest, self.t, self._contacts_df, self._ranges_df,
relays=self.relays, max_speed=self.props.max_speed,
verbose=False, ncpu=1, algorithm=self.props.algorithm,
mode=self.props.mode)
# Validate the route list generated
routes = self.validate_route_list(routes)
# If mode is slow, run translation mechanism
routes = self.prepare_route_list(routes)
# Return the route list
return routes[self.parent.nid, bundle.dest]
def build_route_schedule(self, verbose=False):
# Create the route schedule
nodes = list(self.env.nodes.keys())
routes = build_route_list(nodes, nodes, 0.0, self._contacts_df, self._ranges_df, relays=self.relays,
ncpu=self.props.num_cores, algorithm=self.props.algorithm,
mode=self.props.mode, verbose=verbose)
# Return the route schedule
return routes
def validate_route_list(self, routes):
# Initialize variables
to_keep = []
excluded = [','.join(r) for r in self.props.excluded_routes]
# Filter routes that are invalid
for idx, row in routes.iterrows():
# Initialize variables
route = row.route
# Get the mid hops
mid_hops = set(route[1:-1])
# If the there is a non-relay in the mid hops, skip
if not mid_hops.isdisjoint(self.non_relays): continue
# If neither the tx and rx are relays, compute the number of hops
if row.orig not in self.relays and row.dest not in self.relays:
# Compute the number of hops through a relay.
relay_hops = np.in1d(list(self.relays), route).sum()
# If too many relay hops, continue
if relay_hops > self.props.max_relay_hops: continue
# If this route contains any of the excluded routes, eliminate it
route = ','.join(route)
if any(exc in route for exc in excluded):
continue
to_keep.append(idx)
# Filter routes that are valid
routes = routes.loc[to_keep, :].reset_index(drop=True)
return routes
def prepare_route_list(self, routes):
# If routes has already been prepared, return
if isinstance(routes, dict): return routes
# Initialize variables
prepared_routes = {}
routes = {c: routes[c].values for c in routes.columns} # {column -> [values]}
contacts = self.__class__._all_contacts
# Process route schedule
for o, d in pd.unique(list(zip(routes['orig'], routes['dest']))):
# Get the indices for this o-d pair
idx = (routes['orig'] == o) & (routes['dest'] == d)
# Construct the data structure
data = {}
data['EAT'] = routes['EAT'][idx]
data['nhops'] = routes['nhops'][idx].astype('int32')
data['tstart'] = routes['tstart'][idx]
data['tend'] = routes['tend'][idx]
data['route'] = routes['route'][idx]
data['contacts'] = routes['contacts'][idx]
data['contacts_set'] = [set(c) for c in data['contacts']]
data['nodes'] = [set(r) for r in data['route']] # Full list of nodes visited, including origin
data['hops'] = [r - {o} for r in data['nodes']] # List of nodes visited, without the origin
data['next_hop'] = np.array([r[1] for r in data['route']])
data['next_cid'] = np.array([r[0] for r in data['contacts']])
data['capacity'] = [min(contacts[cid]['capacity'] for cid in cts) for cts in data['contacts']]
data['capacity'] = np.array(data['capacity'], dtype='float64')
# Store the routes for this o-d pair
prepared_routes[o, d] = data
return prepared_routes
[docs] def find_prox_node_list(self, bundle, opts):
""" Find all neighbors to send the bundle to. Create a CgrRecord for each option
Performs step 2 of ``find_routes``.
"""
# Filter routes that end before now
idx = opts['tend'] > self.t
# Filter routes that go through already visited nodes
if bundle.visited:
tmp = set(bundle.visited)
idx &= np.array([n.isdisjoint(tmp) for n in opts['hops']])
# Filter routes that go through excluded contacts
#if bundle.excluded: idx &= ~np.isin(opts['next_cid'], bundle.excluded)
if bundle.excluded:
tmp = set(bundle.excluded)
idx &= np.array([c.isdisjoint(tmp) for c in opts['contacts_set']])
# Filter routes that do not have enough capacity for this bundle
idx &= (opts['capacity'] >= bundle.data_vol)
# If all opts have been invalidated, return None
if not idx.any(): return
# Initialize variables
candidates = opts['next_hop'][idx]
next_cids = opts['next_cid'][idx]
contacts = opts['contacts'][idx]
tstart_route = opts['tstart'][idx]
tend_route = opts['tend'][idx]
rt_records = []
# Figure out the priority of this bundle (0=critical, 1=false)
priority = self.find_bundle_priority(bundle)
# Get first contact for each neighbor. This relies on the fact that data
# comes in sorted format
while candidates.size > 0:
idx = candidates == candidates[0]
next_cid = next_cids[idx][0]
contact = contacts[idx][0]
ts_route = tstart_route[idx][0]
te_route = tend_route[idx][0]
# Compose the DTN record and store it
con = self.__class__._all_contacts[next_cid]
con['cid'] = next_cid
rte = {'tstart': ts_route, 'tend': te_route, 'contacts': contact}
rec = RtRecord(bundle=bundle, contact=con, route=rte, priority=priority, neighbor=con['dest'])
rt_records.append(rec)
# Check if this routing decision has affected the current route
if hasattr(bundle, 'cur_route'):
if bundle.cur_route != contacts:
print('Route changed')
# Eliminate all routes to the node already used
candidates = candidates[~idx]
next_cids = next_cids[~idx]
contacts = contacts[~idx]
tstart_route = tstart_route[~idx]
tend_route = tend_route[~idx]
return rt_records
[docs] def select_neighbor_critical_bundle(self, rt_records):
""" Select the next neighbors to send a critical bundle to.
Performs step 3 of ``find_routes``.
"""
return rt_records[:self.max_crit] if self.max_crit is not None else rt_records
[docs] def select_neighbor_standard_bundle(self, rt_records):
""" Select the next neighbor to send a standard bundle to.
Performs step 3 of ``find_routes``.
"""
return [rt_records[0]]
def try_route_list(self, rt_records):
# Initialize variables
to_keep = []
cids_to_exclude = []
EAT = self.t
# Iterate over route options
for record in rt_records:
# Initialize variables
valid = True
bundle = record.bundle
contact = record.contact
mngr = self.parent.queues[contact['dest']]
# Compute backlog for this neighbor
if contact['cid'] == mngr.current_cid:
backlog = mngr.queue.backlog # [bits]
else:
backlog = mngr.future_backlog[contact['cid']]
# Follow the route to see if you will miss any connections
for cid in record.route['contacts']:
# Get hop data
dr = self.__class__._all_contacts[cid]['rate'] # [bps]
Tp = self.__class__._all_contacts[cid]['range'] # [s]
Te = self.__class__._all_contacts[cid]['tend'] # [s]
# Compute the "Estimated Departure Time"
EDT = max(EAT, contact['tstart']) + backlog/dr
# Compute the "Estimated Arrival Time"
EAT = EDT + bundle.data_vol/dr + Tp
# If the estimated arrival time is too late, then this route is
# not valid
if EAT >= Te: valid = False; cids_to_exclude.append(cid); break
# After the first hop you do not have information about the state
# of another node's queues. Therefore, assume it is zero
backlog = 0
# If the route is valid, keep it
if valid: to_keep.append(record)
return to_keep, cids_to_exclude