使用多线程必须注意一个问题,如果有多个线程同时运行,而且它们试图访问相同的资源,就会遇到各种问题。如请求在代码在阅读时写入其他函数,即一个线程要读,一个线程要写,读线程占据着资源,写线程得不到资源,自然就会与理想的结果相违背。本文更侧重理论解读,某些场景下,这些概念可同样用到进程中;样例代码为C语言,其他语言仍然适用。
没有锁的保护 多线程中,变量都由所有线程共享。所以,任何一个变量都可以被任何一个线程修改。如修改一个变量需要多条语句,在执行这几条语句时,线程可能中断,很可能把内容给改乱了。如下所示的代码,创建一个为0的变量,创建两个线程,每个线程对变量执行+1的操作,得到的最后结果不一定是2。t1, t2为线程,x1, x2为各自线程的临时变量:
1 2 3 4 5 6 7 t1: x1 = g + 1 t2: x2 = g + 1 t1: g = x1 t2: g = x2
而执行顺序则完全取决于操作系统的调度方案和CPU的核心是否忙碌。如:操作系统是否允许任务长时间占用CPU,CPU在不忙碌的情况下是否会进行线程切换。如果一旦发生线程切换,且没有对临界资源添加保护锁,很容易写出危险的程序。
示例代码 实现一个上面所描述情况的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include <stdio.h> #include <pthread.h> #define NUM_THREADS 2000 int g = 0 ;void * count (void * args) { g++; } int main () { pthread_t tids[NUM_THREADS]; for (int j = 0 ; j < 10 ; j++){ for (int i = 0 ; i < NUM_THREADS; ++i) { int ret = pthread_create(&tids[i], NULL , count, NULL ); if (ret != 0 ){ printf ("pthread_create error: error_code = %d" , ret); } } for (int i = 0 ; i < NUM_THREADS; i++) { pthread_join(tids[i], NULL ); } printf ("g = %d\n" , g); g = 0 ; } return 0 ; }
危险的程序自然得不到准确的结果:
临界区 临界资源:一个只允许一个进程使用的资源,也称互斥资源,独占资源,共享变量,如打印机,和上面程序中的变量g。涉及临界资源的代码段称为临界区。临界区是代码片段,包含了对临界资源的访问,每个进程可以有一个或多个临界区,临界区的设置由程序员设置。
为保证结果正确,需要各个进程互斥进入具有相同临界资源的临界区,实现对临界资源的互斥访问。有以下规则:
互斥准则:某个进程在临界区执行,其他所有进程被排斥在临界区外。其他进程不能进入相同临界资源的临界区。所以只有相同临界资源的临界区需要互斥。
有空让进:临界区无进程执行时,不能无限期延长下一个进入临界区进程的等待时间。离开临界区的进程要开放临界区,让其他进程加入。
有限等待:每个进程进入临界区的等待时间有限,不能无限等待。临界区资源尽可能小,否则一个进程霸占资源,其他进程等待时间会很长。
按照上述原则,得到访问临界区的方法:
进入区,实现互斥准则,保证一个进程进入
临界区,有限等待准则,不能过大
退出区,有空让进准则,放行其他进程的进入
互斥量 互斥量用于线程互斥访问临界资源,允许多个线程安全地共享一个关联的软件或者硬件资源。当一个线程想使用共享资源时,它必须先通过获取互斥量以获得专有的访问权限,成功加锁才能操作,操作结束解锁。如果该互斥量已被另一个线程锁定,请求线程可以等待该互斥量被解锁。
所以互斥信号量的取值范围只有0和1。在同一进程内使用,使用步骤为:
初始化
临界区前wait操作
临界区后signal操作
因此同一时刻,只能有一个线程持有该锁。因为资源是共享的,线程间也还是竞争的,但通过锁就将资源的访问变成互斥操作,线程不能同时操作数据。虽然降低了并发性,但能保证结果正确。
所以在锁定互斥量后,线程可以长时间地安全地使用相关联的资源。但是单个线程持有互斥量时间应尽可能的短,避免其他线程一直处于等待状态。当线程不再需要使用资源时,它必须将互斥量解锁,使得其它线程可以使用该资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 #include <stdio.h> #include <string.h> #include <pthread.h> #define NUM_THREADS 200 pthread_t tid[NUM_THREADS];int counter = 0 ;pthread_mutex_t lock;void *thread_func (void *arg) { pthread_mutex_lock(&lock); counter += 1 ; pthread_mutex_unlock(&lock); } int main () { int i = 0 ; int err; if (pthread_mutex_init(&lock, NULL ) != 0 ) { printf ("\n mutex init failed\n" ); return 1 ; } for (int j = 0 ; j < 10 ; j++) { while (i < NUM_THREADS) { err = pthread_create(&(tid[i]), NULL , &thread_func, NULL ); if (err != 0 ) printf ("\ncan't create thread :[%s]" , strerror(err)); i++; } i = 0 ; while (i < NUM_THREADS) { pthread_join(tid[i], NULL ); i++; } printf ("%d\n" , counter); counter = 0 ; i = 0 ; } pthread_mutex_destroy(&lock); return 0 ; }
信号量 信号量用于线程的同步,如经典的生产者消费者。信号量是一个同步对象,用于保持在0至指定最大值(初始化指定)之间的一个计数值,同样只能被两个标准的原语wait和signal访问。使用步骤为:
进入关键代码前,进程必须获取一个信号量,否则不能运行
执行完关键代码,释放信号量
信号量有数值,正值表示空闲,负值表示忙碌。
只能通过两个不可分割的原子操作来访问信号量,一个是申请操作wait,一个是释放操作signal。
当线程访问共享资源时,需完成一次对信号量的等待wait操作,该计数值减一;
当线程访问完毕共享资源时,需要完成一次对信号量的释放signal操作,计数值加一。
计数值大于0,表示其他线程可以访问该共享资源;
计数值等于0,则其他线程不能访问该资源;
假设信号量的初始值为$S$:
$S>0$:有S个资源可用
$S=0$:无资源可用
$S<0$:有$|S|$个进程在等待
有m个进程共享同一临界资源,若使用信号量机制实现对这一临界资源的互斥访问,则信号量的变化范围是[-(m-1),1]。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include <semaphore.h> #define NUM_THREADS 200 pthread_t tid[NUM_THREADS];int count = 0 ;sem_t count_sem;void *thread_func (void *arg) { sem_wait(&count_sem); count++; sem_post(&count_sem); } int main (int argc, char *argv[]) { if (sem_init(&count_sem, 0 , 1 ) == -1 ) { printf ("sem_init: failed\n" ); } int i = 0 ; int err; for (int j = 0 ; j < 10 ; j++){ while (i < NUM_THREADS) { err = pthread_create(&(tid[i]), NULL , &thread_func, NULL ); if (err != 0 ) printf ("\ncan't create thread :[%s]" , strerror(err)); i++; } i = 0 ; while (i < NUM_THREADS) { pthread_join(tid[i], NULL ); i++; } printf ("%d\n" , count); count = 0 ; i = 0 ; } sem_destroy(&count_sem); return 0 ; }
信号量与互斥量的区别 虽然互斥量和信号量在一定程度上可以互相替代,比如可以把值最大为1的信号量当互斥量用,也可以用互斥量+计数器当信号量。
但是对于设计理念上还是有不同的,互斥量管理的是资源的使用权,偏向锁的概念;而信号量的绝对值表示资源的数量,有那么一点微妙的小区别。
打个比方,在早餐餐厅,大家要喝咖啡。
如果用互斥量的方式,同时只有一个人可以使用咖啡机,他获得了咖啡机的使用权后,开始做咖啡,其他人只能在旁边等着,直到他做好咖啡后,另外一个人才能获得咖啡机的使用权。
如果用信号量的模式,服务员获取咖啡机的锁,开始做咖啡。把咖啡做好放到柜台上,谁想喝咖啡就拿走一杯,服务员会不断做咖啡,如果咖啡杯被拿光了,想喝咖啡的人就排队等着。互斥量管理的是咖啡机的使用权,而信号量管理的是做好的咖啡数量。
如果信号量只有二进制的0或1,称为二进制信号量。在linux系统中,二进制信号量又称互斥锁。所以互斥锁和信号量的使用方法较为相似。
条件变量 以生产者消费者模型为例说明条件变量的使用条件。为节省开销,希望生产者制造出 100 个产品后才通知消费者。如果直接使用 mutex 互斥锁,需要在某个线程上不断地轮询:100 个产品是否生产完毕。相当于进行大量无效的询问,才能知道条件是否已经满足,并且每次询问均需要加锁和释放锁,这无疑会带来额外的开销。
而条件变量则高效地解决了这个问题。使用条件变量的情况下,我们可以直接等待某个条件的发生,而不需要主动轮询。条件变量是一种线程同步机制。核心思想是:一个线程等待『条件变量的条件成立』而挂起;另一个线程使『条件成立』。为了防止竞争,条件的检测是在互斥锁的保护下进行的,线程在改变条件状态前先要锁住互斥量。如果一个条件为假,则一个线程自动阻塞,该线程处于等待状态,并释放相关变量的互斥锁。如果另一个线程改变了条件,它将信号发送给关联的条件变量,唤醒一个或多个处于等待中的线程,使其重新获得互斥锁,重新评价条件。
但由于系统实现,会存在虚假唤醒等情况。即:线程并没有发出唤醒信号,处于等待中的线程仍然会自己醒来,这是一种能保证执行效率的方法。假设此时有10个线程处于等待中,在收到一个唤醒信号后,操作系统尝试去唤醒所有的线程,这会打破发送信号与唤醒之间一对一的关系。所以此时只能唤醒一个线程,而其余九个线程处于等待阶段。为了更灵活的处理这种情况,所以无论条件是否满足,操作系统允许等待中的线程自己醒来,称为虚假唤醒。
为了避免虚假唤醒对条件带来的影响,条件判断需要使用while而不是if。这是由于存在虚假唤醒等情况,导致唤醒后发现条件依旧不成立。因此需要使用while语句来循环地进行等待,直到条件成立为止。根据原理解析中生产者与消费者的模型,创建100个线程,且每当count \% 20 == 0时叫醒等待中的线程,防止过度轮循。试验结果如下图所示。
在上图中,可以观察到,在count取值为20,40,60,80时,由于叫醒速度过快,等待线程没有及时获取mutex锁来响应,因此等待线程中没有任何输出。而当等待线程因为虚假唤醒判断条件是否满足时,因为99小于100,不满足条件,所以等待线程继续等待。当最后count的取值为100时,等待线程被唤醒并继续向后执行,打印[thread main] wake - cond was signalled.语句。打印出最后的输出语句来判断count的值是否正确,结果[thread main] count == 100 so everyone is count显示答案正确。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 #include <pthread.h> #include <stdio.h> #include <unistd.h> #include <assert.h> int NUMTHREADS = 100 ;int count = 0 ;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *ThreadEntry () { pthread_mutex_lock(&mutex); count++; if (count % 20 == 0 && count != 0 ){ printf ("count is now %d. Signalling cond.\n" , count); pthread_cond_broadcast(&cond); } pthread_mutex_unlock(&mutex); } int main (int argc, char **argv) { pthread_t threads[NUMTHREADS]; for (int t = 0 ; t < NUMTHREADS; t++) pthread_create(&threads[t], NULL , ThreadEntry, NULL ); pthread_mutex_lock(&mutex); while (count != NUMTHREADS) { printf ("[thread main] count is %d which is < %d so waiting on cond\n" , count, NUMTHREADS); pthread_cond_wait(&cond, &mutex); } puts ("[thread main] wake - cond was signalled." ); pthread_mutex_unlock(&mutex); printf ("[thread main] count == %d so everyone is count\n" , count); pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); return 0 ; }
参考 https://www.zhihu.com/question/47704079/answer/135960522