EventBus

Posted on By ᵇᵒ

自己撸一个 EventBus,不同事件使用独立的 flow,没有订阅时自动移除 flow:

import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlin.reflect.KClass

/**
 * Event bus in which every event class is carried by its own [MutableSharedFlow].
 *
 * A flow is registered when the first collector for an event class starts and is unregistered
 * again once it has stayed without subscribers for [IDLE_TIMEOUT_MS].
 *
 * Events are routed by the exact runtime class of the posted object (`event::class`), so a
 * collector of a base type does not receive instances of its subclasses.
 */
object EventBus {
    /** How long a flow has to stay without subscribers before it is unregistered. */
    private const val IDLE_TIMEOUT_MS = 2000L

    // Scope that hosts the idle watchers; SupervisorJob keeps one failing watcher from cancelling the rest.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Guards every access to event2flow.
    private val lock = Any()

    // Key: event class, value: the shared flow carrying events of exactly that class.
    private val event2flow = mutableMapOf<KClass<*>, MutableSharedFlow<*>>()

    /**
     * Returns the flow registered for [type], creating and registering it (together with its
     * idle watcher) when there is none yet.
     */
    @PublishedApi
    @Suppress("UNCHECKED_CAST")
    internal fun <T : Any> getOrCreateFlow(type: KClass<out T>): MutableSharedFlow<T> {
        return synchronized(lock) {
            val existing = event2flow[type] as? MutableSharedFlow<T>
            existing ?: MutableSharedFlow<T>(
                replay = 0,
                extraBufferCapacity = 64,
                onBufferOverflow = BufferOverflow.SUSPEND
            ).also { created ->
                event2flow[type] = created
                watchForIdle(type, created)
            }
        }
    }

    /** Looks up the flow currently registered for [type] without creating one. */
    @Suppress("UNCHECKED_CAST")
    private fun <T : Any> findFlow(type: KClass<out T>): MutableSharedFlow<T>? {
        return synchronized(lock) { event2flow[type] as? MutableSharedFlow<T> }
    }

    /**
     * Starts the coroutine that unregisters [flow] once it has been left without subscribers.
     *
     * The watcher completes as soon as the flow has been unregistered, so neither the coroutine
     * nor the flow it references outlives the registration.
     */
    @OptIn(FlowPreview::class)
    private fun watchForIdle(type: KClass<*>, flow: MutableSharedFlow<*>) {
        flow.subscriptionCount
            .dropWhile { it == 0 } // ignore the initial "no subscribers yet" state
            .debounce(IDLE_TIMEOUT_MS) // avoid remove/create churn on quick subscribe -> cancel -> subscribe cycles
            .filter { it == 0 }
            .takeWhile { !retireIfIdle(type, flow) } // stop collecting once the flow is gone
            .launchIn(scope)
    }

    /**
     * Unregisters [flow] if it is still the registered flow for [type] and has no subscribers.
     *
     * @return `true` when the watcher should stop (the flow is no longer registered), `false`
     * when the flow gained a subscriber in the meantime and must stay registered.
     */
    private fun retireIfIdle(type: KClass<*>, flow: MutableSharedFlow<*>): Boolean {
        return synchronized(lock) {
            when {
                // Not (or no longer) the registered instance: nothing left to watch.
                event2flow[type] !== flow -> true
                // Re-check under the lock: the debounced 0 may be stale by now.
                flow.subscriptionCount.value > 0 -> false
                else -> {
                    event2flow.remove(type)
                    true
                }
            }
        }
    }

    /**
     * Emits [event] to the flow registered for its class, suspending while that flow's buffer
     * is full.
     *
     * When no flow is registered for the class there is no collector, and the event is dropped
     * without creating a flow (the flows use `replay = 0`, so nothing would be retained anyway).
     */
    suspend fun <T : Any> post(event: T) {
        findFlow(event::class)?.emit(event)
    }

    /**
     * Non-suspending variant of [post].
     *
     * @return the result of `tryEmit` on the registered flow, or `true` when no flow is
     * registered for the event class and the event is therefore dropped.
     */
    fun <T : Any> tryPost(event: T): Boolean {
        return findFlow(event::class)?.tryEmit(event) ?: true
    }

    /**
     * Returns a flow of the events posted with runtime class [T].
     *
     * The backing shared flow is resolved each time the returned flow is collected, so a
     * returned [Flow] can be kept and collected again after the previous backing flow has been
     * unregistered for being idle.
     *
     * NOTE(review): a collector that resolves the backing flow in the short gap right before
     * the idle watcher unregisters it can still end up on a detached instance.
     */
    inline fun <reified T : Any> observe(): Flow<T> {
        return flow<T> { emitAll(getOrCreateFlow(T::class)) }.filterIsInstance<T>()
    }
}

/**
 * Non-suspending shortcut that forwards [event] to [EventBus.tryPost].
 *
 * @param event the event to publish.
 * @return whatever [EventBus.tryPost] returns for [event].
 */
fun <T : Any> postEvent(event: T): Boolean {
    return EventBus.tryPost(event)
}

/**
 * Lifecycle-aware subscription to events of type [T] on [EventBus].
 *
 * The collecting coroutine is launched in this owner's [lifecycleScope] and wrapped in
 * `repeatOnLifecycle`, so events are only collected while the lifecycle is at least in
 * [minActiveState].
 *
 * @param minActiveState lifecycle state passed to `repeatOnLifecycle`; defaults to
 * [Lifecycle.State.STARTED].
 * @param action callback invoked with every collected event.
 * @return the [Job] of the launched coroutine.
 */
inline fun <reified T : Any> LifecycleOwner.observeEvent(
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    crossinline action: (T) -> Unit
): Job {
    return lifecycleScope.launch {
        lifecycle.repeatOnLifecycle(minActiveState) {
            EventBus.observe<T>().collect { event -> action(event) }
        }
    }
}

/**
 * Subscription to events of type [T] on [EventBus] that is not lifecycle-aware; it has to be
 * cancelled manually through the returned [Job].
 *
 * @param scope scope the collecting coroutine is launched in; defaults to a new [MainScope].
 * @param action callback invoked with every collected event.
 * @return the [Job] of the launched coroutine.
 */
inline fun <reified T : Any> observeEvent(
    scope: CoroutineScope = MainScope(),
    crossinline action: (T) -> Unit
): Job {
    return scope.launch {
        EventBus.observe<T>().collect { event -> action(event) }
    }
}