从今年春节回来到现在,经历了不少事情。暑期勉强打赢了“复活赛”,不过主包也快到残血状态了。现在终于能腾出点时间,好好学习一下。
有很多想学的东西,技术方向包括 vllm、deepspeed、大模型、CUDA、模型量化等;非技术方面,想学剪视频和修图——毕竟手里攒了一堆素材,哈哈哈,慢慢来吧。
今天想聊聊信号量,这也是我在新工作中遇到的实际场景。信号量本质上是一个变量,常用于多线程环境中的同步控制。举个例子:假设有两个线程 A 和 B,A 在处理临界资源,当满足某些条件时,它会释放锁并通知 B;而 B 完全没必要一直轮询 “A 做完了吗?”。这样既浪费资源,又不够优雅。
信号量入门 一个经典的场景是生产者和消费者模型。我去线下店订购 100 杯咖啡,老板做好后通知我就可以了,我没必要隔 1 分钟就去问老板做好了没有。老板是 A 线程,我是 B 线程,临界资源是 100 杯咖啡。来看 C++ 中是如何使用条件变量的,我们来模拟生产 100 杯咖啡,而后一次性消费的过程。
单生产-单消费模式 C++ 提供的条件变量 std::condition_variable
允许一个或多个线程等待某个条件变真,等待的函数是 wait(lock, predicate)
,原子的释放锁,并检查谓词条件 predicate 是否成立,如果不成立,会再次等待。当另外的线程改变了条件时,可以使用 notify_one
或者 notify_all
通知正在等待的线程,前者唤醒一个等待的线程,后者唤醒所有正在等待的线程。我们用 queue
来存储咖啡,创建相关的变量:
1 2 3 4 5 6 7 8 std::mutex mtx; std::queue<int > caffe; const int num = 100 ; std::condition_variable is_full; std::uniform_int_distribution<> dis (0 , 10 ); std::mt19937 gen (20251018 ) ; int caffe_num = 0 ;
创建咖啡店,来制作 100 杯咖啡:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void caffe_shop () { for (int i = 0 ; i < num; i++) { std::unique_lock<std::mutex> lock (mtx) ; is_full.wait (lock, []() { return caffe_num < num; }); std::string caffe_id = " Produce caffe id [" + std::to_string (i) + "]\n" ; std::cout << caffe_id; caffe_num++; caffe.push (i); int delayMs = dis (gen); std::this_thread::sleep_for (std::chrono::milliseconds (delayMs)); } is_full.notify_all (); }
而我自己先做点别的事情,等 100 倍咖啡制作完毕:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void cosumer () { std::this_thread::sleep_for (std::chrono::milliseconds (1 * 1000 )); std::cout << " consumer do others \n" ; std::unique_lock<std::mutex> lock (mtx) ; is_full.wait (lock, []() { return caffe_num == num; }); while (!caffe.empty ()) { auto id = caffe.front (); caffe.pop (); std::cout << " Consume Caffe " << id << "\n" ; } }
在 main 函数中启动这两个线程即可。g++ 编译运行时记得链接 pthread
库,不然编译不过去。
1 2 3 4 5 6 7 8 9 10 int main () { std::thread t1 (caffe_shop) ; std::thread t2 (cosumer) ; t1.join (); t2.join (); return 0 ; }
多生产-多消费模式 上面是最简单的情况:单生产者-单消费者。可以修改之前的代码,实现多生产者-单消费者,单生产者-多消费者,多生产者-多消费者等各种场景。我们进阶一下,这里只给出相对复杂的多生产者-多消费者的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 #include <chrono> #include <condition_variable> #include <iostream> #include <mutex> #include <queue> #include <random> #include <thread> std::mutex mtx; std::queue<int > caffe; const int num = 100 ; std::condition_variable is_full; std::uniform_int_distribution<> dis (0 , 10 ); std::mt19937 gen (20251018 ) ; int caffe_num_p = 0 ; int caffe_num_c = 0 ; void caffe_shop (int id) { while (true ) { int delayMs = dis (gen); std::this_thread::sleep_for (std::chrono::milliseconds (delayMs)); std::unique_lock<std::mutex> lock (mtx) ; if (caffe_num_p >= num) { break ; } is_full.wait (lock, []() { return caffe_num_p < num; }); std::string caffe_id = std::to_string (id) + " Produce caffe id [" + std::to_string (caffe_num_p) + "]\n" ; std::cout << caffe_id; caffe.push (caffe_num_p); is_full.notify_one (); caffe_num_p++; lock.unlock (); } is_full.notify_all (); } void cosumer (int id) { while (true ) { std::unique_lock<std::mutex> lock (mtx) ; is_full.wait (lock, []() { return !caffe.empty () || caffe_num_c >= num; }); if (caffe_num_c >= num) { break ; } caffe.pop (); std::cout << std::to_string (id) << " Consume Caffe " << caffe_num_c << "\n" ; caffe_num_c++; } } int main () { int consumer_ = 3 ; int procuder_ = 5 ; std::vector<std::thread> threads; for (int i = 0 ; i < procuder_; i++) { threads.emplace_back (caffe_shop, i + 1 ); } for (int i = 0 ; i < consumer_; i++) { threads.emplace_back (cosumer, i + 1 ); } for (auto &t : threads) { t.join (); } return 0 ; }
模型转换实战 学到这里,我们来打个 boss。以实际的模型转换任务为例,但有所改编。将 A 格式的模型转换为 B 格式的模型,且 tensor 顺序不能变。由于模型过大,单线程读 A 模型写出为 B 模型还是慢了很多,现在就需要多线程工作。受限于分布式文件系统 不支持 seek,写出 B 模型时仅支持在末尾追加。且写的速度远远快于读的速度。
所以就有了如下的 idea:多线程读取 A 格式的模型并且按 tensor 顺序存到缓冲区,另一个线程将缓冲区的模型拷贝到 B 模型。为了防止峰值内存过高,当缓冲区大于 512 MB 后,只允许写而不允许读。这就是一个典型靠条件变量工作的场景。来总结一下:
读线程将 A 模型按 tensor 顺序放到缓冲区
缓冲区大于 512 MB 后,读线程停止工作
缓冲区有数据时,写线程开始转换,并写到 B 模型
缓冲区无数据时,通知读线程开始工作
A 模型被读取完毕后,读线程退出
写线程写完 B 模型后,退出
这里用了两个条件变量,一个用于通知写线程可以开始写,一个用于通知读线程可以开始读。创建核心变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 std::mutex mtx; const int NUM = 1000 ; const int MID_BUFFER = 256 ; const int COPY_NUM = 64 ; std::vector<int > src_buffer; std::vector<int > dst_buffer; std::vector<int > mid_buffer (MID_BUFFER, 0 ) ; int p_size = 0 ; int c_size = 0 ; int m_size = 0 ; bool read_done = false ; std::condition_variable can_push; std::condition_variable can_pop;
读代码的时候先读变量,所有函数都是围绕变量工作的。给出完整代码,详解可以仔细看看注释 ~~~
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 #include <chrono> #include <condition_variable> #include <cstdlib> #include <iostream> #include <mutex> #include <queue> #include <random> #include <thread> std::mutex mtx; const int NUM = 1000 ; const int MID_BUFFER = 256 ; const int COPY_NUM = 64 ; std::vector<int > src_buffer; std::vector<int > dst_buffer; std::vector<int > mid_buffer (MID_BUFFER, 0 ) ; int p_size = 0 ; int c_size = 0 ; int m_size = 0 ; bool read_done = false ; std::condition_variable can_push; std::condition_variable can_pop; inline int calc (int x) { return 3 * (x / 2 + 7 ) / 4 ; }void Init () { for (int i = 0 ; i < NUM; i++) { src_buffer.push_back (i); } } void Test () { for (int i = 0 ; i < NUM; i++) { if (dst_buffer[i] != calc (src_buffer[i])) { std::cout << " FAIL \n" ; return ; } } std::cout << " SUCCESS \n" ; } void ReadBuffer (int thread_id) { while (true ) { std::this_thread::sleep_for (std::chrono::milliseconds (rand () % 50 )); std::unique_lock<std::mutex> lock (mtx) ; if (p_size >= NUM || dst_buffer.size () >= NUM || read_done) { break ; } int _p_num = std::min (COPY_NUM, NUM - p_size); can_push.wait (lock, [&_p_num]() { return m_size + _p_num < MID_BUFFER || read_done; }); if (read_done) { break ; } for (int i = p_size; i < p_size + _p_num; i++) { mid_buffer[m_size + i - p_size] = src_buffer[i]; } p_size += _p_num; m_size += _p_num; can_pop.notify_one (); } can_pop.notify_one (); } void PushBuffer () { while (true ) { std::this_thread::sleep_for (std::chrono::milliseconds (rand () % 10 )); std::unique_lock<std::mutex> lock (mtx) ; if (c_size >= NUM) { break ; } can_pop.wait (lock, []() { return m_size > 0 ; }); int _c_num = std::min (m_size, NUM - c_size); for (int i = 0 ; i < _c_num; i++) { auto val = calc (mid_buffer[i]); dst_buffer.push_back (val); } c_size += _c_num; m_size = 0 ; can_push.notify_one (); } read_done = true ; can_push.notify_all (); } int main () { Init (); int n_procuder = 5 ; int n_consumer = 1 ; std::vector<std::thread> threads; for (int i = 0 ; i < n_procuder; i++) { threads.emplace_back (ReadBuffer, i); } threads.emplace_back (PushBuffer); for (auto &t : threads) { t.join (); } Test (); return 0 ; }