早些年前也翻过一遍 perfbook,记得每天午休都是从公司闲逛到清河地铁站,边走边看。可惜由于本人太菜,品鉴不到什么。现在重新做点笔记吧,常读常新。(缓慢更新中……)

注意这里 perfbook 指的是 paulmck 大神的 Is Parallel Programming Hard, And, If So, What Can You Do About It?,不是 dendibakh 的同名手册。看的版本是 v2023.06。

共享内存的系统

作者在开篇提到一个问题:并行编程是否应该尽可能抽象而不去关注硬件特性?回答是不要这么做。忽略硬件特性可以简化问题,但在大多数情况下其实相当愚蠢。一方面,追求并行编程的目的就是为了性能,那么性能是取决于硬件特性,在逻辑上就要求工程师至少需要了解一些硬件特性;另一方面,认知不足的工程师做不出像样的玩意。

以缓存互联为例

system-hardware-architecture

上图给了一个经典的系统硬件架构。当一个内存上的数据尝试读取到寄存器,或者从寄存器写入到内存的时候,系统必须要处理 cacheline。但是缓存可能是 CPU 共享的,也可能是独立且互联的。

For example, if CPU 0 were to write to a variable whose cacheline resided in CPU 7’s cache, the following over-simplified sequence of events might ensue:

  1. CPU 0 checks its local cache, and does not find the cacheline. It therefore records the write in its store buffer.
  2. A request for this cacheline is forwarded to CPU 0’s and 1’s interconnect, which checks CPU 1’s local cache, and does not find the cacheline.
  3. This request is forwarded to the system interconnect, which checks with the other three dies, learning that the cacheline is held by the die containing CPU 6 and 7.
  4. This request is forwarded to CPU 6’s and 7’s interconnect, which checks both CPUs’ caches, finding the value in CPU 7’s cache.
  5. CPU 7 forwards the cacheline to its interconnect, and also flushes the cacheline from its cache.
  6. CPU 6’s and 7’s interconnect forwards the cacheline to the system interconnect.
  7. The system interconnect forwards the cacheline to CPU 0’s and 1’s interconnect.
  8. CPU 0’s and 1’s interconnect forwards the cacheline to CPU 0’s cache.
  9. CPU 0 can now complete the write, updating the relevant portions of the newly arrived cacheline from the value previously recorded in the store buffer.

像上面的示例行为就是缓存一致性协议的简化形式,由 CAS 操作引起。从该硬件特性可以看到,即使是单个指令,也会引起可观的协议开销。另一个层面上,协议对于长时间不更新的数据是允许跨 CPU cachelines 做到多个副本并存,而我们也依靠这种硬件特性来实现 read-mostly 变量的优化(也就是 RCU 等延迟处理技术)。

原子操作的开销

Operation Cost (ns) Ratio (cost/clock) CPUs
Clock period 0.5 1.0  
Same-CPU     0
 CAS 7.0 14.6  
 lock 15.4 32.3  
On-Core     224
 Blind CAS 7.2 15.2  
 CAS 18.0 37.7  
Off-Core     1–27, 225–251
 Blind CAS 47.5 99.8  
 CAS 101.9 214.0  
Off-Socket     28–111, 252–335
 Blind CAS 148.8 312.5  
 CAS 442.9 930.1  
Cross-Interconnect     112–223, 336–447
 Blind CAS 336.6 706.8  
 CAS 944.8 1,984.2  
Off-System      
 Comms Fabric 5,000 10,500  
 Global Comms 195,000,000 409,500,000  

上表是一个 8-Socket with Intel Xeon Platinum 8176 CPUs 的系统,用以观察 CAS 操作的开销。

Same-CPU 前缀表示正在某个变量上执行 CAS 操作的 CPU 是上一次访问该变量的同一个 CPU。也就是说对应的 cacheline 在执行 CAS 前就已经缓存了。lock 可视为两次的 CAS 操作,即 acquire 和 release,所以开销接近翻倍。注意这份表都是以 CPU 0 的视角为前提。

On-Core 涉及同一核心的不同硬件线程之间的交互。由于硬件线程之间完全共享了缓存层次架构,其开销接近于 Same-CPU 操作。这里的 CPU 224 与 CPU 0 是同核心超线程的关系。

Blind CAS 指的是直接指定 cmpxchg 时的旧值参数(一般命名为 expected 或者 old)而无需查看内存地址。这种一般适用于持锁操作,比如一个 unlocked 状态使用 0 去表示,locked 状态使用 1 去表示,用于 lock() 实现的 CAS 操作只需判断旧值为 0,新值为 1。与常规 CAS 不同在于,这里只有 1 次(CAS 操作本身的)内存地址访问,因此开销会更低。

常规 CAS 操作的旧值是从之前早期的 load 操作得到的。比如尝试做一次原子增加计数的操作,load 操作先获取当前值;然后该数值加一以产生另一个数值;再到 CAS 操作,刚才实际被 load 的值作为旧值,产生的值作为新值。这里有 2 次内存地址访问操作,一次是 load 操作,另一次是 CAS 操作本身,因此开销会更高。所以这种额外的 load 操作并非免费的午餐,对比之下,On-Core blind CAS 和 Same-CPU CAS 的开销是接近的。

剩下的跨核心、跨 socket 这些,只能说域之间的距离越远,通信带来的延迟惩罚越大。对比一个普通指令,其开销已有百倍到千倍的差距。

硬件优化的方向

这本书不关注硬件优化这个话题,只给方向。除了传统的加大缓存、写入缓冲、预取优化和投机执行以外,还提供了更加先进的方向比如 3D 堆叠、新型材料、加速卡和光子芯片等等供人参考,唉跟不上 paul 大神的思维了。

软件实现的启发

CAS 操作通常要假设大概率要与外部 CPU 通信,这代表着(因被动地响应 invalidate message 而导致)cache miss 的产生。而且从上表来看,越是复杂庞大的系统要承受的开销占比则越高。

我们必须依靠硬件的特性减少通信的次数。就基本策略来看,一种思路是使用几乎没有通信的彼此隔离的线程,另一种思路是确保任何共享数据都是以读取为主。书里要讲的技巧基本都围绕这两点展开。

并行原语和工具

第四章开头比较偏手册(堆 API),就不摘抄太多了。总之作者的意思是,如果不强求「do parallel programming the hard way」,你可以考虑直接使用 shell 实现并行编程,比如 & 搭配 wait,或者是管道 |。除此以外,POSIX thread 和 GCC(classic)/C11/GCC(modern) atomic 库都是趁手的工具。

线程和锁的讨论

POSIX 通过 lock 原语避免 data race。为了避免可能存在的歧义,这里需要同步一下 data race 的概念:A data race occurs when there are multiple concurrent accesses to a given variable, at least one of which is a plain C-language access and at least one of which is a store. 另外,作者在这里没有深入讨论良性的数据竞争(benign data race,也就是符合上面定义,且不影响结果的正确性),只给结论:编译器越是先进(激进),这种现象就越是稀少(见共享变量的恶意小节)。

顺手找了点资料:

  • Oracle 的文档提供了一个良性的示例。注意这里可能假定了编译器不做激进优化。
  • HP 的论文给出了观点:(会被编译器介入的)源码层面的 data race 应该全部视为错误。

rwlock-scalability

POSIX 还为(线程数量上)读者多而写者少的场合提供了 POSIX Reader-Writer Locking(即读写锁 pthread_rwlock_t),以期望相比 POSIX Locking(即互斥锁 pthread_mutex_t)能获得更好的可扩展性。但是需要注意不同临界区(时间)长度对于读写锁的扩展性问题,上图给出了读写锁在不同临界区的扩展性衰减程度,即使是高达 10000 us 长度的临界区,依然有接近 10% 的性能衰减(别看近似一条水平直线,纵坐标是按对数尺度画出来的)。

要真正提供高扩展性,有多种思路:

  • 如果数据绝不更新,那根本不需要持有任何类型的锁。
  • 如果数据低频更新,可以设置检查点,在更新数据前终止线程,更新后从检查点重新启动线程。
  • 或者减小锁粒度:每个线程中维护自己的互斥锁,读操作获取自己的锁,写操作则获取所有的锁。
  • 对于极小的临界区,可以参考后续延迟处理章节。

NOTE: 这里忽略了不少实验细节,感兴趣翻原文。

原子操作和屏障

使用原子操作和屏障,这对于足够小的临界区来说也是比使用锁更好的方案。

GCC (Classic) 提供了原子操作和屏障的原语:

  • 原子操作:__sync_fetch_and_*()__sync_*_compare_and_swap() 等函数。
  • 内存屏障:__sync_synchronize()

在特殊场合下,可能需要更多(GCC 没提供的)原语:

  • 防止编译器重排序(编译器屏障):barrier()
  • 防止编译器访问优化:READ_ONCE()WRITE_ONCE()
// barrier() 和 *_ONCE() 这些函数可以使用下面的代码来实现
// 这也是 Linux 内核中的简化实现
#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))
#define READ_ONCE(x) \
                ({ typeof(x) ___x = ACCESS_ONCE(x); ___x; })
#define WRITE_ONCE(x, val) \
                do { ACCESS_ONCE(x) = (val); } while (0)
#define barrier() __asm__ __volatile__("": : :"memory")

