NJUOS:多处理器编程

http://jyywiki.cn/OS/2022/slides/3.slides#

thread.h 简化的线程 API

进程是资源分配的基本单位,linux线程其实是通过轻量级进程实现的LWP(light weight process),所有Linux内核的角度去看线程和进程并没有区别,只不过他和主线程共享一些资源,线程是最小的执行单位,调度的基本单位。同时也是资源竞争的基本单位。

具体的东西之前 https://grxer.github.io/2023/03/26/TLS-Hijack/ 里有讨论

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdatomic.h>
#include <assert.h>
#include <unistd.h>
#include <pthread.h>

#define NTHREAD 64
enum { T_FREE = 0, T_LIVE, T_DEAD, };
struct thread {
int id, status;
pthread_t thread;
void (*entry)(int);
};

struct thread tpool[NTHREAD], *tptr = tpool;

void *wrapper(void *arg) {
struct thread *thread = (struct thread *)arg;
thread->entry(thread->id);
return NULL;
}

void create(void *fn) {
assert(tptr - tpool < NTHREAD);
*tptr = (struct thread) {
.id = tptr - tpool + 1,
.status = T_LIVE,
.entry = fn,
};
pthread_create(&(tptr->thread), NULL, wrapper, tptr);
++tptr;
}

void join() {
for (int i = 0; i < NTHREAD; i++) {
struct thread *t = &tpool[i];
if (t->status == T_LIVE) {
pthread_join(t->thread, NULL);
t->status = T_DEAD;
}
}
}

__attribute__((destructor)) void cleanup() {
join();
}
  • POSIX线程库进一步做了一层封装,做了个main函数的析构函数阻塞进程等待

    #include <pthread.h>
    int pthread_create(
    pthread_t * tidp, //新创建的线程ID指向的内存单元。
    const pthread_attr_t * attr, //线程属性
    void *(*start_rtn)(void *), //新创建的线程从start_rtn函数的地址开始运行,使用参数时我们通常会进行类型强转
    void * arg //若上述函数需要参数,将参数放入结构中并将地址作为arg传入。
    );//创建成功返回0,否则返回错误信息对应的非0宏
    void类型的指针配合上类型强转给了我们更多的灵活性和自由性
    pthread_t类型的线程tibp,是进程内部,识别标志,两个进程间,这个线程ID允许相同
    pthread_create函数创建的线程默认非分离属性的,在非分离的情况下,当一个线程结束的时候,它所占用的系统资源并没有完全真正的释放,也没有真正终止。
    只有在pthread_join函数返回时,该线程才会释放自己的资源。或者是设置在分离属性的情况下,一个线程结束会立即释放它所占用的资源。
    int pthread_detach(pthread_t tid);分离一个线程,一个分离的线程是不能被其他线程回收或杀死的。它的内存资源在它终止时由系统自动释放。
    int pthread_join(pthread_t thread, //指定等待哪个子线程结束,接收哪个线程的返回值,只能是非分离线程
    void ** retval//接收参数返回值
    );//成功返回0,否则返回错误信息对应的非0宏
    一直阻塞调用它的线程,直至目标线程执行结束(接收到目标线程的返回值),阻塞状态才会解除。
    如果一个线程是非分离的(默认情况下创建的线程都是非分离)并且没有对该线程使用 pthread_join() 的话,该线程结束后并不会释放其内存(系统)空间,这会导致该线程变成了“僵尸线程”。join后我们也必须手动清除程序分配的空间(堆等等)
  • _attribute__关键字主要是用来在函数或数据声明中设置其属性。给函数赋给属性的主要目的在于让编译器进行优化。

  • 使用编译时pthread 库需要加上-lpthread,因为pthread并非Linux系统的默认库

    -lpthread 选项只是告诉编译器在链接时链接 pthread 库。

    -pthread 将自动添加必要的编译选项和链接选项,以确保程序正确地使用 pthread 库,编译器会自动将 “-lpthread” 选项添加到链接命令行中,以确保在链接时链接 pthread 库。

创建线程使用的是哪个系统调用?

strace了一下

