0%

操作系统『二』:简单的多线程编程

继承上文,通过一些极度简单的例子来理解多线程、多进程编程,承接理论接触,为后面的同步、死锁打下基础。

多进程

可以考虑python的multiprocessing库来实现多进程。

进程不共享数据

众所周知,进程之间是不共享数据的,那么一探究竟。本例程的任务是,创建两个进程实现两个计算任务,完成计算任务1后才能完成计算任务2,这是明显的循序。首先,创建了两个子进程p和q,子进程p执行sum1函数,子进程q执行sum2函数。start()方法表示进程开始,join()方法表示等待进程结束。先执行完q进程,修改a和c的值,在执行p进程修改a的值,得到最后a的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from multiprocessing import Process
import os

a, b, c, d = 1, 2, 3, 4

# 子进程要执行的代码
def sum1(name):
global a, b, c
print('Run child process %s (%s)...' % (name, os.getpid()))
a = a + 1 + c
print(a)

def sum2(name):
global a, c, d
print('Run child process %s (%s)...' % (name, os.getpid()))
a = a + 2
c = c + d

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=sum1, args=('a + b',))
q = Process(target=sum2, args=('c + d',))
print('Child process will start.')
p.start()
q.start()
q.join()
p.join()
print('Child process end.')
print(a)

执行结果输出如下:

1
2
3
4
5
6
7
Parent process 23843.
Child process will start.
Run child process a + b (23844)...
5
Run child process c + d (23845)...
Child process end.
1

最后一条的数据结果表明现在的a是1(主进程的数据),即程序的主进程和创建的两个子进程之间不共享数据,证明了结论。即多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响。那如果就想让进程之间共享数据该如何操作呢?

进程共享数据

只需要特殊的声明变量即可,以下是数值类型的变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Process
import os, multiprocessing

a = multiprocessing.Value("d", 1)
b = multiprocessing.Value("d", 2)
c = multiprocessing.Value("d", 3)
d = multiprocessing.Value("d", 4)

# 子进程要执行的代码

def sum1(name):
global a, b, c
print('Run child process %s (%s)...' % (name, os.getpid()))
a.value = a.value + 1 + c.value

def sum2(name):
global a, c, d
print('Run child process %s (%s)...' % (name, os.getpid()))
a.value = a.value + 2
c.value = c.value + d.value

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=sum1, args=('a + b',))
q = Process(target=sum2, args=('c + d',))
print('Child process will start.')
p.start()
q.start()
q.join()
p.join()
print('Child process end.')
print(a.value)

输出结果如下,a的值是7,表明此时多进程以共享数据。

1
2
3
4
5
6
Parent process 23949.
Child process will start.
Run child process a + b (23950)...
Run child process c + d (23951)...
Child process end.
7.0

顺手给出其他类型的数据共享的方法:

  • 数组型:num=multiprocessing.Array("i",[1,2,3,4,5])
  • 字典:mydict=multiprocessing.Manager().dict()
  • 列表:mylist=multiprocessing.Manager().list(range(5))

多进程性能记录

并行计算基础篇提到,将两个进程分配给两个CPU核心,有利于提升程序的执行效率,那么来一探究竟。

这也要求了函数是平行的,即函数的执行没有明显的顺序关系。首先,创建两个进程,在创建两个函数。分别对比顺序执行两个函数和用两个进程执行两个函数的时间,以此来观察多进程是否提升了效率。其中计时函数利用python的time.time()方法,且只记录函数的执行时间。

顺序执行两个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time

a, b, c, d = 1, 2, 3, 4

def sum1():
global a, b, c
a = a + 1 + c

def sum2():
global a, c, d
a = a + 2
c = c + d

if __name__=='__main__':
start = time.time()
sum1()
sum2()
end = time.time()
print(end - start)
print(a)

两个进程分别执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process
import os, multiprocessing
import time

a, b, c, d = 1, 2, 3, 4

def sum1(name):
global a, c
a = a + 1 + c

