KR_Threading - somaz94/python-study GitHub Wiki

Python ์Šค๋ ˆ๋”ฉ ๊ฐœ๋… ์ •๋ฆฌ


1๏ธโƒฃ ์Šค๋ ˆ๋“œ ๊ธฐ์ดˆ

์Šค๋ ˆ๋“œ๋Š” ํ”„๋กœ๊ทธ๋žจ ๋‚ด์—์„œ ๋™์‹œ์— ์‹คํ–‰๋˜๋Š” ์ž‘์€ ์‹คํ–‰ ๋‹จ์œ„๋กœ, ๊ฐ™์€ ํ”„๋กœ์„ธ์Šค ๋‚ด์—์„œ ๋ฉ”๋ชจ๋ฆฌ๋ฅผ ๊ณต์œ ํ•ฉ๋‹ˆ๋‹ค.

import threading
import time
from typing import List, Dict, Any, Optional

def worker(name: str, sleep_time: float = 2.0) -> None:
    """
    ๊ธฐ๋ณธ ์ž‘์—…์ž ์Šค๋ ˆ๋“œ ํ•จ์ˆ˜
    
    Args:
        name: ์ž‘์—… ์‹๋ณ„ ์ด๋ฆ„
        sleep_time: ๋Œ€๊ธฐ ์‹œ๊ฐ„(์ดˆ)
    """
    print(f"์Šค๋ ˆ๋“œ {name} ์‹œ์ž‘")
    time.sleep(sleep_time)
    print(f"์Šค๋ ˆ๋“œ {name} ์ข…๋ฃŒ")

# ์Šค๋ ˆ๋“œ ์ƒ์„ฑ ๋ฐ ์‹œ์ž‘
thread = threading.Thread(target=worker, args=("Worker-1",))
thread.daemon = True  # ๋ฉ”์ธ ์Šค๋ ˆ๋“œ ์ข…๋ฃŒ์‹œ ํ•จ๊ป˜ ์ข…๋ฃŒ
thread.start()

# ์Šค๋ ˆ๋“œ ์ •๋ณด ํ™•์ธ
print(f"์Šค๋ ˆ๋“œ ID: {thread.ident}")
print(f"์Šค๋ ˆ๋“œ ์ด๋ฆ„: {thread.name}")
print(f"์Šค๋ ˆ๋“œ ํ™œ์„ฑ ์ƒํƒœ: {thread.is_alive()}")

thread.join()  # ์Šค๋ ˆ๋“œ๊ฐ€ ์ข…๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ

โœ… ํŠน์ง•:

  • ์Šค๋ ˆ๋“œ ์ƒ์„ฑ๊ณผ ์‹คํ–‰
  • ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ ์„ค์ • ๊ฐ€๋Šฅ
  • ์Šค๋ ˆ๋“œ ID์™€ ์ƒํƒœ ํ™•์ธ
  • join()์œผ๋กœ ์Šค๋ ˆ๋“œ ์™„๋ฃŒ ๋Œ€๊ธฐ
  • ํƒ€์ž… ํžŒํŒ…์œผ๋กœ ์ฝ”๋“œ ๊ฐ€๋…์„ฑ ํ–ฅ์ƒ


2๏ธโƒฃ ๋‹ค์ค‘ ์Šค๋ ˆ๋“œ

์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ๋ฅผ ๋™์‹œ์— ์‹คํ–‰ํ•˜์—ฌ ๋ณ‘๋ ฌ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

import threading
import time
from typing import List, Iterable, Any

def count_numbers(name: str, numbers: Iterable[int]) -> None:
    """
    ์ฃผ์–ด์ง„ ์ˆซ์ž ๋ชฉ๋ก์„ ์ˆœํšŒํ•˜๋ฉด์„œ ์ถœ๋ ฅํ•˜๋Š” ์Šค๋ ˆ๋“œ ํ•จ์ˆ˜
    
    Args:
        name: ์Šค๋ ˆ๋“œ ์ด๋ฆ„
        numbers: ์ฒ˜๋ฆฌํ•  ์ˆซ์ž ๋ชฉ๋ก
    """
    for i in numbers:
        print(f"{name}: {i}")
        time.sleep(0.1)

class WorkerThread(threading.Thread):
    """
    Thread ํด๋ž˜์Šค๋ฅผ ์ƒ์†๋ฐ›๋Š” ์‚ฌ์šฉ์ž ์ •์˜ ์Šค๋ ˆ๋“œ ํด๋ž˜์Šค
    """
    def __init__(self, name: str, numbers: Iterable[int]) -> None:
        """
        ์ƒ์„ฑ์ž
        
        Args:
            name: ์Šค๋ ˆ๋“œ ์ด๋ฆ„
            numbers: ์ฒ˜๋ฆฌํ•  ์ˆซ์ž ๋ชฉ๋ก
        """
        super().__init__(name=name)
        self.numbers = numbers
        self.result: List[int] = []
        
    def run(self) -> None:
        """์Šค๋ ˆ๋“œ ์‹คํ–‰ ์‹œ ํ˜ธ์ถœ๋˜๋Š” ๋ฉ”์„œ๋“œ"""
        print(f"{self.name} ์‹œ์ž‘")
        for num in self.numbers:
            self.result.append(num * 2)  # ๊ฐ ์ˆซ์ž๋ฅผ 2๋ฐฐ๋กœ ์ฒ˜๋ฆฌ
            time.sleep(0.1)
        print(f"{self.name} ์ข…๋ฃŒ")
        
    def get_result(self) -> List[int]:
        """์ฒ˜๋ฆฌ ๊ฒฐ๊ณผ ๋ฐ˜ํ™˜"""
        return self.result

