0%

操作系统『四』:进程通信

在使用多进程编程时,会经常涉及到进程之间通信的问题。除了借助文件、特殊变量等,还可以借助一些特殊的数据结构完成功能复杂的通信。并行应用常常需要在进程之间交换数据。python的Multiprocessing库有两个通信通道可以交换对象:队列(queue)和管道(pipe)。

  • 队列返回一个进程共享的队列,队列的操作是线程安全的,也是进程安全的。任何可序列化的对象(Python通过 pickable 模块序列化对象)都可以通过它进行交换。
  • 管道返回一对被管道连接的连接对象,然后对象就有了 send/receive 方法,可以在进程之间通信

同步/异步

消息通信机制分为同步和异步,在进程间调用:

  • 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。用户进程发起请求后,需要等待或者轮询操作完成后才能继续执行。
  • 而异步则是相反,调用在发出之后,这个调用就直接返回了。可能去执行其他的东西,所以暂时没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者。

举个通俗的例子:你打电话问书店老板有没有《分布式系统》这本书:

  • 如果是同步通信机制,书店老板会说,你稍等,”我查一下”,然后开始查啊查,等查好了(可能是5分钟,也可能是一天)告诉你结果(返回结果)。
  • 而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过回电这种方式来回调。

一定要区分与阻塞、非阻塞的区别。在通俗一点,阻塞、非阻塞关注的是用户,同步、异步关注的是老板。

借助队列的通信

写了这么多理论,来点代码看一看。这里需要注意的是,如果要创建一个进程的类并封装任务,只需要继承multiprocessing.Process类,并覆写run()方法即可。实现一个多生产者、多消费者的模型。这是典型的异步通信,生产者无需通知消费者,消费者也不需要通知生产者,而是将『队列长度』作为信号,来判断接下来的动作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import multiprocessing
import random
import time
import threading

class Producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue

def run(self):
for i in range(3):
item = random.randint(0, 256)
self.queue.put(item)
print("Producer : item %d appended to queue %s" % (item, self.name))
time.sleep(1)
print("The size of queue is %s" % self.queue.qsize())

class Consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue

def run(self):
while True:
if self.queue.empty():
print("the queue is empty")
else:
time.sleep(1)
item = self.queue.get()
print('Consumer : item %d popped from by %s \n' % (item, self.name))
# put到队列的每个任务都调用了task_done方法后,join才会完成阻塞
self.queue.task_done()

class main(object):
def __init__(self):
self.queue = multiprocessing.Queue()
self.process_producer = Producer(self.queue)
self.process_consumer = Consumer(self.queue)

def run(self):
self.t = threading.Timer(1, self.dead)
self.t.start()
self.process_producer.start()
self.process_consumer.start()
self.process_producer.join()

def dead(self):
t = threading.Timer(1, self.dead)
# 生产者终止,队列为空,则结束消费者
if not self.process_producer.is_alive() and self.queue.empty():
self.process_consumer.terminate()
self.t.cancel()
t.cancel()
t.start()

if __name__ == '__main__':
a = main()
a.run()

借助管道的通信

管道会返回两个对象,代表了管道的两个端点。每个对象都能通过 send() 和 recv() 方法进行通信。最简单的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(parent_conn, ))
p.start()
p.join()
print(child_conn.recv())
child_conn.close()

这里需要注意的是,如果俩个进程试图在管道的一端同时进行读或写操作,可能会导致管道崩溃,也就是需要互斥。如果使用管道的两端来同时干这件事是安全的。

来演示一种通信模型,异步阻塞的通信方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import multiprocessing 

def sender(conn, msgs):
"""
function to send messages to other end of pipe
"""
for msg in msgs:
conn.send(msg)
print("Sent the message: {}".format(msg))
# 不发送了就关闭
conn.close()

def receiver(conn):
"""
function to print the messages received from other
end of pipe
"""
while True:
msg = conn.recv()
if msg == "END":
break
print("Received the message: {}".format(msg))

