Switch to side-by-side view

--- a/src/DIF/RMT/RMT.cc
+++ b/src/DIF/RMT/RMT.cc
@@ -26,95 +26,99 @@
 
 Define_Module(RMT);
 
-const int PROCESSING_DELAY = 0.0;
-
 RMT::RMT()
 {
     relayOn = false;
-}
-
-RMT::~RMT()
-{
-}
-
-
-void RMT::initialize() {
+    onWire = false;
+    waitingMsgs = 0;
+}
+
+RMT::~RMT() {}
+
+
+void RMT::initialize()
+{
+    WATCH_PTRMAP(efcpiToQueue);
+
     fwTable = ModuleAccess<PDUForwardingTable>("pduForwardingTable").get();
-    //ports = ModuleAccess<RMTPortManager>("rmtPortManager").get();
+    queues = ModuleAccess<RMTQueueManager>("rmtQueueManager").get();
 
     cModule* ipcModule = getParentModule()->getParentModule();
     thisIpcAddr = Address(ipcModule->par("ipcAddress").stringValue(),
                           ipcModule->par("difName").stringValue());
 
-    //this->enableRelay();
-}
-
-/**
- * Creates a gate to be used for connection to an (N-1)-flow.
- *
- * @param gateName name to be given to the newly created gate; it has to be
- *        unambiguous within the scope of this IPC's management of (N-1)-flows
- */
-void RMT::createSouthGate(std::string gateName)
-{
+    // register signal for notifying others about a missing local EFCP instance
+    sigRMTNoConnID = registerSignal(SIG_RMT_NoConnId);
+
+    // listen for a signal indicating that a new message has arrived into a queue
+    lisRMTMsgRcvd = new LisRMTPDURcvd(this);
+    getParentModule()->subscribe(SIG_RMT_MessageReceived, lisRMTMsgRcvd);
+}
+
+/**
+ * Returns the current scheduling policy.
+ *
+ *@return pointer to current policy class
+ */
+RMTSchedulingBase* RMT::getSchedulingPolicy()
+{
+    return schedPolicy;
+}
+
+/**
+ * Changes the current scheduling policy.
+ *
+ * @param policy pointer to the new policy class
+ */
+void RMT::setSchedulingPolicy(RMTSchedulingBase* policy)
+{
+    schedPolicy = policy;
+}
+
+/**
+ * Schedules an end-of-queue-service event.
+ *
+ */
+void RMT::scheduleServiceEnd()
+{
+    scheduleAt(simTime() + par("queueServiceTime"), new cMessage("queueServiceDone"));
+}
+
+/**
+ * Tries to begin service of a message that has newly arrived into a queue.
+ * If servicing takes place right now, the wait counter is increased instead.
+ *
+ */
+void RMT::invokeSchedulingPolicy()
+{
+    Enter_Method("invokeSchedulingPolicy()");
+
+    if (!waitingMsgs)
+    {
+        scheduleServiceEnd();
+    }
+    else
+    {
+        waitingMsgs++;
+    }
+}
+
+/**
+ * Creates a gate to be used for connection to an EFCP instance.
+ *
+ * @param efcpiId CEP-id to be used in the gate's name
+ */
+void RMT::createEfcpiGate(unsigned int efcpiId)
+{
+    if (efcpiOut.count(efcpiId))
+    {
+        return;
+    }
+
     cModule* rmtModule = getParentModule();
 
-    this->addGate(gateName.c_str(), cGate::INOUT, false);
-    cGate* rmtIn = this->gateHalf(gateName.c_str(), cGate::INPUT);
-    cGate* rmtOut = this->gateHalf(gateName.c_str(), cGate::OUTPUT);
-
-    rmtModule->addGate(gateName.c_str(), cGate::INOUT, false);
-    cGate* rmtModuleIn = rmtModule->gateHalf(gateName.c_str(), cGate::INPUT);
-    cGate* rmtModuleOut = rmtModule->gateHalf(gateName.c_str(), cGate::OUTPUT);
-
-    rmtModuleIn->connectTo(rmtIn);
-    rmtOut->connectTo(rmtModuleOut);
-
-    // RMT<->(N-1)-EFCP interconnection shall be done by the RA
-}
-
-/**
- * Removes a gate used for a (N-1)-flow.
- *
- * @param gateName name of the gate meant for removal
- */
-void RMT::deleteSouthGate(std::string gateName)
-{
-    cModule* rmtModule = getParentModule();
-
-    cGate* rmtOut = this->gateHalf(gateName.c_str(), cGate::OUTPUT);
-    cGate* rmtModuleIn = rmtModule->gateHalf(gateName.c_str(), cGate::INPUT);
-
-    rmtOut->disconnect();
-    rmtModuleIn->disconnect();
-
-    this->deleteGate(gateName.c_str());
-    rmtModule->deleteGate(gateName.c_str());
-
-    //ports.erase(gateName);
-}
-
-void RMT::addRMTPort(RMTPortId portId, cGate* gate)
-{
-    ports[portId] = gate;
-}
-
-/**
- * Creates a gate to be used for connection to an EFCP instance.
- *
- * @param efcpiId CEP-id to be used in the gate's name
- */
-void RMT::createEfcpiGate(unsigned int efcpiId)
-{
-    if (efcpiGates.count(efcpiId))
-    {
-        return;
-    }
-
-    cModule* rmtModule = getParentModule();
-
     std::ostringstream gateName_str;
-    gateName_str << GATE_EFCPIO << efcpiId;
+    gateName_str << GATE_EFCPIO_ << efcpiId;
 
     this->addGate(gateName_str.str().c_str(), cGate::INOUT, false);
     cGate* rmtIn = this->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
@@ -127,7 +131,8 @@
     rmtModuleIn->connectTo(rmtIn);
     rmtOut->connectTo(rmtModuleOut);
 
-    efcpiGates[efcpiId] = rmtOut;
+    efcpiOut[efcpiId] = rmtOut;
+    efcpiIn[efcpiId] = rmtIn;
 
     // RMT<->EFCP interconnection shall be done by the FAI
 }
