Swift Async Sequences
Introduction
In modern app development, we often work with streams of data that arrive over time rather than all at once - think of events from a WebSocket connection, lines read from a file, or notifications from a sensor. Swift's Async Sequences provide an elegant way to handle these scenarios.
Async sequences are a powerful feature of Swift's concurrency model that allow you to process a stream of values that become available over time, using familiar patterns similar to Swift's synchronous sequences. They bridge the gap between Swift's iteration patterns and asynchronous programming.
In this tutorial, we'll explore how async sequences work, how to create them, and how to use them effectively in your Swift applications.
Understanding Async Sequences
What is an Async Sequence?
An async sequence is similar to a regular Swift sequence, but with an important difference: each element becomes available asynchronously, potentially with delays between elements.
The core protocol is AsyncSequence
, which is similar to the standard Sequence
protocol:
public protocol AsyncSequence {
associatedtype Element
associatedtype AsyncIterator: AsyncIteratorProtocol where AsyncIterator.Element == Element
func makeAsyncIterator() -> AsyncIterator
}
Just as with regular sequences, there's also an iterator protocol:
public protocol AsyncIteratorProtocol {
associatedtype Element
mutating func next() async throws -> Element?
}
The key difference is that the next()
method is marked with async
, indicating that it can suspend execution while waiting for the next element to become available.
Using Async Sequences
Iterating with For-Await-In
The most common way to consume an async sequence is using the for await
loop:
func processItems() async throws {
let dataStream = SomeAsyncSequence()
for await item in dataStream {
// Process each item as it becomes available
print("Received item: \(item)")
}
// Code here executes after the sequence completes
print("Finished processing all items")
}
The for await
loop will:
- Wait for each item to become available
- Execute the loop body for each item
- Complete when the sequence is exhausted (returns
nil
fromnext()
)
Error Handling
Async sequences can throw errors. To handle them, you can use try
with for await
:
func processWithErrorHandling() async {
let riskyStream = SomeRiskyAsyncSequence()
do {
for try await item in riskyStream {
print("Successfully received: \(item)")
}
print("Completed successfully")
} catch {
print("Error occurred: \(error)")
}
}
Built-in Async Sequences
Swift provides several built-in types that conform to AsyncSequence
:
AsyncBytes
When reading from a file or network connection, you can use AsyncBytes
:
func readFile() async throws {
let fileHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: "/path/to/file"))
for try await line in fileHandle.bytes.lines {
print("Read line: \(line)")
}
}
URL Session Data Tasks
URLSession now supports async sequences for streaming data:
func fetchDataInChunks() async throws {
let url = URL(string: "https://example.com/large-file")!
let (bytes, _) = try await URLSession.shared.bytes(from: url)
var totalBytes = 0
for try await byte in bytes {
totalBytes += 1
if totalBytes % 1_000_000 == 0 {
print("Downloaded \(totalBytes / 1_000_000) MB so far")
}
}
print("Download complete: \(totalBytes) bytes")
}
NotificationCenter
Notifications can now be received as async sequences:
func monitorNotifications() async {
let notifications = NotificationCenter.default.notifications(named: UIApplication.didEnterBackgroundNotification)
for await notification in notifications {
print("App entered background at \(Date())")
}
}
Creating Custom Async Sequences
Using AsyncStream
The simplest way to create a custom async sequence is with AsyncStream
:
func createCountdownStream() -> AsyncStream<Int> {
return AsyncStream { continuation in
// Start a background task
Task {
for count in (1...10).reversed() {
continuation.yield(count) // Emit a value
try? await Task.sleep(nanoseconds: 1_000_000_000) // Wait 1 second
}
continuation.finish() // End the sequence
}
}
}
// Usage
func startCountdown() async {
let countdown = createCountdownStream()
for await count in countdown {
print("\(count)...")
}
print("Liftoff! 🚀")
}
// Output:
// 10...
// 9...
// 8...
// ...
// 1...
// Liftoff! 🚀
AsyncStream
handles all the protocol conformance for you, so you just need to focus on when to emit values using the provided continuation.
Using AsyncThrowingStream
If your sequence needs to handle errors, use AsyncThrowingStream
:
enum FetchError: Error {
case networkFailure
case invalidData
}
func createDataFetcher(urls: [URL]) -> AsyncThrowingStream<Data, Error> {
return AsyncThrowingStream { continuation in
Task {
for url in urls {
do {
let (data, response) = try await URLSession.shared.data(from: url)
guard let httpResponse = response as? HTTPURLResponse,
httpResponse.statusCode == 200 else {
continuation.finish(throwing: FetchError.invalidData)
return
}
continuation.yield(data) // Emit successful data
} catch {
continuation.finish(throwing: error)
return
}
}
continuation.finish() // Successfully completed
}
}
}
// Usage
func fetchAllData() async {
let urls = [
URL(string: "https://example.com/data1")!,
URL(string: "https://example.com/data2")!
]
let dataStream = createDataFetcher(urls: urls)
do {
for try await data in dataStream {
print("Received data of size: \(data.count) bytes")
// Process the data...
}
print("All data fetched successfully")
} catch {
print("Error fetching data: \(error)")
}
}
Implementing AsyncSequence Directly
For complete control, you can implement the AsyncSequence
protocol yourself:
struct CountdownSequence: AsyncSequence {
typealias Element = Int
let start: Int
struct AsyncIterator: AsyncIteratorProtocol {
var current: Int
mutating func next() async -> Int? {
guard current > 0 else {
return nil // End of sequence
}
// Simulate waiting
try? await Task.sleep(nanoseconds: 1_000_000_000)
defer { current -= 1 }
return current
}
}
func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(current: start)
}
}
// Usage
func customCountdown() async {
let countdown = CountdownSequence(start: 5)
for await count in countdown {
print("\(count)...")
}
print("Go!")
}
// Output:
// 5...
// 4...
// 3...
// 2...
// 1...
// Go!
Real-World Applications
WebSocket Message Handler
Async sequences are perfect for handling WebSocket connections:
class WebSocketManager {
private var webSocketTask: URLSessionWebSocketTask
init(url: URL) {
let session = URLSession(configuration: .default)
webSocketTask = session.webSocketTask(with: url)
webSocketTask.resume()
}
func messages() -> AsyncThrowingStream<String, Error> {
AsyncThrowingStream { continuation in
Task {
do {
while true {
let message = try await webSocketTask.receive()
switch message {
case .string(let text):
continuation.yield(text)
case .data(let data):
if let text = String(data: data, encoding: .utf8) {
continuation.yield(text)
}
@unknown default:
break
}
}
} catch {
continuation.finish(throwing: error)
}
}
}
}
func send(message: String) async throws {
let message = URLSessionWebSocketTask.Message.string(message)
try await webSocketTask.send(message)
}
func close() {
webSocketTask.cancel()
}
}
// Usage
func connectToChat() async {
let wsManager = WebSocketManager(url: URL(string: "wss://chat.example.com")!)
Task {
do {
for try await message in wsManager.messages() {
print("Received: \(message)")
// Update UI with new message
}
} catch {
print("WebSocket error: \(error)")
}
}
// Send a message
try? await wsManager.send(message: "Hello, everyone!")
// Later, when done
wsManager.close()
}
Sensor Data Processing
For an IoT application that processes sensor readings:
struct SensorReading {
let temperature: Double
let humidity: Double
let timestamp: Date
}
class SensorMonitor {
func readings() -> AsyncStream<SensorReading> {
AsyncStream { continuation in
// Set up connection to sensor
let timer = Timer.publish(every: 2.0, on: .main, in: .common).autoconnect()
// Store the cancellable to keep the subscription alive
let cancellable = timer.sink { _ in
// Simulate getting sensor data
let reading = SensorReading(
temperature: Double.random(in: 18...30),
humidity: Double.random(in: 30...70),
timestamp: Date()
)
continuation.yield(reading)
}
// Ensure we clean up when the stream is cancelled
continuation.onTermination = { _ in
cancellable.cancel()
}
}
}
}
// Usage
func monitorEnvironment() async {
let monitor = SensorMonitor()
var temperatureSum = 0.0
var readingCount = 0
for await reading in monitor.readings() {
print("Temperature: \(reading.temperature)°C, Humidity: \(reading.humidity)%")
temperatureSum += reading.temperature
readingCount += 1
// Calculate running average
let averageTemp = temperatureSum / Double(readingCount)
print("Average temperature: \(averageTemp)°C")
// Stop after collecting 10 readings
if readingCount >= 10 {
break
}
}
print("Monitoring complete")
}
Paginated API Requests
Using async sequences for handling paginated API responses:
struct SearchResult: Decodable {
let items: [Item]
let nextPage: String?
}
struct Item: Decodable {
let id: String
let name: String
}
func searchItems(query: String) -> AsyncThrowingStream<Item, Error> {
AsyncThrowingStream { continuation in
Task {
var nextPage: String? = "initial"
// Continue fetching until there's no next page
while let page = nextPage {
do {
let url: URL
if page == "initial" {
url = URL(string: "https://api.example.com/search?query=\(query)")!
} else {
url = URL(string: "https://api.example.com/search?page=\(page)")!
}
let (data, _) = try await URLSession.shared.data(from: url)
let result = try JSONDecoder().decode(SearchResult.self, from: data)
// Emit each item individually
for item in result.items {
continuation.yield(item)
}
// Update the next page or end if we're done
nextPage = result.nextPage
} catch {
continuation.finish(throwing: error)
return
}
}
continuation.finish()
}
}
}
// Usage
func performSearch() async {
do {
var count = 0
// Each item comes as soon as it's available from any page
for try await item in searchItems(query: "swift programming") {
print("Found: \(item.name)")
count += 1
// Process first 50 results only
if count >= 50 {
break
}
}
} catch {
print("Search failed: \(error)")
}
}
Common Async Sequence Transformations
Async sequences support many of the same operations as synchronous sequences:
Map
let countdown = CountdownSequence(start: 3)
let messages = countdown.map { "T-minus \($0)" }
for await message in messages {
print(message)
}
// Output:
// T-minus 3
// T-minus 2
// T-minus 1
Filter
let numbers = createNumberStream()
let evenNumbers = numbers.filter { $0 % 2 == 0 }
for await number in evenNumbers {
print("\(number) is even")
}
Collect
Convert an async sequence into an array:
let countdown = CountdownSequence(start: 3)
let allNumbers = try await countdown.collect()
print(allNumbers) // [3, 2, 1]
Advanced Patterns
Cancellation
You can cancel processing an async sequence by using a task that can be cancelled:
let task = Task {
let stream = infiniteStream()
for await item in stream {
// Process items until cancelled
print(item)
// Check if we should stop
try Task.checkCancellation()
}
}
// Later...
task.cancel() // Stop processing the sequence
Combining Multiple Async Sequences
Sometimes you need to process multiple async sequences together:
func combineStreams<T, U>(
_ stream1: some AsyncSequence<T>,
_ stream2: some AsyncSequence<U>
) async -> AsyncStream<(T?, U?)> {
AsyncStream { continuation in
Task {
// Create child tasks for each stream
let task1 = Task {
var iterator1 = stream1.makeAsyncIterator()
while let item = try? await iterator1.next() {
continuation.yield((item, nil))
}
}
let task2 = Task {
var iterator2 = stream2.makeAsyncIterator()
while let item = try? await iterator2.next() {
continuation.yield((nil, item))
}
}
// Wait for both to complete
await task1.value
await task2.value
continuation.finish()
}
}
}
// Usage
func monitorTwoSources() async {
let tempSensor = createTemperatureStream()
let humiditySensor = createHumidityStream()
let combined = await combineStreams(tempSensor, humiditySensor)
for await (temp, humidity) in combined {
if let temp = temp {
print("Temperature update: \(temp)°C")
}
if let humidity = humidity {
print("Humidity update: \(humidity)%")
}
}
}
Performance Considerations
When working with async sequences, keep these performance tips in mind:
- Buffering: Consider using the
.buffered()
method to control how many items are pre-fetched - Task Priority: Set appropriate task priorities when creating tasks that process async sequences
- Resource Management: Always ensure proper cleanup happens when an async sequence is terminated
- Memory Usage: Be cautious with
.collect()
on unbounded sequences as it could consume excessive memory
Summary
Async sequences provide a powerful way to work with asynchronous streams of data in Swift. They allow you to:
- Process items as they become available over time
- Use familiar Swift sequence patterns like
map
,filter
, and loops - Handle errors gracefully with try-await syntax
- Build custom data streams with
AsyncStream
andAsyncThrowingStream
- Implement complex asynchronous workflows in a readable, maintainable way
Whether you're processing WebSocket messages, handling sensor data, making paginated API requests, or working with any other form of streaming data, async sequences provide a clean and efficient solution.
Additional Resources
Exercises
- Create an async sequence that simulates dice rolls, generating a random number between 1 and 6 every second.
- Implement a file reader that processes a large file line by line as an async sequence, calculating statistics on the fly.
- Create a custom async sequence that generates Fibonacci numbers with a delay between each number.
- Modify the WebSocket example to handle reconnection attempts when the connection is lost.
- Implement a debouncing function for an async sequence that emits a value only after a specified time has passed without new values.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)