锁将原本由操作系统调度线程的混乱状态变得可控
评价锁🔒
正确性:能否阻止线程进入临界区
公平性:是否每一个竞争线程有公平的机会抢到锁
性能:加锁之后增加的时间开销
没有竞争,一个线程加锁开锁
一个cpu,多个线程竞争
多个cpu,多个线程竞争
控制中断 最早的单处理器解决方法:进入临界区前关中断
缺点
需要允许程序开关中断
多处理器不可行,其他处理器上线程可以进入临界区
关中断期间可能导致其他正常中断丢失
效率低
硬件支持 与其软件上利用:DEKKER,PETERSON等算法,不如硬件支持👌,增加了一些硬件原语
x86 xchg (atomic exchange,原子交换)指令 ,通常称为测试并设置指令(test-and-set)
自旋锁spin lock实现
int xchg (volatile int *addr, int newval) { int result; asm volatile ("lock xchg %0, %1" : "+m" (*addr), "=a" (result) : "1" (newval)) ; return result; }
int flag = YES;void lock () {retry: int got = xchg(&flag, NOPE); if (got == NOPE) goto retry; assert(got == YES); } void unlock () { xchg(&flag, YES) }
和上面代码等价的精简
int flag = 0 void lock() { while (xchg(&flag,1 ))}void unlock() { xchg(&locked, 0 ); }
cpmxhg指令 Compare and Exchange
x86/64架构中的cmpxchg指令是如何实现并保证原子性的
https://www.zhihu.com/question/492576993
char CompareAndSwap (int *ptr, int expected, int new) { unsigned char ret; __asm__ __volatile__ ( " lock\n" " cmpxchgl %2,%1\n" " sete %0\n" : "=q" (ret), "=m" (*ptr) : "r" (new), "m" (*ptr), "a" (old) : "memory" ); return ret; }
cmpxchg dest,src:将AL、AX、EAX或RAX寄存器中的值与第一个操作数dest(目标操作数)进行比较。如果两个值相等,则将第二个操作数src(源操作数)加载到dest目标操作数中同时ZF标志位置1。 如果不相等,则目标操作数被加载到AL、AX、EAX或RAX寄存器中,x同时ZF标志位置0。
SETE r/m8:Set byte if equal.设置r/m8为zf标志位的值
c的伪代码就是
int CompareAndSwap (int *ptr, int expected, int new) { int ret = *ptr; if (ret == expected) { *ptr = new; return 1 ; } return 0 ; }
自旋锁spin lock实现
int flag = 0 ;void lock () {while (CompareAndSwap(flag,1 ,1 ) == 1 );}void unlock () {flag = 0 }
riscv LR/SC 指令
lr.{w/d}.{aqrl} rd, (rs1) 从内存地址 rs1 中加载内容到 rd 寄存器。然后在 rs1 对应地址上设置保留标记(reservation set),w/d 分别对应 32 位/64 位版本,中断、其他处理器写入都会导致标记消除 sc.{w/d}.{aqrl} rd, rs2, (rs1) sc指令在把rs2值写到rs1地址之前,会先判断 rs1 内存地址是否有设置保留标记,如果设置了,则把 rs2 值正常写入到 rs1 内存地址里,并把 rd 寄存器设置成 0,表示保存成功。如果 rs1 内存地址没有设置保留标记,则不保存,并把 rd 寄存器设置成 1 表示保存失败。不管成功还是失败,sc指令都会把当前hart保留的所有保留标记全部清除
xchg升级版:Compare-and-Swap 的 LR/SC 实现
int cas (int *addr, int cmp_val, int new_val) { int old_val = *addr; if (old_val == cmp_val) { *addr = new_val; return 0 ; } else { return 1 ; } }
cas: lr.w t0, (a0) # Load original value. bne t0, a1, fail # Doesn’t match, so fail. sc.w t0, a2, (a0) # Try to update. bnez t0, cas # Retry if store-conditional failed. li a0, 0 # Set return to success. jr ra # Return. fail: li a0, 1 # Set return to failure. jr ra # Return
自旋锁评价
正确性 👍
公平性 👎
性能
单核处理器竞争,每个竞争锁的线程都会浪费一个时间片
多核处理器竞争,临界区一般很短,另一个处理器上的竞争锁的线程会很快得到锁
ticket锁 利用硬件原语fetch-and-add,原子地返回特定地址的旧值,并且让该值自增一
x86
xadd dest,src指令:Exchange and Add
dest,src先交换,然后再把两者之和付给dest TEMP := SRC + DEST; SRC := DEST; DEST := TEMP;
static inline int fetch_and_add (int * variable, int value) { __asm__ volatile ("lock; xaddl %0, %1" : "+r" (value), "+m" (*variable) : : "memory" ) ; return value; }
ticket锁 实现
int FetchAndAdd (int *ptr) { int old = *ptr; *ptr = old + 1 ; return old; } typedef struct lock_t { int ticket; int turn; } lock_t ; void lock_init (lock_t *lock) { lock->ticket = 0 ; lock->turn = 0 ; } void lock (lock_t *lock) { int myturn = FetchAndAdd(&lock->ticket); while (lock->turn != myturn) ; } void unlock (lock_t *lock) { FetchAndAdd(&lock->turn); }
ticket能够保证每一个竞争线程都能够抢到锁,且按照自旋等待的先后顺序
操作系统支持 对于简单的yield+自旋,yield()系统调用让线程让出cpu,可能操作系统下次还是选择运行该线程,同时其他线程只能看着有锁的线程做事,虽然比浪费完整时间片要好,但是系统调用的上下文切换开销也很大
park+自旋锁 Solaris
Solaris系统提供了系统调用,park()能够让调用线程休眠,unpark(threadID)则会唤醒 threadID 标识的线程。
typedef struct lock_t { int flag; int guard; queue_t *q; } lock_t ; void lock_init (lock_t *m) { m->flag = 0 ; m->guard = 0 ; queue_init(m->q); } void lock (lock_t *m) { while (TestAndSet(&m->guard, 1 ) == 1 ) ; if (m->flag == 0 ) { m->flag = 1 ; m->guard = 0 ; } else { queue_add(m->q, gettid()); m->guard = 0 ; park(); } } void unlock (lock_t *m) { while (TestAndSet(&m->guard, 1 ) == 1 ) ; if (queue_empty(m->q)) m->flag = 0 ; else unpark(queue_remove(m->q)); m->guard = 0 ; }
问题:假设一个线程已经加入休眠队列将要调用park休眠,此时如果系统切换到了正在持有锁的线程。如果该线程随后释放了锁,前面的线程调用park后可能会永远休眠下去。
Solaris增加了第三个系统调用 separk()来解决问题
setpark():一个线程表明自己马上要 park。如果刚好另一个线程被调度,并且调用了 unpark,那么后续的park调用就会直接返回,不会休眠
void lock (lock_t *m) { while (TestAndSet(&m->guard, 1 ) == 1 ) ; if (m->flag == 0 ) { m->flag = 1 ; m->guard = 0 ; } else { queue_add(m->q, gettid()); setpark(); m->guard = 0 ; park(); } }
死锁 死锁一般有以下四个必要条件
互斥,线程对于需要的资源进行互斥的访问
持有和等待,线程持有已有的资源,同时等待使用下一个资源。
不可抢占,进程所获得的资源在未使用完毕之前,资源申请者不能强行地从资源占有者手中夺取资源,而只能由该资源的占有者进程自行释放。
循环等待,线程之间存在一个环路,环路上每个线程都额外持有一个资源,而这个资源又是下一个线程要申请的。
如果这 4 个条件的任何一个没有满足,死锁就不会产生。