CHAP17 - Modern-Java-in-Action/Online-Study GitHub Wiki

주제: 리액티브 프로그래밍

RP = Reactive Programming

이 장의 내용

  • 리액티브 프로그래밍을 정의하고 리액티브 매니패스토를 확인함
    • 과거의 잘못된 행적을 솔직히 반성하며 새로운 미래를 위한 구체적 약속을 공개적인 방식으로 책임성을 담아 문서로서 선언하는
  • 애플리케이션 수준, 시스템 수준의 리액티브 프로그래밍
  • 리액티브 스트림, 자바 9플로 API를사용한예제코드
  • 널리 사용되는 리액티브 라이브러리 RxJava 소개
  • 여러 리액티브 스트림을 변환하고 합치는 RxJava 동작 살펴보기
  • 리액티브 스트림의 동작을 시각적으로 문서화하는 마블 다이어그램

왜 리액티브 블라블라 가 필요하게 되었을까?

오늘날 상황이 변했다.

  • 빅데이터
  • 많은 모바일 디바이스와 수천개 멀티코어 프로세서 애플리케이션 배포 환경
  • 1년 내내 가능한 고객의 서비스 요구

4가지 매니페스토 퀴즈

매핑하시오. ( Elastic Responsive Resilient Message-driven )

  • 시스템은 적시에 응답해야 한다.
  • 시스템은 느슨한 결합을 보장하기 위해 구성 요소 간에 비동기 메시지 전달을 사용해야 한다.
  • 시스템은 높은 부하에서도 응답성을 유지해야 한다.
  • 일부 구성 요소가 장애가 발생해도 시스템은 응답을 유지해야 한다.

리액티브 애플리케이션 vs 리액티브 시스템

잘 이해안됨

링크

  • 리액티브 시스템
    • 아키텍처 스타일의 한 종류
    • 애플리케이션을 조립하고 상호소통을 조절
    • 위의 매니페스토를 따라, 작업 부하가 있으면 유동적으로 scale-up 하는 성질을 가짐
    • 주요 속성: 메시지 주도
  • 리액티브 애플리케이션
    • 비교적 짧은 시간 동안만 유지되는 데이터 스트림에 기반한 연산을 수행하며 보통 이벤트 주도로 분류된다.
  • 요약
    • 리액티브 애플리케이션이 모여 시스템이 되는 것 같다.

앞으로 배울 리액티브 프로그래밍 필요한 이유와 일반적 사실들

  • 스레드는 비싸고 귀한 자원이다. (할당하고 해제하는 것도 비싸다)
  • 리액티브 프로그래밍의 프레임워크나 라이브러리는 개발자가 레이스 컨디션, 데드락 같은 같은 저수준의 멀티스레드 문제를 직접 처리할 필요가 없어지면서 비즈니스 요구사항 구현에 더 집중할 수 있다.
  • 스레드 풀을 쪼갤때는 블럭 동작을 넣지 않아야 한다.
    • RxJava, Akka 같은 리액티브 프레임워크는 별도로 지정된 스레드 풀에서 블록 동작을 실행시켜 이 문제를 해결한다.

![image-20220219195249486](D:\0 Google Drive\03 스터디 모임\모던자바\10주차\17장.assets\image-20220219195249486.png)

리액티브 시스템은 여러 애플리케이션이 한 개의 일관적인, 회복할 수 있는 플랫폼을 구성할 수 있게 해줄 뿐 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍처다.

리액티브 스트림과 Flow API

  • 리액티브 스트림 이란?
    • 리액티브 프로그래밍을 하기 위해 사용된다.
    • 무한의 비동기 데이터를 순서대로 그리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술
  • 역압력이란?
    • 발행-구독 프로토콜에서 구독자가 느린 속도로 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 장치
    • 사용함으로써
      • 이벤트를 잃어버리는 문제 해결 가능
      • 이벤트 수신을 늦추는 것, 가능한 수신량 알림기능, 남은일 처리 예측 시간 알림과 같은 기능으로 업스트림 구독자에게 알릴수 있다.
      • 비동기 API를 이용하면 하드웨어 사용률을 극대화할 수 있지만 느린 다운스트림 컴포넌트에 너무 큰 부하를 줄 가능성도 생긴다. 역압력은 이 문제도 해결한다.
  • 회사별 리액티브 스트림 자체 구현 사례
    • 라이트밴드 - Akka Stream
    • 넷플릭스 - RxJava
    • 리액터 - Pivotal
    • 레드햇 - Vert.x
  • Flow API란?
    • 자바에서 위 4개를 기반으로 최소 기능 집합으로만 재정의한 표준 API = Flow API

