6.1 并发设计的内涵

6.2 基于锁的并发数据结构

基于锁的栈

版本1

#include <iostream>
#include <stack>
#include <algorithm>
#include <thread>
#include <future>
#include <exception>

// 版本1  begin
struct empty_stack : std::exception
{
    const char *what() const throw()
    {
        return "empty stack!";
    }
};
template <typename T>
class threadsafe_stack
{
private:
    std::stack<T> data;
    mutable std::mutex m;

public:
    threadsafe_stack() {}
    threadsafe_stack(const threadsafe_stack &other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data;
    }
    threadsafe_stack &operator=(const threadsafe_stack &) = delete;

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(std::move(new_value));
    }

    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty())
            throw empty_stack();
        std::shared_ptr<T> const res(std::make_shared<T>(std::move(data.top())));
        // std::shared_ptr<T> const res(data.top()); // 错误写法  因为data.top()返回的是对象的引用
        data.pop();
        return res;
    }
    void pop(T &value)
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty())
            throw empty_stack();
        value = std::move(data.top());
        data.pop();
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};

// 版本2 end

int main()
{
    threadsafe_stack<int> ts_stack;
    ts_stack.pop();
    return 0;
}
问题1

栈中执行用户自定义的操作可能会面临死锁。
在这里插入图片描述在这里插入图片描述
线程安全栈的锁,只能保护 “栈本身的操作”;但如果用户写的类型T,其构造 / 赋值函数里又操作了同一个栈,就会在 “持锁时再次请求锁”,导致死锁。这个风险无法由栈的实现者解决,只能靠用户规范自己的代码。

队列

基于锁和条件变量的队列

#include <iostream>
#include <stack>
#include <algorithm>
#include <thread>
#include <future>
#include <exception>

// 版本1  begin  安全的栈
// struct empty_stack : std::exception
// {
//     const char *what() const throw()
//     {
//         return "empty stack!";
//     }
// };
// template <typename T>
// class threadsafe_stack
// {
// private:
//     std::stack<T> data;
//     mutable std::mutex m;

// public:
//     threadsafe_stack() {}
//     threadsafe_stack(const threadsafe_stack &other)
//     {
//         std::lock_guard<std::mutex> lock(other.m);
//         data = other.data;
//     }
//     threadsafe_stack &operator=(const threadsafe_stack &) = delete;

//     void push(T new_value)
//     {
//         std::lock_guard<std::mutex> lock(m);
//         data.push(std::move(new_value));
//     }

//     std::shared_ptr<T> pop()
//     {
//         std::lock_guard<std::mutex> lock(m);
//         if (data.empty())
//             throw empty_stack();
//         std::shared_ptr<T> const res(std::make_shared<T>(std::move(data.top())));
//         // std::shared_ptr<T> const res(data.top()); // 错误写法  因为data.top()返回的是对象的引用
//         data.pop();
//         return res;
//     }
//     void pop(T &value)
//     {
//         std::lock_guard<std::mutex> lock(m);
//         if (data.empty())
//             throw empty_stack();
//         value = std::move(data.top());
//         data.pop();
//     }
//     bool empty() const
//     {
//         std::lock_guard<std::mutex> lock(m);
//         return data.empty();
//     }
// };

// 版本2 end

