Switch to unified view

a b/src/workqueue.hxx
1
/*     Copyright (C) 2012 J.F.Dockes
2
 *     This program is free software; you can redistribute it and/or modify
3
 *     it under the terms of the GNU General Public License as published by
4
 *     the Free Software Foundation; either version 2 of the License, or
5
 *     (at your option) any later version.
6
 *
7
 *     This program is distributed in the hope that it will be useful,
8
 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *     GNU General Public License for more details.
11
 *
12
 *     You should have received a copy of the GNU General Public License
13
 *     along with this program; if not, write to the
14
 *     Free Software Foundation, Inc.,
15
 *     59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
17
#ifndef _WORKQUEUE_H_INCLUDED_
18
#define _WORKQUEUE_H_INCLUDED_
19
20
#include <pthread.h>
21
#include <time.h>
22
23
#include <string>
24
#include <queue>
25
#include <unordered_map>
26
27
#include "ptmutex.hxx"
28
29
namespace UPnPP {
30
31
/// Store per-worker-thread data. Just an initialized timespec, and
/// not used at the moment.
33
class WQTData {
public:
  WQTData()
  {
      // Explicitly zero both fields: timespec has no default
      // initialization of its own.
      wstart.tv_sec = 0;
      wstart.tv_nsec = 0;
  }
  // Worker start time; zeroed at construction.
  struct timespec wstart;
};
38
39
/**
40
 * A WorkQueue manages the synchronisation around a queue of work items,
41
 * where a number of client threads queue tasks and a number of worker
42
 * threads take and execute them. The goal is to introduce some level
43
 * of parallelism between the successive steps of a previously single
44
 * threaded pipeline. For example data extraction / data preparation / index
45
 * update, but this could have other uses.
46
 *
47
 * There is no individual task status return. In case of fatal error,
48
 * the client or worker sets an end condition on the queue. A second
49
 * queue could conceivably be used for returning individual task
50
 * status.
51
 */
52
template <class T> class WorkQueue {
53
public:
54
55
  /** Create a WorkQueue
56
   * @param name for message printing
57
   * @param hi number of tasks on queue before clients blocks. Default 0
58
   *    meaning no limit. hi == -1 means that the queue is disabled.
59
   * @param lo minimum count of tasks before worker starts. Default 1.
60
   */
61
  WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
62
      : m_name(name), m_high(hi), m_low(lo),
63
        m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
64
        m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
65
  {
66
      m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
67
          (pthread_cond_init(&m_wcond, 0) == 0);
68
  }
69
70
  ~WorkQueue()
71
  {
72
      if (!m_worker_threads.empty())
73
          setTerminateAndWait();
74
  }
75
76
  /** Start the worker threads.
77
   *
78
   * @param nworkers number of threads copies to start.
79
   * @param start_routine thread function. It should loop
80
   *      taking (QueueWorker::take()) and executing tasks.
81
   * @param arg initial parameter to thread function.
82
   * @return true if ok.
83
   */
84
  bool start(int nworkers, void *(*workproc)(void *), void *arg)
85
  {
86
      PTMutexLocker lock(m_mutex);
87
      for  (int i = 0; i < nworkers; i++) {
88
          int err;
89
          pthread_t thr;
90
          if ((err = pthread_create(&thr, 0, workproc, arg))) {
91
              return false;
92
          }
93
          m_worker_threads.insert(std::pair<pthread_t, WQTData>(thr, WQTData()));
94
      }
95
      return true;
96
  }
97
98
  /** Add item to work queue, called from client.
99
   *
100
   * Sleeps if there are already too many.
101
   */
102
  bool put(T t, bool flushprevious = false)
103
  {
104
      PTMutexLocker lock(m_mutex);
105
      if (!lock.ok() || !ok()) {
106
          return false;
107
      }
108
109
      while (ok() && m_high > 0 && m_queue.size() >= m_high) {
110
          m_clientsleeps++;
111
          // Keep the order: we test ok() AFTER the sleep...
112
          m_clients_waiting++;
113
          if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
114
              m_clients_waiting--;
115
              return false;
116
          }
117
          m_clients_waiting--;
118
      }
119
      if (flushprevious) {
120
          while (!m_queue.empty())
121
              m_queue.pop();
122
      }
123
      m_queue.push(t);
124
      if (m_workers_waiting > 0) {
125
          // Just wake one worker, there is only one new task.
126
          pthread_cond_signal(&m_wcond);
127
      } else {
128
          m_nowake++;
129
      }
130
131
      return true;
132
  }
133
134
  /** Wait until the queue is inactive. Called from client.
135
   *
136
   * Waits until the task queue is empty and the workers are all
137
   * back sleeping. Used by the client to wait for all current work
138
   * to be completed, when it needs to perform work that couldn't be
139
   * done in parallel with the worker's tasks, or before shutting
140
   * down. Work can be resumed after calling this. Note that the
141
   * only thread which can call it safely is the client just above
142
   * (which can control the task flow), else there could be
143
   * tasks in the intermediate queues.
144
   * To rephrase: there is no warranty on return that the queue is actually
145
   * idle EXCEPT if the caller knows that no jobs are still being created.
146
   * It would be possible to transform this into a safe call if some kind
147
   * of suspend condition was set on the queue by waitIdle(), to be reset by
148
   * some kind of "resume" call. Not currently the case.
149
   */
150
  bool waitIdle()