Flow 클래스 소개

Flow 클래스가 포함하는 4개 인터페이스

// Publisher: 이벤트 발행 수행
@FunctionalInterface
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

// Subscriber: 자신을 Publisher 에게 등록 요청해 이벤트 소비
// 호출 순서 : onSubscribe -> onNext (여러번 호출 가능) -> onError 또는 onComplete 
public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

// Subscription: Publisher와 Subscriber 사이의 제어 흐름, 역압력을 관리한다.
public interface Subscription {
    void request(long n);
    void cancel();
}

// Processor: 리액티브 스트림에서 처리하는 이벤트를 가공, 변환할 때 사용된다. 
// 들어오는 메시지를 변형시켜 다음 구독자 한테 넘기기 위해 사용. (중간자 역할이며, 구독자와 발행자 두 역할 수행)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
  • 언제 Subscriber의 onSubscribe()와 onNext()가 사용될까?
    • onSubscribe(): Subscriber가 Publisher에 자신을 등록할 때 Publisher는 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달할 수 있다.
    • onNext(): Publisher가 새로운 이벤트를 생성할 때마다 호출된다.
  • Subscription의 request()와 cancel() 메서드의 역할은?
    • request(): Publisher에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 알리는데 사용
    • cancel(): 구독 취소 요청
  • 작동 요약

샘플 애플리케이션 만들어보기

온도 정보 주기적으로 받아볼 수 있게 하는 애플리케이션을 만들어 보자

리액티브 애플리케이션 생명 주기 확인하기

![image-20220219203502517](D:\0 Google Drive\03 스터디 모임\모던자바\10주차\17장.assets\image-20220219203502517.png)

  • 여기 코드에선
    • Publisher: 팩토리 메소드로 바로 생성
    • Subscriber: TempSubscriber
    • Subscription: TempSubscription
    • 기타
      • TempInfo: 온도 정보 담는 객체

특정 도시 온도 정보 구독 하기

import java.util.concurrent.Flow.*;
public class Main {
    public static void main( String[] args ) {
        // Subscriber를 인수로 받아 Subscriber의 onSubscribe 메서드를 호출
        getTemperatures( "New York" ).subscribe( new TempSubscriber() );
    }
    private static Publisher<TempInfo> getTemperatures( String town ) {
        // 아래 코드 익숙하지가 않네요...
        return subscriber -> subscriber.onSubscribe(
            new TempSubscription( subscriber, town ) );
    }
}

온도 정보를 받아볼 예정인 구독자

