Switch to unified view

a/libupnpp/workqueue.hxx b/libupnpp/workqueue.hxx
...
...
32
#include "ptmutex.hxx"
32
#include "ptmutex.hxx"
33
33
34
/// Store per-worker-thread data. Just an initialized timespec, and
34
/// Store per-worker-thread data. Just an initialized timespec, and
35
/// used at the moment.
35
/// used at the moment.
36
class WQTData {
36
class WQTData {
37
  public:
37
public:
38
    WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
38
    WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
39
    struct timespec wstart;
39
    struct timespec wstart;
40
};
40
};
41
41
42
/**
42
/**
...
...
65
        : m_name(name), m_high(hi), m_low(lo),
65
        : m_name(name), m_high(hi), m_low(lo),
66
          m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
66
          m_workers_exited(0), m_clients_waiting(0), m_workers_waiting(0),
67
          m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
67
          m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
68
    {
68
    {
69
        m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
69
        m_ok = (pthread_cond_init(&m_ccond, 0) == 0) &&
70
        (pthread_cond_init(&m_wcond, 0) == 0);
70
         (pthread_cond_init(&m_wcond, 0) == 0);
71
    }
71
    }
72
72
73
    ~WorkQueue()
73
    ~WorkQueue()
74
    {
74
    {
75
        if (!m_worker_threads.empty())
75
        if (!m_worker_threads.empty())
...
...
100
100
101
    /** Add item to work queue, called from client.
101
    /** Add item to work queue, called from client.
102
     *
102
     *
103
     * Sleeps if there are already too many.
103
     * Sleeps if there are already too many.
104
     */
104
     */
105
  bool put(T t)
105
  bool put(T t, bool flushprevious = false)
106
    {
106
    {
107
        PTMutexLocker lock(m_mutex);
107
        PTMutexLocker lock(m_mutex);
108
        if (!lock.ok() || !ok()) {
108
        if (!lock.ok() || !ok()) {
109
            return false;
109
            return false;
110
        }
110
        }
111
111
112
        while (ok() && m_high > 0 && m_queue.size() >= m_high) {
112
        while (ok() && m_high > 0 && m_queue.size() >= m_high) {
113
        m_clientsleeps++;
113
         m_clientsleeps++;
114
            // Keep the order: we test ok() AFTER the sleep...
114
            // Keep the order: we test ok() AFTER the sleep...
115
        m_clients_waiting++;
115
         m_clients_waiting++;
116
            if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
116
            if (pthread_cond_wait(&m_ccond, lock.getMutex()) || !ok()) {
117
        m_clients_waiting--;
117
             m_clients_waiting--;
118
                return false;
118
                return false;
119
            }
119
            }
120
        m_clients_waiting--;
120
         m_clients_waiting--;
121
        }
121
        }
122
122
      if (flushprevious) {
123
          while (!m_queue.empty())
124
              m_queue.pop();
125
      }
123
        m_queue.push(t);
126
        m_queue.push(t);
124
        if (m_workers_waiting > 0) {
127
        if (m_workers_waiting > 0) {
125
            // Just wake one worker, there is only one new task.
128
            // Just wake one worker, there is only one new task.
126
            pthread_cond_signal(&m_wcond);
129
            pthread_cond_signal(&m_wcond);
127
        } else {
130
        } else {
...
...
322
    unsigned int m_workersleeps;
325
    unsigned int m_workersleeps;
323
    unsigned int m_clientsleeps;
326
    unsigned int m_clientsleeps;
324
};
327
};
325
328
326
#endif /* _WORKQUEUE_H_INCLUDED_ */
329
#endif /* _WORKQUEUE_H_INCLUDED_ */
327
/* Local Variables: */
328
/* mode: c++ */
329
/* c-basic-offset: 4 */
330
/* tab-width: 4 */
331
/* indent-tabs-mode: t */
332
/* End: */