5장 메세지를 받지 않고 소비하기 - JAVACAFE-STUDY/RabbitMQ-in-Depth GitHub Wiki

이번장에서 다루는 내용

  • 단순히 메시지를 전달받지 않고 소비하는 것이 좋은 이유
  • 메시지 배달 보장과 성능 간의 균형을 유지하는 방법
  • RabbitMQ의 큐별 설정을 사용해 큐를 자동으로 삭제하고, 메시지의 수명을 제한하는 등의 작업

5.1 Basic.Get vs Basic.Comsume

  • RabbitMQ는 큐에서 메시지를 가져오는 두 AMQP RPC 명령인 Basic.Get과 Basic.Consum을 구현
  • Basic.Get은 서버에서 메시지를 검색하는 이상적인 방법이 아님
  • Basic.Get은 폴링 모델
  • Basic.Consum은 푸시 기반 모델

5.1.1 Basic.Get

  • 큐에 대기 중인 메시지가 있으면 Basic.GetOk RPC로 응답
  • 큐에 대기 중인 메시지가 없으면 Basic.GetEmpty로 응답
  • Basic.Get을 사용하면 RabbitMQ의 RPC응답을 평가해 메시지가 수신됬는지 확인 해야 함
    • 메시지를 전달받는 프로세스의 오래 실행되면 주기적으로 메시지가 있는지 확인하고 처리 해야하는 비효율
  • Bacsic.Get 노트북에서 실행하는 코드
    • 메시지를 요청하는 무한 루프가 실행
    • 가장 간단한 방법이지만 대부분의 경우 성능이 좋지 않음
      • Basic.Consum을 사용하는 것보다 적어도 두 배 느림
      • 동기 방식으로 통신하며 클라이언트 요청 프레임을 보내고 RabbitMQ가 응답을 보내는것으로 구성된 오버헤드가 매번 발생
    • 메시지를 요구하는 시기를 알 수 없기 때문에 어떤 방식으로든 전달 프로세스를 최적화 할 수 없음
import rabbitpy

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        queue = rabbitpy.Queue(channel, 'test-messages')
        while True:					  // 메시지를 받기 위한 무한 루프 
            message = queue.get()
            if message:					  // 메시지 평가 (빈 메시지인지 아닌지)
                message.pprint()
                message.ack()				  // 메시지 수신 확인 
                if message.body == 'stop':
                    break

5.1.2 Basic.Consume

  • 메시지를 소비하면 RabbitMQ에 애플리케이션을 등록하고 소비자 애플리케이션에 메시지를 비동기적으로 요청
    • 발행자-구독자 패턴 또는 Pub-Sub
  • 메시지를 구독하면 Basic.Cancel을 전송하기 전까지 클라이언트가 메시지를 자동으로 수신
  • 메시지를 받으면 메시지를 평가할 필요가 없음 (빈 메시지가 오지 않음)
  • Basic.Get과 마찬가지로 처리가 되었음을 ack를 호출 해야함
import rabbitpy

with rabbitpy.consume('amqp://guest:guest@localhost:5672/%2f', 'test-messages') as consume:
    for message in consume.next_message():
        message.pprint()
        message.ack()											

소비자 태그

  • Basic.Consum을 실행하면 RabbitMQ로 열린 채널에서 애플리케이션을 식별하는 고유한 문자열이 생성
    • 이게 소비자 테그라는건지? 아니면 식별자 뒤에 소비자 테그가 생성되는건지 잘 모르곘음
  • 소비자 태그는 Basic.Cansel RPC 명령으로 메시지 수신을 취소 할 떄 사용
  • 동시에 여러 큐에서 받은 메시지를 큐마다 다르게 작업 해야하는 경우 소비자 태그를 이용해서 구분하여 처리 가능
    • 대부분 라이브러리가 자동으로 처리
  • Consumer with Stop 코드
