Switch to unified view

a/src/workqueue.h b/src/workqueue.h
...
...
28
28
29
/// Store per-worker-thread data. Just an initialized timespec, and
29
/// Store per-worker-thread data. Just an initialized timespec, and
30
/// used at the moment.
30
/// used at the moment.
31
class WQTData {
31
class WQTData {
32
public:
32
public:
33
  WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
33
    WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
34
  struct timespec wstart;
34
    struct timespec wstart;
35
};
35
};
36
36
37
/**
37
/**
38
 * A WorkQueue manages the synchronisation around a queue of work items,
38
 * A WorkQueue manages the synchronisation around a queue of work items,
39
 * where a number of client threads queue tasks and a number of worker
39
 * where a number of client threads queue tasks and a number of worker
...
...
48
 * status.
48
 * status.
49
 */
49
 */
50
template <class T> class WorkQueue {
50
template <class T> class WorkQueue {
51
public:
51
public:
52
52
53
  /** Create a WorkQueue
53
    /** Create a WorkQueue
54
   * @param name for message printing
54
     * @param name for message printing
55
   * @param hi number of tasks on queue before clients blocks. Default 0
55
     * @param hi number of tasks on queue before clients blocks. Default 0
56
   *   meaning no limit. hi == -1 means that the queue is disabled.
56
     *     meaning no limit. hi == -1 means that the queue is disabled.
57
   * @param lo minimum count of tasks before worker starts. Default 1.
57
     * @param lo minimum count of tasks before worker starts. Default 1.
58
   */
58
     */
59
  WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
59
    WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
60
      : m_name(name), m_high(hi), m_low(lo),
60
        : m_name(name), m_high(hi), m_low(lo),
61
        m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
61
          m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
62
        m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
62
          m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
63
    {
63
    {
64
      m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
64
            m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
65
          (pthread_cond_init(&m_wcond, 0) == 0);
65
                (pthread_cond_init(&m_wcond, 0) == 0);
66
    }
66
    }
67
67
68
  ~WorkQueue()
68
    ~WorkQueue()
69
    {
69
    {
70
      if (!m_worker_threads.empty())
70
            if (!m_worker_threads.empty())
71
          setTerminateAndWait();
71
                setTerminateAndWait();
72
    }
72
    }
73
73
74
  /** Start the worker threads.
74
    /** Start the worker threads.
75
   *
75
     *
76
   * @param nworkers number of threads copies to start.
76
     * @param nworkers number of threads copies to start.
77
   * @param start_routine thread function. It should loop
77
     * @param start_routine thread function. It should loop
78
   *   taking (QueueWorker::take()) and executing tasks.
78
     *     taking (WorkQueue::take()) and executing tasks.
79
   * @param arg initial parameter to thread function.
79
     * @param arg initial parameter to thread function.
80
   * @return true if ok.
80
     * @return true if ok.
81
   */
81
     */
82
  bool start(int nworkers, void *(*workproc)(void *), void *arg)
82
    bool start(int nworkers, void *(*workproc)(void *), void *arg)
83
    {
83
    {
84
      PTMutexLocker lock(m_mutex);
84
            PTMutexLocker lock(m_mutex);
85
      for     (int i = 0; i < nworkers; i++) {
85
            for   (int i = 0; i < nworkers; i++) {
86
          int err;
86
                int err;
87
          pthread_t thr;
87
                pthread_t thr;
88
          if ((err = pthread_create(&thr, 0, workproc, arg))) {
88
                if ((err = pthread_create(&thr, 0, workproc, arg))) {
89
              return false;
89
                    return false;
90
          }
90
                }
91
          m_worker_threads.insert(std::pair<pthread_t, WQTData>(thr, WQTData()));
91
                m_worker_threads.insert(std::pair<pthread_t, WQTData>(thr, WQTData()));
92
            }
93
            return true;
92
     }
94
    }
93
      return true;
94
  }
95
95
96
  /** Add item to work queue, called from client.
96
    /** Add item to work queue, called from client.
97
   *
97
     *
98
   * Sleeps if there are already too many.
98
     * Sleeps if there are already too many.
99
   */
99
     */
100
  bool put(T t, bool flushprevious = false)
100
    bool put(T t, bool flushprevious = false)
101
    {
101
    {
102
      PTMutexLocker lock(m_mutex);
102
            PTMutexLocker lock(m_mutex);
103
      if (!lock.ok() || !ok()) {
103
            if (!lock.ok() || !ok()) {
104
          return false;
104
                return false;
105
      }
105
            }
106
106
107
      while (ok() && m_high > 0 && m_queue.size() >= m_high) {
107
            while (ok() && m_high > 0 && m_queue.size() >= m_high) {
108
          m_clientsleeps++;
108
                m_clientsleeps++;
109
          // Keep the order: we test ok() AFTER the sleep...
109
                // Keep the order: we test ok() AFTER the sleep...
110
          m_clients_waiting++;
110
                m_clients_waiting++;
111
          if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
111
                if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
112
              m_clients_waiting--;
112
                    m_clients_waiting--;
113
              return false;
113
                    return false;
114
          }
114
                }
115
          m_clients_waiting--;
115
                m_clients_waiting--;
116
            }
117
            if (flushprevious) {
118
                while (!m_queue.empty())
119
                    m_queue.pop();
120
            }
121
            m_queue.push(t);
122
            if (m_workers_waiting > 0) {
123
                // Just wake one worker, there is only one new task.
124
                pthread_cond_signal(&m_wcond);
125
            } else {
126
                m_nowake++;
127
            }
128
129
            return true;
116
     }
130
    }
117
      if (flushprevious) {
118
          while (!m_queue.empty())
119
              m_queue.pop();
120
      }
121
      m_queue.push(t);
122
      if (m_workers_waiting > 0) {
123
          // Just wake one worker, there is only one new task.
124
          pthread_cond_signal(&m_wcond);
125
      } else {
126
          m_nowake++;
127
      }
128
131
129
      return true;
130
  }
