开撸
核心代码很简单,主要就两个 interface:
interface Flow<out T> {
fun collect(collector: Collector<T>)
}
fun interface Collector<in T> {
fun emit(value: T)
}
private class FlowImpl<T>(private val block: Collector<T>.() -> Unit) : Flow<T> {
override fun collect(collector: Collector<T>) {
collector.block()
}
}
fun <T> flow(block: Collector<T>.() -> Unit): Flow<T> = FlowImpl(block)
加上 transform,扩展一些变换:
inline fun <T, R> Flow<T>.transform(
crossinline transform: Collector<R>.(value: T) -> Unit
): Flow<R> = flow {
collect { value ->
return@collect transform(value)
}
}
inline fun <T, R> Flow<T>.map(crossinline transform: (value: T) -> R): Flow<R> =
transform { value ->
return@transform emit(transform(value))
}
加上 suspend,支持异步数据流:
interface Flow<out T> {
suspend fun collect(collector: Collector<T>)
}
fun interface Collector<in T> {
suspend fun emit(value: T)
}
private class FlowImpl<T>(private val block: suspend Collector<T>.() -> Unit) : Flow<T> {
override suspend fun collect(collector: Collector<T>) {
collector.block()
}
}
fun <T> flow(block: suspend Collector<T>.() -> Unit): Flow<T> = FlowImpl(block)
inline fun <T, R> Flow<T>.transform(
crossinline transform: suspend Collector<R>.(value: T) -> Unit
): Flow<R> = flow {
collect { value ->
return@collect transform(value)
}
}
inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> =
transform { value ->
return@transform emit(transform(value))
}
其实不加 suspend,在发射数据时手动协程处理也是可以的,只是整个 Flow 写法稍微麻烦点。
Flow 跟 coroutine 没有多大关系,相互之间比较独立,我一直觉得 Flow 更适合放在 Jetpack 里,与 LiveData 并列。
SharedFlow StateFlow
Flow 是冷流,在不调用终端操作符的情况下,flow 构建代码块是不会执行的。而 SharedFlow、StateFlow 是热流。
两者都有相应的可变类 MutableStateFlow 与 MutableSharedFlow。
SharedFlow 与 StateFlow 比较像事件与状态的区别1,目的不同导致实现不同。
ShareFlow
MutableSharedFlow 定义:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)
- replay - 重播给新订阅者的 value 数量(不能为负数,默认为 0)
- extraBufferCapacity - 除重播外缓冲值的数量(可选,不能为负,默认为 0),当有剩余缓冲空间时,emit 不会挂起
- onBufferOverflow - 配置缓冲区溢出的策略(可选,默认 BufferOverflow.SUSPEND,缓冲区溢出时挂起;另外还有 DROP_OLDEST 和 DROP_LATEST,分别是丢弃最旧/最新的值,不挂起)
与 Flow 直来直去不同,ShareFlow 内部有一个 buffer 数组缓冲区根据构造函数配置维持着一定的缓冲数据,没有默认初始值,也无法直接获取缓冲区的值。
另外,ShareFlow 在有多个订阅者时,要等到订阅者全部接收并且处理完成之后才会进行下一次发送,否则发送会挂起。
SharedFlow 首个值发射丢失问题
fun main() {
val sharedFlow = MutableSharedFlow<Int>()
GlobalScope.launch {
sharedFlow.onEach {
println("collect1: $it")
}.collect()
}
GlobalScope.launch {
sharedFlow.collectLatest {
println("collect2: $it")
}
}
runBlocking {
var value = 0
while (value < 3) {
// suspendCancellableCoroutine<Unit> { it.resume(Unit) }
// delay(100)
println("emit: $value")
sharedFlow.emit(value++)
delay(500)
}
}
}
emit: 0 // 发射的 0 没有被 collect 到
emit: 1
collect1: 1
collect2: 1
emit: 2
collect1: 2
collect2: 2
可以看到无论是 collect1 还是 collect2 发射的 0 没有被打印出来。如果放开注释 suspendCancellableCoroutine<Unit> { it.resume(Unit) }
(delay(100)
依然保持注释),则 collect1 可以打印出 0,collect2 仍然没有。换成放开注释 delay(100)
则 collect1 和 collect2 都能打印出 0。
解决方法:构造 SharedFlow 时,设置 replay = 1。
StateFlow
MutableStateFlow 定义:
public fun <T> MutableStateFlow(value: T)
StateFlow 用于标记状态,内部有一个范型 value 保存着最新变化的状态值,订阅者可以通过调用该 value 随时获取最新变化的状态值,为了保证该 value 不为空,初始化 StateFlow 必须传入初始值。StateFlow 只维持最新变化的状态值这一点很重要,“只”意味着 StateFlow 只能保证订阅者有可用的最新状态,允许历史状态丢失(比如 collect 与 emit 间隔周期不一致,emit 每次发送后 delay 1 秒,collect 每次收集后 delay 2秒,那么 collect 就会间隔性地漏掉一个状态)。“最新变化”意味着如果发射的值与当前 value 值一致,则不会触发 collect,毕竟状态发生没有变化,不需要再收集相同的状态值再做一遍同样的变化处理。
SharedFlow 继承自 Flow,而 StateFlow 又继承自 SharedFlow。
事实上 StateFlow 就是一个 replay = 1
onBufferOverflow = BufferOverflow.DROP_OLDEST
并且连续相同数据防抖的 SharedFlow:
// MutableStateFlow(initialValue) is a shared flow with the following parameters:
val shared = MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior
Footnote
-
一直比较好奇为什么 SharedFlow 不取名 EventFlow?
↩