如何讓Task在非執行緒池執行緒中執行?

2023-05-29 09:01:01

Task承載的操作需要被排程才能被執行,由於.NET預設採用基於執行緒池的排程器,所以Task預設線上程池執行緒中執行。但是有的操作並不適合使用執行緒池,比如我們在一個ASP.NET Core應用中承載了一些需要長時間執行的後臺操作,由於執行緒池被用來處理HTTP請求,如果這些後臺操作也使用執行緒池來排程,就會造成相互影響。在這種情況下,使用獨立的一個或者多個執行緒來執行這些後臺操作可能是一個更好的選擇。

一、基於執行緒池的排程
二、TaskCreationOptions.LongRunning
三、換成非同步操作呢?
四、換種寫法呢?
五、呼叫Wait方法
六、自定義TaskScheduler
七、獨立執行緒池

一、基於執行緒池的排程

我們通過如下這個簡單的程式來驗證預設基於執行緒池的Task排程。我們呼叫Task型別的靜態屬性Factory返回一個TaskFactory物件,並呼叫其StartNew方法啟動一個Task物件,這個Task指向的Run方法會在一個迴圈中呼叫Do方法。Do方法使用自選等待的方式模擬一段耗時2秒的操作,並在控制檯輸出當前執行緒的IsThreadPoolThread屬性確定是否是執行緒池執行緒。

Task.Factory.StartNew(Run);
Console.Read();

void Run()
{
    while (true)
    {
        Do();
    }
}

void  Do()
{
    var end = DateTime.UtcNow.AddSeconds(2);
    SpinWait.SpinUntil(() => DateTimeOffset.UtcNow > end);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}

通過如下所示的輸出結果,我們得到了答案:利用TaskFactory建立的Task在預設情況下確實是通過執行緒池的形式被排程的。

image

二、TaskCreationOptions.LongRunning

很明顯,上述Run方法是一個需要永久執行的LongRunning操作,並不適合使用執行緒池來執行,實際上TaskFactory在設計的時候就考慮到了這一點,我們利用它建立一個Task的時候可以指定對應的TaskCreationOptions選項,其中一個選項就是LongRuning。我們通過如下的方式修改了上面這段程式,在呼叫StartNew方法時指定了這個選項。

Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
Console.Read();

void Run()
{
    while (true)
    {
        Do();
    }
}

void  Do()
{
    var end = DateTime.UtcNow.AddSeconds(2);
    SpinWait.SpinUntil(() => DateTimeOffset.UtcNow > end);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}

再次執行我們的程式就會通過如下的輸出結果看到Do方法將不會線上程池執行緒中執行了。

image

三、換成非同步操作呢?

由於LongRunning操作經常會涉及IO操作,所以我們執行方法經常會寫成非同步的形式。如下所示的程式碼中,我們將Do方法替換成DoAsync,將2秒的自旋等待替換成Task.Delay。由於DoAsync寫成了非同步的形式,Run也換成對應的RunAsync。

Task.Factory.StartNew(RunAsync, TaskCreationOptions.LongRunning);
Console.Read();

async Task RunAsync()
{
    while (true)
    {
       await DoAsync();
    }
}

async Task  DoAsync()
{
    await Task.Delay(2000);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}

再次啟動程式後,我們發現又切換成了執行緒池排程了。為什麼會這樣呢?其實很好理解,由於原來返回void的Run方法被替換成了返回Task的RunAsync,傳入StartNew方法表示執行操作的委託型別從Action切換成了Func<Task>,雖然我們指定了LongRunning選項,但是StartNew方法只是採用這種模式執行Func<Task>這個委託物件而已,而這個委託在遇到await的時候就返回了。終於返回的Task物件,還是按照預設的方式進行排程執行。

image

四、換種寫法呢?

有人說,上面我們使用的是一個方法來表示作為引數的委託物件,如果我們按照如下的方式使用基於async/await的Lambda表示式呢?實際上這樣的Lambda表示式就是Func<Task>的另一種編寫方式而已。

Task.Factory.StartNew(async () => { while (true) await DoAsync();}, TaskCreationOptions.LongRunning);
Console.Read();


async Task  DoAsync()
{
    await Task.Delay(2000);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}

