Switch to side-by-side view

--- a/src/DIF/FA/FAI.cc
+++ b/src/DIF/FA/FAI.cc
@@ -23,28 +23,36 @@
 Define_Module(FAI);
 
 FAI::FAI() : FAIBase() {
+    FaModule = NULL;
+    creReqTimer = NULL;
 }
 
 FAI::~FAI() {
-    this->FaModule = NULL;
-    this->FlowObject = NULL;
-    portId = -1;
-    cepId = -1;
-    cancelAndDelete(creReqTimer);
+    FaModule = NULL;
+    FlowObject = NULL;
+    degenerateDataTransfer = false;
+    localPortId     = VAL_UNDEF_PORTID;
+    localCEPId      = VAL_UNDEF_CEPID;
+    remotePortId    = VAL_UNDEF_PORTID;
+    remoteCEPId     = VAL_UNDEF_CEPID;
+    if (creReqTimer)
+        cancelAndDelete(creReqTimer);
 }
 
 void FAI::initialize() {
-    portId = par(PAR_PORTID);
-    cepId  = par(PAR_CEPID);
+    localPortId  = par(PAR_LOCALPORTID);
+    localCEPId   = par(PAR_LOCALCEPID);
+    remotePortId = par(PAR_REMOTEPORTID);
+    remoteCEPId  = par(PAR_REMOTECEPID);
 
     creReqTimeout = par(PAR_CREREQTIMEOUT).doubleValue();
-    creReqTimer = new cMessage(TIM_CREREQ);
 
     AllocRetryPolicy = check_and_cast<AllocateRetryBase*>(getParentModule()->getSubmodule(MOD_ALLOCRETRYPOLICY));
 
+    WATCH(degenerateDataTransfer);
+    WATCH_PTR(FlowObject);
+
     initSignalsAndListeners();
-
-//    this->efcp = (EFCP*)(getParentModule()->getParentModule()->getSubmodule("efcp"));
 }
 
 void FAI::postInitialize(FABase* fa, Flow* fl, EFCP* efcp) {
@@ -52,7 +60,6 @@
     this->FaModule = fa;
     this->FlowObject = fl;
     this->efcp = efcp;
-    //this->sigFAIAllocReq = sigAlReq;
 }
 
 bool FAI::receiveAllocateRequest() {
@@ -95,19 +102,17 @@
 
     // bind this flow to a suitable (N-1)-flow
     RABase* raModule = (RABase*) getModuleByPath("^.^.resourceAllocator.ra");
-    status = raModule->bindNFlowToNM1Flow(FlowObject);
+
+    status = isDegenerateDataTransfer() ? true : raModule->bindNFlowToNM1Flow(FlowObject);
     //IF flow is already available then schedule M_Create(Flow)
     if (status) {
         this->signalizeCreateFlowRequest();
     }
+
     //Everything went fine
     return true;
 }
 
-bool FAI::processDegenerateDataTransfer() {
-    return true;
-}
-
 bool FAI::receiveAllocateResponsePositive() {
     Enter_Method("receiveAllocateResponsePositive()");
 
@@ -118,6 +123,7 @@
         EV << "Cannot continue allocation of flow which is not in pending state" << endl;
         return false;
     }
+
 
     //Instantiate EFCPi
     bool status = this->createEFCPI();
@@ -141,11 +147,14 @@
 
     // bind this flow to a suitable (N-1)-flow
     RABase* raModule = (RABase*) getModuleByPath("^.^.resourceAllocator.ra");
-    raModule->bindNFlowToNM1Flow(FlowObject);
+    status = isDegenerateDataTransfer() ? true : raModule->bindNFlowToNM1Flow(FlowObject);
 
     ft->changeAllocStatus(FlowObject, FAITableEntry::TRANSFER);
     //Signalizes M_Create_R(flow)
-    this->signalizeCreateFlowResponsePositive();
+    if (status) {
+        this->signalizeCreateFlowResponsePositive();
+    }
+
     return true;
 }
 
@@ -159,6 +168,9 @@
     }
 
     ft->changeAllocStatus(FlowObject, FAITableEntry::ALLOC_NEGA);
+
+    //IF it is not DDT then retry M_CREATE
+    //if (!isDegenerateDataTransfer())
     this->signalizeCreateFlowResponseNegative();
 }
 
@@ -272,6 +284,10 @@
     //Change dstCep-Id and dstPortId according to new information
     FlowObject->getConnectionId().setDstCepId(flow->getConId().getDstCepId());
     FlowObject->setDstPortId(flow->getDstPortId());
