Traffic Volume Control

 

[Motivation] Network administrators on university campuses in Taiwan usually need to control traffic volume so that excessive P2P or online-game traffic does not affect normal data traffic. Therefore, when the usage of a specific network card exceeds a pre-defined value, that network card is blocked and can no longer send traffic. After one day, the quota is reset and the network card can work again. To show how SDN technology can be used to control traffic volume, I wrote a simple example that controls one-way (UDP-based) traffic. I hope to continue this work to support two-way (TCP-based) traffic volume control.

 

[Steps]

1. Use the VND (Visual Network Description) framework at http://www.ramonfontes.com/vnd/ to create the following network topology and generate the corresponding Mininet script.

2. Mininet script (h3 and h4 are the senders; h5 and h11 are the receivers)

#!/usr/bin/python

 

"""

Script created by VND - Visual Network Description (SDN version)

"""

from mininet.net import Mininet

from mininet.node import Controller, RemoteController, OVSKernelSwitch, OVSLegacyKernelSwitch, UserSwitch

from mininet.cli import CLI

from mininet.log import setLogLevel

from mininet.link import Link, TCLink

 

def topology():

    "Create a network."

    net = Mininet( controller=RemoteController, link=TCLink, switch=OVSKernelSwitch )

 

    print "*** Creating nodes"

    s1 = net.addSwitch( 's1', listenPort=6673, mac='00:00:00:00:00:01' )

    s2 = net.addSwitch( 's2', listenPort=6674, mac='00:00:00:00:00:02' )

    h3 = net.addHost( 'h3', mac='00:00:00:00:00:03', ip='10.0.0.3/8' )

    h4 = net.addHost( 'h4', mac='00:00:00:00:00:04', ip='10.0.0.4/8' )

    h5 = net.addHost( 'h5', mac='00:00:00:00:00:05', ip='10.0.0.5/8' )

    c6 = net.addController( 'c6', controller=RemoteController, ip='127.0.0.1', port=6633 )

    h11 = net.addHost( 'h11', mac='00:00:00:00:00:11', ip='10.0.0.11/8' )

 

    print "*** Creating links"

    net.addLink(s2, h11, 3, 0)

    net.addLink(s2, h5, 2, 0)

    net.addLink(s1, s2, 3, 1)

    net.addLink(h4, s1, 0, 2)

    net.addLink(h3, s1, 0, 1)

 

    print "*** Starting network"

    net.build()

    c6.start()

    s2.start( [c6] )

    s1.start( [c6] )

 

    print "*** Running CLI"

    CLI( net )

 

    print "*** Stopping network"

    net.stop()

 

if __name__ == '__main__':

    setLogLevel( 'info' )

    topology()

 

 

 

3. traffic_controll3.py (traffic volume control module for the POX controller; put this file under /pox/ext)

# Copyright 2012-2013 James McCauley

#

# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at:

#

#     http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

 

"""

A shortest-path forwarding application.

 

This is a standalone L2 switch that learns ethernet addresses

across the entire network and picks short paths between them.

 

You shouldn't really write an application this way -- you should

keep more state in the controller (that is, your flow tables),

and/or you should make your topology more static.  However, this

does (mostly) work. :)

 

Depends on openflow.discovery

Works with openflow.spanning_tree

"""

 

from pox.core import core

import pox.openflow.libopenflow_01 as of

from pox.lib.revent import *

from pox.lib.recoco import Timer

from collections import defaultdict

from pox.openflow.discovery import Discovery

from pox.lib.util import dpid_to_str

import time

 

log = core.getLogger()

 

# Adjacency map.  [sw1][sw2] -> port from sw1 to sw2

adjacency = defaultdict(lambda:defaultdict(lambda:None))

 

# Switches we know of.  [dpid] -> Switch

switches = {}

 

# ethaddr -> (switch, port)

mac_map = {}

 

# Waiting path.  (dpid,xid)->WaitingPath

waiting_paths = {}

 

# Time to not flood in seconds

FLOOD_HOLDDOWN = 5

 

# Flow timeouts

FLOW_IDLE_TIMEOUT =      3

FLOW_HARD_TIMEOUT =    0

 

# How long is allowable to set up a path?

PATH_SETUP_TIME = 4

 

# [mac_addr]->bytes

# Measured in bytes. If quota is set to 0, traffic volume is not controlled.

quota = 300000

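# defaultdict so that a MAC with no entry (e.g. right after the daily reset) reads as 0 instead of raising KeyError
used = defaultdict(int)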

bytes_received = {}

 

#path (dl_src, dl_dst) -> path

mypath={}

 

year=2015

month=1

day=1

 

def _renew():

  global year, month, day

  flock = time.localtime()

  #print str(year), str(month), str(day), str(flock.tm_year), str(flock.tm_mon), str(flock.tm_mday)

  if flock.tm_year > year:

    return True

  elif flock.tm_mon > month:

    return True

  elif flock.tm_mday > day:

    return True

  else:

    return False

 

def _get_raw_path (src,dst):

  #Bellman-Ford algorithm

  #print "src=",src," dst=",dst

  distance = {}

  previous = {}      

  sws = switches.values()

 

  for dpid in sws:

    distance[dpid] = 9999

    previous[dpid] = None

 

  distance[src]=0

 

  for m in range(len(sws)-1):

    for p in sws:

      for q in sws:

        if adjacency[p][q]!=None:

           w = 1

           if distance[p] + w < distance[q]:

             distance[q] = distance[p] + w

             previous[q] = p

  r=[]

  p=dst

  r.append(p)

  q=previous[p]

  while q is not None:

    if q == src:

      r.append(q)

      break

    p=q

    r.append(p)

    q=previous[p]

 

  r.reverse() 

  return r

 

def _check_path (p):

  """

  Make sure that a path is actually a string of nodes with connected ports

 

  returns True if path is valid

  """

  for a,b in zip(p[:-1],p[1:]):

    if adjacency[a[0]][b[0]] != a[2]:

      return False

    if adjacency[b[0]][a[0]] != b[1]:

      return False

  return True

 

 

def _get_path (src, dst, first_port, final_port, match, event):

  """

  Gets a cooked path -- a list of (node,in_port,out_port)

  """

  # Start with a raw path...

  if src == dst:

    path = [src]

  else:

    path = _get_raw_path(src, dst)

    if path is None: return None

    #print "src=",src," dst=",dst, " path=", path

    if (match.dl_src in used.keys())==False:

      used[(match.dl_src)] = 0

      print "used[", match.dl_src, "] is reset to 0"

    if mac_map.get(match.dl_src)[0]==switches[event.connection.dpid] and (mac_map.get(match.dl_src)[0] in path) and (mac_map.get(match.dl_dst)[0] in path):

      mypath[(match.dl_src,match.dl_dst,match.tp_src,match.tp_dst,match.nw_proto)] = path

      if match.tp_src is not None and match.tp_src!=0 and match.tp_dst is not None and match.tp_dst!=0  and ((path[0],match.dl_src,match.dl_dst,match.tp_src,match.tp_dst,match.nw_proto) in bytes_received.keys())==False:

        bytes_received[(path[0],match.dl_src,match.dl_dst,match.tp_src,match.tp_dst,match.nw_proto)]=0

       

  # Now add the ports

  r = []

  in_port = first_port

  for s1,s2 in zip(path[:-1],path[1:]):

    out_port = adjacency[s1][s2]

    r.append((s1,in_port,out_port))

    in_port = adjacency[s2][s1]

  r.append((dst,in_port,final_port))

 

  assert _check_path(r), "Illegal path!"

 

  return r

 

 

class WaitingPath (object):

  """

  A path which is waiting for its path to be established

  """

  def __init__ (self, path, packet):

    """

    xids is a sequence of (dpid,xid)

    first_switch is the DPID where the packet came from

    packet is something that can be sent in a packet_out

    """

    self.expires_at = time.time() + PATH_SETUP_TIME

    self.path = path

    self.first_switch = path[0][0].dpid

    self.xids = set()

    self.packet = packet

 

    if len(waiting_paths) > 1000:

      WaitingPath.expire_waiting_paths()

 

  def add_xid (self, dpid, xid):

    self.xids.add((dpid,xid))

    waiting_paths[(dpid,xid)] = self

 

  @property

  def is_expired (self):

    return time.time() >= self.expires_at

 

  def notify (self, event):

    """

    Called when a barrier has been received

    """

    self.xids.discard((event.dpid,event.xid))

    if len(self.xids) == 0:

      # Done!

      if self.packet:

        log.debug("Sending delayed packet out %s"

                  % (dpid_to_str(self.first_switch),))

        msg = of.ofp_packet_out(data=self.packet,

            action=of.ofp_action_output(port=of.OFPP_TABLE))

        core.openflow.sendToDPID(self.first_switch, msg)

 

      core.l2_multi.raiseEvent(PathInstalled(self.path))

 

 

  @staticmethod

  def expire_waiting_paths ():

    packets = set(waiting_paths.values())

    killed = 0

    for p in packets:

      if p.is_expired:

        killed += 1

        for entry in p.xids:

          waiting_paths.pop(entry, None)

    if killed:

      log.error("%i paths failed to install" % (killed,))

 

 

class PathInstalled (Event):

  """

  Fired when a path is installed

  """

  def __init__ (self, path):

    Event.__init__(self)

    self.path = path

 

 

class Switch (EventMixin):

  def __init__ (self):

    self.connection = None

    self.ports = None

    self.dpid = None

    self._listeners = None

    self._connected_at = None

 

  def __repr__ (self):

    return dpid_to_str(self.dpid)

 

  def _install (self, switch, in_port, out_port, match, buf = None):

    msg = of.ofp_flow_mod()

    msg.flags = of.OFPFF_SEND_FLOW_REM

    msg.match = match

    msg.match.in_port = in_port

    msg.idle_timeout = FLOW_IDLE_TIMEOUT

    msg.hard_timeout = FLOW_HARD_TIMEOUT

    msg.actions.append(of.ofp_action_output(port = out_port))

    msg.buffer_id = buf

    switch.connection.send(msg)

 

  def _install_path (self, p, match, packet_in=None):

    wp = WaitingPath(p, packet_in)

    for sw,in_port,out_port in p:

      self._install(sw, in_port, out_port, match)

      msg = of.ofp_barrier_request()

      sw.connection.send(msg)

      wp.add_xid(sw.dpid,msg.xid)

 

  def install_path (self, dst_sw, last_port, match, event):

    """

    Attempts to install a path between this switch and some destination

    """

    if _renew():

       global year, month, day

       used.clear()

       flock = time.localtime()

       year=flock.tm_year

       month=flock.tm_mon

       day=flock.tm_mday

       print "Renew"

     

    p = _get_path(self, dst_sw, event.port, last_port, match, event)

    #print time.time(), "install_path is called at:", event.connection.dpid, p

     

    if used[(match.dl_src)] is not None and used[(match.dl_src)]>quota and quota:

      p = None

 

    if p is None:

      log.warning("Can't get from %s to %s", match.dl_src, match.dl_dst)

 

      import pox.lib.packet as pkt

 

      if (match.dl_type == pkt.ethernet.IP_TYPE and

          event.parsed.find('ipv4')):

        # It's IP -- let's send a destination unreachable

        log.debug("Dest unreachable (%s -> %s)",

                  match.dl_src, match.dl_dst)

 

        from pox.lib.addresses import EthAddr

        e = pkt.ethernet()

        e.src = EthAddr(dpid_to_str(self.dpid)) #FIXME: Hmm...

        e.dst = match.dl_src

        e.type = e.IP_TYPE

        ipp = pkt.ipv4()

        ipp.protocol = ipp.ICMP_PROTOCOL

        ipp.srcip = match.nw_dst #FIXME: Ridiculous

        ipp.dstip = match.nw_src

        icmp = pkt.icmp()

        icmp.type = pkt.ICMP.TYPE_DEST_UNREACH

        icmp.code = pkt.ICMP.CODE_UNREACH_HOST

        orig_ip = event.parsed.find('ipv4')

 

        d = orig_ip.pack()

        d = d[:orig_ip.hl * 4 + 8]

        import struct

        d = struct.pack("!HH", 0,0) + d #FIXME: MTU

        icmp.payload = d

        ipp.payload = icmp

        e.payload = ipp

        msg = of.ofp_packet_out()

        msg.actions.append(of.ofp_action_output(port = event.port))

        msg.data = e.pack()

        self.connection.send(msg)

 

      return

 

    log.debug("Installing path for %s -> %s %04x (%i hops)",

        match.dl_src, match.dl_dst, match.dl_type, len(p))

 

    # We have a path -- install it

    self._install_path(p, match, event.ofp)

 

    # Now reverse it and install it backwards

    # (we'll just assume that will work)

    p = [(sw,out_port,in_port) for sw,in_port,out_port in p]

    self._install_path(p, match.flip())

 

 

  def _handle_PacketIn (self, event):

    def flood ():

      """ Floods the packet """

      if self.is_holding_down:

        log.warning("Not flooding -- holddown active")

      msg = of.ofp_packet_out()

      # OFPP_FLOOD is optional; some switches may need OFPP_ALL

      msg.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))

      msg.buffer_id = event.ofp.buffer_id

      msg.in_port = event.port

      self.connection.send(msg)

 

    def drop ():

      # Kill the buffer

      if event.ofp.buffer_id is not None:

        msg = of.ofp_packet_out()

        msg.buffer_id = event.ofp.buffer_id

        event.ofp.buffer_id = None # Mark is dead

        msg.in_port = event.port

        self.connection.send(msg)

 

    packet = event.parsed

 

    loc = (self, event.port) # Place we saw this ethaddr

    oldloc = mac_map.get(packet.src) # Place we last saw this ethaddr

 

    if packet.effective_ethertype == packet.LLDP_TYPE:

      drop()

      return

 

    if oldloc is None:

      if packet.src.is_multicast == False:

        mac_map[packet.src] = loc # Learn position for ethaddr

        log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])

    elif oldloc != loc:

      # ethaddr seen at different place!

      if core.openflow_discovery.is_edge_port(loc[0].dpid, loc[1]):

        # New place is another "plain" port (probably)

        log.debug("%s moved from %s.%i to %s.%i?", packet.src,

                  dpid_to_str(oldloc[0].dpid), oldloc[1],

                  dpid_to_str(   loc[0].dpid),    loc[1])

        if packet.src.is_multicast == False:

          mac_map[packet.src] = loc # Learn position for ethaddr

          log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])

      elif packet.dst.is_multicast == False:

        # New place is a switch-to-switch port!

        # Hopefully, this is a packet we're flooding because we didn't

        # know the destination, and not because it's somehow not on a

        # path that we expect it to be on.

        # If spanning_tree is running, we might check that this port is

        # on the spanning tree (it should be).

        if packet.dst in mac_map:

          # Unfortunately, we know the destination.  It's possible that

          # we learned it while it was in flight, but it's also possible

          # that something has gone wrong.

          log.warning("Packet from %s to known destination %s arrived "

                      "at %s.%i without flow", packet.src, packet.dst,

                      dpid_to_str(self.dpid), event.port)

 

 

    if packet.dst.is_multicast:

      log.debug("Flood multicast from %s", packet.src)

      flood()

    else:

      if packet.dst not in mac_map:

        log.debug("%s unknown -- flooding" % (packet.dst,))

        flood()

      else:

        dest = mac_map[packet.dst]

        match = of.ofp_match.from_packet(packet)

        self.install_path(dest[0], dest[1], match, event)

 

  def disconnect (self):

    if self.connection is not None:

      log.debug("Disconnect %s" % (self.connection,))

      self.connection.removeListeners(self._listeners)

      self.connection = None

      self._listeners = None

 

  def connect (self, connection):

    if self.dpid is None:

      self.dpid = connection.dpid

    assert self.dpid == connection.dpid

    if self.ports is None:

      self.ports = connection.features.ports

    self.disconnect()

    log.debug("Connect %s" % (connection,))

    self.connection = connection

    self._listeners = self.listenTo(connection)

    self._connected_at = time.time()

 

  @property

  def is_holding_down (self):

    if self._connected_at is None: return True

    if time.time() - self._connected_at > FLOOD_HOLDDOWN:

      return False

    return True

 

  def _handle_ConnectionDown (self, event):

    self.disconnect()

 

 

class l2_multi (EventMixin):

 

  _eventMixin_events = set([

    PathInstalled,

  ])

 

  def __init__ (self):

    # Listen to dependencies

    Timer(2, _timer_func, recurring=True)

 

    def startup ():

      core.openflow.addListeners(self, priority=0)

      core.openflow_discovery.addListeners(self)

    core.call_when_ready(startup, ('openflow','openflow_discovery'))

 

  def _handle_FlowStatsReceived (self, event):

      log.debug("Got FlowStatsReceived: %s", event.connection)

      print time.time(), event.connection, "Got FlowStatsReceived"

     

      for f in event.stats:

        for a,b in mypath.items():

           #print "a=", a, "b=", b

           #upload

           if f.match.dl_src == a[0] and f.match.dl_dst == a[1] and f.match.tp_src==a[2] and a[2] is not None and a[3] is not None and f.match.tp_dst==a[3] and f.match.nw_proto==a[4] and f.match.tp_src!=0 and f.match.tp_dst!=0 and mypath[(f.match.dl_src,  f.match.dl_dst, f.match.tp_src, f.match.tp_dst, f.match.nw_proto)][0]==switches[event.connection.dpid]:

              #print "SSS:", f.match.dl_src,  f.match.dl_dst, f.match.tp_src, f.match.tp_dst, f.match.nw_proto,mypath[(f.match.dl_src,  f.match.dl_dst, f.match.tp_src, f.match.tp_dst, f.match.nw_proto)][0]

              print "upload) ", a[0], "->", a[1], ":", f.byte_count, bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)], f.byte_count - bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)]

              if f.byte_count >=bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)]:

                used[(f.match.dl_src)]+=(f.byte_count - bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)])

              bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)] = f.byte_count

              print "upload) used[", f.match.dl_src, "]=", used[(f.match.dl_src)]

                  

              if used[(f.match.dl_src)] > quota and quota:

                  msg = of.ofp_flow_mod()

                  msg.command=of.OFPFC_DELETE_STRICT

                  msg.match = f.match

                  switches[event.connection.dpid].connection.send(msg)  

 

  def _handle_FlowRemoved (self, event):

      log.debug("Got FlowRemoved: %s", event.connection)

      #print time.time(), event.connection, "Got FlowRemoved"

    

  def _handle_LinkEvent (self, event):

    def flip (link):

      return Discovery.Link(link[2],link[3], link[0],link[1])

 

    l = event.link

    sw1 = switches[l.dpid1]

    sw2 = switches[l.dpid2]

 

    # Invalidate all flows and path info.

    # For link adds, this makes sure that if a new link leads to an

    # improved path, we use it.

    # For link removals, this makes sure that we don't use a

    # path that may have been broken.

    #NOTE: This could be radically improved! (e.g., not *ALL* paths break)

    clear = of.ofp_flow_mod(command=of.OFPFC_DELETE)

    for sw in switches.itervalues():

      if sw.connection is None: continue

      sw.connection.send(clear)

 

    if event.removed:

      # This link no longer okay

      if sw2 in adjacency[sw1]: del adjacency[sw1][sw2]

      if sw1 in adjacency[sw2]: del adjacency[sw2][sw1]

 

      # But maybe there's another way to connect these...

      for ll in core.openflow_discovery.adjacency:

        if ll.dpid1 == l.dpid1 and ll.dpid2 == l.dpid2:

          if flip(ll) in core.openflow_discovery.adjacency:

            # Yup, link goes both ways

            adjacency[sw1][sw2] = ll.port1

            adjacency[sw2][sw1] = ll.port2

            # Fixed -- new link chosen to connect these

            break

    else:

      # If we already consider these nodes connected, we can

      # ignore this link up.

      # Otherwise, we might be interested...

      if adjacency[sw1][sw2] is None:

        # These previously weren't connected.  If the link

        # exists in both directions, we consider them connected now.

        if flip(l) in core.openflow_discovery.adjacency:

          # Yup, link goes both ways -- connected!

          adjacency[sw1][sw2] = l.port1

          adjacency[sw2][sw1] = l.port2

 

      # If we have learned a MAC on this port which we now know to

      # be connected to a switch, unlearn it.

      bad_macs = set()

      for mac,(sw,port) in mac_map.iteritems():

        if sw is sw1 and port == l.port1: bad_macs.add(mac)

        if sw is sw2 and port == l.port2: bad_macs.add(mac)

      for mac in bad_macs:

        log.debug("Unlearned %s", mac)

        del mac_map[mac]

 

  def _handle_ConnectionUp (self, event):

    sw = switches.get(event.dpid)

    if sw is None:

      # New switch

      sw = Switch()

      switches[event.dpid] = sw

      sw.connect(event.connection)

    else:

      sw.connect(event.connection)

 

  def _handle_BarrierIn (self, event):

    wp = waiting_paths.pop((event.dpid,event.xid), None)

    if not wp:

      #log.info("No waiting packet %s,%s", event.dpid, event.xid)

      return

    #log.debug("Notify waiting packet %s,%s", event.dpid, event.xid)

    wp.notify(event)

 

def _timer_func ():

  #print time.time(), "timer_func is called"

  sent_sw={None}

  for a,b  in mypath.iteritems():

    if b[0] not in sent_sw:

      print time.time(), "send out ofp_flow_stats_request to dpid:", b[0]

      core.openflow.getConnection(b[0].dpid).send(of.ofp_stats_request(body=of.ofp_flow_stats_request()))

      sent_sw.add(b[0])

 

def launch ():

  global year, month, day

  core.registerNew(l2_multi)

  flock = time.localtime()

  year=flock.tm_year

  month=flock.tm_mon

  day=flock.tm_mday

  print "Now:", str(year), ":", str(month), ":", str(day)

 

  timeout = min(max(PATH_SETUP_TIME, 5) * 2, 15)

  Timer(timeout, WaitingPath.expire_waiting_paths, recurring=True)
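
The accounting done in _handle_FlowStatsReceived can be easier to follow in isolation. The sketch below is not part of the module: QuotaTracker, update, and over_quota are hypothetical names used only for illustration. It assumes, as the listing does, that each flow-stats reply carries a cumulative byte_count, so only the difference from the previous sample is charged to the source MAC, and that a quota of 0 means "no limit". As in the listing, a sample lower than the previous one only re-baselines the counter.

```python
from collections import defaultdict

QUOTA = 300000  # bytes per source MAC; 0 disables the limit


class QuotaTracker(object):
    """Hypothetical helper: per-MAC usage built from cumulative flow counters."""

    def __init__(self, quota=QUOTA):
        self.quota = quota
        self.used = defaultdict(int)  # source MAC -> bytes charged so far
        self.last_seen = {}           # flow key -> last byte_count reported

    def update(self, src_mac, flow_key, byte_count):
        """Record one flow-stats sample; return True if src_mac is over quota."""
        previous = self.last_seen.get(flow_key, 0)
        if byte_count >= previous:
            # Charge only the bytes sent since the previous sample.
            self.used[src_mac] += byte_count - previous
        # Otherwise the counter went backwards (flow re-installed): re-baseline only.
        self.last_seen[flow_key] = byte_count
        return self.over_quota(src_mac)

    def over_quota(self, src_mac):
        return bool(self.quota) and self.used[src_mac] > self.quota


tracker = QuotaTracker(quota=300000)
flow = ("00:00:00:00:00:03", "00:00:00:00:00:05")
print(tracker.update("00:00:00:00:00:03", flow, 200000))  # False
print(tracker.update("00:00:00:00:00:03", flow, 350000))  # True
```

In the module itself, the point where update would return True corresponds to sending the OFPFC_DELETE_STRICT flow_mod and refusing to install new paths for that source MAC.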

 

4. Run the POX controller with this module and openflow.discovery, which the module depends on.

 

5. Run the Mininet script.

 

6. Open xterm windows for h3, h4, h5, and h11.

 

7. Run the iperf UDP servers on h5 and h11.

 

8. Run the iperf UDP client on h3 first and watch the output of the POX controller. When used[00:00:00:00:00:03] exceeds the pre-defined value, i.e., 300000 bytes, h5 stops showing any further output because the h3->h5 flow is blocked.

 

9. Run the iperf UDP client on h4. The h4->h11 flow is not blocked because the pre-defined value has not been reached.

 

10. Change the system time to a later date and check whether h3 can send data again. The quota is reset, and h3 can send data packets again.
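
Step 10 relies on the controller noticing that the calendar date has advanced. A minimal sketch of such a check is shown below. It assumes the last reset is stored as a datetime.date rather than as separate year, month, and day variables; needs_renew is a hypothetical name, not a function of the module.

```python
import datetime


def needs_renew(last_reset, today=None):
    """Return True when `today` is a later calendar day than `last_reset`."""
    if today is None:
        today = datetime.date.today()
    # date objects compare year first, then month, then day.
    return today > last_reset


last_reset = datetime.date(2015, 1, 1)
print(needs_renew(last_reset, datetime.date(2015, 1, 1)))  # False
print(needs_renew(last_reset, datetime.date(2015, 1, 2)))  # True
print(needs_renew(datetime.date(2015, 12, 31), datetime.date(2016, 1, 1)))  # True
```

Comparing whole dates also means that moving the clock backwards does not trigger a reset, which may or may not be what you want when experimenting with the system time.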

 

Dr. Chih-Heng Ke

Department of Computer Science and Information Engineering, National Quemoy University, Kinmen, Taiwan

Email: smallko@gmail.com / smallko@nqu.edu.tw