Kotlin Flow Operators
Introduction
Kotlin's Flow API represents asynchronous streams of data. Creating and collecting a flow are the essential starting points, but most of Flow's usefulness comes from its operators, which let you transform, filter, combine, and otherwise manipulate data streams.
In this guide, we'll explore the most commonly used Flow operators and see how they can help you build robust asynchronous data processing pipelines.
Flow Operator Basics
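Flow operators sit between the Flow producer and the collector, allowing you to modify the emitted values before they reach the final consumer. They follow a similar pattern to collection operators in Kotlin, but are designed to work with asynchronous streams. The snippet below is a fragment: collect is a suspending function, so it must be called from a coroutine (for example inside runBlocking, as in the later examples).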
flow {
// Flow producer
emit(1)
emit(2)
emit(3)
}
.map { it * 2 } // Operator
.filter { it > 3 } // Another operator
.collect { value ->
// Flow consumer
println(value)
}
Output:
4
6
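Because flows are cold, chaining operators only describes the pipeline; nothing runs until a terminal operator such as collect is called. The following is a minimal sketch of that behavior. The helper name coldFlowDemo and the log list are illustrative and not part of the library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: returns the log size before and after collection.
fun coldFlowDemo(): Pair<Int, Int> = runBlocking {
    val log = mutableListOf<String>()

    // Building the pipeline does not run the producer or the operators.
    val pipeline = flow<Int> {
        log += "producer started"
        emit(1)
        emit(2)
    }.map { it * 2 }

    val sizeBeforeCollect = log.size

    // Collection starts here; only now does the producer block execute.
    pipeline.collect { log += "received $it" }

    sizeBeforeCollect to log.size
}

fun main() {
    val (before, after) = coldFlowDemo()
    println("Log entries before collect: $before, after collect: $after")
}
```

The log is empty until collect is called, and afterwards it holds one entry for the producer plus one per received value.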
Transformation Operators
map
The map operator transforms each value emitted by the flow using a provided transformation function.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf(1, 2, 3)
.map { it * 10 }
.collect { value ->
println("Received: $value")
}
}
Output:
Received: 10
Received: 20
Received: 30
mapNotNull
The mapNotNull operator applies a transformation and filters out null results in a single step.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf("1", "two", "3", "four")
.mapNotNull { it.toIntOrNull() }
.collect { value ->
println("Parsed number: $value")
}
}
Output:
Parsed number: 1
Parsed number: 3
transform
The transform operator is the most general transformation operator: for each received value it can emit any number of values, including none.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf(1, 2, 3)
.transform { value ->
emit("Processing $value")
emit(value * value)
emit("Done with $value")
}
.collect { result ->
println(result)
}
}
Output:
Processing 1
1
Done with 1
Processing 2
4
Done with 2
Processing 3
9
Done with 3
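To illustrate why transform is considered the most general operator, here is a sketch that expresses map-like and filter-like behavior through it. The extension names mapVia and filterVia are hypothetical helpers written for this example; they are not library functions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: map expressed through transform (exactly one emission per input).
fun <T, R> Flow<T>.mapVia(block: suspend (T) -> R): Flow<R> =
    transform { value -> emit(block(value)) }

// Hypothetical helper: filter expressed through transform (zero or one emission per input).
fun <T> Flow<T>.filterVia(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { value -> if (predicate(value)) emit(value) }

fun main() = runBlocking {
    val result = flowOf(1, 2, 3, 4)
        .mapVia { it * 10 }
        .filterVia { it > 20 }
        .toList()
    println(result) // [30, 40]
}
```

In everyday code the dedicated map and filter operators read better; transform is useful when one input must produce several outputs or a varying number of them.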
Filtering Operators
filter
The filter operator only allows values that satisfy a given predicate to pass through.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..10).asFlow()
.filter { it % 2 == 0 }
.collect { value ->
println("Even number: $value")
}
}
Output:
Even number: 2
Even number: 4
Even number: 6
Even number: 8
Even number: 10
take
The take operator limits the flow to the first N elements and then cancels the upstream flow.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..100).asFlow()
.take(3)
.collect { value ->
println("Collected: $value")
}
}
Output:
Collected: 1
Collected: 2
Collected: 3
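The cancellation can be observed from inside the producer. This sketch records every value the producer attempts to emit; the helper name producedBeforeCancellation is an assumption made for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: returns the values the producer attempted to emit.
fun producedBeforeCancellation(): List<Int> = runBlocking {
    val produced = mutableListOf<Int>()
    flow<Int> {
        for (i in 1..100) {
            produced += i // record the attempt
            emit(i)       // once take(3) is satisfied, this call does not return normally
        }
    }
        .take(3)
        .collect() // the values themselves are not needed here
    produced
}

fun main() {
    println(producedBeforeCancellation()) // [1, 2, 3]
}
```

Although the loop is written to run to 100, the producer stops after the third value, so no work is wasted on elements nobody will collect.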
drop
The drop operator skips the first N elements.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.drop(2)
.collect { value ->
println("After dropping: $value")
}
}
Output:
After dropping: 3
After dropping: 4
After dropping: 5
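drop and take compose naturally. As a small sketch, the hypothetical extension page below (not a library function) selects one zero-based page of a flow by skipping the preceding pages and keeping the next size elements.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: selects one zero-based "page" of a flow.
fun <T> Flow<T>.page(index: Int, size: Int): Flow<T> =
    drop(index * size).take(size)

fun main() = runBlocking {
    val secondPage = (1..10).asFlow().page(index = 1, size = 3).toList()
    println(secondPage) // [4, 5, 6]
}
```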
Terminal Operators
Terminal operators are suspending functions that start the collection of the flow. Instead of another flow, they return a single result.
toList, toSet
These operators collect the flow into a List or Set.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val list = (1..5).asFlow()
.map { it * it }
.toList()
println("Collected to list: $list")
val set = flowOf(1, 1, 2, 2, 3)
.toSet()
println("Collected to set: $set")
}
Output:
Collected to list: [1, 4, 9, 16, 25]
Collected to set: [1, 2, 3]
reduce, fold
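These operators combine all values emitted by the flow into a single result. reduce uses the first value as the initial accumulator and throws NoSuchElementException if the flow is empty; fold starts from an explicit initial value, so it also works on an empty flow.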
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val sum = (1..5).asFlow()
.reduce { accumulator, value -> accumulator + value }
println("Sum: $sum")
val result = (1..5).asFlow()
.fold(10) { accumulator, value -> accumulator + value }
println("Fold result: $result")
}
Output:
Sum: 15
Fold result: 25
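The two operators differ most visibly on an empty flow. The sketch below uses two illustrative helpers, sumWithFold and sumWithReduceOrNull, which are names invented for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: fold starts from an initial value, so an empty flow is fine.
suspend fun sumWithFold(numbers: Flow<Int>): Int =
    numbers.fold(0) { acc, value -> acc + value }

// Illustrative helper: reduce throws on an empty flow, so map that case to null.
suspend fun sumWithReduceOrNull(numbers: Flow<Int>): Int? =
    try {
        numbers.reduce { acc, value -> acc + value }
    } catch (e: NoSuchElementException) {
        null
    }

fun main() = runBlocking {
    println(sumWithFold(emptyFlow()))             // 0
    println(sumWithReduceOrNull(emptyFlow()))     // null
    println(sumWithReduceOrNull(flowOf(1, 2, 3))) // 6
}
```

When a flow may legitimately be empty, fold with a sensible initial value is usually the simpler choice.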
Flow Combination Operators
zip
The zip operator pairs the values of two flows by position and applies a function to each pair. The resulting flow completes as soon as either of the two flows completes.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val numbers = (1..3).asFlow()
val strings = flowOf("one", "two", "three")
numbers.zip(strings) { a, b -> "$a -> $b" }
.collect { result ->
println(result)
}
}
Output:
1 -> one
2 -> two
3 -> three
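When the two flows have different lengths, zip stops at the shorter one and the extra values are never used. A short sketch, with the helper name zipUneven chosen only for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: zips a five-element flow with a two-element flow.
fun zipUneven(): List<String> = runBlocking {
    val ids = flowOf(1, 2, 3, 4, 5)
    val names = flowOf("one", "two")
    ids.zip(names) { id, name -> "$id -> $name" }.toList()
}

fun main() {
    println(zipUneven()) // [1 -> one, 2 -> two]
}
```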
combine
The combine operator emits a new result whenever any of the flows emits, using the most recent value from each flow. Nothing is emitted until every flow has produced at least one value.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val numbers = (1..3).asFlow().onEach { delay(300) }
val letters = flowOf("A", "B", "C").onEach { delay(400) }
numbers.combine(letters) { number, letter -> "$number$letter" }
.collect { result ->
println(result)
}
}
Output (approximate, as timing will vary):
1A
2A
2B
3B
3C
State Management Operators
stateIn
The stateIn operator converts a cold flow into a hot StateFlow with an initial value.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flowA = flow {
println("Flow started")
emit(1)
delay(500)
emit(2)
}
val stateFlow = flowA.stateIn(
scope = this,
started = SharingStarted.Eagerly,
initialValue = 0
)
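println("Current value: ${stateFlow.value}")
// A StateFlow never completes, so take(3) ends collection after the initial value and both emissions
stateFlow.take(3).collect { value ->
println("Collected: $value")
}
}
Output:
Current value: 0
Collected: 0
Flow started
Collected: 1
Collected: 2
The upstream flow is launched in the given scope but does not start running until the main coroutine suspends, so value still holds the initial value when it is first read.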
shareIn
The shareIn operator converts a cold flow into a hot SharedFlow that can be shared among multiple collectors.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val originalFlow = flow {
repeat(3) {
emit(it)
delay(100)
}
}
val sharedFlow = originalFlow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
replay = 0
)
// First collector
launch {
sharedFlow.collect { value ->
println("Collector 1: $value")
}
}
delay(50) // Start the second collector with a small delay
// Second collector
launch {
sharedFlow.collect { value ->
println("Collector 2: $value")
}
}
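delay(300) // Wait for the upstream flow to complete
coroutineContext.cancelChildren() // A SharedFlow never completes, so stop the collectors explicitly
}
Output (approximately):
Collector 1: 1
Collector 2: 1
Collector 1: 2
Collector 2: 2
With SharingStarted.Eagerly and replay = 0, the value 0 is emitted before either collector has subscribed, so it is dropped.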
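The replay parameter controls how many of the most recent values a late collector receives on subscription. The sketch below is one way to observe this under runBlocking; the helper name replayDemo is an assumption, and the yield() call is used only to let the eagerly started sharing coroutine run before the collector subscribes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: returns what a late collector receives with replay = 2.
fun replayDemo(): List<Int> = runBlocking {
    val shared = flowOf(1, 2, 3).shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        replay = 2
    )

    // Let the sharing coroutine run: it emits 1, 2, 3 while nobody is subscribed.
    yield()

    // A collector that subscribes now is served from the replay cache.
    // take(2) is needed because a SharedFlow never completes on its own.
    shared.take(2).toList()
}

fun main() {
    println(replayDemo()) // [2, 3]
}
```

Only the last two values are kept, so the late collector sees 2 and 3 but not 1.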
Real-World Example: Building a Temperature Monitor
Let's create a more practical example where we use flow operators to process a stream of temperature readings:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random
import java.text.SimpleDateFormat
import java.util.Date
// Simulates a temperature sensor
fun temperatureSensor(): Flow<Double> = flow {
while (true) {
// Emit a random temperature between 15 and 30 degrees
emit(15 + Random.nextDouble() * 15)
delay(1000) // Reading every second
}
}
fun main() = runBlocking {
val dateFormat = SimpleDateFormat("HH:mm:ss")
// Process temperature readings
temperatureSensor()
.map { temp -> round(temp * 10) / 10.0 } // Round to 1 decimal place
.filter { temp -> temp > 25 } // Only interested in high temperatures
.onEach { temp ->
// Check the numeric value: searching the formatted text for "28" would also match timestamps
if (temp >= 28) {
println("ALERT: Temperature is very high! $temp°C")
}
}
.map { temp ->
val timestamp = dateFormat.format(Date())
"$timestamp: $temp°C"
}
.take(5) // Collect only 5 high temperature readings
.collect { reading ->
println(reading)
}
}
// Thin wrapper so the pipeline can call round(...) without an extra import
fun round(value: Double): Double {
return kotlin.math.round(value)
}
This example demonstrates:
- Creating a flow from a simulated sensor
- Transforming the raw values (rounding)
- Filtering to focus only on important values
- Adding context (timestamps) to the data
- Processing side effects with onEach
- Limiting the collection with take
Summary
Kotlin Flow operators provide a powerful toolset for processing asynchronous data streams. By chaining these operators together, you can build sophisticated data processing pipelines that are both concise and expressive.
We've covered:
- Transformation operators like map, mapNotNull, and transform
- Filtering operators such as filter, take, and drop
- Terminal operators like toList, reduce, and fold
- Flow combination operators including zip and combine
- State management with stateIn and shareIn
With these operators, you can efficiently manipulate data streams to meet your application's requirements while maintaining the asynchronous and non-blocking nature of coroutines.
Exercises
- Create a flow that emits numbers from 1 to 10, transforms them to their squared values, filters out values greater than 50, and collects them into a list.
- Build a flow that simulates a chat by combining two flows: one for user messages and one for system notifications.
- Implement a simple search feature that debounces user input (waits until the user stops typing) before performing a search operation using flow operators.