asyncio vs thread - goddes4/python-study-wiki GitHub Wiki

๋‹ค์ˆ˜์˜ concurrent task ๊ฐ€ ์กด์žฌ ํ•  ๋•Œ thread ์™€ asyncio ์˜ ์„ฑ๋Šฅ ํ…Œ์ŠคํŠธ

  • ์ „์ œ ์กฐ๊ฑด :
  • cpu-bound task ๊ฐ€ ์•„๋‹Œ i/o bound task ์—ฌ์•ผ ํ•จ
  • cpu bound task ๋ผ๋ฉด multiprocessing ๊ณ ๋ ค
task 100๊ฐœ ๊ฒฐ๊ณผ
- thread  : 5.033 seconds
- asyncio : 5.013 seconds

task 400๊ฐœ ๊ฒฐ๊ณผ
- thread  : 5.105 seconds
- asyncio : 5.045 seconds

task 800๊ฐœ ๊ฒฐ๊ณผ
- thread  : 5.202 seconds
- asyncio : 5.079 seconds

๊ฒฐ๋ก  :

  • ๋™์‹œ์„ฑ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด์„œ task ๋‹น thread ๋ฅผ ํ• ๋‹น ํ•˜๋Š”๊ฒƒ์€ ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ๋Ÿ‰ ๋ฐ ์ปจํ…์ŠคํŠธ ์Šค์œ„์น˜ ๋น„์šฉ์ด ์ฆ๊ฐ€ ํ•˜๋Š” ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒ.
  • C10K ๋ฌธ์ œ์˜ ๊ฒฝ์šฐ asyncio ๊ฐ€ ์ข‹์€ ์†”๋ฃจ์…˜์ด ๋  ์ˆ˜ ์žˆ์Œ.
  • asyncio ์˜ ๊ฒฝ์šฐ ์‹ฑ๊ธ€ ์“ฐ๋ ˆ๋“œ๋กœ event loop ๊ฐ€ task ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹.
  • asyncio ๋Š” ์‹ฑ๊ธ€ ์“ฐ๋ ˆ๋“œ์ด๊ธฐ ๋•Œ๋ฌธ์— task ์— blocking ์ฝ”๋“œ๊ฐ€ ์žˆ์œผ๋ฉด ์ „์ฒด์ ์œผ๋กœ ์„ฑ๋Šฅ ์ €ํ•˜๊ฐ€ ์˜ค๊ธฐ ๋•Œ๋ฌธ์— ๋ชจ๋“  ์ฝ”๋“œ๋ฅผ ๋น„๋™๊ธฐํ™” ์‹œ์ผœ์•ผ ํ•จ. (using yield from coroutine)
  • asyncio ์˜ ์žฅ์ ์€ Coroutine ์„ ํ†ตํ•ด ๋น„๋™๊ธฐ์‹ ์ฝ”๋“œ๋ฅผ ๋งˆ์น˜ ๋™๊ธฐ์‹ ์ฝ”๋“œ์ฒ˜๋Ÿผ ์ž‘์„ฑ ํ•  ์ˆ˜ ์žˆ์–ด์„œ ์ฝ”๋“œ ๊ฐ€๋…์„ฑ๋„ ์ข‹์Œ.
  • ๋ณดํ†ต ๋น„๋™๊ธฐ ํ˜ธ์ถœ์˜ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ›๊ธฐ ์œ„ํ•ด์„œ callback ์„ ๋“ฑ๋กํ•˜์—ฌ ์ฒ˜๋ฆฌ ํ•˜๊ฑฐ๋‚˜, queue ๋ฅผ ํ†ตํ•ด์„œ ์ˆ˜์‹  ๋ฐ›์•„ ์ฒ˜๋ฆฌ ํ•˜๋Š”๋ฐ ์ด๋Ÿฐ ์ฝ”๋“œ๊ฐ€ ๋งŽ์•„ ์ง€๋ฉด ๊ฐ€๋…์„ฑ์ด ์ €ํ•˜๋œ๋‹ค. ์ด๋ฅธ๋ฐ” callback hell.
  • ๊ฐœ๋ฐœ์ž๊ฐ€ ์ง์ ‘ I/O Multiplexing (select, poll, epoll, kqueue) ์ฝ”๋“œ๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ๊ฒƒ ๋‚œ์ด๋„๊ฐ€ ์ƒ๋‹นํ•˜๋‹ค

example : thread

import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
from logging import DEBUG
import random
import threading
import datetime
import logging
import time

MAX_TASKS = 800


def long_time_process(task_id):
    """
    ์˜ˆ๋ฅผ ๋“ค๋ฉด ์™ธ๋ถ€ open_api ๋ฅผ ์ด์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘
    :param task_id:
    :return:
    """
    print(datetime.datetime.now(), threading.current_thread(), "long_time_process start.", task_id)
    # time.sleep(random.randint(1, 5))
    time.sleep(5)
    return "done"


def slow_task(task_id):
    ret = long_time_process(task_id)
    print(datetime.datetime.now(), threading.current_thread(), "slow_task-", task_id, "ret=", ret)


if __name__ == '__main__':

    start_time = time.time()
    with ThreadPoolExecutor(max_workers=MAX_TASKS) as executor:
        for i in range(MAX_TASKS):
            # task ์‹คํ–‰
            executor.submit(slow_task, task_id=i)
    process_time = time.time() - start_time

    print("process time : {}".format(process_time))

example : asyncio

import asyncio
from logging import DEBUG
import threading
import datetime
import logging
import time

MAX_TASKS = 800


@asyncio.coroutine
def long_time_process(task_id):
    """
    ์˜ˆ๋ฅผ ๋“ค๋ฉด ์™ธ๋ถ€ open_api ๋ฅผ ์ด์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘
    :param task_id:
    :return:
    """
    print(datetime.datetime.now(), threading.current_thread(), "long_time_process start.", task_id)
    # yield from asyncio.sleep(random.randint(1, 5))
    yield from asyncio.sleep(5)
    return "done"


@asyncio.coroutine
def slow_task(task_id):
    ret = yield from long_time_process(task_id)
    print(datetime.datetime.now(), threading.current_thread(), "slow_task-", task_id, "ret=", ret)


logging.basicConfig(level=DEBUG)

if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    # event_loop.set_debug(True)

    futures = []

    start_time = time.time()
    for i in range(MAX_TASKS):
        # task ๋“ฑ๋ก : coroutine function ๋งŒ ๋“ฑ๋ก ๊ฐ€๋Šฅ (coroutine decorating)
        f = event_loop.create_task(slow_task(task_id=i))
        futures.append(f)

    # ์ตœ๋Œ€ 5์ดˆ๊ฐ„ ๋“ฑ๋ก๋œ task ์˜ ์ข…๋ฃŒ๋ฅผ ๊ธฐ๋‹ค๋ฆฐ๋‹ค.
    event_loop.run_until_complete(asyncio.wait(futures, timeout=5))
    event_loop.close()

    process_time = time.time() - start_time

    print("process time : {}".format(process_time))