|
a/src/utils/workqueue.h |
|
b/src/utils/workqueue.h |
|
... |
|
... |
20 |
#include <pthread.h>
|
20 |
#include <pthread.h>
|
21 |
#include <time.h>
|
21 |
#include <time.h>
|
22 |
|
22 |
|
23 |
#include <string>
|
23 |
#include <string>
|
24 |
#include <queue>
|
24 |
#include <queue>
|
25 |
#include "unordered_defs.h"
|
25 |
#include <list>
|
26 |
using std::queue;
|
|
|
27 |
using std::string;
|
|
|
28 |
|
26 |
|
29 |
#include "debuglog.h"
|
27 |
#include "debuglog.h"
|
30 |
#include "ptmutex.h"
|
28 |
#include "ptmutex.h"
|
31 |
|
|
|
32 |
/// Store per-worker-thread data. Just an initialized timespec, and
|
|
|
33 |
/// used at the moment.
|
|
|
34 |
class WQTData {
|
|
|
35 |
public:
|
|
|
36 |
WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
|
|
|
37 |
struct timespec wstart;
|
|
|
38 |
};
|
|
|
39 |
|
29 |
|
40 |
/**
|
30 |
/**
|
41 |
* A WorkQueue manages the synchronisation around a queue of work items,
|
31 |
* A WorkQueue manages the synchronisation around a queue of work items,
|
42 |
* where a number of client threads queue tasks and a number of worker
|
32 |
* where a number of client threads queue tasks and a number of worker
|
43 |
* threads take and execute them. The goal is to introduce some level
|
33 |
* threads take and execute them. The goal is to introduce some level
|
|
... |
|
... |
92 |
if ((err = pthread_create(&thr, 0, start_routine, arg))) {
|
82 |
if ((err = pthread_create(&thr, 0, start_routine, arg))) {
|
93 |
LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
|
83 |
LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
|
94 |
m_name.c_str(), err));
|
84 |
m_name.c_str(), err));
|
95 |
return false;
|
85 |
return false;
|
96 |
}
|
86 |
}
|
97 |
m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
|
87 |
m_worker_threads.push_back(thr);
|
98 |
}
|
88 |
}
|
99 |
return true;
|
89 |
return true;
|
100 |
}
|
90 |
}
|
101 |
|
91 |
|
102 |
/** Add item to work queue, called from client.
|
92 |
/** Add item to work queue, called from client.
|
|
... |
|
... |
211 |
m_name.c_str(), m_tottasks, m_nowake, m_workersleeps,
|
201 |
m_name.c_str(), m_tottasks, m_nowake, m_workersleeps,
|
212 |
m_clientsleeps));
|
202 |
m_clientsleeps));
|
213 |
// Perform the thread joins and compute overall status
|
203 |
// Perform the thread joins and compute overall status
|
214 |
// Workers return (void*)1 if ok
|
204 |
// Workers return (void*)1 if ok
|
215 |
void *statusall = (void*)1;
|
205 |
void *statusall = (void*)1;
|
216 |
STD_UNORDERED_MAP<pthread_t, WQTData>::iterator it;
|
206 |
std::list<pthread_t>::iterator it;
|
217 |
while (!m_worker_threads.empty()) {
|
207 |
while (!m_worker_threads.empty()) {
|
218 |
void *status;
|
208 |
void *status;
|
219 |
it = m_worker_threads.begin();
|
209 |
it = m_worker_threads.begin();
|
220 |
pthread_join(it->first, &status);
|
210 |
pthread_join(*it, &status);
|
221 |
if (status == (void *)0)
|
211 |
if (status == (void *)0)
|
222 |
statusall = status;
|
212 |
statusall = status;
|
223 |
m_worker_threads.erase(it);
|
213 |
m_worker_threads.erase(it);
|
224 |
}
|
214 |
}
|
225 |
|
215 |
|
|
... |
|
... |
328 |
unsigned int m_workers_exited;
|
318 |
unsigned int m_workers_exited;
|
329 |
bool m_ok;
|
319 |
bool m_ok;
|
330 |
|
320 |
|
331 |
// Per-thread data. The data is not used currently, this could be
|
321 |
// Per-thread data. The data is not used currently, this could be
|
332 |
// a set<pthread_t>
|
322 |
// a set<pthread_t>
|
333 |
STD_UNORDERED_MAP<pthread_t, WQTData> m_worker_threads;
|
323 |
std::list<pthread_t> m_worker_threads;
|
334 |
|
324 |
|
335 |
// Synchronization
|
325 |
// Synchronization
|
336 |
queue<T> m_queue;
|
326 |
queue<T> m_queue;
|
337 |
pthread_cond_t m_ccond;
|
327 |
pthread_cond_t m_ccond;
|
338 |
pthread_cond_t m_wcond;
|
328 |
pthread_cond_t m_wcond;
|