type
status
date
slug
summary
tags
category
password
icon
基本⽤法
框架结构
RxJava 的整体结构是⼀条链,其中:
- 链的最上游:⽣产者 Observable
- 链的最下游:观察者 Observer
- 链的中间:各个中介节点,既是下游的 Observable,⼜是上游的 Observer
创建
Single 为例:

操作符 Operator(map() 等等):
- 基于原 Observable 创建⼀个定制的 Observable
- 定制的 Observable 内部会创建⼀个定制的 Observer
- 通过定制的 Observable 的 subscribeActual() ⽅法和定制的 Observer 的 onXxx() ⽅法,来实现⾃⼰的中介⻆⾊(例如数据转换、线程切换)

Disposable:
可以通过 dispose() ⽅法来让上游或内部调度器(或两者都有)停⽌⼯作,达到「丢弃」的效果。
1. 当上游有后续任务,并且自己也具有延迟操作或生产事件的操作时,dispose() 需要把上游和自己都取消掉,让上游不要继续发送任务,自己当前处于延迟的任务也不发送给下游
2. 当上游没有后续任务,自己具有延迟操作或者生产事件的操作时,如果事件还未传递到自己,dispose() 需要告诉上游,下游要求取消事件,如果事件已经传递到自己,就只需要自己取消掉事件或者延迟操作
3. 当上游有后续任务,自己没有延迟操作或生产操作(map),如果下游要求dispose(),那么就直接 dispose() 上游。
3. 当上游没有后续任务,自己也没有延迟或生产操作时,整个链上的节点只需要把当前事件传递然后 dispose() 自身就行。
4. 如果没有上游,自己有延迟或生产操作时,下游要求 dispose() 时,自己需要取消延迟和生产操作。
DisposableHelper
作用是方便内部切换Disposable
,当执行到onSubscribe
可以使用DisposableHelper.
setOnce
(this, d);
切换为上游的Disposable
,当执行到onSuccess
和onError
时,可以切换到DisposableHelper.
replace
(this, d);
而真正当下游请求 dispose 时,只需要调用DisposableHelper.
dispose
(this);
subscribeOn()

原理
在 Scheduler 指定的线程⾥启动 subscribe()
效果
- 切换起源 Observable 的线程;
- 当多次调⽤ subscribeOn() 的时候,只有最上⾯的会对起源 Observable 起作⽤。
observeOn()

原理
在内部创建的 Observer 的 onNext() onError() onSuccess() 等回调⽅法⾥,通过 Scheduler 指定的线程来调⽤下级 Observer 的对应回调⽅法
效果
- 切换 observeOn() 下⾯的 Observer 的回调所在的线程
- 当多次调⽤ observeOn() 的时候,每个都会进⾏⼀次线程切换,影响范围是它下⾯的每个Observer (除⾮⼜遇到新的 observeOn())

Scheduler 的原理
- Schedulers.newThread() 和 Schedulers.io():
- 当 scheduleDirect() 被调⽤的时候,会创建⼀个 Worker,Worker 的内部会有⼀个Executor,由 Executor 来完成实际的线程切换;
- scheduleDirect() 还会创建出⼀个 Disposable 对象,交给外层的 Observer,让它能执⾏dispose() 操作,取消订阅链;
- newThread() 和 io() 的区别在于,io() 可能会对 Executor 进⾏重⽤。
- AndroidSchedulers.mainThread(): 通过内部的 Handler 把任务放到主线程去做。
- 作者:shuouyang
- 链接:https://notion-tree.vercel.app/article/e16b79ee-b5f1-4c35-b2b0-665749ca572d
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。