import rabbitpy
with rabbitpy.Connection() as connection:    
	with connection.channel() as channel:        
		for message in rabbitpy.Queue(channel, 'test-messages'): 
			message.pprint()
			message.ack()
			if message.body == 'stop':
				break

  • Message Publisher
import rabbitpy

for iteration in range(10):
    rabbitpy.publish('amqp://guest:guest@localhost:5672/%2f', '', 'test-messages', 'go')
rabbitpy.publish('amqp://guest:guest@localhost:5672/%2f', '', 'test-messages', 'stop')

  • stop 메시지를 받으면 반복문이 중지 된다.
  • 반복문이 종료될 떄 rabbitpy 라이브러리의 내부에는 몇 가지 동작이 실행 됨
    1. 라이브러리는 Basic.Cancel 명령을 전송
    2. Basic.CancelOk 응답이 수신
    3. 처리되지 않은 메시지를 보낸 경우 라이브러리는 Basic.Nack 명령을 전송 (CancelOk 이후 RabbitMQ에서 처리되지 않은 메시지를 보낸 경우 인가???)
    4. Bacic.Nack를 RabbitMQ가 수신 받으면 메시지를 다시 큐에 삽입하도록 지시

정리

  • 동기 방식의 Basic.Get과 비동기 방식의 Basic.Consume 중 하나를 선택하는 것은 소비자 애플리케이션을 작성할 때 택해야 할 첫 번째 선택
  • 메시지를 발행할 떄 발생되는 트레이드오프와 마찬가지로 애플리케이션에 대한 선택 사항은 메시지 배달 보장 및 성능에 직접적인 영향을 줄 수 있음

5.2 소비자 성능 조정

  • 메시지를 발행할 떄와 마찬가지로 메시지를 소비할 때도 메시지 처리량과 배달 보장에 균형을 잡아야하는 절충점이 있다.
  • RabbitMQ에서 메시지를 전달하는 데 사용할 수 있는 몇가지 옵션이 있다
  • RabbitMQ에서 좀 더 빠르고 많은 메시지를 처리량을 위해 메시지 배달에 대한 보장을 거의 제공하지 않을 수 있다.
  • 알아볼 내용
    • 메시지 수식 확인 옵션을 선택 RabbitMQ의 메시지 전달 처리량을 조절하는 방법
    • RabbitMQ의 메시지 사전 할당 임계값을 조정하는 방법
    • 소비자를 사용할 때의 트랜잭션이 미치는 영향

5.2.1 빠른 성능을 위한 무응답 모드로 메시지 소비하기

  • Basic.Consum RPC 요청을 보낼 떄, no-ack 플래그를 활성화 하면 소비자가 메시지 수신 확인을 하지 않음 으로 가능한 빨리 메시지를 보낼 수 있다.
  • No-Ack Consumer
    • no_ack=True로 메시지를 소비하는 것은 소비자에게 메시지를 전달하는 가장 빠른 방법
    • 메시지를 보내는 가장 안정적인 방법
import rabbitpy

with rabbitpy.Connection() as connection:    
	with connection.channel() as channel:        
		queue = rabbitpy.Queue(channel, 'test-messages')        
		for message in queue.consume_messages(no_ack=True): 
			message.pprint()
  • 메시지를 전달하기 위해 소켓에 쓰려고 할 때 네트워크에 문제가 발생하면 운영체제에서 RabbitMQ에 문제가 있을 알리는 소켓 오류 발생
  • 오류가 발생하지 않으면 RabbitMQ는 메시지가 배달됬다고 가정
  • 실제로 RabbitMQ는 소켓 버퍼가 다 찰 때까지 가능한 경우 메시지를 사용자에게 계속 전송
  • 메시지를 수신 확인을 기다리지 않기 때문에 메시지를 소비하는 데 가장 빠른 메시지 처리량을 제공
  • 위험 요소가 있음
    • 운영체제의 수신 소켓 버퍼에 100KB 메시지를 버퍼링한 상태로 장애가 발생할 경우 어떻게 될지 생각해보자
    • RabbitMQ는 이미 메시지들을 보낸 것으로 간주
    • 소켓이 닫힐 때 운영체제에서 읽어야하는 메시지의 수를 표시하지 않는다.
    • 소켓 수신 버퍼와 메시지 크기 및 수량에 따라 문제가 달라진다. 메시지를 이런 방식으로 소비하는 것이 애플리케이션 아키텍처에 맞지 않지만, 단일 메시지 전달 후 메시지 수신이 제공하는 것보다 빠른 메시지를 처리를 원한다면 소비자 채널 서비스 설정의 프리체리를 제어하는 것이 좋다

