Using Bellman-Ford to find a shortest path (version 2)

 

[Goal]

  Based on the l2_multi.py POX controller module, I wrote l2_bellmanford.py, which uses the Bellman-Ford algorithm to find a shortest path. This version is more stable than the module in OpenNetMon.

 

[Evaluation Environment]

  We will build a topology like the one in the following figure, and then use l2_bellmanford to find a shortest path between hosts.

 

 

[Steps]

1.      Create the evaluation topology.

 

2.      Prepare l2_bellmanford.py and save the file under the /pox/ext folder.

 

from pox.core import core

import pox.openflow.libopenflow_01 as of

from pox.lib.revent import *

from pox.lib.recoco import Timer

from collections import defaultdict

from pox.openflow.discovery import Discovery

from pox.lib.util import dpid_to_str

import time

 

log = core.getLogger()



# Adjacency map.  [sw1][sw2] -> port on sw1 that leads to sw2
# Keys are Switch objects; a pair with no recorded link reads as None.

adjacency = defaultdict(lambda:defaultdict(lambda:None))



# Switches we know of.  [dpid] -> Switch
# Filled in when a switch connects (l2_multi._handle_ConnectionUp).

switches = {}



# ethaddr -> (switch, port) where that address was last learned

mac_map = {}



# Waiting path.  (dpid,xid)->WaitingPath
# One entry per barrier request still awaiting its reply.

waiting_paths = {}



# Time to not flood in seconds
# Switch.is_holding_down is True for this long after a switch connects.

FLOOD_HOLDDOWN = 5



# Flow timeouts (idle/hard) put on every flow entry sent by Switch._install

FLOW_IDLE_TIMEOUT = 10

FLOW_HARD_TIMEOUT = 30



# How long is allowable to set up a path?
# A WaitingPath older than this many seconds counts as expired.

PATH_SETUP_TIME = 4

 

def _get_raw_path (src, dst):
  """
  Find a minimum-hop path between two switches with Bellman-Ford.

  Every link recorded in the module-level adjacency map has weight 1.

  src -- switch the path starts at
  dst -- switch the path ends at

  Returns the list of switches [src, ..., dst] ([src] when src == dst),
  or None when dst cannot be reached from src or when either endpoint is
  not one of the known switches.
  """
  #Bellman-Ford algorithm
  unreachable = float("inf")
  sws = list(switches.values())

  distance = dict((sw, unreachable) for sw in sws)
  previous = dict((sw, None) for sw in sws)

  # An endpoint we hold no Switch for cannot be on any path.
  if src not in distance or dst not in distance:
    return None

  distance[src] = 0

  # At most len(sws)-1 relaxation rounds are needed.  A round that
  # changes nothing means every later round would change nothing too.
  for _ in range(len(sws) - 1):
    changed = False
    for p in sws:
      if distance[p] == unreachable:
        continue  # nothing can be relaxed through an unreached switch
      for q in sws:
        if adjacency[p][q] is not None:
          w = 1
          if distance[p] + w < distance[q]:
            distance[q] = distance[p] + w
            previous[q] = p
            changed = True
    if not changed:
      break

  # No route: tell the caller explicitly instead of handing back a
  # partial chain that does not start at src.
  if distance[dst] == unreachable:
    return None

  # Walk the predecessor chain back from dst, then flip it.
  r = [dst]
  node = dst
  while node != src:
    node = previous[node]
    r.append(node)
  r.reverse()
  return r

 

def _check_path (p):
  """
  Confirm that each pair of neighbouring hops is joined by a known link.

  p is a sequence of (node, in_port, out_port) entries.  For every
  neighbouring pair, the adjacency map must list the first hop's
  out_port toward the second node and the second hop's in_port back
  toward the first node.

  returns True if path is valid, False otherwise
  """
  for here, there in zip(p, p[1:]):
    # Forward direction is looked up first; the reverse direction is
    # only consulted when the forward one matches.
    if (adjacency[here[0]][there[0]] != here[2]
        or adjacency[there[0]][here[0]] != there[1]):
      return False
  return True

 

 

def _get_path (src, dst, first_port, final_port):

  """

  Gets a cooked path -- a list of (node,in_port,out_port)

  """

  # Start with a raw path...

  if src == dst:

    path = [src]

  else:

    path = _get_raw_path(src, dst)

    if path is None: return None

    print "src=",src," dst=",dst

    print time.time(),": ",path

 

  # Now add the ports

  r = []

  in_port = first_port

  for s1,s2 in zip(path[:-1],path[1:]):

    out_port = adjacency[s1][s2]

    r.append((s1,in_port,out_port))

    in_port = adjacency[s2][s1]

  r.append((dst,in_port,final_port))

 

  assert _check_path(r), "Illegal path!"

 

  return r

 

 

