w3_pool_queue - steelbear/HMG_Softeer_DE GitHub Wiki

๋ฌธ์ œ

ํŒŒ์ด์ฌ ๊ณต์‹ ๋ฌธ์„œ์— ๋”ฐ๋ฅด๋ฉด, Queue๋Š” ํ”„๋กœ์„ธ์Šค๊ฐ„ ๋ฐ์ดํ„ฐ๋ฅผ ๊ณต์œ ํ•  ์ˆ˜ ์žˆ๋Š” ๊ฐ์ฒด์˜ ํ•˜๋‚˜๋กœ Process์—๊ฒŒ ์ „๋‹ฌํ•˜์—ฌ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

if __name__ == '__main__':
    queue = Queue()
    p1 = Process(work, (queue, ))
    p2 = Process(work, (queue, ))

    p1.start()
    p2.start()
    print(queue.get())
    print(queue.get())
    p1.join()
    p2.join()

์œ„์˜ ์ฝ”๋“œ์—์„œ ๊ฐ ํ”„๋กœ์„ธ์Šค๊ฐ€ queue์—๊ฒŒ ์–ด๋–ค ๊ฐ’์„ ์ง‘์–ด๋„ฃ์—ˆ๋‹ค๋ฉด ๋ฉ”์ธ ํ”„๋กœ์„ธ์Šค์—์„œ ์ด๋ฅผ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ํ•˜์ง€๋งŒ ์ด๋Š” Pool์—์„œ๋Š” ๊ทธ๋Œ€๋กœ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.

if __name__ == '__main__':
    queue = Queue()
    
    with Pool(2) as p:
        p.apply(work, (queue, ))

๋‹ค์Œ์„ ์‹คํ–‰ํ•˜๋ฉด ์ด๋Ÿฐ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค.

RuntimeError: Queue objects should only be shared between processes through inheritance

๊ทธ๋ ‡๋‹ค๋ฉด Process์€ ์ž˜ ๋ฐ›์•˜๋˜ Queue๋ฅผ Pool์€ ์™œ ๋ฐ›์„ ์ˆ˜ ์—†์„๊นŒ์š”?

Pool์€ ์ธ์ž๋ฅผ pickling ํ•œ๋‹ค

def apply(self, func, args=(), kwds={}):
    '''
    Equivalent of `func(*args, **kwds)`.
    Pool must be running.
    '''
    return self.apply_async(func, args, kwds).get()


def apply_async(self, func, args=(), kwds={}, callback=None,
        error_callback=None):
    '''
    Asynchronous version of `apply()` method.
    '''
    self._check_running()
    result = ApplyResult(self, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result

๋‹ค์Œ ์ฝ”๋“œ๋Š” Pool.apply์™€ Pool.apply_async์˜ ๊ตฌํ˜„ ์ฝ”๋“œ์ž…๋‹ˆ๋‹ค. ์ฝ”๋“œ์—์„œ๋Š” func์™€ args, kwds ๋ชจ๋‘ self._taskqueue์— ์ง‘์–ด๋„ฃ๋Š”๋‹ค๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    _wrap_exception = True

    @staticmethod
    def Process(ctx, *args, **kwds):
        return ctx.Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        # Attributes initialized early to make sure that they exist in
        # __del__() if __init__() raises an exception
        self._pool = []
        self._state = INIT

        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()

๊ทธ๋ฆฌ๊ณ  self._taskqueue๋Š” SimpleQueue ๊ฐ์ฒด์ž…๋‹ˆ๋‹ค.

๊ทธ๋ ‡๋‹ค๋ฉด SimpleQueue๊ฐ€ ์–ด๋–ป๊ฒŒ ์ €์žฅํ•˜๋Š”์ง€๋ฅผ ํ™•์ธํ•ด๋ด…์‹œ๋‹ค.

def put(self, obj):
    # serialize the data before acquiring the lock
    obj = _ForkingPickler.dumps(obj)
    if self._wlock is None:
        # writes to a message oriented win32 pipe are atomic
        self._writer.send_bytes(obj)
    else:
        with self._wlock:
            self._writer.send_bytes(obj)

SimpleQueue.put์˜ ๊ตฌํ˜„ ์ฝ”๋“œ๋ฅผ ๋ณด๋ฉด _ForkingPickler.dumps๋ฅผ ํ†ตํ•ด ๋ฐ›์€ obj ์ „์ฒด๋ฅผ pickling ํ•ฉ๋‹ˆ๋‹ค.

์—ฌ๊ธฐ์„œ obj์—๋Š” func ๋ฟ๋งŒ ์•„๋‹ˆ๋ผ args ๋ชจ๋‘ ํฌํ•จ๋˜์–ด์žˆ์–ด pickling์ด ๋œ๋‹ค๋Š” ๊ฒƒ์„ ํ™•์ธํ–ˆ์Šต๋‹ˆ๋‹ค.

์ฆ‰, Pool.apply๋ฅผ ํ†ตํ•ด Queue๋ฅผ ์ „๋‹ฌํ•˜๋ฉด Queue ๊ฐ์ฒด๊ฐ€ pickling ๋˜๊ธฐ์— ์ด๋ฅผ ๋ง‰๊ธฐ์œ„ํ•ด ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.

๊ทธ๋ ‡๊ธฐ ๋•Œ๋ฌธ์— Manager ๊ฐ์ฒด๋ฅผ ํ†ตํ•ด pickling ๋˜๋”๋ผ๋„ ์›๋ณธ ๊ฐ์ฒด์— ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋Š” ํ”„๋ก์‹œ ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ค์–ด์„œ ์ „๋‹ฌํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.