Section 12: 데이터 동기화를 위한 Apache Kafka의 활용 2 - KwangtaekJung/MSA-SpringCloud-user-service GitHub Wiki

Section 12: 데이터 동기화를 위한 Apache Kafka의 활용 2

Orders Microservice와 Catalogs Microservice에 Kafka Topic의 적용

데이터 동기화 1 Orders -> Catalogs

  • Orders Service에 요청 된 주문의 수량 정보를 Catalogs Service에 반영
  • Orders Service에서 Kafka Topic으로 메시지 전송 -> Producer
  • Catalogs Service에서 Kafka에 전송된 메시지 취득 -> Consumer

Catalogs Microservice 수정

Orders Microservice 수정

Kafka를 활용한 데이터 동기화 테스트 1

  • 단일 Order Service에서 Kafka Producer, Consumer를 이용한 Catalog 데이터 동기화

Multi Orders Microservice 사용에 따른 데이터 동기화 문제

Multi Orders Microservice에서의 데이터 동기화

  • Orders Service 2개 기동
    • Users의 요청 분산 처리
    • Orders 데이터도 분산 저장 -> 동기화 문제 발생

Kafka Connect를 활용한 단일 데이터베이스를 사용

데이터 동기화 2 - Multi Orders Service

  • Orders Service에 요청된 주문 정보를 DB가 아니라 Kafka Topic으로 전송
  • Kfaka Topic에 설정된 Kfaka Sink Connect를 사용해 단일 DB에 저장 -> 데이터 동기화

Orders Microservice 수정 - MariaDB

  • Orders Service의 JPA 데이터베이스 교체
    • H2 DB -> MariaDB (h2-console에서도 MariaDB 접속 가능함.)
    • MariaDB 설치 후 접속
      root@9c9a988d625f:/# mysql -h127.0.0.1 -uroot -p
      Enter password:
      ERROR 1045 (28000): Access denied for user 'root'@'127.0.0.1' (using password: YES)
      
    • 127.0.0.1로 접속이 안될 경우 조치 방법
root@9c9a988d625f:/# mysql -uroot -p
Enter password:
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 6
Server version: 10.7.3-MariaDB-1:10.7.3+maria~focal mariadb.org binary distribution

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> grant all privileges on *.* to 'root'@'%' identified by 'jug050@14';
Query OK, 0 rows affected (0.002 sec)

MariaDB [(none)]> flush privileges;
Query OK, 0 rows affected (0.001 sec)
  • exit 후 다시 -h127.0.0.1로 접속해보면 접속되는 것을 확인할 수 있다.
  • MariaDB에 Orders 테이블 수동 생성한다.
    MariaDB [mydb]> create table orders (id int auto_increment primary key,
        -> user_id varchar(50) not null,
        -> product_id varchar(20) not null,
        -> order_id varchar(50) not null,
        -> qty int default 0,
        -> unit_price int default 0,
        -> total_price int default 0,
       -> created_at datetime default now()
        -> )
        -> ;
    

Orders Microservice 수정 - Order Kafka Topic

  • Orders Service의 Producer에서 발행하기 위한 메시지 등록

Orders Microservice 수정 - Order Kafka Producer

  • Orders Service의 OrderProducer 생성

Kafka를 활용한 데이터 동기화 테스트 2

  • Zookeeper, Kafka Server 및 Kafka Connect 구동
  • Orders Service를 위한 Kafka Sink Connector 추가 (Postman 이용)
    • 현재 connector 조회 (Postman 이용, GET http://127.0.0.1:8083/connectors)
    • my-order-sink-connector 추가 (POST 127.0.0.1:8083/connectors)
      {
          "name": "my-order-sink-connect",
          "config": {
              "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
              "connection.url": "jdbc:mysql://localhost:3306/mydb",
              "connection.user" : "root",
              "connection.password" : "jung050@14",
              "auto.create" : "true",
              "auto.evolve" : "true",
              "delete.enabled" : "false",
              "mode" : "incrementing",
              "tasks.max" : "1",
              "topics" : "orders"
          }
      }
      
    • 주문 테스트
      • 2개의 Order Service 기동