서비스 품질 설정을 통한 소비자 프리페치 제어

  • AMQP 스펙에서 소비자가 메시지 수신을 확인하기 전에 미리 지정된 수의 메시지를 수신하도록 처리할 수 있는 서비스 품질(Quality of Service) 설정을 채널에 요청할 수 있다.
  • QoS 설정을 통해 RabbitMQ는 소비자에게 미리 할당할 메시지 수를 지정해 메시지를 좀 더 효율적으로 보낼 수 있다.
  • 수신 확인을 비활성화(no_ack-True)한 소비자와 달리, 소비자 애플리케이션은 메시지를 확인하기 전에 충동하는 경우 소켓을 닫으면 미리 가졍온 모든 메시지가 큐로 반환된다.
  • 프로토콜 수준에서 내철 에 Basic.QoS RPC 요청을 보내면 서비스 품질이 지정된다.
  • 전송하는 채널에 대해서만 QoS를 설정할지 혹은 연결된 모든 채널에 대해 QoS를 설정할지 지정할 수 있다.
  • Specifying QoS
import rabbitpy

with rabbitpy.Connection() as connection:    
	with connection.channel() as channel:
		channel.prefetch_count(10)        // 프리페치 카운트를 10으로 설정 
		for message in rabbitpy.Queue(channel, 'test-messages'): 
			message.pprint()
			message.ack()
* 노트: AMQP 스펙에는 Basic.QoS 메소드의 프리페치 수와 프리페치 크기를 모두 설정하지만, no_ack 옵션을 설정하면 프리페치 크기가 무시된다.

프리페치 값을 최적의 수준으로 교정

  • 프리페치 수를 지나치게 할당하면 메시지 처리량에 부정적인 영향을 미칠 수 있음을 인식하는 것이 중요
  • 프리페치 수가 성능에 영향을 미치는지 벤치마크하는 것이 중요
  • 간단한 메시지를 단일 소비자에서 벤치마크했을 때 2500의 프리페치 카운트 값이 적합한 설정임을 확인

한 번에 여러 메시지 확인하기

  • QoS 설정 중 유용한 또 다른 점은 Basic.Ack RPC 응답과 함께 받은 각 메시지를 개별적으로 하나씩 확인하지 않아도 됨
  • Basic.Ack RPC 응답의 multiple 속성을 True로 설정해 반환하면 RabbitMQ는 수신 확인하지 않은 모든 메시지를 수신 확인으로 처리
  • Multi-Ack Consumer
import rabbitpy

with rabbitpy.Connection() as connection:    
	with connection.channel() as channel:
		channel.prefetch_count(10)        // 프리페치 카운트를 10으로 설정 
		unacknowledged = 0
		for message in rabbitpy.Queue(channel, 'test-messages'): 
			message.pprint()
			unacknowledged += 1
			if unacknowledged == 10:
				message.ack(all_previous=True)
				unacknowledged = 0

  • 이 방식은 일정 수준의 위험이 따른다
    • 일부 메시지를 성공적으로 처리하고 애플리케이션이 메시지를 확인하기 전에 장애가 발생한다면, 모든 미확인 메시지는 큐로 돌아가서 다른 소비자 프로세스에 의해 처리된다.
    • 최고 성능 사이의 적절한 지점을 찾는 부분에는 골디락스 원칙이 적용된다.

