
C# Async Streams

Introduction

Async streams, introduced in C# 8.0, provide an elegant way to work with asynchronous sequences of data. Before async streams, developers had to use complex workarounds when dealing with data that is both asynchronous (comes over time) and enumerable (comes in sequence). Async streams solve this problem by combining the concepts of asynchronous programming and enumeration into a single, coherent feature.

In this tutorial, you'll learn:

  • What async streams are and why they're useful
  • How to create and consume async streams using IAsyncEnumerable<T>
  • How to implement async streams with yield return and await
  • Real-world scenarios where async streams excel

Prerequisites

To follow along with this tutorial, you should have:

  • Basic understanding of C# and .NET
  • Familiarity with asynchronous programming concepts (async/await)
  • Knowledge of IEnumerable and yield return
  • .NET Core 3.0 or later (preferably .NET 6+)

Understanding Async Streams

The Problem Async Streams Solve

Before async streams, developers faced challenges when working with sequences of data that are produced asynchronously:

  1. Using Task<IEnumerable<T>> forces you to wait for the entire collection before processing begins
  2. Creating custom async enumeration patterns was complex and error-prone
  3. Implementing cancellation and disposal patterns manually was difficult

Async streams elegantly solve these problems by providing first-class language support for asynchronous enumeration.
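The first problem in the list can be made concrete with a small trace. The sketch below is illustrative only: the class and method names (BatchVersusStream, LoadAllAsync, StreamAsync) are invented for this example, and Task.Delay stands in for real asynchronous work. It records the order in which items are produced and consumed under each approach.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchVersusStream
{
    // Batch style: the caller sees nothing until every item has been produced.
    public static async Task<IEnumerable<int>> LoadAllAsync(int count, List<string> log)
    {
        var items = new List<int>();
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(10); // stand-in for real asynchronous work
            log.Add($"produced {i}");
            items.Add(i);
        }
        return items;
    }

    // Stream style: each item is handed to the caller as soon as it is ready.
    public static async IAsyncEnumerable<int> StreamAsync(int count, List<string> log)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(10);
            log.Add($"produced {i}");
            yield return i;
        }
    }

    public static async Task<List<string>> TraceBatchAsync()
    {
        var log = new List<string>();
        foreach (var item in await LoadAllAsync(2, log))
        {
            log.Add($"consumed {item}");
        }
        return log;
    }

    public static async Task<List<string>> TraceStreamAsync()
    {
        var log = new List<string>();
        await foreach (var item in StreamAsync(2, log))
        {
            log.Add($"consumed {item}");
        }
        return log;
    }
}
```

With the batch version the log reads produced 0, produced 1, consumed 0, consumed 1. With the stream version production and consumption interleave: produced 0, consumed 0, produced 1, consumed 1.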

Key Components of Async Streams

An async stream implementation consists of:

  1. IAsyncEnumerable<T>: The interface representing an asynchronous stream of values
  2. IAsyncEnumerator<T>: The interface for iterating through an async stream
  3. await foreach: The language construct for consuming async streams
  4. yield return inside an async method: The mechanism for producing async streams (there is no separate "async yield" keyword)
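To see how these pieces relate, it can help to drive an IAsyncEnumerator<T> by hand. The sketch below approximates what the compiler generates for await foreach; it is a simplification, and the names ManualEnumeration, ThreeNumbersAsync, and SumManuallyAsync are made up for illustration.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ManualEnumeration
{
    // A small source stream, produced with yield return in an async method.
    public static async IAsyncEnumerable<int> ThreeNumbersAsync()
    {
        for (int i = 1; i <= 3; i++)
        {
            await Task.Delay(10);
            yield return i;
        }
    }

    // Roughly equivalent to: await foreach (var n in source) total += n;
    public static async Task<int> SumManuallyAsync(
        IAsyncEnumerable<int> source,
        CancellationToken cancellationToken = default)
    {
        int total = 0;
        IAsyncEnumerator<int> enumerator = source.GetAsyncEnumerator(cancellationToken);
        try
        {
            // MoveNextAsync completes with false once the stream is exhausted.
            while (await enumerator.MoveNextAsync())
            {
                total += enumerator.Current;
            }
        }
        finally
        {
            // await foreach always disposes the enumerator, even on exceptions.
            await enumerator.DisposeAsync();
        }
        return total;
    }
}
```

In everyday code await foreach is preferable; the manual form is mainly useful for understanding what the interfaces do.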

Creating Async Streams

Let's start by creating a basic async stream:

csharp
public static async IAsyncEnumerable<int> GenerateNumbersAsync()
{
    for (int i = 0; i < 10; i++)
    {
        // Simulate asynchronous work
        await Task.Delay(100);

        // Yield a value asynchronously
        yield return i;
    }
}

This method returns an IAsyncEnumerable<int> that produces integers asynchronously. The yield return statement works as it does in a synchronous iterator, except that the method can also await between values.
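Like synchronous iterators, async iterators are lazy: calling the method only creates the stream, and the body starts running when enumeration begins. The following sketch demonstrates this with a counter; LazyStartDemo and its members are hypothetical names used only here.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LazyStartDemo
{
    // Counts how many times the iterator body has started running.
    public static int BodyStarted;

    public static async IAsyncEnumerable<int> CountAsync()
    {
        BodyStarted++;
        await Task.Yield();
        yield return 1;
        yield return 2;
    }

    public static async Task<(int Before, int After, int Sum)> RunAsync()
    {
        BodyStarted = 0;

        // Creating the stream does not execute any of the iterator body.
        IAsyncEnumerable<int> stream = CountAsync();
        int before = BodyStarted;

        int sum = 0;
        await foreach (var n in stream)
        {
            sum += n;
        }

        return (before, BodyStarted, sum);
    }
}
```

RunAsync returns (0, 1, 3): the counter is still zero after the call to CountAsync and only increments once await foreach starts pulling values.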

Consuming Async Streams

To consume an async stream, we use the await foreach statement:

csharp
public static async Task ConsumeNumbersAsync()
{
    await foreach (var number in GenerateNumbersAsync())
    {
        Console.WriteLine($"Received: {number}");
    }
}

Output:

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9

Notice that each number is processed as soon as it becomes available, rather than waiting for the entire sequence to complete.

Cancellation Support

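Async streams support cancellation through the WithCancellation extension method. The token passed to WithCancellation only reaches the iterator if the iterator declares a CancellationToken parameter marked with [EnumeratorCancellation]; the parameterless GenerateNumbersAsync above would ignore the token and run to completion. The example below therefore uses a cancellable variant of the generator:

csharp
// Requires: using System.Runtime.CompilerServices; (for [EnumeratorCancellation])
public static async IAsyncEnumerable<int> GenerateCancellableNumbersAsync(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    for (int i = 0; i < 10; i++)
    {
        // Task.Delay observes the token and throws once it is cancelled
        await Task.Delay(100, cancellationToken);
        yield return i;
    }
}

public static async Task ConsumeWithCancellationAsync()
{
    using var cts = new CancellationTokenSource();
    cts.CancelAfter(350); // Cancel after 350ms

    try
    {
        await foreach (var number in GenerateCancellableNumbersAsync().WithCancellation(cts.Token))
        {
            Console.WriteLine($"Received: {number}");
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Operation was cancelled");
    }
}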

Typical output (exact counts depend on timer resolution):

Received: 0
Received: 1
Received: 2
Operation was cancelled

Configuring Async Streams

You can also configure how the async stream behaves with respect to capturing the synchronization context:

csharp
// Run without capturing sync context (similar to ConfigureAwait(false))
await foreach (var number in asyncStream.ConfigureAwait(false))
{
    // Process item
}
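WithCancellation and ConfigureAwait can be chained on the same stream, because both return a configured wrapper that exposes the other method. The sketch below shows one way to combine them; ConfiguredConsumption, TicksAsync, and CountAsync are illustrative names, not part of any library.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class ConfiguredConsumption
{
    // A cancellable source: the token supplied via WithCancellation arrives here.
    public static async IAsyncEnumerable<int> TicksAsync(
        int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(10, cancellationToken).ConfigureAwait(false);
            yield return i;
        }
    }

    // Consumes the stream with both a cancellation token and ConfigureAwait(false).
    public static async Task<int> CountAsync(
        IAsyncEnumerable<int> stream,
        CancellationToken cancellationToken)
    {
        int count = 0;
        await foreach (var item in stream
            .WithCancellation(cancellationToken)
            .ConfigureAwait(false))
        {
            count++;
        }
        return count;
    }
}
```

Library code that does not need to resume on the caller's context is the usual place for this combination; UI code that touches controls inside the loop should generally leave the context captured.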

Real-World Example: Streaming API Results

Let's implement a more realistic example where we stream data from a web API:

csharp
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Runtime.CompilerServices; // for [EnumeratorCancellation]
using System.Threading;
using System.Threading.Tasks;

public class Product
{
    public int Id { get; set; }
    public string Name { get; set; } = string.Empty;
    public decimal Price { get; set; }
}

public class ProductService
{
    private readonly HttpClient _httpClient;

    public ProductService(HttpClient httpClient)
    {
        _httpClient = httpClient;
    }

    public async IAsyncEnumerable<Product> GetProductsStreamAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Simulate paging through API results
        for (int page = 1; page <= 5; page++)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // In real code, we'd call an actual API endpoint.
            // The response is disposed at the end of each loop iteration.
            using var response = await _httpClient.GetAsync(
                $"api/products?page={page}",
                cancellationToken);

            response.EnsureSuccessStatusCode();

            // Simulate deserializing a batch of products
            var products = await FetchProductBatchAsync(page, cancellationToken);

            foreach (var product in products)
            {
                yield return product;
            }

            // Delay before fetching next page
            await Task.Delay(200, cancellationToken);
        }
    }

    // Helper method to simulate API response
    private async Task<List<Product>> FetchProductBatchAsync(
        int page, CancellationToken cancellationToken)
    {
        await Task.Delay(300, cancellationToken); // Simulate network delay

        return Enumerable.Range((page - 1) * 10 + 1, 10)
            .Select(i => new Product
            {
                Id = i,
                Name = $"Product {i}",
                Price = 9.99m + i
            })
            .ToList();
    }
}

To consume this async stream:

csharp
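public static async Task ProcessProductStreamAsync()
{
    // Relative request URIs need a BaseAddress; without one, GetAsync throws
    // InvalidOperationException. The address below is a placeholder: point it
    // at a real API that serves api/products.
    using var httpClient = new HttpClient
    {
        BaseAddress = new Uri("https://example.com/")
    };
    var productService = new ProductService(httpClient);

    await foreach (var product in productService.GetProductsStreamAsync())
    {
        Console.WriteLine($"Processing product: {product.Id} - {product.Name} (${product.Price})");

        // Simulate some processing work
        await Task.Delay(50);
    }
}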

This example shows why async streams suit scenarios such as:

  • Processing paginated API results
  • Handling large data sets without loading everything into memory
  • Working with streaming data sources

When to Use Async Streams

Async streams are ideal when:

  1. Data arrives over time: When your data source produces items asynchronously
  2. Processing can start early: When you can process items as they arrive, without waiting for the complete sequence
  3. Memory efficiency is important: When loading all items at once would consume too much memory
  4. Cancelability is needed: When you need to support cancellation of the enumeration process

When Not to Use Async Streams

Async streams might not be the best choice when:

  1. You need all the data before processing can begin
  2. The overhead of asynchronous operations isn't justified by the amount of work
  3. You're working with very small collections where the additional complexity isn't warranted

Advanced Patterns

Implementing IAsyncDisposable

For resources that need cleanup after enumeration, implement IAsyncDisposable:

csharp
public class AsyncResourceStream : IAsyncEnumerable<string>, IAsyncDisposable
{
    private bool _disposed = false;
    private Resource _resource;

    public AsyncResourceStream()
    {
        _resource = new Resource();
    }

    public async IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        try
        {
            for (int i = 0; i < 5; i++)
            {
                cancellationToken.ThrowIfCancellationRequested();

                // Use the resource and produce an item
                var item = await _resource.GetItemAsync(i);
                yield return item;

                await Task.Delay(100, cancellationToken);
            }
        }
        finally
        {
            // Cleanup happens when the enumerator is disposed
            Console.WriteLine("Enumerator is being disposed");
        }
    }

    public async ValueTask DisposeAsync()
    {
        if (!_disposed)
        {
            // Clean up resources asynchronously
            await _resource.CloseAsync();
            _disposed = true;
            Console.WriteLine("AsyncResourceStream disposed");
        }
    }

    // Simulation of a resource that requires async cleanup
    private class Resource
    {
        public async Task<string> GetItemAsync(int id)
        {
            await Task.Delay(50);
            return $"Item {id}";
        }

        public async Task CloseAsync()
        {
            await Task.Delay(100);
            Console.WriteLine("Resource closed");
        }
    }
}

Usage:

csharp
public static async Task UseAsyncDisposableStreamAsync()
{
    await using var resourceStream = new AsyncResourceStream();

    await foreach (var item in resourceStream)
    {
        Console.WriteLine($"Got: {item}");
    }
}
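Enumerator cleanup also runs when the consumer leaves the loop early. The sketch below, with the invented names EarlyExitDemo, PagesAsync, and TakeTwoPagesAsync, breaks out of await foreach after the second item and logs what happens. It assumes nothing beyond the standard library.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EarlyExitDemo
{
    // Pretends to fetch five pages, recording each fetch and the final cleanup.
    public static async IAsyncEnumerable<int> PagesAsync(List<string> log)
    {
        try
        {
            for (int page = 1; page <= 5; page++)
            {
                await Task.Delay(10); // stand-in for a network call
                log.Add($"fetched page {page}");
                yield return page;
            }
        }
        finally
        {
            // Runs when the enumerator is disposed, including after a break.
            log.Add("cleanup");
        }
    }

    public static async Task<List<string>> TakeTwoPagesAsync()
    {
        var log = new List<string>();
        await foreach (var page in PagesAsync(log))
        {
            if (page == 2)
            {
                break; // disposes the enumerator; pages 3 to 5 are never fetched
            }
        }
        log.Add("loop finished");
        return log;
    }
}
```

The resulting log is fetched page 1, fetched page 2, cleanup, loop finished. Work for the remaining pages is never started, which is what makes early termination cheap for paginated sources.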

Common Patterns and Best Practices

  1. Parameter Cancellation: Use the [EnumeratorCancellation] attribute so that a token passed via WithCancellation reaches the iterator:
csharp
public async IAsyncEnumerable<T> GetDataAsync(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Implementation
}
  2. Streams as Return Values: Return IAsyncEnumerable<T> directly from methods rather than wrapping it in a Task, which would force callers to await once for the stream and again for each item:
csharp
// GOOD: Returns the stream directly
public IAsyncEnumerable<int> GetValuesAsync()

// BAD: Wraps the stream in a Task
public Task<IAsyncEnumerable<int>> GetValuesAsync()
  3. Limit Buffering: One of the main advantages of async streams is that they avoid buffering the entire collection, so avoid collecting the results into a list unless necessary.
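As an illustration of the buffering advice, an aggregate such as a total can be computed while items flow through, so only one item is held at a time. This is a minimal sketch with made-up names (StreamingAggregation, PricesAsync, SumAsync) and hard-coded sample prices.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StreamingAggregation
{
    // Sample source; a real stream might come from a database or an API.
    public static async IAsyncEnumerable<decimal> PricesAsync()
    {
        decimal[] prices = { 10.99m, 11.99m, 12.99m };
        foreach (var price in prices)
        {
            await Task.Delay(10);
            yield return price;
        }
    }

    // Keeps a running total instead of collecting items into a list first.
    public static async Task<decimal> SumAsync(IAsyncEnumerable<decimal> source)
    {
        decimal total = 0m;
        await foreach (var price in source)
        {
            total += price;
        }
        return total;
    }
}
```

Calling SumAsync(PricesAsync()) yields 35.97 without ever materializing the sequence. Buffering into a list remains reasonable when the items must be sorted or enumerated more than once.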

Summary

Async streams provide a powerful way to work with asynchronous sequences of data in C#. They combine the asynchronous programming model (async/await) with the enumeration pattern to create a seamless way to process data that arrives over time.

Key takeaways:

  • Declare an async method that returns IAsyncEnumerable<T> and uses yield return to create an async stream
  • Use await foreach to consume async streams
  • Leverage cancellation and ConfigureAwait for better control
  • Consider async streams for API pagination, file processing, and other streaming data scenarios

Exercises

  1. Create an async stream that reads a large file line by line asynchronously.
  2. Implement an async stream that simulates fetching data from multiple sources with different delays.
  3. Create a producer-consumer pattern where one method produces items asynchronously and another consumes them.
  4. Extend the ProductService example to include filtering options that are applied during stream processing.
