Switch to side-by-side view

--- a/src/DIF/RA/RA.cc
+++ b/src/DIF/RA/RA.cc
@@ -1,3 +1,5 @@
+//
+// Copyright Š 2014 - 2015 PRISTINE Consortium (http://ict-pristine.eu)
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Lesser General Public License as published by
@@ -65,8 +67,6 @@
 
     difAllocator = check_and_cast<DA*>
         (getModuleByPath("^.^.^.difAllocator.da"));
-    //fwdTable = check_and_cast<PDUForwardingTable*>
-    //    (getModuleByPath("^.pduForwardingTable"));
     flowTable = check_and_cast<NM1FlowTable*>
         (getModuleByPath("^.nm1FlowTable"));
     rmt = check_and_cast<RMT*>
@@ -107,6 +107,8 @@
 {
     sigRACreFloPosi = registerSignal(SIG_RA_CreateFlowPositive);
     sigRACreFloNega = registerSignal(SIG_RA_CreateFlowNegative);
+    sigRASDReqFromRMT = registerSignal(SIG_RA_InvokeSlowdown);
+    sigRASDReqFromRIB = registerSignal(SIG_RA_ExecuteSlowdown);
 
     lisRAAllocResPos = new LisRAAllocResPos(this);
     thisIPC->subscribe(SIG_FAI_AllocateResponsePositive, lisRAAllocResPos);
@@ -120,6 +122,26 @@
     lisRACreResPosi = new LisRACreResPosi(this);
     thisIPC->getParentModule()->
             subscribe(SIG_RIBD_CreateFlowResponsePositive, this->lisRACreResPosi);
+
+    lisEFCPStopSending = new LisEFCPStopSending(this);
+    thisIPC->getParentModule()->
+            subscribe(SIG_EFCP_StahpSending, this->lisEFCPStopSending);
+
+    lisEFCPStartSending = new LisEFCPStartSending(this);
+    thisIPC->getParentModule()->
+            subscribe(SIG_EFCP_StartSending, this->lisEFCPStartSending);
+
+    lisRMTSDReq = new LisRMTSlowdownRequest(this);
+    thisIPC->subscribe(SIG_RMT_SlowdownRequest, this->lisRMTSDReq);
+
+    lisRIBCongNotif = new LisRIBCongNotif(this);
+    thisIPC->subscribe(SIG_RIBD_CongestionNotification, this->lisRIBCongNotif);
+
+    lisRMTPortDrainDisable = new LisRMTPortDrainDisable(this);
+    thisIPC->subscribe(SIG_RMT_PortDrainDisable, this->lisRMTPortDrainDisable);
+
+    lisRMTPortDrainEnable = new LisRMTPortDrainEnable(this);
+    thisIPC->subscribe(SIG_RMT_PortDrainEnable, this->lisRMTPortDrainEnable);
 }
 
 void RA::initFlowAlloc()
@@ -398,7 +420,7 @@
     // create a mock "(N-1)-port" for interface
     RMTPort* port = rmtAllocator->addPort(NULL);
     // connect the port to the bottom
-    interconnectModules(rmtModule, port, rmtGate.str(), std::string(GATE_SOUTHIO));
+    interconnectModules(rmtModule, port->getParentModule(), rmtGate.str(), std::string(GATE_SOUTHIO));
     // finalize initial port parameters
     port->postInitialize();
     port->setReady();
@@ -433,7 +455,7 @@
 
     // 2) attach a RMTPort instance (pretty much a representation of an (N-1)-port)
     RMTPort* port = rmtAllocator->addPort(flow);
-    interconnectModules(rmtModule, port, thisIPCGate.str(), std::string(GATE_SOUTHIO));
+    interconnectModules(rmtModule, port->getParentModule(), thisIPCGate.str(), std::string(GATE_SOUTHIO));
     // finalize initial port parameters
     port->postInitialize();
 
