--- a/src/DIF/RMT/RMT.cc
+++ b/src/DIF/RMT/RMT.cc
@@ -1,5 +1,5 @@
//
-// Copyright © 2014 PRISTINE Consortium (http://ict-pristine.eu)
+// Copyright © 2014 - 2015 PRISTINE Consortium (http://ict-pristine.eu)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
@@ -26,18 +26,21 @@
Define_Module(RMT);
-RMT::RMT()
-: relayOn(false), onWire(false)
-{
-}
-
RMT::~RMT()
{
+ while (!invalidPDUs.empty())
+ {
+ delete invalidPDUs.front();
+ invalidPDUs.pop_front();
+ }
}
void RMT::initialize()
{
+ relayOn = false;
+ onWire = false;
+
// get pointers to other components
fwTable = check_and_cast<PDUForwardingTable*>
(getModuleByPath("^.^.resourceAllocator.pduForwardingTable"));
@@ -54,11 +57,8 @@
(getModuleByPath("^.^.resourceAllocator.queueAllocPolicy"));
queueIdGenerator = check_and_cast<QueueIDGenBase*>
(getModuleByPath("^.^.resourceAllocator.queueIdGenerator"));
-
- // set up some parameters
- cModule* ipcModule = getParentModule()->getParentModule();
- thisIpcAddr = Address(ipcModule->par(PAR_IPCADDR).stringValue(),
- ipcModule->par(PAR_DIFNAME).stringValue());
+ addrComparator = check_and_cast<AddressComparatorBase*>
+ (getModuleByPath("^.^.resourceAllocator.addressComparator"));
// register a signal for notifying others about a missing local EFCP instance
sigRMTNoConnID = registerSignal(SIG_RMT_NoConnId);
@@ -74,8 +74,26 @@
// listen for a signal indicating that a port is ready to serve
lisRMTPortReady = new LisRMTPortReady(this);
getParentModule()->subscribe(SIG_RMT_PortReadyToServe, lisRMTPortReady);
-}
-
+
+ WATCH(relayOn);
+ WATCH(onWire);
+}
+
+
+void RMT::finish()
+{
+ size_t pduCount = invalidPDUs.size();
+ if (pduCount)
+ {
+ EV << "This RMT still contains " << pduCount << " unprocessed PDUs!" << endl;
+
+ for (std::deque<cMessage*>::iterator it = invalidPDUs.begin(); it != invalidPDUs.end(); ++it)
+ {
+ cMessage* m = *it;
+ EV << m->getClassName() << " received at " << m->getArrivalTime() << endl;
+ }
+ }
+}
/**
* Invokes RMT policies related to queue processing. To be called when a message
@@ -88,6 +106,7 @@
Enter_Method("invokeQueueArrivalPolicies()");
RMTQueue* queue = check_and_cast<RMTQueue*>(obj);
+ RMTPort* port = rmtAllocator->getQueueToPortMapping(queue);
// invoke monitor policy
qMonPolicy->onMessageArrival(queue);
@@ -98,7 +117,7 @@
// if the PDU has to be dropped, finish it here
if (maxQPolicy->run(queue))
{
- const cMessage* dropped = queue->dropLast();
+ const cPacket* dropped = queue->dropLast();
qMonPolicy->onMessageDrop(queue, dropped);
delete dropped;
return;
@@ -106,7 +125,14 @@
}
// finally, invoke the scheduling policy
- schedPolicy->processQueues(rmtAllocator->getQueueToPortMapping(queue), queue->getType());
+ if (!((queue->getType() == RMTQueue::INPUT) && (port->hasBlockedInput())))
+ {
+ schedPolicy->processQueues(port, queue->getType());
+ }
+ else
+ {
+ port->addWaitingOnInput();
+ }
}
/**
@@ -124,8 +150,12 @@
// (the output direction depends on port readiness, so it's done elsewhere)
if (queue->getType() == RMTQueue::INPUT)
{
- schedPolicy->finalizeService(rmtAllocator->getQueueToPortMapping(queue),
- queue->getType());
+ // input from this port could be blocked due to a congested output port
+ RMTPort* inputPort = rmtAllocator->getQueueToPortMapping(queue);
+ if (!inputPort->hasBlockedInput())
+ {
+ schedPolicy->finalizeService(inputPort, queue->getType());
+ }
}
}
@@ -255,7 +285,7 @@
cGate* outGate = NULL;
if (outQueue != NULL)
{
- outGate = outQueue->getRmtAccessGate();
+ outGate = outQueue->getRMTAccessGate();
}
if (outGate != NULL)
@@ -289,7 +319,7 @@
EV << this->getFullPath() << ": EFCPI " << cepId
<< " isn't present on this system! Notifying other modules." << endl;
emit(sigRMTNoConnID, pdu);
- //delete pdu;
+ invalidPDUs.push_back(pdu);
}
}
@@ -319,7 +349,7 @@
*
* @param cdap CDAP message to be passed
*/
-void RMT::RIBToPort(CDAPMessage* cdap)
+void RMT::ribToPort(CDAPMessage* cdap)
{
cGate* outGate = NULL;
RMTQueue* outQueue = NULL;
@@ -331,7 +361,7 @@
if (outQueue != NULL)
{
- outGate = outQueue->getRmtAccessGate();
+ outGate = outQueue->getRMTAccessGate();
}
if (outGate != NULL)
@@ -396,7 +426,7 @@
if (outQueue != NULL)
{
- outGate = outQueue->getRmtAccessGate();
+ outGate = outQueue->getRMTAccessGate();
}
if (outGate != NULL)
@@ -426,9 +456,10 @@
{ // PDU arrival
PDU* pdu = (PDU*) msg;
+ // TODO: replace with something less dumb
if (gate.substr(0, 1) == "p")
{ // from a port
- if (pdu->getDstAddr() == thisIpcAddr)
+ if (addrComparator->matchesThisIPC(pdu->getDstAddr()))
{
portToEfcpi(pdu);
}
@@ -443,7 +474,7 @@
}
else if (gate.substr(0, 7) == GATE_EFCPIO_)
{ // from an EFCPI
- if (pdu->getDstAddr() == thisIpcAddr)
+ if (addrComparator->matchesThisIPC(pdu->getDstAddr()))
{
efcpiToEfcpi(pdu);
}
@@ -459,7 +490,7 @@
if (gate.substr(0, 1) == "p")
{ // from a port
- if (cdap->getDstAddr() == thisIpcAddr)
+ if (addrComparator->matchesThisIPC(cdap->getDstAddr()))
{
portToRIB(cdap);
}
@@ -470,13 +501,13 @@
}
else
{ // from the RIBd
- RIBToPort(cdap);
+ ribToPort(cdap);
}
}
else
{
EV << this->getFullPath() << " message type not supported" << endl;
- delete msg;
+ invalidPDUs.push_back(msg);
}
}
@@ -485,7 +516,7 @@
if (msg->isSelfMessage())
{
// ?
- delete msg;
+ invalidPDUs.push_back(msg);
}
else
{