Inter microservices invocation and event pub sub with Kafka - TheOpenCloudEngine/uEngine-cloud GitHub Wiki

Inter microservices invocation

microservices ๋“ค ๊ฐ„์˜ ๋‚ด๋ถ€ ํ˜ธ์ถœ(invocation) ๋ฐฉ๋ฒ•์€ ์—ฌ๋Ÿฌ๊ฐ€์ง€๊ฐ€ ์žˆ๋‹ค.

  1. ์™ธ๋ถ€์—์„œ ํ˜ธ์ถœ์„ ํ•˜๋“ฏ์ด REST API ๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ๋ฐฉ๋ฒ•
  2. FeignClient ๋ฅผ ํ†ตํ•˜์—ฌ ๋ฐ”๋กœ ์„œ๋น„์Šค๋ฅผ ์ ‘๊ทผ๋ฐฉ๋ฒ•(spring-cloud)
  3. Publishโ€“subscribe pattern ์„ ์‚ฌ์šฉํ•œ Event Driven ๋ฐฉ๋ฒ•

1๋ฒˆ ๋ฐฉ๋ฒ•์€ Spring์—์„œ ์˜ˆ๋ฅผ ๋“ค์ž๋ฉด @RestController ์™€ @RequestMapping ์„ ์‚ฌ์šฉํ•œ ํ˜ธ์ถœ ๋ฐฉ๋ฒ•์ด๋‹ค.
์™ธ๋ถ€ ์„œ๋น„์Šค๋“ค์„ ์—ฐ๊ฒฐํ• ๋•Œ ๋งŽ์ด ์“ฐ์ด๋Š” ๋ฐฉ๋ฒ•์ด๊ธฐ์—, ๋‚ด๋ถ€์‹œ์Šคํ…œ๋„ ๋ฌผ๋ก  ์‚ฌ์šฉ์ด ๊ฐ€๋Šฅํ•˜์ง€๋งŒ,
๊ฐ microservices ์˜ ip ๋ฐ domain ์„ ๊ธฐ์–ตํ•ด์•ผ ํ•œ๋‹ค.

2๋ฒˆ ๋ฐฉ๋ฒ•์€ @FeignClient("์„œ๋น„์Šค๋ช…") ์„ ์‚ฌ์šฉํ•œ ๋ฐฉ๋ฒ•์ธ๋ฐ,
์—ฌ๊ธฐ์„œ ์„œ๋น„์Šค๋ช…์€ eureka ์— ๋“ฑ๋ก์ด ๋˜์–ด์žˆ์–ด์•ผํ•œ๋‹ค.
์ด ๋ฐฉ๋ฒ•์€ ๊ธฐ์กด์— ์„ค๋ช… ํ•˜์˜€๋˜ Integration with Feign Client ์™€ spring-cloud-netflix-feign-tutorial ์—์„œ ์ž์„ธํ•œ ์„ค๋ช…์„ ์ฐธ๊ณ ํ•  ์ˆ˜ ์žˆ๋‹ค.
์ด ๋ฐฉ๋ฒ•์€ ํ•ด๋‹น ๋งˆ์ดํฌ๋กœ ์„œ๋น„์Šค๋ฅผ ๊ฐ€์ง€๊ณ  ์™€์„œ local ๊ฐ์ฒด์ฒ˜๋Ÿผ ์‚ฌ์šฉ ํ•  ์ˆ˜ ์žˆ๋Š” ์žฅ์ ์ด ์žˆ๋‹ค.

์ด๋ฒˆ์‹œ๊ฐ„์— ์ง‘์ค‘ํ•  ๋‚ด์šฉ์€ 3๋ฒˆ ๋ฐฉ๋ฒ•์ด๋‹ค.
1๋ฒˆ๊ณผ 2๋ฒˆ ๋ฐฉ๋ฒ•์€ ํ˜ธ์ถœ ๊ธฐ๋ฐ˜ ์•„ํ‚คํ…์ฒ˜(Request-Driven Architecture)๋‹ค. ์ด์™€ ๊ฐ™์ด ์„ค๊ณ„์‹œ ์„œ๋น„์Šค๋ฅผ
๊ต์ฒด์‹œ ์˜ํ–ฅ์„ ๋ฏธ์น˜๋Š” ์š”์†Œ๊ฐ€ ๋งŽ์„ ์ˆ˜ ๋ฐ–์— ์—†๋‹ค.
3๋ฒˆ ๋ฐฉ๋ฒ•์€ ์ด๋ฒคํŠธ ์ค‘์‹ฌ ์•„ํ‚คํ…์ฒ˜(EDA, Event-Driven Architecture)์ธ๋ฐ, ์ด๋Š” ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœํ–‰(Publish)๋˜๊ณ ,
์ด ์ด๋ฒคํŠธ๊ฐ€ ํ•„์š”ํ•œ ์„œ๋น„์Šค์—์„œ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์‹ (subscribe)ํ•˜์—ฌ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค.

