Switch to unified view

a/sc2src/workqueue.h b/sc2src/workqueue.h
1
/*     Copyright (C) 2012 J.F.Dockes
1
/*   Copyright (C) 2012-2016 J.F.Dockes
2
 *     This program is free software; you can redistribute it and/or modify
2
 *   This program is free software; you can redistribute it and/or modify
3
 *     it under the terms of the GNU General Public License as published by
3
 *   it under the terms of the GNU General Public License as published by
4
 *     the Free Software Foundation; either version 2 of the License, or
4
 *   the Free Software Foundation; either version 2 of the License, or
5
 *     (at your option) any later version.
5
 *   (at your option) any later version.
6
 *
6
 *
7
 *     This program is distributed in the hope that it will be useful,
7
 *   This program is distributed in the hope that it will be useful,
8
 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
8
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
9
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *     GNU General Public License for more details.
10
 *   GNU General Public License for more details.
11
 *
11
 *
12
 *     You should have received a copy of the GNU General Public License
12
 *   You should have received a copy of the GNU General Public License
13
 *     along with this program; if not, write to the
13
 *   along with this program; if not, write to the
14
 *     Free Software Foundation, Inc.,
14
 *   Free Software Foundation, Inc.,
15
 *     59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
15
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
16
 */
17
#ifndef _WORKQUEUE_H_INCLUDED_
17
#ifndef _WORKQUEUE_H_INCLUDED_
18
#define _WORKQUEUE_H_INCLUDED_
18
#define _WORKQUEUE_H_INCLUDED_
19
19
20
#include <pthread.h>
20
#include <thread>
21
#include <time.h>
21
#include <future>
22
23
#include <string>
22
#include <string>
24
#include <queue>
23
#include <queue>
25
#include <unordered_map>
24
#include <list>
26
27
#include "ptmutex.h"
25
#include <mutex>
26
#include <condition_variable>
28
27
29
/// Store per-worker-thread data. Just an initialized timespec, and
28
#include "log.h"
30
/// used at the moment.
31
class WQTData {
32
public:
33
    WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
34
    struct timespec wstart;
35
};
36
29
37
/**
30
/**
38
 * A WorkQueue manages the synchronisation around a queue of work items,
31
 * A WorkQueue manages the synchronisation around a queue of work items,
39
 * where a number of client threads queue tasks and a number of worker
32
 * where a number of client threads queue tasks and a number of worker
40
 * threads take and execute them. The goal is to introduce some level
33
 * threads take and execute them. The goal is to introduce some level
...
...
44
 *
37
 *
45
 * There is no individual task status return. In case of fatal error,
38
 * There is no individual task status return. In case of fatal error,
46
 * the client or worker sets an end condition on the queue. A second
39
 * the client or worker sets an end condition on the queue. A second
47
 * queue could conceivably be used for returning individual task
40
 * queue could conceivably be used for returning individual task
48
 * status.
41
 * status.
42
 *
43
 * The strange thread functions argument and return values
44
 * comes from compatibility with an earlier pthread-based
45
 * implementation.
49
 */
46
 */
