0%

重返操作系统:信号量编程

从今年春节回来到现在,经历了不少事情。暑期勉强打赢了“复活赛”,不过主包也快到残血状态了。现在终于能腾出点时间,好好学习一下。

有很多想学的东西,技术方向包括 vllm、deepspeed、大模型、CUDA、模型量化等;非技术方面,想学剪视频和修图——毕竟手里攒了一堆素材,哈哈哈,慢慢来吧。

今天想聊聊信号量,这也是我在新工作中遇到的实际场景。信号量本质上是一个变量,常用于多线程环境中的同步控制。举个例子:假设有两个线程 A 和 B,A 在处理临界资源,当满足某些条件时,它会释放锁并通知 B;而 B 完全没必要一直轮询 “A 做完了吗?”。这样既浪费资源,又不够优雅。

信号量入门

一个经典的场景是生产者和消费者模型。我去线下店订购 100 杯咖啡,老板做好后通知我就可以了,我没必要隔 1 分钟就去问老板做好了没有。老板是 A 线程,我是 B 线程,临界资源是 100 杯咖啡。来看 C++ 中是如何使用条件变量的,我们来模拟生产 100 杯咖啡,而后一次性消费的过程。

单生产-单消费模式

C++ 提供的条件变量 std::condition_variable 允许一个或多个线程等待某个条件变真,等待的函数是 wait(lock, predicate),原子的释放锁,并检查谓词条件 predicate 是否成立,如果不成立,会再次等待。当另外的线程改变了条件时,可以使用 notify_one 或者 notify_all 通知正在等待的线程,前者唤醒一个等待的线程,后者唤醒所有正在等待的线程。我们用 queue 来存储咖啡,创建相关的变量:

1
2
3
4
5
6
7
8
std::mutex mtx;                  // 访问信号量加锁
std::queue<int> caffe; // 放咖啡
const int num = 100; // 100 杯咖啡
std::condition_variable is_full; // 100 杯咖啡是否做完了
std::uniform_int_distribution<>
dis(0, 10); // 用于随机睡眠 0 到 1000 ms,模拟咖啡制作
std::mt19937 gen(20251018); // Mersenne Twister 伪随机数生成器
int caffe_num = 0; // 已经生产的咖啡数

创建咖啡店,来制作 100 杯咖啡:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void caffe_shop() {
for (int i = 0; i < num; i++) {
std::unique_lock<std::mutex> lock(mtx);

// 咖啡的数量小于 100
is_full.wait(lock, []() { return caffe_num < num; });

// 继续做咖啡
std::string caffe_id = " Produce caffe id [" + std::to_string(i) + "]\n";
std::cout << caffe_id;

caffe_num++;
caffe.push(i);
// 模拟咖啡制作过程
int delayMs = dis(gen);
std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
}

// 通知顾客
is_full.notify_all();
}

而我自己先做点别的事情,等 100 倍咖啡制作完毕:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void cosumer() {
// 模拟我在订购咖啡后做的其他工作
std::this_thread::sleep_for(std::chrono::milliseconds(1 * 1000));
std::cout << " consumer do others \n";

// 等到 100 杯咖啡
std::unique_lock<std::mutex> lock(mtx);
is_full.wait(lock, []() { return caffe_num == num; });

// 开始喝咖啡
while (!caffe.empty()) {
auto id = caffe.front();
caffe.pop();
std::cout << " Consume Caffe " << id << "\n";
}
}

在 main 函数中启动这两个线程即可。g++ 编译运行时记得链接 pthread 库,不然编译不过去。

1
2
3
4
5
6
7
8
9
10
int main() {

std::thread t1(caffe_shop);
std::thread t2(cosumer);

t1.join();
t2.join();

return 0;
}

多生产-多消费模式

上面是最简单的情况:单生产者-单消费者。可以修改之前的代码,实现多生产者-单消费者,单生产者-多消费者,多生产者-多消费者等各种场景。我们进阶一下,这里只给出相对复杂的多生产者-多消费者的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <thread>

