ZeroMQ - goddes4/python-study-wiki GitHub Wiki

νŠΉμ§•

  • μž„λ² λ””λ“œ λ„€νŠΈμ›Œν‚Ή 라이브러리 (ActiveMQ or RabbitMQ 같은 λ©”μ‹œμ§• λΈŒλ‘œμ»€κ°€ μ‘΄μž¬ν•˜μ§€ μ•ŠμŒ)
  • μ‚¬μš©μžμ— μ΅μˆ™ν•œ Socket Style API둜 λ””μžμΈλœ 라이브러리
  • λ©”μ‹œμ§€ μˆ˜μ‹ μ΄ κ°€λŠ₯ν•˜μ§€ μ•Šμ„ λ•ŒλŠ” Queuing 될 수 μžˆλ‹€ (over-full 큐에 λŒ€ν•œ 정책은 λ©”μ‹œμ§• νŒ¨ν„΄μ— 따라 κ²°μ •)
  • λ§Žμ€ λ…Έλ ₯ 없이도 λ³΅μž‘ν•œ 톡신 μ‹œμŠ€ν…œμ„ 섀계할 수 μžˆλ„λ‘ ν•΄μ£ΌλŠ” λ©”μ‹œμ§• 라이브러리
  • λΆ„μ‚° λ˜λŠ” λ™μ‹œμ„± μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ—μ„œ μœ μš©ν•˜λ‹€
  • λΌμš°νŒ…μ΄ κ°€λŠ₯ν•œ λ©”μΌλ°•μŠ€λ‹€
  • lock-free (lock, semaphores λŒ€κΈ°κ°€ ν•„μš”ν•˜μ§€ μ•ŠμŒ)
  • μžλ™μœΌλ‘œ 재 μ—°κ²° μ‹œλ„ (μ–΄λ–€ μˆœμ„œλ‘œλ„ κ΅¬μ„±μš”μ†Œλ₯Ό μ‹œμž‘ ν•  수 있음) μ–Έμ œλ“ μ§€ λ„€νŠΈμ›Œν¬μ— μ°Έμ—¬ν•˜κ³  λ– λ‚  수 μžˆλŠ” μ„œλΉ„μŠ€ μ§€ν–₯ μ•„ν‚€ν…μ²˜(SOA)λ₯Ό λ§Œλ“€μˆ˜ μžˆλ‹€.

λΆ„μ‚° μ• ν”Œλ¦¬μΌ€μ΄μ…˜(Distributed Application)은 κ±°λŒ€ν•œ ν•˜λ‚˜μ˜ μ• ν”Œλ¦¬μΌ€μ΄μ…˜(Monolithic Application) 보닀 변화에 λŒ€μ‘ν•˜κΈ° 쉽닀.

server κ°€ bind() ν•˜μ§€ μ•Šμ€ μƒνƒœμ—μ„œ client κ°€ connect() ν•˜κ³  λ©”μ‹œμ§€λ₯Ό μ†‘μ‹ ν•˜λ”λΌλ„ queuing 후에 μ²˜λ¦¬λœλ‹€.
(λ©”μ‹œμ§€ 전솑 이후에 close() ν•˜λ”λΌλ„ λ‚˜μ€‘μ— μ„œλ²„κ°€ bind() 이후에 데이터가 전달됨,
 close() 이후에 데이터 전솑을 μ›ν•˜μ§€ μ•ŠμœΌλ©΄ `socket.setsockopt(zmq.LINGER, 0)` μ„€μ •)

(connect)(3)PUSH-PULL(1)(bind)
PUSH-1 μ—μ„œ 3번 전솑, PUSH-2 μ—μ„œ 3번 전솑, μ„œλ²„ bind() 순으둜 μ§„ν–‰ ν•  λ•Œ λ©”μ‹œμ§€λŠ” PUSH-1, PUSH-2 ν•œλ²ˆμ”© λ²ˆκ°ˆμ•„ μ²˜λ¦¬ν•œλ‹€.

