2018 年计算机组成原理的大作业,五级流水不会写,三级流水写不出来。竟然没想不到多年后还会用到多级流水的思路去设计代码。
Python 线程池 在介绍多级流水之前,先简单介绍下 Python 线程池的使用:在通过线程池提交任务后,可以调用 result() 方法等待任务执行结束。该方法会阻塞当前线程,直到任务执行结束并返回结果,任务没有返回值时 result() 将获取 None。下面是一个简单的例子。
1 2 3 4 5 6 7 8 9 10 11 import  concurrent.futures def  task (x ):          print ("1" )       executor = concurrent.futures.ThreadPoolExecutor(max_workers=1 )  wait_token = executor.submit(task, 7 )  print (wait_token.result())
多级流水 适用场景 多级流水的核心作用是:通过异步调用来加速代码的执行,和多线程相比只需要更少的内存。尤其适用于以下场景:需要多次的顺序执行若干任务。假设此时有三个任务 1 2 3 需要循环执行 100 次。任务 1 从外界读取输入,而任务 2 的输入是任务 1 的输出,任务 3 的输入是任务 2 的输出,有明显的顺序依赖。
1 2 3 4 5 6 for  _ in  range (100 ):     val = read()      val = Task1(val)      val = Task2(val)      val = Task3(val)      write(val) 
假设任务 1 3 均为 IO 任务,耗时 0.1 ms,任务 2 为计算任务,需要开辟很大的内存,耗时 0.2ms。如果是多线程加速的方式,因为存在明显的数据依赖,会将 1,2,3 视为一个整体进行处理。如前 50 个任务放到一个线程执行,后 50 个任务在另一个线程执行。需要的时间为 50 * (0.1 + 0.2 + 0.1) = 20s。但此时存在潜在风险 :如果两个线程同时执行任务 2 ,会开辟两块的大内存空间。我用 python 代码搭建了一个具体的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import  time import  concurrent.futures def  task1 (in_data, idx ):     in_data[idx] += 1       time.sleep(0.1 )       def  task2 (in_data, idx ):     in_data[idx] *= 2       time.sleep(0.2 )       def  task3 (in_data, idx ):     in_data[idx] -= 1       time.sleep(0.1 )       def  serial (datas ):     start = time.time()      for  i in  range (len (datas)):          task1(datas, i)          task2(datas, i)          task3(datas, i)      end = time.time()      print (" Serial Cost Time: {}" .format (end - start)) if  __name__ == "__main__" :     n_data_serial = [i for  i in  range (100 )]      serial(n_data_serial)  
多级流水 通过异步调用来实现任务流水的方式,将任务 1 和任务 3 异步执行,在执行任务 2 的时同时完成任务 1 和任务 3 的 IO 处理。如下图所示,虚线框表示为异步执行,实线框为同步执行,相同的颜色区域表示存在数据依赖。
程序如下所示,流水的时间为:100 * (0.1 + 0.2 + 0.1) / 2 = 20s,且不存在同时执行两个任务 2 的情况,所以所需的峰值内存理论上是多线程的一半 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def  parall (datas ):     start = time.time()      n_len = len (datas)      executor = concurrent.futures.ThreadPoolExecutor(max_workers=1 )      wait_token = None       for  i in  range (n_len):          if  0  == i:              task1(datas, i)              wait_token = executor.submit(task1, datas, i + 1 )              task2(datas, i)          elif  i == n_len - 1 :              wait_token.result()              wait_token = executor.submit(task3, datas, i - 1 )              task2(datas, i)              wait_token.result()              task3(datas, i)          else :              wait_token.result()              wait_token = executor.submit(task31, datas, i - 1 , i + 1 )              task2(datas, i)      end_time = time.time()     print (" Parallel Cost Time: {}" .format (end - start)) if  __name__ == "__main__" :     n_data_serial = [i for  i in  range (100 )]      n_data_parall = [i for  i in  range (100 )]           serial(n_data_serial)      parall(n_data_parall)           print (" Compare Res : {}" .format (n_data_serial == n_data_parall))