博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
5章 RxJava背压策略
阅读量:6081 次
发布时间:2019-06-20

本文共 8516 字,大约阅读时间需要 28 分钟。

本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处

CSDN学院课程地址

  • RxJava2从入门到精通-初级篇:
  • RxJava2从入门到精通-中级篇:
  • RxJava2从入门到精通-进阶篇:
  • RxJava2从入门到精通-源码分析篇:

5. RxJava背压策略(BackpressureStrategy)

5.1 背压是什么

背压的概念是在平时业务开发时较为常见,大多数是针对高并发的业务,背压是必须考虑的因素之一。在异步场景中,由于数据流的发射速度高于数据流的接收速度,就会导致数据不能及时处理,从而导致数据流的阻塞。背压所要做的事情就是主动控制数据流发射的速度

在RxJava2.0中,推出了Flowable用来支持背压,去除了Observable对背压的支持,下面在背压策略的讲解中,我们都使用Flowable作为我们的响应类型。在使用背压时,只需要在create()方法中第二个参数添加背压策略即可

  1. 在订阅的时候如果使用FlowableSubscriber,那么需要通过s.request(Long.MAX_VALUE)去主动请求上游的数据项。如果遇到背压报错的时候,FlowableSubscriber默认已经将错误try-catch,并通过onError()进行回调,程序并不会崩溃
  2. 在订阅的时候如果使用Consumer,那么不需要主动去请求上游数据,默认已经调用了s.request(Long.MAX_VALUE)。如果遇到背压报错、且对Throwable的Consumer没有new出来,则程序直接崩溃
  3. 背压策略的上游的默认缓存池是128
public abstract class Flowable
implements Publisher
{ /** The default buffer size. */ static final int BUFFER_SIZE; static { BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128)); }}复制代码

5.2 MISSING

MISSING表示OnNext事件没有任何缓存和丢弃,下游要处理任何溢出,可以理解为相当于没有指定背压策略。Flowable相当于没有指定背压策略可以将下游要处理任何溢出理解为,上游发射的数据未得到处理,就会缓存起来,当缓存容量达到128时,再增加一个未处理的数据项,就会抛出MissingBackpressureException,且带有队列已经满了的友好提示。这里就好比一个大水缸,当水注满的时候,它就会把盖子盖上,不让你再继续注水了

这里我们模拟上游发送速度高于下游数据流的处理速度,在数据处理的时候加上Thread.sleep(1000)

public void missing() {    Flowable.create(new FlowableOnSubscribe
() { @Override public void subscribe(FlowableEmitter
emitter) throws Exception { for (int i = 0; i < 129; i++) { emitter.onNext(i); } } }, BackpressureStrategy.MISSING) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber
() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.e("TAG", "onNext=" + integer); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { } });}复制代码

输出

io.reactivex.exceptions.MissingBackpressureException: Queue is full?!复制代码

5.3 ERROR

ERROR表示在下游无法跟上时,会抛出MissingBackpressureException。可以将下游无法跟上理解为,上游发射的数据未得到处理,就会缓存起来,当缓存容量达到128时,再增加一个未处理的数据项,就会抛出MissingBackpressureException。这里好比一个大水缸,当水注满的时候,它会把水缸撑破了,直接破裂

public void error() {    Flowable.create(new FlowableOnSubscribe
() { @Override public void subscribe(FlowableEmitter
emitter) throws Exception { for (int i = 0; i < 129; i++) { emitter.onNext(i); } } }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber
() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.e("TAG", "onNext=" + integer); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { } });}复制代码

输出

io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests复制代码

5.4 BUFFER

上游不断的发出onNext请求,直到下游处理完,上游发射的数据项的缓存池是无限大的,程序也不会抛出错误,但是要注意程序OOM的现象,因为缓存越大,占用的内存就越多。例子中发射129个数据项,然而程序并没有崩溃,只会一直读取缓存池的数据项,直到数据项被处理完。这里就是一个无限大的水缸

背压策略除了BUFFER策略的缓存池是无限大之外,其他默认的缓存池都是128

public void buffer() {    Flowable.create(new FlowableOnSubscribe
() { @Override public void subscribe(FlowableEmitter
emitter) throws Exception { for (int i = 0; i < 1000; i++) { emitter.onNext(i); } } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber
() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } Log.e("TAG", "onNext=" + integer); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { } });}复制代码

输出

onNext=0onNext=1onNext=2......onNext=998onNext=999复制代码

5.5 DROP

会在下游跟不上速度时,把onNext的值丢弃,简单的说就是,超过缓存区大小(128)的数据项都会被丢弃。例子中通过发射800个数据项,那么我们只会收到0-127的数据项。如果我们再次调用request(),这时候取到的数据就是上一次request()后的128个数据。这里好比一个大水缸,当水注满的时候,水还是在继续的流,一旦有request调用的时候,它就会去取出水缸里的所有水,这时候水缸就是空的,但水一直在流,所以水缸马上又会被注满,这个时候就要等request再次取出水缸里的水

public void drop() {    Flowable.create(new FlowableOnSubscribe
() { @Override public void subscribe(FlowableEmitter
emitter) throws Exception { for (int i = 0; i < 1000; i++) { emitter.onNext(i); } } }, BackpressureStrategy.DROP) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber
() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Log.e("TAG", "onNext=" + integer); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { } });}复制代码

输出

onNext=0onNext=1onNext=2......onNext=127复制代码

5.6 LATEST

LATEST与Drop策略一样,如果超过缓存池容量大小的数据项都会被丢弃。不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中。这里的水缸容纳下了最后一滴水

public void latest() {    Flowable.create(new FlowableOnSubscribe
() { @Override public void subscribe(FlowableEmitter
emitter) throws Exception { for (int i = 0; i < 1000; i++) { emitter.onNext(i); } } }, BackpressureStrategy.LATEST) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber
() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } Log.e("TAG", "onNext=" + integer); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { } });}复制代码

输出

onNext=0onNext=1......onNext=126onNext=127onNext=999复制代码

5.7 小结

  1. MISSING:没有任何缓存和丢弃,下游要处理任何溢出
  2. ERROR:下游的处理速度无法跟上上游的发射速度时报错
  3. BUFFER:数据项的缓存池无限大
  4. DROP:下游的处理速度无法跟上上游的发射速度时丢弃
  5. LATEST:最后一条数据项被强行放入缓存池

转载于:https://juejin.im/post/5cd8e37bf265da037129bd64

你可能感兴趣的文章
原型以及原型链
查看>>
王利芬 2011
查看>>
疯狂Spring Cloud连载(9)——RestTemplate的负载均衡原理
查看>>
疯狂Spring Cloud连载(27)Apache Kafka框架
查看>>
Hadoop2.4.1伪分布式的搭建
查看>>
https方式使用TortoiseGit设置git@osc密码长期存储
查看>>
由于多个切面pointcut重叠造成的事务的问题。
查看>>
JAVA懒开发:lombok的使用
查看>>
螃蟹学PHP设计模式之策略模式
查看>>
phpMyAdmin自动登录和取消自动登录
查看>>
Python 抓取网页乱码原因分析
查看>>
online lda 的dirichlet_expectation函数
查看>>
SecureCRT中文显示乱码的解决方法
查看>>
idea java.lang.OutOfMemoryError: PermGen space
查看>>
jstl中格式化时间戳
查看>>
Android上如何让应用截获系统按键
查看>>
ADB常用命令使用
查看>>
iOS --UITextField 输入值改变事件和键盘遮挡处理
查看>>
PAT 1017 Queueing at Bank
查看>>
AVI文件格式
查看>>