← 筆記

C#筆記 – 線程及異步(二) 任務

線程池
  • 每個CLR有一個線程池

    • 由CLR控制的所有AppDomain共享
  • 線程池維護了一個「操作請求隊列」

    • 應用程序執行一個異步操作時,就調用方法將一個記錄項追加到該隊列中

    • 線程池從該隊列提取記錄項,將其派發給一個線程池線程

    • 如果池裡沒有線程/發出請求的速度超過了線程池線程處理它們的速度,就創建新線程

  • 線程完成任務後,返回線程池

  • 當一個線程池閑置一段時間後,會自我銷毀並釋放資源

工作項
  • ThreadPool.QueueUserWorkItem

    • 向線程追加一個「工作項」以及可選的狀態數據,然後方法立即返回

    • 池中的某個線程會處理該「工作項」

    • 「工作項」的回調方法必須匹配WaitCallback委托類型

      •   delegate void WaitCallback(Object state);
    •   static void Main(string[] args)
        {
            ThreadPool.QueueUserWorkItem(Test, null);
        }
      
        static void Test(object state) { }

執行上下文
  • 每個線程都關聯了一個執行上下文的數據結構

  • 該結構包含了

    • 安全設置

    • 宿主設置

    • 邏輯調用上下文數據

  • 每當一個線程使用另一個線程執行任務時,前者的執行上下文應複製到後者,確保後者使用的是同一套的設置(安全/宿主)

  • 默認情況下,初始線程的執行上下文「流向」任何輔助線程

    • 可以通過System.Threading.ExecutionContext控制執行上下文的流動
協作式取消
  • 協作式取消

    • 要取消的操作必須顯式支持取消

    • 需要一個System.Threading.CancellationTokenSource對象

      • 該對象構建後,可將其中的Token屬性傳給操作,使操作可以取消

        •   static void Main(string[] args)
            {
                CancellationTokenSource cancelSrc = new CancellationTokenSource();
                CancellationToken ct = cancelSrc.Token;
                ThreadPool.QueueUserWorkItem(o => Count(ct, 100));
            }
          
            static void Count(CancellationToken token, int countTo)
            {
                for (int i = 0; i < countTo; i++)
                {
                    if (token.IsCancellationRequested)
                    {
                        Console.WriteLine("Operation Cancelled");
                        break;
                    }
                    Console.WriteLine(i);
                    Thread.Sleep(200);
                }
                Console.WriteLine("Done");
            }

任務
  • System.Threading.Task

    •   Task.Run(() => { Count(ct, 10); });
    • Task比ThreadPool.QueueUserWorkItem有更多的內建機制

      • 知道操作在甚麼時候完成

      • 操作完成時獲得返回值

  • 在Task構造時,可以傳遞TaskCreationOptions標志控制Task的執行方式

    •   namespace System.Threading.Tasks
        {
            [Flags]
            public enum TaskCreationOptions
            {        
                None = 0,
                PreferFairness = 1,
                LongRunning = 2,
                AttachedToParent = 4,
                DenyChildAttach = 8,
                HideScheduler = 16,
                RunContinuationsAsynchronously = 64
            }
        }
    • 有的標志只是「提議」,TaskScheduler在調度一個Task時不一定會採納

  • 等待任務完成並獲取結果

    •   static void Main(string[] args)
        {
            Task<int> t2 = new Task<int>(() => { return Sum(10); });
            t2.Start();
            t2.Wait(); //等待
            Console.WriteLine($"Result: {t2.Result}"); //.Result取得結果
        }
      
        static int Sum(int n)
        {
            int sum = 0;
            for(int i = 1; i < n; i++)
            {
                sum += i;
                Console.WriteLine($"Step Result: {sum}");
                Thread.Sleep(200);
            }
            return sum;
        }
    • 線程調用Wait方法時:

      • 如果要等待的Task已經開始執行,那調用Wait的線程會阻塞直到Task運行結束

      • 否則,系統可能用Wait的線程來執行Task,調用Wait的線程就不會阻塞

        • 取決於TaskScheduler

        • 如果線程在調用Wait前獲得了一個同步鎖,而Task試圖獲取同一個鎖,就會造成死鎖

    • 除了Task的對象方法Wait,Task還供了兩個靜態方法

      • WaitAny

      • WaitAll

  • 任務完成時自動啟動新任務

    • ContinueWith

    •   Task<int> t2 = new Task<int>(() => { return Sum(10); });
        t2.ContinueWith(task => Console.WriteLine("Next Action Operated"));
        t2.Start();
        t2.Wait();            
        Console.WriteLine($"Result: {t2.Result}");
    • 在調用ContinueWith時,可傳遞一組TaskContinuationOptions標志

      •   namespace System.Threading.Tasks
          {
              [Flags]
              public enum TaskContinuationOptions
              {
                  None = 0,
                  PreferFairness = 1,
                  LongRunning = 2,
                  AttachedToParent = 4,
                  DenyChildAttach = 8,
                  HideScheduler = 16,
                  LazyCancellation = 32,
                  RunContinuationsAsynchronously = 64,
                  NotOnRanToCompletion = 65536,
                  NotOnFaulted = 131072,
                  OnlyOnCanceled = 196608,
                  NotOnCanceled = 262144,
                  OnlyOnFaulted = 327680,
                  OnlyOnRanToCompletion = 393216,
                  ExecuteSynchronously = 524288
              }
          }
  • 啟動子任務

    •       Task<int[]> parent = new Task<int[]>(() =>
            {
                var results = new int[3];
              
                new Task(() => results[0] = Sum(5), TaskCreationOptions.AttachedToParent).Start();
                new Task(() => results[1] = Sum(10), TaskCreationOptions.AttachedToParent).Start();
                new Task(() => results[2] = Sum(15), TaskCreationOptions.AttachedToParent).Start();
              
                return results;
            });
    • AttachedToParent標志將一個Task與創建它的Task關聯,結果是除非所有子任務結束運行,否則父任務不認為已經結束

