Switch to side-by-side view

--- a/src/utils/workqueue.h
+++ b/src/utils/workqueue.h
@@ -30,6 +30,7 @@
 using std::string;
 
 #include "debuglog.h"
+#include "ptmutex.h"
 
 /// Just an initialized timespec. Not really used any more.
 class WQTData {
@@ -67,8 +68,7 @@
 	  m_workersleeps(0)
     {
         m_ok = (m_high >= 0) && (pthread_cond_init(&m_ccond, 0) == 0) &&
-	    (pthread_cond_init(&m_wcond, 0) == 0) && 
-            (pthread_mutex_init(&m_mutex, 0) == 0);
+	    (pthread_cond_init(&m_wcond, 0) == 0);
     }
 
     ~WorkQueue() 
@@ -88,19 +88,17 @@
      */
     bool start(int nworkers, void *(*start_routine)(void *), void *arg)
     {
-	pthread_mutex_lock(&m_mutex);
+	PTMutexLocker lock(m_mutex);
         for  (int i = 0; i < nworkers; i++) {
             int err;
             pthread_t thr;
             if ((err = pthread_create(&thr, 0, start_routine, arg))) {
                 LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
                         m_name.c_str(), err));
-		pthread_mutex_unlock(&m_mutex);
                 return false;
             }
             m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
         }
-	pthread_mutex_unlock(&m_mutex);
         return true;
     }
 
@@ -110,7 +108,8 @@
      */
     bool put(T t)
     {
-        if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) {
+	PTMutexLocker lock(m_mutex);
+        if (!lock.ok() || !ok()) {
 	    LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n", 
 		    m_name.c_str()));
             return false;
@@ -119,9 +118,8 @@
         while (ok() && m_high > 0 && m_queue.size() >= m_high) {
             // Keep the order: we test ok() AFTER the sleep...
 	    m_clients_waiting++;
-            if (pthread_cond_wait(&m_ccond, &m_mutex) || !ok()) {
+            if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
 		m_clients_waiting--;
-                pthread_mutex_unlock(&m_mutex);
                 return false;
             }
 	    m_clients_waiting--;
@@ -135,7 +133,6 @@
 	    m_nowake++;
 	}
 
-        pthread_mutex_unlock(&m_mutex);
         return true;
     }
 
@@ -152,7 +149,8 @@
      */
     bool waitIdle()
     {
-        if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) {
+	PTMutexLocker lock(m_mutex);
+        if (!lock.ok() || !ok()) {
             LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n",
                     m_name.c_str()));
             return false;
@@ -163,18 +161,16 @@
         while (ok() && (m_queue.size() > 0 || 
                         m_workers_waiting != m_worker_threads.size())) {
 	    m_clients_waiting++;
-            if (pthread_cond_wait(&m_ccond, &m_mutex)) {
+            if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
 		m_clients_waiting--;
                 m_ok = false;
                 LOGERR(("WorkQueue::waitIdle:%s: cond_wait failed\n",
 			   m_name.c_str()));
-                pthread_mutex_unlock(&m_mutex);
                 return false;
             }
 	    m_clients_waiting--;
         }
 
-        pthread_mutex_unlock(&m_mutex);
         return ok();
     }
 
@@ -185,8 +181,8 @@
      */
     void* setTerminateAndWait()
     {
+	PTMutexLocker lock(m_mutex);
         LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str()));
-        pthread_mutex_lock(&m_mutex);
 
 	if (m_worker_threads.empty()) {
 	    // Already called ?
@@ -198,8 +194,7 @@
         while (m_workers_exited < m_worker_threads.size()) {
             pthread_cond_broadcast(&m_wcond);
 	    m_clients_waiting++;
-            if (pthread_cond_wait(&m_ccond, &m_mutex)) {
-                pthread_mutex_unlock(&m_mutex);
+            if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
                 LOGERR(("WorkQueue::setTerminate:%s: cond_wait failed\n",
 			   m_name.c_str()));
 		m_clients_waiting--;
@@ -222,8 +217,13 @@
                 statusall = status;
             m_worker_threads.erase(it);
         }
+
+	// Reset to start state.
+	m_workers_waiting = m_workers_exited = m_clients_waiting = m_tottasks =
+	    m_nowake = m_workersleeps = 0;
+        m_ok = true;
+
         LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
-        pthread_mutex_unlock(&m_mutex);
         return statusall;
     }
 
@@ -234,7 +234,8 @@
      */
     bool take(T* tp, size_t *szp = 0)
     {
-        if (pthread_mutex_lock(&m_mutex) != 0 || !ok()) {
+	PTMutexLocker lock(m_mutex);
+        if (!lock.ok() || !ok()) {
 	    LOGDEB(("WorkQueue::take:%s: not ok\n", m_name.c_str()));
             return false;
 	}
@@ -244,13 +245,12 @@
             m_workers_waiting++;
             if (m_queue.empty())
                 pthread_cond_broadcast(&m_ccond);
-            if (pthread_cond_wait(&m_wcond, &m_mutex) || !ok()) {
+            if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
 		// !ok is a normal condition when shutting down
 		if (ok())
 		    LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
 			    m_name.c_str()));
                 m_workers_waiting--;
-                pthread_mutex_unlock(&m_mutex);
                 return false;
             }
             m_workers_waiting--;
@@ -267,7 +267,6 @@
 	} else {
 	    m_nowake++;
 	}
-        pthread_mutex_unlock(&m_mutex);
         return true;
     }
 
@@ -281,19 +280,16 @@
     void workerExit()
     {
 	LOGDEB(("workerExit:%s\n", m_name.c_str()));
-        if (pthread_mutex_lock(&m_mutex) != 0)
-            return;
+	PTMutexLocker lock(m_mutex);
         m_workers_exited++;
         m_ok = false;
         pthread_cond_broadcast(&m_ccond);
-        pthread_mutex_unlock(&m_mutex);
     }
 
     size_t qsize() 
     {
-        pthread_mutex_lock(&m_mutex);
+	PTMutexLocker lock(m_mutex);
         size_t sz = m_queue.size();
-        pthread_mutex_unlock(&m_mutex);
         return sz;
     }
 
@@ -330,7 +326,7 @@
     queue<T> m_queue;
     pthread_cond_t m_ccond;
     pthread_cond_t m_wcond;
-    pthread_mutex_t m_mutex;
+    PTMutexInit m_mutex;
     unsigned int m_clients_waiting;
     unsigned int m_tottasks;
     unsigned int m_nowake;