# ํ•จ์ˆ˜ ๊ธฐ๋ฐ˜ ๋‹ค์ค‘ ์Šค๋ ˆ๋“œ ์‹คํ–‰
threads: List[threading.Thread] = []
for i in range(3):
    thread = threading.Thread(
        target=count_numbers,
        args=(f"Thread-{i}", range(i*3, (i+1)*3))
    )
    threads.append(thread)
    thread.start()

# ํด๋ž˜์Šค ๊ธฐ๋ฐ˜ ๋‹ค์ค‘ ์Šค๋ ˆ๋“œ ์‹คํ–‰
worker_threads: List[WorkerThread] = []
for i in range(3):
    worker = WorkerThread(f"WorkerThread-{i}", range(i*3, (i+1)*3))
    worker_threads.append(worker)
    worker.start()

# ๋ชจ๋“  ์Šค๋ ˆ๋“œ ์ข…๋ฃŒ ๋Œ€๊ธฐ
for thread in threads:
    thread.join()
    
# ์ž‘์—… ๊ฒฐ๊ณผ ์ˆ˜์ง‘
results: List[List[int]] = []
for worker in worker_threads:
    worker.join()
    results.append(worker.get_result())
    
print(f"์ฒ˜๋ฆฌ ๊ฒฐ๊ณผ: {results}")

โœ… ํŠน์ง•:

  • ํ•จ์ˆ˜ ๊ธฐ๋ฐ˜ ๋ฐ ํด๋ž˜์Šค ๊ธฐ๋ฐ˜ ์Šค๋ ˆ๋“œ ๊ตฌํ˜„
  • Thread ํด๋ž˜์Šค ์ƒ์†์„ ํ†ตํ•œ ํ™•์žฅ
  • ๋‹ค์ค‘ ์Šค๋ ˆ๋“œ ์ƒ์„ฑ ๋ฐ ๊ด€๋ฆฌ
  • ๋งค๊ฐœ๋ณ€์ˆ˜ ์ „๋‹ฌ๊ณผ ๊ฒฐ๊ณผ ์ˆ˜์ง‘
  • ์Šค๋ ˆ๋“œ ๊ฐ„ ๋…๋ฆฝ์  ์‹คํ–‰


3๏ธโƒฃ ์Šค๋ ˆ๋“œ ๋™๊ธฐํ™”

์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ๊ฐ€ ๊ณต์œ  ์ž์›์— ์ ‘๊ทผํ•  ๋•Œ ๋ฐ์ดํ„ฐ ์ผ๊ด€์„ฑ์„ ์œ ์ง€ํ•˜๊ธฐ ์œ„ํ•œ ๋™๊ธฐํ™” ๋ฉ”์ปค๋‹ˆ์ฆ˜์ž…๋‹ˆ๋‹ค.

import threading
import time
from typing import List, Dict

class Counter:
    """์Šค๋ ˆ๋“œ ์•ˆ์ „ํ•œ ์นด์šดํ„ฐ ํด๋ž˜์Šค"""
    
    def __init__(self) -> None:
        self.value = 0
        self.lock = threading.Lock()
        
    def increment(self) -> None:
        """๋ฝ์„ ์‚ฌ์šฉํ•œ ์•ˆ์ „ํ•œ ์ฆ๊ฐ€ ์—ฐ์‚ฐ"""
        with self.lock:
            current = self.value
            time.sleep(0.1)  # ๊ฒฝ์Ÿ ์กฐ๊ฑด ์‹œ๋ฎฌ๋ ˆ์ด์…˜
            self.value = current + 1
            
    def safe_update(self, func) -> None:
        """์‚ฌ์šฉ์ž ์ •์˜ ์—…๋ฐ์ดํŠธ ํ•จ์ˆ˜๋ฅผ ๋ฝ์œผ๋กœ ๋ณดํ˜ธ"""
        with self.lock:
            func(self)

