Traffic Volume Control


[Motivation] The network administrator usually needs to control the traffic volume in order to prevent too much p2p or on-line game traffic from affecting normal data traffic in the university campus in Taiwan. Therefore, when the usage of a specific network card exceeds the pre-defined value, this network card cannot work correctly. After one data, the quota is reset, and the network card can work again. In order to guide them how to use SDN technology to control data traffic volume, I write a simple example to control one-way traffic (UDP-based) to demonstrate this functionality. I hope my continue this work to support two-ways (TCP-based) traffic volume control.



1. Use this framework to create the following network topology and generate the corresponding mininet script.

2. mininet script (h3, h4 are the senders and h5 and h11 are the receivers)




Script created by VND - Visual Network Description (SDN version)


from import Mininet

from mininet.node import Controller, RemoteController, OVSKernelSwitch, OVSLegacyKernelSwitch, UserSwitch

from mininet.cli import CLI

from mininet.log import setLogLevel

from import Link, TCLink


def topology():

    "Create a network."

    net = Mininet( controller=RemoteController, link=TCLink, switch=OVSKernelSwitch )


    print "*** Creating nodes"

    s1 = net.addSwitch( 's1', listenPort=6673, mac='00:00:00:00:00:01' )

    s2 = net.addSwitch( 's2', listenPort=6674, mac='00:00:00:00:00:02' )

    h3 = net.addHost( 'h3', mac='00:00:00:00:00:03', ip='' )

    h4 = net.addHost( 'h4', mac='00:00:00:00:00:04', ip='' )

    h5 = net.addHost( 'h5', mac='00:00:00:00:00:05', ip='' )

    c6 = net.addController( 'c6', controller=RemoteController, ip='', port=6633 )

    h11 = net.addHost( 'h11', mac='00:00:00:00:00:11', ip='' )


    print "*** Creating links"

    net.addLink(s2, h11, 3, 0)

    net.addLink(s2, h5, 2, 0)

    net.addLink(s1, s2, 3, 1)

    net.addLink(h4, s1, 0, 2)

    net.addLink(h3, s1, 0, 1)


    print "*** Starting network"


    s2.start( [c6] )

    s1.start( [c6] )


    print "*** Running CLI"

    CLI( net )


    print "*** Stopping network"



if __name__ == '__main__':

    setLogLevel( 'info' )





2. (Traffic volume control module for POX controller. Put this file under /pox/ext)

# Copyright 2012-2013 James McCauley


# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at:




# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,


# See the License for the specific language governing permissions and

# limitations under the License.



A shortest-path forwarding application.


This is a standalone L2 switch that learns ethernet addresses

across the entire network and picks short paths between them.


You shouldn't really write an application this way -- you should

keep more state in the controller (that is, your flow tables),

and/or you should make your topology more static.  However, this

does (mostly) work. :)


Depends on openflow.discovery

Works with openflow.spanning_tree



from pox.core import core

import pox.openflow.libopenflow_01 as of

from pox.lib.revent import *

from pox.lib.recoco import Timer

from collections import defaultdict

from pox.openflow.discovery import Discovery

from pox.lib.util import dpid_to_str

import time


log = core.getLogger()


# Adjacency map.  [sw1][sw2] -> port from sw1 to sw2

adjacency = defaultdict(lambda:defaultdict(lambda:None))


# Switches we know of.  [dpid] -> Switch

switches = {}


# ethaddr -> (switch, port)

mac_map = {}


# Waiting path.  (dpid,xid)->WaitingPath

waiting_paths = {}


# Time to not flood in seconds



# Flow timeouts




# How long is allowable to set up a path?



# [mac_addr]->bytes

//measured in bytes, if quota is set to 0, it means that there is no control for traffic volume.

quota = 300000

used = {}

bytes_received = {}


#path (dl_src, dl_dst) -> path







def _renew():

  global year, month, day

  flock = time.localtime()

  #print str(year), str(month), str(day), str(flock.tm_year), str(flock.tm_mon), str(flock.tm_mday)

  if flock.tm_year > year:

    return True

  elif flock.tm_mon > month:

    return True

  elif flock.tm_mday > day:

    return True


    return False


