0%

C++ 多线程

rush 项目的时候,有些地方可以并行化,可以借助 C++ 的多线程来加速程序的执行。多线程的基本概念在一年前整过了,这里只是来看一下 C++ 的多线程该怎么写,顺便查漏补缺。

基本概念

在多线程进入 C++ 标准之前,人们使用 C++ 编写多线程的程序,只能依赖操作系统提供的 API。比如在 Linux 环境下就只能使用 pthread 库实现多线程,因此也一直被诟病。但有了 C++11 的 std::thread 以后,可以通过标准库在语言层面编写多线程程序了,直接的好处就是多线程程序的跨平台移植提供了便利。但是在编译的时候需要注意链接平台相关的线程库,如 g++ demo.cpp -lpthread -o test.o

简单实例

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <iostream>
#include <thread>

void show_info(std::string str) {
std::cout << str << std::endl;
}

int main() {
std::string s{"hello world"};
std::thread t{show_info, s};
t.join();
return 0;
}

以上述程序为例,来详细的剖析一下多线程期间到底发生了什么:

  1. 首先引入头文件 thread,在这个头文件中,C++ 11 提供了创建、管理线程的类和方法;
  2. 使用 std::thread 创建线程,并通过列表初始化传入函数名作为构造函数的参数。传入的函数会作为子线程的入口函数,也就是说,当子线程准备就绪之后,就会开始执行这个入口函数。由于函数名表示函数的地址,子线程可以快捷的找到函数地址进而执行。

    我们知道,每个程序都有一个入口。当程序被装载到内存,处于系统态完成一些初始化的工作之后,控制权就转交给程序入口,并以此为标志进入用户态,这是一个程序的开始。同样地,线程也需要有「开始」的地方。作为线程入口的函数,就是线程函数,也就是例子中的 show_info。线程函数必须在启动线程之前,就准备好,否则线程去执行什么呢?并在线程初始化后立即执行。1

  3. 当线程函数返回时,线程也就随之终止了,上述程序中使用 join 衔接方法确保主线程在子线程退出之后才退出,因为主线程会阻塞住,直到该子线程退出为止。如果程序员没有显式的说明线程结束该如何处理,那么线程对象在被销毁时调用的析构函数中,会调用 std::terminate() 函数,销毁当前对象。如果程序写多了,应该不至于犯主线程退出子线程还没结束的低级错误。

detach

前面说过线程的 join 会阻塞调用线程,可以使用 detach 来避免,但一定要做好控制:避免主线程退出子线程还没结束的低级错误。一个 cppreference 官网的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <chrono>
#include <thread>

void independentThread()
{
std::cout << "Starting concurrent thread.\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Exiting concurrent thread.\n";
}

void threadCaller()
{
std::cout << "Starting thread caller.\n";
std::thread t(independentThread);
t.detach();
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Exiting thread caller.\n";
}

int main()
{
threadCaller();
std::this_thread::sleep_for(std::chrono::seconds(5));
}

// Starting thread caller.
// Starting concurrent thread.
// Exiting thread caller.
// Exiting concurrent thread.

可调用类型

在创建线程对象时,传入的参数不仅是可被调用执行的函数,类的对象如果能被调用,也是可以作为线程对象的参数,用于构造函数初始化线程对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <thread>

class Task {
private:
int cnt;
public:
explicit Task()=default;
explicit Task(int a) : cnt{a} {};
void operator()() {
std::cout << this->cnt << std::endl;
}
};

int main() {
std::thread t{Task{1}};
t.join();
return 0;
}

因为要调用对象,所以重载了 () 运算符,不然线程不知道去哪个地址执行。此外,构造函数传入的是一个类类型的对象,所以对象会被拷贝到线程的存储空间,而后再开始执行。因此,类必须做好足够的拷贝控制,不然将出现难以调试的 bug,我大概只知道深浅拷贝,等有时间了去看下移动语义

当然,不重载 () 运算符,选择类中的函数执行也是可以的,不过需要注意以下两点:

  • 必须显式地使用函数指针,作为 std::thread 构造函数的第一个参数;知道执行哪个函数。
  • 非静态成员函数的第一个参数,实际上是类实例的指针。所以在创建线程时,需要显式地填入这个参数;知道执行的函数在哪个对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <thread>

class A{
private:
int a;
public:
explicit A()=default;
explicit A(int t) : a{t} {};
void show_info() {
std::cout << this->a << std::endl;
}
};

int main() {
A a{12};
std::thread t{&A::show_info, &a};
t.join();
return 0;
}

其他要注意的数据类型

引用

