Source code for simulator.generators.DtnMarkovBundleGenerator

# -*- coding: utf-8 -*-

import os
from collections import defaultdict
from copy import deepcopy
from warnings import warn

import numpy as np
import pandas as pd
from pathlib import Path
from simulator.core.DtnBundle import Bundle
from simulator.utils.DtnIO import load_traffic_file
from simulator.utils.DtnUtils import shift_traffic
from simulator.generators.DtnAbstractGenerator import DtnAbstractGenerator

# ============================================================================================================
# === DEFINE LATENCY CATEGORIES - THESE ARE CONSTANT
# ============================================================================================================

# Define latency
lat = np.array([[60, np.nan, np.nan],
                [60, np.nan, np.nan],
                [60, np.nan, 3600],
                [60,    60, np.nan],
                [60,   900,  21600],
                [60,   300,   3600],
                [60,   300, np.nan],
                [60,    60, np.nan],
                [60,   900,  21600],
                [60,   900,  21600],
                [60,   900,  21600],
                [60,   300, np.nan]])
lat = pd.DataFrame(data=1.0*lat, columns=['seconds','minutes','hours'],
                   index=['voice','biomedical','caution and warning','command and teleoperation',
                          'file','health and status','nav type 1 products','nav type 2 message',
                          'pao hd video','sci hd video','science','sd video'])

# ============================================================================================================
# === FUNCTIONS TO CREATE TWO STATE MARKOV PROCESS AND BUNDLE GENERATION TIMES
# ============================================================================================================

def two_state_markov_process(Tmin, Tmax, DutyCycle, Ton):
    # Initialize variables
    Tstart = 0
    Tend   = Tmax - Tmin
    Toff   = ((1 / DutyCycle) - 1) * Ton
    K      = 10
    ok     = False

    while not ok:
        # Initialize variables
        Ns = int(np.ceil(0.5*K*(Tend-Tstart)/(Ton + Toff)))

        # Handle special case where duty cycle is 1
        if DutyCycle == 1:
            state, times = True, Tend
        else:
            state   = np.random.uniform() < DutyCycle
            on_dur  = np.random.exponential(scale=Ton,  size=Ns)
            off_dur = np.random.exponential(scale=Toff, size=Ns)

            times = np.zeros(2*Ns)
            if state == True:
                times[0::2] = on_dur
                times[1::2] = off_dur
            else:
                times[0::2] = off_dur
                times[1::2] = on_dur

        # Finalize the process generated
        times =  np.insert(np.cumsum(times), 0, 0)
        states = np.zeros_like(times, dtype=bool)
        states[0::2] = state
        states[1::2] = not state

        # Validate the sequence
        if times[-1] >= Tend: ok = True
        else: K += 1

    # Trim end of generated sequence to match Tend
    times[times > Tend] = Tend
    idx = np.argmax(times == Tend)+1
    if idx != 0 and DutyCycle != 1.0 and idx != len(times):
        times  = times[0:idx]
        states = states[0:idx]

    # Shift times to Tmin, Tmax
    times += Tmin

    return times, states

def generate_markov_bundles(BS, Rb, Lat, Tmin, Tmax, DutyCycle, Ton):
    # Generate Markov intervals
    times, states = two_state_markov_process(Tmin, Tmax, DutyCycle, Ton)

    # Initial processing entry. If initial state is OFF, skip it
    ini = (states[0] == False)

    # Initialize variables
    t     = []
    buf   = 0
    state = True

    # Iterate over periods
    for i in range(ini, len(states)-1):
        # Handle OFF state only if buffer is not empty
        if state == False and buf != 0:
            # t_ref indicates the time at which the last bundle was sent. If no
            # bundles were ever sent, assume 0.
            t_ref = 0 if len(t) == 0 else t[-1]

            # If waiting for the start of the ON period will make you exceed
            # the latency requirement, send a bundle with half data half padding.
            while t_ref + Lat < times[i+1] and buf >= BS:
                t_ref = max(t_ref, times[i]) + Lat
                t.append(t)
                buf -= BS

        # Handle ON state
        if state == True:
            dv    = buf + Rb * (times[i+1] - times[i])
            N_bnd = int(np.floor(dv / BS))
            t_bnd = times[i] + np.arange(1,N_bnd+1)*(BS / Rb)
            if len(t_bnd) > 0: t_bnd -= buf/Rb
            t_bnd = t_bnd[t_bnd <= times[i+1]]
            t.extend(t_bnd)
            buf = dv - N_bnd * BS

        # Switch state
        state = not state

    # Add one last bundle add the end of t to transmit all unaccounted data.
    # Note that this bundle might have some padding data
    if buf > 0:
        t_ref = times[-1] if len(t) == 0 else t[-1]
        if states[-1] == False:
            t.append(t_ref + Lat)
        else:
            t.append(max(t_ref, times[-1])+Lat)
        buf = 0

    # return times at which a bundle is delivered, and the amount of data left at the end
    return t, buf