(connect)(1)PUSH-PULL(3)(bind)
3개의 PULL μ—μ„œ λ™μ‹œμ— recv() λŒ€κΈ°μ€‘μΈ μƒνƒœμ—μ„œ PUSH μ—μ„œ μ „μ†‘ν•˜κ²Œ 되면 3쀑 ν•œκ°œμ˜ PULL만 λ©”μ‹œμ§€λ₯Ό μˆ˜μ‹ ν•œλ‹€.

λ…Έλ“œκ°„ μ—°κ²° 방식

  • ν•œ ν”„λ‘œμ„ΈμŠ€μ˜ 두 μ“°λ ˆλ“œ
  • ν•œ μ‹œμŠ€ν…œμ˜ 두 ν”„λ‘œμ„ΈμŠ€
  • λ„€νŠΈμ›Œν¬μƒμ˜ 두 μ‹œμŠ€ν…œ

Messaging Pattern

  • Request-Reply Pattern
  • Publish/Subscribe pattern
  • Pipeline Pattern

Device

Queue

Forwarder

Streamer

Installation in python

# pip install pyzmq

버전 확인

import zmq

print(zmq.pyzmq_version())

ZeroMQ Context

zmq 라이브러리 κΈ°λŠ₯을 μ‚¬μš©ν•˜κΈ° 전에 λ¨Όμ € 생성 λ˜μ•Ό ν•œλ‹€.

import zmq

context = zmq.Context()

ZeroMQ Sockets

zmq μ†ŒμΌ“μ€ context λ₯Ό 톡해 생성할 수 μžˆλ‹€.

socket = context.socket(zmq.REP)

Example

REQ/REP

νŠΉμ§•

  • REQ socket 은 λ§Žμ€ μ„œλ²„μ— μ—°κ²°(connect) ν•  수 μžˆλ‹€.
  • REQ 의 send() λŠ” 응닡이 올 λ•Œ κΉŒμ§€ block λœλ‹€.
  • REP 의 recv() λŠ” μš”μ²­μ΄ μˆ˜μ‹  될 λ•Œ κΉŒμ§€ block λœλ‹€.

Replay

λ©”μ‹œμ§€ 응닡

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:10101')

while True:
    print('recv : ' + socket.recv_string())
    socket.send_string('world')

Request

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:10101')

while True:
    socket.send_string('hello')
    print('recv : ' + socket.recv_string())
    time.sleep(3)

===

Pub/Sub

νŠΉμ§•

  • PUB socket의 경우 μ—°κ²°λœ SUBκ°€ μ—†λŠ” 경우 λ©”μ‹œμ§€λŠ” 버렀진닀.

SUB 의 경우 λ°˜λ“œμ‹œ setsockopt() λ₯Ό μ‚¬μš©ν•˜μ—¬ subscription 을 μ„€μ •ν•΄μ•Ό ν•œλ‹€. SUB/PUB μ–΄λŠκ³³μ—μ„œ μ—°κ²°(connect)ν•˜λ“ , λ°”μΈλ“œ(bind) ν•˜λ“  λ¬Έμ œλŠ” λ˜μ§€ μ•ŠμœΌλ‚˜, λ§Œμ•½ SUB μ†ŒμΌ“μ„ λ¨Όμ € λ°”μΈλ“œ(bind) ν•˜κ³  λ‚˜μ€‘μ— PUB μ†ŒμΌ“μ„ μ—°κ²°(connect)ν•˜λ©΄ SUB μ†ŒμΌ“μ€ 였래된 λ©”μ‹œμ§€λ₯Ό 받을 수 μ—†κ²Œ λœλ‹€. κ·ΈλŸ¬λ―€λ‘œ κ°€λŠ₯ν•˜λ©΄ PUB은 λ°”μΈλ“œ(bind), SUB 은 μ—°κ²°(connect) ν•˜λŠ”κ²ƒμ΄ κ°€μž₯ μ’‹λ‹€.

Publisher κ°€ 바인딩 ν›„ μ¦‰μ‹œ λ©”μ‹œμ§€λ₯Ό μ „μ†‘ν•˜λ©΄, Subscriber λŠ” 데이터λ₯Ό μˆ˜μ‹  λͺ»ν•  κ°€λŠ₯성이 μžˆμŠ΅λ‹ˆλ‹€. 이λ₯Ό μœ„ν•΄μ„œ Subscriber κ°€ μ—°κ²°ν•˜κ³  μ€€λΉ„λ˜κΈ°κΉŒμ§€ 데이터λ₯Ό λ°œμ†‘ν•˜μ§€ μ•Šλ„λ‘ 동기화 ν•˜λŠ” 방법을 μ œκ³΅ν•œλ‹€.