131
132
  /** Wait until the queue is inactive. Called from client.
132
    /** Wait until the queue is inactive. Called from client.
133
   *
133
     *
134
   * Waits until the task queue is empty and the workers are all
134
     * Waits until the task queue is empty and the workers are all
135
   * back sleeping. Used by the client to wait for all current work
135
     * back sleeping. Used by the client to wait for all current work
136
   * to be completed, when it needs to perform work that couldn't be
136
     * to be completed, when it needs to perform work that couldn't be
137
   * done in parallel with the worker's tasks, or before shutting
137
     * done in parallel with the worker's tasks, or before shutting
138
   * down. Work can be resumed after calling this. Note that the
138
     * down. Work can be resumed after calling this. Note that the
139
   * only thread which can call it safely is the client just above
139
     * only thread which can call it safely is the client just above
140
   * (which can control the task flow), else there could be
140
     * (which can control the task flow), else there could be
141
   * tasks in the intermediate queues.
141
     * tasks in the intermediate queues.
142
   * To rephrase: there is no warranty on return that the queue is actually
142
     * To rephrase: there is no warranty on return that the queue is actually
143
   * idle EXCEPT if the caller knows that no jobs are still being created.
143
     * idle EXCEPT if the caller knows that no jobs are still being created.
144
   * It would be possible to transform this into a safe call if some kind
144
     * It would be possible to transform this into a safe call if some kind
145
   * of suspend condition was set on the queue by waitIdle(), to be reset by
145
     * of suspend condition was set on the queue by waitIdle(), to be reset by
146
   * some kind of "resume" call. Not currently the case.
146
     * some kind of "resume" call. Not currently the case.
147
   */
147
     */
148
  bool waitIdle()
148
    bool waitIdle()
149
    {
149
    {
150
      PTMutexLocker lock(m_mutex);
150
            PTMutexLocker lock(m_mutex);
151
      if (!lock.ok() || !ok()) {
151
            if (!lock.ok() || !ok()) {
152
          return false;
152
                return false;
153
      }
153
            }
154
154
155
      // We're done when the queue is empty AND all workers are back
155
            // We're done when the queue is empty AND all workers are back
156
      // waiting for a task.
156
            // waiting for a task.
157
      while (ok() && (m_queue.size() > 0 ||
157
            while (ok() && (m_queue.size() > 0 ||
158
                      m_workers_waiting != m_worker_threads.size())) {
158
                            m_workers_waiting != m_worker_threads.size())) {
159
          m_clients_waiting++;
159
                m_clients_waiting++;
160
          if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
160
                if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
161
              m_clients_waiting--;
161
                    m_clients_waiting--;
162
              m_ok = false;
162
                    m_ok = false;
163
              return false;
163
                    return false;
164
          }
164
                }
165
          m_clients_waiting--;
165
                m_clients_waiting--;
166
            }
167
168
            return ok();
166
     }
169
    }
167
170
168
      return ok();
169
  }
170
171
171
172
  /** Tell the workers to exit, and wait for them.
172
    /** Tell the workers to exit, and wait for them.
173
   *
173
     *
174
   * Does not bother about tasks possibly remaining on the queue, so
174
     * Does not bother about tasks possibly remaining on the queue, so
175
   * should be called after waitIdle() for an orderly shutdown.
175
     * should be called after waitIdle() for an orderly shutdown.
176
   */
176
     */
177
  void* setTerminateAndWait()
177
    void* setTerminateAndWait()
