Switch to unified view

a/libupnpp/workqueue.hxx b/libupnpp/workqueue.hxx
...
...
27
using std::unordered_map;
27
using std::unordered_map;
28
using std::unordered_set;
28
using std::unordered_set;
29
using std::queue;
29
using std::queue;
30
using std::string;
30
using std::string;
31
31
32
//#include "debuglog.h"
33
#define LOGDEB2(X)
34
#define LOGDEB1(X)
35
#define LOGDEB0(X)
36
#define LOGDEB(X)
37
#define LOGINFO(X)
38
#define LOGERR(X)
39
#include "ptmutex.hxx"
32
#include "ptmutex.hxx"
40
33
41
/// Store per-worker-thread data. Just an initialized timespec, and
34
/// Store per-worker-thread data. Just an initialized timespec, and
42
/// used at the moment.
35
/// used at the moment.
43
class WQTData {
36
class WQTData {
...
...
77
        (pthread_cond_init(&m_wcond, 0) == 0);
70
        (pthread_cond_init(&m_wcond, 0) == 0);
78
    }
71
    }
79
72
80
    ~WorkQueue()
73
    ~WorkQueue()
81
    {
74
    {
82
      LOGDEB2(("WorkQueue::~WorkQueue:%s\n", m_name.c_str()));
83
        if (!m_worker_threads.empty())
75
        if (!m_worker_threads.empty())
84
            setTerminateAndWait();
76
            setTerminateAndWait();
85
    }
77
    }
86
78
87
    /** Start the worker threads.
79
    /** Start the worker threads.
...
...
97
        PTMutexLocker lock(m_mutex);
89
        PTMutexLocker lock(m_mutex);
98
        for  (int i = 0; i < nworkers; i++) {
90
        for  (int i = 0; i < nworkers; i++) {
99
            int err;
91
            int err;
100
            pthread_t thr;
92
            pthread_t thr;
101
            if ((err = pthread_create(&thr, 0, workproc, arg))) {
93
            if ((err = pthread_create(&thr, 0, workproc, arg))) {
102
              LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
103
                      m_name.c_str(), err));
104
                return false;
94
                return false;
105
            }
95
            }
106
            m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
96
            m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
107
        }
97
        }
108
        return true;
98
        return true;
...
...
114
     */
104
     */
115
    bool put(T t)
105
    bool put(T t)
116
    {
106
    {
117
        PTMutexLocker lock(m_mutex);
107
        PTMutexLocker lock(m_mutex);
118
        if (!lock.ok() || !ok()) {
108
        if (!lock.ok() || !ok()) {
119
      LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n",
120
          m_name.c_str()));
121
            return false;
109
            return false;
122
        }
110
        }
123
111
124
        while (ok() && m_high > 0 && m_queue.size() >= m_high) {
112
        while (ok() && m_high > 0 && m_queue.size() >= m_high) {
125
        m_clientsleeps++;
113
        m_clientsleeps++;
...
...
161
     */
149
     */
162
    bool waitIdle()
150
    bool waitIdle()
163
    {
151
    {
164
        PTMutexLocker lock(m_mutex);
152
        PTMutexLocker lock(m_mutex);
165
        if (!lock.ok() || !ok()) {
153
        if (!lock.ok() || !ok()) {
166
          LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n",
167
                  m_name.c_str()));
168
            return false;
154
            return false;
169
        }
155
        }
170
156
171
        // We're done when the queue is empty AND all workers are back
157
        // We're done when the queue is empty AND all workers are back
172
        // waiting for a task.
158
        // waiting for a task.
...
...
174
                        m_workers_waiting != m_worker_threads.size())) {
160
                        m_workers_waiting != m_worker_threads.size())) {
175
            m_clients_waiting++;
161
            m_clients_waiting++;
176
            if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
162
            if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
177
                m_clients_waiting--;
163
                m_clients_waiting--;
178
                m_ok = false;
164
                m_ok = false;
179
              LOGERR(("WorkQueue::waitIdle:%s: cond_wait failed\n",
180
                      m_name.c_str()));
181
                return false;
165
                return false;
182
            }
166
            }
183
            m_clients_waiting--;
167
            m_clients_waiting--;
184
        }
168
        }
185
169
...
...
193
     * should be called after waitIdle() for an orderly shutdown.
177
     * should be called after waitIdle() for an orderly shutdown.
194
     */
178
     */
195
    void* setTerminateAndWait()
179
    void* setTerminateAndWait()
196
    {
180
    {
197
        PTMutexLocker lock(m_mutex);
181
        PTMutexLocker lock(m_mutex);
198
      LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str()));
199
182
200
        if (m_worker_threads.empty()) {
183
        if (m_worker_threads.empty()) {
201
            // Already called ?
184
            // Already called ?
202
            return (void*)0;
185
            return (void*)0;
203
        }
186
        }
...
...
206
        m_ok = false;
