
--- a/src/utils/workqueue.h
+++ b/src/utils/workqueue.h
@@ -17,17 +17,33 @@
 #ifndef _WORKQUEUE_H_INCLUDED_
 #define _WORKQUEUE_H_INCLUDED_
 
-#include "pthread.h"
+#include <pthread.h>
+#include <time.h>
+
 #include <string>
 #include <queue>
+#include <tr1/unordered_map>
+#include <tr1/unordered_set>
+using std::tr1::unordered_map;
+using std::tr1::unordered_set;
 using std::queue;
 using std::string;
 
+#include "debuglog.h"
+
+#define WORKQUEUE_TIMING
+
+class WQTData {
+    public:
+    WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
+    struct timespec wstart;
+};
+
 /**
  * A WorkQueue manages the synchronisation around a queue of work items,
- * where a single client thread queues tasks and a single worker takes
- * and executes them. The goal is to introduce some level of
- * parallelism between the successive steps of a previously single
+ * where a number of client threads queue tasks and a number of worker
+ * threads take and execute them. The goal is to introduce some level
+ * of parallelism between the successive steps of a previously single
  * threaded pipe-line (data extraction / data preparation / index
  * update).
  *
@@ -38,154 +54,286 @@
  */
 template <class T> class WorkQueue {
 public:
-    WorkQueue(int hi = 0, int lo = 1)
-	: m_high(hi), m_low(lo), m_size(0), m_worker_up(false),
-	  m_worker_waiting(false), m_jobcnt(0), m_lenacc(0)
-    {
-	m_ok = (pthread_cond_init(&m_cond, 0) == 0) && 
-	    (pthread_mutex_init(&m_mutex, 0) == 0);
+
+    /** Create a WorkQueue
+     * @param name for message printing
+     * @param hi number of tasks on the queue before clients block. Default 0
+     *    meaning no limit.
+     * @param lo minimum count of tasks before a worker starts working. Default 1.
+     */
+    WorkQueue(const string& name, int hi = 0, int lo = 1)
+        : m_name(name), m_high(hi), m_low(lo), m_size(0), 
+          m_workers_waiting(0), m_workers_exited(0), m_jobcnt(0), 
+          m_clientwait(0), m_workerwait(0), m_workerwork(0)
+    {
+        m_ok = (pthread_cond_init(&m_cond, 0) == 0) && 
+            (pthread_mutex_init(&m_mutex, 0) == 0);
     }
 
     ~WorkQueue() 
     {
-	if (m_worker_up)
-	    setTerminateAndWait();
-    }
-
-    /** Start the worker thread. The start_routine will loop
-     *  taking and executing tasks. */
-    bool start(void *(*start_routine)(void *), void *arg)
-    {
-	bool status = pthread_create(&m_worker_thread, 0, 
-				     start_routine, arg) == 0;
-	if (status)
-	    m_worker_up = true;
-	return status;
-    }
-
-    /**
-     * Add item to work queue. Sleep if there are already too many.
-     * Called from client.
+        LOGDEB2(("WorkQueue::~WorkQueue: name %s\n", m_name.c_str()));
+        if (!m_worker_threads.empty())
+            setTerminateAndWait();
+    }
+
+    /** Start the worker threads. 
+     *
+     * @param nworkers number of worker threads to start.
+     * @param start_routine thread function. It should loop
+     *      taking (WorkQueue::take()) and executing tasks.
+     * @param arg initial parameter to thread function.
+     * @return true if ok.
+     */
+    bool start(int nworkers, void *(*start_routine)(void *), void *arg)
+    {
+        for (int i = 0; i < nworkers; i++) {
+            int err;
+            pthread_t thr;
+            if ((err = pthread_create(&thr, 0, start_routine, arg))) {
+                LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
+                        m_name.c_str(), err));
+                return false;
+            }
+            m_worker_threads.insert(std::pair<pthread_t, WQTData>(thr, WQTData()));
+        }
+        return true;
+    }
+
+    /** Add item to work queue, called from client.
+     *
+     * Sleeps if there are already too many.
      */
     bool put(T t)
     {
-	if (!ok() || pthread_mutex_lock(&m_mutex) != 0) 
-	    return false;
-
-	while (ok() && m_high > 0 && m_queue.size() >= m_high) {
-	    // Keep the order: we test ok() AFTER the sleep...
-	    if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
-		pthread_mutex_unlock(&m_mutex);
-		return false;
-	    }
+        if (!ok() || pthread_mutex_lock(&m_mutex) != 0) 
+            return false;
+
+#ifdef WORKQUEUE_TIMING
+        struct timespec before;
+        clock_gettime(CLOCK_MONOTONIC, &before);
+#endif
+
+        while (ok() && m_high > 0 && m_queue.size() >= m_high) {
+            // Keep the order: we test ok() AFTER the sleep...
+            if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
+                pthread_mutex_unlock(&m_mutex);
+                return false;
+            }
+        }
+
+#ifdef WORKQUEUE_TIMING
+        struct timespec after;
+        clock_gettime(CLOCK_MONOTONIC, &after);
+        m_clientwait += nanodiff(before, after);
+#endif
+
+        m_queue.push(t);
+        ++m_size;
+        // Just wake up one worker; there is only one new task.
+        pthread_cond_signal(&m_cond);
+        pthread_mutex_unlock(&m_mutex);
+        return true;
+    }
+
+    /** Wait until the queue is inactive. Called from client.
+     *
+     * Waits until the task queue is empty and the workers are all
+     * back sleeping. Used by the client to wait for all current work
+     * to be completed, when it needs to perform work that couldn't be
+     * done in parallel with the worker's tasks, or before shutting
+     * down. Work can be resumed after calling this.
+     */
+    bool waitIdle()
+    {
+        if (!ok() || pthread_mutex_lock(&m_mutex) != 0) {
+            LOGERR(("WorkQueue::waitIdle: %s not ok or can't lock\n",
+                    m_name.c_str()));
+            return false;
+        }
+
+        // We're done when the queue is empty AND all workers are back
+        // waiting for a task.
+        while (ok() && (m_queue.size() > 0 || 
+                        m_workers_waiting != m_worker_threads.size())) {
+            if (pthread_cond_wait(&m_cond, &m_mutex)) {
+                pthread_mutex_unlock(&m_mutex);
+                m_ok = false;
+                LOGERR(("WorkQueue::waitIdle: cond_wait failed\n"));
+                return false;
+            }
+        }
+
+#ifdef WORKQUEUE_TIMING
+        long long M = 1000000LL;
+        long long wscl = m_worker_threads.size() * M;
+        LOGERR(("WorkQueue:%s: clients wait (all) %lld ms, "
+                 "worker wait (avg) %lld ms, worker work (avg) %lld ms\n",
+                 m_name.c_str(), m_clientwait / M, m_workerwait / wscl,  
+                 m_workerwork / wscl));
+#endif // WORKQUEUE_TIMING
+
+        pthread_mutex_unlock(&m_mutex);
+        return ok();
+    }
+
+
+    /** Tell the workers to exit, and wait for them. Does not bother with
+     * tasks possibly remaining on the queue, so it should be called
+     * after waitIdle() for an orderly shutdown.
+     */
+    void* setTerminateAndWait()
+    {
+        LOGDEB(("setTerminateAndWait:%s\n", m_name.c_str()));
+        pthread_mutex_lock(&m_mutex);
+
+        if (m_worker_threads.empty()) {
+            pthread_mutex_unlock(&m_mutex);
+            return (void*)0; // Already called ?
+        }
 
-	m_queue.push(t);
-	++m_size;
-	pthread_cond_broadcast(&m_cond);
-	pthread_mutex_unlock(&m_mutex);
-	return true;
-    }
-
-    /** Wait until the queue is empty and the worker is
-     *  back waiting for task. Called from the client when it needs to
-     *  perform work that couldn't be done in parallel with the
-     *  worker's tasks.
-     */
-    bool waitIdle()
-    {
-	if (!ok() || pthread_mutex_lock(&m_mutex) != 0) 
-	    return false;
-
-	// We're done when the queue is empty AND the worker is back
-	// for a task (has finished the last)
-	while (ok() && (m_queue.size() > 0 || !m_worker_waiting)) {
-	    if (pthread_cond_wait(&m_cond, &m_mutex)) {
-		pthread_mutex_unlock(&m_mutex);
-		return false;
-	    }
-	}
-	pthread_mutex_unlock(&m_mutex);
-	return ok();
-    }
-
-    /** Tell the worker to exit, and wait for it. There may still
-	be tasks on the queue. */
-    void* setTerminateAndWait()
-    {
-	if (!m_worker_up)
-	    return (void *)0;
-
-	pthread_mutex_lock(&m_mutex);
-	m_ok = false;
-	pthread_cond_broadcast(&m_cond);
-	pthread_mutex_unlock(&m_mutex);
-	void *status;
-	pthread_join(m_worker_thread, &status);
-	m_worker_up = false;
-	return status;
-    }
-
-    /** Remove task from queue. Sleep if there are not enough. Signal if we go
-	to sleep on empty queue: client may be waiting for our going idle */
+        // Wait for all worker threads to have called workerExit()
+        m_ok = false;
+        while (m_workers_exited < m_worker_threads.size()) {
+            pthread_cond_broadcast(&m_cond);
+            if (pthread_cond_wait(&m_cond, &m_mutex)) {
+                pthread_mutex_unlock(&m_mutex);
+                LOGERR(("WorkQueue::setTerminate: cond_wait failed\n"));
+                return (void*)0;
+            }
+        }
+
+        // Perform the thread joins and compute the overall status.
+        // Workers return (void*)1 if ok.
+        void *statusall = (void*)1;
+        unordered_map<pthread_t, WQTData>::iterator it;
+        while (!m_worker_threads.empty()) {
+            void *status;
+            it = m_worker_threads.begin();
+            pthread_join(it->first, &status);
+            if (status == (void *)0)
+                statusall = status;
+            m_worker_threads.erase(it);
+        }
+        pthread_mutex_unlock(&m_mutex);
+        LOGDEB(("setTerminateAndWait:%s done\n", m_name.c_str()));
+        return statusall;
+    }
+
+    /** Take task from queue. Called from worker.
+     * 
+     * Sleeps if there are not enough queued. Signals if we go to sleep
+     * on an empty queue: the client may be waiting for the queue to go idle.
+     */
     bool take(T* tp)
     {
-	if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
-	    return false;
-
-	while (ok() && m_queue.size() < m_low) {
-	    m_worker_waiting = true;
-	    if (m_queue.empty())
-		pthread_cond_broadcast(&m_cond);
-	    if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
-		pthread_mutex_unlock(&m_mutex);
-		m_worker_waiting = false;
-		return false;
-	    }
-	    m_worker_waiting = false;
-	}
-
-	++m_jobcnt;
-	m_lenacc += m_size;
-
-	*tp = m_queue.front();
-	m_queue.pop();
-	--m_size;
-
-	pthread_cond_broadcast(&m_cond);
-	pthread_mutex_unlock(&m_mutex);
-	return true;
-    }
-
-    /** Take note of the worker exit. This would normally happen after an
-	unrecoverable error */
+        if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
+            return false;
+
+#ifdef WORKQUEUE_TIMING
+        struct timespec beforesleep;
+        clock_gettime(CLOCK_MONOTONIC, &beforesleep);
+
+        pthread_t me = pthread_self();
+        unordered_map<pthread_t, WQTData>::iterator it = 
+            m_worker_threads.find(me);
+        if (it != m_worker_threads.end() && 
+            (it->second.wstart.tv_sec != 0 || it->second.wstart.tv_nsec != 0)) {
+            long long nanos = nanodiff(it->second.wstart, beforesleep);
+            m_workerwork += nanos;
+        }
+#endif
+
+        while (ok() && m_queue.size() < m_low) {
+            m_workers_waiting++;
+            if (m_queue.empty())
+                pthread_cond_broadcast(&m_cond);
+            if (pthread_cond_wait(&m_cond, &m_mutex) || !ok()) {
+                // !ok is a normal condition when shutting down
+                if (ok())
+                    LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
+                            m_name.c_str()));
+                m_workers_waiting--;
+                pthread_mutex_unlock(&m_mutex);
+                return false;
+            }
+            m_workers_waiting--;
+        }
+
+#ifdef WORKQUEUE_TIMING
+        struct timespec aftersleep;
+        clock_gettime(CLOCK_MONOTONIC, &aftersleep);
+        m_workerwait += nanodiff(beforesleep, aftersleep);
+        it = m_worker_threads.find(me);
+        if (it != m_worker_threads.end())
+            it->second.wstart = aftersleep;
+#endif
+
+        ++m_jobcnt;
+        *tp = m_queue.front();
+        m_queue.pop();
+        --m_size;
+        // No reason to wake up more than one client thread.
+        pthread_cond_signal(&m_cond);
+        pthread_mutex_unlock(&m_mutex);
+        return true;
+    }
+
+    /** Advertise exit and abort the queue. Called from a worker.
+     * This would normally happen after an unrecoverable error, or when 
+     * the queue is terminated by the client. Workers never exit normally,
+     * except when the queue is shut down (at which point m_ok is set to false
+     * by the shutdown code anyway). The thread must return/exit immediately 
+     * after calling this.
+     */
     void workerExit()
     {
-	if (!ok() || pthread_mutex_lock(&m_mutex) != 0)
-	    return;
-	m_ok = false;
-	pthread_cond_broadcast(&m_cond);
-	pthread_mutex_unlock(&m_mutex);
-    }
-
-    /** Debug only: as the size is returned while the queue is unlocked, there
-     *  is no warranty on its consistency. Not that we use the member size, not 
-     *  the container size() call which would need locking.
-     */
-    size_t size() {return m_size;}
+        if (pthread_mutex_lock(&m_mutex) != 0)
+            return;
+        m_workers_exited++;
+        m_ok = false;
+        pthread_cond_broadcast(&m_cond);
+        pthread_mutex_unlock(&m_mutex);
+    }
+
+    /** Return current queue size. Debug only.
+     *
+     *  As the size is returned while the queue is unlocked, there
+     *  is no guarantee of its consistency. Note that we use the member
+     *  size, not the container size() call, which would need locking.
+     */
+    size_t size() 
+    {
+        return m_size;
+    }
 
 private:
-    bool ok() {return m_ok && m_worker_up;}
-
+    bool ok() 
+    {
+        return m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
+    }
+
+    long long nanodiff(const struct timespec& older, 
+                       const struct timespec& newer)
+    {
+        return (newer.tv_sec - older.tv_sec) * 1000000000LL
+            + newer.tv_nsec - older.tv_nsec;
+    }
+
+    string m_name;
     size_t m_high;
     size_t m_low; 
     size_t m_size;
-    bool m_worker_up;
-    bool m_worker_waiting;
+    /* Count of worker threads currently waiting for a job */
+    unsigned int m_workers_waiting;
+    unsigned int m_workers_exited;
+    /* Stats */
     int m_jobcnt;
-    int m_lenacc;
-
-    pthread_t m_worker_thread;
+    long long m_clientwait;
+    long long m_workerwait;
+    long long m_workerwork;
+
+    unordered_map<pthread_t, WQTData> m_worker_threads;
     queue<T> m_queue;
     pthread_cond_t m_cond;
     pthread_mutex_t m_mutex;
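
For reference, here is a minimal usage sketch of the revised multi-worker API. MyTask, workerFunc and the queue limits are illustrative assumptions, not part of the patch, and error handling is reduced to the bare minimum.

#include <string>
#include "workqueue.h"

// Hypothetical task type carrying the data for one unit of work.
struct MyTask {
    std::string filename;
};

// Worker loop: take() blocks until a task is available, and returns
// false when the queue is shut down or an error occurs.
static void *workerFunc(void *arg)
{
    WorkQueue<MyTask*> *queue = (WorkQueue<MyTask*> *)arg;
    MyTask *task;
    while (queue->take(&task)) {
        // ... process *task ...
        delete task;
    }
    // Shutdown or error: advertise our exit and return. Workers
    // return (void*)1 to report success.
    queue->workerExit();
    return (void *)1;
}

int main()
{
    // Block clients when more than 10 tasks are queued.
    WorkQueue<MyTask*> queue("example", 10);
    if (!queue.start(4, workerFunc, &queue))
        return 1;

    for (int i = 0; i < 100; i++) {
        MyTask *task = new MyTask;
        task->filename = "somefile";
        if (!queue.put(task)) {
            delete task;
            break;
        }
    }

    // Wait for all queued work to complete, then shut the workers down.
    queue.waitIdle();
    queue.setTerminateAndWait();
    return 0;
}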