50
template <class T> class WorkQueue {
47
template <class T> class WorkQueue {
51
public:
48
public:
52
49
53
    /** Create a WorkQueue
50
    /** Create a WorkQueue
54
     * @param name for message printing
51
     * @param name for message printing
55
     * @param hi number of tasks on queue before clients blocks. Default 0
52
     * @param hi number of tasks on queue before clients blocks. Default 0
56
     *      meaning no limit. hi == -1 means that the queue is disabled.
53
     *    meaning no limit. hi == -1 means that the queue is disabled.
57
     * @param lo minimum count of tasks before worker starts. Default 1.
54
     * @param lo minimum count of tasks before worker starts. Default 1.
58
     */
55
     */
59
    WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
56
    WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
60
        : m_name(name), m_high(hi), m_low(lo),
57
        : m_name(name), m_high(hi), m_low(lo), m_workers_exited(0),
61
          m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
58
          m_ok(true), m_clients_waiting(0), m_workers_waiting(0),
62
          m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
59
          m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0) {
63
  {
60
    }
64
            m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
65
                (pthread_cond_init(&m_wcond, 0) == 0);
66
  }
67
61
68
    ~WorkQueue()
62
    ~WorkQueue() {
69
  {
70
            if (!m_worker_threads.empty())
63
        if (!m_worker_threads.empty()) {
71
                setTerminateAndWait();
64
            setTerminateAndWait();
72
  }
65
        }
66
    }
73
67
74
    /** Start the worker threads.
68
    /** Start the worker threads.
75
     *
69
     *
76
     * @param nworkers number of threads copies to start.
70
     * @param nworkers number of threads copies to start.
77
     * @param start_routine thread function. It should loop
71
     * @param start_routine thread function. It should loop
78
     *      taking (WorkQueue::take()) and executing tasks.
72
     *      taking (QueueWorker::take()) and executing tasks.
79
     * @param arg initial parameter to thread function.
73
     * @param arg initial parameter to thread function.
80
     * @return true if ok.
74
     * @return true if ok.
81
     */
75
     */
82
    bool start(int nworkers, void *(*workproc)(void *), void *arg)
76
    bool start(int nworkers, void *(workproc)(void *), void *arg) {
83
  {
77
        std::unique_lock<std::mutex> lock(m_mutex);
84
            PTMutexLocker lock(m_mutex);
85
            for     (int i = 0; i < nworkers; i++) {
78
        for (int i = 0; i < nworkers; i++) {
86
                int err;
79
            std::packaged_task<void *(void *)> task(workproc);
87
                pthread_t thr;
80
            Worker w;
88
                if ((err = pthread_create(&thr, 0, workproc, arg))) {
81
            w.res = task.get_future();
89
                    return false;
82
            w.thr = std::thread(std::move(task), arg);
90
                }
83
            m_worker_threads.push_back(std::move(w));
91
                m_worker_threads.insert(std::pair<pthread_t, WQTData>(thr, WQTData()));
92
            }
84
        }
93
            return true;
85
        return true;
94
  }
86
    }
95
87
96
    /** Add item to work queue, called from client.
88
    /** Add item to work queue, called from client.
97
     *
89
     *
98
     * Sleeps if there are already too many.
90
     * Sleeps if there are already too many.
99
     */
91
     */
100
    bool put(T t, bool flushprevious = false)
92
    bool put(T t, bool flushprevious = false) {
101
  {
93
        std::unique_lock<std::mutex> lock(m_mutex);
102
            PTMutexLocker lock(m_mutex);
94
        if (!ok()) {
95
            LOGERR("WorkQueue::put:"  << m_name << ": !ok\n");
96
            return false;
97
        }
98
99
        while (ok() && m_high > 0 && m_queue.size() >= m_high) {
100
            m_clientsleeps++;
101
            // Keep the order: we test ok() AFTER the sleep...
102
            m_clients_waiting++;
103
            m_ccond.wait(lock);
103
            if (!lock.ok() || !ok()) {
104
            if (!ok()) {
105
                m_clients_waiting--;
104
                return false;
106
                return false;
105
            }
107
            }
106
107
            while (ok() && m_high > 0 && m_queue.size() >= m_high) {
108
                m_clientsleeps++;
109
                // Keep the order: we test ok() AFTER the sleep...
110
                m_clients_waiting++;
111
                if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
112
                    m_clients_waiting--;
113
                    return false;
114
                }
115
                m_clients_waiting--;
108
            m_clients_waiting--;
116
            }
109
        }
117
            if (flushprevious) {
110
        if (flushprevious) {
118
                while (!m_queue.empty())
111
            while (!m_queue.empty()) {
119
                    m_queue.pop();
112
                m_queue.pop();
113
            }
120
            }
114
        }
115
121
            m_queue.push(t);
116
        m_queue.push(t);
122
            if (m_workers_waiting > 0) {
117
        if (m_workers_waiting > 0) {
123
                // Just wake one worker, there is only one new task.
118
            // Just wake one worker, there is only one new task.
124
                pthread_cond_signal(&m_wcond);
119
            m_wcond.notify_one();
125
            } else {
120
        } else {
126
                m_nowake++;
121
            m_nowake++;
127
            }
122
        }
128
123
129
            return true;
124
        return true;
130
  }
125
    }
