--- a/src/DIF/RMT/RMT.cc
+++ b/src/DIF/RMT/RMT.cc
@@ -24,6 +24,12 @@
#include <RMT.h>
+// shared access to trace logger
+#ifndef RMT_TRACING
+#define RMT_TRACING
+std::ofstream rmtTraceFile;
+#endif
+
Define_Module(RMT);
RMT::~RMT()
@@ -38,14 +44,30 @@
void RMT::initialize()
{
+ // set up features
relayOn = false;
onWire = false;
+ tracing = getParentModule()->par("pduTracing").boolValue();
+ if (tracing && !rmtTraceFile.is_open())
+ {
+ std::ostringstream filename;
+ filename << "results/" << ev.getConfigEx()->getActiveConfigName() << "-"
+ << ev.getConfigEx()->getActiveRunNumber() << ".tr";
+ rmtTraceFile.open(filename.str().c_str());
+
+ if (!rmtTraceFile.is_open())
+ {
+ EV << "Couldn't create a trace file!" << endl;
+ tracing = false;
+ }
+ }
+
// get pointers to other components
- fwTable = check_and_cast<PDUForwardingTable*>
- (getModuleByPath("^.^.resourceAllocator.pduForwardingTable"));
+ fwd = check_and_cast<IntPDUForwarding*>
+ (getModuleByPath("^.pduForwardingPolicy"));
rmtAllocator = check_and_cast<RMTModuleAllocator*>
- (getModuleByPath("^.rmtModuleAllocator"));
+ (getModuleByPath("^.allocator"));
schedPolicy = check_and_cast<RMTSchedulingBase*>
(getModuleByPath("^.schedulingPolicy"));
@@ -63,51 +85,130 @@
// register a signal for notifying others about a missing local EFCP instance
sigRMTNoConnID = registerSignal(SIG_RMT_NoConnId);
+ // register a signal for notifying others about a packet bit error
+ sigRMTPacketError = registerSignal(SIG_RMT_ErrornousPacket);
+
// listen for a signal indicating that a new message has arrived into a queue
lisRMTQueuePDURcvd = new LisRMTQueuePDURcvd(this);
getParentModule()->subscribe(SIG_RMT_QueuePDURcvd, lisRMTQueuePDURcvd);
+ // listen for a signal indicating that a message is leaving a queue
+ lisRMTQueuePDUPreSend = new LisRMTQueuePDUPreSend(this);
+ getParentModule()->subscribe(SIG_RMT_QueuePDUPreSend, lisRMTQueuePDUPreSend);
+
// listen for a signal indicating that a message has left a queue
lisRMTQueuePDUSent = new LisRMTQueuePDUSent(this);
getParentModule()->subscribe(SIG_RMT_QueuePDUSent, lisRMTQueuePDUSent);
// listen for a signal indicating that a port is ready to serve
- lisRMTPortReady = new LisRMTPortReady(this);
- getParentModule()->subscribe(SIG_RMT_PortReadyToServe, lisRMTPortReady);
+ lisRMTPortReadyToServe = new LisRMTPortReadyToServe(this);
+ getParentModule()->subscribe(SIG_RMT_PortReadyToServe, lisRMTPortReadyToServe);
+
+ // listen for a signal indicating that a port is ready to be read from
+ lisRMTPortReadyForRead = new LisRMTPortReadyForRead(this);
+ getParentModule()->subscribe(SIG_RMT_PortReadyForRead, lisRMTPortReadyForRead);
WATCH(relayOn);
WATCH(onWire);
}
-
+/**
+ * Notify the user of PDUs left in this RMT on simulation end.
+ */
void RMT::finish()
{
size_t pduCount = invalidPDUs.size();
if (pduCount)
{
- EV << "This RMT still contains " << pduCount << " unprocessed PDUs!" << endl;
+ EV << "RMT " << this->getFullPath() << " still contains " << pduCount
+ << " unprocessed PDUs!" << endl;
for (std::deque<cMessage*>::iterator it = invalidPDUs.begin(); it != invalidPDUs.end(); ++it)
{
cMessage* m = *it;
- EV << m->getClassName() << " received at " << m->getArrivalTime() << endl;
- }
- }
-}
-
-/**
- * Invokes RMT policies related to queue processing. To be called when a message
- * arrives into a queue.
+ EV << m->getClassName() << " received at " << m->getArrivalTime()
+ << " from " << m->getSenderModule()->getFullPath() << endl;
+ }
+ }
+
+ if (rmtTraceFile.is_open())
+ {
+ rmtTraceFile.flush();
+ rmtTraceFile.close();
+ }
+}
+
+/**
+ * Append a line into the trace file.
+ *
+ * @param pkt packet
+ * @param eventType event (receive/send/enqueue/dequeue/drop)
+ */
+void RMT::tracePDUEvent(const cPacket* pkt, TraceEventType eventType)
+{
+ const PDU* pdu = dynamic_cast<const PDU*>(pkt);
+ if (pdu == NULL)
+ {
+ return;
+ }
+
+ std::ostringstream flowID;
+ flowID << pdu->getConnId().getSrcCepId() << pdu->getConnId().getDstCepId()
+ << pdu->getConnId().getQoSId();
+
+ std::string flags = std::bitset<8>(pdu->getFlags()).to_string().c_str();
+
+ rmtTraceFile << char(eventType) << " "
+ << simTime() << " "
+ << getModuleByPath("^.^.^.")->getFullName() << " "
+ << getModuleByPath("^.^.")->getFullName() << " "
+ << pdu->getClassName() << " "
+ << pdu->getBitLength() << " "
+ << flags << " "
+ << flowID.str().c_str() << " "
+ << pdu->getDstAddr().getDifName() << " "
+ << pdu->getSrcAddr().getIpcAddress() << " "
+ << pdu->getDstAddr().getIpcAddress() << " "
+ << pdu->getSeqNum() << " "
+ << pdu->getId()
+ << endl;
+}
+
+/**
+ * Procedures executed when a PDU arrives into a queue.
*
* @param obj RMT queue object
*/
-void RMT::invokeQueueArrivalPolicies(cObject* obj)
-{
- Enter_Method("invokeQueueArrivalPolicies()");
+void RMT::onQueueArrival(cObject* obj)
+{
+ Enter_Method("onQueueArrival()");
RMTQueue* queue = check_and_cast<RMTQueue*>(obj);
RMTPort* port = rmtAllocator->getQueueToPortMapping(queue);
+ if (tracing)
+ {
+ if (queue->getType() == RMTQueue::INPUT)
+ {
+ tracePDUEvent(queue->getLastPDU(), MSG_RECEIVE);
+ }
+ tracePDUEvent(queue->getLastPDU(), MSG_ENQUEUE);
+ }
+
+ // detection of channel-induced bit error
+ if (queue->getLastPDU()->hasBitError())
+ {
+ EV << "PDU arriving on " << port->getParentModule()->getFullName()
+ << " contains one or more bit errors! Dropping." << endl;
+ if (tracing)
+ {
+ tracePDUEvent(queue->getLastPDU(), MSG_DROP);
+ }
+ emit(sigRMTPacketError, obj);
+ queue->dropLast();
+ return;
+ }
+
// invoke monitor policy
qMonPolicy->onMessageArrival(queue);
@@ -117,6 +218,10 @@
// if the PDU has to be dropped, finish it here
if (maxQPolicy->run(queue))
{
+ if (tracing)
+ {
+ tracePDUEvent(queue->getLastPDU(), MSG_DROP);
+ }
const cPacket* dropped = queue->dropLast();
qMonPolicy->onMessageDrop(queue, dropped);
delete dropped;
@@ -124,51 +229,83 @@
}
}
- // finally, invoke the scheduling policy
- if (!((queue->getType() == RMTQueue::INPUT) && (port->hasBlockedInput())))
- {
- schedPolicy->processQueues(port, queue->getType());
- }
- else
- {
- port->addWaitingOnInput();
- }
-}
-
-/**
- * Takes care of re-invocation of the scheduling policy after a queue is popped.
+ port->addWaiting(queue->getType());
+ schedPolicy->processQueues(port, queue->getType());
+}
+
+/**
+ * Procedures executed right before a PDU leaves its queue.
*
* @param obj RMT queue object
*/
-void RMT::invokeQueueDeparturePolicies(cObject* obj)
-{
- Enter_Method("invokeQueueDeparturePolicies()");
+void RMT::preQueueDeparture(cObject* obj)
+{
+ Enter_Method("preQueueDeparture()");
+ RMTQueue* queue = check_and_cast<RMTQueue*>(obj);
+
+ if (tracing)
+ {
+ tracePDUEvent(queue->getFirstPDU(), MSG_DEQUEUE);
+ if (queue->getType() == RMTQueue::OUTPUT)
+ {
+ tracePDUEvent(queue->getFirstPDU(), MSG_SEND);
+ }
+ }
+}
+
+/**
+ * Procedures executed after a PDU leaves its queue.
+ *
+ * @param obj RMT queue object
+ */
+void RMT::postQueueDeparture(cObject* obj)
+{
+ Enter_Method("postQueueDeparture()");
RMTQueue* queue = check_and_cast<RMTQueue*>(obj);
qMonPolicy->onMessageDeparture(queue);
+ RMTPort* port = rmtAllocator->getQueueToPortMapping(queue);
+ port->substractWaiting(queue->getType());
+
+ // notify MaxQ in case the queue length just went back under its threshold
+ if (queue->getLength() == (queue->getThreshLength() - 1))
+ {
+ maxQPolicy->onQueueLengthDrop(queue);
+ }
+
// if this is an incoming PDU, take care of scheduler reinvocation
- // (the output direction depends on port readiness, so it's done elsewhere)
if (queue->getType() == RMTQueue::INPUT)
{
- // input from this port could be blocked due to a congested output port
- RMTPort* inputPort = rmtAllocator->getQueueToPortMapping(queue);
- if (!inputPort->hasBlockedInput())
- {
- schedPolicy->finalizeService(inputPort, queue->getType());
- }
- }
-}
-
-/**
- * Takes care of re-invocation of the scheduling policy after a port becomes ready.
+ port->scheduleNextRead();
+ }
+ else
+ { // if this is an outgoing PDU, set the port as busy
+ port->setOutputBusy();
+ }
+}
+
+/**
+ * Invokes the scheduling policy for output queues.
*
* @param obj RMT queue object
*/
-void RMT::invokePortReadyPolicies(cObject* obj)
-{
- Enter_Method("invokePortReadyPolicies()");
+void RMT::writeToPort(cObject* obj)
+{
+ Enter_Method_Silent("writeToPort()");
RMTPort* port = check_and_cast<RMTPort*>(obj);
- schedPolicy->finalizeService(port, RMTQueue::OUTPUT);
+ schedPolicy->processQueues(port, RMTQueue::OUTPUT);
+}
+
+/**
+ * Invokes the scheduling policy for input queues.
+ *
+ * @param obj RMT queue object
+ */
+void RMT::readFromPort(cObject* obj)
+{
+ Enter_Method_Silent("readFromPort()");
+ RMTPort* port = check_and_cast<RMTPort*>(obj);
+ schedPolicy->processQueues(port, RMTQueue::INPUT);
}
/**
@@ -240,30 +377,52 @@
*
* @param destAddr destination address
* @param qosId qos-id
- * @param useQoS (to be removed) indicator of whether the QoS info should be used
* @return output port
*/
-RMTPort* RMT::fwTableLookup(Address& destAddr, short qosId, bool useQoS)
-{
- RMTPort* outPort = NULL;
-
+RMTPort* RMT::fwTableLookup(const Address& destAddr, const unsigned short &qosId)
+{
if (onWire)
{ // get the interface port
- outPort = rmtAllocator->getInterfacePort();
- }
- else
- { // get a suitable port from PDUFT
- if (useQoS)
- {
- outPort = fwTable->lookup(destAddr, qosId);
+ return rmtAllocator->getInterfacePort();
+ }
+ else
+ { // get a suitable ports from PDUF
+ std::vector<RMTPort*> ports = fwd->lookup(destAddr, qosId);
+ if (ports.size() > 0)
+ {
+ return ports.front();
}
else
{
- outPort = fwTable->lookup(destAddr);
- }
- }
-
- return outPort;
+ return NULL;
+ }
+ }
+}
+
+/**
+ * A wrapper for forwarding table lookup.
+ *
+ * @param pdu PDU to forward
+ * @return output port
+ */
+RMTPort* RMT::fwTableLookup(const PDU * pdu)
+{
+ if (onWire)
+ { // get the interface port
+ return rmtAllocator->getInterfacePort();
+ }
+ else
+ { // get output ports from PDUFT
+ std::vector<RMTPort*> ports = fwd->lookup(pdu);
+ if(ports.size()>0)
+ {
+ return ports.front();
+ }
+ else
+ {
+ return NULL;
+ }
+ }
}
/**
@@ -353,7 +512,7 @@
{
cGate* outGate = NULL;
RMTQueue* outQueue = NULL;
- RMTPort* outPort = fwTableLookup(cdap->getDstAddr(), 0, false);
+ RMTPort* outPort = fwTableLookup(cdap->getDstAddr(), 0);
if (outPort != NULL)
{
outQueue = outPort->getManagementQueue(RMTQueue::OUTPUT);
@@ -377,6 +536,16 @@
}
/**
+ * Bounces a CDAP mesage back to local RIB.
+ *
+ * @param cdap CDAP message to be passed
+ */
+void RMT::ribToRIB(CDAPMessage* cdap)
+{
+ send(cdap, "ribdIo$o");
+}
+
+/**
* Relays incoming message to an output queue based on data from PDUFwTable.
*
* @param msg either a PDU or a CDAP message to be relayed
@@ -387,13 +556,10 @@
RMTPort* outPort = NULL;
RMTQueue* outQueue = NULL;
-
- if (dynamic_cast<PDU*>(msg) != NULL)
- {
- destAddr = ((PDU*)msg)->getDstAddr();
- short qosId = ((PDU*)msg)->getConnId().getQoSId();
-
- outPort = fwTableLookup(destAddr, qosId);
+ PDU* pdu = dynamic_cast<PDU*>(msg);
+ if (pdu != NULL)
+ {
+ outPort = fwTableLookup(pdu);
if (outPort == NULL)
{
EV << getFullPath()
@@ -407,7 +573,7 @@
{
destAddr = ((CDAPMessage*)msg)->getDstAddr();
- outPort = fwTableLookup(destAddr, 0, false);
+ outPort = fwTableLookup(destAddr, 0);
if (outPort == NULL)
{
EV << getFullPath()
@@ -501,7 +667,14 @@
}
else
{ // from the RIBd
- ribToPort(cdap);
+ if (addrComparator->matchesThisIPC(cdap->getDstAddr()))
+ {
+ ribToRIB(cdap);
+ }
+ else
+ {
+ ribToPort(cdap);
+ }
}
}
else