[Study] Kafka producer - nrbae/happy-project GitHub Wiki
파이썬 패키지 설치 필요
- confluent-kafka-python : 성능 빠름(librdkafka 설치 필요)
- kafka-python
메시지를 생산해서 카프카의 토픽으로 메시지를 보내는 역할을 하는 애플리케이션, 서버 등을 모두 프로듀서라고 함.
- 주요 기능
- 각각의 메시지를 토픽 파티션에 매핑하고 파티션의 리더에 요청을 보냄.
- 키 값을 정해 해당 키를 가진 모든 메시지를 동일한 파티션으로 전달. (키 값이 없을 경우 -> 라운드 로빈 방식으로 파티션에 균등하게 분배)
참고 : producer config
카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트.리스트 전체 입력을 권장.
주어진 리스트의 서버 중 하나에서 장애가 발생할 경우 클라이언트는 자동으로 다른 서버로 재접속을 시도함.
프로듀서가 카프카 토픽의 리더에게 메시지를 보낸 후 요청을 완료하기 전 ack(승인)의 수.
값이 작을수록 성능은 좋지만 메시지 손실 가능성이 있음.
프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기(배치 전송이나 딜레이 등)할 수 있는 전체 메모리 바이트.
같은 파티션으로 보내는 여러 데이터를 함께 배치로 보내면 성능 측면에서 도움이 됨. 정의된 크기보다 큰 데이터는 배치를 시도하지 않음.
고가용성이 필요한 메시지의 경우면 배치 사이즈를 주지 않는 것이 좋을 수 있음.
프로듀서가 보낸 모든 메시지에 대해 응답을 기다리면 시간이 많이 소요됨. 하지만 메시지를 보내지 못했을 때 예외를 처리하거나 에러를 기록(에러 로그)하기 위해서는 비동기 전송이 필요.
- 자바로 구현하기
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
class MyCallback implements Callback {
public void onCompletion(RecordMetadata metadata, Exception exception){
if (metadata != null) {
System.out.println("Partition : " + metadata.partition() + ", Offset : " + metadata.offset() + "");
} else {
exception.printStackTrace();
}
}
}
public class KafkaBookProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "kafkaServer1:9092,kafkaServer2:9092,kafkaServer3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
try{
producer.send(new ProducerRecord<String, String>("test-topic", "This is a first message."), new MyCallback());
} catch (Exception exception) {
exception.printStackTrace();
} finally {
producer.close();
}
}
}
- 파이썬으로 구현하기
from kafka import KafkaProducer
producer = KafkaProducer(acks=1, compression_type='gzip',
bootstrap_servers='kafkaServer1:9092,kafkaServer2:9092,kafkaServer3:9092')
producer.send('test-topic', 'This is a first message.')
프로듀서의 경우 key 옵션을 줄 수 있는데, 해당 옵션을 주지 않을 경우 라운드로빈 방식으로 파티션마다 균등하게 메시지를 보냄. key를 지정하여 특정 파티션으로만 메시지를 보낼 수 있음.
- 사전 토픽 생성 및 메시지 확인
$ /home1/irteam/apps/kafka/bin/kafka-topics.sh \
--zookeeper zkserver1:2181,zkserver2:2181,zkserver3:2181/test-topic \
--replication-factor 1 --partitions 2 --topic key-test-topic create
$ /home1/irteam/apps/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafkaserver1:9092,kafkaserver2:9092,kafkaserver3:9092 \
--topic key-test-topic (--partition [0|1]) --from-beginning
- 파이썬으로 구현하기
from kafka import KafkaProducer
producer = KafkaProducer(acks=1, retries=1, compression_type='gzip',
bootstrap_servers='kafkaServer1:9092,kafkaServer2:9092,kafkaServer3:9092')
for i in range(1, 11):
if i % 2 == 1:
producer.send('key-test-topic', key='1', value='%d - send message - key=1' % i)
else:
producer.send('key-test-topic', key='2', value='%d - send message - key=2' % i)
프로듀서의 acks 옵션 설정에 따라 카프카로 메시지를 전송할 때 메시지 손실 여부와 메시지 전송 속도 및 처리량이 달라짐.
- 옵션 설정 : acks=0
프로듀서가 카프카로 메시지를 보낸 후 보낸 메시지에 대해 카프카가 잘 받았는지 확인(acks)을 함.
- 옵션 설정 : acks=1
- 메시지 손실 시나리오 - 아주 예외적인 경우로 발생
(프로듀서가 acks=1로 메시지 전송 후 리더가 그 메시지에 대해 acks를 보낸 후 장애 발생)- 프로듀서가 acks=1 옵션으로 test-topic의 리더에게 1번 메시지를 보냄.
- test-topic의 리더는 1번 메시지를 받은 후 저장.
- test-topic의 리더는 프로듀서에게 1번 메시지를 받았다고 acks를 보낸 후 바로 장애 발생.
- 팔로워들은 리더를 주기적으로 바라봐야 하는데 리더가 없는 상태.
- 리더에 새로운 메시지가 있는지 모르기 때문에 메시지를 가져올 수 없음. (1번 메시지 손실)
- 리플리케이션 방식에 따라 팔로워 중 하나가 새로운 리더로 변경.
- 프로듀서는 1번 메시지에 대해 리더로부터 acks를 받았기 때문에 카프카에 잘 저장된 것으로 인지하고 다음 메시지 전송을 준비.
리더가 메시지를 받았는지 확인 + 팔로워까지 메시지를 받았는지 확인.
프로듀서의 설정 뿐만 아니라 브로커의 설정(응답 확인을 기다리는 수)도 같이 조정해야 함.
- 옵션 설정 : acks=all, min.insync.replicas=[1|2|3 ...]
- min.insync.replicas
브로커 설정 값으로, 최소 리플리케이션 팩터를 지정하는 옵션. (server.properties에서 변경)
- min.insync.replicas 값 만큼 메시지가 존재함(리더와 팔로워에 존재하는 메시지)을 확인한 후 브로커는 프로듀서에게 acks를 보냄.
- 아파치 카프카 문서에서는 손실 없는 메시지 전송을 위한 조건으로 프로듀서는 acks=all, 브로커의 min.insync.replicas의 옵션은 2, 토픽의 리플리케이션 팩터는 3으로 권장.