131
126
132
    /** Wait until the queue is inactive. Called from client.
127
    /** Wait until the queue is inactive. Called from client.
133
     *
128
     *
134
     * Waits until the task queue is empty and the workers are all
129
     * Waits until the task queue is empty and the workers are all
135
     * back sleeping. Used by the client to wait for all current work
130
     * back sleeping. Used by the client to wait for all current work
...
...
143
     * idle EXCEPT if the caller knows that no jobs are still being created.
138
     * idle EXCEPT if the caller knows that no jobs are still being created.
144
     * It would be possible to transform this into a safe call if some kind
139
     * It would be possible to transform this into a safe call if some kind
145
     * of suspend condition was set on the queue by waitIdle(), to be reset by
140
     * of suspend condition was set on the queue by waitIdle(), to be reset by
146
     * some kind of "resume" call. Not currently the case.
141
     * some kind of "resume" call. Not currently the case.
147
     */
142
     */
148
    bool waitIdle()
143
    bool waitIdle() {
149
  {
144
        std::unique_lock<std::mutex> lock(m_mutex);
150
            PTMutexLocker lock(m_mutex);
145
        if (!ok()) {
151
            if (!lock.ok() || !ok()) {
146
            LOGERR("WorkQueue::waitIdle:"  << m_name << ": not ok\n");
152
                return false;
147
            return false;
153
            }
148
        }
154
149
155
            // We're done when the queue is empty AND all workers are back
150
        // We're done when the queue is empty AND all workers are back
156
            // waiting for a task.
151
        // waiting for a task.
157
            while (ok() && (m_queue.size() > 0 ||
152
        while (ok() && (m_queue.size() > 0 ||
158
                            m_workers_waiting != m_worker_threads.size())) {
153
                        m_workers_waiting != m_worker_threads.size())) {
159
                m_clients_waiting++;
154
            m_clients_waiting++;
160
                if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
155
            m_ccond.wait(lock);
161
                    m_clients_waiting--;
162
                    m_ok = false;
163
                    return false;
164
                }
165
                m_clients_waiting--;
156
            m_clients_waiting--;
166
            }
157
        }
167
158
168
            return ok();
159
        return ok();
169
  }
160
    }
170
171
161
172
    /** Tell the workers to exit, and wait for them.
162
    /** Tell the workers to exit, and wait for them.
173
     *
163
     *
174
     * Does not bother about tasks possibly remaining on the queue, so
164
     * Does not bother about tasks possibly remaining on the queue, so
175
     * should be called after waitIdle() for an orderly shutdown.
165
     * should be called after waitIdle() for an orderly shutdown.
176
     */
166
     */
177
    void* setTerminateAndWait()
167
    void *setTerminateAndWait() {
178
  {
168
        std::unique_lock<std::mutex> lock(m_mutex);
179
            PTMutexLocker lock(m_mutex);
169
        LOGDEB("setTerminateAndWait:"  << m_name << "\n");
180
170
181
            if (m_worker_threads.empty()) {
171
        if (m_worker_threads.empty()) {
182
                // Already called ?
172
            // Already called ?
183
                return (void*)0;
173
            return (void*)0;
184
            }
174
        }
185
175
186
            // Wait for all worker threads to have called workerExit()
176
        // Wait for all worker threads to have called workerExit()
187
            m_ok = false;
177
        m_ok = false;
188
            while (m_workers_exited < m_worker_threads.size()) {
178
        while (m_workers_exited < m_worker_threads.size()) {
189
                pthread_cond_broadcast(&m_wcond);
179
            m_wcond.notify_all();
190
                m_clients_waiting++;
180
            m_clients_waiting++;
191
                if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
181
            m_ccond.wait(lock);
192
                    m_clients_waiting--;
193
                    return (void*)0;
194
                }
195
                m_clients_waiting--;
182
            m_clients_waiting--;
196
            }
183
        }
197
184
185
        LOGINFO(""  << m_name << ": tasks "  << m_tottasks << " nowakes "  <<
186
                m_nowake << " wsleeps "  << m_workersleeps << " csleeps "  <<
187
                m_clientsleeps << "\n");
198
            // Perform the thread joins and compute overall status
188
        // Perform the thread joins and compute overall status
199
            // Workers return (void*)1 if ok
189
        // Workers return (void*)1 if ok
200
            void *statusall = (void*)1;
190
        void *statusall = (void*)1;
201
            std::unordered_map<pthread_t,  WQTData>::iterator it;
202
            while (!m_worker_threads.empty()) {
191
        while (!m_worker_threads.empty()) {
203
                void *status;
192
            void *status = m_worker_threads.front().res.get();
204
                it = m_worker_threads.begin();
193
            m_worker_threads.front().thr.join();
205
                pthread_join(it->first, &status);
206
                if (status == (void *)0)
194
            if (status == (void *)0) {
207
                    statusall = status;
195
                statusall = status;
196
            }
208
                m_worker_threads.erase(it);
197
            m_worker_threads.pop_front();
209
            }
198
        }
210
199
211
            // Reset to start state.
200
        // Reset to start state.
212
            m_workers_exited = m_clients_waiting = m_workers_waiting =
201
        m_workers_exited = m_clients_waiting = m_workers_waiting =
213
                m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
202
                m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
214
            m_ok = true;
203
        m_ok = true;
215
204
205
        LOGDEB("setTerminateAndWait:"  << m_name << " done\n");
216
            return statusall;
206
        return statusall;
217
  }
