Switch to side-by-side view

--- a
+++ b/src/DIF/EFCP/DTP/DTP.cc
@@ -0,0 +1,1337 @@
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// 
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU Lesser General Public License for more details.
+// 
+// You should have received a copy of the GNU Lesser General Public License
+// along with this program.  If not, see http://www.gnu.org/licenses/.
+// 
+
+#include "DTP.h"
+
+Define_Module(DTP);
+
+DTP::DTP()
+{
+  deletePdu = 0;
+
+  rcvrInactivityTimer = NULL;
+  senderInactivityTimer = NULL;
+
+  rcvrInactivityPolicy = NULL;
+  senderInactivityPolicy = NULL;
+  initialSeqNumPolicy = NULL;
+
+
+
+
+}
+DTP::~DTP()
+{
+
+  flushAllQueuesAndPrepareToDie();
+  if(state.isDtcpPresent()){
+
+  }
+
+}
+
+void DTP::createPolicyModule(cModule* policy, const char* prefix, const char* name)
+{
+
+  if (std::string(par(name).stringValue()).empty())
+  {
+    policy = NULL;
+  }
+  else
+  {
+    std::stringstream moduleName;
+    moduleName << prefix << par(name).stringValue();
+    cModuleType* policyType = cModuleType::get(moduleName.str().c_str());
+    policy = policyType->createScheduleInit(name, getParentModule());
+  }
+}
+
+void DTP::initialize(int step)
+{
+
+  initGates();
+  state.initDefaults();
+
+
+  //FIXME A! Set it based on QoS parameters
+  state.setWinBased(true);
+  state.setRxPresent(false);
+
+  if(state.isDtcpPresent()){
+    senderInactivityTimer = new SenderInactivityTimer();
+    rcvrInactivityTimer = new RcvrInactivityTimer();
+  }else{
+    senderInactivityTimer = NULL;
+    rcvrInactivityTimer = NULL;
+  }
+  par(RCVR_INACTIVITY_POLICY_NAME).setStringValue(getModuleByPath((std::string(".^.^.") + std::string(MOD_EFCP)).c_str())->par(RCVR_INACTIVITY_POLICY_NAME).stringValue());
+  par(SENDER_INACTIVITY_POLICY_NAME).setStringValue(getModuleByPath((std::string(".^.^.") + std::string(MOD_EFCP)).c_str())->par(SENDER_INACTIVITY_POLICY_NAME).stringValue());
+  par(INITIAL_SEQ_NUM_POLICY_NAME).setStringValue(getModuleByPath((std::string(".^.^.") + std::string(MOD_EFCP)).c_str())->par(INITIAL_SEQ_NUM_POLICY_NAME).stringValue());
+
+
+  createPolicyModule(rcvrInactivityPolicy, RCVR_INACTIVITY_POLICY_PREFIX, RCVR_INACTIVITY_POLICY_NAME);
+  createPolicyModule(senderInactivityPolicy, SENDER_INACTIVITY_POLICY_PREFIX, SENDER_INACTIVITY_POLICY_NAME);
+  createPolicyModule(initialSeqNumPolicy, INITIAL_SEQ_NUM_POLICY_PREFIX, INITIAL_SEQ_NUM_POLICY_NAME);
+
+
+//  redrawGUI();
+
+}
+
+void DTP::initGates()
+{
+  northI = this->gateHalf(GATE_DTP_NORTHIO, cGate::INPUT);
+  northO = this->gateHalf(GATE_DTP_NORTHIO, cGate::OUTPUT);
+  southI = this->gateHalf(GATE_DTP_SOUTHIO, cGate::INPUT);
+  southO = this->gateHalf(GATE_DTP_SOUTHIO, cGate::OUTPUT);
+}
+
+const QoSCube* DTP::getQoSCube() const
+{
+  return state.getQoSCube();
+}
+
+void DTP::setQoSCube(const QoSCube* qosCube)
+{
+  //TODO A2 Make copy
+  state.setQoSCube(qosCube);
+  if(qosCube->isForceOrder()){
+    state.setRxPresent(true);
+  }
+  //TODO A1
+  if(qosCube->getAvgBand() > 0){
+    state.setWinBased(true);
+  }
+}
+
+void DTP::setPduDroppingEnabled(bool pduDroppingEnabled)
+{
+  this->pduDroppingEnabled = pduDroppingEnabled;
+}
+
+void DTP::flushAllQueuesAndPrepareToDie()
+{
+    cancelAndDelete(senderInactivityTimer);
+    senderInactivityTimer = NULL;
+    cancelAndDelete(rcvrInactivityTimer);
+    rcvrInactivityTimer = NULL;
+
+
+}
+
+void DTP::redrawGUI()
+{
+  if (!ev.isGUI())
+  {
+      return;
+  }
+
+  dtcp->redrawGUI();
+
+  cDisplayString& disp = getDisplayString();
+  disp.setTagArg("t", 1, "r");
+  std::ostringstream desc;
+  desc << "nextSeqNum: " << state.getNextSeqNumToSendWithoutIncrement() <<"\n";
+  desc << "sLWE: " << dtcp->dtcpState->getSenderLeftWinEdge() <<"\n";
+  if(state.isDtcpPresent() && state.isFCPresent()){
+    desc << "sRWE: " << dtcp->getSndRtWinEdge() << "\n";
+  }
+  desc << "rLWE: " << state.getRcvLeftWinEdge() <<"\n";
+  desc << "maxSeqNumRcvd: " << state.getMaxSeqNumRcvd() <<"\n";
+  if(state.isDtcpPresent() && state.isFCPresent()){
+    desc << "rRWE: " << dtcp->getRcvRtWinEdge() << "\n";
+  }
+
+  std::vector<DataTransferPDU*>::iterator it;
+  std::vector<DataTransferPDU*>* pduQ;
+
+  pduQ = state.getReassemblyPDUQ();
+  if(pduQ->empty()){
+      desc << "reassemblyQ: empty"<< "\n";
+    }
+    else
+    {
+      desc << "reassemblyQ: ";
+      std::vector<DataTransferPDU*>::iterator it;
+      for (it = pduQ->begin(); it != pduQ->end(); ++it)
+      {
+        desc << (*it)->getSeqNum() << " | ";
+      }
+      desc << "\n";
+    }
+  if(state.isDtcpPresent() && state.isWinBased()){
+
+  }
+
+  desc << "droppedPDU: " << state.getDropDup();
+
+  disp.setTagArg("t", 0, desc.str().c_str());
+
+}
+
+
+
+/**
+ *
+ * @param connId
+ */
+void DTP::setConnId(const ConnectionId& connId)
+{
+  this->connId = connId;
+}
+
+void DTP::handleMessage(cMessage *msg)
+{
+  if (msg->isSelfMessage())
+  {
+    /* Timers */
+    DTPTimers* timer = static_cast<DTPTimers*>(msg);
+    switch(timer->getType()){
+      case(DTP_RCVR_INACTIVITY_TIMER):{
+        handleDTPRcvrInactivityTimer(static_cast<RcvrInactivityTimer*>(timer));
+        break;
+      }
+
+      case(DTP_SENDER_INACTIVITY_TIMER):{
+        handleDTPSenderInactivityTimer(static_cast<SenderInactivityTimer*>(timer));
+        break;
+      }
+
+      case(DTP_A_TIMER):{
+        handleDTPATimer(static_cast<ATimer*>(timer));
+        break;
+      }
+    }
+  }
+  else
+  {
+
+    /* Either PDUs from RMT or SDUs from AP */
+    if (msg->arrivedOn(northI->getId()))
+    {
+      //handle SDUs
+//          handleSDUs((CDAPMessage*) msg);
+//      handleMsgFromDelimiting((Data*) msg);
+      handleMsgFromDelimitingnew((SDU*) msg);
+
+    }
+    else if (msg->arrivedOn(southI->getId()))
+    {
+//      handleMsgFromRmt((PDU*) msg);
+      handleMsgFromRMT((PDU*) msg);
+    }
+  }
+
+  redrawGUI();
+}
+
+/**
+ * This method fills header fields in given @param pdu
+ * @param pdu
+ */
+void DTP::setPDUHeader(PDU* pdu)
+{
+  pdu->setConnId(this->flow->getConId());
+  pdu->setSrcAddr(this->flow->getSrcAddr());
+  pdu->setDstAddr(this->flow->getDstAddr());
+  pdu->setSrcApn(this->flow->getDstApni().getApn());
+  pdu->setDstApn(this->flow->getSrcApni().getApn());
+}
+
+void DTP::handleMsgFromDelimitingnew(SDU* sdu){
+  cancelEvent(senderInactivityTimer);
+
+  delimit(sdu);
+
+  generatePDUsnew();
+
+  trySendGenPDUs(state.getGeneratedPDUQ());
+
+  schedule(senderInactivityTimer);
+}
+
+void DTP::addPDUToReassemblyQ(DataTransferPDU* pdu)
+{
+
+ state.addPDUToReassemblyQ(pdu);
+}
+
+
+void DTP::delimitFromRMT(DataTransferPDU* pdu)
+{
+  Enter_Method_Silent();
+  PDUQ_t* pduQ = state.getReassemblyPDUQ();
+
+  for (PDUQ_t::iterator it = pduQ->begin(); it != pduQ->end(); )
+  {/* DO NOT FORGET TO PUT '++it' in all cases where we DO NOT erase PDUs from queue */
+
+    //TODO B1 add support for out of order SDU delivery
+    // negative implication
+    unsigned int seqNum = (*it)->getSeqNum();
+    if((getQoSCube()->isForceOrder() && ! (seqNum < state.getRcvLeftWinEdge()))){
+      return;
+    }
+
+
+    unsigned int delimitFlags = (*it)->getUserDataField()->getSduDelimitFlags();
+    if((delimitFlags & SDU_NO_LENGTH) == SDU_NO_LENGTH){
+//      if(delimitFlags & SDU_)
+      EV << getFullPath() << ": Unhandled condition in delimiting !!!!!!!!!!!!!!!!!!!!!!!!!!!!" <<endl;
+    }else{
+      if((delimitFlags & SDU_L_COMP_SDU) == SDU_L_COMP_SDU){
+        /* PDU contain ZERO or more complete SDUs */
+        /*  TODO PUT this in separate method */
+
+        UserDataField* userData = (*it)->getUserDataField();
+
+        SDU* sdu;
+        if(userData != NULL){
+          while ((sdu = userData->getData()) != NULL)
+          {
+          //TODO Delimiting/de-fragmentation
+//          take(sdu);
+            take(sdu);
+
+            //XXX We assume that every SDU is numbered.
+            state.setLastSduDelivered(sdu->getSeqNum());
+
+            send(sdu, northO);
+          }
+        }
+//        delete userData;
+
+        delete (*it);
+
+        it = pduQ->erase(it);
+      }
+    }
+  }
+}
+
+bool DTP::commonRcvControl(ControlPDU* pdu)
+{
+  if (pdu->getSeqNum() <= dtcp->getLastCtrlSeqNumRcv())/* Duplicate ControlPDU */
+  {
+    if ((pdu->getType() & PDU_ACK_BIT) == PDU_ACK_BIT)
+    {
+      dtcp->incDupAcks();
+    }
+    if ((pdu->getType() & PDU_FC_BIT) == PDU_FC_BIT)
+    {
+      dtcp->incDupFC();
+    }
+    delete pdu;
+    return false;
+  }
+  else if (pdu->getSeqNum() > dtcp->getLastCtrlSeqNumRcv() + 1)/* Out of order */
+  {
+    /* LostControlPDU Policy */
+    dtcp->runLostControlPDUPolicy(&state);
+  }
+  else
+  {
+    dtcp->setLastCtrlSeqnumRcvd(pdu->getSeqNum());
+  }
+
+  return true;
+}
+
+void DTP::sendFCOnlyPDU()
+{
+  Enter_Method_Silent();
+
+  FlowControlOnlyPDU* fcOnlyPdu = new FlowControlOnlyPDU();
+
+  setPDUHeader(fcOnlyPdu);
+
+  fcOnlyPdu->setSeqNum(dtcp->getNextSndCtrlSeqNum());
+
+  fillFlowControlPDU(fcOnlyPdu);
+
+  sendToRMT(fcOnlyPdu);
+}
+
+void DTP::fillFlowControlPDU(FlowControlPDU* flowControlPdu)
+{
+
+  //      unsigned int newRightWinEdge;
+  flowControlPdu->setNewRightWinEdge(dtcp->getRcvRtWinEdge());
+  //        unsigned int newRate;
+  flowControlPdu->setNewRate(dtcp->getRcvrRate());
+  //          unsigned int timeUnit;
+  flowControlPdu->setTimeUnit(dtcp->getSendingTimeUnit());
+  //          unsigned int myLeftWinEdge;
+  flowControlPdu->setMyLeftWinEdge(state.getRcvLeftWinEdge());
+  //          unsigned int myRightWinEdge;
+  //TODO A1 Change it. Get value from dtcp flowControl rcvRightWindowEdge
+  // Also make sure the rcvRightWindowEdge is properly updated whenever rcvLeftWindowEdge gets updated
+  flowControlPdu->setMyRightWinEdge(dtcp->getSndRtWinEdge());
+  //          unsigned int myRcvRate;
+  flowControlPdu->setMyRcvRate(dtcp->getSendingRate());
+}
+
+void DTP::sendAckFlowPDU(unsigned int seqNum, bool seqNumValid)
+{
+
+
+  if(!seqNumValid){
+    seqNum = state.getRcvLeftWinEdge() - 1;
+  }
+
+  if(state.isFCPresent() && state.isRxPresent()){
+    // Send Ack/Flow Control PDU with LWE and RWE
+    AckFlowPDU* ackFlowPdu = new AckFlowPDU();
+    setPDUHeader(ackFlowPdu);
+    ackFlowPdu->setSeqNum(dtcp->getNextSndCtrlSeqNum());
+
+    ackFlowPdu->setAckNackSeqNum(state.getRcvLeftWinEdge() - 1);
+
+    fillFlowControlPDU(ackFlowPdu);
+    sendToRMT(ackFlowPdu);
+  }else if(state.isFCPresent()){
+    // send FC
+
+    sendFCOnlyPDU();
+  }else if(state.isRxPresent()){
+    sendAckOnlyPDU(seqNum);
+  }
+
+
+}
+
+void DTP::handleMsgFromRMT(PDU* msg){
+
+  state.setCurrentPdu(msg);
+
+  if (dynamic_cast<DataTransferPDU*>(msg))
+  {
+    DataTransferPDU* pdu = (DataTransferPDU*) msg;
+
+    /* This is here just for testing RX */
+    if (pduDroppingEnabled)
+    {
+      if (((deletePdu++ + 1) % 5) == 0)
+      {
+        std::ostringstream out;
+        out << "Dropping PDU number " << pdu->getSeqNum();
+        bubble(out.str().c_str());
+        EV << this->getFullPath() << "; " << out.str().c_str() << " in time: " << simTime() << endl;
+        delete pdu;
+        pduDroppingEnabled = false;
+        return;
+      }
+    }
+
+    /* End */
+
+    cancelEvent(rcvrInactivityTimer);
+    handleDataTransferPDUFromRMT(pdu);
+    schedule(rcvrInactivityTimer);
+
+
+  }
+  else if (dynamic_cast<ControlPDU*>(msg))
+  {
+    ControlPDU* pdu = (ControlPDU*) msg;
+    EV << getFullPath() <<": Control PDU number: " << pdu->getSeqNum() <<" received" << endl;
+
+
+    //Putting it before ControlAckPDU might be an issue
+    if(!commonRcvControl(pdu)){
+      //The Control PDU has been deleted because of duplicate
+      return;
+    }
+
+
+    if(dynamic_cast<ControlAckPDU*>(pdu))
+    {
+      dtcp->runRcvrControlAckPolicy(&state);
+    }
+    else
+    {
+
+      if (pdu->getType() & PDU_SEL_BIT)
+      {
+        /*
+         SELECT_ACK_PDU        = 0x8806;
+         SELECT_NACK_PDU       = 0x8807;
+         SELECT_ACK_FLOW_PDU   = 0x880E;
+         SELECT_NACK_FLOW_PDU  = 0x880F;
+         */
+
+        //TODO A1 RTT estimator
+        if (pdu->getType() & PDU_NACK_BIT)
+        {
+          SelectiveNackPDU* selNackPdu = (SelectiveNackPDU*) pdu;
+
+          for (unsigned int i = 0; i < selNackPdu->getNackListLen(); i++)
+          {
+            unsigned int startSeqNum = selNackPdu->getNackList(i * 2);
+            unsigned int endSeqNum = selNackPdu->getNackList((i * 2) + 1);
+
+            dtcp->nackPDU(startSeqNum, endSeqNum);
+
+          }
+        }
+        else if (pdu->getType() & PDU_ACK_BIT)
+        {
+          SelectiveAckPDU* selAckPdu = (SelectiveAckPDU*) pdu;
+          unsigned int tempSLWE = 0;
+          for (unsigned int i = 0; i < selAckPdu->getNackListLen(); i++)
+          {
+            unsigned int startSeqNum = selAckPdu->getNackList(i * 2);
+            unsigned int endSeqNum = selAckPdu->getNackList((i * 2) + 1);
+
+            tempSLWE = std::max(endSeqNum, tempSLWE);
+
+            dtcp->ackPDU(startSeqNum, endSeqNum);
+
+            // min = dtcp->getSmallestSeqNumFromRxQOrNextSeqNumToSend - 1
+            //state.setSenderLeftWinEdge(std::min(min, state.getSenderLeftWinEdge);
+
+//            state.setSenderLeftWinEdge(tempSLWE);
+            //TODO B2 O'really? Shouldn't it always be nextSeqNum -1?
+            dtcp->updateSenderLWE(tempSLWE + 1);
+
+          }
+        }
+
+      }
+      else
+
+      if (pdu->getType() & (PDU_ACK_BIT | PDU_NACK_BIT))
+      {
+
+
+        //TODO A1 Retrieve the Time of this Ack - RTT estimator policy
+
+        if (pdu->getType() & PDU_NACK_BIT)
+        {
+
+          NackOnlyPDU *nackPdu = (NackOnlyPDU*) pdu;
+
+          dtcp->nackPDU(nackPdu->getAckNackSeqNum());
+
+        }
+        else if (pdu->getType() & PDU_ACK_BIT)
+        {
+          AckOnlyPDU *ackPdu = (AckOnlyPDU*) pdu;
+          EV << getFullPath() <<": PDU number: " << ackPdu->getAckNackSeqNum() <<" Acked"<<endl;
+
+          dtcp->runSenderAckPolicy(&state);
+
+        }
+        /* End of Ack/Nack */
+
+      }
+
+      if ((pdu->getType() & PDU_FC_BIT))
+      {
+        FlowControlOnlyPDU *flowPdu = (FlowControlOnlyPDU*) pdu;
+
+        //Update RightWindowEdge and SendingRate.
+        dtcp->setSndRtWinEdge(flowPdu->getNewRightWinEdge());
+        dtcp->setSendingRate(flowPdu->getNewRate());
+
+        if (dtcp->getDTCPState()->getClosedWinQueLen() > 0)
+        {
+          /* Note: The ClosedWindow flag could get set back to true immediately in trySendGenPDUs */
+          dtcp->getDTCPState()->setClosedWindow(false);
+          trySendGenPDUs(dtcp->getDTCPState()->getClosedWindowQ());
+        }
+      }
+
+    }
+    delete pdu;
+  }else{
+
+    throw cRuntimeError("Unexptected PDU Type");
+  }
+
+
+
+  //TODO A1 not sure about this scheduling; yeah it will be deleted
+//  schedule(rcvrInactivityTimer);
+
+
+  state.setCurrentPdu(NULL);
+}
+
+void DTP::startATimer(unsigned int seqNum)
+{
+  Enter_Method_Silent();
+  ATimer* aTimer = new ATimer();
+  aTimer->setSeqNum(seqNum);
+  schedule(aTimer);
+}
+
+bool DTP::isDuplicate(unsigned int seqNum)
+{
+  /* Not a true duplicate. (Might be a duplicate among the gaps) */
+  bool dup = false;
+  PDUQ_t::iterator it;
+  PDUQ_t* pduQ = state.getReassemblyPDUQ();
+  for (it = pduQ->begin(); it != pduQ->end(); ++it)
+  {
+    if ((*it)->getSeqNum() == seqNum)
+    {
+      dup = true;
+      break;
+    }
+  }
+  return dup;
+}
+
+void DTP::handleDataTransferPDUFromRMT(DataTransferPDU* pdu){
+
+
+  EV << getFullPath() << ": PDU number: " << pdu->getSeqNum() << " received" << endl;
+
+  if (state.isFCPresent())
+  {
+//    dtcp->resetWindowTimer();
+
+    /* Run ECN policy */
+    dtcp->runECNPolicy(&state);
+
+  }
+  // if PDU.DRF == true
+  if (pdu->getFlags() & DRF_FLAG)
+  {
+    bubble("Received PDU with DRF set");
+    /* Case 1) DRF is set - either first PDU or new run */
+
+    delimitFromRMT(NULL);
+
+    //Flush the PDUReassemblyQueue
+    flushReassemblyPDUQ();
+
+    state.setMaxSeqNumRcvd(pdu->getSeqNum());
+
+    //Put PDU on ReassemblyQ
+    addPDUToReassemblyQ(pdu);
+
+    //XXX Setting DRF would create infinity loop of setting DRF for every PDU, right?
+    /* Initialize the other direction */
+//    state.setSetDrfFlag(true);
+
+    //s check if this is needed to uncomment
+    /* If this is a new run then I should set my rcvrLeftWindowEdge to pdu->seqNum +1 */
+    state.setRcvLeftWinEdge(pdu->getSeqNum() + 1);
+
+    // WHY??? -> Then don't.
+//    runInitialSequenceNumberPolicy();
+
+    if (state.isDtcpPresent())
+    {
+      svUpdate(pdu->getSeqNum());
+
+    }
+    delimitFromRMT(NULL);
+
+  }
+  else
+  {
+
+    /* Not the start of a run */
+    if (pdu->getSeqNum() < state.getRcvLeftWinEdge())
+    {
+      bubble("Dropping duplicate PDU");
+      EV << getFullPath() << ":Duplicated PDU number: " << pdu->getSeqNum() << " received - DROPPING!" << endl;
+      /* Case 2) A Real Duplicate */
+      //Discard PDU and increment counter of dropped duplicates PDU
+      delete pdu;
+
+      state.incDropDup();
+
+
+      sendAckFlowPDU();
+
+      return;
+    }
+    if (state.getRcvLeftWinEdge() <= pdu->getSeqNum() && pdu->getSeqNum() <= state.getMaxSeqNumRcvd())
+//    if (state.getRcvLeftWinEdge() < pdu->getSeqNum())
+    {
+      /* Not a true duplicate. (Might be a duplicate among the gaps) */
+
+
+      if (isDuplicate(pdu->getSeqNum()))
+      {
+        /* Case 3) Duplicate Among gaps */
+        EV << getFullPath() << ":Duplicated PDU number: " << pdu->getSeqNum() << " received - DROPPING!" << endl;
+        //Discard PDU and increment counter of dropped duplicates PDU
+        delete pdu;
+        //increment counter of dropped duplicates PDU
+        state.incDropDup();
+
+        //send an Ack/Flow Control PDU with current window values
+        sendAckFlowPDU();
+        return;
+      }
+      else
+      {
+        /* Case 3) This goes in a gap */
+        /* Put at least the User-Data of the PDU with its Sequence Number on PDUReassemblyQueue in Sequence Number order */
+
+//      reassemblyPDUQ.push_back(pdu);
+        addPDUToReassemblyQ(pdu);
+        if (state.isDtcpPresent())
+        {
+          svUpdate(state.getMaxSeqNumRcvd()); /* Update left edge, etc */
+//          svUpdate(state.getRcvLeftWinEdge() - 1); /* Update left edge, etc */
+        }
+        else
+        {
+          state.setRcvLeftWinEdge(state.getMaxSeqNumRcvd());
+//          state.setRcvLeftWinEdge(pdu->getSeqNum());
+          /* No A-Timer necessary, already running */
+        }
+
+        delimitFromRMT(NULL);
+        return;
+      }
+    }
+    /* Case 4) This is in order */
+    if (pdu->getSeqNum() == state.getMaxSeqNumRcvd() + 1)
+//    if (pdu->getSeqNum() == state.getRcvLeftWinEdge())
+    {
+    state.incMaxSeqNumRcvd();
+      //XXX This is not mentioned in the specs. IMHO this is the most important part ;)
+      addPDUToReassemblyQ(pdu);
+
+      if (state.isDtcpPresent())
+      {
+        svUpdate(state.getMaxSeqNumRcvd()); /* Update Left Edge, etc. */
+//        svUpdate(pdu->getSeqNum()); /* Update Left Edge, etc. */
+      }
+      else
+      {
+        state.setRcvLeftWinEdge(state.getMaxSeqNumRcvd());
+//        state.incRcvLeftWindowEdge();
+        //start A-Timer (for this PDU)
+        startATimer(pdu->getSeqNum());
+
+      }
+
+      delimitFromRMT(NULL);/* Create as many whole SDUs as possible */
+
+    }
+    else
+    {
+      //XXX After removing the MaxSeqNumRcvd, this case no longer makes sense.
+      //This assumption needs to be verified with somebody (John, Michael, Leo, Miguel, ...?)
+
+      /* Case 5) it is out of order */
+      if (pdu->getSeqNum() > state.getMaxSeqNumRcvd() + 1)
+//      if (pdu->getSeqNum() > state.getRcvLeftWinEdge() + 1)
+      {
+        //TODO A! Mention it to others
+        /* NOT IN SPECS */
+        addPDUToReassemblyQ(pdu);
+
+        state.setMaxSeqNumRcvd(pdu->getSeqNum());
+        if (state.isDtcpPresent())
+        {
+          svUpdate(state.getMaxSeqNumRcvd()); /* Update Left Edge, etc. */
+//          svUpdate(state.getRcvLeftWinEdge()); /* Update Left Edge, etc. */
+        }
+        else
+        {
+//          state.setRcvLeftWinEdge(state.getMaxSeqNumRcvd());
+//          state.setMaxSeqNumRcvd(pdu->getSeqNum());
+
+          startATimer(state.getMaxSeqNumRcvd());
+        }
+
+        delimitFromRMT(pdu);
+      }
+      //TODO Find out why there is sequenceNumber -> Start RcvrInactivityTimer(PDU.SequenceNumber) /* Backstop timer */
+    }
+//    schedule(rcvrInactivityTimer);
+    //TODO A1 DIF.integrity
+    /* If we are encrypting, we can't let PDU sequence numbers roll over */
+
+    //If DIF.Integrity and PDU.SeqNum > SequenceNumberRollOverThreshhold Then
+    ///* Security requires a new flow */
+    //RequestFAICreateNewConnection( PDU.FlowID )
+    //Fi
+  }
+}
+
+void DTP::handleDTPRcvrInactivityTimer(RcvrInactivityTimer* timer)
+{
+
+  runRcvrInactivityTimerPolicy();
+
+
+}
+
+void DTP::handleDTPSenderInactivityTimer(SenderInactivityTimer* timer)
+{
+  runSenderInactivityTimerPolicy();
+
+}
+
+void DTP::resetSenderInactivTimer()
+{
+  Enter_Method_Silent();
+  cancelEvent(senderInactivityTimer);
+  schedule(senderInactivityTimer);
+}
+
+void DTP::handleDTPATimer(ATimer* timer)
+{
+
+  if(state.isDtcpPresent()){
+//    runSendingAckPolicy(timer);
+    dtcp->runSendingAckPolicy(&state, timer);
+  }else{
+
+    //TODO A1
+    //Update RcvLeftWindowEdge
+
+    if(state.getRcvLeftWinEdge() > timer->getSeqNum()){
+      throw cRuntimeError("RcvLeftWindowEdge SHOULD not be bigger than seqNum in A-Timer, right?");
+    }else{
+      state.setRcvLeftWinEdge(timer->getSeqNum());
+    }
+
+
+    //Invoke delimiting
+    delimitFromRMT(NULL);
+
+    //reset SenderInactivity timer
+    resetSenderInactivTimer();
+  }
+}
+
+unsigned int DTP::delimit(SDU* sdu)
+{
+
+  //TODO B2 Does PDU header counts to MaxFlowPDUSize? edit: yes, it does!
+  if (sdu->getSize() > state.getMaxFlowPduSize())
+  {
+    unsigned int size = state.getMaxFlowPduSize();
+    //SDU is bigger than Max Allowed PDU
+    //S/DUFrag *frag = new SDUFrag
+
+    SDU* tmp;
+    unsigned int i;
+    for (i = 0; i * size < sdu->getSize(); i++)
+    {
+      tmp = sdu->genFragment(size, i, i * size);
+      dataQ.push_back(tmp);
+
+    }
+    return i;
+  }
+  else
+  {
+    dataQ.push_back(sdu);
+    return 1;
+  }
+
+}
+
+
+unsigned int DTP::delimitFromRMT(PDU *pdu, unsigned int len)
+{
+  unsigned int counter = 0;
+
+//  if()
+
+  return counter;
+}
+
+bool DTP::setDRFInPDU(bool override)
+{
+  if (state.isSetDrfFlag() || override)
+  {
+    state.setSetDrfFlag(false);
+          return true;
+  }
+  else
+  {
+          return false;
+  }
+}
+
+
+void DTP::generatePDUsnew()
+{
+  DataTransferPDU* baseDataPDU = new DataTransferPDU();
+  setPDUHeader(baseDataPDU);
+
+  //invoke SDU protection so we don't have to bother with it afterwards; EDIT: sduQ is not used anymore!!!
+  for (std::vector<SDU*>::iterator it = sduQ.begin(); it != sduQ.end(); ++it){
+    sduProtection(*it);
+  }
+
+  while (!dataQ.empty())
+  {
+    DataTransferPDU* genPDU = baseDataPDU->dup();
+    genPDU->setSeqNum(state.getNextSeqNumToSend());
+
+
+    /* Set DRF flag in PDU */
+    if(setDRFInPDU(false)){
+      genPDU->setFlags(genPDU->getFlags() | DRF_FLAG);
+    }
+
+
+    UserDataField* userData = new UserDataField();
+
+    while (!dataQ.empty() && dataQ.front()->getSize() <= MAX_PDU_SIZE - userData->getSize())
+    {
+      userData->addData((dataQ.front()));
+
+      dataQ.erase(dataQ.begin());
+
+    }
+
+    genPDU->setUserDataField(userData);
+
+    state.pushBackToGeneratedPDUQ(genPDU);
+  }
+
+  delete baseDataPDU;
+}
+
+
+
+
+/**
+ *
+ */
+void DTP::trySendGenPDUs(std::vector<DataTransferPDU*>* pduQ)
+{
+
+  if (state.isDtcpPresent())
+  {
+    //postablePDUs = empty;
+
+    //if flowControl present
+    if (state.isWinBased() || state.isRateBased())
+    {
+      std::vector<DataTransferPDU*>::iterator it;
+      for (it = pduQ->begin(); it != pduQ->end(); it = pduQ->begin())
+      {
+        if (state.isWinBased())
+        {
+          if ((*it)->getSeqNum() <= dtcp->getSndRtWinEdge())
+          {
+            /* The Window is Open. */
+            dtcp->runTxControlPolicy(&state, pduQ);
+            /* Watchout because the current 'it' could be freed */
+          }
+          else
+          {
+            /* The Window is Closed */
+            dtcp->getDTCPState()->setClosedWindow(true);
+
+          if(pduQ == dtcp->getDTCPState()->getClosedWindowQ()){
+              break;
+            }
+
+            if (dtcp->getDTCPState()->getClosedWinQueLen() < dtcp->getDTCPState()->getMaxClosedWinQueLen() - 1)
+            {
+              /* Put PDU on the closedWindowQueue */
+              dtcp->getDTCPState()->pushBackToClosedWinQ((*it));
+              /* I know it is nasty to access it directly */
+              state.getGeneratedPDUQ()->erase(it);
+
+
+            }
+            else
+            {
+              state.setCurrentPdu(*it);
+              state.getGeneratedPDUQ()->erase(it);
+
+              dtcp->runFCOverrunPolicy(&state);
+
+            }
+          }
+        }// end of Window based
+
+        if (state.isRateBased())
+        {
+          if (dtcp->getPdusSentInTimeUnit() < dtcp->getSendingRate())
+          {
+            dtcp->runNoRateSlowDownPolicy(&state);
+          }
+          else
+          {
+            /* Exceeding Sending Rate */
+
+            dtcp->runNoOverridePeakPolicy(&state);
+          }
+        }// end of RateBased
+
+
+        if (dtcp->getDTCPState()->isClosedWindow() ^  dtcp->isSendingRateFullfilled())
+        {
+          dtcp->runReconcileFCPolicy(&state);
+        }
+
+      }//end of for
+
+
+
+    }else{
+      /* FlowControl is not present */
+      std::vector<DataTransferPDU*>::iterator it;
+      PDUQ_t* pduQ = state.getGeneratedPDUQ();
+      for (it = pduQ->begin(); it != pduQ->end();)
+      {
+        state.pushBackToPostablePDUQ((*it));
+        it = pduQ->erase(it);
+      }
+    }
+
+    /* Iterate over postablePDUs and give them to the RMT */
+    if (state.isRxPresent())
+    {
+      std::vector<DataTransferPDU*>::iterator it;
+      PDUQ_t* pduQ = state.getPostablePDUQ();
+      for (it = pduQ->begin(); it != pduQ->end();)
+      {
+        /* Put a copy of each PDU in the RetransmissionQueue */
+        dtcp->pushBackToRxQ((*it)->dup());
+
+        sendToRMT((*it));
+
+        it = pduQ->erase(it);
+      }
+    }
+    else
+    {
+      /*No Retransmission Control is present, but FlowControl */
+      /* Post all postablePDUs to RMT */
+      std::vector<DataTransferPDU*>::iterator it;
+      PDUQ_t* pduQ = state.getPostablePDUQ();
+      for (it = pduQ->begin(); it != pduQ->end();)
+      {
+
+        sendToRMT((*it));
+        it = pduQ->erase(it);
+      }
+      //TODO A4 Report change in specs
+//      state.setSenderLeftWinEdge(state.getNextSeqNumToSendWithoutIncrement());
+      dtcp->updateSenderLWE(state.getNextSeqNumToSendWithoutIncrement());
+
+    }
+  }
+  else
+  {
+    /* DTCP is not present */
+    /* Post all generatedPDUs to RMT */
+    std::vector<DataTransferPDU*>::iterator it;
+    PDUQ_t* pduQ = state.getGeneratedPDUQ();
+    for (it = pduQ->begin(); it != pduQ->end();)
+    {
+      sendToRMT((*it));
+      it = pduQ->erase(it);
+    }
+//    TODO A4 Report change in specs -> O'really?
+//    state.setSenderLeftWinEdge(state.getNextSeqNumToSendWithoutIncrement());
+  }
+
+}
+
+
+
+
+
+/**
+ * This method calls specified function to perform SDU protection.
+ * SDU size will probably change because of the added CRC or whatnot.
+ * @param sdu is the SDU being protected eg added CRC or otherwise
+ */
+void DTP::sduProtection(SDU *sdu)
+{
+  //TODO A1
+
+}
+
+bool DTP::runInitialSeqNumPolicy()
+{
+  Enter_Method("InitialSeqNumPolicy");
+  if (initialSeqNumPolicy == NULL || initialSeqNumPolicy->run(&state, dtcp->getDTCPState()))
+  {
+
+  /*Default*/
+  //TODO B1 set it to random number
+  state.setNextSeqNumToSend(DEFAULT_INIT_SEQUENCE_NUMBER);
+
+  }
+  return false;
+}
+
+void DTP::sendAckOnlyPDU(unsigned int seqNum)
+{
+
+  //send an Ack/FlowControlPDU
+  AckOnlyPDU* ackPDU = new AckOnlyPDU();
+  setPDUHeader(ackPDU);
+  ackPDU->setSeqNum(dtcp->getNextSndCtrlSeqNum());
+  ackPDU->setAckNackSeqNum(seqNum);
+  EV << getFullPath() << ": Sending Ack for PDU number: " << seqNum << endl;
+  //    send(ackPDU, southO);
+  sendToRMT(ackPDU);
+}
+
+void DTP::sendControlAckPDU()
+{
+  Enter_Method_Silent();
+
+
+  ControlAckPDU* ctrlAckPdu = new ControlAckPDU();
+  setPDUHeader(ctrlAckPdu);
+  ctrlAckPdu->setSeqNum(dtcp->getNextSndCtrlSeqNum());
+  ctrlAckPdu->setLastCtrlSeqNumRcv(dtcp->getLastCtrlSeqNumRcv());
+  ctrlAckPdu->setSndLtWinEdge(state.getRcvLeftWinEdge());
+  ctrlAckPdu->setSndRtWinEdge(dtcp->getRcvRtWinEdge());
+  ctrlAckPdu->setMyLtWinEdge(dtcp->getSenderLeftWinEdge());
+  ctrlAckPdu->setMyRtWinEdge(dtcp->getSndRtWinEdge());
+
+  ctrlAckPdu->setMyRcvRate(dtcp->getRcvrRate());
+
+  sendToRMT(ctrlAckPdu);
+}
+
+void DTP::sendEmptyDTPDU()
+{
+  Enter_Method_Silent();
+
+  //Send Transfer PDU With Zero length
+  DataTransferPDU* dataPdu = new DataTransferPDU();
+  setPDUHeader(dataPdu);
+  unsigned int seqNum = state.getNextSeqNumToSend();
+  dataPdu->setSeqNum(seqNum);
+
+  if(setDRFInPDU(false)){
+    dataPdu->setFlags(dataPdu->getFlags() | DRF_FLAG);
+  }
+  UserDataField* userData = new UserDataField();
+  dataPdu->setUserDataField(userData);
+
+  if(state.isRxPresent()){
+//    RxExpiryTimer* rxExpTimer = new RxExpiryTimer("RxExpiryTimer");
+//    rxExpTimer->setPdu(dataPdu->dup());
+//    rxQ.push_back(rxExpTimer);
+//    schedule(rxExpTimer);
+    dtcp->pushBackToRxQ(dataPdu->dup());
+  }
+
+  sendToRMT(dataPdu);
+}
+
+void DTP::runRcvrInactivityTimerPolicy()
+{
+  Enter_Method("RcvrInactivityPolicy");
+  if (rcvrInactivityPolicy == NULL || rcvrInactivityPolicy->run(&state, dtcp->getDTCPState()))
+  {
+
+    //XXX Why reset my sending direction when I am not receiving anything?
+    // I can still be sending load of PDUs
+    /* Default */
+    state.setSetDrfFlag(true);
+    runInitialSeqNumPolicy();
+
+    //Discard any PDUs on the PDUretransmissionQueue
+//    clearRxQ();
+//
+//    //Discard any PDUs on the ClosedWindowQueue
+//    clearClosedWindowQ();
+
+    //XXX Ok, we can send ControlAck. It won't hurt.
+    //Send Control Ack PDU
+    sendControlAckPDU();
+    //TODO RcvRates
+
+    //Send Transfer PDU With Zero length
+    sendEmptyDTPDU();
+
+    //TODO A! Notify User Flow there has been no activity for awhile.
+  }
+}
+
+void DTP::runSenderInactivityTimerPolicy()
+{
+  Enter_Method("SenderInactivityPolicy");
+   if (senderInactivityPolicy == NULL || senderInactivityPolicy->run(&state, dtcp->getDTCPState()))
+   {
+
+  /* Default */
+  state.setSetDrfFlag(true);
+  runInitialSeqNumPolicy();
+
+  //Discard any PDUs on the PDUretransmissionQueue
+  clearRxQ();
+
+  //Discard any PDUs on the ClosedWindowQueue
+  dtcp->getDTCPState()->clearClosedWindowQ();
+
+
+  //Send Control Ack PDU
+  sendControlAckPDU();
+
+  //Send Transfer PDU With Zero length
+  sendEmptyDTPDU();
+
+  //TODO A! Notify User Flow there has been no activity for awhile.
+   }
+
+}
+
+
+
+void DTP::sendToRMT(PDU* pdu)
+{
+  Enter_Method("SendToRMT");
+  take(pdu);
+  if(pdu->getType() == DATA_TRANSFER_PDU){
+    state.setLastSeqNumSent(pdu->getSeqNum());
+    EV << getFullPath() <<": PDU number: " << pdu->getSeqNum() <<" sent in time: " << simTime() << endl;
+  }else{
+    //This should be controlPDU so do not have to increment LastSeqNumSent
+    EV << getFullPath() <<": Control PDU number: " << pdu->getSeqNum() <<" sent in time: " << simTime() << endl;
+  }
+
+  send(pdu, southO);
+}
+
+double DTP::getRxTime()
+{
+  //TODO A2 Epsilon
+  // RTT + A + epsilon
+
+  return state.getRtt() + getQoSCube()->getATime()/1000;
+}
+
+unsigned int DTP::getAllowableGap()
+{
+
+  getQoSCube()->getMaxAllowGap();
+  return 4;
+}
+
+//TODO A! When to call it?
+void DTP::rcvrBufferStateChange()
+{
+  if (state.isRateBased())
+  {
+    dtcp->runRateReductionPolicy(&state);
+  }
+}
+
+void DTP::svUpdate(unsigned int seqNum)
+{
+//  state.setRcvLeftWinEdge(seqNum);
+//  uint ackSeqNum = state.getRcvLeftWinEdge();
+  /* XXX Don't know where else to put */
+
+  // TODO A1 Get approval for this change
+  //update RcvLeftWindoEdge
+  // if there is FC, but no RX then updateRcvLWE would not be called, right?
+    state.updateRcvLWE(seqNum);
+
+
+  if (state.isFCPresent())
+  {
+    if (state.isWinBased())
+    {
+//      runRcvrFlowControlPolicy();
+
+     dtcp->runRcvrFCPolicy(&state);
+    }
+  }
+
+  if (state.isRxPresent())
+  {
+//    runRcvrAckPolicy(state.getRcvLeftWinEdge() - 1);
+    dtcp->runRcvrAckPolicy(&state);
+  }
+
+  if (state.isFCPresent() && !state.isRxPresent())
+  {
+//    runReceivingFlowControlPolicy();
+    dtcp->runReceivingFCPolicy(&state);
+  }
+
+}
+
+void DTP::flushReassemblyPDUQ()
+{
+  state.clearReassemblyPDUQ();
+}
+
+void DTP::clearRxQ()
+{
+  dtcp->clearRxQ();
+}
+
+void DTP::schedule(DTPTimers *timer, double time)
+{
+
+  switch (timer->getType())
+  {
+
+    case (DTP_SENDER_INACTIVITY_TIMER): {
+
+      //TODO A1 Why schedule inactivity timer when there is not RxControl? -> Don't!
+
+      //TODO A! 3(MPL+R+A)
+      unsigned int rxCount = 1;
+      if(state.isRxPresent()){
+        rxCount = dtcp->getDataReXmitMax();
+
+
+      scheduleAt(simTime() + 3 * (MPL_TIME + (getRxTime() * rxCount)) + state.getQoSCube()->getATime()/1000 , timer);
+      }
+//      scheduleAt(simTime() + 10, timer);
+      break;
+    }
+    case (DTP_RCVR_INACTIVITY_TIMER): {
+
+      //TODO A1 Why schedule inactivity timer when there is not RxControl? -> Don't!
+      unsigned int rxCount = 1;
+      if(state.isRxPresent()){
+        rxCount = dtcp->getDataReXmitMax();
+        //TODO A!
+        scheduleAt(simTime() + 2 * (MPL_TIME + (getRxTime() * rxCount)) + 0 , timer);
+      }
+      break;
+    }
+    case (DTP_A_TIMER):{
+      //TODO A2 Tune it up.
+      scheduleAt(simTime() + getQoSCube()->getATime() , timer);
+      break;
+    }
+  }
+}
+
+void DTP::setFlow(Flow* flow)
+{
+  this->flow = flow;
+}
+
+void DTP::setDTCP(DTCP* dtcp){
+  this->dtcp = dtcp;
+  if(dtcp!= NULL){
+    dtcp->setDTP(this);
+  }
+}
+