# RLock ์‚ฌ์šฉ ์˜ˆ์ œ (์žฌ์ง„์ž… ๊ฐ€๋Šฅ ๋ฝ)
class RecursiveCounter:
    """์žฌ์ง„์ž… ๋ฝ์„ ์‚ฌ์šฉํ•˜๋Š” ์นด์šดํ„ฐ"""
    
    def __init__(self) -> None:
        self.value = 0
        self.lock = threading.RLock()  # ์žฌ์ง„์ž… ๊ฐ€๋Šฅํ•œ ๋ฝ
        
    def increment(self, n: int = 1) -> None:
        """๊ฐ’ ์ฆ๊ฐ€"""
        with self.lock:
            self.value += n
            
    def increment_by_10(self) -> None:
        """10์”ฉ ์ฆ๊ฐ€ (๋‚ด๋ถ€์ ์œผ๋กœ increment ํ˜ธ์ถœ)"""
        with self.lock:  # ๊ฐ™์€ ์Šค๋ ˆ๋“œ์—์„œ ์ด๋ฏธ ํš๋“ํ•œ ๋ฝ ์žฌ์ง„์ž…
            for _ in range(10):
                self.increment()  # ๋ฝ์„ ์ด๋ฏธ ๋ณด์œ ํ•œ ์ƒํƒœ์—์„œ ํ˜ธ์ถœ

# ๊ธฐ๋ณธ ๋ฝ ์‚ฌ์šฉ ์˜ˆ์ œ
counter = Counter()

def increment_task() -> None:
    """์นด์šดํ„ฐ ์ฆ๊ฐ€ ์ž‘์—…"""
    for _ in range(3):
        counter.increment()

threads: List[threading.Thread] = [
    threading.Thread(target=increment_task) for _ in range(5)
]

for thread in threads:
    thread.start()
    
for thread in threads:
    thread.join()
    
print(f"์ตœ์ข… ์นด์šดํ„ฐ ๊ฐ’: {counter.value}")  # ์˜ˆ์ƒ ๊ฒฐ๊ณผ: 15

# ์žฌ์ง„์ž… ๋ฝ ์‚ฌ์šฉ ์˜ˆ์ œ
recursive_counter = RecursiveCounter()
recursive_counter.increment_by_10()
print(f"์žฌ์ง„์ž… ๋ฝ ์นด์šดํ„ฐ ๊ฐ’: {recursive_counter.value}")  # ์˜ˆ์ƒ ๊ฒฐ๊ณผ: 10

โœ… ํŠน์ง•:

  • Lock์„ ์‚ฌ์šฉํ•œ ์ƒํ˜ธ ๋ฐฐ์ œ
  • RLock์„ ํ†ตํ•œ ์žฌ์ง„์ž… ์ง€์›
  • ์ปจํ…์ŠคํŠธ ๊ด€๋ฆฌ์ž(with ๋ฌธ) ํ™œ์šฉ
  • ์Šค๋ ˆ๋“œ ์•ˆ์ „ํ•œ ํด๋ž˜์Šค ๊ตฌํ˜„
  • ๊ฒฝ์Ÿ ์กฐ๊ฑด(Race Condition) ๋ฐฉ์ง€


4๏ธโƒฃ ์กฐ๊ฑด ๋ณ€์ˆ˜์™€ ์ด๋ฒคํŠธ

์Šค๋ ˆ๋“œ ๊ฐ„ ํ†ต์‹ ๊ณผ ์ƒํƒœ ์•Œ๋ฆผ์„ ์œ„ํ•œ ๋ฉ”์ปค๋‹ˆ์ฆ˜์ž…๋‹ˆ๋‹ค.

import threading
import time
import queue
from typing import List, Dict, Any, Optional, Deque
from collections import deque

# ์กฐ๊ฑด ๋ณ€์ˆ˜๋ฅผ ์‚ฌ์šฉํ•œ ์ƒ์‚ฐ์ž-์†Œ๋น„์ž ํŒจํ„ด
class ThreadSafeQueue:
    """์กฐ๊ฑด ๋ณ€์ˆ˜๋ฅผ ์‚ฌ์šฉํ•œ ์Šค๋ ˆ๋“œ ์•ˆ์ „ ํ"""
    
    def __init__(self, max_size: int = 5) -> None:
        self.queue: Deque[Any] = deque()
        self.max_size = max_size
        self.condition = threading.Condition()
        
    def put(self, item: Any) -> None:
        """์•„์ดํ…œ ์ถ”๊ฐ€"""
        with self.condition:
            # ํ๊ฐ€ ๊ฐ€๋“ ์ฐผ์œผ๋ฉด ๋Œ€๊ธฐ
            while len(self.queue) >= self.max_size:
                print(f"ํ๊ฐ€ ๊ฐ€๋“ ์ฐธ (ํฌ๊ธฐ: {len(self.queue)}), ์ƒ์‚ฐ์ž ๋Œ€๊ธฐ ์ค‘...")
                self.condition.wait()
                
            self.queue.append(item)
            print(f"์•„์ดํ…œ ์ถ”๊ฐ€: {item}, ํ ํฌ๊ธฐ: {len(self.queue)}")
            
            # ์†Œ๋น„์ž์—๊ฒŒ ์•Œ๋ฆผ
            self.condition.notify()
    
    def get(self) -> Any:
        """์•„์ดํ…œ ๊ฐ€์ ธ์˜ค๊ธฐ"""
        with self.condition:
            # ํ๊ฐ€ ๋น„์—ˆ์œผ๋ฉด ๋Œ€๊ธฐ
            while len(self.queue) == 0:
                print("ํ๊ฐ€ ๋น„์—ˆ์Œ, ์†Œ๋น„์ž ๋Œ€๊ธฐ ์ค‘...")
                self.condition.wait()
                
            item = self.queue.popleft()
            print(f"์•„์ดํ…œ ๊ฐ€์ ธ์˜ด: {item}, ํ ํฌ๊ธฐ: {len(self.queue)}")
            
            # ์ƒ์‚ฐ์ž์—๊ฒŒ ์•Œ๋ฆผ
            self.condition.notify()
            return item

