Swift Async Sequences

Introduction

In modern app development, we often work with streams of data that arrive over time rather than all at once: events from a WebSocket connection, lines read from a file, or notifications from a sensor. Swift's async sequences provide an elegant way to handle these scenarios.

Async sequences are a powerful feature of Swift's concurrency model that allow you to process a stream of values that become available over time, using familiar patterns similar to Swift's synchronous sequences. They bridge the gap between Swift's iteration patterns and asynchronous programming.

In this tutorial, we'll explore how async sequences work, how to create them, and how to use them effectively in your Swift applications.

Understanding Async Sequences

What is an Async Sequence?

An async sequence is similar to a regular Swift sequence, but with an important difference: each element becomes available asynchronously, potentially with delays between elements.

The core protocol is AsyncSequence, which mirrors the standard Sequence protocol. In simplified form:

swift
public protocol AsyncSequence {
associatedtype Element
associatedtype AsyncIterator: AsyncIteratorProtocol where AsyncIterator.Element == Element

func makeAsyncIterator() -> AsyncIterator
}

Just as with regular sequences, there's also an iterator protocol:

swift
public protocol AsyncIteratorProtocol {
associatedtype Element

mutating func next() async throws -> Element?
}

The key difference is that next() is marked async, so it can suspend while waiting for the next element to become available. It is also marked throws, so a sequence can end with an error instead of finishing normally.

Using Async Sequences

Iterating with For-Await-In

The most common way to consume an async sequence is using the for await loop:

swift
func processItems() async throws {
let dataStream = SomeAsyncSequence()

for await item in dataStream {
// Process each item as it becomes available
print("Received item: \(item)")
}

// Code here executes after the sequence completes
print("Finished processing all items")
}

The for await loop will:

  1. Wait for each item to become available
  2. Execute the loop body for each item
  3. Complete when the sequence is exhausted, that is, when next() returns nil
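
The steps above can be sketched by driving the iterator by hand, which is roughly what for await does behind the scenes. This is a minimal sketch: sumManually and makeNumbers are hypothetical names, and a small finite AsyncStream stands in for a real data source.

```swift
// Sums a stream by calling next() directly instead of using `for await`.
func sumManually(_ numbers: AsyncStream<Int>) async -> Int {
    var iterator = numbers.makeAsyncIterator()
    var total = 0
    // next() suspends until a value is ready and returns nil at the end.
    while let value = await iterator.next() {
        total += value
    }
    return total
}

// A small finite stream to feed the function (illustrative only).
func makeNumbers() -> AsyncStream<Int> {
    AsyncStream { continuation in
        for n in [1, 2, 3] { continuation.yield(n) }
        continuation.finish()
    }
}
```

In everyday code the for await form is preferable; the manual form is mainly useful when you need to pull elements at different points in a function.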

Error Handling

Async sequences can throw errors. To handle them, iterate with for try await inside a do-catch block:

swift
func processWithErrorHandling() async {
let riskyStream = SomeRiskyAsyncSequence()

do {
for try await item in riskyStream {
print("Successfully received: \(item)")
}
print("Completed successfully")
} catch {
print("Error occurred: \(error)")
}
}
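
SomeRiskyAsyncSequence above is a placeholder. As a concrete sketch of the same pattern, the following uses an AsyncThrowingStream that fails on the first negative number. The names ValidationError, validated, and drain are hypothetical and exist only for this illustration.

```swift
struct ValidationError: Error, Equatable {
    let badValue: Int
}

// Emits the given values in order, but fails at the first negative one.
func validated(_ values: [Int]) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        for value in values {
            guard value >= 0 else {
                continuation.finish(throwing: ValidationError(badValue: value))
                return
            }
            continuation.yield(value)
        }
        continuation.finish()
    }
}

// Returns the values received before the failure, plus the error if any.
func drain(_ stream: AsyncThrowingStream<Int, Error>) async -> (received: [Int], error: Error?) {
    var received: [Int] = []
    do {
        for try await value in stream {
            received.append(value)
        }
        return (received, nil)
    } catch {
        // The loop stops at the first error; earlier values are kept.
        return (received, error)
    }
}
```

Once an error is thrown the sequence is finished, so the values after the failing one are never delivered.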

Built-in Async Sequences

Foundation and other Apple frameworks provide several types that conform to AsyncSequence:

AsyncBytes

When reading from a file, FileHandle exposes its contents as an AsyncBytes sequence through the bytes property, and lines groups those bytes into lines of text:

swift
func readFile() async throws {
let fileHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: "/path/to/file"))

for try await line in fileHandle.bytes.lines {
print("Read line: \(line)")
}
}

URL Session Data Tasks

URLSession can stream a response body as an async sequence of bytes (iOS 15 and macOS 12 or later):

swift
func fetchDataInChunks() async throws {
let url = URL(string: "https://example.com/large-file")!
let (bytes, _) = try await URLSession.shared.bytes(from: url)

var totalBytes = 0
for try await byte in bytes {
totalBytes += 1
if totalBytes % 1_000_000 == 0 {
print("Downloaded \(totalBytes / 1_000_000) MB so far")
}
}

print("Download complete: \(totalBytes) bytes")
}

NotificationCenter

Notifications can be received as an async sequence (this example requires UIKit):

swift
func monitorNotifications() async {
let notifications = NotificationCenter.default.notifications(named: UIApplication.didEnterBackgroundNotification)

for await _ in notifications { // The notification value itself is not needed here
print("App entered background at \(Date())")
}
}

Creating Custom Async Sequences

Using AsyncStream

The simplest way to create a custom async sequence is with AsyncStream:

swift
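func createCountdownStream() -> AsyncStream<Int> {
return AsyncStream { continuation in
// Start a background task that produces the values
let task = Task {
for count in (1...10).reversed() {
continuation.yield(count) // Emit a value
do {
try await Task.sleep(nanoseconds: 1_000_000_000) // Wait 1 second
} catch {
break // Sleep was interrupted by cancellation: stop early
}
}
continuation.finish() // End the sequence
}
// Stop producing values if the consumer stops listening
continuation.onTermination = { _ in task.cancel() }
}
}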

// Usage
func startCountdown() async {
let countdown = createCountdownStream()

for await count in countdown {
print("\(count)...")
}

print("Liftoff! 🚀")
}

// Output:
// 10...
// 9...
// 8...
// ...
// 1...
// Liftoff! 🚀

AsyncStream handles all the protocol conformance for you, so you just need to focus on when to emit values using the provided continuation.
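
When values are produced outside the closure, for example from a delegate callback or a button handler, it can be convenient to hold on to the continuation. A minimal sketch, assuming Swift 5.9 or later for AsyncStream.makeStream(of:); the ButtonTaps type and its methods are hypothetical:

```swift
// A tiny event source that exposes a stream and keeps the continuation
// so that other methods can feed it.
final class ButtonTaps {
    let stream: AsyncStream<String>
    private let continuation: AsyncStream<String>.Continuation

    init() {
        // makeStream(of:) returns the stream and its continuation as a pair.
        let (stream, continuation) = AsyncStream.makeStream(of: String.self)
        self.stream = stream
        self.continuation = continuation
    }

    // Emits one value to whoever is iterating `stream`.
    func tap(_ label: String) { continuation.yield(label) }

    // Ends the stream; the consumer's loop then exits.
    func stop() { continuation.finish() }
}
```

A consumer would iterate taps.stream with for await while other code calls tap(_:) and eventually stop().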

Using AsyncThrowingStream

If your sequence needs to report errors to its consumer, use AsyncThrowingStream:

swift
enum FetchError: Error {
case networkFailure
case invalidData
}

func createDataFetcher(urls: [URL]) -> AsyncThrowingStream<Data, Error> {
return AsyncThrowingStream { continuation in
Task {
for url in urls {
do {
let (data, response) = try await URLSession.shared.data(from: url)

guard let httpResponse = response as? HTTPURLResponse,
httpResponse.statusCode == 200 else {
continuation.finish(throwing: FetchError.invalidData)
return
}

continuation.yield(data) // Emit successful data
} catch {
continuation.finish(throwing: error)
return
}
}
continuation.finish() // Successfully completed
}
}
}

// Usage
func fetchAllData() async {
let urls = [
URL(string: "https://example.com/data1")!,
URL(string: "https://example.com/data2")!
]

let dataStream = createDataFetcher(urls: urls)

do {
for try await data in dataStream {
print("Received data of size: \(data.count) bytes")
// Process the data...
}
print("All data fetched successfully")
} catch {
print("Error fetching data: \(error)")
}
}

Implementing AsyncSequence Directly

For complete control, you can implement the AsyncSequence protocol yourself:

swift
struct CountdownSequence: AsyncSequence {
typealias Element = Int
let start: Int

struct AsyncIterator: AsyncIteratorProtocol {
var current: Int

mutating func next() async -> Int? {
guard current > 0 else {
return nil // End of sequence
}

// Simulate waiting
try? await Task.sleep(nanoseconds: 1_000_000_000)

defer { current -= 1 }
return current
}
}

func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(current: start)
}
}

// Usage
func customCountdown() async {
let countdown = CountdownSequence(start: 5)

for await count in countdown {
print("\(count)...")
}

print("Go!")
}

// Output:
// 5...
// 4...
// 3...
// 2...
// 1...
// Go!
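
For simple cases, one type can play both roles by conforming to AsyncSequence and AsyncIteratorProtocol at the same time. The sketch below follows that pattern; the Counter type is illustrative and has no delay between elements.

```swift
// A sequence that is its own iterator: counts from 1 up to `limit`.
struct Counter: AsyncSequence, AsyncIteratorProtocol {
    typealias Element = Int
    let limit: Int
    var current = 1

    mutating func next() async -> Int? {
        guard current <= limit else { return nil } // End of sequence
        defer { current += 1 }
        return current
    }

    // Each loop gets its own copy, so iteration always starts at 1.
    func makeAsyncIterator() -> Counter { self }
}
```

Because Counter is a struct, makeAsyncIterator() hands out a copy, and iterating the same value twice produces the same elements both times.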

Real-World Applications

WebSocket Message Handler

Async sequences are perfect for handling WebSocket connections:

swift
class WebSocketManager {
private var webSocketTask: URLSessionWebSocketTask

init(url: URL) {
let session = URLSession(configuration: .default)
webSocketTask = session.webSocketTask(with: url)
webSocketTask.resume()
}

func messages() -> AsyncThrowingStream<String, Error> {
AsyncThrowingStream { continuation in
Task {
do {
while true {
let message = try await webSocketTask.receive()

switch message {
case .string(let text):
continuation.yield(text)
case .data(let data):
if let text = String(data: data, encoding: .utf8) {
continuation.yield(text)
}
@unknown default:
break
}
}
} catch {
continuation.finish(throwing: error)
}
}
}
}

func send(message: String) async throws {
let message = URLSessionWebSocketTask.Message.string(message)
try await webSocketTask.send(message)
}

func close() {
webSocketTask.cancel()
}
}

// Usage
func connectToChat() async {
let wsManager = WebSocketManager(url: URL(string: "wss://chat.example.com")!)

Task {
do {
for try await message in wsManager.messages() {
print("Received: \(message)")
// Update UI with new message
}
} catch {
print("WebSocket error: \(error)")
}
}

// Send a message
try? await wsManager.send(message: "Hello, everyone!")

// Later, when done
wsManager.close()
}

Sensor Data Processing

For an IoT application that processes sensor readings:

swift
import Combine // Timer.publish and sink come from Combine
import Foundation

struct SensorReading {
let temperature: Double
let humidity: Double
let timestamp: Date
}

class SensorMonitor {
func readings() -> AsyncStream<SensorReading> {
AsyncStream { continuation in
// Set up connection to sensor
let timer = Timer.publish(every: 2.0, on: .main, in: .common).autoconnect()

// Store the cancellable to keep the subscription alive
let cancellable = timer.sink { _ in
// Simulate getting sensor data
let reading = SensorReading(
temperature: Double.random(in: 18...30),
humidity: Double.random(in: 30...70),
timestamp: Date()
)
continuation.yield(reading)
}

// Ensure we clean up when the stream is cancelled
continuation.onTermination = { _ in
cancellable.cancel()
}
}
}
}

// Usage
func monitorEnvironment() async {
let monitor = SensorMonitor()
var temperatureSum = 0.0
var readingCount = 0

for await reading in monitor.readings() {
print("Temperature: \(reading.temperature)°C, Humidity: \(reading.humidity)%")

temperatureSum += reading.temperature
readingCount += 1

// Calculate running average
let averageTemp = temperatureSum / Double(readingCount)
print("Average temperature: \(averageTemp)°C")

// Stop after collecting 10 readings
if readingCount >= 10 {
break
}
}

print("Monitoring complete")
}

Paginated API Requests

Using async sequences for handling paginated API responses:

swift
struct SearchResult: Decodable {
let items: [Item]
let nextPage: String?
}

struct Item: Decodable {
let id: String
let name: String
}

func searchItems(query: String) -> AsyncThrowingStream<Item, Error> {
AsyncThrowingStream { continuation in
Task {
var nextPage: String? = "initial"

// Continue fetching until there's no next page
while let page = nextPage {
do {
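// URLComponents percent-encodes the values, so queries containing spaces are safe
var components = URLComponents(string: "https://api.example.com/search")!
components.queryItems = [
page == "initial"
? URLQueryItem(name: "query", value: query)
: URLQueryItem(name: "page", value: page)
]
guard let url = components.url else {
throw URLError(.badURL)
}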

let (data, _) = try await URLSession.shared.data(from: url)
let result = try JSONDecoder().decode(SearchResult.self, from: data)

// Emit each item individually
for item in result.items {
continuation.yield(item)
}

// Update the next page or end if we're done
nextPage = result.nextPage
} catch {
continuation.finish(throwing: error)
return
}
}

continuation.finish()
}
}
}

// Usage
func performSearch() async {
do {
var count = 0
// Each item comes as soon as it's available from any page
for try await item in searchItems(query: "swift programming") {
print("Found: \(item.name)")
count += 1

// Process first 50 results only
if count >= 50 {
break
}
}
} catch {
print("Search failed: \(error)")
}
}

Common Async Sequence Transformations

Async sequences support many of the same operations as synchronous sequences:

Map

swift
let countdown = CountdownSequence(start: 3)
let messages = countdown.map { "T-minus \($0)" }

for await message in messages {
print(message)
}

// Output:
// T-minus 3
// T-minus 2
// T-minus 1

Filter

swift
let numbers = createNumberStream()
let evenNumbers = numbers.filter { $0 % 2 == 0 }

for await number in evenNumbers {
print("\(number) is even")
}

Collect

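Convert an async sequence into an array. AsyncSequence has no built-in collect() method, but reduce(into:) does the same job:

swift
let countdown = CountdownSequence(start: 3)
let allNumbers = await countdown.reduce(into: [Int]()) { $0.append($1) }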
print(allNumbers) // [3, 2, 1]
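
Other standard operators such as prefix, reduce, and contains work the same way. The sketch below chains a few of them; numbers(upTo:) and demoTransformations are hypothetical helpers for illustration.

```swift
// A finite stream of 1...limit (illustrative only).
func numbers(upTo limit: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for n in 1...limit { continuation.yield(n) }
        continuation.finish()
    }
}

func demoTransformations() async -> (firstSquares: [Int], sum: Int, hasSeven: Bool) {
    // map and prefix are lazy: nothing runs until the sequence is iterated.
    let squares = numbers(upTo: 10).map { $0 * $0 }.prefix(3)
    var firstSquares: [Int] = []
    for await square in squares { firstSquares.append(square) }

    // reduce and contains consume the sequence and return a single value.
    let sum = await numbers(upTo: 4).reduce(0) { $0 + $1 }
    let hasSeven = await numbers(upTo: 10).contains(7)
    return (firstSquares, sum, hasSeven)
}
```

Operators that return a single value must be awaited, while operators that return a new sequence are awaited only when you iterate the result.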

Advanced Patterns

Cancellation

To stop processing an async sequence from the outside, run the loop inside a Task and cancel that task:

swift
let task = Task {
let stream = infiniteStream()
for await item in stream {
// Process items until cancelled
print(item)

// Check if we should stop
try Task.checkCancellation()
}
}

// Later...
task.cancel() // Stop processing the sequence
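
With AsyncStream, cancelling the consuming task ends the loop without an explicit check, and the stream's onTermination handler gives the producer a chance to stop as well. A minimal sketch under those assumptions; ticks and consumeBriefly are hypothetical names, and the timings are arbitrary.

```swift
// A stream that ticks until its consumer goes away. The producing task is
// cancelled from onTermination, so no work continues after cancellation.
func ticks() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let producer = Task {
            var n = 0
            while !Task.isCancelled {
                continuation.yield(n)
                n += 1
                try? await Task.sleep(nanoseconds: 10_000_000) // 10 ms
            }
            continuation.finish()
        }
        // Runs when the consumer's task is cancelled or the stream finishes.
        continuation.onTermination = { _ in producer.cancel() }
    }
}

// Consumes ticks in a task, cancels it shortly after, and waits for it to end.
func consumeBriefly() async -> Int {
    let consumer = Task { () -> Int in
        var count = 0
        for await _ in ticks() { count += 1 }
        return count // Reached once cancellation ends the loop
    }
    try? await Task.sleep(nanoseconds: 50_000_000) // Let a few ticks through
    consumer.cancel()
    return await consumer.value
}
```

The exact number of ticks received depends on scheduling; the point is that consumeBriefly() returns at all, even though the stream never finishes on its own.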

Combining Multiple Async Sequences

Sometimes you need to process multiple async sequences together:

swift
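// Generic parameters are used instead of `some AsyncSequence<T>`, which
// does not compile with a single type argument
func combineStreams<S1: AsyncSequence, S2: AsyncSequence>(
_ stream1: S1,
_ stream2: S2
) async -> AsyncStream<(S1.Element?, S2.Element?)> {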
AsyncStream { continuation in
Task {
// Create child tasks for each stream
let task1 = Task {
var iterator1 = stream1.makeAsyncIterator()
while let item = try? await iterator1.next() {
continuation.yield((item, nil))
}
}

let task2 = Task {
var iterator2 = stream2.makeAsyncIterator()
while let item = try? await iterator2.next() {
continuation.yield((nil, item))
}
}

// Wait for both to complete
await task1.value
await task2.value

continuation.finish()
}
}
}

// Usage
func monitorTwoSources() async {
let tempSensor = createTemperatureStream()
let humiditySensor = createHumidityStream()

let combined = await combineStreams(tempSensor, humiditySensor)

for await (temp, humidity) in combined {
if let temp = temp {
print("Temperature update: \(temp)°C")
}
if let humidity = humidity {
print("Humidity update: \(humidity)%")
}
}
}
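
When both sources produce the same element type, a task group is another way to interleave them. This is a sketch rather than a complete solution: merged is a hypothetical helper limited to two AsyncStream inputs, and the swift-async-algorithms package offers a more general merge.

```swift
// Interleaves two streams of the same element type into one stream.
func merged<Element: Sendable>(
    _ first: AsyncStream<Element>,
    _ second: AsyncStream<Element>
) -> AsyncStream<Element> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: Void.self) { group in
                // One child task per source; each forwards its values.
                group.addTask { for await value in first { continuation.yield(value) } }
                group.addTask { for await value in second { continuation.yield(value) } }
                // The group waits for both children before returning.
            }
            continuation.finish()
        }
        // Cancelling the parent task also cancels both child tasks.
        continuation.onTermination = { _ in task.cancel() }
    }
}
```

The relative order of elements from the two sources is not guaranteed; only the order within each source is preserved.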

Performance Considerations

When working with async sequences, keep these performance tips in mind:

  1. Buffering: AsyncStream and AsyncThrowingStream accept a bufferingPolicy argument (.unbounded by default, or .bufferingNewest(n) and .bufferingOldest(n)) that limits how many undelivered items are kept
  2. Task Priority: Set appropriate task priorities when creating tasks that process async sequences
  3. Resource Management: Always ensure proper cleanup happens when an async sequence is terminated, for example in onTermination
  4. Memory Usage: Be cautious when collecting an unbounded sequence into an array, as it could consume excessive memory
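
To illustrate the buffering point, the following sketch yields five values before anything consumes them, with a buffer that keeps only the newest two. The function name latestTwo is hypothetical.

```swift
// Yields 1...5 before anyone is listening; only the newest two survive.
func latestTwo() -> AsyncStream<Int> {
    AsyncStream(Int.self, bufferingPolicy: .bufferingNewest(2)) { continuation in
        for n in 1...5 { continuation.yield(n) }
        continuation.finish()
    }
}
```

Iterating latestTwo() delivers 4 and 5: the older values were dropped when the buffer filled up. A bounded policy like this suits data where only recent values matter, such as sensor readings or UI state.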

Summary

Async sequences provide a powerful way to work with asynchronous streams of data in Swift. They allow you to:

  • Process items as they become available over time
  • Use familiar Swift sequence patterns like map, filter, and loops
  • Handle errors gracefully with try-await syntax
  • Build custom data streams with AsyncStream and AsyncThrowingStream
  • Implement complex asynchronous workflows in a readable, maintainable way

Whether you're processing WebSocket messages, handling sensor data, making paginated API requests, or working with any other form of streaming data, async sequences provide a clean and efficient solution.

Exercises

  1. Create an async sequence that simulates dice rolls, generating a random number between 1 and 6 every second.
  2. Implement a file reader that processes a large file line by line as an async sequence, calculating statistics on the fly.
  3. Create a custom async sequence that generates Fibonacci numbers with a delay between each number.
  4. Modify the WebSocket example to handle reconnection attempts when the connection is lost.
  5. Implement a debouncing function for an async sequence that emits a value only after a specified time has passed without new values.

