KR_Celery - somaz94/python-study GitHub Wiki
Celery is an asynchronous task queue that supports distributed task processing.
# tasks.py
from celery import Celery

# Create the Celery instance
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

# Define a basic task
@app.task
def add(x: int, y: int) -> int:
    return x + y

# Task configuration
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Seoul',
    enable_utc=True,
)
✅ Features:
- Redis/RabbitMQ broker support
- Asynchronous task processing
- Flexible configuration management
- Distributed task processing
- Result storage in a backend
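Once a worker is running (celery -A tasks worker --loglevel=INFO), tasks are enqueued asynchronously from the calling code. A minimal usage sketch, assuming the tasks.py module above:

# Enqueue the task and fetch its result later
result = add.delay(4, 6)         # returns an AsyncResult immediately
print(result.ready())            # False until a worker has finished it
print(result.get(timeout=10))    # 10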
The following shows how to define work as tasks and run it asynchronously.
from celery import Task, chain
from typing import Any, Dict

class CustomTask(Task):
    def on_success(self, retval: Any, task_id: str, args: tuple, kwargs: Dict) -> None:
        print(f"Task {task_id} completed successfully")

    def on_failure(self, exc: Exception, task_id: str, args: tuple, kwargs: Dict, einfo: Any) -> None:
        print(f"Task {task_id} failed: {exc}")

@app.task(base=CustomTask, bind=True)
def process_data(self, data: Dict) -> Dict:
    try:
        # Data processing logic
        result = {'processed': data['value'] * 2}
        return result
    except Exception as e:
        raise self.retry(exc=e, countdown=60)  # retry after 1 minute

# Task chaining
@app.task
def validate_data(data: Dict) -> Dict:
    if 'value' not in data:
        raise ValueError("Missing 'value' key")
    return data

# Run a chain of tasks
task_chain = chain(
    validate_data.s({'value': 10}),
    process_data.s()
)
result = task_chain()
✅ Features:
- Custom task classes
- Task chaining
- Error handling and retries
- Task state tracking
- Argument passing via signatures
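A chain can also be launched without blocking the caller; a small sketch reusing task_chain from above (assumes a worker and a result backend are available):

# Launch the chain asynchronously and read the final result later
async_result = task_chain.apply_async()
print(async_result.status)           # PENDING / STARTED / SUCCESS / ...
print(async_result.get(timeout=30))  # {'processed': 20}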
The following shows how to configure tasks that run automatically on a schedule (Celery Beat).
from celery.schedules import crontab

app.conf.beat_schedule = {
    'daily-cleanup': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour=0, minute=0),  # every day at midnight
    },
    'hourly-check': {
        'task': 'tasks.health_check',
        'schedule': 3600.0,  # every hour
        'args': ('system',)
    }
}

@app.task
def cleanup():
    """Daily cleanup task"""
    # perform the cleanup work
    pass

@app.task
def health_check(system: str):
    """Check system health"""
    # health check logic
    pass
✅ Features:
- Crontab scheduling
- Periodic task execution
- Argument passing support
- Multiple time units supported
- Schedules can be changed dynamically
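The schedule only fires while a beat process is running alongside the workers. A minimal way to start both, assuming the module is named tasks as above:

# Start a worker and the beat scheduler in separate processes
# celery -A tasks worker --loglevel=INFO
# celery -A tasks beat --loglevel=INFO
# For development, beat can be embedded in the worker process:
# celery -A tasks worker -B --loglevel=INFO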
This is the mechanism for handling errors raised while a task runs and retrying it.
from celery.exceptions import MaxRetriesExceededError
from typing import Optional

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError,),
    retry_backoff=True
)
def process_with_retry(self, data: Dict) -> Optional[Dict]:
    try:
        # Processing logic (external_api_call is assumed to be defined elsewhere)
        result = external_api_call(data)
        return result
    except ConnectionError as exc:
        # Retry automatically
        raise self.retry(exc=exc)
    except Exception as exc:
        # Handle all other exceptions
        self.update_state(state='FAILURE', meta={'error': str(exc)})
        return None

class TaskManager:
    def __init__(self):
        self.tasks = {}

    def register_task(self, task_id: str, task):
        self.tasks[task_id] = task

    def revoke_task(self, task_id: str):
        if task_id in self.tasks:
            app.control.revoke(task_id, terminate=True)
            del self.tasks[task_id]
✅ Features:
- Automatic retry configuration
- Detailed error handling
- Task state management
- Exponential backoff support
- Task revocation
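From the caller's side, the state of a retried task can be checked with Celery's AsyncResult handle; a small sketch assuming the task id is available:

from celery.result import AsyncResult

# Enqueue the task, then look up its state later by id
async_result = process_with_retry.delay({'value': 1})
print(AsyncResult(async_result.id, app=app).state)  # PENDING, RETRY, SUCCESS, FAILURE, ...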
This shows how to configure and monitor Celery workers.
from celery.signals import worker_ready, worker_shutting_down
from prometheus_client import Counter, Gauge

# Define metrics
task_counter = Counter('celery_tasks_total', 'Total number of Celery tasks')
task_latency = Gauge('celery_task_latency_seconds', 'Task processing latency')

@worker_ready.connect
def worker_ready_handler(**kwargs):
    print("Celery worker is ready!")

@worker_shutting_down.connect
def worker_shutdown_handler(**kwargs):
    print("Celery worker is shutting down...")

class MonitoredTask(Task):
    def __call__(self, *args, **kwargs):
        task_counter.inc()
        with task_latency.time():
            return super().__call__(*args, **kwargs)

@app.task(base=MonitoredTask)
def monitored_task():
    # perform the work
    pass
✅ Features:
- Worker lifecycle management
- Metrics collection
- Monitoring integration
- Signal handling
- Worker control commands
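Beyond exported metrics, running workers can be queried directly through the control/inspect API; a minimal sketch, assuming at least one worker is connected to the broker:

# Query live workers over the broker
inspector = app.control.inspect()
print(inspector.active())             # tasks currently executing, per worker
print(inspector.registered())         # task names each worker has registered
print(app.control.ping(timeout=1.0))  # simple liveness check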
Let's look at examples of real applications built with Celery.
from celery import group
from typing import List, Dict
import smtplib
from email.mime.text import MIMEText

@app.task(rate_limit='100/m')  # limit to 100 sends per minute
def send_email(to_email: str, subject: str, body: str) -> bool:
    try:
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['To'] = to_email

        with smtplib.SMTP('smtp.gmail.com', 587) as server:
            server.starttls()
            server.login('[email protected]', 'password')  # placeholder credentials
            server.send_message(msg)
        return True
    except Exception as e:
        print(f"Failed to send email: {e}")
        return False

def send_bulk_emails(email_list: List[Dict]):
    # Send emails in parallel
    tasks = group(
        send_email.s(
            email['to'],
            email['subject'],
            email['body']
        ) for email in email_list
    )
    result = tasks.apply_async()
    return result
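A brief usage sketch for the bulk sender above (addresses and contents are illustrative):

emails = [
    {'to': 'user1@example.com', 'subject': 'Welcome', 'body': 'Hello!'},
    {'to': 'user2@example.com', 'subject': 'Welcome', 'body': 'Hello!'},
]
group_result = send_bulk_emails(emails)
print(group_result.get(timeout=60))  # e.g. [True, True] when every send succeeds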
import os
from PIL import Image
from typing import Tuple

@app.task
def process_image(image_path: str, size: Tuple[int, int]) -> str:
    try:
        with Image.open(image_path) as img:
            # Resize the image
            img.thumbnail(size)

            # Build the output path
            filename = os.path.basename(image_path)
            output_path = f"processed_{filename}"

            # Save the processed image
            img.save(output_path)
            return output_path
    except Exception as e:
        print(f"Image processing failed: {e}")
        raise

class ImageProcessor:
    def __init__(self):
        self.processing_queue = []

    def add_image(self, image_path: str, size: Tuple[int, int]):
        task = process_image.delay(image_path, size)
        self.processing_queue.append(task)

    def get_results(self):
        return [task.get() for task in self.processing_queue if task.ready()]
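A short usage sketch for ImageProcessor (file names are illustrative):

processor = ImageProcessor()
processor.add_image('photos/cat.jpg', (128, 128))
processor.add_image('photos/dog.jpg', (128, 128))
# ... later, collect whatever has finished so far
print(processor.get_results())  # e.g. ['processed_cat.jpg', 'processed_dog.jpg']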
from celery import chord
import pandas as pd
import numpy as np
from typing import List, Dict, Any

@app.task
def extract_data(source: str) -> pd.DataFrame:
    """Extract data from a source"""
    if source.endswith('.csv'):
        return pd.read_csv(source)
    elif source.endswith('.json'):
        return pd.read_json(source)
    else:
        raise ValueError(f"Unsupported source format: {source}")

@app.task
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """Transform and preprocess the data"""
    # Handle missing values
    df = df.fillna(0)

    # Feature engineering
    if 'date' in df.columns:
        df['year'] = pd.to_datetime(df['date']).dt.year
        df['month'] = pd.to_datetime(df['date']).dt.month

    # Handle outliers
    for col in df.select_dtypes(include=[np.number]).columns:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        df[col] = df[col].clip(q1 - 1.5 * iqr, q3 + 1.5 * iqr)
    return df

@app.task
def load_data(df: pd.DataFrame, destination: str) -> bool:
    """Save the processed data"""
    if destination.endswith('.csv'):
        df.to_csv(destination, index=False)
    elif destination.endswith('.json'):
        df.to_json(destination, orient='records')
    else:
        raise ValueError(f"Unsupported destination format: {destination}")
    return True

@app.task
def notify_completion(results: List[Any]) -> Dict:
    """Send a notification once all jobs have finished"""
    return {
        'status': 'completed',
        'processed_files': len(results),
        'success_count': sum(1 for r in results if r)
    }

def process_data_pipeline(sources: List[str], destination_prefix: str):
    """Run the ETL pipeline"""
    # Parallel extract and transform jobs
    tasks = []
    for i, source in enumerate(sources):
        destination = f"{destination_prefix}_{i}.csv"
        # Individual ETL pipeline
        pipeline = chain(
            extract_data.s(source),
            transform_data.s(),
            load_data.s(destination)
        )
        tasks.append(pipeline)

    # Use a chord to send a notification once every pipeline has completed
    workflow = chord(tasks)(notify_completion.s())
    return workflow
✅ Features:
- Parallel processing of large workloads
- Task rate limiting
- Composition of complex workflows
- Event-driven processing
- Efficient resource usage
- Design that scales out
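One practical caveat: with the JSON serializer configured earlier, the pandas DataFrames returned by extract_data cannot cross task boundaries directly. A common workaround is to exchange file paths between tasks instead; a minimal sketch (the task below is a hypothetical illustration, not part of the original pipeline):

@app.task
def transform_file(input_path: str, output_path: str) -> str:
    """Read, transform, and write inside one task so only paths travel through the broker."""
    df = pd.read_csv(input_path)
    df = df.fillna(0)
    df.to_csv(output_path, index=False)
    return output_path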
The following are advanced Celery patterns for complex distributed systems.
# Priority queue configuration
app.conf.task_routes = {
    'tasks.high_priority': {'queue': 'high_priority'},
    'tasks.default_priority': {'queue': 'default'},
    'tasks.low_priority': {'queue': 'low_priority'},
}

@app.task(queue='high_priority')
def high_priority(data):
    """Work that needs urgent handling"""
    # high-priority processing
    return "High priority task completed"

@app.task(queue='default')
def default_priority(data):
    """Ordinary work"""
    # default-priority processing
    return "Default task completed"

@app.task(queue='low_priority')
def low_priority(data):
    """Background work"""
    # low-priority processing
    return "Low priority task completed"

# Example worker start command:
# celery -A tasks worker -Q high_priority,default,low_priority -l info
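The queue can also be chosen per call rather than per task definition; a small sketch using apply_async (the payload is illustrative):

# Route a single invocation to a specific queue at call time
default_priority.apply_async(args=[{'value': 1}], queue='high_priority')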
# celeryconfig.py
worker_concurrency = 8             # match the number of CPU cores
worker_prefetch_multiplier = 1     # fetch only one task at a time per worker process
worker_max_tasks_per_child = 1000  # guard against memory leaks
task_time_limit = 3600             # hard limit: 1 hour
task_soft_time_limit = 3000        # soft limit: 50 minutes

# Per-task-type tuning. Note: the execution pool (prefork, solo, gevent, ...) is chosen
# when a worker starts (-P option), not via annotations; annotations can still apply
# per-task options such as rate limits.
task_annotations = {
    'tasks.io_intensive': {'rate_limit': '100/m'},  # throttle I/O-heavy tasks
}
# Run CPU-bound tasks on a dedicated prefork worker and I/O-bound tasks on a gevent worker:
# celery -A tasks worker -Q cpu_intensive -P prefork
# celery -A tasks worker -Q io_intensive -P gevent

# Requeue tasks if the worker handling them dies (dead-letter-style handling)
task_reject_on_worker_lost = True
task_acks_late = True
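These settings only take effect if the app loads them; a minimal sketch assuming the module above is importable as celeryconfig:

# Load the configuration module into the Celery app (e.g. in tasks.py)
app.config_from_object('celeryconfig')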
from celery import group, chain, chord, signature

@app.task
def analysis_task(data_chunk):
    # Analyze one chunk of data
    return {'chunk_id': data_chunk['id'], 'result': len(data_chunk['data'])}

@app.task
def reduce_results(results):
    # Aggregate the per-chunk results
    total = sum(result['result'] for result in results)
    return {'total_count': total}

def map_reduce_workflow(data_chunks):
    """MapReduce-style pattern"""
    # Map phase (parallel processing)
    map_tasks = group(analysis_task.s(chunk) for chunk in data_chunks)
    # Reduce phase (aggregate the results)
    workflow = chord(map_tasks)(reduce_results.s())
    return workflow

# Building workflows dynamically
def create_dynamic_workflow(initial_data):
    """Build a workflow at runtime (simple_task, initial_task, validation_task,
    processing_task, and format_result are assumed to be defined elsewhere)"""
    if initial_data['type'] == 'simple':
        return simple_task.s(initial_data)
    elif initial_data['type'] == 'complex':
        # Build a more complex chain
        tasks = [initial_task.s(initial_data)]

        # Add extra tasks depending on the input
        if initial_data.get('needs_validation'):
            tasks.append(validation_task.s())
        if initial_data.get('needs_processing'):
            tasks.append(processing_task.s())

        # Format the final result
        tasks.append(format_result.s())

        # Compose the chain dynamically
        return chain(*tasks)
    else:
        raise ValueError(f"Unknown workflow type: {initial_data['type']}")
✅ Features:
- Priority-based task processing
- Worker tuning per resource profile
- Complex workflow patterns
- MapReduce-style implementation
- Dynamic workflow composition
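A short usage sketch of the map/reduce helper above (chunk contents are illustrative):

chunks = [{'id': i, 'data': list(range(i * 10))} for i in range(4)]
result = map_reduce_workflow(chunks)
print(result.get(timeout=30))  # {'total_count': 60}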
The following is a configuration for operating Celery reliably in a production environment.
# supervisord.conf
[program:celery]
command=/path/to/venv/bin/celery -A tasks worker --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10
stopasgroup=true
priority=999
[program:celerybeat]
command=/path/to/venv/bin/celery -A tasks beat --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/beat.log
stderr_logfile=/var/log/celery/beat.log
autostart=true
autorestart=true
startsecs=10
stopasgroup=true
priority=999
[group:celery-cluster]
programs=celery,celerybeat
priority=999
# celeryconfig_prod.py
import os
from kombu import Queue
from kombu.common import Broadcast

# Broker and result backend
broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://redis:6379/0')
result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://redis:6379/0')

# Serialization / security settings
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
enable_utc = True

# Queue settings
task_default_queue = 'default'
task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('high_priority', routing_key='high_task.#'),
    Queue('low_priority', routing_key='low_task.#'),
    Broadcast('broadcast'),  # broadcast queue
)

# Routing settings
task_routes = {
    'tasks.critical_task': {'queue': 'high_priority', 'routing_key': 'high_task.critical'},
    'tasks.background_task': {'queue': 'low_priority', 'routing_key': 'low_task.background'},
}

# Worker settings
worker_concurrency = int(os.environ.get('CELERY_CONCURRENCY', 8))
worker_prefetch_multiplier = 1
worker_max_tasks_per_child = 1000

# Logging settings
worker_hijack_root_logger = False
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'

# Error handling
task_acks_late = True
task_reject_on_worker_lost = True
task_acks_on_failure_or_timeout = False

# Monitoring
worker_send_task_events = True
task_send_sent_event = True
# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

  celery-worker:
    build: .
    command: celery -A tasks worker --loglevel=INFO
    volumes:
      - .:/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
      - CELERY_CONCURRENCY=8
    depends_on:
      - redis
    restart: unless-stopped

  celery-beat:
    build: .
    command: celery -A tasks beat --loglevel=INFO
    volumes:
      - .:/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - redis
    restart: unless-stopped

  flower:
    build: .
    command: celery -A tasks flower --port=5555
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
    depends_on:
      - redis
      - celery-worker
    restart: unless-stopped

volumes:
  redis-data:
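Workers defined this way can be scaled horizontally at deploy time; a minimal sketch using the standard compose flag:

# Run three worker containers against the same broker
# docker compose up -d --scale celery-worker=3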
✅ Features:
- High-availability configuration
- Containerized deployment
- Monitoring and logging setup
- Queue and routing optimization
- Architecture that can scale out
- Security settings
- Resource management
✅ Best practices:
- Guarantee idempotency: design tasks so that running them more than once yields the same result (see the sketch after this list)
- Separate queues appropriately: split queues by task characteristics to avoid resource contention
- Keep payloads serializable: pass the minimum amount of data and make sure it serializes cleanly
- Monitor execution time: watch long-running tasks and set time limits
- Manage memory: prevent leaks with worker_max_tasks_per_child
- Plan a retry strategy: retry transient errors with exponential backoff; route permanent failures to a dead letter queue
- Handle task revocation: implement resource cleanup logic for cancelled tasks
- Adopt a logging strategy: use structured logging so tasks are easy to trace
- Choose the right backend: pick the result backend (Redis, RabbitMQ, database) that fits the data volume
- Make the broker highly available: cluster the broker for critical systems
- Scale workers: implement a mechanism to adjust the number of workers with load
- Use monitoring tools: track tasks in real time with Flower, Prometheus, and similar tools
- Clean up periodically: purge completed task results regularly
- Set task timeouts: give every task a timeout
- Consider security: configure authentication for the broker and the result backend
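As referenced in the first practice above, a minimal sketch of an idempotent task, assuming a Redis client is available for deduplication (the key scheme and the order_id field are illustrative):

import redis

dedup = redis.Redis(host='localhost', port=6379, db=1)

@app.task(bind=True)
def charge_order(self, order: Dict) -> str:
    """Charge an order exactly once, even if the task is delivered or retried twice."""
    # NX = set only if the key does not exist yet; acts as a lightweight dedup lock
    if not dedup.set(f"charged:{order['order_id']}", 1, nx=True, ex=86400):
        return 'already-processed'
    # ... perform the actual charge here ...
    return 'charged'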