onErrorReturn : 에러가 터지는 순간, 준비해 둔 디폴트 값(Fallback)을 부여하고 스트림을 정상 종료시키는 연산자이다.
신호의 전환 : onError 신호가 내려오는 것을 가로채서 onNext로 바꾼 뒤, 바로 onComplete 신호를 던져 파이프라인을 Graceful Shutdown 한다.
조기 종료 : 에러가 발생한 그 즉시 스트림이 종료된다. 원본 스트림 뒤에 데이터가 더 남아있었더라도 에러가 터진 시점에 파이프라인 전체가 파괴되므로 뒤쪽 데이터는 절대 발행되지 않고 버려진다.
importreactor.core.publisher.Flux;
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// onErrorResume: 에러 발생 시 다른 대체 스트림(Publisher)으로 전환하여 구동Flux.just(1, 2, 0, 4, 5)
.map(i -> 10 / i)
.onErrorResume(e -> {
System.out.println("에러 발생 가로챔: " + e.getMessage());
// 💡 에러가 터진 시점에 새로운 대안 스트림을 즉시 생성하여 바톤 터치returnFlux.just(100, 200, 300);
})
.subscribe(
data -> System.out.println("데이터: " + data),
error -> System.out.println("구독 중 에러: " + error.getMessage()),
() -> System.out.println("구독 완료"));
}
}
onErrorResume : 자바 전통 문법의 catch (Exception e)를 리액티브 스트림즈 명세에 맞게 구현한 연산자이다.
onErrorReturn과 마찬가지로 뒤에 대기 중이던 데이터는 영원히 발행되지 않는다.
동적 람다 제어 : 에러 객체를 함수 인자로 직접 전달받는다. 즉, 어떤 에러가 터졌는지 파이프라인 안에서 확인한 뒤 에러 종류에 따라 각각 다른 대체 스트림으로 동적 라우팅을 설계할 수 있다.
importreactor.core.publisher.Flux;
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// onErrorMap: 발생한 에러 신호를 다른 예외(Exception) 타입으로 변환하여 다운스트림으로 전파Flux.just(1, 2, 0, 4, 5)
.map(n -> 10 / n)
.onErrorMap(ArithmeticException.class,
e -> newIllegalArgumentException("0으로 나눌 수 없습니다.", e))
.subscribe(
System.out::println,
error -> {
System.out.println("에러 발생: " + error.getMessage());
System.out.println("에러 타입: " + error.getClass().getSimpleName());
});
}
}
onErrorMap : onErrorReturn이나 onErrorResume처럼 에러를 막아서 성공 신호로 바꾸는 것이 아니라 어차피 발생할 에러라면 하위 레이어가 이해할 수 있는 형태로 바꿔서 터뜨리는 역할을 수행한다.
에러 상태 유지 : Return이나 Resume은 에러를 정상으로 돌리지만 Map은 여전히 에러를 그대로 고수한다. 변환된 결과물도 결국 Throwable이기 때문에 이 연산자를 사용해도 스트림은 성공 종료되지 않고 최종 에러 종료된다.
조기 종료의 특성 : 원본 스트림에서 뒤에 대기 중이던 데이터는 절대 발행되지 않고 버려진다. 에러의 종류만 바꾼 것이지 에러가 터졌다는 사실 자체는 변함이 없다.
WebFlux만의 문제가 아닌 Application의 회복성을 위한 Retry 정책
importjava.util.concurrent.atomic.AtomicInteger;
importreactor.core.publisher.Mono;
publicclassMain {
publicstaticvoidmain(String[] args) {
// retry: 에러 발생 시, 실패한 원본 파이프라인을 '처음부터 다시 구독(Resubscribe)'하는 연산자AtomicIntegercallCount = newAtomicInteger();
// Mono.defer를 통해 구독이 발생할 때마다 람다 내부 블록이 매번 새로 평가되도록 빌드Mono.defer(() -> {
intcount = callCount.incrementAndGet();
System.out.println("호출 # " + count);
if (count < 3) {
returnMono.error(newRuntimeException("일시적인 에러"));
}
returnMono.just("성공");
})
.retry() // 인자가 없으면 성공할 때까지 무한 재시도 시도
.subscribe(
data -> System.out.println("데이터: " + data),
error -> System.out.println("에러: " + error.getMessage())
);
}
}
retry 연산자는 네트워크 지연이나 일시적인 DB 타임아웃과 같은 일시적 오류가 발생했을 때 회복 탄력성(Resilience)을 극대화하는 역할을 한다.
importjava.time.Duration;
importjava.util.concurrent.atomic.AtomicInteger;
importreactor.core.publisher.Mono;
importreactor.util.retry.Retry;
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// retryWhen: 단순 재시도를 넘어 횟수, 지연 시간 등 정교한 '재시도 백오프(Backoff) 전략'을 주입하는 연산자AtomicIntegercallCount = newAtomicInteger(0);
longstart = System.currentTimeMillis();
Mono.defer(() -> {
intcount = callCount.incrementAndGet();
longelapsed = System.currentTimeMillis() - start;
System.out.println("callCount: " + count + ", elapsed: " + elapsed + "ms");
if (count < 4) {
returnMono.error(newRuntimeException("일시적 오류 발생"));
}
returnMono.just("성공");
})
.retryWhen(Retry.fixedDelay(5, Duration.ofMillis(500))) // 500ms 간격으로 최대 5번 재시도 정책 수립
.subscribe(
result -> System.out.println("결과: " + result),
error -> System.err.println("최종 오류: " + error.getMessage())
);
// retryWhen의 지연 시간은 내부적으로 별도의 스레드(Parallel 스케줄러)에서 비동기로 돌기 때문에 메인 스레드가 먼저 종료되는 것을 방지Thread.sleep(3000);
}
}
retryWhen은 retry와 목적이 같지만 장애가 발생한 서버에 대해 delay를 주면서 최대로 정한 max attempts까지 똑똑하게 재시도하자는 리액티브 회복 탄력성의 최종 진화형 연산자이다.
Stream의 흐름을 변경하지 않고 데이터의 흐름을 확인하는 Side Effect 패턴
importreactor.core.publisher.Flux;
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// doOnNext: 데이터가 통과하는 순간을 가로채서 특정 행동(로깅, 디버깅 등)을 수행하는 부수 효과 연산자Flux.just("apple", "banana", "cherry")
.doOnNext(data -> System.out.println("처리 중 1 : " + data)) // 변형 전 확인
.map(String::toUpperCase)
.doOnNext(data -> System.out.println("처리 중 2 : " + data)) // 변형 후 확인
.subscribe(data -> System.out.println("최종 결과 : " + data));
}
}
doOnNext는 값을 반환하지 않는 Consumer를 사용하므로 스트림에 흐르는 데이터 요소의 참조 자체를 다른 객체로 교체하거나 흐름을 제어할 수 없다.
데이터의 타입 변경이나 객체 변환은 map 혹은 flatMap의 책임이다.
importreactor.core.publisher.Flux;
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
// doOnError: 업스트림에서 에러 신호(onError)가 발생하여 다운스트림으로 전파될 때 이를 감지하여 부수 효과를 수행하는 연산자Flux.just(1, 2, 0, 4)
.map(n -> 10 / n)
.doOnError(error -> System.out.println("에러 발생 : " + error.getMessage()))
.subscribe(
data -> System.out.println("데이터 : " + data),
error -> System.out.println("구독자에서 에러 발생 : " + error.getMessage())
);
}
}
doOnError는 리액티브 스트림즈 명세의 종단 신호 중 하나인 onError 시그널이 전파되는 과정에 개입하는 사이드 이펙트 연산자이다.
importjava.time.Duration;
importreactor.core.publisher.Flux;
importreactor.core.publisher.Signal;
publicclassMain {
publicstaticvoidmain(String[] args) throwsInterruptedException {
Flux.just(1, 2, 0, 4)
.map(n -> 10 / n)
// 1. doOnError: 에러 신호(onError)가 다운스트림으로 전파될 때 트리거 (부수 효과)
.doOnError(error -> System.out.println("[doOnError] 에러 감지: " + error.getMessage()))
// 2. doOnSubscribe: 구독 관계가 성립되어 Subscription 객체가 전달될 때 트리거
.doOnSubscribe(subscription -> System.out.println("[doOnSubscribe] 구독 시작"))
// 3. doOnCancel: 다운스트림에서 명시적으로 구독을 취소(cancel())했을 때 트리거
.doOnCancel(() -> System.out.println("[doOnCancel] 구독 취소 발생"))
// 4. doOnEach: onNext, onError, onComplete 등 모든 개별 시그널 전파 시마다 트리거
.doOnEach(signal -> {
Signal<Integer> s = (Signal<Integer>) signal;
System.out.println("[doOnEach] 시그널 타입: " + s.getType() + (s.isOnNext() ? ", 값: " + s.get() : ""));
})
// 5. doFirst: 최하단 subscribe() 호출 직후, 전체 파이프라인 구동 프로세스 최상단에서 트리거
.doFirst(() -> System.out.println("[doFirst] 파이프라인 구독 프로세스 개시 (최초 실행)"))
// 6. doFinally: 정상 종료, 에러 종료, 취소 등 어떤 원인으로든 스트림이 최종 파괴될 때 트리거
.doFinally(signalType -> System.out.println("[doFinally] 스트림 종료 완료. 원인 타입: " + signalType))
// 7. onErrorReturn: 상위에서 전파된 에러 신호를 가로채서 고정값으로 치환 후 onComplete 신호로 변경
.onErrorReturn(-1)
// 최종 구독 프로세스 실행
.subscribe(
data -> System.out.println("[Subscriber] 수신 데이터: " + data),
error -> System.err.println("[Subscriber] 최종 에러 전달: " + error.getMessage()),
() -> System.out.println("[Subscriber] 스트림 최종 성공 완료 (onComplete)")
);
// 비동기 시그널 스케줄링 방어를 위한 메인 스레드 대기Thread.sleep(500);
}
}
doOnSubscribe : 다운스트림이 subscribe()를 호출하여 구독 관계가 성립하고 최상위 Publisher부터 Subscription 객체가 전달되는 시점에 동기식으로 실행된다.
doOnCancel : 다운스트림에서 더 이상 데이터를 받지 않기 위해 Subscription.cancel() 신호를 업스트림으로 역전파할 때 실행된다.
doOnEach : 파이프라인을 통과하는 모든 종류의 리액티브 신호가 발생할 때마다 매번 실행된다.
doFirst : 연산자 체인 상에서 subscribe() 신호가 호출된 직후, 실제 구독 프로세서가 시작되기 전 가장 먼저 실행된다.