How to write a weighted random early detection (WRED) queue module to provide better delivered video quality?

 

[Background]

1.      In this example, I port the WRED queue management from the NS2 DiffServ module to NCTUns. This version of WRED provides only one physical queue containing three virtual queues. Virtual queue 2 has the lowest drop probability, while virtual queue 0 has the highest. Packets with TOS = 10 (I-frame packets) go to virtual queue 2, packets with TOS = 20 (P-frame packets) go to virtual queue 1, and packets with TOS = 30 (B-frame packets) or TOS = 0 (normal packets) go to virtual queue 0.

2.      Refer to "How to insert a new module (simple FIFO, sFIFO) into NCTUns?" to learn how to insert WRED into NCTUns.

3.      Refer to "How to evaluate MPEG video transmission?" to learn how to run MPEG video transmission simulations.

 

[Preparation]

1.      Create a folder called WRED under /root/NCTUns-6.0/src/nctuns/module/ps.

2.      Put the following files into this folder.

(wred.h)

#ifndef __NCTUNS_wred_h__

#define __NCTUNS_wred_h__

 

#include <object.h>

#include <packet.h>

 

/* Number of virtual queues (drop precedences); sizes the edp_[] and edv_[] arrays in WRED. */
#define MAX_PREC 3

 

/*

 * Define Interface Queue for every Interface

 */

struct ifqueue {

        ePacket_                *ifq_head; /* head of ifq */

        ePacket_                *ifq_tail; /* tail of ifq */

        int                   ifq_len;   /* current queue length */

        int                   ifq_maxlen;/* max queue length */

        int                   ifq_drops; /* drops count */

};

 

/*

 * Early drop parameters, supplied by user

 */

struct edp {
        /*
         * All fields are configuration values for one virtual queue.
         * Member order is kept stable so aggregate initialisation
         * by position keeps working.
         */

        /* Average-queue-size thresholds (lower, then upper). */
        double  th_min;
        double  th_max;

        /* Largest drop probability used by the early-drop calculation. */
        double  max_p;

        /* Weight given to the current queue-length sample in the average. */
        double  q_w;

        /* IP TOS value that selects this virtual queue. */
        int     tos;

        /* Mean packet size. */
        int     mps;
};

 

/*

 * Early drop variables, maintained by RED

 */

struct edv {

        double v_prob;             /* prob. of packet drop */

        int count;               /* number of packets since last drop */

        double avg_queue;   /* average queue size */    

        u_int64_t q_time;   /* start of the queue idle time */

        edv() : v_prob(0.002), count(-1), avg_queue(0.0), q_time(GetCurrentTime()) { }

};

 

/*

 * Define Macros for IFq (interface queue)

 */

/* True when the queue already holds ifq_maxlen (or more) packets. */
#define IF_QFULL(ifq)           ((ifq)->ifq_len >= (ifq)->ifq_maxlen)

/* Bump the queue's drop counter; the expression yields the counter's previous value. */
#define IF_DROP(ifq)            ((ifq)->ifq_drops++)

/*
 * Append packet m to the tail of ifq and bump the length.
 *
 * - Wrapped in do { } while (0) so that "IF_ENQUEUE(q, p);" is a single
 *   statement and can sit in an unbraced if/else.
 * - Every use of m is parenthesised so expression arguments expand safely.
 * - m's next_ep link is cleared first: the new tail must terminate the
 *   chain even if the packet still carries a stale link.
 *
 * Both arguments are evaluated more than once; pass side-effect-free
 * expressions.
 */
#define IF_ENQUEUE(ifq, m) do { \
        (m)->next_ep = 0; \
        if ((ifq)->ifq_tail == 0) \
                (ifq)->ifq_head = (m); \
        else \
                (ifq)->ifq_tail->next_ep = (m); \
        (ifq)->ifq_tail = (m); \
        (ifq)->ifq_len++; \
} while (0)

/*
 * Push packet m onto the front of ifq and bump the length.
 *
 * Wrapped in do { } while (0) so that "IF_PREPEND(q, p);" is a single
 * statement and can sit in an unbraced if/else. Both arguments are
 * evaluated more than once; pass side-effect-free expressions.
 */
