Kotlin協程系列(二)

2023-11-27 18:00:28

  在進行業務開發時,我們通常會基於官方的協程框架(kotlinx.coroutines)來運用Kotlin協程優化非同步邏輯,不過這個框架過於龐大和複雜,如果直接接觸它容易被勸退。所以,為了我們在後續的學習中游刃有餘,在使用官方給出的複合協程時能夠胸有成竹,我們暫且拋開它,按照它的思路實現一個輕量版的協程框架。

1.開胃小菜:實現一個delay函數

  在實現delay函數前,我們先來思考delay函數的作用。在使用執行緒開發時,如果我們想讓一段程式碼延遲一段時間再執行,我們一般會用Thread.sleep函數,但是這個函數的缺點是它會阻塞當前執行緒。在協程當中,我們同樣可以這樣做,只是這麼做不好,明知道協程可以掛起,卻要它阻塞執行緒,豈不是在浪費cpu資源?

  我們的目的是讓程式碼延遲一段時間後再執行,只要做到這點就好了。因此,delay函數的實現可以確定以下兩點:

  • 不需要阻塞執行緒
  • 是個掛起函數,指定時間後,能夠恢復執行即可

  這裡,直接給出delay函數的實現,然後再作出解釋:

suspend fun delay(time:Long,unit: TimeUnit =TimeUnit.MILLISECONDS){
    val executor=Executors.newScheduledThreadPool(1){r->
        Thread(r,"Scheduler").apply{
            isDaemon=true
        }
    }
    if(time<=0){
        return
    }
    suspendCoroutine<Unit> {continuation ->
        executor.schedule({continuation.resume(Unit)},time,unit)
    }
}

  當time不大於0時,表示無延遲,直接返回就好;接下來需要考慮掛起,我們可以使用suspendCoroutine,不難想到,只要再指定time之後,恢復協程的執行就好,所以只要能夠給我們提供一個這樣的定時回撥機制就可以輕鬆實現這個功能。

  在jvm上,我們很自然的就可以想到使用ScheduledExecutorService,問題就這樣迎刃而解了。

2.協程的描述

  客觀的講,startCoroutine和createCoroutine這兩個api並不適合直接做業務開發。因此,對於協程的建立,在框架中也要根據不同的目的提供不同的構建器(例如launch,async),其背後對於封裝出來的複合協程的型別描述,就是至關重要的一環。

  協程的描述類,官方給出的名字是Job,和執行緒的描述類Thread相比,Job同樣有join函數,呼叫時會掛起協程,直到它的完成,它的cancel函數可以對應Thread的interrupt函數,用於取消協程,isActive可以類比Thread的isActive(),用於查詢協程是否還在執行。此外,Job還有取消回撥函數invokeOnCancel,完成回撥函數invokeOnComplete,用於移除回撥的remove函數。

3.協程的建立

  我們已經給出了協程的描述,知道了協程應該具有哪些能力,接下來就需要如何封裝協程的建立了。

  3.1無返回值的launch函數

    如果一個協程的返回值時Unit,我們可以稱它為無返回值的,對於這樣的協程,我們只需要啟動它即可,下面我們給出launch函數的定義:

fun launch(context: CoroutineContext=EmptyCoroutineContext,block:suspend ()->Unit):Job{
    val completion=StandaloneCoroutine(context)
    block.startCoroutine(completion)
    return completion
}
class StandaloneCoroutine(context:CoroutineContext):AbstractCoroutine<Unit>(context){}
abstract class AbstractCoroutine<T>(context:CoroutineContext):Job,Continuation<T>{
protected val state=AtomicReference<CoroutineState>()
override val context:CoroutineContext
init{
state.set(CoroutineState.Incomplete())
this.context=context+this
}
val isCompleted
get()=state.get() is CoroutineState.Complete<*>
override val isActive:Boolean
get()=when(state.get()){
is CoroutineState.Complete<*>->false
is CoroutineState.Cancelling->false
else->true
}
......
}

    其中,StandaloneCoroutine是AbstractCoroutine的子類,目前只有一個空實現。

  3.2實現join函數

    join函數是一個掛起函數,他需要等待協程的執行,此時會有兩種情況:被等待的協程已經執行完成,join函數就不會掛起,而是立馬返回;被等待的協程尚未完成,此時join將協程掛起,直到協程完成。

    下面給出join函數的虛擬碼實現:

sealed class CoroutineState {
    class Incomplete():CoroutineState()
    class Cancelling():CoroutineState()
    class Complete():CoroutineState()
}

suspend fun join(){
    when(state.get()){
        is CoroutineState.Incomplete->return joinSuspend()
        is CoroutineState.Cancelling->return joinSuspend()
        is CoroutineState.Complete->return
    }
}

suspend fun joinSuspend()= suspendCoroutine<Unit> {continuation ->   
    doOnCompleted{result->
        continuation.resume(Unit)
    }
}

  3.3有返回值的async函數

    現在,我們已經知道如何啟動協程和等待協程完成了,不過很多時候,我們想拿到協程的返回值,因此我們再基於Job介面再定義一個Deferred介面,如下所示:

interface Deferred<T>:Job{
    suspend fun await()
}

    這裡多了一個泛型引數T,T表示返回值型別,通過它的await函數可以拿到這個返回值,因此await函數的主要作用有:在協程執行完成時,立即拿到協程的結果;如果協程尚未完成,則掛起協程,直到它完成,這一點和join類似。下面,給出await函數的定義:

class DeferredCoroutine<T>(context:CoroutineContext):AbstractCoroutine<T>(context),Deferred<T>{
    override suspend fun await():T{
        val currentState=state.get()
        return when(currentState){
            is CoroutineState.Incomplete->awaitSuspend()
            is CoroutineState.Cancelling->awaitSuspend()
            is CoroutineState.Complete->currentState.value as T
        }
    }
    private suspend fun awaitSuspend()=suspendCoroutine<T>{continuation->
        doOnCompleted{result->
            continuation.resumeWith(result)
        }
    }
}

    await函數的實現思路和join函數類似,只是在對結果的處理上有差異。接下來,我們再給出async函數的實現,程式碼如下:

fun <T> async(context:CoroutineContext=EmptyCoroutineContext,block:suspend ()->T):Deferred<T>{
    val completion=DeferredCoroutine<T>(context)
    block.startCoroutine(completion)
    return completion
}

    這樣,我們就可以啟動有返回值的協程了,首先定義一個掛起函數,然後用delay(1000)函數來模擬耗時操作,然後我們用async啟動協程,並獲取協程的返回值,程式碼如下:

suspend fun getValue():String{
    delay(1000)
    return "使用async啟動協程"
}
suspend fun main(){
    val deferred=async{
        getValue()
    }
    val result=deferred.await()
    println(result)//將會列印"使用async啟動協程"
}

4.協程的排程

  截至目前,我們已經大致用程式勾勒出一個比較完善的複合協程了,不過還有一個問題沒有解決,我們的協程是如何實現並行的?我們前面在介紹協程的時候提到過協程的掛起和恢復和執行緒的不同點在於在哪兒掛起,什麼時候恢復是開發者自己決定的,這意味著排程工作不能交給作業系統,而應該在使用者態解決。

  協程需要排程的位置就是掛起點的位置,當協程執行到掛起點的位置時,如果產生了非同步行為,協程就會在這個掛起點掛起,只有協程在掛起點正真掛起時,我們才有機會實現排程,而實現排程器需要使用協程的攔截器。排程的本質就是解決協程在掛起點恢復後的協程邏輯在哪裡執行的問題,由此給出排程器的介面定義:

interface Dispatcher{
  fun dispatch(block:()->Unit)    
}

  接下來,我們將排程器和攔截器結合起來,攔截器是協程上下文元素的一類實現,下面給出基於排程器的攔截器實現的定義:

open class DispatcherContext(private val dispatcher:Dispatcher):AbstractCoroutineContextElement(ContinuationInterceptor),ContinuationInterceptor{
    override fun <T> interceptorContinuation(continuation:Continuation<T>):Continuation<T>=DispatchedContinuation(continuation,dispatcher)
}
private class DispatchedContinuation<T>(val delegate:Continuation<T>,val dispatcher:Dispatcher):Continuation<T>{
    override val context=delegate.context
    override fun resumeWith(result: Result<T>) {
        dispatcher.dispatch{
            delegate.resumeWith(result)//通過dispatch將協程的恢復執行排程在指定的排程器上
        }
    }
}

  排程的具體過程其實就是在delegate的恢復呼叫之前,通過dispatch將其排程在指定的排程器上。下面我們介紹一下有哪些排程器型別:

  • 預設排程器(Dispatchers.Default):使用共用的執行緒池,適用於 CPU 密集型的計算任務。預設排程器的執行緒數量通常與可用的 CPU 核數相等,因此適用於平行計算。但不適合執行可能導致執行緒阻塞的操作
  • 主執行緒排程器(Dispatchers.Main):適用於 Android 應用程式中執行 UI 操作的協程。它會將協程的執行切換到主執行緒,以確保 UI 操作不會在後臺執行緒上執行
  • IO排程器(Dispatchers.IO):專門用於執行涉及阻塞 IO 操作的協程,例如檔案讀寫或網路請求。IO 排程器使用一個專門的執行緒池,允許執行大量的 IO 操作而不阻塞執行緒
  • 無限制排程器(Dispatchers.Unconfined):允許協程在呼叫掛起函數的執行緒中繼續執行,直到第一個掛起點。之後,它可能在其他執行緒中繼續執行,這取決於具體的掛起函數實現。

 5.協程的作用域

  通常我們提到域,都是用來表示範圍的,域既有約束作用,又能提供額外的能力。官方框架在實現複合協程的過程中也提供了作用域,主要用以明確協程之間的父子關係以及對於取消的傳播行為。該作用域包括以下三種:

  • 頂級作用域:沒有父協程的協程所在的作用域為頂級作用域
  • 協同作用域:協程中啟動新的協程,新協程為所在協程的子協程,這種情況下子協程所在的作用域預設為協同作用域。
  • 主同作用域:與協程作用域在協程的父子關係上一致,區別在於處於該作用域下的協程出現未捕獲的異常時不會將異常向上傳遞給父協程。

  除了這三種作用域中提到的行為外,父子協程之間還存在以下規則:

  • 父協程被取消,則所有的子協程均被取消
  • 父協程需要等待子協程執行完畢後才最終進入完成狀態
  • 子協程會繼承父協程的協程上下文中的元素,如果自身有相同的key的成員,則覆蓋對應的key,覆蓋效果僅限自身範圍內有效

  下面給出一個協程作用域的通用介面:

interface CoroutineScope{
  val coroutineContext:CoroutineContext  
}

  從約束的角度來講,既然有了作用域,我們就不能任意直接使用launch和async函數來建立協程了,加上作用域之後,我們的launch函數的定義如下:

fun CoroutineScope.launch(context:CoroutineContext=EmptyCoroutineContext,block:suspend CoroutineScope.()->Unit):Job{
    val completion=StandaloneCoroutine(newContext)
    block.startCoroutine(completion,completion)
    return completion
}

  async函數的實現類似,請自行嘗試。