type
status
date
slug
summary
tags
category
password
icon
基本概念:
定义:挂起和恢复
作用:异步逻辑同步代码
协程的分类:
- 按调用栈
- 有栈协程:每个协程会分配单独的调用栈,类似线程的调用栈(比如 Lua Corountine)
- 无栈协程:不回分配单独的调用栈,挂起点状态通过闭包或者对象保存(比如 Python Cenerator)(kotlin 就是这种,通过 continuation 对象保存挂起点状态)
- 按调用关系:
- 对称协程:调用权可以转移给任意协程,协程之间是对等关系
- 非对称协程:调度权只能转移给调用自己的协程,协程存在父子关系
挂起函数
- 挂起函数:以 suspend 修饰的函数
- 挂起函数只能在其它挂起函数或协程中调用
- 挂起函数调用时包含了协程“挂起”的语义
- 挂起函数返回时则包含了协程“恢复”的语义
个人理解:
只有协程才能挂起,一个协程被创建后会返回一个 continuation 作为该协程的本体,只有当调用了协程本体执行了 resume 这个协程内部的逻辑才会开始执行(相当于从调用 continuation.resume 的位置,恢复到协程内部执行协程中的逻辑)。如果协程内部遇到挂起函数,那么该协程会执行挂起函数内部的逻辑,如果挂起函数未调用自己的 continuation.resume,那么这个协程就真的挂起了。只有当调用挂起函数的 continuation.resume 这个挂起函数才会得到 resume 中的值作为返回值,这个协程才会继续执行,也就是恢复。
Continuation
kotlin 是无栈协程,所以使用 Continuation 保存挂起点的状态
所有的挂起恢复都可看着正常/异常的结果返回
suspend
suspend 函数
但实际转换为 Java 代码为
对比发现不同点:
- 形参多了一个 continuation,之所以挂起函数必须在挂起函数或者协程中调用,就是因为挂起函数必须要传入一个continuation
- 返回值变为Any,是因为挂起函数实际上并不一定需要挂起,就像bar函数仅仅返回了一个Hello字符串,不需要挂起的返回值就通过实际的函数返回值返回Any,但是如果是需要挂起的函数,那么函数的返回值实际是一个挂起的标识(COROUTINE_SUSPENDED 挂起标识对象)
将回调转写成挂起函数
真正的挂起必须异步调用 resume,包括:
- 切换到其它线程 resume
- 单线程事件循环异步执行
注意:在 suspendCoroutine 中直接调用 resume 也算没有挂起
- 使用 suspendCoroutine 获取挂起函数 Continuation
- 回调成功的分支使用 Continuation.resume(value)
- 回调失败则使用 Continuation.resumeWithException(e)
协程的创建
- 协程是一段可执行的程序
- 协程的创建通常需要一个函数 suspend function
- 协程的创建也需要一个 API
createCoroutine
startCoroutine
创建协程
一个协程的创建肯定有两个 continuation
- Reciver 是一个被 suspend 修饰的挂起函数,这也是协程的执行体,称作「协程体」。
- suspend 函数本身执行需要一个 Continuation 实例在恢复时调用,就是上面的completion 形参。会在协程体执行完成后调用,实际上就是协程的完成回调。
- 返回值 Continuation<Unit> 则是创建出来的协程载体,receiver suspend 函数会被传给该实例作为协程的实际执行体。并通过返回的这个 Continuation.resume() 触发协程的启动,开始执行协程体中的逻辑。
启动协程
协程中如果有 n 个挂起点,并且这个这些挂起点真正的挂起,总共会调用多少次 resume?从上面的例子分析:
- 协程启动时调用一次,通过恢复调用来开始执行协程体到下一次挂起之间的逻辑。
- 挂起点处如果异步挂起,则在恢复时会调用一次。由于这个过程中有两次挂起,硬刺会调用两次。
由此可知恢复调用的次数为 1 + n 其中 n 是协程体内真正挂起执行异步逻辑的挂起点个数。
协程体的 Receiver
协程创建和启动的 API 一共有两组,除了上面一组外,还有一组
对比发现,协程体多了一个 Reciver 类型 R,这个 R 可以为协程体提供一个作用域,在协程体内,可以直接使用作用域内提供的函数或者状态等。
协程作用域
通过 @RestrictsSuspension 注解作用域类,可以使得协程体内只允许使用作用域类中提供的方法
协程上下文
- 协程执行过程中需要携带数据
- 索引是 CoroutineContext.Key
- 元素是 CoroutineContext.Element
拦截器
协程允许通过拦截器拦截协程异步回调时的恢复调用。这样就可以操作线程的调度了
- 拦截器 ContinuationInterceptor 是协程上下文元素,继承了
CoroutineContext.Element
- 可以对协程上下文所在协程的 Continuation 进行拦截
拦截器最重要的作用就是线程切换
当使用 suspendCoroutine<Unit> 创建 suspend 函数,协程体就包装进一个 delegate 对象中了,这个对象实际的类型是SuspendLambda,并且 delegate 已经是经过拦截器拦截后的实例。
SafeContinuation 的作用就是确保:
- resume 只被调用一次
- 如果在当前线程调用栈上直接调用就不回挂起
在 Intercepted 的 resume 中就可以做线程切换
注意:
- SafeContinuation 仅在挂起点时出现
- 拦截器在每次(恢复)协程体时调用
- SuspendLambda 是协程函数体
协程挂起恢复的要点
- 协程体内的代码都是通过 Continuation.resumeWith 调用
- 每调用一次 label 加 1,每一个挂起点对应一个 case 分支
- 挂起函数在返回 COROUTINE_SUSPENDED 时才会挂起
协程线程调用
进阶
协程的状态
- Incomplete:未完成
- CancelHandler:取消中
- Cancelled:已取消
- Complete:已完成
协程的取消
在协程取消时,挂起函数通过抛出取消异常来实现对取消状态的响应。
协程的作用域
- 顶级作用域:没有父协程的协程所在的作用域为顶级作用域。
- 协同作用域:协程中启动新的协程,新协程为所在协程的子协程,这种情况下子协程所在的作用域默认为协同作用域。此时子协程抛出的未捕获异常都将传递给父协程处理,父协程同时也会被取消。
- 主从作用域:与协程作用域在协程的父子关系上一致,区别在于处于该作用域下的协程出现未捕获的异常时不会将异常向上传递给父协程。
除了这三种作用域中提到的行为以外,父子协程之间还存在以下规则:
- 父协程被取消,则所有子协程均被取消。由于协同作用域和主从作用域中都存在 父子协程关系,因此这条规则都适用。
- 父协程需要等待子协程执行完毕之后才会最终进入完成状态,不管父协程自身的 协程体是否已经执行完。
- 子协程会继承父协程的协程上下文中的元素,如果自身有相同key 的成员,则覆 盖对应的 key,覆盖的效果仅限自身范围内有效。
协程的启动模式
启动模式总共有 4 种。
- DEFAULT:协程创建后,立即开始调度,在调度前如果协程被取消,其将直接进入取消响应的状态。
- ATOMIC:协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消。
- LAZY:只有协程被需要时,包括主动调用协程的 start、join 或者 await 等函数时才会开始调度,如果调度前就被取消,那么该协程将直接进入异常结束状态。
- UNDISPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点。
要彻底搞清楚这几个模式的效果,我们需要先搞清楚立即调度和立即执行的区别。立即调度表示协程的调度器会立即接收调度指令,但具体执行的时机以及在哪个线程上执行,还需要根据调度器的具体情况而定,也就是说立即调度到立即执行之间通常会有一段时间。因此,我们得出以下结论:
- DEFAULT 虽然是立即调度,但也有可能在执行前被取消。
- UNDISPATCHED 是立即执行,因此协程一定会执行。
- ATOMIC 虽然是立即调度,但其将调度和执行两个步骤合二为一了,就像它的名
字一样,其保证调度和执行是原子操作,因此协程也一定会执行。
- UNDISPATCHED 和 ATOMIC 虽然都会保证协程一定执行,但在第一个挂起点之前,前者运行在协程创建时所在的线程,后者则会调度到指定的调度器所在的线程上执行。
这些启动模式的设计主要是为了应对某些特的场景。业务开发实践中通常使用DEFAULT 和 LAZY这两个启动模式就足够
协程的调度器
- Default:默认调度器,适合处理后台计算,其是一个 CPU 密集型任务调度器。
- IO:IO调度器,适合执行 IO 相关操作,其是一个 IO 密集型任务调度器。
- Main:UI 调度器,根据平台不同会被初始化为对应的UI线程的调度器,例如在 Android 平台上它会将协程调度到 UI 事件循环中执行,即通常在主线程上执行。
- Unconfined:“无所谓”调度器,不要求协程执行在特定线程上。协程的调度器如果是 Unconfined,那么它在挂起点恢复执行时会在恢复所在的线程上直接执行,当然,如果嵌套创建以它为调度器的协程,那么这些协程会在启动时被调度到协程框架内部的事件循环上,以避免出现 StackOverflow。
高阶
热数据通道 Channel
Channel 实际就是一个并发安全的队列,它可以用来连接线程,实现不同协程的通信
基本使用
Channel 的容量
Channel 实际就是一个队列,存在一定的缓冲区,一但满了并且也没有调用 receive 取走元素,send 就需要挂起。同样的没有缓冲中没有元素,receive 也会挂起。
迭代 Channel
produce 和 actor
(开发版API,后续可能存在变动)
便捷的方式构造生产者:
便捷的方式构建消费者:
ReceiveChannel 和 SendChannel 都是 Channel 的父接口,前者定义了 receive,后者定义了send,因此 Channel 既可以使用 receive 又可以使用 send。
Channel 的关闭
produce 和 actor 返回的 Channel 都会随着对应的协程执行完毕而关闭,可见,Channel还有一个关闭的概念。也正是这样,Channel 才被称为热数据流,与 Flow 正好相反。
既然这样,对于一个 Channel,如果我们调用了它的 close 方法,它会立即停止接收新元素,也就是说这时候它的isClosedForSend 会立即返回 true。而由于 Channel缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。
BroadcastChannel
(开发版API,后续可能存在变动)
发送端和接受端在 Channel 中存在一对多的情况,从数据处理本身来说,虽然有多个接受端,但同一个元素只能被一个接收端读到,广播则不然,多个接收端不存在互斥行为。
创建 broadcastChannel 的方法与创建普通的 Channel 几乎没有区别:
val broadcastChannel = BroadcastChannel<Int>(5)
如果要订阅功能,那么只需要调用如下方法:
val receiveChannel = broadcastChannel.openSubscription()
这样我们就得到了一个 ReceiveChannel,要想获取订阅的消息,只需要调用它的receive 函数;如果想要取消订阅则调用 cancel 函数即可。
冷数据流 Flow
随着 RxJava 的流行,响应式编程模型逐步深人人心。Flow 就是 Kotlin 协程与响应式编程模型结合的产物。
Flow 的使用
异常处理
直接使用 catch 方法就行,如果没有调用 catch 函数,未捕获的异常会在消费时抛出
注意:catch函数只能捕获它上游的异常。
如果想在Flow执行完时执行捕获逻辑,可以使用 onCompletion:
onCompletion 用起来类似于 try..catch..finally 中的 finally,无论前面是否存在异常,它都会被调用,参数t则是前面未捕获的异常。
也可以在捕获到异常后,生产一个新的数据,类似下面的代码:
可以发现 emit 定义在 FlowCollector 中,因此只要遇到 Receiver 为 FlowCollector 的函数,就可以生产新元素。
操作符分类
- 末端操作符:collect 是最基本的末端操作符
- 集合类型转换操作符:toList、toSet 等。
- 聚合操作符:包括将 Flow 规约到单值的 reduce、fold 等操作;还有获得单个元素的操作符,包括 single、singleOrNull、first 等。
分离 Flow 的操作和触发
我们除了可以在 collect处消费Flow 的元素以外,还可以通过 onEach 来做到这一点。
这样消费的具体操作就不需要与末端操作符放到一起,collect 函数可以放到其他任意位置调用。
Flow 的取消
Flow 没有提供取消操作,因为并不需要。想要取消 Flow 只需要取消它所在的协程即可
Flow 的其他创建方式
flow{…} 这种形式的创建,无法随意切换调度器,因为 emit 函数不是线程安全的。下面的这种写法就是错误的。
要想在生成元素时切换调度器,就必须使用 channelFlow 函数来创建 Flow:
Flow 的背压
只要是响应式编程,就一定会有背压的问题。
背压问题在生产者的生产速率高于消费者的处理速率的情况下出现。为了保证数据不丢失,我们也会考虑添加缓冲来缓解背压问题。
出现背压问题的根本原因是生产和消费速率不匹配,此时除可直接优化消费者的性能以外,还可以采用一些取舍的手段。
第一种是 conflate。与 Channel 的 Conflate 模式一致,新数据会覆盖老数据;
第二种是 collectLatest。顾名思义,其只处理最新的数据。这看上去似乎与 conflate
没有区别,其实区别很大:collectLatest 并不会直接用新数据覆盖老数据,而是每一个数据都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消。
除 collectLatest 之外,还有 mapLatest、flatMapLatest 等,因为作用类似,故不再重复。
Flow 的变换
我们已经对集合框架的变换非常熟悉了,Flow 看上去与集合框架极其类似,这一点与 RxJava 的 Observable 的表现基本一致。
例如我们可以使用map 来变换 Flow 的数据:
也可以映射成其他 Flow :
我们最终会得到的是一个数据类型为 Flow 的 Flow,如果希望他们拼接起来,可以使用 flattenConcat
在拼接的操作中,flattenConcat 是按顺序拼接的,结果的顺序仍然是生产时的顺序。此外,我们还可以使用 flattenMerge 进行会并发拼接,但是得到的结果不会保证顺序与生产是一致的。
多路复用 select
复用多个 await
如果有这样一个场景,两个 API 分别从网络和本地缓存获取数据,期望哪个先返回就先用哪个作展示:
不管先调用哪个 API,返回的 Deferred 的 await 都会被挂机,最终得到的结果可能并不是最先返回的,这不符合预期。当然我们也可以启动两个来分别调用 await,不过这样会将问题复杂化。可以采用 select 来解决这个问题:
可以看到,我们没有直接调用 await,而是调用了 onAwait 在 select 中注册了回调,select 总是会立即调用最先返回的事件的回调。假设 localDeferred.onAwait 先返回,那么 userResponse 的值就是 Response(it, true),由于我们的本地缓存可能不存在,因此 select 的结果类型是 Response<User>。
对于这个案例,如果先返回的是本地缓存,那么我们还需要获取网络结果来展示最终结果:
复用多个 Channel
SelectClause
使用 Flow 实现多路复用
协程并发工具
- Mutex:轻量级锁,他的 lock 和 unlock 从语义上与线程锁比较类似,之所以轻量是因为它在获取不到锁时,不会阻塞线程,而只是挂起等待锁的释放
- Semaphore: 轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作,当 Semaphore 的参数为 1 时,效果等价于 Mutex:
- 作者:shuouyang
- 链接:https://notion-tree.vercel.app/article/cfa1aae8-4240-447e-8541-e579ca4b63de
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
相关文章