Apache Kafka ‐ 카프카 컨슈머 애플리케이션 - dnwls16071/Backend_Summary GitHub Wiki
📚 컨슈머
- 프로듀서가 전송한 데이터는 카프카 브로커에 적재된다.
- 컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 하게 된다.
- Fetcher : 리더 파티션으로부터 레코드들을 미리 가져와서 대기한다.
poll() : Fetcher에 있는 레코드들을 리턴하는 메서드
- ConsumerRecords : 처리하고자하는 레코드들의 모음. 오프셋이 포함되어 있다.
📚 컨슈머 그룹⭐
- 컨슈머 그룹으로 운영하는 방법은 컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 카프카의 독특한 방식이다.
- 컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션에 할당되어 데이터를 가져갈 수 있다.
- 컨슈머 그룹으로 묶인 컨슈머들은 토픽을 구독해서 데이터를 가져가는데 1개의 파티션은 최대 1개의 컨슈머에만 할당 가능하다.
- 1개의 컨슈머는 여러 개의 파티션에 할당될 수 있다. 이런 특징으로 컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야 한다.
컨슈머 그룹 컨슈머가 파티션 개수보다 많은 경우
- 4개의 컨슈머로 이루어진 컨슈머 그룹으로 3개의 파티션을 가진 토픽에서 데이터를 가져가기 위해 할당하면 1개의 컨슈머는 파티션을 할당받지 못하고 유휴 상태로 남게 된다.
- 파티션을 할당받지 못한 컨슈머는 쓰레드만 차지하고 실질적인 데이터 처리를 하지 못하므로 애플리케이션 실행에 있어 불필요한 쓰레드로 남게 된다.
컨슈머 그룹을 활용하는 이유
- 카프카는 위와 같은 파이프라인을 운영함에 있어 최종 적재되는 저장소의 장애에 유연하게 대응할 수 있도록 각기 다른 저장소에 저장하는 컨슈머를 다른 컨슈머 그룹으로 묶음으로써 각 최종 저장소의 장애에 격리되어 운영할 수 있게 된다.
- 만약 예를 들어 엘라스틱서치의 장애로 인해 더는 적재가 되지 못하더라도 하둡으로 데이터를 적재하는 데에는 문제가 없어진다.
- 엘라스틱서치 장애가 해소되면 엘라스틱서치로 적재하는 컨슈머의 컨슈머 그룹에는 마지막으로 적재가 완료된 데이터 이후부터 다시 적재를 수행해 최종적으로 모두 정상화된다.
📚 리밸런싱
- 컨슈머 그룹으로 이루어진 컨슈머 중 일부 컨슈머에서 장애가 발생하면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다.
- 이런 과정을 리밸런싱이라고 한다.
- 컨슈머가 추가되는 상황
- 컨슈머가 제외되는 상황
- 이슈가 발생한 컨슈머를 컨슈머 그룹에서 제외하여 모든 파티션이 지속적으로 데이터를 처리할 수 있도록 가용성을 높여준다.
- 리밸런싱은 컨슈머가 데이터를 처리하는 도중에 언제든지 발생할 수 있으므로 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해줘야 한다.
📚 커밋
- 컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋(commit)을 통해 기록한다.
- 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째까지를 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽
(__consumer_offsets)에 기록된다.
- 컨슈머 동작에서 이슈가 발생하여
__consumer_offsets 토픽에 어느 레코드까지 읽어갔는가 오프셋 커밋이 기록되지 못했다면 데이터 처리의 중복이 발생할 수 있다.
- 그러므로 데이터 중복이 발생하지 않게 하기 위해서 컨슈머 애플리케이션이 오프셋 커밋을 정상적으로 처리했는지를 검증해야한다.
📚 어사이너
- 컨슈머와 파티션 할당 정책은 컨슈머의 어사이너(Assignor)에 의해 결정된다.
- 카프카에서는 RangeAssignor, RoundRobinAssignor, StickyAssignor를 제공한다.
- 카프카 2.5.0에서는 RangeAssignor가 기본값으로 설정된다.
- RangeAssignor : 각 토픽에서 파티션을 숫자로 정렬, 컨슈머를 사전 순서로 정렬하여 할당
- RoundRobinAssignor : 모든 파티션을 컨슈머에서 번갈아가면서 할당
- StickyAssignor : 최대한 파티션을 균등하게 배분하면서 할당
📚 컨슈머 주요 옵션⭐
컨슈머 주요 옵션(필수 옵션)
bootstrap.servers : 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력해 일부 브로커에서 이슈가 발생하더라도 접속하는 데 이슈가 없도록 설정하는 것이 가능하다.
key.deserializer : 레코드 메시지 키를 역직렬화하는 클래스 지정
value.deserializer : 레코드 메시지 값을 역직렬화하는 클래스 지정
컨슈머 주요 옵션(선택 옵션)
group.id : 컨슈머 그룹의 아이디를 지정한다. subscribe() 메서드로 토픽을 구독해 사용할 때 이 옵션을 필수로 넣어야 한다. 기본 값은 null이다.
auto.offset.reset : 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을 것인가를 지정하는 옵션이다. 이미 컨슈머 오프셋이 있다면 이 옵션값은 무시된다. 기본값은 latest이다.
enable.auto.commit : 자동 커밋으로 할지 수동 커밋으로 할지 선택한다. 기본값은 true이다.
auto.commit.interval.ms : 자동 커밋일 경우 오프셋 커밋 간격을 지정한다. 기본값은 5000(5초)이다.
max.poll.records : `poll() 메서드를 통해 반환되는 레코드 개수를 지정한다. 기본값은 500이다.
session.timeout.ms : 컨슈머가 브로커와 연결이 끊기는 최대 시간이다. 기본값은 10000(10초)이다.
heartbeat.interval.ms : 하트비트를 전송하는 시간 간격이다. 기본값은 3000(3초)이다.
max.poll.interval.ms : poll() 메서드를 호출하는 간격의 최대 시간. 기본값은 300000(5분)이다.
isolation.level : 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다.
auto.offset.reset
- 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션이다.
- 이미 컨슈머 오프셋이 있다면 이 옵션값은 무시된다. 이 옵션은 latest, earliest, none 중 1개를 설정할 수 있다.
latest : 설정하면 가장 높은(=가장 최근에 넣은) 오프셋부터 읽기 시작한다.
earliest : 설정하면 가장 낮은(=가장 오래전에 넣은) 오프셋부터 읽기 시작한다.
none : 설정하면 컨슈머 그룹이 커밋한 기록이 있는지 찾아본다. 만약 커밋 기록이 없으면 오류를 반환하게 되고 커밋 기록이 있다면 기존 커밋 기록 이후 오프셋부터 읽기 시작한다. 기본값은 latest이다.
📚 컨슈머 애플리케이션 종료
- 컨슈머 애플리케이션은 안전하게 종료되어야 한다.
- 정상적으로 종료되지 않은 컨슈머는 세션 타임아웃이 발생할 때까지 컨슈머 그룹에 남게 된다.
- 컨슈머를 종료하기 위해
wakeUp() 메서드를 지원한다. wakeUp() 메서드를 실행해 KafkaConsumer 인스턴스를 안전하게 종료할 수 있다.
wakeUp() 메서드가 실행된 이후 프로듀서에서 poll() 메서드가 호출되면 WakeUpException 예외가 발생한다. 예외를 받은 뒤에 데이터 처리를 위해 자원들을 해제하면 된다.
📚 멀티쓰레드 컨슈머
- 카프카 처리량을 늘리기 위해 파티션과 컨슈머 개수를 늘려서 운영할 수 있다.
- 파티션을 여러 개로 운영하는 경우 데이터를 병렬처리하기 위해서 파티션 개수와 컨슈머 개수를 동일하게 맞추는 것이 가장 좋은 방법이다.
- 토픽의 파티션은 1개 이상으로 이루어져 있으며, 1개의 파티션은 1개의 컨슈머가 할당되어 데이터를 처리할 수 있다.
- 파티션 개수가 N개라면 동일 컨슈머 그룹으로 묶인 컨슈머 쓰레드를 최대 N개까지 운영할 수 있다.
- 그러므로 N개의 쓰레드를 가진 1개의 프로세스를 운영하거나 1개의 쓰레드를 가진 프로세스를 N개 운영하는 방법도 있다.
📚 컨슈머 랙⭐
- 컨슈머 랙(LAG)은 파티션의 최신 오프셋과 컨슈머 오프셋 간의 차이를 말한다.
- 프로듀서는 계속해서 새로운 데이터를 파티션에 저장하고 컨슈머는 자신이 처리할 수 있는 만큼 데이터를 가져간다.
- 컨슈머 랙은 컨슈머가 정상 동작하는가에 대한 여부를 확인할 수 있기 때문에 컨슈머 애플리케이션을 운영한다면 필수적으로 모니터링해야 하는 지표이다.
- 컨슈머 랙은 컨슈머 그룹과 토픽, 파티션별로 생성된다. 1개의 토픽에 3개의 파티션이 존재하고 1개의 컨슈머 그룹이 토픽을 구독해 데이터를 가져가면 컨슈머 랙은 총 3개가 되는 것이다.
- 프로듀서가 보내는 데이터 양이 컨슈머 데이터 처리량보다 크다면 컨슈머 랙은 비례해서 늘어나게 된다.
- 반대로 프로듀서가 보내는 데이터 양이 컨슈머 데이터 처리량보다 작다면 컨슈머 랙은 줄어들게 되고 최솟값은 0으로 지연이 없음을 뜻하게 된다.
컨슈머 랙 모니터링
- 컨슈머 랙을 모니터링하는 것은 카프카를 통한 데이터 파이프라인을 운영하는데에 핵심적인 역할을 한다.
- 컨슈머 랙을 모니터링함으로써 컨슈머 장애를 확인할 수 있고 파티션 개수를 정하는 데 참고할 수 있다.
컨슈머 랙 모니터링 - 처리량 이슈
- 프로듀서 데이터 양이 늘어날 경우 컨슈머 랙이 늘어날 수 있다.
- 이 경우 파티션 개수와 컨슈머 개수를 늘려 병렬처리량을 늘려 컨슈머 랙을 줄일 수 있다.
- 컨슈머 개수를 2개로 늘림으로써 컨슈머 데이터 처리량을 2배로 늘릴 수 있게 된다.
컨슈머 랙 모니터링 - 파티션 이슈
- 프로듀서 데이터 양이 일정함에도 불구하고 컨슈머 장애로 인해 컨슈머 랙이 증가할 수도 있다.
- 컨슈머는 파티션 개수만큼 늘려 병렬처리하며 파티션마다 컨슈머가 할당되어 데이터를 처리하게 되는데 위와 같이 2개의 파티션과 2개의 컨슈머가 각각 할당되어 데이터를 처리하는 상황에서 프로듀서가 보내는 데이터 양이 동일한데 1번 파티션에서 컨슈머 랙이 발생한다면 1번 파티션에 할당된 컨슈머에 이슈가 발생했음을 추측할 수 있다.
컨슈머 랙을 확인하는 방법①
kafka-consumer-group.sh 명령어를 사용하면 컨슈머 랙을 포함한 특정 컨슈머 그룹의 상태를 확인할 수 있다.
- 컨슈머 랙을 확인하기 위한 가장 기초적인 방법으로 다음과 같은 명령어를 사용한다.
- 카프카 명령어를 통해 컨슈머 랙을 확인하는 방법은 일회성에 그치고 지표를 지속적으로 기록하고 모니터링하기에는 부족하다.
- 그렇기에
kafka-consumer-group.sh를 통해 컨슈머 랙을 확인하는 것은 테스트용 카프카에서 주로 사용한다.
$ bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 --group test-consumer-group --describe
컨슈머 랙을 확인하는 방법②
- 컨슈머 애플리케이션에서 KafkaConsumer 인스턴스의
metrics() 메서드를 활용하면 컨슈머 랙 지표를 체크할 수 있다.
- 컨슈머 인스턴스가 제공하는 컨슈머 랙 관련 모니터링 지표는 3가지로
records-lag-max, records-lag, records-lag-avg이다.
- 허나 컨슈머가 정상 동작할 경우만 확인할 수 있다는 것이다.
metrics() 메서드는 컨슈머가 정상적으로 실행될 경우에만 호출된다. 그렇기에 만약 컨슈머 애플리케이션이 비정상적으로 종료되면 더는 컨슈머 랙을 모니터링하기 어렵다.
- 모든 컨슈머 애플리케이션에 컨슈머 랙 모니터링 코드를 중복해서 작성해야 한다. 컨슈머 애플리케이션을 여러 종류로 운영할 경우 각기 다른 컨슈머 애플리케이션에
metrics() 메서드를 호출하여 컨슈머 랙을 수집하는 로직을 중복해서 넣어야만 한다. 왜냐하면 특정 컨슈머 그룹에 해당하는 애플리케이션이 수집하는 컨슈머 랙은 자기 자신이 속한 컨슈머 그룹에 대한 컨슈머 랙만 한정되기 때문이다.
- 컨슈머 랙을 모니터링하는 코드를 추가할 수 없는 카프카 서드 파티 애플리케이션의 컨슈머 랙 모니터링이 불가능하다.
컨슈머 랙을 확인하는 방법③
- 가장 권장되는 방법으로 외부 모니터링 툴을 사용하는 것이다.
- 모니터링 지표에는 컨슈머 랙도 포함되기 때문에 적합하다.
- 컨슈머 랙 모니터링만을 위한 툴로는 오픈소스로 공개된 버로우(Burrow)가 있다.