Switch to side-by-side view

--- a/src/DIF/RA/RA.cc
+++ b/src/DIF/RA/RA.cc
@@ -51,41 +51,35 @@
     fwTable = ModuleAccess<PDUForwardingTable>("pduForwardingTable").get();
     flTable = ModuleAccess<FlowTable>("flowTable").get();
     rmt = (RMT*) this->getParentModule()->getParentModule()->getModuleByPath(".rmt.rmt");
+    rmtQM = (RMTQueueManager*) this->getParentModule()->getParentModule()->getModuleByPath(".rmt.rmtQueueManager");
 
     // initialize attributes
-    processName = getParentModule()->getParentModule()->par(PAR_IPCADDR).stdstringValue();
+    std::ostringstream os;
+    os << getParentModule()->getParentModule()->par(PAR_IPCADDR).stdstringValue()
+       << "_" << getParentModule()->getParentModule()->par(PAR_DIFNAME).stdstringValue();
+    processName  = os.str();
+
+    initSignalsAndListeners();
+    initQoSCubes();
 
     // determine and set RMT mode of operation
     setRmtMode();
 
-    initSignalsAndListeners();
-	initQoSCubes();
     WATCH_LIST(this->QosCubes);
 
-    initFlowAlloc();
-}
-
-void RA::initFlowAlloc()
-{
-    cXMLElement* dirXml = par("flows").xmlValue();
-    cXMLElementList map = dirXml->getChildrenByTagName("Flow");
-
-    for (cXMLElementList::iterator i = map.begin(); i != map.end(); ++i)
-    {
-        cXMLElement* m = *i;
-
-        APNamingInfo src = APNamingInfo(APN(processName));
-        APNamingInfo dst = APNamingInfo(APN(m->getAttribute("dest")));
-
-        Flow *fl = new Flow(src, dst);
-        // just use the first QoS cube available (temporary workaround)
-        fl->setQosParameters(getQosCubes().front());
-
-        preparedFlows.push_back(fl);
-        cMessage* msg = new cMessage("RA-CreateFlow");
-        scheduleAt(simTime(), msg);
-    }
-}
+}
+
+void RA::initSignalsAndListeners()
+{
+    cModule* catcher2 = this->getParentModule()->getParentModule();
+
+    sigRACreFloPosi = registerSignal(SIG_RA_CreateFlowPositive);
+    sigRACreFloNega = registerSignal(SIG_RA_CreateFlowNegative);
+
+    lisRACreFlow = new LisRACreFlow(this);
+    catcher2->subscribe(SIG_RIBD_CreateFlow, lisRACreFlow);
+}
+
 
 void RA::setRmtMode()
 {
@@ -96,25 +90,41 @@
     if (bottomGate == "medium$o")
     {
         // we're on wire! this is the bottommost "interface" DIF
+        rmt->setOnWire(true);
         // let's connect RMT to the medium
-        bindMediumToRMT();
-    }
-    else if (bottomGate == "northIo$o")
+        bindMediumToRMTQueue();
+    }
+    else if (bottomGate == "northIo$i")
     { // other IPC processes are below us
-        if (hostModule->gateSize("northIo") > 1)
-        {
-            // multiple (N-1)-DIFs are present, RMT shall be relaying
-            rmt->enableRelay();
-        }
-        else
-        {
-            // we're on top of a single IPC process
-        }
-    }
-}
-
-
-void RA::initQoSCubes() {
+        rmt->setOnWire(false);
+//        // create a queue for each QoS of each (N-1)-DIF
+//        for (int i = 0; i <= hostModule->gateSize("northIo"); i++)
+//        {
+//            EV << hostModule->gate("southIo$o", i)->getNextGate()->getOwnerModule()->getFullName() << endl;
+//            cModule* module = hostModule->gate("southIo$o", i)->getNextGate()->getOwnerModule();
+//            cModule* raBottom = module->getModuleByPath(".resourceAllocator.ra");
+//            QosCubeSet cubes = dynamic_cast<RABase*>(raBottom)->getQosCubes();
+//            for (QCubeCItem it = cubes.begin(); it != cubes.end(); ++it) {
+//                short qosid = it->getQosId();
+//                std::ostringstream ostr;
+//                ostr << "q_" << module->par("difName").stringValue() << "_" << qosid;
+//
+//                rmt->addQueueSet(ostr.str().c_str());
+//            }
+//        }
+    }
+
+    if (hostModule->par("routing").boolValue() == true)
+    {
+        rmt->enableRelay();
+    }
+
+    rmt->setSchedulingPolicy(new LongestQFirst);
+}
+
+
+void RA::initQoSCubes()
+{
     cXMLElement* qosXml = NULL;
     if (par(PAR_QOSDATA).xmlValue() != NULL && par(PAR_QOSDATA).xmlValue()->hasChildren())
         qosXml = par(PAR_QOSDATA).xmlValue();
@@ -136,99 +146,99 @@
         QosCube cube;
         cube.setQosId((unsigned short)atoi(m->getAttribute(ATTR_ID)));
         //Following data types should be same as in QosCubes.h
-        int avgBand                 = VAL_QOSPARAMDONOTCARE;    //Average bandwidth (measured at the application in bits/sec)
-        int avgSDUBand              = VAL_QOSPARAMDONOTCARE;    //Average SDU bandwidth (measured in SDUs/sec)
-        int peakBandDuration        = VAL_QOSPARAMDONOTCARE;    //Peak bandwidth-duration (measured in bits/sec);
-        int peakSDUBandDuration     = VAL_QOSPARAMDONOTCARE;    //Peak SDU bandwidth-duration (measured in SDUs/sec);
-        int burstPeriod             = VAL_QOSPARAMDONOTCARE;    //Burst period measured in useconds
-        int burstDuration           = VAL_QOSPARAMDONOTCARE;    //Burst duration, measured in usecs fraction of Burst Period
-        int undetectedBitErr        = VAL_QOSPARAMDONOTCARE;    //Undetected bit error rate measured as a probability
-        int maxSDUsize              = VAL_QOSPARAMDONOTCARE;    //MaxSDUSize measured in bytes
-        bool partDeliv              = VAL_QOSPARAMDEFBOOL;      //Partial Delivery - Can SDUs be delivered in pieces rather than all at once?
-        bool incompleteDeliv        = VAL_QOSPARAMDEFBOOL;      //Incomplete Delivery - Can SDUs with missing pieces be delivered?
-        bool forceOrder             = VAL_QOSPARAMDEFBOOL;      //Must SDUs be delivered in order?
-        unsigned int maxAllowGap    = VAL_QOSPARAMDONOTCARE;    //Max allowable gap in SDUs, (a gap of N SDUs is considered the same as all SDUs delivered, i.e. a gap of N is a "don't care.")
-        int delay                   = VAL_QOSPARAMDONOTCARE;    //Delay in usecs
-        int jitter                  = VAL_QOSPARAMDONOTCARE;    //Jitter in usecs2
-        int costtime                = VAL_QOSPARAMDONOTCARE;    //measured in $/ms
-        int costbits                = VAL_QOSPARAMDONOTCARE;    //measured in $/Mb
+        int avgBand                 = VAL_QOSPARDONOTCARE;    //Average bandwidth (measured at the application in bits/sec)
+        int avgSDUBand              = VAL_QOSPARDONOTCARE;    //Average SDU bandwidth (measured in SDUs/sec)
+        int peakBandDuration        = VAL_QOSPARDONOTCARE;    //Peak bandwidth-duration (measured in bits/sec);
+        int peakSDUBandDuration     = VAL_QOSPARDONOTCARE;    //Peak SDU bandwidth-duration (measured in SDUs/sec);
+        int burstPeriod             = VAL_QOSPARDONOTCARE;    //Burst period measured in useconds
+        int burstDuration           = VAL_QOSPARDONOTCARE;    //Burst duration, measured in usecs fraction of Burst Period
+        int undetectedBitErr        = VAL_QOSPARDONOTCARE;    //Undetected bit error rate measured as a probability
+        int maxSDUsize              = VAL_QOSPARDONOTCARE;    //MaxSDUSize measured in bytes
+        bool partDeliv              = VAL_QOSPARDEFBOOL;      //Partial Delivery - Can SDUs be delivered in pieces rather than all at once?
+        bool incompleteDeliv        = VAL_QOSPARDEFBOOL;      //Incomplete Delivery - Can SDUs with missing pieces be delivered?
+        bool forceOrder             = VAL_QOSPARDEFBOOL;      //Must SDUs be delivered in order?
+        unsigned int maxAllowGap    = VAL_QOSPARDONOTCARE;    //Max allowable gap in SDUs, (a gap of N SDUs is considered the same as all SDUs delivered, i.e. a gap of N is a "don't care.")
+        int delay                   = VAL_QOSPARDONOTCARE;    //Delay in usecs
+        int jitter                  = VAL_QOSPARDONOTCARE;    //Jitter in usecs2
+        int costtime                = VAL_QOSPARDONOTCARE;    //measured in $/ms
+        int costbits                = VAL_QOSPARDONOTCARE;    //measured in $/Mb
 
         cXMLElementList attrs = m->getChildren();
         for (cXMLElementList::iterator jt = attrs.begin(); jt != attrs.end(); ++jt) {
             cXMLElement* n = *jt;
             if ( !strcmp(n->getTagName(), ELEM_AVGBW) ) {
-                avgBand = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                avgBand = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (avgBand < 0)
-                    avgBand = VAL_QOSPARAMDONOTCARE;
+                    avgBand = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_AVGSDUBW)) {
-                avgSDUBand = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                avgSDUBand = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (avgSDUBand < 0)
-                    avgSDUBand = VAL_QOSPARAMDONOTCARE;
+                    avgSDUBand = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_PEAKBWDUR)) {
-                peakBandDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                peakBandDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (peakBandDuration < 0)
-                    peakBandDuration = VAL_QOSPARAMDONOTCARE;
+                    peakBandDuration = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_PEAKSDUBWDUR)) {
-                peakSDUBandDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                peakSDUBandDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (peakSDUBandDuration < 0)
-                    peakSDUBandDuration = VAL_QOSPARAMDONOTCARE;
+                    peakSDUBandDuration = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_BURSTPERIOD)) {
-                burstPeriod = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                burstPeriod = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (burstPeriod < 0)
-                    burstPeriod = VAL_QOSPARAMDONOTCARE;
+                    burstPeriod = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_BURSTDURATION)) {
-                burstDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                burstDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (burstDuration < 0)
-                    burstDuration = VAL_QOSPARAMDONOTCARE;
+                    burstDuration = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_UNDETECTBITERR)) {
-                undetectedBitErr = n->getNodeValue() ? atof(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                undetectedBitErr = n->getNodeValue() ? atof(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (undetectedBitErr < 0 || undetectedBitErr > 1 )
-                    undetectedBitErr = VAL_QOSPARAMDONOTCARE;
+                    undetectedBitErr = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_MAXSDUSIZE)) {
-                maxSDUsize = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                maxSDUsize = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (maxSDUsize < 0)
-                    maxSDUsize = VAL_QOSPARAMDONOTCARE;
+                    maxSDUsize = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_PARTIALDELIVER)) {
-                partDeliv = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+                partDeliv = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
             }
             else if (!strcmp(n->getTagName(), ELEM_INCOMPLETEDELIVER)) {
-                incompleteDeliv = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+                incompleteDeliv = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
             }
             else if (!strcmp(n->getTagName(), ELEM_FORCEORDER)) {
-                forceOrder = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+                forceOrder = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
             }
             else if (!strcmp(n->getTagName(), ELEM_MAXALLOWGAP)) {
-                maxAllowGap = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                maxAllowGap = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (maxAllowGap < 0)
-                    maxAllowGap = VAL_QOSPARAMDONOTCARE;
+                    maxAllowGap = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_DELAY)) {
-                delay = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                delay = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (delay < 0)
-                    delay = VAL_QOSPARAMDONOTCARE;
+                    delay = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_JITTER)) {
-                jitter = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+                jitter = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
                 if (jitter < 0)
-                    jitter = VAL_QOSPARAMDONOTCARE;
+                    jitter = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_COSTTIME)) {
-                costtime = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+                costtime = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
                 if (costtime < 0)
-                    costtime = VAL_QOSPARAMDONOTCARE;
+                    costtime = VAL_QOSPARDONOTCARE;
             }
             else if (!strcmp(n->getTagName(), ELEM_COSTBITS)) {
-                costbits = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+                costbits = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
                 if (costbits < 0)
-                    costbits = VAL_QOSPARAMDONOTCARE;
+                    costbits = VAL_QOSPARDONOTCARE;
             }
         }
 