151
  {
152
      PTMutexLocker lock(m_mutex);
153
      if (!lock.ok() || !ok()) {
154
          return false;
155
      }
156
157
      // We're done when the queue is empty AND all workers are back
158
      // waiting for a task.
159
      while (ok() && (m_queue.size() > 0 ||
160
                      m_workers_waiting != m_worker_threads.size())) {
161
          m_clients_waiting++;
162
          if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
163
              m_clients_waiting--;
164
              m_ok = false;
165
              return false;
166
          }
167
          m_clients_waiting--;
168
      }
169
170
      return ok();
171
  }
172
173
174
  /** Tell the workers to exit, and wait for them.
175
   *
176
   * Does not bother about tasks possibly remaining on the queue, so
177
   * should be called after waitIdle() for an orderly shutdown.
178
   */
179
  void* setTerminateAndWait()
180
  {
181
      PTMutexLocker lock(m_mutex);
182
183
      if (m_worker_threads.empty()) {
184
          // Already called ?
185
          return (void*)0;
186
      }
187
188
      // Wait for all worker threads to have called workerExit()
189
      m_ok = false;
190
      while (m_workers_exited < m_worker_threads.size()) {
191
          pthread_cond_broadcast(&m_wcond);
192
          m_clients_waiting++;
193
          if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
194
              m_clients_waiting--;
195
              return (void*)0;
196
          }
197
          m_clients_waiting--;
198
      }
199
200
      // Perform the thread joins and compute overall status
201
      // Workers return (void*)1 if ok
202
      void *statusall = (void*)1;
203
      std::unordered_map<pthread_t,  WQTData>::iterator it;
204
      while (!m_worker_threads.empty()) {
205
          void *status;
206
          it = m_worker_threads.begin();
207
          pthread_join(it->first, &status);
208
          if (status == (void *)0)
209
              statusall = status;
210
          m_worker_threads.erase(it);
211
      }
212
213
      // Reset to start state.
214
      m_workers_exited = m_clients_waiting = m_workers_waiting =
215
          m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
216
      m_ok = true;
217
218
      return statusall;
219
  }
220
221
  /** Take task from queue. Called from worker.
222
   *
223
   * Sleeps if there are not enough. Signal if we go to sleep on empty
224
   * queue: client may be waiting for our going idle.
225
   */
226
  bool take(T* tp, size_t *szp = 0)
227
  {
228
      PTMutexLocker lock(m_mutex);
229
      if (!lock.ok() || !ok()) {
230
          return false;
231
      }
232
233
      while (ok() && m_queue.size() < m_low) {
234
          m_workersleeps++;
235
          m_workers_waiting++;
236
          if (m_queue.empty())
237
              pthread_cond_broadcast(&m_ccond);
238
          if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
239
              m_workers_waiting--;
240
              return false;
241
          }
242
          m_workers_waiting--;
243
      }
244
245
      m_tottasks++;
246
      *tp = m_queue.front();
247
      if (szp)
248
          *szp = m_queue.size();
249
      m_queue.pop();
250
      if (m_clients_waiting > 0) {
251
          // No reason to wake up more than one client thread
252
          pthread_cond_signal(&m_ccond);
253
      } else {
254
          m_nowake++;
255
      }
256
      return true;
257
  }
258
259
  /** Advertise exit and abort queue. Called from worker
260
   *
261
   * This would happen after an unrecoverable error, or when
262
   * the queue is terminated by the client. Workers never exit normally,
263
   * except when the queue is shut down (at which point m_ok is set to
264
   * false by the shutdown code anyway). The thread must return/exit
265
   * immediately after calling this.
266
   */
267
  void workerExit()
268
  {
269
      PTMutexLocker lock(m_mutex);
270
      m_workers_exited++;
271
      m_ok = false;
272
      pthread_cond_broadcast(&m_ccond);
273
  }
274
275
  size_t qsize()
276
  {
277
      PTMutexLocker lock(m_mutex);
278
      size_t sz = m_queue.size();
279
      return sz;
280
  }
281
282
private:
283
  bool ok()
284
  {
285
      bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
286
      return isok;
287
  }
288
289
  long long nanodiff(const struct timespec& older,
290
                     const struct timespec& newer)
291
  {
292
      return (newer.tv_sec - older.tv_sec) * 1000000000LL
293
          + newer.tv_nsec - older.tv_nsec;
294
  }
295
296
  // Configuration
297
  std::string m_name;
298
  size_t m_high;
299
  size_t m_low;
300
301
  // Status
302
  // Worker threads having called exit
303
  unsigned int m_workers_exited;
304
  bool m_ok;
305
306
  // Per-thread data. The data is not used currently, this could be
307
  // a set<pthread_t>
308
  std::unordered_map<pthread_t, WQTData> m_worker_threads;
309
310
  // Synchronization
311
  std::queue<T> m_queue;
312
  pthread_cond_t m_ccond;
313
  pthread_cond_t m_wcond;
314
  PTMutexInit m_mutex;
315
  // Client/Worker threads currently waiting for a job
316
  unsigned int m_clients_waiting;
317
  unsigned int m_workers_waiting;
318
319
  // Statistics
320
  unsigned int m_tottasks;
321
  unsigned int m_nowake;
322
  unsigned int m_workersleeps;
323
  unsigned int m_clientsleeps;
324
};
325
326
} // namespace
327
328
#endif /* _WORKQUEUE_H_INCLUDED_ */