任務內部揭秘
  • 每個Task對象都有一組字段構成了任務的狀態

    • ID

    • Task執行狀態

    • 對父任務的引用

    • 對Task創建時指定的TaskScheduler的引用

    • 對回調方法的引用

    • 對要傳給回調方法的對象的引用

    • 對ExecutionContext的引用

    • 對ManualResetEventSlim對象的引用

    • 對根據需要創建的補充狀態的引用

      • CancellationToken

      • ContinueWithTask對象集合

      • 為拋出未處理異常的子任務而準備的Task對象集合

  • 創建Task的所需內存遠比ThreadPool.QueueUserWorkItem大

任務工廠
  •       Task t3 = new Task(() =>
          {
              var cts = new CancellationTokenSource();
              var tf = new TaskFactory<int>(
                  cts.Token,
                  TaskCreationOptions.AttachedToParent,
                  TaskContinuationOptions.ExecuteSynchronously,
                  TaskScheduler.Default
              );
            
              var childTask = new[]
              {
                  tf.StartNew(() => Sum(5)),
                  tf.StartNew(() => Sum(10)),
              };
          });
  • 用於創建一組共享相同配置的Task對象

    • CancellationToken

    • TaskScheduler

    • TaskCreationOptions

    • TaskContinuationOptions

  • 子任務通過StartNew來創建

任務調度器
  • 負責執行被調度的任務

  • FCL提供了兩個派生自TaskScheduler的類型

    • 線程池任務調度器(默認)

      • 將任務調度給線程池的工作者線程
    • 同步上下文任務調度器

Task的優勢
  • 使用內存、創建/銷毀的時間比線程少

  • 線程池可用CPU數量自動伸縮任務規模

  • 每個任務完成一個階段,運行任務的線程回到線程池,並等待接收新任務

  • 線程池是站在整個進程的高度觀察任務,所以它能更好地調度任務,減少進程中的線程數和上下文切換

Parallel.For/ForEach/Invoke
  • Parallel.For/ForEach

    •   Parallel.For(0, 1000, i => Console.WriteLine(i));
        Parallel.ForEach(new[] { 1, 2, 3 }, item => Console.WriteLine(item));
    • 使用Parallel的靜態遍歷函數,可以多個線程來輔助遍歷提升效率

      • Parallel所有方法都讓調用線程參與處理

      • 使用前提

        • 工作項必須能并行執行

        • 避免會修改任何共享數據的工作項

      • 開銷

        • 委托對象必須分配,針對每個工作項都要調用一次這些委托

          • 如果每一個工作項都涉及大量工作,效率才會有所提升

          • 如果工作項很少,或工作項本身的處理效率很快,使用Parallel反而會降低性能

