Section 11: 데이터 동기화를 위한 Apache Kafka의 활용 1 - KwangtaekJung/MSA-SpringCloud-user-service GitHub Wiki
Section 11: 데이터 동기화를 위한 Apache Kafka의 활용 1
- Kafka 란?
- Kafka 설치
- kafka PRoducer / Consumer
- Kafka Connect
Apache Kafka
- Apache Sofrware Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트
- Open Source Message Broker Project
- 실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량, 낮은 지연 시간을 지닌 플랫폼 제공
- 모든 시스템으로 데이터를 실시간 전송하여 처리할 수 있는 시스템
- 데이터가 많아지더라도 확장이 용이한 시스템
- Kafka Broker
- 실행 된 Kafka 애플리케이션 서버
- 3대 이상의 Broker Cluster 구성
- Zookeeper 연동 (Cluster로 구성된 Kafka Broker 서버를 코디네이팅하는 역할)
- n개 Broker 중 1대는 Controller 기능 수행
- Kafka 3대 이상, Zookeeper도 3개 정도 구성하여 운영하는 것이 안정적임.
Kafka 설치
Ecosystem 1 - Kafka Client
- kafa와 데이터를 주고 받기 위해 사용하는 Java Library
- producer, Consumer, Admin, Stream 등 Kafka 관련 API 제공
- 다양한 3rd party library 존재
- cwiki.apache.org/confluence/display/AFAKA/Clients
Kafka 서버 기동
- Zookeeper 및 Kakfa 서버 구동
PS C:\workspace\tools\kafka_2.13-3.1.0> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
PS C:\WorkSpace\tools\kafka_2.13-3.1.0> .\bin\windows\kafka-server-start.bat .\config\server.properties
- Topic 생성
PS C:\WorkSpace\tools\kafka_2.13-3.1.0> .\bin\windows\kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1
- Topic 목록 확인
PS C:\WorkSpace\tools\kafka_2.13-3.1.0> .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
- Topic 정보 확인
PS C:\WorkSpace\tools\kafka_2.13-3.1.0> .\bin\windows\kafka-topics.bat --describe --topic quickstart-events --bootstrap-server localhost:9092
Ecosystem 2 - Kafka Connect
- Kafka Connect를 통해 Data를 Import/Export 가능
- 코드 없이 Configuration으로 데이터를 이동
Maria 설치 및 구동
- 윈도우 MariaDB MSI으로 설치
- 윈도우 MariaDB 구동
- 구동방법1) MySQL Client로 구동
- MySQL [(none)] show databases;
- MySQL [(none)] create database mydb;
- MySQL [(none)] use mydb;
- MySQL [(none)] show tables;
- 구동방법2) HeidlSQL로 접속
- 구동방법1) MySQL Client로 구동
Kafka Connect 설치 및 구동
- 설치
- Kafka Connect 설치 (confluent-6.1.0)
- JDBC Connector 설치 (confluentinc-kafka-connect-jdbc-10.3.3)
- JDBC Connector 설치
- ./etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가
plugin.path=\C:\\WorkSpace\\tools\\confluentinc-kafka-connect-jdbc-10.3.3\\lib
- JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사
- ./share/java/kafka 폴더에 mariadb-java-client-2.7.2.jar 파일 복사(.m2/repository/org/mariadb/jdbc/mariadb-java-client/2.7.2에 있음.)
- 구동
- Zookeeper와 Kafka Server가 구동 되어있는 상태여야 함.
PS C:\WorkSpace\tools\confluent-6.1.0> .\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties
- Topic 확인
PS C:\WorkSpace\tools\confluent-6.1.0> .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list __consumer_offsets connect-configs connect-offsets connect-status quickstart-events
Kafka Source Connect 테스트
- Kafka Source Connect 추가 (MariaDB)
curl --location --request POST '127.0.0.1:8083/connectors'
{ "name": "my-source-connect", "config": { "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://localhost:3306/mydb", "connection.user" : "root", "connection.password" : "jung050@14", "mode" : "incrementing", "table.whitelist" : "users", ==> 변경을 체크할 테이블 "topic.prefix" : "my_topic_", ==> 변경 사항을 저장할 토픽의 prefix "tasks.max" : "1" } }
- Kafka Source Connect 테스트
- Kafka Source Connector 등록
==>Postmna을 이용하여 등록 가능함.
- mariaDB에서 users 테이블에 신규 컬럼 추가
insert into users(user_id, pwd, name) values('admin', 'admin1111', 'Administrator');
- Topic 확인
PS C:\WorkSpace\tools\kafka_2.13-3.1.0> .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list __consumer_offsets connect-configs connect-offsets connect-status my_topic_users <== quickstart-events
- Consumer에서 변경 사항 확인 가능
PS C:\Workspace\tools\kafka_2.13-3.1.0> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from- beginning {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"}, {"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":1,"user_id":"user1","pwd":"1111","name":"User name","created_at":1649192430000}}
- Kafka Source Connector 등록
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":2,"user_id":"admin","pwd":"admin1111","name":"Administrator","created_at":1649192644000}} ```
Kafka Sink Connect 테스트
- Kafka Sink Connect 추가 (MariaDB)
- sink connect와 연결된 table이 생성된 것을 mariaDB에서 확인 가능
MariaDB [mydb]> show tables; +----------------+ | Tables_in_mydb | +----------------+ | my_topic_users | <== 토픽과 동일한 이름의 table이 생성된다. | users | +----------------+ 2 rows in set (0.001 sec)