if __name__ == "__main__":
# messages to be sent
msgs = ["hello", "hey", "hru?", "END"]

# creating a pipe
parent_conn, child_conn = multiprocessing.Pipe()
print(parent_conn)

# creating new processes
p1 = multiprocessing.Process(target=sender, args=(parent_conn, msgs))
p2 = multiprocessing.Process(target=receiver, args=(child_conn))

# running processes
p1.start()
p2.start()

# wait until processes finish
p1.join()
p2.join()

但是我在使用管道的通信的时候发现了一个问题。众所周知多进程不共享变量,我在主进程、两个子进程中打印了parent_conn的地址,三个地址都是不相同的。也就是说,p1的发送端和p2的发送端是不一样的,也就是p1的发送端和p2的接收端可能对不上,离奇的是他们居然能通信。

1
2
3
主进程 parent_coon 地址:0x000001F239546580
p1进程 parent_coon 地址:0x0000016B62B6D460
p2进程 parent_coon 地址:0x000001CC9A709190
  • 子进程虽然在不同的变量空间中,他们的变量也不一样。这不影响通信,也就是说,子进程发送的数据,被另一个子进程收到,这也正是管道的用处。
  • sender()函数中关闭了发送端,但我在主进程检测发送端是否关闭的时候他又没有关闭。这是因为多进程不共享变量,子进程关闭不会影响主进程。神奇。

与阻塞的组合

结合之前提到的阻塞,有四种工作模式:

  • 同步阻塞方式:发送方发送请求之后一直等待响应。接收方处理请求时,直到进行的操作返回结果后,才响应发送方,期间不能进行其他工作。
  • 同步非阻塞方式:发送方发送请求之后,一直等待响应。接受方处理请求时进行的IO操作如果不能马上的得到结果,就立即返回,去做其他事情。但是由于没有得到请求处理结果,不响应发送方,发送方一直等待。当操作完成以后,将完成状态和结果通知接收方,接收方再响应发送方,发送方才进入下一次请求过程。(实际不应用)
  • 异步阻塞方式:发送方向接收方请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能马上得到结果,就一直等到返回结果后,才响应发送方,期间不能进行其他操作。(实际不应用)
  • 异步非阻塞方式:发送方向接收方请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能马上得到结果,也不等待,而是马上返回去做其他事情。当IO操作完成以后,将完成状态和结果通知接收方,接收方再响应发送方。(效率最高)

写完上面的理论后,讲真我是有点晕的。网上也没有多少货真价实的代码,要么乱抄,要么结合到具体的框架,如协称等,不容易让人读懂。2021年4月回来填坑,每个示例加一部分代码,企图让自己读懂 and 帮助后来者更好的理解这一部分。每一个代码的后面都有输出,可以仔细看看输出。

同步阻塞

这也是最容易实现的一种方式,实现不了好的,我还实现不出来坏的?loop 函数阻塞了主线程,主线程等待子线程执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time, threading

def loop():
print('thread %s is running...' % threading.current_thread().name)
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)

for i in range(3):
t = threading.Thread(target=loop, name='Thread-{}'.format(i))
t.start()
# 阻塞调用线程直至线程的join()方法结束
t.join()

print('Main Thread can run now')

# thread Thread-0 is running...
# thread Thread-0 ended.
# thread Thread-1 is running...
# thread Thread-1 ended.
# thread Thread-2 is running...
# thread Thread-2 ended.
# Main Thread can run now

异步非阻塞

这里通俗一点,假设A线程调用B线程。就是调用线程A没有被阻塞,目标线程B以异步的形式去执行代码,获取结果后回调。而A线程不用一直等待结果,而是通过回调函数拿到返回结果并处理,直接执行下一步操作。

我们创建一个线程池,通过线程池来观察异步的执行情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from concurrent.futures import ThreadPoolExecutor
import time, threading

