Switch to unified view

a/libupnpp/workqueue.hxx b/libupnpp/workqueue.hxx
1
/*     Copyright (C) 2012 J.F.Dockes
1
/*       Copyright (C) 2012 J.F.Dockes
2
 *     This program is free software; you can redistribute it and/or modify
2
 *       This program is free software; you can redistribute it and/or modify
3
 *     it under the terms of the GNU General Public License as published by
3
 *       it under the terms of the GNU General Public License as published by
4
 *     the Free Software Foundation; either version 2 of the License, or
4
 *       the Free Software Foundation; either version 2 of the License, or
5
 *     (at your option) any later version.
5
 *       (at your option) any later version.
6
 *
6
 *
7
 *     This program is distributed in the hope that it will be useful,
7
 *       This program is distributed in the hope that it will be useful,
8
 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
8
 *       but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
9
 *       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *     GNU General Public License for more details.
10
 *       GNU General Public License for more details.
11
 *
11
 *
12
 *     You should have received a copy of the GNU General Public License
12
 *       You should have received a copy of the GNU General Public License
13
 *     along with this program; if not, write to the
13
 *       along with this program; if not, write to the
14
 *     Free Software Foundation, Inc.,
14
 *       Free Software Foundation, Inc.,
15
 *     59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
15
 *       59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
16
 */
17
#ifndef _WORKQUEUE_H_INCLUDED_
17
#ifndef _WORKQUEUE_H_INCLUDED_
18
#define _WORKQUEUE_H_INCLUDED_
18
#define _WORKQUEUE_H_INCLUDED_
19
19
20
#include "libupnpp/config.h"
20
#include "libupnpp/config.h"
...
...
47
public:
47
public:
48
48
49
    /** Create a WorkQueue
49
    /** Create a WorkQueue
50
     * @param name for message printing
50
     * @param name for message printing
51
     * @param hi number of tasks on queue before clients blocks. Default 0
51
     * @param hi number of tasks on queue before clients blocks. Default 0
52
     *      meaning no limit. hi == -1 means that the queue is disabled.
52
     *    meaning no limit. hi == -1 means that the queue is disabled.
53
     * @param lo minimum count of tasks before worker starts. Default 1.
53
     * @param lo minimum count of tasks before worker starts. Default 1.
54
     */
54
     */
55
    WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
55
    WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
56
        : m_name(name), m_high(hi), m_low(lo),
56
        : m_name(name), m_high(hi), m_low(lo),
57
          m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
57
          m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
58
          m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
58
          m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0) {
59
    {
60
        m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
59
        m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
61
               (pthread_cond_init(&m_wcond, 0) == 0);
60
            (pthread_cond_init(&m_wcond, 0) == 0);
62
    }
61
    }
63
62
64
    ~WorkQueue()
63
    ~WorkQueue() {
65
    {
66
        if (!m_worker_threads.empty())
64
        if (!m_worker_threads.empty())
67
            setTerminateAndWait();
65
            setTerminateAndWait();
68
    }
66
    }
69
67
70
    /** Start the worker threads.
68
    /** Start the worker threads.
71
     *
69
     *
72
     * @param nworkers number of threads copies to start.
70
     * @param nworkers number of threads copies to start.
73
     * @param start_routine thread function. It should loop
71
     * @param start_routine thread function. It should loop
74
     *        taking (QueueWorker::take()) and executing tasks.
72
     *          taking (QueueWorker::take()) and executing tasks.
75
     * @param arg initial parameter to thread function.
73
     * @param arg initial parameter to thread function.
76
     * @return true if ok.
74
     * @return true if ok.
77
     */
75
     */
78
    bool start(int nworkers, void *(*workproc)(void *), void *arg)