import java.util.concurrent.Flow.*;
public class TempSubscriber implements Subscriber<TempInfo> {
    private Subscription subscription;
    @Override
    public void onSubscribe( Subscription subscription ) {
        this.subscription = subscription; // 구독 저장
        subscription.request( 1 ); // 1번째 요청
    }
    @Override
    public void onNext( TempInfo tempInfo ) {
        // Subscription이 전달한 온도를 출력하고 새 레포트를 다시 요청
        System.out.println( tempInfo );
        subscription.request( 1 );
    }
    @Override
    public void onError( Throwable t ) {
        System.err.println(t.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}

온도 정보를 받아볼 수 있는 구독권

import java.util.concurrent.Flow.*;
public class TempSubscription implements Subscription {
    private final Subscriber<? super TempInfo> subscriber;
    private final String town;
    public TempSubscription( Subscriber<? super TempInfo> subscriber,
                            String town ) {
        this.subscriber = subscriber;
        this.town = town;
    }
    @Override
    public void request( long n ) {
        // 요청한 이벤트 수 만큼 처리해준다 
        for (long i = 0L; i < n; i++) {
            try {
                subscriber.onNext( TempInfo.fetch( town ) );
            } catch (Exception e) {
                subscriber.onError( e );
                break;
            }
        }
    }
    @Override
    public void cancel() {
        subscriber.onComplete();
    }
}

온도 정보 객체

import java.util.Random;
public class TempInfo {
    public static final Random random = new Random();
    private final String town;
    private final int temp;
    public TempInfo(String town, int temp) {
        this.town = town;
        this.temp = temp;
    }
    // 특정 타운의 온도를 리턴
    public static TempInfo fetch(String town) {
        if (random.nextInt(10) == 0)
            throw new RuntimeException("Error!");
        return new TempInfo(town, random.nextInt(100));
    }
    @Override
    public String toString() {
        return town + " : " + temp;
    }
    public int getTemp() {
        return temp;
    }
    public String getTown() {
        return town;
    }
}

결과 - 뉴욕의 온도를 네 번 성공적으로 전달했지만 다섯 번째에 에러가 발생

New York :: 44
New York :: 68
New York :: 95
New York :: 30
Error!

지금까지 개발한 코드에 작은 문제가 있다. 퀴즈를 보고 생각해보자.

퀴즈

지금까지 개발한 코드에 작은 문제가 있다. 하지만 이 문제는 Tempinfo 팩토리 메서드 내에서 에러를 임의로 발생시키는 코드 때문에 감춰진 상태다. 임의로 에러를 발생시키는 코드를 없앤 다음 main을 오래 실행하면 어떤 일이 일어날까?

.
.
.
.
.
.
.
.

해결 방법

  • Executor를 TempSubscription으로 추가한 다음 다른 스레드에서 TempSubscriber로 세 요소를 전달한다.
  • 이것 이해 잘 안된다.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TempSubscription implements Subscription {
    private static final ExecutorService executor =
        Executors.newSingleThreadExecutor();
    @Override
    public void request( long n ) {
        // 다른스레드에서 다음요소를 구독자에게 보낸다.
        executor.submit( () -> {
            for (long i = 0L; i < n; i++) {
                try {
                    subscriber.onNext( TempInfo.fetch( town ) );
                } catch (Exception e) {
                    subscriber.onError( e );
                    break;
                }
            }
        });
    }
}

Processor 데이터 변환하기

Processor 인터페이스의 사용법도 확인하자

  • 목적: Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것
  • Subscriber 인터페이스를 구현하는 다른 모든 메서드는 단순히 수신한 모든 신호를 업스트림 Subscriber로 전달하며 Publisher의 subscribe 메서드는 업스트림 Subscriber를 Processor로 등록하는 동작을 수행한다.

Main: Publisher를 만들고 TempSubscriberB 구독시킴

import java.util.concurrent.Flow.*;
public class Main {
    public static void main( String[] args ) {
        getCelsiusTemperatures( "New York" )
            .subscribe( new TempSubscriber() );
    }
    public static Publisher<TempInfo> getCelsiusTemperatures(String town) {
        return subscriber -> {
            TempProcessor processor = new TempProcessor();
            processor.subscribe( subscriber );
            processor.onSubscribe( new TempSubscription(processor, town) );
        };
    }
}

import java.util.concurrent.Flow.*;
// 화씨를 섭씨로 변환
public class TempProcessor implements Processor<TempInfo, TempInfo> {
    private Subscriber<? super TempInfo> subscriber;
    @Override
    public void subscribe( Subscriber<? super TempInfo> subscriber ) {
        this.subscriber = subscriber;
    }
    
    // 로직을 포함하는 유일한 메서드
    @Override
    public void onNext( TempInfo temp ) {
        // 섭씨로 변환한 다음 TempInfo 를 다시 전송
        subscriber.onNext( new TempInfo( temp.getTown(),
                                        (temp.getTemp() - 32) * 5 / 9) ); 
    }
    
    @Override
    public void onSubscribe( Subscription subscription ) {
        // 업스트림 구독자에 전달 
        subscriber.onSubscribe( subscription );
    }
    @Override
    public void onError( Throwable throwable ) {
        // 업스트림 구독자에 전달 
        subscriber.onError( throwable );
    }
    @Override
    public void onComplete() {
        // 업스트림 구독자에 전달 
        subscriber.onComplete();
    }
}

실행 코드

New York : 10
New York : -12
New York : 23
Error!

왜 Flow API 구현을 제공하지 않는가?

  • Flow API가 구현을 재공하지 않아, 우리가 직접 인터페이스를 구현했다.
  • 다른 사례를 보면, 자바는 List의 구현인 ArrayList를 제공한다.
  • 왜 구현 제공을 안할까?
    • 이유: API를 만들 당시 Akka, RxJava 등 다양한 리액티브 스트림의 자바 코드 라이브러리가 이미 존재했기 때문이다
    • 즉, 이 모든 기능의 공통 부분만 봅아내 표준화 작업을 한 것이다.

RxJava 사용하기

  • RxJava: 리액티브 라이브러리의 한 종류

  • RxJava 는 Flow.Publisher 를 구현하는 두 클래스를 제공

    • io.reactivex.Flowable
      • Pull 기반 역압력 기능 포함
      • 이미 Flow API를 통해 역압력을 확인했으니 아래선 생략
    • io.reactivex.Observable
      • 역압력 지원 X
      • 단순한 마우스 움직임 같은 사용자 인터페이스 이벤트에 적합
        • 이유: 마우스 움직임을 느리게 하거나 멈출 수 없듯이 역압력을 적용할 수 없기 때문
  • Flow API 와 비교

    • Publisher 역할: Observable

      • 옵저버블은 역압력 지원을 안하므로, request() 메서드를 사용할 필요 없다.
      • Subscription 클래스 이용이 필요 없다는거 같음.
    • Subscriber 역할: Observer

      • 옵저버는 추가로 Disposable 인수를 갖는다.

      • Observer 인터페이스

        public interface Observer<T> {
            void onSubscribe(Disposable d);
            void onNext(T t);
            void onError(Throwable t);
            void onComplete();
        }
  • 좋은 시스템 아키텍처를 위한 TIP

    • 세부 사항을 노출하지 말자.
      • 즉, Observable의 추가 구조가 필요한 상황에서만 Observable 을 사용하고 그렇지 않으면 Publisher의 인터페이스를 사용하는 것이 좋다.
      • 비슷한 사례
        • 전달하는 값이 ArrayList 임을 알지만 파라미터 형식을 List로 설정함으로 구현 세부사항을 밖으로 노출하지 않을 수 있다.
        • 이럼으로써, LinkedList로 적용 가능하다.
        • 다형성
  • 마블 다이어그램

    • 모형들을 수평선에 표시해서 리액티브 스트림의 흐름을 보여준다.

![image-20220220002256504](D:\0 Google Drive\03 스터디 모임\모던자바\10주차\17장.assets\image-20220220002256504.png)

지금부터는 RxJava의 리액티브 스트림의 구현을 이용해서 온도 보고 시스템을 정의해보자.

Observable 만들고 사용하기

Observable, Flowable 클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들 수 있도록 여러 팩토리 메서드를 제공한다.

  • just(): 한 개 이상의 요소를 이용해 이를 방출하는 Observable로 변환한다.
  • interval(): 특정 속도로 이벤트를 방출하는 상황에 유용하다.

just() 사용한 간단한 예제

// Observable의 구독자는 onNext("first"), onNext("second"), onComplete()의 순서로 메시지를 받는다.
Observable<String> strings = Observable.just( "first", "second" );
Disposable disposable = strings.subscribe(System.out::println); // 구독하기 
disposable.dispose(); // 구독 취소 
System.out.println(disposable.isDisposed());

interval() 사용 예

  • 이 코드는 프린팅이 안되고 종료되버린다.
    • 이유: 매 초마다 정보를 발행하는 Observable이 RxJava의 연산 스레드 풀 즉 데몬 스레드에서 실행되기 때문프린팅이 안된다.
    • 해결: blockingSubscribe() 사용하기
// main 프로그램은 실행하자 마자 따로 실행할 코드가 없으므로 바로 종료되고
// 프로그램이 종료되었으므로 어떤 결과를 출력하기도 전에 데몬 스레드도 종료되면서
// 이런 현상이 일어난다ㄴ
Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);
onePerSec.subscribe(i -> System.out.println(TempInfo.fetch( "New York" )));
// onePerSec.blockingSubscribe(i -> System.out.println(TempInfo.fetch( "New York" )));

예제의 난이도를 높여보자.

온도를 직접 출력하지 않고 사용자에게 팩토리 메서드를 제공해 매 초마다 온도를 방출하는 Observable을 반환해보자.

main 코드

// getTemperature 메서드가 반환하는 Observable에  TempObserver 를 구독시킨다.
public static void main(String[] args) {
    // 매초마다 뉴욕 온도 방출하는 Oberserverble 생성
    Observable<TempInfo> observable = getTemperature( "New York" );
    // Observable에 가입해서 온도 출력
    observable.blockingSubscribe( new TempObserver() );
}
  • 순서 정리
    • Observer를 소비하는 함수로부터 Observable 만들기
    • 매초 마다 무한으로 증가하는 일련의 long 값을 방출하는 Observerble
    • 소비된 Observer가 폐기되지 않았으면 어떤 작업을 수행 (이전 에러)
    • 온도를 다섯번 보고 했으면 옵저버를 완료하고 스트림 종료
    • 아니면 온도를 옵저버로 보고
public static Observable<TempInfo> getTemperature(String town) {
    return Observable.create(emitter ->
                             Observable.interval(1, TimeUnit.SECONDS)
                             .subscribe(i -> {
                                 if (!emitter.isDisposed()) {
                                     // 편의상 다섯 번만
                                     if ( i >= 5 ) {
                                         emitter.onComplete();
                                     } else {
                                         try {
                                             emitter.onNext(TempInfo.fetch(town));
                                         } catch (Exception e) {
                                             emitter.onError(e);
                                         }
                                     }
                                 }}));
}

수신한 온도를 출력하는 Observer

