0%

MPI基础,以 mpi4py 为例(操作系统番外篇)

以python为例,多线程由于GIL的存在,所以有multiprocessing来缓解;但它不能面向多个计算节点编程,所以MPI来了。当面向多个计算节点编程时,程序的各部分之间通过来回传递消息的方式通信。要使得消息传递方式可移植,就需要采用标准的消息传递库。这就促成消息传递接口(Message Passing Interface, MPI)的面世,MPI是一种被广泛采用的消息传递标准,所以MPI并不是一种语言。

在程序中,不同的进程需要相互的数据交换,特别是在科学计算中,需要大规模的计算与数据交换,集群可以很好解决单节点计算力不足的问题,但在集群中大规模的数据交换是很耗费时间的,因此需要一种在多节点的情况下能快速进行数据交流的标准,这就是MPI。所以MPI是一组用于多节点数据通信的标准,而非一种语言或者接口。具体的使用方法需要依赖它的具体实现(mpich or openmpi等)。对于 python 的 MPI 标准而言,可以使用 mpi4py 这个库。实现了点到点通信和集群通信等功能。

参考

博客简明的介绍 mpi4py 的使用,本博客翻译自瑞典皇家理工PDC高性能计算中心发布的文章,已经过原作者同意。原文写的很好,英文水平够建议阅读原文。

  1. https://www.kth.se/blogs/pdc/2019/08/parallel-programming-in-python-mpi4py-part-1/
  2. https://www.kth.se/blogs/pdc/2019/11/parallel-programming-in-python-mpi4py-part-2/

需要注意的是:

  • 里面所有的代码我都执行过,没问题
  • 并非原封不动的翻译。为了更好的理解和适配国内的阅读风格,做了些许修改。并加了点自己的东西,照搬原文不是我的习惯。
  • MPI 中没有主进程的概念,结合代码中变量的作用,将原文的 master process在这里翻译为管理者进程或领导者进程。任何进程都可以作为管理者进程,所以这里不能翻译为主进程。另外一点是,会与 multiprocess 中的主进程有歧义。

环境与运行方式

我在windows下安装最新版的mpi4py,但是包导入不进来,关于 windows 对代码很不友好这一点我领略很多次了。我也懒的细查是什么原因,反正我有linux操作系统。所以,本文的代码都是在 Arch 这款系统下跑通的。

运行一般MPI程序的方法是:

1
mpirun -n 4 python3 filename.py // 4 表示启动 4 个进程

因linux的mpi4py禁止系统创建大量进程,否则会报错。因此在项目根目录下创建hostfile文件,写入localhost slots=25,表示最大允许创建的进程数量是25。创建20个进程时,程序执行方式:

1
mpirun --hostfile hostfile -np 20 python filename.py

基础

在使用 MPI 并行编程时,需要一种叫『通信器』的东西,它是由多个能相互通信的进程组成的一个群组。为了确定进程是群组中的哪一个,每个进程都会被分配唯一的ID号,称为 rank。所以,能通过 rank 得知群组中有多少进程。

1
2
3
4
5
6
7
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print('Hello from process {} out of {}'.format(rank, size))

在上述代码中,comm变量是定义的默认『通信器』,能通过 Get_rank 方法取回通信器中每个进程的 rank ,通过 Get_size 方法获取通信器内进程的数量。启动四个进程,运行的输出是:

1
2
3
4
Hello from process 3 out of 4
Hello from process 0 out of 4
Hello from process 1 out of 4
Hello from process 2 out of 4

简单类型通信

点到点通信

点到点通信时,mpi4py 提供了和MPI中类似的 sendrecv 方法,用于传递简单的python对象。一个典型的例子,管理者进程和其他工作进程的通信,管理者进程发送一个python的字典到工作者进程中。通信模型与代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# 主进程
if rank == 0:
data = {'x': 1, 'y': 2.0}
# 主进程发送字典到所有的子进程
print('Process {} sent data:'.format(rank), data)
for i in range(1, size):
comm.send(data, dest=i, tag=i)

# 子进程收到主进程的数据
else:
data = comm.recv(source=0, tag=rank)
print('Process {} received data:'.format(rank), data)

输出结果:

1
2
3
4
5
6
Process 0 sent data: {'x': 1, 'y': 2.0}
Process 0 sent data: {'x': 1, 'y': 2.0}
Process 0 sent data: {'x': 1, 'y': 2.0}
Process 1 received data: {'x': 1, 'y': 2.0}
Process 2 received data: {'x': 1, 'y': 2.0}
Process 3 received data: {'x': 1, 'y': 2.0}

