/* Copyright (C) 2016 J.F.Dockes
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the
* Free Software Foundation, Inc.,
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef _BUFFERXCHANGE_H_INCLUDED_
#define _BUFFERXCHANGE_H_INCLUDED_
#include <thread>
#include <string>
#include <queue>
#include <list>
#include <mutex>
#include <condition_variable>
#include "log.h"
/**
* A BufferXChange is a synchronized 2 way meeting point for 2 threads
* exchanging objects (or 1 thread sending objects to another).
*
* Example: HTTP proxy, one thread reads data buffers, the other one
* serves them.
* Once the objects are used by the consumer, they can be queued back
* to avoid memory allocations, or just destroyed.
*
* T has to be a pointer type.
* BufXChange will delete excess recycled objects, and any remaining on the
* queues at delete time.
*/
template <class T> class BufXChange {
public:
/** Create a BufXChange. Terminology: client is upstream, worker downstream
* @param name for message printing
* @param hi number of tasks on queue before upstream blocks. Default 0
* meaning no limit.
*/
BufXChange(const std::string& name, size_t hi = 0)
: m_name(name), m_high(hi), m_maxrecycled(4),
m_ok(true), m_clients_waiting(0), m_workers_waiting(0) {
}
~BufXChange() {
std::unique_lock<std::mutex> lock(m_mutex);
while (m_queue.size()) {
T t = m_queue.front();
m_queue.pop_front();
delete t;
}
while (m_rqueue.size()) {
T t = m_rqueue.front();
m_rqueue.pop();
delete t;
}
}
/** Add item to work queue, called from client.
*
* Sleeps if there are already too many.
*/
bool put(T t, bool flushprevious = false) {
std::unique_lock<std::mutex> lock(m_mutex);
while (ok() && m_high > 0 && m_queue.size() >= m_high) {
m_clients_waiting++;
m_ccond.wait(lock);
m_clients_waiting--;
}
if (!ok()) {
LOGERR("BufXChange::put:" << m_name << ": !ok\n");
return false;
}
if (flushprevious) {
while (!m_queue.empty()) {
T t = m_queue.front();
m_queue.pop_front();
delete t;
}
}
m_queue.push_back(t);
if (m_workers_waiting > 0) {
// Just wake one worker, there is only one new task.
m_wcond.notify_one();
}
return true;
}
// Called from worker to put a (probably partially processed)
// buffer at the front of the queue.
bool untake(T t) {
std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) {
return false;
}
m_queue.push_front(t);
return true;
}
// Take back buffer. This does not wait, it is assumed that the
// client will allocate another object if none are found on the
// back queue
bool take_recycled(T* tp) {
std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) {
return false;
}
if (m_rqueue.empty()) {
*tp = nullptr;
return false;
} else {
*tp = m_rqueue.front();
m_rqueue.pop();
return true;
}
}
/** Wait until the queue is inactive. Called from client. */
bool waitIdle() {
std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) {
// This may happen if setTerminate was called by the
// worker before we're awaken
LOGDEB("BufXChange::waitIdle:" << m_name << ": not ok\n");
return false;
}
// We're done when the queue is empty. Unlike workqueue, we
// don't manage the clients, so we can't check that they are
// idle.
while (ok() && (m_queue.size() > 0)) {
m_clients_waiting++;
m_ccond.wait(lock);
m_clients_waiting--;
}
return ok();
}
/** Tell the workers we're done here.
*
* Does not bother about tasks possibly remaining on the queue, so
* should be called after waitIdle() for an orderly shutdown.
*/
void setTerminate() {
std::unique_lock<std::mutex> lock(m_mutex);
m_ok = false;
m_ccond.notify_all();
m_wcond.notify_all();
}
/** Take task from queue. Called from worker.
*
* Sleeps if there are not enough. Signal if we go to sleep on empty
* queue: client may be waiting for our going idle.
*/
bool take(T* tp, size_t *szp = 0) {
std::unique_lock<std::mutex> lock(m_mutex);
while (ok() && m_queue.size() < 1) {
if (m_queue.empty()) {
m_ccond.notify_all();
}
m_workers_waiting++;
m_wcond.wait(lock);
m_workers_waiting--;
}
if (!ok()) {
return false;
}
*tp = m_queue.front();
if (szp) {
*szp = m_queue.size();
}
m_queue.pop_front();
if (m_clients_waiting > 0) {
// No reason to wake up more than one client thread
m_ccond.notify_one();
}
return true;
}
bool recycle(T t) {
std::unique_lock<std::mutex> lock(m_mutex);
return recycle_nolock(t);
}
bool recycle_nolock(T t) {
while (m_rqueue.size() >= m_maxrecycled) {
T tt = m_rqueue.front();
m_rqueue.pop();
delete tt;
}
m_rqueue.push(t);
return true;
}
bool waitminsz(size_t sz) {
std::unique_lock<std::mutex> lock(m_mutex);
if (!ok()) {
return false;
}
while (ok() && m_queue.size() < sz) {
if (m_queue.empty()) {
m_ccond.notify_all();
}
m_workers_waiting++;
m_wcond.wait(lock);
m_workers_waiting--;
if (!ok()) {
return false;
}
}
return true;
}
/** Advertise exit and abort queue. Called from worker
*
* This would happen after an unrecoverable error, or when
* the queue is terminated by the client.
*/
void workerExit() {
LOGDEB("workerExit:" << m_name << "\n");
setTerminate();
}
size_t qsize() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
void visitq(std::function<void(const T&)>v) {
std::unique_lock<std::mutex> lock(m_mutex);
for (const auto& it : m_queue) {
v(it);
}
}
void reset() {
// Reset to start state.
std::unique_lock<std::mutex> lock(m_mutex);
while (m_queue.size()) {
T t = m_queue.front();
m_queue.pop_front();
recycle_nolock(t);
}
m_clients_waiting = m_workers_waiting = 0;
m_ok = true;
m_ccond.notify_all();
m_wcond.notify_all();
}
const std::string& getname() const {return m_name;}
private:
bool ok() {
return m_ok;
}
// Configuration
std::string m_name;
size_t m_high;
size_t m_maxrecycled;
// Status
bool m_ok;
// Client/Worker threads currently waiting for a job
unsigned int m_clients_waiting;
unsigned int m_workers_waiting;
// Jobs input queue.
std::deque<T> m_queue;
// Queue for recycling the objects
std::queue<T> m_rqueue;
// Synchronization
std::condition_variable m_ccond;
std::condition_variable m_wcond;
std::mutex m_mutex;
};
#endif /* _BUFFERXCHANGE_H_INCLUDED_ */