  • 역압력을 제공하지 않으니, request() 메서드가 필요 없어 단순하다.
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class TempObserver implements Observer<TempInfo> {
    @Override
    public void onComplete() {
        System.out.println( "Done!" );
    }
    @Override
    public void onError( Throwable throwable ) {
        System.out.println( "Got problem: " + throwable.getMessage() );
    }
    @Override
    public void onSubscribe( Disposable disposable ) {
    }
    @Override
    public void onNext( TempInfo tempInfo ) {
        System.out.println( tempInfo );
    }
} 

결과

New York : 69
New York : 26
New York : 85
New York : 94
New York : 29
Done!

RxJava 예제를 조금 더 발전시켜서 한 개 이상의 리액티브 스트림을 다루는 방법을 살펴보자.

RxJava 고급지게 활용해보기

Observable이 방출하는 요소를 조작하는 다양한 방법을 확인해보자.

RxJava의 장점: 스트림을 합치고, 만들고, 거르는 등의 풍부한 기능 사용 가능

map() 사례

// 섭씨를 화씨로
public static Observable<TempInfo> getCelsiusTemperature(String town) {
    return getTemperature( town )
        .map( temp -> new TempInfo( temp.getTown(),
                                   (temp.getTemp() - 32) * 5 / 9) );
}

filter() 사례

// 필터에 만족하는 Observable들만 가져온다. 
// 아래는 동상에 걸릴 위험이 있을 때 알려주는 경고 시스템 사례 
public static Observable<TempInfo> getNegativeTemperature(String town) {
    return getCelsiusTemperature( town )
        .filter( temp -> temp.getTemp() < 0 ); // 온도가 섭씨 0도 이하일 때만
}

세 도시의 온도를 출력하는 Main 클래스