C11 标准和 Modern GCC 也提供了类似的原子操作和屏障的原语,前者引入了 C11 内存模型(也就是 memory order 标记),后者还将原语扩展到了非原子类型。比如 __atomic_store_n(&x, v, __ATOMIC_RELAXED)WRITE_ONCE() 是非常相似的原语。

  • A READ_ONCE(), WRITE_ONCE(), and the now-obsolete ACCESS_ONCE() accesses may be modeled as a volatile memory_order_relaxed access.
  • Note that the volatile is absolutely required: Non-volatile memory_order_relaxed is not sufficient.

P0124R8: Variable Access

共享变量的恶意

尽管使用 GCC 的 synchronize 原语可以做到 full barrier 的保证,但这并不代表共享变量的访问是安全的。即使是标量类型的变量执行 plain store/load 操作,编译器都有权假定该变量不被其它线程所访问和修改,这意味着有大量的编译器优化隐含在里面破坏你的并发代码。

NOTES:

  • 使用 lock 原语可以保证访问共享变量的安全。
  • plain store/load 指的是不使用 C11 atomic、内联汇编以及 volatile 访问。
  • 永远记得并发编程要考虑单线程的场合,你不能忽略内核态中断和用户态 signal()
  • 极端的场合甚至要考虑 on-stack 和 per-CPU 变量的并发访问风险,不过这是仅限于内核。

共享变量的访问模式中,常见被破坏的现象有:

  • load tearing:一次 load 操作被编译器优化为多个 load 操作。
  • store tearing:一次 store 操作被编译器优化为多个 store 操作。
  • load fusing:使用单次 load 获得的值替代重复 load 操作。
  • store fusing:使用最后的 store 操作替代连续的 store 操作。
  • code reordering:即 program order 重排序。
  • invented loads:将 load 时使用的临时(局部)变量直接替换为最终使用的变量。
  • invented stores:将 store 操作临时存放于暂时未被使用或不再使用到的变量。
  • store-to-load transformations:一次 store 操作被转换为先 load(用于分支判断)再 store。
  • dead-code elimination:操作被消除。

NOTES:

// 清单 4.19 给出的问题代码
void shut_it_down(void)
{
    status = SHUTTING_DOWN; /* BUGGY!!! */
    start_shutdown();
    while (!other_task_ready) /* BUGGY!!! */
        continue;
    finish_shutdown();
    status = SHUT_DOWN; /* BUGGY!!! */
    do_something_else();
}

void work_until_shut_down(void)
{
    while (status != SHUTTING_DOWN) /* BUGGY!!! */
        do_more_work();
    other_task_ready = 1; /* BUGGY!!! */
}

清单 4.19 给出了一段非常短但是能让开发者献祭无数遍的代码,假设这两个函数是并行的:

  • 考虑 store fusing。假设 start_shutdown()finish_shutdown() 不会访问到 status(通过内联等方式得知),编译器有权认为首个 status 赋值是多余的,移除并只保留最后一次赋值;因此 work_until_shut_down() 无法结束 while 循环,从而导致 other_task_ready 无法置位;所以 shut_it_down() 中的 while 循环也无法结束。
  • 考虑 code reordering。假设 do_more_work() 不会访问到 other_task_ready,编译器也有权调换代码顺序,使得 other_task 值位先于 while 循环执行;从而破坏了 shut_it_down() 的状态,使得 finish_shutdown() 先于 do_more_work() 执行。
  • 考虑 invented stores。假设 do_more_work() 是内联于 work_until_shut_down() 当中,编译器也有权在 while 执行时,将 do_more_work() 内使用到的临时变量存放到函数外部的 other_task_ready;由于 other_task_ready 可以(提前)置为任意 do_more_work() 用到的数值,同样破坏了 shut_it_down() 的状态。
  • 最后还是要考虑硬件相关的 CPU reordering。

还有很多代码示例,推荐翻书或者看上面的 LWN 专题。后面小节也会有修复方案。

解决方案的选择

我们需要解决掉编译器带来的众多问题,解决方案可以有多种:

  • 使用 volatile 修饰符。尽管 volatile 具有明显的争议(实现定义)和不清晰的语言标准描述。但是 Linux 内核的内存模型指定了 volatile 的实现,在合适的对齐和大小的前提下禁止了上述的编译器优化行为。另外在使用上并不直接声明 volatile,而是 READ_ONCE()WRITE_ONCE()
  • 使用 barrier 编译器屏障。作者在 Code Reordering 小段中指出,现代 CPU 在特定场合下可以不考虑 CPU 重排序,比如中断例程已有「看到此前所有指令,且看不到后续任何指令」的屏障效果。而在必要的限制硬件乱序时,使用 smp_mb() 替代 barrier()
  • 使用原子类型和 memory order。
  • 使用锁或者限制特定 CPU / 线程访问变量以避免数据竞争。
问题代码 修复方案
// 清单 4.19 给出的问题代码
void shut_it_down(void)
{
    status = SHUTTING_DOWN; /* BUGGY!!! */
    start_shutdown();
    while (!other_task_ready) /* BUGGY!!! */
        continue;
    finish_shutdown();
    status = SHUT_DOWN; /* BUGGY!!! */
    do_something_else();
}

void work_until_shut_down(void)
{
    while (status != SHUTTING_DOWN) /* BUGGY!!! */
        do_more_work();
    other_task_ready = 1; /* BUGGY!!! */
}
// 清单 4.29 给出的 4.19 修复方案
void shut_it_down(void)
{
    // 使用 READ_ONCE() 和 WRITE_ONCE() 避免编译器优化
    WRITE_ONCE(status, SHUTTING_DOWN);
    // 同时使用 smp_mb() 避免 CPU 乱序执行
    smp_mb();
    start_shutdown();
    while (!READ_ONCE(other_task_ready))
        continue;
    smp_mb();
    finish_shutdown();
    smp_mb();
    WRITE_ONCE(status, SHUT_DOWN);
    do_something_else();
}

void work_until_shut_down(void)
{
    while (READ_ONCE(status) != SHUTTING_DOWN) {
        smp_mb();
        do_more_work();
    }
    smp_mb();
    WRITE_ONCE(other_task_ready, 1);
}

另外,作者还补充了 READ_ONCE()WRITE_ONCE() 的使用原则。在大多数情况下,共享变量必须使用 READ_ONCE()WRITE_ONCE(),但是也有一些范式(感觉分得太细了,先 mark):

  1. A shared variable is only modified by a given owning CPU or thread, but is read by other CPUs or threads. All stores must use WRITE_ONCE(). The owning CPU or thread may use plain loads. Everything else must use READ_ONCE() for loads.
  2. A shared variable is only modified while holding a given lock, but is read by code not holding that lock. All stores must use WRITE_ONCE(). CPUs or threads holding the lock may use plain loads. Everything else must use READ_ONCE() for loads.
  3. A shared variable is only modified while holding a given lock by a given owning CPU or thread, but is read by other CPUs or threads or by code not holding that lock. All stores must use WRITE_ONCE(). The owning CPU or thread may use plain loads, as may any CPU or thread holding the lock. Everything else must use READ_ONCE() for loads.
  4. A shared variable is only accessed by a given CPU or thread and by a signal or interrupt handler running in that CPU’s or thread’s context. The handler can use plain loads and stores, as can any code that has prevented the handler from being invoked, that is, code that has blocked signals and/or interrupts. All other code must use READ_ONCE() and WRITE_ONCE().
  5. A shared variable is only accessed by a given CPU or thread and by a signal or interrupt handler running in that CPU’s or thread’s context, and the handler always restores the values of any variables that it has written before return. The handler can use plain loads and stores, as can any code that has prevented the handler from being invoked, that is, code that has blocked signals and/or interrupts. All other code can use plain loads, but must use WRITE_ONCE() to prevent store tearing, store fusing, and invented stores.

注意虽然作者没有明说,但是这些惯用法是建立在 Linux 内核的基础上。如果我们是开发用户程序而非内核,第一选择应该是直接使用语言标准提供的内存模型,即原子类型和 memory order 标记。

内存模型的介绍

个人补充一点关于 LKMM(Linux 内核的内存模型)的简单介绍。主要是作者还没有说明 memory order 这一部分,后面第十五章才有讨论。

除了内核文档以外,也可以参考 P0124 提案和 liburing 封装:

  • smp_mb:接近 C/C++ 内存模型的 seq-cst 全序双向栅栏,但是还有额外的语义(略)。
  • smp_rmb:接近 C/C++ 内存模型的 acq-rel 双向栅栏,但是只保证栅栏前后 RR 不乱序。
  • smp_wmb:接近 C/C++ 内存模型的 acq-rel 双向栅栏,但是只保证栅栏前后 WW 不乱序。
  • ACQUIRE 语义:保证栅栏后的 pol-RW 都不越界,pol-W 对所有 CPU 可见。
  • RELEASE 语义:保证栅栏前的 poe-RW 都不越界,poe-W 对所有 CPU 可见。
  • 说明一下,R = read/load,W = write/store,po = program order, l = later, e = earlier。

可以看出 smp_mb 以及 ACQUIRE/RELEASE 基本和 C/C++ 语言标准用到的内存模型保持一致,但是 smp_rmb 和 smp_wmb 是非常宽松的(acq-rel 在重排序方面强势多了)。这里仍建议对比 P0124 的前后修订版本,关注一下 smp_rmb 和 smp_wmb 描述的变动。liburing 的实现也值得参考,它没有考虑 smp_rmb 和 smp_wmb,而其他的语义都照着语言标准做了一层封装。