# ์ด๋ฒคํŠธ๋ฅผ ์‚ฌ์šฉํ•œ ์Šค๋ ˆ๋“œ ๋™๊ธฐํ™”
def wait_for_event(event: threading.Event, name: str) -> None:
    """์ด๋ฒคํŠธ ๋Œ€๊ธฐ ํ•จ์ˆ˜"""
    print(f"{name}: ์ด๋ฒคํŠธ ๋Œ€๊ธฐ ์ค‘...")
    event.wait()  # ์ด๋ฒคํŠธ๊ฐ€ ์„ค์ •๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
    print(f"{name}: ์ด๋ฒคํŠธ ๋ฐœ์ƒ! ์ž‘์—… ์‹คํ–‰")

# ์กฐ๊ฑด ๋ณ€์ˆ˜ ์‚ฌ์šฉ ์˜ˆ์ œ
queue = ThreadSafeQueue(max_size=3)

def producer() -> None:
    """์ƒ์‚ฐ์ž ์Šค๋ ˆ๋“œ"""
    for i in range(10):
        queue.put(f"์•„์ดํ…œ-{i}")
        time.sleep(0.5)

def consumer() -> None:
    """์†Œ๋น„์ž ์Šค๋ ˆ๋“œ"""
    for _ in range(10):
        item = queue.get()
        # ์•„์ดํ…œ ์ฒ˜๋ฆฌ ์‹œ๋ฎฌ๋ ˆ์ด์…˜
        time.sleep(1)

# ์ƒ์‚ฐ์ž-์†Œ๋น„์ž ์Šค๋ ˆ๋“œ ์‹œ์ž‘
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

# ์ด๋ฒคํŠธ ์‚ฌ์šฉ ์˜ˆ์ œ
shutdown_event = threading.Event()

event_threads = [
    threading.Thread(target=wait_for_event, args=(shutdown_event, f"Worker-{i}"))
    for i in range(3)
]

for t in event_threads:
    t.start()

print("๋ฉ”์ธ: 3์ดˆ ํ›„ ์ด๋ฒคํŠธ ์„ค์ •...")
time.sleep(3)
shutdown_event.set()  # ๋ชจ๋“  ๋Œ€๊ธฐ ์ค‘์ธ ์Šค๋ ˆ๋“œ์—๊ฒŒ ์‹ ํ˜ธ ์ „์†ก

# ๋ชจ๋“  ์Šค๋ ˆ๋“œ ์ข…๋ฃŒ ๋Œ€๊ธฐ
producer_thread.join()
consumer_thread.join()
for t in event_threads:
    t.join()

โœ… ํŠน์ง•:

  • ์กฐ๊ฑด ๋ณ€์ˆ˜(Condition)๋ฅผ ํ†ตํ•œ ๋Œ€๊ธฐ์™€ ํ†ต์ง€
  • ์ด๋ฒคํŠธ(Event)๋ฅผ ์‚ฌ์šฉํ•œ ์Šค๋ ˆ๋“œ ์‹ ํ˜ธ ์ „๋‹ฌ
  • ์ƒ์‚ฐ์ž-์†Œ๋น„์ž ํŒจํ„ด ๊ตฌํ˜„
  • ์Šค๋ ˆ๋“œ ๊ฐ„ ํšจ์œจ์ ์ธ ํ†ต์‹ 
  • ๋‚ด์žฅ collections.deque ํ™œ์šฉ


5๏ธโƒฃ ์„ธ๋งˆํฌ์–ด์™€ ๋ฐ”์šด๋””๋“œ ์„ธ๋งˆํฌ์–ด

๋ฆฌ์†Œ์Šค ์ ‘๊ทผ์„ ์ œํ•œํ•˜๊ณ  ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ๋™๊ธฐํ™” ๋ฉ”์ปค๋‹ˆ์ฆ˜์ž…๋‹ˆ๋‹ค.

import threading
import time
import random
from typing import List, Dict, Set, Any