def _get_raw_path (src,dst):

  #Bellman-Ford algorithm

  #print "src=",src," dst=",dst

  distance = {}

  previous = {}      

  sws = switches.values()


  for dpid in sws:

    distance[dpid] = 9999

    previous[dpid] = None




  for m in range(len(sws)-1):

    for p in sws:

      for q in sws:

        if adjacency[p][q]!=None:

           w = 1

           if distance[p] + w < distance[q]:

             distance[q] = distance[p] + w

             previous[q] = p





  while q is not None:

    if q == src:








  return r


def _check_path (p):


  Make sure that a path is actually a string of nodes with connected ports


  returns True if path is valid


  for a,b in zip(p[:-1],p[1:]):

    if adjacency[a[0]][b[0]] != a[2]:

      return False

    if adjacency[b[0]][a[0]] != b[1]:

      return False

  return True



def _get_path (src, dst, first_port, final_port, match, event):


  Gets a cooked path -- a list of (node,in_port,out_port)


  # Start with a raw path...

  if src == dst:

    path = [src]


    path = _get_raw_path(src, dst)

    if path is None: return None

    #print "src=",src," dst=",dst, " path=", path

    if (match.dl_src in used.keys())==False:

      used[(match.dl_src)] = 0

      print "used[", match.dl_src, "] is reset to 0"

    if mac_map.get(match.dl_src)[0]==switches[event.connection.dpid] and (mac_map.get(match.dl_src)[0] in path) and (mac_map.get(match.dl_dst)[0] in path):

      mypath[(match.dl_src,match.dl_dst,match.tp_src,match.tp_dst,match.nw_proto)] = path

      if match.tp_src is not None and match.tp_src!=0 and match.tp_dst is not None and match.tp_dst!=0  and ((path[0],match.dl_src,match.dl_dst,match.tp_src,match.tp_dst,match.nw_proto) in bytes_received.keys())==False:



  # Now add the ports

  r = []

  in_port = first_port

  for s1,s2 in zip(path[:-1],path[1:]):

    out_port = adjacency[s1][s2]


    in_port = adjacency[s2][s1]



  assert _check_path(r), "Illegal path!"


  return r