计数器的设计

第五章讨论的是不同场合下的计数器实现。

data-flow-atomic 引起洪荒的原子操作

至少有两个不好的做法:

  1. 直接使用非原子自增(*_ONCE() 版本的 counter 自增),大量的数据会被丢失。
  2. 全部使用原子自增(atomic_inc(&counter)),多核的扩展性会很差。

总之要学会看场景去优化。

统计计数器

统计计数器的特点在于写多读少,比如统计网络包的数据量。

data-flow-per-thread

设计思路就是使用 per-thread/CPU 的计数器。当更新时只需更新线程本地的计数器,读取时再把所有线程的计数器聚合起来。

点击展开/折叠
DEFINE_PER_THREAD(unsigned long, counter);

static __inline__ void inc_count(void)
{
    unsigned long *p_counter = &__get_thread_var(counter);
    WRITE_ONCE(*p_counter, *p_counter + 1);
}

static __inline__ unsigned long read_count(void)
{
    int t;
    unsigned long sum = 0;
    for_each_thread(t)
        sum += READ_ONCE(per_thread(counter, t));
    return sum;
}

一种实现方式是基于数组。也就是分配一个数组,然后每个线程使用唯一的下标去访问元素。再经过 per-thread 原语(类似 uRCU 用到的实现,同样是作者写的代码)封装后的例子如上。

点击展开/折叠
// thread local 计数器
unsigned long _Thread_local counter = 0;
// 将 thread local 暴露给读者
unsigned long *counterp[NR_THREADS] = {NULL};
// 线程退出时的累计计数
unsigned long finalcount = 0;
// 控制线程注册
DEFINE_SPINLOCK(final_mutex);

static inline void inc_count(void)
{
    WRITE_ONCE(counter, counter + 1);
}

static inline unsigned long read_count(void)
{
    int t;
    unsigned long sum;

    spin_lock(&final_mutex);
    sum = finalcount;
    for_each_thread(t)
        if (counterp[t] != NULL)
            sum += READ_ONCE(*counterp[t]);
    spin_unlock(&final_mutex);

    return sum;
}

void count_register_thread(unsigned long *p)
{
    // smp_thread_id() 原语返回一个与发出请求的线程相对应的线程索引
    // 这个索引保证小于程序启动以来存在的线程最大数量
    // 因此它对于位掩码、数组索引等用途非常有用
    int idx = smp_thread_id();

    spin_lock(&final_mutex);
    counterp[idx] = &counter;
    spin_unlock(&final_mutex);
}

void count_unregister_thread(int nthreadsexpected)
{
    int idx = smp_thread_id();

    spin_lock(&final_mutex);
    finalcount += counter;
    counterp[idx] = NULL;
    spin_unlock(&final_mutex);
}

另一种实现方式是基于编译器提供的 thread local 变量,它避开了数组大小对线程数的限制。

点击展开/折叠
// 写侧计数器
DEFINE_PER_THREAD(unsigned long, counter);
// 读侧计数器
unsigned long global_count;
// 用于协作中断
int stopflag;

static __inline__ void inc_count(void)
{
    unsigned long *p_counter = &__get_thread_var(counter);

    WRITE_ONCE(*p_counter, *p_counter + 1);
}

static __inline__ unsigned long read_count(void)
{
    return READ_ONCE(global_count);
}

void *eventual(void *arg)
{
    int t;
    unsigned long sum;

    // 只有调用 count_cleanup() 才会推动 stopflag
    while (READ_ONCE(stopflag) < 3) {
        sum = 0;
        for_each_thread(t)
            sum += READ_ONCE(per_thread(counter, t));
        // 从写侧下推计数结果到读侧
        WRITE_ONCE(global_count, sum);
        // 定时下推,间隔 1 ms
        poll(NULL, 0, 1);
        if (READ_ONCE(stopflag))
            smp_store_release(&stopflag, stopflag + 1);
    }
    return NULL;
}

void count_init(void)
{
    int en;
    pthread_t tid;

    // 独立的下推线程
    en = pthread_create(&tid, NULL, eventual, NULL);
    if (en != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(en));
        exit(EXIT_FAILURE);
    }
}

void count_cleanup(void)
{
    WRITE_ONCE(stopflag, 1);
    while (smp_load_acquire(&stopflag) < 3)
        poll(NULL, 0, 1);
}

还有一种实现方式是利用最终一致性来提高读侧性能。之前的实现版本是保证了返回值肯定是在 read_count() 执行前后的理想范围内,而最终一致性弱化了保证:在没有调用 inc_count() 的情况下,(随着时间的推移)read_count() 最终会返回一个准确的计数值。具体看代码注释,这需要一个独立的线程协助完成下推工作。

近似限制计数器

计数器的某些场合可以只要求近似处理。比如一个结构体分配器的已占用次数达到限制阈值后,将不再处理新的分配请求,而这个阈值是允许为粗略近似的。

一种设计思路可以是分区,但是可能不太通用。以结构体分配器为例,假设有 10 个线程,限制阈值是 10000,那么每个线程只需管理自身的 1000 个结构体占用计数。这种设计思路的问题在于难以处理分配和释放线程不一致的问题,计数要么是算入到释放线程,但是很快就会严重失衡;要么是算回到分配线程,但是需要用昂贵的原子手段。

另一种设计思路是部分分区。每线程维护 counter 计数和 counter max 阈值。通常每一次使用是增加线程私有的计数器;当线程的已占用计数到达阈值时,将把一半的计数算入到 global count 全局计数器中。这两种操作看起来前面就是快路径,后面就是慢路径。慢路径是要持有全局锁的,因为还需要做全局的重平衡操作。另外还有优化,作者引入了 global reserve 作为一个动态调整 counter max 的重平衡参数。总之要使得实现满足三个不变式:

\[\begin{align*} global\_count + global\_reserve &\le global\_count\_max \\ \sum_{i}^{nthreads}counter\_max_i &\le global\_reserve \\ counter_i &\le counter\_max_i \end{align*}\]
点击展开/折叠
unsigned long __thread counter = 0;
unsigned long __thread countermax = 0;
unsigned long globalcountmax = 10000;
unsigned long globalcount = 0;
unsigned long globalreserve = 0;
unsigned long *counterp[NR_THREADS] = { NULL };
DEFINE_SPINLOCK(gblcnt_mutex);
#define MAX_COUNTERMAX 1000

static __inline__ int add_count(unsigned long delta)
{
    if (counter_max - counter >= delta) {
        WRITE_ONCE(counter, counter + delta);
        return 1;
    }
    spin_lock(&gblcnt_mutex);
    globalize_count();
    if (globalcount_max - globalcount - global_reserve < delta) {
        spin_unlock(&gblcnt_mutex);
        return 0;
    }
    globalcount += delta;
    balance_count();
    spin_unlock(&gblcnt_mutex);
    return 1;
}

static __inline__ int sub_count(unsigned long delta)
{
    if (counter >= delta) {
        WRITE_ONCE(counter, counter - delta);
        return 1;
    }
    spin_lock(&gblcnt_mutex);
    globalize_count();
    if (globalcount < delta) {
        spin_unlock(&gblcnt_mutex);
        return 0;
    }
    globalcount -= delta;
    balance_count();
    spin_unlock(&gblcnt_mutex);
    return 1;
}

static __inline__ unsigned long read_count(void)
{
    int t;
    unsigned long sum;

    spin_lock(&gblcnt_mutex);
    sum = globalcount;
    for_each_thread(t) {
        if (counterp[t] != NULL)
            sum += READ_ONCE(*counterp[t]);
    }
    spin_unlock(&gblcnt_mutex);
    return sum;
}

static __inline__ void globalize_count(void)
{
    globalcount += counter;
    counter = 0;
    globalreserve -= countermax;
    countermax = 0;
}

static __inline__ void balance_count(void)
{
    countermax = globalcountmax -
                 globalcount - globalreserve;
    countermax /= num_online_threads();
    if (countermax > MAX_COUNTERMAX)
        countermax = MAX_COUNTERMAX;
    globalreserve += countermax;
    counter = countermax / 2;
    if (counter > globalcount)
        counter = globalcount;
    globalcount -= counter;
}

void count_register_thread(void)
{
    int idx = smp_thread_id();

    spin_lock(&gblcnt_mutex);
    counterp[idx] = &counter;
    spin_unlock(&gblcnt_mutex);
}

void count_unregister_thread(int nthreads_expected)
{
    int idx = smp_thread_id();

    spin_lock(&gblcnt_mutex);
    globalize_count();
    counterp[idx] = NULL;
    spin_unlock(&gblcnt_mutex);
}

NOTES:

  • counter 到达 counter max 时并非必须移入一半的 counter 数目,这是精度和扩展性的权衡。
  • counter max 简单来说是等于 global reserve 除以线程数。在程序早期,配额比较充足时,可以得到更高的 counter max 数值以减少进入慢路径的可能,以提高扩展性;而在后期配额较少时,会更加高频地进入慢路径执行重平衡,以提高精度。
  • 实现上的 counter 先置零再从 global count 拿回是为了简化 corner case。

精确限制计数器

作者给了两套做法:

  1. 原子实现,将计数和阈值高低位拆分压入到一个原子类型中。
  2. signal-theft,使用 signal()/kill() 免除原子操作,需要维护状态机。
