CAS 与 无锁队列

CAS 与 无锁队列

假设我们有一全局变量 idx , 在执行 idx++ 时编译器会翻译为三条汇编指令

1
2
3
mov [idx] , %eax
inc %eax
mov %eax , [idx]

当三条汇编语句紧挨着执行时,idx++ 是保证正确的,但是当存在多个线程时 idx++ 的正确性就有待考证了如:

线程一 线程二
mov [idx] , %eax
inc %eax
mov [idx] , %eax
inc %eax
mov %eax , [idx]
mov %eax , [idx]

当线程一和线程二按照这样的方式执行时,虽然在各自的线程中都执行了 idx++ 但是 idx 的结果只增加了一次 , 解决的方法可以是为临界区代码段添加互斥锁或自旋锁,对于互斥锁:当资源已被加锁时将会切换线程而自旋锁不会,自旋锁将在此线程上一直等待直到资源可用为止,二者的选择需要从资源的竞争程度与线程切换的开销方面考虑,这并不是这篇文章的重点,我们要写一个大多数人未知的东西(至少我是这样的)原子操作

汇编实现原子操作:

“最轻量级的锁”,通常也叫”原子操作”,之所以加引号是因为他们在汇编级别并不是原子操作,是用多条指令完成的,这些操作大多都是利用CPU支持的汇编指令

最常见的原子操作有Compare and Exchange,Self Increase/Decrease等等

80486 CPU 相关指令:

LOCK:这是一个指令前缀,在所对应的指令操作期间使此指令的目标操作数指定的存储区域锁定,以得到保护。

XADD:先交换两个操作数的值,再进行算术加法操作。多处理器安全,在80486及以上CPU中支持。

CMPXCHG:比较交换指令,第一操作数先和AL/AX/EAX比较,如果相等ZF置1,第二操作数赋给第一操作数,否则ZF清0,第一操作数赋给AL/AX/EAX。多处理器安全,在80486及以上CPU中支持。

XCHG:交换两个操作数,其中至少有一个是寄存器寻址.其他寄存器和标志位不受影响.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
Dump of assembler code for function _Z3incPii:
0x0000555555555229 <+0>: endbr64
0x000055555555522d <+4>: push %rbp
0x000055555555522e <+5>: mov %rsp,%rbp
0x0000555555555231 <+8>: mov %rdi,-0x18(%rbp)
0x0000555555555235 <+12>: mov %esi,-0x1c(%rbp)
0x0000555555555238 <+15>: mov -0x18(%rbp),%rdx
0x000055555555523c <+19>: mov -0x1c(%rbp),%eax
0x000055555555523f <+22>: lock xadd %eax,(%rdx)
0x0000555555555243 <+26>: mov %eax,-0x4(%rbp)
0x0000555555555246 <+29>: mov -0x4(%rbp),%eax
0x0000555555555249 <+32>: pop %rbp
0x000055555555524a <+33>: ret
*/
int inc(int *value , int add) {
int old;
__asm__ volatile (
"lock; xaddl %2 , %1"
: "=a" (old)
: "m" (*value) , "a" (add)
: "cc" , "memory"
);
return old;
}

在c++中atomic类提供了 cas 原子操作的方法如比较交互std::atomic<T>::compare_exchange_strong

cpu 亲和性

硬亲和性(affinity):简单来说就是利用 linux 内核提供给用户的 API,强行将进程或者线程绑定到某一个指定的 cpu 核运行。

在 Linux 内核中,所有的进程都有一个相关的数据结构,称为 task_struct 这个结构非常重要,原因有很多;其中与亲和性(affinity)相关度最高的是 cpus_allowed 位掩码。这个位掩码由 n 位组成,与系统中的n个逻辑处理器一一对应。 具有 4 个物理 CPU 的系统可以有 4 位。如果这些 CPU 都启用了超线程,那么这个系统就有一个 8 位的位掩码。

如果为给定的进程设置了给定的位,那么这个进程就可以在相关的 CPU 上运行。因此,如果一个进程可以在任何 CPU 上运行,并且能够根据需要在处理器之间进行迁移,那么位掩码就全是 1。实际上,这就是 Linux 中进程的缺省状态。

