Callback
很多三方库的耗时 api 在调用的时候,都需要传入一个 callback,library 内部自动开启子线程执行耗时操作, 待完成时再通过 callback 回调给调用方,避免阻塞调用所在的线程。热心一点的 library 会在 callback 的时候通过 Handler 切换到主线程, 当然更热心一点的 library 会切换到调用所在的线程。
// 这里的 Result 并不是 kotlin 的包裹类 `Result`,而是泛指 library 回调的结果
xxLibrary.ooApi(
object : Callback() {
override fun onSuccess(result: Result) {
// do something success
}
override fun onFailure(throwable: Throwable) {
// do something failure
}
}
)
suspendCoroutine
现在我们想要给这个耗时 api 调用添加一个 timeout 限制,超过这个时间如果 library 没有回调返回就执行其他操作。 一个大家都比较容易想到的办法:在调用的时候开启一个倒计时,倒计时内 library 回调返回就取消倒计时,然后正常执行后续流程, 倒计时结束依然没有回调则执行 timeout 操作。
这个方法除了看起来不怎么优雅之外没有任何问题,事实上这也是唯一可行的方法,只不过如果我们自己手动去写这一套倒计时逻辑就会显得比较蠢。 kotlin 协程里边自带了一套 timeout suspend 方法,所以我们打算用协程来实现。
现在唯一的问题是我们需要把 library 的异步回调改成同步调用以便统计计时,而协程普通的 launch/async 方法无法完成这种 callback 异步转同步。 这时候就轮到 suspendCoroutine:
suspendCoroutine { continuation ->
xxLibrary.ooApi(object : Callback {
override fun onSuccess(result: Result) {
continuation.resume(result)
}
override fun onFailure(throwable: Throwable) {
continuation.resumeWithException(throwable)
}
})
}
再把 timeout 加上:
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
// 这里捕获的不止 onFailure 异常,也包含代码 bug 引起的异常,be careful!
Log.e(TAG, "Coroutine Exception: ${throwable.message}")
}
CoroutineScope(Dispatchers.Main).launch(exceptionHandler) {
val ret = withTimeoutOrNull(timeout) {
suspendCoroutine { continuation ->
xxLibrary.ooApi(object : Callback {
override fun onSuccess(result: Result) {
Log.d(TAG, "onSuccess: $result")
continuation.resume(result)
}
override fun onFailure(throwable: Throwable) {
Log.w(TAG, "onFailure: ${throwable.message}")
continuation.resumeWithException(throwable)
}
})
}
} ?: kotlin.run {
val message = "Timed out waiting for $timeout ms"
Log.w(TAG, "timeout: $message")
}
}
看过协程源码的应该知道,协程 cps 转换,本质上是对 suspend 函数添加了一个 continuation 参数,中断函数并执行 block,根据结果成功异常与否调用 continuation.resume 或者 continuation.resumeWithException 恢复中断的函数。
launch/async 之类的操作对外屏蔽了 continuation,编译器根据关键字 suspend 在编译期间自动处理相关中断/恢复逻辑,而 suspendCoroutine 则把 continuation 抛了出来。不调用 continuation resume,当前函数便一直处于中断,使用 suspendCoroutine 需要开发者自行调用 resume 决定恢复中断的时机,大大提高了灵活性。有点类似于 View 与自定义 View。
suspendCancellableCoroutine
suspendCoroutine 很好,但存在一个缺陷,以上面的代码为例,library 在超时后依然可能调用成功并执行 onSuccess callback, 因为只是 suspendCoroutine 协程超时了,我们并没有取消 library 调用,两者是独立的。
想要取消 library 调用,我们需要用到 suspendCancellableCoroutine,suspendCancellableCoroutine 相比 suspendCoroutine 多了一个 continuation.invokeOnCancellation 回调,可以在这里 cancel library 调用:
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
// 这里捕获的不止 onFailure 异常,也包含代码 bug 引起的异常,be careful!
Log.e(TAG, "Coroutine Exception: ${throwable.message}")
}
CoroutineScope(Dispatchers.Main).launch(exceptionHandler) {
val ret = withTimeoutOrNull(timeout) {
suspendCancellableCoroutine { continuation ->
val taskId = xxLibrary.ooApi(object : Callback {
override fun onSuccess(result: Result) {
Log.d(TAG, "onSuccess: $result")
continuation.resume(result)
}
override fun onFailure(throwable: Throwable) {
Log.w(TAG, "onFailure: ${throwable.message}")
continuation.resumeWithException(throwable)
}
})
continuation.invokeOnCancellation {
// 假设 xxLibrary.ooApi() 会返回一个 taskId 并有相应 cancel 方法
xxLibrary.cancel(taskId)
}
}
} ?: kotlin.run {
val message = "Timed out waiting for $timeout ms"
Log.w(TAG, "timeout: $message")
}
}
当然,这一切的前提是 library 支持 cancel 方法,如果 library 本身并不支持 cancel,那么就只能小心 onSuccess 里的操作了: onSuccess 除了 continuation.resume(result) 的其他操作都需要移到 suspendCancellableCoroutine 之外,因为 suspendCancellableCoroutine 是带返回值的(超时之后 onSuccess 的 continuation.resume(result) 不会返回),所以照样可以在外面进行调用成功的后续操作,也避免了超时之后 onSuccess 继续执行带来的副作用。
resume twice
目前为止,一切看起来都很合理正常,但上面 suspendCoroutine 和 suspendCancellableCoroutine 的代码都存在一个致命问题:
如果调用一次 library,成功返回了不止一个结果(即 onSuccess 回调了多次),那么第二次 continuation.resume(result) 就会导致 crash java.lang.IllegalStateException: Already resumed
。
我们可以简单地在 onSuccess 里重复 continuation.resume(result) 即可模拟出该 crash(我知道有人会说这应该是 library 的问题,但作为调用方来说我们也有义务使自己的代码更加健壮对吧)。
针对这个问题,只需要做一点点小小的改动,在 continuation resume 时增加一个判断即可:
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
// 这里捕获的不止 onFailure 异常,也包含代码 bug 引起的异常,be careful!
Log.e(TAG, "Coroutine Exception: ${throwable.message}")
}
CoroutineScope(Dispatchers.Main).launch(exceptionHandler) {
val ret = withTimeoutOrNull(timeout) {
suspendCancellableCoroutine { continuation ->
val taskId = xxLibrary.ooApi(object : Callback {
override fun onSuccess(result: Result) {
Log.d(TAG, "onSuccess: $result")
if (continuation.isActive) {
continuation.resume(result)
} else {
// continuation is completed or cancelled, ignore
}
}
override fun onFailure(throwable: Throwable) {
Log.w(TAG, "onFailure: ${throwable.message}")
if (continuation.isActive) {
continuation.resumeWithException(throwable)
}
}
})
continuation.invokeOnCancellation {
// 假设 xxLibrary.ooApi() 会返回一个 taskId 并有相应 cancel 方法
xxLibrary.cancel(taskId)
}
}
} ?: kotlin.run {
val message = "Timed out waiting for $timeout ms"
Log.w(TAG, "timeout: $message")
}
}
one more thing
BTW,都已经 2023 年了,写 library 的朋友可以考虑把 callback 的 onSuccess 和 onFailure 回调用 kotlin 的包裹类 Result
合并一下:
// 这里的 Result 指的就是 kotlin 的包裹类 `Result`
xxLibrary.ooApi(
object : Callback() {
override fun onComplete(result: Result) {
result.onSuccess {
// do something success
}.onFailure {
// do something failure
}
}
}
)
合并之后的好处是,Callback 的回调方法只有一个 onComplete,那么可以使用 SAM 转换:
xxLibrary.ooApi { result ->
result.onSuccess {
// do something success
}.onFailure {
// do something failure
}
}
很多现代语言都在借鉴使用 Result 包裹 success 和 failure 进行统一返回。