Skip to main content

Swift AsyncStream

Introduction

AsyncStream is a powerful feature introduced in Swift 5.5 as part of the Swift Concurrency model. It provides a way to bridge traditional callback or delegation-based code with the new async/await pattern by creating asynchronous sequences of values that can be consumed using for await loops.

In simple terms, an AsyncStream allows you to convert events that happen over time (like user interactions, network responses, or sensor data) into a stream of values that can be processed using Swift's modern concurrency features.

Understanding AsyncStream

At its core, AsyncStream is a type that conforms to the AsyncSequence protocol. It represents a sequence of asynchronous elements that you can iterate over using for await loops or access individually with next() calls.

Key Concepts

  • Continuous Values: Unlike a single async operation that returns one value, AsyncStream provides multiple values over time
  • Back-Pressure Handling: AsyncStream includes built-in mechanisms to handle back-pressure (when producers generate values faster than consumers can process them)
  • Continuation-Based: Uses a continuation mechanism to add values to the stream
  • Cancellation Support: Properly handles cancellation of the asynchronous task

Creating a Basic AsyncStream

Let's start with a simple example of creating and using an AsyncStream:

swift
func generateNumbers() -> AsyncStream<Int> {
AsyncStream { continuation in
// Provide values to the stream
for i in 1...5 {
continuation.yield(i)
Thread.sleep(forTimeInterval: 1) // Simulate work
}
continuation.finish() // Signal that no more values will be sent
}
}

// Using the AsyncStream
Task {
for await number in generateNumbers() {
print("Received: \(number)")
}
print("Stream finished")
}

// Output:
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5
// Stream finished

In this example:

  • We create an AsyncStream that produces integers
  • Inside the stream, we yield 5 values with a 1-second delay between each
  • When we're done, we call finish() to indicate the end of the stream
  • We consume the stream using a for await loop in an async task

AsyncStream Components

Let's break down the main components of AsyncStream:

1. AsyncStream Initialization

swift
AsyncStream<Element> { continuation in
// Use continuation to yield values
}

The closure provides a continuation you can use to:

  • Add values to the stream with yield()
  • Signal completion with finish()

2. Continuation

The continuation is how you control the stream:

swift
continuation.yield(value) // Add a value to the stream
continuation.yield(with: .success(value)) // Add a successful result
continuation.yield(with: .failure(error)) // Add a failure (requires ThrowingAsyncStream)
continuation.finish() // End the stream

3. Buffering Options

AsyncStream can handle back-pressure with different buffering strategies:

swift
AsyncStream(
bufferingPolicy: .unbounded, // No limit on queued values
_ build: (Continuation) -> Void
)

// Or with limited buffer:
AsyncStream(
bufferingPolicy: .bufferingOldest(5), // Keep only 5 newest values
_ build: (Continuation) -> Void
)

// Or dropping values when full:
AsyncStream(
bufferingPolicy: .bufferingNewest(5), // Keep only 5 oldest values
_ build: (Continuation) -> Void
)

Practical Examples

Example 1: Converting Notifications to AsyncStream

A common use case is converting UIKit/AppKit notifications to AsyncStream:

swift
func notificationStream(for name: Notification.Name) -> AsyncStream<Notification> {
AsyncStream { continuation in
let token = NotificationCenter.default.addObserver(
forName: name,
object: nil,
queue: .main
) { notification in
continuation.yield(notification)
}

continuation.onTermination = { _ in
NotificationCenter.default.removeObserver(token)
}
}
}

// Usage example
Task {
let keyboardStream = notificationStream(for: UIResponder.keyboardWillShowNotification)
for await notification in keyboardStream {
// Handle keyboard appearance
let keyboardFrame = notification.userInfo?[UIResponder.keyboardFrameEndUserInfoKey] as? CGRect
print("Keyboard will show with frame: \(String(describing: keyboardFrame))")
}
}

Example 2: Timer-Based AsyncStream

Create a timer that emits values at regular intervals:

swift
func timerStream(interval: TimeInterval) -> AsyncStream<Date> {
AsyncStream { continuation in
let timer = Timer.scheduledTimer(withTimeInterval: interval, repeats: true) { timer in
continuation.yield(Date())
}

continuation.onTermination = { _ in
timer.invalidate()
}
}
}

// Usage
Task {
let timer = timerStream(interval: 1.0)

// Run for 5 seconds then cancel
var count = 0
for await time in timer {
print("Timer fired at: \(time)")

count += 1
if count >= 5 {
break
}
}
print("Timer cancelled")
}

