KR_RabbitMQ - somaz94/python-study GitHub Wiki

Python RabbitMQ κ°œλ… 정리


1️⃣ κΈ°λ³Έ μ—°κ²°

RabbitMQλŠ” κ°•λ ₯ν•œ μ˜€ν”ˆμ†ŒμŠ€ λ©”μ‹œμ§€ 브둜컀둜, AMQP ν”„λ‘œν† μ½œμ„ 기반으둜 μ•ˆμ •μ μΈ λ©”μ‹œμ§€ νμž‰ μ‹œμŠ€ν…œμ„ μ œκ³΅ν•œλ‹€. Pythonμ—μ„œλŠ” pika 라이브러리λ₯Ό 톡해 μ‰½κ²Œ μ—°κ²°ν•˜κ³  μ‚¬μš©ν•  수 μžˆλ‹€.

import pika
from typing import Optional, Callable, Dict, Any
import json
import logging
import time
from contextlib import contextmanager

class RabbitMQClient:
    """RabbitMQ μ„œλ²„μ™€μ˜ 연결을 κ΄€λ¦¬ν•˜λŠ” ν΄λΌμ΄μ–ΈνŠΈ 클래슀"""
    
    def __init__(
        self,
        host: str = 'localhost',
        port: int = 5672,
        username: str = 'guest',
        password: str = 'guest',
        virtual_host: str = '/',
        connection_attempts: int = 3,
        retry_delay: int = 5,
        heartbeat: int = 600
    ):
        """
        RabbitMQ ν΄λΌμ΄μ–ΈνŠΈ μ΄ˆκΈ°ν™”
        
        Args:
            host: RabbitMQ μ„œλ²„ 호슀트
            port: RabbitMQ μ„œλ²„ 포트
            username: 인증 μ‚¬μš©μž 이름
            password: 인증 λΉ„λ°€λ²ˆν˜Έ
            virtual_host: 가상 호슀트
            connection_attempts: μ—°κ²° μ‹œλ„ 횟수
            retry_delay: μž¬μ‹œλ„ 간격(초)
            heartbeat: ν•˜νŠΈλΉ„νŠΈ 간격(초)
        """
        self.logger = logging.getLogger(__name__)
        
        # 인증 정보 μ„€μ •
        self.credentials = pika.PlainCredentials(username, password)
        
        # μ—°κ²° νŒŒλΌλ―Έν„° μ„€μ •
        self.parameters = pika.ConnectionParameters(
            host=host,
            port=port,
            virtual_host=virtual_host,
            credentials=self.credentials,
            connection_attempts=connection_attempts,
            retry_delay=retry_delay,
            heartbeat=heartbeat,
            socket_timeout=10
        )
        
        self.connection = None
        self.channel = None
    
    def create_connection(self):
        """
        RabbitMQ μ„œλ²„μ™€μ˜ μƒˆ μ—°κ²° 생성
        
        Returns:
            BlockingConnection: RabbitMQ μ—°κ²° 객체
        """
        try:
            self.logger.info("RabbitMQ μ„œλ²„μ— μ—°κ²° 쀑...")
            connection = pika.BlockingConnection(self.parameters)
            self.logger.info("RabbitMQ μ„œλ²„μ— 연결됨")
            self.connection = connection
            return connection
        except pika.exceptions.AMQPConnectionError as e:
            self.logger.error(f"RabbitMQ μ—°κ²° μ‹€νŒ¨: {e}")
            raise
    
    def create_channel(self):
        """
        RabbitMQ 채널 생성
        
        Returns:
            BlockingChannel: RabbitMQ 채널 객체
        """
        if not self.connection or self.connection.is_closed:
            self.create_connection()
            
        self.channel = self.connection.channel()
        return self.channel
    
    def close(self):
        """μ—°κ²° 및 채널 λ‹«κΈ°"""
        if self.channel and self.channel.is_open:
            self.logger.info("RabbitMQ 채널 λ‹«λŠ” 쀑...")
            self.channel.close()
            
        if self.connection and self.connection.is_open:
            self.logger.info("RabbitMQ μ—°κ²° λ‹«λŠ” 쀑...")
            self.connection.close()
    
    @contextmanager
    def connection_context(self):
        """
        μ»¨ν…μŠ€νŠΈ λ§€λ‹ˆμ €λ₯Ό ν†΅ν•œ RabbitMQ μ—°κ²° 관리
        
        μ‚¬μš© 예:
        ```
        with client.connection_context() as channel:
            channel.queue_declare(queue='hello')
            channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
        ```
        """
        connection = None
        channel = None
        try:
            connection = self.create_connection()
            channel = connection.channel()
            yield channel
        finally:
            if channel and channel.is_open:
                channel.close()
            if connection and connection.is_open:
                connection.close()
    
    def check_queue_exists(self, queue_name: str) -> bool:
        """
        큐가 μ‘΄μž¬ν•˜λŠ”μ§€ 확인
        
        Args:
            queue_name: 확인할 큐 이름
            
        Returns:
            bool: 큐 쑴재 μ—¬λΆ€
        """
        with self.connection_context() as channel:
            try:
                channel.queue_declare(queue=queue_name, passive=True)
                return True
            except pika.exceptions.ChannelClosedByBroker:
                return False

βœ… νŠΉμ§•:

  • μœ μ—°ν•œ μ—°κ²° μ„€μ • 및 νŒŒλΌλ―Έν„° ꡬ성
  • 인증 정보 관리 및 λ³΄μ•ˆ μ—°κ²°
  • μž¬μ‹œλ„ λ‘œμ§μ„ ν†΅ν•œ 내결함성 제곡
  • μ»¨ν…μŠ€νŠΈ λ§€λ‹ˆμ €λ₯Ό ν†΅ν•œ μžμ› 관리
  • 채널 생성 및 관리
  • λ‘œκΉ…μ„ ν†΅ν•œ μ—°κ²° μƒνƒœ 좔적
  • 가상 호슀트 지원
  • νƒ€μž… νžŒνŒ…μ„ ν†΅ν•œ μ½”λ“œ 가독성 ν–₯상


2️⃣ κΈ°λ³Έ λ©”μ‹œμ§•

RabbitMQμ—μ„œ λ©”μ‹œμ§€ μ£Όκ³ λ°›κΈ°λŠ” ν”„λ‘œλ“€μ„œμ™€ μ»¨μŠˆλ¨ΈλΌλŠ” 두 핡심 μ»΄ν¬λ„ŒνŠΈλ₯Ό 톡해 이루어진닀. ν”„λ‘œλ“€μ„œλŠ” λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•˜κ³ , μ»¨μŠˆλ¨ΈλŠ” λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•œλ‹€.

