Java_Pub Sub_Api - kuyeol/Document GitHub Wiki

πŸ“˜ μ„€λͺ…

μ„œλ‘œ μ—°κ΄€λœ μΈν„°νŽ˜μ΄μŠ€λ“€κ³Ό 정적 λ©”μ„œλ“œλ“€μ€ Publisherκ°€ μƒμ„±ν•œ ν•­λͺ©μ„ ν•˜λ‚˜ μ΄μƒμ˜ Subscriberκ°€ μ†ŒλΉ„ν•˜λ„λ‘ μ„€κ³„λœ 흐름 μ œμ–΄ν˜• ꡬ성 μš”μ†Œ(flow-controlled components) λ₯Ό μ •μ˜ν•©λ‹ˆλ‹€.
각 SubscriberλŠ” Subscription을 톡해 κ΄€λ¦¬λ©λ‹ˆλ‹€.

이 μΈν„°νŽ˜μ΄μŠ€λ“€μ€ Reactive Streams λͺ…세에 ν•΄λ‹Ήν•˜λ©°, λ™μ‹œμ„± 및 λΆ„μ‚° 비동기 ν™˜κ²½ λͺ¨λ‘μ—μ„œ μ‚¬μš©λ  수 μžˆμŠ΅λ‹ˆλ‹€.
λͺ¨λ“  7개의 λ©”μ„œλ“œλŠ” void λ°˜ν™˜ν˜•μ„ κ°€μ§€λ©°, 일방ν–₯ λ©”μ‹œμ§€(one-way message) μŠ€νƒ€μΌλ‘œ λ™μž‘ν•©λ‹ˆλ‹€.

톡신은 κ°„λ‹¨ν•œ ν˜•νƒœμ˜ 흐름 μ œμ–΄(flow control)에 μ˜μ‘΄ν•˜λŠ”λ°, μ΄λŠ” Subscription#request λ©”μ„œλ“œλ₯Ό 톡해 이루어지며,
"ν‘Έμ‹œ 기반 μ‹œμŠ€ν…œ(push-based systems)"μ—μ„œ λ°œμƒν•  수 μžˆλŠ” λ¦¬μ†ŒμŠ€ 관리 문제λ₯Ό λ°©μ§€ν•  수 μžˆμŠ΅λ‹ˆλ‹€.


πŸ” μ˜ˆμ‹œ

일반적으둜 PublisherλŠ” μžμ‹ λ§Œμ˜ Subscription κ΅¬ν˜„μ²΄λ₯Ό μ •μ˜ν•©λ‹ˆλ‹€.
이 κ΅¬ν˜„μ²΄λŠ” subscribe λ©”μ„œλ“œμ—μ„œ μƒμ„±λ˜μ–΄, ν˜ΈμΆœν•œ Subscriberμ—κ²Œ μ „λ‹¬λ©λ‹ˆλ‹€.

ν•­λͺ©μ€ 일반적으둜 λΉ„λ™κΈ°μ μœΌλ‘œ λ°œν–‰λ˜λ©°, 이λ₯Ό μœ„ν•΄ Executorλ₯Ό μ‚¬μš©ν•˜λŠ” κ²½μš°κ°€ λ§ŽμŠ΅λ‹ˆλ‹€.

βœ… κ°„λ‹¨ν•œ Publisher μ˜ˆμ‹œ

λ‹€μŒμ€ 맀우 λ‹¨μˆœν•œ Publisher의 μ˜ˆμ‹œμž…λ‹ˆλ‹€:

  • μš”μ²­μ΄ μžˆμ„ λ•Œμ—λ§Œ
  • 단 ν•˜λ‚˜μ˜ TRUE ν•­λͺ©μ„
  • 단 ν•˜λ‚˜μ˜ κ΅¬λ…μžμ—κ²Œ λ°œν–‰(publish)ν•©λ‹ˆλ‹€.

