线程安全队列的实现

  • 线程安全队列的实现

队列在单线程的运行环境中,我们常用的接口,包括empty,pop,front等等,在多线程的时候,这些接口之间其实存在固有的竞争关系。比如线程A调用front拿到了队列头数据,下一秒它准备pop的时候,或许线程B已经抢先一步pop了队列。还有线程A检查了队列此刻不为空,或许下一秒它准备取数据的时候,线程B已经把队列取空了。因此,按照常用的接口来设计线程安全队列是不可靠的。

在线程安全队列的实现,我们采用C++11的条件变量来实现堵塞模式。实现代码如下:

//created by mayusheng
template<typename T>
class threadsafe_queue{
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue &);
threadsafe_queue &operator=(const threadsafe_queue &) = delete;
void push(T new_value);
bool try_pop(T & value);
std::shared_ptr<T> try_pop();
void wait_and_pop(T & value);
std::shared_ptr<T> wait_and_pop();
bool empty() const;
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
};
template <typename T>
void threadsafe_queue<T>::push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
template <typename T>
void threadsafe_queue<T>::wait_and_pop(T &value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{
return !this->data_queue.empty();
});
value=data_queue.front();
data_queue.pop();
}
template <typename T>
bool threadsafe_queue<T>::try_pop(T &value) {
std::lock_guard<std::mutex> lk(mut);
if(!data_queue.empty())
return false;
else
{
value=data_queue.front();
data_queue.pop();
return true;
}
}
template <typename T>
std::shared_ptr<T> threadsafe_queue<T>::wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{
return !this->data_queue.empty();
});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
template <typename T>
std::shared_ptr<T> threadsafe_queue<T>::try_pop() {
std::lock_guard<std::mutex> lk(mut);
if(!data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
template <typename T>
bool threadsafe_queue<T>::empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
template <typename T>
threadsafe_queue<T>::threadsafe_queue() {
}
template <typename T>
threadsafe_queue<T>::threadsafe_queue(const threadsafe_queue &other) {
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}

条件变量 data_cond.notify_one(),可以尝试唤醒等待进程队列中的进程,从而进行队列中的数据处理。

data_cond.wait(lk,[this]{
return !this->data_queue.empty();
});

以上代码,如果队列为空,那么执行到这里的进程会进入等待状态,直到notify_one()提供一个提醒,那么才会继续检查队列,直到不为空,才会继续下面的运行,在检查的过程中,如果队列为空,会释放手里的互斥量,如果不为空,会继续锁住,进行下面的操作。这里对队列的检查函数写在lamdba表达式中,其实这里,会有多次伪唤醒,具体次数由平台决定,在ubuntu上测试,会有一次伪唤醒,代码第一次执行到此处,会主动检查队列。