์‰ฌ์šด ์˜ˆ๋ฅผ ๋“ค์ž๋ฉด,
์‡ผํ•‘๋ชฐ์—์„œ ๊ณ ๊ฐ์ด ๋ฌผ๊ฑด์„ ์ฃผ๋ฌธํ•˜๊ณ  ๋‚˜๋ฉด, ํฌ์ธํŠธ,์—…์ ,์ถ•ํ•˜๋ฉ”์ผ,๋ฐฐ์†ก,์žฅ๋ฐ”๊ตฌ๋‹ˆ ๋“ฑ์˜ ์„œ๋น„์Šค๊ฐ€
์—ฐ๋‹ฌ์•„ ์‹คํ–‰์„ ํ•ด์•ผ ํ•œ๋‹ค๊ณ  ๊ฐ€์ •์„ ํ•˜์ž. ์ด๋ฅผ ํ˜ธ์ถœ๊ธฐ๋ฐ˜ ์•„์ผ€ํ‹ฑ์ณ๋กœ ๊ตฌํ˜„์„ ํ•œ๋‹ค๋ฉด, 5๊ฐœ์˜ ์„œ๋น„์Šค์˜
์„œ๋น„์Šค๋ช…์ด๋‚˜, doamin ๋ช… ๋“ฑ์„ ๋ชจ๋‘ ์•Œ๊ณ  ์žˆ์–ด์•ผ ํ•˜๊ณ , ๋ชจ๋‘ call์„ ํ•ด์•ผํ•œ๋‹ค.
์ด๊ฒƒ์„ event pub sub ์œผ๋กœ ๋ณ€๊ฒฝํ•œ๋‹ค๋ฉด, ์ฃผ๋ฌธ์„œ๋น„์Šค์—์„œ๋Š” ์ด๋ฒคํŠธ๋ฅผ ๋ฐœํ–‰๋งŒ ํ•˜๊ณ ,
๋‚˜๋จธ์ง€ ์„œ๋น„์Šค์—์„œ๋Š” ์ˆ˜์‹ ๋งŒ ํ•˜์—ฌ, ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒ ํ›„ ํ•ด์•ผํ•  ์ผ๋งŒ ๊ฐ์ž ์„œ๋น„์Šค์—์„œ ์ค€๋น„ํ•˜๋ฉด ๋œ๋‹ค.
์—ฌ๊ธฐ์— microservice ๋“ค์ด ์ถ”๊ฐ€๋œ๋‹ค๊ณ  ํ•˜์—ฌ๋„, ๊ธฐ์กด ์‹œ์Šคํ…œ์— ๋ฏธ์น˜๋Š” ์˜ํ–ฅ์ด ์ตœ์†Œํ™” ๋œ๋‹ค.

Event Publishโ€“subscribe with Kafka

์ด๋ฒคํŠธ๋ฅผ ์ฃผ๊ณ  ๋ฐ›์„๋•Œ ์‚ฌ์šฉ๋˜๋Š” Event-Broker ์—๋Š” Apache ActiveMQ, Apache Kafka, RabbitMQ ๋“ฑ
์—ฌ๋Ÿฌ๊ฐ€์ง€๊ฐ€ ์žˆ๋‹ค. ์ด์ค‘์—์„œ spring ์—์„œ ๋งŽ์ด ์“ฐ์ด๋Š” Kafka ๋กœ ๊ตฌํ˜„๋ฐฉ๋ฒ•์„ ์„ค๋ช…ํ•˜๊ฒ ๋‹ค.

Kafka ์— ๋Œ€ํ•œ ์„ค๋ช…๋“ค์€ ๋งŽ์€ ๋ธ”๋กœ๊ทธ๊ฐ€ ์žˆ์–ด์„œ ํ•ด๋‹น ๋ธ”๋กœ๊ทธ ๋งํฌ๋กœ ๋Œ€์ฒดํ•˜๊ฒ ๋‹ค.
ํ•ต์‹ฌ๊ฐœ๋…์ธ topic ๊ณผ partition, Producer(publish)์™€ Consumer(subscribe) ์— ๋Œ€ํ•œ ์ดํ•ด๊ฐ€ ํ•„์š”ํ•˜๋‹ค.