class RabbitMQProducer:
    """RabbitMQ λ©”μ‹œμ§€ λ°œν–‰μ„ λ‹΄λ‹Ήν•˜λŠ” ν”„λ‘œλ“€μ„œ 클래슀"""
    
    def __init__(self, client: RabbitMQClient):
        """
        RabbitMQ ν”„λ‘œλ“€μ„œ μ΄ˆκΈ°ν™”
        
        Args:
            client: RabbitMQ ν΄λΌμ΄μ–ΈνŠΈ μΈμŠ€ν„΄μŠ€
        """
        self.client = client
        self.logger = logging.getLogger(__name__)
    
    def publish(
        self,
        queue_name: str,
        message: Dict[str, Any],
        durable: bool = True,
        exchange: str = '',
        routing_key: str = None,
        properties: pika.BasicProperties = None,
        mandatory: bool = False
    ):
        """
        λ©”μ‹œμ§€ λ°œν–‰
        
        Args:
            queue_name: 큐 이름
            message: λ°œν–‰ν•  λ©”μ‹œμ§€ (λ”•μ…”λ„ˆλ¦¬)
            durable: 큐 지속성 μ—¬λΆ€
            exchange: κ΅ν™˜κΈ° 이름
            routing_key: λΌμš°νŒ… ν‚€ (μ§€μ •ν•˜μ§€ μ•ŠμœΌλ©΄ queue_name μ‚¬μš©)
            properties: λ©”μ‹œμ§€ 속성
            mandatory: λ©”μ‹œμ§€ 전달 보μž₯ μ—¬λΆ€
        
        Returns:
            bool: λ©”μ‹œμ§€ λ°œν–‰ 성곡 μ—¬λΆ€
        """
        if routing_key is None:
            routing_key = queue_name
            
        if properties is None:
            # κΈ°λ³Έ 속성 μ„€μ • - λ©”μ‹œμ§€ 지속성 보μž₯
            properties = pika.BasicProperties(
                delivery_mode=2,  # λ©”μ‹œμ§€ 지속성
                content_type='application/json',
                timestamp=int(time.time()),
                message_id=f"{int(time.time())}-{id(message)}"
            )
        
        try:
            with self.client.connection_context() as channel:
                # 큐 μ„ μ–Έ
                channel.queue_declare(queue=queue_name, durable=durable)
                
                # λ©”μ‹œμ§€ λ°œν–‰
                channel.basic_publish(
                    exchange=exchange,
                    routing_key=routing_key,
                    body=json.dumps(message),
                    properties=properties,
                    mandatory=mandatory
                )
                
                self.logger.info(f"λ©”μ‹œμ§€ λ°œν–‰ 성곡: queue={queue_name}, routing_key={routing_key}")
                return True
                
        except Exception as e:
            self.logger.error(f"λ©”μ‹œμ§€ λ°œν–‰ μ‹€νŒ¨: {e}")
            return False
    
    def publish_batch(
        self,
        queue_name: str,
        messages: list,
        batch_size: int = 100,
        durable: bool = True
    ):
        """
        배치 λ©”μ‹œμ§€ λ°œν–‰
        
        Args:
            queue_name: 큐 이름
            messages: λ°œν–‰ν•  λ©”μ‹œμ§€ λͺ©λ‘
            batch_size: 배치 크기
            durable: 큐 지속성 μ—¬λΆ€
            
        Returns:
            dict: 성곡/μ‹€νŒ¨ λ©”μ‹œμ§€ 수
        """
        with self.client.connection_context() as channel:
            # 큐 μ„ μ–Έ
            channel.queue_declare(queue=queue_name, durable=durable)
            
            success_count = 0
            failure_count = 0
            
            for i in range(0, len(messages), batch_size):
                batch = messages[i:i+batch_size]
                
                for message in batch:
                    try:
                        # λ©”μ‹œμ§€ λ°œν–‰
                        channel.basic_publish(
                            exchange='',
                            routing_key=queue_name,
                            body=json.dumps(message),
                            properties=pika.BasicProperties(
                                delivery_mode=2,  # λ©”μ‹œμ§€ 지속성
                                content_type='application/json'
                            )
                        )
                        success_count += 1
                    except Exception as e:
                        self.logger.error(f"배치 λ©”μ‹œμ§€ λ°œν–‰ μ‹€νŒ¨: {e}")
                        failure_count += 1
            
            return {
                'total': len(messages),
                'success': success_count,
                'failure': failure_count
            }

class RabbitMQConsumer:
    """RabbitMQ λ©”μ‹œμ§€ μ†ŒλΉ„λ₯Ό λ‹΄λ‹Ήν•˜λŠ” 컨슈머 클래슀"""
    
    def __init__(self, client: RabbitMQClient):
        """
        RabbitMQ 컨슈머 μ΄ˆκΈ°ν™”
        
        Args:
            client: RabbitMQ ν΄λΌμ΄μ–ΈνŠΈ μΈμŠ€ν„΄μŠ€
        """
        self.client = client
        self.logger = logging.getLogger(__name__)
        self.should_stop = False
    
    def consume(
        self,
        queue_name: str,
        callback: Callable,
        durable: bool = True,
        prefetch_count: int = 1,
        auto_ack: bool = False
    ):
        """
        λ©”μ‹œμ§€ μ†ŒλΉ„
        
        Args:
            queue_name: μ†ŒλΉ„ν•  큐 이름
            callback: λ©”μ‹œμ§€ 처리 콜백 ν•¨μˆ˜
            durable: 큐 지속성 μ—¬λΆ€
            prefetch_count: ν•œ λ²ˆμ— κ°€μ Έμ˜¬ λ©”μ‹œμ§€ 수
            auto_ack: μžλ™ 확인 μ—¬λΆ€
        """
        def wrapped_callback(ch, method, properties, body):
            """
            콜백 래퍼 ν•¨μˆ˜
            
            Args:
                ch: 채널
                method: 전달 λ©”μ„œλ“œ
                properties: λ©”μ‹œμ§€ 속성
                body: λ©”μ‹œμ§€ λ³Έλ¬Έ
            """
            try:
                message = json.loads(body)
                self.logger.info(f"λ©”μ‹œμ§€ μˆ˜μ‹ : {queue_name}")
                
                # μ‚¬μš©μž μ •μ˜ 콜백 μ‹€ν–‰
                result = callback(ch, method, properties, message)
                
                # μžλ™ 확인이 μ•„λ‹Œ 경우 λ©”μ‹œμ§€ 확인
                if not auto_ack:
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                
                return result
                
            except json.JSONDecodeError as e:
                self.logger.error(f"λ©”μ‹œμ§€ ν˜•μ‹ 였λ₯˜: {e}")
                # λ©”μ‹œμ§€ ν˜•μ‹ 였λ₯˜ μ‹œ κ±°λΆ€ (μž¬νμž‰ μ—†μŒ)
                if not auto_ack:
                    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
            except Exception as e:
                self.logger.error(f"λ©”μ‹œμ§€ 처리 였λ₯˜: {e}")
                # 기타 였λ₯˜ μ‹œ μž¬νμž‰
                if not auto_ack:
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        
        # μ—°κ²° 및 채널 생성
        connection = self.client.create_connection()
        channel = connection.channel()
        
        try:
            # 큐 μ„ μ–Έ
            channel.queue_declare(queue=queue_name, durable=durable)
            
            # QoS μ„€μ •
            channel.basic_qos(prefetch_count=prefetch_count)
            
            # 콜백 μ„€μ •
            channel.basic_consume(
                queue=queue_name,
                on_message_callback=wrapped_callback,
                auto_ack=auto_ack
            )
            
            self.logger.info(f"큐 {queue_name}μ—μ„œ λ©”μ‹œμ§€ λŒ€κΈ° 쀑...")
            
            # λ©”μ‹œμ§€ μ†ŒλΉ„ μ‹œμž‘
            while not self.should_stop:
                connection.process_data_events(time_limit=1)  # 이벀트 처리
                
        except KeyboardInterrupt:
            self.logger.info("컨슈머 쀑지 μš”μ²­")
            self.should_stop = True
        except Exception as e:
            self.logger.error(f"컨슈머 였λ₯˜: {e}")
        finally:
            self.logger.info("컨슈머 μ’…λ£Œ")
            if channel and channel.is_open:
                channel.close()
            if connection and connection.is_open:
                connection.close()
    
    def stop(self):
        """컨슈머 쀑지"""
        self.should_stop = True

# μ‚¬μš© 예제
def message_handler(ch, method, properties, message):
    """
    λ©”μ‹œμ§€ 처리 ν•Έλ“€λŸ¬ ν•¨μˆ˜
    
    Args:
        ch: 채널
        method: 전달 λ©”μ„œλ“œ
        properties: λ©”μ‹œμ§€ 속성
        message: λ””μ½”λ”©λœ λ©”μ‹œμ§€ (dict)
    """
    print(f"λ©”μ‹œμ§€ 처리: {message}")
    # λ©”μ‹œμ§€ 처리 둜직...
    return True