std::mutex mtx; // 访问信号量加锁
std::queue<int> caffe; // 放咖啡
const int num = 100; // 100 杯咖啡
std::condition_variable is_full; // 100 杯咖啡是否做完了
std::uniform_int_distribution<>
dis(0, 10); // 用于随机睡眠 0 到 1000 ms,模拟咖啡制作
std::mt19937 gen(20251018); // Mersenne Twister 伪随机数生成器
int caffe_num_p = 0; // 已经生产的咖啡数
int caffe_num_c = 0; // 已经消费的咖啡数

void caffe_shop(int id) {
while (true) {
// 模拟咖啡制作过程
int delayMs = dis(gen);
std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));

std::unique_lock<std::mutex> lock(mtx);
if (caffe_num_p >= num) {
break;
}

// 咖啡的数量小于 100
is_full.wait(lock, []() { return caffe_num_p < num; });

// 继续做咖啡
std::string caffe_id = std::to_string(id) + " Produce caffe id [" +
std::to_string(caffe_num_p) + "]\n";
std::cout << caffe_id;

caffe.push(caffe_num_p);

// 通知一个消费者
is_full.notify_one();
caffe_num_p++;
// 可以解锁了,不过析构也会自动解锁
lock.unlock();
}

// 通知所有消费者
is_full.notify_all();
}

void cosumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 有咖啡解锁,消费满 100 杯也解锁
is_full.wait(lock, []() { return !caffe.empty() || caffe_num_c >= num; });
// 如果解锁条件是消费满 100 杯,退出
if (caffe_num_c >= num) {
break;
}
caffe.pop();
std::cout << std::to_string(id) << " Consume Caffe " << caffe_num_c << "\n";
caffe_num_c++;
}
}

int main() {

int consumer_ = 3;
int procuder_ = 5;

std::vector<std::thread> threads;

for (int i = 0; i < procuder_; i++) {
threads.emplace_back(caffe_shop, i + 1);
}

for (int i = 0; i < consumer_; i++) {
threads.emplace_back(cosumer, i + 1);
}

for (auto &t : threads) {
t.join();
}

return 0;
}

模型转换实战

学到这里,我们来打个 boss。以实际的模型转换任务为例,但有所改编。将 A 格式的模型转换为 B 格式的模型,且 tensor 顺序不能变。由于模型过大,单线程读 A 模型写出为 B 模型还是慢了很多,现在就需要多线程工作。受限于分布式文件系统不支持 seek,写出 B 模型时仅支持在末尾追加。且写的速度远远快于读的速度。

所以就有了如下的 idea:多线程读取 A 格式的模型并且按 tensor 顺序存到缓冲区,另一个线程将缓冲区的模型拷贝到 B 模型。为了防止峰值内存过高,当缓冲区大于 512 MB 后,只允许写而不允许读。这就是一个典型靠条件变量工作的场景。来总结一下:

  1. 读线程将 A 模型按 tensor 顺序放到缓冲区
  2. 缓冲区大于 512 MB 后,读线程停止工作
  3. 缓冲区有数据时,写线程开始转换,并写到 B 模型
  4. 缓冲区无数据时,通知读线程开始工作
  5. A 模型被读取完毕后,读线程退出
  6. 写线程写完 B 模型后,退出

pVLJvuQ.png

这里用了两个条件变量,一个用于通知写线程可以开始写,一个用于通知读线程可以开始读。创建核心变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::mutex mtx;                             // 访问信号量加锁
const int NUM = 1000; // 模型大小
const int MID_BUFFER = 256; // 缓冲区大小
const int COPY_NUM = 64; // 一次拷贝的数据量
std::vector<int> src_buffer; // 源模型
std::vector<int> dst_buffer; // 目标模型
std::vector<int> mid_buffer(MID_BUFFER, 0); // 缓冲区
int p_size = 0; // 读取的 tensor 数,读取结束后,读线程退出
int c_size = 0; // 写出的 tensor 数,写完后,写线程退出
int m_size = 0; // 已用缓冲区大小
bool read_done = false; // 写完后,设置为 true,告诉读线程不用读了
std::condition_variable
can_push; // 当缓冲区为 0 时通知读线程工作,大于 256 时,读线程禁止工作
std::condition_variable can_pop; // 当缓冲区小于 256时,通知写线程工作

