KR_AsyncIO - somaz94/python-study GitHub Wiki

Python ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ(AsyncIO) ๊ฐœ๋… ์ •๋ฆฌ


1๏ธโƒฃ AsyncIO ๊ธฐ์ดˆ

AsyncIO๋Š” ํŒŒ์ด์ฌ์˜ ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์œ„ํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์ด๋‹ค.

import asyncio

async def hello():
    print('Hello')
    await asyncio.sleep(1)
    print('World')

# ์‹คํ–‰
asyncio.run(hello())

โœ… ํŠน์ง•:

  • ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ์ง€์›
  • ์ฝ”๋ฃจํ‹ด ๊ธฐ๋ฐ˜ ๋™์ž‘
  • ๋‹จ์ผ ์Šค๋ ˆ๋“œ์—์„œ ๋™์‹œ์„ฑ ๊ตฌํ˜„


2๏ธโƒฃ ์ฝ”๋ฃจํ‹ด๊ณผ ํƒœ์Šคํฌ

์ฝ”๋ฃจํ‹ด์€ ์ผ์‹œ ์ค‘๋‹จ๋˜๊ณ  ๋‚˜์ค‘์— ์žฌ๊ฐœ๋  ์ˆ˜ ์žˆ๋Š” ๋น„๋™๊ธฐ ํ•จ์ˆ˜์ด๋ฉฐ, ํƒœ์Šคํฌ๋Š” ์ฝ”๋ฃจํ‹ด์„ ์‹คํ–‰ํ•˜๋Š” ๋‹จ์œ„์ด๋‹ค.

async def task1():
    print('Task 1 ์‹œ์ž‘')
    await asyncio.sleep(2)
    print('Task 1 ์ข…๋ฃŒ')
    return 'Task 1 ๊ฒฐ๊ณผ'

async def task2():
    print('Task 2 ์‹œ์ž‘')
    await asyncio.sleep(1)
    print('Task 2 ์ข…๋ฃŒ')
    return 'Task 2 ๊ฒฐ๊ณผ'

async def main():
    # ๋™์‹œ์— ์—ฌ๋Ÿฌ ํƒœ์Šคํฌ ์‹คํ–‰
    results = await asyncio.gather(
        task1(),
        task2()
    )
    print(f'๊ฒฐ๊ณผ: {results}')

asyncio.run(main())

โœ… ํŠน์ง•:

  • ์—ฌ๋Ÿฌ ํƒœ์Šคํฌ ๋™์‹œ ์‹คํ–‰
  • ๊ฒฐ๊ณผ ์ˆ˜์ง‘ ๊ฐ€๋Šฅ
  • ๋น„๋™๊ธฐ ์ž‘์—… ์กฐ์œจ


3๏ธโƒฃ ์ด๋ฒคํŠธ ๋ฃจํ”„

์ด๋ฒคํŠธ ๋ฃจํ”„๋Š” AsyncIO์˜ ํ•ต์‹ฌ ์‹คํ–‰ ์žฅ์น˜๋กœ, ๋น„๋™๊ธฐ ํƒœ์Šคํฌ๋ฅผ ๊ด€๋ฆฌํ•˜๊ณ  ์‹คํ–‰ํ•œ๋‹ค.

async def long_operation():
    print('์‹œ์ž‘')
    await asyncio.sleep(3)
    print('์ข…๋ฃŒ')
    return '์™„๋ฃŒ'

async def main():
    # ์ด๋ฒคํŠธ ๋ฃจํ”„ ๊ฐ€์ ธ์˜ค๊ธฐ
    loop = asyncio.get_running_loop()
    
    # ํƒœ์Šคํฌ ์ƒ์„ฑ
    task = loop.create_task(long_operation())
    
    # ๋‹ค๋ฅธ ์ž‘์—… ์ˆ˜ํ–‰
    print('๋‹ค๋ฅธ ์ž‘์—… ์ˆ˜ํ–‰ ์ค‘...')
    
    # ํƒœ์Šคํฌ ์™„๋ฃŒ ๋Œ€๊ธฐ
    result = await task
    print(f'๊ฒฐ๊ณผ: {result}')