5.2.3 소비자 애플리케이션에서 트랜잭션 사용하기

  • 트랜잭션을 사용해 소비자 애플리케이션에서 일관 작업을 커밋하고 롤백할 수 있다.
  • 트랜잭션은 한가지 예외적인 상황을 제외하고는 메시지 처리량에 부정적인 영향을 미칠 수 있다.
  • QoS 설정을 사용하지 않는 경우에도 트랜잭션을 사용해 메시지 확인 응답을 일관 처리할 때 약간의 성능 향상 이점이 있다.
  • 트랜젝션을 사용할 떄도 벤치마크해 판단하는 것이 좋다

5.3 메시지 거부하기

  • 이번에 알아볼 내용
    • 메시지 처리중 문제가 발생하면 어떻게 할까?
    • Basic.Reject, Basic.Nack 두 기능의 차이점
    • 거부된 메시지를 일괄 처리해서 시스템 문제를 파악하는데 용이한 RabbitMQ 전용 확장, 데드 레터 익스체인지

5.3.1 Basic.Recject

  • 메시지를 처리할 수 없을을 메시지 브로커에게 알리는 AMQP의 RPC 응답
  • 소비자가 메시지를 거부하면 RabbitMQ가 메시지를 삭제하거나 큐에 있는 메시지를 다시 삽입하도록 지사 할 수 있다.
  • 재삽입 플래그가 활성화되면 RabbitMQ는 차후에 다시 처리되도록 큐에 메시지를 넣는다.
  • 재삽입 플레그는 데이터베이스 원격 API와 같은 다른 서비스와 통신하는데 종종 사용
  • 재시도를 위해 소비자에서 로직을 구현하는 대신, 예외를 잡아서 재삽입 플래그를 활성화한 메시지를 거부해서 처리한다.
  • Message Rejection
    • 재삽입된 메시지임을 알리는 redelivered 플래그가 참으로 출력
    • '두 번 실패 시 제거' 정책으로 구현되어 있다.
    • 메시지가 잘못된 형식 때문에 예외가 발생할 수도 있다
import rabbitpy

for message in rabbitpy.consume('amqp://guest:guest@localhost:5672/%2f', 'test-messages'):
	message.pprint()
	print('Redelivered: %s' % message.redelivered)
	message.reject(True)

  • Basic.Recject는 동시에 여러 메시지를 거부할 수는 없다.

5.3.2 Basic.Nack

  • 메시지를 거부할 때 다중 메시지를 처리해야 하는데, 아쉽게도 AMQP 스펙에는 다중 메시지 거부 기능이 제공되지 않음
  • RabbitMQ 팀에서 Basic.Nack 라는 새로운 RPC 응답 메소드를 구현
    • 다른 AMQP 메시지 브로커에 존재하지 않을 수도 있다.
  • Basic.Reject 응답 메소드와 동일한 메시지 거부를 구현했지만 다중 메시지를 처리할 수 있다.

5.3.3 데드 레터 익스체인지 (DLX, Dead-Latter Exchange)

  • RabbitMQ 확장 스팩

  • 전달된 메시지를 거부할 수 있는 추가적인 기능

  • 특정 메시지를 소비하는 데 발생한 문제 원인을 찾는 데 유용

  • 오류를 저장하고 찾는 코드를 직접 작성하는 대신, 데드 레터 익스체인지를 이용해 문제 원인을 찾을 수 있다.

  • 다른 일반적인 익스체인지와 동일하고 생성하고 실행하는 데 특별한 점은 없다.

  • 메시지가 거부되면 RabbitMQ는 메시지를 큐의 x-dead-letter-exchange 인수에 지정된 익스체인지로 라우팅한다. Uploading 스크린샷 2020-02-21 오후 2.50.33.png…

  • Specifying a Dead Letter Exchange

    • Queue 객체 생성을 하고 익스체인지의 이름과 dead_letter_exchange 인수 혹은 x-dead-letter-exchange 인수를 전달하면 된다.
import rabbitpy

