⚡C++ 并发编程

无锁编程基础

难度:⭐⭐⭐ | 高频指数:🔥🔥

面试回答

常见问法

  • 什么是无锁(lock-free)编程?和无等待(wait-free)有什么区别?
  • CAS 是什么?原理是什么?
  • 无锁栈/队列的基本思路是什么?
  • 什么是 ABA 问题?怎么解决?
  • 工程中什么时候该用无锁数据结构?

回答

无锁编程是一种不使用互斥锁来实现线程安全的并发编程方式。它的核心思想是利用原子操作(主要是 CAS)来保证数据一致性。

三个层次的并发保证:

  1. 阻塞(blocking):使用 mutex,线程可能被挂起等待。
  2. 无锁(lock-free):保证系统整体总有线程在推进,但某个线程可能暂时被”饿住”。
  3. 无等待(wait-free):保证每个线程都能在有限步内完成操作,最强但最难实现。

CAS(Compare-And-Swap)原理:

CAS 是无锁编程的基础原语。它的语义是: “如果当前值等于期望值,就把它改成新值;否则什么都不做,返回失败。”

// 伪代码
bool CAS(T* addr, T expected, T desired) {
    if (*addr == expected) {
        *addr = desired;
        return true;
    }
    return false;  // 被别人改过了,重试
}

在 C++ 中对应 std::atomic<T>::compare_exchange_weak/strong

无锁的典型模式:CAS 循环

void lock_free_increment(std::atomic<int>& counter) {
    int old_val = counter.load();
    while (!counter.compare_exchange_weak(old_val, old_val + 1)) {
        // old_val 被自动更新为当前值,继续重试
    }
}

追问

1. compare_exchange_weakcompare_exchange_strong 的区别? weak 版本允许伪失败(spurious failure),即使值相等也可能返回 false。它通常用在循环中,性能更好(在某些架构上)。strong 保证不会伪失败,适合只调用一次的场景。

2. 无锁一定比有锁快吗? 不一定。 无锁在低竞争场景下通常更快(避免了系统调用和上下文切换),但在高竞争场景下,CAS 循环可能导致大量重试,反而不如 mutex 的排队机制高效。

3. 无锁编程最大的难点是什么?

  • 正确性极难验证(很多 bug 只在特定时序下出现)
  • 内存序(memory order)选择复杂
  • ABA 问题
  • 内存回收问题(何时安全释放节点)

4. 工程中什么时候该用无锁?

  • 对延迟极度敏感的路径(如高频交易、音频处理)
  • 读多写少的场景
  • 已有成熟的无锁库可用时
  • 大多数业务代码不需要自己写无锁结构,用好 std::atomic 和标准库就够了

原理展开

1. CAS 的硬件支持

CAS 不是软件模拟的,它对应 CPU 的原子指令:

  • x86:CMPXCHG 指令
  • ARM:LDREX / STREX(LL/SC 模式)

这些指令保证了”比较+交换”是一个不可分割的原子操作。

2. 无锁栈的基本思路

无锁栈是最简单的无锁数据结构,基于单链表实现:

template <typename T>
class LockFreeStack {
    struct Node {
        T data;
        Node* next;
    };
    std::atomic<Node*> head{nullptr};

public:
    void push(const T& val) {
        Node* new_node = new Node{val, nullptr};
        new_node->next = head.load();
        // CAS:如果 head 还是 new_node->next,就把 head 改成 new_node
        while (!head.compare_exchange_weak(new_node->next, new_node)) {
            // 失败说明别人改了 head,new_node->next 被自动更新
            // 继续重试
        }
    }

    bool pop(T& result) {
        Node* old_head = head.load();
        while (old_head &&
               !head.compare_exchange_weak(old_head, old_head->next)) {
            // 重试
        }
        if (!old_head) return false;
        result = old_head->data;
        // 注意:这里 delete old_head 有 ABA 和内存回收问题!
        return true;
    }
};

核心思路:每次操作前读取当前状态,计算新状态,然后用 CAS 尝试提交。如果被别人抢先修改了,就重新读取再试。

3. 无锁队列的基本思路

Michael & Scott 无锁队列是经典实现,基于带哨兵节点的单链表:

  • 入队:CAS 修改 tail->next,然后 CAS 推进 tail
  • 出队:CAS 推进 head

难点在于入队是两步操作(链接节点 + 推进 tail),需要处理中间状态被其他线程观察到的情况。

4. ABA 问题

什么是 ABA:

线程 1 读到值 A,准备 CAS(A → C)。在它操作之前:

  • 线程 2 把 A 改成 B
  • 线程 3 又把 B 改回 A

