Kotlin Flow Operators

Introduction

Kotlin's Flow API provides a powerful way to handle asynchronous streams of data. Creating and collecting flows are the essential starting points, but much of Flow's power comes from its extensive set of operators, which let you transform, filter, combine, and otherwise manipulate data streams.

In this guide, we'll explore the most commonly used Flow operators and see how they can help you build robust asynchronous data processing pipelines.

Flow Operator Basics

Flow operators sit between the Flow producer and the collector, allowing you to modify the emitted values before they reach the final consumer. They follow a similar pattern to collection operators in Kotlin, but are designed to work with asynchronous streams.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flow {
        // Flow producer
        emit(1)
        emit(2)
        emit(3)
    }
        .map { it * 2 } // Operator
        .filter { it > 3 } // Another operator
        .collect { value ->
            // Flow consumer
            println(value)
        }
}

Output:

4
6
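
One consequence of this producer, operator, collector layout is that intermediate operators such as map and filter are lazy: building the chain runs nothing, and values only start moving once a terminal operator such as collect is called. The sketch below records the order of events in a list to make this visible. The helper name lazyPipelineLog is hypothetical and exists only for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records each step so the order of execution is visible.
fun lazyPipelineLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Building the chain only describes the work; no lambda runs here.
    val pipeline = flowOf(1, 2, 3)
        .map { log += "map($it)"; it * 2 }
        .filter { it > 3 }

    log += "pipeline built"

    // collect is the terminal operator that actually starts the flow.
    pipeline.collect { log += "collected $it" }
    log
}

fun main() {
    lazyPipelineLog().forEach(::println)
}
```

The first entry in the log is "pipeline built", followed by the map and collect entries. Each value also travels through the whole chain before the next one is produced, so map(2) is followed by "collected 4" before map(3) runs.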

Transformation Operators

map

The map operator transforms each value emitted by the flow using a provided transformation function.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flowOf(1, 2, 3)
        .map { it * 10 }
        .collect { value ->
            println("Received: $value")
        }
}

Output:

Received: 10
Received: 20
Received: 30

mapNotNull

The mapNotNull operator applies a transformation and filters out null results in a single step.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flowOf("1", "two", "3", "four")
        .mapNotNull { it.toIntOrNull() }
        .collect { value ->
            println("Parsed number: $value")
        }
}

Output:

Parsed number: 1
Parsed number: 3

transform

The transform operator is the most general of the transformation operators: for each received value, it can emit any number of values, including none.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flowOf(1, 2, 3)
        .transform { value ->
            emit("Processing $value")
            emit(value * value)
            emit("Done with $value")
        }
        .collect { result ->
            println(result)
        }
}

Output:

Processing 1
1
Done with 1
Processing 2
4
Done with 2
Processing 3
9
Done with 3
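
Because transform decides how many times to call emit for each input, it can also skip values entirely, which lets a single operator play the role of both filter and map. The following is a minimal sketch of that idea; the helper name evensTwice is hypothetical and not part of the Flow API.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: drops odd numbers and emits every even number twice.
fun evensTwice(source: Flow<Int>): Flow<Int> = source.transform { value ->
    if (value % 2 == 0) {
        emit(value)
        emit(value)
    }
    // Odd values reach the end of the lambda without an emit call, so they are skipped.
}

fun main() = runBlocking {
    println(evensTwice(flowOf(1, 2, 3, 4)).toList()) // [2, 2, 4, 4]
}
```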

Filtering Operators

filter

The filter operator only allows values that satisfy a given predicate to pass through.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..10).asFlow()
        .filter { it % 2 == 0 }
        .collect { value ->
            println("Even number: $value")
        }
}

Output:

Even number: 2
Even number: 4
Even number: 6
Even number: 8
Even number: 10

take

The take operator emits only the first N elements and then cancels the upstream flow, so the producer stops running.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..100).asFlow()
        .take(3)
        .collect { value ->
            println("Collected: $value")
        }
}

Output:

Collected: 1
Collected: 2
Collected: 3
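
The cancellation performed by take can be observed from inside the producer. In the sketch below, the code after the second emit never runs, while the finally block still does, which makes it a suitable place for cleanup. The helper name takeCancellationLog is hypothetical and only serves this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: applies take(2) to a three-element flow and records what happens.
fun takeCancellationLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    val numbers = flow {
        try {
            emit(1)
            emit(2)
            log += "after second emit" // not reached: take(2) cancels the producer here
            emit(3)
        } finally {
            log += "producer cleaned up" // still runs when the flow is cancelled
        }
    }

    numbers.take(2).collect { log += "collected $it" }
    log
}

fun main() {
    takeCancellationLog().forEach(::println)
}
```

The log contains "collected 1", "collected 2", and "producer cleaned up", in that order.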

drop

The drop operator skips the first N elements.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .drop(2)
        .collect { value ->
            println("After dropping: $value")
        }
}

Output:

After dropping: 3
After dropping: 4
After dropping: 5

Terminal Operators

Terminal operators are suspending functions that start collecting the flow. Instead of returning another flow, they return a result (or Unit, in the case of collect).
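
Since every terminal operator starts its own collection, calling two of them on the same cold flow runs the producer twice. The sketch below shows this with first and count, two other terminal operators from kotlinx.coroutines.flow. The helper name terminalOperatorDemo and the runs counter are hypothetical and used only for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (first value, element count, number of producer runs).
fun terminalOperatorDemo(): Triple<Int, Int, Int> = runBlocking {
    var runs = 0
    val squares = flow {
        runs++ // counts how often the producer block is started
        for (i in 1..5) emit(i * i)
    }

    val first = squares.first() // stops collecting after the first value
    val count = squares.count() // collects the whole flow again from the start
    Triple(first, count, runs)
}

fun main() {
    println(terminalOperatorDemo()) // (1, 5, 2)
}
```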

toList, toSet

These operators collect the flow into a List or Set.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val list = (1..5).asFlow()
        .map { it * it }
        .toList()

    println("Collected to list: $list")

    val set = flowOf(1, 1, 2, 2, 3)
        .toSet()

    println("Collected to set: $set")
}

Output:

Collected to list: [1, 4, 9, 16, 25]
Collected to set: [1, 2, 3]

reduce, fold

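Both operators combine all emitted values into a single result. reduce uses the first value as the initial accumulator and throws NoSuchElementException on an empty flow; fold starts from an initial value that you supply.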

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .reduce { accumulator, value -> accumulator + value }

    println("Sum: $sum")

    val result = (1..5).asFlow()
        .fold(10) { accumulator, value -> accumulator + value }

    println("Fold result: $result")
}

Output:

Sum: 15
Fold result: 25
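
The two operators behave differently on an empty flow: reduce has no starting value and throws NoSuchElementException, whereas fold simply returns its initial value. The sketch below is one way to handle that, assuming a null result is acceptable for the empty case; the helper name sumOrNull is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sums a flow with reduce, returning null when the flow is empty.
suspend fun sumOrNull(values: Flow<Int>): Int? =
    try {
        values.reduce { accumulator, value -> accumulator + value }
    } catch (e: NoSuchElementException) {
        null // reduce has no initial value, so an empty flow cannot produce a result
    }

fun main() = runBlocking {
    println(sumOrNull(flowOf(1, 2, 3)))   // 6
    println(sumOrNull(emptyFlow()))       // null

    // fold never throws on an empty flow: it returns the initial value
    println(emptyFlow<Int>().fold(0) { accumulator, value -> accumulator + value }) // 0
}
```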

Flow Combination Operators

zip

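The zip operator pairs the values of two flows by position: the first value of one with the first value of the other, and so on. The resulting flow completes as soon as either source flow completes.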

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbers = (1..3).asFlow()
    val strings = flowOf("one", "two", "three")

    numbers.zip(strings) { a, b -> "$a -> $b" }
        .collect { result ->
            println(result)
        }
}

Output:

1 -> one
2 -> two
3 -> three
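
When the two flows have different lengths, zip stops as soon as the shorter one completes and the remaining values of the longer flow are never paired. A minimal sketch, with the hypothetical helper name zipLabels:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: pairs each number with the letter at the same position.
fun zipLabels(numbers: Flow<Int>, letters: Flow<String>): Flow<String> =
    numbers.zip(letters) { number, letter -> "$number$letter" }

fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3, 4, 5)
    val letters = flowOf("a", "b")

    // Only two pairs are produced because letters completes after "b".
    println(zipLabels(numbers, letters).toList()) // [1a, 2b]
}
```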

combine

The combine operator combines the most recent values of each flow whenever any of the flows emits a value.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbers = (1..3).asFlow().onEach { delay(300) }
    val letters = flowOf("A", "B", "C").onEach { delay(400) }

    numbers.combine(letters) { number, letter -> "$number$letter" }
        .collect { result ->
            println(result)
        }
}

Output (approximate, as timing will vary):

1A
2A
2B
3B
3C

State Management Operators

stateIn

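The stateIn operator converts a cold flow into a hot StateFlow with an initial value. A StateFlow always holds a current value, available through its value property, and it never completes on its own.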

kotlin
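import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flowA = flow {
        println("Flow started")
        emit(1)
        delay(500)
        emit(2)
    }

    val stateFlow = flowA.stateIn(
        scope = this,
        started = SharingStarted.Eagerly,
        initialValue = 0
    )

    // The sharing coroutine has not had a chance to run yet,
    // so this still prints the initial value
    println("Current value: ${stateFlow.value}")

    // A StateFlow never completes, so take(3) ends the collection explicitly
    stateFlow.take(3).collect { value ->
        println("Collected: $value")
    }
}

Output (inside runBlocking; the exact interleaving depends on the dispatcher):

Current value: 0
Collected: 0
Flow started
Collected: 1
Collected: 2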

shareIn

The shareIn operator converts a cold flow into a hot SharedFlow that can be shared among multiple collectors.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val originalFlow = flow {
        repeat(3) {
            emit(it)
            delay(100)
        }
    }

    val sharedFlow = originalFlow.shareIn(
        scope = this,
        // Lazily starts the upstream when the first collector subscribes,
        // so collector 1 does not miss the first value
        started = SharingStarted.Lazily,
        replay = 0
    )

    // First collector
    val first = launch {
        sharedFlow.collect { value ->
            println("Collector 1: $value")
        }
    }

    delay(50) // Start the second collector with a small delay

    // Second collector
    val second = launch {
        sharedFlow.collect { value ->
            println("Collector 2: $value")
        }
    }

    delay(300) // Wait for the flow to complete

    // A SharedFlow never completes, so cancel the collectors to let main return
    first.cancel()
    second.cancel()
}

Output (approximately):

Collector 1: 0
Collector 1: 1
Collector 2: 1
Collector 1: 2
Collector 2: 2
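
With replay = 0, a collector that subscribes late misses everything emitted before it arrived. Setting replay to a positive number makes the SharedFlow cache that many recent values and hand them to new collectors first. The sketch below illustrates this with replay = 1 inside runBlocking; the helper name replayDemo is hypothetical, and the short delay is only there to let the sharing coroutine run before anyone subscribes.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the replay cache and the value seen by a late collector.
fun replayDemo(): Pair<List<Int>, Int> = runBlocking {
    val shared = flowOf(1, 2, 3).shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        replay = 1 // keep the most recent value for late collectors
    )

    delay(50) // let the upstream emit 1, 2, 3 while nobody is subscribed

    val cache = shared.replayCache // snapshot of the cached values
    val late = shared.first()      // a late collector receives the cached value first
    cache to late
}

fun main() {
    println(replayDemo()) // ([3], 3)
}
```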

Real-World Example: Building a Temperature Monitor

Let's create a more practical example where we use flow operators to process a stream of temperature readings:

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.math.round
import kotlin.random.Random
import java.text.SimpleDateFormat
import java.util.Date

// Simulates a temperature sensor
fun temperatureSensor(): Flow<Double> = flow {
    while (true) {
        // Emit a random temperature between 15 and 30 degrees
        emit(15 + Random.nextDouble() * 15)
        delay(1000) // Reading every second
    }
}

fun main() = runBlocking {
    val dateFormat = SimpleDateFormat("HH:mm:ss")

    // Process temperature readings
    temperatureSensor()
        .map { temp -> round(temp * 10) / 10.0 } // Round to 1 decimal place
        .filter { temp -> temp > 25 } // Only interested in high temperatures
        .map { temp ->
            val timestamp = dateFormat.format(Date())
            // Keep the numeric value next to the formatted text
            temp to "$timestamp: $temp°C"
        }
        .onEach { (temp, reading) ->
            // Compare the number itself; searching the text for "28"
            // could also match the timestamp
            if (temp >= 28.0) {
                println("ALERT: Temperature is very high! $reading")
            }
        }
        .take(5) // Collect only 5 high temperature readings
        .collect { (_, reading) ->
            println(reading)
        }
}

This example demonstrates:

  1. Creating a flow from a simulated sensor
  2. Transforming the raw values (rounding)
  3. Filtering to focus only on important values
  4. Adding context (timestamps) to the data
  5. Processing side effects with onEach
  6. Limiting the collection with take

Summary

Kotlin Flow operators provide a powerful toolset for processing asynchronous data streams. By chaining these operators together, you can build sophisticated data processing pipelines that are both concise and expressive.

We've covered:

  • Transformation operators like map, mapNotNull, and transform
  • Filtering operators such as filter, take, and drop
  • Terminal operators like toList, reduce, and fold
  • Flow combination operators including zip and combine
  • State management with stateIn and shareIn

With these operators, you can efficiently manipulate data streams to meet your application's requirements while maintaining the asynchronous and non-blocking nature of coroutines.

Exercises

  1. Create a flow that emits numbers from 1 to 10, transforms them to their squared values, filters out values greater than 50, and collects them into a list.

  2. Build a flow that simulates a chat by combining two flows: one for user messages and one for system notifications.

  3. Implement a simple search feature that debounces user input (waits until the user stops typing) before performing a search operation using flow operators.