#define IF_PREPEND(ifq, m) do { \
        (m)->next_ep = (ifq)->ifq_head; \
        if ((ifq)->ifq_tail == 0) \
                (ifq)->ifq_tail = (m); \
        (ifq)->ifq_head = (m); \
        (ifq)->ifq_len++; \
} while (0)

/*
 * Detach the head packet of ifq into m (m becomes 0 when the queue is
 * empty). On success the packet's next_ep link is cleared, the length is
 * decremented, and the tail is reset when the queue becomes empty.
 *
 * Wrapped in do { } while (0) so that "IF_DEQUEUE(q, p);" is a single
 * statement and can sit in an unbraced if/else. m must be an assignable
 * lvalue; both arguments are evaluated more than once.
 */
#define IF_DEQUEUE(ifq, m) do { \
        (m) = (ifq)->ifq_head; \
        if (m) { \
                if (((ifq)->ifq_head = (m)->next_ep) == 0) \
                        (ifq)->ifq_tail = 0; \
                (m)->next_ep = 0; \
                (ifq)->ifq_len--; \
        } \
} while (0)

 

class WRED : public NslObject {

 

 private:

 

        struct    ifqueue if_snd; /* output interface queue */

        struct    edp edp_[MAX_PREC]; /* early drop parameters */

        struct    edv edv_[MAX_PREC]; /* early drop variables */

        double *bw_;

 

 protected:

 

       int                  intrq(MBinder *);

 

 public :

 

        WRED(u_int32_t type, u_int32_t id, struct plist* pl, const char *name);

        ~WRED();

 

        int                   init();

        int                   recv(ePacket_ *);

        int                   send(ePacket_ *);

        void                       calcAvg(int prec, int m); /* to calculate avg queue size of a virtual queue */

};

 

#endif /* __NCTUNS_wred_h__ */

 

(wred.cc)

#include <stdlib.h>

#include <string.h>

#include <assert.h>

#include <nctuns_api.h>

#include <math.h>

#include <time.h>

#include <mbinder.h>

#include "wred.h"

 

#include <netinet/in.h>

#include <netinet/udp.h>

#include <packet.h>

#include <maptable.h>

 

/* NOTE(review): presumably registers WRED with the simulator's module factory; confirm against nctuns_api.h. */
MODULE_GENERATOR(WRED);

 

WRED::WRED(u_int32_t type, u_int32_t id, struct plist* pl, const char *name)

                : NslObject(type, id, pl, name)

{

        /*

         * Disable flow control

         */

        s_flowctl = DISABLED;

        r_flowctl = DISABLED;

 

        /*

         * Initialize interface queue

         */

        if_snd.ifq_head = if_snd.ifq_tail = 0;

        if_snd.ifq_len = 0;

        if_snd.ifq_drops = 0;

        if_snd.ifq_maxlen = 0;

 

        for(int i = 0; i< MAX_PREC; i++) {

          edp_[i].th_min = 0;

          edp_[i].th_max = 0;

          edp_[i].q_w = 0.0;

          edp_[i].max_p = 0.0;

          edp_[i].tos = 0;

          edp_[i].mps = 1500;

        }

       

        vBind("q0_min_threshold", &edp_[0].th_min);

        vBind("q0_max_threshold", &edp_[0].th_max);

        vBind("q0_queue_weight", &edp_[0].q_w);

            vBind("q0_max_drop_prob", &edp_[0].max_p);

            vBind("q0_tos", &edp_[0].tos);

            vBind("q0_mps", &edp_[0].mps);

           

            vBind("q1_min_threshold", &edp_[1].th_min);

        vBind("q1_max_threshold", &edp_[1].th_max);

        vBind("q1_queue_weight", &edp_[1].q_w);

            vBind("q1_max_drop_prob", &edp_[1].max_p);

        vBind("q1_tos", &edp_[1].tos);

        vBind("q1_mps", &edp_[1].mps);

            

        vBind("q2_min_threshold", &edp_[2].th_min);

        vBind("q2_max_threshold", &edp_[2].th_max);

        vBind("q2_queue_weight", &edp_[2].q_w);

            vBind("q2_max_drop_prob", &edp_[2].max_p);

        vBind("q2_tos", &edp_[2].tos);

        vBind("q2_mps", &edp_[2].mps);

        

        vBind("max_qlen", &if_snd.ifq_maxlen);

}

 

