본문 바로가기
Programming/Spring

[리액티브 프로그래밍] Reactor 3 살펴보기

by peter paak 2021. 3. 23.
728x90

Publisher 생성

@DisplayName("Flux와 Mono를 생성한다")
@Test
void create() {
    Flux<String> flux1 = Flux.just("foo", "bar", "foobar");
    Flux<String> flux2 = Flux.fromIterable(Arrays.asList("foo", "bar", "foobar"));
    Flux<Integer> flux3 = Flux.range(5, 3); // 5, 6, 7

    Mono<Object> mono1 = Mono.empty(); // empty여도 타입을 가진다
    Mono<String> mono2 = Mono.just("foo");
}

Subscriber를 Subscribe

/**
 * lambda interface (consumer)를 사용하여 subscriber를 subscribe한다
 */
@DisplayName("Subscriber를 subscribe한다")
@Test
void subscribe() {
        Disposable disposable = Flux.just("foo").subscribe();   // subscribe하고 reactive 흐름을 시작한다
                                                            // disposable은 subscription을 참조하여 data가 더 이상 생성 안되도록 cancel 할 수 있다, 생성된 데이터를 clean up한다

    Flux.just("foo").subscribe(
            a -> System.out.println("consumer"));           // 각 생성된 데이터로 특정일을 시킨다

    Flux.just("foo").subscribe(
            a -> System.out.println("consumer"),            // consumer
            e -> new RuntimeException(e));                  // onError : exception

    Flux.just("foo").subscribe(
            a -> System.out.println("consumer"),            // consumer
            e -> new RuntimeException(e),                   // onError : exeception
            new Runnable() {                                // onComplete : 흐름이 잘 끝나면 특정일을 시킨다
                @Override
                public void run() {
                    System.out.println("complete consumer");
                }
            }
    );

    // 3.5에서 삭제 -> subscribeWith(subscriber) 사용, 사람들이 request 호출하는 것을 까먹는다고 지웠다고 함ㅎㅎ
    Flux.just("foo").subscribe(
            a -> System.out.println("consumer"),            // consumer
            e -> new RuntimeException(e),                   // onError : exeception
            () -> System.out.println("complete consumer"),  // onComplete : 흐름이 잘 끝나면 특정일을 시킨다
            s -> System.out.println("subscription consumer")// subscribe 시점에서 subscription에게 특정 콜
    );
}
  • subscribe
    • Lambda interface를 사용하여 publisher가 subscriber를 subcribe한다
    • 반환되는 disposable 인터페이스는 두가지 일을한다
      1. 데이터를 생성을 중단
      2. 기존 생성된 데이터를 clean up
    • onError
      • Publisher에서 발생한 에러를 핸들링 할 수 있다
    • onComplete
      • 흐름이 끝났을 때 특정일을 시킬 수 있다
      • Runabble 혹은 Supplier 사용
    • subscribe 시점에 subscription에게 특정일을 시킬 수 있다.
      • 3.5에서 deprecate 됨
      • 사람들이 subscription에 request하는 것을 종종 까먹었다고 한다;;
      • subscribeWith로 대체

Subscribe 예제

@DisplayName("subscribe 예제")
@Test
void subscribeExamples() {
    Flux<Integer> flux1 = Flux.range(1, 3);
    flux1.subscribe();

    Flux<Integer> flux2 = Flux.range(1, 3);
    flux2.subscribe(i -> System.out.println(i));

    Flux<Integer> flux3 = Flux.range(1, 4)
            .map(i -> {
                if(i <= 3){
                    return i;
                }
                throw new RuntimeException("Got to 4");
            });
    flux3.subscribe(
           i -> System.out.println(i),
            error -> System.out.println("Error: " + error));

    Flux<Integer> flux4 = Flux.range(1, 4);
    flux4.subscribe(
            i -> System.out.println(i),
            error -> System.out.println("Error: " + error), // onError : 에러로 흐름 종료
            () -> System.out.println("Done"));              // onComplete : 성공적으로 흐름 종료 == Runabble이다 (Supplier)

    Flux<Integer> flux5 = Flux.range(1, 15);
    flux5.subscribe(
            i -> System.out.println(i),
            error -> System.out.println("Error: " + error), // onError : 에러로 흐름 종료
            () -> System.out.println("Done"),               // onComplete : 성공적으로 흐름 종료 == Runabble이다 (Supplier)
            subscription -> subscription.request(10));      // subscription에게 10개의 데이터를 요청한다
}
// 1번
1
2
3