  • getCelsiusTemperatures(): 여러 도시에서 온도를 방출하는 Observable을 가질 수 있도록 처리되었다.
public class Main {
    public static void main(String[] args) {
        Observable<TempInfo> observable = getCelsiusTemperatures(
            "New York", "Chicago", "San Francisco" );
        observable.blockingSubscribe( new TempObserver() );
    }
}
  • merge 메서드는 Iterable을 인수로 받아 마치 한 개의 Observable 처럼 동작하도록 결과를 합친다.
public static Observable<TempInfo> getCelsiusTemperatures(String... towns) {
    return Observable.merge(Arrays.stream(towns)
                            .map(TempObservable::getCelsiusTemperature)
                            .collect(toList()));
}

결과

New York : 21
Chicago : 6
San Francisco : -15
New York : -3
Chicago : 12
San Francisco : 5
Got problem: Error!

merge 마블 다이어그램

![image-20220220002712153](D:\0 Google Drive\03 스터디 모임\모던자바\10주차\17장.assets\image-20220220002712153.png)

정리

  • 리액티브 프로그래밍의 사상은 이미 오래전에 나왔지만 최근에서야 각광받는 중이다.
  • 리액티브 소프트웨어는 4 가지 속성 (반응성, 회복성, 탄력성, 메시지 주도) 을 가져야 한다.
  • 리액티브 시스템과 리액티브 애플리케이션의 개념 차이를 알자.
  • 리액티브 스트림에서 역압력은 중요하다. (구독자-발행자 속도 차이해결)
  • RxJava는 리액티브 프로그래밍 도구 중 대표적이다. (강력한 연산자들 - filter, map 등)
  • 표준화된 자바 Flow API 가 있다.
⚠️ **GitHub.com Fallback** ⚠️