Switch to unified view

a/src/utils/workqueue.h b/src/utils/workqueue.h
...
...
15
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
15
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
16
 */
17
#ifndef _WORKQUEUE_H_INCLUDED_
17
#ifndef _WORKQUEUE_H_INCLUDED_
18
#define _WORKQUEUE_H_INCLUDED_
18
#define _WORKQUEUE_H_INCLUDED_
19
19
20
#include "pthread.h"
20
#include <pthread.h>
21
#include <time.h>
22
21
#include <string>
23
#include <string>
22
#include <queue>
24
#include <queue>
25
#include <tr1/unordered_map>
26
#include <tr1/unordered_set>
27
using std::tr1::unordered_map;
28
using std::tr1::unordered_set;
23
using std::queue;
29
using std::queue;
24
using std::string;
30
using std::string;
25
31
32
#include "debuglog.h"
33
34
#define WORKQUEUE_TIMING
35
36
class WQTData {
37
    public:
38
    WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
39
    struct timespec wstart;
40
};
41
26
/**
42
/**
27
 * A WorkQueue manages the synchronisation around a queue of work items,
43
 * A WorkQueue manages the synchronisation around a queue of work items,
28
 * where a single client thread queues tasks and a single worker takes
44
 * where a number of client threads queue tasks and a number of worker
29
 * and executes them. The goal is to introduce some level of
45
 * threads takes and executes them. The goal is to introduce some level
30
 * parallelism between the successive steps of a previously single
46
 * of parallelism between the successive steps of a previously single
31
 * threaded pipe-line (data extraction / data preparation / index
47
 * threaded pipe-line (data extraction / data preparation / index
32
 * update).
48
 * update).
33
 *
49
 *
34
 * There is no individual task status return. In case of fatal error,
50
 * There is no individual task status return. In case of fatal error,
35
 * the client or worker sets an end condition on the queue. A second
51
 * the client or worker sets an end condition on the queue. A second
36
 * queue could conceivably be used for returning individual task
52
 * queue could conceivably be used for returning individual task
37
 * status.
53
 * status.
38
 */
54
 */