이 μ˜ˆμ œμ—μ„œλŠ” κ΅¬λ…μžκ°€ 였직 ν•˜λ‚˜μ˜ ν•­λͺ©λ§Œ λ°›κΈ° λ•Œλ¬Έμ—,
λ²„νΌλ§μ΄λ‚˜ μˆœμ„œ μ œμ–΄κ°€ ν•„μš” μ—†μŠ΅λ‹ˆλ‹€.
(λ³΅μž‘ν•œ κ΅¬ν˜„μ—μ„œλŠ” SubmissionPublisher와 같은 ν΄λž˜μŠ€κ°€ μ΄λŸ¬ν•œ κΈ°λŠ₯을 μ œκ³΅ν•©λ‹ˆλ‹€.)

class OneShotPublisher implements Publisher<Boolean> {
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
    private boolean subscribed; // true after first subscribe
    public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
      if (subscribed)
        subscriber.onError(new IllegalStateException()); // only one allowed
      else {
        subscribed = true;
        subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
      }
    }

  static class OneShotSubscription implements Subscription {

     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;

     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                          ExecutorService executor) {
        this.subscriber = subscriber;
        this.executor = executor;
      }

      public synchronized void request(long n) {

        if (!completed) {
          completed = true;
          if (n <= 0) {
            IllegalArgumentException ex = new IllegalArgumentException();
            executor.execute(() -> subscriber.onError(ex));
          } else {
           future = executor.submit(() -> {
              subscriber.onNext(Boolean.TRUE);
              subscriber.onComplete();
            });
          }

        }
      }

      public synchronized void cancel() {
        completed = true;
        if (future != null) future.cancel(false);
      }
    }
  }}

🧩 Subscriber μ„€λͺ…

SubscriberλŠ” ν•­λͺ©(item)을 μš”μ²­ν•˜κ³  μ²˜λ¦¬ν•˜λŠ” 역할을 λ‹΄λ‹Ήν•©λ‹ˆλ‹€.
Subscriber#onNext λ©”μ„œλ“œλ₯Ό 톡해 ν•­λͺ©μ΄ μ „λ‹¬λ˜λ©°, μš”μ²­ν•˜μ§€ μ•ŠμœΌλ©΄ ν•­λͺ©μ€ μ „λ‹¬λ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.
ν•˜μ§€λ§Œ ν•œ λ²ˆμ— μ—¬λŸ¬ ν•­λͺ©μ„ μš”μ²­ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

λ§Žμ€ Subscriber κ΅¬ν˜„μ²΄λŠ” λ‹€μŒ μ˜ˆμ‹œ μŠ€νƒ€μΌμ²˜λŸΌ λ™μž‘ν•  수 μžˆμŠ΅λ‹ˆλ‹€:

  • 버퍼 크기(buffer size)λ₯Ό 1둜 μ„€μ •ν•˜λ©΄ ν•­λͺ©μ„ ν•˜λ‚˜μ”© λ‹¨κ³„μ μœΌλ‘œ 처리(single-step)ν•˜κ²Œ λ©λ‹ˆλ‹€.
  • 더 큰 버퍼 ν¬κΈ°λŠ” μ²˜λ¦¬μ™€ 톡신을 λ³‘λ ¬μ μœΌλ‘œ 효율적으둜 μˆ˜ν–‰ν•  수 있게 λ„μ™€μ€λ‹ˆλ‹€.
    예λ₯Ό λ“€μ–΄ 버퍼 크기λ₯Ό 64둜 μ„€μ •ν•˜λ©΄, 총 μš”μ²­ 쀑 처리 λŒ€κΈ° 쀑인 ν•­λͺ© μˆ˜λŠ” 32~64개 사이λ₯Ό μœ μ§€ν•˜κ²Œ λ©λ‹ˆλ‹€.

πŸ”’ 동기화 κ΄€λ ¨ μ£Όμ˜μ‚¬ν•­

