Using Bellman-Ford to find a shortest path

 

[Description]

Based on OpenNetMon (paper, code), I provide a simple example that shows how to write a module for the POX controller that finds a shortest path for data transmission. This module, called Forwarding, first listens to openflow.discovery events. When a link is found (a LinkEvent is raised and event.added is set), Forwarding saves the information in adj and switch_ports. Suppose a link SW1 (port1) ---- SW2 (port2) is found. Then adj[dpid for SW1][dpid for SW2] = port1 is saved, where dpid is the datapath ID. Also, switch_ports[dpid for SW1, port1] = link is saved. With this information, given a dpid and a port number, we can look up the link. A link is composed of dpid1, port1, dpid2, and port2, so we know which switch and port the given dpid and port are connected to.

The Forwarding module also listens to the ConnectionUp event. When there is a new connection, the module uses the dpid to initialize a "sw" object, which is an instance of class Switch. This raises a "NewSwitch" event. The sw object listens to PacketIn events. When a new packet arrives at a switch, the switch dpid and incoming port are remembered in the mac_learning structure. So given the source address of a packet, we can get the switch dpid and port number from mac_learning.

The new packet then uses the _get_path function to find a path to its destination. The Bellman-Ford algorithm is implemented in _get_path. After a path is found, each switch along the path sets the corresponding forwarding rule. This is done by calling the _install_path function. In the listing under [Scripts], the corresponding structures are named adjacency and mac_map, and the Bellman-Ford loop lives in _get_raw_path, which _get_path calls.
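The bookkeeping described above can be sketched in plain Python, outside POX. This is only a minimal illustration: the Link tuple, the link_added, attached_to and packet_in helpers, and the sample dpids, ports and MAC address are assumptions made for this example, and the structures are modelled as ordinary dictionaries rather than the controller's own objects.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for a discovered link: (dpid1, port1) -> (dpid2, port2)
Link = namedtuple("Link", ["dpid1", "port1", "dpid2", "port2"])

adj = defaultdict(dict)   # adj[dpid1][dpid2] -> port on dpid1 that leads to dpid2
switch_ports = {}         # (dpid, port) -> Link attached to that port
mac_learning = {}         # source MAC -> (dpid, port) where it was seen


def link_added(link):
    """Record one direction of a newly discovered link (illustrative helper)."""
    adj[link.dpid1][link.dpid2] = link.port1
    switch_ports[(link.dpid1, link.port1)] = link


def attached_to(dpid, port):
    """Return the (dpid, port) at the far end, or None if no link is known."""
    link = switch_ports.get((dpid, port))
    if link is None:
        return None
    return (link.dpid2, link.port2)


def packet_in(src_mac, dpid, in_port):
    """Remember where a source address was seen (illustrative helper)."""
    mac_learning[src_mac] = (dpid, in_port)


# Each direction of a link is reported separately.
link_added(Link(dpid1=1, port1=2, dpid2=2, port2=1))
link_added(Link(dpid1=2, port1=1, dpid2=1, port2=2))
packet_in("00:00:00:00:00:01", dpid=1, in_port=1)

print(adj[1][2])                          # 2: port on switch 1 facing switch 2
print(attached_to(1, 2))                  # (2, 1): far end of switch 1, port 2
print(mac_learning["00:00:00:00:00:01"])  # (1, 1): where the host was seen
```

A port with no entry in switch_ports has no known switch behind it, which is how a host-facing port can be told apart from a switch-to-switch port in this sketch.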

 

[Test]

We will create the topology shown below. Host1 will send packets to Host2.

 

Host1 ----- Switch1 ----- Switch2 ----- Switch5
               |                           |
               |                           |
            Switch3 ------------------- Switch4 ----- Host2
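Host1 attaches to Switch1 and Host2 to Switch4, so the controller has two candidate routes: Switch1, Switch3, Switch4 (two switch-to-switch hops) or Switch1, Switch2, Switch5, Switch4 (three hops). The following standalone sketch runs Bellman-Ford with unit link weights over the switch links from lab7.py to show which one should be chosen. The names LINKS and bellman_ford_path are assumptions for this illustration and do not appear in the controller script.

```python
# Switch-to-switch links of the test topology, taken from lab7.py
LINKS = [("s1", "s2"), ("s1", "s3"), ("s3", "s4"), ("s2", "s5"), ("s4", "s5")]


def bellman_ford_path(links, src, dst):
    """Return a minimum-hop path from src to dst, or None if unreachable."""
    nodes = sorted({n for link in links for n in link})
    # Links are bidirectional, so relax each one in both directions.
    edges = [(a, b) for a, b in links] + [(b, a) for a, b in links]

    distance = {n: float("inf") for n in nodes}
    previous = {n: None for n in nodes}
    distance[src] = 0

    # At most |V| - 1 rounds are needed; stop early once nothing changes.
    for _ in range(len(nodes) - 1):
        changed = False
        for u, v in edges:
            if distance[u] + 1 < distance[v]:   # every link has weight 1
                distance[v] = distance[u] + 1
                previous[v] = u
                changed = True
        if not changed:
            break

    if distance[dst] == float("inf"):
        return None

    # Follow the predecessor chain back from dst, then reverse it.
    path = [dst]
    while path[-1] != src:
        path.append(previous[path[-1]])
    path.reverse()
    return path


print(bellman_ford_path(LINKS, "s1", "s4"))  # ['s1', 's3', 's4']
```

With unit weights the result is simply the path with the fewest hops. Bellman-Ford becomes more useful than a plain breadth-first search once the weight reflects something like delay or load.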

 

 

[Scripts]

1.      A script for topology. (lab7.py)

#!/usr/bin/env python

 

from mininet.net  import Mininet

from mininet.node import RemoteController

from mininet.link import TCLink

from mininet.cli  import CLI

from mininet.util import quietRun

 

net = Mininet(link=TCLink)

 

# Add hosts and switches

Host1  = net.addHost('h1')

Host2  = net.addHost('h2')

 

Switch1 = net.addSwitch('s1')

Switch2 = net.addSwitch('s2')

Switch3 = net.addSwitch('s3')

Switch4 = net.addSwitch('s4')

Switch5 = net.addSwitch('s5')

 

# Add links

# set link speeds to 10Mbit/s

linkopts = dict(bw=10)

net.addLink(Host1,   Switch1,    **linkopts )

net.addLink(Switch1,  Switch2,    **linkopts )

net.addLink(Switch1,  Switch3,    **linkopts )

net.addLink(Switch3,  Switch4,    **linkopts )

net.addLink(Switch2,  Switch5,    **linkopts )

net.addLink(Switch4,  Switch5,    **linkopts )

net.addLink(Switch4,  Host2,     **linkopts )

 

# Start

net.addController('c', controller=RemoteController,ip='127.0.0.1',port=6633)

net.build()

net.start()

 

# CLI

CLI( net )

 

# Clean up

net.stop()

 

2.      A script for controller. (bellmanford.py)  (put this file under /pox/ext)

# Copyright 2012-2013 James McCauley

#

# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at:

#

#     http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

 

"""

A shortest-path forwarding application.

 

This is a standalone L2 switch that learns ethernet addresses

across the entire network and picks short paths between them.

 

You shouldn't really write an application this way -- you should

keep more state in the controller (that is, your flow tables),

and/or you should make your topology more static.  However, this

does (mostly) work. :)

 

Depends on openflow.discovery

Works with openflow.spanning_tree

"""

 

from pox.core import core

import pox.openflow.libopenflow_01 as of

from pox.lib.revent import *

from pox.lib.recoco import Timer

from collections import defaultdict

from pox.openflow.discovery import Discovery

from pox.lib.util import dpid_to_str

import time

 

log = core.getLogger()

 

# Adjacency map.  [sw1][sw2] -> port from sw1 to sw2

adjacency = defaultdict(lambda:defaultdict(lambda:None))

 

# Switches we know of.  [dpid] -> Switch

switches = {}

 

# ethaddr -> (switch, port)

mac_map = {}

 

# Waiting path.  (dpid,xid)->WaitingPath

waiting_paths = {}

 

# Time to not flood in seconds

FLOOD_HOLDDOWN = 5

 

# Flow timeouts

FLOW_IDLE_TIMEOUT = 10

FLOW_HARD_TIMEOUT = 30

 

# How long is allowable to set up a path?

PATH_SETUP_TIME = 4

 

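def _get_raw_path (src, dst):
  """
  Bellman-Ford shortest path from src to dst (both are Switch objects)

  Every link has weight 1, so the result is a minimum-hop path.
  Returns a list of switches from src to dst, or None if dst is unreachable.
  """
  sws = list(switches.values())

  # Initially every switch is unreachable and has no predecessor
  distance = dict((sw, float('inf')) for sw in sws)
  previous = dict((sw, None) for sw in sws)
  distance[src] = 0

  # Relax every link up to |V|-1 times
  for _ in range(len(sws) - 1):
    for p in sws:
      for q in sws:
        if adjacency[p][q] is not None:
          w = 1 # uniform link weight
          if distance[p] + w < distance[q]:
            distance[q] = distance[p] + w
            previous[q] = p

  # No predecessor was recorded for dst, so it was never reached
  if src != dst and previous.get(dst) is None:
    return None

  # Walk the predecessor chain back from dst, then reverse it
  r = [dst]
  q = previous.get(dst)
  while q is not None:
    r.append(q)
    q = previous[q]
  r.reverse()
  return r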

 

def _check_path (p):

  """

  Make sure that a path is actually a string of nodes with connected ports

 

  returns True if path is valid

  """

  for a,b in zip(p[:-1],p[1:]):

    if adjacency[a[0]][b[0]] != a[2]:

      return False

    if adjacency[b[0]][a[0]] != b[1]:

      return False

  return True

 

 

def _get_path (src, dst, first_port, final_port):

  """

  Gets a cooked path -- a list of (node,in_port,out_port)

  """

  # Start with a raw path...

  if src == dst:

    path = [src]

  else:

    path = _get_raw_path(src, dst)

    if path is None: return None

    log.info("src=%s dst=%s", src, dst)

    log.info("%s: %s", time.time(), path)

 

  # Now add the ports

  r = []

  in_port = first_port

  for s1,s2 in zip(path[:-1],path[1:]):

    out_port = adjacency[s1][s2]

    r.append((s1,in_port,out_port))

    in_port = adjacency[s2][s1]

  r.append((dst,in_port,final_port))

 

  assert _check_path(r), "Illegal path!"

 

  return r

 

 

class WaitingPath (object):

  """

  A path which is waiting for its path to be established

  """

  def __init__ (self, path, packet):

    """

    xids is a sequence of (dpid,xid)

    first_switch is the DPID where the packet came from

    packet is something that can be sent in a packet_out

    """

    self.expires_at = time.time() + PATH_SETUP_TIME

    self.path = path

    self.first_switch = path[0][0].dpid

    self.xids = set()

    self.packet = packet

 

    if len(waiting_paths) > 1000:

      WaitingPath.expire_waiting_paths()

 

  def add_xid (self, dpid, xid):

    self.xids.add((dpid,xid))

    waiting_paths[(dpid,xid)] = self

 

  @property

  def is_expired (self):

    return time.time() >= self.expires_at

 

  def notify (self, event):

    """

    Called when a barrier has been received

    """

    self.xids.discard((event.dpid,event.xid))

    if len(self.xids) == 0:

      # Done!

      if self.packet:

        log.debug("Sending delayed packet out %s"

                  % (dpid_to_str(self.first_switch),))

        msg = of.ofp_packet_out(data=self.packet,

            action=of.ofp_action_output(port=of.OFPP_TABLE))

        core.openflow.sendToDPID(self.first_switch, msg)

 

      core.l2_multi.raiseEvent(PathInstalled(self.path))

 

 

  @staticmethod

  def expire_waiting_paths ():

    packets = set(waiting_paths.values())

    killed = 0

    for p in packets:

      if p.is_expired:

        killed += 1

        for entry in p.xids:

          waiting_paths.pop(entry, None)

    if killed:

      log.error("%i paths failed to install" % (killed,))

 

 

class PathInstalled (Event):

  """

  Fired when a path is installed

  """

  def __init__ (self, path):

    Event.__init__(self)

    self.path = path

 

 

class Switch (EventMixin):

  def __init__ (self):

    self.connection = None

    self.ports = None

    self.dpid = None

    self._listeners = None

    self._connected_at = None

 

  def __repr__ (self):

    return dpid_to_str(self.dpid)

 

  def _install (self, switch, in_port, out_port, match, buf = None):

    msg = of.ofp_flow_mod()

    msg.match = match

    msg.match.in_port = in_port

    msg.idle_timeout = FLOW_IDLE_TIMEOUT

    msg.hard_timeout = FLOW_HARD_TIMEOUT

    msg.actions.append(of.ofp_action_output(port = out_port))

    msg.buffer_id = buf

    switch.connection.send(msg)

 

  def _install_path (self, p, match, packet_in=None):

    wp = WaitingPath(p, packet_in)

    for sw,in_port,out_port in p:

      self._install(sw, in_port, out_port, match)

      msg = of.ofp_barrier_request()

      sw.connection.send(msg)

      wp.add_xid(sw.dpid,msg.xid)

 

  def install_path (self, dst_sw, last_port, match, event):

    """

    Attempts to install a path between this switch and some destination

    """

    p = _get_path(self, dst_sw, event.port, last_port)

    if p is None:

      log.warning("Can't get from %s to %s", match.dl_src, match.dl_dst)

 

      import pox.lib.packet as pkt

 

      if (match.dl_type == pkt.ethernet.IP_TYPE and

          event.parsed.find('ipv4')):

        # It's IP -- let's send a destination unreachable

        log.debug("Dest unreachable (%s -> %s)",

                  match.dl_src, match.dl_dst)

 

        from pox.lib.addresses import EthAddr

        e = pkt.ethernet()

        e.src = EthAddr(dpid_to_str(self.dpid)) #FIXME: Hmm...

        e.dst = match.dl_src

        e.type = e.IP_TYPE

        ipp = pkt.ipv4()

        ipp.protocol = ipp.ICMP_PROTOCOL

        ipp.srcip = match.nw_dst #FIXME: Ridiculous

        ipp.dstip = match.nw_src

        icmp = pkt.icmp()

        icmp.type = pkt.ICMP.TYPE_DEST_UNREACH

        icmp.code = pkt.ICMP.CODE_UNREACH_HOST

        orig_ip = event.parsed.find('ipv4')

 

        d = orig_ip.pack()

        d = d[:orig_ip.hl * 4 + 8]

        import struct

        d = struct.pack("!HH", 0,0) + d #FIXME: MTU

        icmp.payload = d

        ipp.payload = icmp

        e.payload = ipp

        msg = of.ofp_packet_out()

        msg.actions.append(of.ofp_action_output(port = event.port))

        msg.data = e.pack()

        self.connection.send(msg)

 

      return

 

    log.debug("Installing path for %s -> %s %04x (%i hops)",

        match.dl_src, match.dl_dst, match.dl_type, len(p))

 

    # We have a path -- install it

    self._install_path(p, match, event.ofp)

 

    # Now reverse it and install it backwards

    # (we'll just assume that will work)

    p = [(sw,out_port,in_port) for sw,in_port,out_port in p]

    self._install_path(p, match.flip())

 

 

  def _handle_PacketIn (self, event):

    def flood ():

      """ Floods the packet """

      if self.is_holding_down:

        log.warning("Not flooding -- holddown active")

      msg = of.ofp_packet_out()

      # OFPP_FLOOD is optional; some switches may need OFPP_ALL

      msg.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))

      msg.buffer_id = event.ofp.buffer_id

      msg.in_port = event.port

      self.connection.send(msg)

 

    def drop ():

      # Kill the buffer

      if event.ofp.buffer_id is not None:

        msg = of.ofp_packet_out()

        msg.buffer_id = event.ofp.buffer_id

        event.ofp.buffer_id = None # Mark is dead

        msg.in_port = event.port

        self.connection.send(msg)

 

    packet = event.parsed

 

    loc = (self, event.port) # Place we saw this ethaddr

    oldloc = mac_map.get(packet.src) # Place we last saw this ethaddr

 

    if packet.effective_ethertype == packet.LLDP_TYPE:

      drop()

      return

 

    if oldloc is None:

      if packet.src.is_multicast == False:

        mac_map[packet.src] = loc # Learn position for ethaddr

        log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])

    elif oldloc != loc:

      # ethaddr seen at different place!

      if core.openflow_discovery.is_edge_port(loc[0].dpid, loc[1]):

        # New place is another "plain" port (probably)

        log.debug("%s moved from %s.%i to %s.%i?", packet.src,

                  dpid_to_str(oldloc[0].dpid), oldloc[1],

                  dpid_to_str(   loc[0].dpid),    loc[1])

        if packet.src.is_multicast == False:

          mac_map[packet.src] = loc # Learn position for ethaddr

          log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])

      elif packet.dst.is_multicast == False:

        # New place is a switch-to-switch port!

        # Hopefully, this is a packet we're flooding because we didn't

        # know the destination, and not because it's somehow not on a

        # path that we expect it to be on.

        # If spanning_tree is running, we might check that this port is

        # on the spanning tree (it should be).

        if packet.dst in mac_map:

          # Unfortunately, we know the destination.  It's possible that

          # we learned it while it was in flight, but it's also possible

          # that something has gone wrong.

          log.warning("Packet from %s to known destination %s arrived "

                      "at %s.%i without flow", packet.src, packet.dst,

                      dpid_to_str(self.dpid), event.port)

 

 

    if packet.dst.is_multicast:

      log.debug("Flood multicast from %s", packet.src)

      flood()

    else:

      if packet.dst not in mac_map:

        log.debug("%s unknown -- flooding" % (packet.dst,))

        flood()

      else:

        dest = mac_map[packet.dst]

        match = of.ofp_match.from_packet(packet)

        self.install_path(dest[0], dest[1], match, event)

 

  def disconnect (self):

    if self.connection is not None:

      log.debug("Disconnect %s" % (self.connection,))

      self.connection.removeListeners(self._listeners)

      self.connection = None

      self._listeners = None

 

  def connect (self, connection):

    if self.dpid is None:

      self.dpid = connection.dpid

    assert self.dpid == connection.dpid

    if self.ports is None:

      self.ports = connection.features.ports

    self.disconnect()

    log.debug("Connect %s" % (connection,))

    self.connection = connection

    self._listeners = self.listenTo(connection)

    self._connected_at = time.time()

 

  @property

  def is_holding_down (self):

    if self._connected_at is None: return True

    if time.time() - self._connected_at > FLOOD_HOLDDOWN:

      return False

    return True

 

  def _handle_ConnectionDown (self, event):

    self.disconnect()

 

 

class l2_multi (EventMixin):

 

  _eventMixin_events = set([

    PathInstalled,

  ])

 

  def __init__ (self):

    # Listen to dependencies

    def startup ():

      core.openflow.addListeners(self, priority=0)

      core.openflow_discovery.addListeners(self)

    core.call_when_ready(startup, ('openflow','openflow_discovery'))

 

  def _handle_LinkEvent (self, event):

    def flip (link):

      return Discovery.Link(link[2],link[3], link[0],link[1])

 

    l = event.link

    sw1 = switches[l.dpid1]

    sw2 = switches[l.dpid2]

 

    # Invalidate all flows and path info.

    # For link adds, this makes sure that if a new link leads to an

    # improved path, we use it.

    # For link removals, this makes sure that we don't use a

    # path that may have been broken.

    #NOTE: This could be radically improved! (e.g., not *ALL* paths break)

    clear = of.ofp_flow_mod(command=of.OFPFC_DELETE)

    for sw in switches.itervalues():

      if sw.connection is None: continue

      sw.connection.send(clear)

 

    if event.removed:

      # This link no longer okay

      if sw2 in adjacency[sw1]: del adjacency[sw1][sw2]

      if sw1 in adjacency[sw2]: del adjacency[sw2][sw1]

 

      # But maybe there's another way to connect these...

      for ll in core.openflow_discovery.adjacency:

        if ll.dpid1 == l.dpid1 and ll.dpid2 == l.dpid2:

          if flip(ll) in core.openflow_discovery.adjacency:

            # Yup, link goes both ways

            adjacency[sw1][sw2] = ll.port1

            adjacency[sw2][sw1] = ll.port2

            # Fixed -- new link chosen to connect these

            break

    else:

      # If we already consider these nodes connected, we can

      # ignore this link up.

      # Otherwise, we might be interested...

      if adjacency[sw1][sw2] is None:

        # These previously weren't connected.  If the link

        # exists in both directions, we consider them connected now.

        if flip(l) in core.openflow_discovery.adjacency:

          # Yup, link goes both ways -- connected!

          adjacency[sw1][sw2] = l.port1

          adjacency[sw2][sw1] = l.port2

 

      # If we have learned a MAC on this port which we now know to

      # be connected to a switch, unlearn it.

      bad_macs = set()

      for mac,(sw,port) in mac_map.iteritems():

        if sw is sw1 and port == l.port1: bad_macs.add(mac)

        if sw is sw2 and port == l.port2: bad_macs.add(mac)

      for mac in bad_macs:

        log.debug("Unlearned %s", mac)

        del mac_map[mac]

 

  def _handle_ConnectionUp (self, event):

    sw = switches.get(event.dpid)

    if sw is None:

      # New switch

      sw = Switch()

      switches[event.dpid] = sw

      sw.connect(event.connection)

    else:

      sw.connect(event.connection)

 

  def _handle_BarrierIn (self, event):

    wp = waiting_paths.pop((event.dpid,event.xid), None)

    if not wp:

      #log.info("No waiting packet %s,%s", event.dpid, event.xid)

      return

    #log.debug("Notify waiting packet %s,%s", event.dpid, event.xid)

    wp.notify(event)

 

 

def launch ():

  core.registerNew(l2_multi)

 

  timeout = min(max(PATH_SETUP_TIME, 5) * 2, 15)

  Timer(timeout, WaitingPath.expire_waiting_paths, recurring=True)
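To make the path format used by _get_path and install_path in the listing above more concrete, here is a small standalone sketch that turns a raw list of switches into (switch, in_port, out_port) hops and then derives the hops for the return direction. The port numbers in PORTS are made up for illustration (Mininet assigns the real ones), and cook_path and reverse_path are hypothetical helper names, not part of POX.

```python
# PORTS[a][b] -> port on switch a that faces switch b (made-up numbers)
PORTS = {
    "s1": {"s3": 3},
    "s3": {"s1": 1, "s4": 2},
    "s4": {"s3": 1},
}


def cook_path(raw_path, first_port, final_port, ports):
    """Turn a list of switches into (switch, in_port, out_port) hops."""
    cooked = []
    in_port = first_port
    for here, nxt in zip(raw_path[:-1], raw_path[1:]):
        out_port = ports[here][nxt]
        cooked.append((here, in_port, out_port))
        in_port = ports[nxt][here]   # the packet enters the next switch here
    cooked.append((raw_path[-1], in_port, final_port))
    return cooked


def reverse_path(cooked):
    """Swap in/out ports on every hop to get the rules for return traffic."""
    return [(sw, out_port, in_port) for sw, in_port, out_port in cooked]


# Host1 on port 1 of s1, Host2 on port 3 of s4 (assumed port numbers)
forward = cook_path(["s1", "s3", "s4"], first_port=1, final_port=3, ports=PORTS)
print(forward)                # [('s1', 1, 3), ('s3', 1, 2), ('s4', 1, 3)]
print(reverse_path(forward))  # [('s1', 3, 1), ('s3', 2, 1), ('s4', 3, 1)]
```

Each tuple corresponds to one flow rule: match packets arriving on in_port and output them on out_port. Swapping the two ports per hop is enough for the reverse direction because the rules are installed per switch, so the order of the list does not matter.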

 

 

  

Dr. Chih-Heng Ke

Department of Computer Science and Information Engineering, National Quemoy University, Kinmen, Taiwan

Email: smallko@gmail.com