-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathconcurrent_queue.h
More file actions
116 lines (103 loc) · 2.59 KB
/
concurrent_queue.h
File metadata and controls
116 lines (103 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#ifndef __BLOCKING_QUEUE_H__
#define __BLOCKING_QUEUE_H__
#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
template<typename Data>
class ConcurrentQueue
{
private:
std::queue<Data> mQueue;
size_t mMaxSize;
bool mIsReleased;
mutable boost::mutex mMutex;
boost::condition_variable mConsumerNotifier;
boost::condition_variable mProducerNotifier;
public:
/* Sets max_size = 0 for unbounded queue */
ConcurrentQueue(size_t max_size=100)
{
mMaxSize = max_size;
mIsReleased = false;
}
/* Release all threads waiting without any data guarantee.
* Should only be used when the queue is no longer used */
void release()
{
boost::mutex::scoped_lock lock(mMutex);
mIsReleased = true;
lock.unlock();
mConsumerNotifier.notify_all();
mProducerNotifier.notify_all();
}
bool tryPush(Data const& data)
{
boost::mutex::scoped_lock lock(mMutex);
if (mMaxSize == 0 || mQueue.size() < mMaxSize)
{
mQueue.push(data);
lock.unlock();
mConsumerNotifier.notify_one();
return true;
}
lock.unlock();
return false;
}
int waitAndPush(Data const& data)
{
boost::mutex::scoped_lock lock(mMutex);
while (!mIsReleased && mMaxSize > 0 && mQueue.size() >= mMaxSize)
{
mProducerNotifier.wait(lock);
}
if (mIsReleased)
{
return -1;
}
mQueue.push(data);
lock.unlock();
mConsumerNotifier.notify_one();
return 0;
}
bool empty() const
{
boost::mutex::scoped_lock lock(mMutex);
return mQueue.empty();
}
size_t size() const
{
boost::mutex::scoped_lock lock(mMutex);
return mQueue.size();
}
bool tryPop(Data& popped_value)
{
boost::mutex::scoped_lock lock(mMutex);
if (mQueue.empty())
{
return false;
}
popped_value = mQueue.front();
mQueue.pop();
lock.unlock();
mProducerNotifier.notify_one();
return true;
}
int waitAndPop(Data& popped_value)
{
boost::mutex::scoped_lock lock(mMutex);
while(!mIsReleased && mQueue.empty())
{
mConsumerNotifier.wait(lock);
}
if (mIsReleased)
{
return -1;
}
popped_value = mQueue.front();
lock.unlock();
mProducerNotifier.notify_one();
mQueue.pop();
return 0;
}
};
#endif