C# Async Streams
Introduction
Async streams, introduced in C# 8.0, provide an elegant way to work with asynchronous sequences of data. Before async streams, developers had to use complex workarounds when dealing with data that is both asynchronous (comes over time) and enumerable (comes in sequence). Async streams solve this problem by combining the concepts of asynchronous programming and enumeration into a single, coherent feature.
In this tutorial, you'll learn:
- What async streams are and why they're useful
- How to create and consume async streams using
IAsyncEnumerable<T>
- How to implement async streams with
yield return
andawait
- Real-world scenarios where async streams excel
Prerequisites
To follow along with this tutorial, you should have:
- Basic understanding of C# and .NET
- Familiarity with asynchronous programming concepts (async/await)
- Knowledge of IEnumerable and yield return
- .NET Core 3.0 or later (preferably .NET 6+)
Understanding Async Streams
The Problem Async Streams Solve
Before async streams, developers faced challenges when working with sequences of data that are produced asynchronously:
- Using
Task<IEnumerable<T>>
forces you to wait for the entire collection before processing begins - Creating custom async enumeration patterns was complex and error-prone
- Implementing cancellation and disposal patterns manually was difficult
Async streams elegantly solve these problems by providing first-class language support for asynchronous enumeration.
Key Components of Async Streams
An async stream implementation consists of:
IAsyncEnumerable<T>
: The interface representing an asynchronous stream of valuesIAsyncEnumerator<T>
: The interface for iterating through an async streamawait foreach
: The language construct for consuming async streamsasync yield return
: The mechanism for producing async streams
Creating Async Streams
Let's start by creating a basic async stream:
public static async IAsyncEnumerable<int> GenerateNumbersAsync()
{
for (int i = 0; i < 10; i++)
{
// Simulate asynchronous work
await Task.Delay(100);
// Yield a value asynchronously
yield return i;
}
}
This method returns an IAsyncEnumerable<int>
which produces integers asynchronously. The yield return
statement works similar to regular enumerables, but in an asynchronous context.
Consuming Async Streams
To consume an async stream, we use the await foreach
statement:
public static async Task ConsumeNumbersAsync()
{
await foreach (var number in GenerateNumbersAsync())
{
Console.WriteLine($"Received: {number}");
}
}
Output:
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9
Notice that each number is processed as soon as it becomes available, rather than waiting for the entire sequence to complete.
Cancellation Support
Async streams support cancellation through a special overload of WithCancellation
extension method:
public static async Task ConsumeWithCancellationAsync()
{
using var cts = new CancellationTokenSource();
cts.CancelAfter(350); // Cancel after 350ms
try
{
await foreach (var number in GenerateNumbersAsync().WithCancellation(cts.Token))
{
Console.WriteLine($"Received: {number}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Operation was cancelled");
}
}
Output:
Received: 0
Received: 1
Received: 2
Operation was cancelled
Configuring Async Streams
You can also configure how the async stream behaves with respect to capturing the synchronization context:
// Run without capturing sync context (similar to ConfigureAwait(false))
await foreach (var number in asyncStream.ConfigureAwait(false))
{
// Process item
}
Real-World Example: Streaming API Results
Let's implement a more realistic example where we stream data from a web API:
public class Product
{
public int Id { get; set; }
public string Name { get; set; }
public decimal Price { get; set; }
}
public class ProductService
{
private readonly HttpClient _httpClient;
public ProductService(HttpClient httpClient)
{
_httpClient = httpClient;
}
public async IAsyncEnumerable<Product> GetProductsStreamAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Simulate paging through API results
for (int page = 1; page <= 5; page++)
{
cancellationToken.ThrowIfCancellationRequested();
// In real code, we'd call an actual API endpoint
var response = await _httpClient.GetAsync(
$"api/products?page={page}",
cancellationToken);
response.EnsureSuccessStatusCode();
// Simulate deserializing a batch of products
var products = await FetchProductBatchAsync(page);
foreach (var product in products)
{
yield return product;
}
// Delay before fetching next page
await Task.Delay(200, cancellationToken);
}
}
// Helper method to simulate API response
private async Task<List<Product>> FetchProductBatchAsync(int page)
{
await Task.Delay(300); // Simulate network delay
return Enumerable.Range((page - 1) * 10 + 1, 10)
.Select(i => new Product
{
Id = i,
Name = $"Product {i}",
Price = 9.99m + i
})
.ToList();
}
}
To consume this async stream:
public static async Task ProcessProductStreamAsync()
{
var httpClient = new HttpClient();
var productService = new ProductService(httpClient);
await foreach (var product in productService.GetProductsStreamAsync())
{
Console.WriteLine($"Processing product: {product.Id} - {product.Name} (${product.Price})");
// Simulate some processing work
await Task.Delay(50);
}
}
This example shows how async streams can be perfect for scenarios like:
- Processing paginated API results
- Handling large data sets without loading everything into memory
- Working with streaming data sources
When to Use Async Streams
Async streams are ideal when:
- Data arrives over time: When your data source produces items asynchronously
- Processing can start early: When you can process items as they arrive, without waiting for the complete sequence
- Memory efficiency is important: When loading all items at once would consume too much memory
- Cancelability is needed: When you need to support cancellation of the enumeration process
When Not to Use Async Streams
Async streams might not be the best choice when:
- You need all the data before processing can begin
- The overhead of asynchronous operations isn't justified by the amount of work
- You're working with very small collections where the additional complexity isn't warranted
Advanced Patterns
Implementing IAsyncDisposable
For resources that need cleanup after enumeration, implement IAsyncDisposable
:
public class AsyncResourceStream : IAsyncEnumerable<string>, IAsyncDisposable
{
private bool _disposed = false;
private Resource _resource;
public AsyncResourceStream()
{
_resource = new Resource();
}
public async IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
try
{
for (int i = 0; i < 5; i++)
{
cancellationToken.ThrowIfCancellationRequested();
// Use the resource and produce an item
var item = await _resource.GetItemAsync(i);
yield return item;
await Task.Delay(100, cancellationToken);
}
}
finally
{
// Cleanup happens when the enumerator is disposed
Console.WriteLine("Enumerator is being disposed");
}
}
public async ValueTask DisposeAsync()
{
if (!_disposed)
{
// Clean up resources asynchronously
await _resource.CloseAsync();
_disposed = true;
Console.WriteLine("AsyncResourceStream disposed");
}
}
// Simulation of a resource that requires async cleanup
private class Resource
{
public async Task<string> GetItemAsync(int id)
{
await Task.Delay(50);
return $"Item {id}";
}
public async Task CloseAsync()
{
await Task.Delay(100);
Console.WriteLine("Resource closed");
}
}
}
Usage:
public static async Task UseAsyncDisposableStreamAsync()
{
await using var resourceStream = new AsyncResourceStream();
await foreach (var item in resourceStream)
{
Console.WriteLine($"Got: {item}");
}
}
Common Patterns and Best Practices
- Parameter Cancellation: Use the
[EnumeratorCancellation]
attribute to forward cancellation tokens correctly:
public async IAsyncEnumerable<T> GetDataAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Implementation
}
- Streams as Return Values: Return
IAsyncEnumerable<T>
directly from methods rather than wrapping them inTask
:
// GOOD: Returns the stream directly
public IAsyncEnumerable<int> GetValuesAsync()
// BAD: Wraps the stream in a Task
public Task<IAsyncEnumerable<int>> GetValuesAsync()
- Limit Buffering: One of the main advantages of async streams is to avoid buffering the entire collection, so avoid collecting the results into a list unless necessary.
Summary
Async streams provide a powerful way to work with asynchronous sequences of data in C#. They combine the asynchronous programming model (async/await) with the enumeration pattern to create a seamless way to process data that arrives over time.
Key takeaways:
- Use
async IAsyncEnumerable<T>
to create async streams - Use
await foreach
to consume async streams - Leverage cancellation and ConfigureAwait for better control
- Consider async streams for API pagination, file processing, and other streaming data scenarios
Exercises
- Create an async stream that reads a large file line by line asynchronously.
- Implement an async stream that simulates fetching data from multiple sources with different delays.
- Create a producer-consumer pattern where one method produces items asynchronously and another consumes them.
- Extend the ProductService example to include filtering options that are applied during stream processing.
Additional Resources
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)