自己实现一个 EventBus,不同事件使用各自独立的 flow,没有订阅者时自动移除对应的 flow:
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlin.reflect.KClass
/**
 * Process-wide event bus backed by one [MutableSharedFlow] per event class.
 *
 * Routing is by exact class: [post] and [tryPost] look up the flow registered for the event's
 * runtime class, while [observe] subscribes to the flow registered for `T::class`. An observer
 * of a supertype therefore does not receive events that were posted as a subtype.
 *
 * A flow is created the first time it is collected and is dropped from the registry once it
 * has had no subscribers for [CLEANUP_DEBOUNCE_MS] milliseconds.
 */
object EventBus {
    /** Quiet period with zero subscribers before a flow is dropped from the registry. */
    private const val CLEANUP_DEBOUNCE_MS = 2_000L

    /** Scope hosting the per-flow idle watchers. */
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    /** Guards every read and write of [event2flow]. */
    private val lock = Any()

    // Key: event class, value: the shared flow carrying events of that class.
    private val event2flow = mutableMapOf<KClass<*>, MutableSharedFlow<*>>()

    /**
     * Returns the flow registered for [type], creating and registering it (together with its
     * idle watcher) when none exists yet.
     */
    @PublishedApi
    @Suppress("UNCHECKED_CAST")
    internal fun <T : Any> getOrCreateFlow(type: KClass<out T>): MutableSharedFlow<T> =
        synchronized(lock) {
            val existing = event2flow[type] as? MutableSharedFlow<T>
            existing ?: MutableSharedFlow<T>(
                replay = 0,
                extraBufferCapacity = 64,
                onBufferOverflow = BufferOverflow.SUSPEND
            ).also { created ->
                event2flow[type] = created
                watchUntilIdle(type, created)
            }
        }

    /** Returns the flow currently registered for [type] without creating one. */
    @Suppress("UNCHECKED_CAST")
    private fun <T : Any> findFlow(type: KClass<out T>): MutableSharedFlow<T>? =
        synchronized(lock) { event2flow[type] as? MutableSharedFlow<T> }

    /**
     * Launches a watcher that unregisters [shared] once it has been without subscribers for
     * [CLEANUP_DEBOUNCE_MS] milliseconds.
     *
     * The watcher finishes as soon as the flow is unregistered, so neither the coroutine nor the
     * dropped flow stays reachable from [scope].
     */
    @OptIn(FlowPreview::class)
    private fun watchUntilIdle(type: KClass<*>, shared: MutableSharedFlow<*>) {
        scope.launch {
            shared.subscriptionCount
                .dropWhile { it == 0 } // skip the initial "no subscribers yet" state
                .debounce(CLEANUP_DEBOUNCE_MS) // ignore quick subscribe/unsubscribe churn
                .first { count -> count == 0 && releaseIfIdle(type, shared) }
        }
    }

    /**
     * Unregisters [shared] if it is still the flow registered for [type] and still has no
     * subscribers.
     *
     * @return `true` when the watcher should stop (the flow was removed, or it is no longer the
     * registered instance); `false` when a subscriber appeared in the meantime.
     */
    private fun releaseIfIdle(type: KClass<*>, shared: MutableSharedFlow<*>): Boolean =
        synchronized(lock) {
            when {
                // Never remove a different flow that happens to be mapped to the same class.
                event2flow[type] !== shared -> true
                // Re-check under the lock: a subscriber may have arrived after the debounce fired.
                // NOTE: a collector that has looked the flow up but not yet registered can still
                // slip through; that window is limited to the lookup-to-subscribe gap in collect.
                shared.subscriptionCount.value != 0 -> false
                else -> {
                    event2flow.remove(type)
                    true
                }
            }
        }

    /**
     * Emits [event] to the observers of its runtime class, suspending while their buffer is full.
     *
     * When no flow is registered for that class there is nobody to deliver to, so the call
     * returns immediately instead of creating a flow that would never be cleaned up.
     */
    suspend fun <T : Any> post(event: T) {
        findFlow(event::class)?.emit(event)
    }

    /**
     * Non-suspending variant of [post].
     *
     * @return `false` only when the event could not be buffered without suspending; `true`
     * otherwise, including when no flow is registered for the event's class.
     */
    fun <T : Any> tryPost(event: T): Boolean =
        findFlow(event::class)?.tryEmit(event) ?: true

    /**
     * Returns a flow of events posted as exactly `T`.
     *
     * The backing shared flow is resolved each time the returned flow is collected, so a retained
     * instance keeps working after the previous backing flow was dropped from the registry.
     */
    inline fun <reified T : Any> observe(): Flow<T> =
        flow<T> { emitAll(getOrCreateFlow(T::class)) }
            // Runtime guard for the unchecked cast performed by the registry lookup.
            .filterIsInstance<T>()
}
/**
 * Top-level shortcut that forwards [event] to [EventBus.tryPost].
 *
 * @param event the event to publish.
 * @return the result reported by [EventBus.tryPost] for this event.
 */
fun <T : Any> postEvent(event: T): Boolean {
    return EventBus.tryPost(event)
}
/**
 * Lifecycle-aware subscription to events of type [T].
 *
 * Launches a coroutine in this owner's `lifecycleScope` and collects events only while the
 * lifecycle is at least in [minActiveState], via `repeatOnLifecycle`.
 *
 * @param minActiveState lifecycle state at or above which events are collected.
 * @param action callback invoked for every collected event.
 * @return the [Job] of the launched coroutine.
 */
inline fun <reified T : Any> LifecycleOwner.observeEvent(
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    crossinline action: (T) -> Unit
): Job {
    val owner = this
    return owner.lifecycleScope.launch {
        owner.lifecycle.repeatOnLifecycle(minActiveState) {
            val events = EventBus.observe<T>()
            events.collect { event -> action(event) }
        }
    }
}
/**
 * Subscription to events of type [T] that is not lifecycle-aware; the caller has to cancel
 * the returned [Job] manually.
 *
 * @param scope scope the collecting coroutine is launched in; a new `MainScope()` is created
 * for each call when omitted.
 * @param action callback invoked for every collected event.
 * @return the [Job] of the launched coroutine.
 */
inline fun <reified T : Any> observeEvent(
    scope: CoroutineScope = MainScope(),
    crossinline action: (T) -> Unit
): Job {
    return scope.launch {
        val events = EventBus.observe<T>()
        events.collect { event -> action(event) }
    }
}