Kafka 정리 - asloud/dev_note01 GitHub Wiki
Kafka 참고자료
- Apache Kafka 기술정리.docx
- 카프카에 전반적 소개
- 카프카 프로듀서 컨슈터 실전 코딩
- kafka multi-node 클러스터 구성방법 :
- kafka 기본 튜토리얼 :
카프카 특징
- Persistent messaging
- 메시지를 디스크에 저장(타 시스템은 메모리)하는 덕분에 메시지 손실이 없으며, 저장된 용량이 커져도 성능저하 업이 동일한 속도를 보장한다.
- High throughput
- 한 대의 일반 범용 서버로 초당 수백만 개의 메시지를 처리할 수 있다.
- Distrubuted
- 카프카는 파티셔닝을 지원해 다수의 카프카 서버에 메시지를 분산 처리할 수 있다. 로드밸런싱과 Fault Tolerance도 지원한다. 카프카 서버가 늘어날수록 성능히 선형적으로 증가한다
- Multiple client support
- 카프카는 자바와 스칼라 언어로 개발됐지만 .NET PHP, 루비, 파이썬 등 다양한 언어로 개발된 클라이언트와도 손쉽게 통합할 수 있다.
- Real Time
- 카프카 서버를 기준으로 프로듀서,컨슈머는 파이프라인으로 연결돼 프로듀서에 의해 생성된 메시지는 컨슈머에 의해 즉시 처히된다.
카프카 구조,기본 구성
- kafka는 producer, consumer, broker로 구성되고 kafka 서버를 기준으로 프로듀서(producer)와 컨슈머(consumer)는 파이프라인으로 연결돼있는 구조, 프로듀서와 컨슈머사이에 직접연결이 없고 카프카 브로커(서버)를 통해서만 통신이 이루어짐
- broker : 카프카 서버이다. topic을 기준으로 메시지를 관리한다.
- topic : 유사한 메시지의 집합. 프로듀서는 전달할 메시지의 토픽을 반드시 지정해야 한다.
- partition : 로드밸런싱을 목적으로 토픽을 논리적으로 분할한 것
- replication : fault Tolerance를 위해 파티션 단위로 복제한다
- producer : 메시지 송신 API
- consumer: 메시지 수신 API
카프카 아키텍쳐
- 카프카의 서버인 브로커는 마스터 브로커라는 개념이 없고 피어(peer)만으로 구성되어 브로커의 추가 제거가 쉬운 구조.
- 카프카의 구성요소 중 브로커와 컨슈머는 아파치 주키퍼의 도움을 받는다
- 브로커 - 주키퍼 : 상태정보, 파티션 리더선출 등
- 컨슈터 - 주키퍼 : 메시지 오프셋(offset) 트래킹
- 브로커는 여러노드에 걸쳐 배치될 수 있다 하나의 노드에 여러 브로커를 배치하는 것도 가능하다.
- 각 브로커는 IP,Port로 구분한다.
- 운영적으로는 하나의 노드에 하나의 브로커를 설치하는것을 권장
토픽 (TOPIC)
- 토픽은 다양한 종류의 데이터 스트림을 다루는 방법을 제공한다
- 유사한 메시지 그룹을 토픽으로 지정하며 프로듀서는 토필을 지정하여 메시지를 전송한다. 컨슈머는 지정된 토픽으로부터 메시지를 수신한다.
- 토픽은 카프카의 쉘명령어를 통해 생성될 수 있고 지정한 토픽이 없을 경우 서버에서 자동으로 생성되게 할 수도 있다.
파티션 (partition)
- 토픽은 반드시 1개이상의 파티션을 가진다
- 토픽은 하나의 브로커에 단일 파티션으로 배치되거나 다수의 브로커에 분산 배치 될 수 있다. 분산 배치의 경우 부하를 분산시킬 수 있다.
- 토픽 내 파티션의 갯수 조정,브로커 배치는 카프카 명령어를 통해 조정할 수 있다.
- 메시지는 파티션에 순차적으로 저장되고 파티션 번호와 오프셋을 키값으로 식별된다.
- 카프카에서 메시지는 순차적으로만 읽을 수 있으며 읽은 후 바로 삭제되지 않고 일정 기간 보관된다
레플리케이션 (replication)
- 카프카는 파티션단위로 복제를 한다.
- 복제 계수를 3으로 했을 때, 2대의 장애가 발생해도 서비스에 지장이 없다.(이때, 리더는 1개 나머지 2개는 팔로어)
- 복제계수는 브로커 갯수 이상으로 설정할 수 없다.
- 프로듀서가 메시지를 기록하거나 컨슈머가 메시지를 가져갈때 리더 파티션이 담당한다.
- 리더가 장애가 생기면 팔로어 중 하나가 리더가 된다. 장애 체크나 리더선출은 주키퍼가 담당
컨슈머 그룹 (consumer group)
-
카프카 컨슈머는 심플 컨슈머와 하이-레벨 컨슈머 두 종류
-
컨슈머는 자신이 읽은 위치(오프셋)을 직접 관리한다. 하이레벨 컨슈머의 오프셋은 주키퍼에 의해 관리된다.
-
TOPIC 1에 새로운 메시지가 들어올 경우 group1,group2 는 동시에 메시지를 전달받게 된다.
-
group1 : 메시지는 3개의 파티션중 하나의 파티션으로 들어가는데 그 파티션의 소유권을 가진 컨슈머가 메시지를 처리한다.
-
group2 : 메시지가 어느 파티션으로 들어가던지 단일 컨슈머에서 메시지를 처리한다.
-
컨슈머 그룹 주의점 : 그룹내의 컨슈머 수가 토픽의 파티션 수보다 많이 생성해서는 안된다.
-
추가된 컨슈머는 메시지 처리에 참여할 수 없다.
-
리밸선싱할 때, 문제가 발생할 수 있다.
-
-
발행-구독모델 :
-
비동기 메시징 방식 발신자의 메시지는 수신자가 지정되어 있지 않다. 수신자는 원하는 메시지만을 수신할 수 있다. 관심사는 수신자 발신자의 정보가 아닌 토픽이 관심.
topic vs queue
-
Topics
-
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
-
-
Queues
-
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
-
So Queues implement a reliable load balancer in JMS.
-
기존 메시징 시스템과 차이점 (ActiveMQ, RabbitMQ 등)
-
ActiveMQ ibatis와 spring 연동시 SqlMapClientTemplate과 역할이 유사한 JmsTemplate이 queue와 sender의 가운데 껴있음
-
AMQP 프로토콜이나 JMS API를 사용하지 않고 단순한 메시지 헤더를 지닌 TCP기반의 프로토콜을 사용하여 프로토콜에 의한 오버헤드를 감소 시켰다.
-
Producer가 Broker에게 다수의 메시지를 전송할 때, 각 메시지를 개별적으로 전송해야하는 기존 메시지 시스템과는 달리, 다수의 메시지를 Batch형태로 Broker에게 한 번에 전달할 수 있어 TCP/IP 라운드트립 횟수를 줄일 수 있다.
-
-
메시지를 기본적으로 메모리에 저장하는 기존 메시징 시스템과는 달리 메시지를 파일 시스템에 저장한다.
- 별도의 설정이 없이 데이터의 영속성이 보장된다.
- 기존 메시징 시스템은 처리되지 않고 남아있는 메시지의 수가 시스템의 성능에 영향을 미쳤다. Kafka에서는 메시지를 파일 시스템에 저장하기 때문에 메시지를 쌓아두어도 성능에 영향을 미치지 않는다.
- (Kafka) 메시지를 용도에 따라서 쌓아 두어도 되기 때문에 실시간 처리 뿐 아니라 배치성 작업에도 적합하다.
- Consumer에 의해 처리된 메시지(ack message)를 곧바로 삭제하는 기존 메시징시스템과는 달리 처리된 메시지를 삭제하지 않고 파일 시스템에 그대로 두었다가 설정에 따라 삭제한다.
- 메시지를 일정시간동안 보관하고 있기 때문에 메시지 처리 도중 문제가 발생했거나 로직이 변경될 경우 Consumer가 메시지를 처음부터 다시 처리 (Rewind)하도록 할 수 있다.
- 기존의 메시징 시스템에는 Broker가 Consumer에게 메시지를 push해 주는 방식, Kafka는 Consumer가 Broker로 부터 메시지를 가져가는 pull방식 (Consumer의 처리능력만큼의 메시지만 가져오기 때문에 최적의 성능을 낼 수 있다.)
- push 방식의 메시징 시스템에서는 Broker가 직접 각 Consumer가 어떤 메시지를 처리해야 하는지 계산하고 어떤 메시지를 처리 중인지 트랙킹
- pull 방식의 Kafka 메시징 시스템은 Consumer가 직접 필요한 메시지를 가져오므로 Broker의 메시지 관리기능, Consumer 관리기능에 대한 부담 저하
- kafka의 pull방식은 메시지를 쌓아 두었다가 주기적으로 처리하는 batch Consumer의 구현이 가능하다
Kafka 동작 방식 자세히
- Topic과 Partitions
- kafka의 topic은 partition이라는 단위로 쪼개져 클러스터에 분산되어 저장된다. 복제(replication) 설정을 할 경우 이 또한 partition 단위로 각 서버들에 분산되어 복제되고 장애가 발생하면 partition 단위로 fail over(장애극복)가 수행됨
- 위 그림은 하나의 Topic이 3개의 partition에 분산되어 순차적으로 저장되는 모습을 보여주고 있다.
- topic에서 쪼개진 각 partition은 0부터 1씩 증가하는 offset 값을 메시지에 부여하는데 이 값은 partition 내에서 메시지를 식별하는 ID로 사용된다.
- 메시지의 offset값은 partition마다 별도 관리되므로 partition번호 + 메시지 offset값을 함께 사용한다.
Partition의 분산(복제 아님)
- 위 그림은 3개의 broker로 이루어진 클러스터에서 하나의 Topic이 3개의 partition으로 분산되어 저장되어 있다.
- Producer가 메시지를 전송할때 어떤 파티션을 선택할지는 사용자가 구현한 partition 분배 정책에 의해 결정된다.
- 라운드로빈 : partition내 메시지 균등 분배하는 형태
- 메시지-키 : 알파벳 A로 시작하는 메시지는 P0, B로 시작하는 메시지는 P1으로 전송하는 형태도 가능
- mod 연산 : 사용자 ID의 CRC32 값을 partition수로 나눠 modulo 연산을 수행해서 동일한 ID를 가지는 메시지끼리 동일한 partition에 저장하는 방식 CRC32(ID) % count(Partition)
Partition의 복제 Replication
- Kafka에서는 고사용성을 위하여 각 partition을 복제하여 클러스터에 분산시킬 수 있다. 다음은 하나의 topic을 3개의 partition으로 분할하고 Replication-Factor를 3으로 설정한 상태이다. 1객체의 명칭은 Replica
- Replication-factor를 N으로 설정할 경우 N개의 replica가 생성되고 1개의 replica는 리더가 되고 N-1개의 replica는 follower가 된다.
- Leader replica만 partition에 메시지를 읽고 쓰기를 할 수 있다. follower는 단순히 leader를 복제하기만 한다.
- leader에 장애가 발생할 경우 follower중 하나가 leader가 된다. ** leader에서만 읽기/쓰기를 수행한다고 하면 부하 분산이 되지 않고 리더가 쏠려버리지 않나 생각할 수 있는데, 각 partition의 leader가 클러스터 내의 broker들에 균등하게 분배되도록 알고리즘이 설계되어 있기 때문에 부하는 자연스럽게 분산이 된다. 위 그림과 같이 leader를 고려하여 partition별로 분배가 된다.
Consumer와 Consumer Group
-
메시징 모델은 크게 큐(queue) 모델과 발행-구독(publish-subscribe)모델로 나뉜다.
-
-
큐 모델 : 메시지가 쌓여있는 큐로 부터 메시지를 가져와서 consumer pool에 있는 consumer중 하나에 메시지를 할당하는 방식
-
발행-구독 : topic을 구독하는 모든 consumer에게 메시지를 브로드캐스팅하는 방식
-
Kafka에서는 consumer group이라는 개념을 도입하여 두 가지 모델을 발행-구독 모델로 일반화 했다.
-
Kafka의 partition은 Consumer Group내에서 오로지 하나의 Consumer의 접근만 허용하며 그 consumer를 partition owner라고 한다. 따라서 동일 consumer group내에서 consumer들이 각각 접근할 수 있는 partition이 다르다
-
한 번 정해진 partition owner는 broker나 consumer 구성의 변동이 있을때에야 바뀐다. Consumer가 추가/제거되면 consumer group에 속한 consumer들끼리 partition 재분배(rebalancing)가 발생
-
broker가 추가/제거되면 전체 consumer group에서 partition 재분배가 발생
-
Consumer group내 consumer 갯수 < partition 갯수 (partition이 더 많으면)
-
하나의 consumer가 여러개의 partition을 소유하게 된다.
-
Consumer group내 consumer 갯수 > partition 갯수 (Consumer가 더 많으면)
-
여분의 consumer는 메시지를 처리하지 않는다(논다)
-
위 그림과 같이 consumer group내 다수의 consumer를 할당하면 각 consumer마다 별도의 partition으로 부터 메시지를 받아오기 때문에(Producer가 각 partition에 메시지를 균등하게 분배한다고 가정할 경우) consumer group은 큐모델로 동작하게 된다.
-
단일 consumer로 이루어진 consumer group을 활용하면 다수의 consumer가 동일한 partition에 동시에 접근하여 동일한 메시지를 액세스 하기 때문에 발행-구독 모델을 구성할 수 있다.
-
이처럼 하나의 consumer에 의하여 독점적으로 partition이 액세스 되기 때문에 동일 partition 의 메시지는 partition에 저장된 순서대로 처리된다 만약 특정 키를 지닌 메시지가 발생시간 순으로 처리되어야 한다면 분배 알고리즘을 통해 특정 키를 지닌 메시지는 동일한 파티션에 할당되어 단일 consumer가 처리하도록 해야한다.
-
다른 partition에 속한 메시지의 순차적 처리는 보장되어 있지 않기 때문에 특정 topic의 전체 메시지가 발생시간 순으로 처리되어야 할 경우 topic을 나누지 말고 단일 partition만을 가지도록 해야한다.
** zero-copy : https://www.ibm.com/developerworks/linux/library/j-zerocopy/
파일 시스템을 활용한 고성능 디자인
-
-
일반적으로 하드디스크는 메모리보다 수백-수천배 이상 느리다 그러나 특정 조건에서는 메모리보다 10배 이내로 느리거나 심지어는 빠를 수도 있다.
-
하드디스크의 순차적 읽기 성능은 메모리에 대한 랜덤 읽기 성능보다 뛰어나며 메모리의 순차적 읽기 성능보다 7배 정도 느리다.( 물론 하드의 랜덤읽기와 메모리의 랜덤읽기는 10만배 정도 차이가 난다)
-
Kafka는 메모리에 별도의 캐시를 구현하지 않고 OS의 페이지 캐시에 이를 모두 위임한다. OS가 알아서 서버의 유휴 메모리를 페이지 캐시로 활용하여 앞으로 필요할 것으로 예상되는 메시지들을 미리 읽어서(readhead) 디스크 읽기 성능을 향상 시킨다
-
Kafka의 메시지는 하드디스크로 부터 순차적으로 읽혀지기 때문에 하드디스크의 핸덤읽기 성능에 대한 단점을 보완함과 동시에 OS페이지 캐시를 효과 적으로 활용할 수 있다.
-
메시지를 파일시스템에 저장하면서 얻는 부수적인 효과
-
메모리에 메시지를 저장하지 않기 때문에 메시지가 JVM 객체로 변환되면서 크기가 커지는 것을 방지할 수 있고, JVM의 GC로 인한 성능저하 또한 피할 수 있다.
-
Kafka 프로세스가 직접 캐시를 관리하지 않고 OS에 위임하기 때문에 프로세스를 재시작 하더라도 OS의 페이지 캐시는 그대로 남아있기 때문에 프로세스 재시작 후 캐시를 워밍업할 필요가 없다.
-
kafka에서는 파일 시스템에 저장된 메시지를 네트워크를 통해 consumer에게 전송할 때 zore-copy기법을 사용하여 데이터 전송 성능을 향상 시켰다
-
zero-copy:
-
1 .The first is that Kafka does only sequential file I/O. To enable this kafka enforces end-to-end ordering of messages in delivery.
-
- The second reason is because Kafka supports end-to-end batching of messages. Computers love linear scans and transfers with big arrays, they hate little bursty random messages
-
consumer offset tracking http://kafka.apache.org/documentation.html
kafka test tutorial
- start zookeeper
- start kafka-server (각 인스턴스에서) :
$ bin/kafka-server-start.sh config/server.properties
- create topic
$ bin/kafka-topics.sh --zookeeper localhost:2181 \--create --topic zerg.hydra --partitions 3 --replication-factor 2
bin/kafka-topics.sh --create --zookeeper hadoop.apps:2181 --replication-factor 3 --partitions 3 --topic test
-
list topic
-
bin/kafka-topics.sh --list --zookeeper hadoop.apps:2181
-
describe topic
-
bin/kafka-topics.sh --describe --zookeeper hadoop.apps:2181 --topic test
-
alter topic
-
bin/kafka-topics.sh --alter --zookeeper hadoop.apps:2181 --topic test --partitions 40
-
kafka topic 삭제하는 방법
-
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test
-
[config] rebalancing
-
auto.leader.rebalance.enable=true
-
producer
-
bin/kafka-console-producer.sh --broker-list hadoop.slave01:9092,hadoop.slave02:9092,hadoop.slave03:9092 --sync --topic cluster.test1
-
consumer
-
bin/kafka-console-consumer.sh --zookeeper hadoop.apps:2181 --topic cluster.test1 --from-beginning
-
offset monitor
-
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk hadoop.apps:2181,hadoop.slave02:2181,hadoop.slave03:2181 \ --port 8081 \ --refresh 10.seconds \ --retain 2.days