ØMQ의 PUB-SUB Pattern νŠΉμ§•

  • ν•˜λ‚˜μ˜ Subscriber λŠ” ν•œ 개 μ΄μƒμ˜ Publisher 에 μ—°κ²°ν•  수 μžˆλ‹€.
  • Subscriber κ°€ μ—†λ‹€λ©΄ λͺ¨λ“  Publisher 의 λ©”μ‹œμ§€λŠ” μœ μ‹€ λœλ‹€.
  • Subscriber μ—μ„œλ§Œ λ©”μ‹œμ§€ 필터링이 κ°€λŠ₯ν•˜λ‹€.

Publisher

3 μ΄ˆμ— ν•œλ²ˆ λ©”μ‹œμ§€ 전솑

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://127.0.0.1:10100')

while True:
    socket.send_string('Hello')
    time.sleep(3)

Subscriber

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:10100')
socket.setsockopt_string(zmq.SUBSCRIBE, '') # it will capture all messages

while True:
    print(socket.recv_string())

===

PULL/PUSH (Pipeline νŒ¨ν„΄)

νŠΉμ§•

  • PUSH socket 은 연결이 μ‘΄μž¬ν•˜μ§€ μ•Šμ„ λ•Œ send() ν•˜κΈ° 되면 μ—°κ²° 될 λ•Œ κΉŒμ§€ 블둝 λœλ‹€.
  • μ „λ‹¬λœ λ©”μ‹œμ§€λŠ” μ—°κ²°λœ socket 에 round robin λœλ‹€.

  • WorkerλŠ” ventilator에 PULL둜 μ—°κ²°(connect)λ˜μ–΄ 있고, sinkμ™€λŠ” PUSH둜 μ—°κ²°(Connect) λ˜μ–΄ μžˆλ‹€. 이것은 Worker λŠ” μž„μ˜λ‘œ μΆ”κ°€ ν•  수 μžˆλ‹€λŠ” 것을 μ˜λ―Έν•œλ‹€. λ§Œμ•½ worker κ°€ 바인딩(bind) λ˜μ–΄ μžˆλ‹€λ©΄ worker κ°€ μΆ”κ°€ 될 λ•Œλ§ˆλ‹€ 맀번 ventilator와 sink에 더 λ§Žμ€ μ†ŒμΌ“μ΄ ν•„μš”ν•˜λ‹€. 이 κ΅¬μ‘°μ—μ„œ ventilator와 sink λŠ” stable part, workerλŠ” dynamic part 라 λΆ€λ₯Έλ‹€.
  • ventilator의 PUSH μ†ŒμΌ“μ€ κ· λ“±ν•˜κ²Œ Worker에 μž‘μ—…μ„ λΆ„λ°°ν•œλ‹€. (load-balancing)
  • Sink의 PULL μ†ŒμΌ“μ€ κ· λ“±ν•˜κ²Œ Worker둜 λΆ€ν„° κ²°κ³Όλ₯Ό μˆ˜μ§‘ν•œλ‹€. (fair-queuing)

PUSH, PULL의 bind, connect λŠ” 상황에 따라 μœ μš©ν•œ νŒ¨ν„΄μ΄ μžˆλ‹€.

  • PUSH - bind, PULL - connect 의 κ²½μš°λŠ” λ™μ‹œ 처리λ₯Ό μœ„ν•œ Producer-Consumer νŒ¨ν„΄μ— 적합
  • PUSH - connect, PULL - bind 의 경우 처리 데이터λ₯Ό ν•œκ³³μœΌλ‘œ 집쀑 μ‹œμΌœ λͺ¨μ„ λ•Œ μ’‹μŒ

Parallel Pipeline with Kill Signaling

Parallel task ventilator

3 μ΄ˆμ— ν•œλ²ˆ λ©”μ‹œμ§€ 전솑

import zmq
import random
import time

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind('tcp://127.0.0.1:10102')