178
    {
178
    {
179
      PTMutexLocker lock(m_mutex);
179
            PTMutexLocker lock(m_mutex);
180
180
181
      if (m_worker_threads.empty()) {
181
            if (m_worker_threads.empty()) {
182
          // Already called ?
182
                // Already called ?
183
          return (void*)0;
183
                return (void*)0;
184
      }
184
            }
185
185
186
      // Wait for all worker threads to have called workerExit()
186
            // Wait for all worker threads to have called workerExit()
187
      m_ok = false;
187
            m_ok = false;
188
      while (m_workers_exited < m_worker_threads.size()) {
188
            while (m_workers_exited < m_worker_threads.size()) {
189
          pthread_cond_broadcast(&m_wcond);
189
                pthread_cond_broadcast(&m_wcond);
190
          m_clients_waiting++;
190
                m_clients_waiting++;
191
          if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
191
                if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
192
              m_clients_waiting--;
192
                    m_clients_waiting--;
193
              return (void*)0;
193
                    return (void*)0;
194
          }
194
                }
195
          m_clients_waiting--;
195
                m_clients_waiting--;
196
      }
196
            }
197
197
198
      // Perform the thread joins and compute overall status
198
            // Perform the thread joins and compute overall status
199
      // Workers return (void*)1 if ok
199
            // Workers return (void*)1 if ok
200
      void *statusall = (void*)1;
200
            void *statusall = (void*)1;
201
      std::unordered_map<pthread_t,  WQTData>::iterator it;
201
            std::unordered_map<pthread_t,  WQTData>::iterator it;
202
      while (!m_worker_threads.empty()) {
202
            while (!m_worker_threads.empty()) {
203
          void *status;
203
                void *status;
204
          it = m_worker_threads.begin();
204
                it = m_worker_threads.begin();
205
          pthread_join(it->first, &status);
205
                pthread_join(it->first, &status);
206
          if (status == (void *)0)
206
                if (status == (void *)0)
207
              statusall = status;
207
                    statusall = status;
208
          m_worker_threads.erase(it);
208
                m_worker_threads.erase(it);
209
      }
209
            }
210
210
211
      // Reset to start state.
211
            // Reset to start state.
212
      m_workers_exited = m_clients_waiting = m_workers_waiting =
212
            m_workers_exited = m_clients_waiting = m_workers_waiting =
213
          m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
213
                m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
214
      m_ok = true;
214
            m_ok = true;
215
215
216
      return statusall;
216
            return statusall;
217
    }
217
    }
218
218
219
  /** Take task from queue. Called from worker.
219
    /** Take task from queue. Called from worker.
220
   *
220
     *
221
   * Sleeps if there are not enough. Signal if we go to sleep on empty
221
     * Sleeps if there are not enough. Signal if we go to sleep on empty
222
   * queue: client may be waiting for our going idle.
222
     * queue: client may be waiting for our going idle.
223
   */
223
     */
224
  bool take(T* tp, size_t *szp = 0)
224
    bool take(T* tp, size_t *szp = 0)
225
    {
225
    {
226
      PTMutexLocker lock(m_mutex);
226
            PTMutexLocker lock(m_mutex);
227
      if (!lock.ok() || !ok()) {
227
            if (!lock.ok() || !ok()) {
228
          return false;
228
                return false;
229
            }
230
231
            while (ok() && m_queue.size() < m_low) {
232
                m_workersleeps++;
233
                m_workers_waiting++;
234
                if (m_queue.empty())
235
                    pthread_cond_broadcast(&m_ccond);
236
                if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
237
                    m_workers_waiting--;
238
                    return false;
239
                }
240
                m_workers_waiting--;
241
            }
242
243
            m_tottasks++;
244
            *tp = m_queue.front();
245
            if (szp)
246
                *szp = m_queue.size();
247
            m_queue.pop();
248
            if (m_clients_waiting > 0) {
249
                // No reason to wake up more than one client thread
250
                pthread_cond_signal(&m_ccond);
251
            } else {
252
                m_nowake++;
253
            }
254
            return true;
229
     }
255
    }
256
      
257
    bool waitminsz(size_t sz) {
258
        PTMutexLocker lock(m_mutex);
259
        if (!lock.ok() || !ok()) {
260
            return false;
261
        }
230
262
231
      while (ok() && m_queue.size() < m_low) {
263
        while (ok() && m_queue.size() < sz) {
232
          m_workersleeps++;
264
            m_workersleeps++;
233
          m_workers_waiting++;
265
            m_workers_waiting++;
234
          if (m_queue.empty())
266
            if (m_queue.empty())
235
              pthread_cond_broadcast(&m_ccond);
267
                pthread_cond_broadcast(&m_ccond);
236
          if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
268
            if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
237
              m_workers_waiting--;
269
                m_workers_waiting--;
238
              return false;
270
                return false;
239
          }
271
            }
240
          m_workers_waiting--;
272
            m_workers_waiting--;
241
      }