/*
 * Destructor: intentionally empty. Packets still linked in if_snd are
 * not released here.
 * NOTE(review): confirm whether the simulator reclaims them at teardown.
 */
WRED::~WRED() {

}

 

int WRED::init() {

    int (NslObject::*upcall)(MBinder *);

 

    bw_ = GET_REG_VAR(get_port(), "BW", double *);

   

    if (if_snd.ifq_maxlen <= 0)     if_snd.ifq_maxlen = 50;            /* default queue size    */

    if (edp_[0].th_min <= 0)         edp_[0].th_min = 5;        /* default min threshold */

    if (edp_[0].th_max <= 0)         edp_[0].th_max = 15;        /* default max threshold */

    if (edp_[0].q_w <= 0)              edp_[0].q_w = 0.02;           /* default queue weight  */

    if (edp_[0].max_p <= 0)          edp_[0].max_p = 0.1;        /* default max dropping probability */

   

    if (edp_[1].th_min <= 0)         edp_[1].th_min = 15;       /* default min threshold */

    if (edp_[1].th_max <= 0)         edp_[1].th_max = 30;        /* default max threshold */

    if (edp_[1].q_w <= 0)              edp_[1].q_w = 0.02;           /* default queue weight  */

    if (edp_[1].max_p <= 0)          edp_[1].max_p = 0.05;      /* default max dropping probability */

    if (edp_[1].tos <= 0)                edp_[1].tos = 20;

   

    if (edp_[2].th_min <= 0)         edp_[2].th_min = 30;       /* default min threshold */

    if (edp_[2].th_max <= 0)         edp_[2].th_max = 45;        /* default max threshold */

    if (edp_[2].q_w <= 0)              edp_[2].q_w = 0.02;           /* default queue weight  */

    if (edp_[2].max_p <= 0)          edp_[2].max_p = 0.025;            /* default max dropping probability */

    if (edp_[2].tos <= 0)                edp_[2].tos = 10; 

   

    upcall = (int (NslObject::*)(MBinder *))&WRED::intrq; /* Set upcall */

    sendtarget_->set_upcall(this, upcall);

    return(1);

}

 

int WRED::send(ePacket_ *pkt) {

       

        int           m;

        double            p_b = 0, p_a = 0;

        u_int64_t tick;

        double             edp_ptc;

        Packet              *p;

        struct ip   *iph;

        int            prec=0;

       

        assert(pkt&&pkt->DataInfo_);

 

        p = (Packet *)pkt->DataInfo_;

        iph=(struct ip *)p->pkt_sget();

 

        if(iph!=NULL){

          if(iph->ip_tos==edp_[2].tos)

             prec=2;

          else if(iph->ip_tos==edp_[1].tos)

             prec=1;

          else

             prec=0;

        }

  

        /*

         * If Module-Binder Queue(MBQ) is full, we should

         * insert the outgoing packet into the interface

         * queue. If MBQ is not full, we can call the

         * put() or NslObject::send() method to pass the

         * outgoing packet to next module.

         */

        if( sendtarget_->qfull() ) {

                /* MBQ is full, insert to ifq */

                if (IF_QFULL(&if_snd)) {

                        /* ifq full, drop it! */

                        IF_DROP(&if_snd);

                        freePacket(pkt);

                        return(1);

                }

                else {

                  m = 0;

                  if(if_snd.ifq_len ==0){

                    edp_ptc = (*bw_) / (8 * edp_[prec].mps);

                    SEC_TO_TICK(tick, 1);

                    m = (edp_ptc * ((double) (GetCurrentTime() - edv_[0].q_time)/(double)tick));

                  }

   

                  calcAvg(0, m+1);

                 

                  

                  if(edv_[prec].avg_queue > edp_[prec].th_min) {

                    if(edv_[prec].avg_queue >  edp_[prec].th_max) {

                      edv_[prec].count++;

                      p_b = edp_[prec].max_p * (edv_[prec].avg_queue - edp_[prec].th_min)/(edp_[prec].th_max - edp_[prec].th_min);

                      p_a = p_b/(1-edv_[prec].count * p_b);

                      

                      double pp = rand()% 1000 + 1;

 

                      if(p_a < 0) p_a = 0;

 

                      if(p_a * 1000 > pp) {    

                        IF_DROP(&if_snd);

                        freePacket(pkt);

                        edv_[prec].count=0;

                        return (1);

                      } else {

                        IF_ENQUEUE(&if_snd, pkt);

                        return (1);

                      }

                    } else {  //if avg queue is greater than max. threshold

                        IF_DROP(&if_snd);

                        freePacket(pkt);

                        edv_[prec].count=0;

                        return (1);

                    }

                  } else {  // if avg queue is less than min. threshold

                      IF_ENQUEUE(&if_snd, pkt);

                      edv_[prec].count = -1;

                      return (1);

                  }                       

                }

                

                if(if_snd.ifq_len == 0)

                  edv_[0].q_time = GetCurrentTime();

        }

        else {

                /*

                 * MBQ is not full, pass outgoing packet

                 * to next module.

                 */

                return(NslObject::send(pkt));

        }

}

 