@@ -521,15 +543,11 @@
     {
         // connect the new flow to the RMT
         RMTPort* port = bindNM1FlowToRMT(targetIPC, fab, flow);
-        // update the PDU forwarding table
-        //fwdTable->insert(Address(dstApn.getName()), flow->getConId().getQoSId(), port);
-        // TODO: remove this when management isn't piggy-backed anymore
+        // TODO: remove this when management isn't piggy-backed anymore!
         // (port shouldn't be ready to send out data when the flow isn't yet allocated)
         port->setReady();
 
-        //fwTable->insert(Address(flow->getDstApni().getApn().getName()),
-        //                flow->getConId().getQoSId(), port);
-
+        // invoke fwdTable insertion policy
         fwdtg->insertFlowInfo(
             Address(flow->getDstApni().getApn().getName()),
             flow->getConId().getQoSId(),
@@ -598,20 +616,6 @@
     // attach the new flow to RMT
     RMTPort* port = bindNM1FlowToRMT(targetIpc, fab, flow);
     // update the PDU forwarding table
-/*
-    fwdTable->insert(Address(dstAPN.getName()), qosID, port);
-
-    // add other accessible applications into forwarding table
-    const APNList* remoteApps = difAllocator->findNeigborApns(dstAPN);
-    if (remoteApps)
-    {
-        for (ApnCItem it = remoteApps->begin(); it != remoteApps->end(); ++it)
-        {
-            Address addr = Address(it->getName());
-            fwdTable->insert(addr, qosID, port);
-        }
-    }
-*/
     fwdtg->insertFlowInfo(
         Address(flow->getDstApni().getApn().getName()),
         flow->getConId().getQoSId(),
@@ -647,7 +651,7 @@
         NM1FlowTableItem* item = flowTable->findFlowByDstApni(neighApn, qosId);
         if (item != NULL)
         {
-            qAllocPolicy->onNFlowAlloc(item->getRmtPort(), flow);
+            qAllocPolicy->onNFlowAlloc(item->getRMTPort(), flow);
         }
     }
 }
@@ -667,21 +671,9 @@
     // TODO: move this to receiveSignal()
     NM1FlowTableItem* item = flowTable->findFlowByDstApni(dstApn.getName(), qosId);
     if (item == NULL) return;
-/*
-    // add other accessible applications into the forwarding table
-    const APNList* remoteApps = difAllocator->findNeigborApns(dstApn);
-    if (remoteApps)
-    {
-        for (ApnCItem it = remoteApps->begin(); it != remoteApps->end(); ++it)
-        {
-            Address addr = Address(it->getName());
-            fwdTable->insert(addr, qosId, item->getRmtPort());
-        }
-    }
-*/
     // mark this flow as connected
     item->setConnectionStatus(NM1FlowTableItem::CON_ESTABLISHED);
-    item->getRmtPort()->setReady();
+    item->getRMTPort()->setReady();
 }
 
 /**
@@ -693,29 +685,31 @@
 { // TODO: part of this should be split into something like postNM1FlowDeallocation
 
     NM1FlowTableItem* flowItem = flowTable->lookup(flow);
+    ASSERT(flowItem != NULL);
     flowItem->setConnectionStatus(NM1FlowTableItem::CON_RELEASING);
-
-    RMTPort* port = flowItem->getRmtPort();
+    flowItem->getFABase()->receiveDeallocateRequest(flow);
+
+    // disconnect and delete gates
+    RMTPort* port = flowItem->getRMTPort();
     const char* gateName = flowItem->getGateName().c_str();
     cGate* thisIpcIn = thisIPC->gateHalf(gateName, cGate::INPUT);
     cGate* thisIpcOut = thisIPC->gateHalf(gateName, cGate::OUTPUT);
     cGate* rmtModuleIn = rmtModule->gateHalf(gateName, cGate::INPUT);
     cGate* rmtModuleOut = rmtModule->gateHalf(gateName, cGate::OUTPUT);
-    cGate* portOut = port->getSouthOutputGate();
+    cGate* portOut = port->getSouthOutputGate()->getNextGate();
 
     portOut->disconnect();
+    rmtModuleOut->disconnect();
+    thisIpcOut->disconnect();
     thisIpcIn->disconnect();
-    thisIpcOut->disconnect();
     rmtModuleIn->disconnect();
-    rmtModuleOut->disconnect();
-
-    fwdtg->removeFlowInfo(flowItem->getRmtPort());
-//    fwdTable->remove(port);
-    rmtAllocator->removePort(flowItem->getRmtPort());
+
     rmtModule->deleteGate(gateName);
-    flowItem->getFaBase()->receiveDeallocateRequest(flow);
-
     thisIPC->deleteGate(gateName);
+
+    // remove table entries
+    fwdtg->removeFlowInfo(flowItem->getRMTPort());
+    rmtAllocator->removePort(flowItem->getRMTPort());
     flowTable->remove(flow);
 }
 
@@ -776,13 +770,6 @@
         }
     }
 
-    // add another fwtable entry for direct srcApp->dstApp messages (if needed)
-    // TODO: there must be a better place to put this
-    if (neighAddr != dstAddr)
-    {
-//        fwdTable->insert(Address(dstAddr), qosID, nm1FlowItem->getRmtPort());
-    }
-
     if (nm1FlowItem->getConnectionStatus() == NM1FlowTableItem::CON_ESTABLISHED)
     {
         return true;
@@ -794,6 +781,76 @@
 }
 
 
+void RA::blockNM1PortOutput(Flow* flow)
+{
+    Enter_Method("blockNM1PortOutput()");
+
+    NM1FlowTableItem* item = flowTable->lookup(flow);
+    if (item == NULL)
+    {
+//        EV << "!!! given (N-1)-flow isn't registered in the flow table;"
+//           << " ignoring pushback request." << endl;
+        return;
+    }
+
+    item->getRMTPort()->blockOutput();
+}
+
+void RA::unblockNM1PortOutput(Flow* flow)
+{
+    Enter_Method("unblockNM1PortOutput()");
+
+    NM1FlowTableItem* item = flowTable->lookup(flow);
+    if (item == NULL)
+    {
+//        EV << "!!! given (N-1)-flow isn't registered in the flow table;"
+//           << " ignoring port unblock request." << endl;
+        return;
+    }
+
+    item->getRMTPort()->unblockOutput();
+}
+
+void RA::blockNM1PortInput(cObject* obj)
+{
+    Enter_Method("blockNM1PortInput()");
+
+    PDU* pdu = dynamic_cast<PDU*>(obj);
+    if (pdu != NULL)
+    {
+        NM1FlowTableItem* flowItem = flowTable->findFlowByDstApni(
+                pdu->getSrcAddr().getApname().getName(),
+                pdu->getConnId().getQoSId());
+
+        if (flowItem != NULL)
+        {
+            flowItem->getRMTPort()->blockInput();
+        }
+    }
+}
+
+void RA::unblockNM1PortInput(cObject* obj)
+{
+    Enter_Method("unblockNM1PortInput()");
+
+    PDU* pdu = dynamic_cast<PDU*>(obj);
+    if (pdu != NULL)
+    {
+        NM1FlowTableItem* flowItem = flowTable->findFlowByDstApni(
+                pdu->getSrcAddr().getApname().getName(),
+                pdu->getConnId().getQoSId());
+
+        if (flowItem != NULL)
+        {
+            RMTPort* port = flowItem->getRMTPort();
+            // unblock!
+            port->unblockInput();
+            // resume processing of input queues
+            rmt->invokeQueueDeparturePolicies(port->getFirstQueue(RMTQueue::INPUT));
+        }
+    }
+}
+
 void RA::signalizeCreateFlowPositiveToRIBd(Flow* flow)
 {
     emit(sigRACreFloPosi, flow);
@@ -803,3 +860,17 @@
 {
     emit(sigRACreFloNega, flow);
 }
+
+void RA::signalizeSlowdownRequestToRIBd(cPacket* pdu)
+{
+    Enter_Method("signalizeSlowdownRequestToRIBd()");
+    emit(sigRASDReqFromRMT, pdu);
+}
+
+void RA::signalizeSlowdownRequestToEFCP(cObject* obj)
+{
+    Enter_Method("signalizeSlowdownRequestToEFCP()");
+    // TODO: move this to the listener
+    CongestionDescriptor* congInfo = check_and_cast<CongestionDescriptor*>(obj);
+    emit(sigRASDReqFromRIB, congInfo);
+}