+    remotePortId = flow->getDstPortId();
+    remoteCEPId = flow->getConId().getDstCepId();
+    par(PAR_REMOTEPORTID) = remotePortId;
+    par(PAR_REMOTECEPID) = remoteCEPId;
 
     //Change status
     FaModule->getFaiTable()->changeAllocStatus(FlowObject, FAITableEntry::TRANSFER);
@@ -313,7 +329,9 @@
 
 std::string FAI::info() const {
     std::stringstream os;
-    os << "FAI>\tPort-ID: " << this->portId << "\tCEP-ID: " << this->cepId;
+    os << "FAI>" << endl
+       << "\tlocal  Port-ID: " << this->localPortId << "\tCEP-ID: " << this->localCEPId << endl
+       << "\tremote Port-ID: " << this->remotePortId << "\tCEP-ID: " << this->remoteCEPId;
     return os.str();
 }
 
@@ -324,7 +342,8 @@
 
 bool FAI::createEFCPI() {
     EV << this->getFullPath() << " attempts to create EFCP instance" << endl;
-    EFCPInstance* efcpi = efcp->createEFCPI(this->getFlow(), cepId);
+    //Create EFCPI for local bindings
+    EFCPInstance* efcpi = efcp->createEFCPI(FlowObject, localCEPId, localPortId);
     return efcpi ? true : false;
 }
 
@@ -332,7 +351,7 @@
     EV << this->getFullPath() << " attempts to bind EFCP and RMT" << endl;
 
     std::ostringstream nameIpcDown;
-    nameIpcDown << GATE_NORTHIO_ << portId;
+    nameIpcDown << GATE_NORTHIO_ << localPortId;
     cModule* IPCModule = FaModule->getParentModule()->getParentModule();
 
     //IF called as consequence of AllocateRequest then createNorthGate
@@ -344,29 +363,26 @@
     cGate* gateIpcDownOut = IPCModule->gateHalf(nameIpcDown.str().c_str(), cGate::OUTPUT);
 
     std::ostringstream nameEfcpNorth;
-    nameEfcpNorth << GATE_APPIO_ << cepId;
+    nameEfcpNorth << GATE_APPIO_ << localPortId;
     cModule* efcpModule = IPCModule->getModuleByPath(".efcp");
     cGate* gateEfcpUpIn = efcpModule->gateHalf(nameEfcpNorth.str().c_str(), cGate::INPUT);
     cGate* gateEfcpUpOut = efcpModule->gateHalf(nameEfcpNorth.str().c_str(), cGate::OUTPUT);
 
     //IPCModule.northIo <--> Efcp.fai
-//    cChannelType* channelType = cChannelType::get("rina.DIF.EFCP.DTP.PushBackChannel");
-//    cChannel* channel = channelType->create("test");
-//    gateEfcpUpOut->connectTo(gateIpcDownOut, channel);
     gateEfcpUpOut->connectTo(gateIpcDownOut);
     gateIpcDownIn->connectTo(gateEfcpUpIn);
 
     //Create bindings in RMT
     RMT* rmtModule = (RMT*) IPCModule->getModuleByPath(".relayAndMux.rmt");
-    rmtModule->createEfcpiGate(cepId);
+    rmtModule->createEfcpiGate(localCEPId);
 
     std::ostringstream nameRmtUp;
-    nameRmtUp << GATE_EFCPIO_ << cepId;
+    nameRmtUp << GATE_EFCPIO_ << localCEPId;
     cGate* gateRmtUpIn = rmtModule->getParentModule()->gateHalf(nameRmtUp.str().c_str(), cGate::INPUT);
     cGate* gateRmtUpOut = rmtModule->getParentModule()->gateHalf(nameRmtUp.str().c_str(), cGate::OUTPUT);
 
     std::ostringstream nameEfcpDown;
-    nameEfcpDown << GATE_RMT_ << cepId;
+    nameEfcpDown << GATE_RMT_ << localCEPId;
     cGate* gateEfcpDownIn = efcpModule->gateHalf(nameEfcpDown.str().c_str(), cGate::INPUT);
     cGate* gateEfcpDownOut = efcpModule->gateHalf(nameEfcpDown.str().c_str(), cGate::OUTPUT);
 
@@ -384,13 +400,13 @@
     EV << this->getFullPath() << " attempts to disconnect bindings between EFCP, IPC and RMT" << endl;
 
     std::ostringstream nameIpcDown;
