본문 바로가기
Programming/Spring

[리액티브 프로그래밍] Publisher, Subscriber 그리고 Subscription - 2

by peter paak 2021. 3. 18.
728x90

Publisher, Operator, Subscriber의 용어가 익숙하지 않다면 이전 글인 [리액티브 프로그래밍] Publisher와 Subscriber 을 참고하시기 바랍니다.

  1. Publisher(생산자)가 Subscriber(소비자)를 subscribe(등록)한다.
  2. 동시에 Subscriber(소비자)가 Subscription(전달자)을 onSubscribe(등록)한다
  3. Subscriber(소비자)는 필요할 때 Subscribe(전달자).request(요청)을 통해 Publisher에게 데이터를 요청한다.
  4. Publisher(생산자)는 요청을 받으면 생성한 데이터를 보낸다
  5. SubscriberonNext로 데이터를 받는다.
  6. 모든 요청이 성공적으로 완료되면 onComplete을 호출하고 흐름을 종료한다.
  7. 요청이 실패하면 onError를 호출하고 흐름을 종료한다.

  • Publisher는 여러개의 Subscriber를 subscribe(등록)할 수 있다.

자세히 보기

/**
 * Publisher가 Subscriber를 등록(subscribe)하면 Subscriber는 onSubscribe를 호출하며 Subscription을 받는다. 
 * Subscriber가 Subscription.request로 요청하기 전까지는 아무런 일도 일어나지 않는다
 * Subscription.request로 데이터 요청한 이후:
 * - Subscription.request가 정한 최대치만큼 onNext 호출 가능
 * - 더 이상의 이벤트를 받지 않으면 끝났다는 걸 알리기 위해 onError, onComplete 호출
 * Subscriber가 핸들링 가능하면 Publisher에게 데이터를 얼마든지 요청할 수 있다.
 */
public interface Subscriber<T> {
    /**
     * Publisher.subscribe가 호출되면 비로소 호출되는 메소드
     * Subscription를 파라미터로 받아 request를 호출시킨다.    
     * Subscription.request가 호출되기 전까지는 어떤 데이터도 흐르지 않는다
     * Subscription.request를 호출하는 것은 순전히 Subscriber의 책임이다.
     * 언제든지 여유가 된다면 Subscription.request를 호출할 수 있다.
     * Publisher는 Subscription.request의 응답으로만 notification을 때릴 수 있다.
     */
    public void onSubscribe(Subscription s);

    /**
     * Publisher가 subscription.request요청을 받으면 응답으로 데이터를 받는다
     * T: Publisher가 보낸 데이터
     */
    public void onNext(T t);

    /**
     * 에러로 종료
     * Subscription.request가 호출되어도 더 이상 이벤트를 전송하지 않는다
     */
    public void onError(Throwable t);

    /**
     * 성공적으로 종료
     * Subscription.request가 호출되어도 더 이상 이벤트를 전송하지 않는다
     */
    public void onComplete();
}
/**
 * Subscriber와 같은 라이프 사이클을 가진다.
 * Subscriber가 Publisher에 의해 등록(subscribe)될 때 하나 생성된다.
 * 하나의 Subscriber 당 하나의 Subscription을 가진다.
 * Subscriber가 Publisher에 데이터를 요청할 때 Subscription.request를 사용한다.
 */
public interface Subscription {
    /**
     * Subscriber가 Subscription.request로 Publisher에 데이터를 요청하기 전까지는 아무런 이벤트도 발생하지 않는다
     * 해당 요청은 Subscriber가 여유가 될때 언제든지 가능하다.
     * 만약 스트림이 끝난다면 Publisher는 요청한 데이터보다 적게 보낼 수 있다. 
     * 하지만 Subscription은 반드시 onError나 onComplete을 전달(emit)해야 한다
     * 파라미터 n는 Publisher에게 요청하는 데이터의 개수
     */
    public void request(long n);

    /**
     * Publisher에게 데이터를 그만보내라고 요청
     * 이후 모든 자원을 비운다
     * cancel하더라도 그 이전에 요청한 데이터는 보내질 수 있다.
     */
    public void cancel();
}
/**
 * Publisher는 무한정의 데이터 흐름을 제공한다
 * Subscriber에게 받은 요청만큼 데이터를 보낸다
 * Publisher는 여러개의 Subscriber를 언제든지 등록(subscribe)할 수 있다
 */
public interface Publisher<T> {