pool = ThreadPoolExecutor(5)

def task(n):
print(threading.current_thread().name, 'has task {}'.format(n))
time.sleep(2)
return n ** 2

# 回调函数:异步提交之后一旦任务有返回结果,自动交给另外一个去执行
def call_back(n):
print(threading.current_thread().name, "get result {}".format(n.result()))

if __name__ == '__main__':
t_list = []
for i in range(6):
# 异步提交任务,通过回掉函数拿到执行结果
future = pool.submit(task, i).add_done_callback(call_back)
t_list.append(future)
# 异步提交,所以没有返回结果,t_list 全是 None
print(t_list)
print('Main thread can run others')

# ThreadPoolExecutor-0_0 has task 0
# ThreadPoolExecutor-0_1 has task 1
# ThreadPoolExecutor-0_2 has task 2
# ThreadPoolExecutor-0_3 has task 3
# ThreadPoolExecutor-0_4 has task 4
# [None, None, None, None, None, None]
# Main thread can run others

# ThreadPoolExecutor-0_2 get result 4
# ThreadPoolExecutor-0_0 get result 0
# ThreadPoolExecutor-0_2 has task 5
# ThreadPoolExecutor-0_4 get result 16
# ThreadPoolExecutor-0_1 get result 1
# ThreadPoolExecutor-0_3 get result 9
# ThreadPoolExecutor-0_2 get result 25

关于 add_done_callback,将可调用的 func 附加到 future 上。当 future 被取消或完成运行时,将调用 func,并将 future 作为其唯一的参数。

同步非阻塞

和上述代码类似,创建一个线程池提交任务,主进程不被阻塞,但同步的等待结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 同步非阻塞
from concurrent.futures import ThreadPoolExecutor
import time, threading

pool = ThreadPoolExecutor(5)

def task(n):
print(threading.current_thread().name, 'has task {}'.format(n))
time.sleep(2)
return n ** 2

if __name__ == '__main__':
t_list = []
for i in range(6):
# 同步:提交任务之后,原地等待任务的返回结果,再继续执行下一步代码
# 所以不需要回调
future = pool.submit(task, i)
t_list.append(future.result())
# 同步提交能等待结果,所以 list 里面就是结果
print(t_list)
print('Main thread can run others')

# ThreadPoolExecutor-0_0 has task 0
# ThreadPoolExecutor-0_0 has task 1
# ThreadPoolExecutor-0_0 has task 2
# ThreadPoolExecutor-0_0 has task 3
# ThreadPoolExecutor-0_0 has task 4
# ThreadPoolExecutor-0_0 has task 5
# [0, 1, 4, 9, 16, 25]
# Main thread can run others

异步阻塞

这个实现也是比较简单的,主线程启动一个子任务后去执行其它命令,等待子任务的回调,这就是异步;而子任务会把调用子任务的线程给阻塞起来;所以要实现这个任务,包括主线程在内,至少要有三个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import threading, time

def loop():
print(threading.current_thread().name, 'is running')
time.sleep(1)
print(threading.current_thread().name, 'is ended')

def task():
for i in range(3):
t = threading.Thread(target=loop, name='Thread-{}'.format(i))
t.start()
t.join()
print("call back here")

t = threading.Timer(2, task)
t.start()

print("Main thread can run others")

# Main thread can run others
# Thread-0 is running
# Thread-0 is ended
# Thread-1 is running
# Thread-1 is ended
# Thread-2 is running
# Thread-2 is ended
# call back here

结语

例子尽力举的通俗一些,企图让我和其他小白读者看懂。等哪天学到协称了吧,通过一个项目实际的讲解下。

参考

  1. https://www.zhihu.com/question/19732473/answer/20851256
  2. https://www.cnblogs.com/loveer/p/11479249.html
感谢上学期间打赏我的朋友们。赛博乞讨:我,秦始皇,打钱。

欢迎订阅我的文章