Switch to unified view

a b/src/utils/workqueue.h
1
/*
2
 *   This program is free software; you can redistribute it and/or modify
3
 *   it under the terms of the GNU General Public License as published by
4
 *   the Free Software Foundation; either version 2 of the License, or
5
 *   (at your option) any later version.
6
 *
7
 *   This program is distributed in the hope that it will be useful,
8
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *   GNU General Public License for more details.
11
 *
12
 *   You should have received a copy of the GNU General Public License
13
 *   along with this program; if not, write to the
14
 *   Free Software Foundation, Inc.,
15
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
 */
17
#ifndef _WORKQUEUE_H_INCLUDED_
18
#define _WORKQUEUE_H_INCLUDED_
19
20
#include "pthread.h"
21
#include <string>
22
#include <queue>
23
using std::queue;
24
using std::string;
25
26
/**
27
 * A WorkQueue manages the synchronisation around a queue of work items,
28
 * where a single client thread queues tasks and a single worker takes
29
 * and executes them. The goal is to introduce some level of
30
 * parallelism between the successive steps of a previously single
31
 * threaded pipe-line (data extraction / data preparation / index
32
 * update).
33
 *
34
 * There is no individual task status return. In case of fatal error,
35
 * the client or worker sets an end condition on the queue. A second
36
 * queue could conceivably be used for returning individual task
37
 * status.
38
 */
39
template <class T> class WorkQueue {
40
public:
41
    WorkQueue(int hi = 0, int lo = 1)
42
  : m_high(hi), m_low(lo), m_size(0), m_worker_up(false),
43
    m_worker_waiting(false), m_jobcnt(0), m_lenacc(0)
44
    {
45
  m_ok = (pthread_cond_init(&m_cond, 0) == 0) && 
46
      (pthread_mutex_init(&m_mutex, 0) == 0);
47
    }
48
49
    ~WorkQueue() 
50
    {
51
  if (m_worker_up)
52
      setTerminateAndWait();
53
    }
54
55
    /** Start the worker thread. The start_routine will loop
56
     *  taking and executing tasks. */
57
    bool start(void *(*start_routine)(void *), void *arg)
58
    {
59
  bool status = pthread_create(&m_worker_thread, 0, 
60
                   start_routine, arg) == 0;
61
  if (status)
62
      m_worker_up = true;
63
  return status;
64
    }
65
66
    /**
67
     * Add item to work queue. Sleep if there are already too many.
68
     * Called from client.
69
     */
70
    bool put(T t)
71
    {
72
  if (!ok() || pthread_mutex_lock(&m_mutex) != 0) 
73
      return false;
74
75
  while (ok() && m_high > 0 && m_queue.size() >= m_high) {
76
      // Keep the order: we test ok() AFTER the sleep...
77
      if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
78
      pthread_mutex_unlock(&m_mutex);
79
      return false;
80
      }
81
  }
82
83
  m_queue.push(t);
84
  ++m_size;
85
  pthread_cond_broadcast(&m_cond);
86
  pthread_mutex_unlock(&m_mutex);
87
  return true;
88
    }
89
90
    /** Wait until the queue is empty and the worker is
91
     *  back waiting for task. Called from the client when it needs to
92
     *  perform work that couldn't be done in parallel with the
93
     *  worker's tasks.
94
     */
95
    bool waitIdle()
96
    {
97
  if (!ok() || pthread_mutex_lock(&m_mutex) != 0) 
98
      return false;
99
100
  // We're done when the queue is empty AND the worker is back
101
  // for a task (has finished the last)
102
  while (ok() && (m_queue.size() > 0 || !m_worker_waiting)) {
103
      if (pthread_cond_wait(&m_cond, &m_mutex)) {
104
      pthread_mutex_unlock(&m_mutex);
105
      return false;
106
      }
107
  }
108
  pthread_mutex_unlock(&m_mutex);
109
  return ok();
110
    }
111
112
    /** Tell the worker to exit, and wait for it. There may still
113
  be tasks on the queue. */
114
    void* setTerminateAndWait()
115
    {
116
  if (!m_worker_up)
117
      return (void *)0;
118
119
  pthread_mutex_lock(&m_mutex);
120
  m_ok = false;
121
  pthread_cond_broadcast(&m_cond);
122
  pthread_mutex_unlock(&m_mutex);
123
  void *status;
124
  pthread_join(m_worker_thread, &status);
125
  m_worker_up = false;
126
  return status;
127
    }
128
129
    /** Remove task from queue. Sleep if there are not enough. Signal if we go
130
  to sleep on empty queue: client may be waiting for our going idle */
131
    bool take(T* tp)
132
    {
133
  if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
134
      return false;
135
136
  while (ok() && m_queue.size() < m_low) {
137
      m_worker_waiting = true;
138
      if (m_queue.empty())
139
      pthread_cond_broadcast(&m_cond);
140
      if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
141
      pthread_mutex_unlock(&m_mutex);
142
      m_worker_waiting = false;
143
      return false;
144
      }
145
      m_worker_waiting = false;
146
  }
147
148
  ++m_jobcnt;
149
  m_lenacc += m_size;
150
151
  *tp = m_queue.front();
152
  m_queue.pop();
153
  --m_size;
154
155
  pthread_cond_broadcast(&m_cond);
156
  pthread_mutex_unlock(&m_mutex);
157
  return true;
158
    }
159
160
    /** Take note of the worker exit. This would normally happen after an
161
  unrecoverable error */
162
    void workerExit()
163
    {
164
  if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
165
      return;
166
  m_ok = false;
167
  pthread_cond_broadcast(&m_cond);
168
  pthread_mutex_unlock(&m_mutex);
169
    }
170
171
    /** Debug only: as the size is returned while the queue is unlocked, there
172
     *  is no warranty on its consistency. Not that we use the member size, not 
173
     *  the container size() call which would need locking.
174
     */
175
    size_t size() {return m_size;}
176
177
private:
178
    bool ok() {return m_ok && m_worker_up;}
179
180
    size_t m_high;
181
    size_t m_low; 
182
    size_t m_size;
183
    bool m_worker_up;
184
    bool m_worker_waiting;
185
    int m_jobcnt;
186
    int m_lenacc;
187
188
    pthread_t m_worker_thread;
189
    queue<T> m_queue;
190
    pthread_cond_t m_cond;
191
    pthread_mutex_t m_mutex;
192
    bool m_ok;
193
};
194
195
#endif /* _WORKQUEUE_H_INCLUDED_ */