39
template <class T> class WorkQueue {
55
template <class T> class WorkQueue {
40
public:
56
public:
57
58
    /** Create a WorkQueue
59
     * @param name for message printing
60
     * @param hi number of tasks on queue before clients blocks. Default 0 
61
     *    meaning no limit.
62
     * @param lo minimum count of tasks before worker starts. Default 1.
63
     */
41
    WorkQueue(int hi = 0, int lo = 1)
64
    WorkQueue(const string& name, int hi = 0, int lo = 1)
42
  : m_high(hi), m_low(lo), m_size(0), m_worker_up(false),
65
        : m_name(name), m_high(hi), m_low(lo), m_size(0), 
43
    m_worker_waiting(false), m_jobcnt(0), m_lenacc(0)
66
          m_workers_waiting(0), m_workers_exited(0), m_jobcnt(0), 
67
          m_clientwait(0), m_workerwait(0), m_workerwork(0)
44
    {
68
    {
45
  m_ok = (pthread_cond_init(&m_cond, 0) == 0) && 
69
        m_ok = (pthread_cond_init(&m_cond, 0) == 0) && 
46
      (pthread_mutex_init(&m_mutex, 0) == 0);
70
            (pthread_mutex_init(&m_mutex, 0) == 0);
47
    }
71
    }
48
72
49
    ~WorkQueue() 
73
    ~WorkQueue() 
50
    {
74
    {
51
  if (m_worker_up)
75
        LOGDEB2(("WorkQueue::~WorkQueue: name %s\n", m_name.c_str()));
76
        if (!m_worker_threads.empty())
52
      setTerminateAndWait();
77
            setTerminateAndWait();
78
    }
79
80
    /** Start the worker threads. 
53
    }
81
     *
54
82
     * @param nworkers number of threads copies to start.
55
    /** Start the worker thread. The start_routine will loop
83
     * @param start_routine thread function. It should loop
56
     *  taking and executing tasks. */
84
     *      taking (QueueWorker::take() and executing tasks. 
85
     * @param arg initial parameter to thread function.
86
     * @return true if ok.
87
     */
57
    bool start(void *(*start_routine)(void *), void *arg)
88
    bool start(int nworkers, void *(*start_routine)(void *), void *arg)
58
    {
59
  bool status = pthread_create(&m_worker_thread, 0, 
60
                   start_routine, arg) == 0;
61
  if (status)
62
      m_worker_up = true;
63
  return status;
64
    }
89
    {
90
        for  (int i = 0; i < nworkers; i++) {
91
            int err;
92
            pthread_t thr;
93
            if ((err = pthread_create(&thr, 0, start_routine, arg))) {
94
                LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
95
                        m_name.c_str(), err));
96
                return false;
97
            }
98
            m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
99
        }
100
        return true;
101
    }
65
102
103
    /** Add item to work queue, called from client.
66
    /**
104
     *
67
     * Add item to work queue. Sleep if there are already too many.
105
     * Sleeps if there are already too many.
68
     * Called from client.
69
     */
106
     */
70
    bool put(T t)
107
    bool put(T t)
71
    {
108
    {
72
  if (!ok() || pthread_mutex_lock(&m_mutex) != 0) 
109
        if (!ok() || pthread_mutex_lock(&m_mutex) != 0) 
73
      return false;
110
            return false;
74
111
112
#ifdef WORKQUEUE_TIMING
113
        struct timespec before;
114
        clock_gettime(CLOCK_MONOTONIC, &before);
115
#endif
116
75
  while (ok() && m_high > 0 && m_queue.size() >= m_high) {
117
        while (ok() && m_high > 0 && m_queue.size() >= m_high) {
76
      // Keep the order: we test ok() AFTER the sleep...
118
            // Keep the order: we test ok() AFTER the sleep...
77
      if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
119
            if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
120
                pthread_mutex_unlock(&m_mutex);
121
                return false;
122
            }
123
        }
124
125
#ifdef WORKQUEUE_TIMING
126
        struct timespec after;
127
        clock_gettime(CLOCK_MONOTONIC, &after);
128
        m_clientwait += nanodiff(before, after);
129
#endif
130
131
        m_queue.push(t);
132
        ++m_size;
133
  // Just wake one worker, there is only one new task.
134
        pthread_cond_signal(&m_cond);
78
      pthread_mutex_unlock(&m_mutex);
135
        pthread_mutex_unlock(&m_mutex);
79
      return false;
136
        return true;
80
      }
137
    }
138
139
    /** Wait until the queue is inactive. Called from client.
140
     *
141
     * Waits until the task queue is empty and the workers are all
142
     * back sleeping. Used by the client to wait for all current work
143
     * to be completed, when it needs to perform work that couldn't be
144
     * done in parallel with the worker's tasks, or before shutting
145
     * down. Work can be resumed after calling this.
146
     */
147
    bool waitIdle()
148
    {
149
        if (!ok() || pthread_mutex_lock(&m_mutex) != 0) {
150
            LOGERR(("WorkQueue::waitIdle: %s not ok or can't lock\n",
151
                    m_name.c_str()));
152
            return false;
153
        }
154
155
        // We're done when the queue is empty AND all workers are back
156
        // waiting for a task.
157
        while (ok() && (m_queue.size() > 0 || 
158
                        m_workers_waiting != m_worker_threads.size())) {
159
            if (pthread_cond_wait(&m_cond, &m_mutex)) {
160
                pthread_mutex_unlock(&m_mutex);
161
                m_ok = false;
162
                LOGERR(("WorkQueue::waitIdle: cond_wait failed\n"));
163
                return false;
164
            }
165
        }
166
167
#ifdef WORKQUEUE_TIMING
168
        long long M = 1000000LL;
169
        long long wscl = m_worker_threads.size() * M;
170
        LOGERR(("WorkQueue:%s: clients wait (all) %lld mS, "
171
                 "worker wait (avg) %lld mS, worker work (avg) %lld mS\n", 
172
                 m_name.c_str(), m_clientwait / M, m_workerwait / wscl,  
173
                 m_workerwork / wscl));
174
#endif // WORKQUEUE_TIMING
175
176
        pthread_mutex_unlock(&m_mutex);
177
        return ok();
178
    }
179
180
181
    /** Tell the workers to exit, and wait for them. Does not bother about
182
     * tasks possibly remaining on the queue, so should be called
183
     * after waitIdle() for an orderly shutdown.
184
     */
185
    void* setTerminateAndWait()
186
    {
187
        LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str()));
188
        pthread_mutex_lock(&m_mutex);
189
190
  if (m_worker_threads.empty()) {
191
      // Already called ?
192
      return (void*)0;
81
    }
193
    }
