--- a/src/utils/workqueue.h
+++ b/src/utils/workqueue.h
@@ -30,6 +30,7 @@
using std::string;
#include "debuglog.h"
+#include "ptmutex.h"
/// Just an initialized timespec. Not really used any more.
class WQTData {
@@ -67,8 +68,7 @@
m_workersleeps(0)
{
m_ok = (m_high >= 0) && (pthread_cond_init(&m_ccond, 0) == 0) &&
- (pthread_cond_init(&m_wcond, 0) == 0) &&
- (pthread_mutex_init(&m_mutex, 0) == 0);
+ (pthread_cond_init(&m_wcond, 0) == 0);
}
~WorkQueue()
@@ -88,19 +88,17 @@
*/
bool start(int nworkers, void *(*start_routine)(void *), void *arg)
{
- pthread_mutex_lock(&m_mutex);
+ PTMutexLocker lock(m_mutex);
for (int i = 0; i < nworkers; i++) {
int err;
pthread_t thr;
if ((err = pthread_create(&thr, 0, start_routine, arg))) {
LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
m_name.c_str(), err));
- pthread_mutex_unlock(&m_mutex);
return false;
}
m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
}
- pthread_mutex_unlock(&m_mutex);
return true;
}
@@ -110,7 +108,8 @@
*/
bool put(T t)
{
- if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) {
+ PTMutexLocker lock(m_mutex);
+ if (!lock.ok() || !ok()) {
LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n",
m_name.c_str()));
return false;
@@ -119,9 +118,8 @@
while (ok() && m_high > 0 && m_queue.size() >= m_high) {
// Keep the order: we test ok() AFTER the sleep...
m_clients_waiting++;
- if (pthread_cond_wait(&m_ccond, &m_mutex) || !ok()) {
+ if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
m_clients_waiting--;
- pthread_mutex_unlock(&m_mutex);
return false;
}
m_clients_waiting--;
@@ -135,7 +133,6 @@
m_nowake++;
}
- pthread_mutex_unlock(&m_mutex);
return true;
}
@@ -152,7 +149,8 @@
*/
bool waitIdle()
{
- if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) {
+ PTMutexLocker lock(m_mutex);
+ if (!lock.ok() || !ok()) {
LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n",
m_name.c_str()));
return false;
@@ -163,18 +161,16 @@
while (ok() && (m_queue.size() > 0 ||
m_workers_waiting != m_worker_threads.size())) {
m_clients_waiting++;
- if (pthread_cond_wait(&m_ccond, &m_mutex)) {
+ if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
m_clients_waiting--;
m_ok = false;
LOGERR(("WorkQueue::waitIdle:%s: cond_wait failed\n",
m_name.c_str()));
- pthread_mutex_unlock(&m_mutex);
return false;
}
m_clients_waiting--;
}
- pthread_mutex_unlock(&m_mutex);
return ok();
}
@@ -185,8 +181,8 @@
*/
void* setTerminateAndWait()
{
+ PTMutexLocker lock(m_mutex);
LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str()));
- pthread_mutex_lock(&m_mutex);
if (m_worker_threads.empty()) {
// Already called ?
@@ -198,8 +194,7 @@
while (m_workers_exited < m_worker_threads.size()) {
pthread_cond_broadcast(&m_wcond);
m_clients_waiting++;
- if (pthread_cond_wait(&m_ccond, &m_mutex)) {
- pthread_mutex_unlock(&m_mutex);
+ if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
LOGERR(("WorkQueue::setTerminate:%s: cond_wait failed\n",
m_name.c_str()));
m_clients_waiting--;
@@ -222,8 +217,13 @@
statusall = status;
m_worker_threads.erase(it);
}
+
+ // Reset to start state.
+ m_workers_waiting = m_workers_exited = m_clients_waiting = m_tottasks =
+ m_nowake = m_workersleeps = 0;
+ m_ok = true;
+
LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
- pthread_mutex_unlock(&m_mutex);
return statusall;
}
@@ -234,7 +234,8 @@
*/
bool take(T* tp, size_t *szp = 0)
{
- if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) {
+ PTMutexLocker lock(m_mutex);
+ if (!lock.ok() || !ok()) {
LOGDEB(("WorkQueue::take:%s: not ok\n", m_name.c_str()));
return false;
}
@@ -244,13 +245,12 @@
m_workers_waiting++;
if (m_queue.empty())
pthread_cond_broadcast(&m_ccond);
- if (pthread_cond_wait(&m_wcond, &m_mutex) || !ok()) {
+ if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
// !ok is a normal condition when shutting down
if (ok())
LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
m_name.c_str()));
m_workers_waiting--;
- pthread_mutex_unlock(&m_mutex);
return false;
}
m_workers_waiting--;
@@ -267,7 +267,6 @@
} else {
m_nowake++;
}
- pthread_mutex_unlock(&m_mutex);
return true;
}
@@ -281,19 +280,16 @@
void workerExit()
{
LOGDEB(("workerExit:%s\n", m_name.c_str()));
- if (pthread_mutex_lock(&m_mutex) != 0)
- return;
+ PTMutexLocker lock(m_mutex);
m_workers_exited++;
m_ok = false;
pthread_cond_broadcast(&m_ccond);
- pthread_mutex_unlock(&m_mutex);
}
size_t qsize()
{
- pthread_mutex_lock(&m_mutex);
+ PTMutexLocker lock(m_mutex);
size_t sz = m_queue.size();
- pthread_mutex_unlock(&m_mutex);
return sz;
}
@@ -330,7 +326,7 @@
queue<T> m_queue;
pthread_cond_t m_ccond;
pthread_cond_t m_wcond;
- pthread_mutex_t m_mutex;
+ PTMutexInit m_mutex;
unsigned int m_clients_waiting;
unsigned int m_tottasks;
unsigned int m_nowake;