sink = context.socket(zmq.PUSH)
sink.connect('tcp://127.0.0.1:10103')

print("Press Enter when the workers are ready: ")
_ = input()
print("Sending tasks to workers")

# The first message is "0" and signals start of batch
sink.send(b'0')

# Initialize random number generator
random.seed()

# Send 100 tasks
total_msec = 0

for task_nbr in range(100):
    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload

    sender.send_string(u'%i' % workload)
    print(i)

print("Total expected cost: %s msec" % total_msec)

# Give 0MQ time to deliver
time.sleep(1)

Parallel task worker

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:10102")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:10103")

# Process tasks forever
while True:
    s = receiver.recv()

    # Simple progress indicator for the viewer
    sys.stdout.write('.')
    sys.stdout.flush()

    # Do the work
    time.sleep(int(s)*0.001)

    # Send results to sink
    sender.send(b'')

Parallel task sink

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://127.0.0.1:10103")

# Wait for start of batch
s = receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmations
total_msec = 0
for task_nbr in range(100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
print("Total elapsed time: %d msec" % ((tend-tstart)*1000))

context.term() 을 호좜 ν•˜κΈ° 전에 체크 사항

  • μ—΄λ €μžˆλŠ” μ†ŒμΌ“μ΄ μžˆλ‹€λ©΄ Blocking (LINGER : 0 μ‹œμ—λ„ Blocking)
  • μ†ŒμΌ“μ΄ Close() μƒνƒœλ”λΌλ„ send() κ°€ 처리 되기 μ „κΉŒμ§€ Blocking (LINGER : 0 μ μš©μ‹œ μ˜ˆμ™Έ)
  • 즉 데이터 전솑이 μ™„λ£Œλœ 이후에 μ†ŒμΌ“ μ’…λ£Œ, Context μ’…λ£Œ 순으둜 μ§„ν–‰λ˜μ–΄μ•Ό 함. 데이터 전솑과 상관없이 μ’…λ£Œ ν•˜κ³  μ‹ΆμœΌλ©΄ LINGER zero μ˜΅μ…˜ 적용

Handling Multiple Sockets

This version uses a simple recv loop

import zmq
import time

# Prepare our context and sockets
context = zmq.Context()

# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

# Process messages from both sockets
# We prioritize traffic from the task ventilator
while True:

    # Process any waiting tasks
    while True:
        try:
            msg = receiver.recv(zmq.DONTWAIT)
        except zmq.Again:
            break
        # process task

    # Process any waiting weather updates
    while True:
        try:
            msg = subscriber.recv(zmq.DONTWAIT)
        except zmq.Again:
            break
        # process weather update

    # No activity, so sleep for 1 msec
    time.sleep(0.001)

This version uses zmq.Poller()

import zmq

# Prepare our context and sockets
context = zmq.Context()

# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

# Initialize poll set
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

# Process messages from both sockets
while True:
    try:
        socks = dict(poller.poll())
    except KeyboardInterrupt:
        break

    if receiver in socks:
        message = receiver.recv()
        # process task

    if subscriber in socks:
        message = subscriber.recv()
        # process weather update

그밖에 λ©”μ‹œμ§€ νŒ¨ν„΄

Small-Scale Pub-Sub Network

Extended Pub-Sub

Request Distribution

Extended Request-Reply

Pub-Sub Forwarder Proxy

Queue Broker Exameple

Simple request-reply broker

import zmq

# Prepare our context and sockets
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
backend = context.socket(zmq.DEALER)
frontend.bind("tcp://*:5559")
backend.bind("tcp://*:5560")

# Initialize poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

# Switch messages between sockets
while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        backend.send_multipart(message)

    if socks.get(backend) == zmq.POLLIN:
        message = backend.recv_multipart()
        frontend.send_multipart(message)

Same as request-reply broker but using zmq.proxy

import zmq

def main():
    """ main method """

    context = zmq.Context()

    # Socket facing clients
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5559")

    # Socket facing services
    backend  = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5560")

    zmq.proxy(frontend, backend)

    # We never get here…
    frontend.close()
    backend.close()
    context.term()

if __name__ == "__main__":
    main()

Missing Message Problem Solver

참고자료