마이크로서비스 간 커뮤니케이션 - TheOpenCloudEngine/uEngine-cloud GitHub Wiki

e-Shop 시나리오에서 고객이 상품을 구매할 때 마다 상품에 설정된 point 값이 고객의 포인트에 누적되게 된다. 그러기 위하여 주문서비스를 통하여 주문 정보가 추가될 때 단순히 고객의 포인트를 누적한다고 보자. (물론, 배송이 이루어진 후 추가되는 것이 맞으나 시간상 주문 즉시 포인트가 추가된다고 본다)

로컬 호출로 두개의 서비스 간의 연결 (모놀로씩)

주문서비스와 고객서비스가 분리되기 이전에는 상품 구매에 따라 고객의 포인트를 누적시키는 작업은 다음과 같이 간단하게 처리 가능하다:

public class Order implements BeforeSave{
    ....
    @Override
    public void beforeSave() {

        if(getCustomer()!=null && getItem()!=null)
            getCustomer().setPoint(getCustomer().getPoint() + getItem().getPoint());

    }
}

BeforeSave 인터페이스가 부여되면, 저장이 이루어지기 전에 해당 엔티티의 beforeSave() 메서드가 호출되도록 MultitenantRepository가 구현되어있어서, 저장 전에 해야 할 액션이 있다면 그 부분에 로직을 작성한다. 예제는 beforeSave()에 직접 Customer 엔티티를 얻어와서 바로 상품의 포인트 값을 합산하여 추가한다. 하나의 트랜잭션과 Spring Application Context 내에 존재하기 때문에 구현은 가장 간단하다 볼 수 있다.

외부 호출로 서비스 연동 (동기식)

동기식으로 연동하는 방법은 말그대로 서비스에서 다른 서비스를 REST 호출 하는 것이다. 그나마 호출을 추상적인 방법으로 하는 방법으로 FeignClient 를 추천한다. FeignClient 는 REST 서비스의 추상화된 Stub interface 를 제공해주어 Spring DI 방식으로 쉽게 외부 서비스를 호출하여 사용할 수 있도록 해준다:

  1. FeignClient interface (Stub class) 만들기 먼저 호출될 서비스의 REST API interface 를 Java 와 매핑하기 위하여 JAX-RS 방식의 interface 를 만들어 준다:

@FeignClient("customer-service")
public interface CustomerService {

    @RequestMapping(value = "/customers/{customerId}", method = {RequestMethod.PATCH, RequestMethod.POST, RequestMethod.PUT})
    public ResourceSupport saveCustomer(@PathVariable("customerId") String customerId, @RequestBody Customer customer) throws Exception;
}

위의 선언한 인터페이스 (스텁) 클래스를 이용하여 @Autowired 로 injection 만 해서 사용하면 된다:

    @Override
    public void beforeSave() {

        if(getCustomer()!=null && getItem()!=null) {
            getCustomer().setPoint(getCustomer().getPoint() + getItem().getPoint());

            CustomerService customerService = MetaworksRemoteService.getComponent(CustomerService.class);
            try {
                customerService.saveCustomer(getCustomer().getId(), getCustomer());
            } catch (Exception e) {
                throw new RuntimeException("고객 정보 업데이트 중 오류 발생", e);
            }

        }

    }

이벤트로 서비스 연동 (비동기식)

만약 주문이 발생한 후 고객 정보만 업데이트 되고 끝난다면 위의 FeignClient 를 통한 point-to-point 방식의 연동 만으로도 만족스러운 결과를 얻을 수 있다 하지만 다음과 같은 경우라면:

  1. 주문 이벤트가 발생하면 고객정보, 상품정보 등 많은 서비스가 거기에 따라 반응해야 한다. 예를 들어 고객 포인트가 변경되어야 하고, 재고량이 바뀌어야 한다.
  2. 주문 이벤트는 빠르게 처리되어야 하기 때문에 고객정보, 상품정보가 모두 업데이트 될 때까지 기다릴 수 없고, 이후에 천천히 포인트 정보와 재고량이 변경되도록 큐잉 처리하고 싶다.

이러한 경우라면 Event Queue 를 통하여 마이크로 서비스간의 데이터를 맞추어주는 방법이 있다.

예제에서는 Kafka 를 이용하여 주문서비스가 주문이벤트를 Publish하고, 고객서비스와 재고서비스가 (여기서는 재고서비스를 분리하지 않았으니, 그냥 고객서비스만 Subscribe 하는 걸로 하자) 해당 이벤트를 listening (subscribe)한다고 보자.

    @Override
    public void beforeSave() {

        if(getCustomer()!=null && getItem()!=null) {
            EventSender eventSender = MetaworksRemoteService.getComponent(EventSender.class);
            eventSender.sendBusinessEvent("{\"order\":" + getCustomer().getId() + ", \"item\":" + getItem().getItem());

        }

    }

그리고, EventSourcing 기능을 사용하도록 Configuration Class 를 추가해준다:

package hello;

import org.metaworks.springboot.configuration.Metaworks4EventSourcingConfig;
import org.springframework.context.annotation.Configuration;

/**
 * Created by uengine on 2018. 1. 19..
 */
@Configuration
public class EventSourcingConfig extends Metaworks4EventSourcingConfig {
}

마지막으로 application.yml 설정에 kafka 설정을 추가해준다:


kafka:
    bootstrap-servers: cloud.pas-mini.io:10091
    topic:
      eshop: eshop.topic

받는 쪽인 Customer.java 의 Application.java 에 이벤트를 받는 코드를 넣어준다:

    @KafkaListener(topics = "${kafka.topic.eshop}")
    public void receive(String payload) {

        ObjectMapper jsonMapper = new ObjectMapper();
        try {
            JsonNode jsonNode = jsonMapper.readTree(payload);

            String event = jsonNode.get("event").textValue();

            if("order".equals(event)){
                String itemId = jsonNode.get("item").textValue();
                Long customerId = jsonNode.get("customer").longValue();

                CustomerRepository customerRepository = MetaworksRemoteService.getComponent(CustomerRepository.class);
                Customer customer = customerRepository.findOne(customerId);
                
                customer.setPoint(customer.getPoint() + 100);
            }

        } catch (IOException e) {
            throw new RuntimeException("고객 정보 업데이트 중 오류가 발생했습니다.", e);
        }
    }

받는쪽 또한 application.yml 설정이 필요하다:

kafka:
    bootstrap-servers: cloud.pas-mini.io:10091
    topic:
      eshop: eshop.topic
    consumer:
      topicKey: eshop