读代码的时候先读变量,所有函数都是围绕变量工作的。给出完整代码,详解可以仔细看看注释 ~~~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <thread>

std::mutex mtx; // 访问信号量加锁
const int NUM = 1000; // 模型大小
const int MID_BUFFER = 256; // 缓冲区大小
const int COPY_NUM = 64; // 一次拷贝的数据量
std::vector<int> src_buffer; // 源模型
std::vector<int> dst_buffer; // 目标模型
std::vector<int> mid_buffer(MID_BUFFER, 0); // 缓冲区
int p_size = 0; // 读取的 tensor 数,读取结束后,读线程退出
int c_size = 0; // 写出的 tensor 数,写完后,写线程退出
int m_size = 0; // 已用缓冲区大小
bool read_done = false; // 写完后,设置为 true,告诉读线程不用读了
std::condition_variable
can_push; // 当缓冲区为 0 时通知读线程工作,大于 256 时,读线程禁止工作
std::condition_variable can_pop; // 当缓冲区小于 256时,通知写线程工作

inline int calc(int x) { return 3 * (x / 2 + 7) / 4; }

void Init() {
// 初始化模型
for (int i = 0; i < NUM; i++) {
src_buffer.push_back(i);
}
}

void Test() {
// 检查转换是否正确
for (int i = 0; i < NUM; i++) {
if (dst_buffer[i] != calc(src_buffer[i])) {
std::cout << " FAIL \n";
return;
}
}
std::cout << " SUCCESS \n";
}

void ReadBuffer(int thread_id) {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 50));
std::unique_lock<std::mutex> lock(mtx);

// 如果读完或者写完,退出
if (p_size >= NUM || dst_buffer.size() >= NUM || read_done) {
break;
}
int _p_num = std::min(COPY_NUM, NUM - p_size);

// 如果读完了,或者 buffer 小于 256,可以解锁
can_push.wait(lock, [&_p_num]() {
return m_size + _p_num < MID_BUFFER || read_done;
});

// 如果是 read_done 导致解锁,退出
if (read_done) {
break;
}

// 拷贝到缓冲区
for (int i = p_size; i < p_size + _p_num; i++) {
mid_buffer[m_size + i - p_size] = src_buffer[i];
}

// 更新指针
p_size += _p_num;
m_size += _p_num;

// 通知写线程可以搬运缓冲区了
can_pop.notify_one();
}
can_pop.notify_one();
}

void PushBuffer() {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 10));
std::unique_lock<std::mutex> lock(mtx);

// 如果写完了,退出
if (c_size >= NUM) {
break;
}

// 缓冲区有内容就可以写,也可以设置阈值,缓冲区大于 32 MB 时才开始写
can_pop.wait(lock, []() { return m_size > 0; });
int _c_num = std::min(m_size, NUM - c_size);

// 拷贝所有缓冲区内容,并转换
for (int i = 0; i < _c_num; i++) {
auto val = calc(mid_buffer[i]);
dst_buffer.push_back(val);
}
c_size += _c_num;

// 清空缓冲区
m_size = 0;
can_push.notify_one();
}
// 只有一个写线程,可以不加锁修改 read_done,通知读线程可以退出了
read_done = true;
can_push.notify_all();
}

int main() {
Init();

int n_procuder = 5;
int n_consumer = 1;

std::vector<std::thread> threads;

for (int i = 0; i < n_procuder; i++) {
threads.emplace_back(ReadBuffer, i);
}

threads.emplace_back(PushBuffer);

for (auto &t : threads) {
t.join();
}

Test();
return 0;
}
感谢上学期间打赏我的朋友们。赛博乞讨:我,秦始皇,打钱。

欢迎订阅我的文章