# ์„ธ๋งˆํฌ์–ด๋กœ ๋™์‹œ ์ ‘๊ทผ ์ œํ•œ
class ConnectionPool:
    """์„ธ๋งˆํฌ์–ด๋ฅผ ์‚ฌ์šฉํ•œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ ํ’€"""
    
    def __init__(self, max_connections: int = 3) -> None:
        self.semaphore = threading.Semaphore(max_connections)
        self.connections: Set[int] = set()
        self.lock = threading.Lock()
        self.connection_count = 0
        
    def acquire_connection(self) -> int:
        """์—ฐ๊ฒฐ ํš๋“"""
        with self.semaphore:
            # ์ž„๊ณ„ ์˜์—ญ ๋‚ด์—์„œ ์—ฐ๊ฒฐ ์ƒ์„ฑ
            with self.lock:
                self.connection_count += 1
                connection_id = self.connection_count
                self.connections.add(connection_id)
                print(f"์—ฐ๊ฒฐ ํš๋“: {connection_id}, ํ™œ์„ฑ ์—ฐ๊ฒฐ: {len(self.connections)}")
            
            return connection_id
            
    def release_connection(self, connection_id: int) -> None:
        """์—ฐ๊ฒฐ ๋ฐ˜ํ™˜"""
        with self.lock:
            if connection_id in self.connections:
                self.connections.remove(connection_id)
                print(f"์—ฐ๊ฒฐ ๋ฐ˜ํ™˜: {connection_id}, ํ™œ์„ฑ ์—ฐ๊ฒฐ: {len(self.connections)}")

# BoundedSemaphore ์‚ฌ์šฉ ์˜ˆ์ œ
class ResourceManager:
    """์ •ํ™•ํ•œ ๋ฆฌ์†Œ์Šค ๊ด€๋ฆฌ๋ฅผ ์œ„ํ•œ ๋ฐ”์šด๋””๋“œ ์„ธ๋งˆํฌ์–ด"""
    
    def __init__(self, resources: int = 5) -> None:
        # BoundedSemaphore๋Š” ์ตœ๋Œ€ ์นด์šดํŠธ๋ฅผ ์ดˆ๊ณผํ•˜์—ฌ releaseํ•˜๋ฉด ValueError ๋ฐœ์ƒ
        self.semaphore = threading.BoundedSemaphore(resources)
        self.resource_count = resources
        
    def use_resource(self, worker_id: int) -> None:
        """๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ"""
        with self.semaphore:
            print(f"์›Œ์ปค {worker_id}: ๋ฆฌ์†Œ์Šค ํš๋“")
            # ๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ ์‹œ๋ฎฌ๋ ˆ์ด์…˜
            time.sleep(random.uniform(0.5, 1.5))
            print(f"์›Œ์ปค {worker_id}: ๋ฆฌ์†Œ์Šค ๋ฐ˜ํ™˜")

# ์„ธ๋งˆํฌ์–ด ์‚ฌ์šฉ ์˜ˆ์ œ
pool = ConnectionPool(max_connections=3)

def worker(worker_id: int) -> None:
    """์—ฐ๊ฒฐ ํ’€์„ ์‚ฌ์šฉํ•˜๋Š” ์ž‘์—…์ž"""
    connection_id = pool.acquire_connection()
    try:
        # ์—ฐ๊ฒฐ ์‚ฌ์šฉ ์‹œ๋ฎฌ๋ ˆ์ด์…˜
        print(f"์›Œ์ปค {worker_id}: ์—ฐ๊ฒฐ {connection_id} ์‚ฌ์šฉ ์ค‘")
        time.sleep(random.uniform(1, 3))
    finally:
        # ํ•ญ์ƒ ์—ฐ๊ฒฐ ๋ฐ˜ํ™˜
        pool.release_connection(connection_id)
        print(f"์›Œ์ปค {worker_id}: ์ž‘์—… ์™„๋ฃŒ")

# ๋ฐ”์šด๋””๋“œ ์„ธ๋งˆํฌ์–ด ์‚ฌ์šฉ ์˜ˆ์ œ
resource_mgr = ResourceManager(resources=2)

def resource_worker(worker_id: int) -> None:
    """๋ฆฌ์†Œ์Šค ๊ด€๋ฆฌ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์ž‘์—…์ž"""
    for _ in range(3):
        resource_mgr.use_resource(worker_id)
        time.sleep(random.uniform(0.1, 0.5))

# ์Šค๋ ˆ๋“œ ์ƒ์„ฑ ๋ฐ ์‹คํ–‰
semaphore_threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
resource_threads = [threading.Thread(target=resource_worker, args=(i,)) for i in range(3)]

for thread in semaphore_threads + resource_threads:
    thread.start()

for thread in semaphore_threads + resource_threads:
    thread.join()

โœ… ํŠน์ง•:

  • ์„ธ๋งˆํฌ์–ด๋ฅผ ํ†ตํ•œ ๋™์‹œ ์ ‘๊ทผ ์ œํ•œ
  • ๋ฐ”์šด๋””๋“œ ์„ธ๋งˆํฌ์–ด๋กœ ์ •ํ™•ํ•œ ๋ฆฌ์†Œ์Šค ์นด์šดํŒ…
  • ๋ฆฌ์†Œ์Šค ํ’€ ํŒจํ„ด ๊ตฌํ˜„
  • ๋™์‹œ์„ฑ ์ˆ˜์ค€ ์ œ์–ด
  • try/finally๋ฅผ ํ†ตํ•œ ๋ฆฌ์†Œ์Šค ์ •๋ฆฌ ๋ณด์žฅ


