- 카프카 브로커는 카프카 클라이언트와 데이터를 주고받기 위해 사용하는 주체로 데이터를 분산 저장해 장애가 발생하더라도 안전하게 사용할 수 있도록 도와주는 애플리케이션이다.
- 카프카는 메모리나 데이터베이스에 저장하지 않으며 따로 캐시 메모리를 구현하여 사용하지도 않는다.
- 카프카는 페이지 캐시(Page Cache)를 사용하여 디스크 입출력 속도를 높여서 성능 저하를 해결했다. 페이지 캐시란, OS에서 파일 입출력 성능 향상을 위해 만들어 놓은 메모리 영역이다.
- 한 번 읽은 파일의 내용은 메모리의 페이지 캐시 영역에 저장한다. 추후 동일한 파일의 접근이 일어나면 디스크에서 읽지 않고 메모리에서 직접 읽는 방식이다.
- 데이터 복제는 카프카를 장애 허용 시스템으로 동작하도록 하는 원동력이다.
- 복제의 이유는 클러스터로 묶인 브로커 중 일부에 장애가 발생하더라도 데이터를 유실하지 않고 안전하게 사용하기 위함이다.
- 카프카의 데이터 복제는 파티션 단위로 이루어진다.
- 토픽을 생성할 때 파티션의 복제 개수도 같이 설정되는데 직접 옵션을 선택하지 않으면 브로커에 설정된 옵션 값을 따라간다.
- 복제 개수의 최솟값은 1(=복제 없음)이고 최댓값은 클러스터 내의 브로커 개수만큼 설정하여 사용할 수 있다.
- 프로듀서 또는 컨슈머와 직접 통신하는 파티션을 리더 파티션, 나머지 복제 데이터를 가지고 있는 파티션을 팔로워 파티션이라고 부른다.
- 팔로워 파티션들은 리더 파티션의 오프셋을 확인해 현재 자신이 가지고 있는 오프셋과 차이가 나는 경우 리더 파티션으로부터 데이터를 가져와서 자신의 파티션에 저장하는데 이 과정을 복제라고 부른다.
- 파티션 복제로 인해 나머지 브로커에도 데이터가 복제되므로 복제 개수만큼의 저장 용량이 증가한다는 단점이 있으나 복제를 통해 데이터를 안전하게 사용할 수 있다.
- 클러스터의 다수 브로커 중 한 대가 컨트롤러 역할을 한다.
- 컨트롤러는 다른 브로커들의 상태를 체크하고 브로커가 클러스터에서 빠지는 경우 해당 브로커에 존재하는 리더 파티션을 재분배한다.
- 카프카는 지속적으로 데이터를 처리해야 하므로 브로커의 상태가 비정상이라면 빠르게 클러스터에서 빼내는 것이 중요하다.
- 만약 컨트롤러 역할을 하는 브로커에 장애가 생기면 다른 브로커가 컨트롤러 역할을 수행한다.
- 카프카는 RabbitMQ와 달리 컨슈머가 데이터를 가져가더라도 토픽의 데이터는 삭제되지 않는다.
- 또한, 컨슈머나 프로듀서가 데이터 삭제를 요청할 수도 없다. 오직 브로커만이 데이터를 삭제할 수 있다.
- 데이터 삭제는 파일 단위로 이루어지는데 이 단위를 로그 세그먼트(Log Segment)라고 한다.
- 이 세그먼트에는 다수의 데이터가 들어있기 때문에 일반적인 데이터베이스처럼 특정 데이터를 선별해서 삭제할 수가 없다.
- 카프카는 데이터를 삭제하지 않고 메시지 키를 기준으로 오래된 데이터를 압축하는 정책을 가져갈 수 있다.
- 컨슈머 그룹은 토픽이 특정 파티션으로부터 데이터를 가져가서 처리하고 이 파티션의 어느 레코드까지 가져갔는지를 확인하기 위해 오프셋을 커밋한다.
- 커밋한 오프셋은
__consumer_offsets 토픽에 저장한다. 여기에 저장된 오프셋을 토대로 컨슈머 그룹은 다음 레코드를 가져가서 처리한다.
- 클러스터 다수 브로커 중 한 대는 코디네이터 역할을 수행한다.
- 코디네이터는 컨슈머 그룹의 상태를 체크하고 파티션을 컨슈머와 매칭되도록 분배하는 역할을 한다.
- 컨슈머가 컨슈머 그룹에서 빠지면 매칭되지 않은 파티션을 정상 동작하는 컨슈머로 할당해 끊임없이 데이터가 처리되도록 도와주는데 이렇게 파티션을 컨슈머로 재할당하는 과정을 리밸런스라고 부른다.
- 토픽은 카프카에서 데이터를 구분하기 위해 사용되는 단위이다.
- 토픽은 1개 이상의 파티션을 소유하고 있다.
- 파티션에는 프로듀서가 보낸 데이터들이 들어가 저장되는데 이 데이터를 레코드라고 부른다.
- 파티션은 카프카 병렬 처리의 핵심으로써 그룹으로 묶인 컨슈머들이 레코드를 병렬로 처리할 수 있도록 매칭된다.
- 컨슈머 처리량이 한정된 상황에서 많은 레코드를 병렬로 처리하는 가장 좋은 방법은 컨슈머 개수를 늘려 스케일 아웃하는 것이다.
- 컨슈머 개수를 늘림과 동시에 파티션 개수도 늘리면 처리량이 증가하는 효과를 볼 수 있다.
- 파티션은 자료구조에서 접하는 큐와 비슷한 구조라고 생각하면 쉽다. FIFO 구조와 같이 먼저 들어간 레코드는 컨슈머가 먼저 가져가게 된다.
토픽 이름 제약 조건과 네이밍 1
토픽 이름 제약 조건과 네이밍 2
토픽 이름 제약 조건과 네이밍 3
- 레코드는 타임스탬프, 메시지 키, 메시지 값, 오프셋, 헤더로 구성되어 있다.
- 프로듀서가 생성한 레코드가 브로커로 전송되면 오프셋과 타임스탬프가 지정되어 저장된다. 브로커에 한 번 적재된 레코드는 수정할 수 없고 로그 리텐션 기간 또는 용량에 따라서만 삭제가 된다.
- 타임스탬프(Timestamp)는 프로듀서에서 해당 레코드가 생성된 시점의 유닉스 타임이 설정된다. 컨슈머는 레코드의 타임스탬프를 토대로 언제 레코드가 생성되었는가를 알 수 있다.
- 다만, 프로듀서가 레코드를 생성할 때 임의의 타임스탬프 값을 설정할 수 있고 토픽 설정에 따라 브로커에 적재된 시간(LogAppendTime)으로 설정될 수 있다는 점을 유의해야 한다.
- 메시지 키는 메시지 값을 순서대로 처리하거나 메시지 값 종류를 나타내기 위해 사용한다.
- 메시지 키를 사용하면 프로듀서가 토픽에 레코드를 전송할 때 메시지 키의 해시 값을 토대로 파티션을 지정하게 된다. 즉, 동일한 메시지 키라면 동일 파티션에 들어간다. 다만 어느 파티션에 지정될지 알 수 없고 파티션 개수가 변경되면 메시지 키와 파티션 매칭이 달라지게 되므로 주의해야 한다.
- 메시지 키를 선언하지 않으면 null로 설정된다. 메시지 키가 null로 설정된 레코드는 프로듀서 기본 설정 파티셔너에 따라서 파티션에 분배되어 적재된다.
- 메시지 값에는 실질적으로 처리할 데이터가 들어 있다.
- 메시지 키와 값은 직렬화되어 브로커로 전송되기 때문에 컨슈머가 이용할 때는 직렬화한 형태와 동일한 형태로 역직렬화를 수행해야 한다.
- 직렬화/역직렬화할 때는 반드시 동일한 형태로 처리해야 한다.(예를 들어, StringSerializer로 직렬화한 메시지 값을 컨슈머가 IntegerDeserializer로 역직렬화하면 문제가 발생)
- 레코드 오프셋은 0이상의 숫자로 이루어져 있다.
- 레코드의 오프셋은 직접 지정할 수 없고 브로커에 저장될 때 이전에 전송된 레코드의 (오프셋 + 1) 값으로 생성된다.
- 오프셋은 카프카 컨슈머가 데이터를 가져갈 때 사용된다.
- 오프셋을 사용하면 컨슈머 그룹으로 이루어진 카프카 컨슈머들이 파티션의 데이터를 어디까지 가져갔는지 명확히 알 수 있다.
헤더(Header)
- 레코드의 추가적인 정보를 담는 메타 데이터 저장소 용도로 사용한다.
- 헤더는 키/값 형태로 데이터를 추가하여 레코드의 속성을 저장하여 컨슈머에서 참조할 수 있다.
- 카프카 클러스터에 명령을 내리거나 데이터를 송수신하기 위해 카프카 클라이언트 라이브러리는 카프카 프로듀서/컨슈머/어드민 클라이언트를 제공하는 카프카 클라이언트를 사용해 애플리케이션을 개발한다.
- 자바 기본형과 참조형뿐만 아니라 동영상, 이미지와 같은 바이너리 데이터도 프로듀서를 통해 전송할 수 있다.
-
UniformStickyPartitioner : 프로듀서 동작에 특화되어 높은 처리량과 낮은 리소스 사용률을 가진다.(기본)
-
RoundPartitioner : 프로듀서에서 전송한 레코드가 들어오는대로 파티션을 순회하면서 전송하기 때문에 배치로 묶이는 빈도가 적다.
- 추가적으로 카프카 프로듀서는 압축 옵션을 통해 브로커로 전송 시 압축 방식을 정할 수 있다.
- 압축 옵션을 정하지 않으면 압축이 되지 않은 상태로 전송된다.
- 카프카 프로듀서에는 압축 옵션으로 gzip, snappy, lz4, zstd를 지원한다.
- 압축을 하면 데이터 전송 시 네트워크 처리량에서 이득을 볼 수 있지만 압축을 하는 데 CPU 또는 메모리 리소스를 사용하기 때문에 사용환경에 따라 적절한 압축 옵션을 사용하는 것이 좋다.
- 또한, 프로듀서에서 압축한 메시지는 컨슈머가 압축을 풀게 되는데 이 때도 컨슈머 애플리케이션 리소스가 사용된다.
-
bootstrap.servers : 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력해 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정 가능하다.
-
key.serializer : 레코드 메시지 키 직렬화 클래스 지정
-
value.serializer : 레코드 메시지 값 직렬화 클래스 지정
-
acks : 프로듀서가 전송한 데이터가 브로커들에 정상적으로 전송되었는지 전송 성공 여부를 확인하는 데 사용하는 옵션이다.
-
buffer.memory : 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리양을 지정한다. 기본값은 33554432(32MB)이다.
-
retries : 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정한다. 기본값은 2147483647이다.
-
batch.size : 배치로 전송할 레코드 최대 용량을 지정한다.
-
linger.ms : 배치를 전송하기 전까지 기다리는 최소 시간이다. 기본값은 0이다.
-
partitioner.class : 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정한다. 기본값은 org.apache.kafka.clients.producer.internals.DefaultPartitioner이다.
-
enable.idempotence : 멱등성 프로듀서로 동작할지 여부를 결정한다. 기본값은 false이다.
-
transactional.id : 프로듀서가 레코드를 트랜잭션 단위로 묶을지 여부를 설정한다. 프로듀서의 고유한 트랜잭션 ID를 설정할 수 있다. 이 값을 설정하면 트랜잭션 프로듀서로 동작한다. 기본값은 null이다.
- 특정 데이터를 가지는 레코드를 특정 파티션으로 보낼 때가 있는데 이 때, 기본 설정 파티셔너(=UniformStickyPartitioner)를 사용하면 메시지 키의 해시 값을 기준으로 파티션에 매칭하여 데이터를 전송하기 때문에 어느 파티션으로 들어가는지 알 수 없다.
-
Partitioner 인터페이스를 구현해 커스텀 파티셔너를 만들면 해결할 수 있다.
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
throw new InvalidRecordException("Need Message Key");
}
if (((String) key).equals("Pangyo")) {
return 0; // 0번 파티션 고정
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
-
KafkaProducer의 send() 메서드는 Future 객체를 반환한다.
- 이 객체는 RecordMetadata의 비동기 결과를 표현한 것으로 ProducerRecord가 카프카 브로커에 정상적으로 적재되었는지에 대한 데이터가 포함되어 있다.
-
get() 메서드를 사용하면 프로듀서로 보낸 데이터의 결과를 동기적으로 가져올 수 있다.
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
- 그러나 동기로 프로듀서 전송 결과를 확인하는 것은 빠른 전송에 허들이 될 수 있다.
- 프로듀서가 전송하고 난 뒤 브로커로부터 전송에 대한 응답 값을 받기 전까지 대기하기 때문이다. 따라서 이를 원하지 않을 경우를 위해 프로듀서는 비동기로 결과를 확인할 수 있도록 Callback 인터페이스를 제공하고 있다.
- 사용자는 사용자 정의 Callback 클래스를 생성해 레코드의 전송 결과에 대응하는 로직을 만들 수 있다.
public class ProducerCallback implements Callback {
private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error(e.getMessage(), e);
} else {
logger.info(recordMetadata.toString());
}
}
}
- 프로듀서가 전송한 데이터는 카프카 브로커에 적재된다. 컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 한다.
- 토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 크게 2가지가 있는데, 첫 번째는 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 것이고 두 번째는 토픽의 특정 파티션만 구독하는 컨슈머를 운영하는 것이다.
- 그러나 아래와 같이 파티션을 할당받지 못하는 컨슈머는 쓰레드만 차지하고 실질적인 데이터 처리를 하지 못하므로 애플리케이션 실행에 있어 불필요한 쓰레드로 남게 된다.
- 컨슈머 그룹의 컨슈머에 장애가 발생하면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다.
- 이런 과정을 리밸런싱(rebalancing)이라고 부르는데 리밸런싱은 크게 2가지 상황에서 발생한다.
- 컨슈머가 추가되는 상황
- 컨슈머가 제외되는 상황
- 컨슈머 중 1개에 이슈가 발생해 더는 동작을 안 하고 있다면 이슈가 발생한 컨슈머에 할당된 파티션은 더는 데이터 처리를 하지 못하게 되므로 데이터 처리에 지연이 발생할 수 있다. 이를 해소하기 위해 이슈가 발생한 컨슈머를 컨슈머 그룹에서 제외시켜 모든 파티션이 지속적으로 데이터를 처리할 수 있도록 가용성을 높여준다.
- 리밸런싱은 언제든지 발생할 수 있으므로 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해야 한다.
- 컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋(Commit)을 통해 기록한다. 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽인
__consumer_offsets에 기록된다.
- 컨슈머 동작 이슈가 발생해
__consumer_offsets 토픽에 어느 레코드까지 읽어갔는지 오프셋 커밋이 기록되지 못했다면 데이터 처리 중복이 발생할 수 있다. 그러므로 데이터 처리 중복이 발생하지 않게 하기 위해서 컨슈머 애플리케이션에서 오프셋 커밋을 정상적으로 처리했는지 검증해야만 한다.
-
bootstrap.servers : 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력해 일부 브로커에 이슈가 발생하더라도 접속에 이슈가 없도록 설정 가능하다.
-
key.deserializer : 레코드 메시지 키를 역직렬화하는 클래스를 지정
-
value.deserializer : 레코드 메시지 값을 역직렬화하는 클래스를 지정
-
group.id : 컨슈머 그룹 아이디를 지정한다.
-
auto.offset.reset : 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽어야 할지 선택하는 옵션이다.
- latest : 가장 최근에 넣은 오프셋부터 읽기 시작한다.
- earliest : 가장 오래 전에 넣은 오프셋부터 읽기 시작한다.
- none : 컨슈머 그룹이 커밋한 기록이 있는지 찾아본다.
-
enable.auto.commit : 자동 커밋으로 할지 수동 커밋으로 할지 선택한다. 기본값은 true이다.
-
auto.commit.interval.ms : 자동 커밋일 경우 오프셋 커밋 간격을 지정한다. 기본값은 5000(5초)이다.
-
max.poll.records : poll() 메서드를 통해 반환되는 레코드 개수를 지정한다. 기본값은 500이다.
-
session.timeout.ms : 컨슈머와 브로커 연결이 끊기는 최대 시간이다. 이 시간 내에 하트비트를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱을 시작한다. 보통 하트비트 시간 간격의 3배로 설정한다. 기본값은 10000(10초)이다.
-
heartbeat.interval.ms : 하트비트를 전송하는 시간 간격이다. 기본값은 3000(3초)이다.
-
max.poll.interval.ms : poll() 메서드를 호출하는 간격의 최대 시간을 지정한다. poll() 메서드를 호출한 이후에 데이터를 처리하는데 시간이 너무 많이 걸리는 경우 비정상으로 판단해 리밸런싱을 시작한다. 기본값은 300000(5분)이다.
-
isolation.level : 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다. 이 옵션은 read_committed, read_uncommitted로 설정할 수 있다. read_committed로 설정하면 커밋이 완료된 레코드만 읽는다. read_uncommitted로 설정하면 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다.
- 컨슈머 그룹에서 컨슈머가 추가 또는 제거가 되면 파티션을 컨슈머에 재할당하는 과정인 리밸런스가 일어난다.
-
poll() 메서드를 통해 반환받은 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터를 중복 처리할 수 있다.
-
poll() 메서드를 통해 받은 데이터 중 일부를 처리했으나 커밋하지 않았기 때문이다.
- 리밸런스 발생 시 데이터를 중복 처리하지 않게 하기 위해서는 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야 한다.
- 리밸런스 발생을 감지하기 위해 카프카 라이브러리는
ConsumerRebalanceListener 인터페이스를 지원한다.
-
ConsumerRebalanceListener 인터페이스로 구현된 클래스는 onPartitionAssigned() 메서드와 onPartitionRevoked() 메서드로 이루어져 있다.
-
onPartitionAssigned() : 리밸런스가 끝난 뒤에 파티션이 할당 완료되면 호출되는 메서드이다.
-
onPartitionRevoked() : 리밸런스가 시작되기 직전에 호출되는 메서드이다.
public static void main(String[] args) {
Properties configs = new Properties();
// ...
configs.put(ENABLE_AUTO_COMMIT_CONFIG, false);
consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecords<String, String> record : records) {
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, null));
consumer.commitSync(currentOffsets);
}
}
}
private static class RebalanceListener implements ConsumerRebalanceListener {
public void onPartitionAssigned(Collection<TopicPartition> partitions) {
logger.warn("Partitions are assigned");
}
public void onParitionsRevoked(Collection<TopicPartition> partitions) {
logger.warn("Partitions are revoked");
consumer.commitSync(currentOffsets);
}
}
- 컨슈머를 운영할 때,
subscribe() 메서드를 사용하여 구독 형태로 사용하는 것 외에도 직접 파티션을 명시적으로 할당해 운영할 수도 있다.
- 컨슈머가 어떤 토픽, 어떤 파티션을 할당할지 명시적으로 선언하고 싶다면
assign() 메서드를 사용하면 된다.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
- 컨슈머에 할당된 토픽과 파티션에 대한 정보는
assignment() 메서드로 확인할 수 있다.
-
assignment() 메서드는 Set 인스턴스를 반환한다. TopicPartition 클래스는 토픽 이름과 파티션 번호가 포함된 객체이다.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
Set<TopicPartition> assignedTopicPartition = consumer.assignment();
- 정상적으로 종료되지 않은 컨슈머는 세션 타임아웃이 발생할 때까지 컨슈머 그룹에 남게 된다.
- 이로 인해 실제로는 종료되었지만 더는 동작을 하지 않는 컨슈머가 존재하기 때문에 파티션 데이터는 소모되지 못하고 컨슈머 LAG이 늘어나게 된다.
- 컨슈머 랙이 늘어나면 데이터 처리 지연이 발생하게 된다.
- 컨슈머를 안전하게 종료하기 위해서는 KafkaConsumer 클래스는
wakeup() 메서드를 지원한다.
-
wakeup() 메서드를 실행하여 KafkaConsumer 인스턴스를 안전하게 종료할 수 있다. wakeup() 메서드가 실행된 이후 poll() 메서드를 호출하게 되면 WakeupException 예외가 발생한다.
Properties configs = new Properties();
configs.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(configs);
| KafkaAdminClient 메서드명 |
설명 |
| describeCluster(DescribeClusterOptions options) |
브로커 정보 조회 |
| listTopics(ListTopicsOptions options) |
토픽 리스트 조회 |
| listConsumerGroups(ListConsumerGroupsOptions options) |
컨슈머 그룹 조회 |
| createTopics(Collection newTopics, CreateTopicsOptions options)
|
신규 토픽 생성 |
| createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) |
파티션 개수 변경 |
| createAcls(Collection acls, CreateAclsOptions options)
|
접근 제어 규칙 생성 |