OSTEP:27 锁🔒

锁将原本由操作系统调度线程的混乱状态变得可控

评价锁🔒

  • 正确性:能否阻止线程进入临界区
  • 公平性:是否每一个竞争线程有公平的机会抢到锁
  • 性能:加锁之后增加的时间开销
    • 没有竞争,一个线程加锁开锁
    • 一个cpu,多个线程竞争
    • 多个cpu,多个线程竞争

控制中断

最早的单处理器解决方法:进入临界区前关中断

缺点

  • 需要允许程序开关中断
  • 多处理器不可行,其他处理器上线程可以进入临界区
  • 关中断期间可能导致其他正常中断丢失
  • 效率低

硬件支持

与其软件上利用:DEKKER,PETERSON等算法,不如硬件支持👌,增加了一些硬件原语

x86

xchg (atomic exchange,原子交换)指令,通常称为测试并设置指令(test-and-set)

自旋锁spin lock实现

int xchg(volatile int *addr, int newval) {
int result;
asm volatile ("lock xchg %0, %1"
: "+m"(*addr), "=a"(result) : "1"(newval));
return result;
}
int flag = YES;//flag为YES代表可以进临界区,
void lock() {
retry:
int got = xchg(&flag, NOPE);
if (got == NOPE)
goto retry;
assert(got == YES);
}

void unlock() {
xchg(&flag, YES)
}

和上面代码等价的精简

int flag = 0//flag为0代表没有锁上,可以进临界区
void lock() { while(xchg(&flag,1))}
void unlock() { xchg(&locked, 0); }

cpmxhg指令 Compare and Exchange

x86/64架构中的cmpxchg指令是如何实现并保证原子性的

https://www.zhihu.com/question/492576993

char CompareAndSwap(int *ptr, int expected, int new) {
unsigned char ret;
// Note that sete sets a 'byte' not the word
__asm__ __volatile__ (
" lock\n"
" cmpxchgl %2,%1\n"
" sete %0\n"
: "=q" (ret), "=m" (*ptr)
: "r" (new), "m" (*ptr), "a" (old)//对于相同的"m" (*ptr)似乎是按照后面的来算的,所以这里%0代表ret,%1代表 new, %2代表*ptr
: "memory");
return ret;
}

cmpxchg dest,src:将AL、AX、EAX或RAX寄存器中的值与第一个操作数dest(目标操作数)进行比较。如果两个值相等,则将第二个操作数src(源操作数)加载到dest目标操作数中同时ZF标志位置1。 如果不相等,则目标操作数被加载到AL、AX、EAX或RAX寄存器中,x同时ZF标志位置0。

SETE r/m8:Set byte if equal.设置r/m8为zf标志位的值

c的伪代码就是

int CompareAndSwap(int *ptr, int expected, int new) {
int ret = *ptr;
if (ret == expected) {
*ptr = new;
return 1;
}
return 0;
}

自旋锁spin lock实现

int flag = 0;//flag为0代表没有锁上,可以进临界区
void lock(){while(CompareAndSwap(flag,1,1) == 1);}
void unlock(){flag = 0}

riscv

LR/SC 指令

lr.{w/d}.{aqrl} rd, (rs1)
从内存地址 rs1 中加载内容到 rd 寄存器。然后在 rs1 对应地址上设置保留标记(reservation set),w/d 分别对应 32 位/64 位版本,中断、其他处理器写入都会导致标记消除
sc.{w/d}.{aqrl} rd, rs2, (rs1)
sc指令在把rs2值写到rs1地址之前,会先判断 rs1 内存地址是否有设置保留标记,如果设置了,则把 rs2 值正常写入到 rs1 内存地址里,并把 rd 寄存器设置成 0,表示保存成功。如果 rs1 内存地址没有设置保留标记,则不保存,并把 rd 寄存器设置成 1 表示保存失败。不管成功还是失败,sc指令都会把当前hart保留的所有保留标记全部清除

xchg升级版:Compare-and-Swap 的 LR/SC 实现

