C#筆記 – 線程及異步(二) 任務
線程池
-
每個CLR有一個線程池
- 由CLR控制的所有AppDomain共享
-
線程池維護了一個「操作請求隊列」
-
應用程序執行一個異步操作時,就調用方法將一個記錄項追加到該隊列中
-
線程池從該隊列提取記錄項,將其派發給一個線程池線程
-
如果池裡沒有線程/發出請求的速度超過了線程池線程處理它們的速度,就創建新線程
-
-
線程完成任務後,返回線程池
-
當一個線程池閑置一段時間後,會自我銷毀並釋放資源
工作項
-
ThreadPool.QueueUserWorkItem
-
向線程追加一個「工作項」以及可選的狀態數據,然後方法立即返回
-
池中的某個線程會處理該「工作項」
-
「工作項」的回調方法必須匹配WaitCallback委托類型
-
delegate void WaitCallback(Object state);
-
-
static void Main(string[] args) { ThreadPool.QueueUserWorkItem(Test, null); } static void Test(object state) { }
-
執行上下文
-
每個線程都關聯了一個執行上下文的數據結構
-
該結構包含了
-
安全設置
-
宿主設置
-
邏輯調用上下文數據
-
-
每當一個線程使用另一個線程執行任務時,前者的執行上下文應複製到後者,確保後者使用的是同一套的設置(安全/宿主)
-
默認情況下,初始線程的執行上下文「流向」任何輔助線程
- 可以通過System.Threading.ExecutionContext控制執行上下文的流動
協作式取消
-
協作式取消
-
要取消的操作必須顯式支持取消
-
需要一個System.Threading.CancellationTokenSource對象
-
該對象構建後,可將其中的Token屬性傳給操作,使操作可以取消
-
static void Main(string[] args) { CancellationTokenSource cancelSrc = new CancellationTokenSource(); CancellationToken ct = cancelSrc.Token; ThreadPool.QueueUserWorkItem(o => Count(ct, 100)); } static void Count(CancellationToken token, int countTo) { for (int i = 0; i < countTo; i++) { if (token.IsCancellationRequested) { Console.WriteLine("Operation Cancelled"); break; } Console.WriteLine(i); Thread.Sleep(200); } Console.WriteLine("Done"); }
-
-
-
任務
-
System.Threading.Task
-
Task.Run(() => { Count(ct, 10); }); -
Task比ThreadPool.QueueUserWorkItem有更多的內建機制
-
知道操作在甚麼時候完成
-
操作完成時獲得返回值
-
-
-
在Task構造時,可以傳遞TaskCreationOptions標志控制Task的執行方式
-
namespace System.Threading.Tasks { [Flags] public enum TaskCreationOptions { None = 0, PreferFairness = 1, LongRunning = 2, AttachedToParent = 4, DenyChildAttach = 8, HideScheduler = 16, RunContinuationsAsynchronously = 64 } } -
有的標志只是「提議」,TaskScheduler在調度一個Task時不一定會採納
-
-
等待任務完成並獲取結果
-
static void Main(string[] args) { Task<int> t2 = new Task<int>(() => { return Sum(10); }); t2.Start(); t2.Wait(); //等待 Console.WriteLine($"Result: {t2.Result}"); //.Result取得結果 } static int Sum(int n) { int sum = 0; for(int i = 1; i < n; i++) { sum += i; Console.WriteLine($"Step Result: {sum}"); Thread.Sleep(200); } return sum; } -
線程調用Wait方法時:
-
如果要等待的Task已經開始執行,那調用Wait的線程會阻塞直到Task運行結束
-
否則,系統可能用Wait的線程來執行Task,調用Wait的線程就不會阻塞
-
取決於TaskScheduler
-
如果線程在調用Wait前獲得了一個同步鎖,而Task試圖獲取同一個鎖,就會造成死鎖
-
-
-
除了Task的對象方法Wait,Task還供了兩個靜態方法
-
WaitAny
-
WaitAll
-
-
-
任務完成時自動啟動新任務
-
ContinueWith
-
Task<int> t2 = new Task<int>(() => { return Sum(10); }); t2.ContinueWith(task => Console.WriteLine("Next Action Operated")); t2.Start(); t2.Wait(); Console.WriteLine($"Result: {t2.Result}"); -
在調用ContinueWith時,可傳遞一組TaskContinuationOptions標志
-
namespace System.Threading.Tasks { [Flags] public enum TaskContinuationOptions { None = 0, PreferFairness = 1, LongRunning = 2, AttachedToParent = 4, DenyChildAttach = 8, HideScheduler = 16, LazyCancellation = 32, RunContinuationsAsynchronously = 64, NotOnRanToCompletion = 65536, NotOnFaulted = 131072, OnlyOnCanceled = 196608, NotOnCanceled = 262144, OnlyOnFaulted = 327680, OnlyOnRanToCompletion = 393216, ExecuteSynchronously = 524288 } }
-
-
-
啟動子任務
-
Task<int[]> parent = new Task<int[]>(() => { var results = new int[3]; new Task(() => results[0] = Sum(5), TaskCreationOptions.AttachedToParent).Start(); new Task(() => results[1] = Sum(10), TaskCreationOptions.AttachedToParent).Start(); new Task(() => results[2] = Sum(15), TaskCreationOptions.AttachedToParent).Start(); return results; }); -
AttachedToParent標志將一個Task與創建它的Task關聯,結果是除非所有子任務結束運行,否則父任務不認為已經結束
-
任務內部揭秘
-
每個Task對象都有一組字段構成了任務的狀態
-
ID
-
Task執行狀態
-
對父任務的引用
-
對Task創建時指定的TaskScheduler的引用
-
對回調方法的引用
-
對要傳給回調方法的對象的引用
-
對ExecutionContext的引用
-
對ManualResetEventSlim對象的引用
-
對根據需要創建的補充狀態的引用
-
CancellationToken
-
ContinueWithTask對象集合
-
為拋出未處理異常的子任務而準備的Task對象集合
-
-
-
創建Task的所需內存遠比ThreadPool.QueueUserWorkItem大
任務工廠
-
Task t3 = new Task(() => { var cts = new CancellationTokenSource(); var tf = new TaskFactory<int>( cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default ); var childTask = new[] { tf.StartNew(() => Sum(5)), tf.StartNew(() => Sum(10)), }; }); -
用於創建一組共享相同配置的Task對象
-
CancellationToken
-
TaskScheduler
-
TaskCreationOptions
-
TaskContinuationOptions
-
-
子任務通過StartNew來創建
任務調度器
-
負責執行被調度的任務
-
FCL提供了兩個派生自TaskScheduler的類型
-
線程池任務調度器(默認)
- 將任務調度給線程池的工作者線程
-
同步上下文任務調度器
-
Task的優勢
-
使用內存、創建/銷毀的時間比線程少
-
線程池可用CPU數量自動伸縮任務規模
-
每個任務完成一個階段,運行任務的線程回到線程池,並等待接收新任務
-
線程池是站在整個進程的高度觀察任務,所以它能更好地調度任務,減少進程中的線程數和上下文切換
Parallel.For/ForEach/Invoke
-
Parallel.For/ForEach
-
Parallel.For(0, 1000, i => Console.WriteLine(i)); Parallel.ForEach(new[] { 1, 2, 3 }, item => Console.WriteLine(item)); -
使用Parallel的靜態遍歷函數,可以多個線程來輔助遍歷提升效率
-
Parallel所有方法都讓調用線程參與處理
-
使用前提
-
工作項必須能并行執行
-
避免會修改任何共享數據的工作項
-
-
開銷
-
委托對象必須分配,針對每個工作項都要調用一次這些委托
-
如果每一個工作項都涉及大量工作,效率才會有所提升
-
如果工作項很少,或工作項本身的處理效率很快,使用Parallel反而會降低性能
-
-
-
-
并行LINQ(PLINQ)
-
Parallel LINQ
-
一般LINQ稱為「順序查詢」
-
并行查詢則為PLINQ
-
-
PLINQ內部使用Task,將集合中的數據項的處理工作分散到多個CPU上
-
和Parallel的方法類似,如果要同時處理大量項/每項處理過程耗時長,PLINQ能獲得更大收益
-
實際使用上,是將LINQ的指令通過ParallelEnumberable.AsParallel使調用的命令轉換成并行版本的
-
可以使用ParallelEnumberable.AsSequential把并行版本轉換成順序版本
-
并行查詢返回一個ParallelQuery的結果,應使用ParallelEnumerable.ForAll來處理
-
-
PLINQ用多個線程處理查詢,返回的結果是無序的
-
使用ParallelEnumerable.AsOrdered,線程會成組處理數據項,然後這些組被合并回去,同時保持順序,但會損害性能
-
可以調用WithMergeOption,傳遞ParallelMergeOption標志,控制結果的緩沖和合并方式
-
namespace System.Linq { public enum ParallelMergeOptions { Default = 0, NotBuffered = 1, AutoBuffered = 2, FullyBuffered = 3 } }
-
-
-
PLINQ會分析一個查詢,然後決定是順序處理還是并行處理
-
可以調用WithExecutionMode,並傳遞ParallelExecutionMode標志來強迫使用并行方式處理查詢
-
namespace System.Linq { public enum ParallelExecutionMode { Default = 0, ForceParallelism = 1 } }
-
-
System.Threading.Timer定時操作
-
System.Threading.Timer
-
讓一個線程池線程定時調用一個方法
-
構造Timer實例等同告訴線程池:在將來某個時間回調你的一個方法
-
回調方法委托類型:TimerCallback
-
delegate void TimerCallback(Object state);
-
-
-
內部工作
-
線程池為所有Timer對象只使用了一個線程,該線程知道下一個Timer對象的計時器還有多久觸發
-
Timer對象到期時,線程會被喚醒,調用ThreadPool.QueueUserWorkItem,將一個工作項添加到線程池的隊列中
-
如果回調方法執行時間很長,可能會造成多個線程池線程同時執行該回調
-
構造Timer時,應為period參數指定Timeout.Infinite,使計時器只觸發一次
-
回調方法中調用Timer的對象方法Change,dueTime指定Timeout.Infinite
-
-
使用Dispose方法完全取消計時器
- Timer被GC時,其終結代碼告訴線程池取消計時器,對回調方法的調用也會停止
-
線程池的線程管理
-
最好是將線程池看成一個黑盒
-
線程池永遠都不應該設置線程數上限,因為可能發生飢餓或死鎖
-
CLR團隊一直增加線程池默認擁有的最大線程數
-
32位進程最大有2GB地址空間
- 加載了Win32、CLR DLLs,分配了本地堆和托管堆、用戶模式棧、線程環境塊後,大約能提供1360個線程
-
64位進程最大有8TB地址空間
-
-
具體的工作者線程管理
-
ThreadPool.QueueUserWorkItem和Timer類
-
總是將工作項放到全局隊列中
-
工作者線程採用FIFO將工作項從隊列中取出
-
所有工作者線程會競爭一個線程同步鎖,保証同一個工作項不會被多個線程獲取
- 可能成為瓶頸
-
-
TaskScheduler對Task的調度
-
非工作者線程調度Task時,該Task被添加至全局隊列
-
每個工作者線程有自己的本地隊列,工作者線程調度一個Task時,Task加到本地隊列
-
使用LIFO將任務從本地隊列取出
-
只有自己訪問自己,無需同步鎖
-
當發現本地隊列為空,會嘗試從其他工作者線程中的本地隊列尾部「偷」Task
- 此時需要獲取一個同步鎖
-
-
當所有本地隊列都為空,使用FIFO從全局隊列提取工作項
-
當全局隊列為空,進入睡眠,一定時間後還沒被喚醒剛自己醒來並銷毀自身
-
-
參考書目
- 《CLR via C#》(第4版) Jeffrey Richter