@@ -252,7 +262,7 @@
         QosCubes.push_back(cube);
     }
     if (!QosCubes.size()) {
-        std::stringstream os;
+        std::ostringstream os;
         os << this->getFullPath() << " does not have any QoSCube in its set. It cannot work without at least one valid QoS cube!" << endl;
         error(os.str().c_str());
     }
@@ -265,81 +275,109 @@
         delete msg;
         return;
     }
-
-    if ( !strcmp(msg->getName(), "RA-CreateFlow") ) {
-        createFlow(preparedFlows.front());
-        preparedFlows.pop_front();
-    }
-}
-
-/**
- * Connects the RMT module to the medium defined in NED.
+}
+
+/**
+ * Connects the medium defined in NED to the RMT module.
  * Used only for the bottom IPC process in a processing system.
  */
-void RA::bindMediumToRMT()
-{
-    std::ostringstream rmtGate;
-    rmtGate << GATE_SOUTHIO << "PHY";
-
-    rmt->createSouthGate(rmtGate.str().c_str());
-    cGate* rmtIn = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::INPUT);
-    cGate* rmtOut = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::OUTPUT);
-
+void RA::bindMediumToRMTQueue()
+{
+    // retrieve the south gate
     cModule* thisIpc = this->getParentModule()->getParentModule();
     cGate* thisIpcIn = thisIpc->gateHalf("southIo$i", cGate::INPUT, 0);
     cGate* thisIpcOut = thisIpc->gateHalf("southIo$o", cGate::OUTPUT, 0);
 
-    rmtOut->connectTo(thisIpcOut);
-    thisIpcIn->connectTo(rmtIn);
-
-    rmt->addRMTPort(std::make_pair((cModule*)NULL, -1), rmtOut->getPathStartGate());
-}
-
-/**
- * Connects the RMT module to the specified (N-1)-flow.
+    // now let's connect rmtModule to bottom of IPC
+    // create an INOUT on the bottom of rmtModule
+    std::ostringstream rmtGate;
+    rmtGate << GATE_SOUTHIO_ << "PHY";
+
+    rmt->getParentModule()->addGate(rmtGate.str().c_str(), cGate::INOUT, false);
+    cGate* rmtModuleIn = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::INPUT);
+    cGate* rmtModuleOut = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::OUTPUT);
+
+    // interconnect rmtModule and bottom of IPC
+    rmtModuleOut->connectTo(thisIpcOut);
+    thisIpcIn->connectTo(rmtModuleIn);
+
+    RMTQueue* inputQueue = rmtQM->addQueue(RMTQueue::INPUT);
+    RMTQueue* outputQueue = rmtQM->addQueue(RMTQueue::OUTPUT);
+
+    outputQueue->getOutputGate()->connectTo(rmtModuleOut);
+    rmtModuleIn->connectTo(inputQueue->getInputGate());
+}
+
+/**
+ * Connects the specified (N-1)-flow to an RMT queue.
  *
  * @param ipc IPC process containing the (N-1)-flow
  * @param flow the (N-1)-flow
- */
-void RA::bindFlowToRMT(cModule* ipc, Flow* flow)
-{
+ * @return output gate for the (N-1)-flow
+ */
+RMTQueue* RA::bindLowerFlowToRmtQueue(cModule* ipc, FABase* fab, Flow* flow)
+{
+
+    EV << "attempting to bind an (N-1)-flow to a RMT queue..." << endl;
+    int portId = flow->getSrcPortId();
+    /*
+    if ( difAllocator->isAppLocal(flow->getDstApni().getApn()) )
+        portId = flow->getDstPortId();
+    else if ( difAllocator->isAppLocal(flow->getSrcApni().getApn()) )
+        portId = flow->getSrcPortId();
+    else
+        throw("Binding to inconsistant PortId occured!");
+    */
+
     // expand the given portId so it's unambiguous within this IPC
-    std::string combinedPortId = normalizePortId(ipc->getFullName(), flow->getSrcPortId());
-
-    std::ostringstream rmtGate;
-    rmtGate << GATE_SOUTHIO << combinedPortId;
-
-    rmt->createSouthGate(rmtGate.str());
-
-    // get (N-1)-IPC gates
+    std::string combinedPortId = normalizePortId(ipc->getFullName(), portId);
+
+    //// binding begins at the bottom: first, we need to interconnect both IPCs
+    // get (N-1)-IPC north gates
     std::ostringstream bottomIpcGate;
-    bottomIpcGate << "northIo_" << flow->getSrcPortId();
+    bottomIpcGate << GATE_NORTHIO_ << portId;
     cGate* bottomIpcIn = ipc->gateHalf(bottomIpcGate.str().c_str(), cGate::INPUT);
     cGate* bottomIpcOut = ipc->gateHalf(bottomIpcGate.str().c_str(), cGate::OUTPUT);
 
-    // get RMT gates
-    cGate* rmtIn = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::INPUT);
-    cGate* rmtOut = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::OUTPUT);
-
-    // create an intermediate border gate
+    // create a border south gate for this IPC
     cModule* thisIpc = this->getParentModule()->getParentModule();