6๏ธโƒฃ ์Šค๋ ˆ๋“œ ํ’€ ๋ฐ Future

concurrent.futures ๋ชจ๋“ˆ์„ ์‚ฌ์šฉํ•œ ์Šค๋ ˆ๋“œ ํ’€ ๊ธฐ๋ฐ˜ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ๋ฐฉ์‹์ž…๋‹ˆ๋‹ค.

import concurrent.futures
import threading
import requests
import time
from typing import List, Dict, Any, Callable, TypeVar, Generic

T = TypeVar('T')  # ์ œ๋„ค๋ฆญ ํƒ€์ž… ๋ณ€์ˆ˜

def download_url(url: str) -> Dict[str, Any]:
    """URL์—์„œ ๋ฐ์ดํ„ฐ ๋‹ค์šด๋กœ๋“œ"""
    print(f"๋‹ค์šด๋กœ๋“œ ์‹œ์ž‘: {url} (์Šค๋ ˆ๋“œ: {threading.current_thread().name})")
    start_time = time.time()
    response = requests.get(url)
    data = response.text
    elapsed = time.time() - start_time
    return {
        'url': url,
        'status': response.status_code,
        'size': len(data),
        'time': elapsed,
        'thread': threading.current_thread().name
    }

def process_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """๋‹ค์šด๋กœ๋“œํ•œ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ"""
    print(f"๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์‹œ์ž‘: {data['url']} (์Šค๋ ˆ๋“œ: {threading.current_thread().name})")
    # ์ฒ˜๋ฆฌ ์‹œ๊ฐ„ ์‹œ๋ฎฌ๋ ˆ์ด์…˜
    time.sleep(0.5)
    data['processed'] = True
    return data

class ThreadPoolManager(Generic[T]):
    """ThreadPoolExecutor ๊ด€๋ฆฌ ํด๋ž˜์Šค"""
    
    def __init__(self, max_workers: int = None) -> None:
        self.max_workers = max_workers
        
    def map_tasks(self, func: Callable[..., T], tasks: List[Any]) -> List[T]:
        """์ž‘์—… ๋งคํ•‘ ๋ฐ ๋ณ‘๋ ฌ ์‹คํ–‰"""
        results: List[T] = []
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # submit ๋ฉ”์„œ๋“œ๋กœ ์ž‘์—… ์ œ์ถœ ๋ฐ Future ๊ฐ์ฒด ์ˆ˜์ง‘
            future_to_task = {executor.submit(func, task): task for task in tasks}
            
            # as_completed๋กœ ์™„๋ฃŒ๋œ ์ž‘์—…๋ถ€ํ„ฐ ๊ฒฐ๊ณผ ์ˆ˜์ง‘
            for future in concurrent.futures.as_completed(future_to_task):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as exc:
                    task = future_to_task[future]
                    print(f"์ž‘์—… {task} ์‹คํ–‰ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {exc}")
        
        return results
    
    def process_tasks(self, tasks: List[Any], task_func: Callable[[Any], T]) -> List[T]:
        """์ž‘์—… ์ฒ˜๋ฆฌ with progress callback"""
        results: List[T] = []
        completed = 0
        total = len(tasks)
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # submit ๋ฉ”์„œ๋“œ๋กœ ์ž‘์—… ์ œ์ถœ
            futures = [executor.submit(task_func, task) for task in tasks]
            
            # as_completed๋กœ ์™„๋ฃŒ๋œ ์ž‘์—…๋ถ€ํ„ฐ ๊ฒฐ๊ณผ ์ˆ˜์ง‘
            for future in concurrent.futures.as_completed(futures):
                try:
                    result = future.result()
                    results.append(result)
                    
                    # ์ง„ํ–‰ ์ƒํ™ฉ ์—…๋ฐ์ดํŠธ
                    completed += 1
                    print(f"์ง„ํ–‰ ์ƒํ™ฉ: {completed}/{total} ({completed/total*100:.1f}%)")
                    
                except Exception as exc:
                    print(f"์ž‘์—… ์‹คํ–‰ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {exc}")
        
        return results
    
    def chain_tasks(self, initial_tasks: List[Any], 
                   first_func: Callable[[Any], Any], 
                   second_func: Callable[[Any], T]) -> List[T]:
        """์—ฐ์‡„ ์ž‘์—… ์ฒ˜๋ฆฌ (์ฒซ ์ž‘์—… ๊ฒฐ๊ณผ๋ฅผ ๋‘ ๋ฒˆ์งธ ์ž‘์—…์˜ ์ž…๋ ฅ์œผ๋กœ ์‚ฌ์šฉ)"""
        intermediate_results = []
        final_results: List[T] = []
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as first_executor:
            # ์ฒซ ๋ฒˆ์งธ ๋‹จ๊ณ„ ์ž‘์—… ์ œ์ถœ
            first_futures = [first_executor.submit(first_func, task) for task in initial_tasks]
            
            # ์ฒซ ๋ฒˆ์งธ ์ž‘์—… ๊ฒฐ๊ณผ ์ˆ˜์ง‘
            for future in concurrent.futures.as_completed(first_futures):
                try:
                    result = future.result()
                    intermediate_results.append(result)
                except Exception as exc:
                    print(f"์ฒซ ๋ฒˆ์งธ ์ž‘์—… ์‹คํ–‰ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {exc}")
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as second_executor:
            # ๋‘ ๋ฒˆ์งธ ๋‹จ๊ณ„ ์ž‘์—… ์ œ์ถœ
            second_futures = [second_executor.submit(second_func, result) for result in intermediate_results]
            
            # ๋‘ ๋ฒˆ์งธ ์ž‘์—… ๊ฒฐ๊ณผ ์ˆ˜์ง‘
            for future in concurrent.futures.as_completed(second_futures):
                try:
                    result = future.result()
                    final_results.append(result)
                except Exception as exc:
                    print(f"๋‘ ๋ฒˆ์งธ ์ž‘์—… ์‹คํ–‰ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {exc}")
        
        return final_results