Linux 内核 API 提供了一些方法,让用户可以修改位掩码或查看当前的位掩码:

1
2
3
4
5
6
7
   #define _GNU_SOURCE             /* See feature_test_macros(7) */
#include <sched.h>
int sched_setaffinity(pid_t pid, size_t cpusetsize , const cpu_set_t *mask);
/*sched_setaffinity()将ID为pid的线程的CPU关联掩码设置为掩码指定的值。
如果pid为零,则使用调用线程。 参数cpusetsize是掩码指向的数据的长度(字节)。 通常这个参数会被指定为sizeof(cpu_set_t)。*/
int sched_getaffinity(pid_t pid, size_t cpusetsize , cpu_set_t *mask);
/*sched_getaffinity()将ID为pid的线程的关联掩码写入掩码指向的cpu_set_t结构中。 cpusetsize参数指定掩码的大小(以字节为单位)。 如果pid为零,则返回调用线程的掩码。*/

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void process_affinity(int num) {                // 设置进程 cpu 亲和性
long int tid = syscall(__NR_gettid);
std::cout << "tid : " << tid << std::endl;
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(tid % num , &mask);
sched_setaffinity(tid , sizeof mask , &mask);
}
// 设置了进程的亲和性并不保证进程中创建的新线程在此cpu上运行,只能保证进程中的主线程在cpu上运行
// 本质就是 gettid,在对应cpu上执行对应 tid 的执行流

void process_affinity(int num) { // 设置线程 cpu 亲和性
std::cout << "tid : " << tid << std::endl;
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(tid % num , &mask);
pthread_setaffinity_np(pthread_self() , sizeof mask , &mask);
}

void func(int num) {
process_affinity(num);
while (1) {}
}

int main() {
int num = sysconf(_SC_NPROCESSORS_CONF);
std::cout << num << std::endl;
for(int i=0 ; i<num/2 ; ++i) {
std::thread t(func , num);
t.detach();
}
usleep(1000000000);
}

通过 htop 可以看到在 usleep 结束之前有两个 cpu 的占用率是 100% 而其他线程占用几乎为 0,这说明两个线程分别绑定到了两个逻辑核心上

补充:

main 函数结束后相当于调用 exit ,这将导致整个进程结束,意味着其他线程如 detach 后的线程也跟着结束,由于这里尝试多次没有看到希望的效果,当然也可以使用 posix_exit() 令主线程退出,而不影响其他线程

无锁队列(zeromq)

为什么需要无锁队列:

Cache 损坏
拿互斥锁来说,采用休眠等待,线程被频繁抢占产生的Cache损坏将导致应用程序性能下降。

在同步机制上的争抢队列
由于锁机制,当资源争取发生的频率很高时,任务将大量的时间 (睡眠,等待,唤醒)浪费在获得保护队列数据的互斥锁,而不是处理队列中的数据上。

动态内存分配
当一个任务从堆中分配内存时,标准的内存分配机制会 阻塞所有与这个任务共享地址空间的其它任务(进程中的所有线程)

提供的原子操作

atom_op.h

1
2
3
4
5
6
7
8
9
10
	#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
#include <sched.h> // sched_yield()

对应函数原型:
bool __sync_bool_compare_and_swap (T* __p, U __compVal, V __exchVal, ...);
此函数将 __compVal 的值与 __p 指向的变量的值进行比较。 如果它们相等,那么 __exchVal 的值将存储在 __p指定的地址中; 否则,不会执行任何操作,如果 __compVal 的值与 __p 指向的变量的值相等,那么该函数返回 true; 否则,返回 false
T __sync_fetch_and_add ( T * __p , U __v , ...);
该函数以原子方式将__v的值添加到__p指向的变量中。结果存储在__p指定的地址中,该函数返回__p指向的变量的初始值。

atomic_ptr.hpp 中对模板类型参数T的指针提供了原子操作,那其中一条进行分析

