Switch to side-by-side view

--- a/src/DIF/RMT/RMTPort.cc
+++ b/src/DIF/RMT/RMTPort.cc
@@ -1,5 +1,5 @@
 //
-// Copyright © 2014 PRISTINE Consortium (http://ict-pristine.eu)
+// Copyright © 2014 - 2015 PRISTINE Consortium (http://ict-pristine.eu)
 // 
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Lesser General Public License as published by
@@ -17,20 +17,38 @@
 
 #include "RMTPort.h"
 
+const char* SIG_STAT_RMTPORT_UP = "RMTPort_PassUp";
+const char* SIG_STAT_RMTPORT_DOWN = "RMTPort_PassDown";
+
 Define_Module(RMTPort);
 
 void RMTPort::initialize()
 {
     ready = false; // port should get activated by RA
+    blockedOutput = false;
+    blockedInput = false;
+
+    waitingOnInput = 0;
+    waitingOnOutput = 0;
+
     southInputGate = gateHalf(GATE_SOUTHIO, cGate::INPUT);
     southOutputGate = gateHalf(GATE_SOUTHIO, cGate::OUTPUT);
+    postServeDelay = par("postServeDelay").doubleValue() / 1000;
 
     queueIdGen = check_and_cast<QueueIDGenBase*>
-            (getModuleByPath("^.^.resourceAllocator.queueIdGenerator"));
-
-    sigRMTPortPDURcvd = registerSignal(SIG_RMT_PortPDURcvd);
-    sigRMTPortPDUSent = registerSignal(SIG_RMT_PortPDUSent);
+            (getModuleByPath("^.^.^.resourceAllocator.queueIdGenerator"));
+
+    sigStatRMTPortUp = registerSignal(SIG_STAT_RMTPORT_UP);
+    sigStatRMTPortDown = registerSignal(SIG_STAT_RMTPORT_DOWN);
     sigRMTPortReady = registerSignal(SIG_RMT_PortReadyToServe);
+
+    WATCH(ready);
+    WATCH(waitingOnInput);
+    WATCH(waitingOnOutput);
+    WATCH(blockedInput);
+    WATCH(blockedOutput);
+    WATCH(postServeDelay);
+    WATCH_PTR(flow);
 }
 
 void RMTPort::postInitialize()