五、呼叫Wait方法

其實這個問題很好解決,按照如下的方式將DoAsync方法換成同步形式的Do,將基於await的等待替換成針對Wait方法的呼叫就可以了。我想當你接觸Task的時候,就有很多人不斷提醒你,謹慎使用Wait方法,因為它會阻塞當前執行緒。實際上對於我們的硬要用場景,呼叫Wait方法才是正確的選擇,因為我們的初衷就是使用一個獨立的執行緒以獨佔的方式來執行所需的操作。

Task.Factory.StartNew(() => { while (true) Do(); }, TaskCreationOptions.LongRunning);
Console.Read();

void  Do()
{
    Task.Delay(2000).Wait();
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}

六、自定義TaskScheduler

既然針對執行緒池的使用是「Task排程」導致的,那麼我們自然可以通過重寫TaskScheduler的方式來解決這個問題。如下這個自定義的DedicatedThreadTaskScheduler 會採用獨立的執行緒來執行被排程的Task,執行緒的數量可以引數來指定。

internal sealed class DedicatedThreadTaskScheduler : TaskScheduler
{
    private readonly BlockingCollection<Task> _tasks = new();
    private readonly Thread[] _threads;
    protected override IEnumerable<Task>? GetScheduledTasks() => _tasks;
    protected override void QueueTask(Task task) => _tasks.Add(task);
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;
    public DedicatedThreadTaskScheduler(int threadCount)
    {
        _threads = new Thread[threadCount];
        for (int index = 0; index < threadCount; index++)
        {
            _threads[index] = new Thread(_ =>
            {
                while (true)
                {
                    TryExecuteTask(_tasks.Take());
                }
            });
        }
        Array.ForEach(_threads, it => it.Start());
    }
}

我們演示範例中Run/Do方法再次還原成如下所示的純非同步模式的RunAsync/DoAsync,並在呼叫StartNew方法的時候建立一個DedicatedThreadTaskScheduler物件作為最後一個引數。

Task.Factory.StartNew(RunAsync, CancellationToken.None, TaskCreationOptions.LongRunning, new DedicatedThreadTaskScheduler(1));
Console.Read();

async Task RunAsync()
{
    while (true)
    {
        await DoAsync();
    }
}

async Task DoAsync()
{
    await Task.Delay(2000);
    var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
    Console.WriteLine($"[{DateTimeOffset.Now}]Is thread pool thread: {isThreadPoolThread}");
}

由於建立的Task將會使用指定的DedicatedThreadTaskScheduler 物件來排程,DoAsync方法自然就不會線上程池執行緒中執行了。

image

七、獨立執行緒池

.NET提供的執行緒池是一個全域性共用的執行緒池,而我們定義的DedicatedThreadTaskScheduler相當於建立了一個獨立的執行緒池,物件池的效果可以通過如下這個簡單的程式展現出來。

Task.Factory.StartNew(()=> Task.WhenAll( Enumerable.Range(1,6).Select(it=>DoAsync(it))),
        CancellationToken.None,
        TaskCreationOptions.None,
        new DedicatedThreadTaskScheduler(2));

async Task DoAsync(int index)
{
    await Task.Yield();
    Console.WriteLine($"[{DateTimeOffset.Now.ToString("hh:MM:ss")}]Task {index} is executed in thread {Environment.CurrentManagedThreadId}");
    var endTime = DateTime.UtcNow.AddSeconds(4);
    SpinWait.SpinUntil(() => DateTime.UtcNow > endTime);
    await Task.Delay(1000);
}
Console.ReadLine();

如上面的程式碼片段所示,非同步方法DoAsync利用自旋等待模擬了一段耗時4秒的操作,通過呼叫Task.Delay方法模擬了一段耗時1秒的IO操作。我們在其中輸出了任務開始執行的時間和當前執行緒ID。在呼叫的StartNew方法中,我們呼叫這個DoAsync方法建立了6個Task,這些Task交給建立的DedicatedThreadTaskScheduler進行排程。我們為這個DedicatedThreadTaskScheduler指定的執行緒數量為2。從如下所示的輸出結果可以看出,6個操作確實在兩個執行緒中執行的。

image