class WaitingPath (object):


  A path which is waiting for its path to be established


  def __init__ (self, path, packet):


    xids is a sequence of (dpid,xid)

    first_switch is the DPID where the packet came from

    packet is something that can be sent in a packet_out


    self.expires_at = time.time() + PATH_SETUP_TIME

    self.path = path

    self.first_switch = path[0][0].dpid

    self.xids = set()

    self.packet = packet


    if len(waiting_paths) > 1000:



  def add_xid (self, dpid, xid):


    waiting_paths[(dpid,xid)] = self



  def is_expired (self):

    return time.time() >= self.expires_at


  def notify (self, event):


    Called when a barrier has been received



    if len(self.xids) == 0:

      # Done!

      if self.packet:

        log.debug("Sending delayed packet out %s"

                  % (dpid_to_str(self.first_switch),))

        msg = of.ofp_packet_out(data=self.packet,


        core.openflow.sendToDPID(self.first_switch, msg)






  def expire_waiting_paths ():

    packets = set(waiting_paths.values())

    killed = 0

    for p in packets:

      if p.is_expired:

        killed += 1

        for entry in p.xids:

          waiting_paths.pop(entry, None)

    if killed:

      log.error("%i paths failed to install" % (killed,))



class PathInstalled (Event):


  Fired when a path is installed


  def __init__ (self, path):


    self.path = path



class Switch (EventMixin):

  def __init__ (self):

    self.connection = None

    self.ports = None

    self.dpid = None

    self._listeners = None

    self._connected_at = None


  def __repr__ (self):

    return dpid_to_str(self.dpid)


  def _install (self, switch, in_port, out_port, match, buf = None):

    msg = of.ofp_flow_mod()

    msg.flags = of.OFPFF_SEND_FLOW_REM

    msg.match = match

    msg.match.in_port = in_port

    msg.idle_timeout = FLOW_IDLE_TIMEOUT

    msg.hard_timeout = FLOW_HARD_TIMEOUT

    msg.actions.append(of.ofp_action_output(port = out_port))

    msg.buffer_id = buf



  def _install_path (self, p, match, packet_in=None):

    wp = WaitingPath(p, packet_in)

    for sw,in_port,out_port in p:

      self._install(sw, in_port, out_port, match)

      msg = of.ofp_barrier_request()




  def install_path (self, dst_sw, last_port, match, event):


    Attempts to install a path between this switch and some destination


    if _renew():

       global year, month, day


       flock = time.localtime()




       print "Renew"


    p = _get_path(self, dst_sw, event.port, last_port, match, event)

    #print time.time(), "install_path is called at:", event.connection.dpid, p


    if used[(match.dl_src)] is not None and used[(match.dl_src)]>quota and quota:

      p = None


    if p is None:

      log.warning("Can't get from %s to %s", match.dl_src, match.dl_dst)


      import pox.lib.packet as pkt


      if (match.dl_type == pkt.ethernet.IP_TYPE and


        # It's IP -- let's send a destination unreachable

        log.debug("Dest unreachable (%s -> %s)",

                  match.dl_src, match.dl_dst)


        from pox.lib.addresses import EthAddr

        e = pkt.ethernet()

        e.src = EthAddr(dpid_to_str(self.dpid)) #FIXME: Hmm...

        e.dst = match.dl_src

        e.type = e.IP_TYPE

        ipp = pkt.ipv4()

        ipp.protocol = ipp.ICMP_PROTOCOL

        ipp.srcip = match.nw_dst #FIXME: Ridiculous

        ipp.dstip = match.nw_src

        icmp = pkt.icmp()

        icmp.type = pkt.ICMP.TYPE_DEST_UNREACH

        icmp.code = pkt.ICMP.CODE_UNREACH_HOST

        orig_ip = event.parsed.find('ipv4')


        d = orig_ip.pack()

        d = d[:orig_ip.hl * 4 + 8]

        import struct

        d = struct.pack("!HH", 0,0) + d #FIXME: MTU

        icmp.payload = d

        ipp.payload = icmp

        e.payload = ipp

        msg = of.ofp_packet_out()

        msg.actions.append(of.ofp_action_output(port = event.port)) = e.pack()





    log.debug("Installing path for %s -> %s %04x (%i hops)",

        match.dl_src, match.dl_dst, match.dl_type, len(p))


    # We have a path -- install it

    self._install_path(p, match, event.ofp)


    # Now reverse it and install it backwards

    # (we'll just assume that will work)

    p = [(sw,out_port,in_port) for sw,in_port,out_port in p]

    self._install_path(p, match.flip())



  def _handle_PacketIn (self, event):

    def flood ():

      """ Floods the packet """

      if self.is_holding_down:

        log.warning("Not flooding -- holddown active")

      msg = of.ofp_packet_out()

      # OFPP_FLOOD is optional; some switches may need OFPP_ALL

      msg.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))

      msg.buffer_id = event.ofp.buffer_id

      msg.in_port = event.port



    def drop ():

      # Kill the buffer

      if event.ofp.buffer_id is not None:

        msg = of.ofp_packet_out()

        msg.buffer_id = event.ofp.buffer_id

        event.ofp.buffer_id = None # Mark is dead

        msg.in_port = event.port



    packet = event.parsed


    loc = (self, event.port) # Place we saw this ethaddr

    oldloc = mac_map.get(packet.src) # Place we last saw this ethaddr


    if packet.effective_ethertype == packet.LLDP_TYPE:




    if oldloc is None:

      if packet.src.is_multicast == False:

        mac_map[packet.src] = loc # Learn position for ethaddr

        log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])

    elif oldloc != loc:

      # ethaddr seen at different place!

      if core.openflow_discovery.is_edge_port(loc[0].dpid, loc[1]):

        # New place is another "plain" port (probably)

        log.debug("%s moved from %s.%i to %s.%i?", packet.src,

                  dpid_to_str(oldloc[0].dpid), oldloc[1],

                  dpid_to_str(   loc[0].dpid),    loc[1])

        if packet.src.is_multicast == False:

          mac_map[packet.src] = loc # Learn position for ethaddr

          log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])

      elif packet.dst.is_multicast == False:

        # New place is a switch-to-switch port!

        # Hopefully, this is a packet we're flooding because we didn't

        # know the destination, and not because it's somehow not on a

        # path that we expect it to be on.

        # If spanning_tree is running, we might check that this port is

        # on the spanning tree (it should be).

        if packet.dst in mac_map:

          # Unfortunately, we know the destination.  It's possible that

          # we learned it while it was in flight, but it's also possible

          # that something has gone wrong.

          log.warning("Packet from %s to known destination %s arrived "

                      "at %s.%i without flow", packet.src, packet.dst,

                      dpid_to_str(self.dpid), event.port)



    if packet.dst.is_multicast:

      log.debug("Flood multicast from %s", packet.src)



      if packet.dst not in mac_map:

        log.debug("%s unknown -- flooding" % (packet.dst,))



        dest = mac_map[packet.dst]

        match = of.ofp_match.from_packet(packet)

        self.install_path(dest[0], dest[1], match, event)


  def disconnect (self):

    if self.connection is not None:

      log.debug("Disconnect %s" % (self.connection,))


      self.connection = None

      self._listeners = None


  def connect (self, connection):

    if self.dpid is None:

      self.dpid = connection.dpid

    assert self.dpid == connection.dpid

    if self.ports is None:

      self.ports = connection.features.ports


    log.debug("Connect %s" % (connection,))

    self.connection = connection

    self._listeners = self.listenTo(connection)

    self._connected_at = time.time()



  def is_holding_down (self):

    if self._connected_at is None: return True

    if time.time() - self._connected_at > FLOOD_HOLDDOWN:

      return False

    return True


  def _handle_ConnectionDown (self, event):




