Kotlin Flow
Introduction
Kotlin Flow is a part of the Kotlin Coroutines library that provides a way to handle streams of asynchronous data. Think of Flow as a data stream that can emit multiple values sequentially over time, unlike suspend functions that return only a single value. Flow is Kotlin's approach to reactive programming, similar to RxJava but designed from the ground up to work seamlessly with coroutines and suspend functions.
Flow is particularly useful when you need to:
- Process a sequence of values asynchronously
- Handle streams of data coming from different sources
- Process events like user input, network responses, or database updates
- Perform operations on data streams like transforming, filtering, or combining them
Basic Flow Concepts
Creating a Flow
Let's start with a simple example of creating and collecting a Flow:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Create a flow
    val numbersFlow = flow {
        for (i in 1..5) {
            delay(100) // Pretend we're doing something useful
            emit(i) // Emit each value
            println("Emitted $i")
        }
    }

    // Collect the flow
    println("Starting to collect")
    numbersFlow.collect { value ->
        println("Collected $value")
    }
    println("Collection complete")
}
Output:
Starting to collect
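Collected 1
Emitted 1
Collected 2
Emitted 2
Collected 3
Emitted 3
Collected 4
Emitted 4
Collected 5
Emitted 5
Collection complete
In this example, the flow builder creates a Flow that emits the numbers 1 to 5 with a 100 ms delay before each emission. The collect function is a terminal operator that consumes each emitted value. Each "Collected" line appears before the matching "Emitted" line because emit suspends until the collector has finished processing the value, and only then does the println that follows it run.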
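A related property is that a flow created with the flow builder is cold: the builder block does not run until collect (or another terminal operator such as toList) is called, and it runs again from the start for every collection. The sketch below is a minimal illustration; the names coldNumbers and runCount are made up for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical counter, used only to observe how often the builder block runs.
var runCount = 0

val coldNumbers: Flow<Int> = flow {
    runCount++
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    println("Runs before collecting: $runCount") // 0: nothing has executed yet
    val first = coldNumbers.toList()
    val second = coldNumbers.toList()
    println("First: $first, second: $second") // First: [1, 2], second: [1, 2]
    println("Runs after two collections: $runCount") // 2
}
```

Each call to toList triggers a fresh run of the block, which is why both lists contain the same values.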
Flow Builders
Kotlin provides several ways to create flows:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // 1. Using the flow builder
    val flow1 = flow {
        emit(1)
        emit(2)
        emit(3)
    }

    // 2. Using flowOf for a fixed set of values
    val flow2 = flowOf(4, 5, 6)

    // 3. Converting a collection to a flow
    val flow3 = listOf(7, 8, 9).asFlow()

    // Collecting all flows, one after another
    flow1.collect { println("Flow1: $it") }
    flow2.collect { println("Flow2: $it") }
    flow3.collect { println("Flow3: $it") }
}
Output:
Flow1: 1
Flow1: 2
Flow1: 3
Flow2: 4
Flow2: 5
Flow2: 6
Flow3: 7
Flow3: 8
Flow3: 9
Flow Operators
Flow provides a rich set of operators to transform, combine, and manipulate stream data. Let's explore some common operators:
Map and Filter
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbersFlow = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // Keep the even numbers, then square each one
    val squaredNumbers = numbersFlow
        .filter { it % 2 == 0 } // Keep only even numbers
        .map { it * it } // Square each number

    squaredNumbers.collect { println("Squared: $it") }
}
Output:
Squared: 4
Squared: 16
Squared: 36
Squared: 64
Squared: 100
Transform
The transform operator allows for more complex transformations where you can emit multiple values for each input value:
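import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbersFlow = flowOf(1, 2, 3)

    // Each input produces three outputs; the resulting flow mixes Strings and Ints
    val transformedFlow = numbersFlow.transform { value ->
        emit("Processing $value")
        emit(value * value)
        emit("Done with $value")
    }

    transformedFlow.collect { println(it) }
}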
Output:
Processing 1
1
Done with 1
Processing 2
4
Done with 2
Processing 3
9
Done with 3
Take and Drop
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbersFlow = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    println("Taking first 3 elements:")
    numbersFlow.take(3).collect { println(it) }

    println("Dropping first 7 elements:")
    numbersFlow.drop(7).collect { println(it) }
}
Output:
Taking first 3 elements:
1
2
3
Dropping first 7 elements:
8
9
10
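It can help to see what take does to the upstream flow: once the requested number of values has been delivered, the upstream is cancelled, so the rest of the builder block never runs. The following is a small sketch of that behavior; loggingNumbers and its log parameter are hypothetical names introduced only for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records what the builder did so the effect of take() is visible.
fun loggingNumbers(log: MutableList<String>): Flow<Int> = flow {
    for (i in 1..5) {
        log += "emitting $i"
        emit(i)
    }
    log += "builder finished" // not reached when the collector stops early
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    val taken = loggingNumbers(log).take(2).toList()
    println(taken) // [1, 2]
    println(log)   // [emitting 1, emitting 2]
}
```

This matters when the builder holds resources or does expensive work per element, since values that are never taken are never produced.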
Flow Context and Concurrency
Flows are context-preserving by default: the code inside a flow builder runs in the coroutine context of whoever collects the flow. This is important to understand for UI frameworks, where collected values often need to update UI elements on the main thread.
Context Preservation
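import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flow {
        println("Flow started in ${Thread.currentThread().name}")
        emit(1)
    }

    // The collector's context (IO) is also where the flow body runs
    withContext(Dispatchers.IO) {
        flow.collect {
            println("Collected $it in ${Thread.currentThread().name}")
        }
    }
}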
Output might look like:
Flow started in DefaultDispatcher-worker-1
Collected 1 in DefaultDispatcher-worker-1
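Notice how both the flow emission and collection happen on the IO dispatcher. The thread name says DefaultDispatcher-worker because Dispatchers.IO shares its thread pool with Dispatchers.Default.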
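Context preservation is enforced at runtime. As a rough sketch of what happens when it is broken, the deliberately incorrect flow below (badFlow is a made-up name) switches context with withContext inside the builder and then emits; the collection fails with an IllegalStateException instead of delivering the value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Deliberately wrong: emits from a different context than the collector's.
val badFlow: Flow<Int> = flow {
    withContext(Dispatchers.IO) {
        emit(1) // violates context preservation
    }
}

fun main() = runBlocking {
    try {
        badFlow.collect { println("Collected $it") }
    } catch (e: IllegalStateException) {
        // The message explains which contexts were involved.
        println("Rejected: ${e.message?.lineSequence()?.first()}")
    }
}
```

The supported way to move the emitting side to another dispatcher is the flowOn operator.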
flowOn Operator
You can use flowOn to change the context in which the upstream flow is emitted:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flow {
        println("Flow started in ${Thread.currentThread().name}")
        emit(1)
    }.flowOn(Dispatchers.IO) // Upstream flow runs on IO dispatcher

    // Collection happens in the current context
    flow.collect {
        println("Collected $it in ${Thread.currentThread().name}")
    }
}
Output might look like:
Flow started in DefaultDispatcher-worker-1
Collected 1 in main
Flow Error Handling
Flows provide several ways to handle errors:
Catching Errors
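import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flow {
        emit(1)
        emit(2)
        throw RuntimeException("Error in flow")
        emit(3) // Never reached; the compiler warns about unreachable code
    }

    flow
        .catch { e -> println("Caught exception: ${e.message}") }
        .collect { value -> println("Collected $value") }
}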
Output:
Collected 1
Collected 2
Caught exception: Error in flow
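Two further details of catch are easy to miss: its block can emit a replacement value, and it only handles exceptions thrown by the operators above it, not by the collector below it. The sketch below illustrates both, using hypothetical helper names (flakyNumbers, numbersWithFallback) that are not part of any library.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical source that fails after two values.
fun flakyNumbers(): Flow<Int> = flow {
    emit(1)
    emit(2)
    throw RuntimeException("Error in flow")
}

// Replace the failure with a fallback value so collectors see a normal completion.
fun numbersWithFallback(): Flow<Int> =
    flakyNumbers().catch { emit(-1) }

fun main() = runBlocking {
    println(numbersWithFallback().toList()) // [1, 2, -1]

    // An exception thrown in the collector is downstream of catch, so it escapes.
    try {
        numbersWithFallback()
            .collect { value -> check(value > 0) { "Negative value $value" } }
    } catch (e: IllegalStateException) {
        println("Not handled by catch: ${e.message}") // Not handled by catch: Negative value -1
    }
}
```

Because of this, code that can fail in the collector is often moved into an onEach placed before catch.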
Completing with onCompletion
The onCompletion operator is invoked when the flow finishes, whether it completes normally, fails with an exception, or is cancelled:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flow {
        emit(1)
        emit(2)
        throw RuntimeException("Error in flow")
        emit(3) // Never reached
    }

    flow
        .onCompletion { cause ->
            // cause is null on normal completion, otherwise the failure
            if (cause != null)
                println("Flow completed with exception: ${cause.message}")
            else
                println("Flow completed successfully")
        }
        .catch { e -> println("Caught: ${e.message}") }
        .collect { value -> println("Collected $value") }
}
Output:
Collected 1
Collected 2
Flow completed with exception: Error in flow
Caught: Error in flow
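The position of onCompletion relative to catch determines what it observes. Placed before catch, it sees the exception; placed after catch, the exception has already been handled, so the cause is null. The following sketch compares the two orderings; the function names are invented for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun failingFlow(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException("Error in flow")
}

// onCompletion sits upstream of catch: it still sees the failure.
suspend fun causeSeenBeforeCatch(): Throwable? {
    var seen: Throwable? = null
    failingFlow()
        .onCompletion { cause -> seen = cause }
        .catch { }
        .collect()
    return seen
}

// onCompletion sits downstream of catch: the failure was already handled.
suspend fun causeSeenAfterCatch(): Throwable? {
    var seen: Throwable? = null
    failingFlow()
        .catch { }
        .onCompletion { cause -> seen = cause }
        .collect()
    return seen
}

fun main() = runBlocking {
    println(causeSeenBeforeCatch()?.message) // Error in flow
    println(causeSeenAfterCatch()?.message)  // null
}
```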
StateFlow and SharedFlow
The Kotlin Flow library includes two specialized types of hot flows, which exist and hold or broadcast values independently of their collectors:
StateFlow
StateFlow is an observable state holder: it always has a current value, delivers that value to each new collector, and then emits every subsequent state:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val mutableStateFlow = MutableStateFlow(0) // Initial value is 0

    // Start a collector
    val job = launch {
        mutableStateFlow.collect { value ->
            println("StateFlow value: $value")
        }
    }

    // Emit new values
    delay(100)
    mutableStateFlow.value = 1
    delay(100)
    mutableStateFlow.value = 2
    delay(100)

    job.cancel() // Stop collecting; a StateFlow never completes on its own
}
Output:
StateFlow value: 0
StateFlow value: 1
StateFlow value: 2
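One detail worth knowing is that StateFlow compares each new value with the current one and does not emit when they are equal. The sketch below records what a collector observes while the state is set to 1 twice in a row; observedStates is a hypothetical helper written for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects every value a collector observes while the state is updated.
suspend fun observedStates(): List<Int> = coroutineScope {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val job = launch { state.collect { seen += it } }

    delay(50); state.value = 1
    delay(50); state.value = 1 // equal to the current value: no new emission
    delay(50); state.value = 2
    delay(50)
    job.cancel()
    seen
}

fun main() = runBlocking {
    println(observedStates()) // [0, 1, 2]
}
```

The delays give the collector time to process each value; without them, rapid updates may be conflated so that only the latest value is seen.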
SharedFlow
SharedFlow is a hot flow that broadcasts each emitted value to all collectors that are active at the time of emission:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // replay = 0: late collectors do not receive earlier values
    val mutableSharedFlow = MutableSharedFlow<Int>(replay = 0)

    // Start first collector
    val job1 = launch {
        mutableSharedFlow.collect { value ->
            println("Collector 1 received: $value")
        }
    }
    delay(100)

    // Emit a value
    mutableSharedFlow.emit(1)
    delay(100)

    // Start second collector
    val job2 = launch {
        mutableSharedFlow.collect { value ->
            println("Collector 2 received: $value")
        }
    }
    delay(100)

    // Emit more values
    mutableSharedFlow.emit(2)
    delay(100)
    mutableSharedFlow.emit(3)
    delay(100)

    job1.cancel()
    job2.cancel()
}
Output:
Collector 1 received: 1
Collector 1 received: 2
Collector 2 received: 2
Collector 1 received: 3
Collector 2 received: 3
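Collector 2 misses the value 1 because the flow was created with replay = 0. As a hedged sketch of what the replay parameter changes, the hypothetical helper below emits two values before anyone subscribes and then reports the first value a late collector receives.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits 1 and 2 before anyone subscribes, then returns what a late collector gets first.
suspend fun firstValueForLateCollector(replay: Int): Int? {
    val shared = MutableSharedFlow<Int>(replay = replay)
    shared.emit(1) // no subscribers yet, so emit returns immediately
    shared.emit(2)
    // With replay = 0 nothing is cached and first() would wait forever, hence the timeout.
    return withTimeoutOrNull(100) { shared.first() }
}

fun main() = runBlocking {
    println(firstValueForLateCollector(replay = 0)) // null
    println(firstValueForLateCollector(replay = 1)) // 2
}
```

With replay = 1 the most recent value is kept in a replay cache and handed to each new collector.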
Real-World Example: API Requests with Flow
Let's see how Flow can be used in a real application for fetching data from an API:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Simulating a network API service
class WeatherApi {
    suspend fun getTemperature(city: String): Int {
        delay(1000) // Simulate network delay
        return when (city.lowercase()) {
            "london" -> 15
            "tokyo" -> 25
            "new york" -> 20
            else -> 18
        }
    }
}

// Repository using Flow
class WeatherRepository(private val api: WeatherApi) {
    // Requests run one after another; each result is emitted as soon as it arrives
    fun getCityTemperatures(vararg cities: String): Flow<Pair<String, Int>> = flow {
        for (city in cities) {
            val temperature = api.getTemperature(city)
            emit(city to temperature)
        }
    }
}

fun main() = runBlocking {
    val api = WeatherApi()
    val repository = WeatherRepository(api)

    // UI code
    println("Fetching temperatures...")
    repository
        .getCityTemperatures("London", "Tokyo", "New York")
        .onEach { (city, temp) ->
            println("Update UI: $city is currently $temp°C")
        }
        .catch { error ->
            println("Error fetching data: ${error.message}")
        }
        .collect()
    println("All temperatures fetched!")
}
Output:
Fetching temperatures...
Update UI: London is currently 15°C
Update UI: Tokyo is currently 25°C
Update UI: New York is currently 20°C
All temperatures fetched!
This example shows how Flow can be used to create a stream of API responses, processing each one as it becomes available rather than waiting for all responses to complete.
Flow and Android
While this is a Kotlin course, it's worth noting that Flow is particularly powerful in Android development:
// ViewModel class in Android
class WeatherViewModel(private val repository: WeatherRepository) : ViewModel() {
    private val _temperatures = MutableStateFlow<List<Pair<String, Int>>>(emptyList())
    val temperatures: StateFlow<List<Pair<String, Int>>> = _temperatures

    fun loadTemperatures(vararg cities: String) {
        viewModelScope.launch {
            repository
                .getCityTemperatures(*cities)
                .catch { e ->
                    // Handle error
                }
                .collect { cityTemp ->
                    _temperatures.value = _temperatures.value + cityTemp
                }
        }
    }
}

// Activity or Fragment
fun setupUi() {
    lifecycleScope.launch {
        repeatOnLifecycle(Lifecycle.State.STARTED) {
            viewModel.temperatures.collect { temperatures ->
                // Update UI with the list of temperatures
            }
        }
    }
}
Summary
Kotlin Flow is a powerful API for handling asynchronous streams of data:
- Flow is built on top of coroutines and provides a way to emit multiple values sequentially
- Flow builders include flow {}, flowOf(), and the .asFlow() extension functions
- Flow operators allow you to transform, filter, and otherwise manipulate data streams
- Flow context management helps control which dispatcher is used for flow emission and collection
- Error handling with catch and onCompletion enables robust error management
- StateFlow and SharedFlow provide specialized flow types for state management and sharing values
- Flow integrates seamlessly with Android architecture components for reactive UIs
Exercises
- Create a Flow that emits the Fibonacci sequence up to a specified number.
- Implement a simple search feature that uses debounce to avoid making too many API requests.
- Create a temperature converter that uses Flow transformations to convert between Celsius and Fahrenheit.
- Build a sample app that uses StateFlow to maintain UI state and SharedFlow to handle one-time events.
- Implement a Flow that reads lines from a file asynchronously and counts the occurrences of a specific word.
By mastering Kotlin Flow, you'll have a powerful tool for handling asynchronous data streams in your applications!