// 2번
1
2
3
Error: java.lang.RuntimeException: Got to 4

// 3번
1
2
3
4
Done

// 4번
1
2
3
4
5
6
7
8
9
10

Subscription에게 cancel 요청

@DisplayName("Dispoable로 요청을 cancel한다")
@Test
void cancel() {
    Flux<Integer> flux1 = Flux.range(1, 3);
    Disposable disposable = flux1.subscribe();
    disposable.dispose(); // subscription에게 요청 취소

    Disposable.Swap swap = Disposables.swap();
    swap.replace(disposable);   // 개별적으로 cancel할 때 사용

    Disposable.Composite composite = Disposables.composite(disposable);
    composite.dispose();    // composite에 포함된 모든 disposable이 바로 취소된다
}
  • subscription에게 요청 취소
  • Publisher에게 더 이상 데이터를 생성하지 말라고 요청한다
  • 하지만 취소가 바로될지는 모른다
  • cancel 시그널이 발생하더라도 빠르게 생성된 데이터는 cancel 완료 시그널을 받기 전에 생성완료가 될 수 있다.
  • Disposable.swap
  • Disposable.composite(disposable...)

BaseSubscriber

@DisplayName("lambda 대용으로 BaseSubscriber 사용")
@Test
void baseSubscriber() {
    Flux.range(1, 4)
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    System.out.println("Subscribed");
                    request(1);
                }

                @Override
                protected void hookOnNext(Integer value) {
                    System.out.println(value);
                    request(1);
                }
            });
}
  • BaseSubscriber
    • 단일 사용
    • 두번째 Publisher에 subscribe되면 첫번째 Publisher에 subscribe된 BaseSubscriber는 취소된다
    • 인스턴스를 두번 사용하는 것은 Reactive 흐름에 위배되기 때문이다
      • Subscriber의 onNext 메소드는 병렬적으로 호출되어서는 안된다
    • hook을 사용하여 Subscriber의 행동을 재정의한다
    • 기본적으로 unbouned request를 사용한다
    • hookOnComplete
    • hookOnError
    • hookOnCancel
    • hookFinally

Backpressure

@DisplayName("Backpressure 테스트")
@Test
void backpressure() {
    Flux.range(1, 10)
            .doOnRequest(r -> System.out.println("request of " + r))
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1); // subscribe가 되면 1개 데이터를 요청한다
                }

                @Override
                protected void hookOnNext(Integer value) {
                    System.out.println("Cancelling after having received " + value);
                    cancel(); // 다음 요청에서 cancel 발생
                }
            });
}
request of 1
Cancelling after having received 1
  • Consumer는 Publisher에 subscription.request로 전파하면 backpress한다
  • Consumer가 판단하기에 요청이 너무 많으면 backpressure를 조금만 한다
  • 현재 요청들의 합은 때론 현재 demand 혹은 pending request라고 부르기도 한다.
  • demand는 Long.MAX_VALUE 일때
    • unbound request를 나타낸다
    • 무한대의 요청 == 최대한 빠르게 요청
    • backpressure를 사용하지 않는다
  • 첫번쨰 요청은 최종 subscriber가 subscription 할 때 발생한다.
  • 모든 subscriber를 subscirbe하는 방법은 Long.MAX_VALUE로 unbounded request를 보내느 것
    • subscribe()
    • block(), blockFirst(), blockLast()
    • toIterable(), toStream()
  • 원래 요청을 커스터마이즈하는 좋은 방법은 subscribe 시, BaseSubscriberhookOnSubscribe를 오버라이드 하는 것이 좋다
    • BaseSubscriber의 hookOnSubscribe는 기본적으로 unbounded request를 보낸다
    • 해당 방식을 사용하는 경우 request를 최소한 한번은 보내야 한다
    • 적절한 양의 demand를 보낸다

프로그래밍적으로 sequence 생성하기

  • Flux와 Mono를 onNext, onComplete, onError 이벤트를 정의하면서 생성해보자
  • 해당 메소드들은 이벤트들은 트리거하는 sink라는 API를 노출시킨다

1. generate, 동기적 생성

  • generator 함수를 받는다
  • 동기적이고 하나하나 emission한다
  • 이때 sink는 synchronousSink를 사용한다
  • synchronousSink가 가지는 next 메소드는 한번의 콜백 호출 당 최소 한번 호출된다
  • 옵션으로 completeerror메소드를 가진다
  • 다음으로 무엇을 emit할 것인지 결정하여 state의 일관성을 유지할 수 있다