-    std::ostringstream thisIpcGate;
-    thisIpcGate << "southIo_" << combinedPortId;
-
-    thisIpc->addGate(thisIpcGate.str().c_str(), cGate::INOUT, false);
-    cGate* thisIpcIn = thisIpc->gateHalf(thisIpcGate.str().c_str(), cGate::INPUT);
-    cGate* thisIpcOut = thisIpc->gateHalf(thisIpcGate.str().c_str(), cGate::OUTPUT);
-
-
+    std::ostringstream thisIpcGate_ostr;
+    thisIpcGate_ostr << GATE_SOUTHIO_ << combinedPortId;
+    std::string thisIpcGate = thisIpcGate_ostr.str();
+
+    thisIpc->addGate(thisIpcGate.c_str(), cGate::INOUT, false);
+    cGate* thisIpcIn = thisIpc->gateHalf(thisIpcGate.c_str(), cGate::INPUT);
+    cGate* thisIpcOut = thisIpc->gateHalf(thisIpcGate.c_str(), cGate::OUTPUT);
+
+    // interconnect IPCs
     bottomIpcOut->connectTo(thisIpcIn);
-    thisIpcIn->connectTo(rmtIn);
-
-    rmtOut->connectTo(thisIpcOut);
     thisIpcOut->connectTo(bottomIpcIn);
 