Kafka ์ดํ•ดํ•˜๊ธฐ
Apache Kafka ์†Œ๊ฐœ ๋ฐ ์•„ํ‚คํ…์ณ

  1. ์šฐ์„  kafka๋ฅผ ํ˜ธ์ถœํ•˜๊ธฐ ์œ„ํ•˜์—ฌ localํ™˜๊ฒฝ์— kafka๋ฅผ ์„ค์น˜ํ•œ๋‹ค.
    ์นดํ”„์นด๋Š” ๋ถ„์‚ฐํ™˜๊ฒฝ์ธ Apache ZooKeeper ์œ„์—์„œ ๋Œ์•„๊ฐ€๊ธฐ๋•Œ๋ฌธ์— ์นดํ”„์นด๋ฅผ ์‹คํ–‰์ „์— ZooKeeper๋ฅผ ๋จผ์ € ์‹คํ–‰ํ•ด์•ผํ•œ๋‹ค.
    ์นดํ”„์นด ์„ค์น˜ ๋ฐ ์‹คํ–‰ ๋ฐฉ๋ฒ•์€ https://kafka.apache.org/quickstart ๋ฅผ ๋”ฐ๋ผํ•˜๋ฉด๋œ๋‹ค.

  2. ์ด์ œ ์šฐ๋ฆฌ์˜ ์„œ๋น„์Šค์— kafka ๋ฅผ ํ†ตํ•˜์—ฌ ๋ฉ”์„ธ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๊ณ , ์ˆ˜์‹ ํ•˜๋Š” ๋กœ์ง์„ ์ž‘์„ฑํ•˜์—ฌ ๋ณด์ž.

์ด๋ฒคํŠธ Producer(publish), Consumer(subscribe) ์„œ๋น„์Šค ๊ณตํ†ต ์ž‘์„ฑ

pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.2.RELEASE</version>
</dependency>

์—ฌ๊ธฐ์„œ ์ฃผ์˜ํ• ์ ์€ ํ˜„์žฌ (2018๋…„8์›”) spring-kafka ๋Š” 2.x ๋Œ€ release version ์ด ๋‚˜์™€์žˆ๋‹ค.
2.x ๋ฒ„์ „์€ spring-boot 2.x ๋ฒ„์ „์—์„œ๋งŒ ๋Œ์•„๊ฐ€๋‹ˆ, ๋งŒ์•ฝ ์ž๊ธฐ project๊ฐ€ spring-boot 1.x ๋ฅผ
์‚ฌ์šฉ์ค‘์ด๋ผ๋ฉด <version>1.3.x.RELEASE</version> ๋กœ ์„ค์ •์„ ํ•ด์ค˜์•ผ ํ•œ๋‹ค.

application.yml

spring:
  profiles: local
  kafka:
    bootstrap-servers: localhost:9092

์ด๋ฒคํŠธ Producer(publish) ์„œ๋น„์Šค ์ž‘์„ฑ

PublishConfig.java

@EnableKafka
public class PublishConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

๊ฐ„๋‹จํžˆ ์„ค๋ช…์„ ํ•˜์ž๋ฉด @EnableKafka ๋ฅผ ์„ ์–ธํ•˜์—ฌ Kafka์‚ฌ์šฉ์„ spring์— ์•Œ๋ฆฌ๊ณ ,
KafkaTemplate ์„ @Bean ์œผ๋กœ ์„ ์–ธ ํ•˜์˜€๋‹ค.

Clazz.java

@Autowired
KafkaTemplate<String, String> kafkaTemplate;
public void sendkafka(){
   String topic = "topicName";
   String payload = "๋ฉ”์„ธ์ง€ ๋‚ด์šฉ";
   kafkaTemplate.send(topic, payload);
}

์ด๋ฒคํŠธ Consumer(subscribe) ์„œ๋น„์Šค ์ž‘์„ฑ

SubscribeConfig.java

@EnableKafka
public class SubscribeConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mail");

        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

PublishConfig ์™€ ์„ค์ •์ด ๋น„์Šทํ•ด ๋ณด์ด์ง€๋งŒ ConsumerConfig ๊ฐ’์„ ์ง€์ •ํ•˜์—ฌ ์ฃผ๊ณ , GROUP_ID_CONFIG ๋ฅผ
์ง€์ •ํ•˜์—ฌ ์ฃผ์—ˆ๋‹ค. Consumer ์—์„œ GROUP_ID ๋Š” ๋ฐ˜๋“œ์‹œ ์ง€์ •์„ ํ•ด์ค˜์•ผ ํ•œ๋‹ค.
์ด๋Š” ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์€ ํ•˜๋‚˜์˜ topic์— ๋Œ€ํ•œ ์ฑ…์ž„์„ ๊ฐ€์ง€๊ณ  ์žˆ๊ธฐ๋•Œ๋ฌธ์ด๋‹ค.

๋งˆ์ง€๋ง‰์œผ๋กœ ํ† ํ”ฝ์„ ๋ฐ›๋Š” ๋ถ€๋ถ„์ด๋‹ค.

    @KafkaListener(topics = "topicName")
    public void receiveTopic(ConsumerRecord<?, ?> consumerRecord) {
        System.out.println("Receiver on topic: "+consumerRecord.toString());
        String data = (String)consumerRecord.value();
    }
โš ๏ธ **GitHub.com Fallback** โš ๏ธ