dest表示发送的目标进程的 rank,tag表示消息的 ID,source 表示源头进程的 rank,tag表示信息的 ID。两个 ID 对不上,消息则无法接收。通过 rank 号来设置主进程,在这里,指定的 rank=0 的就是『管理者进程』。然后通过一个 for 循环给其他进程发送数据。当通信的内容很多时,可以使用 tag 来区分不同的消息。

然而上述代码的通信方式是阻塞通信,意思是,通信不结束,代码无法继续往下执行。记录下时间,大概 1.44秒。

阻塞的行为在并行编程中不可取,所以我们要使用非阻塞通信的方法,isendirecv。会立即返回请求对象,并使用wait方法来管理这些进程完成通信。大约 0.34 秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if rank == 0:
data = {'x': 1, 'y': 2.0}
for i in range(1, size):
req = comm.isend(data, dest=i, tag=i)
req.wait()
print('Process {} sent data:'.format(rank), data)

tmp = comm.irecv(source=i, tag=i)
time_ += tmp.wait()
print(time_)

else:
since = time.time()
req = comm.irecv(source=0, tag=rank)
data = req.wait()
end = time.time()

# 将消耗的时间发送回去
comm.send(end-since, dest=0, tag=rank)

print('Process {} received data:'.format(rank), data)

集体通信

广播

广播通信的意思是,一个节点给其他所有的节点发送信息。以发送 numpy 数组为例,root 表示广播节点的 rank 号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
data = np.arange(4.0)
else:
data = None

msg = comm.bcast(data, root=0)

if rank == 0:
print('Process {} broadcast data:'.format(rank), msg)
else:
print('Process {} received data:'.format(rank), msg)

输出为:

1
2
3
4
Process 0 broadcast data: [0. 1. 2. 3.]
Process 1 received data: [0. 1. 2. 3.]
Process 2 received data: [0. 1. 2. 3.]
Process 3 received data: [0. 1. 2. 3.]

scatter

在面对大规模任务时,尤其是面对巨大的数组、列表数据,可以先对数据进行划分,分成不同的子任务。将子任务交给各个工作者进程去执行,这时候可以使用scatter方法,但分配的数量不能超过处理器的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

if rank == 0:
data = np.arange(17.0)

# ave 除数,res 余数
ave, res = divmod(data.size, nprocs)

# 余数是多出来的,加到前面 防止划分不均
counts = [ave + 1 if p < res else ave for p in range(nprocs)]

# 子任务的起始点与结束点 个数求和 表示下标
starts = [sum(counts[:p]) for p in range(nprocs)]
ends = [sum(counts[:p+1]) for p in range(nprocs)]

# 切片
data = [data[starts[p]:ends[p]] for p in range(nprocs)]
else:
data = None

data = comm.scatter(data, root=0)

print('Process {} has data:'.format(rank), data)

输出为:

1
2
3
4
Process 0 has data: [0. 1. 2. 3. 4.]
Process 1 has data: [5. 6. 7. 8.]
Process 3 has data: [13. 14. 15. 16.]
Process 2 has data: [ 9. 10. 11. 12.]

gather

scatter 相反的是 gather,用于收集每个进程中指定的变量,这个方法收集每个进程的元素会汇集到一个列表中。一个实例,两者同时使用来计算 $\pi$。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from mpi4py import MPI
import time
import math

t0 = time.time()

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

nsteps = 10000000
dx = 1.0 / nsteps

if rank == 0:
ave, res = divmod(nsteps, nprocs)
counts = [ave + 1 if p < res else ave for p in range(nprocs)]

starts = [sum(counts[:p]) for p in range(nprocs)]
ends = [sum(counts[:p+1]) for p in range(nprocs)]

data = [(starts[p], ends[p]) for p in range(nprocs)]
else:
data = None

data = comm.scatter(data, root=0)

partial_pi = 0.0
for i in range(data[0], data[1]):
x = (i + 0.5) * dx
partial_pi += 4.0 / (1.0 + x * x)
partial_pi *= dx

# 收集每个进程的 partial_pi 到 0号进程中
partial_pi = comm.gather(partial_pi, root=0)

if rank == 0:
print('pi computed in {:.3f} sec'.format(time.time() - t0))
print('error is {}'.format(abs(sum(partial_pi) - math.pi)))

输出为:

1
2
pi computed in 0.546 sec
error is 1.234568003383174e-13

同样,也可以直接使用 reduce 收集结果,在收集的时候就能进行求和等操作。

1
2
3
4
5
6
# 收集每个进程的 partial_pi
partial_pi = comm.reduce(partial_pi, root=0, op=MPI.SUM)

if rank == 0:
print('pi computed in {:.3f} sec'.format(time.time() - t0))
print('error is {}'.format(abs(partial_pi) - math.pi))

类缓冲区对象通信

