2018 年计算机组成原理的大作业,五级流水不会写,三级流水写不出来。竟然没想不到多年后还会用到多级流水的思路去设计代码。
Python 线程池 在介绍多级流水之前,先简单介绍下 Python
线程池的使用:在通过线程池提交任务后,可以调用 result()
方法等待任务执行结束。该方法会阻塞当前线程,直到任务执行结束并返回结果,任务没有返回值时 result()
将获取 None
。下面是一个简单的例子。
1 2 3 4 5 6 7 8 9 10 11 import concurrent.futures def task (x ): print ("1" ) executor = concurrent.futures.ThreadPoolExecutor(max_workers=1 ) wait_token = executor.submit(task, 7 ) print (wait_token.result())
多级流水 适用场景 多级流水的核心作用是:通过异步调用来加速代码的执行,和多线程相比只需要更少的内存。尤其适用于以下场景:需要多次的顺序执行若干任务。假设此时有三个任务 1 2 3 需要循环执行 100 次。任务 1 从外界读取输入,而任务 2 的输入是任务 1 的输出,任务 3 的输入是任务 2 的输出,有明显的顺序依赖。
1 2 3 4 5 6 for _ in range (100 ): val = read() val = Task1(val) val = Task2(val) val = Task3(val) write(val)
假设任务 1 3 均为 IO 任务,耗时 0.1 ms,任务 2 为计算任务,需要开辟很大的内存,耗时 0.2ms。如果是多线程加速的方式,因为存在明显的数据依赖,会将 1,2,3 视为一个整体进行处理。如前 50 个任务放到一个线程执行,后 50 个任务在另一个线程执行。需要的时间为 50 * (0.1 + 0.2 + 0.1) = 20s。但此时存在潜在风险 :如果两个线程同时执行任务 2 ,会开辟两块的大内存空间。我用 python
代码搭建了一个具体的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import time import concurrent.futures def task1 (in_data, idx ): in_data[idx] += 1 time.sleep(0.1 ) def task2 (in_data, idx ): in_data[idx] *= 2 time.sleep(0.2 ) def task3 (in_data, idx ): in_data[idx] -= 1 time.sleep(0.1 ) def serial (datas ): start = time.time() for i in range (len (datas)): task1(datas, i) task2(datas, i) task3(datas, i) end = time.time() print (" Serial Cost Time: {}" .format (end - start)) if __name__ == "__main__" : n_data_serial = [i for i in range (100 )] serial(n_data_serial)
多级流水 通过异步调用来实现任务流水的方式,将任务 1 和任务 3 异步执行,在执行任务 2 的时同时完成任务 1 和任务 3 的 IO 处理。如下图所示,虚线框表示为异步执行,实线框为同步执行,相同的颜色区域表示存在数据依赖。
程序如下所示,流水的时间为:100 * (0.1 + 0.2 + 0.1) / 2 = 20s,且不存在同时执行两个任务 2 的情况,所以所需的峰值内存理论上是多线程的一半 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def parall (datas ): start = time.time() n_len = len (datas) executor = concurrent.futures.ThreadPoolExecutor(max_workers=1 ) wait_token = None for i in range (n_len): if 0 == i: task1(datas, i) wait_token = executor.submit(task1, datas, i + 1 ) task2(datas, i) elif i == n_len - 1 : wait_token.result() wait_token = executor.submit(task3, datas, i - 1 ) task2(datas, i) wait_token.result() task3(datas, i) else : wait_token.result() wait_token = executor.submit(task31, datas, i - 1 , i + 1 ) task2(datas, i) end_time = time.time() print (" Parallel Cost Time: {}" .format (end - start)) if __name__ == "__main__" : n_data_serial = [i for i in range (100 )] n_data_parall = [i for i in range (100 )] serial(n_data_serial) parall(n_data_parall) print (" Compare Res : {}" .format (n_data_serial == n_data_parall))