← 筆記

C#筆記 – 線程及異步(三)異步函數

異步編程
  • 基礎

    • 異步執行充斥了後續操作。在開始做一件事情時,要告知其操作完成後應進行哪些操作

    • 在.NET中,後續操作由委托加以表示,通常為接收異步操作結果的action

      • 因此異步方法需要包裝多個事件,表示在成功或失敗的情況下應執行哪段代碼
    • C#編譯器會對所有await都構建一個後續操作

  • 基於任務的異步模式

    • 不會將後續操作傳遞給異步操作,而是在異步操作開始時返回一個token

    • 這個token用於提供後續操作

      • 它表示正在進行的操作,可能已經完成,也可能正在處理

      • 想法是:在這個操作完成之前,不能進行下一步處理

      • token的形式通常為Task或Task

    • C#5中的異步方法執行流

      • 執行某些操作

      • 開始異步操作,記住返回的token

      • 可能會執行其他操作(一般不會)

      • 等待異步操作完成(通過token)

      • 執行其他操作

      • 完成

  • 異步模型

C#異步函數
  • 用async修飾符聲明,包含await表達式的方法/匿名函數
    • 編譯器將方法代碼轉換成實現了「狀態機」的一個類型
    • await表達式
static async Task<string> TestReqAsync(string msg)
{
    using (var pipe = new MemoryStream())
    {
        byte[] req = Encoding.UTF8.GetBytes(msg);
        await pipe.WriteAsync(req, 0, req.Length); //Call ContinueWith On Task, Return From TestReqAsync
        
        byte[] rsp = new byte[1000];
        int bytesRead = await pipe.ReadAsync(rsp, 0, rsp.Length); //Call ContinueWith On Task, Return From TestReqAsync
        return Encoding.UTF8.GetString(rsp);
   }
}

      • 這令該方法像迭代器一樣,不需要一直執行到結束才返回

        • 當線程調用TestReqAsync時,會構造出一個MemoryStream,並開始執行using塊裡面的代碼

        • 執行至WriteAsync時,WriteAsync會在內部分配一個Task對象

          • 這個Task對象會返回給TestReqAsync

          • C#的await操作符實際上會在Task對象上調用ContinueWith,向它傳遞用於恢復狀態機的方法

          • 然後線程從TestReqAsync返回

        • 當WriteAsync任務在將來某個時候完成時,一個線程池線程會通知Task對象,激活ContinueWith回調方法,恢復狀態機

          • 線程重新進入TestReqAsync,從await位置開始繼續執行後續代碼
        • 執行至ReadAsync時,同樣會在內部分配一個Task對象,其餘操作與WriteAsync類似

        • 狀態機執行完畢後,GC進行回收

      • 實際上,一旦把方法標記為async,編譯器自動生成代碼,在狀態機開始執行時創建一個Task對象

        •   IL_000e:  call       valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<!0> valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::Create()
            IL_0013:  stfld      valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<TestReqAsync>d__1'::'<>t__builder'
        • 這個對象在狀態機執行完畢時自動完成

        • TestReqAsync返回的類型為Task,該Task正是由編譯器生成的代碼為這個方法的調用者而創建的對象