def generate_bundles(traffic, id2alias, min_bundle_size=1024, max_bundle_size=8e9, lat_frac=0.5):
    # Get a map from node alias to ids
    alias2id = {v: k for k, v in id2alias.items()}

    # Get simulation start time
    t0 = min([flow['StartTime'] for _, flow in traffic.items()])

    # Iterate over flows
    for fid, flow in traffic.items():
        # Get the numeric latency
        flow['Latency'] = lat.loc[flow['DataType'].lower(), flow['Latency'].lower()]

        # Compute bundle size
        bundle_lat = flow['Latency']*min(lat_frac, flow['DutyCycle'])
        bundle_sz  = min(max(min_bundle_size, int(flow['DataRate']*bundle_lat)), max_bundle_size)

        # Get start and time for this flow
        Tmin = (flow['StartTime'] - t0).total_seconds()
        Tmax = (flow['EndTime'] - t0).total_seconds()

        # Generate bundles
        t, _ = generate_markov_bundles(bundle_sz, flow['DataRate'], flow['Latency'],
                                       Tmin, Tmax, flow['DutyCycle'], flow['Duration'])

        # Store the bundle times and size
        flow['Bundles']    = t
        flow['BundleSize'] = bundle_sz
        flow['fid']        = fid

        # Transform names of flows from alias to ids
        flow['Orig'] = alias2id[flow['TransElementName']]
        flow['Dest'] = alias2id[flow['ReceiveElementName']]

    return traffic

# ============================================================================================================
# === SIMULATION CLASS
# ============================================================================================================

[docs]class DtnMarkovBundleGenerator(DtnAbstractGenerator): _all_flows = None def __init__(self, env, parent, props): super().__init__(env, parent, props) # Initialize variables self.traffic_file = self.config['globals'].indir / props.file def reset(self): # Reset static variables super().reset() self.__class__._all_flows = None def initialize(self): # Setting static variables only once if not self.__class__._all_flows: self.load_flows() # Get flows for this generator self.flows = self.__class__._all_flows[self.parent.nid] # Iterate over all flows for this generator for _, flow in self.flows.items(): self.env.process(self.run(flow)) def load_flows(self): # Load generators file traffic = shift_traffic(load_traffic_file(self.traffic_file), self.epoch) # Generate bundles id2alias = {nid: dd.alias for nid, dd in self.config['network'].nodes.items()} flows = generate_bundles(traffic, id2alias, min_bundle_size=int(self.props.min_bundle_size), max_bundle_size=float(self.props.max_bundle_size), lat_frac=float(self.props.latency_fraction)) # Log bundle generation for fid, flow in flows.items(): if len(flow['Bundles']) == 0: self.disp('Flow {}: No bundles generated', fid) else: self.disp('Flow {}: {} bundles generated between t={:.3f} and t={:.3f}', fid, len(flow['Bundles']), min(flow['Bundles']), max(flow['Bundles'])) # Create a dictionary of dictionaries or dictionary: {Node ID: {flow id: {flow props}} d = defaultdict(dict) for fid, flow in flows.items(): d[flow['Orig']][fid] = flow # Store all the flows generated self.__class__._all_flows = d def run(self, flow): # If no bundles, return if len(flow['Bundles']) == 0: return # Initialize variables bnd_dt = np.insert(np.diff(flow['Bundles']), 0, flow['Bundles'][0]) # Iterate over bundle transmit times for dt in bnd_dt: # Wait until next time to transmit yield self.env.timeout(dt) # Create a new bundle and record it new_bundle = Bundle.from_flow(self.env, flow) # Monitor the new bundle creation self.monitor_new_bundle(new_bundle) # Log the new bundle creation self.disp('{} is created at node {}', new_bundle, self.parent.nid) # Schedule routers of bundle self.parent.forward(new_bundle)
[docs] def predicted_data_vol(self): """ Predicted data volume in [bits] """ return sum(f['DataRate']*((f['EndTime']-f['StartTime']).total_seconds()) for f in self.flows.values())