class WaitingPath (object):
  """
  A path whose flow entries have been sent but not yet confirmed.

  Each hop's barrier request is registered through add_xid.  When the
  last outstanding barrier reply is reported via notify, the held
  packet (if any) is released and PathInstalled is raised.
  """
  def __init__ (self, path, packet):
    """
    path is the cooked path; path[0][0] is the first switch
    packet is something that can be sent in a packet_out
    """
    self.path = path
    self.packet = packet
    self.xids = set()   # (dpid,xid) pairs still awaiting a barrier reply
    self.first_switch = path[0][0].dpid
    self.expires_at = time.time() + PATH_SETUP_TIME

    # Sweep opportunistically when the global table has grown large
    if len(waiting_paths) > 1000:
      WaitingPath.expire_waiting_paths()

  def add_xid (self, dpid, xid):
    """
    Register one outstanding barrier request for this path
    """
    key = (dpid, xid)
    self.xids.add(key)
    waiting_paths[key] = self

  @property
  def is_expired (self):
    """
    True once the setup deadline has been reached
    """
    return self.expires_at <= time.time()

  def notify (self, event):
    """
    Called when a barrier has been received
    """
    self.xids.discard((event.dpid, event.xid))
    if self.xids:
      return  # Other switches have not answered yet

    # Done!
    if self.packet:
      log.debug("Sending delayed packet out %s"
                % (dpid_to_str(self.first_switch),))
      to_table = of.ofp_action_output(port=of.OFPP_TABLE)
      msg = of.ofp_packet_out(data=self.packet, action=to_table)
      core.openflow.sendToDPID(self.first_switch, msg)

    core.l2_multi.raiseEvent(PathInstalled(self.path))

  @staticmethod
  def expire_waiting_paths ():
    """
    Drop the table entries of every expired path and log how many
    paths were dropped
    """
    killed = 0
    for pending in set(waiting_paths.values()):
      if not pending.is_expired:
        continue
      killed += 1
      for key in pending.xids:
        waiting_paths.pop(key, None)
    if killed:
      log.error("%i paths failed to install" % (killed,))

 

 

class PathInstalled (Event):

  """

  Fired when a path is installed

  WaitingPath.notify raises it through core.l2_multi once no barrier
  xids remain outstanding for the path.

  """

  def __init__ (self, path):

    # path: the path value held by the WaitingPath that completed

    Event.__init__(self)

    self.path = path

 

 