-    nameIpcDown << GATE_NORTHIO_ << portId;
+    nameIpcDown << GATE_NORTHIO_ << localPortId;
     cModule* IPCModule = FaModule->getParentModule()->getParentModule();
     cGate* gateIpcDownIn = IPCModule->gateHalf(nameIpcDown.str().c_str(), cGate::INPUT);
     cGate* gateIpcDownOut = IPCModule->gateHalf(nameIpcDown.str().c_str(), cGate::OUTPUT);
 
     std::ostringstream nameEfcpNorth;
-    nameEfcpNorth << GATE_APPIO_ << this->getFlow()->getConId().getSrcCepId();
+    nameEfcpNorth << GATE_APPIO_ << localPortId;
     cModule* efcpModule = IPCModule->getModuleByPath(".efcp");
     cGate* gateEfcpUpIn = efcpModule->gateHalf(nameEfcpNorth.str().c_str(), cGate::INPUT);
     cGate* gateEfcpUpOut = efcpModule->gateHalf(nameEfcpNorth.str().c_str(), cGate::OUTPUT);
@@ -408,12 +424,12 @@
     RMT* rmtModule = (RMT*) IPCModule->getModuleByPath(".relayAndMux.rmt");
 
     std::ostringstream nameRmtUp;
-    nameRmtUp << GATE_EFCPIO_ << this->getFlow()->getConId().getSrcCepId();
+    nameRmtUp << GATE_EFCPIO_ << localCEPId;
     cGate* gateRmtUpIn = rmtModule->getParentModule()->gateHalf(nameRmtUp.str().c_str(), cGate::INPUT);
     cGate* gateRmtUpOut = rmtModule->getParentModule()->gateHalf(nameRmtUp.str().c_str(), cGate::OUTPUT);
 
     std::ostringstream nameEfcpDown;
-    nameEfcpDown << GATE_RMT_ << this->getFlow()->getConId().getSrcCepId();
+    nameEfcpDown << GATE_RMT_ << localCEPId;
     cGate* gateEfcpDownIn = efcpModule->gateHalf(nameEfcpDown.str().c_str(), cGate::INPUT);
     cGate* gateEfcpDownOut = efcpModule->gateHalf(nameEfcpDown.str().c_str(), cGate::OUTPUT);
 
@@ -423,7 +439,7 @@
     gateEfcpDownIn->disconnect();
     gateEfcpDownOut->disconnect();
 
-    rmtModule->deleteEfcpiGate(cepId);
+    rmtModule->deleteEfcpiGate(localCEPId);
 
     return true;
 }
@@ -487,6 +503,7 @@
 }
 
 void FAI::signalizeCreateFlowRequest() {
+    creReqTimer = new cMessage(TIM_CREREQ);
     //Start timer
     scheduleAt(simTime() + creReqTimeout, creReqTimer);
     //Signalize RIBd to send M_CREATE(flow)
@@ -526,18 +543,52 @@
     emit(this->sigFAIDeallocRes, this->FlowObject);
 }
 
+int FAI::getLocalCepId() const {
+    return localCEPId;
+}
+
+void FAI::setLocalCepId(int localCepId) {
+    localCEPId = localCepId;
+}
+
+int FAI::getLocalPortId() const {
+    return localPortId;
+}
+
+void FAI::setLocalPortId(int localPortId) {
+    this->localPortId = localPortId;
+}
+
+int FAI::getRemoteCepId() const {
+    return remoteCEPId;
+}
+
+void FAI::setRemoteCepId(int remoteCepId) {
+    remoteCEPId = remoteCepId;
+}
+
+int FAI::getRemotePortId() const {
+    return remotePortId;
+}
+
+void FAI::setRemotePortId(int remotePortId) {
+    this->remotePortId = remotePortId;
+}
+
 void FAI::signalizeAllocateResponsePositive() {
     emit(this->sigFAIAllocResPosi, this->FlowObject);
 }
 
 void FAI::createNorthGates() {
     std::ostringstream nameIpcDown;
-    nameIpcDown << GATE_NORTHIO_ << portId;
+    nameIpcDown << GATE_NORTHIO_ << localPortId;
     cModule* IPCModule = FaModule->getParentModule()->getParentModule();
     IPCModule->addGate(nameIpcDown.str().c_str(), cGate::INOUT, false);
+    return;
 }
 
 void FAI::receiveCreateFlowResponsePositiveFromNminusOne() {
+    Enter_Method("receiveCreFlowResPositiveFromNminusOne()");
     //Schedule M_Create(Flow)
     this->signalizeCreateFlowRequest();
 }