Switch to unified view

a b/src/workqueue.h
/*     Copyright (C) 2012 J.F.Dockes
 *     This program is free software; you can redistribute it and/or modify
 *     it under the terms of the GNU General Public License as published by
 *     the Free Software Foundation; either version 2 of the License, or
 *     (at your option) any later version.
 *
 *     This program is distributed in the hope that it will be useful,
 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *     GNU General Public License for more details.
 *
 *     You should have received a copy of the GNU General Public License
 *     along with this program; if not, write to the
 *     Free Software Foundation, Inc.,
 *     59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */
17
#ifndef _WORKQUEUE_H_INCLUDED_
18
#define _WORKQUEUE_H_INCLUDED_
19
20
#include <pthread.h>
21
#include <time.h>
22
23
#include <string>
24
#include <queue>
25
#include <unordered_map>
26
27
#include "ptmutex.h"
28
29
/// Per-worker-thread data holder: just a timespec, zero-initialized at
/// construction, and not used at the moment (could record worker start time).
class WQTData {
public:
  WQTData()
  {
      wstart.tv_sec = 0;
      wstart.tv_nsec = 0;
  }
  struct timespec wstart;
};
36
37
/**
38
 * A WorkQueue manages the synchronisation around a queue of work items,
39
 * where a number of client threads queue tasks and a number of worker
40
 * threads take and execute them. The goal is to introduce some level
41
 * of parallelism between the successive steps of a previously single
42
 * threaded pipeline. For example data extraction / data preparation / index
43
 * update, but this could have other uses.
44
 *
45
 * There is no individual task status return. In case of fatal error,
46
 * the client or worker sets an end condition on the queue. A second
47
 * queue could conceivably be used for returning individual task
48
 * status.
49
 */
50
template <class T> class WorkQueue {
51
public:
52
53
  /** Create a WorkQueue
54
   * @param name for message printing
55
   * @param hi number of tasks on queue before clients blocks. Default 0
56
   *    meaning no limit. hi == -1 means that the queue is disabled.
57
   * @param lo minimum count of tasks before worker starts. Default 1.
58
   */
59
  WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
60
      : m_name(name), m_high(hi), m_low(lo),
61
        m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
62
        m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
63
  {
64
      m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
65
          (pthread_cond_init(&m_wcond, 0) == 0);
66
  }
67
68
  ~WorkQueue()
69
  {
70
      if (!m_worker_threads.empty())
71
          setTerminateAndWait();
72
  }
73
74
  /** Start the worker threads.
75
   *
76
   * @param nworkers number of threads copies to start.
77
   * @param start_routine thread function. It should loop
78
   *      taking (QueueWorker::take()) and executing tasks.
79
   * @param arg initial parameter to thread function.
80
   * @return true if ok.
81
   */
82
  bool start(int nworkers, void *(*workproc)(void *), void *arg)
83
  {
84
      PTMutexLocker lock(m_mutex);
85
      for  (int i = 0; i < nworkers; i++) {
86
          int err;
87
          pthread_t thr;
88
          if ((err = pthread_create(&thr, 0, workproc, arg))) {
89
              return false;
90
          }
91
          m_worker_threads.insert(std::pair<pthread_t, WQTData>(thr, WQTData()));
92
      }
93
      return true;
94
  }
95
96
  /** Add item to work queue, called from client.
97
   *
98
   * Sleeps if there are already too many.
99
   */
100
  bool put(T t, bool flushprevious = false)
101
  {
102
      PTMutexLocker lock(m_mutex);
103
      if (!lock.ok() || !ok()) {
104
          return false;
105
      }
106
107
      while (ok() && m_high > 0 && m_queue.size() >= m_high) {
108
          m_clientsleeps++;
109
          // Keep the order: we test ok() AFTER the sleep...
110
          m_clients_waiting++;
111
          if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
112
              m_clients_waiting--;
113
              return false;
114
          }
115
          m_clients_waiting--;
116
      }
117
      if (flushprevious) {
118
          while (!m_queue.empty())
119
              m_queue.pop();
120
      }
121
      m_queue.push(t);
122
      if (m_workers_waiting > 0) {
123
          // Just wake one worker, there is only one new task.
124
          pthread_cond_signal(&m_wcond);
125
      } else {
126
          m_nowake++;
127
      }
128
129
      return true;
130
  }
131
132
  /** Wait until the queue is inactive. Called from client.
133
   *
134
   * Waits until the task queue is empty and the workers are all
135
   * back sleeping. Used by the client to wait for all current work
136
   * to be completed, when it needs to perform work that couldn't be
137
   * done in parallel with the worker's tasks, or before shutting
138
   * down. Work can be resumed after calling this. Note that the
139
   * only thread which can call it safely is the client just above
140
   * (which can control the task flow), else there could be
141
   * tasks in the intermediate queues.
142
   * To rephrase: there is no warranty on return that the queue is actually
143
   * idle EXCEPT if the caller knows that no jobs are still being created.
144
   * It would be possible to transform this into a safe call if some kind
145
   * of suspend condition was set on the queue by waitIdle(), to be reset by
146
   * some kind of "resume" call. Not currently the case.
147
   */
148
  bool waitIdle()
