Reactive Programming ‐ Fundamentals of WebFlux and Reactor - thought-corner/Backend-PlayGround GitHub Wiki

Reactive Programming - Fundamentals of WebFlux and Reactor

Mono

  • 단 하나의 데이터만 발행하거나, 아예 비어있거나, 에러를 내고 종료되는 Publisher이다.
  • 데이터가 1개 전송되면 그 즉시 onComplete() 신호가 발생하며 스트림이 닫힌다.
Mono<String> mono = Mono.just("안녕하세요");
mono.subscribe(System.out::println);

Flux

  • 0개부터 N개(무한대 포함)의 데이터를 발행할 수 있는 Publisher이다.
  • 데이터가 끝없이 들어올 수 있는, 더 이상 보낼 데이터가 없을 때 비로소 onComplete() 신호가 발생한다.
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
flux.subscribe(System.out::println);

Controller에서 Mono/Flux 반환 타입의 요청 URI를 매핑해 호출하면 발행자(Publisher)의 개념이 되고 내부 이벤트 루프(Event Loop)에서 처리되어 리턴된다.

Mono 기본 사용법

1. Mono 생성 (Creation)

  • Mono.just(data) : 이미 메모리에 계산이 완료되어 존재하는 데이터를 래핑할 때 사용한다. 구독(Subscribe) 시점과 무관하게 생성 시점에 값이 결정된다.
  • Mono.empty() : 반환할 값이 없을 때 사용한다. 기존 동기 프로그래밍의 경우 Optional.empty()null 반환을 대체한다.
  • Mono.defer(Supplier) : 실제 구독이 발생할 때마다 Supplier를 실행하여 새로운 Mono를 생성한다. 데이터 조회 시점의 최신 상태를 반영해야 한다거나 지연 실행이 필요할 때 필수적이다.
  • Mono.fromCallable(Callable) : 외부 연산이나 블로킹 I/O 작업의 결과를 Mono로 가져올 때 자주 사용되며 값이 비어있으면 Mono.empty()를 방출한다.

2. 데이터 가공 (Transformation)

  • 파이프라인을 구축하여 스트림을 타고 흐르는 데이터를 조작한다.
  • map(Function) : 방출된 데이터를 동기적으로 변환한다. 주로 HTTP 응답 DTO에서 특정 필드만 추출하는 등의 단순 변환에 적합하다.
  • flatMap(Function) : 방출된 데이터를 받아 비동기 작업(새로운 Mono 변환)을 수행한 후 결과를 평탄화한다.
  • filter(Predicate) : 조건에 맞는 데이터만 통과시킨다. 조건에 맞지 않으면 빈 Mono가 된다.

3. 예외 및 빈 상태 처리 (Error & Empty Handling)

  • 리액티브 스트림에서는 예외가 발생하면 스트림이 종료된다. 시스템의 무정지 운영을 위해 체인 내에서 에러를 우아하게 복구해야 한다.
  • defaultIfEmpty(T) : Mono가 비어있는 상태로 완료될 경우 대체할 기본값을 제공한다.
  • switchIfEmpty(Mono) : 비어있을 경우, 다른 비동기 흐름으로 전환한다.
  • onErrorReturn(T) : 에러 발생 시 기본값을 반환하고 스트림을 정상 종료 처리한다.
  • onErrorResume(Function) : 에러 발생 시 예외 객체를 분석하여 대체 Mono를 반환한다. 장애 발생 시 Fallback API를 호출하는 등 유연한 대처가 가능하다.

4. 실행 (Subscription)

  • 리액티브 타입은 지연 평가(Lazy Evaluation)를 기반으로 작동하므로, 구독(subscribe())이 발생하기 전까지는 아무런 데이터 흐름도 발생하지 않는다.
  • WebFlux 환경에서의 구독 : 개발자가 명시적으로 subscribe()를 호출하는 경우는 드물고 체이닝된 Mono를 Controller의 반환 값으로 넘겨주게 되면, Spring WebFlux 프레임워크는 적절한 시점에 구독을 처리하고 HTTP 응답으로 변환한다.