Await表達式
  • 負責消費異步操作

  • 如果表達式等待的值還不可用,異步函數立即返回

  • 當值可用時,異步函數將回到離開的地方繼續執行(在適當的線程上)

  • await的任務類型是Task<類型參數>;整段await表達式類型就是該類型參數

    • await表達式執行的是「拆包」(unwrap)操作

    • await的主要目的是在等待耗時操作完成時避免阻塞

  • 方法在執行到await表達式時就返回了,在此之前都是同步執行的

    • 到達await後,代碼將檢查其結果是否存在

      • 不存在,會安排一個操作完成後的後續操作

        • 後續操作:在異步操作中完成時執行的回調程序。保留了方法的控制狀態。後續操作記住了它的位置,在執行時可回到原處。

        • Task類包含一個專門用於添加後續操作的方法(Task.ContinueWith)

    • 異步函數的返回類型只能為:

      • void

      • Task

      • Task

    • Task表示一個可能還未完成的操作

      • Task表示一個返回值為T類型的操作,而Task不需要產生返回值

      • 即使沒有返回值,Task本身可以使調用者在返回的任務上,附加後續操作

      • void是為了和事件處理程序兼容,只調用給定的事件處理程序,沒有必要知道事件甚麼時候真正處理完畢

    • 對於一個異步方法,只有作為事件訂閱者時才應該返回void,在其他不需要特定返回值的情況下,最好是返回Task。調用者就可以「等待操作完成」。

  • 可等待模式

    • 一個await操作包含了三種操作

      • 告知是否已經完成

      • 後續操作的添加

      • 獲取結果,返回值或執行成功/失敗的信息

    • 對於一個await表達式,編譯器生成的代碼會先用GetAwaiter(),然後使用awaiter的成員來等待結果

      • C#編譯器要求awaiter必須實現INotifyCompletion接口

        •   //位於System.Runtime.CompilerServices命名空間
            public interface INotifyCompletion
            {
                void OnCompleted(Action contiuation);
            }
      • IsCompleted和GetResult必須是GetAwaiter()方法返回類型上的真正成員,能由包含await表達式的代碼訪問

      • 如果GetResult()返回void,整個await表達式就沒有類型

        • 否則await表達式的類型與GetResult()的返回類型相同

        •   using(var client = new HttpClient())
            {
                Task<string> task = client.GetStringAsync(...);
                string result = await task;
            }
        • 如果await表達式不返回任何類型的值,那就不能將其分配給變量、作為方法實參、執行任何將表達式作為值的相關操作

Await表達式的流
  • await後面有時是方法調用的結果,有時是屬性

    •   string pageText = await new HttpClient().GetStringAsync(url);
    • await只是在操作一個值,該代碼等價於

    •   Task<string> task = new HttpClient().GetStringAsync(url);
        string pageText = await task;
  • await表達式的結果(如有)可以用作實參或作為其他表達式的一部分

    •   AddPayment(await employee.GetHourlyRateAsync() * await timeSheet.GetHoursWorkedAsync(employee.Id));
  • 執行過程到達await表達式後,存在兩種可能

    • 等待中的異步操作已經完成

    • 等待中的異步操作尚未完成

  • 當異步操作仍在執行時,方法異步地等待操作完成,然後執行適當的上下文

    • 「異步等待」意味著方法不再執行,把其後續的操作附加了在異步操作上,然後返回

    • 然後等待完成後,再被異步操作確保方法能在正確的線程中恢復

    • 從一個異步方法「返回」的意味
      • 如果是第一個await表達式,原始調用者還位於棧中的某個位置(在到達await前,方法都是同步執行的)
        • 在這種情況,會將Task或Task返回給調用者
      • 已經等待了其他操作,因此處於由某個操作調用的「後續操作」中
從異步方法返回
  •   public static async Task<int> GetPageLengthAsync()
      {
          using(HttpClient client = new HttpClient())
          {
              Task<string> fetchTextTask = client.GetStringAsync(url);
              int length = (await fetchTextTask).Length;
              return length;
          }
      }
  • 方法中,return的類型是int,而方法簽名中的返回類型為Task

  • 代表生成的代碼為此進行了包裝,調用者將得到一個Task,並最終在方法完成時得到其返回值(int length)

  • 包裝的必要性

    • 在到達return語句前,幾乎必然會發生「返回到調用者」的行為(await),我們需要某種方式向調用者傳播信息,而Task就是通知調用者,對該方法未來生成的值或拋出的異常所做出的承諾
  • 如果return語句出現在有finally的try塊中,那用來計算返回值的表達式將立即被求值,但直到所有對象清理完畢後,才會作為任務結果

    • 同理,如果finally拋出一個異常,整個代碼都會失敗
