chapter10 sijun - JAVA-JIKIMI/SPRING-IN-ACTION-5 GitHub Wiki
- ๋ช ๋ นํ - ์์ฐจ์ ์ผ๋ก ์ฐ์๋๋ ์์ ์ด๋ฉฐ, ๊ฐ ์์ ์ ํ ๋ฒ์ ํ๋์ฉ ๊ทธ๋ฆฌ๊ณ ์ด์ ์์ ๋ค์์ ์คํ๋๋ค. ๋ฐ์ดํฐ๋ ๋ชจ์์ ์ฒ๋ฆฌ๋๊ณ ์ด์ ์์ ์ด ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ๋๋ธ ํ์ ๋ค์ ์์ ์ผ๋ก ๋์ด ๊ฐ ์ ์๋ค
- ๋ฆฌ์กํฐ๋ธ - ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํด ์ผ๋ จ์ ์์ ๋ค์ด ์ ์๋์ง๋ง, ์ด ์์ ๋ค์ ๋ณ๋ ฌ๋ก ์คํ๋ ์ ์๋ค. ๊ทธ๋ฆฌ๊ณ ๊ฐ ์์ ์ ๋ถ๋ถ ์งํฉ์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์์ผ๋ฉฐ, ์ฒ๋ฆฌ๊ฐ ๋๋ ๋ฐ์ดํฐ๋ฅผ ๋ค์ ์์ ์ ๋๊ฒจ์ฃผ๊ณ ๋ค๋ฅธ ๋ถ๋ถ ์งํฉ์ ๋ฐ์ดํฐ๋ก ๊ณ์ ์์ ํ ์ ์๋ค.
๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ๋ช ๋ นํ ํ๋ก๊ทธ๋๋ฐ์ ๋์์ด ๋๋ ํจ๋ฌ๋ค์์ด๋ค. ๋ช ๋ นํ ํ๋ก๊ทธ๋๋ฐ์ ํ๊ณ๋ฅผ ํด๊ฒฐํ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค. ์ด๋ฐ ํ๊ณ๋ฅผ ์ดํดํ๋ฉด ๋ฆฌ์กํฐ๋ธ ๋ชจ๋ธ์ ์ฅ์ ์ ๋ ํ์คํ๊ฒ ์ดํดํ ์ ์๋ค.
๋ช ๋ นํ ํ๋ก๊ทธ๋๋ฐ์ ํ ๋ฒ์ ํ๋์ฉ ์์๋๋ก ์งํ๋๋ค. ๊ทธ๋ ๊ธฐ์, ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ฐ์ดํฐ๋ฅผ ์ฐ๊ฑฐ๋ ๊ฐ์ ธ์ค๋ ๊ฒ๊ณผ ๊ฐ์ ๊ฒ์ด๋ผ๋ฉด ์ด ์์ ์ด ์๋ฃ๋ ๋๊น์ง ์๋ฌด ๊ฒ๋ ํ ์ ์๋ค. ๋ฐ๋ผ์ ์ด ์์ ์ ์ํํ๋ ์ค๋ ๋๋ ์ฐจ๋จ๋๋ฉฐ ์ด๋ ๋ญ๋น๊ฐ ๋๋ค.
์ด์ ๋ฐํด ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ํจ์์ ์ด๋ฉด์ ์ ์ธ์ ์ด๋ค. ์ฆ, ์์ฐจ์ ์ผ๋ก ์ํ๋๋ ์์ ๋จ๊ณ๋ฅผ ๋ํ๋ธ ๊ฒ์ด ์๋๋ผ ๋ฐ์ดํฐ๊ฐ ํ๋ฌ๊ฐ๋ ํ์ดํ๋ผ์ธ์ด๋ ์คํธ๋ฆผ์ ํฌํจํ๋ค. ๊ทธ๋ฆฌ๊ณ ์ด๋ฐ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๋ฐ์ดํฐ ์ ์ฒด๋ฅผ ์ฌ์ฉํ ์ ์์ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ์ง ์๊ณ ์ฌ์ฉ ๊ฐ๋ฅํ ๋ฐ์ดํฐ๊ฐ ์์ ๋๋ง๋ค ์ฒ๋ฆฌ๋๋ฏ๋ก ์ฌ์ค์ ์ ๋ ฅ๋๋ ๋ฐ์ดํฐ๋ ๋ฌดํํ ์ ์๋ค.
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๋ทํ๋ฆญ์ค, ๋ผ์ดํธ๋ฒค๋, ํ๋ณดํ์ ์์ง๋์ด๋ค์ ์ํด 2013๋ ๋ง์ ์์๋์๋ค. ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ์ฐจ๋จ๋์ง ์๋ ๋ฐฑ ํ๋ ์ ๋ฅผ ๊ฐ์ง๋ ๋น๋๊ธฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ์ ํ์ค์ ์ ๊ณตํ๋ ๊ฒ์ด ๋ชฉ์ ์ด๋ค.
๋ฐฑ ํ๋ ์ : ๋ฐ์ดํฐ๋ฅผ ์๋นํ๋ ์ปจ์๋จธ๊ฐ ์ฒ๋ฆฌํ ์ ์๋ ๋งํผ์ผ๋ก ์ ๋ฌ ๋ฐ์ดํฐ๋ฅผ ์ ํํจ์ผ๋ก์จ ์ง๋์น๊ฒ ๋น ๋ฅธ ๋ฐ์ดํฐ ์์ค๋ก๋ถํฐ์ ๋ฐ์ดํฐ ์ ๋ฌ ํญ์ฃผ๋ฅผ ํผํ ์ ์๋ ์๋จ์ด๋ค.
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ 4๊ฐ์ ์ธํฐํ์ด์ค์ธ Publisher(๋ฐํ์)
, Subscriber(๊ตฌ๋
์)
, Subscription(๊ตฌ๋
)
, Processor(ํ๋ก์ธ์)
๋ก ์์ฝํ ์ ์๋ค.
Publisherโ(0๊ฐ ์ด์์ Processor)โSubscriber ๊ตฌ์กฐ
/**
Publisher interface
subscribe() - Subscriber๊ฐ Publisher๋ฅผ ๊ตฌ๋
์ ์ฒญํ๊ธฐ ์ํ ๋ฉ์๋
**/
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
/**
Subscriber interface
onSubscribe() - ์ฒซ๋ฒ์งธ ์์ ์์ฒญ์ ์ํ ๋ฉ์๋๋ก ์ธ์๋ก Subscription์ ์ ๋ฌํ๋ค
onNext() - Subscribe๊ฐ ๋ฐ์ดํฐ ์์ฒญ์ด ์๋ฃ๋๋ฉด ์ถ๊ฐ๋ก
Publisher์๊ฒ ๋ฐ์ดํฐ๋ฅผ ์์ฒญํ๋ ๋ฉ์๋
onError() - ์๋ฌ ๋ฐ์์ ํธ์ถ๋๋ ๋ฉ์๋
onComplete() - Publisher๊ฐ ์์
์๋ฃ ํ ํธ์ถํ์ฌ
Subscriber์๊ฒ ์์
์๋ฃ ์ฌ๋ถ๋ฅผ ์๋ ค์ฃผ๋ ๋ฉ์๋
**/
public interface Subscriber<T> {
void onSubscribe(Subscription sub);
void onnext(T item);
void onError(Throwable ex);
void onComplete();
}
/**
Subscription interface
request() - Subscriber๊ฐ ๋ฐ์ดํฐ๋ฅผ ์์ฒญํ๊ธฐ ์ํ ๋ฉ์๋
cancel() - Subscriber๊ฐ ๊ตฌ๋
์ทจ์ ์์ฒญ์ ์ํ ๋ฉ์๋
**/
public interface Subscription {
void request(long n);
void cancel();
}
/**
Subscriber์ Publisher๋ฅผ ๊ฒฐํฉํ ์ธํฐํ์ด์ค
**/
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
ํ์ง๋ง, ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ ์ธํฐํ์ด์ค๋ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ๋ ๊ธฐ๋ฅ์ด ์๋ค. ์ด์ ๋ฐ๋ผ ํ๋ก์ ํธ ๋ฆฌ์กํฐ์์๋ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๊ตฌ์ฑํ๋ API๋ฅผ ์ ๊ณตํ์ฌ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ์๋ค. ์ด๋ฒ ์ฅ์ ๋๋จธ์ง์์๋ ํ๋ก์ ํธ ๋ฆฌ์กํฐ๋ฅผ ์ดํด๋ณผ ๊ฒ์ด๋ค.
์ค๋ ๋์์ ํ ๋จ๊ณ์ฉ ์ฐจ๋ก๋๋ก ์คํ๋๋ค. ๊ทธ๋ฆฌ๊ณ ๊ฐ ๋จ๊ณ๊ฐ ์๋ฃ๋ ๋๊น์ง ๋ค์ ๋จ๊ณ๋ก ์ด๋ํ์ง ๋ชปํ๊ฒ ์คํ ์ค์ธ ์ค๋ ๋๋ฅผ ๋ง๋๋ค.
String name ="Craig";
String capitalName = name.toUpperCase();
String greeting = "Hello, " + capitalName + "!";
System.out.println(greeting);
๋ฆฌ์กํฐ๋ธ ์ฝ๋๊ฐ ๋จ๊ณ๋ณ๋ก ์คํ๋๋ ๊ฒ์ฒ๋ผ ๋ณด์ด์ง๋ง, ์ค์ ๋ก๋ ๋ฐ์ดํฐ๊ฐ ์ ๋ฌ๋๋ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ฑํ๋ ๊ฒ์ด๋ค. ๊ทธ๋ฆฌ๊ณ ๊ฐ ๋จ๊ณ์์๋ ์ด๋ป๊ฒ ํ๋ ๋ฐ์ดํฐ๊ฐ ๋ณ๊ฒฝ๋๋ค. ๋ํ, ๊ฐ ์คํผ๋ ์ด์ ์ ๊ฐ์ ์ค๋ ๋๋ก ์คํ๋๊ฑฐ๋ ๋ค๋ฅธ ์ค๋ ๋๋ก ์คํ๋ ์ ์๋ค.
Mono.just("Craig")
.map(n -> n.toUpperCase())
.map(cn -> "Hello, " + cn + "!")
.subscribe(System.out:println);
just() ์คํผ๋ ์ด์ ์ ์ฒซ ๋ฒ์งธ ๊ฒ์ ์์ฑํ๋ค. ์ฒซ๋ฒ์จฐ Mono๊ฐ ๊ฐ์ ๋ฐฉ์ถํ๋ฉด ์ด ๊ฐ์ด ์ฒซ ๋ฒ์งธ map() ์คํผ๋ ์ด์ ์ ์ ๋ฌ๋์ด ๋๋ฌธ์๋ก ๋ณ๊ฒฝ๋๊ณ ๋ค๋ฅธ mono๋ฅผ ์์ฑํ๋ค. ์ด๋ ๊ฒ ์์ฑ๋ ๋ ๋ฒ์งธ Mono๊ฐ ๋ฐ์ดํฐ๋ฅผ ๋ฐฉ์ถํ๋ฉด ์ด ๋ฐ์ดํฐ๊ฐ ๋ ๋ฒ์งธ map()์ ์ ๋ฌ๋์ด ๋ฌธ์์ด ๊ฒฐํฉ์ด ์ํ๋๋ฉฐ, ์ด ๊ฒฐ๊ณผ๋ ์ธ ๋ฒ์งธ Mono๋ฅผ ์์ฑํ๋ ๋ฐ ์ฌ์ฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ ๋์ผ๋ก subscribe() ํธ์ถ์์๋ ์ธ ๋ฒ์งธ Mono๋ฅผ ๊ตฌ๋ ํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์์ ํ๊ณ ์ถ๋ ฅํ๋ค.
Mono์ Flux๋ ๋ฆฌ์กํฐ๋ธ์ ๋ ๊ฐ์ง ํต์ฌ ํ์ ์ด๋ค.๋ ๊ฐ ๋ชจ๋ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ Publisher ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ ๊ฒ์ด๋ค.
Mono: ํ๋์ ๋ฐ์ดํฐ ํญ๋ชฉ๋ง ๊ฐ๋ ๋ฐ์ดํฐ์ ์ ์ต์ ํ๋ ๋ฆฌ์กํฐ๋ธ ํ์
Flux: 0, 1 ๋๋ ๋ค์์(๋ฌดํ์ผ ์ ์๋) ๋ฐ์ดํฐ๋ฅผ ๊ฐ๋ ํ์ดํ๋ผ์ธ
๋ฆฌ์กํฐ๋ธ ํ๋ก์ฐ๋ ๋ง๋ธ ๋ค์ด์ด๊ทธ๋จ(marble diagram)์ผ๋ก ๋ํ๋ด๊ณค ํ๋ค.
- ์ ์ผ ์ ๋ถ๋ถ - Flux๋ Mono๋ฅผ ํตํด ์ ๋ฌ๋๋ ๋ฐ์ดํฐ์ ํ์๋ผ์ธ
- ์ค์ ๋ถ๋ถ - ์คํผ๋ ์ด์
- ์ ์ผ ๋ฐ ๋ถ๋ถ - Flux๋ Mono์ ํ์๋ผ์ธ
<!-- ๋ฆฌ์กํฐ๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<!-- ๋ฆฌ์กํฐ ํ
์คํธ๋ฅผ ์์ฑํ๊ธฐ ์ํด ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
Flux์ Mono๋ ๋ฆฌ์กํฐ๊ฐ ์ ๊ณตํ๋ ๊ฐ์ฅ ํต์ฌ์ ์ธ ๊ตฌ์ฑ ์์์ด๋ค. ์ด ๋๊ฐ์ง๋ฅผ ์ด์ฉํ์ฌ ํ์ดํ๋ผ์ธ์ ์์ฑํ๋ฉฐ, ์ด ๋๊ฐ์ง ๋ด์๋ 500๊ฐ ์ด์์ ์คํผ๋ ์ด์ ์ด ์๋ค.
- ์์ฑ ์คํผ๋ ์ด์ (creation)
- ์กฐํฉ ์คํผ๋ ์ด์ (combination)
- ๋ณํ ์คํผ๋ ์ด์ (transformation)
- ๋ก์ง ์คํผ๋ ์ด์ (logic)
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
}
fruitFlux.subscribe(
f->System.out.println("Here's some fruit: " + f)
};
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
Flux๋ ๋ฐฐ์ด, Iterable ๊ฐ์ฒด, ์๋ฐ Stream ๊ฐ์ฒด๋ก๋ถํฐ ์์ฑ๋ ์ ์๋ค.
@Test
public void createAFlux_fromArray() {
String[] fruits = new String[] {
"Apple", "Orange", "Grape", "Banana", "Strawberry" };
// fromArray()๋ฅผ ํ์ฉํ Flux ์์ฑ
Flux<String> fruitFlux = Flux.fromArray(fruits);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
@Test
public void createAFlux_fromIterable() {
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
// fromIterable()๋ฅผ ํ์ฉํ Flux ์์ฑ
Flux<String> fruitFlux = Flux.fromIterable(fruitList);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
@Test
public void createAFlux_fromStream() {
Stream<String> fruitStream =
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
// fromStream()์ ํ์ฉํ Flux ์์ฑ
Flux<String> fruitFlux = Flux.fromStream(fruitStream);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
๋ฐ์ดํฐ ์์ด ๋งค๋ฒ ์ ๊ฐ์ผ๋ก ์ซ์๋ฅผ ๋ฐฉ์ถํ๋ Flux๋ง ํ์ํ ๊ฒฝ์ฐ๊ฐ ์๋ค.
-
range()๋ฅผ ํ์ฉํ ๋ฐ์ดํฐ ์์ฑ
@Test public void createAFlux_range() { Flux<Integer> rangeFlux = Flux.range(1, 5); StepVerifier.create(intervalFlux) .expectNext(1) .expectNext(2) .expectNext(3) .expectNext(4) .expectNext(5) .verifyComplete(); }
-
interval()๋ฅผ ํ์ฉํ ๋ฐ์ดํฐ ์์ฑ
์์ ๊ฐ๊ณผ ์ข ๋ฃ ๊ฐ๋์ ๊ฐ์ด ๋ฐฉ์ถ๋๋ ์๊ฐ ๊ฐ๊ฒฉ์ด๋ ์ฃผ๊ธฐ๋ฅผ ์ง์ ํ๋ค.
@Test public void createAFlux_interval() { Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)) .take(5); StepVerifier.create(intervalFlux) .expectNext(0L) .expectNext(1L) .expectNext(2L) .expectNext(3L) .expectNext(4L) .verifyComplete(); }
๋ ๊ฐ ์ด์์ ๋ฆฌ์กํฐ๋ธ ํ์ ์ผ๋ก ๊ฒฐํฉ์ด๋ ๋ถํ ํ๋ ์คํผ๋ ์ด์
๋ ๊ฐ์ Flux ์คํธ๋ฆผ์ mergeWith()
operation์ ์ฌ์ฉํ์ฌ ๊ฒฐํฉํ ์ ์๋ค.
@Test
public void mergeFluxes() {
// Flux๋ ๊ฐ๋ฅํ ๋นจ๋ฆฌ ๋ฐ์ดํฐ๋ฅผ ๋ฐฉ์ถํ๋ค.
// ๋ฒ๊ฐ์ ๊ตฌ๋
ํจ์ ๋ณด์ฌ์ฃผ๊ธฐ ์ํด delayElements๋ฅผ ์ฌ์ฉํ๋ค.
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500));
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete();
}
mergeWith()
๋ Flux๋ค์ ๊ฐ์ด ์๋ฒฝํ๊ฒ ๋ฒ๊ฐ์ ๋ฐฉ์ถ๋๊ฒ ๋ณด์ฅํ ์ ์์ผ๋ฏ๋ก ํ์ํ๋ค๋ฉด zip()
์คํผ๋ ์ด์
์ ๋์ ์ฌ์ฉํ ์ ์๋ค.
zip()
์คํผ๋ ์ด์
์ ๋ฒ๊ฐ์ ๊ฐ์ ธ์ ์๋ก์ด flux๋ฅผ ์์ฑํ๋ค.
@Test
public void zipFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p ->
p.getT1().equals("Garfield") &&
p.getT2().equals("Lasagna"))
.expectNextMatches(p ->
p.getT1().equals("Kojak") &&
p.getT2().equals("Lollipops"))
.expectNextMatches(p ->
p.getT1().equals("Barbossa") &&
p.getT2().equals("Apples"))
.verifyComplete();
}
zippedFlux๋ก๋ถํฐ ๋ฐฉ์ถ๋๋ ๊ฐ ํญ๋ชฉ์ Tuple2(๋ ๊ฐ์ ๋ค๋ฅธ ๊ฐ์ฒด๋ฅผ ์ ๋ฌํ๋ ์ปจํ ์ด๋ ๊ฐ์ฒด), ๊ฐ ์์ค Flux๊ฐ ์์๋๋ก ๋ฐฉ์ถํ๋ ํญ๋ชฉ์ ํฌํจํ๋ค.
๋ง์ผ Tuple2๊ฐ ์๋ ๋ค๋ฅธ ํ์ ์ ์ฌ์ฉํ๊ณ ์ถ๋ค๋ฉด ์ฐ๋ฆฌ๊ฐ ์ํ๋ ๊ฐ์ฒด๋ฅผ ์์ฑํ๋ ํจ์๋ฅผ zip()์๊ฒ ์ ๊ณตํ๋ฉด ๋๋ค
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
StepVerifier.create(zippedFlux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete();
}
๋ ๊ฐ์ Flux๋ฅผ ๊ฒฐํฉํ๋ ๋์ ๋จผ์ ๊ฐ์ ๋ฐฉ์ถํ๋ ์์ค Flux์ ๊ฐ์ ๋ฐํํ๋ Flux๋ first() ์คํผ๋ ์ด์ ์ ์ฌ์ฉํ ์ ์๋ค.
@Test
public void firstFlux() {
// delay needed to "slow down" the slow Flux
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}
์์์ ์ํ๋ ๊ฐ์์ ํญ๋ชฉ์ ๋ฌด์ํ๋ ๊ฒ์ skip()
์คํผ๋ ์ด์
์ ์ฌ์ฉํ๋ค.
@Test
public void skipAFew() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
ํน์ ์๊ฐ์ด ์ง๋ ํ์ ์ค๋ ๋ฐ์ดํฐ๋ก Flux๋ฅผ ์์ฑํ๊ธฐ ์ํด์ skip()
์์ Duration์ ๋งค๊ฐ๋ณ์๋ก ์ง์ ํ ์ ์๋ค.
@Test
public void skipAFewSeconds() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(4));
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
๋ฐ๋๋ก ํน์ ๊ฐ์๋งํผ๋ง ๋จผ์ ๋ฐ๊ณ ์ถ๋ค๋ฉด take()๋ฅผ ํน์ ์๊ฐ๋์์ ๋ฐ์ดํฐ๋ง ์ฌ์ฉํด์ flux๋ฅผ ๋ง๋ค๊ธฐ ์ํด์ take(Duration)์ ์ฌ์ฉํ ์ ์๋ค.
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
.take(3);
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}
@Test
public void takeForAwhile() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}
Flux๋ Mono์ ๊ฐ์ฅ ๋ง์ด ์ฌ์ฉํ๋ ์คํผ๋ ์ด์ ์ค ํ๋๋ ๋ฐํ๋ ํญ๋ชฉ์ ๋ค๋ฅธ ํํ๋ ํ์ ์ผ๋ก ๋งคํ(๋ณํ)ํ๋ ๊ฒ์ด๋ค. ๋ค๋ฅธ ํํ๋ ํ์ ์ผ๋ก ๋งคํํ๊ธฐ ์ํด map()๊ณผ flatMap() ์คํผ๋ ์ด์ ์ ์ฌ์ฉํ๋ค.
@Test
public void map() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(playerFlux)
.expectNext(new Player("Michael", "Jordan"))
.expectNext(new Player("Scottie", "Pippen"))
.expectNext(new Player("Steve", "Kerr"))
.verifyComplete();
}
map()
์ ํ์ฉํ๋ฉด ๋งคํ ์ํ ๋ฐฉ์์ด ๋๊ธฐ์ ์ผ๋ก ์ํ๋๋ค. ๋ฐ๋ผ์, ๋น๋๊ธฐ์ ์ผ๋ก ์ฌ์ฉํ๋ ค๋ฉด flatMap()
์ ์ฌ์ฉํด์ผ ํ๋ค.
๋จ์ํ flatMap()์ ์ฌ์ฉํ๋ฉด ๋๊ธฐ์ ์ผ๋ก ์คํ๋๋ค. subscribeOn()์ ์ฌ์ฉํ์ฌ ๊ตฌ๋ ์ด ๋์์ ์ผ๋ก ์ฒ๋ฆฌ๋์ด์ผ ํ๋ค๋ ๊ฒ์ ์ง์ ํ๋ค. (๋์์ฑ ๋ชจ๋ธ์ subscribeOn()์ ์ธ์๋ก ์ง์ ํ์๋ค)
Schedule์ ๋์์ฑ ๋ชจ๋ธ
- immeidate() - ํ์ฌ ์ค๋ ๋์์ ๊ตฌ๋ ์คํ
- single() - ๋จ์ผ์ ์ฌ์ฌ์ฉ ๊ฐ๋ฅํ ์ค๋ ๋์์ ๊ตฌ๋ ์คํ. ๋ชจ๋ ํธ์ถ์์ ๋ํด ๋์ผํ ์ค๋ ๋๋ฅผ ์ฌ์ฌ์ฉํจ
- newSingle() - ๋งค ํธ์ถ๋ง๋ค ์ ์ฉ ์ค๋ ๋์์ ๊ตฌ๋ ์คํ
- elastic() - ๋ฌดํํ๊ณ ์ ์ถ์ฑ ์๋ ํ์์ ๊ฐ์ ธ์จ ์์ ์ค๋ ๋์์ ๊ตฌ๋ ์ ์คํ. ํ์ ์ ์๋ก์ด ์์ ์ค๋ ๋๊ฐ ์์ฑ๋๋ฉฐ, ์ ํด ์ค๋ ๋๋ ์ ๊ฑฐ๋๋ค.
- parallel() - ๊ณ ์ ๋ ํฌ๊ธฐ์ ํ์์ ๊ฐ์ ธ์จ ์์ ์ค๋ ๋์์ ๊ตฌ๋ ์ ์คํํ๋ฉฐ, CPU ์ฝ์ด์ ๊ฐ์๊ฐ ํฌ๊ธฐ๊ฐ ๋๋ค.
Flux๋ฅผ ํตํด ์ ๋ฌ๋๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ๋์ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์์ ๋ฉ์ด๋ฆฌ๋ก ๋ถํ ํ๋ฉด ๋์์ด ๋ ์ ์๋ค. ์ด๋ buffer()
์คํผ๋ ์ด์
์ ์ฌ์ฉํ ์ ์๋ค.
@Test
public void buffer() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);
StepVerifier
.create(bufferedFlux)
.expectNext(Arrays.asList("apple", "orange", "banana"))
.expectNext(Arrays.asList("kiwi", "strawberry"))
.verifyComplete();
}
์ ์ฝ๋๋ Flux๋ก๋ถํฐ ๋ฆฌ์กํฐ๋ธ๊ฐ ์๋ List ์ปฌ๋ ์ ์ผ๋ก ๋ฒํผ๋ง๋๋ ๊ฐ์ ๋น์์ฐ์ ์ธ ๊ฒ์ฒ๋ผ ๋ณด์ธ๋ค. ๊ทธ๋ฌ๋ buffer()๋ฅผ flatMap()๊ณผ ๊ฐ์ด ์ฌ์ฉํ๋ฉด ๊ฐ List ์ปฌ๋ ์ ์ ๋ณํ ์ฒ๋ฆฌ ํ ์ ์๋ค.
@Test
public void bufferAndFlatMap() throws Exception {
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()
).subscribe();
}
๋ง์ฝ ์ด๋ค ์ด์ ๋ก๋ Flux๊ฐ ๋ฐฉ์ถํ๋ ๋ชจ๋ ํญ๋ชฉ์ List๋ก ๋ชจ์ ํ์๊ฐ ์๋ค๋ฉด ์ธ์๋ฅผ ์ ๋ฌํ์ง ์๊ณ buffer()๋ฅผ ํธ์ถํ๋ฉด ๋๋ค.
Flux<List<String>> bufferedFlux = fruitFlux.buffer();
Mono๋ Flux๊ฐ ๋ฐํํ ํญ๋ชฉ์ด ์ด๋ค ์กฐ๊ฑด๊ณผ ์ผ์นํ๋์ง๋ง ์์์ผ ํ ๊ฒฝ์ฐ๊ฐ ์๋ค. ์ด๋๋ all()
์ด๋ any()
์คํผ๋ ์ด์
์ด ๊ทธ๋ฐ ๋ก์ง์ ์ํํ๋ค.
@Test
public void all() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
StepVerifier.create(hasKMono)
.expectNext(false)
.verifyComplete();
}
@Test
public void any() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
StepVerifier.create(hasZMono)
.expectNext(false)
.verifyComplete();
}
- ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์์๋ ๋ฐ์ดํฐ๊ฐ ํ๋ฌ๊ฐ๋ ํ์ดํ๋ผ์ธ์ ์์ฑํ๋ค.
- ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ Publisher, Subscriber, Subscription, Transformer์ ๋ค ๊ฐ์ง ํ์ ์ ์ ์ํ๋ค.
- ํ๋ก์ ํธ ๋ฆฌ์กํฐ๋ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๊ตฌํํ๋ฉฐ, ์๋ง์ ์คํผ๋ ์ด์ ์ ์ ๊ณตํ๋ Flux์ Mono์ ๋ ๊ฐ์ง ํ์ ์ผ๋ก ์คํธ๋ฆผ์ ์ ์ํ๋ค.
- ์คํ๋ง 5๋ ๋ฆฌ์กํฐ๋ฅผ ์ฌ์ฉํด์ ๋ฆฌ์กํฐ๋ธ ์ปจํธ๋กค๋ฌ, ๋ฆฌํผ์งํฐ๋ฆฌ, REST ํด๋ผ์ด์ธํธ๋ฅผ ์์ฑํ๊ณ ๋ค๋ฅธ ๋ฆฌ์กํฐ๋ธ ํ๋ ์์ํฌ๋ฅผ ์ง์ํ๋ค.