class l2_multi (EventMixin):


  _eventMixin_events = set([




  def __init__ (self):

    # Listen to dependencies

    Timer(2, _timer_func, recurring=True)


    def startup ():

      core.openflow.addListeners(self, priority=0)


    core.call_when_ready(startup, ('openflow','openflow_discovery'))


  def _handle_FlowStatsReceived (self, event):

      log.debug("Got FlowStatsReceived: %s", event.connection)

      print time.time(), event.connection, "Got FlowStatsReceived"


      for f in event.stats:

        for a,b in mypath.items():

           #print "a=", a, "b=", b


           if f.match.dl_src == a[0] and f.match.dl_dst == a[1] and f.match.tp_src==a[2] and a[2] is not None and a[3] is not None and f.match.tp_dst==a[3] and f.match.nw_proto==a[4] and f.match.tp_src!=0 and f.match.tp_dst!=0 and mypath[(f.match.dl_src,  f.match.dl_dst, f.match.tp_src, f.match.tp_dst, f.match.nw_proto)][0]==switches[event.connection.dpid]:

              #print "SSS:", f.match.dl_src,  f.match.dl_dst, f.match.tp_src, f.match.tp_dst, f.match.nw_proto,mypath[(f.match.dl_src,  f.match.dl_dst, f.match.tp_src, f.match.tp_dst, f.match.nw_proto)][0]

              print "upload) ", a[0], "->", a[1], ":", f.byte_count, bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)], f.byte_count - bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)]

              if f.byte_count >=bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)]:

                used[(f.match.dl_src)]+=(f.byte_count - bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)])

              bytes_received[(b[0],f.match.dl_src,f.match.dl_dst,f.match.tp_src,f.match.tp_dst,f.match.nw_proto)] = f.byte_count

              print "upload) used[", f.match.dl_src, "]=", used[(f.match.dl_src)]


              if used[(f.match.dl_src)] > quota and quota:

                  msg = of.ofp_flow_mod()


                  msg.match = f.match



  def _handle_FlowRemoved (self, event):

      log.debug("Got FlowRemoved: %s", event.connection)

      #print time.time(), event.connection, "Got FlowRemoved"


  def _handle_LinkEvent (self, event):

    def flip (link):

      return Discovery.Link(link[2],link[3], link[0],link[1])


    l =

    sw1 = switches[l.dpid1]

    sw2 = switches[l.dpid2]


    # Invalidate all flows and path info.

    # For link adds, this makes sure that if a new link leads to an

    # improved path, we use it.

    # For link removals, this makes sure that we don't use a

    # path that may have been broken.

    #NOTE: This could be radically improved! (e.g., not *ALL* paths break)

    clear = of.ofp_flow_mod(command=of.OFPFC_DELETE)

    for sw in switches.itervalues():

      if sw.connection is None: continue



    if event.removed:

      # This link no longer okay

      if sw2 in adjacency[sw1]: del adjacency[sw1][sw2]

      if sw1 in adjacency[sw2]: del adjacency[sw2][sw1]


      # But maybe there's another way to connect these...

      for ll in core.openflow_discovery.adjacency:

        if ll.dpid1 == l.dpid1 and ll.dpid2 == l.dpid2:

          if flip(ll) in core.openflow_discovery.adjacency:

            # Yup, link goes both ways

            adjacency[sw1][sw2] = ll.port1

            adjacency[sw2][sw1] = ll.port2

            # Fixed -- new link chosen to connect these



      # If we already consider these nodes connected, we can

      # ignore this link up.

      # Otherwise, we might be interested...

      if adjacency[sw1][sw2] is None:

        # These previously weren't connected.  If the link

        # exists in both directions, we consider them connected now.

        if flip(l) in core.openflow_discovery.adjacency:

          # Yup, link goes both ways -- connected!

          adjacency[sw1][sw2] = l.port1

          adjacency[sw2][sw1] = l.port2


      # If we have learned a MAC on this port which we now know to

      # be connected to a switch, unlearn it.

      bad_macs = set()

      for mac,(sw,port) in mac_map.iteritems():

        if sw is sw1 and port == l.port1: bad_macs.add(mac)

        if sw is sw2 and port == l.port2: bad_macs.add(mac)

      for mac in bad_macs:

        log.debug("Unlearned %s", mac)

        del mac_map[mac]


  def _handle_ConnectionUp (self, event):

    sw = switches.get(event.dpid)

    if sw is None:

      # New switch

      sw = Switch()

      switches[event.dpid] = sw





  def _handle_BarrierIn (self, event):

    wp = waiting_paths.pop((event.dpid,event.xid), None)

    if not wp:"No waiting packet %s,%s", event.dpid, event.xid)


    #log.debug("Notify waiting packet %s,%s", event.dpid, event.xid)