public Mono<Long> getMarketClosedPrice(String ticker) {
    return Mono.justOrEmpty(ticker)                                        // 1. 티커가 null이면 Mono.empty() 반환
        .filter(t -> t.length() == 6)                                      // 2. 6자리 티커만 통과 (예: 005930)
        .flatMap(validTicker -> priceRepository.findByTicker(validTicker)) // 3. R2DBC 비동기 조회 (Mono 반환)
        .map(PriceEntity::getClosedPrice)                                  // 4. 엔티티에서 가격 필드만 추출
        .switchIfEmpty(Mono.defer(() -> externalApi.fetchPrice(ticker)))   // 5. DB에 없으면 외부 API 호출
        .onErrorReturn(-1L);                                               // 6. 전체 체인 중 장애 발생 시 -1 반환
}
public class Main {
    public static void main(String[] args) {
        // Mono.just 방식
        Mono<String> justMono = Mono.just(getValue());

        System.out.println("Mono created, but value not yet retrieved.");

        // Mono.fromSupplier 방식
        Mono<String> supplierMono = Mono.fromSupplier(() -> getValue());

        System.out.println("Mono from supplier created, but value not yet retrieved.");

        // 구독(Subscribe)
        supplierMono.subscribe(value -> System.out.println("Received value: " + value));
    }

    static String getValue() {
        System.out.println("Getting value...");
        return "Hello, World!";
    }
}
public class Main {
    public static void main(String[] args) {
        // fromCallable : checked exception을 던질 수 있음
        Mono<String> callableMono = Mono.fromCallable(() -> {
            if (Math.random() > 0.5) {
                throw new Exception("Random exception occurred");
            }
            return "성공";
        });

        callableMono.subscribe(
                result -> System.out.println("결과: " + result),
                error -> System.err.println("에러: " + error.getMessage())
        );
    }
}
public class Main {
    public static void main(String[] args) {
        // defer : Mono 자체의 생성을 지연
        Mono<Long> deferredMono = Mono.defer(() -> {
            System.out.println("생성 중....");
            return Mono.just(System.currentTimeMillis());
        });

        System.out.println("Mono 생성 완료");
        deferredMono.subscribe(time -> System.out.println("첫 번째 구독: " + time));

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("1초 후에 두 번째 구독");
        deferredMono.subscribe(time -> System.out.println("두 번째 구독: " + time));
    }
}
public class Main {

    static boolean featureEnabled = false;

    public static void main(String[] args) {
        // defer : 구독하는 시점의 featureEnabled 상태에 따라 다른 Mono를 동적으로 생성
        Mono<String> conditionMono = Mono.defer(() -> {
            if (featureEnabled) {
                return Mono.just("Feature is enabled");
            } else {
                return Mono.empty();
            }
        });

        System.out.println("Feature enabled: " + featureEnabled);
        conditionMono.subscribe(
                result -> System.out.println("Result: " + result),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed without emitting any value")
        );

        // 상태 변경
        featureEnabled = true;

        System.out.println("\nFeature enabled: " + featureEnabled);
        conditionMono.subscribe(
                result -> System.out.println("Result: " + result),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed without emitting any value")
        );
    }
}

Flux 기본 사용법

1. Flux 생성 (Creation)

  • 단일 값을 넣는 Mono와 달리, Flux는 컬렉션이나 배열, 심지어 시간 주기를 기반으로도 스트림을 생성할 수 있다.
  • Flux.just(data1, data2...) : 이미 존재하는 여러 개의 데이터를 나열하여 스트림을 생성한다.
  • Flux.fromIterable(Iterable) : List, Set 같은 기존 Java 컬렉션을 Flux 스트림으로 변환한다. 요청으로 들어온 리스트 데이터를 리액티브 파이프라인에 태울 때 가장 많이 사용된다.
  • Flux.range(start, count) : 특정 시작점부터 1씩 증가하는 정수를 지정한 개수만큼 방출한다. 반복문을 대체하는 용도로 자주 쓰인다.
  • Flux.interval(Duration) : 지정된 시간 간격으로 0부터 증가하는 숫자를 무한히 방출한다.