def sum2(name):
global a, c, d
a = a + 2
c = c + d

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=sum1, args=('a + b',))
q = Process(target=sum2, args=('c + d',))
print('Child process will start.')
start = time.time()
p.start()
q.start()
q.join()
p.join()
end = time.time()
print('Child process end.')
print(end - start)

这两个程序不用对比了,顺序执行的比进程执行的要块的多,顺序执行大约0.00000009秒,线程执行大约0.002秒,根本不在一个数量级。

修正

也许你看到这里会疑问,为什么顺序执行会比多进程要快,多进程明明把两个进程分配给了两个核心,并行执行为啥还慢了好几个数量级?

因为,进程的创建、销毁也有额外的开销啊~当开销时间远远比执行时间少的时候,才能体现多进程的优势。既然这样,就额外增加程序的执行时间即可,让两个函数都延时两秒看看。

延时的顺序执行

通过time.sleep(2)方法给每个函数增加两秒的延时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time

a, b, c, d = 1, 2, 3, 4

def sum1():
global a, b, c
a = a + 1 + c
time.sleep(2)

def sum2():
global a, c, d
a = a + 2
c = c + d
time.sleep(2)

if __name__=='__main__':
start = time.time()
sum1()
sum2()
end = time.time()
print(end - start)
print(a)

延时的多进程

通过time.sleep(2)方法给每个函数增加两秒的延时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Process
import os, multiprocessing
import time

a, b, c, d = 1, 2, 3, 4

def sum1(name):
global a, c
a = a + 1 + c
time.sleep(2)

def sum2(name):
global a, c, d
a = a + 2
c = c + d
time.sleep(2)

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=sum1, args=('a + b',))
q = Process(target=sum2, args=('c + d',))
print('Child process will start.')
start = time.time()
p.start()
q.start()
q.join()
p.join()
end = time.time()
print('Child process end.')
print(end - start)

我这里的对比结果是:顺序执行的需要4秒,而多进程仅需2秒。证明了多进程的确能有效提高程序的执行效率。

阻塞/非阻塞

通过操作系统,我们可以知道进程的状态转换:

  • 新建,分配进程所需资源,在创建进程后,进程将被设为就绪态,所以创建时不可能为其分配CPU去执行
  • 就绪,进程等待分配处理器,不会到阻塞态
  • 运行,获取处理器后,指令在执行。中断、时间片用完后为就绪态;等待IO变为阻塞态;
  • 阻塞,进程等待某个事件发生,事件完成后为就绪,不能到运行态。操作系统会把该线程阻塞起来,避免浪费CPU资源,等到得到了资源,再变成就绪状态,等待CPU调度运行
  • 终止,进程执行完毕

阻塞和非阻塞关注的是程序在等待调用结果时的状态

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起。它会一直阻塞着,也就是后面的代码都不会执行了,调用线程只有在得到结果之后才会返回。
  • 非阻塞调用指在不能立刻得到结果之前,当前线程并不会阻塞,无需等待操作彻底完成,也就是后面的程序该执行就执行。

举个例子,你打电话问书店老板有没有《分布式系统》这本书:

  • 阻塞式调用,你会一直把自己『挂起』,直到得到这本书有没有的结果;
  • 如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了,当然你也要偶尔过几分钟检查一下老板有没有返回结果。在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关。

注:阻塞和挂起:阻塞是被动的,比如抢不到资源。挂起是主动的,线程自己调用 suspend() 把自己退出运行态了。

进程池

可以使用进程池实现对每个进程的管理,即:决定哪些进程可以在进程池内,等待进程池内的所有进程执行完毕才执行下一部分的程序。粗暴的理解:一个大池子,这个池子里有好多进程,通过这个池子实现对进程的统一管理。

apply_async 方法

为了体现进程的并发,我特意在12个核的CPU内创建了13个进程。意思是:会有一个进程要等待其他进程释放一个CPU资源才能执行。进程池的close()方法会阻止其他进程在进入进程池,join()方法会等待进程池全部的进程执行完毕。创建13个进程,并使每个进程等待2秒观看执行效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Pool
import os, time