如果子线程函数的参数是引用类型,也需要格外注意。由于子线程的数据是主线程的拷贝,因此子线程函数得到的拷贝实际是「线程存储空间中的拷贝的引用」,并不是主线程中的变量,应该使用 std::ref() 来生成正确的引用绑定,否则会报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <iostream>
#include <thread>

void show_info(std::string& s) {
std::cout << s << std::endl;
}

int main() {
std::string s{"hello world"};
std::thread t{show_info, std::ref(s)};
t.join();
return 0;
}

右值引用和移动语义等我后期开坑了。

锁与线程安全

众所周知,写代码的人都学过操作系统,学过操作系统都知道线程同步。线程同步一般有三种机制:互斥量、信号量和条件变量,这三者到底什么已经在这篇博客中详细的描写过了,所以不再多说。不过当时是用 C 语言写的,现在来了解下 C++ 的写法。

mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

int counter = 0;
std::mutex mtx;

void increase(int time) {
for (int i = 0; i < time; i++) {
mtx.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
counter++;
mtx.unlock();
}
}

int main(int argc, char** argv) {
std::thread t1(increase, 100);
std::thread t2(increase, 100);
t1.join();
t2.join();
std::cout << "counter:" << counter << std::endl;
return 0;
}
  1. 引入 mutex 头文件,创建 std::mutex 对象 mtx
  2. 对于 mtx 对象,任意时刻最多允许一个线程对其进行上锁,上锁后操作变量,就不会出错
  3. mtx.try_lock() 是尝试上锁,如果上锁不成功,当前线程不阻塞
  4. 在用完锁之后一定记得释放锁,否则会发生死锁现象

lock_guard

为了避免 mutex 忘记解锁等情况,可以使用 std::lock_guard这个类只有构造函数和析构函数,搭配 mutex 使用,在创建这个对象时传入锁,调用锁的 lock 函数;变量销毁会调用析构函数,此时调用锁的 unlock 函数,这也就是传说中的 RAII 机制 2

如下述程序 3 ,避免一个线程意外退出没来得及释放锁,导致另一个线程无法获取资源而死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
#include <stdexcept>

int counter = 0;
std::mutex mtx;

void increase_proxy(int time, int id) {
for (int i = 0; i < time; i++) {
std::lock_guard<std::mutex> lk(mtx);
if (id == 1) {
throw std::runtime_error("throw excption....");
}
// 当前线程休眠1毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(1));
counter++;
}
}

void increase(int time, int id) {
try {
increase_proxy(time, id);
}
catch (const std::exception& e){
std::cout << "id:" << id << ", " << e.what() << std::endl;
}
}

int main(int argc, char** argv) {
std::thread t1(increase, 100, 1);
std::thread t2(increase, 100, 2);
t1.join();
t2.join();
std::cout << "counter:" << counter << std::endl;
return 0;
}

lock_guard 与 adopt_lock

还有一种为了防止死锁的方式是一次性申请所有临界资源的互斥量,只有申请到才能进行之后的操作,而 std::lock 提供了这种实现 4。此外,为了防止没有锁定或提前释放互斥量导致危险,可以使用 lock_guard 并传入 std::adopt_lock,前者保证当变量销毁时释放互斥量,后者保证线程已经上锁成功时不再调用 lock() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <mutex>
#include <thread>

struct bank_account {
explicit bank_account(int balance) : balance(balance) {}
int balance;
std::mutex m;
};

void transfer(bank_account &from, bank_account &to, int amount) {
// avoid deadlock in case of self transfer
if(&from == &to)
return;
// lock both mutexes without deadlock
std::lock(from.m, to.m);
// make sure both already-locked mutexes are unlocked at the end of scope
std::lock_guard<std::mutex> lock1(from.m, std::adopt_lock);
std::lock_guard<std::mutex> lock2(to.m, std::adopt_lock);

from.balance -= amount;
to.balance += amount;
}

int main() {
bank_account my_account(100);
bank_account your_account(50);
std::thread t1(transfer, std::ref(my_account), std::ref(your_account), 10);
std::thread t2(transfer, std::ref(your_account), std::ref(my_account), 5);
t1.join();
t2.join();
return 0;
}

除了 adopt_lock 之外,还有 try_to_lockdefer_lock,他们都有不同的应用场景,还可以配合使用:

1
2
3
4
5
6
7
8
9
10
11
void print_block (int n, char c) {
//unique_lock有多组构造函数, 这里std::defer_lock不设置锁状态
std::unique_lock<std::mutex> my_lock (mtx, std::defer_lock);
//尝试加锁, 如果加锁成功则执行
//(适合定时执行一个job的场景, 一个线程执行就可以, 可以用更新时间戳辅助)
if(my_lock.try_lock()) {
for (int i = 0; i < n; ++i)
std::cout << c;
std::cout << '\n';
}
}

