chapter8 SeungminLee - JAVA-JIKIMI/SPRING-IN-ACTION-5 GitHub Wiki
๋น๋๊ธฐ ๋ฉ์์ง ์ ์กํ๊ธฐ
๋๊ธฐ workflow
๋น๋๊ธฐ workflow
๋น๋๊ธฐ ๋ฉ์ธ์ง์ ์ดํ๋ฆฌ์ผ์ด์ ๊ฐ์ ์๋ต์ ๊ธฐ๋ค๋ฆฌ์ง ์๊ณ ๊ฐ์ ์ ์ผ๋ก ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋ ๋ฐฉ๋ฒ์ด๋ค. ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋ฉด ํด๋น ์๋น์ค๋ ์ธ์ ๊ฐ ์ฒ๋ฆฌํ ๊ฒ์ด๋ผ๋ ๊ฐ์ ํ์ ํด๋ผ์ด์ธํธ๋ ์ดํ์ ๋์์ ์ํํ๋ค. ๋ฐ๋ผ์ ์ด๋ ํต์ ํ๋ ์ดํ๋ฆฌ์ผ์ด์ ๊ฐ์ ๊ฒฐํฉ๋๋ฅผ ๋ฎ์ถฐ์ฃผ๊ณ ํ์ฅ์ฑ์ ๋์ฌ์ค๋ค.
Spring ์์ ์ง์ํ๋ ๋น๋๊ธฐ ๋ฉ์ธ์ง
- JMS (Java Message Service)
- RabbitMQ
- AMQP
- ์ํ์น ์นดํ์นด
- (redis)
JMS๋?
- ๋ ๊ฐ ์ด์์ ํด๋ผ์ด์ธํธ ๊ฐ์ ๋ฉ์ธ์ง ํต์ ์ ์ํ ๊ณตํต API๋ฅผ ์ ์ํ๋ ์๋ฐ ํ์ค์ด๋ค.
- 2001๋ ์ ์ฒ์ ์๊ฐ๋์๋ค.
- JMS๊ฐ ๋์ค๊ธฐ ์ ์๋ ํด๋ผ์ด์ธํธ ๊ฐ์ ๋ฉ์์ง ํต์์ ์ค๊ฐํ๋ ๋ฉ์ธ์ง ๋ธ๋ก์ปค๋ค์ด ์ฒํธ์ผ๋ฅ ์ API๋ฅผ ๊ฐ์ง๊ณ ์์ด ํธํ์ด ์ด๋ ค์ ๋ค.
์คํ๋ง์ JmsTemplate ๊ธฐ๋ฐ์ ํด๋์ค๋ฅผ ํตํด JMS๋ฅผ ์ง์ํ๊ณ , JmsTemplate์ ์ฌ์ฉํ๋ฉด ํ๋ก๋์๊ฐ ํ(๋๋ ํ ํฝ)์ ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๊ณ ์ปจ์๋จธ๋ ๊ทธ ๋ฉ์ธ์ง๋ค์ ๋ฐ์ ์ ์๋ค.
์คํ๋ง์ ๋ฉ์ธ์ง ๊ธฐ๋ฐ์ POJO(plain old java object_๋ ์ง์ํ๋๋ฐ ์ฌ๊ธฐ์ pojo๋ ํ๋ ํ ํฝ์ ๋์ฐฉํ๋ ๋ฉ์ธ์ง์ ๋ฐ์ํ์ฌ ๋น๋๊ธฐ ๋ฐฉ์์ผ๋ก ๋ฉ์ธ์ง๋ฅผ ์์ ํ๋ ๊ฐ๋จํ ์๋ฐ ๊ฐ์ฒด์ ์๋ฏธ๋ก ์ฌ์ฉ๋๋ค.
-
JMS๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด์๋ JMS ํด๋ผ์ด์ธํธ๋ฅผ ํ๋ก์ ํธ์ ์ถ๊ฐํ๊ธฐ๋ง ํ๋ฉด ๋๋ค.ใ
-
๋ฉ์ธ์ง๋ฅผ ์ ๋ฌํด ์ค ๋ธ๋ก์ปค๋ฅผ ๊ณจ๋ผ์ผ ํ๋ค. (์์ : ์ํ์น ActiveMQ, ์ํ์น ActiveMQ Aretemis)
-
์ ํํด์ ์์กด์ฑ์ ์ถ๊ฐํด๋ณด์ (์ฑ ์์๋ ๋ ์ต์ ์ ActiveMQ Aretemis ์ฌ์ฉ)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-artemis<artifactId> </dependency>
-
Aretemis ๋ธ๋ก์ปค๋ฅผ ๊ฒฐ์ ํ๋ค๋ฉด ๋ช๊ฐ์ง properties ๋ฅผ ์ค์ ํด์ผ ํ๋ค. (yml)
spring: artemis: host: artemis.tacocloud.com port: 61617 user: tacoweb //์ ํ์ฌํญ password: l3tm31n //์ ํ์ฌํญ
โ ๊ฐ๋ฐํ๊ฒฝ์์๋ ์ค์ ํด์ค ํ์๊ฐ ์์ง๋ง ์ด์์ ํ ๋๋ Aretemis๋ฅผ ๋ฐ๋ก ์ค์น ํด์ผ ํ๋ค. ์ด์ํ๊ฒฝ์์์ ์ค์ ์ Aremis์ ๊ณต์ ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํ์.
- ํ๋ก์ ํธ์ JMS๋ฅผ ์ถ๊ฐ ํ๊ณ , ๋ธ๋ก์ปค๊ฐ ํ ์ดํ๋ฆฌ์ผ์ด์ ์์ ๋ค๋ฅธ ์ดํ๋ฆฌ์ผ์ด์ ์ผ๋ก ๋ฉ์ธ์ง๋ฅผ ์ ๋ฌํ๊ธฐ ์ํด ๋๊ธฐ์ค์ด๋ค. ์ ์ก ์์ํ ์ค๋น๊ฐ ๋๋ ๊ฒ์ด๋ค!
JmsTemplate (Spring Framework 5.2.8.RELEASE API)
JmsTemplate์ JMS๋ก ์์ ํ๋๋ฐ ํ์ํ ์ฝ๋๋ฅผ ์ค์ฌ์ค๋ค. JmsTemplate์ ๋ฉ์ธ์ง ๋ธ๋ก์ปค์ ์ฐ๊ฒฐ ๋ฐ ์ธ์ ์ ์์ฑํ๋ ์ฝ๋์ ๋ฉ์ธ์ง๋ฅผ์ ์กํ๋ ๋์ค ๋ฐ์ํ ์ ์๋ ์์ธ๋ฅผ ์ฒ๋ฆฌํ๋ ์ฝ๋๋ ๊ตฌํ์ด ๋์ด์๋ค. ๋๋ฌธ์ ์ฐ๋ฆฌ๋ ๋ฉ์ธ์ง ์ ์ก ์์ ์๋ง ์ง์คํ๋ฉด ๋๋ค.
send์ convertAndSend๊ฐ ๋ฉ์ธ์ง ์ ์ก์ ํ๋ ๋ฉ์๋์ธ๋ฐ send๋ ์์ ํํ์ ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๊ณ , convertAndSend๋ ๊ฐ์ฒด๋ฅผ ๋ฐ์์ ๋ณํ์ ํ๊ณ , ์ ์กํ๊ธฐ ์ ์ MessagePostProcessor๋ก ์ถ๊ฐ๋ก ํ์ฒ๋ฆฌ๊น์ง ํ ํ์ ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋ค.
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
- send๋ Message ๊ฐ์ฒด๋ฅผ ์์ฑํ๊ธฐ ์ํด MessageCreator ์ธ์๋ก ๋ฐ๋๋ค.
- converAndSend๋ Object ํ์ ์ ์ธ์๋ก ๋ฐ์ ๋ด๋ถ์ ์ผ๋ก Message ํ์ ์ผ๋ก ๋ณํํ๋ค.
- ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๊ธฐ ์ ์ ์ปค์คํฐ๋ง์ด์ง์ ํ ์ ์๋๋ก MessagePostProcessor ๋ํ ์ธ์๋ก ๋ฐ๋๋ค.
- ๋ชฉ์ ์ง์ ๋ํ ์๋ฌด๋ฐ ์ธ์๋ฅผ ๋ฐ์ง ์์ ๊ฒฝ์ฐ: ํด๋น ๋ฉ์ธ์ง๋ฅผ ๋ฏธ๋ฆฌ ์ ํด๋ ๊ธฐ๋ณธ ๋์ฐฉ์ง๋ก ์ ์กํ๋ค.
- Destination ๊ฐ์ฒด๋ String ๊ฐ์ฒด๋ฅผ ์ด์ฉํด ๋ชฉ์ ์ง๋ฅผ ์ค์ ํ ์ ์๋ค.
package tacos.messaging;
import javax.jms.JMSException;
import javax.jms.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import tacos.Order;
@Service
public class JmsOrderMessagingService implements OrderMessagingService {
private JmsTemplate jms;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms) {
this.jms = jms;
}
// ์ต๋ช
๋ด๋ถ ํด๋์ค๋ฅผ ์ธ์๋ก ์ ๋ฌํ ์์1
@Override
public void sendOrder(Order order) {
jms.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(order);
}
}
// ๋๋ค๋ฅผ ์ฌ์ฉํด์ ๋ ๊ฐ๋จํ ํํํ ์์2
@Override
public void sendOrder(Order order) {
jms.send(session -> session.createObjectMessage(order));
}
}
์์ 1, 2 ๋ชจ๋ MessageCreator ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํด์ send์ ์ธ์๋ก ๋๊ธฐ๊ณ ์๋ค. MessageCreator๋ ํจ์ํ ์ธํฐํ์ด์ค์ด๊ธฐ ๋๋ฌธ์ ๋๋ค๋ก ๋ํ๋ผ ์ ์๋ค. ์๊น destination ์ ์ค์ ํ์ง ์์ ๋ฉ์๋๋ค์ ๋ฉ์ธ์ง๋ฅผ ๊ธฐ๋ณธ ๋ชฉ์ ์ง๋ก ๋ณด๋ธ๋ค๊ณ ํ๋๋ฐ ์ด๋ jms์ properties ์ค ํ๋๋ฅผ ์ค์ ํ ๊ฒ์ด๋ค.
spring:
jms:
template:
default-destination: tacocloud.order.queue
์ด์ฒ๋ผ ๊ธฐ๋ณธ ๋์ฐฉ์ง๋ฅผ ์ง์ ํด์ ์ฌ์ฉํ๋ ๊ฒ์ด ๊ฐ์ฅ ์ฌ์ด ๋ฐฉ๋ฒ์ด๋ค. ๋์ฐฉ์ง ์ด๋ฆ์ ํ ๋ฒ๋ง ์ง์ ํ๋ฉด ์ฝ๋์์๋ ๋ฉ์ธ์ง๊ฐ ์ ์ก๋๋ ๊ณณ์ ๋งค๋ฒ ์ ๊ฒฝ์ฐ์ง ์๊ณ ์ ์ก๋ง ํ๋ฉด ๋๊ธฐ ๋๋ฌธ์ด๋ค. ๊ธฐ๋ณธ ๋ชฉ์ ์ง๋ก ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ด๋๊ฒ ์๋๋ผ๋ฉด send()๋ฉ์๋์์ destination(Destination ๊ฐ์ฒด ๋๋ String ํํ)๋ฅผ ์ธ์๋ก ๋๊ฒจ์ค์ผ ํ๋ค.
Destination ๊ฐ์ฒด๋ฅผ send()์ ์ ๋ฌํ๊ณ ์ ํ๋ค๋ฉด Destination ๋น์ ์ ์ธํ๊ณ ๋ฉ์ธ์ง ์ ์ก์ ์ํํ๋ ๋น์ DI๋ฅผ ํ์. Artemis ์์ ์ ๊ณตํ๋ ActiveMQQueue๋ฅผ ์ด์ฉํด์ ๊ตฌํํด๋ณด์.
Destination ๊ฐ์ฒด๋ฅผ ์ฌ์ฉํด ๋ฉ์ธ์ง ๋์ฐฉ์ง๋ฅผ ์ง์ ํ๋ฉด ๋์ฐฉ์ง ์ด๋ฆ๋ง ์ง์ ํ๋ ๊ฒ๋ณด๋ค ๋ ๋คํฅํ๊ฒ ๋์ฐฉ์ง๋ฅผ ๊ตฌ์ฑํ ์ ์์ผ๋, ์ค์ ๋ก๋ ๋์ฐฉ์ง ์ด๋ฆ ์ธ์ ๋ค๋ฅธ ๊ฒ์ ์ง์ ํ๋ ์ผ์ ๊ฑฐ์ ์์ผ๋ String์ผ๋ก ๊ฐ๋จํ๊ฒ ์ด๋ฆ์ ์ง์ ํด๋ ๋๋ค.
@Bean
public Desination orderQueue() {
return new ActiveMQQueue("tacocloud.order.queue");
}
@Service
public class JmsOrderMessagingService implements OrderMessagingService {
private JmsTemplate jms;
private Destination orderQueue;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms, Destination orderQueue) {
this.jms = jms;
this.orderQueue = orderQueue;
}
@Override
public void sendOrder(Order order) {
jms.send(
orderQueue,
session -> session.createObjectMessage(order));
}
// destination ์ ์ฌ์ฉํ์ง ์๊ณ ๊ฐ๋จํ๊ฒ string์ผ๋ก ๋์ฐฉ์ง๋ฅผ ํํํ ์์
@Override
public void sendOrder(Order order) {
jms.send(
"tacocloud.order.queue",
session -> session.createObjectMessage(order));
}
}
send() ๋ฉ์๋๋ ์ฌ์ฉ์ด ์ด๋ ต์ง ์์ง๋ง Message ๊ฐ์ฒด ์์ฑ์ ์ํด MessageCreator๋ฅผ ๋๋ฒ์งธ ์ธ์๋ก ๋๊ฒจ์ค์ผ ํด์ ์ฝ๋๊ฐ ์กฐ๊ธ ๋ณต์กํด์ง๋ค. ์ด๋ด๋ convertAndSend()๋ฅผ ์ฌ์ฉํ์.
์์์ ๋งํ๋ ๊ฒ์ฒ๋ผ convertAndSend๋ ๊ฐ์ฒด๋ฅผ ์ธ์๋ก ๋๊ฒจ์ฃผ๊ธฐ๋ง ํ๋ฉด Message ๊ฐ์ฒด๋ก ๋ณํ ํ ์ ์ก์ด ๋๋ค. (๋์ฐฉ์ง๋ฅผ ์ ์ ํ๋ ๋ฐฉ๋ฒ์ send ๋ฉ์๋์ ๋์ผํ๋ค). Message ๊ฐ์ฒด๋ก ๋ณํํ๋ ์ผ์ spring์ ์ ์๋ MessgeConverter ์ธํฐํ์ด์ค ๊ตฌํ์ฒด๊ฐ ์ฌ์ฉ๋๋ค.
// convertAndSend ์ฌ์ฉ ์์
@Override
public void sendOrder(Order order) {
jms.convertAndSend("tacocloud.order.queue", order);
}
public interface MessageConverter {
Message toMessage(Object object, Session session) throws JMSException, MessageCOnversionException;
Object fromMessage(Message message);
}
org.springframework.jms.support.converter
Interface MessageConverter
All Known Subinterfaces:
SmartMessageConverter
All Known Implementing Classes:
MappingJackson2MessageConverter, MarshallingMessageConverter, MessagingMessageConverter, SimpleMessageConverter
- MappingJackson2MessageConverter: ๋ฉ์ธ์ง๋ฅผ Json ์ผ๋ก ์ํธ ๋ณํ
- MarshallingMessageConverter: ๋ฉ์ธ์ง๋ฅผ XML๋ก ์ํธ ๋ณํ
- MessagingMessageConverter: ๋ฉ์ธ์ง๋ฅผ Message ๊ฐ์ฒด๋ก ์ํธ ๋ณํ
- SimpleMessageConverter: ๋ฌธ์์ด์ TextMessage๋ก, byte ๋ฐฐ์ด์ ByteMessage๋ก, Map์ MapMessage๋ก, Serializable ๊ฐ์ฒด๋ฅผ ObjectMessage ๋ก ์ํธ ๋ณํ
convertAndSend() ์์๋ ๊ธฐ๋ณธ์ ์ผ๋ก SimpleMessageConverter๊ฐ ์ฌ์ฉ๋๋ฉฐ ์ด๋๋ Serializable ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํด์ผ ํ๋ค. Serializable ๊ตฌํ ์ ์ฝ์ ํผํ๊ธฐ ์ํด MappingJackson2MessageConverter ๋๋ ๋ค๋ฅธ ๋ณํ๊ธฐ๋ฅผ ์ฌ์ฉํ ์ ์๋ค. ์ด๋๋ ์ฌ์ฉํ ๋ณํ๊ธฐ๋ฅผ ๋น์ผ๋ก ๋ฑ๋กํด์ค๋ค.
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJson2MessagingConverter messagingConverter =
new MappingJson2MessagingConverter();
messagingConverter.setTypeIdPropertyName("_typeId");
return messagingConverter;
}
โ ์์ ์๊ฐ ์์ ๋ ๋ฉ์ธ์ง์ ๋ณํ ํ์ ์ ์์์ผ ํ๊ธฐ ๋๋ฌธ์ MappingJackson2MessageConverter ์์ setTypeIdPropertyName() ์ ํธ์ถ ํ๋ค๋ ๊ฒ์ ์ ์ํ์. ์ฌ๊ธฐ์๋ ๋ณํ๋๋ ํ์ ์ ํด๋์ค ์ด๋ฆ์ด ํฌํจ๋๋๋ฐ ํจํค์ง ์ ์ฒด ๊ฒฝ๋ก๊ฐ ํฌํจ๋๊ธฐ ๋๋ฌธ์ ๋ฉ์ธ์ง ์์ ์๋ ๋๊ฐ์ ํด๋์ค(ํจํค์ง ๊ฒฝ๋ก๊น์ง ๋์ผ)ํ ํ์ ์ ๊ฐ์ ธ์ผ ํด์ ์ ์ฐ์ฑ์ด ๋ง์ด ๋จ์ด์ง๋ค.
โ ์ด๋ฅผ ํด๊ฒฐํ๊ณ ์ setTypeUdMappings()๋ฅผ ํธ์ถํ๋ค. ์ด๋ฌ๋ฉด ์ค์ ํ์ ์ ์์์ ํ์ ์ด๋ฆ์ ๋งคํ์ํฌ ์ ์๋ค.
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJson2MessagingConverter messagingConverter =
new MappingJson2MessagingConverter();
messagingConverter.setTypeIdPropertyName("_typeId");
//์ถ๊ฐ
Map<String, Class<?>> typeIdMappings = new HashMap<>();
typeIdMappings.put("order", Order.class);
messagingConverter.setTypeIdMappings(typeIdMappings);
return messagingConverter;
}
โ ์์ ์์๊ฒ๋ "order"์ ๋งคํ๋ Order.class ๊ฐ ์ ๋ฌ๋๋ค. ์์ ์ดํ๋ฆฌ์ผ์ด์ ์๋ ์ด์ ์ ์ฌํ ๋ฉฐ์ธ์ง ๋ณํ๊ธฐ๊ฐ ๊ตฌ์ฑ๋์ด์์ ๊ฒ์ด๋ฏ๋ก order ๋ฅผ ์์ ์ด ์๊ณ ์๋ ์ฃผ๋ฌธ ๋ฐ์ดํฐ๋ก ๋งคํํ๋ฉด ๋๋ค.
์์ต์ฑ ์ข์ ์น ๋น์ฆ๋์ค ๋ชจ๋ธ์๋ ํญ์ ์ถ๊ฐ ์์ฒญ์ฌํญ์ด ์ค๊ธฐ ๋ง๋ จ์ด๋ค. ๋ง์ฝ ์์์ ํ์ฝ ์๋น์ค๊ฐ ์คํ๋ผ์ธ ํํ์์ ์จ๋ผ์ธ ํํ๋ก ์ฌ์ ์ ํ์ฅํ๋ค๊ณ ํด๋ณด์. ๊ทธ๋ผ ์ฌ๋๋ค์ 1. ๊ฐ๊ฒ์์ ์ง์ , 2. ์น์ฌ์ดํธ ๋ฅผ ํตํด์ ํ์ฝ๋ฅผ ์ฃผ๋ฌธํ ์ ์๋ค. ๊ทธ๋ผ ์ฃผ๋ฌธ ๋ฉ์ธ์ง๊ฐ ์ด๋์ ์์ฑ์ ํ๋์ง ํ๋ณ์ด ๊ฐ๋ฅํด์ผ ํ๋ค. ์ด๋ด๋๋ Order ํด๋์ค์ ๊ฐ์ฒด๋ฅผ (์ฃผ๋ฌธํ๋ ์ชฝ/ ์ฃผ๋ฌธ๋ฐ๋ ์ชฝ aka ์ฃผ๋ฐฉ) ์์ชฝ์์ ๊ณ ์น๋ ๊ฒ๋ณด๋ค ์ปค์คํ ํค๋๋ฅผ ๋ฉ์ธ์ง์ ์ถ๊ฐํ๋๊ฒ ์ข๋ค.
์ด๋ป๊ฒ ๋ฉ์ธ์ง์ ์ปค์คํ ํค๋๋ฅผ ๋ฃ๋ ํ์ฒ๋ฆฌ๋ฅผ ํด์ค๊น?
send()์ ๊ฒฝ์ฐ ๋น๊ต์ ๊ฐ๋จํ๊ฒ ์ ์ก ์ ์ Message ๊ฐ์ฒด์ setStringProperty() ๋ฅผ ํธ์ถํ๋ฉด ๋๋ค.
@Override
public void sendOrder(Order order) {
jms.send(
"tacocloud.order.queue",
session -> {
Message message = session.createObjectMessage(order);
message.setStringProperty("X_ORDER_SOURCE", "WEB");
});
}
converterAndSend()์ ๊ฒฝ์ฐ๋ ์ฐ๋ฆฌ๊ฐ ์ง์ Message ๋ก ๋ณํํ๋๊ฒ ์๋๋ค ๋ณด๋ Message ๊ฐ์ฒด์ ์ง์ ์ ๊ทผ์ ํ ์ ์๋ค. ์ด๋ด๋๋ converterAndSend()์ ๋ง์ง๋ง ์ธ์์ธ MessagePostProcessor ๋ฅผ ์ ๋ฌํ๋ฉด Message ๊ฐ์ฒด๊ฐ ์์ฑ๋ ํ ์ด ๊ฐ์ฒด์ ์ฐ๋ฆฌ๊ฐ ํ์ํ ํ์ฒ๋ฆฌ๋ฅผ ํ ์ ์๋ค.
@Override
public void sendOrder(Order order) {
jms.convertAndSend("tacocloud.order.queue", order, new MessagePostProcessor() {});
@Override
public Message postProcessMessage(Message message) throw JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
}
// ๋๋ค๋ฅผ ์ฌ์ฉํ ๊ฐ๊ฒฐํ ์์
@Override
public void sendOrder(Order order) {
jms.convertAndSend("tacocloud.order.queue", order,
message-> {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
});
}
์ฌ๊ธฐ์ ๊ตฌํํ MessagePostProcessor ๋ฅผ ๋ค๋ฅธ convertAndSend ํธ์ถ์๋ ์ ๋๋ดํ๊ฒ ์ฌ์ฉํ๊ณ ์ถ๋ค๋ฉด ๋๋ค๋ณด๋ค๋ ๋ฉ์๋ ์ฐธ์กฐ๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉํฅ์ผ๋ก ๊ตฌํํ์.
@GetMapping("/convertAndSend/order")
public String convetAndSendOrder() {
Order order = buildOrder();
jms.convertAndSend("tacocloud.order.queue", order,
this::addOrderSource);
return "Convert and sent order";
}
private Message addOrderSource(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
๋ฉ์ธ์ง ์์ ๋ฐฉ๋ฒ
- ํ ๋ชจ๋ธ(pull model): ๋ฉ์ธ์ง๋ฅผ ์์ฒญํ๊ณ ์์ฒญํ ๋ฉ์ธ์ง๊ฐ ๋์ฐฉํ ๋ ๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค.
- ํธ์ ๋ชจ๋ธ(push model): ๋ฉ์ธ์ง๋ฅผ ์์ฒญํ๊ณ , ์ค์ ๋ก ์์ ๊ฐ๋ฅํ ์ํ๊ฐ ๋๋ฉด ๋ฉ์ธ์ง๋ฅผ ์ ๋ฌ ๋ฐ๋๋ค.
- JmsTemplate์ ๊ฒฝ์ฐ ๋ชจ๋ ๋ฉ์๋๊ฐ ํ ๋ชจ๋ธ์ ์ฌ์ฉํ๋ค. ๋ฐ๋ผ์ ๋ฉ์๋๋ฅผ ํ๋ ํธ์ถํด์ ๋ฉ์ธ์ง๋ฅผ ์์ฒญํ๋ฉด ์ค๋ ๋์์ ๋ฉ์ธ์ง๋ฅผ ์์ ํ ์ ์์ ๋ ๊น์ง ํ๊ฐํ๊ฒ ๊ธฐ๋ค๋ฆฐ๋ค.
- ํธ์ ํ์์ ์ฌ์ฉํ๋ ค๋ฉด ๋ฉ์ธ์ง ๋ฆฌ์ค๋๋ฅผ ์ ์ํด์ผ ํ๋ค.
- ๋ ๋ชจ๋ธ ๋ชจ๋ ๊ฐ๋ฅํ๋, ์ค๋ ๋ ์คํ์ ๋ง์ง ์๋๋ค๋ ์ด์ ๋ก ์ผ๋ฐ์ ์ผ๋ก๋ ํธ์ ๋ชจ๋ธ์ด ์ข์ ์ ํ์ด๋ค. (๋ง์ ๋ฉ์ธ์ง๊ฐ ๋๋ฌด ๋นจ๋ฆฌ ๋์ฐฉํ๋ค๋ฉด ๊ณผ๋ถํ๊ฐ ๊ฑธ๋ฆด ์๋ ์๋ค).
Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;
- JmsTemplate์ send()์ convertAndSend() ๋ฉ์๋์ ๋์๋๋ค.
- receive ๋ ์์ํ์ ๋ฉ์ธ์ง๋ฅผ ์์ , convertAndSend ๋ ๋ฉ์ธ์ง๋ฅผ ๋๋ฉ์ธ ํ์ ์ผ๋ก ๋ณํํ๊ธฐ ์ํด ๊ตฌ์ฑ๋ ๋ฉ์ธ์ง ๋ณํ๊ธฐ๋ฅผ ์ฌ์ฉํ๋ค.
- ๋ง์ฐฌ๊ฐ์ง๋ก String ๋๋ Destination ์ผ๋ก ๋์ฐฉ์ง๋ฅผ ์ค์ ํ ์ ์๋ค. ๋จ, ์ฌ๊ธฐ์ ๋์ฐฉ์ง๋ ๋ฉ์ธ์ง๋ฅผ ๊ฐ์ ธ์ค๋ ๊ณณ์ด๋ค.
package tacos.kitchen.messaging.jms;
import javax.jms.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
@Component
public class JmsOrderReceiver implements OrderReceiver {
private JmsTemplate jms;
private MessageConverter converter;
@Autowired
public JmsOrderReceiver(JmsTemplate jms, MessageConverter converter) {
this.jms = jms;
this.converter = converter;
}
public Order receiveOrder() {
Message message = jms.receive("tacocloud.order.queue");
return (Order) converter.fromMessage(message);
}
}
โ ์์ ๋ฉ์ธ์ง์ ํ์ ID ์์ฑ์ ํด๋น ๋ฉ์ธ์ง๋ฅผ Order ๊ฐ์ฒด๋ก ๋ณํํ๋ผ๊ณ ์๋ ค์ฃผ์ง๋ง, receive๋ Object ๋ก ๋ฐ์์ค๊ธฐ ๋๋ฌธ์ Order๋ก ์บ์คํ ์ ํ ํ์ ๋ณํํด์ผ ํ๋ค.
โMessgae ๊ฐ์ฒด๋ฅผ ๋ฉ์ธ์ง๋ก ์์ ํ๋๊ฒ ์ ์ฉํ ์ ์์ง๋ง ๋๋ถ๋ถ์ ๊ฒฝ์ฐ ๋ฉํ๋ฐ์ดํฐ๋ ํ์์๊ณ ๋ชธ์ฒด์ธ ํ์ด๋ก๋๋ง ํ์ํ๋ค. ์ด ๊ฒฝ์ฐ ํ์ด๋ก๋๋ฅผ ๋๋ฉ์ธ ํ์ ์ผ๋ก ๋ณํํ๋ฉฐ, ๋ฉ์ธ์ง ๋ณํ๊ธฐ๊ฐ ํด๋น ์ปดํฌ๋ํธ์ DI ๋์ด์ผ ํ๋ค.
โ๋ฐ๋ผ์ ํ์ด๋ก๋๋ง ํ์ํ๋ค๋ฉด receiveAndConvert ๋ฅผ ์ฌ์ฉํ์.
โ๋ชจ๋ ๋ฉ์ธ์ง ๋ณํ์ receiveAndConvet์์ ์ํ๋๊ธฐ ๋๋ฌธ์ ๋์ด์ MessaceConveter ๋ฅผ ์ฃผ์ ํ ํ์๊ฐ ์์ด์ก๋ค!
@Component
public class JmsOrderReceiver implements OrderReceiver {
private JmsTemplate jms;
@Autowired
public JmsOrderReceiver(JmsTemplate jms) {
this.jms = jms;
}
public Order receiveOrder() {
return (Order) jms.receiveAndConvert("tacocloud.order.queue");
}
}
ํ๋ชจ๋ธ์ด receive()๋ receiveAndConvert()๋ฅผ ํธ์ถํ๋ ๊ฒ๊ณผ ๋ค๋ฅด๊ฒ ํธ์ ๋ชจ๋ธ์ ๋ฉ์ธ์ง ๋ฆฌ์ค๋๋ ๋ฉ์ธ์ง๊ฐ ๋์ฐฉํ ๋ ๊น์ง ๋๊ธฐํ๋ ์๋์ ์ปดํฌ๋ํธ๋ค. JMS ๋ฉ์ธ์ง์ ๋ฐ์ํ๋ ๋ฉ์ธ์ง ๋ฆฌ์คํธ๋ฅผ ์์ฑํ๊ธฐ ์ํด์๋ @JmsListener ์ด๋ ธํ ์ด์ ์ ์ฌ์ฉํ๋ฉด ๋๋ค.
package tacos.kitchen.messaging.jms.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@JmsListener(destination = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}
- ๋์ฐฉ์ง tacocloud.order.queue ์ ์๋ ๋ฉ์ธ์ง๋ฅผ ๋ฆฌ์ค๋ํ๊ธฐ ์ํด @JmsListenr ์ด๋ ธํ ์ด์ ์ฌ์ฉ
- JmsTemplate ๋ฅผ ์ฌ์ฉํ์ง ์๊ณ , ์ฐ๋ฆฌ์ ์ดํ๋ฆฌ์ผ์ด์ ์ฝ๋์์๋ ํธ์ถ๋์ง ์๋๋ค.
- ์คํ๋ง ํ๋ ์์ํฌ ์ฝ๋๊ฐ ํน์ ๋์ฐฉ์ง์ ๋ฉ์ธ์ง๊ฐ ๋์ฐฉํ๋ ๊ฑธ ๊ธฐ๋ค๋ฆฌ๋ค๊ฐ ๋์ฐฉํ๋ฉด ํด๋น ๋ฉ์ธ์ง์ ์ ์ฌ๋ Order ๊ฐ์ฒด๋ฅผ ์ธ์๋ก ์ ๋ฌํ๋ฉฐ receiveOrder() ์๋ ํธ์ถํ๋ค.
- @JmsListener ์ด๋ ธํ ์ด์ ์ @GetMapping @PostMapping ๋ฑ ๊ณผ ์ ์ฌ. ํน์ ๊ฒฝ๋ก์ ๋ํ ์์ฒญ์ ๋ฐ์ํ๋ ๊ฒ์ฒ๋ผ ํน์ ๋์ฐฉ์ง์ ๋ค์ด์ค๋ ๋ฉ์ธ์ง์ ๋ฐ์ํ๋ค.
- ๋ฉ์ธ์ง ๋ฆฌ์ค๋๋ ์ค๋จ ์์ด ๋ค์์ ๋ฉ์ธ์ง๋ฅผ ๋น ๋ฅด๊ฒ ๊ฐ์ ธ์ฌ ์ ์์ผ๋ ๊ฐ์ฒด๋ฅผ ๋ฐ์์ ์ฌ์ฉํ๋ ํด๋ผ์ด์ธํธ๊ฐ ์๋๊ฐ ๋๋ฆฌ๋ฉด ๋ณ๋ชฉํ์์ด ์ผ์ด๋ ์ ์์ด์ ๋ฉ์ธ์ง ๋ฒํผ๋ง ๋ฑ์ ๊ตฌํํด์ผ ํ๋ค.
JMS๋ ํ์ค ๋ฐ์ ๋ช ์ธ์ ์ ์๋์ด ์๊ณ ์ฌ๋ฌ ๋ธ๋ก์ปค์์ ์ง์๋์ง๋ง ์๋ฐ ์ดํ๋ฆฌ์ผ์ด์ ์์๋ง ์ฌ์ฉํ ์ ์๋ค๋ ๋จ์ ์ด ์๋ค. JVM ์ธ์ ๋ค๋ฅธ ํ๋ ํผ์์๋ ์ฌ์ฉํ ์ ์๋ RabbitMQ์ ์นดํ์นด๋ฅผ ์์๋ณด์.
AMQP๊ฐ ๊ตฌํํ RabbitMQ ๋ JMS ๋ณด๋ค ํฅ์๋ ๋ฉ์ธ์ง ๋ผ์ฐํ ์ ๋ต์ ์ ๊ณตํ๋ค. JMS ๋ฉ์ธ์ง๋ ์์ ์๊ฐ ๊ฐ์ ธ๊ฐ ๋ฉ์ธ์ง ๋์ฐฉ์ง์ ์ด๋ฆ์ ์ฃผ์๋ก ์ฌ์ฉํ๋๋ฐ RabbitMQ ๋ฉ์ธ์ง๋ ์์ ์๊ฐ ๋ฆฌ์ค๋ํ๋ ํ์ ๋ถ๋ฆฌ๋์ด ์๋ ๊ฑฐ๋์ ์ด๋ฆ๊ณผ ๋ผ์ฐํ ํค๋ฅผ ์ฃผ์๋ก ํ๋ค.
๋ฉ์ธ์ง๊ฐ RabbitMQ ๋ธ๋ก์ปค์ ๋์ฐฉํ๋ฉด ์ฃผ์๋ก ์ง์ ๋ ๊ฑฐ๋์์ ๋ค์ด๊ฐ๋ค. ๊ฑฐ๋์๋ ํ๋ ์ด์์ ํ์ ๋ฉ์ธ์ง๋ฅผ ์ ๋ฌํ ์ฑ ์์ด ์์ผ๋ฉฐ ์ฒ๋ฆฌ ๊ธฐ๋ฐ์ ๊ฑฐ๋์์ ํ์ , ๊ฑฐ๋์์ ํ ๊ฐ์ ๋ฐ์ธ๋ฉ, ๋ฉ์ธ์ง์ ๋ผ์ฐํ ํค ๊ฐ์ด๋ค.
๊ฑฐ๋์ ์ข ๋ฅ
- ๊ธฐ๋ณธ: ๋ธ๋ก์ปค๊ฐ ์๋์ผ๋ก ์์ฑํ๋ ๊ฑฐ๋์. ํด๋น ๋ฉ์ธ์ง์ ๋ผ์ฐํ ํค์ ์ด๋ฆ์ด ๊ฐ์ ํ๋ก ๋ฉ์ธ์ง๋ฅผ ์ ๋ฌํ๋ค. ๋ํ ๋ชจ๋ ํ๋ ์๋์ผ๋ก ๊ธฐ๋ณธ ๊ฑฐ๋์์ ์ฐ๊ฒฐ๋๋ค.
- ๋๋ ํธ: ๋ฐ์ธ๋ฉ ํค๊ฐ ํด๋น ๋ฉ์ธ์ง์ ๋ผ์ฐํ ํค์ ๊ฐ์ ํ์ ๋ฉ์ธ์ง๋ฅผ ์ ๋ฌํ๋ค.
- ํ ํฝ: **๋ฐ์ธ๋ฉ ํค(์์ผ๋ ์นด๋ ํฌํจ)**๊ฐ ํด๋น ๋ฉ์ธ์ง์ ๋ผ์ฐํ ํค์ ๊ฐ์ ํ๋ค์ ๋ฉ์ธ์ง๋ฅผ ์ ๋ฌํ๋ค.
- ํฌ์์:๋ฐ์ธ๋ฉ ํค, ๋ผ์ฐํ ํค์ ์๊ด์์ด ์ฐ๊ฒฐ๋ ๋ชจ๋ ํ์ ๋ฉ์ธ์ง๋ฅผ ์ ๋ฌํ๋ค.
- ํค๋: ํ ํฝ๊ณผ ์ ์ฌํ๋ ๋ผ์ฐํ ํค ๋์ ๋ฉ์ธ์ง ํค๋ ๊ฐ์ ๊ธฐ๋ฐ์ผ๋ก ํ๋ค.
- ๋ฐ๋ ๋ ํฐ: ์ ๋ฌ์ด ๋ถ๊ฐ๋ฅํ ๋ฉ์ธ์ง๋ค์ ๋ชจ์๋๋ ์ฐ๋ ๊ธฐ์ฅ
๊ฐ๋จํ ํํ์ธ ๊ธฐ๋ณธ๊ณผ ํฌ์์ ๊ฑฐ๋์๋ JMS์ ํ ํํ์๋ ์ ์ฌํ๋ค. ๊ทธ๋ฌ๋ ๋ค๋ฅธ ๊ฑฐ๋์๋ฅผ ์ฌ์ฉํ๋ฉด ๋ ์ ์ฐํ ๋ผ์ฐํ ์คํฌ์ ์ ์ํ ์ ์๋ค.
๋ฉ์ธ์ง๋ ๋ผ์ฐํ ํค๋ฅผ ๊ฐ๊ณ ๊ฑฐ๋์๋ก ์ ๋ฌ๋๊ณ ํ์์ ์ฝํ์ ธ ์๋น๋๋ค๋ ๊ฒ์ ์ดํดํ๋ ๊ฒ์ด ๊ฐ์ฅ ์ค์ํ๋ค! ๋ฉ์ธ์ง๋ ๋ฐ์ธ๋ฉ ์ ์๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๊ฑฐ๋์๋ก๋ถํฐ ํ๋ก ์ ๋ฌ๋๋ค.
์คํ๋ง ์ดํ๋ฆฌ์ผ์ด์ ์์ ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๊ณ ์์ ํ๋ ๋ฐฉ๋ฒ์ ์ฌ์ฉํ๋ ๊ฑฐ๋์ ํ์ ๊ณผ ๋ฌด๊ดํ๋ฉฐ, ๊ฑฐ๋์์ ํ์ ๋ฐ์ธ๋ฉ์ ์ ์ํ๋ ๋ฐฉ๋ฒ๊ณผ๋ ๊ด๊ณ๊ฐ ์๋ค. (์คํ๋ง ๋จ์์ ๊ฑฐ๋์ ๊ด๋ จ ์ค์ ์ด ์๋ ๊ฒ). RabbitMQ๋ฅผ ์ฌ์ฉํด์ ๋ฉ์ธ์ง๋ฅผ ์ ์ก/์์ ํ๋ ์ฝ๋๋ฅผ ์์ฑํด์ผ ํ๋ค.
JMS ์์ ์์ Artemis๋ฅผ ํ๋ก์ ํธ์ ์ถ๊ฐํ ๊ฒ ์ฒ๋ผ ์ด๋ฒ์๋ RabbitMQ๋ฅผ ๊ตฌํํ AMQP๋ฅผ ์ถ๊ฐํด์ฃผ๋ฉด ๋๋ค. ํ๋ก์ ํธ์ ๋น๋๋ฅผ ํ๋ฉด ๋ฉ์ธ์ง ์ ์ก์ ๊ฐ๋ฅํ๊ฒ ํด์ฃผ๋ RabbitTemplate์ ์ฌ์ฉํ ์ ์๋ค.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Artemis ์ Properties๋ฅผ ์ง์ ํ๋ ๊ฒ์ฒ๋ผ rabbitmq๋ฅผ ์ํ properties๋ฅผ ์ง์ ํด์ฃผ์. ํํ๋ Artemis์ ์ ์ฌํ๋ค. ๊ฐ๋ฐํ๊ฒฝ์์๋ properties ์ง์ ์ด ํฐ ์๊ด์ด ์์ผ๋ ์ด์ ํ๊ฒฝ์ผ๋ก ๋ณ๊ฒฝํ ๋ ์ ์ฉํ๋ ์ค์น ์ด๋ฐ์ ๊ผญ ์ค์ ํด์ฃผ์.
spring:
profiles: prod
rabbitmq:
host: rabbit.tacocloud.com
port: 5673
username: tacoweb
password: l3tm31n
RabbitTemplate๋ JMSTemplate๊ณผ ์ ์ฌํ send()์ convertAndSend()๋ฅผ ์ฌ์ฉํ๋, JmsTemplate์ด ์ง์ ๋ ํ์๋ง ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋ค๋ฉด RabbitTemplate์ ๊ฑฐ๋์์ ๋ผ์ฐํ ํค๋ฅผ ์ด์ฉํด์ ๋ฉ์ธ์ง๋ฅผ ์ ์กํ๋ค.
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
(๊ต์ฌ์์๋ CorrelationData ๋ฅผ ์ธ์๋ก ๋ฐ๋ ํจ์์ ๋ํด์๋ ์ค๋ช ํ์ง ์๋๋ค)
- send ๋ฉ์ธ์ง๋ ์์ ํํ์ Message๋ฅผ ์ ์ก, convertAndSend๋ ๋ด๋ถ ๋ณํ ํ Message ์ ์ก (JmsTemplate๊ณผ ๋์ผ)
- convertAndSend์ MessagePostProcessor ๋ฅผ ์ธ์๋ก ๋๊ฒจ ํ์ฒ๋ฆฌ๋ฅผ ํ๋ค. (JmsTemplate๊ณผ ๋์ผ)
- Destination ๊ฐ์ฒด๋ String์ผ๋ก ๋์ฐฉ์ง ์ ๋ณด๋ฅผ ๋ฐ์๋ JmsTemplate๊ณผ๋ ๋ค๋ฅด๊ฒ String routingKey, String exchange๋ก ๋ผ์ฐํ ํค์ ๊ฑฐ๋์๋ฅผ ์ง์ ํ๋ ๋ฌธ์์ด์ ์ธ์๋ก ๋ฐ๋๋ค.
- ๋ผ์ฐํ ๊ณผ ๊ฑฐ๋์๋ฅผ ์ง์ ํ์ง ์์๋ค๋ฉด ๊ธฐ๋ณธ ๋ผ์ฐํ ๊ณผ ๊ฑฐ๋์๊ฐ ์ฌ์ฉ๋๋ค. properties ์ค์ ์ ํตํด ๊ธฐ๋ณธ์ผ๋ก ์ฌ์ฉ๋๋ ๋ผ์ฐํ ์นดใ ฃ์ ๊ฑฐ๋์๋ฅผ ๋ฐ๊ฟ ์ ์๋ค.
spring:
rabbitmq:
template:
exchange: tacocloud.orders
routing-key: kitchens.central
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private RabbitTemplate rabbit;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
}
- RabbitTemplate์๋ ์ด๋ฏธ MessageConverter๊ฐ ๊ตฌํ์ด ๋์ด์๊ธฐ ๋๋ฌธ์ getMessageConverter()๋ก ์ปจ๋ฒํฐ๋ฅผ ๋ถ๋ฌ์ ์ฌ์ฉ ํด์ฃผ๋ฉด ๋๋ค.
- ๋ฉ์ธ์ง ์์ฑ์ MessageProperties๋ฅผ ์ฌ์ฉํด์ ์ ๊ณตํด์ผ ํ๋๋ฐ ๋ณ๋ค๋ฅธ ์์ฑ์ ์ ๋ฌํด์ค ํ์๊ฐ ์๋ค๋ฉด ๊ธฐ๋ณธ ์ธ์คํด์ค ๊ทธ๋๋ก ๋ณด๋ด๋ฉด ๋๋ค.
- ์์ ์์๋ ๋ผ์ฐํ ํค์ธ tacocloud.order๋ง ์ ๊ณต๋๊ณ ๊ฑฐ๋์์ ๋ํ ์ ๋ณด๋ ์ ๊ณต๋๊ณ ์์ง ์์ ๊ธฐ๋ณธ ๊ฑฐ๋์๊ฐ ์ฌ์ฉ๋๋ค.
- ๊ธฐ๋ณธ๊ฑฐ๋์ ์ด๋ฆ์ ๋น ๋ฌธ์์ด "" ์ด๊ณ ์ด๋ RabbitMQ ๋ธ๋ก์ปค๊ฐ ์์ฑํ๋ ๊ธฐ๋ณธ ๊ฑฐ๋์์ ์ผ์นํ๋ค. ๊ธฐ๋ณธ ๋ผ์ฐํ ๋ํ ""์ด๋ค.
//MessageConverter๊ฐ ๋ด๋ถ๋ก ๊ตฌํ๋์ด ์์ด ๋ ํธ๋ฆฌํ convertAndSend์ ์์
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order", order);
}
org.springframework.jms.support.converter
Interface MessageConverter
All Known Subinterfaces:
SmartMessageConverter
All Known Implementing Classes:
MappingJackson2MessageConverter, MarshallingMessageConverter, MessagingMessageConverter, SimpleMessageConverter
๋ฉ์ธ์ง ๋ณํ์ JMSTemplate์์ ์ค๋ช ํ ๋ฉ์ธ์ง ๋ณํ๊ธฐ์ ๊ต์ฅํ ์ ์ฌํ๋ค. ๊ธฐ๋ณธ์ค์ ์ผ๋ก ์ฌ์ฉ๋๋๊ฒ์ SimpleMessageConverter์ธ๋ฐ ์ดํฉํฐ๋ธ ์๋ฐ์์ ๋์๋ ๊ฒ ์ฒ๋ผ Serializable์ ๊ตฌํํ ๋๋ ์ ์คํด์ผ ํ๊ณ side effect ๋ ๋ฐ์ํ ์ํ์ด ์์ด์ ๊ฐ๋ณ ์ค์ต์ ํ ๋๋ MappingJackson2MessageConverter ๋ฅผ ์ฌ์ฉํด ๊ฐ์ฒด๋ฅผ JSON ํํ๋ก ๋ณํํ๋ค.
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
JMSTemplate์์๋ ๋ค์ ๋ณต์กํ๊ฒ Bean์ ๋ฑ๋กํ์ผ๋ RabbitTemplate์ ์ข ๋ ๊ฐ๋จํ ํํ๋ก ๋ฑ๋ก์ด ๊ฐ๋ฅํ๋ค. ๋ฑ๋กํด๋๋ฉด ์คํ๋ง์ด ์์์ ์ด ๋น์ ์ฐพ์์ RabbitTemplate์ ๋ณํ๊ธฐ๋ก ์ฃผ์ ํ๋ค!
JMS์์์ฒ๋ผ ํด๋ ๋ฑ์ ์ถ๊ฐ ์ค์ ์ ํด์ผ ํ ๋๋ฅผ ์ดํด๋ณด๊ฒ ๋ค. send()๋ฅผ ์ฌ์ฉํด์ ์ง์ ์ถ๊ฐ๋ฅผ ํด์ฃผ๋ ๋ฐฉ๋ฒ๊ณผ convertAndSend()์ฒ๋ผ ๋ฉ์ธ์ง ๋ณํ์ ๋ด๊ฐ ์ง์ ํ์ง ์์ converter ๊ฐ์ฒด ์์ด, MessagePostProcessor ๋ฅผ ์ด์ฉํด ๋ฉ์ธ์ง ์ถ๊ฐ๋ฅผ ๊ฐ์ ์ ์ผ๋ก ํด์ผ ํ๋ ๊ฒฝ์ฐ๊ฐ ์๋ค.
- send()
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
// ์ถ๊ฐ
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
- convertAndSend()
- ๊ตฌํํ ์ต๋ช ๋ด๋ถ ํด๋์ค MessagePostProcessor๋ฅผ convertAndSend()์ธ์๋ก ์ ๋ฌ. Message ๊ฐ์ฒด๊ฐ ๊ฐ์ ธ์จ Properties์ ํค๋๋ฅผ ์ถ๊ฐ ์ค์ ํด์ ๋ณด๋ด์ค
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}
๋ฉ์ธ์ง ์์ ๋ฐฉ๋ฒ๋ JMSTemplate๊ณผ ๋๋์์ดํ๋ค.
- RabbitTemplate์ ์ด์ฉํด ํ๋ก๋ถํฐ ๋ฉ์ธ์ง ๊ฐ์ ธ์ค๊ธฐ (ํ ๋ฐฉ์)
- @RabbitListener์ ๋ฉ์๋์ ์ง์ ํด์ ๋ฉ์ธ์ง ๊ฐ์ ธ์ค๊ธฐ (ํธ์ ๋ฐฉ์)
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
// ๆฅๆถไปๆถๆฏ่ฝฌๆข่ฟๆฅ็็ฑปๅๅฎๅ
จ็ๅฏน่ฑก
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
(RabbitTemplate ์์ ๋ฉ์๋ ๋ง์ ํน์ง)
- ์์ ๋ฉ์๋ ๋ชจ๋๊ฐ ๊ฑฐ๋์๋ ๋ผ์ฐํ ํค๋ฅผ ๋งค๊ฐ๋ณ์๋ก ๊ฐ์ง๊ณ ์์ง ์๋ค! ๊ฑฐ๋์์ ๋ผ์ฐํ ํค๋ ๋ฉ์ธ์ง๋ฅผ ํ๋ก ์ ๋ฌํ๋๋ฐ ์ฌ์ฉ์ด ๋๋ ๊ฒ์ด๊ณ , ์ผ๋จ ๋ฉ์ธ์ง๊ฐ ํ์ ๋ค์ด๊ฐ๋ฉด ๋์ฐฉ์ง๋ ๋ฌด์กฐ๊ฑด ํ์๊ฒ์ ๋ฉ์ธ์ง๋ฅผ ์๋นํ๋ (์์ /์ฌ์ฉ)ํ๋ ์ปจ์๋จธ๋ผ์ ์ด์ชฝ์์๋ ๋ผ์ฐํ ์ ๊ฑฐ๋์๋ฅผ ์ ๊ฒฝ์ธ ํ์๊ฐ ์๋ค.
- ๋ฉ์ธ์ง ์์ ํ์์์์ ์ค์ ํ๊ธฐ ์ํด longํ์ ์ ๋งค๊ฐ๋ณ์๋ฅผ ์ ๋ฌํ๋ค. (๊ธฐ๋ณธ๊ฐ์ 0๋ฐ๋ฆฌ์ด, ๋ง์ฝ ์์ ํ ๋ฉ์ธ์ง๊ฐ ์์ผ๋ฉด null ๋ฐํ)
- ํ์ ์ธ์ดํํ๊ฒ ๊ฐ์ฒด๋ฅผ ์์ ํ ์ ์๋ ์ ๋ค๋ฆญ ํ์ ์ receiveAndConvert๋ ์ค๋ฒ๋ก๋ฉ์ด ๋์ด์๋ค.
@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;
@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}
public Order receiveOrder() {
// ์ง์ฐ ์๋ ๋ฒ์ ผ
Message message = rabbit.receive("tacocloud.orders");
// 30์ด ์ง์ฐ ํ์ฉํ๋ ํํ
//Message message = rabbit.receive("tacocloud.orders", 30000);
return message != null
? (Order) converter.fromMessage(message)
: null;
}
}
- receive ๋ฉ์ธ์ง์ ์ ๋ฌ๋ ํ ์ฃผ์ 'tacocloud.orders'์์ ์ฃผ๋ฌธ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ด
- ๊ฐ์ฒด๊ฐ ์์ผ๋ฉด null ๋ฐํ, ๊ฐ์ฒด๊ฐ ๋ฐํ๋๋ฉด ๋ฐ๋ก convet์ง์์ ์ ํ๋ ํจ์๋ผ์ rabbittemplate์์ ์ปจ๋ฒํฐ๋ฅผ ๊ฐ์ ธ์์ order ๊ฐ์ฒด๋ก ๋ณํ
- ๋ฐ๋ก ๋ฉ์ธ์ง๊ฐ ์ค์ง ์๋๋ค๊ณ ์ง์ฐ์ ํ์ฉํ๋ค๋ฉด recevice(ํ์ฃผ์, ์ง์ฐ์๊ฐ)์ ์จ์ฃผ๋ฉด ๋๋ค.
- ์ง์ฐ ํ์ฉ ์๊ฐ์ ํ๋์ฝ๋ฉํ๋๊ฒ ์ซ์๋ฉด properties์์ rabbitmqํญ๋ชฉ์ receive-timeout์ ์ค์ ํด์ฃผ๋ฉด ๋๋ค!
spring:
rabbitmq:
template:
receive-timeout: 30000
receiveAndConvertํจ์๋ฅผ ์ฌ์ฉํด์ฃผ๋ฉด ๋ฐ๋ก rabbitTemplate์์ converter๋ฅผ ๊บผ๋ด์ง ์๊ณ ๋ฐ๋ก ์ปจ๋ฒํ ์ด ๊ฐ๋ฅํ๋ค. Object ํ์ ์ Order ํ์ ์ผ๋ก ์บ์คํ ๋ง ํด์ฃผ๋ฉด ๋๋๋ฐ ์ข ๋ ํ์ ์ธ์ดํํ๊ฒ ์ปจ๋ฒํ ์ ํ ์ ์๋ค. revceiveAndConvert ํจ์์ ์ธ์๋ก ParameterizedTypeReference๋ฅผ ์ฃผ๋ ๊ฒ์ด๋ค. ๋ง ๊ทธ๋๋ก ๋ณํํ ๊ฐ์ฒด ํํ์ ๋ ํผ๋ฐ์ค๋ฅผ ์ ๋ฌํด์ฃผ๋ ๊ฒ์ด๋ค.
public Order receiveOrder() {
return rabbit.receiveAndConvert("tacocloud.order.queue",
new ParameterizedTypeReference<Order>() {});
}
๊ทธ๋ฐ๋ฐ ์ด๋ฌํ ํํ๋ก ๋ ํผ๋ฐ์ค๋ฅผ ์ฃผ๊ธฐ ์ํด์๋ ๊ธฐ๋ณธ์ผ๋ก ์ค์ ๋์ด์๋ SimpleMessageConverter๋ฅผ ์ฌ์ฉํ๋ฉด ์๋๊ณ SmartMessageConverter ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ Jackson2JsonMessageConverter ๋ฑ์ ์ฌ์ฉํด์ผ ํ๋ค.
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}
์ด๋ ๊ฒ @RabbitListener๋ฅผ ๊ตฌํํด๋๋ฉด ๋ฉ์ธ์ง๊ฐ ํ์ ๋์ฐฉํ๋ฉด ๋ฉ์๋๊ฐ ์๋ ํธ์ถ ๋๋ค. ์ด๋ ์์ฅ์ @JmsListener ์ ์ด๋ ธํ ์ด์ ๋ง ๋ค๋ฅด๊ณ ๋๊ฐ์ ํํ์ด๋ค! ๋ธ๋ก์ปค๊ฐ ๋ค๋ฅด๋๋ผ๋ ๋์ผํ ์ฝ๋๋ฅผ ์ธ ์ ์์ด์ ์ฐธ ์ข๋ค.
์์์์ ๋ณด์ด๋ ๊ฒ ์ฒ๋ผ RabbitMQ์ ActiveMQ(JMS)์ ์๋นํ ์ ์ฌํ๊ฒ ๋์ํ๋ค. ์ด ๋์ ๋ค๋ฅธ ๋ธ๋ก์ปค๋ฅผ ์ฌ์ฉํ๋ค๊ณ ํด๋ ์์ ๋ค๋ฅธ ๋ฉ์๋๋ ํ๋ก๊ทธ๋๋ฐ ๋ชจ๋ธ์ ๋์ ํ ํ์๊ฐ ์๋ค๋ ์ฅ์ ์ด ์๋ค.
JMS์ RabbitMQ๋ ์ฝ๊ฐ ๊ตฌํ์ ๊ธฐ์ ๋ชจ๋ธ์ด๋ค. ์ํ์น ์นดํ์นด๋ ๊ฐ์ฅ ์๋ก์ด ๋ฉ์ธ์ง ์์คํ ์ด๊ณ , RabbiMQ(ActiveMQ, Artemis)์ ์ ์ฌํ ๋ฉ์ธ์ง ๋ธ๋ก์ปค์ง๋ง ํน์ ์ ์ํคํ ์ฒ๋ฅผ ๊ฐ์ง๊ณ ์๋ค.
- ๋์ ํ์ฅ์ฑ์ ์ ๊ณตํ๋ ํด๋ก์คํฐ๋ก ์คํ๋๋๋ก ์ค๊ณ๋์ด์๋ค.
- ํด๋ฌ์คํฐ์ ๋ชจ๋ ์นดํ์นด ์ธ์คํด์ค์ ํ ํฝ์ ํํฐ์ ์ผ๋ก ๋ถํ ํ์ฌ ๋ฉ์ธ์ง๋ฅผ ๊ด๋ฆฌํ๋ค.
- ์นดํ์นด๋ ํ, ๊ฑฐ๋์๋ฅผ ์ฌ์ฉํ์ง ์๊ณ ์ค์ง ํ ํฝ๋ง ์ฌ์ฉํ๋ค.
- ์นดํ์นด ํ ํฝ์ ํด๋ฌ์คํฐ์ ๋ชจ๋ ๋ธ๋ก์ปค์ ๊ฑธ์ณ ๋ณต์ ๋๋ค.
- ํด๋ฌ์คํฐ์ ๊ฐ ๋ ธ๋๋ ํ๋ ์ด์์ ํ ํฝ์ ๋ํ ๋ฆฌ๋(leader)๋ก ๋์ํ๋ค. ํ๋ ์ญํ ๋ก๋ ํ ํฝ ๋ฐ์ดํฐ๋ฅผ ๊ด๋ฆฌํ๊ณ , ํด๋ฌ์คํฐ์ ๋ค๋ฅธ ๋ ธ๋๋ก ๋ฐ์ดํฐ๋ฅผ ๋ณต์ ํ๋ค.
- (์ฑ ์ ์๋ ๋ถ๋ถ) ์ฃผํคํผ๋ ๋ถ์ฐ ์ดํ๋ฆฌ์ผ์ด์ ์ ์ํ ์ฝ๋๋ค์ด์ ์์คํ ์ด๋ค.
- ๋ถ์ฐ๋์ด์๋ ๊ฐ ์ดํ๋ฆฌ์ผ์ด์ ์ ์ ๋ณด๋ฅผ ์ค์์ ์ง์คํ๊ณ ์์ , ๊ด๋ฆฌ, ๊ทธ๋ฃน ๊ด๋ฆฌ, ๋๊ธฐํ ๋ฑ์ ํ๋ค.
์ฌ๊ธฐ์๋ ์ํคํ ์ณ๋ณด๋ค๋ ์ค์ฌ์ฉ ์ธก๋ฉด์ ์ค์ ์ ๋๋ค!
์ํคํ ์ณ ์ฐธ๊ณ ์๋ฃ
Kafka Architecture - DZone Big Data
์คํ๋ง์์ ์นดํ์นด๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด์ ์์กด์ฑ์ ๋น๋์ ์ถ๊ฐํด์ฃผ์! ActiveMQ์ RabbitMQ์๋ ๋ค๋ฅด๊ฒ spring-boot-starter ์๋ฆฌ์ฆ์ ๋ค์ด๊ฐ์์ง ์๋ค.
ActiveMQ(JMSTemplate), RabbitMQ(RabbitTemplate)์ฒ๋ผ ๋น๋์ ์ถ๊ฐํ๋ฉด KafkaTemplate์ ๋ฉ์ธ์ง ์ ์ก/์์ ์ ์ฌ์ฉํ ์ ์๋ค.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
์นดํ์นด๋ฅผ ์ฌ์ฉํ๊ธฐ ์ ์ ์ด์ํ๊ฒฝ์์ ์ฌ์ฉํ๊ฑฐ๋, ์ฌ์ฉ์ ํธ๋ฆฌํ๊ฒ ํด์ฃผ๋ ์นดํ์นด ์์ฑ์ properties์ ์ค์ ํด๋์.
**spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092**
bootstrap-servers์๋ ์นด์ธ ์นด ํด๋ฌ์คํฐ ์ด๊ธฐ ์ฐ๊ฒฐ์ ์ฌ์ฉ๋๋ ํ๋ ์ด์์ ์นดํ์นด ์๋ฒ๋ค์ ์์น๋ฅผ ์ค์ ํ๋ค. ์ฌ๋ฌ๊ฐ์ง ์๋ฒ์์ ๋ฐ์์จ๋ค๋ฉด ์ค ์ง์ด์ ์จ์ฃผ๋ฉด ๋๋ค. bootstrap-servers๋ ๋ฆฌ์คํธ๋ฅผ ๋ฐ๋๋ค.
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9094
์ด์ ์ฅ์์ ๋์๋ Template๋ค๊ณผ๋ ์ฌ์ฉํ๋ ๋ฉ์๋๊ฐ ์ด์ง ๋ค๋ฅด๋ค.
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
- convetAndSend() ๋ฉ์๋๊ฐ ์๋ค! KafkaTemplate์ ์ ๋ค๋ฆญ ํ์ ์ ์ฌ์ฉํ๊ณ , ๋ฉ์ธ์ง๋ฅผ ์ ์กํ ๋ ์ง์ ๋๋ฉ์ธ ํ์ ์ ์ฒ๋ฆฌํ ์ ์๊ธฐ ๋๋ฌธ์ ๋ชจ๋ send() ๋ฉ์๋๊ฐ convertAndSend() ๊ธฐ๋ฅ์ ๊ฐ์ง๊ณ ์๋ค.
- send()์ sendDefault()์์ ๋ง์ ๋งค๊ฐ๋ณ์๋ค์ ๋ฐ๊ณ ์๋๋ฐ ์ด๋ ์นดํ์นด์์ ๋ฉ์ธ์ง๋ฅผ ์ ํ ๋ ๋ฉ์ธ์ง๊ฐ ์ ์ก๋๋ ๋ฐฉ๋ฒ์ ์๋ ค์ฃผ๋ ๋งค๊ฐ๋ณ์๋ฅผ ์ ๋ฌํ๊ธฐ ๋๋ฌธ์ด๋ค.
- ๋ฉ์ธ์ง๊ฐ ์ ์ก๋ ํ ํฝ
- ํ ํฝ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ๋ ํํฐ์ (optional), ๋ ์ฝ๋ ์ ์ก ํค (optional), ํ์์คํฌํ (optional)
- ํ์ด๋ก๋
โํ ํฝ๊ณผ ํ์ด๋ก๋๋ ๊ฐ์ฅ ์ค์ํ ๋งค๊ฐ๋ณ์์ด๋ค. ํํฐ์ ๊ณผ ํค๋ send()์ sendDefault()์ ๋งค๊ฐ๋ณ์๋ก ์ ๊ณต๋๋ ์ถ๊ฐ ์ ๋ณด์ด๊ณ Template ์ฌ์ฉ์๋ ๊ฑฐ์ ์ํฅ์ ์ฃผ์ง ์๋๋ค.
โ ProducerRecord๋ฅผ ์ ์กํ๋ send() ๋ฉ์๋๋ ์๋๋ฐ ์ด๋ ๋ชจ๋ ์ ํํ๋ ๋งค๊ฐ๋ณ์๋ค์ ํ๋์ ๊ฐ์ฒด๋ก ๋ด์ ๊ฒ์ด๋ค.
โ Message ๊ฐ์ฒด๋ฅผ ์ ์กํ๋ send()๋ ์๋๋ฐ ์ด ๊ฒฝ์ฐ๋ ์์ ์ฒ๋ผ ์ฐ๋ฆฌ ๋๋ฉ์ธ ๊ฐ์ฒด๋ฅผ message ๊ฐ์ฒด๋ก ๋ณํํด์ผ ํ๋ค.
โ ๊ฐ์ฒด๋ฅผ ์ถ๊ฐ๋ก ๋ณํํด์ผ ํ๋ ProducerRecord์ Message๋ณด๋ค ๋ค๋ฅธ ๊ฒ์ ์ฌ์ฉํ๋๊ฒ ์ข๋ค.
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
// ๊ธฐ๋ณธ์ผ๋ก ์ค์ ๋ ํ ํฝ์ผ๋ก ๋ณด๋ด๊ธฐ ๋๋ฌธ์ ๋ฐ๋ก ๋ช
์๋ฅผ ์ ํ๋ค.
@Override
public void sendOrder(Order order) {
kafkaTemplate.sendDefault(order);
}
}
โ tacocloud.orders.topic ์ด๋ผ๋ ์ด๋ฆ์ ํ ํฝ์ผ๋ก order ๊ฐ์ ๊ฐ ์ ์ก๋์๋ค.
โ 2๋ฒ์งธ sendOrder์ฒ๋ผ ๊ธฐ๋ณธ ํ ํฝ์ ์ฌ์ฉํ๋ ค๋ฉด properties์ ์ค์ ํด๋๋ฉด ๋๋ค.
spring:
kafka:
template:
default-topic: tacocloud.orders.topic
์ด์ ์ JmsTemplate์ด๋ RabbitTemplate์ receive์ receiveAndConvert๋ฑ ๋ฉ์ธ์ง๋ฅผ ์์ ํด์ฃผ๋ ์ฝ๋๋ฅผ ์ง์ํด์คฌ์ง๋ง Kafka๋ ์๋ฌด๋ฐ ์์ ์ฝ๋๋ฅผ ์ ๊ณตํด์ฃผ๊ณ ์์ง ์๋ค. ๋ฉ์ธ์ง ์์ ์ ์ ์ผํ ๋ฐฉ์์ @KafkaListener ๋ก ๋ฆฌ์ค๋๋ฅผ ์์ฑํด์ผ ํ๋ค. ๋ฉ์ธ์ง๊ฐ ์ค๋ฉด ์๋์ผ๋ก ๋ฉ์๋๋ฅผ ํธ์ถํ๋ ํธ์ ๋ฐฉ์์ผ๋ก๋ง ๋ฉ์ธ์ง๋ฅผ ๋ฐ์ ์ ์๋ ๊ฒ์ด๋ค.
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order) {
ui.displayOrder(order);
}
}
์ด ์์ ์์๋ ํ์ด๋ก๋์ธ Order๋ง ๋ฐ๊ณ ์์ง๋ง ํ์ํ๋ค๋ฉด ConsumerRecord๋ Message ๊ฐ์ฒด๋ฅผ ์ธ์๋ก ๋ฐ์ ์ ์๋ค. ๋ฐ์ ์์ ๋ ๋ฐ์ ๋ฉ์ธ์ง๊ฐ ์๋ ํํฐ์ ๊ณผ ํ์์คํฌํ๋ฅผ ๋ก๊ทธ์ ์ฐ๊ธฐ ์ํด ConsumerRecord๋ฅผ ๋๊ฒผ๋ค.
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}
CosumerRecord๋ง๊ณ Message ๊ฐ์ฒด๋ฅผ ๋๊ฒจ๋ ๊ฐ์ ์ผ์ ํ ์ ์๋ค.
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}
์ ๋ ์์ ๋ handle์ ๋งค๊ฐ๋ณ์๋ก ์ง์ Order๋ฅผ ์์ฒญํ์ง๋ง ConsumerRecord์ Messge ๊ฐ์ฒด๋ฅผ ๋๊ฒจ์คฌ๋ค๋ฉด ConsumerRecord.value() ์ Message.getPayload() ๋ฅผ ์ฌ์ฉํด๋ ๋ฐ์ ์ ์๋ค.
Use Redis:
- If you want fire and forget kind of system, where all the messages that you produce are delivered instantly to consumers.
- If speed is most concerned.
- If you can live up with data loss.
- If you don't want your system to hold the message that has been sent.
- Amount of data that is gonna be dealt is not huge.
Use kafka:
- If you want reliability.
- If you want your system to have a copy of messages that has been sent even after consumption.
- If you can't live up with data loss.
- If Speed is not a big concern.
- data size is huge