82
194
83
  m_queue.push(t);
195
  // Wait for all worker threads to have called workerExit()
84
  ++m_size;
196
        m_ok = false;
197
        while (m_workers_exited < m_worker_threads.size()) {
85
  pthread_cond_broadcast(&m_cond);
198
            pthread_cond_broadcast(&m_cond);
86
  pthread_mutex_unlock(&m_mutex);
87
  return true;
88
    }
89
90
    /** Wait until the queue is empty and the worker is
91
     *  back waiting for task. Called from the client when it needs to
92
     *  perform work that couldn't be done in parallel with the
93
     *  worker's tasks.
94
     */
95
    bool waitIdle()
96
    {
97
  if (!ok() || pthread_mutex_lock(&m_mutex) != 0) 
98
      return false;
99
100
  // We're done when the queue is empty AND the worker is back
101
  // for a task (has finished the last)
102
  while (ok() && (m_queue.size() > 0 || !m_worker_waiting)) {
103
      if (pthread_cond_wait(&m_cond, &m_mutex)) {
199
            if (pthread_cond_wait(&m_cond, &m_mutex)) {
200
                pthread_mutex_unlock(&m_mutex);
201
                LOGERR(("WorkQueue::setTerminate: cond_wait failed\n"));
202
                return false;
203
            }
204
        }
205
206
  // Perform the thread joins and compute overall status
207
        // Workers return (void*)1 if ok
208
        void *statusall = (void*)1;
209
        unordered_map<pthread_t,  WQTData>::iterator it;
210
        while (!m_worker_threads.empty()) {
211
            void *status;
212
            it = m_worker_threads.begin();
213
            pthread_join(it->first, &status);
214
            if (status == (void *)0)
215
                statusall = status;
216
            m_worker_threads.erase(it);
217
        }
104
      pthread_mutex_unlock(&m_mutex);
218
        pthread_mutex_unlock(&m_mutex);
105
      return false;
219
        LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
220
        return statusall;
106
      }
221
    }
107
  }
108
  pthread_mutex_unlock(&m_mutex);
109
  return ok();
110
    }
111
222
112
    /** Tell the worker to exit, and wait for it. There may still
223
    /** Take task from queue. Called from worker.
113
  be tasks on the queue. */
224
     * 
114
    void* setTerminateAndWait()
115
    {
116
  if (!m_worker_up)
117
      return (void *)0;
118
119
  pthread_mutex_lock(&m_mutex);
120
  m_ok = false;
121
  pthread_cond_broadcast(&m_cond);
122
  pthread_mutex_unlock(&m_mutex);
123
  void *status;
124
  pthread_join(m_worker_thread, &status);
125
  m_worker_up = false;
126
  return status;
127
    }
128
129
    /** Remove task from queue. Sleep if there are not enough. Signal if we go
225
     * Sleeps if there are not enough. Signal if we go
130
  to sleep on empty queue: client may be waiting for our going idle */
226
     * to sleep on empty queue: client may be waiting for our going idle.
227
     */
131
    bool take(T* tp)
228
    bool take(T* tp)
132
    {
229
    {
133
  if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
230
        if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
134
      return false;
231
            return false;
135
232
233
#ifdef WORKQUEUE_TIMING
234
        struct timespec beforesleep;
235
        clock_gettime(CLOCK_MONOTONIC, &beforesleep);
236
237
        pthread_t me = pthread_self();
238
        unordered_map<pthread_t, WQTData>::iterator it = 
239
            m_worker_threads.find(me);
240
        if (it != m_worker_threads.end() && 
241
            it->second.wstart.tv_sec != 0 && it->second.wstart.tv_nsec != 0) {
242
            long long nanos = nanodiff(it->second.wstart, beforesleep);
243
            m_workerwork += nanos;
244
        }
245
#endif
246
136
  while (ok() && m_queue.size() < m_low) {
247
        while (ok() && m_queue.size() < m_low) {
137
      m_worker_waiting = true;
248
            m_workers_waiting++;
138
      if (m_queue.empty())
249
            if (m_queue.empty())
139
      pthread_cond_broadcast(&m_cond);
250
                pthread_cond_broadcast(&m_cond);
140
      if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
251
            if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
252
      // !ok is a normal condition when shutting down
253
      if (ok())
254
          LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
255
              m_name.c_str()));
256
                pthread_mutex_unlock(&m_mutex);
257
                m_workers_waiting--;
258
                return false;
259
            }
260
            m_workers_waiting--;
261
        }
