KR_Multiprocessing - somaz94/python-study GitHub Wiki
๋ฉํฐํ๋ก์ธ์ฑ์ ์ฌ๋ฌ ํ๋ก์ธ์ค๋ฅผ ๋์์ ์คํํ์ฌ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ๊ตฌํํ๋ ๊ธฐ์ ์ด๋ค.
from multiprocessing import Process, current_process
import os
import time
from typing import List, Callable, Any, Optional, Dict, Union, Tuple
def basic_worker() -> None:
"""๊ธฐ๋ณธ ์์
์ ํจ์
ํ์ฌ ํ๋ก์ธ์ค ์ ๋ณด๋ฅผ ์ถ๋ ฅํ๋ค.
"""
process = current_process()
print(f'ํ๋ก์ธ์ค ์ด๋ฆ: {process.name}')
print(f'ํ๋ก์ธ์ค ID: {os.getpid()}')
print(f'๋ถ๋ชจ ํ๋ก์ธ์ค ID: {os.getppid()}')
print(f'ํ๋ก์ธ์ค ์ํ: {"alive" if process.is_alive() else "dead"}')
def worker_with_args(number: int, text: str) -> None:
"""์ธ์๋ฅผ ๋ฐ๋ ์์
์ ํจ์
Args:
number: ์ ์ ์ธ์
text: ๋ฌธ์์ด ์ธ์
"""
print(f'ํ๋ก์ธ์ค ID: {os.getpid()}')
print(f'์ธ์ ๊ฐ: number={number}, text={text}')
time.sleep(2) # ์์
์๋ฎฌ๋ ์ด์
print(f'ํ๋ก์ธ์ค {os.getpid()} ์์
์๋ฃ')
class ProcessManager:
"""ํ๋ก์ธ์ค ๊ด๋ฆฌ ํด๋์ค"""
def __init__(self, max_processes: int = 4):
"""์ด๊ธฐํ
Args:
max_processes: ์ต๋ ํ๋ก์ธ์ค ์
"""
self.max_processes = max_processes
self.processes: List[Process] = []
def create_process(self, target: Callable, args: tuple = (),
kwargs: Dict = None, name: Optional[str] = None) -> Process:
"""์ ํ๋ก์ธ์ค ์์ฑ
Args:
target: ์คํํ ํจ์
args: ์์น ์ธ์
kwargs: ํค์๋ ์ธ์
name: ํ๋ก์ธ์ค ์ด๋ฆ
Returns:
Process: ์์ฑ๋ ํ๋ก์ธ์ค ๊ฐ์ฒด
"""
if kwargs is None:
kwargs = {}
p = Process(target=target, args=args, kwargs=kwargs, name=name)
self.processes.append(p)
return p
def start_all(self) -> None:
"""๋ชจ๋ ํ๋ก์ธ์ค ์์"""
for p in self.processes:
p.start()
print(f'ํ๋ก์ธ์ค ์์: {p.name} (PID: {p.pid})')
def join_all(self, timeout: Optional[float] = None) -> None:
"""๋ชจ๋ ํ๋ก์ธ์ค ๋๊ธฐ
Args:
timeout: ์ต๋ ๋๊ธฐ ์๊ฐ(์ด)
"""
for p in self.processes:
p.join(timeout)
def terminate_all(self) -> None:
"""๋ชจ๋ ํ๋ก์ธ์ค ๊ฐ์ ์ข
๋ฃ"""
for p in self.processes:
if p.is_alive():
p.terminate()
print(f'ํ๋ก์ธ์ค ์ข
๋ฃ: {p.name} (PID: {p.pid})')
def get_active_count(self) -> int:
"""ํ์ฑ ํ๋ก์ธ์ค ์ ๋ฐํ
Returns:
int: ํ์ฑ ํ๋ก์ธ์ค ์
"""
return sum(1 for p in self.processes if p.is_alive())
# ๊ธฐ๋ณธ ์ฌ์ฉ ์์
if __name__ == '__main__':
# ๊ธฐ๋ณธ ํ๋ก์ธ์ค ์์ฑ ๋ฐ ์คํ
processes = []
for i in range(3):
p = Process(target=basic_worker)
p.start()
processes.append(p)
for p in processes:
p.join()
print("\nํ๋ก์ธ์ค ๋งค๋์ ์ฌ์ฉ ์์:")
# ํ๋ก์ธ์ค ๋งค๋์ ์ฌ์ฉ
manager = ProcessManager()
# ์ฌ๋ฌ ํ๋ก์ธ์ค ์์ฑ
for i in range(3):
manager.create_process(
target=worker_with_args,
args=(i, f"์์
{i}"),
name=f"Worker-{i}"
)
# ๋ชจ๋ ํ๋ก์ธ์ค ์์ ๋ฐ ๋๊ธฐ
manager.start_all()
print(f"ํ์ฑ ํ๋ก์ธ์ค ์: {manager.get_active_count()}")
manager.join_all()
print("๋ชจ๋ ํ๋ก์ธ์ค ์๋ฃ")
โ
ํน์ง:
- ํ๋ก์ธ์ค ์์ฑ
- ๋ณ๋ ฌ ์คํ
- ํ๋ก์ธ์ค ๊ด๋ฆฌ
- ์์ ๊ฒฉ๋ฆฌ
- ์์ ์ฑ ํฅ์
- ๋ค์ค ์ฝ์ด ํ์ฉ
ํ์ด์ฌ์์ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ํ ๋ ๊ฐ์ง ์ฃผ์ ์ ๊ทผ ๋ฐฉ์์ ๋น๊ต์ด๋ค.
ํน์ฑ | ๋ฉํฐํ๋ก์ธ์ฑ | ๋ฉํฐ์ค๋ ๋ฉ |
---|---|---|
๋ฉ๋ชจ๋ฆฌ ๊ณต์ | ๋ณ๋ ๋ฉ๋ชจ๋ฆฌ ๊ณต๊ฐ | ๋ฉ๋ชจ๋ฆฌ ๊ณต๊ฐ ๊ณต์ |
GIL ์ํฅ | ์ํฅ ์์ | GIL์ ์ํด ์ ํ๋จ |
์ฅ์ | ์์ ํ ๋ณ๋ ฌ ์ฒ๋ฆฌ, CPU ๋ฐ์ด๋ ์์ ์ ์ ํฉ | ์ ์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ, I/O ๋ฐ์ด๋ ์์ ์ ์ ํฉ |
๋จ์ | ๋์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ, ํ๋ก์ธ์ค ๊ฐ ํต์ ๋น์ฉ | Python์ GIL๋ก ์ธํ ์ฑ๋ฅ ์ ํ |
์์ฑ ๋น์ฉ | ๋์ | ๋ฎ์ |
์ ํฉํ ์์ | CPU ์ง์ฝ์ ๊ณ์ฐ, ๋ณ๋ ๋ฉ๋ชจ๋ฆฌ ํ์ ์์ | I/O ์์ , ๋คํธ์ํฌ ํต์ , ๋๋ฆฐ ์ธ๋ถ ๋ฆฌ์์ค ์ ๊ทผ |
์ฌ๋ฌ ํ๋ก์ธ์ค ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ตํํ๋ ๋ค์ํ ๋ฐฉ๋ฒ์ด๋ค.
from multiprocessing import Process, Queue, Pipe, Event
import time
from typing import List, Dict, Any, Tuple, Optional
class QueueCommunication:
"""Queue๋ฅผ ์ฌ์ฉํ ํ๋ก์ธ์ค ๊ฐ ํต์ """
def sender(self, queue: Queue, data: List[Any]) -> None:
"""๋ฐ์ดํฐ๋ฅผ ํ์ ์ ์กํ๋ ํ๋ก์ธ์ค
Args:
queue: ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ผ ํ
data: ๋ณด๋ผ ๋ฐ์ดํฐ ๋ฆฌ์คํธ
"""
for item in data:
queue.put(item)
print(f'[Sender] ์ ์ก: {item}')
time.sleep(0.5) # ์ ์ก ๊ฐ๊ฒฉ
# ์ข
๋ฃ ์ ํธ ์ ์ก
queue.put(None)
print('[Sender] ์ ์ก ์๋ฃ')
def receiver(self, queue: Queue) -> None:
"""ํ์์ ๋ฐ์ดํฐ๋ฅผ ์์ ํ๋ ํ๋ก์ธ์ค
Args:
queue: ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ํ
"""
while True:
item = queue.get()
if item is None: # ์ข
๋ฃ ์ ํธ ํ์ธ
break
print(f'[Receiver] ์์ : {item}')
print('[Receiver] ์์ ์๋ฃ')
def demonstrate(self, data: List[Any]) -> None:
"""Queue ํต์ ์์ฐ
Args:
data: ์ ์กํ ๋ฐ์ดํฐ
"""
queue = Queue()
sender_process = Process(
target=self.sender,
args=(queue, data)
)
receiver_process = Process(
target=self.receiver,
args=(queue,)
)
sender_process.start()
receiver_process.start()
sender_process.join()
receiver_process.join()
class PipeCommunication:
"""Pipe๋ฅผ ์ฌ์ฉํ ํ๋ก์ธ์ค ๊ฐ ํต์ """
def pipe_process_a(self, conn: Pipe, data: List[Any]) -> None:
"""์ฒซ ๋ฒ์งธ ํ๋ก์ธ์ค
Args:
conn: ํ์ดํ ์ฐ๊ฒฐ ๊ฐ์ฒด
data: ์ ์กํ ๋ฐ์ดํฐ
"""
for item in data:
conn.send(item)
print(f'[Process A] ์ ์ก: {item}')
# ์๋ต ๋๊ธฐ
response = conn.recv()
print(f'[Process A] ์์ ํ์ธ: {response}')
time.sleep(0.5)
# ์ข
๋ฃ ์ ํธ
conn.send('END')
conn.close()
def pipe_process_b(self, conn: Pipe) -> None:
"""๋ ๋ฒ์งธ ํ๋ก์ธ์ค
Args:
conn: ํ์ดํ ์ฐ๊ฒฐ ๊ฐ์ฒด
"""
while True:
item = conn.recv()
if item == 'END':
print('[Process B] ์ข
๋ฃ ์ ํธ ์์ ')
break
print(f'[Process B] ์์ : {item}')
# ์๋ต ์ ์ก
conn.send(f'ACK: {item}')
conn.close()
def demonstrate(self, data: List[Any]) -> None:
"""Pipe ํต์ ์์ฐ
Args:
data: ์ ์กํ ๋ฐ์ดํฐ
"""
# ์๋ฐฉํฅ ํ์ดํ ์์ฑ
parent_conn, child_conn = Pipe()
process_a = Process(
target=self.pipe_process_a,
args=(parent_conn, data)
)
process_b = Process(
target=self.pipe_process_b,
args=(child_conn,)
)
process_a.start()
process_b.start()
process_a.join()
process_b.join()
class EventCommunication:
"""Event๋ฅผ ์ฌ์ฉํ ํ๋ก์ธ์ค ๊ฐ ๋๊ธฐํ"""
def wait_for_event(self, event: Event) -> None:
"""์ด๋ฒคํธ ๋๊ธฐ ํ๋ก์ธ์ค
Args:
event: ๋๊ธฐํ ์ด๋ฒคํธ ๊ฐ์ฒด
"""
print('[๋๊ธฐ ํ๋ก์ธ์ค] ์ด๋ฒคํธ ๋๊ธฐ ์ค...')
event.wait() # ์ด๋ฒคํธ๊ฐ ์ค์ ๋ ๋๊น์ง ๋๊ธฐ
print('[๋๊ธฐ ํ๋ก์ธ์ค] ์ด๋ฒคํธ ๋ฐ์! ์์
์์')
# ์์
์๋ฎฌ๋ ์ด์
time.sleep(2)
print('[๋๊ธฐ ํ๋ก์ธ์ค] ์์
์๋ฃ')
def trigger_event(self, event: Event, delay: float) -> None:
"""์ผ์ ์๊ฐ ํ ์ด๋ฒคํธ ํธ๋ฆฌ๊ฑฐ ํ๋ก์ธ์ค
Args:
event: ์ค์ ํ ์ด๋ฒคํธ ๊ฐ์ฒด
delay: ์ง์ฐ ์๊ฐ(์ด)
"""
print(f'[ํธ๋ฆฌ๊ฑฐ ํ๋ก์ธ์ค] {delay}์ด ํ ์ด๋ฒคํธ ์ค์ ์์ ')
time.sleep(delay)
print('[ํธ๋ฆฌ๊ฑฐ ํ๋ก์ธ์ค] ์ด๋ฒคํธ ์ค์ !')
event.set() # ์ด๋ฒคํธ ์ค์
def demonstrate(self, delay: float = 3.0) -> None:
"""Event ํต์ ์์ฐ
Args:
delay: ์ด๋ฒคํธ ํธ๋ฆฌ๊ฑฐ ์ง์ฐ ์๊ฐ(์ด)
"""
event = Event()
wait_process = Process(
target=self.wait_for_event,
args=(event,)
)
trigger_process = Process(
target=self.trigger_event,
args=(event, delay)
)
wait_process.start()
trigger_process.start()
wait_process.join()
trigger_process.join()
# ์ฌ์ฉ ์์
if __name__ == '__main__':
# Queue ํต์ ์์
print("\n===== Queue ํต์ ์์ =====")
queue_comm = QueueCommunication()
queue_comm.demonstrate(['๋ฉ์์ง1', '๋ฉ์์ง2', '๋ฉ์์ง3'])
# Pipe ํต์ ์์
print("\n===== Pipe ํต์ ์์ =====")
pipe_comm = PipeCommunication()
pipe_comm.demonstrate(['๋ฐ์ดํฐ1', '๋ฐ์ดํฐ2', '๋ฐ์ดํฐ3'])
# Event ํต์ ์์
print("\n===== Event ํต์ ์์ =====")
event_comm = EventCommunication()
event_comm.demonstrate(2.0)
โ
ํน์ง:
- ํ ์ฌ์ฉ
- ํ์ดํ ํต์
- ๋ฐ์ดํฐ ๊ตํ
- ๋น๋๊ธฐ ํต์
- ๋๊ธฐํ ๋ฉ์ปค๋์ฆ
- ์ด๋ฒคํธ ๊ธฐ๋ฐ ์กฐ์
ํ๋ก์ธ์ค ๊ฐ ํต์ (IPC)์ ๋ค์ํ ๋ฐฉ์๊ณผ ํน์ง์ด๋ค.
ํต์ ๋ฐฉ์ | ํน์ง | ์ฉ๋ |
---|---|---|
Queue | ๋ค์ค ์์ฐ์/์๋น์, ์ค๋ ๋ ์์ | ์์ ๋ถ๋ฐฐ, ๊ฒฐ๊ณผ ์์ง |
Pipe | ์๋ฐฉํฅ ํต์ , ์ผ๋ฐ์ ์ผ๋ก ๋ ํ๋ก์ธ์ค ๊ฐ ์ฌ์ฉ | ์๋ฐฉํฅ ๋ฉ์์ง ๊ตํ |
Value/Array | ๊ณต์ ๋ฉ๋ชจ๋ฆฌ ๊ธฐ๋ฐ, ์์์ ์ฐ์ฐ ์ง์ | ๋จ์ ๋ฐ์ดํฐ ๊ณต์ |
Manager | ๋ณต์กํ ๊ณต์ ๊ฐ์ฒด, ๋คํธ์ํฌ ํฌ๋ช ์ฑ | ๋ณต์กํ ๋ฐ์ดํฐ ๊ตฌ์กฐ ๊ณต์ |
Event | ํ๋ก์ธ์ค ๊ฐ ๋๊ธฐํ | ์ํ ์ ํธ, ์์ ์กฐ์ |
Lock/RLock | ์ํธ ๋ฐฐ์ , ๋ฆฌ์์ค ๋ณดํธ | ๊ณต์ ์์ ๋ณดํธ |
Semaphore | ๋ฆฌ์์ค ์นด์ดํ , ์ ๊ทผ ์ ํ | ๋ฆฌ์์ค ํ ๊ด๋ฆฌ |
Condition | ์กฐ๊ฑด ๊ธฐ๋ฐ ๋๊ธฐํ | ์์ฐ์/์๋น์ ํจํด |
๋ฐ๋ณต์ ์ธ ์์
์ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ธฐ ์ํ ํ๋ก์ธ์ค ํ ํ์ฉ ๋ฐฉ๋ฒ์ด๋ค.
from multiprocessing import Pool, cpu_count
import time
import os
from typing import List, Callable, Any, Dict, Union, Tuple
import functools
def cpu_bound_task(x: int) -> int:
"""CPU ์ง์ฝ์ ๊ณ์ฐ ์์
(์: ์์ ํ๋ณ)
Args:
x: ์์
ํ ์ซ์
Returns:
int: ๊ณ์ฐ ๊ฒฐ๊ณผ
"""
if x <= 1:
return 0
# ์์ ํ๋ณ (CPU ์ง์ฝ์ ์์
์๋ฎฌ๋ ์ด์
)
count = 0
for i in range(2, int(x**0.5) + 1):
if x % i == 0:
count += 1
# ์์
์๊ฐ ์๋ฎฌ๋ ์ด์
time.sleep(0.1)
return count
def io_bound_task(filename: str) -> Tuple[str, int]:
"""I/O ์ง์ฝ์ ์์
(์: ํ์ผ ์ฒ๋ฆฌ)
Args:
filename: ์ฒ๋ฆฌํ ํ์ผ ์ด๋ฆ
Returns:
Tuple[str, int]: (ํ์ผ๋ช
, ์ฒ๋ฆฌ ๊ฒฐ๊ณผ)
"""
print(f"ํ๋ก์ธ์ค {os.getpid()}: ํ์ผ {filename} ์ฒ๋ฆฌ ์ค")
# I/O ์์
์๋ฎฌ๋ ์ด์
time.sleep(0.5)
# ์ค์ ๋ก๋ ์ฌ๊ธฐ์ ํ์ผ์ ์ฝ๊ณ ์ฒ๋ฆฌ
result = len(filename)
return (filename, result)
def task_with_callback(x: int) -> int:
"""์ฝ๋ฐฑ์ ์ํ ์์
Args:
x: ์
๋ ฅ ๊ฐ
Returns:
int: ๊ณ์ฐ ๊ฒฐ๊ณผ
"""
print(f"์์
{x} ์์ (PID: {os.getpid()})")
time.sleep(1) # ์์
์๋ฎฌ๋ ์ด์
result = x * x
print(f"์์
{x} ์๋ฃ")
return result
def result_callback(result: int) -> None:
"""๋น๋๊ธฐ ์์
์ ๊ฒฐ๊ณผ ์ฒ๋ฆฌ ์ฝ๋ฐฑ
Args:
result: ์์
๊ฒฐ๊ณผ
"""
print(f"์ฝ๋ฐฑ: ๊ฒฐ๊ณผ = {result}")
class ProcessPoolManager:
"""ํ๋ก์ธ์ค ํ ๊ด๋ฆฌ ํด๋์ค"""
def __init__(self, processes: int = None):
"""์ด๊ธฐํ
Args:
processes: ํ๋ก์ธ์ค ์ (None์ด๋ฉด CPU ์ฝ์ด ์ ์ฌ์ฉ)
"""
self.processes = processes or cpu_count()
print(f"ํ๋ก์ธ์ค ํ ํฌ๊ธฐ: {self.processes} (๊ฐ์ฉ CPU: {cpu_count()})")
def map_demonstration(self, func: Callable, iterable: List) -> List:
"""map ๋ฉ์๋ ์์ฐ
Args:
func: ์ ์ฉํ ํจ์
iterable: ์
๋ ฅ ๋ฐ์ดํฐ ๋ฆฌ์คํธ
Returns:
List: ๊ฒฐ๊ณผ ๋ฆฌ์คํธ
"""
start_time = time.time()
with Pool(processes=self.processes) as pool:
results = pool.map(func, iterable)
end_time = time.time()
print(f"map ์คํ ์๊ฐ: {end_time - start_time:.2f}์ด")
return results
def imap_unordered_demonstration(self, func: Callable, iterable: List) -> List:
"""imap_unordered ๋ฉ์๋ ์์ฐ (์์ ๋ฌด๊ด, ๊ฒฐ๊ณผ ์ฆ์ ๋ฐํ)
Args:
func: ์ ์ฉํ ํจ์
iterable: ์
๋ ฅ ๋ฐ์ดํฐ ๋ฆฌ์คํธ
Returns:
List: ๊ฒฐ๊ณผ ๋ฆฌ์คํธ
"""
start_time = time.time()
results = []
with Pool(processes=self.processes) as pool:
for result in pool.imap_unordered(func, iterable):
print(f"๊ฒฐ๊ณผ ๋์ฐฉ: {result}")
results.append(result)
end_time = time.time()
print(f"imap_unordered ์คํ ์๊ฐ: {end_time - start_time:.2f}์ด")
return results
def async_demonstration(self, func: Callable, args_list: List) -> List:
"""apply_async ๋ฉ์๋ ์์ฐ (๋น๋๊ธฐ ์คํ)
Args:
func: ์ ์ฉํ ํจ์
args_list: ์ธ์ ๋ฆฌ์คํธ
Returns:
List: ๊ฒฐ๊ณผ ๋ฆฌ์คํธ
"""
results = []
with Pool(processes=self.processes) as pool:
async_results = [
pool.apply_async(
func,
args=(arg,),
callback=result_callback
) for arg in args_list
]
# ๋ชจ๋ ๊ฒฐ๊ณผ ์์ง
for async_result in async_results:
result = async_result.get() # ์๋ฃ๋ ๋๊น์ง ๋๊ธฐ
results.append(result)
return results
def chunksize_comparison(self, func: Callable, iterable: List) -> None:
"""์ฒญํฌ ํฌ๊ธฐ์ ๋ฐ๋ฅธ ์ฑ๋ฅ ๋น๊ต
Args:
func: ์ ์ฉํ ํจ์
iterable: ์
๋ ฅ ๋ฐ์ดํฐ ๋ฆฌ์คํธ
"""
chunk_sizes = [1, 10, 25, 50]
for chunk_size in chunk_sizes:
start_time = time.time()
with Pool(processes=self.processes) as pool:
results = pool.map(func, iterable, chunksize=chunk_size)
end_time = time.time()
print(f"์ฒญํฌ ํฌ๊ธฐ {chunk_size}: {end_time - start_time:.2f}์ด")
# ์ฌ์ฉ ์์
if __name__ == '__main__':
pool_manager = ProcessPoolManager()
# 1. map ๋ฉ์๋ ์์ (CPU ๋ฐ์ด๋ ์์
)
print("\n===== Pool.map ์์ (CPU ๋ฐ์ด๋) =====")
numbers = list(range(100, 150))
results = pool_manager.map_demonstration(cpu_bound_task, numbers)
print(f"์ฒ๋ฆฌ๋ ๊ฒฐ๊ณผ ์: {len(results)}")
# 2. imap_unordered ์์ (I/O ๋ฐ์ด๋ ์์
)
print("\n===== Pool.imap_unordered ์์ (I/O ๋ฐ์ด๋) =====")
filenames = [f"file_{i}.txt" for i in range(1, 11)]
results = pool_manager.imap_unordered_demonstration(io_bound_task, filenames)
# 3. apply_async ์์ (๋น๋๊ธฐ + ์ฝ๋ฐฑ)
print("\n===== Pool.apply_async ์์ (์ฝ๋ฐฑ ํฌํจ) =====")
args_list = [3, 6, 9, 12]
results = pool_manager.async_demonstration(task_with_callback, args_list)
print(f"์ต์ข
๊ฒฐ๊ณผ: {results}")
# 4. ์ฒญํฌ ํฌ๊ธฐ์ ๋ฐ๋ฅธ ์ฑ๋ฅ ๋น๊ต
print("\n===== ์ฒญํฌ ํฌ๊ธฐ์ ๋ฐ๋ฅธ ์ฑ๋ฅ ๋น๊ต =====")
large_numbers = list(range(100, 300))
pool_manager.chunksize_comparison(cpu_bound_task, large_numbers)
โ
ํน์ง:
- ์์ ๋ถ๋ฐฐ
- ๋ณ๋ ฌ ์ฒ๋ฆฌ
- ๊ฒฐ๊ณผ ์์ง
- ๋น๋๊ธฐ ์คํ
- ์๋ ํ๋ก์ธ์ค ๊ด๋ฆฌ
- ํจ์จ์ ์ธ ์์ ํ์ฉ
ํ๋ก์ธ์ค ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ํจ์จ์ ์ผ๋ก ๊ณต์ ํ๊ธฐ ์ํ ๊ณต์ ๋ฉ๋ชจ๋ฆฌ ๋ฉ์ปค๋์ฆ์ด๋ค.
from multiprocessing import Process, Value, Array, RawValue, RawArray
import time
import ctypes
from typing import List, Callable, Any
class SharedMemoryExample:
"""๊ณต์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ ์์ """
def process_value(self, value: Value, operation: str) -> None:
"""Value ๊ฐ์ฒด ์กฐ์ ํ๋ก์ธ์ค
Args:
value: ๊ณต์ Value ๊ฐ์ฒด
operation: ์ํํ ์ฐ์ฐ (์ฆ๊ฐ ๋๋ ๊ฐ์)
"""
for i in range(5):
with value.get_lock(): # ๋ฝ ํ๋
if operation == 'increase':
value.value += 1
elif operation == 'decrease':
value.value -= 1
print(f"[{operation}] ์ ๊ฐ: {value.value} (PID: {os.getpid()})")
time.sleep(0.1)
def process_array(self, shared_array: Array, start_idx: int) -> None:
"""Array ๊ฐ์ฒด ์กฐ์ ํ๋ก์ธ์ค
Args:
shared_array: ๊ณต์ Array ๊ฐ์ฒด
start_idx: ์์ ์ธ๋ฑ์ค
"""
for i in range(3):
idx = start_idx + i
if 0 <= idx < len(shared_array):
with shared_array.get_lock(): # ๋ฝ ํ๋
shared_array[idx] = shared_array[idx] * 2
print(f"[PID: {os.getpid()}] ์ธ๋ฑ์ค {idx} ๊ฐ ๋ณ๊ฒฝ: {shared_array[idx]}")
time.sleep(0.2)
def demonstrate_value(self) -> None:
"""Value ๊ฐ์ฒด ๋ฐ๋ชจ"""
# ์ด๊ธฐ๊ฐ 0์ธ ์ ์ํ ๊ณต์ ๋ณ์ ์์ฑ
shared_value = Value('i', 0)
# ๋ ํ๋ก์ธ์ค ์์ฑ
p1 = Process(target=self.process_value, args=(shared_value, 'increase'))
p2 = Process(target=self.process_value, args=(shared_value, 'decrease'))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"์ต์ข
๊ฐ: {shared_value.value}")
def demonstrate_array(self) -> None:
"""Array ๊ฐ์ฒด ๋ฐ๋ชจ"""
# ์ด๊ธฐ๊ฐ [1,2,3,4,5]์ธ ์ ์ํ ๊ณต์ ๋ฐฐ์ด ์์ฑ
shared_array = Array('i', [1, 2, 3, 4, 5])
processes = []
for i in range(3):
p = Process(target=self.process_array, args=(shared_array, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"์ต์ข
๋ฐฐ์ด: {list(shared_array)}")
def demonstrate_raw_types(self) -> None:
"""Raw ํ์
๋ฐ๋ชจ (๋ฝ ์์)"""
# ๋ฝ ์๋ ๊ณต์ ๋ณ์
raw_value = RawValue(ctypes.c_double, 3.14)
# ๋ฝ ์๋ ๊ณต์ ๋ฐฐ์ด
raw_array = RawArray(ctypes.c_int, 5)
for i in range(5):
raw_array[i] = i * 2
def modify_raw(rv, ra):
rv.value += 1.0
for i in range(len(ra)):
ra[i] += 10
print(f"Raw ๊ฐ ์์ : {rv.value}, ๋ฐฐ์ด: {[ra[i] for i in range(len(ra))]}")
processes = []
for _ in range(2):
p = Process(target=modify_raw, args=(raw_value, raw_array))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"์ต์ข
Raw ๊ฐ: {raw_value.value}")
print(f"์ต์ข
Raw ๋ฐฐ์ด: {[raw_array[i] for i in range(5)]}")
class SharedCounterExample:
"""๊ณต์ ์นด์ดํฐ ์์ """
def __init__(self):
"""๊ณต์ ์นด์ดํฐ ์ด๊ธฐํ"""
self.counter = Value('i', 0)
def increment(self, n: int) -> None:
"""์นด์ดํฐ ์ฆ๊ฐ
Args:
n: ์ฆ๊ฐ์ํฌ ํ์
"""
for _ in range(n):
with self.counter.get_lock():
self.counter.value += 1
def run_workers(self, num_processes: int, increments_per_process: int) -> None:
"""์ฌ๋ฌ ํ๋ก์ธ์ค๋ก ์นด์ดํฐ ์ฆ๊ฐ
Args:
num_processes: ์์ฑํ ํ๋ก์ธ์ค ์
increments_per_process: ํ๋ก์ธ์ค๋น ์ฆ๊ฐ ํ์
"""
processes = []
for _ in range(num_processes):
p = Process(target=self.increment, args=(increments_per_process,))
processes.append(p)
p.start()
for p in processes:
p.join()
expected = num_processes * increments_per_process
print(f"์์ ์นด์ดํฐ ๊ฐ: {expected}")
print(f"์ค์ ์นด์ดํฐ ๊ฐ: {self.counter.value}")
print(f"์ ํ์ฑ: {'์ ํํจ' if expected == self.counter.value else '๊ฒฝ์ ์ํ ๋ฐ์'}")
# ์ฌ์ฉ ์์
if __name__ == '__main__':
import os
# ๊ณต์ ๋ฉ๋ชจ๋ฆฌ ์์
print("\n===== ๊ณต์ ๋ฉ๋ชจ๋ฆฌ ์์ =====")
shared_mem = SharedMemoryExample()
print("\n[Value ๊ฐ์ฒด ์ฌ์ฉ]")
shared_mem.demonstrate_value()
print("\n[Array ๊ฐ์ฒด ์ฌ์ฉ]")
shared_mem.demonstrate_array()
print("\n[Raw ํ์
์ฌ์ฉ]")
shared_mem.demonstrate_raw_types()
# ๊ณต์ ์นด์ดํฐ ์์
print("\n===== ๊ณต์ ์นด์ดํฐ ๊ฒฝ์ ์ํ ์๋ฐฉ =====")
counter = SharedCounterExample()
counter.run_workers(num_processes=4, increments_per_process=10000)
โ
ํน์ง:
- ๊ณต์ ๋ณ์
- ๊ณต์ ๋ฐฐ์ด
- ๋ฉ๋ชจ๋ฆฌ ํจ์จ์ฑ
- ํ๋ก์ธ์ค ๊ฐ ๋๊ธฐํ
- ์์์ ์ฐ์ฐ
- ํ์ ์์ ์ฑ
๋ฉํฐํ๋ก์ธ์ฑ์์ ์ฌ์ฉ ๊ฐ๋ฅํ ๋ค์ํ ๊ณต์ ๋ฉ๋ชจ๋ฆฌ ํ์
์ด๋ค.
ํ์ | ๋ฝ ์ ๊ณต | ์ฌ์ฉ ์ฌ๋ก | ํน์ง |
---|---|---|---|
Value | โ | ๋จ์ผ ๊ฐ ๊ณต์ | ๋ด์ฅ ๋ฝ, ํ์ ์์ ์ฑ |
Array | โ | ๋ฐฐ์ด ๊ณต์ | ๋ด์ฅ ๋ฝ, ๊ณ ์ ํฌ๊ธฐ, ๋์ข ํ์ |
RawValue | โ | ์ฑ๋ฅ ์ค์ฌ ๋จ์ผ ๊ฐ | ๋ฝ ์์, ์ง์ ๋๊ธฐํ ํ์ |
RawArray | โ | ์ฑ๋ฅ ์ค์ฌ ๋ฐฐ์ด | ๋ฝ ์์, ์ง์ ๋๊ธฐํ ํ์ |
Manager ๊ฐ์ฒด | โ | ๋ณต์กํ ์๋ฃ๊ตฌ์กฐ | ๋คํธ์ํฌ ํฌ๋ช ์ฑ, ์ ์ฐ์ฑ |
shared_memory | โ | ๋์ฉ๋ ๋ฐ์ดํฐ | Python 3.8+, ์ง์ ๊ด๋ฆฌ |
๋ค์ํ ๋ณต์กํ ์๋ฃ๊ตฌ์กฐ๋ฅผ ํ๋ก์ธ์ค ๊ฐ์ ์์ ํ๊ฒ ๊ณต์ ํ๊ธฐ ์ํ ๋งค๋์ ๊ฐ์ฒด์ด๋ค.
from multiprocessing import Process, Manager, Lock
import time
import random
from typing import Dict, List, Any, Set, DefaultDict
import collections
class ManagerExample:
"""Manager ๊ฐ์ฒด ์ฌ์ฉ ์์ """
def worker_dict(self, shared_dict: Dict, worker_id: int, keys: List[str]) -> None:
"""๊ณต์ ๋์
๋๋ฆฌ ์์
ํ๋ก์ธ์ค
Args:
shared_dict: ๊ณต์ ๋์
๋๋ฆฌ
worker_id: ์์
์ ID
keys: ์์
ํ ํค ๋ชฉ๋ก
"""
for key in keys:
# ํ์ฌ ๊ฐ ์ฝ๊ธฐ
current = shared_dict.get(key, 0)
# ๊ฐ ์
๋ฐ์ดํธ
shared_dict[key] = current + worker_id
print(f"Worker {worker_id}: ํค '{key}' ๊ฐ์ {current} โ {shared_dict[key]}๋ก ์
๋ฐ์ดํธ")
time.sleep(random.random() * 0.3) # ์์ ์ง์ฐ
def worker_list(self, shared_list: List, items: List[int]) -> None:
"""๊ณต์ ๋ฆฌ์คํธ ์์
ํ๋ก์ธ์ค
Args:
shared_list: ๊ณต์ ๋ฆฌ์คํธ
items: ์ถ๊ฐํ ํญ๋ชฉ ๋ชฉ๋ก
"""
for item in items:
shared_list.append(item)
print(f"ํ๋ก์ธ์ค {os.getpid()}: ํญ๋ชฉ {item} ์ถ๊ฐ๋จ, ํ์ฌ ๋ฆฌ์คํธ: {list(shared_list)}")
time.sleep(random.random() * 0.2) # ์์ ์ง์ฐ
def worker_namespace(self, ns: Any, worker_id: int) -> None:
"""๊ณต์ ๋ค์์คํ์ด์ค ์์
ํ๋ก์ธ์ค
Args:
ns: ๊ณต์ ๋ค์์คํ์ด์ค
worker_id: ์์
์ ID
"""
# ์์ฑ ์
๋ฐ์ดํธ
current = getattr(ns, f'count_{worker_id}', 0)
setattr(ns, f'count_{worker_id}', current + 1)
# ๊ณต์ total ๊ฐ ์
๋ฐ์ดํธ
ns.total += 1
print(f"Worker {worker_id}: count_{worker_id}={getattr(ns, f'count_{worker_id}')}, total={ns.total}")
time.sleep(0.1)
def demonstrate_dict(self) -> None:
"""๊ณต์ ๋์
๋๋ฆฌ ๋ฐ๋ชจ"""
with Manager() as manager:
# ๊ณต์ ๋์
๋๋ฆฌ ์์ฑ
shared_dict = manager.dict()
shared_dict['์ด๊ธฐ๊ฐ'] = 100
processes = []
all_keys = ['a', 'b', 'c', 'd', 'e']
# ์์
์๋ณ๋ก ํค ๋ถ๋ฐฐ
for i in range(3):
keys = all_keys[i:i+3] # ์ผ๋ถ ํค ๊ฒน์นจ
p = Process(target=self.worker_dict, args=(shared_dict, i+1, keys))
processes.append(p)
p.start()
for p in processes:
p.join()
# ์ต์ข
๊ฒฐ๊ณผ ์ถ๋ ฅ
print(f"์ต์ข
๋์
๋๋ฆฌ: {dict(shared_dict)}")
def demonstrate_list(self) -> None:
"""๊ณต์ ๋ฆฌ์คํธ ๋ฐ๋ชจ"""
with Manager() as manager:
# ๊ณต์ ๋ฆฌ์คํธ ์์ฑ
shared_list = manager.list([0, 10, 20])
processes = []
# ์ฌ๋ฌ ํ๋ก์ธ์ค๊ฐ ๋ฆฌ์คํธ์ ํญ๋ชฉ ์ถ๊ฐ
for i in range(3):
items = list(range(i*10+1, i*10+4))
p = Process(target=self.worker_list, args=(shared_list, items))
processes.append(p)
p.start()
for p in processes:
p.join()
# ์ต์ข
๊ฒฐ๊ณผ ์ถ๋ ฅ
print(f"์ต์ข
๋ฆฌ์คํธ: {list(shared_list)}")
def demonstrate_namespace(self) -> None:
"""๊ณต์ ๋ค์์คํ์ด์ค ๋ฐ๋ชจ"""
with Manager() as manager:
# ๊ณต์ ๋ค์์คํ์ด์ค ์์ฑ
ns = manager.Namespace()
ns.total = 0
processes = []
for i in range(1, 5):
setattr(ns, f'count_{i}', 0) # ์ด๊ธฐํ
p = Process(target=self.worker_namespace, args=(ns, i))
processes.append(p)
p.start()
for p in processes:
p.join()
# ๊ฒฐ๊ณผ ์ถ๋ ฅ
print("\n์ต์ข
๋ค์์คํ์ด์ค ์ํ:")
print(f"total: {ns.total}")
for i in range(1, 5):
print(f"count_{i}: {getattr(ns, f'count_{i}')}")
def demonstrate_complex_structures(self) -> None:
"""๋ณตํฉ ์๋ฃ๊ตฌ์กฐ ๋ฐ๋ชจ"""
with Manager() as manager:
# ๋ค์ํ ๊ณต์ ์ปฌ๋ ์
์์ฑ
shared_list = manager.list()
shared_dict = manager.dict()
shared_set = manager.set()
# ๋ฐ๋ก ํจ์ ์ ์
def complex_worker(lock, list_obj, dict_obj, set_obj, worker_id):
with lock:
# ๋ชจ๋ ๊ณต์ ๊ฐ์ฒด ์
๋ฐ์ดํธ
list_obj.append(worker_id)
dict_obj[f'worker_{worker_id}'] = worker_id * 10
set_obj.add(worker_id * 100)
print(f"Worker {worker_id} ์
๋ฐ์ดํธ:")
print(f" ๋ฆฌ์คํธ: {list(list_obj)}")
print(f" ๋์
๋๋ฆฌ: {dict(dict_obj)}")
print(f" ์งํฉ: {set(set_obj)}")
# ๊ณต์ ๋ฝ ์์ฑ
lock = manager.Lock()
processes = []
for i in range(1, 4):
p = Process(target=complex_worker, args=(lock, shared_list, shared_dict, shared_set, i))
processes.append(p)
p.start()
for p in processes:
p.join()
# ์ต์ข
๊ฒฐ๊ณผ ์ถ๋ ฅ
print("\n์ต์ข
๊ฒฐ๊ณผ:")
print(f"๋ฆฌ์คํธ: {list(shared_list)}")
print(f"๋์
๋๋ฆฌ: {dict(shared_dict)}")
print(f"์งํฉ: {set(shared_set)}")
# ์ฌ์ฉ ์์
if __name__ == '__main__':
import os
manager_example = ManagerExample()
# ๋์
๋๋ฆฌ ์์
print("\n===== ๊ณต์ ๋์
๋๋ฆฌ ์์ =====")
manager_example.demonstrate_dict()
# ๋ฆฌ์คํธ ์์
print("\n===== ๊ณต์ ๋ฆฌ์คํธ ์์ =====")
manager_example.demonstrate_list()
# ๋ค์์คํ์ด์ค ์์
print("\n===== ๊ณต์ ๋ค์์คํ์ด์ค ์์ =====")
manager_example.demonstrate_namespace()
# ๋ณตํฉ ์๋ฃ๊ตฌ์กฐ ์์
print("\n===== ๋ณตํฉ ์๋ฃ๊ตฌ์กฐ ์์ =====")
manager_example.demonstrate_complex_structures()
โ
ํน์ง:
- ๋ณต์กํ ์๋ฃ๊ตฌ์กฐ ๊ณต์
- ๋ถ์ฐ ์์คํ ์ง์
- ๋คํธ์ํฌ ํฌ๋ช ์ฑ
- ์ ์ฐํ ๋๊ธฐํ
- ์๋ ์ง๋ ฌํ
- ํ๋ก์ ๊ฐ์ฒด ์ฌ์ฉ
Manager ๊ฐ์ฒด์ ๋ค๋ฅธ ๊ณต์ ๋ฐฉ๋ฒ์ ๋น๊ต์ด๋ค.
ํน์ฑ | Manager | Value/Array | Queue/Pipe |
---|---|---|---|
์๋ฃ๊ตฌ์กฐ ๋ณต์ก์ฑ | ๋ณต์กํ ์ค์ฒฉ ๊ตฌ์กฐ | ๋จ์ ํ์ | ๋ฉ์์ง ๊ธฐ๋ฐ |
์ฑ๋ฅ | ์๋์ ์ผ๋ก ๋๋ฆผ | ๋น ๋ฆ | ๋ณดํต |
์ฌ์ฉ ํธ์์ฑ | ๋์ | ์ค๊ฐ | ์ค๊ฐ |
๋คํธ์ํฌ ์ง์ | ์ง์ | ๋ฏธ์ง์ | ๋ฏธ์ง์ |
์ ๊ณต ๊ฐ์ฒด | ๋์ ๋๋ฆฌ, ๋ฆฌ์คํธ, ๋ค์์คํ์ด์ค, ํ, ๋ฝ, ๋ฑ | ๋จ์ผ ๊ฐ, ๋ฐฐ์ด | ๋ฐ์ดํฐ ์คํธ๋ฆผ |
์ฉ๋ | ๋ณต์กํ ๊ณต์ , ๋คํธ์ํฌ ํฌ๋ช ์ฑ | ๋น ๋ฅธ ๊ณต์ ๋ฉ๋ชจ๋ฆฌ | ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ |
โ
๋ชจ๋ฒ ์ฌ๋ก:
- ํ๋ก์ธ์ค ์์ฑ ๋น์ฉ ์ดํด
- ์ ์ ํ IPC ๋ฉ์ปค๋์ฆ ์ ํ
- Pool ํฌ๊ธฐ ์ต์ ํ
- ์์ ๊ด๋ฆฌ ์ฃผ์
- ๊ฒฝ์ ์ํ ๋ฐฉ์ง
- ๊ต์ฐฉ ์ํ ์๋ฐฉ
- ํ๋ก์ธ์ค ์ข ๋ฃ ์ฒ๋ฆฌ
- ์์ธ ์ฒ๋ฆฌ ๊ตฌํ
- ๋ก๊น ์ ๋ต ์๋ฆฝ
- ๋ถํ ๋ถ์ฐ ๊ณ ๋ ค