# 예제 μ‹€ν–‰
if __name__ == "__main__":
    # λ‘œκΉ… μ„€μ •
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # ν΄λΌμ΄μ–ΈνŠΈ 생성
    client = RabbitMQClient(
        host='localhost',
        port=5672,
        username='guest',
        password='guest'
    )
    
    # ν”„λ‘œλ“€μ„œ 생성 및 λ©”μ‹œμ§€ λ°œν–‰
    producer = RabbitMQProducer(client)
    producer.publish(
        queue_name='example_queue',
        message={'data': 'Hello RabbitMQ!', 'timestamp': time.time()}
    )
    
    # 컨슈머 생성 및 λ©”μ‹œμ§€ μ†ŒλΉ„
    consumer = RabbitMQConsumer(client)
    try:
        consumer.consume(
            queue_name='example_queue',
            callback=message_handler,
            prefetch_count=10
        )
    except KeyboardInterrupt:
        consumer.stop()

βœ… νŠΉμ§•:

  • κ°•λ ₯ν•œ λ©”μ‹œμ§€ λ°œν–‰ 및 μ†ŒλΉ„ κΈ°λŠ₯
  • μ»¨ν…μŠ€νŠΈ λ§€λ‹ˆμ €λ₯Ό ν†΅ν•œ μžμ› 관리
  • 배치 처리 μ§€μ›μœΌλ‘œ μ„±λŠ₯ μ΅œμ ν™”
  • λ©”μ‹œμ§€ ν˜•μ‹ 검증 및 였λ₯˜ 처리
  • QoS 섀정을 ν†΅ν•œ λΆ€ν•˜ 관리
  • λ©”μ‹œμ§€ 확인 및 κ±°λΆ€ λ©”μ»€λ‹ˆμ¦˜
  • μž¬μ‹œλ„ 둜직과 내결함성
  • λ‘œκΉ…μ„ ν†΅ν•œ λ©”μ‹œμ§€ μƒνƒœ 좔적
  • νƒ€μž… νžŒνŒ…μœΌλ‘œ μ½”λ“œ 가독성 및 μ•ˆμ •μ„± ν–₯상
  • 컨슈머 μš°μ•„ν•œ μ’…λ£Œ 지원


3️⃣ κ³ κΈ‰ λ©”μ‹œμ§• νŒ¨ν„΄

RabbitMQλŠ” λ‹¨μˆœν•œ μ λŒ€μ  λ©”μ‹œμ§• μ΄μƒμ˜ λ‹€μ–‘ν•œ κ³ κΈ‰ λ©”μ‹œμ§• νŒ¨ν„΄μ„ μ œκ³΅ν•œλ‹€. κ΅ν™˜κΈ°(Exchange)λŠ” λ©”μ‹œμ§€ λΌμš°νŒ…μ˜ 핡심 ꡬ성 μš”μ†Œλ‘œ, μ—¬λŸ¬ μœ ν˜•μ„ 톡해 λ³΅μž‘ν•œ λ©”μ‹œμ§• μš”κ΅¬μ‚¬ν•­μ„ κ΅¬ν˜„ν•  수 μžˆλ‹€.

class RabbitMQExchange:
    """RabbitMQ κ΅ν™˜κΈ°λ₯Ό κ΄€λ¦¬ν•˜λŠ” 클래슀"""
    
    def __init__(self, client: RabbitMQClient):
        """
        RabbitMQ κ΅ν™˜κΈ° κ΄€λ¦¬μž μ΄ˆκΈ°ν™”
        
        Args:
            client: RabbitMQ ν΄λΌμ΄μ–ΈνŠΈ μΈμŠ€ν„΄μŠ€
        """
        self.client = client
        self.logger = logging.getLogger(__name__)
    
    def setup_fanout(self, exchange_name: str, durable: bool = True):
        """
        νŒ¬μ•„μ›ƒ κ΅ν™˜κΈ° μ„€μ • - λͺ¨λ“  큐에 λ©”μ‹œμ§€ λΈŒλ‘œλ“œμΊμŠ€νŠΈ
        
        Args:
            exchange_name: κ΅ν™˜κΈ° 이름
            durable: κ΅ν™˜κΈ° 지속성 μ—¬λΆ€
            
        Returns:
            pika.channel.Channel: μ„€μ •λœ 채널
        """
        with self.client.connection_context() as channel:
            channel.exchange_declare(
                exchange=exchange_name,
                exchange_type='fanout',
                durable=durable
            )
            self.logger.info(f"νŒ¬μ•„μ›ƒ κ΅ν™˜κΈ° '{exchange_name}' μ„€μ • μ™„λ£Œ")
            return channel
    
    def setup_direct(self, exchange_name: str, durable: bool = True):
        """
        λ‹€μ΄λ ‰νŠΈ κ΅ν™˜κΈ° μ„€μ • - λΌμš°νŒ… ν‚€ 기반 λ©”μ‹œμ§€ 전달
        
        Args:
            exchange_name: κ΅ν™˜κΈ° 이름
            durable: κ΅ν™˜κΈ° 지속성 μ—¬λΆ€
            
        Returns:
            pika.channel.Channel: μ„€μ •λœ 채널
        """
        with self.client.connection_context() as channel:
            channel.exchange_declare(
                exchange=exchange_name,
                exchange_type='direct',
                durable=durable
            )
            self.logger.info(f"λ‹€μ΄λ ‰νŠΈ κ΅ν™˜κΈ° '{exchange_name}' μ„€μ • μ™„λ£Œ")
            return channel
    
    def setup_topic(self, exchange_name: str, durable: bool = True):
        """
        ν† ν”½ κ΅ν™˜κΈ° μ„€μ • - νŒ¨ν„΄ λ§€μΉ­ 기반 λ©”μ‹œμ§€ 전달
        
        Args:
            exchange_name: κ΅ν™˜κΈ° 이름
            durable: κ΅ν™˜κΈ° 지속성 μ—¬λΆ€
            
        Returns:
            pika.channel.Channel: μ„€μ •λœ 채널
        """
        with self.client.connection_context() as channel:
            channel.exchange_declare(
                exchange=exchange_name,
                exchange_type='topic',
                durable=durable
            )
            self.logger.info(f"ν† ν”½ κ΅ν™˜κΈ° '{exchange_name}' μ„€μ • μ™„λ£Œ")
            return channel
    
    def setup_headers(self, exchange_name: str, durable: bool = True):
        """
        헀더 κ΅ν™˜κΈ° μ„€μ • - 헀더 속성 기반 λ©”μ‹œμ§€ 전달
        
        Args:
            exchange_name: κ΅ν™˜κΈ° 이름
            durable: κ΅ν™˜κΈ° 지속성 μ—¬λΆ€
            
        Returns:
            pika.channel.Channel: μ„€μ •λœ 채널
        """
        with self.client.connection_context() as channel:
            channel.exchange_declare(
                exchange=exchange_name,
                exchange_type='headers',
                durable=durable
            )
            self.logger.info(f"헀더 κ΅ν™˜κΈ° '{exchange_name}' μ„€μ • μ™„λ£Œ")
            return channel
    
    def bind_queue_to_exchange(
        self,
        queue_name: str,
        exchange_name: str,
        routing_key: str = '',
        headers: dict = None
    ):
        """
        큐λ₯Ό κ΅ν™˜κΈ°μ— 바인딩
        
        Args:
            queue_name: 큐 이름
            exchange_name: κ΅ν™˜κΈ° 이름
            routing_key: λΌμš°νŒ… ν‚€
            headers: 헀더 속성 (헀더 κ΅ν™˜κΈ°μš©)
            
        Returns:
            bool: 바인딩 성곡 μ—¬λΆ€
        """
        try:
            with self.client.connection_context() as channel:
                # 큐 μ„ μ–Έ
                channel.queue_declare(queue=queue_name, durable=True)
                
                # 바인딩 μ„€μ •
                if headers:
                    channel.queue_bind(
                        queue=queue_name,
                        exchange=exchange_name,
                        arguments=headers
                    )
                else:
                    channel.queue_bind(
                        queue=queue_name,
                        exchange=exchange_name,
                        routing_key=routing_key
                    )
                
                self.logger.info(f"큐 '{queue_name}'λ₯Ό κ΅ν™˜κΈ° '{exchange_name}'에 바인딩 μ™„λ£Œ")
                return True
                
        except Exception as e:
            self.logger.error(f"바인딩 μ‹€νŒ¨: {e}")
            return False
    
    def publish_to_exchange(
        self,
        exchange_name: str,
        routing_key: str,
        message: dict,
        headers: dict = None
    ):
        """
        κ΅ν™˜κΈ°μ— λ©”μ‹œμ§€ λ°œν–‰
        
        Args:
            exchange_name: κ΅ν™˜κΈ° 이름
            routing_key: λΌμš°νŒ… ν‚€
            message: λ°œν–‰ν•  λ©”μ‹œμ§€
            headers: 헀더 속성 (헀더 κ΅ν™˜κΈ°μš©)
            
        Returns:
            bool: λ°œν–‰ 성곡 μ—¬λΆ€
        """
        properties = pika.BasicProperties(
            delivery_mode=2,  # λ©”μ‹œμ§€ 지속성
            content_type='application/json',
            headers=headers
        )
        
        try:
            with self.client.connection_context() as channel:
                channel.basic_publish(
                    exchange=exchange_name,
                    routing_key=routing_key,
                    body=json.dumps(message),
                    properties=properties
                )
                
                self.logger.info(f"κ΅ν™˜κΈ° '{exchange_name}'에 λ©”μ‹œμ§€ λ°œν–‰ 성곡")
                return True
                
        except Exception as e:
            self.logger.error(f"κ΅ν™˜κΈ° λ©”μ‹œμ§€ λ°œν–‰ μ‹€νŒ¨: {e}")
            return False