2. 데이터 가공 (Transformation)

  • 여러 데이터가 스트림을 타고 흐르므로, 변환의 '순서'와 '동시성'을 제어하는 것이 중요하다.
  • map(Function) : 각 요소를 동기적으로 변환한다.
  • flatMap(Function) : 각 요소를 비동기적으로 처리하고 결과를 평탄화한다. 내부적으로 쓰레드를 활용해 동시 실행되므로 데이터 방출 순서가 보장되지 않는다. 빠른 병렬 처리가 필요한 경우 사용한다.
  • concatMap(Function) : 앞선 요소의 비동기 처리가 끝나야 다음 요소를 처리하여 순서를 엄격하게 보장한다. 순차적인 처리가 필요할 때 사용하지만 속도는 더 느리다.
  • filter(Predicate) : 조건에 맞는 요소만 흘려보낸다.
  • collectList() : Flux<T>로 흐르는 모든 데이터를 모아 하나의 Mono<List<T>>로 묶어준다.

3. 다건 처리 중 예외 제어 (Error Handling)

  • Flux에서 예외 처리는 주의해야 한다. 스트림 중간에 하나의 데이터라도 에러를 발생시키면 전체 Flux 스트림이 종료되기 때문이다.
  • onErrorResume : flatMap 내부에서 개별 요소에 대해 에러 처리를 하여, 특정 요소의 실패가 전체 스트림을 망가뜨리지 않도록 방어한다.
  • onErrorContinue(BiConsumer) : 에러가 발생한 요소를 로깅하고 버린 뒤, 스트림을 종료하지 않고 다음 요소 처리를 계속 진행한다. 대용량 데이터 배치 처리 시 유용하다.

4. 실행 및 배압 (Subscription & Backpressure)

  • Flux 역시 subscribe()가 호출되어야 데이터 흐름이 시작된다. WebFlux 환경에서 리턴 값으로 넘겨주면 프레임워크가 알아서 구독한다.
  • 백프레셔(Backpressure) : Flux의 고유한 특징으로, 컨슈머가 자신이 처리할 수 있는 양만 프로듀서에게 요청하여 메모리 오버플로우를 방지하는 메커니즘이다. WebFlux에서는 TCP 흐름 제어와 연계되어 자동으로 관리된다.
public Mono<List<PriceDto>> getSemiconductorPrices(List<String> tickers) {
    return Flux.fromIterable(tickers)                                // 1. List<String>을 Flux<String>으로 변환
        .flatMap(ticker -> priceRepository.findByTicker(ticker)      // 2. 비동기 DB 또는 API 조회 (순서 무관, 병렬 처리)
            .onErrorResume(e -> {                                    // 3. 핵심: 특정 종목 조회 실패 시 빈 Mono를 반환하여 전체 스트림 중단 방지
                log.warn("Failed to fetch price for {}", ticker, e);
                return Mono.empty(); 
            })
        )
        .filter(entity -> entity.getClosedPrice() > 0)                              // 4. 가격이 존재하는(0보다 큰) 유효한 데이터만 통과
        .map(entity -> new PriceDto(entity.getTicker(), entity.getClosedPrice()))   // 5. Entity를 DTO로 변환
        .collectList();                                                             // 6. 흩어진 DTO들을 모아 단일 Mono<List<PriceDto>>로 변환하여 API 응답 준비
}
public class Main {
    public static void main(String[] args) {
        Flux<Integer> fromStream = Flux.fromStream(Stream.of(10, 20, 30));
        
        fromStream.subscribe(System.out::println);
    }
}
public class Main {
    public static void main(String[] args) {
        // generate: 상태를 유지하면서 하나씩 생성
        Flux<Integer> generated = Flux.generate(
                () -> 0, // 초기 상태 (State Supplier)
                (state, sink) -> {
                    sink.next(state); // 데이터 발행 (onNext 신호)
                    if (state == 4) {
                        sink.complete(); // 완료 신호 (onComplete 신호)
                    }
                    return state + 1; // 다음 상태 반환 (State BiFunction)
                }
        );

        generated.subscribe(data -> System.out.println("generate: " + data));
    }
}
public class Main {
    public static void main(String[] args) {
        // create: 비동기적이고 다중 데이터 발행이 가능한 스트림 생성
        Flux<String> created = Flux.create(sink -> {
            sink.next("첫번째");
            sink.next("두번째");
            sink.next("세번째");
            sink.complete();
        });

        created.subscribe(System.out::println);
    }
}
public class Main {
    public static void main(String[] args) {
        // 단일 map 연산자 활용
        Flux<String> upperCase = Flux.just("apple", "banana", "cherry")
                .map(String::toUpperCase);

        upperCase.subscribe(System.out::println);

        // map 연산자 체이닝 (함수 합성)
        Flux<Integer> lengths = Flux.just("hello", "world", "reactor")
                .map(String::toUpperCase)
                .map(String::length);

        lengths.subscribe(System.out::println);
    }
}
public class Main {
    public static void main(String[] args) {
        // 1. 비동기/논블로킹 메서드(Mono) 연동 및 언랩
        Flux<String> result = Flux.just("user-1", "user-2", "user-3")
                .flatMap(Main::findUserName);

        result.subscribe(System.out::println);

        // 2. 1대N 평탄화 (Flatting)
        Flux<String> expanded = Flux.just("hello", "world")
                .flatMap(word -> Flux.fromArray(word.split("")));

        expanded.subscribe(System.out::println);
    }

