Switch to side-by-side view

--- a/src/DIF/RMT/RMT.cc
+++ b/src/DIF/RMT/RMT.cc
@@ -1,5 +1,5 @@
 //
-// Copyright © 2014 PRISTINE Consortium (http://ict-pristine.eu)
+// Copyright © 2014 - 2015 PRISTINE Consortium (http://ict-pristine.eu)
 // 
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Lesser General Public License as published by
@@ -26,18 +26,21 @@
 
 Define_Module(RMT);
 
-RMT::RMT()
-: relayOn(false), onWire(false)
-{
-}
-
 RMT::~RMT()
 {
+    while (!invalidPDUs.empty())
+    {
+        delete invalidPDUs.front();
+        invalidPDUs.pop_front();
+    }
 }
 
 
 void RMT::initialize()
 {
+    relayOn = false;
+    onWire = false;
+
     // get pointers to other components
     fwTable = check_and_cast<PDUForwardingTable*>
         (getModuleByPath("^.^.resourceAllocator.pduForwardingTable"));
@@ -54,11 +57,8 @@
         (getModuleByPath("^.^.resourceAllocator.queueAllocPolicy"));
     queueIdGenerator = check_and_cast<QueueIDGenBase*>
         (getModuleByPath("^.^.resourceAllocator.queueIdGenerator"));
-
-    // set up some parameters
-    cModule* ipcModule = getParentModule()->getParentModule();
-    thisIpcAddr = Address(ipcModule->par(PAR_IPCADDR).stringValue(),
-                          ipcModule->par(PAR_DIFNAME).stringValue());
+    addrComparator = check_and_cast<AddressComparatorBase*>
+        (getModuleByPath("^.^.resourceAllocator.addressComparator"));
 
     // register a signal for notifying others about a missing local EFCP instance
     sigRMTNoConnID = registerSignal(SIG_RMT_NoConnId);
@@ -74,8 +74,26 @@
     // listen for a signal indicating that a port is ready to serve
     lisRMTPortReady = new LisRMTPortReady(this);
     getParentModule()->subscribe(SIG_RMT_PortReadyToServe, lisRMTPortReady);
-}
-
+
+    WATCH(relayOn);
+    WATCH(onWire);
+}
+
+
+void RMT::finish()
+{
+    size_t pduCount = invalidPDUs.size();
+    if (pduCount)
+    {
+        EV << "This RMT still contains " << pduCount << " unprocessed PDUs!" << endl;
+
+        for (std::deque<cMessage*>::iterator it = invalidPDUs.begin(); it != invalidPDUs.end(); ++it)
+        {
+            cMessage* m = *it;
+            EV << m->getClassName() << " received at " << m->getArrivalTime() << endl;
+        }
+    }
+}
 
 /**
  * Invokes RMT policies related to queue processing. To be called when a message
@@ -88,6 +106,7 @@
     Enter_Method("invokeQueueArrivalPolicies()");
 
     RMTQueue* queue = check_and_cast<RMTQueue*>(obj);
+    RMTPort* port = rmtAllocator->getQueueToPortMapping(queue);
 
     // invoke monitor policy
     qMonPolicy->onMessageArrival(queue);
@@ -98,7 +117,7 @@
         // if the PDU has to be dropped, finish it here
         if (maxQPolicy->run(queue))
         {
-            const cMessage* dropped = queue->dropLast();
+            const cPacket* dropped = queue->dropLast();
             qMonPolicy->onMessageDrop(queue, dropped);
             delete dropped;
             return;
@@ -106,7 +125,14 @@
     }
 
     // finally, invoke the scheduling policy
-    schedPolicy->processQueues(rmtAllocator->getQueueToPortMapping(queue), queue->getType());
+    if (!((queue->getType() == RMTQueue::INPUT) && (port->hasBlockedInput())))
+    {
+        schedPolicy->processQueues(port, queue->getType());
+    }
+    else
+    {
+        port->addWaitingOnInput();
+    }
 }
 
 /**
@@ -124,8 +150,12 @@
     // (the output direction depends on port readiness, so it's done elsewhere)
     if (queue->getType() == RMTQueue::INPUT)
     {
-        schedPolicy->finalizeService(rmtAllocator->getQueueToPortMapping(queue),
-                                     queue->getType());
+        // input from this port could be blocked due to a congested output port
+        RMTPort* inputPort = rmtAllocator->getQueueToPortMapping(queue);
+        if (!inputPort->hasBlockedInput())
+        {
+            schedPolicy->finalizeService(inputPort, queue->getType());
+        }
     }
 }
 
@@ -255,7 +285,7 @@
     cGate* outGate = NULL;
     if (outQueue != NULL)
     {
-        outGate = outQueue->getRmtAccessGate();
+        outGate = outQueue->getRMTAccessGate();
     }
 
     if (outGate != NULL)
@@ -289,7 +319,7 @@
         EV << this->getFullPath() << ": EFCPI " << cepId
            << " isn't present on this system! Notifying other modules." << endl;
         emit(sigRMTNoConnID, pdu);
-        //delete pdu;
+        invalidPDUs.push_back(pdu);
     }
 
 }
@@ -319,7 +349,7 @@
  *
  * @param cdap CDAP message to be passed
  */
-void RMT::RIBToPort(CDAPMessage* cdap)
+void RMT::ribToPort(CDAPMessage* cdap)
 {
     cGate* outGate = NULL;
     RMTQueue* outQueue = NULL;
@@ -331,7 +361,7 @@
 
     if (outQueue != NULL)
     {
-        outGate = outQueue->getRmtAccessGate();
+        outGate = outQueue->getRMTAccessGate();
     }
 
     if (outGate != NULL)
@@ -396,7 +426,7 @@
 
     if (outQueue != NULL)
     {
-        outGate = outQueue->getRmtAccessGate();
+        outGate = outQueue->getRMTAccessGate();
     }
 
     if (outGate != NULL)
@@ -426,9 +456,10 @@
     { // PDU arrival
         PDU* pdu = (PDU*) msg;
 
+        // TODO: replace with something less dumb
         if (gate.substr(0, 1) == "p")
         { // from a port
-            if (pdu->getDstAddr() == thisIpcAddr)
+            if (addrComparator->matchesThisIPC(pdu->getDstAddr()))
             {
                 portToEfcpi(pdu);
             }
@@ -443,7 +474,7 @@
         }
         else if (gate.substr(0, 7) == GATE_EFCPIO_)
         { // from an EFCPI
-            if (pdu->getDstAddr() == thisIpcAddr)
+            if (addrComparator->matchesThisIPC(pdu->getDstAddr()))
             {
                 efcpiToEfcpi(pdu);
             }
@@ -459,7 +490,7 @@
 
         if (gate.substr(0, 1) == "p")
         { // from a port
-            if (cdap->getDstAddr() == thisIpcAddr)
+            if (addrComparator->matchesThisIPC(cdap->getDstAddr()))
             {
                 portToRIB(cdap);
             }
@@ -470,13 +501,13 @@
         }
         else
         { // from the RIBd
-            RIBToPort(cdap);
+            ribToPort(cdap);
         }
     }
     else
     {
         EV << this->getFullPath() << " message type not supported" << endl;
-        delete msg;
+        invalidPDUs.push_back(msg);
     }
 }
 
@@ -485,7 +516,7 @@
     if (msg->isSelfMessage())
     {
         // ?
-        delete msg;
+        invalidPDUs.push_back(msg);
     }
     else
     {