728x90
Publisher, Operator, Subscriber의 용어가 익숙하지 않다면 이전 글인 [리액티브 프로그래밍] Publisher와 Subscriber 을 참고하시기 바랍니다.
- Publisher(생산자)가 Subscriber(소비자)를 subscribe(등록)한다.
- 동시에 Subscriber(소비자)가 Subscription(전달자)을 onSubscribe(등록)한다
- Subscriber(소비자)는 필요할 때 Subscribe(전달자).request(요청)을 통해 Publisher에게 데이터를 요청한다.
- Publisher(생산자)는 요청을 받으면 생성한 데이터를 보낸다
- Subscriber는
onNext
로 데이터를 받는다. - 모든 요청이 성공적으로 완료되면
onComplete
을 호출하고 흐름을 종료한다. - 요청이 실패하면
onError
를 호출하고 흐름을 종료한다.
- Publisher는 여러개의 Subscriber를 subscribe(등록)할 수 있다.
자세히 보기
/**
* Publisher가 Subscriber를 등록(subscribe)하면 Subscriber는 onSubscribe를 호출하며 Subscription을 받는다.
* Subscriber가 Subscription.request로 요청하기 전까지는 아무런 일도 일어나지 않는다
* Subscription.request로 데이터 요청한 이후:
* - Subscription.request가 정한 최대치만큼 onNext 호출 가능
* - 더 이상의 이벤트를 받지 않으면 끝났다는 걸 알리기 위해 onError, onComplete 호출
* Subscriber가 핸들링 가능하면 Publisher에게 데이터를 얼마든지 요청할 수 있다.
*/
public interface Subscriber<T> {
/**
* Publisher.subscribe가 호출되면 비로소 호출되는 메소드
* Subscription를 파라미터로 받아 request를 호출시킨다.
* Subscription.request가 호출되기 전까지는 어떤 데이터도 흐르지 않는다
* Subscription.request를 호출하는 것은 순전히 Subscriber의 책임이다.
* 언제든지 여유가 된다면 Subscription.request를 호출할 수 있다.
* Publisher는 Subscription.request의 응답으로만 notification을 때릴 수 있다.
*/
public void onSubscribe(Subscription s);
/**
* Publisher가 subscription.request요청을 받으면 응답으로 데이터를 받는다
* T: Publisher가 보낸 데이터
*/
public void onNext(T t);
/**
* 에러로 종료
* Subscription.request가 호출되어도 더 이상 이벤트를 전송하지 않는다
*/
public void onError(Throwable t);
/**
* 성공적으로 종료
* Subscription.request가 호출되어도 더 이상 이벤트를 전송하지 않는다
*/
public void onComplete();
}
/**
* Subscriber와 같은 라이프 사이클을 가진다.
* Subscriber가 Publisher에 의해 등록(subscribe)될 때 하나 생성된다.
* 하나의 Subscriber 당 하나의 Subscription을 가진다.
* Subscriber가 Publisher에 데이터를 요청할 때 Subscription.request를 사용한다.
*/
public interface Subscription {
/**
* Subscriber가 Subscription.request로 Publisher에 데이터를 요청하기 전까지는 아무런 이벤트도 발생하지 않는다
* 해당 요청은 Subscriber가 여유가 될때 언제든지 가능하다.
* 만약 스트림이 끝난다면 Publisher는 요청한 데이터보다 적게 보낼 수 있다.
* 하지만 Subscription은 반드시 onError나 onComplete을 전달(emit)해야 한다
* 파라미터 n는 Publisher에게 요청하는 데이터의 개수
*/
public void request(long n);
/**
* Publisher에게 데이터를 그만보내라고 요청
* 이후 모든 자원을 비운다
* cancel하더라도 그 이전에 요청한 데이터는 보내질 수 있다.
*/
public void cancel();
}
/**
* Publisher는 무한정의 데이터 흐름을 제공한다
* Subscriber에게 받은 요청만큼 데이터를 보낸다
* Publisher는 여러개의 Subscriber를 언제든지 등록(subscribe)할 수 있다
*/
public interface Publisher<T> {
/**
* Subscriber를 등록한다.
* 비로소 데이터를 요청할 수 있다.
* Subscriber를 등록할 때마다 새로운 Subscription(배달원)을 생성한다.
* 하나의 Subscription(배달원)은 하나의 Subscriber에 등록되어 사용된다.
* 만약 publisher가 subscription을 요청을 거절하거나 실패되면, onError로 에러를 보낸다
*/
public void subscribe(Subscriber<? super T> s);
}
구현
Publisher
@Slf4j
public class MyPublisher implements Publisher {
final ExecutorService executor = Executors.newFixedThreadPool(4);
private List subscriptions = Collections.synchronizedList(new ArrayList());
private final CompletableFuture<Object> terminated = new CompletableFuture<>();
@Override
public void subscribe(Subscriber subscriber) {
log.info("Subscription 생성");
MySubscription subscription = new MySubscription(subscriber, executor, subscriptions, terminated);
subscriptions.add(subscription);
subscriber.onSubscribe(subscription);
}
public void waitUntilTerminated() throws ExecutionException, InterruptedException {
terminated.get();
}
}
- Publisher는 두가지 책임을 가진다
- 데이터 emit
Subscription
을 생성하고Subscriber
를 subscribe
Subscription
@Slf4j
public class MySubscription implements Subscription {
private Subscriber subscriber;
private final ExecutorService executor;
private final AtomicInteger value;
private AtomicBoolean isCanceled;
private List subscriptions;
private CompletableFuture terminated;
public MySubscription(Subscriber subscriber, ExecutorService executor, List subscriptions, CompletableFuture terminated) {
this.subscriber = subscriber;
this.executor = executor;
this.subscriptions = subscriptions;
this.terminated = terminated;
this.value = new AtomicInteger();
this.isCanceled = new AtomicBoolean(false);
}
@Override
public void request(long n) {
if (isCanceled.get()) {
return;
}
if (n < 0) {
executor.execute(() -> {
subscriber.onError(new IllegalArgumentException());
});
}
for (int i = 0; i < n; i++) {
executor.execute(() -> {
int v = value.incrementAndGet();
log.info("아이템 {}을 Subscriber에 전송", v);
subscriber.onNext(v);
});
}
}
@Override
public void cancel() {
isCanceled.set(true);
synchronized (subscriptions) {
subscriptions.remove(this);
if (subscriptions.size() == 0) {
shutdown();
}
}
}
private void shutdown() {
log.info("shutdown executor...");
executor.shutdown();
Executors.newSingleThreadExecutor().submit(() -> {
log.info("shutdown complete");
terminated.complete(null);
});
}
}
- Subscription은 두가지 책임을 가진다.
Subscriber
가 요청한 데이터를Subscriber
의onNext
를 통해 보내준다- 요청을 취소한다
Subscriber
@Slf4j
public class MySubscriber implements Subscriber {
private static final int DEMAND = 3;
private static final Random RANDOM = new Random();
private String name;
private Subscription subscription;
private int count;
public MySubscriber(String name) {
this.name = name;
}
@Override
public void onSubscribe(Subscription subscription) {
log.info("Subscriber가 subscribe 됨");
this.subscription = subscription;
log.info("새로운 아이템 {}을 Subcription에 요청한다", DEMAND);
count = DEMAND;
subscription.request(DEMAND);
}
@Override
public void onNext(Object item) {
log.info("Subscription으로 부터 받은 값 {}", item.toString());
synchronized (this) {
count--;
if (count == 0) {
if (RANDOM.nextBoolean()) {
log.info("새로운 아이템 {}을 Subcription에 요청한다", DEMAND);
count = DEMAND;
subscription.request(DEMAND);
}
else {
log.info("Subscription을 취소한다...");
count = 0;
subscription.cancel();
}
}
}
}
@Override
public void onError(Throwable t) {
log.info("Subscriber Error!");
}
@Override
public void onComplete() {
log.info("Complete!");
}
}
- Subscriber는 4가지 책임을 가진다.
Publisher
에 subscribe된다. (해당 시점부터Subscription
을 통해Publisher
에게 데이터를 요청 할 수 있다)Subscription
으로부터 요청한 데이터를 받고 다음 요청을Subscription
에게 보낸다- 데이터 전송 도중 오류가 발생한 경우 Exception을 발생시킨다
- 모든 요청이 완료되면 종료 시킨다
출처
728x90