Reactor ‐ Backpressure 背壓 - CCH0124/spring-sandbox GitHub Wiki
訂閱者告訴生產者一次推送每個元素。這最後可能會讓訂閱者感到不堪重負,消耗其所有資源。
背壓是指下游可以告訴上游減少數據傳輸量,以防止其不堪重負。
下面範例訂閱者實現來應用背壓。使用 request() 方法來實現告訴上游每次只發送兩個元素:
@Test
public void test() {
Flux.just("a", "b", "c", "d")
.log()
.subscribe(new Subscriber() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(Object t) {
eles.add((String)t);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
執行後如下,看到 request(2)
被調用,然後是兩個 onNext()
調用,然後再次 request(2)
21:02:11.511 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
21:02:11.512 [main] INFO reactor.Flux.Array.1 - | request(2)
21:02:11.512 [main] INFO reactor.Flux.Array.1 - | onNext(a)
21:02:11.512 [main] INFO reactor.Flux.Array.1 - | onNext(b)
21:02:11.512 [main] INFO reactor.Flux.Array.1 - | request(2)
21:02:11.512 [main] INFO reactor.Flux.Array.1 - | onNext(c)
21:02:11.513 [main] INFO reactor.Flux.Array.1 - | onNext(d)
21:02:11.513 [main] INFO reactor.Flux.Array.1 - | request(2)
21:02:11.513 [main] INFO reactor.Flux.Array.1 - | onComplete()
基本上,這就是 Reactive 拉取背壓。範例請求上游只在準備好時推送一定數量的元素。如果想像自己正在接收來自 Twitter 的推文,那麼上游將決定如何處理這些推文。如果推文進來了,但下游沒有請求,那麼上游可以丟棄這些項目、將它們存儲在緩衝區中,或使用其他策略。
Strategy | Behavior |
---|---|
buffer | keep in memory |
error | throw error to the downstream |
drop | Once the queue is full, new items will be dropped |
latest | Once the queue is full, keep 1 latest item as and when it arrives. drop old. |