76
    bool start(int nworkers, void *(*workproc)(void *), void *arg) {
79
    {
80
        PTMutexLocker lock(m_mutex);
77
        PTMutexLocker lock(m_mutex);
81
        for    (int i = 0; i < nworkers; i++) {
78
        for (int i = 0; i < nworkers; i++) {
82
            int err;
79
            int err;
83
            pthread_t thr;
80
            pthread_t thr;
84
            if ((err = pthread_create(&thr, 0, workproc, arg))) {
81
            if ((err = pthread_create(&thr, 0, workproc, arg))) {
85
                return false;
82
                return false;
86
            }
83
            }
...
...
91
88
92
    /** Add item to work queue, called from client.
89
    /** Add item to work queue, called from client.
93
     *
90
     *
94
     * Sleeps if there are already too many.
91
     * Sleeps if there are already too many.
95
     */
92
     */
96
    bool put(T t, bool flushprevious = false)
93
    bool put(T t, bool flushprevious = false) {
97
    {
98
        PTMutexLocker lock(m_mutex);
94
        PTMutexLocker lock(m_mutex);
99
        if (!lock.ok() || !ok()) {
95
        if (!lock.ok() || !ok()) {
100
            return false;
96
            return false;
101
        }
97
        }
102
98
...
...
139
     * idle EXCEPT if the caller knows that no jobs are still being created.
135
     * idle EXCEPT if the caller knows that no jobs are still being created.
140
     * It would be possible to transform this into a safe call if some kind
136
     * It would be possible to transform this into a safe call if some kind
141
     * of suspend condition was set on the queue by waitIdle(), to be reset by
137
     * of suspend condition was set on the queue by waitIdle(), to be reset by
142
     * some kind of "resume" call. Not currently the case.
138
     * some kind of "resume" call. Not currently the case.
143
     */
139
     */
144
    bool waitIdle()
140
    bool waitIdle() {
145
    {
146
        PTMutexLocker lock(m_mutex);
141
        PTMutexLocker lock(m_mutex);
147
        if (!lock.ok() || !ok()) {
142
        if (!lock.ok() || !ok()) {
148
            return false;
143
            return false;
149
        }
144
        }
150
145
...
...
168
    /** Tell the workers to exit, and wait for them.
163
    /** Tell the workers to exit, and wait for them.
169
     *
164
     *
170
     * Does not bother about tasks possibly remaining on the queue, so
165
     * Does not bother about tasks possibly remaining on the queue, so
171
     * should be called after waitIdle() for an orderly shutdown.
166
     * should be called after waitIdle() for an orderly shutdown.
172
     */
167
     */
173
    void* setTerminateAndWait()
168
    void* setTerminateAndWait() {
174
    {
175
        PTMutexLocker lock(m_mutex);
169
        PTMutexLocker lock(m_mutex);
176
170
177
        if (m_worker_threads.empty()) {
171
        if (m_worker_threads.empty()) {
178
            // Already called ?
172
            // Already called ?
179
            return (void*)0;
173
            return (void*)0;
...
...
204
            m_worker_threads.erase(it);
198
            m_worker_threads.erase(it);
205
        }
199
        }
206
200
207
        // Reset to start state.
201
        // Reset to start state.
208
        m_workers_exited = m_clients_waiting = m_workers_waiting =
202
        m_workers_exited = m_clients_waiting = m_workers_waiting =
209
                m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
203
            m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
210
        m_ok = true;
204
        m_ok = true;
211
205
212
        return statusall;
206
        return statusall;
213
    }
207
    }
214
208
215
    /** Take task from queue. Called from worker.
209
    /** Take task from queue. Called from worker.
216
     *
210
     *
217
     * Sleeps if there are not enough. Signal if we go to sleep on empty
211
     * Sleeps if there are not enough. Signal if we go to sleep on empty
218
     * queue: client may be waiting for our going idle.
212
     * queue: client may be waiting for our going idle.
219
     */
213
     */
220
    bool take(T* tp, size_t *szp = 0)
214
    bool take(T* tp, size_t *szp = 0) {
221
    {
222
        PTMutexLocker lock(m_mutex);
215
        PTMutexLocker lock(m_mutex);
223
        if (!lock.ok() || !ok()) {
216
        if (!lock.ok() || !ok()) {
224
            return false;
217
            return false;
225
        }
218
        }
226
219
...
...
256
     * the queue is terminated by the client. Workers never exit normally,
249
     * the queue is terminated by the client. Workers never exit normally,
257
     * except when the queue is shut down (at which point m_ok is set to
250
     * except when the queue is shut down (at which point m_ok is set to
258
     * false by the shutdown code anyway). The thread must return/exit
251
     * false by the shutdown code anyway). The thread must return/exit
259
     * immediately after calling this.
252
     * immediately after calling this.
260
     */
253
     */
261
    void workerExit()
254
    void workerExit() {
262
    {
263
        PTMutexLocker lock(m_mutex);
255
        PTMutexLocker lock(m_mutex);
264
        m_workers_exited++;
256
        m_workers_exited++;
265
        m_ok = false;
257
        m_ok = false;
266
        pthread_cond_broadcast(&m_ccond);
258
        pthread_cond_broadcast(&m_ccond);
267
    }
259
    }
268
260
269
    size_t qsize()
261
    size_t qsize() {
270
    {
271
        PTMutexLocker lock(m_mutex);
262
        PTMutexLocker lock(m_mutex);
272
        size_t sz = m_queue.size();
263
        size_t sz = m_queue.size();
273
        return sz;
264
        return sz;
274
    }
265
    }
275
266
276
private:
267
private:
277
    bool ok()
268
    bool ok() {
278
    {
279
        bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
269
        bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
280
        return isok;
270
        return isok;
281
    }
271
    }
282
272
283
    long long nanodiff(const struct timespec& older,
273
    long long nanodiff(const struct timespec& older,
284
                       const struct timespec& newer)
274
                       const struct timespec& newer) {
285
    {
286
        return (newer.tv_sec - older.tv_sec) * 1000000000LL
275
        return (newer.tv_sec - older.tv_sec) * 1000000000LL
287
               + newer.tv_nsec - older.tv_nsec;
276
            + newer.tv_nsec - older.tv_nsec;
288
    }
277
    }
289
278
290
    // Configuration
279
    // Configuration
291
    std::string m_name;
280
    std::string m_name;
292
    size_t m_high;
281
    size_t m_high;