Kotlin Flow Collection

Introduction

Kotlin Flow is a powerful feature built on top of coroutines that allows you to process streams of asynchronous data in a sequential manner. Flow collection is the process of gathering and processing values emitted by a Flow. Unlike regular suspending functions that return a single value, Flows can emit multiple values over time, making them perfect for representing streams of data like real-time updates, user interactions, or database changes.

In this tutorial, we'll explore how to collect values from Kotlin Flows and learn different collection techniques that will help you build responsive applications.

Prerequisites

Before diving into Flow collection, you should be familiar with:

  • Basic Kotlin syntax
  • Coroutines fundamentals
  • Understanding of suspending functions

Flow Basics Recap

A Flow is a type that can emit multiple values sequentially, as opposed to suspending functions that return only a single value. It's similar to sequences in Kotlin, but with support for asynchronous operations.

Here's a simple Flow that emits three numbers:

kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun simpleFlow(): Flow<Int> {
    return flow {
        println("Flow started")
        for (i in 1..3) {
            delay(100) // Simulate asynchronous work
            emit(i) // Emits values to the flow
        }
    }
}

Basic Flow Collection

The most straightforward way to collect values from a Flow is the collect function. It is a suspending terminal operator: it runs the provided lambda once for each emitted value and returns only after the Flow has completed.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = simpleFlow()

    flow.collect { value ->
        println("Received: $value")
    }
}

Output:

Flow started
Received: 1
Received: 2
Received: 3

Notice that the Flow is cold: "Flow started" is printed only once collect is called, because the code inside flow { ... } doesn't run until then. Each time we collect the Flow, that code runs again from the beginning.
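To make the cold behaviour concrete, here is a minimal sketch that collects the same Flow instance twice. The names countingFlow and starts are made up for this illustration and are not part of the tutorial's code.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Counts how many times the flow body has been started (illustration only)
var starts = 0

fun countingFlow(): Flow<Int> = flow {
    starts++ // Runs once per collection, not once per Flow instance
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val flow = countingFlow() // Creating the Flow runs nothing
    println("Starts before collecting: $starts")

    flow.collect { println("First collector: $it") }
    flow.collect { println("Second collector: $it") }

    println("Starts after two collections: $starts")
}
```

Output:

Starts before collecting: 0
First collector: 1
First collector: 2
Second collector: 1
Second collector: 2
Starts after two collections: 2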

Terminal Flow Operators

Kotlin provides several terminal operators that collect Flow values in different ways:

1. toList() and toSet()

These operators collect all Flow values into a List or Set:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = simpleFlow()

    // Collect as List
    val list = flow.toList()
    println("Collected list: $list")

    // Collect as Set (this is a second collection, so the Flow restarts)
    val set = flow.toSet()
    println("Collected set: $set")
}

Output:

Flow started
Collected list: [1, 2, 3]
Flow started
Collected set: [1, 2, 3]
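Because simpleFlow() never repeats a value, the two results above look the same. A small sketch with a hypothetical readings() flow that contains duplicates shows where they differ:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.runBlocking

// Hypothetical sample data with deliberate duplicates
fun readings(): Flow<Int> = flowOf(3, 1, 3, 2, 1)

fun main() = runBlocking {
    println(readings().toList()) // [3, 1, 3, 2, 1]
    println(readings().toSet())  // [3, 1, 2]
}
```

toList() keeps every emission in order, while toSet() keeps only the first occurrence of each value.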

2. first() and single()

When you only need specific elements from a Flow:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Get only the first element
    val firstValue = simpleFlow().first()
    println("First value: $firstValue")

    // For flows with exactly one element
    try {
        val singleValue = simpleFlow().single()
        println("Single value: $singleValue")
    } catch (e: IllegalArgumentException) {
        println("Error: ${e.message}")
    }

    // With a predicate to get the first matching element
    val firstEvenValue = simpleFlow().first { it % 2 == 0 }
    println("First even value: $firstEvenValue")
}

Output:

Flow started
First value: 1
Flow started
Error: Flow has more than one element
Flow started
First even value: 2
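first() and single() throw when the Flow doesn't have the shape they expect. If an exception is not what you want, the library also offers firstOrNull() and singleOrNull(). The sketch below is only an illustration; the empty and many flows are made-up sample data.

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.runBlocking

// Made-up sample flows for the illustration
val empty = emptyFlow<Int>()
val many = flowOf(1, 2, 3)

fun main() = runBlocking {
    println(empty.firstOrNull())         // null: nothing was emitted
    println(many.firstOrNull { it > 5 }) // null: no element matches
    println(many.singleOrNull())         // null: more than one element
    println(flowOf(42).singleOrNull())   // 42
}
```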

3. reduce() and fold()

These operators allow you to accumulate values from a Flow:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = simpleFlow()

    // Reduce to sum all values (the first value becomes the initial accumulator)
    val sum = flow.reduce { accumulator, value ->
        println("Reducing: acc=$accumulator, value=$value")
        accumulator + value
    }
    println("Sum: $sum")

    // Fold to calculate product with initial value
    val product = simpleFlow().fold(1) { acc, value ->
        println("Folding: acc=$acc, value=$value")
        acc * value
    }
    println("Product: $product")
}

Output:

Flow started
Reducing: acc=1, value=2
Reducing: acc=3, value=3
Sum: 6
Flow started
Folding: acc=1, value=1
Folding: acc=1, value=2
Folding: acc=2, value=3
Product: 6
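The output shows the main difference: reduce uses the first value as its starting accumulator, while fold starts from the value you pass in. That also means the two behave differently on an empty Flow. The following sketch illustrates this; the helper names sumWithFold and sumWithReduce are hypothetical.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// fold starts from the initial value, so an empty flow simply yields 0
suspend fun sumWithFold(values: Flow<Int>): Int =
    values.fold(0) { acc, value -> acc + value }

// reduce has no initial value; it throws on an empty flow, mapped to null here
suspend fun sumWithReduce(values: Flow<Int>): Int? =
    try {
        values.reduce { acc, value -> acc + value }
    } catch (e: NoSuchElementException) {
        null
    }

fun main() = runBlocking {
    println(sumWithFold(flowOf(1, 2, 3)))   // 6
    println(sumWithReduce(flowOf(1, 2, 3))) // 6
    println(sumWithFold(emptyFlow()))       // 0
    println(sumWithReduce(emptyFlow()))     // null
}
```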

Flow Collection Context

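By default, a Flow runs in the context of the coroutine that calls the terminal operator, so both the code inside the flow builder and the collect lambda execute wherever the collector is running. This property is called context preservation, and it is important to understand, especially when working with UI applications.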

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flow {
        println("Flow started in ${Thread.currentThread().name}")
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }

    // Collect in runBlocking's context, which runs on the main thread here
    flow.collect { value ->
        println("Collected $value in ${Thread.currentThread().name}")
    }

    // Collect in a different context: the flow body moves with the collector
    withContext(Dispatchers.IO) {
        flow.collect { value ->
            println("Collected $value in IO context: ${Thread.currentThread().name}")
        }
    }
}

Output (the exact worker thread names may differ between runs):

Flow started in main
Collected 1 in main
Collected 2 in main
Collected 3 in main
Flow started in DefaultDispatcher-worker-1
Collected 1 in IO context: DefaultDispatcher-worker-1
Collected 2 in IO context: DefaultDispatcher-worker-1
Collected 3 in IO context: DefaultDispatcher-worker-1
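If the emitting code needs to run somewhere other than the collector's context, the usual tool is the flowOn operator, which this tutorial doesn't cover further. A minimal sketch, with a hypothetical emitterThreadNames() flow that simply reports which thread its body runs on:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

fun emitterThreadNames(): Flow<String> = flow {
    // This body runs on a Dispatchers.Default worker because of flowOn below
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default) // Changes the context of the code above it only

fun main() = runBlocking {
    emitterThreadNames().collect { emitterThread ->
        // The collector still runs in runBlocking's context
        println("Emitted on:   $emitterThread")
        println("Collected on: ${Thread.currentThread().name}")
    }
}
```

The two lines should report different threads: a DefaultDispatcher worker for the emitter and the main thread for the collector.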

Collecting with Timeout

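Sometimes you need to limit how long a Flow collection may take. Wrap the collection in withTimeoutOrNull: if the time limit passes first, the collection is cancelled and the call returns null instead of the block's result.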

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
            println("Emitted $i")
        }
    }

    // Collect with a timeout of 250ms
    val result = withTimeoutOrNull(250) {
        flow.collect { value ->
            println("Collected $value")
        }
        "Completed"
    }

    println("Result: $result") // Will be null as timeout occurs
}

Output:

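Collected 1
Emitted 1
Collected 2
Emitted 2
Result: null

Each "Collected" line appears before the matching "Emitted" line because emit suspends until the collector has processed the value. The third value would arrive after about 300 ms, by which time the 250 ms timeout has already cancelled the collection.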

LaunchIn - Non-blocking Collection

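collect suspends the calling coroutine until the Flow completes. When you want that coroutine to carry on with other work instead, use launchIn: it starts the collection in a new coroutine in the given scope and returns that coroutine's Job, which you can use to cancel the collection.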

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = simpleFlow()

    // Non-blocking collection
    val job = flow
        .onEach { value -> println("Received $value") }
        .launchIn(this) // starts collection in a new coroutine

    println("Flow collection started in background")
    delay(150) // Do some other work
    println("Cancelling collection")
    job.cancel() // Cancel the collection

    delay(300) // Give time to see what happens
}

Output:

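Flow collection started in background
Flow started
Received 1
Cancelling collection

Only the first value is received: simpleFlow() emits every 100 ms, so the second value would arrive at about 200 ms, but the job is cancelled after 150 ms.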
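launchIn(scope) is essentially shorthand for scope.launch { flow.collect() }. The sketch below writes both forms next to each other so you can compare them; collectBothWays and the ticks flow are hypothetical names used only for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun collectBothWays(): List<String> = coroutineScope {
    val ticks = flowOf("a", "b")
    val seen = mutableListOf<String>()

    // Form 1: launchIn returns the Job of the collecting coroutine
    val job1 = ticks.onEach { seen += "launchIn:$it" }.launchIn(this)

    // Form 2: the same idea written out by hand
    val job2 = launch { ticks.collect { seen += "launch:$it" } }

    joinAll(job1, job2) // Wait for both background collections to finish
    seen
}

fun main() = runBlocking {
    println(collectBothWays())
}
```

Both forms deliver every value to their own collector. The launchIn form tends to read better at the end of an operator chain, since the per-value work lives in onEach.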

Practical Example: Real-Time Search

Let's build a practical example of using Flow collection for a real-time search feature:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds

// Simulated API service
class SearchService {
    suspend fun search(query: String): List<String> {
        delay(300) // Simulate network delay
        return listOf("Result 1 for $query", "Result 2 for $query", "Result 3 for $query")
    }
}

// debounce and flatMapLatest are marked as preview/experimental APIs
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val searchService = SearchService()

    // Simulate user typing in search box
    val searchFlow = flow {
        emit("Kot")
        delay(200) // Shorter than the debounce window, so "Kot" is dropped
        emit("Kotl")
        delay(500) // Longer than the debounce window, so "Kotl" is searched
        emit("Kotlin")
        delay(800) // Leaves time for the "Kotlin" search to finish
        emit("Kotlin C")
        delay(200) // Shorter than the debounce window, so "Kotlin C" is dropped
        emit("Kotlin Coroutines") // The last value is delivered when the flow ends
    }

    // Process search queries with debounce
    searchFlow
        .debounce(400.milliseconds) // Wait for user to stop typing
        .distinctUntilChanged() // Ignore duplicate queries
        .flatMapLatest { query -> // Cancel previous search when new query comes
            flow {
                println("Searching for: $query")
                val results = searchService.search(query)
                emit(results)
            }
        }
        .collect { results ->
            println("Results: $results")
        }
}

Output:

Searching for: Kotl
Results: [Result 1 for Kotl, Result 2 for Kotl, Result 3 for Kotl]
Searching for: Kotlin
Results: [Result 1 for Kotlin, Result 2 for Kotlin, Result 3 for Kotlin]
Searching for: Kotlin Coroutines
Results: [Result 1 for Kotlin Coroutines, Result 2 for Kotlin Coroutines, Result 3 for Kotlin Coroutines]

Collecting in Android

When working with Android, you'll usually collect Flows from the UI layer, and the collection should stop while the UI is not visible. The lifecycle-aware APIs from androidx.lifecycle handle this for you:

kotlin
// In a ViewModel
private val _searchResults = MutableStateFlow<List<SearchResult>>(emptyList())
val searchResults: StateFlow<List<SearchResult>> = _searchResults

// In an Activity or Fragment
lifecycleScope.launch {
    // Collect only while the lifecycle is at least in the STARTED state
    viewModel.searchResults
        .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
        .collect { results ->
            // Update UI safely
            searchAdapter.submitList(results)
        }
}

// Alternative using repeatOnLifecycle
lifecycleScope.launch {
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.searchResults.collect { results ->
            searchAdapter.submitList(results)
        }
    }
}

Common Collection Strategies

1. Error Handling

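Handle exceptions thrown upstream, that is, by the flow builder or by operators placed before it, with catch. It does not see exceptions thrown inside the collect block that follows it: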

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flow {
        emit(1)
        throw RuntimeException("Error in flow")
    }
        .catch { e ->
            println("Caught error: ${e.message}")
            emit(-1) // Provide fallback value
        }
        .collect { value ->
            println("Collected: $value")
        }
}

Output:

Collected: 1
Caught error: Error in flow
Collected: -1
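Since catch only covers what comes before it in the chain, one common way to handle failures in the per-value processing too is to move that processing into onEach, place catch after it, and finish with the no-argument collect(). This is a sketch under that assumption; processAll and its log list are hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

suspend fun processAll(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { value ->
            // Processing lives upstream of catch, so its failures are caught
            check(value != 2) { "Cannot process $value" }
            log += "Processed: $value"
        }
        .catch { e -> log += "Caught: ${e.message}" }
        .collect() // Terminal operator with no body of its own
    return log
}

fun main() = runBlocking {
    processAll().forEach(::println)
}
```

Output:

Processed: 1
Caught: Cannot process 2

The value 3 is never processed: once an exception reaches catch, the Flow is finished.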

2. Retry Logic

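Use retry to collect the upstream Flow again when it fails. retry(2) allows up to two retries, so three attempts in total, and the predicate decides whether a particular error is worth retrying: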

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    var attempts = 0

    flow {
        println("Attempting to emit (attempt ${++attempts})")
        if (attempts < 3) {
            throw RuntimeException("Failed attempt $attempts")
        }
        emit("Success after $attempts attempts")
    }
        .retry(2) { cause ->
            println("Retrying after error: ${cause.message}")
            true // Always retry
        }
        .catch { e ->
            // Reached only if the flow still fails after the last retry
            println("All retries failed: ${e.message}")
            emit("Using fallback value")
        }
        .collect { result ->
            println("Final result: $result")
        }
}

Output:

Attempting to emit (attempt 1)
Retrying after error: Failed attempt 1
Attempting to emit (attempt 2)
Retrying after error: Failed attempt 2
Attempting to emit (attempt 3)
Final result: Success after 3 attempts

Summary

In this tutorial, we've covered the essential aspects of Flow collection in Kotlin:

  • Basic Flow collection using the collect function
  • Terminal operators like toList(), first(), reduce(), and fold()
  • Context preservation during Flow collection
  • Collection with timeouts
  • Non-blocking collection using launchIn
  • Practical examples including real-time search
  • Android-specific collection strategies
  • Error handling and retry logic

Kotlin Flow collection provides powerful tools for handling asynchronous streams of data in a clean, efficient manner. By understanding the different collection operators and strategies, you can build responsive applications that handle data streams elegantly.

Exercises

  1. Create a Flow that emits 10 random numbers, and collect only the even numbers
  2. Implement a Flow that simulates a countdown timer (10, 9, 8, ..., 1)
  3. Create a Flow that reads lines from a file and transforms each line to uppercase
  4. Build a simple temperature monitoring system using Flow, which emits temperature readings every second and triggers an alert when the temperature exceeds a threshold
  5. Implement a retry mechanism for a Flow that occasionally fails, with exponential backoff between retries

Happy coding with Kotlin Flows!