    static Mono<String> findUserName(String userId) {
        return Mono.just("User-Name-" + userId);
    }
}
public class Main {
    public static void main(String[] args) {
        // 1. 단순 숫자 필터링
        Flux<Integer> even = Flux.range(1, 10)
                .filter(i -> i % 2 == 0);
                
        even.subscribe(System.out::println);

        // 2. filter와 map 연산자 체이닝
        Flux<String> result = Flux.just("apple", "avocado", "banana", "apricot", "cherry")
                .filter(s -> s.startsWith("a"))
                .map(String::toUpperCase);

        result.subscribe(System.out::println);
    }
}
  • 구독이 시작되면 파이프라인을 타면서 DB 조회나 외부 API 호출과 같은 I/O 이벤트가 등록되고, Netty의 이벤트 루프가 이를 비동기로 처리한다.
  • 결국 구독의 주체가 "개발자"가 아니라 "프레임워크"로 이관된 것이다.
  • 일반적인 HTTP 요청 - 응답 사이클 내에서는 subscribe() 메서드를 쓸 일이 없고, Fire-and-Forget 패턴의 경우 제한적으로 직접 사용한다.

데이터 스트림 시작을 결정하는 Subscribe와 데이터 처리량을 조율하는 Backpressure

1. Subscribe (구독) : 데이터 스트림의 시작

  • 아무리 복잡한 Mono나 Flux 체인을 만들었다고 하더라도 최종적으로 subscribe()가 호출되지 않으면 아무런 연산도, 데이터 조회도 발생하지 않는다. 이를 지연 실행(Lazy Evaluation)이라고 한다.
  • 리액티브 스트림은 구독자가 데이터를 받을 준비가 되었을 때만 리소스를 사용하도록 설계되었다.
  • Spring WebFlux 환경에서는 개발자가 직접 subscribe()를 호출할 필요가 거의 없다. 컨트롤러에서 Mono 혹은 Flux를 반환하면 프레임워크 내부에 있는 기본 웹 서버(Netty)가 클라이언트에게 응답을 주기 직전에 알아서 subscribe()를 호출하여 흐름을 시작시킨다.

2. Backpressure (배압) : 데이터 처리량 조율

  • 동기식이나 기존 비동기 방식에서는 처리하지 못한 데이터가 메모리(버퍼)에 계속 쌓이다가 결국 OutOfMemory(OOM) 장애로 이어질 수 있다.
  • Backpressure는 컨슈머가 프로듀서에게 현재 처리할 수 있는 만큼만 보내라고 요청하는 역방향 제어 메커니즘이다. 컨슈머가 주도권을 쥐고 흐름을 제어해 시스템 무정지 안정성을 보장한다.