@@ -41,11 +59,18 @@
 
 void RMTPort::handleMessage(cMessage* msg)
 {
-    if (msg->isSelfMessage() && !opp_strcmp(msg->getFullName(), "portTransmitEnd"))
-    { // a PDU transmit procedure has just been finished
-        setReady();
-        //emit(sigRMTPortPDUSent, this);
-        emit(sigRMTPortReady, this);
+    if (msg->isSelfMessage())
+    {
+        if (!opp_strcmp(msg->getFullName(), "portTransmitEnd"))
+        { // a PDU transmit procedure has just been finished
+            emit(sigStatRMTPortDown, true);
+            setReadyDelayed();
+        }
+        else if (!opp_strcmp(msg->getFullName(), "readyToServe"))
+        {
+            setReady();
+            emit(sigRMTPortReady, this);
+        }
         delete msg;
     }
     else if (msg->getArrivalGate() == southInputGate) // incoming message
@@ -53,6 +78,7 @@
         if (dynamic_cast<CDAPMessage*>(msg) != NULL)
         { // this will go away when we figure out management flow pre-allocation
             send(msg, getManagementQueue(RMTQueue::INPUT)->getInputGate()->getPreviousGate());
+            emit(sigStatRMTPortUp, true);
         }
         else if (dynamic_cast<PDU*>(msg) != NULL)
         {
@@ -63,6 +89,7 @@
             if (inQueue != NULL)
             {
                 send(msg, inQueue->getInputGate()->getPreviousGate());
+                emit(sigStatRMTPortUp, true);
             }
             else
             {
@@ -73,8 +100,6 @@
         {
             EV << "this type of message isn't supported!" << endl;
         }
-
-        emit(sigRMTPortPDURcvd, this);
     }
     else if (northInputGates.count(msg->getArrivalGate())) // outgoing message
     {
@@ -88,22 +113,20 @@
             if (outputChannel != NULL)
             { // we're using a channel, likely with some sort of data rate/delay
                 simtime_t transmitEnd = outputChannel->getTransmissionFinishTime();
-//                EV << "!!!!!!!!! transmit start: " << simTime()
-//                   << "; transmit end: " << transmitEnd << endl;
                 if (transmitEnd > simTime())
                 { // transmit requires some simulation time
                     scheduleAt(transmitEnd, new cMessage("portTransmitEnd"));
                 }
                 else
                 {
-                    setReady();
-                    emit(sigRMTPortPDUSent, this);
+                    setReadyDelayed();
+                    emit(sigStatRMTPortDown, true);
                 }
             }
             else
             { // there isn't any delay or rate control in place
-                setReady();
-                //emit(sigRMTPortPDUSent, this);
+                setReadyDelayed();
+                emit(sigStatRMTPortDown, true);
                 emit(sigRMTPortReady, this);
             }
         }
@@ -120,22 +143,34 @@
     return inputQueues;
 }
 
-void RMTPort::addInputQueue(RMTQueue* queue, cGate* portGate)
-{
-    northOutputGates.insert(portGate);
+const RMTQueues& RMTPort::getOutputQueues() const
+{
+    return outputQueues;
+}
+
+void RMTPort::registerInputQueue(RMTQueue* queue)
+{
     inputQueues.push_back(queue);
 }
 
 
-const RMTQueues& RMTPort::getOutputQueues() const
-{
-    return outputQueues;
-}
-
-void RMTPort::addOutputQueue(RMTQueue* queue, cGate* portGate)
-{
-    northInputGates.insert(portGate);
+void RMTPort::registerOutputQueue(RMTQueue* queue)
+{
+    northInputGates.insert(queue->getOutputGate()->getNextGate());
     outputQueues.push_back(queue);
+}
+
+void RMTPort::unregisterInputQueue(RMTQueue* queue)
+{
+    inputQueues.erase(std::remove(inputQueues.begin(), inputQueues.end(), queue),
+                      inputQueues.end());
+}
+
+void RMTPort::unregisterOutputQueue(RMTQueue* queue)
+{
+    northInputGates.erase(queue->getOutputGate()->getNextGate());
+    outputQueues.erase(std::remove(outputQueues.begin(), outputQueues.end(), queue),
+                       outputQueues.end());
 }
 
 cGate* RMTPort::getSouthInputGate() const
@@ -186,7 +221,7 @@
     const RMTQueues& queueVect = (type == RMTQueue::INPUT ? inputQueues : outputQueues);
 
     std::ostringstream fullId;
-    fullId << getFullName() << (type == RMTQueue::INPUT ? 'i' : 'o') << queueId;
+    fullId << (type == RMTQueue::INPUT ? "inQ_" : "outQ_") << queueId;
 
     for(RMTQueuesConstIter it = queueVect.begin(); it != queueVect.end(); ++it)
     {
@@ -206,9 +241,17 @@
 
 void RMTPort::setReady()
 {
-    ready = true;
-    emit(sigRMTPortReady, this);
-    redrawGUI();
+    if (blockedOutput == false)
+    {
+        ready = true;
+        emit(sigRMTPortReady, this);
+        redrawGUI();
+    }
+}
+
+void RMTPort::setReadyDelayed()
+{
+    scheduleAt(simTime() + postServeDelay, new cMessage("readyToServe"));
 }
 
 void RMTPort::setBusy()
@@ -217,14 +260,31 @@
     redrawGUI();
 }
 
-void RMTPort::redrawGUI()
-{
-    if (!ev.isGUI())
-    {
-        return;
-    }
-
-    getDisplayString().setTagArg("i2", 0, (isReady() ? "status/green" : "status/noentry"));
+void RMTPort::redrawGUI(bool redrawParent)
+{
+    if (ev.isGUI())
+    {
+        getDisplayString().setTagArg("i2", 0, (isReady() ? "status/green" : "status/noentry"));
+
+        if (redrawParent)
+        {
+            std::ostringstream ostr;
+            ostr << "dst app: " << dstAppAddr << endl;
+            if (blockedInput)
+            {
+                ostr << "input blocked" << endl;
+            }
+            if (blockedOutput)
+            {
+                ostr << "output blocked" << endl;
+            }
+
+            cDisplayString& dStr = getParentModule()->getDisplayString();
+
+            dStr.setTagArg("t", 0, ostr.str().c_str());
+            dStr.setTagArg("t", 1, "t");
+        }
+    }
 }
 
 const Flow* RMTPort::getFlow() const
@@ -237,17 +297,51 @@
     this->flow = flow;
 
     // display address of the remote IPC on top of the module
-    if (flow != NULL)
-    {
-        // shitty temporary hack to strip the layer name off
-        const std::string& dstAppFull = flow->getDstApni().getApn().getName();
-        const std::string& dstAppAddr = dstAppFull.substr(0, dstAppFull.find("_"));
-        getDisplayString().setTagArg("t", 0, dstAppAddr.c_str());
-
-    }
-    else
-    {
-        getDisplayString().setTagArg("t", 0, "PHY");
-    }
-
-}
+    if (ev.isGUI())
+    {
+        if (flow != NULL)
+        {
+            // shitty temporary hack to strip the layer name off
+            const std::string& dstAppFull = flow->getDstApni().getApn().getName();
+            dstAppAddr = dstAppFull.substr(0, dstAppFull.find("_"));
+        }
+        else
+        {
+            dstAppAddr = "N/A (PHY)";
+        }
+        redrawGUI(true);
+    }
+}
+
+void RMTPort::blockOutput()
+{
+    EV << getFullPath() << ": blocking the port output." << endl;
+    blockedOutput = true;
+    if (ready)
+    {
+        setBusy();
+    }
+    redrawGUI(true);
+}
+
+void RMTPort::unblockOutput()
+{
+    EV << getFullPath() << ": unblocking the port output." << endl;
+    blockedOutput = false;
+    setReady();
+    redrawGUI(true);
+}
+
+void RMTPort::blockInput()
+{
+    EV << getFullPath() << ": blocking the port input." << endl;
+    blockedInput = true;
+    redrawGUI(true);
+}
+
+void RMTPort::unblockInput()
+{
+    EV << getFullPath() << ": unblocking the port input." << endl;
+    blockedInput = false;
+    redrawGUI(true);
+}