--- a/src/DIF/FA/FAI.cc
+++ b/src/DIF/FA/FAI.cc
@@ -23,28 +23,36 @@
Define_Module(FAI);
FAI::FAI() : FAIBase() {
+ FaModule = NULL;
+ creReqTimer = NULL;
}
FAI::~FAI() {
- this->FaModule = NULL;
- this->FlowObject = NULL;
- portId = -1;
- cepId = -1;
- cancelAndDelete(creReqTimer);
+ FaModule = NULL;
+ FlowObject = NULL;
+ degenerateDataTransfer = false;
+ localPortId = VAL_UNDEF_PORTID;
+ localCEPId = VAL_UNDEF_CEPID;
+ remotePortId = VAL_UNDEF_PORTID;
+ remoteCEPId = VAL_UNDEF_CEPID;
+ if (creReqTimer)
+ cancelAndDelete(creReqTimer);
}
void FAI::initialize() {
- portId = par(PAR_PORTID);
- cepId = par(PAR_CEPID);
+ localPortId = par(PAR_LOCALPORTID);
+ localCEPId = par(PAR_LOCALCEPID);
+ remotePortId = par(PAR_REMOTEPORTID);
+ remoteCEPId = par(PAR_REMOTECEPID);
creReqTimeout = par(PAR_CREREQTIMEOUT).doubleValue();
- creReqTimer = new cMessage(TIM_CREREQ);
AllocRetryPolicy = check_and_cast<AllocateRetryBase*>(getParentModule()->getSubmodule(MOD_ALLOCRETRYPOLICY));
+ WATCH(degenerateDataTransfer);
+ WATCH_PTR(FlowObject);
+
initSignalsAndListeners();
-
-// this->efcp = (EFCP*)(getParentModule()->getParentModule()->getSubmodule("efcp"));
}
void FAI::postInitialize(FABase* fa, Flow* fl, EFCP* efcp) {
@@ -52,7 +60,6 @@
this->FaModule = fa;
this->FlowObject = fl;
this->efcp = efcp;
- //this->sigFAIAllocReq = sigAlReq;
}
bool FAI::receiveAllocateRequest() {
@@ -95,19 +102,17 @@
// bind this flow to a suitable (N-1)-flow
RABase* raModule = (RABase*) getModuleByPath("^.^.resourceAllocator.ra");
- status = raModule->bindNFlowToNM1Flow(FlowObject);
+
+ status = isDegenerateDataTransfer() ? true : raModule->bindNFlowToNM1Flow(FlowObject);
//IF flow is already available then schedule M_Create(Flow)
if (status) {
this->signalizeCreateFlowRequest();
}
+
//Everything went fine
return true;
}
-bool FAI::processDegenerateDataTransfer() {
- return true;
-}
-
bool FAI::receiveAllocateResponsePositive() {
Enter_Method("receiveAllocateResponsePositive()");
@@ -118,6 +123,7 @@
EV << "Cannot continue allocation of flow which is not in pending state" << endl;
return false;
}
+
//Instantiate EFCPi
bool status = this->createEFCPI();
@@ -141,11 +147,14 @@
// bind this flow to a suitable (N-1)-flow
RABase* raModule = (RABase*) getModuleByPath("^.^.resourceAllocator.ra");
- raModule->bindNFlowToNM1Flow(FlowObject);
+ status = isDegenerateDataTransfer() ? true : raModule->bindNFlowToNM1Flow(FlowObject);
ft->changeAllocStatus(FlowObject, FAITableEntry::TRANSFER);
//Signalizes M_Create_R(flow)
- this->signalizeCreateFlowResponsePositive();
+ if (status) {
+ this->signalizeCreateFlowResponsePositive();
+ }
+
return true;
}
@@ -159,6 +168,9 @@
}
ft->changeAllocStatus(FlowObject, FAITableEntry::ALLOC_NEGA);
+
+ //IF it is not DDT then retry M_CREATE
+ //if (!isDegenerateDataTransfer())
this->signalizeCreateFlowResponseNegative();
}
@@ -272,6 +284,10 @@
//Change dstCep-Id and dstPortId according to new information
FlowObject->getConnectionId().setDstCepId(flow->getConId().getDstCepId());
FlowObject->setDstPortId(flow->getDstPortId());
+ remotePortId = flow->getDstPortId();
+ remoteCEPId = flow->getConId().getDstCepId();
+ par(PAR_REMOTEPORTID) = remotePortId;
+ par(PAR_REMOTECEPID) = remoteCEPId;
//Change status
FaModule->getFaiTable()->changeAllocStatus(FlowObject, FAITableEntry::TRANSFER);
@@ -313,7 +329,9 @@
std::string FAI::info() const {
std::stringstream os;
- os << "FAI>\tPort-ID: " << this->portId << "\tCEP-ID: " << this->cepId;
+ os << "FAI>" << endl
+ << "\tlocal Port-ID: " << this->localPortId << "\tCEP-ID: " << this->localCEPId << endl
+ << "\tremote Port-ID: " << this->remotePortId << "\tCEP-ID: " << this->remoteCEPId;
return os.str();
}
@@ -324,7 +342,8 @@
bool FAI::createEFCPI() {
EV << this->getFullPath() << " attempts to create EFCP instance" << endl;
- EFCPInstance* efcpi = efcp->createEFCPI(this->getFlow(), cepId);
+ //Create EFCPI for local bindings
+ EFCPInstance* efcpi = efcp->createEFCPI(FlowObject, localCEPId, localPortId);
return efcpi ? true : false;
}
@@ -332,7 +351,7 @@
EV << this->getFullPath() << " attempts to bind EFCP and RMT" << endl;
std::ostringstream nameIpcDown;
- nameIpcDown << GATE_NORTHIO_ << portId;
+ nameIpcDown << GATE_NORTHIO_ << localPortId;
cModule* IPCModule = FaModule->getParentModule()->getParentModule();
//IF called as consequence of AllocateRequest then createNorthGate
@@ -344,29 +363,26 @@
cGate* gateIpcDownOut = IPCModule->gateHalf(nameIpcDown.str().c_str(), cGate::OUTPUT);
std::ostringstream nameEfcpNorth;
- nameEfcpNorth << GATE_APPIO_ << cepId;
+ nameEfcpNorth << GATE_APPIO_ << localPortId;
cModule* efcpModule = IPCModule->getModuleByPath(".efcp");
cGate* gateEfcpUpIn = efcpModule->gateHalf(nameEfcpNorth.str().c_str(), cGate::INPUT);
cGate* gateEfcpUpOut = efcpModule->gateHalf(nameEfcpNorth.str().c_str(), cGate::OUTPUT);
//IPCModule.northIo <--> Efcp.fai
-// cChannelType* channelType = cChannelType::get("rina.DIF.EFCP.DTP.PushBackChannel");
-// cChannel* channel = channelType->create("test");
-// gateEfcpUpOut->connectTo(gateIpcDownOut, channel);
gateEfcpUpOut->connectTo(gateIpcDownOut);
gateIpcDownIn->connectTo(gateEfcpUpIn);
//Create bindings in RMT
RMT* rmtModule = (RMT*) IPCModule->getModuleByPath(".relayAndMux.rmt");
- rmtModule->createEfcpiGate(cepId);
+ rmtModule->createEfcpiGate(localCEPId);
std::ostringstream nameRmtUp;
- nameRmtUp << GATE_EFCPIO_ << cepId;
+ nameRmtUp << GATE_EFCPIO_ << localCEPId;
cGate* gateRmtUpIn = rmtModule->getParentModule()->gateHalf(nameRmtUp.str().c_str(), cGate::INPUT);
cGate* gateRmtUpOut = rmtModule->getParentModule()->gateHalf(nameRmtUp.str().c_str(), cGate::OUTPUT);
std::ostringstream nameEfcpDown;
- nameEfcpDown << GATE_RMT_ << cepId;
+ nameEfcpDown << GATE_RMT_ << localCEPId;
cGate* gateEfcpDownIn = efcpModule->gateHalf(nameEfcpDown.str().c_str(), cGate::INPUT);
cGate* gateEfcpDownOut = efcpModule->gateHalf(nameEfcpDown.str().c_str(), cGate::OUTPUT);
@@ -384,13 +400,13 @@
EV << this->getFullPath() << " attempts to disconnect bindings between EFCP, IPC and RMT" << endl;
std::ostringstream nameIpcDown;
- nameIpcDown << GATE_NORTHIO_ << portId;
+ nameIpcDown << GATE_NORTHIO_ << localPortId;
cModule* IPCModule = FaModule->getParentModule()->getParentModule();
cGate* gateIpcDownIn = IPCModule->gateHalf(nameIpcDown.str().c_str(), cGate::INPUT);
cGate* gateIpcDownOut = IPCModule->gateHalf(nameIpcDown.str().c_str(), cGate::OUTPUT);
std::ostringstream nameEfcpNorth;
- nameEfcpNorth << GATE_APPIO_ << this->getFlow()->getConId().getSrcCepId();
+ nameEfcpNorth << GATE_APPIO_ << localPortId;
cModule* efcpModule = IPCModule->getModuleByPath(".efcp");
cGate* gateEfcpUpIn = efcpModule->gateHalf(nameEfcpNorth.str().c_str(), cGate::INPUT);
cGate* gateEfcpUpOut = efcpModule->gateHalf(nameEfcpNorth.str().c_str(), cGate::OUTPUT);
@@ -408,12 +424,12 @@
RMT* rmtModule = (RMT*) IPCModule->getModuleByPath(".relayAndMux.rmt");
std::ostringstream nameRmtUp;
- nameRmtUp << GATE_EFCPIO_ << this->getFlow()->getConId().getSrcCepId();
+ nameRmtUp << GATE_EFCPIO_ << localCEPId;
cGate* gateRmtUpIn = rmtModule->getParentModule()->gateHalf(nameRmtUp.str().c_str(), cGate::INPUT);
cGate* gateRmtUpOut = rmtModule->getParentModule()->gateHalf(nameRmtUp.str().c_str(), cGate::OUTPUT);
std::ostringstream nameEfcpDown;
- nameEfcpDown << GATE_RMT_ << this->getFlow()->getConId().getSrcCepId();
+ nameEfcpDown << GATE_RMT_ << localCEPId;
cGate* gateEfcpDownIn = efcpModule->gateHalf(nameEfcpDown.str().c_str(), cGate::INPUT);
cGate* gateEfcpDownOut = efcpModule->gateHalf(nameEfcpDown.str().c_str(), cGate::OUTPUT);
@@ -423,7 +439,7 @@
gateEfcpDownIn->disconnect();
gateEfcpDownOut->disconnect();
- rmtModule->deleteEfcpiGate(cepId);
+ rmtModule->deleteEfcpiGate(localCEPId);
return true;
}
@@ -487,6 +503,7 @@
}
void FAI::signalizeCreateFlowRequest() {
+ creReqTimer = new cMessage(TIM_CREREQ);
//Start timer
scheduleAt(simTime() + creReqTimeout, creReqTimer);
//Signalize RIBd to send M_CREATE(flow)
@@ -526,18 +543,52 @@
emit(this->sigFAIDeallocRes, this->FlowObject);
}
+int FAI::getLocalCepId() const {
+ return localCEPId;
+}
+
+void FAI::setLocalCepId(int localCepId) {
+ localCEPId = localCepId;
+}
+
+int FAI::getLocalPortId() const {
+ return localPortId;
+}
+
+void FAI::setLocalPortId(int localPortId) {
+ this->localPortId = localPortId;
+}
+
+int FAI::getRemoteCepId() const {
+ return remoteCEPId;
+}
+
+void FAI::setRemoteCepId(int remoteCepId) {
+ remoteCEPId = remoteCepId;
+}
+
+int FAI::getRemotePortId() const {
+ return remotePortId;
+}
+
+void FAI::setRemotePortId(int remotePortId) {
+ this->remotePortId = remotePortId;
+}
+
void FAI::signalizeAllocateResponsePositive() {
emit(this->sigFAIAllocResPosi, this->FlowObject);
}
void FAI::createNorthGates() {
std::ostringstream nameIpcDown;
- nameIpcDown << GATE_NORTHIO_ << portId;
+ nameIpcDown << GATE_NORTHIO_ << localPortId;
cModule* IPCModule = FaModule->getParentModule()->getParentModule();
IPCModule->addGate(nameIpcDown.str().c_str(), cGate::INOUT, false);
+ return;
}
void FAI::receiveCreateFlowResponsePositiveFromNminusOne() {
+ Enter_Method("receiveCreFlowResPositiveFromNminusOne()");
//Schedule M_Create(Flow)
this->signalizeCreateFlowRequest();
}