手写一个 Flow

Posted on By ᵇᵒ

开撸

核心代码很简单,主要就两个 interface:

interface Flow<out T> {
    fun collect(collector: Collector<T>)
}

fun interface Collector<in T> {
    fun emit(value: T)
}

private class FlowImpl<T>(private val block: Collector<T>.() -> Unit) : Flow<T> {
    override fun collect(collector: Collector<T>) {
        collector.block()
    }
}

fun <T> flow(block: Collector<T>.() -> Unit): Flow<T> = FlowImpl(block)

加上 transform,扩展一些变换:

inline fun <T, R> Flow<T>.transform(
    crossinline transform: Collector<R>.(value: T) -> Unit
): Flow<R> = flow {
    collect { value ->
        return@collect transform(value)
    }
}

inline fun <T, R> Flow<T>.map(crossinline transform: (value: T) -> R): Flow<R> =
    transform { value ->
        return@transform emit(transform(value))
    }

加上 suspend,支持异步数据流:

interface Flow<out T> {
    suspend fun collect(collector: Collector<T>)
}

fun interface Collector<in T> {
    suspend fun emit(value: T)
}

private class FlowImpl<T>(private val block: suspend Collector<T>.() -> Unit) : Flow<T> {
    override suspend fun collect(collector: Collector<T>) {
        collector.block()
    }
}

fun <T> flow(block: suspend Collector<T>.() -> Unit): Flow<T> = FlowImpl(block)

inline fun <T, R> Flow<T>.transform(
    crossinline transform: suspend Collector<R>.(value: T) -> Unit
): Flow<R> = flow {
    collect { value ->
        return@collect transform(value)
    }
}

inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> =
    transform { value ->
        return@transform emit(transform(value))
    }

其实不加 suspend,在发射数据时手动协程处理也是可以的,只是整个 Flow 写法稍微麻烦点。
Flow 跟 coroutine 没有多大关系,相互之间比较独立,我一直觉得 Flow 更适合放在 Jetpack 里,与 LiveData 并列。

SharedFlow StateFlow

Flow 是冷流,在不调用终端操作符的情况下,flow 构建代码块是不会执行的。而 SharedFlow、StateFlow 是热流。
两者都有相应的可变类 MutableStateFlow 与 MutableSharedFlow。

SharedFlow 与 StateFlow 比较像事件与状态的区别1,目的不同导致实现不同。

ShareFlow

MutableSharedFlow 定义:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)
  • replay - 重播给新订阅者的 value 数量(不能为负数,默认为 0)
  • extraBufferCapacity - 除重播外缓冲值的数量(可选,不能为负,默认为 0),当有剩余缓冲空间时,emit 不会挂起
  • onBufferOverflow - 配置缓冲区溢出的策略(可选,默认 BufferOverflow.SUSPEND,缓冲区溢出时挂起;另外还有 DROP_OLDEST 和 DROP_LATEST,分别是丢弃最旧/最新的值,不挂起)

与 Flow 直来直去不同,ShareFlow 内部有一个 buffer 数组缓冲区根据构造函数配置维持着一定的缓冲数据,没有默认初始值,也无法直接获取缓冲区的值。
另外,ShareFlow 在有多个订阅者时,要等到订阅者全部接收并且处理完成之后才会进行下一次发送,否则发送会挂起。

SharedFlow 首个值发射丢失问题

fun main() {
    val sharedFlow = MutableSharedFlow<Int>()
    GlobalScope.launch {
        sharedFlow.onEach {
            println("collect1: $it")
        }.collect()
    }
    GlobalScope.launch {
        sharedFlow.collectLatest {
            println("collect2: $it")
        }
    }
    runBlocking {
        var value = 0
        while (value < 3) {
            // suspendCancellableCoroutine<Unit> { it.resume(Unit) }
            // delay(100)
            println("emit: $value")
            sharedFlow.emit(value++)
            delay(500)
        }
    }
}
emit: 0     // 发射的 0 没有被 collect 到
emit: 1
collect1: 1
collect2: 1
emit: 2
collect1: 2
collect2: 2

可以看到无论是 collect1 还是 collect2 发射的 0 没有被打印出来。如果放开注释 suspendCancellableCoroutine<Unit> { it.resume(Unit) }delay(100) 依然保持注释),则 collect1 可以打印出 0,collect2 仍然没有。换成放开注释 delay(100) 则 collect1 和 collect2 都能打印出 0。

解决方法:构造 SharedFlow 时,设置 replay = 1。

StateFlow

MutableStateFlow 定义:

public fun <T> MutableStateFlow(value: T)

StateFlow 用于标记状态,内部有一个范型 value 保存着最新变化的状态值,订阅者可以通过调用该 value 随时获取最新变化的状态值,为了保证该 value 不为空,初始化 StateFlow 必须传入初始值。StateFlow 只维持最新变化的状态值这一点很重要,“只”意味着 StateFlow 只能保证订阅者有可用的最新状态,允许历史状态丢失(比如 collect 与 emit 间隔周期不一致,emit 每次发送后 delay 1 秒,collect 每次收集后 delay 2秒,那么 collect 就会间隔性地漏掉一个状态)。“最新变化”意味着如果发射的值与当前 value 值一致,则不会触发 collect,毕竟状态发生没有变化,不需要再收集相同的状态值再做一遍同样的变化处理。

SharedFlow 继承自 Flow,而 StateFlow 又继承自 SharedFlow。
事实上 StateFlow 就是一个 replay = 1 onBufferOverflow = BufferOverflow.DROP_OLDEST 并且连续相同数据防抖的 SharedFlow:

// MutableStateFlow(initialValue) is a shared flow with the following parameters:
val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

Footnote

  1. 一直比较好奇为什么 SharedFlow 不取名 EventFlow?