why zmq socket cannot be init in process.__init__() ????????? - Serbipunk/notes GitHub Wiki
from multiprocessing import Process
import platform
import zmq
import random
import time
PRODUCER_PORT = "tcp://127.0.0.1:25557" # 安排工作的端口
COLLECTOR_PORT = "tcp://127.0.0.1:25558" # 收集工作的端口
class BaseProducer(Process):
def __init__(self, producer_addr=None, collector_addr=None):
if platform.python_version()[0] == '2':
super(BaseProducer, self).__init__() # works well in Linux
else:
super().__init__()
self.producer_addr = producer_addr
self.collector_addr = collector_addr
self.zmq_socket = None
# ==== won't work ====
# binding socket
# context = zmq.Context()
# self.zmq_socket = context.socket(zmq.PUSH)
# self.zmq_socket.bind(self.producer_addr)
def run(self):
# ==== will work ====
context = zmq.Context()
self.zmq_socket = context.socket(zmq.PUSH)
self.zmq_socket.bind(self.producer_addr)
for num in range(10000):
print('pushing {}'.format(num))
work_message = {'num': num}
self.zmq_socket.send_json(work_message) # 阻塞,直到push成功
class BaseConsumer(Process):
WORK_WITH_COLLECTOR = False # 是否加上collector
def __init__(self, producer_addr=None, collector_addr=None):
if platform.python_version()[0] == '2':
super(BaseConsumer, self).__init__() # works well in Linux
else:
super().__init__()
self.producer_addr = producer_addr
self.collector_addr = collector_addr
self.consumer_receiver = None
# ==== won't work ====
# context = zmq.Context()
# self.consumer_receiver = context.socket(zmq.PULL)
# self.consumer_receiver.connect(self.producer_addr)
def run(self):
# ==== will work ====
context = zmq.Context()
self.consumer_receiver = context.socket(zmq.PULL)
self.consumer_receiver.connect(self.producer_addr)
while True:
print('worker {} waiting json:'.format(self.pid))
work = self.consumer_receiver.recv_json() # 阻塞,直到收到
# data = work['num']
# print('worker {} json received:'.format(self.pid), work)
#
# if RUN_WITH_COLLECTOR:
# result = {'consumer': self.pid, 'num': data}
# # consumer_sender.send_json(result) # 应该阻塞
from definition import PRODUCER_PORT, COLLECTOR_PORT
from definition import Collector, Consummer, Producer
from definition import BaseProducer, BaseConsumer
def main1():
consummer_pool = []
for i in range(4):
pc = BaseConsumer(producer_addr=PRODUCER_PORT, collector_addr=COLLECTOR_PORT)
pc.start()
consummer_pool.append(pc)
pp = BaseProducer(producer_addr=PRODUCER_PORT)
pp.start()
pp.join()
for p in consummer_pool:
p.join()
while True:
time.sleep(2)