// Output:
// Timer fired at: 2023-05-10 15:20:01
// Timer fired at: 2023-05-10 15:20:02
// Timer fired at: 2023-05-10 15:20:03
// Timer fired at: 2023-05-10 15:20:04
// Timer fired at: 2023-05-10 15:20:05
// Timer cancelled

Example 3: Converting Delegate Callbacks to AsyncStream

A more complex example - converting a location manager's updates to an AsyncStream:

swift
import CoreLocation

func locationUpdates() -> AsyncStream<CLLocation> {
AsyncStream { continuation in
let manager = CLLocationManager()

class Delegate: NSObject, CLLocationManagerDelegate {
var continuation: AsyncStream<CLLocation>.Continuation?

func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
locations.forEach { continuation?.yield($0) }
}
}

let delegate = Delegate()
delegate.continuation = continuation
manager.delegate = delegate

// Keep a strong reference to the delegate
continuation.onTermination = { [delegate] _ in
manager.stopUpdatingLocation()
// The delegate reference ensures it lives as long as the continuation
_ = delegate
}

manager.requestWhenInUseAuthorization()
manager.startUpdatingLocation()
}
}

// Usage
Task {
for await location in locationUpdates() {
print("Location update: \(location.coordinate.latitude), \(location.coordinate.longitude)")
}
}

Creating Throwing AsyncStreams

If your stream might encounter errors, use AsyncThrowingStream:

swift
func fetchDataStream(from urls: [URL]) -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream { continuation in
Task {
for url in urls {
do {
let (data, _) = try await URLSession.shared.data(from: url)
continuation.yield(data)
} catch {
continuation.finish(throwing: error)
return
}
}
continuation.finish()
}
}
}

// Usage with error handling
Task {
let urls = [
URL(string: "https://example.com/api/1")!,
URL(string: "https://example.com/api/2")!
]

do {
for try await data in fetchDataStream(from: urls) {
print("Received \(data.count) bytes")
}
} catch {
print("Stream error: \(error)")
}
}

Advanced AsyncStream Patterns

Manual Iterator

Instead of using for await, you can manually iterate:

swift
Task {
let stream = generateNumbers()
var iterator = stream.makeAsyncIterator()

while let number = await iterator.next() {
print("Got number: \(number)")
}
}

Combining Multiple Streams

You can process multiple streams by using tasks:

swift
func combineStreams<T>(_ streams: [AsyncStream<T>]) async -> AsyncStream<T> {
AsyncStream { continuation in
let taskGroup = Task {
await withTaskGroup(of: Void.self) { group in
for stream in streams {
group.addTask {
for await value in stream {
continuation.yield(value)
}
}
}
}
continuation.finish()
}

continuation.onTermination = { _ in
taskGroup.cancel()
}
}
}

Best Practices

  1. Always handle termination: Use continuation.onTermination to clean up resources
  2. Choose appropriate buffering: Select a buffering policy that suits your needs
  3. Finish the stream: Call continuation.finish() when no more values will be sent
  4. Maintain strong references: Keep references to any objects needed by your continuation
  5. Consider back-pressure: Be mindful of how fast producers generate values vs how fast consumers process them

Comparing AsyncStream with Other Patterns

FeatureAsyncStreamCombine PublishersCallback-Based APIs
Swift VersionSwift 5.5+Swift 5.0+Any
Error HandlingThrough AsyncThrowingStreamThrough publisher errorsVia error callbacks
BackpressureBuilt-in buffering optionsThrough .buffer operatorManual implementation
CancellationThrough task cancellationThrough cancellablesVaries
Integration with async/awaitNativeRequires .values for awaitRequires wrapping

Summary

AsyncStream provides a powerful way to bridge traditional callback-based code with the modern Swift Concurrency model. It allows you to:

  • Convert event-based code to structured asynchronous sequences
  • Process values that arrive over time
  • Handle back-pressure automatically
  • Clean up resources properly

By using AsyncStream, you can write more readable, maintainable code that takes advantage of Swift's concurrency features without having to completely redesign existing APIs.

Further Learning

  1. Exercise: Create an AsyncStream that delivers random numbers at random intervals
  2. Exercise: Implement a chat application using AsyncStream to deliver messages
  3. Advanced: Create a custom AsyncSequence type that behaves like AsyncStream but with additional functionality

Additional Resources



If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)