[转载]C++无锁队列

2019/10/27 20:10

原文链接: http://chenshungen.cn/lock_free_queue/

关于无锁队列的原理说明,请参考陈皓的博客(coolshell)上一篇关于无锁队列实现的理论文章。本文分享使用环形数组实现的无锁队列的 C++ 实现代码。

单生产者-单消费者

#ifndef WATER_BASE_LOCK_FREE_CIRCULAR_QUEUE_SS_HPP
#define WATER_BASE_LOCK_FREE_CIRCULAR_QUEUE_SS_HPP
#include "class_helper.h"
#include <vector>
#include <atomic>
namespace water{
namespace componet{
/**
 * Bounded FIFO queue stored in a ring of 2^powArg cells, written for one
 * producer (push) and one consumer (pop).
 *
 * Every cell carries an atomic status flag. push() fills a cell and publishes
 * it with a release store of `full`; pop() observes that with an acquire load,
 * takes the payload and hands the cell back with a release store of `empty`.
 * m_begin and m_end are plain (non-atomic) integers: m_begin is written only
 * by pop() and m_end only by push(), so each of those functions must be
 * called from a single thread.
 *
 * T must be default-constructible and copy-assignable.
 */
template <typename T>
class LockFreeCircularQueueSPSC final // must not be used as a base class
{
    // One ring slot: the payload plus a flag telling which side may touch it.
    struct Cell
    {
        enum class Status : uint_fast32_t
        {
            empty, // no item stored; push() may write the payload
            full,  // item stored; pop() may read the payload
        };
        Cell()
        : status(Status::empty), t()
        {
        }
        std::atomic<Status> status;
        T t;
    };

    // Number of times push()/pop() poll the target cell before giving up.
    static constexpr int kMaxAttempts = 3;
public:
    TYPEDEF_PTR(LockFreeCircularQueueSPSC);

    /**
     * @param powArg  log2 of the capacity; values above 24 are clamped to 24,
     *                so the ring holds at most 2^24 cells.
     */
    explicit LockFreeCircularQueueSPSC(uint32_t powArg = 10)
    : m_begin(0), m_end(0), m_maxSize(1u << (powArg < 24 ? powArg : 24)), m_data(m_maxSize)
    {
    }
    ~LockFreeCircularQueueSPSC() = default;

    // noncopyable
    LockFreeCircularQueueSPSC(const LockFreeCircularQueueSPSC&) = delete;
    LockFreeCircularQueueSPSC& operator=(const LockFreeCircularQueueSPSC&) = delete;

    /// @return whether the per-cell atomic status flag is lock-free on this platform.
    bool isLockFree() const
    {
        return m_data[0].status.is_lock_free();
    }

    /**
     * Appends a copy of @p item at the tail.
     *
     * @return true on success; false if the tail cell was still occupied
     *         after kMaxAttempts polls (queue full).
     */
    bool push(const T& item)
    {
        const uint_fast32_t pos = realPos(m_end);

        // The tail cell being empty means the queue is not full.
        bool writable = false;
        for(int attempt = 0; attempt < kMaxAttempts && !writable; ++attempt)
            writable = m_data[pos].status.load(std::memory_order_acquire) == Cell::Status::empty;
        if(!writable)
            return false;

        // Copy first and advance the tail afterwards: if the assignment
        // throws, the cell stays empty and m_end still points at it.
        m_data[pos].t = item;
        ++m_end;
        // Publish the payload to the consumer.
        m_data[pos].status.store(Cell::Status::full, std::memory_order_release);
        return true;
    }

    /**
     * Removes the head item and copies it into @p *t.
     *
     * @param t  destination; a null pointer is rejected.
     * @return true on success; false if @p t is null or the head cell was
     *         still empty after kMaxAttempts polls (queue empty).
     */
    bool pop(T* t)
    {
        if(t == nullptr)
            return false;

        const uint_fast32_t pos = realPos(m_begin);

        // The head cell being full means the queue is not empty.
        bool readable = false;
        for(int attempt = 0; attempt < kMaxAttempts && !readable; ++attempt)
            readable = m_data[pos].status.load(std::memory_order_acquire) == Cell::Status::full;
        if(!readable)
            return false;

        // Take the payload, then reset the slot to a default-constructed T.
        *t = m_data[pos].t;
        m_data[pos].t = T();
        // Advance the head only after the copies succeeded.
        ++m_begin;
        // Hand the cell back to the producer.
        m_data[pos].status.store(Cell::Status::empty, std::memory_order_release);
        return true;
    }