with rabbitpy.Connection() as connection:
	with connection.channel() as channel:
		rabbitpy.Exchange(channel, 'rejected-messages').declare()
		queue = rabbitpy.Queue(channel, 'dlx-example', dead_letter_exchange='rejected-messages')
		queue.declare()

  • 데드 레터 익스체인지 응용 분야
    • 잘못된 형식의 메시지를 안전한 장소로 저장
    • 거부된 신용카드 승인 처리 같은 기능

큐 제어하기

  • RabbitMQ는 큐 사용에 대한 다양한 옵션을 제공
  • 큐의 동작을 결정하는 설정
    • 자동 삭제 큐
    • 큐 독점 설정
    • 자동 메시지 만료
    • 대기 메시지 수 제한
    • 오래된 메시지 큐에서 제거

5.4.1 임시 큐

자동 삭제 큐

  • 사용후 더 이상 필요하지 않은 경우 자신을 삭제하는 큐를 제공
  • 소비자와 연결을 맺고 메시지를 전달 후 연결을 끊으면 큐는 제거
    • 구독자가 더 이상 없을 때만 삭제
    • 다수 소비자가 구독할 수 있다
  • Auto-Delete Queue
import rabbitpy

with rabbitpy.Connection() as connection:
	with connection.channel() as channel:
		queue = rabbitpy.Queue(channel, 'ad-example', auto_delete=True)
		queue.declare()
  • 사용 예
    • 채팅 어플리케이션에서 사용자의 연결이 끊어지면 큐에 읽지 않은 메시지가 있더라도 자동으로 삭제
    • 애플리케이션이 종료할 때 자동으로 큐를 정리하는 용도

큐 독점 설정

  • 큐를 구독해서 메시지를 소비하는 소비자의 수에 대한 제한은 없다.
  • 특정 시나리오에서는 다닐 소비자만 큐의 메시지를 사용할 수 있도록 해야한다.
  • 큐에 독점 기능을 활성화하면 소비자가 연결 해제된 후 큐가 자동으로 제거가 된다.
  • Exclusive Queue
import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'exclusive-example', exclusive=True)
        queue.declare()

  • 채널이 닫힐 때까지 독점 큐를 구독하는 사용자가 원하는 만큼 소비하고 취소 할 수 있다.
  • Basic.Consume 요청이 발행됐는지에 관계없이 독점 큐의 자동 삭제가 발생한다.

자동 만료 큐

  • RabbitMQ는 일정 기간 사용하지 않는 큐를 삭제할 수 있는 기능 제공
  • Expiring Queue
import rabbitpy
import time

with rabbitpy.Connection() as connection:
	with connection.channel() as channel:
		queue = rabbitpy.Queue(channel, 'expiring-queue', arguments={'x-expires': 1000})
		queue.declare()

		messages, consumers = queue.declare(passive=True)
		time.sleep(2)
		try:
			messages, consumers = queue.declare(passive=True)
		except rabbitpy.exceptions.AMQPNotFound:
			print('The queue no longer exists')
  • 자동 만료 큐 규칙
    • 큐는 소비자가 없으면 만료된다. 연결된 소비자가 있는 경우 큐는 Basic.Cancel을 실행하거나 연결을 해제한 후에만 자동으로 제거된다.
    • 큐는 TTL지속 시간 동안 Basic.Get 요청이 없으면 만료된다. 하나의 Basic.Get 요청이 만료 값이 있는 큐로 전송이 되면 만료 설정이 초기화되고 큐는 자동으로 삭제 되지 않는다.
    • 다른 일반적인 큐와 마찬가지로 x-expires 설정은 다시 설정되거나 변경될 수 없다. 큐를 다시 선언하고 x-expires 인수의 값을 원하는 값으로 설정한다면 AMQP 스펙에서 클라이언트가 다른 설정으로 큐를 다시 선언하지 말아야 한다는 큐칙을 위반하게 된다.
    • RabbitMQ는 큐가 만료될때 즉시 제거되는 것을 보장하지 않는다.

