OSTEP:30&&31 条件变量&&信号量

条件变量实现生产者/消费者

int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;

void put(int value) {
buffer[fill] = value;
fill = (fill + 1) % MAX;
count++;
}

int get() {
int tmp = buffer[use];
use = (use + 1) % MAX;
count--;
return tmp;
}
cond_t empty, fill;
mutex_t mutex;

void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // p1
while (count == MAX) // p2
Pthread_cond_wait(&empty, &mutex); // p3
put(i); // p4
Pthread_cond_signal(&fill); // p5
Pthread_mutex_unlock(&mutex); // p6
}
}

void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // c1
while (count == 0) // c2
Pthread_cond_wait(&fill, &mutex); // c3
int tmp = get(); // c4
Pthread_cond_signal(&empty); // c5
Pthread_mutex_unlock(&mutex); // c6
printf("%d\n", tmp);
}
}

条件变量使用技巧

  • Pthread_cond_signal发信号时总是持有锁
  • 对条件变量使用 while(不是 if)

例子生产者/消费者p2和c2为什么用while?

如果用if,考虑一个生产者Tp,两个消费者Tc1和Tc2,Tc1先执行此时buffer里没有数据,执行到c3休眠,Tp随后执行,放入数据后,执行到p5唤醒Tc1,Tc1进入就绪态还未运行时,Tc2执行拿走了buffer里所有数据,随后Tc1运行,如果时if直接就好执行c4去get空的buffer里的数据,触发错误,如果时while还会检测(count==0)一些缓冲区是否有数据,从而避免错误

为什么用了两个条件变量empty, fill?

是为了实现消费者不应该唤醒消费者,而应该只唤醒生产者,同样生产者只能唤醒消费者

如果共用一个条件变量,即消费者可以唤醒消费者,试想缓冲区没有数据Tc1,Tc2休眠,随后Tp放入一个数据后睡眠,Tc1唤醒拿走数据,如果此时唤醒Tc2,Tc2发现没有数据继续睡眠,此时三个都在睡眠,形成死锁

一个比较形象的例子,下面的代码打印左括号代表生产,右括号代表取走

#include "thread-sync.h"
#include "thread.h"

int n, count = 0;
mutex_t lk = MUTEX_INIT();

void Tproduce() {
while (1) {
retry:
mutex_lock(&lk);
if (count == n) {//缓冲区生产满了
mutex_unlock(&lk);
goto retry;
}
count++;
printf("(");
mutex_unlock(&lk);
}
}
void Tconsume() {
while (1) {
retry:
mutex_lock(&lk);
if (count == 0) {//缓冲区为空
mutex_unlock(&lk);
goto retry;
}
count--;
printf(")");
mutex_unlock(&lk);
}
}
int main(int argc, char *argv[]) {
assert(argc == 2);
n = atoi(argv[1]);//缓冲区大小
setbuf(stdout, NULL);
for (int i = 0; i < 8; i++) {
create(Tproduce);
create(Tconsume);
}
}

压力测试脚本 pc-check.py

import sys

limit = int(sys.argv[1])
count, n = 0, 100000
while True:
for ch in sys.stdin.read(n):
if ch == '(': count += 1
if ch == ')': count -= 1
assert 0 <= count <= limit#消费者不会多取
print(f'{n} Ok.')
grxer@Ubuntu22 ~/s/N/p6> ./a.out 10 | python pc-check.py 10
100000 Ok.
100000 Ok.
100000 Ok.
100000 Ok.

信号量实现生产者/消费者

int sem_wait(sem_t *sem);
信号量sem减一,如果结果大于0直接返回,否则阻塞到sem能够减一即等待直到有其它线程增加了这个值使它不再是0为止
int sem_post(sem_t *sem);
信号量sem加一,唤醒一个该信号量阻塞的线程

信号量实现锁

sem_t m;
sem_init(&m, 0, 1);

sem_wait(&m);
// critical section here
sem_post(&m);

信号量实现条件变量

sem_t m;
sem_init(&m, 0, 0);
sem_wait(&s);//阻塞该线程等待某个线程程执行sem_post(&s);唤醒才能继续往下执行
.....

生产者/消费者

sem_t empty;
sem_t full;
sem_t mutex;

void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&empty); // line p1
sem_wait(&mutex); // line p1.5 (MOVED MUTEX HERE...)
put(i); // line p2
sem_post(&mutex); // line p2.5 (... AND HERE)
sem_post(&full); // line p3
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&full); // line c1
sem_wait(&mutex); // line c1.5 (MOVED MUTEX HERE...)
int tmp = get(); // line c2
sem_post(&mutex); // line c2.5 (... AND HERE)
sem_post(&empty); // line c3
printf("%d\n", tmp);
}
}

int main(int argc, char *argv[]) {
// ...
sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
sem_init(&full, 0, 0); // ... and 0 are full
sem_init(&mutex, 0, 1); // mutex=1 because it is a lock
// ...
}