異常
  • Task表示異常的方式

    • 異步操作失敗,任務的Status變為Faulted

    • Exception屬性返回一個AggregateException,包含所有造成任務失敗的異常;沒有錯誤則返回null

    • 如果任務最終狀態為錯誤,Wait()方法拋出一個AggregateException

    • Task的Result屬性拋出AggregateException

    • 通過CancellationTokenSource和CancellationToken來實現取消操作

      • 如果任務取消了,Wait()方法和Result屬性拋出包含OperationCanceledException的AggregateException
    • 在等待任務時,任務出錯/取消都拋出AggregateException中的第一個異常

  • 在等待時拆包異常

    • awaiter的GetResult可以獲取工作中的異常,將異常從異步操作傳遞回方法中

      • 但是單個Task可以表示多個操作,並導致多個失敗
    •   async Task<string> FetchFirstSuccessfulAsync(IEnumerable<string> urls)
        {
            foreach(string url in urls)
            {
                try
                {
                    using(var client = new HttpClient())
                    {
                        return await client.GetStringAsync(url);
                    }
                }
                catch(WebException e) { ... };
                
                throw new WebException("...");
            }
        }
    • 捕獲WebException

      • GetStringAsync只是負責啟動操作,返回一個包含WebException的任務(Task)

      • 如果調用任務的Wait,則拋出一個包含WebException的AggregateException

      • 任務awaiter的GetResult方法拋出WebException,並被以上代碼捕獲

        • 但這會導致信息丟失

        • 如果錯誤的任務中包含多個異常,GetResult只能拋出第一個異常

  • 在拋出異常時進行包裝

    • 異步方法在調用時永遠不會直接拋出異常。

      • 方法返回一個Task/Task,其中拋出的任何異常都將簡單地傳遞給任務

      • 如果調用者直接等待(Wait())任務,則得到一個包含真正異常的AggregateException

      • 如果使用await,異常則會從任務中解包

      • 一般只需捕獲嵌套的異步方法所拋出的異常即可

    • 失效的參數驗證

      •   static async Task MainAsync()
          {
              Task<int> task = ComputeLengthAsync(null);
              Console.WriteLine("Fetched the task");
              int length = await task;
              Console.WriteLine($"Length: {length}");
          }
        
          static async Task<int> ComputeLengthAsync(string text)
          {
              if(text == null) { throw new ArgumentNullException("text"); }
              await Task.Delay(500);
              return text.Length;
          }
      • 該代碼在失敗前會先輸出Fetched the task

        • 事實上,在輸出Fetched the task前,異常就已經拋出了。因為在text == null的驗證語句前不存在await表達式

        • 調用代碼直到等待返回的任務時,才能看到異常

      • 一般來說,應該迫使異常立即拋出,停止後續的操作。迫使異常立即拋出的方法有兩種:

        • 分離參數驗證和實現

          •   static async Task<int> ComputeLengthAsync(string text)
              {
                  //驗證
                  if(text == null) { throw new ArgumentException("text"); }
                  return ComputeLengthAsyncImpl(text);
              }
              static async Task<int> ComputeLengthAsyncImpl(string text)
              {
                  //異步實現
                  await Task.Delay(500);
                  return text.Length;
              }
        • 異步匿名函數

          •   static Task<int> ComputeLengthAsync(string text)
              {
                  if(text == null) { throw new ArgumentException("text"); }
                  Func<Task<int>> func = async () => 
                  {
                      await Task.Delay(500);
                      return text.Length;
                  };
                  return func;
              }
          • 將工作包裝到一個異步匿名函數中,調用委托並返回結果

  • 處理取消

    • 取消模型

      • 創建一個CancellationTokenSource

      • 向其請求一個CancellationToken

      • 傳遞給異步操作

    • 取消token的方式

      • 調用ThrowIfCancellationRequested

      • 如果取消了token,並沒有其他操作,則拋出OperationCanceledException

    • 如果異步方法拋出OperationCanceledException,返回的任務最終狀態為Canceled

    • 等待一個取消了的操作,將拋出原始的OperationCanceledException,而從異步方法返回的任務同樣會被取消

      • 如果A操作等待B操作,而B操作取消了,A操作也被認為取消了
異步函數的狀態機
  • 如有以下異步函數:

    •   internal sealed class Type1 { }
        internal sealed class Type2 { }
      
        static async Task<Type1> Method1Async()
        {
            Type1 t = new Type1();
            await new MemoryStream().WriteAsync(null, 0, 0);
            return t;
        }
      
        static async Task<Type2> Method2Async()
        {
            Type2 t = new Type2();
            await new MemoryStream().WriteAsync(null, 0, 0);
            return t;
        }
      
        static async Task<string> MyMethodAsync(int arg)
        {
            int local = arg;
            try
            {
                Type1 result1 = await Method1Async();
                for(int i = 0; i < 3; i++)
                {
                    Type2 result2 = await Method2Async();
                }
            }
            catch (Exception)
            {
                Console.WriteLine("Catch");
            }
            finally
            {
                Console.WriteLine("Finally");
            }
            return "Done";
        }
    • 查看IL代碼,可發現其方法所依賴的狀態機結構

  • 在MyMethodAsync類中