5.4.2 영구적인 큐

내구성 큐

  • 서버를 재시작한 후에도 계속 유지돼야 하는 큐를 선언하려면 내구성 플래그를 True로 설정
  • Queue.Delete 가 호출되기 전까지 RabbitMQ가 큐를 삭제하지 않도록 설정
    • 큐의 메시지가 아니라 큐가 유지된다는 내용같음
  • Durable Queue
import rabbitpy

with rabbitpy.Connection() as connection:
	with connection.channel() as channel:
		queue = rabbitpy.Queue(channel, 'durable-queue', durable=True)
		if queue.declare():
			print('Queue declared')

큐에서 메시지의 자동 만료

  • 메시지를 너무 오랫동안 소비하지 않을 때 자동으로 삭제하는 기능이 필요할 수 있다.
  • 데드 레터 익스체인지와 TTL값을 모두 설정한 큐는 만료시에 메시지를 데드 레터로 간주
  • x-message-ttl 큐 설정은 퀴에 있는 모든 메시지의 최대 수명을 적용
    • 메시지마다 적용되는 expiration 속성과는 다름
  • Queue with Message
import rabbitpy

with rabbitpy.Connection() as connection:
	with connection.channel() as channel:
		queue = rabbitpy.Queue(channel, 'expiring-msg-queue', arguments={'x-message-ttl': 1000})
		queue.declare()

제한된 수의 메시지 보관

  • RabbitMQ 3.1.0부터 큐의 메시지 최대 크기를 설정 할 수 있다.
  • 큐의 메시지 최대크기에 도달하면 새로운 메시지가 추가될 때 가장 먼저 받은 메시지를 삭제
  • 맨 앞에서 제거된 메시지는 데드 레터 익스체인지로 설정한 경우 해당 익스체인지로 이동
  • Queue width Maximum Length
import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'max-length-queue', arguments={'x-max-length': 1000})
        queue.declare()

5.4.3 임의의 큐 설정

  • RabbitMQ 팀은 큐와 관련된 AMQP 스펙을 확장하는 새로운 기능을 구현
    • x-dead-letter-exchange : 메시지가 재삽입되지 않고 거부될 때, 라우팅할 익스체인지
    • x-dead-letter-routing-key : 거부된 메시지를 라우팅하는 데 사용하는 라우팅 키
    • x-expires : 지정된 시간(밀리초 단위) 후에 큐를 삭제
    • x-ha-policy : HA 큐를 만들 떄, 노드간에 HA를 적용하는 정책 지정
    • x-ha-nodes : HA 큐를 분산할 노드(4.1.6절 참조)
    • x-max-length : 큐의 최대 메시지 수
    • x-message-ttl : 큐에서 지정하는 메시지 만료 시간(밀리초 단위)
    • x-max-priority : 최대 값이 255인 큐의 우선순위를 지정하는 데 사용 (RabbitMQ 3.5.0 이상)

5.5 요약

  • 빠른 처리량과 메시지 배달 보장 사이의 균형을 고려하고 벤치마크를 수행해서 결정 해야 한다.
  • 다음 질문을 고려해 적합한 옵션을 찾는 것을 추천
    • 모든 메시지를 수신했는지 또는 폐기할 수 있는지 확인해야 하는가?
    • 메시지를 받은 다음 일괄적으로 수신 확인하거나 거부해야 하는가?
    • 그렇지 않다면, 개별 작업을 자동으로 일괄 처리하고 트랜잭션을 사용해 성능을 향상시킬 수 있는가?
    • 소비자 애플리케이션에서 트랜잭션 커밋 및 롤백 기능이 정말로 필요한가?
    • 소비자가 구독하는 큐의 메시지를 독점적으로 접근해야 하는가?
    • 소비자 애플리케이션에 오류가 발생했을 때 어떻게 처리해야 하는가? 메시지를 보내버려야 하는가? 큐에 재 삽입해야 하는가? 혹은 데드 레터 익스체인지로 보내야 하는가?