# ์‹คํ–‰
asyncio.run(main())

โœ… ํŠน์ง•:

  • ์ด๋ฒคํŠธ ๋ฃจํ”„ ์ง์ ‘ ์ œ์–ด
  • ํƒœ์Šคํฌ ์ƒ์„ฑ ๋ฐ ๊ด€๋ฆฌ
  • ๋น„๋™๊ธฐ ์ž‘์—… ์Šค์ผ€์ค„๋ง


4๏ธโƒฃ ๋น„๋™๊ธฐ ์ปจํ…์ŠคํŠธ ๋งค๋‹ˆ์ €

๋น„๋™๊ธฐ ์ปจํ…์ŠคํŠธ ๋งค๋‹ˆ์ €๋Š” ๋น„๋™๊ธฐ ๋ฆฌ์†Œ์Šค๋ฅผ ์•ˆ์ „ํ•˜๊ฒŒ ๊ด€๋ฆฌํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋œ๋‹ค.

class AsyncResource:
    async def __aenter__(self):
        print('๋ฆฌ์†Œ์Šค ํš๋“')
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print('๋ฆฌ์†Œ์Šค ํ•ด์ œ')
        await asyncio.sleep(1)

async def use_resource():
    async with AsyncResource() as resource:
        print('๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ ์ค‘')
        await asyncio.sleep(1)

# ์‹คํ–‰
asyncio.run(use_resource())

โœ… ํŠน์ง•:

  • ๋น„๋™๊ธฐ ๋ฆฌ์†Œ์Šค ๊ด€๋ฆฌ
  • ์ž๋™ ์ •๋ฆฌ ๋ณด์žฅ
  • ์ปจํ…์ŠคํŠธ ๊ธฐ๋ฐ˜ ์ž‘์—…


5๏ธโƒฃ ๋น„๋™๊ธฐ ๋ฐ˜๋ณต์ž

๋น„๋™๊ธฐ ๋ฐ˜๋ณต์ž๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ˆœํšŒํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ค€๋‹ค.

class AsyncCounter:
    def __init__(self, limit):
        self.limit = limit
        self.counter = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.counter < self.limit:
            self.counter += 1
            await asyncio.sleep(0.1)
            return self.counter
        raise StopAsyncIteration

async def use_counter():
    async for i in AsyncCounter(5):
        print(f'์นด์šดํŠธ: {i}')

# ์‹คํ–‰
asyncio.run(use_counter())

โœ… ํŠน์ง•:

  • ๋น„๋™๊ธฐ ๋ฐ˜๋ณต ์ง€์›
  • ์ง€์—ฐ ์‹คํ–‰ ๊ฐ€๋Šฅ
  • ๋ฉ”๋ชจ๋ฆฌ ํšจ์œจ์  ์ฒ˜๋ฆฌ


6๏ธโƒฃ ์‹ค์šฉ์ ์ธ ์˜ˆ์ œ

์‹ค์ œ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ AsyncIO๋ฅผ ํ™œ์šฉํ•˜๋Š” ๋‹ค์–‘ํ•œ ์˜ˆ์ œ๋ฅผ ์‚ดํŽด๋ณด์ž.

๋น„๋™๊ธฐ ์›น ์š”์ฒญ:

import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net'
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, html in zip(urls, results):
            print(f'{url}: {len(html)} bytes')

asyncio.run(main())

๋น„๋™๊ธฐ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ž‘์—…:

import asyncpg