mmap(NULL, 8392704, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7ffff73ff000
mprotect(0x7ffff7400000, 8388608, PROT_READ|PROT_WRITE) = 0
getrandom("\x5c\xf7\x7d\x5a\x8a\x60\xbd\x73", 8, GRND_NONBLOCK) = 8
brk(NULL) = 0x555555559000
brk(0x55555557a000) = 0x55555557a000
rt_sigprocmask(SIG_BLOCK, ~[], [], 8) = 0
clone3({flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, child_tid=0x7ffff7bff910, parent_tid=0x7ffff7bff910, exit_signal=0, stack=0x7ffff73ff000, stack_size=0x7fff00, tls=0x7ffff7bff640}gr
=> {parent_tid=[8670]}, 88) = 8670

gdb跟了一下,最后系统调用函数clong3进内核syscall的时候是一个很奇怪未知的调用号

image-20230404215319891

image-20230404215306823

gdb调试多线程

https://sourceware.org/gdb/onlinedocs/gdb/Threads.html

线程独立的栈区

#include "thread.h"

__thread char *base, *cur; // thread-local variables
__thread int id;

// objdump to see how thread-local variables are implemented
__attribute__((noinline)) void set_cur(void *ptr) { cur = ptr; }
__attribute__((noinline)) char *get_cur() { return cur; }

void stackoverflow(int n) {
set_cur(&n);
if (n % 1024 == 0) {
int sz = base - get_cur();
printf("Stack size of T%d >= %d KB\n", id, sz / 1024);
}
stackoverflow(n + 1);
}

void Tprobe(int tid) {
id = tid;
base = (void *)&tid;
stackoverflow(0);
}

int main() {
setbuf(stdout, NULL);
for (int i = 0; i < 4; i++) {
create(Tprobe);
}
}

把输出缓冲区关了,起了四个线程,每个线程都会Tprobe作为入口点,tid在相对高地址栈上,把栈地址给了base,调用StackOverflow

image-20230404185540104

n始终在在栈rbp-0x24偏移处,最后会递归自己,每1024次会计算一次n距离栈底的距离,没有终止条件,栈一直在栈,始终有一天会真正的stackoverflow,我们就可以估计栈的大小

我们可以通过pipe 把结果给sort排序

./stack-probe | sort -nk 6

  • -k选择一列
  • -n把某一列当作数字排序

image-20230404195902138

猜测出栈的大小默认也就是8M

线程内存模型

一组并发线程运行在一个进程的上下文中,各自独立的线程栈的内存模型不是那么整齐清楚的。栈被保存在虚拟地址空间的栈区域中,线程栈是不对其他线程设防的,所以如果一个线程得到另一个线程的栈指针,也是可以进行访问修改

#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
void* show(void* ptr) {
memcpy(ptr, "grxer", strlen("grxer"));
}
int main() {
char name[10] = "hello";
printf("%s", name);
pthread_t id;
pthread_create(&id, NULL, show,(void*) name);
pthread_join(id, NULL);
printf(" %s", name);
}
grxer@grxer ~/D/s/N/p3> gcc -g -o  test test.c -pthread
grxer@grxer ~/D/s/N/p3> ./test
hello grxer⏎

如何改变这个堆栈大小?

可以利用pthread_create第二个参数线程属性在创建线程时做操作

/* Data structure for condition variable handling.  The structure of
the attribute type is not exposed on purpose. */
typedef union
{
char __size[__SIZEOF_PTHREAD_CONDATTR_T];
int __align;
} pthread_condattr_t;

没有公开类型,但是给我们了api

int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_getstacksize(pthread_attr_t *attr, size_t *stacksize);
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
返回值0-1分别表示成功与失败,这里的stacksize都是以byte为单位
#include <pthread.h>
#include <stdio.h>
#include <assert.h>
int main() {
pthread_t id;
pthread_attr_t thread_attr;
size_t stacksize;
int res = pthread_attr_init(&thread_attr);
assert(!res);
res = pthread_attr_getstacksize(&thread_attr, &stacksize);
assert(!res);
printf("before:%zu\n", stacksize);
res = pthread_attr_setstacksize(&thread_attr,10485760);
assert(!res);
res = pthread_attr_getstacksize(&thread_attr, &stacksize);
assert(!res);
printf("after:%zu\n", stacksize);
}

grxer@grxer ~/D/s/N/p3> ./test
before:8388608
after:10485760

我们这样去改造一下thread.h就好

pthread_attr_t thread_attr;
int res = pthread_attr_init(&thread_attr);
assert(!res);
res = pthread_attr_setstacksize(&thread_attr, 10485760);
assert(!res);
pthread_create(&(tptr->thread), &thread_attr, wrapper, tptr);

image-20230404203402253

__thread浅谈

Thread Local Storage,之前劫持过tls来bypass canary,但是对底层理解并不深,这次有了新的体会,但是也只是浅谈,好多东西还没搞懂,要去补补保护模式

get_cur
.text:0000000000001388 55 push rbp
.text:0000000000001389 48 89 E5 mov rbp, rsp
.text:000000000000138C 64 48 8B 04 25 F0 FF FF FF mov rax, fs:0FFFFFFFFFFFFFFF0h
.text:0000000000001395 5D pop rbp

还是和fs段寄存器有关,其实准确来说,自从8086的实模式过后,cs ss ds es等段寄存器里面存的不再是为了满足20位地址总线和重定位的物理段基地址,在保护模式下变为了段选择字,现在更应该叫段选择器,保护模式下会有用户态不可见的部分,叫做描述符高速缓冲器(是一个缓存,每次进行段解析后保存再里面),由处理器自动使用,而真正的段地址就在这里面

有以下方式去获得fs段地址

>>>通过内联汇编的方式

>>>系统调用int arch_prctl(int code, unsigned long *addr)

#include <stdio.h>
#include <pthread.h>
#include <asm/prctl.h>
#include <sys/prctl.h>
int main() {
unsigned long fs_value;
unsigned long fs_value2;
pthread_t pid = pthread_self();
int ret = arch_prctl(ARCH_GET_FS,&fs_value2);
__asm__ volatile("mov %%fs:0, %0" : "=r" (fs_value));
printf("FS寄存器的值为: 0x%lx\narch_prctl:0x%lx\npid:0x%lx", fs_value,fs_value2,pid);
return 0;
}

grxer@grxer ~/D/s/N/ctest> ./test
FS寄存器的值为: 0x7ffff7fa6740
arch_prctl:0x7ffff7fa6740
pid:0x7ffff7fa6740

>>>gdb的fsbase命令p/x pthread_self()获得这个值

image-20230404205339561

这里才惊奇的发现gdb里可以直接运行c函数

可以看到POSIX线程库的pthread_create返回给我们的tidp是每个线程的fs段地址

观察一下程序里定义的__thread变量和汇编里的关系

__thread char *base, *cur; // thread-local variables
__thread int id;

base:.text:0000000000001435 64 89 04 25 F8 FF FF FF mov fs:0FFFFFFFFFFFFFFF8h(-8), eax
cur:.text:000000000000138C 64 48 8B 04 25 F0 FF FF FF mov rax, fs:0FFFFFFFFFFFFFFF0h(-16)
id:.text:0000000000001441 64 48 89 04 25 E8 FF FF FF mov fs:0FFFFFFFFFFFFFFE8h(-24), rax

在fs-8处开始排列,他的下面也就是我们的之前看的canary和__exit_funcs析构函数组的key,pthread_create去创建一个进程是用mmap一块内存当作堆栈,fs则位于这个堆栈的高地址处,也就是堆栈的底部

原子性的丧失

#include "thread.h"

#define N 100000000

long sum = 0;

void Tsum() {
for (int i = 0; i < N; i++) {
sum++;
}
}

int main() {
create(Tsum);
create(Tsum);
join();
printf("sum = %ld\n", sum);
}

就连单处理器并发都得不到正确的结果。taskset -c cpu-cpu <command>限制在cpu-cpu个处理器上运行

image-20230404235118554

多处理器线程在并行,偏差更大

image-20230404235238898

具体来说,当一个线程正在读取sum的值时,另一个线程可能已经修改了sum的值,但是第一个线程并不知道。这可能导致一个线程覆盖了另一个线程的结果,从而导致最终结果错误。

sum++翻译为汇编是三条指令,取出,+1,写回,这也就是我们单处理器为什么会出错原因

.text:000000000000145A 48 8B 05 DF 31 00 00          mov     rax, cs:sum
.text:0000000000001461 48 83 C0 01 add rax, 1
.text:0000000000001465 48 89 05 D4 31 00 00 mov cs:sum, rax

我们用内联汇编把sum++改为一句

asm volatile("add $1,%0":"+m"(sum));
.text:000000000000145A 83 05 DF 31 00 00 01          add     dword ptr cs:sum, 1

此时单核处理器没有多处理器乱序的并行,可以正常得到结果

image-20230405000115259

多核处理器依旧出差,不过误差低了一点

image-20230405000145046

顺序的丧失

  • 用gcc对程序进行-O1的优化发现每次运行结果都是sum = 100000000
.text:0000000000001223
.text:0000000000001223 ; void __cdecl Tsum()
.text:0000000000001223 public Tsum
.text:0000000000001223 Tsum proc near ; DATA XREF: main+5↓o
.text:0000000000001223 ; __unwind {
.text:0000000000001223 F3 0F 1E FA endbr64
.text:0000000000001227 48 8B 15 12 2E 00 00 mov rdx, cs:sum
.text:000000000000122E 48 8D 42 01 lea rax, [rdx+1]
.text:0000000000001232 48 81 C2 01 E1 F5 05 add rdx, 5F5E101h
.text:0000000000001232
.text:0000000000001239
.text:0000000000001239 loc_1239: ; CODE XREF: Tsum+20↓j
.text:0000000000001239 48 89 C1 mov rcx, rax
.text:000000000000123C 48 83 C0 01 add rax, 1
.text:0000000000001240 48 39 D0 cmp rax, rdx
.text:0000000000001243 75 F4 jnz short loc_1239
.text:0000000000001243
.text:0000000000001245 48 89 0D F4 2D 00 00 mov cs:sum, rcx
.text:000000000000124C C3 retn

-01的优化保留了循环,但是优化了每次写回内存的操作,用rcx寄存器代替

mov cs:sum, rcx最后写回的这一步决定了sum = 100000000

  • -O2优化 每次运行结果sum = 200000000
.text:0000000000001290                               ; void __cdecl Tsum()
.text:0000000000001290 public Tsum
.text:0000000000001290 Tsum proc near ; DATA XREF: main+5↑o
.text:0000000000001290 ; __unwind {
.text:0000000000001290 F3 0F 1E FA endbr64
.text:0000000000001294 48 81 05 A1 2D 00 00 00 E1 F5+add cs:sum, 5F5E100h
.text:0000000000001294 05
.text:000000000000129F C3 retn

循环也没有了,直接把正确结果一个add,得到正确结果,但是这只是一个假象,和前面一句add汇编一样,当一个线程正在读取sum的值时,另一个线程可能已经修改了sum的值,我们只是两个线程去add,只是错误得几率很小,我们把线程增加到14,14次竞争add就已经有可见概率错误了

image-20230405004225147

extern int done;
void join(){
while (!done) {

}
}

这一一段代码我们只编译不链接

grxer@grxer ~/D/s/N/p3> gcc -c test.c
grxer@grxer ~/D/s/N/p3> objdump -d test.o

test.o: file format elf64-x86-64


Disassembly of section .text:

0000000000000000 <join>:
0: f3 0f 1e fa endbr64
4: 55 push %rbp
5: 48 89 e5 mov %rsp,%rbp
8: 90 nop
9: 8b 05 00 00 00 00 mov 0x0(%rip),%eax # f <join+0xf>
f: 85 c0 test %eax,%eax
11: 74 f6 je 9 <join+0x9>
13: 90 nop
14: 90 nop
15: 5d pop %rbp
16: c3 ret

没加优化,每一次都会把done取出来,判断done是否为0

-O1优化

grxer@grxer ~/D/s/N/p3> gcc -O1 -c test.c
grxer@grxer ~/D/s/N/p3> objdump -d test.o

test.o: file format elf64-x86-64


Disassembly of section .text:

0000000000000000 <join>:
0: f3 0f 1e fa endbr64
4: 8b 05 00 00 00 00 mov 0x0(%rip),%eax # a <join+0xa>
a: 85 c0 test %eax,%eax
c: 74 fc je a <join+0xa>
e: c3 ret

只进行了一次取出done,多次判断相当于

if(!(x=done)){
while(x)
}

但是我们知道这个done为全局变量,万一被某个线程改掉了呢?就叛变了原本的逻辑

-O2优化

grxer@grxer ~/D/s/N/p3> objdump -d test.o

test.o: file format elf64-x86-64


Disassembly of section .text:

0000000000000000 <join>:
0: f3 0f 1e fa endbr64
4: 8b 05 00 00 00 00 mov 0x0(%rip),%eax # a <join+0xa>
a: 85 c0 test %eax,%eax
c: 75 02 jne 10 <join+0x10>
e: eb fe jmp e <join+0xe>
10: c3 ret

更离谱了

if(!done){
while(1)
}

解决办法

给编译器加一下限制

  • Memory Barrier

    asm volatile (“” ::: “memory”)

    volatile告诉编译器不要对这条指令优化

    memory 表示指令以不可预测的方式修改了内存, 强制gcc编译器假设RAM所有内存单元均被汇编指令修改,这样cpu中的registers和cache中已缓存的内存单元中的数据将作废。cpu将不得不在需要的时候重新读取内存中的数据。

  • 使用volatile变量

    extern int volatile done;

    volatile 指出 done 是随时可能发生变化的,每次使用它的时候必须从done的内存地址中读取,即使cache或寄存器里有

可见性的丧失

#include "thread.h"

int x = 0, y = 0;

atomic_int flag;
#define FLAG atomic_load(&flag)
#define FLAG_XOR(val) atomic_fetch_xor(&flag, val)
#define WAIT_FOR(cond) while (!(cond)) ;

__attribute__((noinline))
void write_x_read_y() {
int y_val;
asm volatile(
"movl $1, %0;" // x = 1
"movl %2, %1;" // y_val = y
: "=m"(x), "=r"(y_val) : "m"(y)
);
printf("%d ", y_val);
}

__attribute__((noinline))
void write_y_read_x() {
int x_val;
asm volatile(
"movl $1, %0;" // y = 1
"movl %2, %1;" // x_val = x
: "=m"(y), "=r"(x_val) : "m"(x)
);
printf("%d ", x_val);
}

void T1(int id) {
while (1) {
WAIT_FOR((FLAG & 1));
write_x_read_y();
FLAG_XOR(1);
}
}

void T2() {
while (1) {
WAIT_FOR((FLAG & 2));
write_y_read_x();
FLAG_XOR(2);
}
}

void Tsync() {
while (1) {
x = y = 0;
__sync_synchronize(); // full barrier
usleep(1); // + delay
assert(FLAG == 0);
FLAG_XOR(3);
// T1 and T2 clear 0/1-bit, respectively
WAIT_FOR(FLAG == 0);
printf("\n"); fflush(stdout);
}
}

int main() {
create(T1);
create(T2);
create(Tsync);
}
Built-in Function: void __atomic_load (type *ptr, type *ret, int memorder)
This is the generic version of an atomic load. It returns the contents of *ptr in *ret.

首先用来原子变量的flag的低两个bit当作开关

初始时为00,最低bit为1时T1打开,倒二bit为1时T2打开

Tsync是我们的控制线程中FLAG_XOR(3)同时把T1T2打开,然后T1T2同时执行,不管哪一个线程快一点慢一点,得到的结果只能是01 10 11,执行过后会把自己对应的开关置零关闭

根据该程序的状态机模型是不可能出现00的,但是

grxer@grxer ~/D/s/N/p3> gcc -O2 -g -o mem-ordering mem-ordering.c
grxer@grxer ~/D/s/N/p3> ./mem-ordering | head -n 10000 |sort| uniq -c
7970 0 0
1756 0 1
274 1 0
#head -n 取前n行 sort排序后 uniq命令用于检查及删除文本文件中重复出现的行列 -c统计该行重复出现的次数

导致这方面的原因来自多核处理器的微架构 https://www.bilibili.com/video/BV1844y1z7Dx

为了实现流水线,需要一条指令拆分为更小的可执行单元,叫做micro-operations简称uOps,利用寄存器重命名和分支目标预测的技术,实现超标量乱序执行。

在代码级上,看上去似乎是一次执行一条指令,但是实际上是指令级并行,呈现出一种简单的顺序执行指令的表象。正好能获得机器级程序要求的顺序语义模型的效果。

具体到我们上面的程序

     # <-----------+
movl $1, (x) # |
movl (y), %eax # --+

我们FLAG_XOR(3)后,在多处理器系统中,当一个CPU对共享内存进行修改时,需要使其他CPU中的缓存无效,导致movl $1, %0;几乎总是cache miss的,cpu在找内存同时执行下一条和上一条无关的指令,导致了输出0

"movl $1, %0;" // y = 1    
"movl %2, %1;" // x_val = x
: "=m"(y), "=r"(x_val) : "m"(x)

实现一致性

  • Memory barrier:__sync_synchronize()

    好文

    Built-in Function: void __sync_synchronize (…)

    This built-in function issues a full memory barrier.

    static inline void __sync_synchronize (void)
    {
    /* This issues the "mfence" instruction on x86/x86_64. */
    __asm__ __volatile__ ("mfence" : : : "memory");
    }

    我们之前在解决顺序性,用到的asm volatile (“” ::: “memory”)引导编译器,而我们这次需要和硬件进行交流

    防止这种CPU乱序,我们需要添加CPU memory fence。X86专门的memory fence指令是”mfence”;保证存访问操作完成后才能执行后续的内存访问操作

    "movl $1, %0;" // x = 1
    "mfence;"
    "movl %2, %1;" // y_val = y
    grxer@grxer ~/D/s/N/p3> ./mem-ordering | head -n 10000 |sort| uniq -c
    1462 0 1
    221 1 0
    8317 1 1
  • 原子指令

    和我们的flag一样用原子变量

    stdatomic.h