@DisplayName("generate로 동기적으로 Flux를 생성한다")
@Test
void generate() {
    Flux<String> flux = Flux.generate(
            () -> 0,                                                    // 초기 state = 0
            (state, sink) -> {
                sink.next("3 x " + state + " = " + 3 * state);          // 다음으로 무엇을 emit할 지 결정
                if (state == 10) sink.complete();                       // 언제 멈출지 결정 (종료)
                return state + 1;                                       // 다음 호출에 사용할 새로운 state를 반환
            });

    flux.subscribe(System.out::println);
}
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
  • generate(Supplier<S>, BiFunction<S, SynchronousSink<T>>)
    • Supplier<S> : 초기 state 값
    • BiFunction<S, SynchronousSink<T>> : state의 상태를 SynchronousSink의 이벤트 메소드로 결정
  • sink
    • Subscriber에게 동기적으로 하나의 signal을 생성해서 보낸다
  • next
    • emit을 시도한다 (에러가 발생할 수도 있다)
  • state
    • 다음에 사용할 state
  • complete
    • 해당 iteration을 멈춘다
  • 뭔가 Reducer의 향기가 풍겨진다... (Redux도...)

혹은 아래처럼 Atomic 을 사용할 수도 있다

@DisplayName("generate로 동기적으로 Flux를 생성한다")
@Test
void generate2() {
    Flux<String> flux = Flux.generate(
            AtomicLong::new,                                                    // 초기 state = 0
            (state, sink) -> {
                long i = state.getAndIncrement();
                sink.next("3 x " + i + " = " + 3 * i);       // 다음으로 무엇을 emit할 지 결정
                if (i == 10) sink.complete();                       // 언제 멈출지 결정 (종료)
                return state;                                       // 다음 호출에 사용할 새로운 state를 반환
            });

    flux.subscribe(System.out::println);
}

혹은 generate의 세번짜 파라미터로 Consumer를 바로 사용할 수 있다


@DisplayName("generate로 동기적으로 Flux를 Consumer와 함께 생성한다")
@Test
void generate3() {
    Flux<String> flux = Flux.generate(
            AtomicLong::new,                                        // 초기 state = 0
            (state, sink) -> {
                long i = state.getAndIncrement();
                sink.next("3 x " + i + " = " + 3 * i);              // 다음으로 무엇을 emit할 지 결정
                if (i == 10) sink.complete();                       // 언제 멈출지 결정 (종료)
                return state;                                       // 다음 호출에 사용할 새로운 state를 반환
            }, (state) -> System.out.println("state" + state));
}

2. create, 비동기 멀티스레드

  • create는 한번의 라운드에 여러개 스레드로 여러개의 emission을 하고 싶을 때 적합하다
  • sink로 FluxSink를 사용한다
  • 마찬가지로 next, complete, error 메소드를 가진다
  • generate와 다르게 state 베이스가 아니다
  • 다른 말로 callback에서 멀티스레드 이벤트를 호출 가능하다

3. push, 비동기 싱글 스레드

  • push는 generate와 create의 중간이다
  • create의 전략을 사용하여 비동기나 backpressure를 관리할 수 있다
  • 하지만 오직 하나의 스레드만 next, complete, error를 호출할 수 있다

Threading과 Scheduler

  • Mono와 Flux는 지정된 thread에서만 구동되는 것은 아니다
  • 대부분의 경우는 이전 operator가 실행된 Thread를 계속 사용한다
  • 특별한 경우가 아니고서는 subscribe() 호출이된 스레드에서 operator가 사용된다

아래의 예를 살펴보자

@Test
void newThread() throws InterruptedException {

    final Mono<String> mono = Mono.just("hello");  // main 메소드에서 Mono가 생성되었다.

    Thread thread = new Thread(() -> mono
            .map(msg -> msg + " thread ")
            .subscribe(v -> System.out.println(v + Thread.currentThread().getName()))); // 하지만 Thread-0에 mono가 subscribe되었다.

    thread.start();
    thread.join();
}
hello thread Thread-0
  • Mono는 main Thread에서 정의되었다
    • 하지만 subscribe는 새로운 Thread 0에 되었다.
    • 그러므로 map과 onNext 콜백은 Thread 0에서 구동된다.
    • 즉, subscribe 시점의 Thread에 operator들이 동작한다.
  • Reactor에서는 실행은 Scheduler에 의해 결정된다
    • Scheduler는 ExecutorService와 같이 스케쥴링에 책임을 가진다. 하지만 좀 더 많은 기능을 한다
728x90