@@ -139,7 +144,7 @@
  */
 void RMT::deleteEfcpiGate(unsigned int efcpiId)
 {
-    if (!efcpiGates.count(efcpiId))
+    if (!efcpiOut.count(efcpiId))
     {
         return;
     }
@@ -147,7 +152,7 @@
     cModule* rmtModule = getParentModule();
 
     std::ostringstream gateName_str;
-    gateName_str << GATE_EFCPIO << efcpiId;
+    gateName_str << GATE_EFCPIO_ << efcpiId;
 
     cGate* rmtOut = this->gateHalf(gateName_str.str().c_str(), cGate::OUTPUT);
     cGate* rmtModuleIn = rmtModule->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
@@ -158,134 +163,251 @@
     this->deleteGate(gateName_str.str().c_str());
     rmtModule->deleteGate(gateName_str.str().c_str());
 
-    efcpiGates.erase(efcpiId);
-}
-
-/**
- * Passes given PDU to an (N-1)-flow using information from the forwarding table.
- *
- * @param pdu PDU to be sent
- */
-void RMT::sendDown(PDU_Base* pdu)
-{
-    Address& pduDestAddr = pdu->getDstAddr();
-    int pduQosId = pdu->getConnId().getQoSId();
-    cGate* outPort;
-
-    if (relayOn)
-    {
-        // forwarding table lookup
-        RMTPortId outPortId = fwTable->lookup(pduDestAddr, pduQosId);
-
-        if (outPortId.first != NULL)
+    deleteEfcpiToQueueMapping(efcpiId);
+    efcpiOut.erase(efcpiId);
+    efcpiIn.erase(efcpiId);
+}
+
+/**
+ * Creates a direct mapping from an EFCPI input gate to a RMT queue.
+ *
+ * @param cepId EFCP ID
+ * @param outQueue output RMT queue
+ */
+void RMT::addEfcpiToQueueMapping(unsigned cepId, RMTQueue* outQueue)
+{
+    efcpiToQueue[efcpiIn[cepId]] = outQueue;
+}
+
+/**
+ * Deletes a direct EFCPI->queue mapping.
+ *
+ * @param cepId EFCP ID
+ */
+void RMT::deleteEfcpiToQueueMapping(unsigned cepId)
+{
+    efcpiToQueue.erase(efcpiIn[cepId]);
+}
+
+/**
+ * Wrapper around forwarding table lookup returning the target output gate itself.
+ *
+ * @param destAddr destination address
+ * @param qosId qos-id
+ * @return RMT gate leading to an output RMT queue
+ */
+cGate* RMT::fwTableLookup(Address& destAddr, short qosId)
+{
+    cGate* outGate = NULL;
+    RMTQueue* outQueue = fwTable->lookup(destAddr, qosId);
+
+    if (outQueue != NULL)
+    {
+        outGate = outQueue->getRmtAccessGate();
+    }
+
+    return outGate;
+}
+
+/**
+ * Passes a PDU from an EFCP instance to an appropriate output queue.
+ *
+ * @param pdu PDU to be passed
+ */
+void RMT::efcpiToPort(PDU_Base* pdu)
+{
+    cGate* outGate = NULL;
+
+    outGate = efcpiToQueue[pdu->getArrivalGate()]->getRmtAccessGate();
+    if (outGate != NULL)
+    {
+        EV << this->getFullPath() << " passing a PDU to an output queue" << endl;
+        send(pdu, outGate);
+    }
+    else
+    {
+        EV << this->getFullPath() << "efcpi->port-id mapping not present!" << endl;
+    }
+}
+
+/**
+ * Passes a PDU from an (N-1)-port to an EFCP instance.
+ *
+ * @param pdu PDU to be passed
+ */
+void RMT::portToEfcpi(PDU_Base* pdu)
+{
+    unsigned cepId = pdu->getConnId().getDstCepId();
+    cGate* efcpiGate = efcpiOut[cepId];
+
+    if (efcpiGate != NULL)
+    {
+        EV << this->getFullPath() << " passing a PDU upwards to EFCPI " << cepId << endl;
+        send(pdu, efcpiGate);
+    }
+    else
+    {
+        EV << this->getFullPath() << " EFCPI " << cepId << " isn't present on this system! Notifying other modules." << endl;
+        emit(sigRMTNoConnID, pdu);
+        //delete pdu;
+    }
+
+}
+
+/**
+ * Passes a PDU from an EFCP instance to another local EFCP instance.
+ *
+ * @param pdu PDU to be passed
+ */
+void RMT::efcpiToEfcpi(PDU_Base* pdu)
+{
+    portToEfcpi(pdu);
+}
+
+/**
+ * Passes a CDAP mesage from an (N-1)-port instance to RIB daemon.
+ *
+ * @param cdap CDAP message to be passed
+ */
+void RMT::portToRIB(CDAPMessage* cdap)
+{
+
+    send(cdap, "ribdIo$o");
+}
+
+/**
+ * Passes a CDAP mesage from the RIB daemon to an appropriate output queue.
+ *
+ * @param cdap CDAP message to be passed
+ */
+void RMT::RIBToPort(CDAPMessage* cdap)
+{
+    cGate* outGate = NULL;
+
+    if (!onWire)
+    {
+        outGate = fwTableLookup(cdap->getDstAddr(), -1);
+    }
+    else
+    {
+        outGate = queues->getFirst(RMTQueue::OUTPUT)->getRmtAccessGate();
+    }
+
+    if (outGate != NULL)
+    {
+        EV << this->getFullPath() << " passing a CDAP message to an output queue" << endl;
+        send(cdap, outGate);
+    }
+    else
+    {
+        EV << "there isn't any suitable output queue available!" << endl;
+    }
+}
+
+/**
+ * Relays incoming message to an output queue based on data from PDUFwTable.
+ *
+ * @param msg either a PDU or a CDAP message to be relayed
+ */
+void RMT::portToPort(cMessage* msg)
+{
+    Address destAddr;
+    short qosId;
+
+    if (dynamic_cast<PDU_Base*>(msg) != NULL)
+    {
+        destAddr = ((PDU_Base*)msg)->getDstAddr();
+        qosId = ((PDU_Base*)msg)->getConnId().getQoSId();
+    }
+    else if (dynamic_cast<CDAPMessage*>(msg) != NULL)
+    {
+        destAddr = ((CDAPMessage*)msg)->getDstAddr();
+        qosId = -1;
+    }
+
+    cGate* outGate = NULL;
+
+    if (onWire)
+    {
+        outGate = queues->getFirst(RMTQueue::OUTPUT)->getRmtAccessGate();
+    }
+    else
+    {
+        outGate = fwTableLookup(destAddr, qosId);
+    }
+
+    if (outGate != NULL)
+    {
+        EV << this->getFullPath() << " relaying a message" << endl;
+        send(msg, outGate);
+    }
+    else
+    {
+        EV << this->getFullPath() << " I can't reach any suitable (N-1)-flow! Seems like none is allocated." << endl;
+    }
+
+}
+
+/**
+ * The main finite state machine of Relaying and Multiplexing task.
+ * Makes a decision about what to do with incoming message based on arrival gate,
+ * message type, message destination address and RMT mode of operation.
+ *
+ * @param msg either a PDU or a CDAP message
+ */
+void RMT::processMessage(cMessage* msg)
+{
+    std::string gate = msg->getArrivalGate()->getName();
+
+    if (dynamic_cast<PDU_Base*>(msg) != NULL)
+    { // PDU arrival
+        PDU_Base* pdu = (PDU_Base*) msg;
+
+        if (gate.substr(0, 2) == "in")
         {
-            outPort = ports[outPortId];
+            if (pdu->getDstAddr() == thisIpcAddr)
+            {
+                portToEfcpi(pdu);
+            }
+            else if (relayOn)
+            {
+                portToPort(msg);
+            }
+            else
+            {
+                EV << getFullPath() << " This PDU isn't for me! Holding it here." << endl;
+            }
+        }
+        else if (gate.substr(0, 7) == GATE_EFCPIO_)
+        {
+            if (pdu->getDstAddr() == thisIpcAddr)
+            {
+                efcpiToEfcpi(pdu);
+            }
+            else
+            {
+                efcpiToPort(pdu);
+            }
+        }
+    }
+    else if (dynamic_cast<CDAPMessage*>(msg) != NULL)
+    { // management message arrival
+        CDAPMessage* cdap = (CDAPMessage*) msg;
+
+        if (gate.substr(0, 2) == "in")
+        {
+            if (cdap->getDstAddr() == thisIpcAddr)
+            {
+                portToRIB(cdap);
+            }
+            else
+            {
+                portToPort(msg);
+            }
         }
         else
         {
-            EV << this->getFullPath() << " couldn't find any match in FWTable; dropping." << endl;
-            delete pdu;
-            return;
-        }
-
-    }
-    else
-    {
-        // decide which (N-1)-flow should get the PDU...
-        // we'll just grab the first one for now
-        outPort = ports.begin()->second;
-    }
-
-    EV << this->getFullPath() << " passing a PDU downwards..." << endl;
-
-    if (outPort != NULL)
-    {
-        // TODO: OMNeT++ 4.4.1 renders the message transfer as if this was sent to this IPC's EFCP. Is this a bug?
-        send(pdu, outPort);
-    }
-    else
-    {
-        EV << this->getFullPath()
-           << " I can't reach a suitable (N-1)-flow! It's probably not allocated. Dropping."
-           << endl;
-        delete pdu;
-    }
-}
-
-/**
- * Passes given PDU to the appropriate EFCP instance using PDU's destination CEP-id.
- *
- * @param pdu PDU to be sent
- */
-void RMT::sendUp(PDU_Base* pdu)
-{
-    Address& pduAddr = pdu->getDstAddr();
-
-    if (thisIpcAddr == pduAddr)
-    {
-        EV << this->getFullPath() << " passing a PDU upwards to EFCPI " << pdu->getConnId().getDstCepId() << endl;
-        cGate* efcpiGate = efcpiGates[pdu->getConnId().getDstCepId()];
-
-        if (efcpiGate != NULL)
-        {
-            send(pdu, efcpiGate);
-        }
-        else
-        {
-            EV << this->getFullPath()
-               << " I'm not connected to such EFCPI! Notifying other modules."
-               << endl;
-            // TODO: emit(cosi)
-            delete pdu;
-        }
-    }
-    else
-    { // this PDU isn't for us
-        if (relayOn)
-        { // ...let's relay it somewhere else
-            EV << this->getFullPath() << " relaying a PDU elsewhere" << endl;
-            sendDown(pdu);
-        }
-        else
-        { //
-            EV << this->getFullPath()
-               << " this PDU isn't for me, dropping it! (" << thisIpcAddr << " != " << pduAddr << ")"
-               << endl;
-            delete pdu;
-        }
-    }
-}
-
-void RMT::handleMessage(cMessage *msg)
-{
-    std::string gate = msg->getArrivalGate()->getName();
-
-    if (msg->isSelfMessage())
-    {
-        delete msg;
-    }
-    else if (dynamic_cast<PDU_Base*>(msg) != NULL)
-    { // PDU arrival
-        PDU_Base* pdu = (PDU_Base*) msg;
-
-        if (gate.substr(0, 8) == GATE_SOUTHIO)
-        {
-            sendUp(pdu);
-        }
-        else if (gate.substr(0, 7) == GATE_EFCPIO)
-        {
-            sendDown(pdu);
-        }
-    }
-    else if (dynamic_cast<CDAPMessage*>(msg) != NULL)
-    { // management message arrival
-        if (gate == "southIo$i")
-        {
-            send(msg, "ribdIo$o");
-        }
-        else
-        {
-            send(msg, "southIo$o", 0);
+            RIBToPort(cdap);
         }
     }
     else
@@ -295,3 +417,24 @@
     }
 }
 
+void RMT::handleMessage(cMessage *msg)
+{
+    if (msg->isSelfMessage())
+    {
+        if (!opp_strcmp(msg->getFullName(), "queueServiceDone"))
+        {
+            schedPolicy->run(queues);
+            if (waitingMsgs)
+            {
+                scheduleServiceEnd();
+                waitingMsgs--;
+            }
+        }
+        delete msg;
+    }
+    else
+    {
+        processMessage(msg);
+    }
+}
+