Apache Kafka ‐ 카프카 프로듀서 애플리케이션 - dnwls16071/Backend_Summary GitHub Wiki

  • 카프카에서 데이터 시작점은 프로듀서이다. 프로듀서 애플리케이션은 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송한다.
  • 프로듀서는 데이터를 전송할 때 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신한다.
  • 프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.
  • ProducerRecord : 프로듀서에서 생성하는 레코드(오프셋은 없음)
  • send() : 레코드 전송 요청 메서즈
  • Partitioner : 어느 파티션으로 전송할 것인가를 지정하는 파티셔너. 기본 값으로 DefaultPartitioner로 설정된다.
  • Accumulator : 배치로 묶어 전송할 데이터를 모으는 버퍼

📚 파티셔너

  • 프로듀서 API를 사용하면 UniformStickyPartitioner, RoundRobinPartitioner 2개 파티셔너를 제공한다.
  • 카프카 클라이언트 라이브러리 2.5.0 버전에서 파티셔너를 지정하지 않을 경우라면 기본적으로 UniformStickyPartitioner가 파티셔너로 설정된다.
  • 메시지 키가 있을 경우의 동작 메커니즘
    • UniformStickyPartitioner와 RoundRobinPartitioner 둘 다 메시지 키가 있을 때에는 메시지 키의 해시값과 파티션을 매칭하여 레코드를 전송한다.
    • 동일한 메시지 키가 존재하는 레코드는 동일한 파티션 번호에 전달된다.
    • 만약 파티션 개수가 변경될 경우 메시지 키와 파티션 번호 매칭은 깨지게 된다.
  • 메시지 키가 없을 경우의 동작 메커니즘
    • 메시지 키가 없을 때에는 파티션에 최대한 동일하게 분배하는 로직이 있는데 UniformStickyPartitioner는 RoundRobinPartitioner의 단점을 개선하였다는 점에서 다르다.
    • RoundRobinPartitioner
      • ProducerRecord가 들어오는 대로 파티션을 순회하면서 전송한다.
      • Accumulator에서 묶이는 정도가 적기 때문에 전송 성능이 낮다.
    • UniformStickyPartitioner
      • Accumulator에서 레코드들이 배치로 묶일 때까지 기다렸다가 전송한다.
      • 배치로 묶일 뿐 결국 파티션을 순회하면서 보내기 때문에 모든 파티션에 분배되어 전송되고 RoundRobinPartitioner에 비해 향상된 성능을 가진다.

📚 프로듀서 주요 옵션⭐

프로듀서 주요 옵션(필수)

  • bootstrap.servers
    • 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다.
    • 2개 이상 브로커 정보를 입력해 일부 브로커에서 이슈가 발생하더라도 접속하는 데 이슈가 없도록 설정 가능하다.
  • key.serializer
    • 레코드의 메시지 키를 직렬화하는 클래스를 지정한다.
  • value.serializer
    • 레코드의 메시지 값을 직렬화하는 클래스를 지정한다.

프로듀서 주요 옵션(선택)

  • acks
    • 프로듀서가 전송한 데이터가 브로커들에게 정상적으로 저장되었는지 전송 성공 여부를 확인하는데 사용하는 옵션으로 0, 1, -1(all) 중 하나로 설정할 수 있다. 기본 값은 1이다.
  • linger.ms
    • 배치를 전송하기 전까지 기다리는 최소 시간이다. 기본 값은 0이다.
  • max.in.flight.requests.per.connection
    • 한 번에 요청하는 최대 커넥션 개수. 설정된 값만큼 동시에 전달 요청을 수행한다. 기본 값은 5이다.
  • retries
    • 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정한다. 기본 값은 2147483647이다.
  • partitioner.class
    • 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정한다. 기본 값은 org.apache.kafka.clients.producer.internals.DefaultPartitioner이다.
  • enable.idempotence
    • 멱등성 프로듀서로 동작할지 여부를 지정한다. 기본 값은 false이다.
  • transactional.id
    • 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정한다. 기본 값은 null이다.

📚 ISR와 acks 옵션⭐

  • 이전에도 배웠지만 ISR은 리더 파티션의 상태와 팔로워 파티션의 상태가 모두 싱크가 된 상태를 말한다.
  • 복제 개수가 2인 토픽을 가정하면 위와 같이 리더 파티션 1개, 팔로워 파티션 1개가 존재한다.
  • 리더 파티션에 0부터 3까지의 오프셋이 있다고 가정할 때, 팔로워 파티션에 데이터 동기화가 완료되려면 0부터 3까지의 오프셋이 존재해야 한다.
  • 동기화가 완료되었다는 의미는 리더 파티션의 모든 데이터가 팔로워 파티션에 복제된 상태를 말하기 때문이다.
  • ISR이라는 용어가 나온 이유는 팔로워 파티션이 리더 파티션으로부터 데이터를 복제하는 데 걸리는 시간이 있기 때문이다.
  • 프로듀서가 특정 파티션에 데이터를 저장하는 작업은 리더 파티션을 통해 처리한다.
  • 이 때, 리더 파티션에 새로운 레코드가 추가되어 오프셋이 증가하면 팔로워 파티션이 위치한 브로커는 리더 파티션의 데이터를 복제한다.
  • 리더 파티션에 데이터가 적재된 이후 팔로워 파티션이 복제하는 시간 차 때문에 리더와 팔로워 간 오프셋 차이가 발생한다.

