--- a/src/DIF/RMT/RMT.cc
+++ b/src/DIF/RMT/RMT.cc
@@ -26,95 +26,99 @@
Define_Module(RMT);
-const int PROCESSING_DELAY = 0.0;
-
RMT::RMT()
{
relayOn = false;
-}
-
-RMT::~RMT()
-{
-}
-
-
-void RMT::initialize() {
+ onWire = false;
+ waitingMsgs = 0;
+}
+
+RMT::~RMT() {}
+
+
+void RMT::initialize()
+{
+ WATCH_PTRMAP(efcpiToQueue);
+
fwTable = ModuleAccess<PDUForwardingTable>("pduForwardingTable").get();
- //ports = ModuleAccess<RMTPortManager>("rmtPortManager").get();
+ queues = ModuleAccess<RMTQueueManager>("rmtQueueManager").get();
cModule* ipcModule = getParentModule()->getParentModule();
thisIpcAddr = Address(ipcModule->par("ipcAddress").stringValue(),
ipcModule->par("difName").stringValue());
- //this->enableRelay();
-}
-
-/**
- * Creates a gate to be used for connection to an (N-1)-flow.
- *
- * @param gateName name to be given to the newly created gate; it has to be
- * unambiguous within the scope of this IPC's management of (N-1)-flows
- */
-void RMT::createSouthGate(std::string gateName)
-{
+ // register signal for notifying others about a missing local EFCP instance
+ sigRMTNoConnID = registerSignal(SIG_RMT_NoConnId);
+
+ // listen for a signal indicating that a new message has arrived into a queue
+ lisRMTMsgRcvd = new LisRMTPDURcvd(this);
+ getParentModule()->subscribe(SIG_RMT_MessageReceived, lisRMTMsgRcvd);
+}
+
+/**
+ * Returns the current scheduling policy.
+ *
+ *@return pointer to current policy class
+ */
+RMTSchedulingBase* RMT::getSchedulingPolicy()
+{
+ return schedPolicy;
+}
+
+/**
+ * Changes the current scheduling policy.
+ *
+ * @param policy pointer to the new policy class
+ */
+void RMT::setSchedulingPolicy(RMTSchedulingBase* policy)
+{
+ schedPolicy = policy;
+}
+
+/**
+ * Schedules an end-of-queue-service event.
+ *
+ */
+void RMT::scheduleServiceEnd()
+{
+ scheduleAt(simTime() + par("queueServiceTime"), new cMessage("queueServiceDone"));
+}
+
+/**
+ * Tries to begin service of a message that has newly arrived into a queue.
+ * If servicing takes place right now, the wait counter is increased instead.
+ *
+ */
+void RMT::invokeSchedulingPolicy()
+{
+ Enter_Method("invokeSchedulingPolicy()");
+
+ if (!waitingMsgs)
+ {
+ scheduleServiceEnd();
+ }
+ else
+ {
+ waitingMsgs++;
+ }
+}
+
+/**
+ * Creates a gate to be used for connection to an EFCP instance.
+ *
+ * @param efcpiId CEP-id to be used in the gate's name
+ */
+void RMT::createEfcpiGate(unsigned int efcpiId)
+{
+ if (efcpiOut.count(efcpiId))
+ {
+ return;
+ }
+
cModule* rmtModule = getParentModule();
- this->addGate(gateName.c_str(), cGate::INOUT, false);
- cGate* rmtIn = this->gateHalf(gateName.c_str(), cGate::INPUT);
- cGate* rmtOut = this->gateHalf(gateName.c_str(), cGate::OUTPUT);
-
- rmtModule->addGate(gateName.c_str(), cGate::INOUT, false);
- cGate* rmtModuleIn = rmtModule->gateHalf(gateName.c_str(), cGate::INPUT);
- cGate* rmtModuleOut = rmtModule->gateHalf(gateName.c_str(), cGate::OUTPUT);
-
- rmtModuleIn->connectTo(rmtIn);
- rmtOut->connectTo(rmtModuleOut);
-
- // RMT<->(N-1)-EFCP interconnection shall be done by the RA
-}
-
-/**
- * Removes a gate used for a (N-1)-flow.
- *
- * @param gateName name of the gate meant for removal
- */
-void RMT::deleteSouthGate(std::string gateName)
-{
- cModule* rmtModule = getParentModule();
-
- cGate* rmtOut = this->gateHalf(gateName.c_str(), cGate::OUTPUT);
- cGate* rmtModuleIn = rmtModule->gateHalf(gateName.c_str(), cGate::INPUT);
-
- rmtOut->disconnect();
- rmtModuleIn->disconnect();
-
- this->deleteGate(gateName.c_str());
- rmtModule->deleteGate(gateName.c_str());
-
- //ports.erase(gateName);
-}
-
-void RMT::addRMTPort(RMTPortId portId, cGate* gate)
-{
- ports[portId] = gate;
-}
-
-/**
- * Creates a gate to be used for connection to an EFCP instance.
- *
- * @param efcpiId CEP-id to be used in the gate's name
- */
-void RMT::createEfcpiGate(unsigned int efcpiId)
-{
- if (efcpiGates.count(efcpiId))
- {
- return;
- }
-
- cModule* rmtModule = getParentModule();
-
std::ostringstream gateName_str;
- gateName_str << GATE_EFCPIO << efcpiId;
+ gateName_str << GATE_EFCPIO_ << efcpiId;
this->addGate(gateName_str.str().c_str(), cGate::INOUT, false);
cGate* rmtIn = this->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
@@ -127,7 +131,8 @@
rmtModuleIn->connectTo(rmtIn);
rmtOut->connectTo(rmtModuleOut);
- efcpiGates[efcpiId] = rmtOut;
+ efcpiOut[efcpiId] = rmtOut;
+ efcpiIn[efcpiId] = rmtIn;
// RMT<->EFCP interconnection shall be done by the FAI
}
@@ -139,7 +144,7 @@
*/
void RMT::deleteEfcpiGate(unsigned int efcpiId)
{
- if (!efcpiGates.count(efcpiId))
+ if (!efcpiOut.count(efcpiId))
{
return;
}
@@ -147,7 +152,7 @@
cModule* rmtModule = getParentModule();
std::ostringstream gateName_str;
- gateName_str << GATE_EFCPIO << efcpiId;
+ gateName_str << GATE_EFCPIO_ << efcpiId;
cGate* rmtOut = this->gateHalf(gateName_str.str().c_str(), cGate::OUTPUT);
cGate* rmtModuleIn = rmtModule->gateHalf(gateName_str.str().c_str(), cGate::INPUT);
@@ -158,134 +163,251 @@
this->deleteGate(gateName_str.str().c_str());
rmtModule->deleteGate(gateName_str.str().c_str());
- efcpiGates.erase(efcpiId);
-}
-
-/**
- * Passes given PDU to an (N-1)-flow using information from the forwarding table.
- *
- * @param pdu PDU to be sent
- */
-void RMT::sendDown(PDU_Base* pdu)
-{
- Address& pduDestAddr = pdu->getDstAddr();
- int pduQosId = pdu->getConnId().getQoSId();
- cGate* outPort;
-
- if (relayOn)
- {
- // forwarding table lookup
- RMTPortId outPortId = fwTable->lookup(pduDestAddr, pduQosId);
-
- if (outPortId.first != NULL)
+ deleteEfcpiToQueueMapping(efcpiId);
+ efcpiOut.erase(efcpiId);
+ efcpiIn.erase(efcpiId);
+}
+
+/**
+ * Creates a direct mapping from an EFCPI input gate to a RMT queue.
+ *
+ * @param cepId EFCP ID
+ * @param outQueue output RMT queue
+ */
+void RMT::addEfcpiToQueueMapping(unsigned cepId, RMTQueue* outQueue)
+{
+ efcpiToQueue[efcpiIn[cepId]] = outQueue;
+}
+
+/**
+ * Deletes a direct EFCPI->queue mapping.
+ *
+ * @param cepId EFCP ID
+ */
+void RMT::deleteEfcpiToQueueMapping(unsigned cepId)
+{
+ efcpiToQueue.erase(efcpiIn[cepId]);
+}
+
+/**
+ * Wrapper around forwarding table lookup returning the target output gate itself.
+ *
+ * @param destAddr destination address
+ * @param qosId qos-id
+ * @return RMT gate leading to an output RMT queue
+ */
+cGate* RMT::fwTableLookup(Address& destAddr, short qosId)
+{
+ cGate* outGate = NULL;
+ RMTQueue* outQueue = fwTable->lookup(destAddr, qosId);
+
+ if (outQueue != NULL)
+ {
+ outGate = outQueue->getRmtAccessGate();
+ }
+
+ return outGate;
+}
+
+/**
+ * Passes a PDU from an EFCP instance to an appropriate output queue.
+ *
+ * @param pdu PDU to be passed
+ */
+void RMT::efcpiToPort(PDU_Base* pdu)
+{
+ cGate* outGate = NULL;
+
+ outGate = efcpiToQueue[pdu->getArrivalGate()]->getRmtAccessGate();
+ if (outGate != NULL)
+ {
+ EV << this->getFullPath() << " passing a PDU to an output queue" << endl;
+ send(pdu, outGate);
+ }
+ else
+ {
+ EV << this->getFullPath() << "efcpi->port-id mapping not present!" << endl;
+ }
+}
+
+/**
+ * Passes a PDU from an (N-1)-port to an EFCP instance.
+ *
+ * @param pdu PDU to be passed
+ */
+void RMT::portToEfcpi(PDU_Base* pdu)
+{
+ unsigned cepId = pdu->getConnId().getDstCepId();
+ cGate* efcpiGate = efcpiOut[cepId];
+
+ if (efcpiGate != NULL)
+ {
+ EV << this->getFullPath() << " passing a PDU upwards to EFCPI " << cepId << endl;
+ send(pdu, efcpiGate);
+ }
+ else
+ {
+ EV << this->getFullPath() << " EFCPI " << cepId << " isn't present on this system! Notifying other modules." << endl;
+ emit(sigRMTNoConnID, pdu);
+ //delete pdu;
+ }
+
+}
+
+/**
+ * Passes a PDU from an EFCP instance to another local EFCP instance.
+ *
+ * @param pdu PDU to be passed
+ */
+void RMT::efcpiToEfcpi(PDU_Base* pdu)
+{
+ portToEfcpi(pdu);
+}
+
+/**
+ * Passes a CDAP mesage from an (N-1)-port instance to RIB daemon.
+ *
+ * @param cdap CDAP message to be passed
+ */
+void RMT::portToRIB(CDAPMessage* cdap)
+{
+
+ send(cdap, "ribdIo$o");
+}
+
+/**
+ * Passes a CDAP mesage from the RIB daemon to an appropriate output queue.
+ *
+ * @param cdap CDAP message to be passed
+ */
+void RMT::RIBToPort(CDAPMessage* cdap)
+{
+ cGate* outGate = NULL;
+
+ if (!onWire)
+ {
+ outGate = fwTableLookup(cdap->getDstAddr(), -1);
+ }
+ else
+ {
+ outGate = queues->getFirst(RMTQueue::OUTPUT)->getRmtAccessGate();
+ }
+
+ if (outGate != NULL)
+ {
+ EV << this->getFullPath() << " passing a CDAP message to an output queue" << endl;
+ send(cdap, outGate);
+ }
+ else
+ {
+ EV << "there isn't any suitable output queue available!" << endl;
+ }
+}
+
+/**
+ * Relays incoming message to an output queue based on data from PDUFwTable.
+ *
+ * @param msg either a PDU or a CDAP message to be relayed
+ */
+void RMT::portToPort(cMessage* msg)
+{
+ Address destAddr;
+ short qosId;
+
+ if (dynamic_cast<PDU_Base*>(msg) != NULL)
+ {
+ destAddr = ((PDU_Base*)msg)->getDstAddr();
+ qosId = ((PDU_Base*)msg)->getConnId().getQoSId();
+ }
+ else if (dynamic_cast<CDAPMessage*>(msg) != NULL)
+ {
+ destAddr = ((CDAPMessage*)msg)->getDstAddr();
+ qosId = -1;
+ }
+
+ cGate* outGate = NULL;
+
+ if (onWire)
+ {
+ outGate = queues->getFirst(RMTQueue::OUTPUT)->getRmtAccessGate();
+ }
+ else
+ {
+ outGate = fwTableLookup(destAddr, qosId);
+ }
+
+ if (outGate != NULL)
+ {
+ EV << this->getFullPath() << " relaying a message" << endl;
+ send(msg, outGate);
+ }
+ else
+ {
+ EV << this->getFullPath() << " I can't reach any suitable (N-1)-flow! Seems like none is allocated." << endl;
+ }
+
+}
+
+/**
+ * The main finite state machine of Relaying and Multiplexing task.
+ * Makes a decision about what to do with incoming message based on arrival gate,
+ * message type, message destination address and RMT mode of operation.
+ *
+ * @param msg either a PDU or a CDAP message
+ */
+void RMT::processMessage(cMessage* msg)
+{
+ std::string gate = msg->getArrivalGate()->getName();
+
+ if (dynamic_cast<PDU_Base*>(msg) != NULL)
+ { // PDU arrival
+ PDU_Base* pdu = (PDU_Base*) msg;
+
+ if (gate.substr(0, 2) == "in")
{
- outPort = ports[outPortId];
+ if (pdu->getDstAddr() == thisIpcAddr)
+ {
+ portToEfcpi(pdu);
+ }
+ else if (relayOn)
+ {
+ portToPort(msg);
+ }
+ else
+ {
+ EV << getFullPath() << " This PDU isn't for me! Holding it here." << endl;
+ }
+ }
+ else if (gate.substr(0, 7) == GATE_EFCPIO_)
+ {
+ if (pdu->getDstAddr() == thisIpcAddr)
+ {
+ efcpiToEfcpi(pdu);
+ }
+ else
+ {
+ efcpiToPort(pdu);
+ }
+ }
+ }
+ else if (dynamic_cast<CDAPMessage*>(msg) != NULL)
+ { // management message arrival
+ CDAPMessage* cdap = (CDAPMessage*) msg;
+
+ if (gate.substr(0, 2) == "in")
+ {
+ if (cdap->getDstAddr() == thisIpcAddr)
+ {
+ portToRIB(cdap);
+ }
+ else
+ {
+ portToPort(msg);
+ }
}
else
{
- EV << this->getFullPath() << " couldn't find any match in FWTable; dropping." << endl;
- delete pdu;
- return;
- }
-
- }
- else
- {
- // decide which (N-1)-flow should get the PDU...
- // we'll just grab the first one for now
- outPort = ports.begin()->second;
- }
-
- EV << this->getFullPath() << " passing a PDU downwards..." << endl;
-
- if (outPort != NULL)
- {
- // TODO: OMNeT++ 4.4.1 renders the message transfer as if this was sent to this IPC's EFCP. Is this a bug?
- send(pdu, outPort);
- }
- else
- {
- EV << this->getFullPath()
- << " I can't reach a suitable (N-1)-flow! It's probably not allocated. Dropping."
- << endl;
- delete pdu;
- }
-}
-
-/**
- * Passes given PDU to the appropriate EFCP instance using PDU's destination CEP-id.
- *
- * @param pdu PDU to be sent
- */
-void RMT::sendUp(PDU_Base* pdu)
-{
- Address& pduAddr = pdu->getDstAddr();
-
- if (thisIpcAddr == pduAddr)
- {
- EV << this->getFullPath() << " passing a PDU upwards to EFCPI " << pdu->getConnId().getDstCepId() << endl;
- cGate* efcpiGate = efcpiGates[pdu->getConnId().getDstCepId()];
-
- if (efcpiGate != NULL)
- {
- send(pdu, efcpiGate);
- }
- else
- {
- EV << this->getFullPath()
- << " I'm not connected to such EFCPI! Notifying other modules."
- << endl;
- // TODO: emit(cosi)
- delete pdu;
- }
- }
- else
- { // this PDU isn't for us
- if (relayOn)
- { // ...let's relay it somewhere else
- EV << this->getFullPath() << " relaying a PDU elsewhere" << endl;
- sendDown(pdu);
- }
- else
- { //
- EV << this->getFullPath()
- << " this PDU isn't for me, dropping it! (" << thisIpcAddr << " != " << pduAddr << ")"
- << endl;
- delete pdu;
- }
- }
-}
-
-void RMT::handleMessage(cMessage *msg)
-{
- std::string gate = msg->getArrivalGate()->getName();
-
- if (msg->isSelfMessage())
- {
- delete msg;
- }
- else if (dynamic_cast<PDU_Base*>(msg) != NULL)
- { // PDU arrival
- PDU_Base* pdu = (PDU_Base*) msg;
-
- if (gate.substr(0, 8) == GATE_SOUTHIO)
- {
- sendUp(pdu);
- }
- else if (gate.substr(0, 7) == GATE_EFCPIO)
- {
- sendDown(pdu);
- }
- }
- else if (dynamic_cast<CDAPMessage*>(msg) != NULL)
- { // management message arrival
- if (gate == "southIo$i")
- {
- send(msg, "ribdIo$o");
- }
- else
- {
- send(msg, "southIo$o", 0);
+ RIBToPort(cdap);
}
}
else
@@ -295,3 +417,24 @@
}
}
+void RMT::handleMessage(cMessage *msg)
+{
+ if (msg->isSelfMessage())
+ {
+ if (!opp_strcmp(msg->getFullName(), "queueServiceDone"))
+ {
+ schedPolicy->run(queues);
+ if (waitingMsgs)
+ {
+ scheduleServiceEnd();
+ waitingMsgs--;
+ }
+ }
+ delete msg;
+ }
+ else
+ {
+ processMessage(msg);
+ }
+}
+