.method private hidebysig static class [System.Runtime]System.Threading.Tasks.Task`1<string> 
MyMethodAsync(int32 arg) cil managed
{
.custom instance void [System.Runtime]System.Runtime.CompilerServices.AsyncStateMachineAttribute::.ctor(class [System.Runtime]System.Type) = ( 01 00 24 43 4C 52 5F 43 68 32 38 2E 50 72 6F 67   // ..$CLR_Ch28.Prog
72 61 6D 2B 3C 4D 79 4D 65 74 68 6F 64 41 73 79   // ram+<MyMethodAsy
6E 63 3E 64 5F 5F 35 00 00 )                      // nc>d__5..
.custom instance void [System.Diagnostics.Debug]System.Diagnostics.DebuggerStepThroughAttribute::.ctor() = ( 01 00 00 00 ) 
// 程式碼大小       56 (0x38)
.maxstack  2
.locals init (class CLR_Ch28.Program/'<MyMethodAsync>d__5' V_0)
IL_0000:  newobj     instance void CLR_Ch28.Program/'<MyMethodAsync>d__5'::.ctor()
IL_0005:  stloc.0
IL_0006:  ldloc.0
IL_0007:  ldarg.0
IL_0008:  stfld      int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::arg
IL_000d:  ldloc.0
IL_000e:  call       valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<!0> valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::Create()
IL_0013:  stfld      valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>t__builder'
IL_0018:  ldloc.0
IL_0019:  ldc.i4.m1
IL_001a:  stfld      int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>1__state'
IL_001f:  ldloc.0
IL_0020:  ldflda     valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>t__builder'
IL_0025:  ldloca.s   V_0
IL_0027:  call       instance void valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::Start<class CLR_Ch28.Program/'<MyMethodAsync>d__5'>(!!0&)
IL_002c:  ldloc.0
IL_002d:  ldflda     valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>t__builder'
IL_0032:  call       instance class [System.Runtime]System.Threading.Tasks.Task`1<!0> valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::get_Task()
IL_0037:  ret
} // end of method Program::MyMethodAsync
  • 使用AysncTaskMethodBuilder構造了builder(狀態機訪問builder來設置Task完成/異常),並將state(狀態機位置)初始化

    • 然後調用Start啟動狀態機

    • 返回狀態機的Task(get_task)

  • MoveNext函數中,包含了整個狀態機的流程

    • 首先是初始化
.locals init (int32 V_0,
string V_1,
valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1> V_2,
class CLR_Ch28.Program/'<MyMethodAsync>d__5' V_3,
valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type2> V_4,
int32 V_5,
bool V_6,
class [System.Runtime]System.Exception V_7)
IL_0000:  ldarg.0
IL_0001:  ldfld      int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>1__state' //初始化位置
IL_0006:  stloc.0
.try
{
IL_0007:  ldloc.0
IL_0008:  ldc.i4.1
IL_0009:  ble.un.s   IL_000d
IL_000b:  br.s       IL_000f
IL_000d:  br.s       IL_001c
IL_000f:  nop
IL_0010:  ldarg.0
IL_0011:  ldarg.0
IL_0012:  ldfld      int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::arg //將實參Copy到狀態機字段
IL_0017:  stfld      int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<local>5__1'
IL_001c:  nop
//...
}
  • 然後狀態機中有著以下字段

  • 值得注意的是,每個await都有一個TaskAwaiter類型的字段

    • 在任何時候,都只有其中一個是重要的,這一個引用了最近執行、以異步方式完成的await
  • 然後這是異步方法的核心流程概要IL