1
2
3
4
5
6
7
8
9
10
11
12
inline T *cas (T *cmp_, T *val_)    // if(cmp == ptr) 则 ptr=val 返回旧值 , 不相等直接返回ptr
{
T *old;
__asm__ volatile (
"lock; cmpxchg %2, %3"
: "=a" (old), "=m" (ptr) // 输出操作数:old绑定到EAX寄存器,ptr绑定到内存位置
: "r" (val_), "m" (ptr), "0" (cmp_) // 输入操作数:val_可以使用任意寄存器,ptr绑定到内存位置,cmp_绑定到EAX寄存器
: "cc"); // 声明修改条件码寄存器
return old;
}
private:
volatile T *ptr;

无锁队列的实现(单读单写)

yqueue 数据结构

yqueue是一种高效的队列实现。 主要目标是最大限度地减少所需的分配/解除分配次数。 因此,yqueue以N批为单位分配/解除分配元素。

1
2
template <typename T, int N>
class yqueue_t;

yqueue 内部是由一个个 chunk_t 组成的,使用双向链表的数据结构

1
2
3
4
5
6
struct chunk_t
{
T values[N]; //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存
chunk_t *prev;
chunk_t *next;
};

chunk_t的组织方式

ypipe

ypipe_t在yqueue_t的基础上构建一个单写单读的无锁队列

ypipe内维护的成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected:
yqueue_t<T, N> queue;

// Points to the first un-flushed item. This variable is used
// exclusively by writer thread.
T *w; //指向第一个未刷新的元素,只被写线程使用 要从哪里刷

// Points to the first un-prefetched item. This variable is used
// exclusively by reader thread.
T *r; //指向第一个还没预提取的元素,只被读线程使用

// Points to the first item to be flushed in the future.
T *f; //指向下一轮要被刷新的一批元素中的第一个 要刷到哪里

// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_t<T> c; //读写线程共享的指针,指向每一轮刷新的起点。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)

构造函数:

1
2
3
4
5
6
7
8
9
10
inline ypipe_t()
{
// Insert terminator element into the queue.
queue.push(); //yqueue_t的尾指针加1,开始back_chunk为空,现在back_chunk指向第一个chunk_t块的第一个位置

// Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL).
r = w = f = &queue.back(); //就是让r、w、f、c四个指针都指向这个end迭代器
c.set(&queue.back());
}

write unwrite:写入可以单独写,也可以批量写。可以看到如果incomplete_ = true,则说明在批量写,直到incomplete_ = false时,进行写提交刷新 f 指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//  Write an item to the pipe.  Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are neverflushed down the stream.
// 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。
inline void write(const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back() = value_;
queue.push();

// Move the "flush up to here" poiter.
if (!incomplete_)
{
f = &queue.back(); // 记录要刷新的位置
// printf("1 f:%p, w:%p\n", f, w);
}
else
{
// printf("0 f:%p, w:%p\n", f, w);
}
}

// Pop an incomplete item from the pipe. Returns true is such
// item exists, false otherwise.
inline bool unwrite(T *value_)
{
if (f == &queue.back())
return false;
queue.unpush();
*value_ = queue.back();
return true;
}

flush

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//  Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。
// 批量刷新的机制, 写入批量后唤醒读线程;
// 反悔机制 unwrite
inline bool flush() {
// If there are no un-flushed items, do nothing.
if (w == f) // 不需要刷新,即是还没有新元素加入
return true;

// Try to set 'c' to 'f'.
// read时如果没有数据可以读取则c的值会被置为NULL
if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置
{ // 进入此分支:在flush之前进行了 check_read但是失败了,c被置为 NULL
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set(f); // 更新为新的f位置
w = f;
return false; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理
}
else // 读端还有数据可读取
{
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f; // 更新w的位置
return true;
}
}

以下参考文章中提供了多读多写的基于数据的无锁队列实现,以及理论分析

内联汇编:https://www.jianshu.com/p/1782e14a0766

cas 单例模式:https://blog.csdn.net/q5707802/article/details/79251491

说说无锁(Lock-Free)编程那些事 (yuque.com)

基于数组的无锁队列(译) - 知乎 (zhihu.com)