原子实现源码
atomic_t __thread counterandmax = ATOMIC_INIT(0);
unsigned long globalcountmax = 1 << 25;
unsigned long globalcount = 0;
unsigned long globalreserve = 0;
atomic_t *counterp[NR_THREADS] = { NULL };
DEFINE_SPINLOCK(gblcnt_mutex);

#define CM_BITS (sizeof(atomic_t) * 4)
#define MAX_COUNTERMAX ((1 << CM_BITS) - 1)

static __inline__ void split_counterandmax_int(int cami, int *c, int *cm) {
    *c = (cami >> CM_BITS) & MAX_COUNTERMAX;
    *cm = cami & MAX_COUNTERMAX;
}

static __inline__ void split_counterandmax(atomic_t *cam, int *old, int *c, int *cm) {
    unsigned int cami = atomic_read(cam);
    *old = cami;
    split_counterandmax_int(cami, c, cm);
}

static __inline__ int merge_counterandmax(int c, int cm) {
    unsigned int cami = (c << CM_BITS) | cm;
    return ((int)cami);
}

int add_count(unsigned long delta) {
    int c, cm, old, new;

    do {
        split_counterandmax(&counterandmax, &old, &c, &cm);
        if (delta > MAX_COUNTERMAX || c + delta > cm)
            goto slowpath;
        new = merge_counterandmax(c + delta, cm);
    } while (atomic_cmpxchg(&counterandmax, old, new) != old);
    return 1;

slowpath:
    spin_lock(&gblcnt_mutex);
    globalize_count();
    if (globalcountmax - globalcount - globalreserve < delta) {
        flush_local_count();
        if (globalcountmax - globalcount - globalreserve < delta) {
            spin_unlock(&gblcnt_mutex);
            return 0;
        }
    }
    globalcount += delta;
    balance_count();
    spin_unlock(&gblcnt_mutex);
    return 1;
}

int sub_count(unsigned long delta) {
    int c, cm, old, new;

    do {
        split_counterandmax(&counterandmax, &old, &c, &cm);
        if (delta > c)
            goto slowpath;
        new = merge_counterandmax(c - delta, cm);
    } while (atomic_cmpxchg(&counterandmax, old, new) != old);
    return 1;

slowpath:
    spin_lock(&gblcnt_mutex);
    globalize_count();
    if (globalcount < delta) {
        flush_local_count();
        if (globalcount < delta) {
            spin_unlock(&gblcnt_mutex);
            return 0;
        }
    }
    globalcount -= delta;
    balance_count();
    spin_unlock(&gblcnt_mutex);
    return 1;
}

unsigned long read_count(void) {
    int c, cm, old, t;
    unsigned long sum;

    spin_lock(&gblcnt_mutex);
    sum = globalcount;
    for_each_thread(t) {
        if (counterp[t] != NULL) {
            split_counterandmax(counterp[t], &old, &c, &cm);
            sum += c;
        }
    }
    spin_unlock(&gblcnt_mutex);
    return sum;
}

static void globalize_count(void) {
    int c, cm, old;

    split_counterandmax(&counterandmax, &old, &c, &cm);
    globalcount += c;
    globalreserve -= cm;
    old = merge_counterandmax(0, 0);
    atomic_set(&counterandmax, old);
}

static void flush_local_count(void) {
    int c, cm, old, t, zero;

    if (globalreserve == 0)
        return;

    zero = merge_counterandmax(0, 0);
    for_each_thread(t)
        if (counterp[t] != NULL) {
            old = atomic_xchg(counterp[t], zero);
            split_counterandmax_int(old, &c, &cm);
            globalcount += c;
            globalreserve -= cm;
        }
}

static void balance_count(void) {
    int c, cm, old;
    unsigned long limit;

    limit = globalcountmax - globalcount - globalreserve;
    limit /= num_online_threads();
    cm = (limit > MAX_COUNTERMAX) ? MAX_COUNTERMAX : limit;

    globalreserve += cm;
    c = cm / 2;
    if (c > globalcount)
        c = globalcount;
    globalcount -= c;

    old = merge_counterandmax(c, cm);
    atomic_set(&counterandmax, old);
}

void count_register_thread(void) {
    int idx = smp_thread_id();

    spin_lock(&gblcnt_mutex);
    counterp[idx] = &counterandmax;
    spin_unlock(&gblcnt_mutex);
}

void count_unregister_thread(int nthreadsexpected) {
    int idx = smp_thread_id();

    spin_lock(&gblcnt_mutex);
    globalize_count();
    counterp[idx] = NULL;
    spin_unlock(&gblcnt_mutex);
}

原子实现类似上面的近似计数器,主要是并行的 flush 和 per-thread 流程需要保证数据的原子性,以避免 counter 和 counter max 仅修改一边却被另一个线程再次改动的问题,不多讨论了。

signal-theft

signal-theft 实现源码
/********** 数据结构部分 **********/

#define THEFT_IDLE 0
#define THEFT_REQ 1
#define THEFT_ACK 2
#define THEFT_READY 3

// 初始状态为 IDLE
int __thread theft = THEFT_IDLE;
// 是否处于 fast path 路径当中
int __thread counting = 0;
unsigned long __thread counter = 0;
unsigned long __thread countermax = 0;

unsigned long globalcountmax = 10000;
unsigned long globalcount = 0;
unsigned long globalreserve = 0;

unsigned long *counterp[NR_THREADS] = { NULL };
unsigned long *countermaxp[NR_THREADS] = { NULL };
int *theftp[NR_THREADS] = { NULL };

DEFINE_SPINLOCK(gblcnt_mutex);

#define MAX_COUNTERMAX 100

/********** 数值迁移部分 **********/

static void globalize_count(void) {
    globalcount += counter;
    counter = 0;
    globalreserve -= countermax;
    countermax = 0;
}

// 信号例程,注册流程见 count_init()
static void flush_local_count_sig(int unused) {
    // 非 REQ 状态会屏蔽信号
    if (READ_ONCE(theft) != THEFT_REQ)
        return;
    // REQ 可以转移为 ACK 或者 READY
    // 取决与 counting 位(是否在 fast path)
    WRITE_ONCE(theft, THEFT_ACK);
    if (!counting)
        smp_store_release(&theft, THEFT_READY);
}

static void flush_local_count(void) {
    int t;
    thread_id_t tid;

    for_each_tid(t, tid) {
        if (theftp[t] != NULL) {
            // 针对 no count 的特例,不需发出信号
            if (*countermaxp[t] == 0) {
                WRITE_ONCE(*theftp[t], THEFT_READY);
                continue;
            }
            // 只有 REQ 状态才会处理信号,处理完成回切换到 READY 或者 ACK
            // 可用于判断信号丢失或者信号延迟
            WRITE_ONCE(*theftp[t], THEFT_REQ);
            // 发出信号
            pthread_kill(tid, SIGUSR1);
        }
    }

    for_each_tid(t, tid) {
        if (theftp[t] == NULL)
            continue;
        while (smp_load_acquire(theftp[t]) != THEFT_READY) {
            poll(NULL, 0, 1);
            // 处理可能的信号丢失
            if (READ_ONCE(*theftp[t]) == THEFT_REQ)
                pthread_kill(tid, SIGUSR1);
        }
        globalcount += *counterp[t];
        *counterp[t] = 0;
        globalreserve -= *countermaxp[t];
        *countermaxp[t] = 0;
        // 盗窃完成则回到 IDLE
        smp_store_release(theftp[t], THEFT_IDLE);
    }
}

// 见前面的近似计数器小节
static void balance_count(void) {
    countermax = globalcountmax - globalcount - globalreserve;
    countermax /= num_online_threads();
    if (countermax > MAX_COUNTERMAX)
        countermax = MAX_COUNTERMAX;
    globalreserve += countermax;
    counter = countermax / 2;
    if (counter > globalcount)
        counter = globalcount;
    globalcount -= counter;
}

/********** 计数部分 **********/

int add_count(unsigned long delta) {
    int fastpath = 0;

    WRITE_ONCE(counting, 1);
    // 确保信号例程的并发
    barrier();

    // 尚未执行信号例程(IDLE 或者 REQ 状态)才可走 fast path
    if (smp_load_acquire(&theft) <= THEFT_REQ && countermax - counter >= delta) {
        WRITE_ONCE(counter, counter + delta);
        fastpath = 1;
    }

    barrier();
    WRITE_ONCE(counting, 0);
    barrier();

    // ACK 由信号例程设置,见 flush_local_count_sig()
    if (READ_ONCE(theft) == THEFT_ACK)
        smp_store_release(&theft, THEFT_READY);

    if (fastpath)
        return 1;

    // slow path 流程
    spin_lock(&gblcnt_mutex);
    globalize_count();

    if (globalcountmax - globalcount - globalreserve < delta) {
        // 发出信号且等待完成
        flush_local_count();
        // 如果盗窃完成了仍然超出阈值
        if (globalcountmax - globalcount - globalreserve < delta) {
            spin_unlock(&gblcnt_mutex);
            return 0;
        }
    }

    globalcount += delta;
    balance_count();
    spin_unlock(&gblcnt_mutex);

    return 1;
}