def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(2)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(13)
for i in range(14):
​ p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
# 不能在增加其他进程
p.close()
# Wait for the worker processes to exit.
p.join()
print('All subprocesses done.')

可以看到,第13个进程很晚才创建,在3, 2, 1, 12, 0, 10, 4, 7, 11, 8这几个进程执行完毕后,第13个进程才刚刚创建。

在这13个进程中,每个进程都有一个主线程。CPU给线程分配时间片(也就是分配给线程的时间),执行完时间片后会切换都另一个线程(不是时间片也可能是其他的调度算法)。这里的进程池可以阻塞也可以不阻塞:

  • apply:阻塞执行,同步,池子中第一个进程执行完才会执行第二个
  • apply_async:阻塞执行,异步,返回的结果是异步的
  • 官方文档可知,两者都是阻塞的

map 方法

普通的map方法只支持一个可迭代对象。这个方法会将可迭代对象分割为许多块,然后提交给进程池。对于很长的对象,可以使用imap 将 chunksize 设置为一个正整数从而(近似)指定每个块的大小。若想添加额外参数,可以使用 starmap 方法。同样,也都有对应的异步版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import multiprocessing as mp

def square(x):
return x * x

if __name__ == "__main__":

nprocs = mp.cpu_count()
print(f"Number of CPU cores: {nprocs}")

pool = mp.Pool(processes=nprocs)

result = pool.map(square, range(20))
print(result)
pool.close()
pool.join()

多线程编程

假如机器本身安装了多个处理器,那么程序会运行得更快,毋需作出任何特殊的调校。多任务可以由多进程完成,也可以由一个进程内的多线程完成。

创建线程

1
2
3
4
5
6
7
8
9
10
11
12
import time, threading

def loop():
print('thread %s is running...' % threading.current_thread().name)
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='Thread-1')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程。current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用Thread-1命名子线程。输出结果如下(最后输出的是主进程的结束):

1
2
3
4
thread MainThread is running...
thread LoopThread is running...
thread LoopThread ended.
thread MainThread ended.

join()方法会阻塞调用线程,直到当前进程返回结果,所以这是阻塞调用。

多线程共享数据

仿照上文中多进程的程序,仿一个多线程的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading

a, b, c, d = 1, 2, 3, 4

def sum1():
global a, b, c
a = a + 1 + c
print(a)

def sum2():
global a, c, d
a = a + 2
c = c + d
print(a)

if __name__=='__main__':
p = threading.Thread(target=sum1)
q = threading.Thread(target=sum2)
print('Child process will start.')
p.start()
q.start()
q.join()
p.join()
print('Child process end.')
print(a)

输出结果为:

1
2
3
4
5
Child process will start.
5
7
Child process end.
7

多核线程

如果你不幸拥有一个多核CPU,你肯定在想,多核应该可以同时执行多个线程。如果写一个死循环的话,会出现什么情况呢?我们可以监控到一个死循环线程会100%占用一个CPU。

如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。

但python启动与CPU核心数量相同的N个线程跑死循环,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock。任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 。在讨论普通的GIL之前,有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。 如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待。实际上,你完全可以放心的创建几千个Python线程, 现代操作系统运行这么多线程没有任何压力,没啥可担心的。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。所以,在Python中,可以使用多线程,但不要指望能有效利用多核。不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

线程的变量

不同的线程调用函数,且需要处理一些局部变量时,也可以写成这样不断的传参,只是很麻烦罢了:

1
2
3
4
5
6
7
8
9
10
11
12
13
def process_student(name):
std = Student(name)
# std是局部变量,但是每个函数都要用它,因此必须传进去:
do_task_1(std)
do_task_2(std)

def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)

def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)

在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好(不是程序中的局部变量和全局变量,是线程里的局部变量和全局变量)。因为线程的局部变量只有线程自己能看见,不会影响其他线程。可以通过ThreadLocal实现避免局部变量的层层传递的问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading

# 创建全局 ThreadLocal 对象:
local_school = threading.local()