.method private hidebysig newslot virtual final 
        instance void  MoveNext() cil managed
{
  .override [System.Runtime]System.Runtime.CompilerServices.IAsyncStateMachine::MoveNext
  // 程式碼大小       448 (0x1c0)
  //...
    .try
    {
      .try
      {
        //調用Method1Async並獲得其Awaiter
        IL_0030:  call       class [System.Runtime]System.Threading.Tasks.Task`1<class CLR_Ch28.Program/Type1> CLR_Ch28.Program::Method1Async()
        IL_0035:  callvirt   instance valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<!0> class [System.Runtime]System.Threading.Tasks.Task`1<class CLR_Ch28.Program/Type1>::GetAwaiter()
        //...
        //如果還沒完成,使用異步方式完成
        IL_003d:  call       instance bool valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1>::get_IsCompleted()
        IL_0042:  brtrue.s   IL_0087
        IL_0048:  stfld      int32 CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>1__state'
        //...
        //保存awaiter以便將來返回
        IL_004f:  stfld      valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>u__1'
        //...
        IL_0057:  ldflda     valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string> CLR_Ch28.Program/'<MyMethodAsync>d__5'::'<>t__builder'
        //...
        //await操作完成時調用MoveNext
        IL_0060:  call       instance void valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::AwaitUnsafeOnCompleted<valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1>,class CLR_Ch28.Program/'<MyMethodAsync>d__5'>(!!0&,!!1&)
        //...
        //第一個await的結果捕捉
        IL_008a:  call       instance !0 valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type1>::GetResult()
        //...
        //啟動循環
        IL_00ae:  br         IL_0148
        //內部邏輯,與第一個await流程類似
        //...
        //結束循環
        IL_00ec:  leave      IL_01bf
        //第二個await的結果
        IL_010f:  ldloca.s   V_4
        IL_0111:  call       instance !0 valuetype [System.Runtime]System.Runtime.CompilerServices.TaskAwaiter`1<class CLR_Ch28.Program/Type2>::GetResult()
        //...
      }  // end .try
      catch [System.Runtime]System.Exception 
      {
        //...
        IL_0166:  ldstr      "Catch"
        //...
        }  // end handler
      IL_0174:  leave.s    IL_0188
    }  // end .try
    finally
    {
      //...
      IL_017b:  ldstr      "Finally"
      //...
      IL_0187:  endfinally
    }  // end handler
    IL_0188:  ldstr      "Done"
    //...
  }  // end .try
  catch [System.Runtime]System.Exception 
  {
    //狀態機異常處理部分
    //...
    IL_01a2:  call       instance void valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::SetException(class [System.Runtime]System.Exception)
    IL_01a7:  nop
    IL_01a8:  leave.s    IL_01bf
  }  // end handler
  //...
  //無異常,返回結果
  IL_01b9:  call       instance void valuetype [System.Threading.Tasks]System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1<string>::SetResult(!0)
  IL_01be:  nop
  IL_01bf:  ret
} // end of method '<MyMethodAsync>d__5'::MoveNext
  • 書中把IL編譯回C#的MoveNext概要

異步函數的限制
  • 不能將Main方法轉變為異步函數

    • 構造器、屬性訪問器、事件訪問器都不能

    • 如果Main方法中使用了await操作符,進程的主線程會在遇到第一個await操作符時立即從Main返回

      • 由於Main返回void,導致調用Main的代碼無法獲得一個可進行監視並等待完成的Task,所以進程直接終止

        • 異步函數不僅可以返回Task,也可以返回void

        • 對於返回void的異步函數,我們沒有辦法知道其狀態機在甚麼時候執行完畢

  • 不能使用任何out/ref參數

    • 原因參考:https://social.msdn.microsoft.com/Forums/en-US/d2f48a52-e35a-4948-844d-828a1a6deb74/why-async-methods-cannot-have-ref-or-out-parameters?forum=async

      • As for why async methods don’t support out-by-reference parameters? (or ref parameters?) That’s a limitation of the CLR. We chose to implement async methods in a similar way to iterator methods — i.e. through the compiler transforming the method into a state-machine-object. The CLR has no safe way to store the address of an “out parameter” or “reference parameter” as a field of an object. The only way to have supported out-by-reference parameters would be if the async feature were done by a low-level CLR rewrite instead of a compiler-rewrite. We examined that approach, and it had a lot going for it, but it would ultimately have been so costly that it’d never have happened.
  • 不能在catch、finally、unsafe、lock塊和非異步匿名函數中使用await操作符

    • catch和finally在C# 6.0時已經可以使用await操作符
  • 不能在await操作得之前獲得一個支持線程所有權或遞歸的鎖(lock或者Monitor.Enter/Exit),並在await操作符之後釋放它

    • await之前的代碼與await之後的代碼不一定由同一個線程執行
  • 在查詢表達式中,await操作符只能在初始from子句的第一個集合表達式/join子句的集合表達式中使用

參考書目

  • 《CLR via C#》(第4版) Jeffrey Richter
  • 《深入理解C#》(第3版) Jon Skeet