int sub_count(unsigned long delta) {
    int fastpath = 0;

    WRITE_ONCE(counting, 1);
    barrier();

    if (smp_load_acquire(&theft) <= THEFT_REQ && counter >= delta) {
        WRITE_ONCE(counter, counter - delta);
        fastpath = 1;
    }

    barrier();
    WRITE_ONCE(counting, 0);
    barrier();

    if (READ_ONCE(theft) == THEFT_ACK)
        smp_store_release(&theft, THEFT_READY);

    if (fastpath)
        return 1;

    spin_lock(&gblcnt_mutex);
    globalize_count();

    if (globalcount < delta) {
        flush_local_count();
        if (globalcount < delta) {
            spin_unlock(&gblcnt_mutex);
            return 0;
        }
    }

    globalcount -= delta;
    balance_count();
    spin_unlock(&gblcnt_mutex);

    return 1;
}

unsigned long read_count(void) {
    int t;
    unsigned long sum;

    spin_lock(&gblcnt_mutex);
    sum = globalcount;
    for_each_thread(t) {
        if (counterp[t] != NULL)
            sum += READ_ONCE(*counterp[t]);
    }
    spin_unlock(&gblcnt_mutex);

    return sum;
}

/********** 初始化部分 **********/

void count_init(void) {
    struct sigaction sa;

    // 信号例程注册为 flush_local_count_sig
    sa.sa_handler = flush_local_count_sig;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    // 通过 SIGUSR1 信号派发,见 flush_local_count() 调用
    if (sigaction(SIGUSR1, &sa, NULL) != 0) {
        perror("sigaction");
        exit(EXIT_FAILURE);
    }
}

void count_register_thread(void) {
    int idx = smp_thread_id();

    spin_lock(&gblcnt_mutex);
    counterp[idx] = &counter;
    countermaxp[idx] = &countermax;
    theftp[idx] = &theft;
    spin_unlock(&gblcnt_mutex);
}

void count_unregister_thread(int nthreadsexpected) {
    int idx = smp_thread_id();

    spin_lock(&gblcnt_mutex);
    globalize_count();
    counterp[idx] = NULL;
    countermaxp[idx] = NULL;
    theftp[idx] = NULL;
    spin_unlock(&gblcnt_mutex);
}

signal-theft 版本的诀窍就是信号处理的上下文处于指定的线程本地,因此是通过信号去驱动 flush 操作。具体见代码注释和状态转移图。

Quick Quiz 8.5: What mechanisms other than POSIX signals may be used for function shipping?

Answer: There is a very large number of such mechanisms, including:

  1. System V message queues.
  2. Shared-memory dequeue.
  3. Shared-memory mailboxes.
  4. UNIX-domain sockets.
  5. TCP/IP or UDP, possibly augmented by any number of higher-level protocols, including RPC, HTTP, XML, SOAP, and so on.

当然使用其他的消息机制来替代信号也是可以的,不过个人觉得信号才是恰到好处的异步消息。

分区和同步

第六章是讲得比较宽泛的介绍环节,简单过一遍。

分区示例

perfbook-dining-philosophers-problem

作者抛出一个经典的哲学家进餐问题,一种解决饥饿的办法是给叉子排号然后按规定的大小顺序取,这样总有一位可以有进展。但是从分区的角度去思考,如果我们可以将 N 个哲学家分区成 N/2 对,每一对只竞争中间的一双叉子(如上图),那么每一对里总有一个人不会饥饿,也就是 N/2 位有进展。并且不管你的哲学家规模有多大,其同步开销几乎是不变的(因为没有数据依赖),这就是分区的威力。

另一个示例是锁实现的双端队列,除了使用一个队列持两把锁(需要处理重叠问题)以及两个队列各持一把锁组成复合队列(和原版哲学家问题类似,需要固定持锁顺序以避免死锁)的实现方式以外,还可以通过哈希分区的方式去完成这件事情。

One of the simplest and most effective ways to deterministically partition a data structure is to hash it.

NOTE: 虽然哈希分区的方式比较通用,但是论性能的话还是复合队列要更胜一筹:因为后者通常每次操作只需持锁一次,而前者必须持锁两次。

TODO: 搬运实现代码。

锁的粒度

介绍了一些基本的按锁分区范式:

  • code lock:按代码持锁,时间分区,高同步开销。
  • data lock:按数据结构持锁,时间和数据分区,高到低的同步开销。
  • data ownership:数据绑定线程或者 CPU,数据分区,没有同步开销。

NOTES:

  • 只看代码上锁仍适用于低频操作,因为设计简单。它可用于快速路径难以处理的场合。
  • 所有权虽然没有同步开销,但是转移所有权是个问题。后面第八章对所有权做出了总结。
  • 只读的所有权也是「共享」所有权,因为可以做副本。前面第三章提过这种 cache 特性。
  • 数据倾斜问题(单 key/CPU 热点)不能靠这些范式解决,需要设计具体的算法。

快速路径

快速路径是个简化设计并且保证高扩展的好思路,比较常见的快速路径有:

  • 读写锁。经典的非对称设计,前面有讨论过它的临界区问题,后面还会讨论抗饥饿和读写改进。
  • RCU。高性能场景下的读写锁替代品。
  • 层级锁。将一把又大又长(粒度和占用时间)的锁替换成多把(逐层变细并降低持大锁时间)。
  • 分配器缓存。
分配器缓存实现源码
/* Allocator-Cache Data Structures */

#define TARGET_POOL_SIZE 3
#define GLOBAL_POOL_SIZE 40

struct globalmempool {
    spinlock_t mutex;
    int cur;
    struct memblock *pool[GLOBAL_POOL_SIZE];
} globalmem;

struct perthreadmempool {
    int cur;
    // 常数 2 是批处理阈值,具体看 free 操作
    struct memblock *pool[2 * TARGET_POOL_SIZE];
};

DEFINE_PER_THREAD(struct perthreadmempool, perthreadmem);

/* Allocator-Cache Allocator Function */

struct memblock *memblock_alloc(void)
{
    int i;
    struct memblock *p;
    struct perthreadmempool *pcpp;

    pcpp = &__get_thread_var(perthreadmem);
    if (pcpp->cur < 0) {
        spin_lock(&globalmem.mutex);
        for (i = 0; i < TARGET_POOL_SIZE && globalmem.cur >= 0; i++) {
            pcpp->pool[i] = globalmem.pool[globalmem.cur];
            globalmem.pool[globalmem.cur--] = NULL;
        }
        pcpp->cur = i - 1;
        spin_unlock(&globalmem.mutex);
    }
    if (pcpp->cur >= 0) {
        p = pcpp->pool[pcpp->cur];
        pcpp->pool[pcpp->cur--] = NULL;
        return p;
    }
    return NULL;
}

/* Allocator-Cache Free Function */

void memblock_free(struct memblock *p)
{
    int i;
    struct perthreadmempool *pcpp;

    pcpp = &__get_thread_var(perthreadmem);
    if (pcpp->cur >= 2 * TARGET_POOL_SIZE - 1) {
        spin_lock(&globalmem.mutex);
        for (i = pcpp->cur; i >= TARGET_POOL_SIZE; i--) {
            globalmem.pool[++globalmem.cur] = pcpp->pool[i];
            pcpp->pool[i] = NULL;
        }
        pcpp->cur = i;
        spin_unlock(&globalmem.mutex);
    }
    pcpp->pool[++pcpp->cur] = p;
}

(内存)分配器缓存是常见的分配器设计技巧。一个直接思路是分区,也就是用所有权把内存均等隔离到每个 CPU 上。但是问题在于生产者-消费者模型(一些 CPU 只负责分配,另一些 CPU 只负责释放)会使得分配器不可用。因此一个改进思路是慢速路径做批处理加上全局大锁的组合设计,实现从全局内存池的推拉工作(也就是到达一定阈值后需要转移所有权)。但是快速路径仍然保留了所有权的思路,使得常规分配和释放足够快。

第七章介绍了锁的潜在问题和优化实现。

死锁问题

极端示例:死锁可发生在单线程单锁的情况下,比如自己锁自己的锁再锁自己的锁。

非侵入式的死锁检测算法就是判断关系图是否违反了有向无环图,Lock 和 Thread 作为顶点,Lock-to-Thread 的边表示已持锁,Thread-to-Lock 的边表示等待锁,形成任意环就意味着存在死锁,记得 facebook.folly 使用这个原理实现过一个 gdb 脚本;侵入式的死锁检测算法可以了解 Linux 内核的 lockdep 机制,可惜文档没把具体的实现说清楚,只提到检查状态和依赖。

死锁避免的方式非常多:

  • 锁的层级。给每个锁编号,持锁方在持有一定的锁时,下一把锁必须按照顺序取。比如持有了 2 号锁和 4 号锁,就不得再尝试持有 3 号锁。对于不同类型的锁,需要按实际用途考虑清楚编号顺序;而对于相同类型的锁,一个简单策略是直接按照地址顺序排序。在大型系统中依赖人工并不可靠,需要引入工具,比如 lockdep 是强制实行这种层级策略。
  • 锁的本地层级。一个问题是外部库和锁的管理,因为库实现不知道用户持有了什么锁(反过来也一样)。一种做法是库实现不调用用户定义的函数(回调)。另一种做法是调用用户定义函数前先释放所有的锁。
  • 锁的分级层级。给上面的问题再加补丁,因为直接释放锁不现实,改成获取新的锁(再释放旧的冲突锁)。作者提到这种做法可以任意扩展分级,但是属于破坏原有设计。
  • 锁的临时层级。也就是延迟获取锁的时机,思路可参考 RCU 的宽限期。
  • 避免指向锁的指针。通常暴露锁的指针是一种错误设计,除了一些用于扩展临界区以避免无谓唤醒的特例(让我想起了 C++ 的 unique_lock 封装,可以移动它并暴露到外部以避免过早的 RAII;作者提了一个 pthread_cond_wait 也必须要传递指针)。所以见到锁的指针都要特别注意。
  • 条件锁。在不适用层级的设计中(很复杂的拓扑关系?),可以用 trylock 的设计替代 lock。
  • 先获取所有锁。把双方交织持锁的现象给禁止了。这种做法如果暂时无法满足,就释放锁并重试。
  • 单锁设计。字面意思,没有嵌套的可能性。这里不仅指全局大锁,还可以指分区良好的细锁。
  • 避免信号和中断。注意信号也可能会持锁,因此用户可以在持锁前先屏蔽信号,中断同理。

