Switch to unified view

a/src/utils/workqueue.h b/src/utils/workqueue.h
...
...
20
#include <pthread.h>
20
#include <pthread.h>
21
#include <time.h>
21
#include <time.h>
22
22
23
#include <string>
23
#include <string>
24
#include <queue>
24
#include <queue>
25
#include "unordered_defs.h"
25
#include <list>
26
using std::queue;
27
using std::string;
28
26
29
#include "debuglog.h"
27
#include "debuglog.h"
30
#include "ptmutex.h"
28
#include "ptmutex.h"
31
32
/// Store per-worker-thread data. Just an initialized timespec, and
33
/// used at the moment.
34
class WQTData {
35
    public:
36
    WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
37
    struct timespec wstart;
38
};
39
29
40
/**
30
/**
41
 * A WorkQueue manages the synchronisation around a queue of work items,
31
 * A WorkQueue manages the synchronisation around a queue of work items,
42
 * where a number of client threads queue tasks and a number of worker
32
 * where a number of client threads queue tasks and a number of worker
43
 * threads take and execute them. The goal is to introduce some level
33
 * threads take and execute them. The goal is to introduce some level
...
...
92
            if ((err = pthread_create(&thr, 0, start_routine, arg))) {
82
            if ((err = pthread_create(&thr, 0, start_routine, arg))) {
93
                LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
83
                LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
94
                        m_name.c_str(), err));
84
                        m_name.c_str(), err));
95
                return false;
85
                return false;
96
            }
86
            }
97
            m_worker_threads.insert(pair<pthread_t, WQTData>(thr, WQTData()));
87
            m_worker_threads.push_back(thr);
98
        }
88
        }
99
        return true;
89
        return true;
100
    }
90
    }
101
91
102
    /** Add item to work queue, called from client.
92
    /** Add item to work queue, called from client.
...
...
211
        m_name.c_str(), m_tottasks, m_nowake, m_workersleeps, 
201
        m_name.c_str(), m_tottasks, m_nowake, m_workersleeps, 
212
        m_clientsleeps));
202
        m_clientsleeps));
213
    // Perform the thread joins and compute overall status
203
    // Perform the thread joins and compute overall status
214
        // Workers return (void*)1 if ok
204
        // Workers return (void*)1 if ok
215
        void *statusall = (void*)1;
205
        void *statusall = (void*)1;
216
        STD_UNORDERED_MAP<pthread_t,  WQTData>::iterator it;
206
        std::list<pthread_t>::iterator it;
217
        while (!m_worker_threads.empty()) {
207
        while (!m_worker_threads.empty()) {
218
            void *status;
208
            void *status;
219
            it = m_worker_threads.begin();
209
            it = m_worker_threads.begin();
220
            pthread_join(it->first, &status);
210
            pthread_join(*it, &status);
221
            if (status == (void *)0)
211
            if (status == (void *)0)
222
                statusall = status;
212
                statusall = status;
223
            m_worker_threads.erase(it);
213
            m_worker_threads.erase(it);
224
        }
214
        }
225
215
...
...
328
    unsigned int m_workers_exited;
318
    unsigned int m_workers_exited;
329
    bool m_ok;
319
    bool m_ok;
330
320
331
    // Per-thread data. The data is not used currently, this could be
321
    // Per-thread data. The data is not used currently, this could be
332
    // a set<pthread_t>
322
    // a set<pthread_t>
333
    STD_UNORDERED_MAP<pthread_t, WQTData> m_worker_threads;
323
    std::list<pthread_t> m_worker_threads;
334
324
335
    // Synchronization
325
    // Synchronization
336
    queue<T> m_queue;
326
    queue<T> m_queue;
337
    pthread_cond_t m_ccond;
327
    pthread_cond_t m_ccond;
338
    pthread_cond_t m_wcond;
328
    pthread_cond_t m_wcond;