Switch to unified view

a b/src/DAF/AE/AEStream.cc
1
//
2
// This program is free software: you can redistribute it and/or modify
3
// it under the terms of the GNU Lesser General Public License as published by
4
// the Free Software Foundation, either version 3 of the License, or
5
// (at your option) any later version.
6
// 
7
// This program is distributed in the hope that it will be useful,
8
// but WITHOUT ANY WARRANTY; without even the implied warranty of
9
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
// GNU Lesser General Public License for more details.
11
// 
12
// You should have received a copy of the GNU Lesser General Public License
13
// along with this program.  If not, see http://www.gnu.org/licenses/.
14
// 
15
16
#include "AEStream.h"
17
18
Define_Module(AEStream);
19
20
AEStream::AEStream(){
21
  //Consts
22
      TIM_START           = "StartCommunication";
23
      TIM_STOP            = "StopCommunication";
24
      MSG_STDATA          = "DATA-";
25
      PAR_START           = "startAt";
26
      PAR_STOP            = "stopAt";
27
      PAR_BEGIN           = "beginStreamAt";
28
      PAR_END             = "endStreamAt";
29
      PAR_INTERVAL        = "interval";
30
      PAR_SIZE            = "size";
31
      PAR_DSTAPNAME       = "dstApName";
32
      PAR_DSTAPINSTANCE   = "dstApInstance";
33
      PAR_DSTAENAME       = "dstAeName";
34
      PAR_DSTAEINSTANCE   = "dstAeInstance";
35
}
36
37
38
void AEStream::initialize()
39
{
40
    //Init pointers
41
    initPointers();
42
    //Source info
43
    initNamingInfo();
44
    //Setup signals
45
    initSignalsAndListeners();
46
    //Init QoSRequirements
47
    initQoSRequiremets();
48
49
    //Destination for flow
50
    dstApName     = this->par(PAR_DSTAPNAME).stringValue();
51
    dstApInstance = this->par(PAR_DSTAPINSTANCE).stringValue();
52
    dstAeName     = this->par(PAR_DSTAENAME).stringValue();
53
    dstAeInstance = this->par(PAR_DSTAEINSTANCE).stringValue();
54
55
    //Timers
56
    startAt         = simTime() + par(PAR_START);
57
    stopAt          = simTime() + par(PAR_STOP);
58
    beginStreamAt   = simTime() + par(PAR_BEGIN);
59
    endStreamAt     = simTime() + par(PAR_END);
60
    interval        = par(PAR_INTERVAL).doubleValue();
61
    size            = par(PAR_SIZE);
62
63
    //Schedule AllocateRequest
64
    if (startAt > 0)
65
        prepareAllocateRequest();
66
    //Schedule Data transfer
67
    if (beginStreamAt > 0 && beginStreamAt < endStreamAt)
68
        prepareStreamData();
69
    //Schedule DeallocateRequest
70
    if (stopAt > 0)
71
        prepareDeallocateRequest();
72
73
74
}
75
76
void AEStream::handleMessage(cMessage *msg)
77
{
78
    if ( msg->isSelfMessage() )
79
        this->handleSelfMessage(msg);
80
}
81
82
void AEStream::handleSelfMessage(cMessage* msg) {
83
    if ( !strcmp(msg->getName(), TIM_START) ) {
84
        //Flow
85
        APNamingInfo src = this->getApni();
86
        APNamingInfo dst = APNamingInfo( APN(this->dstApName), this->dstApInstance,
87
                                         this->dstAeName, this->dstAeInstance);
88
89
        Flow fl = Flow(src, dst);
90
        fl.setQosParameters(this->getQoSRequirements());
91
92
        //Insert it to the Flows ADT
93
        insertFlow(fl);
94
95
        sendAllocationRequest(&flows.back());
96
    }
97
    else if ( !strcmp(msg->getName(), TIM_STOP) ) {
98
        sendDeallocationRequest(&flows.back());
99
    }
100
    else if ( strstr(msg->getName(), MSG_STDATA) ) {
101
        //Create data stream chunk messsage
102
        CDAP_M_Read* data = new CDAP_M_Read(msg->getName());
103
        object_t obj;
104
        obj.objectName = msg->getName();
105
        obj.objectClass = "string";
106
        obj.objectInstance = -1;
107
        obj.objectVal = (cObject*)("0123456789abcdef");
108
        data->setObject(obj);
109
        data->setByteLength(size);
110
111
        //Send message
112
        sendData(&flows.back(), data);
113
    }
114
    else
115
        EV << this->getFullPath() << " received unknown self-message " << msg->getName();
116
    delete(msg);
117
}
118
119
void AEStream::prepareAllocateRequest() {
120
    //Schedule AllocateRequest
121
    cMessage* m1 = new cMessage(TIM_START);
122
    scheduleAt(startAt, m1);
123
}
124
125
void AEStream::prepareStreamData() {
126
    //Schedule stream data chunk
127
    int j = 0;
128
    for (simtime_t i = beginStreamAt; i < endStreamAt; i += interval) {
129
        std::ostringstream ss;
130
        ss << MSG_STDATA << j++;
131
        cMessage* m2 = new cMessage(ss.str().c_str());
132
        scheduleAt(i, m2);
133
    }
134
}
135
136
void AEStream::prepareDeallocateRequest() {
137
    //Schedule DeallocateRequest
138
    cMessage* m3 = new cMessage(TIM_STOP);
139
    scheduleAt(stopAt, m3);
140
}
141
142
void AEStream::processMRead(CDAPMessage* msg) {
143
    CDAP_M_Read* msg1 = check_and_cast<CDAP_M_Read*>(msg);
144
145
    EV << "Received data M_DATA";
146
    object_t object = msg1->getObject();
147
    EV << " with object '" << object.objectClass << "' and value '" << object.objectVal << "'" << endl;
148
149
}