mpi4py 提供了一些方法来直接发送和接收类缓冲区对象。优点是通信快,缺点是需要程序员要显式的声明所需分配的内存空间。在接收数据之前就需要显式的分配内存空间,且发送数据的大小不能超过缓冲区。同样需要注意的是,类缓冲区对象在物理地址上应该是连续的。好巧不巧,numpy 恰好符合这一点。所以在科学计算中,numpy 数组常用于类缓冲区对象。所以接下来使用用 numpy 来演示类缓冲区对象的发送与接受。

与之前python普通对象通信不同的是,这里的类缓冲区对象的方法首字母都是大写的。

点到点通信

与之前不同的时,接收数据的进程需要提前初始化缓冲区,也就是在 recv 被调用之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
num = 4

# master process
if rank == 0:
data = np.arange(float(num))
for i in range(1, size):
comm.Send(data, dest=i, tag=i)
print('Process {} sent data:'.format(rank), data)

# worker processes
else:
# 初始化
data = np.zeros(num)
# 接收
comm.Recv(data, source=0, tag=rank)
print('Process {} received data:'.format(rank), data)

data作为第一个参数,被写入接收的数据。输出是:

1
2
3
4
5
6
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 2 received data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 3 received data: [0. 1. 2. 3.]
Process 1 received data: [0. 1. 2. 3.]

再使用 SendRecv 的时候需要注意的是发送数据大小和接收的数据大小应该匹配。

  • 如果发送的数据大小大于接收区的缓存大小,将会报错;
  • 缓冲区的数据大小大于发送区的数据大小是没关系的。
1
2
3
4
5
6
7
8
9
10
if rank == 0:
data = np.arange(4.)
for i in range(1, size):
comm.Send(data, dest=i, tag=i)
print('Process {} sent data:'.format(rank), data)

else:
data = np.zeros(6)
comm.Recv(data, source=0, tag=rank)
print('Process {} has data:'.format(rank), data)

上述程序中,发送4个大小的数据,缓冲区用6个大小的数据块去接收,只有前4个数据被覆盖了:

1
2
3
4
5
6
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 0 sent data: [0. 1. 2. 3.]
Process 1 has data: [0. 1. 2. 3. 0. 0.]
Process 2 has data: [0. 1. 2. 3. 0. 0.]
Process 3 has data: [0. 1. 2. 3. 0. 0.]

同样,也有对应的阻塞版本和非阻塞版本。

1
2
3
4
5
6
7
8
9
10
11
12
if rank == 0:
data = np.arange(4.)
for i in range(1, size):
req = comm.Isend(data, dest=i, tag=i)
req.Wait()
print('Process {} sent data:'.format(rank), data)

else:
data = np.zeros(4)
req = comm.Irecv(data, source=0, tag=rank)
req.wait()
print('Process {} received data:'.format(rank), data)

集体通信

其实这里的内容基本和之前的类似了。

广播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
data = np.arange(4.0)
else:
data = np.zeros(4)

comm.Bcast(data, root=0)

print('Process {} has data:'.format(rank), data)

输出是:

1
2
3
4
Process 0 has data: [0. 1. 2. 3.]
Process 1 has data: [0. 1. 2. 3.]
Process 3 has data: [0. 1. 2. 3.]
Process 2 has data: [0. 1. 2. 3.]

Scatter

另一种集群通信是 Scatter,将一个大型的数组进行切片。之前演示过 scatter,但是 Scatter 不像 scatter 一样容易使用,因为它需要提前知道数组的大小。问题是,实际中往往无法提前获知数组的大小,因此任务的划分也成了问题。更推荐去使用 Scatterv,这是 Scatter 的向量版本,提供了很多灵活的方法来分配数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

if rank == 0:
sendbuf = np.arange(15.0)

ave, res = divmod(sendbuf.size, nprocs)
# 每个进程的元素数量
count = [ave + 1 if p < res else ave for p in range(nprocs)]
count = np.array(count)

# 每个进程的起始位置
displ = [sum(count[:p]) for p in range(nprocs)]
displ = np.array(displ)
else:
sendbuf = None
count = np.zeros(nprocs, dtype=np.int)
displ = None

comm.Bcast(count, root=0)

recvbuf = np.zeros(count[rank])

comm.Scatterv([sendbuf, count, displ, MPI.DOUBLE], recvbuf, root=0)

print('After Scatterv, process {} has data:'.format(rank), recvbuf)

通过[sendbuf, count, displ, MPI.DOUBLE]来指定类缓冲区对象,count表示发送给每个进程的数量,displ表示子任务的起始切片。所以这里是对 sendbuf 进行切分,并保存到 recvbuf 中。