# ThreadPoolExecutor ์‚ฌ์šฉ ์˜ˆ์ œ
urls = [
    "https://www.example.com",
    "https://www.google.com",
    "https://www.github.com",
    "https://www.python.org",
    "https://www.wikipedia.org"
]

# 1. ๊ธฐ๋ณธ ThreadPoolExecutor ์‚ฌ์šฉ
print("1. ๊ธฐ๋ณธ ThreadPoolExecutor ์‹คํ–‰")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map ๋ฉ”์„œ๋“œ๋กœ ๊ฐ„๋‹จํ•˜๊ฒŒ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ
    results = list(executor.map(download_url, urls))
    for result in results:
        print(f"๋‹ค์šด๋กœ๋“œ ์™„๋ฃŒ: {result['url']}, ํฌ๊ธฐ: {result['size']}, ์‹œ๊ฐ„: {result['time']:.2f}์ดˆ")

# 2. ThreadPoolManager ํด๋ž˜์Šค ์‚ฌ์šฉ
print("\n2. ThreadPoolManager ํด๋ž˜์Šค ์‹คํ–‰")
pool_manager = ThreadPoolManager[Dict[str, Any]](max_workers=3)

# ์—ฐ์‡„ ์ž‘์—… - ๋‹ค์šด๋กœ๋“œ ํ›„ ์ฒ˜๋ฆฌ
print("\n2.1 ์—ฐ์‡„ ์ž‘์—… ์‹คํ–‰")
final_results = pool_manager.chain_tasks(urls, download_url, process_data)
for result in final_results:
    print(f"์ฒ˜๋ฆฌ ์™„๋ฃŒ: {result['url']}, ์ฒ˜๋ฆฌ๋จ: {result.get('processed', False)}")

print("\n2.2 ๋‹จ์ผ ์ž‘์—… ์ง„ํ–‰๋ฅ  ์ถ”์ ")
download_results = pool_manager.process_tasks(urls, download_url)
print(f"๋ชจ๋“  ๋‹ค์šด๋กœ๋“œ ์™„๋ฃŒ. ์ด {len(download_results)}๊ฐœ URL ์ฒ˜๋ฆฌ๋จ.")

โœ… ํŠน์ง•:

  • ThreadPoolExecutor๋ฅผ ํ†ตํ•œ ์Šค๋ ˆ๋“œ ํ’€ ๊ด€๋ฆฌ
  • Future ๊ฐ์ฒด๋ฅผ ํ†ตํ•œ ๋น„๋™๊ธฐ ์ž‘์—… ๊ด€๋ฆฌ
  • ์ž‘์—… ์ง„ํ–‰ ์ƒํ™ฉ ์ถ”์  ๊ธฐ๋Šฅ
  • ๋‹ค์–‘ํ•œ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ํŒจํ„ด ์ง€์›
  • ์ œ๋„ค๋ฆญ ํƒ€์ž…์„ ํ™œ์šฉํ•œ ์œ ์—ฐํ•œ ์ธํ„ฐํŽ˜์ด์Šค
  • ์—ฐ์‡„ ์ž‘์—… ์ฒ˜๋ฆฌ ํŒจํ„ด
  • as_completed๋ฅผ ํ†ตํ•œ ์™„๋ฃŒ ์ˆœ์„œ๋Œ€๋กœ ๊ฒฐ๊ณผ ์ฒ˜๋ฆฌ
  • ์˜ˆ์™ธ ์ฒ˜๋ฆฌ ๋ฐ ์—๋Ÿฌ ๊ด€๋ฆฌ


์ฃผ์š” ํŒ