线程 1 的 CAS 看到值还是 A,认为没人动过,操作成功。但实际上中间状态已经变化了,可能导致逻辑错误。

在无锁栈中的具体表现:

初始栈:A → B → C(head 指向 A)

线程 1:准备 pop,读到 head=A,next=B
线程 1:被挂起...

线程 2:pop A
线程 2:pop B
线程 2:push A(A 被重新放回栈顶,但 A->next 可能已经变了)

线程 1:恢复,CAS(head, A, B) 成功!
         但 B 可能已经被释放了!

解决方案:

  1. 带版本号的指针(Tagged Pointer): 把指针和一个递增计数器打包成一个原子值,每次修改计数器+1,这样即使指针值相同,版本号也不同。

    struct TaggedPtr {
        Node* ptr;
        uint64_t tag;  // 版本号,只增不减
    };
    std::atomic<TaggedPtr> head;
  2. Hazard Pointer: 每个线程公布自己正在访问的指针,其他线程在回收内存前检查是否有人还在用。

  3. RCU(Read-Copy-Update): 读者无锁访问,写者创建副本修改后原子替换,等所有旧读者退出后再回收旧数据。

  4. Epoch-Based Reclamation: 把时间分成 epoch,只有当所有线程都离开某个 epoch 后,该 epoch 中退役的节点才能被安全回收。

5. 内存序与无锁编程

无锁编程通常需要仔细选择内存序:

// 典型的无锁栈 push
new_node->next = head.load(std::memory_order_relaxed);
while (!head.compare_exchange_weak(
    new_node->next, new_node,
    std::memory_order_release,   // 成功时:保证之前的写对其他线程可见
    std::memory_order_relaxed    // 失败时:只需要读取最新值
)) {}

面试中如果被问到,可以说:

  • 默认用 memory_order_seq_cst(最安全)
  • 性能敏感路径才考虑放松内存序
  • 放松内存序极易出错,需要非常谨慎

6. 工程中的无锁实践

什么时候该用无锁

场景是否适合无锁原因
高频交易延迟敏感,不能容忍 mutex 的系统调用开销
音频/视频实时处理不能在音频线程上阻塞
简单计数器/标志位std::atomic 就够了
复杂业务逻辑正确性难保证,维护成本高
低竞争场景mutex 已经够快,无锁收益不大
需要多个变量原子更新CAS 只能原子操作一个值

工程建议

  1. 优先用 std::atomic 处理简单共享状态
  2. 需要无锁容器时,优先用成熟库(如 Folly、Boost.Lockfree、moodycamel::ConcurrentQueue)
  3. 自己写无锁结构是最后手段,需要大量测试和形式化验证
  4. 性能测量先行:先证明 mutex 是瓶颈,再考虑无锁

易错点

  • 认为”无锁 = 无等待 = 一定更快”,实际上无锁在高竞争下可能比 mutex 更慢。
  • 忽略 ABA 问题,直接对指针做 CAS 而不加版本号。
  • 忽略内存回收问题:pop 出来的节点什么时候能安全 delete?如果有其他线程还持有指向它的指针就会 crash。
  • 在无锁代码中使用 memory_order_relaxed 但没有仔细分析是否安全。
  • 把”用了 std::atomic”等同于”实现了无锁数据结构”。atomic 只是工具,无锁的正确性需要算法设计保证。
  • 面试时说”我会写无锁队列”但说不清 ABA 和内存回收,反而减分。

记忆技巧

  • 三层递进:阻塞(mutex)→ 无锁(系统推进)→ 无等待(每个线程推进)
  • CAS 口诀:读-算-试,失败就重来
  • ABA 一句话:值没变不代表中间没被动过
  • 工程判断:先量化瓶颈,再决定是否无锁;优先用库,最后才自己写
  • 面试安全答法
    1. 解释 CAS 原理
    2. 画一个无锁栈的 push 流程
    3. 主动提 ABA 问题和解决方案
    4. 强调”工程中优先用成熟库”

面试速答版

无锁编程是不使用互斥锁、而是通过原子操作(主要是 CAS)来实现线程安全的并发方式。CAS 的语义是”如果当前值等于期望值就更新,否则重试”,对应 C++ 的 compare_exchange_weak/strong。无锁保证系统整体总有线程在推进,但不保证每个线程都能立即完成。典型的无锁栈用 CAS 修改链表头指针实现 push/pop。主要难点是 ABA 问题(值相同但中间被改过)和内存回收问题,解决方案包括带版本号的指针、Hazard Pointer、Epoch-Based Reclamation 等。工程中,大多数场景 mutex 就够用,只有对延迟极度敏感的路径才考虑无锁,且优先使用成熟的无锁库而不是自己实现。

Related · 并发编程