262
263
#ifdef WORKQUEUE_TIMING
264
        struct timespec aftersleep;
265
        clock_gettime(CLOCK_MONOTONIC, &aftersleep);
266
        m_workerwait += nanodiff(beforesleep, aftersleep);
267
        it = m_worker_threads.find(me);
268
        if (it != m_worker_threads.end())
269
            it->second.wstart = aftersleep;
270
#endif
271
272
        ++m_jobcnt;
273
        *tp = m_queue.front();
274
        m_queue.pop();
275
        --m_size;
276
  // No reason to wake up more than one client thread
277
        pthread_cond_signal(&m_cond);
141
      pthread_mutex_unlock(&m_mutex);
278
        pthread_mutex_unlock(&m_mutex);
142
      m_worker_waiting = false;
279
        return true;
143
      return false;
144
      }
280
    }
145
      m_worker_waiting = false;
146
  }
147
281
148
  ++m_jobcnt;
282
    /** Advertise exit and abort queue. Called from worker
149
  m_lenacc += m_size;
283
     * This would normally happen after an unrecoverable error, or when 
150
284
     * the queue is terminated by the client. Workers never exit normally,
151
  *tp = m_queue.front();
285
     * except when the queue is shut down (at which point m_ok is set to false
152
  m_queue.pop();
286
     * by the shutdown code anyway). The thread must return/exit immediately 
153
  --m_size;
287
     * after calling this
154
288
     */
155
  pthread_cond_broadcast(&m_cond);
156
  pthread_mutex_unlock(&m_mutex);
157
  return true;
158
    }
159
160
    /** Take note of the worker exit. This would normally happen after an
161
  unrecoverable error */
162
    void workerExit()
289
    void workerExit()
163
    {
290
    {
164
  if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
291
        if (pthread_mutex_lock(&m_mutex) != 0)
165
      return;
292
            return;
293
        m_workers_exited++;
166
  m_ok = false;
294
        m_ok = false;
167
  pthread_cond_broadcast(&m_cond);
295
        pthread_cond_broadcast(&m_cond);
168
  pthread_mutex_unlock(&m_mutex);
296
        pthread_mutex_unlock(&m_mutex);
169
    }
297
    }
170
298
171
    /** Debug only: as the size is returned while the queue is unlocked, there
299
    /** Return current queue size. Debug only.
172
     *  is no warranty on its consistency. Not that we use the member size, not 
173
     *  the container size() call which would need locking.
174
     */
300
     *
175
    size_t size() {return m_size;}
301
     *  As the size is returned while the queue is unlocked, there
302
     *  is no warranty on its consistency. Not that we use the member
303
     *  size, not the container size() call which would need locking.
304
     */
305
    size_t size() 
306
    {
307
        return m_size;
308
    }
176
309
177
private:
310
private:
178
    bool ok() {return m_ok && m_worker_up;}
311
    bool ok() 
312
    {
313
        return m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
314
    }
179
315
316
    long long nanodiff(const struct timespec& older, 
317
                       const struct timespec& newer)
318
    {
319
        return (newer.tv_sec - older.tv_sec) * 1000000000LL
320
            + newer.tv_nsec - older.tv_nsec;
321
    }
322
323
    string m_name;
180
    size_t m_high;
324
    size_t m_high;
181
    size_t m_low; 
325
    size_t m_low; 
182
    size_t m_size;
326
    size_t m_size;
183
    bool m_worker_up;
327
    /* Worker threads currently waiting for a job */
184
    bool m_worker_waiting;
328
    unsigned int m_workers_waiting;
329
    unsigned int m_workers_exited;
330
    /* Stats */
185
    int m_jobcnt;
331
    int m_jobcnt;
186
    int m_lenacc;
332
    long long m_clientwait;
333
    long long m_workerwait;
334
    long long m_workerwork;
187
335
188
    pthread_t m_worker_thread;
336
    unordered_map<pthread_t, WQTData> m_worker_threads;
189
    queue<T> m_queue;
337
    queue<T> m_queue;
190
    pthread_cond_t m_cond;
338
    pthread_cond_t m_cond;
191
    pthread_mutex_t m_mutex;
339
    pthread_mutex_t m_mutex;
192
    bool m_ok;
340
    bool m_ok;
193
};
341
};