--- a/src/DIF/RMT/RMTPort.cc
+++ b/src/DIF/RMT/RMTPort.cc
@@ -1,5 +1,5 @@
//
-// Copyright © 2014 PRISTINE Consortium (http://ict-pristine.eu)
+// Copyright © 2014 - 2015 PRISTINE Consortium (http://ict-pristine.eu)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
@@ -17,20 +17,38 @@
#include "RMTPort.h"
+const char* SIG_STAT_RMTPORT_UP = "RMTPort_PassUp";
+const char* SIG_STAT_RMTPORT_DOWN = "RMTPort_PassDown";
+
Define_Module(RMTPort);
void RMTPort::initialize()
{
ready = false; // port should get activated by RA
+ blockedOutput = false;
+ blockedInput = false;
+
+ waitingOnInput = 0;
+ waitingOnOutput = 0;
+
southInputGate = gateHalf(GATE_SOUTHIO, cGate::INPUT);
southOutputGate = gateHalf(GATE_SOUTHIO, cGate::OUTPUT);
+ postServeDelay = par("postServeDelay").doubleValue() / 1000;
queueIdGen = check_and_cast<QueueIDGenBase*>
- (getModuleByPath("^.^.resourceAllocator.queueIdGenerator"));
-
- sigRMTPortPDURcvd = registerSignal(SIG_RMT_PortPDURcvd);
- sigRMTPortPDUSent = registerSignal(SIG_RMT_PortPDUSent);
+ (getModuleByPath("^.^.^.resourceAllocator.queueIdGenerator"));
+
+ sigStatRMTPortUp = registerSignal(SIG_STAT_RMTPORT_UP);
+ sigStatRMTPortDown = registerSignal(SIG_STAT_RMTPORT_DOWN);
sigRMTPortReady = registerSignal(SIG_RMT_PortReadyToServe);
+
+ WATCH(ready);
+ WATCH(waitingOnInput);
+ WATCH(waitingOnOutput);
+ WATCH(blockedInput);
+ WATCH(blockedOutput);
+ WATCH(postServeDelay);
+ WATCH_PTR(flow);
}
void RMTPort::postInitialize()
@@ -41,11 +59,18 @@
void RMTPort::handleMessage(cMessage* msg)
{
- if (msg->isSelfMessage() && !opp_strcmp(msg->getFullName(), "portTransmitEnd"))
- { // a PDU transmit procedure has just been finished
- setReady();
- //emit(sigRMTPortPDUSent, this);
- emit(sigRMTPortReady, this);
+ if (msg->isSelfMessage())
+ {
+ if (!opp_strcmp(msg->getFullName(), "portTransmitEnd"))
+ { // a PDU transmit procedure has just been finished
+ emit(sigStatRMTPortDown, true);
+ setReadyDelayed();
+ }
+ else if (!opp_strcmp(msg->getFullName(), "readyToServe"))
+ {
+ setReady();
+ emit(sigRMTPortReady, this);
+ }
delete msg;
}
else if (msg->getArrivalGate() == southInputGate) // incoming message
@@ -53,6 +78,7 @@
if (dynamic_cast<CDAPMessage*>(msg) != NULL)
{ // this will go away when we figure out management flow pre-allocation
send(msg, getManagementQueue(RMTQueue::INPUT)->getInputGate()->getPreviousGate());
+ emit(sigStatRMTPortUp, true);
}
else if (dynamic_cast<PDU*>(msg) != NULL)
{
@@ -63,6 +89,7 @@
if (inQueue != NULL)
{
send(msg, inQueue->getInputGate()->getPreviousGate());
+ emit(sigStatRMTPortUp, true);
}
else
{
@@ -73,8 +100,6 @@
{
EV << "this type of message isn't supported!" << endl;
}
-
- emit(sigRMTPortPDURcvd, this);
}
else if (northInputGates.count(msg->getArrivalGate())) // outgoing message
{
@@ -88,22 +113,20 @@
if (outputChannel != NULL)
{ // we're using a channel, likely with some sort of data rate/delay
simtime_t transmitEnd = outputChannel->getTransmissionFinishTime();
-// EV << "!!!!!!!!! transmit start: " << simTime()
-// << "; transmit end: " << transmitEnd << endl;
if (transmitEnd > simTime())
{ // transmit requires some simulation time
scheduleAt(transmitEnd, new cMessage("portTransmitEnd"));
}
else
{
- setReady();
- emit(sigRMTPortPDUSent, this);
+ setReadyDelayed();
+ emit(sigStatRMTPortDown, true);
}
}
else
{ // there isn't any delay or rate control in place
- setReady();
- //emit(sigRMTPortPDUSent, this);
+ setReadyDelayed();
+ emit(sigStatRMTPortDown, true);
emit(sigRMTPortReady, this);
}
}
@@ -120,22 +143,34 @@
return inputQueues;
}
-void RMTPort::addInputQueue(RMTQueue* queue, cGate* portGate)
-{
- northOutputGates.insert(portGate);
+const RMTQueues& RMTPort::getOutputQueues() const
+{
+ return outputQueues;
+}
+
+void RMTPort::registerInputQueue(RMTQueue* queue)
+{
inputQueues.push_back(queue);
}
-const RMTQueues& RMTPort::getOutputQueues() const
-{
- return outputQueues;
-}
-
-void RMTPort::addOutputQueue(RMTQueue* queue, cGate* portGate)
-{
- northInputGates.insert(portGate);
+void RMTPort::registerOutputQueue(RMTQueue* queue)
+{
+ northInputGates.insert(queue->getOutputGate()->getNextGate());
outputQueues.push_back(queue);
+}
+
+void RMTPort::unregisterInputQueue(RMTQueue* queue)
+{
+ inputQueues.erase(std::remove(inputQueues.begin(), inputQueues.end(), queue),
+ inputQueues.end());
+}
+
+void RMTPort::unregisterOutputQueue(RMTQueue* queue)
+{
+ northInputGates.erase(queue->getOutputGate()->getNextGate());
+ outputQueues.erase(std::remove(outputQueues.begin(), outputQueues.end(), queue),
+ outputQueues.end());
}
cGate* RMTPort::getSouthInputGate() const
@@ -186,7 +221,7 @@
const RMTQueues& queueVect = (type == RMTQueue::INPUT ? inputQueues : outputQueues);
std::ostringstream fullId;
- fullId << getFullName() << (type == RMTQueue::INPUT ? 'i' : 'o') << queueId;
+ fullId << (type == RMTQueue::INPUT ? "inQ_" : "outQ_") << queueId;
for(RMTQueuesConstIter it = queueVect.begin(); it != queueVect.end(); ++it)
{
@@ -206,9 +241,17 @@
void RMTPort::setReady()
{
- ready = true;
- emit(sigRMTPortReady, this);
- redrawGUI();
+ if (blockedOutput == false)
+ {
+ ready = true;
+ emit(sigRMTPortReady, this);
+ redrawGUI();
+ }
+}
+
+void RMTPort::setReadyDelayed()
+{
+ scheduleAt(simTime() + postServeDelay, new cMessage("readyToServe"));
}
void RMTPort::setBusy()
@@ -217,14 +260,31 @@
redrawGUI();
}
-void RMTPort::redrawGUI()
-{
- if (!ev.isGUI())
- {
- return;
- }
-
- getDisplayString().setTagArg("i2", 0, (isReady() ? "status/green" : "status/noentry"));
+void RMTPort::redrawGUI(bool redrawParent)
+{
+ if (ev.isGUI())
+ {
+ getDisplayString().setTagArg("i2", 0, (isReady() ? "status/green" : "status/noentry"));
+
+ if (redrawParent)
+ {
+ std::ostringstream ostr;
+ ostr << "dst app: " << dstAppAddr << endl;
+ if (blockedInput)
+ {
+ ostr << "input blocked" << endl;
+ }
+ if (blockedOutput)
+ {
+ ostr << "output blocked" << endl;
+ }
+
+ cDisplayString& dStr = getParentModule()->getDisplayString();
+
+ dStr.setTagArg("t", 0, ostr.str().c_str());
+ dStr.setTagArg("t", 1, "t");
+ }
+ }
}
const Flow* RMTPort::getFlow() const
@@ -237,17 +297,51 @@
this->flow = flow;
// display address of the remote IPC on top of the module
- if (flow != NULL)
- {
- // shitty temporary hack to strip the layer name off
- const std::string& dstAppFull = flow->getDstApni().getApn().getName();
- const std::string& dstAppAddr = dstAppFull.substr(0, dstAppFull.find("_"));
- getDisplayString().setTagArg("t", 0, dstAppAddr.c_str());
-
- }
- else
- {
- getDisplayString().setTagArg("t", 0, "PHY");
- }
-
-}
+ if (ev.isGUI())
+ {
+ if (flow != NULL)
+ {
+ // shitty temporary hack to strip the layer name off
+ const std::string& dstAppFull = flow->getDstApni().getApn().getName();
+ dstAppAddr = dstAppFull.substr(0, dstAppFull.find("_"));
+ }
+ else
+ {
+ dstAppAddr = "N/A (PHY)";
+ }
+ redrawGUI(true);
+ }
+}
+
+void RMTPort::blockOutput()
+{
+ EV << getFullPath() << ": blocking the port output." << endl;
+ blockedOutput = true;
+ if (ready)
+ {
+ setBusy();
+ }
+ redrawGUI(true);
+}
+
+void RMTPort::unblockOutput()
+{
+ EV << getFullPath() << ": unblocking the port output." << endl;
+ blockedOutput = false;
+ setReady();
+ redrawGUI(true);
+}
+
+void RMTPort::blockInput()
+{
+ EV << getFullPath() << ": blocking the port input." << endl;
+ blockedInput = true;
+ redrawGUI(true);
+}
+
+void RMTPort::unblockInput()
+{
+ EV << getFullPath() << ": unblocking the port input." << endl;
+ blockedInput = false;
+ redrawGUI(true);
+}