--- a/src/DIF/RA/RA.cc
+++ b/src/DIF/RA/RA.cc
@@ -1,3 +1,5 @@
+//
+// Copyright Š 2014 - 2015 PRISTINE Consortium (http://ict-pristine.eu)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
@@ -65,8 +67,6 @@
difAllocator = check_and_cast<DA*>
(getModuleByPath("^.^.^.difAllocator.da"));
- //fwdTable = check_and_cast<PDUForwardingTable*>
- // (getModuleByPath("^.pduForwardingTable"));
flowTable = check_and_cast<NM1FlowTable*>
(getModuleByPath("^.nm1FlowTable"));
rmt = check_and_cast<RMT*>
@@ -107,6 +107,8 @@
{
sigRACreFloPosi = registerSignal(SIG_RA_CreateFlowPositive);
sigRACreFloNega = registerSignal(SIG_RA_CreateFlowNegative);
+ sigRASDReqFromRMT = registerSignal(SIG_RA_InvokeSlowdown);
+ sigRASDReqFromRIB = registerSignal(SIG_RA_ExecuteSlowdown);
lisRAAllocResPos = new LisRAAllocResPos(this);
thisIPC->subscribe(SIG_FAI_AllocateResponsePositive, lisRAAllocResPos);
@@ -120,6 +122,26 @@
lisRACreResPosi = new LisRACreResPosi(this);
thisIPC->getParentModule()->
subscribe(SIG_RIBD_CreateFlowResponsePositive, this->lisRACreResPosi);
+
+ lisEFCPStopSending = new LisEFCPStopSending(this);
+ thisIPC->getParentModule()->
+ subscribe(SIG_EFCP_StahpSending, this->lisEFCPStopSending);
+
+ lisEFCPStartSending = new LisEFCPStartSending(this);
+ thisIPC->getParentModule()->
+ subscribe(SIG_EFCP_StartSending, this->lisEFCPStartSending);
+
+ lisRMTSDReq = new LisRMTSlowdownRequest(this);
+ thisIPC->subscribe(SIG_RMT_SlowdownRequest, this->lisRMTSDReq);
+
+ lisRIBCongNotif = new LisRIBCongNotif(this);
+ thisIPC->subscribe(SIG_RIBD_CongestionNotification, this->lisRIBCongNotif);
+
+ lisRMTPortDrainDisable = new LisRMTPortDrainDisable(this);
+ thisIPC->subscribe(SIG_RMT_PortDrainDisable, this->lisRMTPortDrainDisable);
+
+ lisRMTPortDrainEnable = new LisRMTPortDrainEnable(this);
+ thisIPC->subscribe(SIG_RMT_PortDrainEnable, this->lisRMTPortDrainEnable);
}
void RA::initFlowAlloc()
@@ -398,7 +420,7 @@
// create a mock "(N-1)-port" for interface
RMTPort* port = rmtAllocator->addPort(NULL);
// connect the port to the bottom
- interconnectModules(rmtModule, port, rmtGate.str(), std::string(GATE_SOUTHIO));
+ interconnectModules(rmtModule, port->getParentModule(), rmtGate.str(), std::string(GATE_SOUTHIO));
// finalize initial port parameters
port->postInitialize();
port->setReady();
@@ -433,7 +455,7 @@
// 2) attach a RMTPort instance (pretty much a representation of an (N-1)-port)
RMTPort* port = rmtAllocator->addPort(flow);
- interconnectModules(rmtModule, port, thisIPCGate.str(), std::string(GATE_SOUTHIO));
+ interconnectModules(rmtModule, port->getParentModule(), thisIPCGate.str(), std::string(GATE_SOUTHIO));
// finalize initial port parameters
port->postInitialize();
@@ -521,15 +543,11 @@
{
// connect the new flow to the RMT
RMTPort* port = bindNM1FlowToRMT(targetIPC, fab, flow);
- // update the PDU forwarding table
- //fwdTable->insert(Address(dstApn.getName()), flow->getConId().getQoSId(), port);
- // TODO: remove this when management isn't piggy-backed anymore
+ // TODO: remove this when management isn't piggy-backed anymore!
// (port shouldn't be ready to send out data when the flow isn't yet allocated)
port->setReady();
- //fwTable->insert(Address(flow->getDstApni().getApn().getName()),
- // flow->getConId().getQoSId(), port);
-
+ // invoke fwdTable insertion policy
fwdtg->insertFlowInfo(
Address(flow->getDstApni().getApn().getName()),
flow->getConId().getQoSId(),
@@ -598,20 +616,6 @@
// attach the new flow to RMT
RMTPort* port = bindNM1FlowToRMT(targetIpc, fab, flow);
// update the PDU forwarding table
-/*
- fwdTable->insert(Address(dstAPN.getName()), qosID, port);
-
- // add other accessible applications into forwarding table
- const APNList* remoteApps = difAllocator->findNeigborApns(dstAPN);
- if (remoteApps)
- {
- for (ApnCItem it = remoteApps->begin(); it != remoteApps->end(); ++it)
- {
- Address addr = Address(it->getName());
- fwdTable->insert(addr, qosID, port);
- }
- }
-*/
fwdtg->insertFlowInfo(
Address(flow->getDstApni().getApn().getName()),
flow->getConId().getQoSId(),
@@ -647,7 +651,7 @@
NM1FlowTableItem* item = flowTable->findFlowByDstApni(neighApn, qosId);
if (item != NULL)
{
- qAllocPolicy->onNFlowAlloc(item->getRmtPort(), flow);
+ qAllocPolicy->onNFlowAlloc(item->getRMTPort(), flow);
}
}
}
@@ -667,21 +671,9 @@
// TODO: move this to receiveSignal()
NM1FlowTableItem* item = flowTable->findFlowByDstApni(dstApn.getName(), qosId);
if (item == NULL) return;
-/*
- // add other accessible applications into the forwarding table
- const APNList* remoteApps = difAllocator->findNeigborApns(dstApn);
- if (remoteApps)
- {
- for (ApnCItem it = remoteApps->begin(); it != remoteApps->end(); ++it)
- {
- Address addr = Address(it->getName());
- fwdTable->insert(addr, qosId, item->getRmtPort());
- }
- }
-*/
// mark this flow as connected
item->setConnectionStatus(NM1FlowTableItem::CON_ESTABLISHED);
- item->getRmtPort()->setReady();
+ item->getRMTPort()->setReady();
}
/**
@@ -693,29 +685,31 @@
{ // TODO: part of this should be split into something like postNM1FlowDeallocation
NM1FlowTableItem* flowItem = flowTable->lookup(flow);
+ ASSERT(flowItem != NULL);
flowItem->setConnectionStatus(NM1FlowTableItem::CON_RELEASING);
-
- RMTPort* port = flowItem->getRmtPort();
+ flowItem->getFABase()->receiveDeallocateRequest(flow);
+
+ // disconnect and delete gates
+ RMTPort* port = flowItem->getRMTPort();
const char* gateName = flowItem->getGateName().c_str();
cGate* thisIpcIn = thisIPC->gateHalf(gateName, cGate::INPUT);
cGate* thisIpcOut = thisIPC->gateHalf(gateName, cGate::OUTPUT);
cGate* rmtModuleIn = rmtModule->gateHalf(gateName, cGate::INPUT);
cGate* rmtModuleOut = rmtModule->gateHalf(gateName, cGate::OUTPUT);
- cGate* portOut = port->getSouthOutputGate();
+ cGate* portOut = port->getSouthOutputGate()->getNextGate();
portOut->disconnect();
+ rmtModuleOut->disconnect();
+ thisIpcOut->disconnect();
thisIpcIn->disconnect();
- thisIpcOut->disconnect();
rmtModuleIn->disconnect();
- rmtModuleOut->disconnect();
-
- fwdtg->removeFlowInfo(flowItem->getRmtPort());
-// fwdTable->remove(port);
- rmtAllocator->removePort(flowItem->getRmtPort());
+
rmtModule->deleteGate(gateName);
- flowItem->getFaBase()->receiveDeallocateRequest(flow);
-
thisIPC->deleteGate(gateName);
+
+ // remove table entries
+ fwdtg->removeFlowInfo(flowItem->getRMTPort());
+ rmtAllocator->removePort(flowItem->getRMTPort());
flowTable->remove(flow);
}
@@ -776,13 +770,6 @@
}
}
- // add another fwtable entry for direct srcApp->dstApp messages (if needed)
- // TODO: there must be a better place to put this
- if (neighAddr != dstAddr)
- {
-// fwdTable->insert(Address(dstAddr), qosID, nm1FlowItem->getRmtPort());
- }
-
if (nm1FlowItem->getConnectionStatus() == NM1FlowTableItem::CON_ESTABLISHED)
{
return true;
@@ -794,6 +781,76 @@
}
+void RA::blockNM1PortOutput(Flow* flow)
+{
+ Enter_Method("blockNM1PortOutput()");
+
+ NM1FlowTableItem* item = flowTable->lookup(flow);
+ if (item == NULL)
+ {
+// EV << "!!! given (N-1)-flow isn't registered in the flow table;"
+// << " ignoring pushback request." << endl;
+ return;
+ }
+
+ item->getRMTPort()->blockOutput();
+}
+
+void RA::unblockNM1PortOutput(Flow* flow)
+{
+ Enter_Method("unblockNM1PortOutput()");
+
+ NM1FlowTableItem* item = flowTable->lookup(flow);
+ if (item == NULL)
+ {
+// EV << "!!! given (N-1)-flow isn't registered in the flow table;"
+// << " ignoring port unblock request." << endl;
+ return;
+ }
+
+ item->getRMTPort()->unblockOutput();
+}
+
+void RA::blockNM1PortInput(cObject* obj)
+{
+ Enter_Method("blockNM1PortInput()");
+
+ PDU* pdu = dynamic_cast<PDU*>(obj);
+ if (pdu != NULL)
+ {
+ NM1FlowTableItem* flowItem = flowTable->findFlowByDstApni(
+ pdu->getSrcAddr().getApname().getName(),
+ pdu->getConnId().getQoSId());
+
+ if (flowItem != NULL)
+ {
+ flowItem->getRMTPort()->blockInput();
+ }
+ }
+}
+
+void RA::unblockNM1PortInput(cObject* obj)
+{
+ Enter_Method("unblockNM1PortInput()");
+
+ PDU* pdu = dynamic_cast<PDU*>(obj);
+ if (pdu != NULL)
+ {
+ NM1FlowTableItem* flowItem = flowTable->findFlowByDstApni(
+ pdu->getSrcAddr().getApname().getName(),
+ pdu->getConnId().getQoSId());
+
+ if (flowItem != NULL)
+ {
+ RMTPort* port = flowItem->getRMTPort();
+ // unblock!
+ port->unblockInput();
+ // resume processing of input queues
+ rmt->invokeQueueDeparturePolicies(port->getFirstQueue(RMTQueue::INPUT));
+ }
+ }
+}
+
void RA::signalizeCreateFlowPositiveToRIBd(Flow* flow)
{
emit(sigRACreFloPosi, flow);
@@ -803,3 +860,17 @@
{
emit(sigRACreFloNega, flow);
}
+
+void RA::signalizeSlowdownRequestToRIBd(cPacket* pdu)
+{
+ Enter_Method("signalizeSlowdownRequestToRIBd()");
+ emit(sigRASDReqFromRMT, pdu);
+}
+
+void RA::signalizeSlowdownRequestToEFCP(cObject* obj)
+{
+ Enter_Method("signalizeSlowdownRequestToEFCP()");
+ // TODO: move this to the listener
+ CongestionDescriptor* congInfo = check_and_cast<CongestionDescriptor*>(obj);
+ emit(sigRASDReqFromRIB, congInfo);
+}