0%

任务流水:加快程序运行和减少内存占用我全都要

2018 年计算机组成原理的大作业,五级流水不会写,三级流水写不出来。竟然没想不到多年后还会用到多级流水的思路去设计代码。

Python 线程池

在介绍多级流水之前,先简单介绍下 Python 线程池的使用:在通过线程池提交任务后,可以调用 result() 方法等待任务执行结束。该方法会阻塞当前线程,直到任务执行结束并返回结果,任务没有返回值时 result() 将获取 None。下面是一个简单的例子。

1
2
3
4
5
6
7
8
9
10
11
import concurrent.futures 
def task(x):
# return x * x
print("1")

# 创建
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# 提交
wait_token = executor.submit(task, 7)
# 等待结束
print(wait_token.result())

多级流水

适用场景

多级流水的核心作用是:通过异步调用来加速代码的执行,和多线程相比只需要更少的内存。尤其适用于以下场景:需要多次的顺序执行若干任务。假设此时有三个任务 1 2 3 需要循环执行 100 次。任务 1 从外界读取输入,而任务 2 的输入是任务 1 的输出,任务 3 的输入是任务 2 的输出,有明显的顺序依赖。

1
2
3
4
5
6
for _ in range(100): 
val = read()
val = Task1(val) # IO 任务, 0.1s
val = Task2(val) # 计算任务, 0.2s,且申请大内存
val = Task3(val) # IO 任务, 0.1s
write(val)

假设任务 1 3 均为 IO 任务,耗时 0.1 ms,任务 2 为计算任务,需要开辟很大的内存,耗时 0.2ms。如果是多线程加速的方式,因为存在明显的数据依赖,会将 1,2,3 视为一个整体进行处理。如前 50 个任务放到一个线程执行,后 50 个任务在另一个线程执行。需要的时间为 50 * (0.1 + 0.2 + 0.1) = 20s。但此时存在潜在风险:如果两个线程同时执行任务 2 ,会开辟两块的大内存空间。我用 python 代码搭建了一个具体的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time 
import concurrent.futures

def task1(in_data, idx):
in_data[idx] += 1
time.sleep(0.1)

def task2(in_data, idx):
in_data[idx] *= 2
time.sleep(0.2)

def task3(in_data, idx):
in_data[idx] -= 1
time.sleep(0.1)

def serial(datas):
start = time.time()
for i in range(len(datas)):
task1(datas, i)
task2(datas, i)
task3(datas, i)
end = time.time()
print(" Serial Cost Time: {}".format(end - start))

if __name__ == "__main__":
n_data_serial = [i for i in range(100)]
serial(n_data_serial)

多级流水

通过异步调用来实现任务流水的方式,将任务 1 和任务 3 异步执行,在执行任务 2 的时同时完成任务 1 和任务 3 的 IO 处理。如下图所示,虚线框表示为异步执行,实线框为同步执行,相同的颜色区域表示存在数据依赖。

程序如下所示,流水的时间为:100 * (0.1 + 0.2 + 0.1) / 2 = 20s,且不存在同时执行两个任务 2 的情况,所以所需的峰值内存理论上是多线程的一半

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def parall(datas): 
start = time.time()
n_len = len(datas)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
wait_token = None
for i in range(n_len):
if 0 == i:
task1(datas, i)
wait_token = executor.submit(task1, datas, i + 1)
task2(datas, i)
elif i == n_len - 1:
wait_token.result()
wait_token = executor.submit(task3, datas, i - 1)
task2(datas, i)
wait_token.result()
task3(datas, i)
else:
wait_token.result()
wait_token = executor.submit(task31, datas, i - 1, i + 1)
task2(datas, i)
end_time = time.time()
print(" Parallel Cost Time: {}".format(end - start))


if __name__ == "__main__":
n_data_serial = [i for i in range(100)]
n_data_parall = [i for i in range(100)]

serial(n_data_serial)
parall(n_data_parall)

print(" Compare Res : {}".format(n_data_serial == n_data_parall))
感谢上学期间打赏我的朋友们。赛博乞讨:我,秦始皇,打钱。

欢迎订阅我的文章