207
    }
218
208
219
    /** Take task from queue. Called from worker.
209
    /** Take task from queue. Called from worker.
220
     *
210
     *
221
     * Sleeps if there are not enough. Signal if we go to sleep on empty
211
     * Sleeps if there are not enough. Signal if we go to sleep on empty
222
     * queue: client may be waiting for our going idle.
212
     * queue: client may be waiting for our going idle.
223
     */
213
     */
224
    bool take(T* tp, size_t *szp = 0)
214
    bool take(T* tp, size_t *szp = 0) {
225
  {
215
        std::unique_lock<std::mutex> lock(m_mutex);
226
            PTMutexLocker lock(m_mutex);
216
        if (!ok()) {
217
            LOGDEB("WorkQueue::take:"  << m_name << ": not ok\n");
218
            return false;
219
        }
220
221
        while (ok() && m_queue.size() < m_low) {
222
            m_workersleeps++;
223
            m_workers_waiting++;
224
            if (m_queue.empty()) {
225
                m_ccond.notify_all();
226
            }
227
            m_wcond.wait(lock);
227
            if (!lock.ok() || !ok()) {
228
            if (!ok()) {
229
                // !ok is a normal condition when shutting down
230
                m_workers_waiting--;
228
                return false;
231
                return false;
229
            }
232
            }
230
231
            while (ok() && m_queue.size() < m_low) {
232
                m_workersleeps++;
233
                m_workers_waiting++;
234
                if (m_queue.empty())
235
                    pthread_cond_broadcast(&m_ccond);
236
                if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
237
                    m_workers_waiting--;
238
                    return false;
239
                }
240
                m_workers_waiting--;
233
            m_workers_waiting--;
241
            }
234
        }
242
235
243
            m_tottasks++;
236
        m_tottasks++;
244
            *tp = m_queue.front();
237
        *tp = m_queue.front();
245
            if (szp)
238
        if (szp) {
246
                *szp = m_queue.size();
239
            *szp = m_queue.size();
240
        }
247
            m_queue.pop();
241
        m_queue.pop();
248
            if (m_clients_waiting > 0) {
242
        if (m_clients_waiting > 0) {
249
                // No reason to wake up more than one client thread
243
            // No reason to wake up more than one client thread
250
                pthread_cond_signal(&m_ccond);
244
            m_ccond.notify_one();
251
            } else {
245
        } else {
252
                m_nowake++;
246
            m_nowake++;
253
            }
247
        }
254
            return true;
248
        return true;
255
  }
256
      
249
    }
