4.1.1

代码清单 4.1 条件变量示例

std::queue<data_chunk> data_queue;
std::mutex mut;
std::condition_variable data_cond;
 
void data_preparation_thread() { // 由线程乙运行
    while(more_data_to_prepare()) {
        data_chunk const data = prepare_data();
        {
            std::lock_guard<std::mutex> lk(mut);
            data_queue.push(data);
        }
        data_cond.notify_one(); // 在解锁互斥后通知
    }
}
 
void data_processing_thread() { // 由线程甲运行
    while(true) {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, []{ return !data_queue.empty(); });
        data_chunk data = data_queue.front();
        data_queue.pop();
        lk.unlock();
 
        process(data);
        if(is_last_chunk(data))
            break;
    }
}

条件变量在 wait 端可能被伪唤醒,所以 wait 端要用while循环。

4.2.3

std::promise 和 std::future 配对使用:等待数据的线程在 future 上阻塞,提供数据的线程利用配对的 promise 设定关联的值。

4.4.1

代码清单 4.13 快速排序的并行实现,通过std::async()在另一线程上操作

4.4.8

latch相当于 go 的WaitGroup

4.4.9

barrier相当于 go 的什么 #?

5.3.3

seq_cst(sequentially consistent,保序)

线程内的先后顺序就是实际的先后顺序

relaxed

每个 relaxed 的 atomic 变量都是一个小隔间内的记录员。记录员处理写请求,在列表最后追加记录;记录员处理读请求,对每个线程各有一个读取游标,读取游标只能不动或往后移动。

release / acquire

跨线程同一 atomic 变量上的 release / acquire 构成同步,记为一个批次。“同步”指如果发生就先后发生。 同线程内 release 之前都记为一个批次,把 release 想象成 before ; 同线程内 acquire 之后都记为一个批次,把 acquire 想象成 after。

  • seq_cst 与 release / acquire 混杂使用,seq_cst 退化成 release / acquire
  • relaxed 与 release / acquire 混杂使用,relaxed 仍为 relaxed,但受到 release / acquire 语义的限制

6.2.3

代码清单 6.8 data_cond.notify_one() 要在 mutex 区外调用

代码清单 6.9 有趣部分

std::unique_lock<std::mutex> wait_for_data() {
    std::unique_lock<std::mutex> head_lock(head_mutex);
    data_cond.wait(head_lock, [&]{ return ead.get()!=get_tail(); });
    return std::move(head_lock);
}
 
std::unique_ptr<node> wait_pop_head() {
    std::unique_lock<std::mutex> head_lock(wait_for_data());
    return pop_head();
}

7.1.1 非阻塞型数据结构

atomic_flag 实现自旋锁互斥

class spinlock_mutex {
    std::atomic_flag flag;
public:
    spinlock_mutex() :flag(ATOMIC_FLAG_INIT) {}
    void lock() {
        while(flag.test_and_set(std::memory_order_acquire))
            ;
    }
    void unlock() {
        flag.clear(std::memory_order_release);
    }
};

无锁栈实现

注:下面代码没管 pop()节点的删除,有内存泄漏。

template<typename T>
class lock_free_stack {
private:
    struct node {
      std::shared_ptr<T> data;
      node* next;
      node(T const& data_): data(std::make_shared<T>(data)) {}
    };
    std::atomic<node*> head;
public:
    void push(T const& data) {
        node* const new_node = new node(data);
        new_node->next=head.load();
        while(!head.compare_exchange_weak(new_node->next,new_node));
    }
 
    std::shared_ptr<T> pop() {
        node* old_head = head.load();
        while(old_head && !head.compare_exchange_weak(old_head, old_head->next));
        return old_head ? old_head->data : std::shared_ptr<T>();
    }
};

8.2.2 缓存乒乓

std::atomic<unsigned long> counter(0);
void processing_loop() {
    while(counter.fetch_add(1,std::memory_order_relaxed) < 100000000) {
        do_something();
    }
}

若另一线程正在另一处理器上运行相同的代码,两个处理器的缓存中将分别形成变量 counter 的副本,它们必须在两个处理器之间来回传递,两者所含的 counter 值才可以保持最新。

变量 counter 所含的数据在不同的缓存之间多次来回传递,这称为缓存乒乓,会严重影响应用程序的性能。

8.2.3 伪共享(false sharing)

通常,处理器的缓存单元并非独立的小片内存范围,而是连续的大块内存,称为缓存块。这些缓存块的大小往往是 32 字节或 64 字节,其准确值取决于我们使用的处理器的具体型号。缓存硬件只能按缓存块的大小来处理内存块,若多个小型数据项在内存中的位置彼此相邻,那它们将被纳入同一个缓存块。

假定多个线程要访问一个整型数组(包括更新),其中各线程都具有专属的元素,且只反复读写自己的元素。由于整型变量的尺寸往往比缓存块小很多,因此同一缓存块能够容纳多个数组元素。结果,尽管各线程仅访问数组中属于自己的元素,但仍会让硬件产生缓存乒乓的现象。 假定某缓存块内有两个元素,序号是 0 和 1,每当有线程要更新 0 号元素,便需把整个缓存块传送到相关的处理器。若另一个处理器上的线程要更新 1 号元素,该缓存块则需再次传递。虽然两个线程没有共享任何数据,但缓存块却被它们共享,因此称之为伪共享。 这里的解决方法是编排数据布局,使得相同线程访问的数据在内存中彼此靠近,增加它们被纳入同一个缓存块的机会,而由不同线程访问的数据在内存中彼此远离,因此更有可能散布到多个独立的缓存块中。

参考

  • C++并发编程实战(第2版), Anthony Williams