flatMapSequential은 비동기로 동시에 쏘면서 성능을 챙기면서도 최종 결과물의 순서는 원본 순서대로 정렬한다.
Reactor에서 제공하는 데이터 변환 연산자 메서드 - flatMapMany()
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// flatMapMany: Mono 환경에서 다중 데이터(Flux) 스트림으로 확장 및 평탄화// 1. 단순 문자열을 분해하여 여러 개의 데이터로 밀어내기Mono.just("hello")
.flatMapMany(word -> Flux.fromArray(word.split("")))
.subscribe(System.out::println);
// 2. 실무 정석: 단일 조건(Mono)으로 다중 데이터 조회(Flux) 파이프라인 연동Mono.just("user123")
.flatMapMany(Main::findOrdersByUserId)
.subscribe(System.out::println);
}
staticFlux<String> findOrdersByUserId(StringuserId) {
returnFlux.just("주문#1001", "주문#1002", "주문#1003");
}
}
flatMapMany는 Mono를 Flux로 전환하기 위해 사용하는 연산자이다.
데이터 스트림 환경에서 가공 및 조율을 위한 Reactor의 필터링 연산자 메서드 - filter()
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// filter 복합 체이닝Flux.just(
newProduct("노트북", 1500000, true),
newProduct("마우스", 35000, true),
newProduct("키보드", 89000, false),
newProduct("모니터", 450000, true),
newProduct("웹캠", 62000, true)
)
.filter(Product::inStock) // 1차 필터: 재고 유무 확인
.filter(p -> p.price() <= 100000) // 2차 필터: 가격 조건 확인
.subscribe(p -> System.out.println(p.name() + " - " + p.price() + "원"));
}
recordProduct(Stringname, intprice, booleaninStock) {
}
}
filter()는 내부에 진위 연산 람다식을 받아서 스트림의 흐름을 제어한다.
데이터 스트림 환경에서 가공 및 조율을 위한 Reactor의 필터링 연산자 메서드 - distinct()
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// 1. 기본 distinct: 객체 자체의 equals/hashCode 기반 중복 제거Flux.just(1, 2, 3, 2, 1, 4, 3, 5)
.distinct()
.subscribe(data -> System.out.print(data + " "));
System.out.println();
// 2. 키 기반 distinct: 특정 필드(Key) 기준으로 중복 제거Flux.just(
newUser("홍길동", "서울"),
newUser("김영희", "부산"),
newUser("이철수", "서울"),
newUser("박지민", "대전"),
newUser("최민수", "부산")
)
.distinct(User::city) // 각 유저의 city 필드를 기준으로 중복 판별
.subscribe(user -> System.out.println(user.name() + " (" + user.city() + ")"));
}
recordUser(Stringname, Stringcity) {
}
}
distinct()는 중복된 데이터를 완벽하게 솎아내는 역할을 한다.
그러나 대용량 아키텍처 관점에서 치명적인 트레이드 오프를 가지고 있다.
메모리 누수 위험 : Flux.interval처럼 종료되지 않고 무한히 흐르는 스트림이거나 하루에 수억 건씩 쏟아지는 금융 트랜잭션 스트림에 걸게 되면 스트림이 유지되는 내내 내부 Set에 데이터가 계속 누적되면서 힙 메모리를 잡아먹게 되고 결국 서버가 OutOfMemoryError(OOM)을 뱉을 수 있다.
대안 - distinctUntilChanged() : 해당 메서드는 전체 데이터가 아니라 바로 직전에 통과한 데이터와만 비교해 연속으로 중복되는 데이터만 쳐내는 오퍼레이터이다.
데이터 스트림 환경에서 가공 및 조율을 위한 Reactor의 필터링 연산자 메서드 - elementAt()
여러 독립된 데이터 소스를 순서에 상관없이 먼저 처리되는 대로 가장 빠르게 다운스트림에 밀어 넣어 병합하는 역할을 한다.
다중 스트림 구조에서 데이터 결합을 위한 Reactor 결합 연산자 메서드 - zip()
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// Flux.zip: 여러 스트림의 데이터를 index 기준으로 1:1 매칭하여 결합// 1. Bi-Function 조합식을 결합한 형태 (2개의 스트림)Flux<String> names = Flux.just("홍길동", "김영희", "이철수");
Flux<Integer> ages = Flux.just(20, 30, 40);
Flux.zip(names, ages, (name, age) -> name + " : " + age)
.subscribe(System.out::println);
System.out.println("--------------------------------");
// 2. 3개 이상의 스트림 결합 (Tuple 데이터 구조 활용)Flux<String> names2 = Flux.just("홍길동", "김영희", "이철수");
Flux<Integer> ages2 = Flux.just(20, 30, 40);
Flux<String> cities = Flux.just("서울", "부산", "대구");
Flux.zip(names2, ages2, cities)
.map(tuple -> tuple.getT1() + "(" + tuple.getT2() + "세, " + tuple.getT3() + ")")
.subscribe(System.out::println);
System.out.println("--------------------------------");
// 3. 결합할 데이터의 개수가 불일치할 때의 동작 메커니즘Flux<String> names3 = Flux.just("홍길동", "이철수", "박지민");
Flux<Integer> ages3 = Flux.just(20, 30);
Flux.zip(names3, ages3)
.subscribe(System.out::println);
}
}
zip()는 서로 다른 파이프라인에서 흐르는 데이터들을 동일한 인덱스 기준으로 정확하게 한 쌍씩 묶어내는 역할을 한다.
무한한 데이터 스트림 환경에서 조건을 검증하기 위한 연산자 메서드 - defaultIfEmpty(), switchIfEmpty()
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// defaultIfEmpty: 스트림이 비어있을 때 고정된 기본값(Fallback Value)을 반환// 1. 빈 스트림인 경우 -> 기본값 출력Flux<String> result = Flux.<String>empty()
.defaultIfEmpty("데이터 없음");
result.subscribe(System.out::println);
System.out.println("--------------------------------");
// 2. 데이터가 존재하는 경우 -> 원본 데이터 그대로 통과Flux<String> hasData = Flux.just("A", "B", "C")
.defaultIfEmpty("데이터 없음");
hasData.subscribe(System.out::println);
}
}
defaultIfEmpty()는 데이터가 흐르지 않고 그냥 끝나버린 빈 스트림을 감지하고 이에 대한 디폴트 값을 안전하게 밀어주는 역할을 한다.