273
        }
274
        return true;
275
    }
242
276
243
      m_tottasks++;
244
      *tp = m_queue.front();
245
      if (szp)
246
          *szp = m_queue.size();
247
      m_queue.pop();
248
      if (m_clients_waiting > 0) {
249
          // No reason to wake up more than one client thread
250
          pthread_cond_signal(&m_ccond);
251
      } else {
252
          m_nowake++;
253
      }
254
      return true;
255
  }
256
257
  /** Advertise exit and abort queue. Called from worker
277
    /** Advertise exit and abort queue. Called from worker
258
   *
278
     *
259
   * This would happen after an unrecoverable error, or when
279
     * This would happen after an unrecoverable error, or when
260
   * the queue is terminated by the client. Workers never exit normally,
280
     * the queue is terminated by the client. Workers never exit normally,
261
   * except when the queue is shut down (at which point m_ok is set to
281
     * except when the queue is shut down (at which point m_ok is set to
262
   * false by the shutdown code anyway). The thread must return/exit
282
     * false by the shutdown code anyway). The thread must return/exit
263
   * immediately after calling this.
283
     * immediately after calling this.
264
   */
284
     */
265
  void workerExit()
285
    void workerExit()
266
    {
286
    {
267
      PTMutexLocker lock(m_mutex);
287
            PTMutexLocker lock(m_mutex);
268
      m_workers_exited++;
288
            m_workers_exited++;
269
      m_ok = false;
289
            m_ok = false;
270
      pthread_cond_broadcast(&m_ccond);
290
            pthread_cond_broadcast(&m_ccond);
271
    }
291
    }
272
292
273
  size_t qsize()
293
    size_t qsize()
274
    {
294
    {
275
      PTMutexLocker lock(m_mutex);
295
            PTMutexLocker lock(m_mutex);
276
      size_t sz = m_queue.size();
296
            size_t sz = m_queue.size();
277
      return sz;
297
            return sz;
278
    }
298
    }
279
299
280
private:
300
private:
281
  bool ok()
301
    bool ok()
282
    {
302
    {
283
      bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
303
            bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
284
      return isok;
304
            return isok;
285
    }
305
    }
286
306
287
  long long nanodiff(const struct timespec& older,
307
    long long nanodiff(const struct timespec& older,
288
                     const struct timespec& newer)
308
                       const struct timespec& newer)
289
    {
309
    {
290
      return (newer.tv_sec - older.tv_sec) * 1000000000LL
310
            return (newer.tv_sec - older.tv_sec) * 1000000000LL
291
          + newer.tv_nsec - older.tv_nsec;
311
                + newer.tv_nsec - older.tv_nsec;
292
    }
312
    }
293
313
294
  // Configuration
314
    // Configuration
295
  std::string m_name;
315
    std::string m_name;
296
  size_t m_high;
316
    size_t m_high;
297
  size_t m_low;
317
    size_t m_low;
298
318
299
  // Status
319
    // Status
300
  // Worker threads having called exit
320
    // Worker threads having called exit
301
  unsigned int m_workers_exited;
321
    unsigned int m_workers_exited;
302
  bool m_ok;
322
    bool m_ok;
303
323
304
  // Per-thread data. The data is not used currently, this could be
324
    // Per-thread data. The data is not used currently, this could be
305
  // a set<pthread_t>
325
    // a set<pthread_t>
306
  std::unordered_map<pthread_t, WQTData> m_worker_threads;
326
    std::unordered_map<pthread_t, WQTData> m_worker_threads;
307
327
308
  // Synchronization
328
    // Synchronization
309
  std::queue<T> m_queue;
329
    std::queue<T> m_queue;
310
  pthread_cond_t m_ccond;
330
    pthread_cond_t m_ccond;
311
  pthread_cond_t m_wcond;
331
    pthread_cond_t m_wcond;
312
  PTMutexInit m_mutex;
332
    PTMutexInit m_mutex;
313
  // Client/Worker threads currently waiting for a job
333
    // Client/Worker threads currently waiting for a job
314
  unsigned int m_clients_waiting;
334
    unsigned int m_clients_waiting;
315
  unsigned int m_workers_waiting;
335
    unsigned int m_workers_waiting;
316
336
317
  // Statistics
337
    // Statistics
318
  unsigned int m_tottasks;
338
    unsigned int m_tottasks;
319
  unsigned int m_nowake;
339
    unsigned int m_nowake;
320
  unsigned int m_workersleeps;
340
    unsigned int m_workersleeps;
321
  unsigned int m_clientsleeps;
341
    unsigned int m_clientsleeps;
322
};
342
};
323
343
324
#endif /* _WORKQUEUE_H_INCLUDED_ */
344
#endif /* _WORKQUEUE_H_INCLUDED_ */