并行LINQ(PLINQ)
  • Parallel LINQ

    • 一般LINQ稱為「順序查詢」

    • 并行查詢則為PLINQ

  • PLINQ內部使用Task,將集合中的數據項的處理工作分散到多個CPU上

  • 和Parallel的方法類似,如果要同時處理大量項/每項處理過程耗時長,PLINQ能獲得更大收益

  • 實際使用上,是將LINQ的指令通過ParallelEnumberable.AsParallel使調用的命令轉換成并行版本的

    • 可以使用ParallelEnumberable.AsSequential把并行版本轉換成順序版本

    • 并行查詢返回一個ParallelQuery的結果,應使用ParallelEnumerable.ForAll來處理

  • PLINQ用多個線程處理查詢,返回的結果是無序的

    • 使用ParallelEnumerable.AsOrdered,線程會成組處理數據項,然後這些組被合并回去,同時保持順序,但會損害性能

    • 可以調用WithMergeOption,傳遞ParallelMergeOption標志,控制結果的緩沖和合并方式

      •   namespace System.Linq
          {
              public enum ParallelMergeOptions
              {
                  Default = 0,
                  NotBuffered = 1,
                  AutoBuffered = 2,
                  FullyBuffered = 3
              }
          }
  • PLINQ會分析一個查詢,然後決定是順序處理還是并行處理

    • 可以調用WithExecutionMode,並傳遞ParallelExecutionMode標志來強迫使用并行方式處理查詢

      •   namespace System.Linq
          {
              public enum ParallelExecutionMode
              {        
                  Default = 0,
                  ForceParallelism = 1
              }
          }
          
System.Threading.Timer定時操作
  • System.Threading.Timer

    • 讓一個線程池線程定時調用一個方法

    • 構造Timer實例等同告訴線程池:在將來某個時間回調你的一個方法

    • 回調方法委托類型:TimerCallback

      •   delegate void TimerCallback(Object state);
  • 內部工作

    • 線程池為所有Timer對象只使用了一個線程,該線程知道下一個Timer對象的計時器還有多久觸發

    • Timer對象到期時,線程會被喚醒,調用ThreadPool.QueueUserWorkItem,將一個工作項添加到線程池的隊列中

    • 如果回調方法執行時間很長,可能會造成多個線程池線程同時執行該回調

      • 構造Timer時,應為period參數指定Timeout.Infinite,使計時器只觸發一次

      • 回調方法中調用Timer的對象方法Change,dueTime指定Timeout.Infinite

    • 使用Dispose方法完全取消計時器

      • Timer被GC時,其終結代碼告訴線程池取消計時器,對回調方法的調用也會停止
線程池的線程管理
  • 最好是將線程池看成一個黑盒

  • 線程池永遠都不應該設置線程數上限,因為可能發生飢餓或死鎖

  • CLR團隊一直增加線程池默認擁有的最大線程數

    • 32位進程最大有2GB地址空間

      • 加載了Win32、CLR DLLs,分配了本地堆和托管堆、用戶模式棧、線程環境塊後,大約能提供1360個線程
    • 64位進程最大有8TB地址空間

  • 具體的工作者線程管理

    • ThreadPool.QueueUserWorkItem和Timer類

      • 總是將工作項放到全局隊列中

      • 工作者線程採用FIFO將工作項從隊列中取出

      • 所有工作者線程會競爭一個線程同步鎖,保証同一個工作項不會被多個線程獲取

        • 可能成為瓶頸
    • TaskScheduler對Task的調度

      • 非工作者線程調度Task時,該Task被添加至全局隊列

      • 每個工作者線程有自己的本地隊列,工作者線程調度一個Task時,Task加到本地隊列

        • 使用LIFO將任務從本地隊列取出

        • 只有自己訪問自己,無需同步鎖

        • 當發現本地隊列為空,會嘗試從其他工作者線程中的本地隊列尾部「偷」Task

          • 此時需要獲取一個同步鎖
      • 當所有本地隊列都為空,使用FIFO從全局隊列提取工作項

      • 當全局隊列為空,進入睡眠,一定時間後還沒被喚醒剛自己醒來並銷毀自身

參考書目

  • 《CLR via C#》(第4版) Jeffrey Richter