C# 非同步串流

C# 不是已經有 async 和 await 語法可實現非同步呼叫嗎?為什麼還需要非同步串流(async streams)呢?


👉 在 GitHub 上面閱讀本文 (GitHub 上面的程式碼沒有顯示行號,較不方便)


在需要處理大量資料的場合,我們可以在程式中透過「非同步」(asynchronous)的方式來呼叫其他 API(例如 Web API、資料查詢 API 等等)以獲取資料,好讓用戶端在我們的程式等待其他 API 回傳資料的過程中還能繼續做其他事情,不至於完全卡住使用者介面。這是非同步呼叫的好處之一。


剛才說的那些 API 所回傳的資料類型通常是 IEnumerable<T>,然而,IEnumerable<T> 的運作方式是同步的(synchronous)。意思是,在拉取資料的場合,用戶端必須等到資料提供端回傳所有需要之資料,才能對那些資料進行後續處理(例如顯示在網頁上)。也就是說,雖然非同步呼叫可以讓使用者繼續動動滑鼠、敲敲鍵盤,但沒辦法讓她想看到的資料盡快出現。


想像一下,你正在開發一個線上購物網站,當使用者在瀏覽商品清單頁面時,拚命地快速往下滑(快速跳下一頁),此時你的網站應用程式會收到許多抓取資料的請求。假設使用者每次往下跳一頁需要抓取 50 筆資料,那麼當使用者連續快速跳下一頁,雖然她的鍵盤與滑鼠都還可以繼續操作,但終究還是得等到所有該抓的資料全部取得之後,才有辦法看到她想看的東西。換言之,無論我們的網站後端程式在抓取資料時有沒有使用非同步呼叫,使用者需要等待的時間幾乎是一樣的,因為她此時的目的只有一個:要盡快看到她想看的資料,而不是在查詢資料時還去點其他按鈕或連結。


那麼,我們是否能在拉取資料時,讓使用者先看到一部份結果?也就是說,一邊抓取資料的同時就一邊回傳當下已經獲取的部分資料——就算只先出現 1 筆資料也好,而不要等到整頁(例如 50 筆)資料全都拉回來之後才一次全部顯示。這就是非同步串流(async streams)能夠派上用場的地方。

IEnumerable<T> 與 yield return

欲了解 C# 非同步串流的運作機制與寫法,至少必須提到兩個 .NET 基礎類別:IEnumerble<T> 和 IAsyncEnumerable<T>,同時也得了解 C# 的 yield return 用法。首先讓我們來看一下 IEnumerble<T> 與 yield return 的範例:


IEnumerable<int> numbers = GetNumbers();
foreach (int n in numbers)
{
Console.WriteLine($"取得 {n}");
}
static IEnumerable<int> GetNumbers()
{
var numbers = new int[] { 1, 2, 3 };
foreach (int n in numbers)
{
yield return n;
Console.WriteLine($"回傳 {n} 之後");
}
}


此範例的功能很單純,只是呼叫 GetNumbers 函式來取得一串整數,然後用一個迴圈把數字輸出至螢幕。執行結果如下:


取得 1
回傳 1 之後
取得 2
回傳 2 之後
取得 3
回傳 3 之後


👉試試看:https://dotnetfiddle.net/Kj1SSA


觀察重點是程式執行的流程:由於使用了 yield return,使得 GetNumbers 函式的執行流程被切成三段(因為回傳的串列元素總數為 3)。 


底下說明程式執行的過程(讀起來可能有點燒腦):

  1. 第 1 行呼叫 GetNumebers 方法,看似已經立刻取得一個完整的整數串列,但實際上並非如此。若以單步追蹤的方式來逐步執行,第 1 行程式碼跑完後,螢幕上並沒有輸出任何文字。
  2. 進入第 2 行的 foreach 迴圈,從 numbers 串列中取出第一個元素時,此時才會真正進入 GetNumbers 方法,並且依序執行第 9~12 行程式碼。第 12 行程式碼的 yield return 會立即返回當前的元素(整數 1),然後程式流程立刻回到呼叫端(即第 4 行)繼續執行,於是在螢幕上輸出字串:「取得 1」。
  3. 接下來,第 2 行的 foreach 進入第二圈,欲取出 numbers 串列的第二個元素,此時程式流程會跳回上一次 yield return 敘述的下面接著執行,於是印出字串:「回傳 1 之後」。接著會跑 GetNumbers 函式中的第二圈,再次碰到 yield return,於是回傳當前的元素(整數 2),程式流程立刻回到呼叫端(即第 4 行)繼續執行,於是在螢幕上輸出字串:「取得 2」。
  4. 跟上一個步驟所描述的流程相同,這次回傳的元素是整數 3。

像這樣跳來跳去的執行流程,是不是有點像 async 呼叫?(但不是)


值得一提的是,GetNumbers 函式雖然反覆進入多次,但第 9 行用來初始化整數陣列的程式碼只會執行一次。換言之,GetNumbers 函式宛如一個狀態機(state machine),能夠記住自身的狀態。這是 yield return 產生的效果。


如果你對這個範例的執行過程還有些疑問,不妨動手修改一下程式碼來實驗看看。比如說,yield return 不是一定要寫在迴圈裡面,你也可以把 GetNumbers 方法改寫成這樣:


