some knowledgement of ZMQ - Serbipunk/notes GitHub Wiki

sender code

import storm_conf
import sys


def stdin():
    for line in sys.stdin:
        yield line

STDIN = stdin()


def cradle():
    u"""
    """
    import zmq
    context = zmq.Context(10)
    pusher = context.socket(zmq.PUSH)
    pusher.setsockopt(zmq.HWM,10)
    pusher.setsockopt(zmq.SWAP,20*1024*1024*8)
    pusher.setsockopt(zmq.SNDBUF,10240)
    pusher.setsockopt(zmq.AFFINITY,1)
    pusher.bind(storm_conf.PIPE_CRADLE)
    while True:
        try:
            task = STDIN.next()
            pusher.send(task)
        except:
            sys.exit()
            break

if __name__ == "__main__":
    cradle()

reveiver code

import os
import sys
import storm_conf
import json
DIR_PATH = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
sys.path[0:0] = [os.path.split(DIR_PATH)[0]]
from realtime.parse.parse import Qparse
from upload.upload import up2jdata
from tools.merge import Merge
from tools.tasks import Q
from conf import realtime as realtime_conf
from tools.qfile import split_file_by_date
from tools.paths import buffile
import zmq
from time import sleep



def clean():
    u"""
    """
    context = zmq.Context()
    socket_in = context.socket(zmq.PULL)
    socket_in.setsockopt(zmq.HWM, storm_conf.HWM_CLEAN)
    #socket.bind(storm_conf.PIPE_PORT)
    socket_in.connect(storm_conf.PIPE_CRADLE)

    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect(storm_conf.PIPE_ROUTER_IN)
    n = 0
    merge = Merge(mergerule=realtime_conf.REALTIME_MERGE_RULE)
    while True:
        task = socket_in.recv()
        #print task
        if task is None:
            sleep(1)
            continue
        task = Qparse(task)
        if task is None:
            continue
        buff = [task['date_5m_s'], task['customer'],task['idc'], task['isp'],task['area'], task['contlength'],1, task['HIT'],task['MISS']] 
        if n < 400000:
            merge.add(buff)
            n += 1
            #print n
        else:
            print "upload to jdata"
            n = 0
            path = buffile()
            merge.to_file(path)
            sys.stdout.write(">> file" + str(path))
            merge.reset()
            files = split_file_by_date(path)
            sys.stdout.write('files:>>'+str(files))
            for date, f in files.iteritems():
                Q.enqueue(up2jdata,f.name, date, "cache_realtime_bw", ',', keep=True)
                #up2jdata(f.name, date, "cache_realtime_bw", ',', keep=True)
        del buff
        del task

if __name__ == "__main__":
    clean()

https://stackoverflow.com/questions/42948798/understanding-zmqs-hwm

https://github.com/zeromq/pyzmq/issues/276

https://stackoverflow.com/questions/44145964/zeromq-can-someone-please-explain-how-hwm-works