νŠΉμ • Subscription에 λŒ€ν•œ Subscriber의 λ©”μ„œλ“œ 호좜 μˆœμ„œλŠ” μ—„κ²©ν•˜κ²Œ 보μž₯λ˜λ―€λ‘œ,
동기화(synchronized), 락(lock), volatile λ³€μˆ˜ 등을 μ‚¬μš©ν•  ν•„μš”κ°€ μ—†μŠ΅λ‹ˆλ‹€.

단, Subscriberκ°€ μ—¬λŸ¬ 개의 Subscription을 λ™μ‹œμ— μœ μ§€ν•œλ‹€λ©΄,
μ΄λ³΄λ‹€λŠ” 각각의 Subscription에 λŒ€ν•΄ λ³„λ„μ˜ Subscriberλ₯Ό μ •μ˜ν•˜λŠ” 것이 더 μ’‹μŠ΅λ‹ˆλ‹€.

  class SampleSubscriber<T> implements Subscriber<T> {

    final Consumer<? super T> consumer;
    Subscription subscription;
    final long bufferSize;
    long count;

    SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
      this.bufferSize = bufferSize;
      this.consumer = consumer;
    }
   
public void onSubscribe(Subscription subscription) {

      long initialRequestSize = bufferSize;
      count = bufferSize - bufferSize / 2; // re-request when half consumed
      (this.subscription = subscription).request(initialRequestSize);
    }

    public void onNext(T item) {
      if (--count <= 0)
        subscription.request(count = bufferSize - bufferSize / 2);
      consumer.accept(item);
    }

    public void onError(Throwable ex) { ex.printStackTrace(); }
    public void onComplete() {}
 }

}

βš™οΈ κΈ°λ³Έ 버퍼 크기 (defaultBufferSize)

defaultBufferSize의 기본값은 λ‹€μŒκ³Ό 같은 κ²½μš°μ— μœ μš©ν•œ μ‹œμž‘μ μ΄ 될 수 μžˆμŠ΅λ‹ˆλ‹€:

  • μ˜ˆμƒ 처리 속도(rate)
  • μ‚¬μš© κ°€λŠ₯ν•œ λ¦¬μ†ŒμŠ€(resources)
  • μ‚¬μš© ν˜•νƒœ(usages)

이 값은 Flow ꡬ성 μš”μ†Œμ—μ„œ μš”μ²­ 크기(request size) 및 버퍼 μš©λŸ‰(capacity) 을 μ„€μ •ν•  λ•Œ 기쀀이 λ©λ‹ˆλ‹€.


🚫 흐름 μ œμ–΄κ°€ ν•„μš” μ—†λŠ” 경우

흐름 μ œμ–΄(flow control)κ°€ μ „ν˜€ ν•„μš”ν•˜μ§€ μ•Šμ€ 경우,
SubscriberλŠ” μ²˜μŒλΆ€ν„° 사싀상 λ¬΄μ œν•œ 개수의 ν•­λͺ©μ„ μš”μ²­ν•  μˆ˜λ„ μžˆμŠ΅λ‹ˆλ‹€.
예λ₯Ό λ“€μ–΄ λ‹€μŒκ³Ό 같이 μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€:

class UnboundedSubscriber<T> implements Subscriber<T> {
   
 public void onSubscribe(Subscription subscription) {
      subscription.request(Long.MAX_VALUE); // effectively unbounded
    }

    public void onNext(T item) { use(item); }
    public void onError(Throwable ex) { ex.printStackTrace(); }
    public void onComplete() {}
    void use(T item) { ... }

java API

public final class Flow {
    private Flow() {} // uninstantiable
}

 /**
     * A producer of items (and related control messages) received by
     * Subscribers.  Each current {@link Subscriber} receives the same
     * items (via method {@code onNext}) in the same order, unless
     * drops or errors are encountered. If a Publisher encounters an
     * error that does not allow items to be issued to a Subscriber,
     * that Subscriber receives {@code onError}, and then receives no
     * further messages.  Otherwise, when it is known that no further
     * messages will be issued to it, a subscriber receives {@code
     * onComplete}.  Publishers ensure that Subscriber method
     * invocations for each subscription are strictly ordered in <a
     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
     * order.
     *
     * <p>Publishers may vary in policy about whether drops (failures
     * to issue an item because of resource limitations) are treated
     * as unrecoverable errors.  Publishers may also vary about
     * whether Subscribers receive items that were produced or
     * available before they subscribed.
     *
     * @param <T> the published item type
     */