注意条件锁可能会引起活锁问题,可以用一些(睡眠等待的)退避算法来打补丁。

锁的种类

这一小节简单讨论互斥锁、读写锁(多角色锁)和作用域锁的一些日常话题。

即使是互斥锁,也需要考虑不同策略:

  • Strict FIFO
  • Approximate FIFO
  • FIFO within priority level
  • Random
  • Unfair

越是强序的保证越是高成本,那么该怎么选?有些硬性要求是要满足的,比如实时系统总得提供同优先级下的 FIFO 顺序。其它的看选择,如果要想避开争用,完全不考虑公平性(忍受较长时间的饥饿)也是可选的。后面章节提到的(Linux 使用过的)ticket lock 是选择了 strict FIFO,因为设计背景就是为了在高争用时保持公平性。

还有读写锁的再次讨论。虽然这本书已经鞭尸过读写锁很多遍了,但是还是提到了饥饿问题,到底要读者饥饿还是写者饥饿,总得要选择。这里的技术选型也可参考 IO 调度器的设计,比如结合批处理让读者写者按比例轮流饥饿(这不就是 deadline 调度?),总之没有唯一的答案。顺便还介绍了类似读写锁思路的 VAX/VMS 分布式锁,可以更加精细地控制并发读和并发写的行为(也就是权限管理)。不过协议开销估计会……?客气点的说法是分布式不谈单机开销。另外,Linux 提供了读写锁的改进实现:顺序锁(sequence lock),它允许单写多读的并行,这一部分会留到延迟处理章节再做介绍。

最后提到作用域锁,也就是 C++ 的 RAII 锁封装。作者提到 lock_guard 这种作用域结束就解锁的做法(strict RAII)在日用时可以避免小错误,但是可能与层级锁的需求是冲突的,甚至对于基本的迭代器遍历上锁都是不友好的。除非使用改良的 unique_lock,但是(个人觉得)内部实现使用的控制位在这种场合又可能是多余的。总之半自动挡是不讨好的。

锁的实现

TAS lock 实现源码
typedef int xchglock_t;
#define DEFINE_XCHG_LOCK(n) xchglock_t n = 0

void xchg_lock(xchglock_t *xp)
{
    while (xchg(xp, 1) == 1) {
        // 避免多次 full barrier 造成 cacheline bouncing
        while (READ_ONCE(*xp) == 1)
            continue;
    }
}

void xchg_unlock(xchglock_t *xp)
{
    // 暗示 full barrier,避免临界区的执行越过该函数
    // 也可使用 smp_mb() + WRITE_ONCE() 替代
    (void)xchg(xp, 0);
}

上面是一个简单的通过原子操作完成的 TAS (test-and-set) lock 实现,即使是工业级的自旋锁也是类似的操作。这种实现在低争用时表现良好,并且有内存体积小的优势。代码注释也标记了需要考虑的点:乱序执行和乒乓效应。

其它基于原子操作实现的锁有很多,主要是看场合。

ticket lock 实现源码
/* 从 Linux 内核 v3.4 截获的代码,有部分精简 */

// spinlock 拆分为 head 和 tail 两部分
typedef struct arch_spinlock {
    struct __raw_tickets {
        __ticket_t head, tail;
    } tickets;
} arch_spinlock_t;

/*
 * Ticket locks are conceptually two parts, one indicating the current head of
 * the queue, and the other indicating the current tail. The lock is acquired
 * by atomically noting the tail and incrementing it by one (thus adding
 * ourself to the queue and noting our position), then waiting until the head
 * becomes equal to the the initial value of the tail.
 *
 * We use an xadd covering *both* parts of the lock, to increment the tail and
 * also load the position of the head, which takes care of memory ordering
 * issues and should be optimal for the uncontended case. Note the tail must be
 * in the high part, because a wide xadd increment of the low part would carry
 * up and contaminate the high part.
 */
void __ticket_spin_lock(arch_spinlock_t *lock)
{
    register struct __raw_tickets inc = { .tail = 1 };

    // 「总号数」(tail)原子自增一,并得到自己的「号」(head)
    // 这里 xadd 同时处理了 tail 更新 和 head 获取
    inc = xadd(&lock->tickets, inc);

    // 在 head 处自旋,直到「叫号」
    for (;;) {
        if (inc.head == inc.tail)
            break;
        cpu_relax();
        inc.head = ACCESS_ONCE(lock->tickets.head);
    }
    barrier();
}

void __ticket_spin_unlock(arch_spinlock_t *lock)
{
    // head 原子自增一,表示可以下一位
    __add(&lock->tickets.head, 1, UNLOCK_LOCK_PREFIX);
}

比如上面的做法在高争用时(因不考虑公平性)会造成饥饿,所以内核提供了 ticket lock 实现解决这个问题(类似排号系统)。但是这种锁也有问题,它为了公平甚至能把锁交给当前不能用(被抢占或中断)的线程,不过作者说也不用太纠结这种问题。(现在用不了过一会就能用?不太确定表意)

MCS lock 实现源码
/* 从 Linux 内核 v6.4 截获的代码,有部分精简 */

struct mcs_spinlock {
    struct mcs_spinlock *next;
    int locked; /* 1 if lock acquired */
};

void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
    struct mcs_spinlock *prev;

    node->locked = 0;
    node->next   = NULL;

    // 原子更新 *lock = node
    // 这里也暗示了 full barrier,避免前面的初始化操作乱序
    prev = xchg(lock, node);
    if (likely(prev == NULL)) {
        // 快速路径,甚至连 locked = 1 都不需要设置
        // 具体见 unlock 操作
        return;
    }
    // node 实际被连接到最后面,因此也可以满足 FIFO 的要求
    WRITE_ONCE(prev->next, node);

    // 在本地的 node->locked 字段上自旋,直到设为 1
    arch_mcs_spin_lock_contended(&node->locked);
}

void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
    struct mcs_spinlock *next = READ_ONCE(node->next);

    // 快速路径,表示当前无争用
    if (likely(!next)) {
        // 设置 *lock 为 NULL 即可表示释放锁
        if (likely(cmpxchg_release(lock, node, NULL) == node))
            return;
        // 如果上面的操作失败,那就说明在 if 和 cmpxchg 之间有新的 lock() 请求
        // (这种情况因为 *lock 的变动而被检测出来,具体见上面的 lock 操作)
        // 也就是说 node 后续有新的节点产生,需要更新 next 以复用下面的自旋操作
        while (!(next = READ_ONCE(node->next)))
            cpu_relax();
    }

    // 给 next->locked 设为 1
    // 所谓的解锁甚至不需考虑当前 node->locked 字段
    arch_mcs_spin_unlock_contended(&next->locked);
}

另一个角度是性能,不管是 TAS lock 还是 ticket lock,它们的共同点都是在单个内存地址上做更新(自旋)。低争用时,这种更新依然可通过硬件回写实现的,所以 cacheline 还能停留在本地;而高争用时 cacheline 大概率是被清掉了,这就是争用上来就掉性能的原因。所以这又怎么了?这提示了 Linux 内核的 MCS lock 的实现。MCS lock 的诀窍是只在本地 CPU 变量上自旋,而自旋结构体会通过链表的方式连接起来(这点非常巧妙,只有链表中相邻的 CPU 才能彼此竞争),所以在极端情况也只有 2 个 CPU 在竞争同一个地址。具体看代码注释。

NOTE:「cacheline 还能停留在本地」原文是 The corresponding cache line is very likely still local to and writeable by the thread holding the lock。个人猜测作者进一步的意思是当没别人争用这个 cacheline 时可以靠异步回写操作慢慢来(MESI 协议的 writeback message,主存是必须写进去的,但是对端 cacheline 也允许广播写入,这是个可选项),但是有人争用(需要最新消息!)就得立刻同步刷新了(read invalidate message 或者 read message,前者是写操作或者 RMW 发出的,不仅刷新对方还要刷走自己)。凭印象写的,还没对照资料,要是写错了请羞辱我。

进一步的改进点可以是内存占用。MCS lock 的结构体大小比 ticket lock 大了两倍不止,因此还在 MCS lock 的基础上做了 qspinlock 改进。实现原理类似,但是把存储地址改为了存储 CPU 编号,从而压缩至四字节。

最后作者提到:使用非原子操作也可以实现锁!不过目前这种技巧在生产上的实践比较少。

数据的所有权