IEnumerable<int> GetNumbers()
{
yield return 1;
Console.WriteLine("回傳 1 之後");
yield return 2;
Console.WriteLine("回傳 2 之後");
yield return 3;
Console.WriteLine("回傳 3 之後");
}
view raw GetNumbers-2.cs hosted with ❤ by GitHub

執行結果跟先前的寫法一樣。


另一方面,呼叫端的 foreach 迴圈,骨子裡其實是用列舉器 IEnumerator<T> 的 MoveNext 方法來取得下一個元素,所以先前範例的 1~5 行也可以改寫成:


IEnumerator<int> e = GetNumbers().GetEnumerator();
while (e.MoveNext())
{
Console.WriteLine($"取得 {e.Current}");
}
// 底下是先前的寫法,一併列出來,方便對照。
IEnumerable<int> numbers = GetNumbers();
foreach (int n in numbers)
{
Console.WriteLine($"取得 {n}");
}
view raw Enumerator.cs hosted with ❤ by GitHub
你可以把中斷點設定在呼叫 MoveNext 方法那行程式碼,然後以單步追蹤來觀察程式的執行流程與結果,應該會發現跟原先的版本是一樣的。

IAsyncEnumerable<T> 與 await foreach


剛才的範例程式雖然能夠一邊讀取串列元素,一邊立刻將取得之元素內容輸出至螢幕上,而且程式流程反覆跳來跳去,感覺有點像非同步呼叫,但並不是;剛才的範例都是同步呼叫。現在試著想像一下,GetNumbers 方法裡面會呼叫其他 Web API 或資料查詢函式,因而必須使用 await 來取得非同步工作的結果,像這樣:

IEnumerable<int> GetNumbers()
{
var numbers = await SomeApi.GetNumbersAsync();
foreach (int n in numbers)
{
yield return n;
Console.WriteLine($"回傳 {n} 之後");
}
}
view raw GetNumbers-3.cs hosted with ❤ by GitHub

你可能已經發現上面的程式碼無法通過編譯,因為 C# 的非同步呼叫寫法要求函式當中如果有用到 await,該函式就必須加上 async 宣告,成為非同步方法。像這樣:

async IEnumerable<int> GetNumbers()
{
var numbers = await SomeApi.GetNumbersAsync();
... (略)
}
view raw GetNumbers-4.cs hosted with ❤ by GitHub

但這樣還不夠,因為非同步方法的回傳型別必須是 Task、Task<T>、或者「具備 Task 性質的型別」,例如 IAsyncEnumerable<T>。繞了這麼一大圈,主角終於現身了。底下是非同步版本的 GetNumbers 的完整、正確寫法:

async IAsyncIEnumerable<int> GetNumbers()
{
var numbers = await SomeApi.GetNumbersAsync();
foreach (int n in numbers)
{
yield return n;
Console.WriteLine($"回傳 {n} 之後");
}
}
view raw GetNumbers-5.cs hosted with ❤ by GitHub

有了 IAsyncEnumerable<T>,再搭配 C# 8 新增的 await foreach 語法,就可以讓我們更自然地寫出非同步串流的程式碼。底下是把稍早的同步呼叫的範例程式改寫成 async 版本:

IAsyncEnumerable<int> numbers = GetNumbers();
await foreach (int n in numbers)
{
Console.WriteLine($"取得 {n}");
}
async IAsyncEnumerable<int> GetNumbers()
{
await Task.Delay(1000);
yield return 1;
await Task.Delay(1000);
yield return 2;
await Task.Delay(1000);
yield return 3;
}
view raw GetNumbers-6.cs hosted with ❤ by GitHub

你可以把 GetNumbers 函式裡面的 await Task.Delay() 假裝是某個 web API 呼叫或者某個非同步的資料查詢操作。



呼叫端的部分,除了使用 IAsyncEnumerable<T> 取代同步版本的 IEnumerable<T>,另外就是 foreach 迴圈前面必須加上 await 關鍵字(第 2 行)。下圖同時呈現同步與非同步版本的呼叫端程式碼,方便對照差異:



使用 await foreach 迴圈來取得串列元素時,編譯器所產生的程式碼是使用非同步版本的列舉器 IAsyncEnumerator<T>,像這樣:

IAsyncEnumerator<int> e = GetNumbers().GetAsyncEnumerator();
while (await e.MoveNextAsync())
{
Console.WriteLine($"取得 {e.Current}");
}

下圖可方便比對同步與非同步版本的寫法有何差異:


重點整理


最後整理 C# 非同步串流的幾個重點:

  • 以往使用 IEnumerable<T> 來傳遞資料時,是以「同步」的方式運行。
  • 支援非同步串流的函式,其回傳型別是 IAsyncEnumerable<T>,而不是 IEnumerable<T>。
  • 可讓我們在同一個方法當中使用 await 來取得非同步工作的結果,並使用 yield return 來回傳當下已獲取之資料;即抓到一點就回傳一點,邊抓邊傳的模式。
  • 用戶端程式在使用 await foreach 來逐一取得非同步串流的內容(元素)時,背後真正觸發其「抓取資料」動作的是 IAsyncEnumerator<T> 的 MoveNextAsync 方法。用白話來說就是:當用戶端需要下一筆資料,才立刻去抓那一筆資料。
Happy coding!

💬 P.S. 本文提及「拉取資料」時,指的是 pull-based API,例如 IEnumerable<T>。沒有明確指出的是另一種 push-based API,例如 IObservable<T>。此外,本文也沒有介紹 yield break 和 IAsyncDisposable

Post Comments

技術提供:Blogger.