โœ… ๋ชจ๋ฒ” ์‚ฌ๋ก€:

  • GIL(Global Interpreter Lock) ์ดํ•ดํ•˜๊ธฐ
    • ํŒŒ์ด์ฌ์˜ GIL๋กœ ์ธํ•ด CPU ๋ฐ”์šด๋“œ ์ž‘์—…์€ ๋ฉ€ํ‹ฐํ”„๋กœ์„ธ์‹ฑ์ด ๋” ํšจ์œจ์ 
    • I/O ๋ฐ”์šด๋“œ ์ž‘์—…(๋„คํŠธ์›Œํฌ, ํŒŒ์ผ)์— ์Šค๋ ˆ๋”ฉ ํ™œ์šฉ์ด ์ ํ•ฉ
  • ์Šค๋ ˆ๋“œ ์ƒ๋ช…์ฃผ๊ธฐ ๊ด€๋ฆฌ
    • ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ๋ฅผ ์ ์ ˆํžˆ ํ™œ์šฉํ•˜์—ฌ ์ž์› ๋ˆ„์ˆ˜ ๋ฐฉ์ง€
    • join()์œผ๋กœ ๋ชจ๋“  ์Šค๋ ˆ๋“œ๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
  • ๊ณต์œ  ์ž์› ๊ด€๋ฆฌ
    • ๊ฒฝ์Ÿ ์กฐ๊ฑด(Race Condition)์„ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด Lock ์‚ฌ์šฉ
    • ๊ณต์œ  ์ž์› ์ ‘๊ทผ ์‹œ ํ•ญ์ƒ ๋™๊ธฐํ™” ๋ฉ”์ปค๋‹ˆ์ฆ˜ ์ ์šฉ
  • ๋ฐ๋“œ๋ฝ(Deadlock) ๋ฐฉ์ง€
    • ๋ฝ ํš๋“ ์ˆœ์„œ๋ฅผ ์ผ๊ด€๋˜๊ฒŒ ์œ ์ง€
    • ๊ฐ€๋Šฅํ•˜๋ฉด with ๋ฌธ์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ฆฌ์†Œ์Šค ์ž๋™ ํ•ด์ œ
    • ํƒ€์ž„์•„์›ƒ ์„ค์ •์œผ๋กœ ์˜์›ํ•œ ๋Œ€๊ธฐ ๋ฐฉ์ง€
  • ์—๋Ÿฌ ์ฒ˜๋ฆฌ
    • ๋ชจ๋“  ์Šค๋ ˆ๋“œ์—์„œ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ ๊ตฌํ˜„
    • ๋ฉ”์ธ ์Šค๋ ˆ๋“œ์—์„œ ์˜ˆ์™ธ ๊ฐ์ง€ ๋ฐ ๋กœ๊น…
    • threading.excepthook ํ™œ์šฉ
  • ์„ฑ๋Šฅ ์ตœ์ ํ™”
    • ๋„ˆ๋ฌด ๋งŽ์€ ์Šค๋ ˆ๋“œ ์ƒ์„ฑ ํ”ผํ•˜๊ธฐ(์ˆ˜๋ฐฑ ๊ฐœ ์ด์ƒ)
    • ThreadPoolExecutor๋กœ ํšจ์œจ์ ์ธ ์Šค๋ ˆ๋“œ ์žฌ์‚ฌ์šฉ
    • CPU ๋ฐ”์šด๋“œ ์ž‘์—…์€ multiprocessing ๋ชจ๋“ˆ ๊ณ ๋ ค
  • ์Šค๋ ˆ๋“œ ์•ˆ์ „(Thread-Safe) ์„ค๊ณ„
    • ๋ถˆ๋ณ€ ๊ฐ์ฒด ์„ ํ˜ธ
    • ์Šค๋ ˆ๋“œ ๋กœ์ปฌ ์ €์žฅ์†Œ(thread-local storage) ํ™œ์šฉ
    • ๊ฐ€๋Šฅํ•˜๋ฉด ํ ๋“ฑ์˜ ์Šค๋ ˆ๋“œ ์•ˆ์ „ํ•œ ์ž๋ฃŒ๊ตฌ์กฐ ์‚ฌ์šฉ
  • ๋””๋ฒ„๊น…๊ณผ ํ…Œ์ŠคํŠธ
    • ์Šค๋ ˆ๋“œ ์ด๋ฆ„ ์ง€์ •์œผ๋กœ ๋กœ๊น… ๋ฐ ๋””๋ฒ„๊น… ์šฉ์ดํ•˜๊ฒŒ
    • ์Šค๋ ˆ๋“œ ๊ฐ„ ๊ฒฝ์Ÿ ์กฐ๊ฑด ํ…Œ์ŠคํŠธ๋ฅผ ์œ„ํ•œ ์ „๋žต ์ˆ˜๋ฆฝ
    • ๋‹จ์ผ ์Šค๋ ˆ๋“œ ๋ชจ๋“œ๋กœ ๊ฒ€์ฆ ํ›„ ๋ณ‘๋ ฌ ๋ชจ๋“œ๋กœ ์ „ํ™˜


โš ๏ธ **GitHub.com Fallback** โš ๏ธ