Swift AsyncStream
Introduction
AsyncStream is a powerful feature introduced in Swift 5.5 as part of the Swift Concurrency model. It provides a way to bridge traditional callback or delegation-based code with the new async/await pattern by creating asynchronous sequences of values that can be consumed using for await loops.
In simple terms, an AsyncStream allows you to convert events that happen over time (like user interactions, network responses, or sensor data) into a stream of values that can be processed using Swift's modern concurrency features.
Understanding AsyncStream
At its core, AsyncStream is a type that conforms to the AsyncSequence protocol. It represents a sequence of asynchronous elements that you can iterate over using for await loops or access individually by calling next() on its iterator.
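Because AsyncStream conforms to AsyncSequence, the standard AsyncSequence operators such as filter, map, and prefix can be applied to it. The following is a minimal sketch of that idea; the helper names numbers(upTo:) and firstEvenSquares(_:upTo:) are made up for this illustration.

```swift
// Hypothetical helper: a stream that yields 1 through `limit` (limit must be >= 1), then finishes.
func numbers(upTo limit: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in 1...limit {
            continuation.yield(value)
        }
        continuation.finish()
    }
}

// Collects the squares of the first `count` even numbers produced by the stream.
func firstEvenSquares(_ count: Int, upTo limit: Int) async -> [Int] {
    let squares = numbers(upTo: limit)
        .filter { $0.isMultiple(of: 2) } // Keep even values only
        .map { $0 * $0 }                 // Square each remaining value
        .prefix(count)                   // Stop after `count` elements

    var results: [Int] = []
    for await square in squares {
        results.append(square)
    }
    return results
}
```

Calling await firstEvenSquares(3, upTo: 10) from an async context returns [4, 16, 36].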
Key Concepts
- Continuous Values: Unlike a single async operation that returns one value, AsyncStream provides multiple values over time
- Buffering: AsyncStream buffers values when producers generate them faster than consumers can process them, using a configurable buffering policy. yield() never suspends the producer, so this is buffering rather than true back-pressure
- Continuation-Based: Uses a continuation mechanism to add values to the stream
- Cancellation Support: Properly handles cancellation of the asynchronous task
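To make the cancellation point concrete, here is a small sketch in which cancelling the consuming task ends the for await loop and triggers the stream's onTermination handler, which in turn stops the producer. The names ticks() and countUntilCancelled(), and the 10 ms and 100 ms delays, are arbitrary choices for the illustration.

```swift
// Hypothetical producer: yields 0, 1, 2, ... roughly every 10 ms until the stream terminates.
func ticks() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let producer = Task {
            var next = 0
            while !Task.isCancelled {
                continuation.yield(next)
                next += 1
                try? await Task.sleep(nanoseconds: 10_000_000)
            }
        }
        // Runs when the stream finishes or when its consumer is cancelled
        continuation.onTermination = { _ in
            producer.cancel()
        }
    }
}

// Starts a consumer, cancels it after about 100 ms, and returns how many values it received.
func countUntilCancelled() async -> Int {
    let consumer = Task { () -> Int in
        var received = 0
        for await _ in ticks() {
            received += 1
        }
        return received // Reached because cancellation ends the loop
    }
    try? await Task.sleep(nanoseconds: 100_000_000)
    consumer.cancel()
    return await consumer.value
}
```

The exact count depends on scheduling, so the sketch makes no claim about a specific number. The point is that the loop exits on its own once the consuming task is cancelled.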
Creating a Basic AsyncStream
Let's start with a simple example of creating and using an AsyncStream:
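func generateNumbers() -> AsyncStream<Int> {
    AsyncStream { continuation in
        // Produce values in a separate task so that creating the stream does not block the caller
        let producer = Task {
            for i in 1...5 {
                continuation.yield(i)
                // Simulate work without blocking a thread; stop early if cancelled
                do {
                    try await Task.sleep(nanoseconds: 1_000_000_000)
                } catch {
                    break
                }
            }
            continuation.finish() // Signal that no more values will be sent
        }
        // Stop producing if the consumer stops listening
        continuation.onTermination = { _ in
            producer.cancel()
        }
    }
}

// Using the AsyncStream
Task {
    for await number in generateNumbers() {
        print("Received: \(number)")
    }
    print("Stream finished")
}

// Output:
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5
// Stream finished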
In this example:
- We create an AsyncStream that produces integers
- Inside the stream, we yield 5 values with a 1-second delay between each
- When we're done, we call finish() to indicate the end of the stream
- We consume the stream using a for await loop in an async task
AsyncStream Components
Let's break down the main components of AsyncStream:
1. AsyncStream Initialization
AsyncStream<Element> { continuation in
    // Use continuation to yield values
}
The closure provides a continuation you can use to:
- Add values to the stream with yield()
- Signal completion with finish()
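When the code that produces values lives outside the initializer closure, Swift 5.9 and later also offer AsyncStream.makeStream(of:), which returns the stream and its continuation as a pair. The sketch below assumes Swift 5.9 or later; EventSource and its methods are hypothetical names used only for illustration.

```swift
// Requires Swift 5.9 or later for AsyncStream.makeStream(of:).
// EventSource is a hypothetical type used only for illustration.
final class EventSource {
    let events: AsyncStream<String>
    private let continuation: AsyncStream<String>.Continuation

    init() {
        // makeStream returns the stream together with the continuation that feeds it
        let (stream, continuation) = AsyncStream.makeStream(of: String.self)
        self.events = stream
        self.continuation = continuation
    }

    // Adds an event to the stream
    func send(_ event: String) {
        continuation.yield(event)
    }

    // Ends the stream; events that are already buffered are still delivered
    func close() {
        continuation.finish()
    }
}
```

A consumer iterates source.events with for await, while any other part of the program calls send(_:) and eventually close().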
2. Continuation
The continuation is how you control the stream:
continuation.yield(value) // Add a value to the stream
continuation.yield(with: .success(value)) // Add a successful result
continuation.yield(with: .failure(error)) // Finish the stream by throwing error (AsyncThrowingStream only)
continuation.finish() // End the stream
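yield() also returns a result that reports what happened to the value: it was enqueued, it caused a value to be dropped because the buffer was full, or the stream had already terminated. The following sketch logs those outcomes; describeYields() is a hypothetical name and the buffer size of 1 is an arbitrary choice.

```swift
// Returns a log describing what happened to each yield.
// The buffer size of 1 is an arbitrary choice for illustration.
func describeYields() -> [String] {
    var log: [String] = []
    _ = AsyncStream<Int>(bufferingPolicy: .bufferingOldest(1)) { continuation in
        for value in 1...2 {
            switch continuation.yield(value) {
            case .enqueued:
                log.append("enqueued \(value)")
            case .dropped(let dropped):
                log.append("dropped \(dropped)")
            case .terminated:
                log.append("terminated")
            @unknown default:
                log.append("unknown")
            }
        }
        continuation.finish()
        // Yielding after finish() has no effect and reports .terminated
        if case .terminated = continuation.yield(3) {
            log.append("terminated")
        }
    }
    return log
}
```

Here describeYields() returns ["enqueued 1", "dropped 2", "terminated"]: the first value fills the one-element buffer, the second is dropped, and the third arrives after the stream has finished.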
3. Buffering Options
AsyncStream buffers values that have not been consumed yet. The buffering policy controls what happens as that buffer grows:
// Default: no limit on queued values
let unbounded = AsyncStream<Int>(bufferingPolicy: .unbounded) { continuation in
    // Yield values here
}
// Keep at most the 5 oldest values; new values are dropped while the buffer is full
let oldest = AsyncStream<Int>(bufferingPolicy: .bufferingOldest(5)) { continuation in
    // Yield values here
}
// Keep at most the 5 newest values; the oldest buffered value is dropped to make room
let newest = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(5)) { continuation in
    // Yield values here
}
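The difference between the policies is easiest to see when every value is produced before the consumer starts reading. This sketch yields 1 through 5 up front and then drains the stream; drain(_:) is a hypothetical helper and the buffer size of 2 is arbitrary.

```swift
// Hypothetical helper: yields 1...5 before any consumer runs, then collects what the buffer kept.
func drain(_ policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream<Int>(bufferingPolicy: policy) { continuation in
        for value in 1...5 {
            continuation.yield(value)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

From an async context, drain(.unbounded) returns [1, 2, 3, 4, 5], drain(.bufferingOldest(2)) returns [1, 2], and drain(.bufferingNewest(2)) returns [4, 5].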
Practical Examples
Example 1: Converting Notifications to AsyncStream
A common use case is converting UIKit/AppKit notifications to AsyncStream:
import UIKit

func notificationStream(for name: Notification.Name) -> AsyncStream<Notification> {
    AsyncStream { continuation in
        let token = NotificationCenter.default.addObserver(
            forName: name,
            object: nil,
            queue: .main
        ) { notification in
            continuation.yield(notification)
        }
        // Remove the observer when the stream ends or the consumer stops listening
        continuation.onTermination = { _ in
            NotificationCenter.default.removeObserver(token)
        }
    }
}

// Usage example
Task {
    let keyboardStream = notificationStream(for: UIResponder.keyboardWillShowNotification)
    for await notification in keyboardStream {
        // Handle keyboard appearance
        let keyboardFrame = notification.userInfo?[UIResponder.keyboardFrameEndUserInfoKey] as? CGRect
        print("Keyboard will show with frame: \(String(describing: keyboardFrame))")
    }
}
Example 2: Timer-Based AsyncStream
Create a timer that emits values at regular intervals:
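import Foundation

func timerStream(interval: TimeInterval) -> AsyncStream<Date> {
    AsyncStream { continuation in
        let timer = Timer(timeInterval: interval, repeats: true) { _ in
            continuation.yield(Date())
        }
        // Schedule on the main run loop: the thread that creates the stream
        // may not have a running run loop, in which case the timer would never fire
        RunLoop.main.add(timer, forMode: .common)
        continuation.onTermination = { _ in
            // Invalidate on the thread whose run loop owns the timer
            DispatchQueue.main.async {
                timer.invalidate()
            }
        }
    }
}

// Usage
Task {
    let timer = timerStream(interval: 1.0)
    // Run for 5 ticks, then stop
    var count = 0
    for await time in timer {
        print("Timer fired at: \(time)")
        count += 1
        if count >= 5 {
            break
        }
    }
    print("Timer cancelled")
}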
// Output:
// Timer fired at: 2023-05-10 15:20:01
// Timer fired at: 2023-05-10 15:20:02
// Timer fired at: 2023-05-10 15:20:03
// Timer fired at: 2023-05-10 15:20:04
// Timer fired at: 2023-05-10 15:20:05
// Timer cancelled
Example 3: Converting Delegate Callbacks to AsyncStream
A more complex example - converting a location manager's updates to an AsyncStream:
import CoreLocation

// Call this from the main thread: CLLocationManager delivers delegate callbacks
// through the run loop of the thread it was created on.
func locationUpdates() -> AsyncStream<CLLocation> {
    AsyncStream { continuation in
        let manager = CLLocationManager()

        class Delegate: NSObject, CLLocationManagerDelegate {
            var continuation: AsyncStream<CLLocation>.Continuation?

            func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
                locations.forEach { continuation?.yield($0) }
            }
        }

        let delegate = Delegate()
        delegate.continuation = continuation
        manager.delegate = delegate

        // CLLocationManager does not retain its delegate, so the termination
        // handler captures it to keep it alive for the lifetime of the stream
        continuation.onTermination = { [delegate] _ in
            manager.stopUpdatingLocation()
            _ = delegate
        }

        manager.requestWhenInUseAuthorization()
        manager.startUpdatingLocation()
    }
}

// Usage
Task {
    for await location in locationUpdates() {
        print("Location update: \(location.coordinate.latitude), \(location.coordinate.longitude)")
    }
}
Creating Throwing AsyncStreams
If your stream might encounter errors, use AsyncThrowingStream:
import Foundation

func fetchDataStream(from urls: [URL]) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream { continuation in
        let task = Task {
            for url in urls {
                do {
                    let (data, _) = try await URLSession.shared.data(from: url)
                    continuation.yield(data)
                } catch {
                    // End the stream with the error; the consumer's loop rethrows it
                    continuation.finish(throwing: error)
                    return
                }
            }
            continuation.finish()
        }
        // Cancel any in-flight request if the consumer stops listening
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

// Usage with error handling
Task {
    let urls = [
        URL(string: "https://example.com/api/1")!,
        URL(string: "https://example.com/api/2")!
    ]
    do {
        for try await data in fetchDataStream(from: urls) {
            print("Received \(data.count) bytes")
        }
    } catch {
        print("Stream error: \(error)")
    }
}
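The same error flow can be tried without a network connection. In this sketch, values yielded before finish(throwing:) are still delivered, and the error then surfaces in the consumer's for try await loop. ParseError, parsedIntegers(from:), and sumUntilFailure(_:) are hypothetical names for the illustration.

```swift
// Hypothetical error type for the illustration.
enum ParseError: Error, Equatable {
    case notANumber(String)
}

// Yields each token parsed as an Int, and fails the stream at the first token that is not a number.
func parsedIntegers(from tokens: [String]) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        for token in tokens {
            guard let value = Int(token) else {
                continuation.finish(throwing: ParseError.notANumber(token))
                return
            }
            continuation.yield(value)
        }
        continuation.finish()
    }
}

// Sums values until the stream ends or fails, and reports the parse error if there was one.
func sumUntilFailure(_ tokens: [String]) async -> (sum: Int, error: ParseError?) {
    var sum = 0
    do {
        for try await value in parsedIntegers(from: tokens) {
            sum += value
        }
        return (sum, nil)
    } catch let error as ParseError {
        return (sum, error)
    } catch {
        return (sum, nil)
    }
}
```

For the input ["1", "2", "x", "4"], the consumer receives 1 and 2, then catches ParseError.notANumber("x"); the value 4 is never produced.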
Advanced AsyncStream Patterns
Manual Iterator
Instead of using for await, you can manually iterate:
Task {
    let stream = generateNumbers()
    var iterator = stream.makeAsyncIterator()
    while let number = await iterator.next() {
        print("Got number: \(number)")
    }
}
Combining Multiple Streams
You can merge several streams into one by consuming each of them in its own child task:
func combineStreams<T: Sendable>(_ streams: [AsyncStream<T>]) -> AsyncStream<T> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: Void.self) { group in
                for stream in streams {
                    // Each source stream is forwarded by its own child task
                    group.addTask {
                        for await value in stream {
                            continuation.yield(value)
                        }
                    }
                }
            }
            // The group returns only after every source stream has finished
            continuation.finish()
        }
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}
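The sketch below shows how such a merge might be used. To keep the block self-contained it restates the helper under the hypothetical name merged(_:); stream(of:) and collectMerged() are also made up for the illustration. Values from different sources can interleave in any order, so the result is sorted before it is returned.

```swift
// Self-contained restatement of the merge helper, under a hypothetical name.
func merged<T: Sendable>(_ streams: [AsyncStream<T>]) -> AsyncStream<T> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: Void.self) { group in
                for stream in streams {
                    group.addTask {
                        for await value in stream {
                            continuation.yield(value)
                        }
                    }
                }
            }
            continuation.finish()
        }
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

// Hypothetical helper: a finished stream containing the given values.
func stream(of values: [Int]) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in values {
            continuation.yield(value)
        }
        continuation.finish()
    }
}

// Merges two streams and returns everything received, sorted for a stable result.
func collectMerged() async -> [Int] {
    var all: [Int] = []
    for await value in merged([stream(of: [1, 2, 3]), stream(of: [10, 20])]) {
        all.append(value)
    }
    return all.sorted()
}
```

Within each source the order of values is preserved, but nothing is guaranteed about how the sources interleave, which is why the example sorts before comparing.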
Best Practices
- Always handle termination: Use continuation.onTermination to clean up resources
- Choose appropriate buffering: Select a buffering policy that suits your needs
- Finish the stream: Call continuation.finish() when no more values will be sent
- Maintain strong references: Keep references to any objects needed by your continuation
- Consider back-pressure: Be mindful of how fast producers generate values vs how fast consumers process them
Comparing AsyncStream with Other Patterns
Feature | AsyncStream | Combine Publishers | Callback-Based APIs |
---|---|---|---|
Swift Version | Swift 5.5+ | Swift 5.1+ (Apple platforms) | Any |
Error Handling | Through AsyncThrowingStream | Through publisher errors | Via error callbacks |
Backpressure | Buffering policies (the producer is never suspended) | Demand-based, plus the .buffer operator | Manual implementation |
Cancellation | Through task cancellation | Through cancellables | Varies |
Integration with async/await | Native | Through the .values property | Requires wrapping |
Summary
AsyncStream provides a powerful way to bridge traditional callback-based code with the modern Swift Concurrency model. It allows you to:
- Convert event-based code to structured asynchronous sequences
- Process values that arrive over time
- Buffer values automatically, using a buffering policy you choose
- Clean up resources properly
By using AsyncStream, you can write more readable, maintainable code that takes advantage of Swift's concurrency features without having to completely redesign existing APIs.
Further Learning
- Exercise: Create an AsyncStream that delivers random numbers at random intervals
- Exercise: Implement a chat application using AsyncStream to deliver messages
- Advanced: Create a custom AsyncSequence type that behaves like AsyncStream but with additional functionality