250
257
    bool waitminsz(size_t sz) {
251
    bool waitminsz(size_t sz) {
258
        PTMutexLocker lock(m_mutex);
252
        std::unique_lock<std::mutex> lock(m_mutex);
259
        if (!lock.ok() || !ok()) {
253
        if (!ok()) {
260
            return false;
254
            return false;
261
        }
255
        }
262
256
263
        while (ok() && m_queue.size() < sz) {
257
        while (ok() && m_queue.size() < sz) {
264
            m_workersleeps++;
258
            m_workersleeps++;
265
            m_workers_waiting++;
259
            m_workers_waiting++;
266
            if (m_queue.empty())
260
            if (m_queue.empty()) {
267
                pthread_cond_broadcast(&m_ccond);
261
                m_ccond.notify_all();
268
            if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
262
            }
263
            m_wcond.wait(lock);
264
            if (!ok()) {
269
                m_workers_waiting--;
265
                m_workers_waiting--;
270
                return false;
266
                return false;
271
            }
267
            }
272
            m_workers_waiting--;
268
            m_workers_waiting--;
273
        }
269
        }
...
...
280
     * the queue is terminated by the client. Workers never exit normally,
276
     * the queue is terminated by the client. Workers never exit normally,
281
     * except when the queue is shut down (at which point m_ok is set to
277
     * except when the queue is shut down (at which point m_ok is set to
282
     * false by the shutdown code anyway). The thread must return/exit
278
     * false by the shutdown code anyway). The thread must return/exit
283
     * immediately after calling this.
279
     * immediately after calling this.
284
     */
280
     */
285
    void workerExit()
281
    void workerExit() {
286
  {
282
        LOGDEB("workerExit:"  << m_name << "\n");
287
            PTMutexLocker lock(m_mutex);
283
        std::unique_lock<std::mutex> lock(m_mutex);
288
            m_workers_exited++;
284
        m_workers_exited++;
289
            m_ok = false;
285
        m_ok = false;
290
            pthread_cond_broadcast(&m_ccond);
286
        m_ccond.notify_all();
291
  }
287
    }
292
288
293
    size_t qsize()
289
    size_t qsize() {
294
  {
290
        std::unique_lock<std::mutex> lock(m_mutex);
295
            PTMutexLocker lock(m_mutex);
296
            size_t sz = m_queue.size();
291
        return m_queue.size();
297
            return sz;
292
    }
298
  }
299
293
300
private:
294
private:
301
    bool ok()
295
    bool ok() {
302
  {
303
            bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
296
        bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
297
        if (!isok) {
298
            LOGDEB("WorkQueue:ok:" << m_name << ": not ok m_ok " << m_ok <<
299
                   " m_workers_exited " << m_workers_exited <<
300
                   " m_worker_threads size " << m_worker_threads.size() <<
301
                   "\n");
302
        }
304
            return isok;
303
        return isok;
305
  }
304
    }
306
305
307
    long long nanodiff(const struct timespec& older,
306
    struct Worker {
308
                       const struct timespec& newer)
307
        std::thread         thr;
309
  {
308
        std::future<void *> res;
310
            return (newer.tv_sec - older.tv_sec) * 1000000000LL
309
    };
311
                + newer.tv_nsec - older.tv_nsec;
310
    
312
  }
313
314
    // Configuration
311
    // Configuration
315
    std::string m_name;
312
    std::string m_name;
316
    size_t m_high;
313
    size_t m_high;
317
    size_t m_low;
314
    size_t m_low;
318
315
316
    // Worker threads having called exit. Used to decide when we're done
317
    unsigned int m_workers_exited;
319
    // Status
318
    // Status
320
    // Worker threads having called exit
321
    unsigned int m_workers_exited;
322
    bool m_ok;
319
    bool m_ok;
323
320
324
    // Per-thread data. The data is not used currently, this could be
321
    // Our threads. 
325
    // a set<pthread_t>
322
    std::list<Worker> m_worker_threads;
326
    std::unordered_map<pthread_t, WQTData> m_worker_threads;
327
323
324
    // Jobs input queue
325
    std::queue<T> m_queue;
326
    
328
    // Synchronization
327
    // Synchronization
329
    std::queue<T> m_queue;
328
    std::condition_variable m_ccond;
330
    pthread_cond_t m_ccond;
329
    std::condition_variable m_wcond;
331
    pthread_cond_t m_wcond;
330
    std::mutex m_mutex;
332
    PTMutexInit m_mutex;
331
333
    // Client/Worker threads currently waiting for a job
332
    // Client/Worker threads currently waiting for a job
334
    unsigned int m_clients_waiting;
333
    unsigned int m_clients_waiting;
335
    unsigned int m_workers_waiting;
334
    unsigned int m_workers_waiting;
336
335
337
    // Statistics
336
    // Statistics
...
...
340
    unsigned int m_workersleeps;
339
    unsigned int m_workersleeps;
341
    unsigned int m_clientsleeps;
340
    unsigned int m_clientsleeps;
342
};
341
};
343
342
344
#endif /* _WORKQUEUE_H_INCLUDED_ */
343
#endif /* _WORKQUEUE_H_INCLUDED_ */
344