   @FunctionalInterface
   public static interface Publisher<T> {
        /**
         * Adds the given Subscriber if possible.  If already
         * subscribed, or the attempt to subscribe fails due to policy
         * violations or errors, the Subscriber's {@code onError}
         * method is invoked with an {@link IllegalStateException}.
         * Otherwise, the Subscriber's {@code onSubscribe} method is
         * invoked with a new {@link Subscription}.  Subscribers may
         * enable receiving items by invoking the {@code request}
         * method of this Subscription, and may unsubscribe by
         * invoking its {@code cancel} method.
         *
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
         */
        public void subscribe(Subscriber<? super T> subscriber);
    }

    /**
     * A receiver of messages.  The methods in this interface are
     * invoked in strict sequential order for each {@link
     * Subscription}.
     *
     * @param <T> the subscribed item type
     */
    public static interface Subscriber<T> {
        /**
         * Method invoked prior to invoking any other Subscriber
         * methods for the given Subscription. If this method throws
         * an exception, resulting behavior is not guaranteed, but may
         * cause the Subscription not to be established or to be cancelled.
         *
         * <p>Typically, implementations of this method invoke {@code
         * subscription.request} to enable receiving items.
         *
         * @param subscription a new subscription
         */
        public void onSubscribe(Subscription subscription);

        /**
         * Method invoked with a Subscription's next item.  If this
         * method throws an exception, resulting behavior is not
         * guaranteed, but may cause the Subscription to be cancelled.
         *
         * @param item the item
         */
        public void onNext(T item);

        /**
         * Method invoked upon an unrecoverable error encountered by a
         * Publisher or Subscription, after which no other Subscriber
         * methods are invoked by the Subscription.  If this method
         * itself throws an exception, resulting behavior is
         * undefined.
         *
         * @param throwable the exception
         */
        public void onError(Throwable throwable);

        /**
         * Method invoked when it is known that no additional
         * Subscriber method invocations will occur for a Subscription
         * that is not already terminated by error, after which no
         * other Subscriber methods are invoked by the Subscription.
         * If this method throws an exception, resulting behavior is
         * undefined.
         */
        public void onComplete();
    }

    /**
     * Message control linking a {@link Publisher} and {@link
     * Subscriber}.  Subscribers receive items only when requested,
     * and may cancel at any time. The methods in this interface are
     * intended to be invoked only by their Subscribers; usages in
     * other contexts have undefined effects.
     */
    public static interface Subscription {
        /**
         * Adds the given number {@code n} of items to the current
         * unfulfilled demand for this subscription.  If {@code n} is
         * less than or equal to zero, the Subscriber will receive an
         * {@code onError} signal with an {@link
         * IllegalArgumentException} argument.  Otherwise, the
         * Subscriber will receive up to {@code n} additional {@code
         * onNext} invocations (or fewer if terminated).
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        public void request(long n);

        /**
         * Causes the Subscriber to (eventually) stop receiving
         * messages.  Implementation is best-effort -- additional
         * messages may be received after invoking this method.
         * A cancelled subscription need not ever receive an
         * {@code onComplete} or {@code onError} signal.
         */
        public void cancel();
    }

    /**
     * A component that acts as both a Subscriber and Publisher.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

    static final int DEFAULT_BUFFER_SIZE = 256;

    /**
     * Returns a default value for Publisher or Subscriber buffering,
     * that may be used in the absence of other constraints.
     *
     * @implNote
     * The current value returned is 256.
     *
     * @return the buffer size value
     */
    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }
}
⚠️ **GitHub.com Fallback** ⚠️