int cas(int *addr, int cmp_val, int new_val) {
int old_val = *addr;
if (old_val == cmp_val) {
*addr = new_val; return 0;
} else { return 1; }
}
cas:
lr.w t0, (a0) # Load original value.
bne t0, a1, fail # Doesn’t match, so fail.
sc.w t0, a2, (a0) # Try to update.
bnez t0, cas # Retry if store-conditional failed.
li a0, 0 # Set return to success.
jr ra # Return.
fail:
li a0, 1 # Set return to failure.
jr ra # Return

自旋锁评价

  • 正确性 👍

  • 公平性 👎

  • 性能

    • 单核处理器竞争,每个竞争锁的线程都会浪费一个时间片
    • 多核处理器竞争,临界区一般很短,另一个处理器上的竞争锁的线程会很快得到锁

ticket锁

利用硬件原语fetch-and-add,原子地返回特定地址的旧值,并且让该值自增一

x86

xadd dest,src指令:Exchange and Add

dest,src先交换,然后再把两者之和付给dest
TEMP := SRC + DEST;
SRC := DEST;
DEST := TEMP;
static inline int fetch_and_add(int* variable, int value)
{
__asm__ volatile("lock; xaddl %0, %1"
: "+r" (value), "+m" (*variable) // input+output
: // No input-only
: "memory"
);
return value;
}

ticket锁实现

int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
typedef struct lock_t {
int ticket;
int turn;
} lock_t;

void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}

void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
; // spin
}
void unlock(lock_t *lock) {
FetchAndAdd(&lock->turn);
}

ticket能够保证每一个竞争线程都能够抢到锁,且按照自旋等待的先后顺序

操作系统支持

对于简单的yield+自旋,yield()系统调用让线程让出cpu,可能操作系统下次还是选择运行该线程,同时其他线程只能看着有锁的线程做事,虽然比浪费完整时间片要好,但是系统调用的上下文切换开销也很大

park+自旋锁

Solaris

Solaris系统提供了系统调用,park()能够让调用线程休眠,unpark(threadID)则会唤醒 threadID 标识的线程。

typedef struct lock_t {
int flag;
int guard;
queue_t *q;//队列防止了饿死某个线程的问题
} lock_t;
void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}
void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)//获取锁或者释放锁其他线程还是需要自旋,来保证内存一致
; // acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired 真正获取锁
m->guard = 0;//设置后有线程抢锁会结束自旋,加入队列
} else {
queue_add(m->q, gettid());
m->guard = 0;//设置后有线程抢锁会结束自旋,加入队列
park();
}
}
void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)//获取锁或者释放锁其他线程还是需要自旋,来保证内存一致
; // acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock (for next thread!)直接把flag的锁给下一个线程即可,
m->guard = 0;
}

问题:假设一个线程已经加入休眠队列将要调用park休眠,此时如果系统切换到了正在持有锁的线程。如果该线程随后释放了锁,前面的线程调用park后可能会永远休眠下去。

Solaris增加了第三个系统调用 separk()来解决问题

setpark():一个线程表明自己马上要 park。如果刚好另一个线程被调度,并且调用了 unpark,那么后续的park调用就会直接返回,不会休眠

void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)//获取锁或者释放锁其他线程还是需要自旋,来保证内存一致
; // acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired 真正获取锁
m->guard = 0;//设置后有线程抢锁会结束自旋,加入队列
} else {
queue_add(m->q, gettid());
setpark();// new code
m->guard = 0;//设置后有线程抢锁会结束自旋,加入队列
park();
}
}

死锁

死锁一般有以下四个必要条件

  • 互斥,线程对于需要的资源进行互斥的访问
  • 持有和等待,线程持有已有的资源,同时等待使用下一个资源。
  • 不可抢占,进程所获得的资源在未使用完毕之前,资源申请者不能强行地从资源占有者手中夺取资源,而只能由该资源的占有者进程自行释放。
  • 循环等待,线程之间存在一个环路,环路上每个线程都额外持有一个资源,而这个资源又是下一个线程要申请的。

如果这 4 个条件的任何一个没有满足,死锁就不会产生。