template <typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue() {}
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }
    void wait_and_pop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]
                       { return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]
                       { return !data_queue.empty(); });
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }
    bool try_pop(T &value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

int main()
{
    return 0;
}
问题1 wait_and_pop()时抛出异常

然而,若该觉醒的线程在执行wait_and_pop()时抛出异常(譬如新指针std::shared_ptr<>在构建时就有可能产生异常④)​,就不会有任何其他线程被唤醒。
第一种处理方式是,如果我们不能接受这种行为方式,则将data_cond.notify_one()改为data_cond.notify_all(),这轻而易举。这样就会唤醒全体线程,但要大大增加开销:它们绝大多数还是会发现队列依然为空[2],只好重新休眠。
第二种处理方式是,倘若有异常抛出,则在wait_and_pop()中再次调用notify_one(),从而再唤醒另一线程,让它去获取存储的值。
第三种处理方式是,将std::shared_ptr<>的初始化语句移动到push()的调用处,令队列容器改为存储std::shared_ptr<>,而不再直接存储数据的值。从内部std::queue<>复制std::shared_ptr<>实例的操作不会抛出异常,所以wait_and_pop()也是异常安全的

为什么使用共享智能指针不会抛出异常,因为赋值操作对应共享智能指针而要是将引用计数加1。std::shared_ptr 的“拷贝”是轻量、原子、无分配、noexcept 的;抛异常只可能发生在“创建 shared_ptr”那一刻,而不是“复制 shared_ptr”。

基于方法三的改进

template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T>> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=std::move(*data_queue.front());---  ①
        data_queue.pop();
    }
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return false;
        value=std::move(*data_queue.front());---  ②
        data_queue.pop();
        return true;
    }
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        std::shared_ptr<T> res=data_queue.front();---  ③
        data_queue.pop();
        return res;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res=data_queue.front();---  ④
        data_queue.pop();
        return res;
    }
    void push(T new_value)
    {
        std::shared_ptr<T> data(
            std::make_shared<T>(std::move(new_value)));---  ⑤
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_cond.notify_one();
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

基于精细粒度的锁和条件变量的队列

在这里插入图片描述

template <typename T>
class queue
{
private:
    struct node
    {
        T data;
        sdt::unique_ptr<node> next;
        node(T data_) : data(std::move(data)) {}
    };
    std::unique_ptr<node> head;
    node *tail;

public:
    queue() : tail(nullptr) {}
    queue(const queue &other) = delete;
    queue &operator=(const queue &other) = delete;

    std::shared_ptr<T> try_pop()
    {
        if (!head) // 当指针为空
        {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> const res(std::make_shared<T>(std::move(head->data)));
        std::unique_ptr<node> const old_head = std::move(head); // 这一步可以省略吗? 可以
        head = sdt::move(old_head->next);
        if (!head) // 当取出元素之后,头指针为空
            tail = nullptr;
        return res; // 旧的头结点old_head在这里被销毁
    }

    void push(T new_value)
    {
        std::unique_ptr<node> p(new node(std::move(new_value)));
        node *const new_tail = p.get();
        if (tail)
        {
            tail->next = std::move(p);
        }
        else // 队列为空
        {
            head = std::move(p);
        }
        tail = new_tail;
    }
};
  • 对于上面代码的疑问
    为什么使用unique_ptr指针,
    在这里插入图片描述
  • 为什么需要 tail 原生指针
    因为你需要去操作为尾节点,而尾阶段只能被他前一个阶段的unique_ptr独占指向,所以只能使用裸指针
    在这里插入图片描述
    在这里插入图片描述
1.通过分离数据而实现并

上述问题的本质就是就算我们使用两个互斥来互斥访问头尾指针,但是我们不能分清一种时机,就是两个线程同时去互斥的访问头指针和尾指针时,不能确定这两个指针所指向的阶段是否是统一个节点。

本方法的解决思路就是,保证队列中至少有一个节点。
在这里插入图片描述

push()只访问tail指针而不再触及head指针

template <typename T>
class queue
{
private:
    struct node
    {
        T data;
        sdt::unique_ptr<node> next;
        node(T data_) : data(std::move(data)) {}
    };
    std::unique_ptr<node> head;
    node *tail;

public:
    queue() : tail(nullptr) {}
    queue(const queue &other) = delete;
    queue &operator=(const queue &other) = delete;

    std::shared_ptr<T> try_pop()
    {
        if (!head) // 当指针为空
        {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> const res(std::make_shared<T>(std::move(head->data)));
        std::unique_ptr<node> const old_head = std::move(head); // 这一步可以省略吗? 可以
        head = sdt::move(old_head->next);
        if (!head) // 当取出元素之后,头指针为空
            tail = nullptr;
        return res; // 旧的头结点old_head在这里被销毁
    }

    void push(T new_value)
    {
        std::unique_ptr<node> p(new node(std::move(new_value)));
        node *const new_tail = p.get();
        if (tail)
        {
            tail->next = std::move(p);
        }
        else // 队列为空
        {
            head = std::move(p);
        }
        tail = new_tail;
    }
};

在这里插入图片描述

  • 最后的方案:
template <typename T>
class threadsafe_queue
{
private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    node *tail;
    std::unique_ptr<node> head;
    std::mutex head_mutex;
    std::mutex tail_mutex;
    node *get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }
    std::unique_ptr<node> pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);

        if (head.get() == get_tail())
        {
            return nullptr;
        }
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }

public:
    threadsafe_queue() : head(new node), tail(head.get())
    {
    }
    threadsafe_queue(const threadsafe_queue &other) = delete;
    threadsafe_queue &operator=(const threadsafe_queue &other) = delete;
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> old_head = pop_head();
        return old_head ? old_head->data : std::shared_ptr<T>();
    }
    void push(T new_value)
    {
        // 创建一个新的结点
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        node *const new_tail = p.get();
        // 修改旧尾结点的数据和next指针 : 开始加锁
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        tail->data = new_data;
        tail->next = std::move(p);
        tail = new_tail;
    }
};

