--- a/src/utils/workqueue.h
+++ b/src/utils/workqueue.h
@@ -1,4 +1,4 @@
-/* Copyright (C) 2012 J.F.Dockes
+/* Copyright (C) 2012-2016 J.F.Dockes
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
@@ -17,15 +17,15 @@
#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_
-#include <pthread.h>
-#include <time.h>
-
+#include <thread>
+#include <future>
#include <string>
#include <queue>
#include <list>
+#include <mutex>
+#include <condition_variable>
#include "log.h"
-#include "ptmutex.h"
/**
* A WorkQueue manages the synchronisation around a queue of work items,
@@ -39,85 +39,87 @@
* the client or worker sets an end condition on the queue. A second
* queue could conceivably be used for returning individual task
* status.
+ *
+ * The strange thread functions argument and return values
+ * comes from compatibility with an earlier pthread-based
+ * implementation.
*/
template <class T> class WorkQueue {
public:
/** Create a WorkQueue
* @param name for message printing
- * @param hi number of tasks on queue before clients blocks. Default 0
+ * @param hi number of tasks on queue before clients blocks. Default 0
* meaning no limit. hi == -1 means that the queue is disabled.
* @param lo minimum count of tasks before worker starts. Default 1.
*/
WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
- : m_name(name), m_high(hi), m_low(lo),
- m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
- m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
- {
- m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
- (pthread_cond_init(&m_wcond, 0) == 0);
- }
-
- ~WorkQueue()
- {
- LOGDEB2("WorkQueue::~WorkQueue:" << (m_name) << "\n" );
- if (!m_worker_threads.empty())
+ : m_name(name), m_high(hi), m_low(lo), m_workers_exited(0),
+ m_ok(true), m_clients_waiting(0), m_workers_waiting(0),
+ m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0) {
+ }
+
+ ~WorkQueue() {
+ if (!m_worker_threads.empty()) {
setTerminateAndWait();
- }
-
- /** Start the worker threads.
+ }
+ }
+
+ /** Start the worker threads.
*
* @param nworkers number of threads copies to start.
* @param start_routine thread function. It should loop
- * taking (QueueWorker::take()) and executing tasks.
+ * taking (QueueWorker::take()) and executing tasks.
* @param arg initial parameter to thread function.
* @return true if ok.
*/
- bool start(int nworkers, void *(*start_routine)(void *), void *arg)
- {
- PTMutexLocker lock(m_mutex);
- for (int i = 0; i < nworkers; i++) {
- int err;
- pthread_t thr;
- if ((err = pthread_create(&thr, 0, start_routine, arg))) {
- LOGERR("WorkQueue:" << (m_name) << ": pthread_create failed, err " << (err) << "\n" );
+ bool start(int nworkers, void *(workproc)(void *), void *arg) {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ for (int i = 0; i < nworkers; i++) {
+ std::packaged_task<void *(void *)> task(workproc);
+ Worker w;
+ w.res = task.get_future();
+ w.thr = std::thread(std::move(task), arg);
+ m_worker_threads.push_back(std::move(w));
+ }
+ return true;
+ }
+
+ /** Add item to work queue, called from client.
+ *
+ * Sleeps if there are already too many.
+ */
+ bool put(T t, bool flushprevious = false) {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ if (!ok()) {
+ LOGERR("WorkQueue::put:" << m_name << ": !ok\n");
+ return false;
+ }
+
+ while (ok() && m_high > 0 && m_queue.size() >= m_high) {
+ m_clientsleeps++;
+ // Keep the order: we test ok() AFTER the sleep...
+ m_clients_waiting++;
+ m_ccond.wait(lock);
+ if (!ok()) {
+ m_clients_waiting--;
return false;
}
- m_worker_threads.push_back(thr);
- }
- return true;
- }
-
- /** Add item to work queue, called from client.
- *
- * Sleeps if there are already too many.
- */
- bool put(T t)
- {
- PTMutexLocker lock(m_mutex);
- if (!lock.ok() || !ok()) {
- LOGERR("WorkQueue::put:" << (m_name) << ": !ok or mutex_lock failed\n" );
- return false;
- }
-
- while (ok() && m_high > 0 && m_queue.size() >= m_high) {
- m_clientsleeps++;
- // Keep the order: we test ok() AFTER the sleep...
- m_clients_waiting++;
- if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
- m_clients_waiting--;
- return false;
- }
- m_clients_waiting--;
+ m_clients_waiting--;
+ }
+ if (flushprevious) {
+ while (!m_queue.empty()) {
+ m_queue.pop();
+ }
}
m_queue.push(t);
- if (m_workers_waiting > 0) {
- // Just wake one worker, there is only one new task.
- pthread_cond_signal(&m_wcond);
- } else {
- m_nowake++;
- }
+ if (m_workers_waiting > 0) {
+ // Just wake one worker, there is only one new task.
+ m_wcond.notify_one();
+ } else {
+ m_nowake++;
+ }
return true;
}
@@ -133,127 +135,138 @@
* (which can control the task flow), else there could be
* tasks in the intermediate queues.
* To rephrase: there is no warranty on return that the queue is actually
- * idle EXCEPT if the caller knows that no jobs are still being created.
+ * idle EXCEPT if the caller knows that no jobs are still being created.
* It would be possible to transform this into a safe call if some kind
- * of suspend condition was set on the queue by waitIdle(), to be reset by
+ * of suspend condition was set on the queue by waitIdle(), to be reset by
* some kind of "resume" call. Not currently the case.
*/
- bool waitIdle()
- {
- PTMutexLocker lock(m_mutex);
- if (!lock.ok() || !ok()) {
- LOGERR("WorkQueue::waitIdle:" << (m_name) << ": not ok or can't lock\n" );
+ bool waitIdle() {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ if (!ok()) {
+ LOGERR("WorkQueue::waitIdle:" << m_name << ": not ok\n");
return false;
}
// We're done when the queue is empty AND all workers are back
// waiting for a task.
- while (ok() && (m_queue.size() > 0 ||
+ while (ok() && (m_queue.size() > 0 ||
m_workers_waiting != m_worker_threads.size())) {
- m_clients_waiting++;
- if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
- m_clients_waiting--;
- m_ok = false;
- LOGERR("WorkQueue::waitIdle:" << (m_name) << ": cond_wait failed\n" );
- return false;
- }
- m_clients_waiting--;
+ m_clients_waiting++;
+ m_ccond.wait(lock);
+ m_clients_waiting--;
}
return ok();
}
-
- /** Tell the workers to exit, and wait for them.
+ /** Tell the workers to exit, and wait for them.
*
* Does not bother about tasks possibly remaining on the queue, so
* should be called after waitIdle() for an orderly shutdown.
*/
- void* setTerminateAndWait()
- {
- PTMutexLocker lock(m_mutex);
- LOGDEB("setTerminateAndWait:" << (m_name) << "\n" );
-
- if (m_worker_threads.empty()) {
- // Already called ?
- return (void*)0;
- }
-
- // Wait for all worker threads to have called workerExit()
+ void *setTerminateAndWait() {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ LOGDEB("setTerminateAndWait:" << m_name << "\n");
+
+ if (m_worker_threads.empty()) {
+ // Already called ?
+ return (void*)0;
+ }
+
+ // Wait for all worker threads to have called workerExit()
m_ok = false;
while (m_workers_exited < m_worker_threads.size()) {
- pthread_cond_broadcast(&m_wcond);
- m_clients_waiting++;
- if (pthread_cond_wait(&m_ccond, lock.getMutex())) {
- LOGERR("WorkQueue::setTerminate:" << (m_name) << ": cond_wait failed\n" );
- m_clients_waiting--;
- return (void*)0;
- }
- m_clients_waiting--;
- }
-
- LOGINFO("" << (m_name) << ": tasks " << (m_tottasks) << " nowakes " << (m_nowake) << " wsleeps " << (m_workersleeps) << " csleeps " << (m_clientsleeps) << "\n" );
- // Perform the thread joins and compute overall status
+ m_wcond.notify_all();
+ m_clients_waiting++;
+ m_ccond.wait(lock);
+ m_clients_waiting--;
+ }
+
+ LOGINFO("" << m_name << ": tasks " << m_tottasks << " nowakes " <<
+ m_nowake << " wsleeps " << m_workersleeps << " csleeps " <<
+ m_clientsleeps << "\n");
+ // Perform the thread joins and compute overall status
// Workers return (void*)1 if ok
void *statusall = (void*)1;
- std::list<pthread_t>::iterator it;
while (!m_worker_threads.empty()) {
- void *status;
- it = m_worker_threads.begin();
- pthread_join(*it, &status);
- if (status == (void *)0)
+ void *status = m_worker_threads.front().res.get();
+ m_worker_threads.front().thr.join();
+ if (status == (void *)0) {
statusall = status;
- m_worker_threads.erase(it);
- }
-
- // Reset to start state.
- m_workers_exited = m_clients_waiting = m_workers_waiting =
- m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
+ }
+ m_worker_threads.pop_front();
+ }
+
+ // Reset to start state.
+ m_workers_exited = m_clients_waiting = m_workers_waiting =
+ m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
m_ok = true;
- LOGDEB("setTerminateAndWait:" << (m_name) << " done\n" );
+ LOGDEB("setTerminateAndWait:" << m_name << " done\n");
return statusall;
}
/** Take task from queue. Called from worker.
- *
- * Sleeps if there are not enough. Signal if we go to sleep on empty
+ *
+ * Sleeps if there are not enough. Signal if we go to sleep on empty
* queue: client may be waiting for our going idle.
*/
- bool take(T* tp, size_t *szp = 0)
- {
- PTMutexLocker lock(m_mutex);
- if (!lock.ok() || !ok()) {
- LOGDEB("WorkQueue::take:" << (m_name) << ": not ok\n" );
+ bool take(T* tp, size_t *szp = 0) {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ if (!ok()) {
+ LOGDEB("WorkQueue::take:" << m_name << ": not ok\n");
return false;
- }
+ }
while (ok() && m_queue.size() < m_low) {
- m_workersleeps++;
+ m_workersleeps++;
m_workers_waiting++;
- if (m_queue.empty())
- pthread_cond_broadcast(&m_ccond);
- if (pthread_cond_wait(&m_wcond, lock.getMutex()) || !ok()) {
- // !ok is a normal condition when shutting down
- if (ok())
- LOGERR("WorkQueue::take:" << (m_name) << ": cond_wait failed or !ok\n" );
+ if (m_queue.empty()) {
+ m_ccond.notify_all();
+ }
+ m_wcond.wait(lock);
+ if (!ok()) {
+ // !ok is a normal condition when shutting down
m_workers_waiting--;
return false;
}
m_workers_waiting--;
}
- m_tottasks++;
+ m_tottasks++;
*tp = m_queue.front();
- if (szp)
- *szp = m_queue.size();
+ if (szp) {
+ *szp = m_queue.size();
+ }
m_queue.pop();
- if (m_clients_waiting > 0) {
- // No reason to wake up more than one client thread
- pthread_cond_signal(&m_ccond);
- } else {
- m_nowake++;
- }
+ if (m_clients_waiting > 0) {
+ // No reason to wake up more than one client thread
+ m_ccond.notify_one();
+ } else {
+ m_nowake++;
+ }
+ return true;
+ }
+
+ bool waitminsz(size_t sz) {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ if (!ok()) {
+ return false;
+ }
+
+ while (ok() && m_queue.size() < sz) {
+ m_workersleeps++;
+ m_workers_waiting++;
+ if (m_queue.empty()) {
+ m_ccond.notify_all();
+ }
+ m_wcond.wait(lock);
+ if (!ok()) {
+ m_workers_waiting--;
+ return false;
+ }
+ m_workers_waiting--;
+ }
return true;
}
@@ -265,58 +278,57 @@
* false by the shutdown code anyway). The thread must return/exit
* immediately after calling this.
*/
- void workerExit()
- {
- LOGDEB("workerExit:" << (m_name) << "\n" );
- PTMutexLocker lock(m_mutex);
+ void workerExit() {
+ LOGDEB("workerExit:" << m_name << "\n");
+ std::unique_lock<std::mutex> lock(m_mutex);
m_workers_exited++;
m_ok = false;
- pthread_cond_broadcast(&m_ccond);
- }
-
- size_t qsize()
- {
- PTMutexLocker lock(m_mutex);
- size_t sz = m_queue.size();
- return sz;
+ m_ccond.notify_all();
+ }
+
+ size_t qsize() {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ return m_queue.size();
}
private:
- bool ok()
- {
- bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
- if (!isok) {
- LOGDEB("WorkQueue:ok:" << (m_name) << ": not ok m_ok " << (m_ok) << " m_workers_exited " << (m_workers_exited) << " m_worker_threads size " << (int(m_worker_threads.size())) << "\n" );
- }
+ bool ok() {
+ bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
+ if (!isok) {
+ LOGDEB("WorkQueue:ok:" << m_name << ": not ok m_ok " << m_ok <<
+ " m_workers_exited " << m_workers_exited <<
+ " m_worker_threads size " << m_worker_threads.size() <<
+ "\n");
+ }
return isok;
}
- long long nanodiff(const struct timespec& older,
- const struct timespec& newer)
- {
- return (newer.tv_sec - older.tv_sec) * 1000000000LL
- + newer.tv_nsec - older.tv_nsec;
- }
-
+ struct Worker {
+ std::thread thr;
+ std::future<void *> res;
+ };
+
// Configuration
std::string m_name;
size_t m_high;
- size_t m_low;
-
+ size_t m_low;
+
+ // Worker threads having called exit. Used to decide when we're done
+ unsigned int m_workers_exited;
// Status
- // Worker threads having called exit
- unsigned int m_workers_exited;
bool m_ok;
- // Per-thread data. The data is not used currently, this could be
- // a set<pthread_t>
- std::list<pthread_t> m_worker_threads;
-
+ // Our threads.
+ std::list<Worker> m_worker_threads;
+
+ // Jobs input queue
+ std::queue<T> m_queue;
+
// Synchronization
- std::queue<T> m_queue;
- pthread_cond_t m_ccond;
- pthread_cond_t m_wcond;
- PTMutexInit m_mutex;
+ std::condition_variable m_ccond;
+ std::condition_variable m_wcond;
+ std::mutex m_mutex;
+
// Client/Worker threads currently waiting for a job
unsigned int m_clients_waiting;
unsigned int m_workers_waiting;