-    // modules are connected; register a handle
-    rmt->addRMTPort(std::make_pair(ipc, flow->getSrcPortId()), rmtOut->getPathStartGate());
-
+    //// now we'll connect bottom of this IPC to rmtModule
+
+    rmt->getParentModule()->addGate(thisIpcGate.c_str(), cGate::INOUT, false);
+    cGate* rmtModuleIn = rmt->getParentModule()->gateHalf(thisIpcGate.c_str(), cGate::INPUT);
+    cGate* rmtModuleOut = rmt->getParentModule()->gateHalf(thisIpcGate.c_str(), cGate::OUTPUT);
+
+    // interconnect rmtModule and bottom of IPC
+    rmtModuleOut->connectTo(thisIpcOut);
+    thisIpcIn->connectTo(rmtModuleIn);
+
+    // all that's left to do is a binding of rmtModule south gate to a queue...
+    // since the specifications are (at this time) a bit unclear about cardinality
+    // between queues and (N-1)-flows, we'll simply create a new set of queues
+    // for each (N-1)-flow, i.e. 1:1
+
+    RMTQueue* inputQueue = rmtQM->addQueue(RMTQueue::INPUT);
+    RMTQueue* outputQueue = rmtQM->addQueue(RMTQueue::OUTPUT);
+
+    rmtModuleIn->connectTo(inputQueue->getInputGate());
+    outputQueue->getOutputGate()->connectTo(rmtModuleOut);
+
+    // update the flow table
+    flTable->insert(flow, fab, inputQueue, outputQueue, thisIpcGate);
+
+    return outputQueue;
 }
 
 /**
@@ -348,7 +386,7 @@
  *
  * @param ipcName module identifier of an underlying IPC process
  * @param flowPortId original portId to be expanded
- * @return normalizes port-id
+ * @return normalized port-id
  */
 std::string RA::normalizePortId(std::string ipcName, int flowPortId)
 {
@@ -358,22 +396,21 @@
 }
 
 /**
- * Creates an (N-1)-flow.
+ * Creates an (N-1)-flow (this is the mechanism behind Allocate() call).
  *
- * @param dstIpc address of the destination IPC process
- */
-void RA::createFlow(Flow *fl)
-{
+ * @param flow specified flow object
+ */
+void RA::createFlow(Flow *flow)
+{
+    Enter_Method("createFlow()");
+
     //Ask DA which IPC to use to reach dst App
-    DirectoryEntry* de = difAllocator->resolveApn(fl->getDstApni().getApn());
-
-    if (de == NULL) {
-        EV << "DA does not know target application." << endl;
+    const Address* ad = difAllocator->resolveApnToBestAddress(flow->getDstApni().getApn());
+    if (ad == NULL) {
+        EV << "DifAllocator returned NULL for resolving " << flow->getDstApni().getApn() << endl;
         return;
     }
-
-    //TODO: Vesely - Now using first available APN to DIFMember mapping
-    Address addr = de->getSupportedDifs().front();
+    Address addr = *ad;
 
     //TODO: Vesely - New IPC must be enrolled or DIF created
     if (!difAllocator->isDifLocal(addr.getDifName())) {
@@ -386,63 +423,186 @@
     FABase* fab = difAllocator->findFaInsideIpc(targetIpc);
 
     //Command target FA to allocate flow
-    bool status = fab->receiveAllocateRequest(fl);
+    bool status = fab->receiveAllocateRequest(flow);
+
+    //FIXME: Vesely - WAIT!
 
     //If AllocationRequest ended by creating connections
     if (status)
     {
         // connect the new flow to the RMT
-        bindFlowToRMT(targetIpc, fl);
+        RMTQueue* outQ = bindLowerFlowToRmtQueue(targetIpc, fab, flow);
         // we're ready to go!
-        //signalizeFlowAllocated(fl);
-        flTable->insert(fl, fab);
+        // update the PDU forwarding table
+        fwTable->insert(Address(flow->getDstApni().getApn().getName()), flow->getConId().getQoSId(), outQ);
     }
     else
     {
        EV << "Flow not allocated!" << endl;
     }
-
-}
-
-void RA::initSignalsAndListeners() {
-/*
-    // allocation request
-    sigRAAllocReq      = registerSignal(SIG_RA_AllocateRequest);
-    // deallocation request
-    sigRADeallocReq    = registerSignal(SIG_RA_DeallocateRequest);
-    // positive response to allocation request
-    sigRAAllocResPosi  = registerSignal(SIG_RA_AllocateResponsePositive);
-    // negative response to allocation request
-    sigRAAllocResNega  = registerSignal(SIG_RA_AllocateResponseNegative);
-    // successful allocation of an (N-1)-flow
-    sigRAFlowAllocd  = registerSignal(SIG_RA_FlowAllocated);
-    // successful deallocation of an (N-1)-flow
-    sigRAFlowDeallocd  = registerSignal(SIG_RA_FlowDeallocated);
-*/
-}
-
-/*
-void RA::signalizeAllocateRequest(Flow* flow) {
-    emit(sigRAAllocReq, flow);
-}
-
-void RA::signalizeDeallocateRequest(Flow* flow) {
-    emit(sigRADeallocReq, flow);
-}
-
-void RA::signalizeAllocateResponsePositive(Flow* flow) {
-    emit(sigRAAllocResPosi, flow);
-}
-
-void RA::signalizeAllocateResponseNegative(Flow* flow) {
-    emit(sigRAAllocResNega, flow);
-}
-
-void RA::signalizeFlowAllocated(Flow* flow) {
-    emit(sigRAFlowAllocd, flow);
-}
-
-void RA::signalizeFlowDeallocated(Flow* flow) {
-    emit(sigRAFlowDeallocd, flow);
-}
-*/
+}
+
+/**
+ * Creates an (N-1)-flow (this is the mechanism behind response to an M_CREATE request).
+ *
+ * @param flow specified flow object
+ */
+void RA::createFlowWithoutAllocate(Flow* flow)
+{
+    Enter_Method("createFlowWoAlloc()");
+
+    //Ask DA which IPC to use to reach dst App
+    const Address* ad = difAllocator->resolveApnToBestAddress(flow->getDstApni().getApn());
+    if (ad == NULL) {
+        EV << "DifAllocator returned NULL for resolving " << flow->getDstApni().getApn() << endl;
+        signalizeCreateFlowNegativeToRibd(flow);
+        return;
+    }
+    Address addr = *ad;
+
+    //TODO: Vesely - New IPC must be enrolled or DIF created
+    if (!difAllocator->isDifLocal(addr.getDifName())) {
+        EV << "Local CS does not have any IPC in DIF " << addr.getDifName() << endl;
+        signalizeCreateFlowNegativeToRibd(flow);
+        return;
+    }
+
+    //Retrieve DIF's local IPC member
+    cModule* targetIpc = difAllocator->getDifMember(addr.getDifName());
+    FABase* fab = difAllocator->findFaInsideIpc(targetIpc);
+
+    // connect the new flow to the RMT
+    RMTQueue* outQ = bindLowerFlowToRmtQueue(targetIpc, fab, flow);
+    // we're ready to go!
+    // update the PDU forwarding table
+    fwTable->insert(Address(flow->getDstApni().getApn().getName()), flow->getConId().getQoSId(), outQ);
+    // add other accessible applications into forwarding table
+    const APNList* remoteApps = difAllocator->findNeigborApns(flow->getDstApni().getApn());
+    if (remoteApps)
+    {
+        for (ApnCItem it = remoteApps->begin(); it != remoteApps->end(); ++it)
+        {
+            Address addr = Address(it->getName());
+            fwTable->insert(addr, flow->getConId().getQoSId(), outQ);
+        }
+    }
+
+    signalizeCreateFlowPositiveToRibd(flow);
+}
+
+/**
+ * Removes specified (N-1)-flow and bindings (this is the mechanism behind Deallocate() call).
+ *
+ * @param flow specified flow object
+ */
+void RA::removeFlow(Flow *flow)
+{
+    cModule* thisIpc = this->getParentModule()->getParentModule();
+    FlowTableItem* flowItem = flTable->lookup(flow);
+    const char* gateName = flowItem->getGateName().c_str();
+
+    cGate* thisIpcIn = thisIpc->gateHalf(gateName, cGate::INPUT);
+    cGate* thisIpcOut = thisIpc->gateHalf(gateName, cGate::OUTPUT);
+
+    cGate* rmtModuleIn = rmt->getParentModule()->gateHalf(gateName, cGate::INPUT);
+    cGate* rmtModuleOut = rmt->getParentModule()->gateHalf(gateName, cGate::OUTPUT);
+
+    thisIpcIn->disconnect();
+    thisIpcOut->disconnect();
+    rmtModuleIn->disconnect();
+    rmtModuleOut->disconnect();
+    flowItem->getRmtOutputQueue()->getOutputGate()->disconnect();
+
+    rmtQM->removeQueue(flowItem->getRmtInputQueue());
+    rmtQM->removeQueue(flowItem->getRmtOutputQueue());
+
+    flowItem->getFaBase()->receiveDeallocateRequest(flow);
+    thisIpc->deleteGate(gateName);
+    rmt->getParentModule()->deleteGate(gateName);
+
+    fwTable->remove(flowItem->getRmtOutputQueue());
+    flTable->remove(flow);
+}
+
+/**
+ * Creates a binding between a flow in this IPC and medium defined in NED
+ * @param flow specified flow object
+ *
+ */
+void RA::bindFlowToMedium(Flow* flow)
+{
+    EV << "binding a flow to the medium" << endl;
+
+    RMTQueue* outQueue = rmtQM->getFirst(RMTQueue::OUTPUT);
+    rmt->addEfcpiToQueueMapping(flow->getConnectionId().getSrcCepId(), outQueue);
+}
+
+/**
+ * Creates a binding between a flow in this IPC and a suitable (N-1)-flow
+ *
+ * @param flow specified flow object
+ * @return Returns TRUE if connected onWire else return FALSE
+ */
+bool RA::bindFlowToLowerFlow(Flow* flow)
+{
+    //EV << "XXXXXXXXXXXXXX" << flow->info();
+    Enter_Method("bindToLowerFlow()");
+    if (rmt->isOnWire())
+    {
+        bindFlowToMedium(flow);
+        return true;
+    }
+
+    std::string neighAddr = flow->getDstNeighbor().getApname().getName();
+    std::string dstAddr = flow->getDstAddr().getApname().getName();
+    unsigned short qosId = flow->getConId().getQoSId();
+
+    // see if any appropriate (N-1)-flow already exists
+    FlowTableItem* curFlow = NULL;
+    curFlow = flTable->lookup(neighAddr, qosId);
+
+    if (curFlow == NULL)
+    { // we need to create a new (N-1)-flow to suit our needs
+        EV << "creating an (N-1)-flow (dst Addr " << neighAddr << ")" << endl;
+
+        APNamingInfo src = APNamingInfo(APN(processName));
+        APNamingInfo dst = APNamingInfo(APN(neighAddr));
+
+        Flow *lowerFlow = new Flow(src, dst);
+        lowerFlow->setQosParameters(flow->getQosParameters());
+        createFlow(lowerFlow);
+    }
+
+    EV << "binding a flow to an (N-1)-flow" << endl;
+
+    // add efcpi->rmtQueue mapping for direct multiplexing
+    FlowTableItem* targetFlow = flTable->lookup(neighAddr, qosId);
+    if (targetFlow == NULL)
+    {
+        EV << "!!! something went wrong! there isn't any suitable (N-1)-flow present for dst "
+           << neighAddr << " so it won't be multiplexed further." << endl;
+    }
+    else
+    {
+        rmt->addEfcpiToQueueMapping(flow->getConnectionId().getSrcCepId(),
+                                    targetFlow->getRmtOutputQueue());
+        // add another fwtable entry for direct srcApp->dstApp messages (if needed)
+        if (neighAddr != dstAddr)
+        {
+            fwTable->insert(Address(dstAddr), qosId, targetFlow->getRmtOutputQueue());
+        }
+    }
+
+    return false;
+}
+
+
+void RA::signalizeCreateFlowPositiveToRibd(Flow* flow) {
+    EV << "Emits CreateFlowPositive signal for flow" << endl;
+    emit(sigRACreFloPosi, flow);
+}
+
+void RA::signalizeCreateFlowNegativeToRibd(Flow* flow) {
+    EV << "Emits CreateFlowNegative signal for flow" << endl;
+    emit(sigRACreFloNega, flow);
+}