    /**
     * Subscriber를 등록한다.
     * 비로소 데이터를 요청할 수 있다.
     * Subscriber를 등록할 때마다 새로운 Subscription(배달원)을 생성한다.
     * 하나의 Subscription(배달원)은 하나의 Subscriber에 등록되어 사용된다.
     * 만약 publisher가 subscription을 요청을 거절하거나 실패되면, onError로 에러를 보낸다
     */
    public void subscribe(Subscriber<? super T> s);
}

구현

Publisher

@Slf4j
public class MyPublisher implements Publisher {

    final ExecutorService executor = Executors.newFixedThreadPool(4);
    private List subscriptions = Collections.synchronizedList(new ArrayList());
    private final CompletableFuture<Object> terminated = new CompletableFuture<>();

    @Override
    public void subscribe(Subscriber subscriber) {
        log.info("Subscription 생성");
        MySubscription subscription = new MySubscription(subscriber, executor, subscriptions, terminated);
        subscriptions.add(subscription);
        subscriber.onSubscribe(subscription);
    }

    public void waitUntilTerminated() throws ExecutionException, InterruptedException {
        terminated.get();
    }
}
  • Publisher는 두가지 책임을 가진다
    1. 데이터 emit
    2. Subscription을 생성하고 Subscriber를 subscribe

Subscription

@Slf4j
public class MySubscription implements Subscription {

    private Subscriber subscriber;

    private final ExecutorService executor;
    private final AtomicInteger value;
    private AtomicBoolean isCanceled;
    private List subscriptions;
    private CompletableFuture terminated;

    public MySubscription(Subscriber subscriber, ExecutorService executor, List subscriptions, CompletableFuture terminated) {
        this.subscriber = subscriber;

        this.executor = executor;
        this.subscriptions = subscriptions;
        this.terminated = terminated;
        this.value = new AtomicInteger();
        this.isCanceled = new AtomicBoolean(false);
    }

    @Override
    public void request(long n) {
        if (isCanceled.get()) {
            return;
        }
        if (n < 0) {
            executor.execute(() -> {
                subscriber.onError(new IllegalArgumentException());
            });
        }
        for (int i = 0; i < n; i++) {
            executor.execute(() -> {
                int v = value.incrementAndGet();
                log.info("아이템 {}을 Subscriber에 전송", v);
                subscriber.onNext(v);
            });
        }
    }


    @Override
    public void cancel() {
        isCanceled.set(true);
        synchronized (subscriptions) {
            subscriptions.remove(this);
            if (subscriptions.size() == 0) {
                shutdown();
            }
        }
    }

    private void shutdown() {
        log.info("shutdown executor...");
        executor.shutdown();
        Executors.newSingleThreadExecutor().submit(() -> {
            log.info("shutdown complete");
            terminated.complete(null);
        });
    }
}
  • Subscription은 두가지 책임을 가진다.
    1. Subscriber가 요청한 데이터를 SubscriberonNext를 통해 보내준다
    2. 요청을 취소한다

Subscriber

@Slf4j
public class MySubscriber implements Subscriber {

    private static final int DEMAND = 3;
    private static final Random RANDOM = new Random();

    private String name;
    private Subscription subscription;
    private int count;

    public MySubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        log.info("Subscriber가 subscribe 됨");
        this.subscription = subscription;

        log.info("새로운 아이템 {}을 Subcription에 요청한다", DEMAND);
        count = DEMAND;
        subscription.request(DEMAND);
    }

    @Override
    public void onNext(Object item) {
        log.info("Subscription으로 부터 받은 값 {}", item.toString());

        synchronized (this) {
            count--;
            if (count == 0) {
                if (RANDOM.nextBoolean()) {
                    log.info("새로운 아이템 {}을 Subcription에 요청한다", DEMAND);
                    count = DEMAND;
                    subscription.request(DEMAND);
                }
                else {
                    log.info("Subscription을 취소한다...");
                    count = 0;
                    subscription.cancel();
                }
            }
        }
    }

    @Override
    public void onError(Throwable t) {
        log.info("Subscriber Error!");
    }

    @Override
    public void onComplete() {
        log.info("Complete!");
    }
}
  • Subscriber는 4가지 책임을 가진다.
    1. Publisher에 subscribe된다. (해당 시점부터 Subscription을 통해 Publisher에게 데이터를 요청 할 수 있다)
    2. Subscription으로부터 요청한 데이터를 받고 다음 요청을 Subscription에게 보낸다
    3. 데이터 전송 도중 오류가 발생한 경우 Exception을 발생시킨다
    4. 모든 요청이 완료되면 종료 시킨다

출처

728x90