def process_student():
# 获取当前线程关联的 student:
std = local_school.student
print('Hello, %s in %s' % (std, threading.current_thread().name))

def process_thread(string):
# 绑定 ThreadLocal 的 student:
local_school.student = string
process_student()

t1 = threading.Thread(target = process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target = process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

输出结果如下:

1
2
Hello, Alice in Thread-A
Hello, Bob in Thread-B

全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但每个线程都只能读写自己线程的独立副本,互不影响。你可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。

实例

通过多线程计算$\pi$,来演示串行代码和并行代码的时间差,展示并行计算的魅力。

串行代码

学过积分应该能根据公式看懂代码吧,耗时大概 3 秒左右。

1
2
3
4
5
6
7
8
9
10
11
12
13
import time

nsteps = 10000000
dx = 1.0 / nsteps
pi = 0.0
since = time.time()
for i in range(nsteps):
x = i / nsteps
pi += 4.0 / (1.0 + x * x)
pi *= dx
end = time.time()
print(pi / nsteps)
print(end - since)

并行代码

为了并行串行的代码,我们需要将for循环划分为几个子任务,并将它们分配给多个进程。所以需要提供每个进程的起始点、终止点和步长,且需要防止重复计算。然后把各个进程扔到进程池中,得到最后的返回结果后相加即可。为了使展示代码更清晰,我手动划分了两个进程。执行时间大概一秒左右。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import multiprocessing, time

def calc_pi(start, end):
steps = end - start
pi = 0
dx = 1 / (end - start)
for i in range(steps):
x = i / steps
pi += 4.0 / (1.0 + x * x)
return pi * dx

if __name__ == "__main__":

num = 2
p = multiprocessing.Pool(num)
inputs = [(1, 5000000), (5000000, 10000000)]

since = time.time()
multi_result = [p.apply_async(calc_pi, inp) for inp in inputs]
p.close()
p.join()
end = time.time()

result = [p.get() for p in multi_result]
pi = sum(result) / 2

print(pi)
print(end - since)

总结

从保存线程A的状态到准备切换为线程B时,加载线程B的状态的这个过程就叫上下文切换,而上下切换时会消耗大量的CPU时间。因此,只有在特定的条件下开启多线程才会更合适。不可避免的线程开销包括:

  • 上下文切换消耗
  • 线程创建和消亡的开销
  • 线程需要保存维持线程本地栈,会消耗内存

对于计算密集型的程序,程序主要为复杂的逻辑判断和复杂的运算,此时CPU的利用率高,不用开太多的线程,开太多线程反而会因为线程切换时切换上下文而浪费资源。计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

对于IO密集型的程序,比如磁盘IO(读取文件)和网络IO(网络请求)。因为IO操作会阻塞线程,CPU利用率不高,可以开多点线程,一个线程阻塞时(网络延迟等)可以切换到其他就绪线程,提高CPU的利用率。这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

我们介绍了多进程和多线程,这是实现多任务最常用的两种方式。虽然很水,在实际项目中能应用的概率四舍五入为0,但对于理解多线程、多进程还是有一定帮助的。现在,我们来讨论一下这两种方式的优缺点。

多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。(当然主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低)。

多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。在Windows上,如果一个线程执行的代码出了问题,你经常可以看到这样的提示:“该程序执行了非法操作,即将关闭”,其实往往是某个线程出了问题,但是操作系统会强制结束整个进程。

无论是多进程还是多线程,只要数量一多,效率肯定上不去,为什么呢?

因为进程或线程的切换是有代价的,操作系统在切换进程或者线程时需要先保存当前执行的现场环境(CPU寄存器状态、内存页等),然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。

所以,多任务一旦多到一个限度,就会消耗掉系统所有的资源,结果效率急剧下降,所有任务都做不好。

后续

应该是通信、MPI什么的吧,进程死锁、上下文切换开销对比什么的看有没有时间。

感谢上学期间打赏我的朋友们。赛博乞讨:我,秦始皇,打钱。

欢迎订阅我的文章