def _timer_func ():

  #print time.time(), "timer_func is called"


  for a,b  in mypath.iteritems():

    if b[0] not in sent_sw:

      print time.time(), "send out ofp_flow_stats_request to dpid:", b[0]




def launch ():

  global year, month, day


  flock = time.localtime()




  print "Now:", str(year), ":", str(month), ":", str(day)


  timeout = min(max(PATH_SETUP_TIME, 5) * 2, 15)

  Timer(timeout, WaitingPath.expire_waiting_paths, recurring=True)


4. Run the Pox controller.


5. Run the mininet script.


6. Open xterm windows for h3, h4, h5, and h11.


7. Run the iperf UDP servers on h5 and h11.


8. Run the iperf UDP client on h3 first and check the output of the pox controller. From the output of the pox controller, when the used[00:00:00:00:00:03] exceeds the pre-defined value, i.e.300000 bytes, h5 will not show any further information. Because the h3->h5 is blocked.


9. Run the iperf UDP client on h4. h4-h11 is not blocked. Because the pre-defined value is not reached.


10. Change the system time and see if the h3 can send data again. The quota is reset and h3 can send the data packets again.


Dr. Chih-Heng Ke

Department of Computer Science and Information Engineering, National Quemoy University, Kinmen, Taiwan

Email: /