149
  {
150
      PTMutexLocker lock(m_mutex);
151
      if (!lock.ok() || !ok()) {
152
          return false;
153
      }
154
155
      // We're done when the queue is empty AND all workers are back
156
      // waiting for a task.
157
      while (ok() && (m_queue.size() > 0 ||
158
                      m_workers_waiting != m_worker_threads.size())) {
159
          m_clients_waiting++;
160
          if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
161
              m_clients_waiting--;
162
              m_ok = false;
163
              return false;
164
          }
165
          m_clients_waiting--;
166
      }
167
168
      return ok();
169
  }
170
171
172
  /** Tell the workers to exit, and wait for them.
173
   *
174
   * Does not bother about tasks possibly remaining on the queue, so
175
   * should be called after waitIdle() for an orderly shutdown.
176
   */
177
  void* setTerminateAndWait()
178
  {
179
      PTMutexLocker lock(m_mutex);
180
181
      if (m_worker_threads.empty()) {
182
          // Already called ?
183
          return (void*)0;
184
      }
185
186
      // Wait for all worker threads to have called workerExit()
187
      m_ok = false;
188
      while (m_workers_exited < m_worker_threads.size()) {
189
          pthread_cond_broadcast(&m_wcond);
190
          m_clients_waiting++;
191
          if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
192
              m_clients_waiting--;
193
              return (void*)0;
194
          }
195
          m_clients_waiting--;
196
      }
197
198
      // Perform the thread joins and compute overall status
199
      // Workers return (void*)1 if ok
200
      void *statusall = (void*)1;
201
      std::unordered_map<pthread_t,  WQTData>::iterator it;
202
      while (!m_worker_threads.empty()) {
203
          void *status;
204
          it = m_worker_threads.begin();
205
          pthread_join(it->first, &status);
206
          if (status == (void *)0)
207
              statusall = status;
208
          m_worker_threads.erase(it);
209
      }
210
211
      // Reset to start state.
212
      m_workers_exited = m_clients_waiting = m_workers_waiting =
213
          m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
214
      m_ok = true;
215
216
      return statusall;
217
  }
218
219
  /** Take task from queue. Called from worker.
220
   *
221
   * Sleeps if there are not enough. Signal if we go to sleep on empty
222
   * queue: client may be waiting for our going idle.
223
   */
224
  bool take(T* tp, size_t *szp = 0)
225
  {
226
      PTMutexLocker lock(m_mutex);
227
      if (!lock.ok() || !ok()) {
228
          return false;
229
      }
230
231
      while (ok() && m_queue.size() < m_low) {
232
          m_workersleeps++;
233
          m_workers_waiting++;
234
          if (m_queue.empty())
235
              pthread_cond_broadcast(&m_ccond);
236
          if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
237
              m_workers_waiting--;
238
              return false;
239
          }
240
          m_workers_waiting--;
241
      }
242
243
      m_tottasks++;
244
      *tp = m_queue.front();
245
      if (szp)
246
          *szp = m_queue.size();
247
      m_queue.pop();
248
      if (m_clients_waiting > 0) {
249
          // No reason to wake up more than one client thread
250
          pthread_cond_signal(&m_ccond);
251
      } else {
252
          m_nowake++;
253
      }
254
      return true;
255
  }
256
257
  /** Advertise exit and abort queue. Called from worker
258
   *
259
   * This would happen after an unrecoverable error, or when
260
   * the queue is terminated by the client. Workers never exit normally,
261
   * except when the queue is shut down (at which point m_ok is set to
262
   * false by the shutdown code anyway). The thread must return/exit
263
   * immediately after calling this.
264
   */
265
  void workerExit()
266
  {
267
      PTMutexLocker lock(m_mutex);
268
      m_workers_exited++;
269
      m_ok = false;
270
      pthread_cond_broadcast(&m_ccond);
271
  }
272
273
  size_t qsize()
274
  {
275
      PTMutexLocker lock(m_mutex);
276
      size_t sz = m_queue.size();
277
      return sz;
278
  }
279
280
private:
281
  bool ok()
282
  {
283
      bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
284
      return isok;
285
  }
286
287
  long long nanodiff(const struct timespec& older,
288
                     const struct timespec& newer)
289
  {
290
      return (newer.tv_sec - older.tv_sec) * 1000000000LL
291
          + newer.tv_nsec - older.tv_nsec;
292
  }
293
294
  // Configuration
295
  std::string m_name;
296
  size_t m_high;
297
  size_t m_low;
298
299
  // Status
300
  // Worker threads having called exit
301
  unsigned int m_workers_exited;
302
  bool m_ok;
303
304
  // Per-thread data. The data is not used currently, this could be
305
  // a set<pthread_t>
306
  std::unordered_map<pthread_t, WQTData> m_worker_threads;
307
308
  // Synchronization
309
  std::queue<T> m_queue;
310
  pthread_cond_t m_ccond;
311
  pthread_cond_t m_wcond;
312
  PTMutexInit m_mutex;
313
  // Client/Worker threads currently waiting for a job
314
  unsigned int m_clients_waiting;
315
  unsigned int m_workers_waiting;
316
317
  // Statistics
318
  unsigned int m_tottasks;
319
  unsigned int m_nowake;
320
  unsigned int m_workersleeps;
321
  unsigned int m_clientsleeps;
322
};
323
324
#endif /* _WORKQUEUE_H_INCLUDED_ */