async def create_table(pool):
    async with pool.acquire() as conn:
        await conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name TEXT,
                email TEXT
            )
        ''')

async def insert_user(pool, name, email):
    async with pool.acquire() as conn:
        return await conn.fetchval(
            'INSERT INTO users(name, email) VALUES($1, $2) RETURNING id',
            name, email
        )

async def main():
    # ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ ํ’€ ์ƒ์„ฑ
    pool = await asyncpg.create_pool(
        user='user',
        password='password',
        database='dbname',
        host='localhost'
    )
    
    await create_table(pool)
    
    # ๋น„๋™๊ธฐ๋กœ ์—ฌ๋Ÿฌ ์‚ฌ์šฉ์ž ์ถ”๊ฐ€
    users = [
        ('User1', '[email protected]'),
        ('User2', '[email protected]'),
        ('User3', '[email protected]')
    ]
    
    tasks = [insert_user(pool, name, email) for name, email in users]
    user_ids = await asyncio.gather(*tasks)
    print(f'์ถ”๊ฐ€๋œ ์‚ฌ์šฉ์ž ID: {user_ids}')
    
    await pool.close()

๋น„๋™๊ธฐ ํŒŒ์ผ ์ฒ˜๋ฆฌ:

import aiofiles
import asyncio

async def read_large_file(filename):
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
    return contents

async def write_file(filename, data):
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(data)

async def process_files():
    # ๋™์‹œ์— ์—ฌ๋Ÿฌ ํŒŒ์ผ ์ฝ๊ธฐ
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    tasks = [read_large_file(file) for file in files]
    
    contents = await asyncio.gather(*tasks)
    
    # ์ฒ˜๋ฆฌ๋œ ๋ฐ์ดํ„ฐ ์ €์žฅ
    processed = [f"Processed: {content[:100]}..." for content in contents]
    
    # ๋™์‹œ์— ์—ฌ๋Ÿฌ ํŒŒ์ผ ์“ฐ๊ธฐ
    write_tasks = [
        write_file(f"processed_{file}", data) 
        for file, data in zip(files, processed)
    ]
    
    await asyncio.gather(*write_tasks)
    print("๋ชจ๋“  ํŒŒ์ผ ์ฒ˜๋ฆฌ ์™„๋ฃŒ")

# ์‹คํ–‰
asyncio.run(process_files())

๋น„๋™๊ธฐ ํ ์ฒ˜๋ฆฌ:

import asyncio
import random

async def producer(queue, id):
    for i in range(5):
        # ์ž‘์—… ์ƒ์„ฑ
        item = f"์ƒ์‚ฐ์ž {id} - ํ•ญ๋ชฉ {i}"
        
        # ํ์— ํ•ญ๋ชฉ ์ถ”๊ฐ€
        await queue.put(item)
        print(f"์ƒ์‚ฐ๋จ: {item}")
        
        # ๋žœ๋ค ์ง€์—ฐ
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, id):
    while True:
        # ํ์—์„œ ํ•ญ๋ชฉ ๊ฐ€์ ธ์˜ค๊ธฐ
        item = await queue.get()
        
        # ํ•ญ๋ชฉ ์ฒ˜๋ฆฌ
        print(f"์†Œ๋น„์ž {id} ์ฒ˜๋ฆฌ ์ค‘: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.6))
        
        # ์ž‘์—… ์™„๋ฃŒ ํ‘œ์‹œ
        queue.task_done()

async def main():
    # ํ ์ƒ์„ฑ
    queue = asyncio.Queue()
    
    # ์ƒ์‚ฐ์ž ๋ฐ ์†Œ๋น„์ž ํƒœ์Šคํฌ ์ƒ์„ฑ
    producers = [producer(queue, i) for i in range(3)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(2)]
    
    # ์ƒ์‚ฐ์ž ์‹คํ–‰ ๋ฐ ๋Œ€๊ธฐ
    await asyncio.gather(*producers)
    
    # ๋ชจ๋“  ํ•ญ๋ชฉ์ด ์ฒ˜๋ฆฌ๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
    await queue.join()
    
    # ์†Œ๋น„์ž ํƒœ์Šคํฌ ์ทจ์†Œ
    for c in consumers:
        c.cancel()

# ์‹คํ–‰
asyncio.run(main())

โœ… ์ฃผ์š” ํŒ:

  • I/O ๋ฐ”์šด๋“œ ์ž‘์—…์— ์ ํ•ฉ
  • ๋™๊ธฐ ์ฝ”๋“œ์™€ ํ˜ผํ•ฉ ์‹œ ์ฃผ์˜
  • ์ด๋ฒคํŠธ ๋ฃจํ”„ ๋ธ”๋กœํ‚น ํ”ผํ•˜๊ธฐ
  • ์ ์ ˆํ•œ ์—๋Ÿฌ ์ฒ˜๋ฆฌ ๊ตฌํ˜„
  • ๋น„๋™๊ธฐ ์ปจํ…์ŠคํŠธ ๋งค๋‹ˆ์ € ํ™œ์šฉ
  • ํƒœ์Šคํฌ ์ทจ์†Œ ์ฒ˜๋ฆฌ ๊ตฌํ˜„
  • ๋””๋ฒ„๊น… ๋„๊ตฌ ํ™œ์šฉ
  • ํšจ์œจ์ ์ธ ํƒœ์Šคํฌ ๊ทธ๋ฃนํ™”
  • ์ฝ”๋ฃจํ‹ด ์ฒด์ธ ๊ตฌ์„ฑ ์‹œ ์˜ˆ์™ธ ์ „ํŒŒ ๊ณ ๋ ค
  • ๋ฐฑ๊ทธ๋ผ์šด๋“œ ํƒœ์Šคํฌ ๊ด€๋ฆฌ


7๏ธโƒฃ ๋™์‹œ์„ฑ ์ œ์–ด ๋„๊ตฌ

AsyncIO๋Š” ๋‹ค์–‘ํ•œ ๋™์‹œ์„ฑ ์ œ์–ด ๋„๊ตฌ๋ฅผ ์ œ๊ณตํ•˜์—ฌ ๋ณต์žกํ•œ ๋น„๋™๊ธฐ ์ž‘์—…์„ ๊ด€๋ฆฌํ•œ๋‹ค.

์„ธ๋งˆํฌ์–ด:

import asyncio

async def worker(semaphore, id):
    async with semaphore:
        # ์„ธ๋งˆํฌ์–ด๋กœ ์ œํ•œ๋œ ๊ตฌ์—ญ
        print(f"์ž‘์—…์ž {id} ์‹œ์ž‘")
        await asyncio.sleep(1)
        print(f"์ž‘์—…์ž {id} ์ข…๋ฃŒ")

async def main():
    # ์ตœ๋Œ€ 3๊ฐœ์˜ ๋™์‹œ ํƒœ์Šคํฌ๋งŒ ํ—ˆ์šฉ
    semaphore = asyncio.Semaphore(3)
    
    # 10๊ฐœ์˜ ์ž‘์—… ๋™์‹œ ์‹คํ–‰ (์ตœ๋Œ€ 3๊ฐœ์”ฉ ์ฒ˜๋ฆฌ)
    tasks = [worker(semaphore, i) for i in range(10)]
    await asyncio.gather(*tasks)

# ์‹คํ–‰
asyncio.run(main())

๋ฝ(Lock):

import asyncio

async def protected_resource(lock, id):
    # ๋ฝ ํš๋“
    async with lock:
        print(f"๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ ์ค‘ (์ž‘์—…์ž {id})")
        await asyncio.sleep(1)
        print(f"๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ ์™„๋ฃŒ (์ž‘์—…์ž {id})")

async def main():
    # ๋ฝ ์ƒ์„ฑ
    lock = asyncio.Lock()
    
    # ์—ฌ๋Ÿฌ ํƒœ์Šคํฌ๊ฐ€ ๋™์ผํ•œ ๋ฝ์„ ๊ณต์œ 
    tasks = [protected_resource(lock, i) for i in range(5)]
    await asyncio.gather(*tasks)

# ์‹คํ–‰
asyncio.run(main())

์ด๋ฒคํŠธ:

import asyncio

async def waiter(event, id):
    print(f"๋Œ€๊ธฐ์ž {id}: ์ด๋ฒคํŠธ ๋Œ€๊ธฐ ์ค‘")
    await event.wait()
    print(f"๋Œ€๊ธฐ์ž {id}: ์ด๋ฒคํŠธ ๋ฐœ์ƒ! ์ž‘์—… ์ง„ํ–‰")

async def setter(event):
    print("์ค€๋น„ ์ค‘...")
    await asyncio.sleep(2)
    print("์ด๋ฒคํŠธ ์„ค์ •!")
    
    # ๋ชจ๋“  ๋Œ€๊ธฐ ์ค‘์ธ ํƒœ์Šคํฌ์— ์‹ ํ˜ธ ๋ณด๋‚ด๊ธฐ
    event.set()

async def main():
    # ์ด๋ฒคํŠธ ์ƒ์„ฑ
    event = asyncio.Event()
    
    # ๋Œ€๊ธฐ ํƒœ์Šคํฌ์™€ ์„ค์ • ํƒœ์Šคํฌ ์ƒ์„ฑ
    waiters = [asyncio.create_task(waiter(event, i)) for i in range(5)]
    setter_task = asyncio.create_task(setter(event))
    
    # ๋ชจ๋“  ํƒœ์Šคํฌ ์™„๋ฃŒ ๋Œ€๊ธฐ
    await asyncio.gather(setter_task, *waiters)

# ์‹คํ–‰
asyncio.run(main())

์กฐ๊ฑด(Condition):

import asyncio
import random

async def consumer(condition, id, buffer):
    while True:
        # ์กฐ๊ฑด ํš๋“ ๋ฐ ๋Œ€๊ธฐ
        async with condition:
            # ๋ฒ„ํผ๊ฐ€ ๋น„์–ด์žˆ์œผ๋ฉด ๋Œ€๊ธฐ
            while len(buffer) == 0:
                print(f"์†Œ๋น„์ž {id}: ๋ฒ„ํผ๊ฐ€ ๋น„์–ด์žˆ์–ด ๋Œ€๊ธฐ ์ค‘")
                await condition.wait()
            
            # ํ•ญ๋ชฉ ์†Œ๋น„
            item = buffer.pop(0)
            print(f"์†Œ๋น„์ž {id}: {item} ์ฒ˜๋ฆฌ")
            
            # ์ƒ์‚ฐ์ž์—๊ฒŒ ์‹ ํ˜ธ
            condition.notify()
        
        await asyncio.sleep(random.uniform(0.1, 0.3))

async def producer(condition, id, buffer, max_size):
    for i in range(5):
        item = f"ํ•ญ๋ชฉ {i} (์ƒ์‚ฐ์ž {id})"
        
        # ์กฐ๊ฑด ํš๋“
        async with condition:
            # ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐผ์œผ๋ฉด ๋Œ€๊ธฐ
            while len(buffer) >= max_size:
                print(f"์ƒ์‚ฐ์ž {id}: ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ์„œ ๋Œ€๊ธฐ ์ค‘")
                await condition.wait()
            
            # ํ•ญ๋ชฉ ์ƒ์‚ฐ
            buffer.append(item)
            print(f"์ƒ์‚ฐ์ž {id}: {item} ์ƒ์„ฑ")
            
            # ์†Œ๋น„์ž์—๊ฒŒ ์‹ ํ˜ธ
            condition.notify()
        
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def main():
    # ๊ณต์œ  ๋ฒ„ํผ์™€ ์กฐ๊ฑด ๋ณ€์ˆ˜
    buffer = []
    max_size = 2
    condition = asyncio.Condition()
    
    # ์ƒ์‚ฐ์ž ๋ฐ ์†Œ๋น„์ž ํƒœ์Šคํฌ
    producers = [producer(condition, i, buffer, max_size) for i in range(2)]
    consumers = [asyncio.create_task(consumer(condition, i, buffer)) for i in range(2)]
    
    # ์ƒ์‚ฐ์ž ์™„๋ฃŒ ๋Œ€๊ธฐ
    await asyncio.gather(*producers)
    
    # ์†Œ๋น„์ž ํƒœ์Šคํฌ ์ทจ์†Œ
    for c in consumers:
        c.cancel()
    
    # ์ทจ์†Œ๋œ ํƒœ์Šคํฌ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ
    await asyncio.gather(*consumers, return_exceptions=True)

# ์‹คํ–‰
asyncio.run(main())

โœ… ํŠน์ง•:

  • ๋ฉ€ํ‹ฐํƒœ์Šคํ‚น ์กฐ์ •
  • ๋ฆฌ์†Œ์Šค ๊ฒฝ์Ÿ ๊ด€๋ฆฌ
  • ํƒœ์Šคํฌ ๊ฐ„ ๋™๊ธฐํ™”
  • ๋น„๋™๊ธฐ ์ž‘์—… ํ๋ฆ„ ์ œ์–ด
  • ํšจ์œจ์ ์ธ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ


8๏ธโƒฃ ๋น„๋™๊ธฐ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ

๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฉ”๋ชจ๋ฆฌ ํšจ์œจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ๋น„๋™๊ธฐ ์ŠคํŠธ๋ฆผ ๊ธฐ๋ฒ•์ด๋‹ค.

import asyncio

async def number_generator(limit):
    # ์ŠคํŠธ๋ฆผ ์†Œ์Šค
    for i in range(limit):
        await asyncio.sleep(0.1)  # ๋ฐ์ดํ„ฐ ์ƒ์„ฑ ์‹œ๊ฐ„ ํ‰๋‚ด
        yield i

async def process_numbers():
    # ๋น„๋™๊ธฐ ์ƒ์„ฑ๊ธฐ์—์„œ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ
    async for number in number_generator(10):
        result = number * number
        print(f"์ฒ˜๋ฆฌ: {number} -> {result}")

# ์‹คํ–‰
asyncio.run(process_numbers())

์ŠคํŠธ๋ฆผ ํŒŒ์ดํ”„๋ผ์ธ:

import asyncio

async def data_source():
    # ๋ฐ์ดํ„ฐ ์†Œ์Šค (API, ํŒŒ์ผ ๋“ฑ)
    for i in range(10):
        await asyncio.sleep(0.2)
        yield i

async def filter_even(stream):
    # ์ง์ˆ˜๋งŒ ํ•„ํ„ฐ๋ง
    async for item in stream:
        if item % 2 == 0:
            yield item

async def multiply(stream, factor=2):
    # ๊ฐ ํ•ญ๋ชฉ์— factor ๊ณฑํ•˜๊ธฐ
    async for item in stream:
        yield item * factor

async def process_pipeline():
    # ํŒŒ์ดํ”„๋ผ์ธ ๊ตฌ์„ฑ
    source = data_source()
    filtered = filter_even(source)
    multiplied = multiply(filtered, 3)
    
    # ๊ฒฐ๊ณผ ์†Œ๋น„
    async for result in multiplied:
        print(f"๊ฒฐ๊ณผ: {result}")

# ์‹คํ–‰
asyncio.run(process_pipeline())

โœ… ํŠน์ง•:

  • ๋ฉ”๋ชจ๋ฆฌ ํšจ์œจ์ ์ธ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ
  • ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ์…‹ ์ฒ˜๋ฆฌ์— ์ ํ•ฉ
  • ๋น„๋™๊ธฐ ํŒŒ์ดํ”„๋ผ์ธ ๊ตฌ์„ฑ
  • ๋ฐฑํ”„๋ ˆ์…” ๊ด€๋ฆฌ
  • ์ŠคํŠธ๋ฆผ ๋ณ€ํ™˜ ๋ฐ ํ•„ํ„ฐ๋ง


์ฃผ์š” ํŒ

โœ… AsyncIO ๋ชจ๋ฒ” ์‚ฌ๋ก€:

  • CPU ๋ฐ”์šด๋“œ ์ž‘์—… ๋ถ„๋ฆฌ: CPU ์ง‘์•ฝ์  ์ž‘์—…์€ concurrent.futures.ProcessPoolExecutor๋กœ ์ฒ˜๋ฆฌ
  • ํƒ€์ž„์•„์›ƒ ์„ค์ •: ๋ชจ๋“  ์™ธ๋ถ€ ํ˜ธ์ถœ์— ํƒ€์ž„์•„์›ƒ ์ ์šฉํ•˜์—ฌ ๋ฌดํ•œ ๋Œ€๊ธฐ ๋ฐฉ์ง€
  • ๋””๋ฒ„๊น… ํ™œ์„ฑํ™”: asyncio.run(main(), debug=True)๋กœ ๋””๋ฒ„๊น… ๊ฐ•ํ™”
  • ํƒœ์Šคํฌ ๊ทธ๋ฃนํ™”: ๊ด€๋ จ ํƒœ์Šคํฌ๋Š” asyncio.TaskGroup ๋˜๋Š” asyncio.gather๋กœ ๊ทธ๋ฃนํ™”
  • ์ ์ ˆํ•œ ์ทจ์†Œ ์ฒ˜๋ฆฌ: ํƒœ์Šคํฌ ์ทจ์†Œ ์‹œ try/except asyncio.CancelledError ํŒจํ„ด ์‚ฌ์šฉ
  • ํ”„๋กœ๋ฉ”ํ…Œ์šฐ์Šค ๋ฐฉ์ง€: ๋ฉ”์ธ ์ด๋ฒคํŠธ ๋ฃจํ”„๋ฅผ ๋ธ”๋กœํ‚นํ•˜๋Š” ์ฝ”๋“œ ํ”ผํ•˜๊ธฐ
  • ๋ฐฑ์˜คํ”„ ์žฌ์‹œ๋„: ์™ธ๋ถ€ ์„œ๋น„์Šค ํ˜ธ์ถœ ์‹œ ์ง€์ˆ˜ ๋ฐฑ์˜คํ”„ ์ ์šฉ
  • ์ปจํ…์ŠคํŠธ ๊ด€๋ฆฌ์ž ํ™œ์šฉ: ๋ฆฌ์†Œ์Šค ๊ด€๋ฆฌ์— async with ์‚ฌ์šฉ
  • ๊ตฌ์กฐํ™”๋œ ๋™์‹œ์„ฑ: ๋ถ€๋ชจ ํƒœ์Šคํฌ๊ฐ€ ์ž์‹ ํƒœ์Šคํฌ์˜ ์ƒ๋ช…์ฃผ๊ธฐ ๊ด€๋ฆฌ
  • ๋น„๋™๊ธฐ ์ž‘์—… ์ทจ์†Œ: ๋” ์ด์ƒ ํ•„์š” ์—†๋Š” ํƒœ์Šคํฌ๋Š” ๋ช…์‹œ์ ์œผ๋กœ ์ทจ์†Œ
  • ํšจ์œจ์ ์ธ I/O ์ฒ˜๋ฆฌ: ํŒŒ์ผ I/O๋Š” aiofiles ์‚ฌ์šฉ, ๋„คํŠธ์›Œํฌ๋Š” aiohttp ์‚ฌ์šฉ
  • ์˜ˆ์™ธ ์ฒ˜๋ฆฌ: ๋ชจ๋“  ๋น„๋™๊ธฐ ์ฝ”๋“œ์— ์ ์ ˆํ•œ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ ๊ตฌํ˜„
  • ์ฝ”๋“œ ๊ฐ€๋…์„ฑ ์œ ์ง€: ๋ณต์žกํ•œ ๋น„๋™๊ธฐ ๋กœ์ง์€ ํ•จ์ˆ˜๋กœ ๋ถ„๋ฆฌ
  • ์ด๋ฒคํŠธ ๋ฃจํ”„ ์ปค์Šคํ„ฐ๋งˆ์ด์ง•: ํ•„์š”์‹œ ์ด๋ฒคํŠธ ๋ฃจํ”„ ์ •์ฑ… ์ปค์Šคํ„ฐ๋งˆ์ด์ง•
  • ๋กœ๊น… ํ†ตํ•ฉ: ๋น„๋™๊ธฐ ์ฝ”๋“œ์—๋„ ์ผ๊ด€๋œ ๋กœ๊น… ์ „๋žต ์ ์šฉ


โš ๏ธ **GitHub.com Fallback** โš ๏ธ