acks

  • 카프카 프로듀서의 acks 옵션은 0, 1, all(-1) 값을 가질 수 있다.
  • 이 옵션을 통해 프로듀서가 전송한 데이터가 카프카 클러스터에 얼마나 신뢰성높게 저장할지 지정할 수 있다.
  • acks 옵션에 따라 성능이 달라질 수 있으므로 acks 옵션에 따라 카프카의 동작 방식을 상세히 알고 설정해야 한다.
  • 복제 개수가 1 즉, 리더 파티션 1개만 있는 경우 acks 옵션에 따른 성능 변화는 크지 않다.
  • 그러나 안정적으로 데이터를 운영하기 위해서는 복제 개수가 2이상으로 운영하는 경우가 대부분이다.

acks=0

  • acks를 0으로 설정하는 것은 프로듀서가 리더 파티션으로 데이터를 전송했을 때 리더 파티션으로 데이터가 저장되었는지 확인하지 않는다는 뜻이다.
  • 리더 파티션은 데이터가 저장된 이후에 데이터가 몇 번째 오프셋에 저장되었는지를 리턴하는데 acks가 0으로 설정되어 있다면 프로듀서는 리더 파티션에 데이터가 저장되었는지 여부에 대한 응답 값을 받지 않는다.
  • 데이터 전송 속도는 acks를 1 또는 all로 했을 경우보다 훨씬 빠르다. 데이터가 일부 유실이 발생하더라도 전송 속도가 중요한 경우라면 이 옵션값을 사용하는 것이 좋다.

acks=1

  • acks를 1로 설정한 경우 프로듀서는 보낸 데이터가 리더 파티션에만 정상적으로 적재되었는가를 확인한다.
  • 만약 리더 파티션에 데이터가 정상적으로 적재되지 않았다면 리더 파티션에 적재될 때까지 재시도할 수 있다.
  • 그러나 리더 파티션에 적재되었음을 보장하더라도 데이터는 유실될 수 있다. 왜냐하면 복제 개수를 2이상으로 운영할 경우 리더 파티션에 적재가 완료되더라도 팔로워 파티션에는 아직 동기화되지 않을 수 있기 때문이다.
  • 팔로워 파티션이 데이터를 복제하기 직전에 리더 파티션이 있는 브로커에 장애가 발생하면 동기화되지 못한 데이터가 유실될 수 있다.

acks=-1(all)

  • acks를 -1 혹은 all로 설정할 경우 프로듀서는 보낸 데이터가 리더 파티션과 팔로워 파티션에 모두 적재되었는지를 확인한다.
  • 리더 파티션뿐만 아니라 팔로워 파티션까지 데이터가 적재되었는지를 확인하기 때문에 0 또는 1 옵션에 비하면 상대적으로 속도가 느리다.
  • 그럼에도 불구하고 팔로우 파티션에 데이터가 정상 적재되었는지 기다리기 때문에 일부 브로커에서 장애가 발생하더라도 프로듀서는 안전하게 데이터를 전송하고 저장할 수 있음을 보장할 수 있다.
  • acks를 all로 설정할 경우에는 토픽 단위로 설정 가능한 min.insync.replicas 옵션값에 따라 데이터 안정성이 달라진다.

min.insync.replicas

  • min.insync.replicas 옵션은 프로듀서가 리더 파티션과 팔로워 파티션에 데이터가 적재되었는가를 확인하기 위한 최소 ISR 그룹의 파티션 개수이다.
  • 만약 이 값이 1이라면 ISR 중 최소 1개 이상의 파티션에 데이터가 적재되었음을 확인하는 것이다.
  • 값이 1인 상태의 경우 acks를 1로 설정했을 때와 동일한 동작을 수행한다. 왜냐하면 값이 1이라면 팔로워 파티션이 없이 리더 파티션만 존재하기 때문에 ISR 중 처음 적재가 완료되는 파티션은 리더 파티션이기 때문이다.

min.insync.replicas=2, acks=-1(all)

  • min.insync.replicas=2로 설정했을 때부터 acks를 all로 설정하는 의미가 있다.
  • 이 경우 ISR의 2개 이상의 파티션에 데이터가 적재되었음을 확인했다는 뜻이기 때문이다.
  • ISR의 2개 이상의 파티션에 데이터가 적재되었다는 의미는 리더 파티션 1개와 팔로워 파티션 1개에 정상 적재되었음을 보장하기 떄문이다.
  • 실제 카프카 클러스터를 운영하면서 브로커가 동시에 2개가 중단되는 일은 극히 드물기 때문에 리더 파티션과 팔로워 파티션 중 1개에 데이터가 적재 완료되었다면 데이터는 유실되지 않는다고 볼 수 있다.

📚 프로듀서 애플리케이션 예제

❗KafkaProducer의 send() 메서드는 Future 객체를 반환한다. 이 객체는 RecordMetadata의 비동기 결과를 표현하는 것으로 ProducerRecord가 카프카 브로커에 정상적으로 적재되었는지에 대한 데이터가 포함되어 있다. get() 메서드를 호출하면 프로듀서로 보낸 데이터의 결과를 동기적으로 가져올 수 있다. ❗프로듀서를 안전하게 종료하려면 close() 메서드를 사용하여 Accumulator에 저장된 모든 데이터를 카프카 클러스터로 전송해야 한다. ❗메시지 키를 지정하여 레코드를 전송할 때, 같은 메시지 키를 가진 레코드들은 같은 파티션으로 간다.