在这里插入图片描述

  • 注意分配内存可能会导致抛出异常,所以建议使用智能指针,就算抛出异常,也会被释放
2.等待数据弹出
template<typename T>
class threadsafe_queue
{
private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;
public:
    threadsafe_queue():
        head(new node),tail(head.get())
    {}
    threadsafe_queue(const threadsafe_queue& other)=delete;
    threadsafe_queue& operator=(const threadsafe_queue& other)=delete;
    std::shared_ptr<T> try_pop();
    bool try_pop(T& value);
    std::shared_ptr<T> wait_and_pop();
    void wait_and_pop(T& value);
    void push(T new_value);
    bool empty();
};

template<typename T>
void threadsafe_queue<T>::push(T new_value)
{
    std::shared_ptr<T> new_data(
        std::make_shared<T>(std::move(new_value)));
    std::unique_ptr<node> p(new node);
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        tail->data=new_data;
        node* const new_tail=p.get();
        tail->next=std::move(p);
        tail=new_tail;
    }
    data_cond.notify_one();
}

template<typename T>
class threadsafe_queue
{
private:
    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }
    std::unique_ptr<node> pop_head()---{
        std::unique_ptr<node> old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
    }
    std::unique_lock<std::mutex> wait_for_data()---{
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock,[&]{return head.get()!=get_tail();});
        return std::move(head_lock);---}
    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());---return pop_head();
    }
    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());---  ⑤
        value=std::move(*head->data);
        return pop_head();
    }
public:
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> const old_head=wait_pop_head();
        return old_head->data;
    }
    void wait_and_pop(T& value)
    {
        std::unique_ptr<node> const old_head=wait_pop_head(value);
    }
};

template<typename T>
class threadsafe_queue
{
private:
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }
    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr<node>();
        }
        value=std::move(*head->data);
        return pop_head();
    }
public:
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> old_head=try_pop_head();
        return old_head?old_head->data:std::shared_ptr<T>();
    }
    bool try_pop(T& value)
    {
        std::unique_ptr<node> const old_head=try_pop_head(value);
        return old_head;
    }
    bool empty()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get()==get_tail());
    }
};
  • 上述讨论的队列是无线队列,即队列的数量是无限的。

6.3 更复杂的并发数据结构

基于锁的查找表

template <typename Key, typename Value, typename Hash = std::hash<Key>>
class threadsafe_lookup_table
{
private:
    class bucket_type
    {
    private:
        typedef std::pair<Key, Value> bucket_value;
        typedef std::list<bucket_value> bucket_data;
        typedef typename bucket_data::iterator bucket_iterator;
        bucket_data data;
        mutable std::shared_mutex mutex;