    /**
     * @return true if the head cell is currently marked empty.
     * NOTE(review): reads m_begin, which pop() modifies without
     * synchronization - presumably meant to be called from the consumer
     * thread only; confirm against callers.
     */
    bool empty() const
    {
        return m_data[realPos(m_begin)].status.load(std::memory_order_relaxed) == Cell::Status::empty;
    }

    /**
     * @return true if the tail cell is currently marked full.
     * NOTE(review): reads m_end, which push() modifies without
     * synchronization - presumably meant to be called from the producer
     * thread only; confirm against callers.
     */
    bool full() const
    {
        return m_data[realPos(m_end)].status.load(std::memory_order_relaxed) == Cell::Status::full;
    }

    /// @return the number of cells in the ring (always a power of two).
    uint64_t maxSize() const
    {
        return static_cast<uint64_t>(m_data.size());
    }
private:
    // Maps an ever-growing position onto a ring index; relies on the
    // capacity being a power of two.
    uint64_t realPos(uint64_t pos) const
    {
        return pos & (maxSize() - 1);
    }
private:
    uint_fast32_t m_begin;         // head position, advanced by pop()
    uint_fast32_t m_end;           // tail position, advanced by push()
    const uint_fast32_t m_maxSize; // capacity; must be declared before m_data, which is sized from it
    std::vector<Cell> m_data;
};
}}
#endif //#ifndef WATER_BASE_CIRCULAR_QUEUE_HPP

多生产者多消费者

#ifndef WATER_BASE_M_LOCK_FREE_CIRCULAR_QUEUE_MM_HPP
#define WATER_BASE_M_LOCK_FREE_CIRCULAR_QUEUE_MM_HPP
 
#include <vector>
#include <atomic>
 
#include <iostream>
using std::cerr;
using std::endl;
 
namespace water{
namespace componet{
 
/**
 * Bounded FIFO queue stored in a ring of 2^powArg cells that may be used by
 * several producers and several consumers at once.
 *
 * A thread first claims the cell at the current tail (push) or head (pop) by
 * a compare-exchange on the cell's status, then advances the shared index by
 * a second compare-exchange, and only then touches the payload. If the index
 * exchange fails, the claim is rolled back and the operation is retried, up
 * to kMaxAttempts times.
 *
 * T must be default-constructible and copy-assignable.
 */
template <typename T>
class LockFreeCircularQueueMPMC final // must not be used as a base class
{
    // One ring slot: the payload plus a flag telling who currently owns it.
    struct Cell
    {
        enum class Status : uint8_t
        {
            writing, // claimed by a producer; payload not published yet
            reading, // claimed by a consumer; payload not released yet
            empty,   // no item stored; a producer may claim the cell
            full,    // item stored; a consumer may claim the cell
        };

        Cell()
        : status(Status::empty) ,t()
        {
        }

        std::atomic<Status> status;
        T t;
    };

    // Retry budget shared by push() and pop().
    static constexpr int kMaxAttempts = 5;

public:
    /**
     * @param powArg  log2 of the capacity; values above 24 are clamped to 24,
     *                so the ring holds at most 2^24 cells.
     */
    explicit LockFreeCircularQueueMPMC(uint64_t powArg = 16)
    : m_begin(0), m_end(0), m_maxSize(1u << (powArg < 24 ? powArg : 24)), m_data(m_maxSize)
    {
    }
    ~LockFreeCircularQueueMPMC() = default;

    // noncopyable
    LockFreeCircularQueueMPMC(const LockFreeCircularQueueMPMC&) = delete;
    LockFreeCircularQueueMPMC& operator=(const LockFreeCircularQueueMPMC&) = delete;