/*
 * Incoming packets are not queued by this module; they are handed
 * unchanged to the base-class receive path and its result is returned.
 */
int WRED::recv(ePacket_ *pkt) {
    assert(pkt&&pkt->DataInfo_);

    const int status = NslObject::recv(pkt);
    return status;
}

 

 

/*
 * Upcall invoked when the packet in the Module-Binder Queue (MBQ) has been
 * sent: move the next packet, if any, from the interface queue to the MBQ.
 *
 * port is unused. Always returns 1.
 */
int WRED::intrq(MBinder *port) {

        ePacket_  *pkt;

        (void)port;

        IF_DEQUEUE(&if_snd, pkt);

        if (pkt != NULL) {
                /*
                 * The enqueue call is kept outside assert() so it still
                 * runs when the module is built with NDEBUG; only the
                 * check of its result is compiled out.
                 */
                const bool accepted = (sendtarget_->enqueue(pkt) == 0);
                assert(accepted);
                (void)accepted;
        }

        /*
         * The interface queue is now empty: remember when the idle period
         * started. send() reads edv_[0].q_time to age the average queue
         * size by the length of the idle period.
         */
        if (if_snd.ifq_len == 0)
                edv_[0].q_time = GetCurrentTime();

        return(1);
}

 

void WRED::calcAvg(int prec, int m) {

  double f;

  int i;

 

  f = edv_[prec].avg_queue;

 

  while(--m>=1)

    f *= 1.0 - edp_[prec].q_w;

 

  f *= 1.0 - edp_[prec].q_w;

 

  f += edp_[prec].q_w * if_snd.ifq_len;

 

  for(i=0; i< MAX_PREC; i++)

    edv_[prec].avg_queue = f;

}

 

 

3.      Configure the related files and recompile.

 

[Simulation]

1.      Create a topology. (Video flow: Node 1 → Node 2; Background traffic: Node 4 → Node 2; The output queue management for the link between Node 3 and Node 2: FIFO vs. WRED)

1

 

2.      Click “E” to edit property.

2

 

3

 

4

 

3.      Click “R” but don’t click Simulation/Run. (Here we first use FIFO for the output queue of Node 3.)

5

6

 

7

 

 

(Keep the following setting unchanged. “FIFO”)

9

 

4.      Copy st to test.sim and click Simulation/Run to start simulation.

5.      After simulation, use /tmp/mysd_1 and /tmp/myrd_2 to do the evaluation.

10

 

11

 

6.      Now change the FIFO queue to WRED and run the simulation again.

12

 

13

 

7.      After simulation, use /tmp/mysd_1 and /tmp/myrd_2 to do the evaluation.

10

 

14

 

8.      Use gnuplot to plot the PSNR values.

15

From the above figure, we can see that WRED can provide better delivered video quality.

 

Dr. Chih-Heng Ke (http://csie.nqu.edu.tw/smallko), smallko@gmail.com

Department of Computer Science and Information Engineering, National Quemoy University, Taiwan