# κ΅ν™˜κΈ° νŒ¨ν„΄ 예제
class ExchangePatterns:
    """RabbitMQ κ΅ν™˜κΈ° νŒ¨ν„΄ κ΅¬ν˜„ 예제"""
    
    def __init__(self, client: RabbitMQClient):
        self.client = client
        self.exchange_manager = RabbitMQExchange(client)
        self.logger = logging.getLogger(__name__)
    
    def setup_pub_sub(self, exchange_name: str):
        """
        λ°œν–‰-ꡬ독 νŒ¨ν„΄ μ„€μ • (νŒ¬μ•„μ›ƒ κ΅ν™˜κΈ° μ‚¬μš©)
        
        Args:
            exchange_name: κ΅ν™˜κΈ° 이름
            
        Returns:
            list: μƒμ„±λœ 큐 이름 λͺ©λ‘
        """
        # νŒ¬μ•„μ›ƒ κ΅ν™˜κΈ° μ„€μ •
        self.exchange_manager.setup_fanout(exchange_name)
        
        # μ—¬λŸ¬ 큐 생성 및 바인딩
        queues = []
        for i in range(3):
            queue_name = f"{exchange_name}_queue_{i}"
            with self.client.connection_context() as channel:
                result = channel.queue_declare(queue=queue_name, exclusive=True)
                queue_name = result.method.queue
                queues.append(queue_name)
                
                # 큐λ₯Ό κ΅ν™˜κΈ°μ— 바인딩
                channel.queue_bind(
                    exchange=exchange_name,
                    queue=queue_name
                )
                
        self.logger.info(f"λ°œν–‰-ꡬ독 νŒ¨ν„΄ μ„€μ • μ™„λ£Œ: {queues}")
        return queues
    
    def setup_routing(self, exchange_name: str, routing_keys: list):
        """
        λΌμš°νŒ… νŒ¨ν„΄ μ„€μ • (λ‹€μ΄λ ‰νŠΈ κ΅ν™˜κΈ° μ‚¬μš©)
        
        Args:
            exchange_name: κ΅ν™˜κΈ° 이름
            routing_keys: λΌμš°νŒ… ν‚€ λͺ©λ‘
            
        Returns:
            dict: λΌμš°νŒ… 킀별 큐 이름 λ§€ν•‘
        """
        # λ‹€μ΄λ ‰νŠΈ κ΅ν™˜κΈ° μ„€μ •
        self.exchange_manager.setup_direct(exchange_name)
        
        # λΌμš°νŒ… 킀별 큐 생성 및 바인딩
        routing_map = {}
        for key in routing_keys:
            queue_name = f"{exchange_name}_{key}"
            with self.client.connection_context() as channel:
                result = channel.queue_declare(queue=queue_name, exclusive=True)
                queue_name = result.method.queue
                routing_map[key] = queue_name
                
                # 큐λ₯Ό κ΅ν™˜κΈ°μ— 바인딩
                channel.queue_bind(
                    exchange=exchange_name,
                    queue=queue_name,
                    routing_key=key
                )
        
        self.logger.info(f"λΌμš°νŒ… νŒ¨ν„΄ μ„€μ • μ™„λ£Œ: {routing_map}")
        return routing_map
    
    def setup_topics(self, exchange_name: str, topic_patterns: list):
        """
        ν† ν”½ νŒ¨ν„΄ μ„€μ • (ν† ν”½ κ΅ν™˜κΈ° μ‚¬μš©)
        
        Args:
            exchange_name: κ΅ν™˜κΈ° 이름
            topic_patterns: ν† ν”½ νŒ¨ν„΄ λͺ©λ‘
            
        Returns:
            dict: ν† ν”½ νŒ¨ν„΄λ³„ 큐 이름 λ§€ν•‘
        """
        # ν† ν”½ κ΅ν™˜κΈ° μ„€μ •
        self.exchange_manager.setup_topic(exchange_name)
        
        # ν† ν”½ νŒ¨ν„΄λ³„ 큐 생성 및 바인딩
        topic_map = {}
        for pattern in topic_patterns:
            queue_name = f"{exchange_name}_{pattern.replace('.', '_').replace('*', 'star').replace('#', 'hash')}"
            with self.client.connection_context() as channel:
                result = channel.queue_declare(queue=queue_name, exclusive=True)
                queue_name = result.method.queue
                topic_map[pattern] = queue_name
                
                # 큐λ₯Ό κ΅ν™˜κΈ°μ— 바인딩
                channel.queue_bind(
                    exchange=exchange_name,
                    queue=queue_name,
                    routing_key=pattern
                )
        
        self.logger.info(f"ν† ν”½ νŒ¨ν„΄ μ„€μ • μ™„λ£Œ: {topic_map}")
        return topic_map

βœ… νŠΉμ§•:

  • λ‹€μ–‘ν•œ κ΅ν™˜κΈ° μœ ν˜• 지원 (νŒ¬μ•„μ›ƒ, λ‹€μ΄λ ‰νŠΈ, ν† ν”½, 헀더)
  • μœ μ—°ν•œ λ©”μ‹œμ§€ λΌμš°νŒ… ꡬ성
  • 바인딩을 ν†΅ν•œ λ©”μ‹œμ§€ 필터링
  • λ°œν–‰-ꡬ독 νŒ¨ν„΄ κ΅¬ν˜„
  • λΌμš°νŒ… νŒ¨ν„΄ κ΅¬ν˜„
  • ν† ν”½ 기반 νŒ¨ν„΄ λ§€μΉ­
  • 헀더 기반 λ©”μ‹œμ§€ λΌμš°νŒ…
  • μ»¨ν…μŠ€νŠΈ λ§€λ‹ˆμ €λ₯Ό ν†΅ν•œ μžμ› 관리
  • λ‘œκΉ…μ„ ν†΅ν•œ κ΅ν™˜κΈ° μž‘μ—… 좔적