    /// @return whether the per-cell atomic status flag is lock-free on this platform.
    bool isLockFree() const
    {
        return m_data[0].status.is_lock_free();
    }

    /**
     * Appends a copy of @p item at the tail.
     *
     * @return true on success; false if no cell could be claimed within
     *         kMaxAttempts attempts (queue full or heavy contention).
     */
    bool push(const T& item)
    {
        for(int attempt = 0; attempt < kMaxAttempts; ++attempt)
        {
            uint32_t oldEnd = m_end.load(std::memory_order_acquire); // current tail
            Cell& cell = m_data[realIndex(oldEnd)];

            // Cheap read first so that a full queue causes no write traffic.
            typename Cell::Status seen = cell.status.load(std::memory_order_acquire);
            if(seen != Cell::Status::empty)
                continue;

            // Claim the cell. The strong form is used so that a spurious
            // failure cannot eat into the bounded retry budget.
            if(!cell.status.compare_exchange_strong(seen, Cell::Status::writing))
                continue;

            // Advance the tail. Failure means oldEnd was stale.
            if(!m_end.compare_exchange_strong(oldEnd, oldEnd + 1))
            {
                // Roll back with release ordering: the thread that claims the
                // cell next acquires this store and must also see every
                // write made by the cell's previous owner.
                cell.status.store(Cell::Status::empty, std::memory_order_release);
                continue;
            }

            // Store the payload and publish it to consumers.
            cell.t = item;
            cell.status.store(Cell::Status::full, std::memory_order_release);
            return true;
        }
        return false;
    }

    /**
     * Removes the head item and copies it into @p *t.
     *
     * @param t  destination; a null pointer is rejected.
     * @return true on success; false if @p t is null or no cell could be
     *         claimed within kMaxAttempts attempts (queue empty or heavy
     *         contention).
     */
    bool pop(T* t)
    {
        if(t == nullptr)
            return false;

        for(int attempt = 0; attempt < kMaxAttempts; ++attempt)
        {
            uint32_t oldBegin = m_begin.load(std::memory_order_acquire); // current head
            Cell& cell = m_data[realIndex(oldBegin)];

            typename Cell::Status seen = cell.status.load(std::memory_order_acquire);
            if(seen != Cell::Status::full)
                continue;

            // Claim the cell for reading.
            if(!cell.status.compare_exchange_strong(seen, Cell::Status::reading))
                continue;

            // Advance the head. Failure means oldBegin was stale.
            if(!m_begin.compare_exchange_strong(oldBegin, oldBegin + 1))
            {
                // Roll back with release ordering so that the consumer which
                // claims the cell next still sees the producer's payload.
                cell.status.store(Cell::Status::full, std::memory_order_release);
                continue;
            }

            // Take the payload, reset the slot and hand it back to producers.
            *t = cell.t;
            cell.t = T();
            cell.status.store(Cell::Status::empty, std::memory_order_release);
            return true;
        }
        return false;
    }

    /// @return true if the cell at the current head is marked empty (a relaxed snapshot).
    bool empty() const
    {
        return m_data[realIndex(m_begin.load(std::memory_order_relaxed))].status.load(std::memory_order_relaxed) == Cell::Status::empty;
    }

    /// @return true if the cell at the current tail is marked full (a relaxed snapshot).
    bool full() const
    {
        return m_data[realIndex(m_end.load(std::memory_order_relaxed))].status.load(std::memory_order_relaxed) == Cell::Status::full;
    }

    /// @return the number of cells in the ring (always a power of two).
    inline uint32_t maxSize() const
    {
        return m_maxSize;
    }

private:
    // Maps an ever-growing position onto a ring index; relies on the
    // capacity being a power of two.
    inline uint32_t realIndex(uint64_t index) const
    {
        return index & (maxSize() - 1);
    }

private:
    std::atomic<uint32_t> m_begin; // head position, advanced by pop()
    std::atomic<uint32_t> m_end;   // tail position, advanced by push()
    const uint_fast32_t m_maxSize; // capacity; must be declared before m_data, which is sized from it
    std::vector<Cell> m_data;
};
 
}}
 
#endif //#ifndef WATER_BASE_CIRCULAR_QUEUE_HPP