아파치 카프카 애플리케이션 프로그래밍 with 자바 ‐ 카프카 빠르게 시작해보기 - dnwls16071/Backend_Summary GitHub Wiki

📚카프카 커맨드 라인 툴(Command Line Tool)

kafka-topics.sh

  • 토픽과 관련된 명령을 실행하는데 사용되는 쉘 스크립트이다.
  • 카프카 클러스터에 토픽은 여러 개 존재할 수 있다.
  • 토픽에는 파티션(partition)이 존재하는데 파티션의 개수는 최소 1개부터 시작한다.
  • 파티션을 통해 한 번에 처리할 수 있는 데이터양을 늘릴 수 있고 토픽 내부에서도 파티션을 통해 데이터 종류를 나누어 처리할 수 있다.
$ ded53ba0965b:/opt/kafka$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config retention.ms=172800000 --topic hello2.kafka
  • --partitions
    • 파티션 개수를 지정할 수 있다. 파티션의 최소 개수는 1이다.
    • 만약 이 옵션을 사용하지 않으면 카프카 브로커 설정 파일에 있는 num.partitions 옵션값을 따라 생성된다.
  • --replication-factor
    • 토픽의 파티션을 복제할 복제 개수를 적는다. 1은 복제를 하지 않고 사용한다는 의미이다. 2이면 1개의 복제본을 사용하겠다는 의미이다.
    • 파티션의 데이터는 각 브로커마다 저장된다.
    • 한 개의 브로커에 장애가 발생하더라도 나머지 한 개 브로커에 저장된 데이터를 사용하여 안전하게 데이터 처리를 지속적으로 할 수 있다.
    • 복제 개수의 최소 설정은 1이고 최대 설정은 통신하는 카프카 클러스터의 브로커 개수이다.
    • 만약 이 옵션을 사용하지 않으면 default.replication.factor 옵션값에 따라서 생성된다.
  • --config
    • 명령에 포함되지 않은 추가적인 설정을 할 수 있다.
    • retention.ms는 토픽의 데이터를 유지하는 기간을 뜻한다.
$ ded53ba0965b:/opt/kafka$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
hello.kafka
hello2.kafka
$ ded53ba0965b:/opt/kafka$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka
Topic: hello.kafka	TopicId: 8WUG3OeFTXajDhDCdkRP-A	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: hello.kafka	Partition: 0	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr: 

// TopicId: 8WUG3OeFTXajDhDCdkRP-A (토픽의 고유 식별자)
// PartitionCount: 1 (파티션 1개)
// ReplicationFactor: 1 (복제본 1개 - 복제 없음)
// Configs: segment.bytes=1073741824 (세그먼트 크기 1GB)

$ ded53ba0965b:/opt/kafka$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello2.kafka
Topic: hello2.kafka	TopicId: KiQEMPeAT-Wm5EWAkxS7Dw	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824,retention.ms=172800000
	Topic: hello2.kafka	Partition: 0	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr: 
	Topic: hello2.kafka	Partition: 1	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr: 
	Topic: hello2.kafka	Partition: 2	Leader: 1	Replicas: 1	Isr: 1	Elr: 	LastKnownElr:

// PartitionCount: 3 (파티션 3개)
// ReplicationFactor: 1 (복제본 1개)
// Configs:
// segment.bytes=1073741824 (세그먼트 크기 1GB)
// retention.ms=172800000 (보존 기간 48시간 = 172800000ms)

📚kafka-console-producer.sh

  • 토픽에 넣는 데이터를 레코드(Record)라고 부르며 메시지 키(Key)와 메시지 값(Value)으로 구성되어 있다.
  • 메시지 키와 메시지 값을 함께 전송한 레코드는 토픽의 파티션에 저장된다.
  • 메시지 키가 null인 경우 프로듀서가 파티션으로 전송할 때 레코드 배치 단위로 라운드 로빈 방식으로 전송한다.
  • 메시지 키가 존재하는 경우 키의 해시값을 작성하여 존재하는 파티션 중 한 개에 할당된다. 이로 인해 메시지 키가 동일한 경우에는 동일한 파티션으로 전송됨을 보장한다.
$ ded53ba0965b:/opt/kafka$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"
>key1:no1
>key2:no2
>key3:no3

$ ded53ba0965b:/opt/kafka$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka
>hello
>kafka
>0
>1
>2
>3
>4
>5

📚kafka-console-consumer.sh

  • 토픽으로 전송한 데이터에 대해 kakfa-console-consumer.sh 명령어로 확인할 수 있다.
$ ded53ba0965b:/opt/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --from-beginning
hello
kafka
0
1
2
3
4
5

no1
no2
no3

