Switch to side-by-side view

--- a/src/workqueue.h
+++ b/src/workqueue.h
@@ -30,8 +30,8 @@
 /// used at the moment.
 class WQTData {
 public:
-	WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
-	struct timespec wstart;
+    WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
+    struct timespec wstart;
 };
 
 /**
@@ -50,275 +50,295 @@
 template <class T> class WorkQueue {
 public:
 
-	/** Create a WorkQueue
-	 * @param name for message printing
-	 * @param hi number of tasks on queue before clients blocks. Default 0
-	 *	  meaning no limit. hi == -1 means that the queue is disabled.
-	 * @param lo minimum count of tasks before worker starts. Default 1.
-	 */
-	WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
-		: m_name(name), m_high(hi), m_low(lo),
-		  m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
-		  m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
-	{
-		m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
-			(pthread_cond_init(&m_wcond, 0) == 0);
-	}
-
-	~WorkQueue()
-	{
-		if (!m_worker_threads.empty())
-			setTerminateAndWait();
-	}
-
-	/** Start the worker threads.
-	 *
-	 * @param nworkers number of threads copies to start.
-	 * @param start_routine thread function. It should loop
-	 *		taking (QueueWorker::take()) and executing tasks.
-	 * @param arg initial parameter to thread function.
-	 * @return true if ok.
-	 */
-	bool start(int nworkers, void *(*workproc)(void *), void *arg)
-	{
-		PTMutexLocker lock(m_mutex);
-		for	 (int i = 0; i < nworkers; i++) {
-			int err;
-			pthread_t thr;
-			if ((err = pthread_create(&thr, 0, workproc, arg))) {
-				return false;
-			}
-			m_worker_threads.insert(std::pair<pthread_t, WQTData>(thr, WQTData()));
-		}
-		return true;
-	}
-
-	/** Add item to work queue, called from client.
-	 *
-	 * Sleeps if there are already too many.
-	 */
-	bool put(T t, bool flushprevious = false)
-	{
-		PTMutexLocker lock(m_mutex);
-		if (!lock.ok() || !ok()) {
-			return false;
-		}
-
-		while (ok() && m_high > 0 && m_queue.size() >= m_high) {
-			m_clientsleeps++;
-			// Keep the order: we test ok() AFTER the sleep...
-			m_clients_waiting++;
-			if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
-				m_clients_waiting--;
-				return false;
-			}
-			m_clients_waiting--;
-		}
-		if (flushprevious) {
-			while (!m_queue.empty())
-				m_queue.pop();
-		}
-		m_queue.push(t);
-		if (m_workers_waiting > 0) {
-			// Just wake one worker, there is only one new task.
-			pthread_cond_signal(&m_wcond);
-		} else {
-			m_nowake++;
-		}
-
-		return true;
-	}
-
-	/** Wait until the queue is inactive. Called from client.
-	 *
-	 * Waits until the task queue is empty and the workers are all
-	 * back sleeping. Used by the client to wait for all current work
-	 * to be completed, when it needs to perform work that couldn't be
-	 * done in parallel with the worker's tasks, or before shutting
-	 * down. Work can be resumed after calling this. Note that the
-	 * only thread which can call it safely is the client just above
-	 * (which can control the task flow), else there could be
-	 * tasks in the intermediate queues.
-	 * To rephrase: there is no warranty on return that the queue is actually
-	 * idle EXCEPT if the caller knows that no jobs are still being created.
-	 * It would be possible to transform this into a safe call if some kind
-	 * of suspend condition was set on the queue by waitIdle(), to be reset by
-	 * some kind of "resume" call. Not currently the case.
-	 */
-	bool waitIdle()
-	{
-		PTMutexLocker lock(m_mutex);
-		if (!lock.ok() || !ok()) {
-			return false;
-		}
-
-		// We're done when the queue is empty AND all workers are back
-		// waiting for a task.
-		while (ok() && (m_queue.size() > 0 ||
-						m_workers_waiting != m_worker_threads.size())) {
-			m_clients_waiting++;
-			if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
-				m_clients_waiting--;
-				m_ok = false;
-				return false;
-			}
-			m_clients_waiting--;
-		}
-
-		return ok();
-	}
-
-
-	/** Tell the workers to exit, and wait for them.
-	 *
-	 * Does not bother about tasks possibly remaining on the queue, so
-	 * should be called after waitIdle() for an orderly shutdown.
-	 */
-	void* setTerminateAndWait()
-	{
-		PTMutexLocker lock(m_mutex);
-
-		if (m_worker_threads.empty()) {
-			// Already called ?
-			return (void*)0;
-		}
-
-		// Wait for all worker threads to have called workerExit()
-		m_ok = false;
-		while (m_workers_exited < m_worker_threads.size()) {
-			pthread_cond_broadcast(&m_wcond);
-			m_clients_waiting++;
-			if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
-				m_clients_waiting--;
-				return (void*)0;
-			}
-			m_clients_waiting--;
-		}
-
-		// Perform the thread joins and compute overall status
-		// Workers return (void*)1 if ok
-		void *statusall = (void*)1;
-		std::unordered_map<pthread_t,  WQTData>::iterator it;
-		while (!m_worker_threads.empty()) {
-			void *status;
-			it = m_worker_threads.begin();
-			pthread_join(it->first, &status);
-			if (status == (void *)0)
-				statusall = status;
-			m_worker_threads.erase(it);
-		}
-
-		// Reset to start state.
-		m_workers_exited = m_clients_waiting = m_workers_waiting =
-			m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
-		m_ok = true;
-
-		return statusall;
-	}
-
-	/** Take task from queue. Called from worker.
-	 *
-	 * Sleeps if there are not enough. Signal if we go to sleep on empty
-	 * queue: client may be waiting for our going idle.
-	 */
-	bool take(T* tp, size_t *szp = 0)
-	{
-		PTMutexLocker lock(m_mutex);
-		if (!lock.ok() || !ok()) {
-			return false;
-		}
-
-		while (ok() && m_queue.size() < m_low) {
-			m_workersleeps++;
-			m_workers_waiting++;
-			if (m_queue.empty())
-				pthread_cond_broadcast(&m_ccond);
-			if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
-				m_workers_waiting--;
-				return false;
-			}
-			m_workers_waiting--;
-		}
-
-		m_tottasks++;
-		*tp = m_queue.front();
-		if (szp)
-			*szp = m_queue.size();
-		m_queue.pop();
-		if (m_clients_waiting > 0) {
-			// No reason to wake up more than one client thread
-			pthread_cond_signal(&m_ccond);
-		} else {
-			m_nowake++;
-		}
-		return true;
-	}
-
-	/** Advertise exit and abort queue. Called from worker
-	 *
-	 * This would happen after an unrecoverable error, or when
-	 * the queue is terminated by the client. Workers never exit normally,
-	 * except when the queue is shut down (at which point m_ok is set to
-	 * false by the shutdown code anyway). The thread must return/exit
-	 * immediately after calling this.
-	 */
-	void workerExit()
-	{
-		PTMutexLocker lock(m_mutex);
-		m_workers_exited++;
-		m_ok = false;
-		pthread_cond_broadcast(&m_ccond);
-	}
-
-	size_t qsize()
-	{
-		PTMutexLocker lock(m_mutex);
-		size_t sz = m_queue.size();
-		return sz;
+    /** Create a WorkQueue
+     * @param name for message printing
+     * @param hi number of tasks on queue before clients blocks. Default 0
+     *	  meaning no limit. hi == -1 means that the queue is disabled.
+     * @param lo minimum count of tasks before worker starts. Default 1.
+     */
+    WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
+        : m_name(name), m_high(hi), m_low(lo),
+          m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
+          m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
+	{
+            m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
+                (pthread_cond_init(&m_wcond, 0) == 0);
+	}
+
+    ~WorkQueue()
+	{
+            if (!m_worker_threads.empty())
+                setTerminateAndWait();
+	}
+
+    /** Start the worker threads.
+     *
+     * @param nworkers number of threads copies to start.
+     * @param start_routine thread function. It should loop
+     *	  taking (WorkQueue::take()) and executing tasks.
+     * @param arg initial parameter to thread function.
+     * @return true if ok.
+     */
+    bool start(int nworkers, void *(*workproc)(void *), void *arg)
+	{
+            PTMutexLocker lock(m_mutex);
+            for	 (int i = 0; i < nworkers; i++) {
+                int err;
+                pthread_t thr;
+                if ((err = pthread_create(&thr, 0, workproc, arg))) {
+                    return false;
+                }
+                m_worker_threads.insert(std::pair<pthread_t, WQTData>(thr, WQTData()));
+            }
+            return true;
+	}
+
+    /** Add item to work queue, called from client.
+     *
+     * Sleeps if there are already too many.
+     */
+    bool put(T t, bool flushprevious = false)
+	{
+            PTMutexLocker lock(m_mutex);
+            if (!lock.ok() || !ok()) {
+                return false;
+            }
+
+            while (ok() && m_high > 0 && m_queue.size() >= m_high) {
+                m_clientsleeps++;
+                // Keep the order: we test ok() AFTER the sleep...
+                m_clients_waiting++;
+                if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
+                    m_clients_waiting--;
+                    return false;
+                }
+                m_clients_waiting--;
+            }
+            if (flushprevious) {
+                while (!m_queue.empty())
+                    m_queue.pop();
+            }
+            m_queue.push(t);
+            if (m_workers_waiting > 0) {
+                // Just wake one worker, there is only one new task.
+                pthread_cond_signal(&m_wcond);
+            } else {
+                m_nowake++;
+            }
+
+            return true;
+	}
+
+    /** Wait until the queue is inactive. Called from client.
+     *
+     * Waits until the task queue is empty and the workers are all
+     * back sleeping. Used by the client to wait for all current work
+     * to be completed, when it needs to perform work that couldn't be
+     * done in parallel with the worker's tasks, or before shutting
+     * down. Work can be resumed after calling this. Note that the
+     * only thread which can call it safely is the client just above
+     * (which can control the task flow), else there could be
+     * tasks in the intermediate queues.
+     * To rephrase: there is no warranty on return that the queue is actually
+     * idle EXCEPT if the caller knows that no jobs are still being created.
+     * It would be possible to transform this into a safe call if some kind
+     * of suspend condition was set on the queue by waitIdle(), to be reset by
+     * some kind of "resume" call. Not currently the case.
+     */
+    bool waitIdle()
+	{
+            PTMutexLocker lock(m_mutex);
+            if (!lock.ok() || !ok()) {
+                return false;
+            }
+
+            // We're done when the queue is empty AND all workers are back
+            // waiting for a task.
+            while (ok() && (m_queue.size() > 0 ||
+                            m_workers_waiting != m_worker_threads.size())) {
+                m_clients_waiting++;
+                if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
+                    m_clients_waiting--;
+                    m_ok = false;
+                    return false;
+                }
+                m_clients_waiting--;
+            }
+
+            return ok();
+	}
+
+
+    /** Tell the workers to exit, and wait for them.
+     *
+     * Does not bother about tasks possibly remaining on the queue, so
+     * should be called after waitIdle() for an orderly shutdown.
+     */
+    void* setTerminateAndWait()
+	{
+            PTMutexLocker lock(m_mutex);
+
+            if (m_worker_threads.empty()) {
+                // Already called ?
+                return (void*)0;
+            }
+
+            // Wait for all worker threads to have called workerExit()
+            m_ok = false;
+            while (m_workers_exited < m_worker_threads.size()) {
+                pthread_cond_broadcast(&m_wcond);
+                m_clients_waiting++;
+                if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
+                    m_clients_waiting--;
+                    return (void*)0;
+                }
+                m_clients_waiting--;
+            }
+
+            // Perform the thread joins and compute overall status
+            // Workers return (void*)1 if ok
+            void *statusall = (void*)1;
+            std::unordered_map<pthread_t,  WQTData>::iterator it;
+            while (!m_worker_threads.empty()) {
+                void *status;
+                it = m_worker_threads.begin();
+                pthread_join(it->first, &status);
+                if (status == (void *)0)
+                    statusall = status;
+                m_worker_threads.erase(it);
+            }
+
+            // Reset to start state.
+            m_workers_exited = m_clients_waiting = m_workers_waiting =
+                m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
+            m_ok = true;
+
+            return statusall;
+	}
+
+    /** Take task from queue. Called from worker.
+     *
+     * Sleeps if there are not enough. Signal if we go to sleep on empty
+     * queue: client may be waiting for our going idle.
+     */
+    bool take(T* tp, size_t *szp = 0)
+	{
+            PTMutexLocker lock(m_mutex);
+            if (!lock.ok() || !ok()) {
+                return false;
+            }
+
+            while (ok() && m_queue.size() < m_low) {
+                m_workersleeps++;
+                m_workers_waiting++;
+                if (m_queue.empty())
+                    pthread_cond_broadcast(&m_ccond);
+                if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
+                    m_workers_waiting--;
+                    return false;
+                }
+                m_workers_waiting--;
+            }
+
+            m_tottasks++;
+            *tp = m_queue.front();
+            if (szp)
+                *szp = m_queue.size();
+            m_queue.pop();
+            if (m_clients_waiting > 0) {
+                // No reason to wake up more than one client thread
+                pthread_cond_signal(&m_ccond);
+            } else {
+                m_nowake++;
+            }
+            return true;
+	}
+    	
+    bool waitminsz(size_t sz) {
+        PTMutexLocker lock(m_mutex);
+        if (!lock.ok() || !ok()) {
+            return false;
+        }
+
+        while (ok() && m_queue.size() < sz) {
+            m_workersleeps++;
+            m_workers_waiting++;
+            if (m_queue.empty())
+                pthread_cond_broadcast(&m_ccond);
+            if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
+                m_workers_waiting--;
+                return false;
+            }
+            m_workers_waiting--;
+        }
+        return true;
+    }
+
+    /** Advertise exit and abort queue. Called from worker
+     *
+     * This would happen after an unrecoverable error, or when
+     * the queue is terminated by the client. Workers never exit normally,
+     * except when the queue is shut down (at which point m_ok is set to
+     * false by the shutdown code anyway). The thread must return/exit
+     * immediately after calling this.
+     */
+    void workerExit()
+	{
+            PTMutexLocker lock(m_mutex);
+            m_workers_exited++;
+            m_ok = false;
+            pthread_cond_broadcast(&m_ccond);
+	}
+
+    size_t qsize()
+	{
+            PTMutexLocker lock(m_mutex);
+            size_t sz = m_queue.size();
+            return sz;
 	}
 
 private:
-	bool ok()
-	{
-		bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
-		return isok;
-	}
-
-	long long nanodiff(const struct timespec& older,
-					   const struct timespec& newer)
-	{
-		return (newer.tv_sec - older.tv_sec) * 1000000000LL
-			+ newer.tv_nsec - older.tv_nsec;
-	}
-
-	// Configuration
-	std::string m_name;
-	size_t m_high;
-	size_t m_low;
-
-	// Status
-	// Worker threads having called exit
-	unsigned int m_workers_exited;
-	bool m_ok;
-
-	// Per-thread data. The data is not used currently, this could be
-	// a set<pthread_t>
-	std::unordered_map<pthread_t, WQTData> m_worker_threads;
-
-	// Synchronization
-	std::queue<T> m_queue;
-	pthread_cond_t m_ccond;
-	pthread_cond_t m_wcond;
-	PTMutexInit m_mutex;
-	// Client/Worker threads currently waiting for a job
-	unsigned int m_clients_waiting;
-	unsigned int m_workers_waiting;
-
-	// Statistics
-	unsigned int m_tottasks;
-	unsigned int m_nowake;
-	unsigned int m_workersleeps;
-	unsigned int m_clientsleeps;
+    bool ok()
+	{
+            bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
+            return isok;
+	}
+
+    long long nanodiff(const struct timespec& older,
+                       const struct timespec& newer)
+	{
+            return (newer.tv_sec - older.tv_sec) * 1000000000LL
+                + newer.tv_nsec - older.tv_nsec;
+	}
+
+    // Configuration
+    std::string m_name;
+    size_t m_high;
+    size_t m_low;
+
+    // Status
+    // Worker threads having called exit
+    unsigned int m_workers_exited;
+    bool m_ok;
+
+    // Per-thread data. The data is not used currently, this could be
+    // a set<pthread_t>
+    std::unordered_map<pthread_t, WQTData> m_worker_threads;
+
+    // Synchronization
+    std::queue<T> m_queue;
+    pthread_cond_t m_ccond;
+    pthread_cond_t m_wcond;
+    PTMutexInit m_mutex;
+    // Client/Worker threads currently waiting for a job
+    unsigned int m_clients_waiting;
+    unsigned int m_workers_waiting;
+
+    // Statistics
+    unsigned int m_tottasks;
+    unsigned int m_nowake;
+    unsigned int m_workersleeps;
+    unsigned int m_clientsleeps;
 };
 
 #endif /* _WORKQUEUE_H_INCLUDED_ */