Skip to main content

Kotlin Flow

Introduction

Kotlin Flow is a part of the Kotlin Coroutines library that provides a way to handle streams of asynchronous data. Think of Flow as a data stream that can emit multiple values sequentially over time, unlike suspend functions that return only a single value. Flow is Kotlin's approach to reactive programming, similar to RxJava but designed from the ground up to work seamlessly with coroutines and suspend functions.

Flow is particularly useful when you need to:

  • Process a sequence of values asynchronously
  • Handle streams of data coming from different sources
  • Process events like user input, network responses, or database updates
  • Perform operations on data streams like transforming, filtering, or combining them

Basic Flow Concepts

Creating a Flow

Let's start with a simple example of creating and collecting a Flow:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// Create a flow
val numbersFlow = flow {
for (i in 1..5) {
delay(100) // Pretend we're doing something useful
emit(i) // Emit each value
println("Emitted $i")
}
}

// Collect the flow
println("Starting to collect")
numbersFlow.collect { value ->
println("Collected $value")
}
println("Collection complete")
}

Output:

Starting to collect
Emitted 1
Collected 1
Emitted 2
Collected 2
Emitted 3
Collected 3
Emitted 4
Collected 4
Emitted 5
Collected 5
Collection complete

In this example, the flow builder creates a Flow that emits numbers from 1 to 5 with a delay between each emission. The collect function is a terminal operator that consumes each emitted value.

Flow Builders

Kotlin provides several ways to create flows:

kotlin
fun main() = runBlocking {
// 1. Using the flow builder
val flow1 = flow {
emit(1)
emit(2)
emit(3)
}

// 2. Using flowOf for a fixed set of values
val flow2 = flowOf(4, 5, 6)

// 3. Converting a collection to a flow
val flow3 = listOf(7, 8, 9).asFlow()

// Collecting all flows
flow1.collect { println("Flow1: $it") }
flow2.collect { println("Flow2: $it") }
flow3.collect { println("Flow3: $it") }
}

Output:

Flow1: 1
Flow1: 2
Flow1: 3
Flow2: 4
Flow2: 5
Flow2: 6
Flow3: 7
Flow3: 8
Flow3: 9

Flow Operators

Flow provides a rich set of operators to transform, combine, and manipulate stream data. Let's explore some common operators:

Map and Filter

kotlin
fun main() = runBlocking {
val numbersFlow = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Use map to transform each value
val squaredNumbers = numbersFlow
.filter { it % 2 == 0 } // Keep only even numbers
.map { it * it } // Square each number

squaredNumbers.collect { println("Squared: $it") }
}

Output:

Squared: 4
Squared: 16
Squared: 36
Squared: 64
Squared: 100

Transform

The transform operator allows for more complex transformations where you can emit multiple values for each input value:

kotlin
fun main() = runBlocking {
val numbersFlow = flowOf(1, 2, 3)

val transformedFlow = numbersFlow.transform { value ->
emit("Processing $value")
emit(value * value)
emit("Done with $value")
}

transformedFlow.collect { println(it) }
}

Output:

Processing 1
1
Done with 1
Processing 2
4
Done with 2
Processing 3
9
Done with 3

Take and Drop

kotlin
fun main() = runBlocking {
val numbersFlow = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

println("Taking first 3 elements:")
numbersFlow.take(3).collect { println(it) }

println("Dropping first 7 elements:")
numbersFlow.drop(7).collect { println(it) }
}

Output:

Taking first 3 elements:
1
2
3
Dropping first 7 elements:
8
9
10

Flow Context and Concurrency

Flows are context-preserving by default, which means they collect in the same coroutine context where the collection happens. This is important to understand for UI frameworks where you might need to update UI elements.

Context Preservation

kotlin
fun main() = runBlocking {
val flow = flow {
println("Flow started in ${Thread.currentThread().name}")
emit(1)
}

withContext(Dispatchers.IO) {
flow.collect {
println("Collected $it in ${Thread.currentThread().name}")
}
}
}

Output might look like:

Flow started in DefaultDispatcher-worker-1
Collected 1 in DefaultDispatcher-worker-1

Notice how both the flow emission and collection happen on the IO dispatcher.

flowOn Operator

You can use flowOn to change the context where the flow is emitted:

kotlin
fun main() = runBlocking {
val flow = flow {
println("Flow started in ${Thread.currentThread().name}")
emit(1)
}.flowOn(Dispatchers.IO) // Upstream flow runs on IO dispatcher

// Collection happens in the current context
flow.collect {
println("Collected $it in ${Thread.currentThread().name}")
}
}

Output might look like:

Flow started in DefaultDispatcher-worker-1
Collected 1 in main

Flow Error Handling

Flows provide several ways to handle errors:

Catching Errors

kotlin
fun main() = runBlocking {
val flow = flow {
emit(1)
emit(2)
throw RuntimeException("Error in flow")
emit(3)
}

flow
.catch { e -> println("Caught exception: ${e.message}") }
.collect { value -> println("Collected $value") }
}

Output:

Collected 1
Collected 2
Caught exception: Error in flow

Completing with onCompletion

The onCompletion operator is called when the flow is completed or cancelled:

kotlin
fun main() = runBlocking {
val flow = flow {
emit(1)
emit(2)
throw RuntimeException("Error in flow")
emit(3)
}

flow
.onCompletion { cause ->
if (cause != null)
println("Flow completed with exception: ${cause.message}")
else
println("Flow completed successfully")
}
.catch { e -> println("Caught: ${e.message}") }
.collect { value -> println("Collected $value") }
}

Output:

Collected 1
Collected 2
Flow completed with exception: Error in flow
Caught: Error in flow

StateFlow and SharedFlow

Kotlin Flow library includes two specialized types of hot flows:

StateFlow

StateFlow is a state-holder observable flow that emits the current and new states to its collectors:

kotlin
fun main() = runBlocking {
val mutableStateFlow = MutableStateFlow(0) // Initial value is 0

// Start a collector
val job = launch {
mutableStateFlow.collect { value ->
println("StateFlow value: $value")
}
}

// Emit new values
delay(100)
mutableStateFlow.value = 1
delay(100)
mutableStateFlow.value = 2
delay(100)

job.cancel() // Stop collecting
}

Output:

StateFlow value: 0
StateFlow value: 1
StateFlow value: 2

SharedFlow

SharedFlow is a hot flow that emits values to all collectors:

kotlin
fun main() = runBlocking {
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 0)

// Start first collector
val job1 = launch {
mutableSharedFlow.collect { value ->
println("Collector 1 received: $value")
}
}

delay(100)

// Emit a value
mutableSharedFlow.emit(1)

delay(100)

// Start second collector
val job2 = launch {
mutableSharedFlow.collect { value ->
println("Collector 2 received: $value")
}
}

delay(100)

// Emit more values
mutableSharedFlow.emit(2)
delay(100)
mutableSharedFlow.emit(3)

delay(100)
job1.cancel()
job2.cancel()
}

Output:

Collector 1 received: 1
Collector 1 received: 2
Collector 2 received: 2
Collector 1 received: 3
Collector 2 received: 3

Real-World Example: API Requests with Flow

Let's see how Flow can be used in a real application for fetching data from an API:

kotlin
// Simulating a network API service
class WeatherApi {
suspend fun getTemperature(city: String): Int {
delay(1000) // Simulate network delay
return when (city.lowercase()) {
"london" -> 15
"tokyo" -> 25
"new york" -> 20
else -> 18
}
}
}

// Repository using Flow
class WeatherRepository(private val api: WeatherApi) {
fun getCityTemperatures(vararg cities: String): Flow<Pair<String, Int>> = flow {
for (city in cities) {
val temperature = api.getTemperature(city)
emit(city to temperature)
}
}
}

fun main() = runBlocking {
val api = WeatherApi()
val repository = WeatherRepository(api)

// UI code
println("Fetching temperatures...")

repository
.getCityTemperatures("London", "Tokyo", "New York")
.onEach { (city, temp) ->
println("Update UI: $city is currently $temp°C")
}
.catch { error ->
println("Error fetching data: ${error.message}")
}
.collect()

println("All temperatures fetched!")
}

Output:

Fetching temperatures...
Update UI: London is currently 15°C
Update UI: Tokyo is currently 25°C
Update UI: New York is currently 20°C
All temperatures fetched!

This example shows how Flow can be used to create a stream of API responses, processing each one as it becomes available rather than waiting for all responses to complete.

Flow and Android

While this is a Kotlin course, it's worth noting that Flow is particularly powerful in Android development:

kotlin
// ViewModel class in Android
class WeatherViewModel(private val repository: WeatherRepository) : ViewModel() {
private val _temperatures = MutableStateFlow<List<Pair<String, Int>>>(emptyList())
val temperatures: StateFlow<List<Pair<String, Int>>> = _temperatures

fun loadTemperatures(vararg cities: String) {
viewModelScope.launch {
repository
.getCityTemperatures(*cities)
.catch { e ->
// Handle error
}
.collect { cityTemp ->
_temperatures.value = _temperatures.value + cityTemp
}
}
}
}

// Activity or Fragment
fun setupUi() {
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.temperatures.collect { temperatures ->
// Update UI with the list of temperatures
}
}
}
}

Summary

Kotlin Flow is a powerful API for handling asynchronous streams of data:

  • Flow is built on top of coroutines and provides a way to emit multiple values sequentially
  • Flow builders include flow {}, flowOf(), and .asFlow() extension functions
  • Flow operators allow you to transform, filter, and otherwise manipulate data streams
  • Flow context management helps control which dispatcher is used for flow emission and collection
  • Error handling with catch and onCompletion enables robust error management
  • StateFlow and SharedFlow provide specialized flow types for state management and sharing values
  • Flow integrates seamlessly with Android architecture components for reactive UIs

Further Learning Resources

  1. Official Kotlin Flow documentation
  2. Android Developers: Kotlin Flow Guide
  3. Advanced Flow Operations

Exercises

  1. Create a Flow that emits the Fibonacci sequence up to a specified number.
  2. Implement a simple search feature that uses debounce to avoid making too many API requests.
  3. Create a temperature converter that uses Flow transformations to convert between Celsius and Fahrenheit.
  4. Build a sample app that uses StateFlow to maintain UI state and SharedFlow to handle one-time events.
  5. Implement a Flow that reads lines from a file asynchronously and counts the occurrences of a specific word.

By mastering Kotlin Flow, you'll have a powerful tool for handling asynchronous data streams in your applications!



If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)