class Switch (EventMixin):
  """
  State for one datapath plus the PacketIn handling that learns where
  hosts are and installs end-to-end paths toward known destinations.
  """
  def __init__ (self):
    self.dpid = None
    self.ports = None
    self.connection = None
    self._listeners = None
    self._connected_at = None   # time.time() of the latest connect()

  def __repr__ (self):
    return dpid_to_str(self.dpid)

  def _install (self, switch, in_port, out_port, match, buf = None):
    """
    Send one flow entry to switch: traffic matching match that arrives
    on in_port is output on out_port.
    """
    flow = of.ofp_flow_mod()
    flow.match = match
    flow.match.in_port = in_port
    flow.buffer_id = buf
    flow.hard_timeout = FLOW_HARD_TIMEOUT
    flow.idle_timeout = FLOW_IDLE_TIMEOUT
    flow.actions.append(of.ofp_action_output(port = out_port))
    switch.connection.send(flow)

  def _install_path (self, p, match, packet_in=None):
    """
    Install match on every hop of p and follow each flow entry with a
    barrier request whose xid is tracked by a new WaitingPath.
    """
    pending = WaitingPath(p, packet_in)
    for hop_switch, hop_in, hop_out in p:
      self._install(hop_switch, hop_in, hop_out, match)
      barrier = of.ofp_barrier_request()
      hop_switch.connection.send(barrier)
      pending.add_xid(hop_switch.dpid, barrier.xid)

  def _send_unreachable (self, match, event):
    """
    Answer an IPv4 packet that has no path with an ICMP destination
    unreachable sent back out of the port it arrived on.  Anything
    that is not IPv4 is ignored.
    """
    import pox.lib.packet as pkt

    if not (match.dl_type == pkt.ethernet.IP_TYPE and
            event.parsed.find('ipv4')):
      return

    # It's IP -- let's send a destination unreachable
    log.debug("Dest unreachable (%s -> %s)",
              match.dl_src, match.dl_dst)

    from pox.lib.addresses import EthAddr
    import struct

    eth = pkt.ethernet()
    eth.src = EthAddr(dpid_to_str(self.dpid)) #FIXME: Hmm...
    eth.dst = match.dl_src
    eth.type = eth.IP_TYPE

    ip_reply = pkt.ipv4()
    ip_reply.protocol = ip_reply.ICMP_PROTOCOL
    ip_reply.srcip = match.nw_dst #FIXME: Ridiculous
    ip_reply.dstip = match.nw_src

    unreach = pkt.icmp()
    unreach.type = pkt.ICMP.TYPE_DEST_UNREACH
    unreach.code = pkt.ICMP.CODE_UNREACH_HOST

    # Payload: four zero bytes, then the original IP header plus the
    # first 8 bytes that followed it.
    orig_ip = event.parsed.find('ipv4')
    quoted = orig_ip.pack()[:orig_ip.hl * 4 + 8]
    unreach.payload = struct.pack("!HH", 0,0) + quoted #FIXME: MTU

    ip_reply.payload = unreach
    eth.payload = ip_reply

    out = of.ofp_packet_out()
    out.actions.append(of.ofp_action_output(port = event.port))
    out.data = eth.pack()
    self.connection.send(out)

  def install_path (self, dst_sw, last_port, match, event):
    """
    Attempts to install a path between this switch and some destination
    """
    p = _get_path(self, dst_sw, event.port, last_port)
    if p is None:
      log.warning("Can't get from %s to %s", match.dl_src, match.dl_dst)
      self._send_unreachable(match, event)
      return

    log.debug("Installing path for %s -> %s %04x (%i hops)",
        match.dl_src, match.dl_dst, match.dl_type, len(p))

    # We have a path -- install it
    self._install_path(p, match, event.ofp)

    # Same hops with in/out ports swapped carry the return traffic
    # (we'll just assume that will work)
    return_hops = [(hop[0], hop[2], hop[1]) for hop in p]
    self._install_path(return_hops, match.flip())

  def _handle_PacketIn (self, event):
    """
    Learn the sender's location, then either flood the packet or
    install a path toward its known destination.
    """
    def flood ():
      """ Floods the packet """
      # The hold-down only produces a warning here; the flood below is
      # still sent.
      if self.is_holding_down:
        log.warning("Not flooding -- holddown active")
      out = of.ofp_packet_out()
      # OFPP_FLOOD is optional; some switches may need OFPP_ALL
      out.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))
      out.buffer_id = event.ofp.buffer_id
      out.in_port = event.port
      self.connection.send(out)

    def drop ():
      # Kill the buffer
      if event.ofp.buffer_id is None:
        return
      out = of.ofp_packet_out()
      out.buffer_id = event.ofp.buffer_id
      event.ofp.buffer_id = None # Mark is dead
      out.in_port = event.port
      self.connection.send(out)

    def learn ():
      # Record where the sender lives, unless its address is multicast
      if packet.src.is_multicast == False:
        mac_map[packet.src] = loc # Learn position for ethaddr
        log.debug("Learned %s at %s.%i", packet.src, loc[0], loc[1])

    packet = event.parsed

    if packet.effective_ethertype == packet.LLDP_TYPE:
      drop()
      return

    loc = (self, event.port) # Place we saw this ethaddr
    oldloc = mac_map.get(packet.src) # Place we last saw this ethaddr

    if oldloc is None:
      learn()
    elif oldloc != loc:
      # ethaddr seen at different place!
      if core.openflow_discovery.is_edge_port(loc[0].dpid, loc[1]):
        # New place is another "plain" port (probably)
        log.debug("%s moved from %s.%i to %s.%i?", packet.src,
                  dpid_to_str(oldloc[0].dpid), oldloc[1],
                  dpid_to_str(   loc[0].dpid),    loc[1])
        learn()
      elif packet.dst.is_multicast == False and packet.dst in mac_map:
        # New place is a switch-to-switch port, and we know the
        # destination.  Either we learned it while this packet was in
        # flight, or something has gone wrong.
        # If spanning_tree is running, we might check that this port is
        # on the spanning tree (it should be).
        log.warning("Packet from %s to known destination %s arrived "
                    "at %s.%i without flow", packet.src, packet.dst,
                    dpid_to_str(self.dpid), event.port)

    if packet.dst.is_multicast:
      log.debug("Flood multicast from %s", packet.src)
      flood()
    elif packet.dst not in mac_map:
      log.debug("%s unknown -- flooding" % (packet.dst,))
      flood()
    else:
      dst_switch, dst_port = mac_map[packet.dst]
      match = of.ofp_match.from_packet(packet)
      self.install_path(dst_switch, dst_port, match, event)

  def disconnect (self):
    """
    Detach from the current connection, if there is one
    """
    if self.connection is None:
      return
    log.debug("Disconnect %s" % (self.connection,))
    self.connection.removeListeners(self._listeners)
    self.connection = None
    self._listeners = None

  def connect (self, connection):
    """
    Attach to connection, replacing any previous one, and restart the
    hold-down timer
    """
    if self.dpid is None:
      self.dpid = connection.dpid
    assert self.dpid == connection.dpid
    if self.ports is None:
      self.ports = connection.features.ports
    self.disconnect()
    log.debug("Connect %s" % (connection,))
    self.connection = connection
    self._listeners = self.listenTo(connection)
    self._connected_at = time.time()

  @property
  def is_holding_down (self):
    """
    True until more than FLOOD_HOLDDOWN seconds have passed since the
    last connect(); also True when never connected
    """
    if self._connected_at is None:
      return True
    return time.time() - self._connected_at <= FLOOD_HOLDDOWN

  def _handle_ConnectionDown (self, event):
    self.disconnect()

 

 