1
2
3
4
After Scatterv, process 0 has data: [0. 1. 2. 3.]
After Scatterv, process 1 has data: [4. 5. 6. 7.]
After Scatterv, process 2 has data: [8. 9. 10. 11.]
After Scatterv, process 3 has data: [12. 13. 14.]

Gatherv

Gatherv 的行为和 Scatterv 相反,所以,在使用 Gatherv 的时候,需要指定接收缓存的形式为:[recvbuf2, count, displ, MPI.DOUBLE]。管理者进程中,sendbuf2会被收集到一个更大的recvbuf2中。

1
2
3
4
5
6
7
sendbuf2 = recvbuf
recvbuf2 = np.zeros(sum(count))
# sendbuf2 按照指定的规则汇总
comm.Gatherv(sendbuf2, [recvbuf2, count, displ, MPI.DOUBLE], root=0)

if comm.Get_rank() == 0:
print('After Gatherv, process 0 has data:', recvbuf2)

输出的结果是:

1
2
After Gatherv, process 0 has data: 
[ 0. 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14.]

reduce

在另一个角度而言,reduce 可以对每个进程所收集的数据进行求和,也就是计算每个进程 recvbuf 的和,并汇总到管理者进程中。

1
2
3
4
5
6
7
8
partial_sum = np.zeros(1)
partial_sum[0] = sum(recvbuf)
print('Partial sum on process {} is:'.format(rank), partial_sum[0])

total_sum = np.zeros(1)
comm.Reduce(partial_sum, total_sum, op=MPI.SUM, root=0)
if comm.Get_rank() == 0:
print('After Reduce, total sum on process 0 is:', total_sum[0])

输出是:

1
2
3
4
5
Partial sum on process 0 is: 6.0
Partial sum on process 1 is: 22.0
Partial sum on process 2 is: 38.0
Partial sum on process 3 is: 39.0
After Reduce, total sum on process 0 is: 105.0

实例

这里的实例是一百万维的向量加法串行计算和并行计算的性能对比。因为Scatterv只支持一维向量的通信,矩阵乘法还得从二维转一维,我懒的费尽了。

串行

大概 0.35 秒左右。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import numpy as np
import time

np.random.seed(2)
size = 1000000

x1 = np.random.random(size)
x2 = np.random.random(size)
result = np.zeros(size, dtype=float)

since = time.time()
for i in range(size):
result[i] = x1[i] + x2[i]
end = time.time()

print(end - since)

并行

我开了 5 个进程,大概 0.083 秒左右。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()

size = 1000000
x1 = np.random.random(size)
x2 = np.random.random(size)

if rank == 0:

ave, res = divmod(size, nprocs)

count = [ave + 1 if p < res else ave for p in range(nprocs)]
count = np.array(count)

displ = [sum(count[:p]) for p in range(nprocs)]
displ = np.array(displ)
else:
sendbuf = None
count = np.zeros(nprocs, dtype=np.int)
displ = None

t0 = time.time()
comm.Bcast(count, root=0)

recvbuf1 = np.zeros(count[rank])
recvbuf2 = np.zeros(count[rank])

comm.Scatterv([x1, count, displ, MPI.DOUBLE], recvbuf1, root=0)
comm.Scatterv([x2, count, displ, MPI.DOUBLE], recvbuf2, root=0)

print('After Scatterv, process {} has data:'.format(rank), recvbuf1)
print('After Scatterv, process {} has data:'.format(rank), recvbuf2)

for i in range(recvbuf1.shape[0]):
recvbuf1[i] += recvbuf2[i]

sendbuf2 = recvbuf1
recvbuf2 = np.zeros(sum(count))
comm.Gatherv(sendbuf2, [recvbuf2, count, displ, MPI.DOUBLE], root=0)

if comm.Get_rank() == 0:
print('pi computed in {:.3f} sec'.format(time.time() - t0))
print('After Gatherv, process 0 has data:', recvbuf2)

结语

并行计算的坑大概是开完了,这里写点结语吧。我大一上学期寒假的时候,有个叫 CCF 的组织来我校讲过一次并行计算。我那会儿还小,对一切未知事物都感到好奇。那会他们讲的我也实在听不懂,毕竟没学过多少计算机课程,还去问老师有没有什么相关资料,那会儿可真是傻啊,这有啥可问的。

时至上学期的『高性能计算实验课』,又再次接触到了这个东西。今时不同往日,进程、线程、系统、编程这些概念我都学过了,所以入门并行计算也是很一蹴而就的事情,就相当于把课程内容and所学的操作系统之类的知识整理一下。

掏出之前用 MPI 写过的一点并行计算的任务,加速比还是很明显的。代码太烂就不放了

感谢上学期间打赏我的朋友们。赛博乞讨:我,秦始皇,打钱。

欢迎订阅我的文章