chapter9 incheol - JAVA-JIKIMI/SPRING-IN-ACTION-5 GitHub Wiki
์คํ๋ง ํตํฉ์ <Enterprise Integration Patterns 2003>์์ ๋ณด์ฌ์ค ๋๋ถ๋ถ์ ํตํฉ ํจํด์ ์ฌ์ฉํ ์ ์๊ฒ ๊ตฌํํ ๊ฒ์ด๋ค. ๊ฐ ํตํฉ ํจํด์ ํ๋์ ์ปดํฌ๋ํธ๋ก ๊ตฌํ๋๋ฉฐ, ์ด๊ฒ์ ํตํด์ ํ์ดํ๋ผ์ธ์ผ๋ก ๋ฉ์์ง๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ด๋ฐํ๋ค. ์คํ๋ง ๊ตฌ์ฑ์ ์ฌ์ฉํ๋ฉด ๋ฐ์ดํฐ๊ฐ ์ด๋ํ๋ ํ์ดํ๋ผ์ธ์ผ๋ก ์ด๋ฐ ์ปดํฌ๋ํธ๋ค์ ์กฐ๋ฆฝํ ์ ์๋ค.
์ ํ๋ฆฌ์ผ์ด์ ์ด ํตํฉํ ์ ์๋ ๊ทธ๋ฐ ๋ฆฌ์์ค ์ค ํ๋๊ฐ ํ์ผ ์์คํ ์ด๋ค. ์ด์ ๋ฐ๋ผ ์คํ๋ง ํตํฉ์ ๋ง์ ์ปดํฌ๋ํธ ์ค์ ํ์ผ์ ์ฝ๊ฑฐ๋ ์ฐ๋ ์ฑ๋ ์ด๋ํฐ๊ฐ ์๋ค.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>
์ฒซ ๋ฒ์งธ ์์กด์ฑ์ ์คํ๋ง ํตํฉ์ ์คํ๋ง ๋ถํธ ์คํํฐ์ด๋ค. ํตํฉํ๋ ค๋ ํ๋ก์ฐ์ ๋ฌด๊ดํ๊ฒ ์ด ์์กด์ฑ์ ์คํ๋ง ํตํฉ ํ๋ก์ฐ์ ๊ฐ๋ฐ ์์ ๋ฐ๋์ ์ถ๊ฐํด์ผ ํ๋ค.
๋ ๋ฒ์งธ ์์กด์ฑ์ ์คํ๋ง ํตํฉ์ ํ์ผ ์๋ํฌ์ธํธ(endpoint) ๋ชจ๋์ด๋ค. ์ด ๋ชจ๋์ ์ธ๋ถ ์์คํ ํตํฉ์ ์ฌ์ฉ๋๋ 24๊ฐ ์ด์์ ์๋ํฌ์ธํธ ๋ชจ๋ ์ค ํ๋๋ค.
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel="textInChannel")
public interface FileWriterGateway {
void writeToFile(
@Header(FileHeaders.FILENAME) String filename,
String data
);
}
- @MessagingGateway : FileWriterGateway ์ธํฐํ์ด์ค์ ๊ตฌํ์ฒด(ํด๋์ค)๋ฅผ ๋ฐํ์ ์์ ์์ฑํ๋ผ๊ณ ์คํ๋ง ํตํฉ์ ์๋ ค์ค๋ค.
- defaultRequestChannel : ํด๋น ์ธํฐํ์ด์ค์ ๋ฉ์๋ ํธ์ถ๋ก ์์ฑ๋ ๋ฉ์์ง๊ฐ ์ด ์์ฑ์ ์ง์ ๋ ๋ฉ์์ง ์ฑ๋๋ก ์ ์ก๋๋ค๋ ๊ฒ์ ๋ํ๋ธ๋ค.
- writeToFile()์ ํธ์ถ๋ก ์๊ธด ๋ฉ์์ง๊ฐ textInChannel์ด๋ผ๋ ์ด๋ฆ์ ์ฑ๋๋ก ์ ์ก๋๋ค.
- @Header : filename์ ์ ๋ฌ๋๋ ๊ฐ์ด ๋ฉ์์ง ํ์ด๋ก๋๊ฐ ์๋ ๋ฉ์์ง ํค๋์ ์๋ค๋ ๊ฒ์ ๋ํ๋ธ๋ค.
@Configuration
public class FileWriterIntegrationConfig {
@Bean
@Transformer(inputChannel="textInChannel", // Declares a transformer
outputChannel="fileWriterChannel")
public GenericTransformer<String, String> upperCaseTransformer() {
return text -> text.toUpperCase(); }
@Bean
@ServiceActivator(inputChannel="fileWriterChannel")
public FileWritingMessageHandler fileWriter() { // Declares a file writer
FileWritingMessageHandler handler =
new FileWritingMessageHandler(new File("/tmp/sia5/files"));
handler.setExpectReply(false);
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setAppendNewLine(true);
return handler;
}
}
- ์๋ฐ ๊ตฌ์ฑ์์๋ ๋ ๊ฐ์ ๋น์ ์ ์ํ๋ค.
- ๋ณํ๊ธฐ(GenericTransformer) : ์ ๋ฌ ๋ฐ์ ๋ฌธ์๋ฅผ ๋๋ฌธ์๋ก ๋ณํํ๋ค.
- ํ์ผ-์ฐ๊ธฐ ๋ฉ์์ง ํธ๋ค๋ฌ(FileWritingMessageHandler)
- fileWriterChannel๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ๋ฐ์์ FileWritingMessageHandler์ ์ธ์คํด์ค๋ก ์ ์๋ ์๋น์ค์ ๋๊ฒจ์ค๋ค.
- handler.setExpectReply(false)๋ ์๋น์ค์์ ์๋ต ์ฑ๋(ํ๋ก์ฐ์ ์ ์คํธ๋ฆผ ์ปดํฌ๋ํธ๋ก ๊ฐ์ด ๋ฐํ๋ ์ ์๋ ์ฑ๋)์ ์ฌ์ฉํ์ง ์์์ ๋ํ๋ธ๋ค. ํด๋น ๋ฉ์๋๋ฅผ ํธ์ถํ์ง ์์๋ค๋ฉด ํตํฉ ํ๋ก์ฐ๊ฐ ์ ์์ ์ผ๋ก ์๋ํ๋๋ผ๋ ์๋ต ์ฑ๋์ด ๊ตฌ์ฑ๋์ง ์์๋ค๋ ๋ก๊ทธ ๋ฉ์์ง๋ค์ด ๋ํ๋๋ค.
ํตํฉ ํ๋ก์ฐ์ ๊ฐ ์ปดํฌ๋ํธ๋ฅผ ๋ณ๋์ ๋น์ผ๋ก ์ ์ธํ์ง ์๊ณ ์ ์ฒด ํ๋ก์ฐ๋ฅผ ํ๋์ ๋น์ผ๋ก ์ ์ธํ๋ค.
@Configuration
public class FileWriterIntegrationConfig {
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.handle(Files
.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}
}
- IntegrationFlows ํด๋์ค๋ ํ๋ก์ฐ๋ฅผ ์ ์ธํ ์ ์๋ ๋น๋ API๋ฅผ ์์์ํจ๋ค.
- channel() ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ํด๋น ์ฑ๋์ ์ด๋ฆ์ผ๋ก ์ฐธ์กฐํ ์ ์๋ค.
- ์ฝ๋๋ ๋ง์ด ์ค์์ง๋ง ์ฝ๋์ ๊ฐ๋ ์ฑ์ ๋์ด๊ธฐ ์ํด ๋ค์ฌ์ฐ๊ธฐ๋ฅผ ์ ํด์ผ ํ๋ค.
์คํ๋ง ํตํฉ์ ๋ค์์ ํตํฉ ์๋๋ฆฌ์ค๋ฅผ ๊ฐ๋ ๋ง์ ์์ญ์ ํฌํจํ๋ค. ๋ฐ๋ผ์ ๊ทธ ๋ชจ๋ ๊ฒ์ ํ๋์ ์ฑํฐ์ ํฌํจ์ํค๋ ค๊ณ ํ๋ ๊ฒ์ ๋ง์น ์ฝ๋ผ๋ฆฌ๋ฅผ ๋ดํฌ์ ๋ง์ถฐ ๋ฃ์ผ๋ ค๊ณ ํ๋ ๊ฒ๊ณผ ๊ฐ๋ค. ํตํฉ ํ๋ก์ฐ๋ ํ๋ ์ด์์ ์ปดํฌ๋ํธ๋ก ๊ตฌ์ฑ๋๋ฉฐ ๋ค์๊ณผ ๊ฐ๋ค.
- ์ฑ๋(Channel) : ํ ์์๋ก๋ถํฐ ๋ค๋ฅธ ์์๋ก ๋ฉ์์ง๋ฅผ ์ ๋ฌํ๋ค.
- ํํฐ(Filter) : ์กฐ๊ฑด์ ๋ง๋ ๋ฉ์์ง๊ฐ ํ๋ก์ฐ๋ฅผ ํต๊ณผํ๊ฒ ํด์ค๋ค.
- ๋ณํ๊ธฐ(Transaformer) : ๋ฉ์์ง ๊ฐ์ ๋ณ๊ฒฝํ๊ฑฐ๋ ๋ฉ์์ง ํ์ด๋ก๋์ ํ์ ์ ๋ค๋ฅธ ํ์ ์ผ๋ก ๋ณํํ๋ค.
- ๋ผ์ฐํฐ(Router) : ์ฌ๋ฌ ์ฑ๋ ์ค ํ๋๋ก ๋ฉ์์ง๋ฅผ ์ ๋ฌํ๋ฉฐ, ๋๊ฐ ๋ฉ์์ง ํค๋๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ๋ค.
- ๋ถ๋ฐฐ๊ธฐ(Splitter) : ๋ค์ด์ค๋ ๋ฉ์์ง๋ฅผ ๋ ๊ฐ ์ด์์ ๋ฉ์์ง๋ก ๋ถํ ํ๋ฉฐ, ๋ถํ ๋ ๊ฐ ๋ฉ์์ง๋ ๋ค๋ฅธ ์ฑ๋๋ก ์ ์ก๋๋ค.
- ์ง์ ๊ธฐ(Aggregator) : ๋ถ๋ฐฐ๊ธฐ์ ์๋ฐ๋ ๊ฒ์ผ๋ก ๋ณ๊ฐ์ ์ฑ๋๋ก๋ถํฐ ์ ๋ฌ๋๋ ๋ค์์ ๋ฉ์์ง๋ฅผ ํ๋์ ๋ฉ์์ง๋ก ๊ฒฐํฉํ๋ค.
- ์๋น์ค ์กํฐ๋ฒ ์ดํฐ(Service activator) : ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๋๋ก ์๋ฐ ๋ฉ์๋์ ๋ฉ์์ง๋ฅผ ๋๊ฒจ์ค ํ ๋ฉ์๋์ ๋ฐํ๊ฐ์ ์ถ๋ ฅ ์ฑ๋๋ก ์ ์กํ๋ค.
- ์ฑ๋ ์ด๋ํฐ(Channel adapter) : ์ธ๋ถ ์์คํ ์ ์ฑ๋์ ์ฐ๊ฒฐํ๋ค. ์ธ๋ถ ์์คํ ์ผ๋ก๋ถํฐ ์ ๋ ฅ์ ๋ฐ๊ฑฐ๋ ์ธ ์ ์๋ค.
- ๊ฒ์ดํธ์จ์ด(Gateway) : ์ธํฐํ์ด์ค๋ฅผ ํตํด ํตํฉ ํ๋ก์ฐ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌํ๋ค.
๋ฉ์์ง ์ฑ๋์ ํตํฉ ํ๋ฆฌํ๋ผ์ธ์ ํตํด์ ๋ฉ์์ง๊ฐ ์ด๋ํ๋ ์๋จ์ด๋ค. ์ฆ, ์ฑ๋์ ์คํ๋ง ํตํฉ์ ๋ค๋ฅธ ๋ถ๋ถ์ ์ฐ๊ฒฐํ๋ ํต๋ก๋ค. ์คํ๋ง ํตํฉ์ ๋ค์์ ํฌํจํด์ ์ฌ๋ฌ ์ฑ๋ ๊ตฌํ์ฒด(ํด๋์ค)๋ฅผ ์ ๊ณตํ๋ค.
- PublishSubscribeChannel : ์ ์ก๋๋ ๋ฉ์์ง๋ ํ๋ ์ด์์ ์ปจ์๋จธ๋ก ์ ๋ฌ๋๋ค. ์ปจ์๋จธ๊ฐ ์ฌ๋ฟ์ผ ๋๋ ๋ชจ๋ ์ปจ์๋จธ๊ฐ ํด๋น ๋ฉ์์ง๋ฅผ ์์ ํ๋ค.
- QueueChannel : ์ ์ก๋๋ ๋ฉ์์ง๋ FIFO ๋ฐฉ์์ผ๋ก ์ปจ์๋จธ๊ฐ ๊ฐ์ ธ๊ฐ ๋๊น์ง ํ์ ์ ์ฅ๋๋ค. ์ปจ์๋จธ๊ฐ ์ฌ๋ฟ์ผ ๋๋ ๊ทธ์ค ํ๋์ ์ปจ์๋จธ๋ง ํด๋น ๋ฉ์์ง๋ฅผ ์์ ํ๋ค.
- PriorityChannel : QueueChannel๊ณผ ์ ์ฌํ์ง๋ง, FIFO ๋ฐฉ์ ๋์ ๋ฉ์์ง์ priority ํค๋๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์ปจ์๋จธ๊ฐ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ๊ฐ๋ค.
- RendezvousChannel : QueueChannel๊ณผ ์ ์ฌํ์ง๋ง, ์ ์ก์์ ๋์ผํ ์ค๋ ๋๋ก ์คํ๋๋ ์ปจ์๋จธ๋ฅผ ํธ์ถํ์ฌ ๋จ์ผ ์ปจ์๋จธ์๊ฒ ๋ฉ์์ง๋ฅผ ์ ์กํ๋ค. ์ด ์ฑ๋์ ํธ๋์ญ์ ์ ์ง์ํ๋ค.
- ExecutorChannel : DirectChannel๊ณผ ์ ์ฌํ์ง๋ง, TaskExecutor๋ฅผ ํตํด์ ๋ฉ์์ง๊ฐ ์ ์ก๋๋ค. (์ ์ก์์ ๋ค๋ฅธ ์ค๋ ๋์์ ์ฒ๋ฆฌ๋๋ค) ์ด ์ฑ๋ ํ์ ์ ํธ๋์ญ์ ์ ์ง์ํ์ง ์๋๋ค.
- FluxMessageChannel : ํ๋ก์ ํธ ๋ฆฌ์กํฐ(Product Reactor)์ ํ๋ญ์ค(Flux)๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ๋ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ฆ ํผ๋ธ๋ฆฌ์ (Reactive Streams Publisher) ์ฑ๋์ด๋ค.
// option1. PublishSubscribeChannel
@Bean
public MessageChannel orderChannel() {
return new PublishSubscribeChannel();
}
// option2. QueueChannel
@Bean
public MessageChannel orderChannel() {
return new QueueChannel();
}
@Bean
public IntegrationFlow orderFlow() {
return IntegrationFlows
...
.channel("orderChannel")
...
.get();
}
ํํฐ๋ ํตํฉ ํ์ดํ๋ผ์ธ์ ์ค๊ฐ์ ์์นํ ์ ์์ผ๋ฉฐ, ํ๋ก์ฐ์ ์ ๋จ๊ณ๋ก๋ถํฐ ๋ค์ ๋จ๊ณ๋ก์ ๋ฉ์์ง ์ ๋ฌ์ ํ์ฉ ๋๋ ๋ถํํ๋ค.
// option1. ์ด๋
ธํ
์ด์
๊ธฐ๋ฐ ํํฐ ์ค์
@Filter(inputChannel="numberChannel",
outputChannel="evenNumberChannel")
public boolean evenNumberFilter(Integer number) {
return number % 2 == 0;
}
// option2. ์๋ฐ DSL ๊ตฌ์ฑ ํํฐ ์ค์
@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
return IntegrationFlows
...
.<Integer>filter((p) -> p % 2 == 0)
...
.get();
}
๋ณํ๊ธฐ๋ ๋ฉ์์ง ๊ฐ์ ๋ณ๊ฒฝ์ด๋ ํ์ ์ ๋ณํํ๋ ์ผ์ ์ํํ๋ค.
// option1. ์ด๋
ธํ
์ด์
๊ธฐ๋ฐ ๋ณํ๊ธฐ ์ค์
@Bean
@Transformer(inputChannel="numberChannel",
outputChannel="romanNumberChannel")
public GenericTransformer<Integer, String> romanNumTransformer() {
return RomanNumbers::toRoman;
}
// option2. ์๋ฐ DSL ๊ตฌ์ฑ ๋ณํ๊ธฐ ์ค์
@Bean
public IntegrationFlow transformerFlow() {
return IntegrationFlows ...
.transform(RomanNumbers::toRoman)
...
.get();
}
// option3. ์ด๋
ธํ
์ด์
+ ์๋ฐ DSL ๊ตฌ์ฑ ์กฐํฉ
@Bean
public RomanNumberTransformer romanNumberTransformer() {
return new RomanNumberTransformer();
}
@Bean
public IntegrationFlow transformerFlow(
RomanNumberTransformer romanNumberTransformer) {
return IntegrationFlows
...
.transform(romanNumberTransformer)
...
.get();
}
๋ผ์ฐํฐ๋ ์ ๋ฌ ์กฐ๊ฑด์ ๊ธฐ๋ฐ์ผ๋ก ํตํฉ ํ๋ก์ฐ ๋ด๋ถ๋ฅผ ๋ถ๊ธฐ(์๋ก ๋ค๋ฅธ ์ฑ๋๋ก ๋ฉ์์ง๋ฅผ ์ ๋ฌ)ํ๋ค.
์๋ฅผ ๋ค์ด, ์ ์๊ฐ์ ์ ๋ฌํ๋ numberChannel์ด๋ผ๋ ์ด๋ฆ์ ์ฑ๋์ด ์๋ค๊ณ ํ์. ๊ทธ๋ฆฌ๊ณ ๋ชจ๋ ์ง์ ๋ฉ์์ง๋ฅผ evenChannel์ด๋ผ๋ ์ด๋ฆ์ ์ฑ๋๋ก ์ ๋ฌํ๊ณ , ํ์ ๋ฉ์์ง๋ oddChannel์ด๋ผ๋ ์ด๋ฆ์ ์ฑ๋๋ก ์ ๋ฌํ๋ค๊ณ ๊ฐ์ ํด๋ณด์.
// option1. ์ด๋
ธํ
์ด์
๊ธฐ๋ฐ ๋ผ์ฐํฐ ์ค์
@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel>
determineTargetChannels(Message<?> message) { Integer number = (Integer) message.getPayload();
if (number % 2 == 0) {
return Collections.singleton(evenChannel()); }
return Collections.singleton(oddChannel()); }
};
}
@Bean
public MessageChannel evenChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel oddChannel() {
return new DirectChannel();
}
์ฌ๊ธฐ์ ์ ์ธํ AbstractMessageRouter ๋น์ numberChannel์ด๋ผ๋ ์ด๋ฆ์ ์ ๋ ฅ ์ฑ๋๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ๋ฐ๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ด ๋น์ ๊ตฌํํ ์ต๋ช ์ ๋ด๋ถ ํด๋์ค์์๋ ๋ฉ์์ง ํ์ด๋ก๋๋ฅผ ๊ฒ์ฌํ์ฌ ์ง์์ผ ๋๋ evenChannel์ด๋ผ๋ ์ด๋ฆ์ ์ฑ๋์ ๋ฐํํ๋ค.
// option2. ์๋ฐ DSL ๊ตฌ์ฑ ๋ผ์ฐํฐ ์ค์
@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
return IntegrationFlows
...
.<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping
.subFlowMapping("EVEN",
sf -> sf.<Integer, Integer>transform(n -> n * 10)
.handle((i,h) -> { ... })
)
.subFlowMapping("ODD", sf -> sf
.transform(RomanNumbers::toRoman)
.handle((i,h) -> { ... })
)
.get();
}
๋๋ก๋ ํตํฉ ํ๋ก์ฐ์์ ํ๋์ ๋ฉ์์ง๋ฅผ ์ฌ๋ฌ ๊ฐ๋ก ๋ถํ ํ์ฌ ๋ ๋ฆฝ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ ๊ฒ์ด ์ ์ฉํ ์ ์๋ค.
- ๋ฉ์์ง ํ์ด๋ก๋๊ฐ ๊ฐ์ ํ์ ์ ์ปฌ๋ ์ ํญ๋ชฉ๋ค์ ํฌํจํ๋ฉฐ, ๊ฐ ๋ฉ์์ง ํ์ด๋ก๋ ๋ณ๋ก ์ฒ๋ฆฌํ๊ณ ์ ํ ๋๋ค
- ์ฐ๊ด๋ ์ ๋ณด๋ฅผ ํจ๊ป ์ ๋ฌํ๋ ํ๋์ ๋ฉ์์ง ํ์ด๋ก๋๋ ๋ ๊ฐ ์ด์์ ์๋ก ๋ค๋ฅธ ํ์ ๋ฉ์์ง๋ก ๋ถํ ๋ ์ ์๋ค.
์๋ฅผ ๋ค์ด, ์ฃผ๋ฌธ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌํ๋ ๋ฉ์์ง๋ ๋๊ธ ์ฒญ๊ตฌ ์ ๋ณด์ ์ฃผ๋ฌธ ํญ๋ชฉ ๋ฆฌ์คํธ์ ๋ ๊ฐ์ง ๋ฉ์์ง๋ก ๋ถํ ํ ์ ์๋ค.
public class OrderSplitter {
public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
ArrayList<Object> parts = new ArrayList<>();
parts.add(po.getBillingInfo());
parts.add(po.getLineItems());
return parts;
}
}
๊ทธ ๋ค์์ @Splitter ์ ๋ ธํ ์ด์ ์ ์ง์ ํ์ฌ ํตํฉ ํ๋ก์ฐ์ ์ผ๋ถ๋ก OrderSplitter ๋น์ ์ ์ธํ ์ ์๋ค.
@Bean
@Splitter(inputChannel="poChannel",
outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
return new OrderSplitter();
}
ํ๋ก์ฐ์ ์ด ์ง์ ์์ PayloadTypeRouter๋ฅผ ์ ์ธํ์ฌ ๋๊ธ ์ฒญ๊ตฌ ์ ๋ณด์ ์ฃผ๋ฌธ ํญ๋ชฉ ์ ๋ณด๋ฅผ ๊ฐ ์ ๋ณด์ ์ ํฉํ ํ์ ํ๋ก์ฐ๋ก ์ ๋ฌํ ์ ์๋ค.
@Bean
@Router(inputChannel = "splitOrderChannel")
public MessageRouter splitOrderRouter() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(BillingInfo.class.getName(), "billingInfoChannel");
router.setChannelMapping(List.class.getName(), "lineItemsChannel");
return router;
}
BillingInfo ํ์ ์ ํ์ด๋ก๋๋ billingInfoChannel๋ก ์ ๋ฌ๋์ด ์ฒ๋ฆฌ๋๋ฉฐ, java.util.List ์ปฌ๋ ์ ์ ์ ์ฅ๋ ์ฃผ๋ฌธ ํญ๋ชฉ๋ค์ List ํ์ ์ผ๋ก lineItemsChannel์ ์ ๋ฌ๋๋ค.
์ด๋๋ List์ ๋ค์์ ๋ฉ์์ง๋ก ๋ถํ ํ๊ธฐ ์ํด @Splitter ์ ๋ ธํ ์ด์ ์ ์ง์ ํ ๋ฉ์๋(๋น์ด ์๋)๋ฅผ ์์ฑํ๊ณ ์ด ๋ฉ์๋์์๋ ์ฒ๋ฆฌ๋ LineItem์ด ์ ์ฅ๋ ์ปฌ๋ ์ ์ ๋ฐํํ๋ฉด ๋๋ค.
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
return lineItems;
}
์ด ๊ฒฝ์ฐ List ํ์ด๋ก๋๋ฅผ ๊ฐ๋ ๋ฉ์์ง๊ฐ limeItemsChannel์ ๋์ฐฉํ๋ฉด ์ด ๋ฉ์์ง๋ lineItemSplitter() ๋ฉ์๋ ์ธ์๋ก ์ ๋ฌ๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ด ๋ฉ์๋๋ ๋ถํ ๋ LineItem๋ค์ด ์ ์ฅ๋ ์ปฌ๋ ์ ์ ๋ฐํํ๋๋ฐ, ์ฌ๊ธฐ์๋ ์ด๋ฏธ LineItem๋ค์ด ์ ์ฅ๋ ์ปฌ๋ ์ ์ ๊ฐ๊ณ ์์ผ๋ฏ๋ก ์ด๊ฒ์ ๋ฐ๋ก ๋ฐํํ๋ค. ์ด์ ๋ฐ๋ผ ์ปฌ๋ ์ ์ ์ ์ฅ๋ ๊ฐ LineItem์ lineItemChannel๋ก ์ ๋ฌ๋๋ค.
return IntegrationFlows...
.split(orderSplitter()).<Object, String> route(
p->{
if(p.getClass().isAssignableFrom(BillingInfo.class)){
return"BILLING_INFO";
}else{
return"LINE_ITEMS";
}
}, mapping->mapping
.subFlowMapping("BILLING_INFO",
sf->sf.<BillingInfo> handle((billingInfo,h)->{
...
}))
.subFlowMapping("LINE_ITEMS",
sf->sf.split()
.<LineItem> handle((lineItem,h)->{
...
}))
)
.get();
์๋น์ค ์กํฐ๋ฒ ์ดํฐ๋ ์ ๋ ฅ ์ฑ๋๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ์์ ํ๊ณ ์ด ๋ฉ์์ง๋ฅผ MessageHandler ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ ํด๋์ค(๋น)์ ์ ๋ฌํ๋ค.
@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
return message -> {
System.out.println("Message payload: " + message.getPayload());
};
}
๋๋ ๋ฐ์ ๋ฉ์์ง์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ํ ์๋ก์ด ํ์ด๋ก๋๋ฅผ ๋ฐํํ๋ ์๋น์ค ์งํฐ๋ฒ ์ดํฐ๋ฅผ ์ ์ธํ ์๋ ์๋ค. ์ด ๊ฒฝ์ฐ ์ด ๋น์ MessageHandler๊ฐ ์๋ GenericHandler๋ฅผ ๊ตฌํํ ๊ฒ์ด์ด์ผ ํ๋ค.
@Bean
@ServiceActivator(inputChannel="orderChannel",
outputChannel="completeOrder")
public GenericHandler<Order> orderHandler(OrderRepository orderRepo) {
return (payload, headers) -> {
return orderRepo.save(payload);
};
}
์ด๋ฒ์๋ ์๋ฐ DSL ๊ตฌ์ฑ์ผ๋ก ๋ณ๊ฒฝํด๋ณด์.
public IntegrationFlow someFlow() {
return IntegrationFlows
...
.handle(msg -> {
System.out.println("Message payload: " + msg.getPayload());
})
.get();
}
์ฌ๊ธฐ์๋ handle() ๋ฉ์๋์ ์ธ์๋ก ์ ๋ฌ๋๋ MessageHandler๋ก ๋๋ค๋ฅผ ์ฌ์ฉํ์๋ค. ๊ทธ๋ฌ๋ ๋ฉ์๋ ์ฐธ์กฐ ๋๋ MessageHandler ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๋ ํด๋์ค ์ธ์คํด์ค๊น์ง๋ handler() ๋ฉ์๋์ ์ธ์๋ก ์ ๊ณตํ ์ ์๋ค.
๋ง์ผ ์๋น์ค ์กํฐ๋ฒ ์ดํฐ๋ฅผ ํ๋ก์ฐ์ ์ ์ผ ๋์ ๋์ง ์๋๋ค๋ฉด MessageHandler์ ๊ฒฝ์ฐ์ ์ ์ฌํ๊ฒ handle() ๋ฉ์๋์์ GenericHandler๋ฅผ ์ธ์๋ก ๋ฐ์ ์๋ ์๋ค.
public IntegrationFlow orderFlow(OrderRepository orderRepo) {
return IntegrationFlows
...
.<Order>handle((payload, headers) -> {
return orderRepo.save(payload);
})
...
.get();
}
๊ฒ์ดํธ์จ์ด๋ ์ ํ๋ฆฌ์ผ์ด์ ์ด ํตํฉ ํ๋ก์ฐ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ถ(submit)ํ๊ณ ์ ํ์ ์ผ๋ก ํ๋ก์ฐ์ ์ฒ๋ฆฌ ๊ฒฐ๊ณผ์ธ ์๋ต์ ๋ฐ์ ์ ์๋ ์๋จ์ด๋ค.
์ด์ ์ ๋ณธ FileWriterGateway๋ ๋จ๋ฐฉํฅ ๊ฒ์ดํธ์จ์ด๋ฉฐ, ํ์ผ์ ์ฐ๊ธฐ ์ํด ๋ฌธ์์ด์ ์ธ์๋ก ๋ฐ๊ณ void๋ฅผ ๋ฐํํ๋ ๋ฉ์๋๋ฅผ ๊ฐ๊ณ ์๋ค. ์๋ฐฉํฅ ๊ฒ์ดํธ์จ์ด์ ์์ฑ๋ ์ด๋ ต์ง ์์ผ๋ฉฐ, ์ด๋๋ ๊ฒ์ดํธ์จ์ด ์ธํฐํ์ด์ค๋ฅผ ์์ฑํ ๋ ํตํฉ ํ๋ก์ฐ๋ก ์ ์กํ ๊ฐ์ ๋ฉ์๋์์ ๋ฐํํด์ผ ํ๋ค.
์๋ฅผ ๋ค์ด, ๋ฌธ์์ด์ ๋ฐ์์ ๋ชจ๋ ๋๋ฌธ์๋ก ๋ณํํ๋ ํตํฉ ํ๋ก์ฐ์ ๊ฒ์ดํธ์จ์ด๋ฅผ ์๊ฐํด ๋ณด์.
// option1. ์ด๋
ธํ
์ด์
๊ธฐ๋ฐ ๊ฒ์ดํธ์จ์ด ์ค์
@Component
@MessagingGateway(defaultRequestChannel="inChannel",
defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
String uppercase(String in);
}
// option2. ์๋ฐ DSL ๊ตฌ์ฑ ๊ฒ์ดํธ์จ์ด ์ค์
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows
.from("inChannel")
.<String, String> transform(s -> s.toUpperCase())
.channel("outChannel")
.get();
}
์ฑ๋ ์ด๋ํฐ๋ ํตํฉ ํ๋ก์ฐ์ ์ ๊ตฌ์ ์ถ๊ตฌ๋ฅผ ๋ํ๋ธ๋ค. ๋ฐ์ดํฐ๋ ์ธ๋ฐ์ด๋(inbound) ์ฑ๋ ์ด๋ํฐ๋ฅผ ํตํด ํตํฉ ํ๋ก์ฐ๋ก ๋ค์ด์ค๊ณ , ์์๋ฐ์ด๋(outbound) ์ฑ๋ ์ด๋ํฐ๋ฅผ ํตํด ํตํฉ ํ๋ก์ฐ์์ ๋๊ฐ๋ค.
์ธ๋ฐ์ด๋ ์ฑ๋ ์ด๋ํฐ๋ ํ๋ก์ฐ์ ์ง์ ๋ ๋ฐ์ดํฐ ์์ค์ ๋ฐ๋ผ ์ฌ๋ฌ ๊ฐ์ง ํํ๋ฅผ ๊ฐ๋๋ค. ์๋ฅผ ๋ค์ด, ์ฆ๊ฐ๋๋ ์ซ์๋ฅผ AtomicInteger๋ก๋ถํฐ ํ๋ก์ฐ๋ก ๋ฃ๋ ์ธ๋ฐ์ด๋ ์ฑ๋ ์ด๋ํฐ๋ฅผ ์ ์ธํ ์ ์๋ค.
// option1. ์๋ฐ ์ด๋
ธํ
์ด์
๊ธฐ๋ฐ ์ด๋ํฐ ๊ตฌ์ฑ
@Bean
@InboundChannelAdapter(
poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
return () -> {
return new GenericMessage<>(source.getAndIncrement());
};
}
// option2. ์๋ฐ DSL ๊ธฐ๋ฐ ์ด๋ํฐ ๊ตฌ์ฑ
@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
return IntegrationFlows
.from(integerSource, "getAndIncrement",
c -> c.poller(Pollers.fixedRate(1000)))
...
.get();
}
์คํ๋ง ํตํฉ์ ์ฐ๋ฆฌ ๋๋ฆ์ ์ฑ๋ ์ด๋ํฐ๋ฅผ ์์ฑํ ์ ์๊ฒ ํด์ค๋ค. ์๋ ํ์ ์๋ ๊ฒ์ ํฌํจํด์ ๋ค์ํ ์ธ๋ถ ์์คํ ๊ณผ์ ํตํฉ์ ์ํด ์ฑ๋ ์ด๋ํฐ๊ฐ ํฌํจ๋ 24๊ฐ ์ด์์ ์๋ํฌ์ธํธ ๋ชจ๋์ ์คํ๋ง ํตํฉ์ด ์ ๊ณตํ๋ค.
Module | Dependency artifact ID |
---|---|
AMQP | spring-integration-amqp |
Spring | application events spring-integration-event |
RSS | and Atom spring-integration-feed |
Filesystem | spring-integration-file |
FTP | /FTPS spring-integration-ftp |
GemFire | spring-integration-gemfire |
HTTP | spring-integration-http |
JDBC | spring-integration-jdbc |
JPA | spring-integration-jpa |
JMS | spring-integration-jms |
spring-integration-mail | |
MongoDB | spring-integration-mongodb |
MQTT | spring-integration-mqtt |
Redis | spring-integration-redis |
RMI | spring-integration-rmi |
SFTP | spring-integration-sftp |
STOMP | spring-integration-stomp |
Stream | spring-integration-stream |
Syslog | spring-integration-syslog |
TCP | /UDP spring-integration-ip |
spring-integration-twitter | |
Web | Services spring-integration-ws |
WebFlux | spring-integration-webflux |
WebSocket | spring-integration-websocket |
XMPP | spring-integration-xmpp |
ZooKeeper | spring-integration-zookeeper |
ํ์ฝ ํด๋ผ์ฐ๋ ๋ฐ์ ํธ์งํจ์ ํ์ฝ ์ฃผ๋ฌธ ์ด๋ฉ์ผ์ ์ง์์ ์ผ๋ก ํ์ธํ์ฌ ์ด๋ฉ์ผ์ ์ฃผ๋ฌธ ๋ช ์ธ๋ฅผ ํ์ฑํ ํ ํด๋น ์ฃผ๋ฌธ ๋ฐ์ดํฐ์ ์ฒ๋ฆฌ๋ฅผ ์ํด ํ์ฝ ํด๋ผ์ฐ๋์ ์ ์ถํ๋ ํตํฉ ํ๋ก์ฐ๋ฅผ ๊ตฌํํด๋ณด์.
@Data
@ConfigurationProperties(prefix="tacocloud.email")
@Component
public class EmailProperties {
private String username;
private String password;
private String host;
private String mailbox;
private long pollRate = 30000;
public String getImapUrl() {
return String.format("imaps://%s:%s@%s/%s",
this.username, this.password, this.host, this.mailbox);
}
}
EmailProperties ํด๋์ค์ ํ์ํ ๋ฐ์ดํฐ๋ application.yml ํ์ผ์ ๊ตฌ์ฑํ ์ ์๋ค.
tacocloud:
email:
host: imap.tacocloud.com
mailbox: INBOX
username: taco-in-flow
password: 1L0v3T4c0s
poll-rate: 10000
์ง๊ธ๋ถํฐ๋ ํ์ฝ ์ฃผ๋ฌธ ์ด๋ฉ์ผ ํ๋ก์ฐ์ ์๋ฐ DSL ๊ตฌ์ฑ ํด๋ณด์
@Configuration
public class TacoOrderEmailIntegrationConfig {
@Bean
public IntegrationFlow tacoOrderEmailFlow(EmailProperties emailProps,
EmailToOrderTransformer emailToOrderTransformer,
OrderSubmitMessageHandler orderSubmitHandler) {
return IntegrationFlows.from(Mail.imapInboundAdapter(emailProps.getImapUrl()),
e -> e.poller(Pollers.fixedDelay(emailProps.getPollRate())))
.transform(emailToOrderTransformer)
.handle(orderSubmitHandler)
.get();
}
}
tacoOrderEmailFlow() ๋ฉ์๋์ ์ ์๋ ํ์ฝ ์ฃผ๋ฌธ ์ด๋ฉ์ผ ํ๋ก์ฐ๋ 3๊ฐ์ ์๋ก ๋ค๋ฅธ ์ปดํฌ๋ํธ๋ก ๊ตฌ์ฑ๋๋ค.
- IMAP ์ด๋ฉ์ผ ์ธ๋ฐ์ด๋ ์ฑ๋ ์ด๋ํฐ : ์ด ์ฑ๋ ์ด๋ํฐ๋ EmailProperties์ getImapUrl() ๋ฉ์๋๋ก๋ถํฐ ์์ฑ๋ IMP URL๋ก ์์ฑ๋๋ฉฐ, EmailProperties์ poolRate ์์ฑ์ ์ค์ ๋ ์ง์ฐ ์๊ฐ์ด ๋ ๋๋ง๋ค ์ด๋ฉ์ผ์ ํ์ธํ๋ค. ๋ฐ์ ์ด๋ฉ์ผ์ ๋ณํ๊ธฐ์ ์ฐ๊ฒฐํ๋ ์ฑ๋๋ก ์ ๋ฌ๋๋ค.
- ์ด๋ฉ์ผ์ Order ๊ฐ์ฒด๋ก ๋ณํํ๋ ๋ณํ๊ธฐ : ์ด ๋ณํ๊ธฐ๋ tacoOrderEmailFlow() ๋ฉ์๋๋ก ์ฃผ์ ๋๋ EmailToOrderTransformer์ ๊ตฌํ๋๋ค. ๋ณํ๋ ์ฃผ๋ฌธ ๋ฐ์ดํฐ(Order ๊ฐ์ฒด)๋ ๋ค๋ฅธ ์ฑ๋์ ํตํด ์ต์ข ์ปดํฌ๋ํธ๋ก ์ ๋ฌ๋๋ค.
- ํธ๋ค๋ฌ(์์๋ฐ์ด๋ ์ฑ๋ ์ด๋ํฐ๋ก ์๋) : ํธ๋ค๋ฌ๋ Order ๊ฐ์ฒด๋ฅผ ๋ฐ์์ ํ์ฝ ํด๋ผ์ฐ๋์ REST API๋ก ์ ์ถํ๋ค.
@Component
public class EmailToOrderTransformer
extends AbstractMailMessageTransformer<Order> {
@Override
protected AbstractIntegrationMessageBuilder<Order>
doTransform(Message mailMessage) throws Exception {
Order tacoOrder = processPayload(mailMessage); // ์ด๋ฉ์ผ์ Order ๊ฐ์ฒด๋ก ํ์ฑ
return MessageBuilder.withPayload(tacoOrder);
}
...
}
AbstractMailMessageTransformer๋ ํ์ด๋ก๋๊ฐ ์ด๋ฉ์ผ์ธ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๋ ๋ฐ ํธ๋ฆฌํ ๋ฒ ์ด์ค ํด๋์ค๋ค. ์ ๋ ฅ ๋ฉ์์ง๋ก๋ถํฐ ์ด๋ฉ์ผ ์ ๋ณด๋ฅผ Message ๊ฐ์ฒด(doTransform() ๋ฉ์๋์ ์ธ์๋ก ์ ๋ฌ)๋ก ์ถ์ถํ๋ ์ผ์ ์ง์ํ๋ค.
EmailToOrderTransformer๊ฐ ๋ง์ง๋ง์ผ๋ก ํ๋ ์ผ์ Order ๊ฐ์ฒด๋ฅผ ํฌํจํ๋ ํ์ด๋ก๋๋ฅผ ๊ฐ๋ MessageBuilder๋ฅผ ๋ฐํํ๋ ๊ฒ์ด๋ค.
@Component
public class OrderSubmitMessageHandler implements GenericHandler<Order> {
private RestTemplate rest;
private ApiProperties apiProps;
public OrderSubmitMessageHandler(ApiProperties apiProps, RestTemplate rest) {
this.apiProps = apiProps;
this.rest = rest;
}
@Override
public Object handle(Order order, Map<String, Object> headers) {
rest.postForObject(apiProps.getUrl(), order, String.class);
return null;
}
}
GenericHandler ์ธํฐํ์ด์ค์ handle ๋ฉ์๋๋ฅผ ์ค๋ฒ๋ผ์ด๋ ํ์๋ค. ์ด ๋ฉ์๋๋ ์ ๋ ฅ๋ Order ๊ฐ์ฒด๋ฅผ ๋ฐ์ผ๋ฉฐ, ์ฃผ์ ๋ RestTemplate์ ์ฌ์ฉํด์ ์ฃผ๋ฌธ(Order ๊ฐ์ฒด)์ ์ ์ถํ๋ค.
@Data
@ConfigurationProperties(prefix="tacocloud.api")
@Component
public class ApiProperties {
private String url;
}
application.yml์๋ ๋ค์๊ณผ ๊ฐ์ ๊ตฌ์ฑํ๋ค.
tacocloud:
api:
url: http://api.tacocloud.com
- simple-flow ํ๋ก์ ํธ๋ฅผ ๋น๋ํ๋ค.
- ./mvnw clean package
- simple-flow ํ๋ก์ ํธ๊ฐ ๋น๋๋์ด simple-flow\target ์๋์ simple-flow-0.0.9-SNAPSHOT.jar ํ์ผ๋ก ์์ฑ๋๋ค.
- java -Dspring.profiles.active=javaconfig -jar target/simple-flow-0.0.9-SNAPSHOT.jar
- /tmp/sia5/files/simple.txt ํ์ผ ํ์ธ
- ์คํ๋ง ํตํฉ์ ํ๋ก์ฐ๋ฅผ ์ ์ํ ์ ์๊ฒ ํด์ค๋ค. ๋ฐ์ดํฐ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ผ๋ก ๋ค์ด์ค๊ฑฐ๋ ๋๊ฐ ๋ ํ๋ก์ฐ๋ฅผ ํตํด ์ฒ๋ฆฌํ ์ ์๋ค.
- ํตํฉ ํ๋ก์ฐ๋ XML, Java, Java DSL์ ์ฌ์ฉํด์ ์ ์ํ ์ ์๋ค.
- ๋ฉ์์ง ๊ฒ์ดํธ์จ์ด์ ์ฑ๋ ์ด๋์ฒ๋ ํตํฉ ํ๋ก์ฐ์ ์ ๊ตฌ๋ ์ถ๊ตฌ์ ์ญํ ์ ํ๋ค.
- ๋ฉ์์ง๋ ํ๋ก์ฐ ๋ด๋ถ์์ ๋ณํ, ๋ถํ , ์ง์ , ์ ๋ฌ๋ ์ ์์ผ๋ฉฐ, ์๋น์ค ์กํฐ๋ฒ ์ดํฐ์์ํด ์ฒ๋ฆฌ๋ ์ ์๋ค.
- ๋ฉ์์ง ์ฑ๋์ ํตํฉ ํ๋ก์ฐ์ ์ปดํฌ๋ํธ๋ค์ ์ฐ๊ฒฐํ๋ค.