Kotlin 协程设计思想(九):Flow 到底是什么?为什么 suspend 函数还需要 Flow?
—— 从 suspend、Sequence 到 Cold Flow,彻底讲透 Kotlin Flow 的设计哲学
前面几篇,我们已经讲了:
CoroutineContext
↓
Job
↓
Dispatcher
↓
launch / async
↓
Exception
↓
Structured Concurrency
↓
Supervisor
↓
suspend
到第八篇,我们已经知道:
suspend不是开启协程。
它真正解决的是:
协程如何暂停 协程如何恢复suspend的底层核心是:
Continuation + 编译器状态机到这里,很多人会觉得:
既然已经有了suspend,为什么还要有Flow?
这就是第九篇真正要讲的问题。
不是:
Flow 怎么用?而是:
为什么 Kotlin 已经有 suspend,还要设计 Flow?一、前面其实埋了一个坑
前面我们一直在讲:
suspend fun login() { }它的特点是什么?
一句话:
一次调用,一个结果例如:
suspend fun getUser(): User执行流程是:
请求用户信息 ↓ 返回 User ↓ 结束这非常适合接口请求。
例如:
val user = getUser()调用一次,返回一个结果,然后结束。
但是问题来了。
如果是定位呢?
1 秒返回一次经纬度如果是 WebSocket 呢?
不断收到服务端消息如果是蓝牙通知呢?
设备一直上报数据如果是下载进度呢?
1%、2%、3%、4%...这些场景都有一个共同特点:
不是一次结果 而是连续结果这时候,单纯的suspend fun就不够用了。
二、suspend 最大的限制
suspend最大的特点是:
一次调用,一个结果例如:
val user = getUser()这很好理解。
但是现实世界里,很多数据不是一次性的。
而是源源不断的。
例如:
定位 WebSocket 蓝牙通知 聊天消息 股票价格 传感器数据 下载进度 数据库变化这些数据不是:
请求一次 返回一次 结束而是:
不断产生 不断更新 不断消费如果只靠suspend fun,就会很尴尬。
因为suspend fun只能这样:
suspend fun getLocation(): Location它只能返回一个Location。
但是定位真正需要的是:
Location Location Location Location ...所以:
suspend 解决的是一次结果。
而:
Flow 解决的是连续结果。
三、其实 Java 早就遇到过这个问题
在 Java 里,如果我们想一个个拿数据,会用什么?
Iterator例如:
while (iterator.hasNext()) { Object item = iterator.next(); }它的特点是:
一次拿一个 不断往后拿后来 Kotlin 里有了:
Sequence例如:
val sequence = sequence { yield(1) yield(2) yield(3) }它也是一个个产生数据。
使用时:
sequence.forEach { println(it) }输出:
1 2 3Sequence 的特点是:
懒加载 按需产生 一个个消费这和 Flow 很像。
但是 Sequence 有一个问题:
它不支持挂起。
四、Sequence 为什么不够?
例如:
val sequence = sequence { yield(1) delay(1000) yield(2) }这段代码是不成立的。
因为sequence {}里面不能直接调用挂起函数。
也就是说,Sequence 可以做到:
一个个产生数据但是它做不到:
一边等待 一边挂起 一边继续产生数据而现实中的连续数据,往往都需要等待。
例如:
1 秒后产生一次定位 网络消息来了才产生一次数据 蓝牙通知来了才产生一次数据 数据库变化了才产生一次数据这些都不是普通的同步迭代能解决的。
所以可以这样理解:
Sequence = 同步数据序列 Flow = 支持挂起的异步数据序列五、突然理解 Flow
很多教程会说:
Flow 是数据流这句话没错,但是太抽象。
我觉得更适合初学者理解的一句话是:
Flow = 支持 suspend 的 SequenceSequence 是:
一个个给数据Flow 也是:
一个个给数据但 Flow 比 Sequence 多了一个能力:
每次给数据之前,可以挂起例如:
val flow = flow { emit(1) delay(1000) emit(2) delay(1000) emit(3) }这段代码表达的意思是:
先发 1 等 1 秒 再发 2 再等 1 秒 再发 3这个能力,Sequence 做不到。
Flow 能做到,是因为:
emit 是 suspend collect 也是 suspend六、emit 到底是什么?
例如:
flow { emit(1) emit(2) emit(3) }很多人觉得:
emit 就是发送数据这句话没错,但还不够。
更准确地说:
emit 是一次可能挂起的数据发送为什么emit()也要是suspend?
因为生产者和消费者之间可能存在速度差。
例如:
生产者很快 消费者很慢生产者不停发:
1 2 3 4 5但是消费者处理不过来。
这时候怎么办?
不能无限制乱发。
所以emit()必须有能力:
等待消费者处理 必要时挂起自己这就是为什么:
emit()本身也是suspend。
七、collect 为什么也是 suspend?
再看:
flow.collect { println(it) }为什么collect()也是suspend?
因为 collect 的本质是:
等待数据 处理数据 继续等待 继续处理尤其是很多 Flow 可能永远不会结束。
例如:
WebSocket 消息流 定位数据流 蓝牙通知流 数据库监听流它们的特点是:
只要页面还在 就一直等待数据所以 collect 必须能够挂起当前协程。
否则它就只能阻塞线程。
所以:
collect 是 suspend本质上是因为:
它要等待连续数据八、为什么 Flow 是 Cold?
这是 Flow 最经典的问题。
例如:
val flow = flow { println("开始") emit(1) }这里会不会马上打印:
开始答案是:
不会。
因为只是创建了一个 Flow 对象。
此时:
没人 collect 就没人生产只有真正调用:
flow.collect { println(it) }才会开始执行flow {}里面的代码。
所以 Flow 默认是 Cold Flow。
所谓 Cold Flow,就是:
没人收集 就不生产数据九、为什么要设计成 Cold?
这个设计非常重要。
例如在 Android 页面里:
页面进入 开始 collect 页面退出 停止 collect当没人 collect 的时候,Flow 就不再生产数据。
这样有几个好处:
节约资源 避免无意义任务 配合生命周期更安全 减少内存泄漏比如定位。
如果页面已经退出了,还在后台一直生产定位数据,就很浪费。
Cold Flow 的设计正好解决这个问题:
需要时才开始 不需要时就停止这和协程的结构化并发思想是统一的。
十、突然和前面串起来了
现在我们回头看整个体系。
suspend:一次调用,一个结果 Flow:一次调用,多个结果 Channel:消息队列 callbackFlow:把回调转成 Flow StateFlow:状态流 SharedFlow:事件流这样一看,很多东西就不再是零散 API 了。
它们分别解决不同的问题。
suspend解决:
一次异步结果Flow解决:
连续异步结果Channel解决:
协程之间排队传消息callbackFlow解决:
把传统 callback 接入协程世界StateFlow解决:
状态保存和状态观察SharedFlow解决:
事件分发和一次性通知这才是 Flow 真正的位置。
十一、Google 为什么还要设计 Flow?
因为现实业务里,数据形态不止一种。
有些数据是一次性的。
例如:
登录 获取用户信息 提交表单 上传结果这些适合:
suspend fun有些数据是连续的。
例如:
定位 下载进度 WebSocket 蓝牙通知 数据库监听 页面状态变化这些适合:
Flow所以 Kotlin 协程体系不是简单地“多造了几个 API”。
它是在补完整个异步编程模型:
一次结果 连续结果 状态 事件 消息 回调十二、整个协程体系的设计主线
现在再看这些概念:
Thread Coroutine suspend Flow StateFlow SharedFlow Channel callbackFlow它们其实一直在回答不同层次的问题。
Thread:任务怎么执行? Coroutine:任务怎么暂停和恢复? suspend:一次异步结果怎么表达? Flow:连续异步结果怎么表达? StateFlow:状态怎么保存和观察? SharedFlow:事件怎么分发? Channel:消息怎么排队? callbackFlow:回调怎么接入协程体系?你会发现:
Kotlin 协程体系,并不是一堆零散 API。
它真正解决的是:
程序里的任务和数据,应该如何流动。
十三、最终总结
如果让我一句话解释suspend,我会说:
suspend 解决一次异步结果。如果让我一句话解释Flow,我会说:
Flow 是支持 suspend 的 Sequence。如果让我解释为什么已经有了suspend,还要有Flow,我会说:
suspend 只能表达一次结果。 Flow 可以表达连续结果。所以:
suspend fun getUser(): User适合一次请求。
而:
fun observeLocation(): Flow<Location>适合连续数据。
真正理解 Flow,不是记住:
emit collect flowOn StateFlow SharedFlow而是理解:
现实世界里有大量连续产生的数据。 suspend 只能解决一次结果。 Flow 才能解决连续结果。这就是 Flow 的设计哲学。
下篇预告
到这里,整个协程设计思想系列已经走到了一个非常有意思的位置。
我们已经讲完了:
CoroutineContext Job Dispatcher launch / async Exception Structured Concurrency Supervisor suspend Flow那么最后一个问题来了:
Kotlin 协程到底解决了什么问题?下一篇继续:
Kotlin 协程设计思想(十):Kotlin 协程到底解决了什么问题?
—— 从 Thread、Future、Callback、RxJava 到 Coroutine、Flow,彻底讲透 Kotlin 协程的发展脉络,以及 Google 为什么最终选择了这套设计。
最后再补一句。
整个《Kotlin 协程设计思想》系列,其实一直在回答同一个问题:
程序里的任务和数据,到底应该如何流动?Thread 解决怎么执行。
Coroutine 解决怎么暂停。
suspend 解决一次异步结果。
Flow 解决连续异步结果。
StateFlow 解决状态。
SharedFlow 解决事件。
Channel 解决消息队列。
callbackFlow 解决回调接入。
这才是这个系列真正的主线。
也正是它比单纯讲 API 更有价值的地方。