第八章可以跳过了,因为内容说得比较少,相关的技巧可以从前面的章节中总结得出:

  1. 数据完全私有。也就是不需要同步,例子可参考分区和同步一章。
  2. 数据部分共享。例子见 per-thread 计数器:跨线程的数据只读不写。
  3. 数据间接共享。例子见 signal-theft 计数器:通过信号或其他消息机制来协助收割数据。
  4. 数据只能被指定的线程访问。解释比较麻烦,总之最终一致性计数器的下推线程是一个例子。
  5. 数据从共享转为私有。作者提的例子有点抽象,个人觉得以分配器的批处理为例更好理解一些。

延迟处理

早期 BSD 路由表的实现
struct route_entry {
    struct cds_list_head re_next;
    unsigned long addr;
    unsigned long iface;
};

CDS_LIST_HEAD(route_list);

unsigned long route_lookup(unsigned long addr)
{
    struct route_entry *rep;
    unsigned long ret;

    cds_list_for_each_entry(rep, &route_list, re_next) {
        if (rep->addr == addr) {
            ret = rep->iface;
            return ret;
        }
    }

    return ULONG_MAX;
}

int route_add(unsigned long addr, unsigned long interface)
{
    struct route_entry *rep;

    rep = malloc(sizeof(*rep));
    if (!rep)
        return -ENOMEM;

    rep->addr = addr;
    rep->iface = interface;
    cds_list_add(&rep->re_next, &route_list);

    return 0;
}

int route_del(unsigned long addr)
{
    struct route_entry *rep;

    cds_list_for_each_entry(rep, &route_list, re_next) {
        if (rep->addr == addr) {
            cds_list_del(&rep->re_next);
            free(rep);
            return 0;
        }
    }

    return -ENOENT;
}

第九章比较硬核,介绍延迟处理技术:比如引用计数、风险指针和 RCU。并且使用这些技术给 BSD 路由表提供并行支持。BSD 路由表的实现非常朴素,就是添加、删除和按照地址查找接口。

引用计数

基于引用计数的路由表实现(有问题)
struct route_entry {
    // 多了一个引用计数字段
    atomic_t re_refcnt;
    struct route_entry *re_next;
    unsigned long addr;
    unsigned long iface;
    // 检测 use-after-free 标记
    int re_freed;
};

struct route_entry route_list;
DEFINE_SPINLOCK(routelock);

static void re_free(struct route_entry *rep)
{
    // 这是用来检测是否执行过 free 的标记
    WRITE_ONCE(rep->re_freed, 1);
    // 不应该真的 free(),因为 rep->re_freed 不能被释放
    // 在这里只是作为一个实现参考,具体可以翻邮件列表的讨论
    // 不过那里的话题已经聊到大气层去了,不贴地址了
    free(rep);
}

unsigned long route_lookup(unsigned long addr)
{
    int old;
    int new_val;
    struct route_entry *rep;
    struct route_entry **repp;
    unsigned long ret;

retry:
    repp = &route_list.re_next;
    rep = NULL;
    do {
        // 减少前一位路由项的引用计数
        // 如果引用计数减为 0,则释放 rep
        if (rep && atomic_dec_and_test(&rep->re_refcnt)) {
            re_free(rep);
        }
        rep = READ_ONCE(*repp);
        if (rep == NULL) {
            return ULONG_MAX;
        }
        do {
            // 检查这个程序是否有问题(确实有问题)
            if (READ_ONCE(rep->re_freed)) {
                abort();
            }
            old = atomic_read(&rep->re_refcnt);
            if (old <= 0) {
                goto retry;
            }
            new_val = old + 1;
        // 引用计数增加
        } while (atomic_cmpxchg(&rep->re_refcnt, old, new_val) != old);
        // 前进一位,这里有引用计数,按理说不应该解引用会失败
        // 前进后,回到上面的 do-while 头部,减去引用计数
        repp = &rep->re_next;
    } while (rep->addr != addr);
    ret = rep->iface;
    if (atomic_dec_and_test(&rep->re_refcnt)) {
        re_free(rep);
    }
    return ret;
}

int route_add(unsigned long addr, unsigned long interface)
{
    struct route_entry *rep;

    rep = malloc(sizeof(*rep));
    if (!rep)
        return -ENOMEM;

    // 添加时引用计数初始化为 1
    atomic_set(&rep->re_refcnt, 1);
    rep->addr = addr;
    rep->iface = interface;
    spin_lock(&routelock);
    rep->re_next = route_list.re_next;
    rep->re_freed = 0;
    route_list.re_next = rep;
    spin_unlock(&routelock);

    return 0;
}

int route_del(unsigned long addr)
{
    struct route_entry *rep;
    struct route_entry **repp;

    spin_lock(&routelock);
    repp = &route_list.re_next;
    for (;;) {
        rep = *repp;
        if (rep == NULL)
            break;
        if (rep->addr == addr) {
            *repp = rep->re_next;
            spin_unlock(&routelock);
            if (atomic_dec_and_test(&rep->re_refcnt)) {
                re_free(rep);
            }
            return 0;
        }
        repp = &rep->re_next;
    }
    spin_unlock(&routelock);
    return -ENOENT;
}

引用计数的思路和实现都比较简单,直接看代码就好。总之想表达的意思是:只要我在确保执行下一步前通过 CAS 获得新的引用计数,那就能确保路由表查询过程中不会解引用失败。

但是只要多线程执行 lookup 和 delete 就会曝出 use-after-free 问题,因为获取指针和增加引用两个操作并非原子。例子:线程 A 在 lookup 获取了 rep,然后被调度器拉走;线程 B 在 del 处理同一地址的 rep(引用计数为 1,减到 0 被回收);线程 A 被调度器唤起,访问 rep->...,解引用失败。

bsd-routing-table-refcnt 注意地板藏着一条直线(没想到吧)

另外作者还指出了引用计数的性能问题(垫底😓),原因就在于原子操作,已经提过很多次了。

场景的选择:想要安全的回收,可以考虑风险指针;想要更合理的性能,可以考虑顺序锁;想全部都要,可以考虑 RCU。不过我个人还是留有疑问,既然引用计数被贬低到这种程度,那为什么 Linux 的文件操作还是基于引用计数?

风险指针

风险指针的实现源码
/********** record 和 clear **********/

// record:将 *p 记录到风险指针
static inline void *_h_t_r_impl(void **p, hazard_pointer *hp)
{
    void *tmp;

    tmp = READ_ONCE(*p);
    if (!tmp || tmp == (void *)HAZPTR_POISON)
        return tmp;

    WRITE_ONCE(hp->p, tmp);
    smp_mb();
    // 保证 *p 和 hp->p 的一致性,当前无法保证就返回 HAZPTR_POISON
    // 这是非原子的情况下保证一致性的一种手段(重试解决一切问题)
    if (tmp == READ_ONCE(*p))
        return tmp;

    // 表示未达成一致或者已删除
    return (void *)HAZPTR_POISON;
}

#define hp_try_record(p, hp) _h_t_r_impl((void **)(p), hp)

// record 的忙循环版本
static inline void *hp_record(void **p, hazard_pointer *hp)
{
    void *tmp;

    do {
        tmp = hp_try_record(p, hp);
    } while (tmp == (void *)HAZPTR_POISON);

    return tmp;
}

// clear:清理风险指针
static inline void hp_clear(hazard_pointer *hp)
{
    smp_mb();
    WRITE_ONCE(hp->p, NULL);
}

/********** scan 和 free **********/

// 用于二分查找加速 scan 阶段,你也可以不使用二分
int compare(const void *a, const void *b)
{
    return (*(hazptr_head_t **)a - *(hazptr_head_t **)b);
}

void hazptr_scan()
{
    hazptr_head_t *cur;
    int i;
    hazptr_head_t *tmplist;
    hazptr_head_t **plist = gplist;
    unsigned long psize;

    // 动态 plist 初始化
    if (plist == NULL) {
        // 这里的大写字母应该是从论文拿过来的命名
        // K:per-thread 需要的风险指针个数
        psize = sizeof(hazptr_head_t *) * K * NR_THREADS;
        // pointer list:存放风险指针的链表
        plist = (hazptr_head_t **)malloc(psize);
        BUG_ON(!plist);
        gplist = plist;
    }
    smp_mb();
    psize = 0;

    // H:全局的风险指针个数
    // 扫描所有线程的风险指针
    for (i = 0; i < H; i++) {
        // 书里缺少 HP[] 初始化了,应该是所有线程的所有风险指针都连到 HP 上(只读所有权)
        uintptr_t hp = (uintptr_t)READ_ONCE(HP[i].p);

        if (!hp)
            continue;

        plist[psize++] = (hazptr_head_t *)(hp & ~0x1UL);
    }
    // 确保初始化不会乱序
    smp_mb();
    // 按地址排序,用于后续二分查找
    qsort(plist, psize, sizeof(hazptr_head_t *), compare);
    // rlist:表示 retire list,要(延迟)回收的指针存放在此
    tmplist = rlist;
    rlist = NULL;
    rcount = 0;

    while (tmplist != NULL) {
        cur = tmplist;
        tmplist = tmplist->next;

        // 如果当前指针存放于任意风险指针当中,则放回到 retire list,下次再处理
        if (bsearch(&cur, plist, psize, sizeof(hazptr_head_t *), compare)) {
            cur->next = rlist;
            rlist = cur;
            rcount++;
        // 没有风险指针在指涉,you are free to free()
        } else {
            // 书里没给出实现,但至少在 free() 前还要处理 container_of
            hazptr_free(cur);
        }
    }
}