4️⃣ λ©”μ‹œμ§€ μ²˜λ¦¬μ™€ 확인

λ©”μ‹œμ§€ μ²˜λ¦¬λŠ” RabbitMQ μ‹œμŠ€ν…œμ˜ 핡심 λΆ€λΆ„μœΌλ‘œ, μ•ˆμ •μ μΈ μ²˜λ¦¬μ™€ 였λ₯˜ 볡ꡬ λ©”μ»€λ‹ˆμ¦˜μ΄ ν•„μˆ˜μ μ΄λ‹€. μ μ ˆν•œ 확인(ack)κ³Ό κ±°λΆ€(nack) 처리λ₯Ό 톡해 λ©”μ‹œμ§€κ°€ μ†μ‹€λ˜μ§€ μ•Šλ„λ‘ 보μž₯ν•  수 μžˆλ‹€.

class MessageHandler:
    """RabbitMQ λ©”μ‹œμ§€ 처리 및 확인 관리 클래슀"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    @staticmethod
    def process_message(
        channel,
        method,
        properties,
        body
    ):
        """
        λ©”μ‹œμ§€ 처리 κΈ°λ³Έ ν•Έλ“€λŸ¬
        
        Args:
            channel: RabbitMQ 채널
            method: 전달 λ©”μ„œλ“œ
            properties: λ©”μ‹œμ§€ 속성
            body: λ©”μ‹œμ§€ λ³Έλ¬Έ
        """
        try:
            # λ©”μ‹œμ§€ νŒŒμ‹±
            message = json.loads(body)
            print(f"Received message: {message}")
            
            # λ©”μ‹œμ§€ 처리 둜직 (μ˜ˆμ‹œ)
            # μ‹€μ œ κ΅¬ν˜„μ—μ„œλŠ” λΉ„μ¦ˆλ‹ˆμŠ€ λ‘œμ§μ— 맞게 처리
            processing_time = 0.5  # λ©”μ‹œμ§€ μ²˜λ¦¬μ— κ±Έλ¦¬λŠ” μ‹œκ°„ (μ˜ˆμ‹œ)
            time.sleep(processing_time)
            
            # λ©”μ‹œμ§€ 처리 성곡 μ‹œ 확인(ack)
            channel.basic_ack(delivery_tag=method.delivery_tag)
            print(f"처리 μ™„λ£Œ 및 확인(ack): {message.get('id', 'unknown')}")
            
        except json.JSONDecodeError as e:
            # JSON νŒŒμ‹± 였λ₯˜ - λ©”μ‹œμ§€ ν˜•μ‹μ΄ 잘λͺ»λœ 경우
            print(f"JSON νŒŒμ‹± 였λ₯˜: {e}")
            # ν˜•μ‹ 였λ₯˜λŠ” μž¬μ‹œλ„ν•΄λ„ λ™μΌν•˜λ―€λ‘œ μž¬νμž‰ν•˜μ§€ μ•ŠμŒ
            channel.basic_reject(
                delivery_tag=method.delivery_tag,
                requeue=False
            )
        except Exception as e:
            # 기타 처리 였λ₯˜ - μΌμ‹œμ μΈ 문제일 수 μžˆμœΌλ―€λ‘œ μž¬νμž‰
            print(f"λ©”μ‹œμ§€ 처리 였λ₯˜: {e}")
            channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=True
            )
    
    @staticmethod
    def process_with_retry(
        channel,
        method,
        properties,
        body,
        max_retries: int = 3,
        retry_exchange: str = None,
        retry_queue: str = None,
        dead_letter_exchange: str = None
    ):
        """
        μž¬μ‹œλ„ 둜직이 ν¬ν•¨λœ λ©”μ‹œμ§€ 처리 ν•Έλ“€λŸ¬
        
        Args:
            channel: RabbitMQ 채널
            method: 전달 λ©”μ„œλ“œ
            properties: λ©”μ‹œμ§€ 속성
            body: λ©”μ‹œμ§€ λ³Έλ¬Έ
            max_retries: μ΅œλŒ€ μž¬μ‹œλ„ 횟수
            retry_exchange: μž¬μ‹œλ„ κ΅ν™˜κΈ° 이름
            retry_queue: μž¬μ‹œλ„ 큐 이름
            dead_letter_exchange: λ°λ“œλ ˆν„° κ΅ν™˜κΈ° 이름
        """
        try:
            # ν—€λ”μ—μ„œ μž¬μ‹œλ„ 횟수 확인
            headers = properties.headers or {}
            retry_count = headers.get('x-retry-count', 0)
            
            # λ©”μ‹œμ§€ νŒŒμ‹±
            message = json.loads(body)
            print(f"처리 쀑 (μ‹œλ„ {retry_count+1}/{max_retries+1}): {message}")
            
            # λ©”μ‹œμ§€ 처리 둜직 (μ˜ˆμ‹œ)
            # μ‹€μ œ κ΅¬ν˜„μ—μ„œλŠ” λΉ„μ¦ˆλ‹ˆμŠ€ λ‘œμ§μ— 맞게 처리
            if random.random() < 0.3:  # 30% ν™•λ₯ λ‘œ 였λ₯˜ λ°œμƒ (μ˜ˆμ‹œ)
                raise Exception("랜덀 처리 였λ₯˜ (ν…ŒμŠ€νŠΈμš©)")
                
            # λ©”μ‹œμ§€ 처리 성곡 μ‹œ 확인(ack)
            channel.basic_ack(delivery_tag=method.delivery_tag)
            print(f"처리 μ™„λ£Œ: {message.get('id', 'unknown')}")
            
        except Exception as e:
            print(f"처리 였λ₯˜: {e}")
            
            # μ΅œλŒ€ μž¬μ‹œλ„ 횟수 초과 μ—¬λΆ€ 확인
            if retry_count >= max_retries:
                print(f"μ΅œλŒ€ μž¬μ‹œλ„ 횟수 초과: {message.get('id', 'unknown')}")
                
                # λ°λ“œλ ˆν„° κ΅ν™˜κΈ°κ°€ μ„€μ •λœ 경우 ν•΄λ‹Ή κ΅ν™˜κΈ°λ‘œ 전솑
                if dead_letter_exchange:
                    print(f"λ°λ“œλ ˆν„° 큐둜 이동: {message.get('id', 'unknown')}")
                    properties.headers = headers
                    properties.headers['x-error'] = str(e)
                    properties.headers['x-failed-at'] = time.time()
                    
                    channel.basic_publish(
                        exchange=dead_letter_exchange,
                        routing_key='dead_letter',
                        body=body,
                        properties=properties
                    )
                
                # 원본 λ©”μ‹œμ§€ 확인(ack)
                channel.basic_ack(delivery_tag=method.delivery_tag)
                
            else:
                # μž¬μ‹œλ„ 큐둜 λ‹€μ‹œ 전솑
                if retry_exchange and retry_queue:
                    print(f"μž¬μ‹œλ„ 큐둜 이동: {message.get('id', 'unknown')}")
                    
                    # μž¬μ‹œλ„ 정보 μ—…λ°μ΄νŠΈ
                    headers['x-retry-count'] = retry_count + 1
                    headers['x-original-exchange'] = properties.headers.get('x-original-exchange', '')
                    headers['x-original-routing-key'] = properties.headers.get('x-original-routing-key', method.routing_key)
                    headers['x-last-retry-at'] = time.time()
                    
                    new_properties = pika.BasicProperties(
                        delivery_mode=2,
                        headers=headers
                    )
                    
                    # μž¬μ‹œλ„ 큐둜 λ°œν–‰
                    channel.basic_publish(
                        exchange=retry_exchange,
                        routing_key=retry_queue,
                        body=body,
                        properties=new_properties
                    )
                    
                    # 원본 λ©”μ‹œμ§€ 확인(ack)
                    channel.basic_ack(delivery_tag=method.delivery_tag)
                    
                else:
                    # μž¬μ‹œλ„ 섀정이 μ—†λŠ” 경우 원본 큐에 μž¬νμž‰
                    print(f"원본 큐에 μž¬νμž‰: {message.get('id', 'unknown')}")
                    channel.basic_nack(
                        delivery_tag=method.delivery_tag,
                        requeue=True
                    )
    
    @staticmethod
    def setup_retry_mechanism(
        channel,
        queue_name: str,
        retry_delay: int = 30000,  # μž¬μ‹œλ„ μ§€μ—° μ‹œκ°„ (λ°€λ¦¬μ΄ˆ)
        max_retries: int = 3
    ):
        """
        μž¬μ‹œλ„ λ©”μ»€λ‹ˆμ¦˜ μ„€μ •
        
        Args:
            channel: RabbitMQ 채널
            queue_name: 원본 큐 이름
            retry_delay: μž¬μ‹œλ„ μ§€μ—° μ‹œκ°„ (λ°€λ¦¬μ΄ˆ)
            max_retries: μ΅œλŒ€ μž¬μ‹œλ„ 횟수
            
        Returns:
            tuple: (μž¬μ‹œλ„ 큐 이름, λ°λ“œλ ˆν„° 큐 이름)
        """
        # λ°λ“œλ ˆν„° κ΅ν™˜κΈ° 및 큐 μ„€μ •
        dlx_name = f"{queue_name}.dlx"
        dl_queue_name = f"{queue_name}.deadletter"
        
        channel.exchange_declare(
            exchange=dlx_name,
            exchange_type='direct',
            durable=True
        )
        
        channel.queue_declare(
            queue=dl_queue_name,
            durable=True
        )
        
        channel.queue_bind(
            queue=dl_queue_name,
            exchange=dlx_name,
            routing_key='dead_letter'
        )
        
        # μž¬μ‹œλ„ κ΅ν™˜κΈ° 및 큐 μ„€μ •
        retry_exchange = f"{queue_name}.retry"
        retry_queue = f"{queue_name}.retry"
        
        channel.exchange_declare(
            exchange=retry_exchange,
            exchange_type='direct',
            durable=True
        )
        
        # μž¬μ‹œλ„ 큐 μ„€μ • (TTL 적용)
        channel.queue_declare(
            queue=retry_queue,
            durable=True,
            arguments={
                'x-dead-letter-exchange': '',  # κΈ°λ³Έ κ΅ν™˜κΈ°λ‘œ μ„€μ •
                'x-dead-letter-routing-key': queue_name,  # 원본 큐둜 λ‹€μ‹œ λΌμš°νŒ…
                'x-message-ttl': retry_delay  # μž¬μ‹œλ„ μ§€μ—° μ‹œκ°„
            }
        )
        
        channel.queue_bind(
            queue=retry_queue,
            exchange=retry_exchange,
            routing_key=retry_queue
        )
        
        # 원본 큐에 λ°λ“œλ ˆν„° κ΅ν™˜κΈ° μ„€μ •
        channel.queue_declare(
            queue=queue_name,
            durable=True,
            arguments={
                'x-dead-letter-exchange': dlx_name,
                'x-dead-letter-routing-key': 'dead_letter'
            }
        )
        
        print(f"μž¬μ‹œλ„ λ©”μ»€λ‹ˆμ¦˜ μ„€μ • μ™„λ£Œ: {queue_name}")
        return retry_queue, dl_queue_name

βœ… νŠΉμ§•:

  • 였λ₯˜ 처리 및 볡ꡬ λ©”μ»€λ‹ˆμ¦˜
  • μž¬μ‹œλ„ 둜직 κ΅¬ν˜„
  • λ©”μ‹œμ§€ 응닡 (ack, nack, reject)
  • λ°λ“œλ ˆν„° 큐 μ„€μ • 및 관리
  • μ§€μ—° μž¬μ‹œλ„ (TTL ν™œμš©)
  • μ§„ν–‰ μƒνƒœ 좔적 (헀더 μ‚¬μš©)
  • μ΅œλŒ€ μž¬μ‹œλ„ 횟수 μ„€μ •
  • 였λ₯˜ 정보 기둝
  • λ©”μ‹œμ§€ 처리 μƒνƒœ λ‘œκΉ…


5️⃣ λͺ¨λ‹ˆν„°λ§κ³Ό 관리

RabbitMQλŠ” κ°•λ ₯ν•œ λͺ¨λ‹ˆν„°λ§ κΈ°λŠ₯κ³Ό 관리 도ꡬλ₯Ό μ œκ³΅ν•˜μ—¬ λ©”μ‹œμ§• μΈν”„λΌμ˜ μƒνƒœλ₯Ό μΆ”μ ν•˜κ³  μ΅œμ ν™”ν•  수 μžˆλ‹€. Pythonμ—μ„œλŠ” 관리 APIλ₯Ό 톡해 μ΄λŸ¬ν•œ κΈ°λŠ₯에 ν”„λ‘œκ·Έλž˜λ§€ν‹±ν•˜κ²Œ μ ‘κ·Όν•  수 μžˆλ‹€.

import requests
import base64
import urllib.parse
import time
from typing import Dict, List, Any, Optional

class RabbitMQMonitor:
    """RabbitMQ λͺ¨λ‹ˆν„°λ§ 및 관리λ₯Ό μœ„ν•œ 클래슀"""
    
    def __init__(
        self,
        host: str = 'localhost',
        port: int = 15672,
        username: str = 'guest',
        password: str = 'guest',
        protocol: str = 'http'
    ):
        """
        RabbitMQ λͺ¨λ‹ˆν„° μ΄ˆκΈ°ν™”
        
        Args:
            host: RabbitMQ 관리 μΈν„°νŽ˜μ΄μŠ€ 호슀트
            port: RabbitMQ 관리 μΈν„°νŽ˜μ΄μŠ€ 포트 (κΈ°λ³Έ 15672)
            username: κ΄€λ¦¬μž μ‚¬μš©μž 이름
            password: κ΄€λ¦¬μž λΉ„λ°€λ²ˆν˜Έ
            protocol: 톡신 ν”„λ‘œν† μ½œ (http λ˜λŠ” https)
        """
        self.base_url = f"{protocol}://{host}:{port}/api"
        self.auth = (username, password)
        self.logger = logging.getLogger(__name__)
    
    def _request(
        self,
        method: str,
        endpoint: str,
        data: dict = None,
        params: dict = None
    ) -> dict:
        """
        RabbitMQ API μš”μ²­ λ©”μ„œλ“œ
        
        Args:
            method: HTTP λ©”μ„œλ“œ (GET, POST, PUT, DELETE)
            endpoint: API μ—”λ“œν¬μΈνŠΈ
            data: μš”μ²­ 데이터 (μžˆλŠ” 경우)
            params: URL νŒŒλΌλ―Έν„° (μžˆλŠ” 경우)
            
        Returns:
            dict: API 응닡
        """
        url = f"{self.base_url}/{endpoint}"
        
        try:
            response = requests.request(
                method=method,
                url=url,
                auth=self.auth,
                json=data,
                params=params
            )
            
            response.raise_for_status()
            
            if response.content:
                return response.json()
            return {}
            
        except requests.exceptions.HTTPError as e:
            self.logger.error(f"API μš”μ²­ 였λ₯˜ ({method} {url}): {e}")
            if response.content:
                self.logger.error(f"응닡: {response.text}")
            raise
        except Exception as e:
            self.logger.error(f"API μš”μ²­ 쀑 μ˜ˆμ™Έ λ°œμƒ: {e}")
            raise
    
    def get_overview(self) -> dict:
        """
        RabbitMQ μ„œλ²„ κ°œμš” 정보 쑰회
        
        Returns:
            dict: μ„œλ²„ κ°œμš” 정보
        """
        return self._request('GET', 'overview')
    
    def get_nodes(self) -> List[dict]:
        """
        RabbitMQ λ…Έλ“œ λͺ©λ‘ 쑰회
        
        Returns:
            List[dict]: λ…Έλ“œ 정보 λͺ©λ‘
        """
        return self._request('GET', 'nodes')
    
    def get_queues(self, vhost: str = None) -> List[dict]:
        """
        큐 λͺ©λ‘ 쑰회
        
        Args:
            vhost: 가상 호슀트 (μ§€μ •ν•˜μ§€ μ•ŠμœΌλ©΄ λͺ¨λ“  가상 호슀트)
            
        Returns:
            List[dict]: 큐 정보 λͺ©λ‘
        """
        endpoint = 'queues'
        if vhost:
            endpoint = f"queues/{urllib.parse.quote_plus(vhost)}"
        return self._request('GET', endpoint)
    
    def get_queue(self, vhost: str, queue_name: str) -> dict:
        """
        νŠΉμ • 큐 정보 쑰회
        
        Args:
            vhost: 가상 호슀트
            queue_name: 큐 이름
            
        Returns:
            dict: 큐 정보
        """
        endpoint = f"queues/{urllib.parse.quote_plus(vhost)}/{urllib.parse.quote_plus(queue_name)}"
        return self._request('GET', endpoint)
    
    def get_exchanges(self, vhost: str = None) -> List[dict]:
        """
        κ΅ν™˜κΈ° λͺ©λ‘ 쑰회
        
        Args:
            vhost: 가상 호슀트 (μ§€μ •ν•˜μ§€ μ•ŠμœΌλ©΄ λͺ¨λ“  가상 호슀트)
            
        Returns:
            List[dict]: κ΅ν™˜κΈ° 정보 λͺ©λ‘
        """
        endpoint = 'exchanges'
        if vhost:
            endpoint = f"exchanges/{urllib.parse.quote_plus(vhost)}"
        return self._request('GET', endpoint)
    
    def get_bindings(self, vhost: str = None) -> List[dict]:
        """
        바인딩 λͺ©λ‘ 쑰회
        
        Args:
            vhost: 가상 호슀트 (μ§€μ •ν•˜μ§€ μ•ŠμœΌλ©΄ λͺ¨λ“  가상 호슀트)
            
        Returns:
            List[dict]: 바인딩 정보 λͺ©λ‘
        """
        endpoint = 'bindings'
        if vhost:
            endpoint = f"bindings/{urllib.parse.quote_plus(vhost)}"
        return self._request('GET', endpoint)
    
    def get_connections(self) -> List[dict]:
        """
        μ—°κ²° λͺ©λ‘ 쑰회
        
        Returns:
            List[dict]: μ—°κ²° 정보 λͺ©λ‘
        """
        return self._request('GET', 'connections')
    
    def get_channels(self) -> List[dict]:
        """
        채널 λͺ©λ‘ 쑰회
        
        Returns:
            List[dict]: 채널 정보 λͺ©λ‘
        """
        return self._request('GET', 'channels')
    
    def purge_queue(self, vhost: str, queue_name: str) -> bool:
        """
        큐 λ‚΄μš© 제거
        
        Args:
            vhost: 가상 호슀트
            queue_name: 큐 이름
            
        Returns:
            bool: 성곡 μ—¬λΆ€
        """
        try:
            endpoint = f"queues/{urllib.parse.quote_plus(vhost)}/{urllib.parse.quote_plus(queue_name)}/contents"
            self._request('DELETE', endpoint)
            self.logger.info(f"큐 '{queue_name}' λ‚΄μš© 제거 μ™„λ£Œ")
            return True
        except Exception as e:
            self.logger.error(f"큐 λ‚΄μš© 제거 μ‹€νŒ¨: {e}")
            return False
    
    def create_user(self, username: str, password: str, tags: str = "management") -> bool:
        """
        μ‚¬μš©μž 생성
        
        Args:
            username: μ‚¬μš©μž 이름
            password: λΉ„λ°€λ²ˆν˜Έ
            tags: μ‚¬μš©μž νƒœκ·Έ (μ‰Όν‘œλ‘œ ꡬ뢄)
            
        Returns:
            bool: 성곡 μ—¬λΆ€
        """
        try:
            data = {
                "password": password,
                "tags": tags
            }
            self._request('PUT', f"users/{urllib.parse.quote_plus(username)}", data)
            self.logger.info(f"μ‚¬μš©μž '{username}' 생성 μ™„λ£Œ")
            return True
        except Exception as e:
            self.logger.error(f"μ‚¬μš©μž 생성 μ‹€νŒ¨: {e}")
            return False
    
    def set_permissions(
        self,
        vhost: str,
        username: str,
        configure: str = ".*",
        write: str = ".*",
        read: str = ".*"
    ) -> bool:
        """
        μ‚¬μš©μž κΆŒν•œ μ„€μ •
        
        Args:
            vhost: 가상 호슀트
            username: μ‚¬μš©μž 이름
            configure: μ„€μ • κΆŒν•œ μ •κ·œμ‹
            write: μ“°κΈ° κΆŒν•œ μ •κ·œμ‹
            read: 읽기 κΆŒν•œ μ •κ·œμ‹
            
        Returns:
            bool: 성곡 μ—¬λΆ€
        """
        try:
            data = {
                "configure": configure,
                "write": write,
                "read": read
            }
            endpoint = f"permissions/{urllib.parse.quote_plus(vhost)}/{urllib.parse.quote_plus(username)}"
            self._request('PUT', endpoint, data)
            self.logger.info(f"μ‚¬μš©μž '{username}'의 '{vhost}' κΆŒν•œ μ„€μ • μ™„λ£Œ")
            return True
        except Exception as e:
            self.logger.error(f"κΆŒν•œ μ„€μ • μ‹€νŒ¨: {e}")
            return False
    
    def get_queue_stats(self, vhost: str, queue_name: str) -> dict:
        """
        큐 톡계 정보 쑰회
        
        Args:
            vhost: 가상 호슀트
            queue_name: 큐 이름
            
        Returns:
            dict: 큐 톡계 정보
        """
        try:
            queue_info = self.get_queue(vhost, queue_name)
            
            stats = {
                "name": queue_info.get("name"),
                "messages": queue_info.get("messages", 0),
                "messages_ready": queue_info.get("messages_ready", 0),
                "messages_unacknowledged": queue_info.get("messages_unacknowledged", 0),
                "consumers": queue_info.get("consumers", 0),
                "message_stats": queue_info.get("message_stats", {}),
                "memory": queue_info.get("memory", 0),
                "state": queue_info.get("state", "unknown"),
                "node": queue_info.get("node", "unknown")
            }
            
            return stats
        except Exception as e:
            self.logger.error(f"큐 톡계 쑰회 μ‹€νŒ¨: {e}")
            return {}
    
    def monitor_queues(
        self,
        vhost: str = '/',
        interval: int = 5,
        limit: int = 10,
        output: bool = True
    ):
        """
        큐 λͺ¨λ‹ˆν„°λ§ μ‹€ν–‰
        
        Args:
            vhost: 가상 호슀트
            interval: λͺ¨λ‹ˆν„°λ§ 간격(초)
            limit: λͺ¨λ‹ˆν„°λ§ 횟수 (0=λ¬΄μ œν•œ)
            output: 좜λ ₯ μ—¬λΆ€
        """
        count = 0
        try:
            while limit == 0 or count < limit:
                queues = self.get_queues(vhost)
                
                if output:
                    print(f"\n===== 큐 λͺ¨λ‹ˆν„°λ§ ({time.strftime('%Y-%m-%d %H:%M:%S')}) =====")
                    print(f"{'이름':30} {'λ©”μ‹œμ§€':10} {'μ€€λΉ„':10} {'미확인':10} {'μ†ŒλΉ„μž':5}")
                    print("-" * 70)
                
                for q in queues:
                    name = q.get("name", "")
                    messages = q.get("messages", 0)
                    messages_ready = q.get("messages_ready", 0)
                    messages_unack = q.get("messages_unacknowledged", 0)
                    consumers = q.get("consumers", 0)
                    
                    if output:
                        print(f"{name[:30]:30} {messages:10} {messages_ready:10} {messages_unack:10} {consumers:5}")
                
                count += 1
                if limit == 0 or count < limit:
                    time.sleep(interval)
                    
        except KeyboardInterrupt:
            print("\nλͺ¨λ‹ˆν„°λ§ 쀑단됨")
        except Exception as e:
            self.logger.error(f"λͺ¨λ‹ˆν„°λ§ 였λ₯˜: {e}")

# μ‚¬μš© 예제
if __name__ == "__main__":
    # λ‘œκΉ… μ„€μ •
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # RabbitMQ λͺ¨λ‹ˆν„° 생성
    monitor = RabbitMQMonitor(
        host='localhost',
        port=15672,
        username='guest',
        password='guest'
    )
    
    # μ„œλ²„ κ°œμš” 정보 쑰회
    overview = monitor.get_overview()
    print(f"RabbitMQ 버전: {overview.get('rabbitmq_version')}")
    print(f"Erlang 버전: {overview.get('erlang_version')}")
    print(f"λ…Έλ“œ: {overview.get('node')}")
    
    # 큐 λͺ©λ‘ 쑰회
    queues = monitor.get_queues('/')
    print(f"\n큐 λͺ©λ‘ ({len(queues)}개):")
    for q in queues:
        print(f"- {q.get('name')}: {q.get('messages')} λ©”μ‹œμ§€")
    
    # 큐 λͺ¨λ‹ˆν„°λ§ (5초 간격, 3회)
    print("\n큐 λͺ¨λ‹ˆν„°λ§ μ‹œμž‘...")
    monitor.monitor_queues(interval=5, limit=3)

βœ… νŠΉμ§•:

  • RabbitMQ 관리 API ν™œμš©
  • μ„œλ²„ μƒνƒœ λͺ¨λ‹ˆν„°λ§
  • 큐와 κ΅ν™˜κΈ° 정보 쑰회
  • λ¦¬μ†ŒμŠ€ μ‚¬μš©λŸ‰ 좔적
  • λ©”μ‹œμ§€ 톡계 뢄석
  • μ‚¬μš©μž 관리 및 κΆŒν•œ μ„€μ •
  • μ‹€μ‹œκ°„ λͺ¨λ‹ˆν„°λ§ κΈ°λŠ₯
  • λŒ€μ‹œλ³΄λ“œ 정보 ν”„λ‘œκ·Έλž˜λ§€ν‹± μ ‘κ·Ό
  • REST API 기반 톡합
  • λͺ¨λ‹ˆν„°λ§ μžλ™ν™”


μ£Όμš” 팁

βœ… λͺ¨λ²” 사둀:

  • μ—°κ²° 관리: μž₯κΈ° μ‹€ν–‰ μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ€ μ—°κ²° 풀링과 μžλ™ μž¬μ—°κ²° λ©”μ»€λ‹ˆμ¦˜μ„ κ΅¬ν˜„ν•˜μ—¬ μ•ˆμ •μ„±μ„ 높인닀.
  • λ©”μ‹œμ§€ 지속성: μ€‘μš” λ©”μ‹œμ§€λŠ” 지속성을 μ„€μ •ν•˜κ³  큐도 내ꡬ성 있게 κ΅¬μ„±ν•˜μ—¬ μ„œλ²„ μž¬μ‹œμž‘ μ‹œμ—λ„ 데이터 손싀을 λ°©μ§€ν•œλ‹€.
  • QoS μ„€μ •: prefetch_count 값을 적절히 μ‘°μ •ν•˜μ—¬ μ›Œμ»€ κ°„ λΆ€ν•˜ κ· ν˜•μ„ μœ μ§€ν•˜κ³  λ©”λͺ¨λ¦¬ μ‚¬μš©λŸ‰μ„ κ΄€λ¦¬ν•œλ‹€.
  • κ΅ν™˜κΈ° 선택: λ©”μ‹œμ§• νŒ¨ν„΄μ— λ§žλŠ” μ μ ˆν•œ κ΅ν™˜κΈ° μœ ν˜•μ„ μ„ νƒν•˜μ—¬ λΌμš°νŒ… νš¨μœ¨μ„±μ„ κ·ΉλŒ€ν™”ν•œλ‹€.
  • μž¬μ‹œλ„ λ©”μ»€λ‹ˆμ¦˜: λ°λ“œλ ˆν„° 큐와 TTL을 ν™œμš©ν•œ μ§€μ—° μž¬μ‹œλ„ μ „λž΅μœΌλ‘œ μΌμ‹œμ  였λ₯˜λ₯Ό μš°μ•„ν•˜κ²Œ μ²˜λ¦¬ν•œλ‹€.
  • λͺ¨λ‹ˆν„°λ§: 큐 길이, λ©”μ‹œμ§€ 처리율, λ©”λͺ¨λ¦¬ μ‚¬μš©λŸ‰μ„ μ •κΈ°μ μœΌλ‘œ λͺ¨λ‹ˆν„°λ§ν•˜μ—¬ 병λͺ© ν˜„μƒμ„ κ°μ§€ν•œλ‹€.
  • μ„±λŠ₯ μ΅œμ ν™”: 배치 처리, λ©”μ‹œμ§€ μ••μΆ•, 채널 μž¬μ‚¬μš©μœΌλ‘œ μ²˜λ¦¬λŸ‰μ„ μ΅œμ ν™”ν•œλ‹€.
  • λ³΄μ•ˆ μ„€μ •: κ°•λ ₯ν•œ μ•”ν˜Έμ™€ TLS μ—°κ²°λ‘œ λ©”μ‹œμ§€ 전솑을 λ³΄ν˜Έν•˜κ³ , VHOST와 μ‚¬μš©μž κΆŒν•œμ„ μ„ΈλΆ„ν™”ν•˜μ—¬ 접근을 μ œν•œν•œλ‹€.
  • ν΄λŸ¬μŠ€ν„°λ§: κ³ κ°€μš©μ„±μ„ μœ„ν•΄ RabbitMQ ν΄λŸ¬μŠ€ν„°λ₯Ό κ΅¬μ„±ν•˜κ³  μ μ ˆν•œ 정책을 μ„€μ •ν•œλ‹€.
  • λ°±μ—… μ „λž΅: 정기적인 ꡬ성 λ°±μ—…κ³Ό λ©”μ‹œμ§€ μŠ€λƒ…μƒ·μœΌλ‘œ μž¬ν•΄ 볡ꡬ λŒ€λΉ„μ±…μ„ λ§ˆλ ¨ν•œλ‹€.
  • λ‘œκΉ… μ „λž΅: λ©”μ‹œμ§€ 처리 κ³Όμ •μ—μ„œ μΆ©λΆ„ν•œ μ»¨ν…μŠ€νŠΈλ₯Ό λ‘œκΉ…ν•˜μ—¬ 문제 ν•΄κ²°κ³Ό 감사λ₯Ό μš©μ΄ν•˜κ²Œ ν•œλ‹€.


⚠️ **GitHub.com Fallback** ⚠️