--- a/src/DIF/RA/RA.cc
+++ b/src/DIF/RA/RA.cc
@@ -51,41 +51,35 @@
fwTable = ModuleAccess<PDUForwardingTable>("pduForwardingTable").get();
flTable = ModuleAccess<FlowTable>("flowTable").get();
rmt = (RMT*) this->getParentModule()->getParentModule()->getModuleByPath(".rmt.rmt");
+ rmtQM = (RMTQueueManager*) this->getParentModule()->getParentModule()->getModuleByPath(".rmt.rmtQueueManager");
// initialize attributes
- processName = getParentModule()->getParentModule()->par(PAR_IPCADDR).stdstringValue();
+ std::ostringstream os;
+ os << getParentModule()->getParentModule()->par(PAR_IPCADDR).stdstringValue()
+ << "_" << getParentModule()->getParentModule()->par(PAR_DIFNAME).stdstringValue();
+ processName = os.str();
+
+ initSignalsAndListeners();
+ initQoSCubes();
// determine and set RMT mode of operation
setRmtMode();
- initSignalsAndListeners();
- initQoSCubes();
WATCH_LIST(this->QosCubes);
- initFlowAlloc();
-}
-
-void RA::initFlowAlloc()
-{
- cXMLElement* dirXml = par("flows").xmlValue();
- cXMLElementList map = dirXml->getChildrenByTagName("Flow");
-
- for (cXMLElementList::iterator i = map.begin(); i != map.end(); ++i)
- {
- cXMLElement* m = *i;
-
- APNamingInfo src = APNamingInfo(APN(processName));
- APNamingInfo dst = APNamingInfo(APN(m->getAttribute("dest")));
-
- Flow *fl = new Flow(src, dst);
- // just use the first QoS cube available (temporary workaround)
- fl->setQosParameters(getQosCubes().front());
-
- preparedFlows.push_back(fl);
- cMessage* msg = new cMessage("RA-CreateFlow");
- scheduleAt(simTime(), msg);
- }
-}
+}
+
+void RA::initSignalsAndListeners()
+{
+ cModule* catcher2 = this->getParentModule()->getParentModule();
+
+ sigRACreFloPosi = registerSignal(SIG_RA_CreateFlowPositive);
+ sigRACreFloNega = registerSignal(SIG_RA_CreateFlowNegative);
+
+ lisRACreFlow = new LisRACreFlow(this);
+ catcher2->subscribe(SIG_RIBD_CreateFlow, lisRACreFlow);
+}
+
void RA::setRmtMode()
{
@@ -96,25 +90,41 @@
if (bottomGate == "medium$o")
{
// we're on wire! this is the bottommost "interface" DIF
+ rmt->setOnWire(true);
// let's connect RMT to the medium
- bindMediumToRMT();
- }
- else if (bottomGate == "northIo$o")
+ bindMediumToRMTQueue();
+ }
+ else if (bottomGate == "northIo$i")
{ // other IPC processes are below us
- if (hostModule->gateSize("northIo") > 1)
- {
- // multiple (N-1)-DIFs are present, RMT shall be relaying
- rmt->enableRelay();
- }
- else
- {
- // we're on top of a single IPC process
- }
- }
-}
-
-
-void RA::initQoSCubes() {
+ rmt->setOnWire(false);
+// // create a queue for each QoS of each (N-1)-DIF
+// for (int i = 0; i <= hostModule->gateSize("northIo"); i++)
+// {
+// EV << hostModule->gate("southIo$o", i)->getNextGate()->getOwnerModule()->getFullName() << endl;
+// cModule* module = hostModule->gate("southIo$o", i)->getNextGate()->getOwnerModule();
+// cModule* raBottom = module->getModuleByPath(".resourceAllocator.ra");
+// QosCubeSet cubes = dynamic_cast<RABase*>(raBottom)->getQosCubes();
+// for (QCubeCItem it = cubes.begin(); it != cubes.end(); ++it) {
+// short qosid = it->getQosId();
+// std::ostringstream ostr;
+// ostr << "q_" << module->par("difName").stringValue() << "_" << qosid;
+//
+// rmt->addQueueSet(ostr.str().c_str());
+// }
+// }
+ }
+
+ if (hostModule->par("routing").boolValue() == true)
+ {
+ rmt->enableRelay();
+ }
+
+ rmt->setSchedulingPolicy(new LongestQFirst);
+}
+
+
+void RA::initQoSCubes()
+{
cXMLElement* qosXml = NULL;
if (par(PAR_QOSDATA).xmlValue() != NULL && par(PAR_QOSDATA).xmlValue()->hasChildren())
qosXml = par(PAR_QOSDATA).xmlValue();
@@ -136,99 +146,99 @@
QosCube cube;
cube.setQosId((unsigned short)atoi(m->getAttribute(ATTR_ID)));
//Following data types should be same as in QosCubes.h
- int avgBand = VAL_QOSPARAMDONOTCARE; //Average bandwidth (measured at the application in bits/sec)
- int avgSDUBand = VAL_QOSPARAMDONOTCARE; //Average SDU bandwidth (measured in SDUs/sec)
- int peakBandDuration = VAL_QOSPARAMDONOTCARE; //Peak bandwidth-duration (measured in bits/sec);
- int peakSDUBandDuration = VAL_QOSPARAMDONOTCARE; //Peak SDU bandwidth-duration (measured in SDUs/sec);
- int burstPeriod = VAL_QOSPARAMDONOTCARE; //Burst period measured in useconds
- int burstDuration = VAL_QOSPARAMDONOTCARE; //Burst duration, measured in usecs fraction of Burst Period
- int undetectedBitErr = VAL_QOSPARAMDONOTCARE; //Undetected bit error rate measured as a probability
- int maxSDUsize = VAL_QOSPARAMDONOTCARE; //MaxSDUSize measured in bytes
- bool partDeliv = VAL_QOSPARAMDEFBOOL; //Partial Delivery - Can SDUs be delivered in pieces rather than all at once?
- bool incompleteDeliv = VAL_QOSPARAMDEFBOOL; //Incomplete Delivery - Can SDUs with missing pieces be delivered?
- bool forceOrder = VAL_QOSPARAMDEFBOOL; //Must SDUs be delivered in order?
- unsigned int maxAllowGap = VAL_QOSPARAMDONOTCARE; //Max allowable gap in SDUs, (a gap of N SDUs is considered the same as all SDUs delivered, i.e. a gap of N is a "don't care.")
- int delay = VAL_QOSPARAMDONOTCARE; //Delay in usecs
- int jitter = VAL_QOSPARAMDONOTCARE; //Jitter in usecs2
- int costtime = VAL_QOSPARAMDONOTCARE; //measured in $/ms
- int costbits = VAL_QOSPARAMDONOTCARE; //measured in $/Mb
+ int avgBand = VAL_QOSPARDONOTCARE; //Average bandwidth (measured at the application in bits/sec)
+ int avgSDUBand = VAL_QOSPARDONOTCARE; //Average SDU bandwidth (measured in SDUs/sec)
+ int peakBandDuration = VAL_QOSPARDONOTCARE; //Peak bandwidth-duration (measured in bits/sec);
+ int peakSDUBandDuration = VAL_QOSPARDONOTCARE; //Peak SDU bandwidth-duration (measured in SDUs/sec);
+ int burstPeriod = VAL_QOSPARDONOTCARE; //Burst period measured in useconds
+ int burstDuration = VAL_QOSPARDONOTCARE; //Burst duration, measured in usecs fraction of Burst Period
+ int undetectedBitErr = VAL_QOSPARDONOTCARE; //Undetected bit error rate measured as a probability
+ int maxSDUsize = VAL_QOSPARDONOTCARE; //MaxSDUSize measured in bytes
+ bool partDeliv = VAL_QOSPARDEFBOOL; //Partial Delivery - Can SDUs be delivered in pieces rather than all at once?
+ bool incompleteDeliv = VAL_QOSPARDEFBOOL; //Incomplete Delivery - Can SDUs with missing pieces be delivered?
+ bool forceOrder = VAL_QOSPARDEFBOOL; //Must SDUs be delivered in order?
+ unsigned int maxAllowGap = VAL_QOSPARDONOTCARE; //Max allowable gap in SDUs, (a gap of N SDUs is considered the same as all SDUs delivered, i.e. a gap of N is a "don't care.")
+ int delay = VAL_QOSPARDONOTCARE; //Delay in usecs
+ int jitter = VAL_QOSPARDONOTCARE; //Jitter in usecs2
+ int costtime = VAL_QOSPARDONOTCARE; //measured in $/ms
+ int costbits = VAL_QOSPARDONOTCARE; //measured in $/Mb
cXMLElementList attrs = m->getChildren();
for (cXMLElementList::iterator jt = attrs.begin(); jt != attrs.end(); ++jt) {
cXMLElement* n = *jt;
if ( !strcmp(n->getTagName(), ELEM_AVGBW) ) {
- avgBand = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ avgBand = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (avgBand < 0)
- avgBand = VAL_QOSPARAMDONOTCARE;
+ avgBand = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_AVGSDUBW)) {
- avgSDUBand = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ avgSDUBand = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (avgSDUBand < 0)
- avgSDUBand = VAL_QOSPARAMDONOTCARE;
+ avgSDUBand = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_PEAKBWDUR)) {
- peakBandDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ peakBandDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (peakBandDuration < 0)
- peakBandDuration = VAL_QOSPARAMDONOTCARE;
+ peakBandDuration = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_PEAKSDUBWDUR)) {
- peakSDUBandDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ peakSDUBandDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (peakSDUBandDuration < 0)
- peakSDUBandDuration = VAL_QOSPARAMDONOTCARE;
+ peakSDUBandDuration = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_BURSTPERIOD)) {
- burstPeriod = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ burstPeriod = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (burstPeriod < 0)
- burstPeriod = VAL_QOSPARAMDONOTCARE;
+ burstPeriod = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_BURSTDURATION)) {
- burstDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ burstDuration = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (burstDuration < 0)
- burstDuration = VAL_QOSPARAMDONOTCARE;
+ burstDuration = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_UNDETECTBITERR)) {
- undetectedBitErr = n->getNodeValue() ? atof(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ undetectedBitErr = n->getNodeValue() ? atof(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (undetectedBitErr < 0 || undetectedBitErr > 1 )
- undetectedBitErr = VAL_QOSPARAMDONOTCARE;
+ undetectedBitErr = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_MAXSDUSIZE)) {
- maxSDUsize = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ maxSDUsize = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (maxSDUsize < 0)
- maxSDUsize = VAL_QOSPARAMDONOTCARE;
+ maxSDUsize = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_PARTIALDELIVER)) {
- partDeliv = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+ partDeliv = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
}
else if (!strcmp(n->getTagName(), ELEM_INCOMPLETEDELIVER)) {
- incompleteDeliv = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+ incompleteDeliv = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
}
else if (!strcmp(n->getTagName(), ELEM_FORCEORDER)) {
- forceOrder = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+ forceOrder = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
}
else if (!strcmp(n->getTagName(), ELEM_MAXALLOWGAP)) {
- maxAllowGap = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ maxAllowGap = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (maxAllowGap < 0)
- maxAllowGap = VAL_QOSPARAMDONOTCARE;
+ maxAllowGap = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_DELAY)) {
- delay = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ delay = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (delay < 0)
- delay = VAL_QOSPARAMDONOTCARE;
+ delay = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_JITTER)) {
- jitter = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDONOTCARE;
+ jitter = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDONOTCARE;
if (jitter < 0)
- jitter = VAL_QOSPARAMDONOTCARE;
+ jitter = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_COSTTIME)) {
- costtime = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+ costtime = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
if (costtime < 0)
- costtime = VAL_QOSPARAMDONOTCARE;
+ costtime = VAL_QOSPARDONOTCARE;
}
else if (!strcmp(n->getTagName(), ELEM_COSTBITS)) {
- costbits = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARAMDEFBOOL;
+ costbits = n->getNodeValue() ? atoi(n->getNodeValue()) : VAL_QOSPARDEFBOOL;
if (costbits < 0)
- costbits = VAL_QOSPARAMDONOTCARE;
+ costbits = VAL_QOSPARDONOTCARE;
}
}
@@ -252,7 +262,7 @@
QosCubes.push_back(cube);
}
if (!QosCubes.size()) {
- std::stringstream os;
+ std::ostringstream os;
os << this->getFullPath() << " does not have any QoSCube in its set. It cannot work without at least one valid QoS cube!" << endl;
error(os.str().c_str());
}
@@ -265,81 +275,109 @@
delete msg;
return;
}
-
- if ( !strcmp(msg->getName(), "RA-CreateFlow") ) {
- createFlow(preparedFlows.front());
- preparedFlows.pop_front();
- }
-}
-
-/**
- * Connects the RMT module to the medium defined in NED.
+}
+
+/**
+ * Connects the medium defined in NED to the RMT module.
* Used only for the bottom IPC process in a processing system.
*/
-void RA::bindMediumToRMT()
-{
- std::ostringstream rmtGate;
- rmtGate << GATE_SOUTHIO << "PHY";
-
- rmt->createSouthGate(rmtGate.str().c_str());
- cGate* rmtIn = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::INPUT);
- cGate* rmtOut = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::OUTPUT);
-
+void RA::bindMediumToRMTQueue()
+{
+ // retrieve the south gate
cModule* thisIpc = this->getParentModule()->getParentModule();
cGate* thisIpcIn = thisIpc->gateHalf("southIo$i", cGate::INPUT, 0);
cGate* thisIpcOut = thisIpc->gateHalf("southIo$o", cGate::OUTPUT, 0);
- rmtOut->connectTo(thisIpcOut);
- thisIpcIn->connectTo(rmtIn);
-
- rmt->addRMTPort(std::make_pair((cModule*)NULL, -1), rmtOut->getPathStartGate());
-}
-
-/**
- * Connects the RMT module to the specified (N-1)-flow.
+ // now let's connect rmtModule to bottom of IPC
+ // create an INOUT on the bottom of rmtModule
+ std::ostringstream rmtGate;
+ rmtGate << GATE_SOUTHIO_ << "PHY";
+
+ rmt->getParentModule()->addGate(rmtGate.str().c_str(), cGate::INOUT, false);
+ cGate* rmtModuleIn = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::INPUT);
+ cGate* rmtModuleOut = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::OUTPUT);
+
+ // interconnect rmtModule and bottom of IPC
+ rmtModuleOut->connectTo(thisIpcOut);
+ thisIpcIn->connectTo(rmtModuleIn);
+
+ RMTQueue* inputQueue = rmtQM->addQueue(RMTQueue::INPUT);
+ RMTQueue* outputQueue = rmtQM->addQueue(RMTQueue::OUTPUT);
+
+ outputQueue->getOutputGate()->connectTo(rmtModuleOut);
+ rmtModuleIn->connectTo(inputQueue->getInputGate());
+}
+
+/**
+ * Connects the specified (N-1)-flow to an RMT queue.
*
* @param ipc IPC process containing the (N-1)-flow
* @param flow the (N-1)-flow
- */
-void RA::bindFlowToRMT(cModule* ipc, Flow* flow)
-{
+ * @return output gate for the (N-1)-flow
+ */
+RMTQueue* RA::bindLowerFlowToRmtQueue(cModule* ipc, FABase* fab, Flow* flow)
+{
+
+ EV << "attempting to bind an (N-1)-flow to a RMT queue..." << endl;
+ int portId = flow->getSrcPortId();
+ /*
+ if ( difAllocator->isAppLocal(flow->getDstApni().getApn()) )
+ portId = flow->getDstPortId();
+ else if ( difAllocator->isAppLocal(flow->getSrcApni().getApn()) )
+ portId = flow->getSrcPortId();
+ else
+ throw("Binding to inconsistant PortId occured!");
+ */
+
// expand the given portId so it's unambiguous within this IPC
- std::string combinedPortId = normalizePortId(ipc->getFullName(), flow->getSrcPortId());
-
- std::ostringstream rmtGate;
- rmtGate << GATE_SOUTHIO << combinedPortId;
-
- rmt->createSouthGate(rmtGate.str());
-
- // get (N-1)-IPC gates
+ std::string combinedPortId = normalizePortId(ipc->getFullName(), portId);
+
+ //// binding begins at the bottom: first, we need to interconnect both IPCs
+ // get (N-1)-IPC north gates
std::ostringstream bottomIpcGate;
- bottomIpcGate << "northIo_" << flow->getSrcPortId();
+ bottomIpcGate << GATE_NORTHIO_ << portId;
cGate* bottomIpcIn = ipc->gateHalf(bottomIpcGate.str().c_str(), cGate::INPUT);
cGate* bottomIpcOut = ipc->gateHalf(bottomIpcGate.str().c_str(), cGate::OUTPUT);
- // get RMT gates
- cGate* rmtIn = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::INPUT);
- cGate* rmtOut = rmt->getParentModule()->gateHalf(rmtGate.str().c_str(), cGate::OUTPUT);
-
- // create an intermediate border gate
+ // create a border south gate for this IPC
cModule* thisIpc = this->getParentModule()->getParentModule();
- std::ostringstream thisIpcGate;
- thisIpcGate << "southIo_" << combinedPortId;
-
- thisIpc->addGate(thisIpcGate.str().c_str(), cGate::INOUT, false);
- cGate* thisIpcIn = thisIpc->gateHalf(thisIpcGate.str().c_str(), cGate::INPUT);
- cGate* thisIpcOut = thisIpc->gateHalf(thisIpcGate.str().c_str(), cGate::OUTPUT);
-
-
+ std::ostringstream thisIpcGate_ostr;
+ thisIpcGate_ostr << GATE_SOUTHIO_ << combinedPortId;
+ std::string thisIpcGate = thisIpcGate_ostr.str();
+
+ thisIpc->addGate(thisIpcGate.c_str(), cGate::INOUT, false);
+ cGate* thisIpcIn = thisIpc->gateHalf(thisIpcGate.c_str(), cGate::INPUT);
+ cGate* thisIpcOut = thisIpc->gateHalf(thisIpcGate.c_str(), cGate::OUTPUT);
+
+ // interconnect IPCs
bottomIpcOut->connectTo(thisIpcIn);
- thisIpcIn->connectTo(rmtIn);
-
- rmtOut->connectTo(thisIpcOut);
thisIpcOut->connectTo(bottomIpcIn);
- // modules are connected; register a handle
- rmt->addRMTPort(std::make_pair(ipc, flow->getSrcPortId()), rmtOut->getPathStartGate());
-
+ //// now we'll connect bottom of this IPC to rmtModule
+
+ rmt->getParentModule()->addGate(thisIpcGate.c_str(), cGate::INOUT, false);
+ cGate* rmtModuleIn = rmt->getParentModule()->gateHalf(thisIpcGate.c_str(), cGate::INPUT);
+ cGate* rmtModuleOut = rmt->getParentModule()->gateHalf(thisIpcGate.c_str(), cGate::OUTPUT);
+
+ // interconnect rmtModule and bottom of IPC
+ rmtModuleOut->connectTo(thisIpcOut);
+ thisIpcIn->connectTo(rmtModuleIn);
+
+ // all that's left to do is a binding of rmtModule south gate to a queue...
+ // since the specifications are (at this time) a bit unclear about cardinality
+ // between queues and (N-1)-flows, we'll simply create a new set of queues
+ // for each (N-1)-flow, i.e. 1:1
+
+ RMTQueue* inputQueue = rmtQM->addQueue(RMTQueue::INPUT);
+ RMTQueue* outputQueue = rmtQM->addQueue(RMTQueue::OUTPUT);
+
+ rmtModuleIn->connectTo(inputQueue->getInputGate());
+ outputQueue->getOutputGate()->connectTo(rmtModuleOut);
+
+ // update the flow table
+ flTable->insert(flow, fab, inputQueue, outputQueue, thisIpcGate);
+
+ return outputQueue;
}
/**
@@ -348,7 +386,7 @@
*
* @param ipcName module identifier of an underlying IPC process
* @param flowPortId original portId to be expanded
- * @return normalizes port-id
+ * @return normalized port-id
*/
std::string RA::normalizePortId(std::string ipcName, int flowPortId)
{
@@ -358,22 +396,21 @@
}
/**
- * Creates an (N-1)-flow.
+ * Creates an (N-1)-flow (this is the mechanism behind Allocate() call).
*
- * @param dstIpc address of the destination IPC process
- */
-void RA::createFlow(Flow *fl)
-{
+ * @param flow specified flow object
+ */
+void RA::createFlow(Flow *flow)
+{
+ Enter_Method("createFlow()");
+
//Ask DA which IPC to use to reach dst App
- DirectoryEntry* de = difAllocator->resolveApn(fl->getDstApni().getApn());
-
- if (de == NULL) {
- EV << "DA does not know target application." << endl;
+ const Address* ad = difAllocator->resolveApnToBestAddress(flow->getDstApni().getApn());
+ if (ad == NULL) {
+ EV << "DifAllocator returned NULL for resolving " << flow->getDstApni().getApn() << endl;
return;
}
-
- //TODO: Vesely - Now using first available APN to DIFMember mapping
- Address addr = de->getSupportedDifs().front();
+ Address addr = *ad;
//TODO: Vesely - New IPC must be enrolled or DIF created
if (!difAllocator->isDifLocal(addr.getDifName())) {
@@ -386,63 +423,186 @@
FABase* fab = difAllocator->findFaInsideIpc(targetIpc);
//Command target FA to allocate flow
- bool status = fab->receiveAllocateRequest(fl);
+ bool status = fab->receiveAllocateRequest(flow);
+
+ //FIXME: Vesely - WAIT!
//If AllocationRequest ended by creating connections
if (status)
{
// connect the new flow to the RMT
- bindFlowToRMT(targetIpc, fl);
+ RMTQueue* outQ = bindLowerFlowToRmtQueue(targetIpc, fab, flow);
// we're ready to go!
- //signalizeFlowAllocated(fl);
- flTable->insert(fl, fab);
+ // update the PDU forwarding table
+ fwTable->insert(Address(flow->getDstApni().getApn().getName()), flow->getConId().getQoSId(), outQ);
}
else
{
EV << "Flow not allocated!" << endl;
}
-
-}
-
-void RA::initSignalsAndListeners() {
-/*
- // allocation request
- sigRAAllocReq = registerSignal(SIG_RA_AllocateRequest);
- // deallocation request
- sigRADeallocReq = registerSignal(SIG_RA_DeallocateRequest);
- // positive response to allocation request
- sigRAAllocResPosi = registerSignal(SIG_RA_AllocateResponsePositive);
- // negative response to allocation request
- sigRAAllocResNega = registerSignal(SIG_RA_AllocateResponseNegative);
- // successful allocation of an (N-1)-flow
- sigRAFlowAllocd = registerSignal(SIG_RA_FlowAllocated);
- // successful deallocation of an (N-1)-flow
- sigRAFlowDeallocd = registerSignal(SIG_RA_FlowDeallocated);
-*/
-}
-
-/*
-void RA::signalizeAllocateRequest(Flow* flow) {
- emit(sigRAAllocReq, flow);
-}
-
-void RA::signalizeDeallocateRequest(Flow* flow) {
- emit(sigRADeallocReq, flow);
-}
-
-void RA::signalizeAllocateResponsePositive(Flow* flow) {
- emit(sigRAAllocResPosi, flow);
-}
-
-void RA::signalizeAllocateResponseNegative(Flow* flow) {
- emit(sigRAAllocResNega, flow);
-}
-
-void RA::signalizeFlowAllocated(Flow* flow) {
- emit(sigRAFlowAllocd, flow);
-}
-
-void RA::signalizeFlowDeallocated(Flow* flow) {
- emit(sigRAFlowDeallocd, flow);
-}
-*/
+}
+
+/**
+ * Creates an (N-1)-flow (this is the mechanism behind response to an M_CREATE request).
+ *
+ * @param flow specified flow object
+ */
+void RA::createFlowWithoutAllocate(Flow* flow)
+{
+ Enter_Method("createFlowWoAlloc()");
+
+ //Ask DA which IPC to use to reach dst App
+ const Address* ad = difAllocator->resolveApnToBestAddress(flow->getDstApni().getApn());
+ if (ad == NULL) {
+ EV << "DifAllocator returned NULL for resolving " << flow->getDstApni().getApn() << endl;
+ signalizeCreateFlowNegativeToRibd(flow);
+ return;
+ }
+ Address addr = *ad;
+
+ //TODO: Vesely - New IPC must be enrolled or DIF created
+ if (!difAllocator->isDifLocal(addr.getDifName())) {
+ EV << "Local CS does not have any IPC in DIF " << addr.getDifName() << endl;
+ signalizeCreateFlowNegativeToRibd(flow);
+ return;
+ }
+
+ //Retrieve DIF's local IPC member
+ cModule* targetIpc = difAllocator->getDifMember(addr.getDifName());
+ FABase* fab = difAllocator->findFaInsideIpc(targetIpc);
+
+ // connect the new flow to the RMT
+ RMTQueue* outQ = bindLowerFlowToRmtQueue(targetIpc, fab, flow);
+ // we're ready to go!
+ // update the PDU forwarding table
+ fwTable->insert(Address(flow->getDstApni().getApn().getName()), flow->getConId().getQoSId(), outQ);
+ // add other accessible applications into forwarding table
+ const APNList* remoteApps = difAllocator->findNeigborApns(flow->getDstApni().getApn());
+ if (remoteApps)
+ {
+ for (ApnCItem it = remoteApps->begin(); it != remoteApps->end(); ++it)
+ {
+ Address addr = Address(it->getName());
+ fwTable->insert(addr, flow->getConId().getQoSId(), outQ);
+ }
+ }
+
+ signalizeCreateFlowPositiveToRibd(flow);
+}
+
+/**
+ * Removes specified (N-1)-flow and bindings (this is the mechanism behind Deallocate() call).
+ *
+ * @param flow specified flow object
+ */
+void RA::removeFlow(Flow *flow)
+{
+ cModule* thisIpc = this->getParentModule()->getParentModule();
+ FlowTableItem* flowItem = flTable->lookup(flow);
+ const char* gateName = flowItem->getGateName().c_str();
+
+ cGate* thisIpcIn = thisIpc->gateHalf(gateName, cGate::INPUT);
+ cGate* thisIpcOut = thisIpc->gateHalf(gateName, cGate::OUTPUT);
+
+ cGate* rmtModuleIn = rmt->getParentModule()->gateHalf(gateName, cGate::INPUT);
+ cGate* rmtModuleOut = rmt->getParentModule()->gateHalf(gateName, cGate::OUTPUT);
+
+ thisIpcIn->disconnect();
+ thisIpcOut->disconnect();
+ rmtModuleIn->disconnect();
+ rmtModuleOut->disconnect();
+ flowItem->getRmtOutputQueue()->getOutputGate()->disconnect();
+
+ rmtQM->removeQueue(flowItem->getRmtInputQueue());
+ rmtQM->removeQueue(flowItem->getRmtOutputQueue());
+
+ flowItem->getFaBase()->receiveDeallocateRequest(flow);
+ thisIpc->deleteGate(gateName);
+ rmt->getParentModule()->deleteGate(gateName);
+
+ fwTable->remove(flowItem->getRmtOutputQueue());
+ flTable->remove(flow);
+}
+
+/**
+ * Creates a binding between a flow in this IPC and medium defined in NED
+ * @param flow specified flow object
+ *
+ */
+void RA::bindFlowToMedium(Flow* flow)
+{
+ EV << "binding a flow to the medium" << endl;
+
+ RMTQueue* outQueue = rmtQM->getFirst(RMTQueue::OUTPUT);
+ rmt->addEfcpiToQueueMapping(flow->getConnectionId().getSrcCepId(), outQueue);
+}
+
+/**
+ * Creates a binding between a flow in this IPC and a suitable (N-1)-flow
+ *
+ * @param flow specified flow object
+ * @return Returns TRUE if connected onWire else return FALSE
+ */
+bool RA::bindFlowToLowerFlow(Flow* flow)
+{
+ //EV << "XXXXXXXXXXXXXX" << flow->info();
+ Enter_Method("bindToLowerFlow()");
+ if (rmt->isOnWire())
+ {
+ bindFlowToMedium(flow);
+ return true;
+ }
+
+ std::string neighAddr = flow->getDstNeighbor().getApname().getName();
+ std::string dstAddr = flow->getDstAddr().getApname().getName();
+ unsigned short qosId = flow->getConId().getQoSId();
+
+ // see if any appropriate (N-1)-flow already exists
+ FlowTableItem* curFlow = NULL;
+ curFlow = flTable->lookup(neighAddr, qosId);
+
+ if (curFlow == NULL)
+ { // we need to create a new (N-1)-flow to suit our needs
+ EV << "creating an (N-1)-flow (dst Addr " << neighAddr << ")" << endl;
+
+ APNamingInfo src = APNamingInfo(APN(processName));
+ APNamingInfo dst = APNamingInfo(APN(neighAddr));
+
+ Flow *lowerFlow = new Flow(src, dst);
+ lowerFlow->setQosParameters(flow->getQosParameters());
+ createFlow(lowerFlow);
+ }
+
+ EV << "binding a flow to an (N-1)-flow" << endl;
+
+ // add efcpi->rmtQueue mapping for direct multiplexing
+ FlowTableItem* targetFlow = flTable->lookup(neighAddr, qosId);
+ if (targetFlow == NULL)
+ {
+ EV << "!!! something went wrong! there isn't any suitable (N-1)-flow present for dst "
+ << neighAddr << " so it won't be multiplexed further." << endl;
+ }
+ else
+ {
+ rmt->addEfcpiToQueueMapping(flow->getConnectionId().getSrcCepId(),
+ targetFlow->getRmtOutputQueue());
+ // add another fwtable entry for direct srcApp->dstApp messages (if needed)
+ if (neighAddr != dstAddr)
+ {
+ fwTable->insert(Address(dstAddr), qosId, targetFlow->getRmtOutputQueue());
+ }
+ }
+
+ return false;
+}
+
+
+void RA::signalizeCreateFlowPositiveToRibd(Flow* flow) {
+ EV << "Emits CreateFlowPositive signal for flow" << endl;
+ emit(sigRACreFloPosi, flow);
+}
+
+void RA::signalizeCreateFlowNegativeToRibd(Flow* flow) {
+ EV << "Emits CreateFlowNegative signal for flow" << endl;
+ emit(sigRACreFloNega, flow);
+}