// 回收命令,会缓存退役结点,确保在合适安全的时机再真正回收
void hazptr_free_later(hazptr_head_t *n)
{
    // 侵入式结点 n 放入到 retire list 中
    n->next = rlist;
    rlist = n;
    rcount++;

    // 批处理,攒满后执行 scan 操作
    if (rcount >= R) {
        hazptr_scan();
    }
}

风险指针(hazard pointer)是一种 SMR(安全内存回收)技术,可以直接解决上面引用计数的回收问题。其实我个人觉得风险指针就是一个不计数的引用计数。

实现有点复杂,但是思路很简单:

  • 只要我确保自己访问数据没有风险,那肯定可以安全回收。
  • 怎样才算没有风险?那自然是没别人在使用它(没有风险指针指涉它)。
  • 怎样才知道没有使用?扫描所有线程的风险指针,没问题就真的回收。
  • 什么时候扫描?当你发出回收命令时,可能是批处理过程而不会立刻处理。
  • 回收检测到风险怎么办?缓存退役结点并延迟回收,下次扫描再处理。
基于风险指针的路由表实现
struct route_entry {
    struct hazptr_head hh;
    struct route_entry *re_next;
    unsigned long addr;
    unsigned long iface;
    int re_freed;
};

struct route_entry route_list;
DEFINE_SPINLOCK(routelock);
// per-thread 的风险指针(数组,但是这里实现固定为单个元素)
hazard_pointer __thread *my_hazptr;

unsigned long route_lookup(unsigned long addr)
{
    // 这里 offset 固定为 0,就是 record 用到的风险指针数组下标
    int offset = 0;
    struct route_entry *rep;
    struct route_entry **repp;

retry:
    repp = &route_list.re_next;
    do {
        // (try) record 请求存放到风险指针提供保护
        // 重复的 record 会覆盖掉当前线程、相同 offset 的上一次保护
        // 所以后面没有用到 clear
        //
        // 作者提醒的关键点:
        // * 不要使用 record,隐藏的忙循环会死得很惨
        // * 应该使用 try record,并且失败时要完全重新遍历
        rep = hp_try_record(repp, &my_hazptr[offset]);
        if (!rep)
            return ULONG_MAX;
        if ((uintptr_t)rep == HAZPTR_POISON)
            goto retry;
        // cool,这下访问没问题了
        repp = &rep->re_next;
    } while (rep->addr != addr);
    if (READ_ONCE(rep->re_freed))
        abort();
    return rep->iface;
}

int route_add(unsigned long addr, unsigned long interface)
{
    struct route_entry *rep;

    rep = malloc(sizeof(*rep));
    if (!rep)
        return -ENOMEM;

    rep->addr = addr;
    rep->iface = interface;
    rep->re_freed = 0;
    spin_lock(&routelock);
    rep->re_next = route_list.re_next;
    route_list.re_next = rep;
    spin_unlock(&routelock);

    return 0;
}

int route_del(unsigned long addr)
{
    struct route_entry *rep;
    struct route_entry **repp;

    spin_lock(&routelock);
    repp = &route_list.re_next;
    for (;;) {
        rep = *repp;
        if (rep == NULL)
            break;
        if (rep->addr == addr) {
            *repp = rep->re_next;
            rep->re_next = (struct route_entry *)HAZPTR_POISON;
            spin_unlock(&routelock);
            // 回收命令
            hazptr_free_later(&rep->hh);
            return 0;
        }
        repp = &rep->re_next;
    }
    spin_unlock(&routelock);
    return -ENOENT;
}

所以要在临界区的前面施放 record 以确保不会被回收,后面完全不需要风险指针保护再施放 clear。

需要注意作者提到的一个惯用法:使用 try record 而非 record 版本,并且当失败时,直接从头遍历。这涉及到正确性。假如上面的路由表实现 lookup 内部从 try record 改为了 record,当线程 A 访问了 rep 时,线程 B 删除了 rep->re_next 并重设为 HAZPTR_POISON(这里 rep 已有风险指针保护,但是 rep->re_next 没有风险指针保护,立刻回收也是安全合理的),因此线程 A 在遍历下一个元素 rep = rep->re_next = HAZPTR_POISON 时再使用 record(HAZPTR_POISON) 会导致无限循环。结论是:风险指针只保护了数据的回收,但是不关心数据的修改。觉得复杂就记住上面的惯用法。

NOTE: 在 facebook.folly 的风险指针实现中,用到的术语会有所不同:record = protect,clear = reset_hazard(nullptr),free_later = retire。folly 的实现版本由风险指针的发明者维护。

顺序锁

顺序锁(sequence lock)类似读写锁,但是可以保证单个写者和多个读者并行处理。

顺序锁的实现源码
typedef struct {
    unsigned long seq;
    spinlock_t lock;
} seqlock_t;

static inline void seqlock_init(seqlock_t *slp)
{
    slp->seq = 0;
    spin_lock_init(&slp->lock);
}

static inline unsigned long read_seqbegin(seqlock_t *slp)
{
    unsigned long s;

    s = READ_ONCE(slp->seq);
    smp_mb();
    // 做了点小优化,后续 seqretry 不需每次区分奇偶判断
    return s & ~0x1UL;
}

static inline int read_seqretry(seqlock_t *slp, unsigned long oldseq)
{
    unsigned long s;

    smp_mb();
    s = READ_ONCE(slp->seq);
    return s != oldseq;
}

static inline void write_seqlock(seqlock_t *slp)
{
    spin_lock(&slp->lock);
    ++slp->seq;
    smp_mb();
}

static inline void write_sequnlock(seqlock_t *slp)
{
    smp_mb();
    ++slp->seq;
    spin_unlock(&slp->lock);
}

思路很简单,初给定一个偶数序列号,更新过程中 +1 设为奇数,更新完成再 +1 设回不同的偶数。读者在读操作时获取序列号快照并对比,如果存在任意奇数或者两个不同的偶数,那说明当前处于读不一致的状态(并发更新中),需要丢弃读结果和再次重试。

NOTES:

  • 顺序锁需要 spinlock 用于支持多写者的互斥。
  • 个人觉得 sequence lock 应该命名为序列锁而不是顺序锁,因为序列指的是它的实现,跟顺序又没啥关系。但是民间翻译早就形成习惯了。

虽然读写锁写时需要排斥读,但是顺序锁也是读了就得抛弃,那为什么这么做?个人臆测:

  1. 写者至少不会饥饿。尤其这种锁的场合本身是数量上严重倾斜到读者,必须处理写饥饿。
  2. 读者可以偷鸡(投机)。写会造成 cache miss 没错,但是不一定所有的数据都会 cache miss。假设前一次偷鸡成功,只有 20% cache miss,那下一次重试就能劲省 80% 的访存同步成本。
基于顺序锁的路由表实现(有问题)
struct route_entry {
    struct route_entry *re_next;
    unsigned long addr;
    unsigned long iface;
    int re_freed;
};

struct route_entry route_list;
DEFINE_SEQ_LOCK(sl);

unsigned long route_lookup(unsigned long addr)
{
    struct route_entry *rep;
    struct route_entry **repp;
    unsigned long ret;
    unsigned long s;

retry:
    s = read_seqbegin(&sl);
    repp = &route_list.re_next;
    do {
        rep = READ_ONCE(*repp);
        if (rep == NULL) {
            if (read_seqretry(&sl, s))
                goto retry;
            return ULONG_MAX;
        }
        // 还是老问题,没有原子地判断非空且访问字段
        repp = &rep->re_next;
    } while (rep->addr != addr);
    if (READ_ONCE(rep->re_freed))
        abort();
    ret = rep->iface;
    if (read_seqretry(&sl, s))
        goto retry;
    return ret;
}

int route_add(unsigned long addr, unsigned long interface)
{
    struct route_entry *rep;

    rep = malloc(sizeof(*rep));
    if (!rep)
        return -ENOMEM;

    rep->addr = addr;
    rep->iface = interface;
    rep->re_freed = 0;
    write_seqlock(&sl);
    rep->re_next = route_list.re_next;
    route_list.re_next = rep;
    write_sequnlock(&sl);

    return 0;
}

int route_del(unsigned long addr)
{
    struct route_entry *rep;
    struct route_entry **repp;

    write_seqlock(&sl);
    repp = &route_list.re_next;
    for (;;) {
        rep = *repp;
        if (rep == NULL)
            break;
        if (rep->addr == addr) {
            *repp = rep->re_next;
            write_sequnlock(&sl);
            smp_mb();
            rep->re_freed = 1;
            free(rep);
            return 0;
        }
        repp = &rep->re_next;
    }
    write_sequnlock(&sl);
    return -ENOENT;
}

需要注意的是,偷鸡要先确保鸡是存在的,如果写者并行地把鸡给 free 掉,那就会偷鸡不成蚀把米,造成解引用失败。也就是说,顺序锁依然是非安全的内存回收,上面的路由表实现存在着和引用计数相同的问题。

如果不考虑安全回收,顺序锁的扩展性优于风险指针,风险指针的扩展性优于引用计数。

RCU

由于 paul 大神就是 RCU 的发明者,所以……这一节的篇幅特别长,需要花点时间做心理准备。

WIP

!!!!! WORK IN PROGRESS !!!!!

我觉得目前的阅读笔记算是覆盖面足够广了,但是深度和内容分量都还不到这本书的一半。补充完 RCU 以后,更新速度会放缓,有新的内容也会直接在本文更新。

最后更新于:2024/08/22