728x90
Publisher 생성
@DisplayName("Flux와 Mono를 생성한다")
@Test
void create() {
Flux<String> flux1 = Flux.just("foo", "bar", "foobar");
Flux<String> flux2 = Flux.fromIterable(Arrays.asList("foo", "bar", "foobar"));
Flux<Integer> flux3 = Flux.range(5, 3); // 5, 6, 7
Mono<Object> mono1 = Mono.empty(); // empty여도 타입을 가진다
Mono<String> mono2 = Mono.just("foo");
}
Subscriber를 Subscribe
/**
* lambda interface (consumer)를 사용하여 subscriber를 subscribe한다
*/
@DisplayName("Subscriber를 subscribe한다")
@Test
void subscribe() {
Disposable disposable = Flux.just("foo").subscribe(); // subscribe하고 reactive 흐름을 시작한다
// disposable은 subscription을 참조하여 data가 더 이상 생성 안되도록 cancel 할 수 있다, 생성된 데이터를 clean up한다
Flux.just("foo").subscribe(
a -> System.out.println("consumer")); // 각 생성된 데이터로 특정일을 시킨다
Flux.just("foo").subscribe(
a -> System.out.println("consumer"), // consumer
e -> new RuntimeException(e)); // onError : exception
Flux.just("foo").subscribe(
a -> System.out.println("consumer"), // consumer
e -> new RuntimeException(e), // onError : exeception
new Runnable() { // onComplete : 흐름이 잘 끝나면 특정일을 시킨다
@Override
public void run() {
System.out.println("complete consumer");
}
}
);
// 3.5에서 삭제 -> subscribeWith(subscriber) 사용, 사람들이 request 호출하는 것을 까먹는다고 지웠다고 함ㅎㅎ
Flux.just("foo").subscribe(
a -> System.out.println("consumer"), // consumer
e -> new RuntimeException(e), // onError : exeception
() -> System.out.println("complete consumer"), // onComplete : 흐름이 잘 끝나면 특정일을 시킨다
s -> System.out.println("subscription consumer")// subscribe 시점에서 subscription에게 특정 콜
);
}
- subscribe
- Lambda interface를 사용하여 publisher가 subscriber를 subcribe한다
- 반환되는 disposable 인터페이스는 두가지 일을한다
- 데이터를 생성을 중단
- 기존 생성된 데이터를 clean up
- onError
- Publisher에서 발생한 에러를 핸들링 할 수 있다
- onComplete
- 흐름이 끝났을 때 특정일을 시킬 수 있다
Runabble
혹은Supplier
사용
- subscribe 시점에 subscription에게 특정일을 시킬 수 있다.
- 3.5에서 deprecate 됨
- 사람들이 subscription에 request하는 것을 종종 까먹었다고 한다;;
subscribeWith
로 대체
Subscribe 예제
@DisplayName("subscribe 예제")
@Test
void subscribeExamples() {
Flux<Integer> flux1 = Flux.range(1, 3);
flux1.subscribe();
Flux<Integer> flux2 = Flux.range(1, 3);
flux2.subscribe(i -> System.out.println(i));
Flux<Integer> flux3 = Flux.range(1, 4)
.map(i -> {
if(i <= 3){
return i;
}
throw new RuntimeException("Got to 4");
});
flux3.subscribe(
i -> System.out.println(i),
error -> System.out.println("Error: " + error));
Flux<Integer> flux4 = Flux.range(1, 4);
flux4.subscribe(
i -> System.out.println(i),
error -> System.out.println("Error: " + error), // onError : 에러로 흐름 종료
() -> System.out.println("Done")); // onComplete : 성공적으로 흐름 종료 == Runabble이다 (Supplier)
Flux<Integer> flux5 = Flux.range(1, 15);
flux5.subscribe(
i -> System.out.println(i),
error -> System.out.println("Error: " + error), // onError : 에러로 흐름 종료
() -> System.out.println("Done"), // onComplete : 성공적으로 흐름 종료 == Runabble이다 (Supplier)
subscription -> subscription.request(10)); // subscription에게 10개의 데이터를 요청한다
}
// 1번
1
2
3
// 2번
1
2
3
Error: java.lang.RuntimeException: Got to 4
// 3번
1
2
3
4
Done
// 4번
1
2
3
4
5
6
7
8
9
10
Subscription에게 cancel 요청
@DisplayName("Dispoable로 요청을 cancel한다")
@Test
void cancel() {
Flux<Integer> flux1 = Flux.range(1, 3);
Disposable disposable = flux1.subscribe();
disposable.dispose(); // subscription에게 요청 취소
Disposable.Swap swap = Disposables.swap();
swap.replace(disposable); // 개별적으로 cancel할 때 사용
Disposable.Composite composite = Disposables.composite(disposable);
composite.dispose(); // composite에 포함된 모든 disposable이 바로 취소된다
}
- subscription에게 요청 취소
- Publisher에게 더 이상 데이터를 생성하지 말라고 요청한다
- 하지만 취소가 바로될지는 모른다
- cancel 시그널이 발생하더라도 빠르게 생성된 데이터는 cancel 완료 시그널을 받기 전에 생성완료가 될 수 있다.
Disposable.swap
Disposable.composite(disposable...)
BaseSubscriber
@DisplayName("lambda 대용으로 BaseSubscriber 사용")
@Test
void baseSubscriber() {
Flux.range(1, 4)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
request(1);
}
});
}
- BaseSubscriber
- 단일 사용
- 두번째 Publisher에 subscribe되면 첫번째 Publisher에 subscribe된 BaseSubscriber는 취소된다
- 인스턴스를 두번 사용하는 것은 Reactive 흐름에 위배되기 때문이다
- Subscriber의 onNext 메소드는 병렬적으로 호출되어서는 안된다
- hook을 사용하여 Subscriber의 행동을 재정의한다
- 기본적으로 unbouned request를 사용한다
hookOnComplete
hookOnError
hookOnCancel
hookFinally
Backpressure
@DisplayName("Backpressure 테스트")
@Test
void backpressure() {
Flux.range(1, 10)
.doOnRequest(r -> System.out.println("request of " + r))
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1); // subscribe가 되면 1개 데이터를 요청한다
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Cancelling after having received " + value);
cancel(); // 다음 요청에서 cancel 발생
}
});
}
request of 1
Cancelling after having received 1
- Consumer는 Publisher에 subscription.request로 전파하면 backpress한다
- Consumer가 판단하기에 요청이 너무 많으면 backpressure를 조금만 한다
- 현재 요청들의 합은 때론 현재 demand 혹은 pending request라고 부르기도 한다.
- demand는
Long.MAX_VALUE
일때- unbound request를 나타낸다
- 무한대의 요청 == 최대한 빠르게 요청
- backpressure를 사용하지 않는다
- 첫번쨰 요청은 최종 subscriber가 subscription 할 때 발생한다.
- 모든 subscriber를 subscirbe하는 방법은
Long.MAX_VALUE
로 unbounded request를 보내느 것subscribe()
block()
,blockFirst()
,blockLast()
toIterable()
,toStream()
- 원래 요청을 커스터마이즈하는 좋은 방법은 subscribe 시,
BaseSubscriber
와hookOnSubscribe
를 오버라이드 하는 것이 좋다- BaseSubscriber의 hookOnSubscribe는 기본적으로 unbounded request를 보낸다
- 해당 방식을 사용하는 경우 request를 최소한 한번은 보내야 한다
- 적절한 양의 demand를 보낸다
프로그래밍적으로 sequence 생성하기
- Flux와 Mono를 onNext, onComplete, onError 이벤트를 정의하면서 생성해보자
- 해당 메소드들은 이벤트들은 트리거하는 sink라는 API를 노출시킨다
1. generate, 동기적 생성
- generator 함수를 받는다
- 동기적이고 하나하나 emission한다
- 이때 sink는
synchronousSink
를 사용한다 - synchronousSink가 가지는
next
메소드는 한번의 콜백 호출 당 최소 한번 호출된다 - 옵션으로
complete
과error
메소드를 가진다 - 다음으로 무엇을 emit할 것인지 결정하여 state의 일관성을 유지할 수 있다
@DisplayName("generate로 동기적으로 Flux를 생성한다")
@Test
void generate() {
Flux<String> flux = Flux.generate(
() -> 0, // 초기 state = 0
(state, sink) -> {
sink.next("3 x " + state + " = " + 3 * state); // 다음으로 무엇을 emit할 지 결정
if (state == 10) sink.complete(); // 언제 멈출지 결정 (종료)
return state + 1; // 다음 호출에 사용할 새로운 state를 반환
});
flux.subscribe(System.out::println);
}
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
generate(Supplier<S>, BiFunction<S, SynchronousSink<T>>)
Supplier<S>
: 초기 state 값BiFunction<S, SynchronousSink<T>>
: state의 상태를 SynchronousSink의 이벤트 메소드로 결정
sink
- Subscriber에게 동기적으로 하나의 signal을 생성해서 보낸다
next
- emit을 시도한다 (에러가 발생할 수도 있다)
state
- 다음에 사용할 state
complete
- 해당 iteration을 멈춘다
- 뭔가 Reducer의 향기가 풍겨진다... (Redux도...)
혹은 아래처럼 Atomic
을 사용할 수도 있다
@DisplayName("generate로 동기적으로 Flux를 생성한다")
@Test
void generate2() {
Flux<String> flux = Flux.generate(
AtomicLong::new, // 초기 state = 0
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3 * i); // 다음으로 무엇을 emit할 지 결정
if (i == 10) sink.complete(); // 언제 멈출지 결정 (종료)
return state; // 다음 호출에 사용할 새로운 state를 반환
});
flux.subscribe(System.out::println);
}
혹은 generate의 세번짜 파라미터로 Consumer를 바로 사용할 수 있다
@DisplayName("generate로 동기적으로 Flux를 Consumer와 함께 생성한다")
@Test
void generate3() {
Flux<String> flux = Flux.generate(
AtomicLong::new, // 초기 state = 0
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3 * i); // 다음으로 무엇을 emit할 지 결정
if (i == 10) sink.complete(); // 언제 멈출지 결정 (종료)
return state; // 다음 호출에 사용할 새로운 state를 반환
}, (state) -> System.out.println("state" + state));
}
2. create, 비동기 멀티스레드
- create는 한번의 라운드에 여러개 스레드로 여러개의 emission을 하고 싶을 때 적합하다
- sink로 FluxSink를 사용한다
- 마찬가지로
next
,complete
,error
메소드를 가진다 - generate와 다르게 state 베이스가 아니다
- 다른 말로 callback에서 멀티스레드 이벤트를 호출 가능하다
3. push, 비동기 싱글 스레드
- push는 generate와 create의 중간이다
- create의 전략을 사용하여 비동기나 backpressure를 관리할 수 있다
- 하지만 오직 하나의 스레드만
next
,complete
,error
를 호출할 수 있다
Threading과 Scheduler
- Mono와 Flux는 지정된 thread에서만 구동되는 것은 아니다
- 대부분의 경우는 이전 operator가 실행된 Thread를 계속 사용한다
- 특별한 경우가 아니고서는
subscribe()
호출이된 스레드에서 operator가 사용된다
아래의 예를 살펴보자
@Test
void newThread() throws InterruptedException {
final Mono<String> mono = Mono.just("hello"); // main 메소드에서 Mono가 생성되었다.
Thread thread = new Thread(() -> mono
.map(msg -> msg + " thread ")
.subscribe(v -> System.out.println(v + Thread.currentThread().getName()))); // 하지만 Thread-0에 mono가 subscribe되었다.
thread.start();
thread.join();
}
hello thread Thread-0
- Mono는 main Thread에서 정의되었다
- 하지만 subscribe는 새로운 Thread 0에 되었다.
- 그러므로 map과 onNext 콜백은 Thread 0에서 구동된다.
- 즉, subscribe 시점의 Thread에 operator들이 동작한다.
- Reactor에서는 실행은 Scheduler에 의해 결정된다
- Scheduler는 ExecutorService와 같이 스케쥴링에 책임을 가진다. 하지만 좀 더 많은 기능을 한다
728x90