public class BackpressureExample {
    public static void main(String[] args) {
        
        // 1. 프로듀서: 1부터 100까지의 데이터를 준비 (파이프라인 구축)
        Flux<Integer> fastProducer = Flux.range(1, 100);

        // 2. 구독(Subscribe)과 동시에 배압(Backpressure) 로직 적용
        fastProducer.subscribe(new BaseSubscriber<Integer>() {
            
            // 구독이 시작될 때 최초 1회 호출됨
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                System.out.println("구독 시작! 최초 2개의 데이터를 요청합니다.");
                request(2); // 한 번에 2개만 달라고 프로듀서에게 요청
            }

            // 프로듀서가 데이터를 보내줄 때마다 호출됨
            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("컨슈머 처리 중: " + value);
                
                // 복잡한 비즈니스 로직 (처리 지연 가정)
                try { Thread.sleep(500); } catch (InterruptedException e) {}

                // 현재 처리한 데이터가 짝수면(즉, 2개를 다 처리했으면) 다시 2개를 추가 요청
                if (value % 2 == 0) {
                    System.out.println("--- 2개 처리 완료, 추가로 2개를 더 요청합니다 ---");
                    request(2); 
                }
            }
        });
    }
}

Subscribe의 반환값 Disposable Object를 활용한 스트림 처리 신호 조율 패턴

  • 동기식 프로그래밍에서는 쓰레드를 강제로 중단(Interrupt)하는 것이 위험하지만, 리액티브 프로그래밍에서는 이 Disposable을 통해 안전하고 우아하게(Graceful) 스트림을 취소하고 리소스를 반납할 수 있다.

1. Disposable의 핵심 원리: Cancel 신호의 역전파

  • 유한 스트림(단순 DB 조회 등) : Mono.just나 유한한 Flux는 데이터를 모두 방출하면 자동으로 onComplete 신호를 보내고 스트림을 종료한다.
  • 무한 스트림(실시간 시세 스트리밍, Kafka 리스너 등) : Flux.interval을 사용한 폴링이나 SSE 응답 같은 무한 스트림은 누군가 명시적으로 끊어주지 않으면 메모리를 점유한 채 영원히 실행된다.
  • 반환받은 Disposable 객체의 dispose() 메서드를 호출하면 컨슈머에서 프로듀서 방향으로 취소 신호가 전파된다. 이를 받은 프로듀서는 즉시 데이터 생성을 중단하고 연결된 DB 커넥션이나 네트워크 리소스를 해제한다.
public class Main {
    public static void main(String[] args) {
        // Disposable: 구독의 제어권을 쥐고 있는 객체
        Flux<Integer> flux = Flux.range(1, 5);

        Disposable disposable = flux.subscribe(
                data -> System.out.println("Received: " + data)
        );

        System.out.println("Is disposed: " + disposable.isDisposed());
    }
}
public class Main {
    public static void main(String[] args) {
        // 500ms마다 0부터 시작해 1씩 증가하는 숫자를 무한히 발행하는 Flux
        Flux<Long> interval = Flux.interval(Duration.ofMillis(500)); 

        Disposable disposable = interval.subscribe(System.out::println); // 구독 시작

        System.out.println("isDisposed: " + disposable.isDisposed()); // 구독 상태 확인

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        disposable.dispose(); // 구독 취소
        System.out.println("isDisposed: " + disposable.isDisposed()); // 구독 상태 확인
    }
}
public class Main {
    public static void main(String[] args) {
        Flux<Integer> flux = Flux.just(1, 2, 3);

        Disposable disposable = flux.subscribe(
                data -> System.out.println("데이터: " + data)
        );

        System.out.println("이미 완료됨: " + disposable.isDisposed());
        
        disposable.dispose(); // 이미 완료된 상태에서 호출해도 문제 없음 (Idempotent)
        
        System.out.println("dispose 후: " + disposable.isDisposed());
    }
}
⚠️ **GitHub.com Fallback** ⚠️