        bucket_iterator find_entry_for(Key const &key) const
        {
            return std::find_if(data.begin(), data.end(),
                                [&](bucket_value const &item)
                                { return item.first == key; });
        }

    public:
        Value value_for(Key const &key, Value const &default_value) const
        {
            std::shared_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry = find_entry_for(key);
            return (found_entry == data.end()) ? default_value : found_entry->second;
        }
        void add_or_update_mapping(Key const &key, Value const &value)
        {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry = find_entry_for(key);
            if (found_entry == data.end()) // 要添加的键不存在
            {
                data.push_back(bucket_value(key, value));
            }
            else // 要添加的键已存在
            {
                found_entry->second = value;
            }
        }

        void remove_mapping(Key const &key)
        {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry = find_entry_for(key);
            if (found_entry != data.end())
            {
                data.erase(found_entry);
            }
        }
    };

    std::vector<std::unique_ptr<bucket_type>> buckets;
    Hash hasher;
    bucket_type &get_bucket(Key const &key) const // 函数作用
    {
        std::size_t const bucket_index = hasher(key) % buckets.size();
        return *buckets[bucket_index];
    }

public:
    typedef Key key_type;
    typedef Value mapped_type;
    typedef Hash hash_type;
    threadsafe_lookup_table(unsigned num_buckets = 19, Hash const &hasher_ = Hash()) : buckets(num_buckets), hasher(hasher_)
    {
        for (unsigned i = 0; i < num_buckets; ++i)
        {
            buckets[i].reset(new bucket_type);
        }
    }
    threadsafe_lookup_table(threadsafe_lookup_table const &other) = delete;
    threadsafe_lookup_table &operator=(threadsafe_lookup_table const &other) = delete;

    Value value_for(Key const &key, Value const &default_value = Value()) const
    {
        return get_bucket(key).value_for(key, default_value);
    }
    void add_or_update_mapping(Key const &key, Value const &value)
    {
        get_bucket(key).add_or_update_mapping(key, value);
    }
    void remove_mapping(Key const &key)
    {
        get_bucket(key).remove_mapping(key);
    }
};

添加快照

在这里插入图片描述

基于多种锁的链表

让每个节点都具备自己的互斥

支持迭代功能的线程安全的链表

template <typename T>
class threadsafe_list
{
    struct node
    {
        std::mutex m;
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
        node() : next()
        {
        }

        node(T const &value) : data(std::make_shared<T>(value)) {}
    };
    node head;

public:
    threadsafe_list(threadsafe_list const &other) = delete;
    threadsafe_list &operator=(threadsafe_list const &other) = delete;

    threadsafe_list() {}
    ~threadsafe_list()
    {
        remove_if([](node const &)
                  { return true; });
    }

    void push_front(T const &value)
    {
        std::unique_ptr<node> new_node(new node(value));
        std::lock_guard<std::mutex> lk(head.m);
        new_node->next = std::move(head.next);
        head.next = std::move(new_node);
    }

    template <typename Function>
    void for_each(Function f)
    {
        node *current = &head;
        std::unique_lock<std::mutex> lk(head.m);
        while (node *const next = current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();
            f(*next->data);
            current = next;
            lk = std::move(next_lk);
        }
    }

    template <typename Predicate>
    std::shared_ptr<T> find_first_if(Predicate p)
    {
        node *current = &head;
        std::unique_lock<std::mutex> lk(head.m);
        while (node *const next = current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();
            if (p(*next->data))
            {
                return next->data;
            }
            current = next;
            lk = std::move(next_lk);
        }
        return std::shared_ptr<T>();
    }

    template <typename Predicate>
    void remove_if(Predicate p)
    {
        node *current = &head;
        std::unique_lock<std::mutex> lk(head.m);
        while (node *const next = current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            if (p(*next->data))
            {
                std::unique_ptr<node> old_next = std::move(current->next);
                current->next = std::move(next->next);
                next_lk.unlock();
            }
            else
            {
                lk.unlock();
                current = next;
                lk = std::move(next_lk);
            }
        }
    }
};
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