Kotlin

Kotlin Coroutine Flow API: An Efficient Way To Handle Streams

Google+ Pinterest LinkedIn Tumblr

Flow is a new API for Kotlin coroutines, with the focus on data streams that sequentially emits value–the goal is to make back-pressure support easy and get things right.

After the release of kotlinx.coroutines 1.3.2, there’s a lot of hype created for new Kotlin Flow API. Today, in this article I would like to take the opportunity and give you an overview of Kotlin Flow API. I’m a beginner like you and write things which I learn in my spare time. I will make my best to share the experience with you.

Contents:

First thing first.

Why we need Kotlin Flow API

One thing we all programmer are worried about when we need to work with structured concurrency is that. Let’s say we subscribe to reactive Stream and hold the reference to a Subscription object that must be carefully managed otherwise we risk leaking it.

Now in kotlin flow API this mostly works without developers having to think about too much. Because the API does not have the concept of subscription at all and it is fully transparent for cancellation. It is important for us to understand that flow API built on top of suspension and light-weight coroutines that are hard to leak!

So, flow API is like Java8 Stream but the huge difference is that it, we don’t need to worry about leaking it, suitable for asynchronous operations, manages back-pressure, and suitable for finite or infinite streams.

Flows

To represent the flow stream that is being asynchronously computed, we can use a flow builder just like we use the sequence builder for synchronously.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay

fun myFlow() = flow {  // 1
    emit("Ahsen") // 2
    emit("Saeed")
    delay(100)  // 3
    emit("Done")
}

fun main() = runBlocking<Unit> {
    myFlow().collect {  // 4
        println(it)
    }
}

Here what happens at the above code.

  1. A builder function in order to create flow.
  2. Data is emitted from the flow using the emit method.
  3. We don’t need to mark the function with suspend modifier because the code inside the flow builder can be suspended.
  4. Values are collected from flow using the collect method.

One thing I want to point out in the above example–the code inside flow{} will not run until someone calls the collect method on it. Well, that’s because of flow is lazy similar to the sequence they only execute block when someone actually needs it.

So, the collect method is a terminal operator that start a collection of the flow in the given scope. The collect operator is the most basic one, but there are other terminal operators, which we see in the next section.

Intermediate & terminal operator

And now you read in the above section that the terminal operators have the power to start the emission of the flow builder. Actually, there are two types of operators available inside the flow which are intermediate and terminal. You can read more about terminal and intermediate operators here on this link.

Note: The operator is just like an extension on the Flow type.

There are so many operators added in the flow type like map, filter, zip, etc… If you want to see all operators check out the following link.

-> https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/

Writing a custom flow operator

There are already so many operators added to Flow type but if you want to write your own operator it can be implemented like this.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun <T> Flow<T>.uppercase(): Flow<String> {
    return flow {
        collect {
            val value = it.toString().toUpperCase()
            emit(value)
        }
    }
}

val flowC = flowOf("ahsen", "saeed").uppercase()

fun main() = runBlocking<Unit> {
   flowC.collect {
       println(it)
   }
}

Using the uppercase operator we can simply change our strings to Uppercase and all the new strings will be collected via a collect function call.

Different types of flow builders

The flow builder that we saw in the flows section is the most basic one. There are other builders for easy declaration of kotlin flows.

  • From A fixed set of values
val flowA = flowOf(1, 2, 3)
  • From function type () -> T (experimental)
val flowB : Flow<String> = {
    "Ahsen Saeed"
}.asFlow()
  • Various collections and sequences can be easily converted to Flow. As an example:
val flowC = (1..5).asFlow()

Concurrent flow using buffer

I said at the start of this article that flow operation is sequentially emit values. What does it mean?

Now let’s say if I have an emitter and a collector who simply emits and collects values respectively. By default, both emitter, and collector sharing the same scope so that the whole operation is running inside a single coroutine.

In order to speed up the process and decouple the emitter and collector–run emitter in a separate coroutine and collector in a separate, so that they can be concurrent. For this, we can happily use the buffer operator on flows that run our code concurrently and gives us the desired speed in execution.

I may show you the example but Roman Elizarov already wrote a complete article on concurrent execution with coroutines when working with flow. Go and check out this link without going further.

Another way to apply concurrency with flowOn operator

Another way to change the flow of emission is by using a flowOn operator. It simply creates another coroutine for upstream flow. Let’s see an example of what I’m talking about.

import kotlinx.coroutines.delay
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.coroutineContext

val flowA = flow {
   println("Emitting coroutine -> $coroutineContext")  
   emit("Ahsen")
   emit("Saeed")
   delay(100)
   emit("Done")
}.flowOn(Dispatchers.IO) // The above stream will run in io dispatcher

fun main() = runBlocking {
    flowA.collect {
       println("Collecting coroutine -> $coroutineContext and value $it")
    }
}

Notice how flow {} works in a coroutine#2 and emission happen in coroutine#1 that is running another thread concurrently with collecting coroutine.

Note: The buffer and flowOn operator are both different things. The buffer actually runs in different coroutine but never ask in which dispatcher you wanna run the code while flowOn specifically asks for it.

When to use Channels and Kotlin Flow API

Channels will be used for hot streams, under the hood in some Flow operators, communicating coroutines, and listening data from sockets. While Flow API will be primarily used to handle the async stream of data.

Third-party library integration

The recent release of room persistence 2.2.0 added the support for kotlin flow API. We can now return the Flow<T> from DAO methods.

I wrote an article on room persistence 2.2.0 library changes in case you wanna check out.


Conclusion

Now you know basically everything about handling streams in kotlin with Flow API. We can easily implement multiple operators to flow{}, create new operators, back-pressure supported via suspending functions and has a single suspend collect method.

And the end result is absolutely my favorite kind of feature because it is totally invisible: we don’t need to worry about cancellation it is transparent.

More Resources

I hope I educate you on some new Kotlin coroutine Flow API. If you have any questions or queries please ask them in the comments section.

Thank you for being here and HAPPY KOTLIN FLOW

2 Comments

    • ahsensaeed067 Reply

      The flow API comes with kotlin coroutines library. You just need to add coroutines library in the gradle.

Write A Comment