其他锁的内容实在是太多了,还有时间锁、递归锁、lock_unique,读写锁的 shared_lock 等等,等哪天用到在整理这些,这里只整理最简单的,详情可以参考 cppreference 5

条件变量

如果按照之前 C 语言的写法,条件变量需要注意的是 wait 那边的判断一定是 while 循环。C 语言风格的代码

当然,如果按照 C++ 的写法,我们发现条件变量的 wait 方法有两个参数,第二个参数用于接受一个变量,如果继续等待,那么那个变量的取值是 false,如果不需等待,那么那个变量返回 true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>

std::mutex g_mutex;
std::condition_variable g_cond;
int g_i = 0;
bool g_running = false;

void ThreadFunc(int n) {
for (int i = 0; i < n; ++i) {
{
std::lock_guard<std::mutex> lock(g_mutex);
++g_i;
std::cout << "plus g_i by func thread "
<< std::this_thread::get_id() << std::endl;
}
}

// 等待被唤醒
std::unique_lock<std::mutex> lock(g_mutex);
std::cout << "wait for exit" << std::endl;

g_cond.wait(lock, [=] {return g_running;});

std::cout << "func thread exit" << std::endl;
}

int main() {
int n = 100;
std::thread t1(ThreadFunc, n);
std::this_thread::sleep_for(std::chrono::seconds(1));
for (int i = 0; i < n; ++i) {
{
std::lock_guard<std::mutex> lock(g_mutex);
++g_i;
std::cout << "plus g_i by main thread "
<< std::this_thread::get_id() << std::endl;
}
}

// 唤醒
{
std::lock_guard<std::mutex> lock(g_mutex);
g_running = true;
g_cond.notify_one();
}

t1.join();
std::cout << "g_i = " << g_i << std::endl;
}

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
plus g_i by func thread 140476623930944
plus g_i by func thread 140476623930944
wait for exit // 表示子线程等待唤醒
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
plus g_i by main thread 140476623935296
func thread exit // 子线程被唤醒
g_i = 200

信号量

因为一开始我也不知道该怎么去写信号量,所以打开了万能的搜索引擎,看到了关于 C++ 不支持信号量这样的东西 6。如果想实现信号量,可以通过互斥量和条件变量来实现。而关于信号量和互斥量的区别,在这篇文章中已经写明了。那么来实现一个信号量的类 7

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <condition_variable>

class Semaphore {
private:
std::mutex mutex_;
std::condition_variable cv_;
int count_;

public:
explicit Semaphore(int count = 0) : count_(count) {
}

void Signal() {
std::unique_lock<std::mutex> lock(mutex_);
++count_;
cv_.notify_one();
}

void Wait() {
std::unique_lock<std::mutex> lock(mutex_);
// 第二个参数,如果返回 false 继续等待, 如果为 true,可以继续申请资源
cv_.wait(lock, [=] { return count_ > 0; });
--count_;
}
};

std::string FormatTimeNow(const char* format) {
auto now = std::chrono::system_clock::now();
std::time_t now_c = std::chrono::system_clock::to_time_t(now);
std::tm* now_tm = std::localtime(&now_c);

char buf[20];
std::strftime(buf, sizeof(buf), format, now_tm);
return std::string(buf);
}

Semaphore g_semaphore(3);
// 防止同时抢占输出资源
std::mutex g_io_mutex;

void Worker() {
g_semaphore.Wait();

std::thread::id thread_id = std::this_thread::get_id();
std::string now = FormatTimeNow("%H:%M:%S");
{
std::lock_guard<std::mutex> lock(g_io_mutex);
std::cout << "Thread " << thread_id << ": wait succeeded"
<< " (" << now << ")" << std::endl;
}
// Sleep 1 second to simulate data processing.
std::this_thread::sleep_for(std::chrono::seconds(1));

g_semaphore.Signal();
}

int main() {
std::vector<std::thread> v;
for (std::size_t i = 0; i < 3; ++i) {
v.emplace_back(&Worker);
}
for (std::thread& t : v) {
t.join();
}
return 0;
}
  • 信号量的值为 3,表示能同时申请 3 个资源
  • 当一个线程申请资源后,即执行了 wait 操作,count_ 取值递减,表示有一个资源被占用
  • count_ 取值小于 0 时,调用条件变量的 wait 方法,当先线程等待有了资源被唤醒
  • 当一个线程释放资源后,执行了 signal 操作,count_ 取值递增,表示有一个资源被释放,并执行 notify_one 方法,即唤醒一个等待的线程

参考

感谢上学期间打赏我的朋友们。赛博乞讨:我,秦始皇,打钱。

欢迎订阅我的文章