class l2_multi (EventMixin):
  """
  Component that keeps the switch table and adjacency map up to date
  from connection and link events, and hands barrier replies to the
  WaitingPath that is waiting on them.
  """

  _eventMixin_events = set([PathInstalled])

  def __init__ (self):
    # Listen to dependencies once both of them are available
    def startup ():
      core.openflow.addListeners(self, priority=0)
      core.openflow_discovery.addListeners(self)
    core.call_when_ready(startup, ('openflow','openflow_discovery'))

  def _handle_LinkEvent (self, event):
    """
    Clear all flows, then update the adjacency map for the link that
    was added or removed.
    """
    def flip (link):
      return Discovery.Link(link[2],link[3], link[0],link[1])

    l = event.link
    sw1 = switches[l.dpid1]
    sw2 = switches[l.dpid2]

    # Invalidate all flows and path info.
    # A new link may offer a better path, and a removed link may have
    # broken an existing one.
    #NOTE: This could be radically improved! (e.g., not *ALL* paths break)
    clear = of.ofp_flow_mod(command=of.OFPFC_DELETE)
    for sw in switches.itervalues():
      if sw.connection is not None:
        sw.connection.send(clear)

    if event.removed:
      # This link no longer okay
      adjacency[sw1].pop(sw2, None)
      adjacency[sw2].pop(sw1, None)

      # But maybe there's another way to connect these...
      for ll in core.openflow_discovery.adjacency:
        if ll.dpid1 != l.dpid1 or ll.dpid2 != l.dpid2:
          continue
        if flip(ll) not in core.openflow_discovery.adjacency:
          continue
        # Yup, link goes both ways -- use it to connect these
        adjacency[sw1][sw2] = ll.port1
        adjacency[sw2][sw1] = ll.port2
        break
      return

    # Link came up.  Nodes we already consider connected need nothing;
    # otherwise connect them if the link exists in both directions.
    if (adjacency[sw1][sw2] is None
        and flip(l) in core.openflow_discovery.adjacency):
      adjacency[sw1][sw2] = l.port1
      adjacency[sw2][sw1] = l.port2

    # If we have learned a MAC on this port which we now know to
    # be connected to a switch, unlearn it.
    bad_macs = set(mac for mac,(sw,port) in mac_map.iteritems()
                   if (sw is sw1 and port == l.port1)
                   or (sw is sw2 and port == l.port2))
    for mac in bad_macs:
      log.debug("Unlearned %s", mac)
      del mac_map[mac]

  def _handle_ConnectionUp (self, event):
    """
    Create the Switch for a new dpid if needed and attach the connection
    """
    sw = switches.get(event.dpid)
    if sw is None:
      # New switch
      sw = Switch()
      switches[event.dpid] = sw
    sw.connect(event.connection)

  def _handle_BarrierIn (self, event):
    """
    Pass a barrier reply on to the WaitingPath registered for it, if any
    """
    wp = waiting_paths.pop((event.dpid,event.xid), None)
    if wp:
      wp.notify(event)

 

def launch ():
  """
  Component entry point: register l2_multi on core and start a
  recurring timer that sweeps expired waiting paths.
  """
  core.registerNew(l2_multi)

  # Sweep interval derived from PATH_SETUP_TIME, capped at 15.
  timeout = min(max(PATH_SETUP_TIME, 5) * 2, 15)
  # This call must sit at the same indentation as the statement above;
  # deeper indentation here is an IndentationError that stops the
  # module from loading.
  Timer(timeout, WaitingPath.expire_waiting_paths, recurring=True)

 

3.      Open another terminal, kill the default controller, and run the l2_bellmanford module with the POX controller.

 

 

4.      Ping tests from h1 to h3. (We can see that, for both IP and ARP packets, the paths are built as 00-00-00-00-00-03 (S3) -> 00-00-00-00-00-02 (S2) -> 00-00-00-00-00-04 (S4), or in the opposite direction as 00-00-00-00-00-04 (S4) -> 00-00-00-00-00-02 (S2) -> 00-00-00-00-00-03 (S3).)

 

5.      Ping tests from h1 to h8. The paths are built from s3-s2-s1-s5-s7 or vice versa.

 

Dr. Chih-Heng Ke

Department of Computer Science and Information Engineering, National Quemoy University, Kinmen, Taiwan

Email: smallko@gmail.com / smallko@nqu.edu.tw