$ ded53ba0965b:/opt/kafkabin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --from-beginning --property print.key=true --property key.separator=:
null:hello
null:kafka
null:0
null:1
null:2
null:3
null:4
null:5
null:
key1:no1
key2:no2
key3:no3

📚kafka-consumer-groups.sh

  • 컨슈머 그룹을 통해 현재 컨슈머 그룹이 몇 개가 생성되었는지, 어떤 이름의 컨슈머 그룹이 존재하는지 확인할 수 있다.
$ ded53ba0965b:/opt/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-68173
console-consumer-12850
console-consumer-7592
hello-group
console-consumer-56845
console-consumer-40793
$ ded53ba0965b:/opt/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group hello-group --describe

Consumer group 'hello-group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
hello-group     hello2.kafka    1          0               0               0               -               -               -
hello-group     hello2.kafka    0          0               0               0               -               -               -
hello-group     hello2.kafka    2          0               0               0               -               -               -
  • GROUP/TOPIC/PARTITION : 조회한 컨슈머 그룹이 마지막으로 커밋한 토픽과 파티션을 나타낸다.
  • CURRENT-OFFSET : 컨슈머 그룹이 가져간 토픽의 파티션에 가장 최신 오프셋이 몇 번인지를 나타낸다. 이 번호는 데이터가 파티션에 들어올 때마다 1씩 증가한다.
  • LOG-END-OFFSET : 해당 컨슈머 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 알 수 있다. CURRENT-OFFSET은 LOG-END-OFFSET과 같거나 작은 값일 수 있다.
  • LAG : 랙은 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는 데에 얼마나 지연이 발생하는지 나타내는 지표이다. 랙은 컨슈머 그룹이 커밋한 오프셋과 해당 파티션의 가장 최신 오프셋 간의 차이이다.
  • CONSUMER-ID : 컨슈머의 토픽 할당을 카프카 내부적으로 구분하기 위해 사용하는 ID이다. 이 값은 client id에 uuid 값을 붙여서 자동으로 할당되어 유니크한 값으로 설정된다.
  • HOST : 컨슈머가 동작하는 HOST명을 출력한다. 이 값을 통해 카프카에 붙은 컨슈머의 호스트명 또는 IP를 알 수 있다.
  • CLIENT-ID : 컨슈머에 할당된 ID이다. 이 값은 사용자가 지정할 수 있으며 지정하지 않으면 자동 생성된다.

📚kafka-verifiable-producer.sh / kafka-verifiable-consumer.sh

  • kafka-verifiable로 시작하는 스크립트를 사용하면 String 타입 메시지 값을 코드 없이도 주고받을 수 있다.
  • 카프카 클러스터 설치가 완료된 이후에 토픽에 데이터를 전송해 간단한 네트워크 통신 테스트를 할 때 유용하다.
$ ded53ba0965b:/opt/kafka$ bin/kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --max-messages 10 --topic verify-test
{"timestamp":1759594709846,"name":"startup_complete"}
[2025-10-04 16:18:29,984] WARN [Producer clientId=producer-1] The metadata response from the cluster reported a recoverable issue with correlation id 1 : {verify-test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
{"timestamp":1759594710119,"name":"producer_send_success","key":null,"value":"0","offset":0,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710121,"name":"producer_send_success","key":null,"value":"1","offset":1,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710121,"name":"producer_send_success","key":null,"value":"2","offset":2,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710121,"name":"producer_send_success","key":null,"value":"3","offset":3,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710121,"name":"producer_send_success","key":null,"value":"4","offset":4,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710121,"name":"producer_send_success","key":null,"value":"5","offset":5,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710121,"name":"producer_send_success","key":null,"value":"6","offset":6,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710121,"name":"producer_send_success","key":null,"value":"7","offset":7,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710121,"name":"producer_send_success","key":null,"value":"8","offset":8,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710121,"name":"producer_send_success","key":null,"value":"9","offset":9,"partition":0,"topic":"verify-test"}
{"timestamp":1759594710128,"name":"shutdown_complete"}
{"timestamp":1759594710129,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":35.21126760563381}

$ ded53ba0965b:/opt/kafka$ bin/kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --topic verify-test --group-id test-group
{"timestamp":1759594763492,"name":"startup_complete"}
{"timestamp":1759594766669,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1759594766695,"name":"records_consumed","count":10,"partitions":[{"topic":"verify-test","partition":0,"count":10,"minOffset":0,"maxOffset":9}]}
{"timestamp":1759594766705,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":10}],"success":true}

📚kafka-delete-records.sh

  • 이미 적재된 토픽의 데이터를 지우는 방법으로 kafka-delete-records.sh를 사용할 수 있다.
  • 이미 적재된 토픽의 데이터 중 가장 오래된 데이터부터 특정 시점의 오프셋까지 삭제할 수 있다.