189
        m_ok = false;
207
        while (m_workers_exited < m_worker_threads.size()) {
190
        while (m_workers_exited < m_worker_threads.size()) {
208
            pthread_cond_broadcast(&m_wcond);
191
            pthread_cond_broadcast(&m_wcond);
209
            m_clients_waiting++;
192
            m_clients_waiting++;
210
            if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
193
            if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
211
              LOGERR(("WorkQueue::setTerminate:%s: cond_wait failed\n",
212
                      m_name.c_str()));
213
                m_clients_waiting--;
194
                m_clients_waiting--;
214
                return (void*)0;
195
                return (void*)0;
215
            }
196
            }
216
            m_clients_waiting--;
197
            m_clients_waiting--;
217
        }
198
        }
218
199
219
      LOGINFO(("%s: tasks %u nowakes %u wsleeps %u csleeps %u\n",
220
               m_name.c_str(), m_tottasks, m_nowake, m_workersleeps,
221
               m_clientsleeps));
222
        // Perform the thread joins and compute overall status
200
        // Perform the thread joins and compute overall status
223
        // Workers return (void*)1 if ok
201
        // Workers return (void*)1 if ok
224
        void *statusall = (void*)1;
202
        void *statusall = (void*)1;
225
        unordered_map<pthread_t,  WQTData>::iterator it;
203
        unordered_map<pthread_t,  WQTData>::iterator it;
226
        while (!m_worker_threads.empty()) {
204
        while (!m_worker_threads.empty()) {
...
...
235
        // Reset to start state.
213
        // Reset to start state.
236
        m_workers_exited = m_clients_waiting = m_workers_waiting =
214
        m_workers_exited = m_clients_waiting = m_workers_waiting =
237
            m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
215
            m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
238
        m_ok = true;
216
        m_ok = true;
239
217
240
      LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
241
        return statusall;
218
        return statusall;
242
    }
219
    }
243
220
244
    /** Take task from queue. Called from worker.
221
    /** Take task from queue. Called from worker.
245
     *
222
     *
...
...
248
     */
225
     */
249
    bool take(T* tp, size_t *szp = 0)
226
    bool take(T* tp, size_t *szp = 0)
250
    {
227
    {
251
        PTMutexLocker lock(m_mutex);
228
        PTMutexLocker lock(m_mutex);
252
        if (!lock.ok() || !ok()) {
229
        if (!lock.ok() || !ok()) {
253
          LOGDEB(("WorkQueue::take:%s: not ok\n", m_name.c_str()));
254
            return false;
230
            return false;
255
        }
231
        }
256
232
257
        while (ok() && m_queue.size() < m_low) {
233
        while (ok() && m_queue.size() < m_low) {
258
            m_workersleeps++;
234
            m_workersleeps++;
259
            m_workers_waiting++;
235
            m_workers_waiting++;
260
            if (m_queue.empty())
236
            if (m_queue.empty())
261
                pthread_cond_broadcast(&m_ccond);
237
                pthread_cond_broadcast(&m_ccond);
262
            if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
238
            if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
263
              // !ok is a normal condition when shutting down
264
              if (ok()) {
265
                  LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
266
                          m_name.c_str()));
267
              }
268
                m_workers_waiting--;
239
                m_workers_waiting--;
269
                return false;
240
                return false;
270
            }
241
            }
271
            m_workers_waiting--;
242
            m_workers_waiting--;
272
        }
243
        }
...
...
293
     * false by the shutdown code anyway). The thread must return/exit
264
     * false by the shutdown code anyway). The thread must return/exit
294
     * immediately after calling this.
265
     * immediately after calling this.
295
     */
266
     */
296
    void workerExit()
267
    void workerExit()
297
    {
268
    {
298
      LOGDEB(("workerExit:%s\n", m_name.c_str()));
299
        PTMutexLocker lock(m_mutex);
269
        PTMutexLocker lock(m_mutex);
300
        m_workers_exited++;
270
        m_workers_exited++;
301
        m_ok = false;
271
        m_ok = false;
302
        pthread_cond_broadcast(&m_ccond);
272
        pthread_cond_broadcast(&m_ccond);
303
    }
273
    }
...
...
311
281
312
private:
282
private:
313
    bool ok()
283
    bool ok()
314
    {
284
    {
315
        bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
285
        bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
316
      if (!isok) {
317
          LOGDEB(("WorkQueue:ok:%s: not ok m_ok %d m_workers_exited %d "
318
                  "m_worker_threads size %d\n", m_name.c_str(),
319
                  m_ok, m_workers_exited, int(m_worker_threads.size())));
320
      }
321
        return isok;
286
        return isok;
322
    }
287
    }
323
288
324
    long long nanodiff(const struct timespec& older,
289
    long long nanodiff(const struct timespec& older,
325
                       const struct timespec& newer)
290
                       const struct timespec& newer)