Coroutines: making scalable code in Kotlin


Hello, dear readers! Welcome to my blog. In this post, we will talk about coroutines and how they help our Kotlin applications run with more scalability and robustness.

The Kotlin language

Created by JetBrains, the company behind IntelliJ IDEA, Kotlin is a robust language that quickly established itself as the preferred language for Android development.

In this article we won’t learn about the language itself, as the focus is on coroutines. If the reader wants to learn about the language, I recommend this good book.

Reactive programming

Another important concept to grasp is reactive programming. Reactive programming is a paradigm in which the parts of a program are decoupled through asynchronous processing, so that slow operations such as IO do not block the execution. My article about ReactiveX covers this concept in more detail.

Now that we have a better understanding of what Kotlin and reactive programming are, let’s begin our tour of coroutines. First, let’s understand what threads are and why coroutines are not threads.

What are threads

Threads are units of execution that run inside a process. A process typically corresponds to the running application itself, with memory and CPU time allocated for it to consume.

Threads are like subprocesses that a process creates to carry out its workload. Each thread has its own share of resources, such as memory (in Java, for example, each thread has its own call stack), so threads are expensive to create and are frequently reused through thread pools.

An important concept to keep in mind is that threads do not necessarily run in parallel. The CPU cores execute a process’s threads, and a process may have more threads than there are cores to run them.

If we have just one core, for example, and create two threads, they won’t really run in parallel. Instead, the core switches between the threads and runs a little of each one at a time. It does this so fast that the threads appear to run at the same time. This interleaving is concurrency, not parallelism. When one thread blocks on an IO operation, the operating system can schedule the other one in the meantime. However, two CPU-bound threads sharing a single core will not finish any faster than the same work run sequentially.

We only achieve parallelism when running on a machine with multiple cores. In that case, each thread can run on its own core, and the threads really do execute in parallel.

Normally we don’t need to write any low-level code to make threads run on different cores, but this is still important to know. For example, if we create many more threads than there are cores available, we could end up with an application that is slower than a single-threaded, sequential one.

That’s because switching between threads has a cost, and so does creating a massive number of them. Together, these costs can demand more from the hardware than simply running everything in one thread. So it is important to remember that, like everything in life, threads are not a silver bullet.

So now we know what threads are. What about coroutines?

What are coroutines

Coroutines are a lightweight programming unit provided by Kotlin, with most of the tooling in the kotlinx.coroutines library. This means that coroutines are not threads. They are suspendable computations, similar to functions, that run on top of threads, usually taken from a thread pool.

A single thread can run many coroutines. A coroutine can also start running on one thread, be suspended (more on this later), and resume on another thread.

This is made possible by Kotlin’s coroutine support, in which dispatchers control how each coroutine is executed: confined to a single thread, spread across a thread pool, or unconfined.

So, as we can see, coroutines are similar to threads, but lighter and without the resource consumption that threads carry. This also means that we can create many more coroutines than we typically would threads, since the dispatcher controls how many threads are actually used.
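Here is a minimal sketch that makes this difference concrete. The function name `launchMany` and the figure of 100,000 are arbitrary choices for illustration and are not part of the original lab. The sketch starts 100,000 coroutines, and each one suspends for one second with `delay`, which we will look at more closely below. Doing the same with 100,000 threads that each sleep would be far more expensive and might even hit operating system limits. Here, the coroutines share the small pool of threads behind `Dispatchers.Default`.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Starts `count` coroutines that each suspend for one second, then reports how many
// completed and how many distinct threads actually ran them.
fun launchMany(count: Int = 100_000): Pair<Int, Int> {
    val completed = AtomicInteger(0)
    val threads = ConcurrentHashMap.newKeySet<Thread>()
    runBlocking {
        repeat(count) {
            launch(Dispatchers.Default) {
                threads.add(Thread.currentThread())
                delay(1000)          // suspends: no thread is held while waiting
                completed.incrementAndGet()
            }
        }
    } // runBlocking returns only after all of its child coroutines have completed
    return completed.get() to threads.size
}
```

Calling `launchMany()` from a `main` function should take a little over one second. The number of distinct threads it reports should stay small, in the order of the number of CPU cores, rather than anywhere near 100,000.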

Coroutine suspension

We talked briefly about coroutine suspension. What did we mean by that?

When our code runs, a task sometimes needs to wait for an operation, such as I/O, to complete before continuing. When a coroutine reaches a point where it has to wait for something, it suspends. In this state, the coroutine waits for the operation to complete before it resumes execution.

One very important detail to keep in mind is that suspension is not blocking. Once the coroutine stops to wait for the operation, its thread is freed to run other coroutines while the operation completes.

Well, let’s stop with the theory and get to the part we have all been waiting for: coding!

Our first coroutine

Let’s begin with a simple example. All code from the lab can be found here or at the end of the article. I strongly recommend using IntelliJ IDEA as the IDE and importing the code as a Gradle project. This makes it really simple to run the examples: just open the application file and run it. The project runs with Kotlin 1.3 and JDK 8.

In order to use coroutines, we need to add the library to build.gradle:

dependencies {
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"

    implementation "io.github.microutils:kotlin-logging:1.8.3"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.5"
    implementation "org.slf4j:slf4j-simple:1.7.29"
}

Let’s suppose we have a task that takes some time to complete (we will simulate this with a delay), and a second task needs its result to run. Let’s implement this with a coroutine:

package com.alexandreesl

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import mu.KotlinLogging

object Application {

    private val logger = KotlinLogging.logger {}

    @JvmStatic
    fun main(args: Array<String>) {


        val deferred = CoroutineScope(IO).async {
            logger.info { "running on ${Thread.currentThread().name}" }
            delay(2000)
            123
        }

        logger.info { "running on ${Thread.currentThread().name}" }

        val resp = deferred.getCompleted()

        logger.info { "the result is $resp" }


    }

}

Here we get a first glance at some concepts, such as coroutine scopes and dispatchers (the IO passed to the scope). We will talk more about scopes later, but for now, let’s understand what dispatchers are.

Dispatchers, as mentioned before, are responsible for starting and controlling coroutine executions. The most commonly used ones are Default, IO and Main. Main is used in Android development: it makes the code run on the UI thread, which the Android platform requires for certain operations, such as updating views.

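The IO dispatcher is backed by a shared thread pool that, by default, can grow to 64 threads, or to the number of cores if that is larger. This is because it is designed for blocking IO calls, where threads spend most of their time waiting. For CPU-intensive work, the Default dispatcher is the better fit, since its pool is sized to the number of CPU cores (with a minimum of two). There are other dispatchers, such as the previously mentioned Unconfined. It starts a coroutine on the caller’s thread, and after the first suspension it resumes on whichever thread the suspending function resumed on. Still, Main, Default and IO are the ones we will use most of the time.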
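A quick way to see these dispatchers at work is to ask each one which thread it runs on. The sketch below is a hypothetical helper, not part of the original lab. It uses `withContext`, covered later in the article, to run one line of code on each dispatcher, and it assumes it is called from a plain, non-coroutine thread such as `main`. Main is left out because it needs a UI platform such as Android to be available.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Reports which thread each dispatcher runs a small piece of code on,
// next to the thread that called runBlocking.
fun dispatcherThreads(): Map<String, Thread> = runBlocking {
    mapOf(
        "caller" to Thread.currentThread(),
        "Default" to withContext(Dispatchers.Default) { Thread.currentThread() },
        "IO" to withContext(Dispatchers.IO) { Thread.currentThread() },
        // Unconfined starts in the current thread; there is no suspension before we read it
        "Unconfined" to withContext(Dispatchers.Unconfined) { Thread.currentThread() }
    )
}
```

On the JVM, Default and IO share the same underlying pool. That is why both report threads named DefaultDispatcher-worker-N, the same names that appear in the logs throughout this article.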

In our example, the code inside the braces passed to async is the coroutine itself. The function that creates it, in this case async, is called a coroutine builder, because it instantiates the coroutine and hands it to the dispatcher. There are other builders, such as launch, which is designed for fire-and-forget operations.

We use async when we want to run something that produces a result, because it returns a Deferred. Think of a Deferred as a promise or a future: it gives us an interface for receiving the result of an asynchronous computation.
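Here is a minimal sketch that contrasts the two builders (the function name is illustrative). launch returns a Job, which we can only wait for with join. async returns a Deferred, and we read its value with await. The sketch uses runBlocking, which the article covers shortly, just to have a coroutine in which to run the builders.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// launch returns a Job (no result); async returns a Deferred<T> holding a future result.
fun launchVersusAsync(): Pair<Boolean, Int> = runBlocking {
    var sideEffectDone = false
    val job: Job = launch {
        delay(100)
        sideEffectDone = true      // fire-and-forget work: only its side effects matter
    }
    val deferred: Deferred<Int> = async {
        delay(100)
        123                        // the last expression becomes the Deferred's result
    }
    job.join()                     // suspends until the Job completes
    val result = deferred.await()  // suspends until the value is ready
    sideEffectDone to result
}
```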

When we run this first version of the code, this happens:


Exception in thread "main" java.lang.IllegalStateException: This job has not completed yet
	at kotlinx.coroutines.JobSupport.getCompletedInternal$kotlinx_coroutines_core(JobSupport.kt:1198)
	at kotlinx.coroutines.DeferredCoroutine.getCompleted(Builders.common.kt:98)
	at com.alexandreesl.Application.main(Application.kt:26)

Process finished with exit code 1

What happened?! The problem is that our coroutine ran on another thread, while our main code kept running on the main thread. The main code then tried to get the result from the deferred before it had completed. Besides, getCompleted() is marked as an experimental API, so it is not recommended. Let’s refactor our code to use a better way of running our coroutine:

package com.alexandreesl

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging

object Application {

    private val logger = KotlinLogging.logger {}

    @JvmStatic
    fun main(args: Array<String>) {


        val deferred = CoroutineScope(IO).async {
            logger.info { "running on ${Thread.currentThread().name}" }
            delay(2000)
            123
        }

        runBlocking {

            logger.info { "running on ${Thread.currentThread().name}" }

            val resp = deferred.await()

            logger.info { "the result is $resp" }

        }



    }

}

Notice we added a runBlocking block. This special coroutine builder runs a new coroutine and blocks the current thread until it completes, which makes it handy for simple samples and test cases such as these. Keep in mind that in real production code we should avoid it as much as possible, since it makes our code blocking.

We also changed the code to use await to get the result of the coroutine. This method is at the heart of coroutine usage: calling it is how we make our coroutine wait for the task to complete before continuing. As mentioned before, await suspends the coroutine instead of blocking its thread.

When we run the code, we receive something like this in the console:

[DefaultDispatcher-worker-1] INFO com.alexandreesl.Application - running on DefaultDispatcher-worker-1
[main] INFO com.alexandreesl.Application - running on main
[main] INFO com.alexandreesl.Application - the result is 123

Process finished with exit code 0

As we can see, we have logs in different threads, showing our coroutine indeed ran on another thread.
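The await above suspends the coroutine instead of blocking its thread. A small sketch makes that visible (the function name and event labels are illustrative). Two coroutines run on runBlocking’s single thread, and while one is suspended in delay, the same thread runs the other one.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Runs two coroutines on runBlocking's single thread and records the order of events.
// Returns the event log and the number of distinct threads used.
fun interleaveOnOneThread(): Pair<List<String>, Int> {
    val events = mutableListOf<String>()
    val threads = mutableSetOf<Thread>()
    runBlocking {
        launch {
            threads += Thread.currentThread()
            events += "A: start"
            delay(100)            // A suspends here; the thread is free to run B
            events += "A: end"
        }
        launch {
            threads += Thread.currentThread()
            events += "B: start"
            delay(200)
            events += "B: end"
        }
    }
    return events to threads.size
}
```

If delay blocked the thread, B could only start after A had finished. Instead, both start before either one ends, and all of this happens on one thread.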

Now, there is just one thing missing from the explanation: what is that CoroutineScope thing? Let’s talk about this now.

Coroutine scopes

Each call to a coroutine builder creates a new coroutine. Let’s take this code snippet as an example:

runBlocking {

    val deferred = CoroutineScope(IO).async {
        logger.info { "I am the first Coroutine, I am running on ${Thread.currentThread().name}" }
        delay(2000)

        val job = CoroutineScope(IO).launch {
            logger.info { "I am the second Coroutine, I am running on ${Thread.currentThread().name}" }
            delay(2000)
        }

        job.join()
        123

    }

    logger.info { "running on ${Thread.currentThread().name}" }

    val resp = deferred.await()

    logger.info { "the result is $resp" }

}

When running, we will get the following from the console:

[main] INFO com.alexandreesl.Application - running on main
[DefaultDispatcher-worker-1] INFO com.alexandreesl.Application - I am the first Coroutine, I am running on DefaultDispatcher-worker-1
[DefaultDispatcher-worker-3] INFO com.alexandreesl.Application - I am the second Coroutine, I am running on DefaultDispatcher-worker-3
[main] INFO com.alexandreesl.Application - the result is 123

As we can see, in this run each coroutine ran on its own thread, which shows that each builder indeed created a separate coroutine.

Let’s suppose that we have some IO code that, for some reason, we may need to cancel. For example, a database session might hang and leave the code stuck forever in a non-terminal state. Can we do this with coroutines? Let’s find out.

We change our code like this and run again:

package com.alexandreesl

import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import mu.KotlinLogging

object Application {

    private val logger = KotlinLogging.logger {}

    @JvmStatic
    fun main(args: Array<String>) {

        runBlocking {

            val deferred = CoroutineScope(IO).async {
                logger.info { "I am the first Coroutine, I am running on ${Thread.currentThread().name}" }
                delay(2000)

                val job = CoroutineScope(IO).launch {
                    logger.info { "I am the second Coroutine, I am running on ${Thread.currentThread().name}" }
                    delay(2000)
                }

                job.join()
                123

            }

            logger.info { "running on ${Thread.currentThread().name}" }

            deferred.cancelAndJoin()
            

        }

    }

}

After running, we see something like this on the terminal:

[main] INFO com.alexandreesl.Application - running on main
[DefaultDispatcher-worker-1] INFO com.alexandreesl.Application - I am the first Coroutine, I am running on DefaultDispatcher-worker-1

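Only the first coroutine got the chance to start. It was cancelled while suspended in delay(2000), so it never reached the point where the second coroutine is launched. But what would happen if we launched the two coroutines side by side, each one in its own scope, instead of one inside the other?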

We change the code and run again:

runBlocking {

    val deferred = CoroutineScope(IO).async {
        logger.info { "I am the first Coroutine, I am running on ${Thread.currentThread().name}" }
        delay(2000)
        123
    }

    val job = CoroutineScope(IO).launch {
        logger.info { "I am the second Coroutine, I am running on ${Thread.currentThread().name}" }
        delay(2000)
    }

    logger.info { "running on ${Thread.currentThread().name}" }

    deferred.cancelAndJoin()

}

This leads to:

[DefaultDispatcher-worker-1] INFO com.alexandreesl.Application - I am the first Coroutine, I am running on DefaultDispatcher-worker-1
[DefaultDispatcher-worker-3] INFO com.alexandreesl.Application - I am the second Coroutine, I am running on DefaultDispatcher-worker-3
[main] INFO com.alexandreesl.Application - running on main

Wait, the second one got to run? That’s because it was launched in its own, separate scope, so cancelling the first coroutine does not affect it. To cancel it as well, we would need to cancel it manually too.

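That’s the main goal behind coroutine scopes: they allow us to group coroutines together, so that operations such as cancellation apply to the whole group instead of being done one by one. Note that in our examples every CoroutineScope(IO) call creates a new, independent scope. To make a coroutine a child of another, we call the builder on the parent’s own scope, for example by calling launch directly inside the async block. Then, cancelling the parent cancels the child as well.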
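Here is a minimal sketch of that parent/child relationship. The function name is illustrative, and the 10-second delays stand in for a hung database call. The child is started with launch on the async block’s own scope, so cancelling the parent cancels the child too.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean

// Cancels a parent coroutine and reports whether its child was cancelled with it.
fun cancelParentWithChild(): Boolean = runBlocking {
    val childCancelled = AtomicBoolean(false)
    val childStarted = CompletableDeferred<Unit>()

    val parent = CoroutineScope(Dispatchers.IO).async {
        // `launch` is called on the async block's own scope, so it creates a child
        launch {
            childStarted.complete(Unit)
            try {
                delay(10_000)                 // stands in for a hung IO call
            } catch (e: CancellationException) {
                childCancelled.set(true)
                throw e                       // always rethrow cancellation
            }
        }
        delay(10_000)
        123
    }

    childStarted.await()      // make sure the child is running before we cancel
    parent.cancelAndJoin()    // a parent only completes after its children complete
    childCancelled.get()
}
```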

Suspending functions

Now let’s organize our code, moving our first example to another class. We create the following class:

package com.alexandreesl.examples

import kotlinx.coroutines.*
import mu.KotlinLogging

class BasicExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {

        val deferred = CoroutineScope(Dispatchers.IO).async {
            logger.info { "I am the first Coroutine, I am running on ${Thread.currentThread().name}" }
            delay(2000)

            val job = CoroutineScope(Dispatchers.IO).launch {
                logger.info { "I am the second Coroutine, I am running on ${Thread.currentThread().name}" }
                delay(2000)
            }

            job.join()
            123

        }

        logger.info { "running on ${Thread.currentThread().name}" }

        val resp = deferred.await()

        logger.info { "the result is $resp" }

    }

}

We created a class with a suspending function. Suspending functions can suspend their execution without blocking the thread, and they can only be called from a coroutine or from another suspending function. That’s why we can call operations such as await directly in their body, without needing a runBlocking block.
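This sketch shows suspending functions calling each other. Here, `fetchNumber` is a hypothetical stand-in for a slow IO call. `fetchBoth` calls it twice concurrently through coroutineScope, a builder that creates a child scope and only returns when every coroutine started in it has finished. Only the non-suspending entry point needs runBlocking.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a slow IO call: it suspends instead of blocking.
suspend fun fetchNumber(value: Int, latencyMillis: Long): Int {
    delay(latencyMillis)
    return value
}

// A suspending function can call other suspending functions directly.
suspend fun fetchBoth(): Int = coroutineScope {
    val first = async { fetchNumber(100, 200) }
    val second = async { fetchNumber(23, 200) }
    first.await() + second.await()   // both calls run concurrently: about 200 ms in total
}

// Ordinary (non-suspending) code needs a coroutine builder such as runBlocking to call it.
fun fetchBothBlocking(): Int = runBlocking { fetchBoth() }
```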

Now we change main to this:

package com.alexandreesl

import com.alexandreesl.examples.BasicExample
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import mu.KotlinLogging

object Application {

    private val logger = KotlinLogging.logger {}

    @JvmStatic
    fun main(args: Array<String>) {

        runBlocking {

            val basicExample = BasicExample()

            basicExample.runExample()

        }

    }

}

And after running again, we see everything is running as before. In a nutshell, suspending functions are a neat way of running most of our code inside coroutines.

Coroutine contexts

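Another concept to grasp is coroutine contexts. A coroutine context is a set of elements that defines how a coroutine runs. The most important ones are the coroutine’s Job and its dispatcher, and there are others, such as CoroutineName and CoroutineExceptionHandler. Every coroutine scope carries a context, which is passed on to the coroutines started from it.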
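The sketch below is a hypothetical helper that reads the CoroutineName element from a parent coroutine and two of its children. A child inherits the parent’s context, and any element passed to the builder replaces the inherited element with the same key.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Reads the CoroutineName element from a parent coroutine and two children.
fun contextNames(): List<String?> = runBlocking(CoroutineName("parent")) {
    val parentName = coroutineContext[CoroutineName]?.name
    // this child overrides only the dispatcher, so it keeps the parent's name
    val inherited = async(Dispatchers.Default) { coroutineContext[CoroutineName]?.name }.await()
    // an element passed to the builder replaces the inherited one with the same key
    val overridden = async(CoroutineName("child")) { coroutineContext[CoroutineName]?.name }.await()
    listOf(parentName, inherited, overridden)
}
```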

Kotlin’s coroutines have another neat feature: the withContext function, which runs a block of code in a different context and returns its result. With it, we can switch between contexts at will. Let’s see an example:

package com.alexandreesl.examples

import kotlinx.coroutines.delay
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import mu.KotlinLogging

class ContextExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {

        val context1 = newSingleThreadContext("MyOwnThread1")
        val context2 = newSingleThreadContext("MyOwnThread2")

        try {
            withContext(context1) {

                logger.info { "I am a coroutine, I am running on ${Thread.currentThread().name}" }

                withContext(context2) {

                    logger.info { "I am also a coroutine, I am running on ${Thread.currentThread().name}" }
                    delay(3000)

                }

                logger.info { "I am the previous coroutine, I am running on ${Thread.currentThread().name}" }

                delay(3000)

            }
        } finally {
            // dedicated single-thread contexts should be closed when no longer needed
            context1.close()
            context2.close()
        }

    }

}

Here we created two single-thread contexts and switched between them inside two nested coroutines. If we run the code and look at the console, we will see something like this:

[MyOwnThread1] INFO com.alexandreesl.examples.ContextExample - I am a coroutine, I am running on MyOwnThread1
[MyOwnThread2] INFO com.alexandreesl.examples.ContextExample - I am also a coroutine, I am running on MyOwnThread2
[MyOwnThread1] INFO com.alexandreesl.examples.ContextExample - I am the previous coroutine, I am running on MyOwnThread1

This shows that we successfully ran our coroutines in different contexts.

Processing streams with coroutines

In all our previous examples, we just returned a single value. What if we need to emit multiple values, that will be processed as they are generated?

For this purpose, Kotlin coroutines provide flows. With flows, we can create data streams that emit values as they are produced. Let’s see an example.

Let’s suppose we have a sequence of 10 numbers, and each number can only be emitted after an IO operation completes. The following code simulates this:

package com.alexandreesl.examples

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import mu.KotlinLogging
import java.time.LocalDateTime

class FlowExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {

        flow {
            for (number in 1..10) {
                logger.info { "I am processing the number $number at ${LocalDateTime.now()}" }
                delay(2000)
                emit(number)
            }
        }.collect { item ->
            logger.info { "Receiving number $item at ${LocalDateTime.now()}" }
        }

    }

}

When running, we will get logs like this:

[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 1 at 2020-09-24T19:55:35.579
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 1 at 2020-09-24T19:55:37.592
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 2 at 2020-09-24T19:55:37.592
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 2 at 2020-09-24T19:55:39.597
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 3 at 2020-09-24T19:55:39.597
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 3 at 2020-09-24T19:55:41.600
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 4 at 2020-09-24T19:55:41.600
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 4 at 2020-09-24T19:55:43.605
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 5 at 2020-09-24T19:55:43.606
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 5 at 2020-09-24T19:55:45.608
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 6 at 2020-09-24T19:55:45.608
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 6 at 2020-09-24T19:55:47.611
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 7 at 2020-09-24T19:55:47.611
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 7 at 2020-09-24T19:55:49.616
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 8 at 2020-09-24T19:55:49.616
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 8 at 2020-09-24T19:55:51.621
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 9 at 2020-09-24T19:55:51.622
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 9 at 2020-09-24T19:55:53.627
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 10 at 2020-09-24T19:55:53.627
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 10 at 2020-09-24T19:55:55.628

This shows the data being processed as it is generated. Flows also support operators similar to those of Kotlin collections, such as map, reduce and filter. Let’s improve our example to demonstrate this.

We change our code to filter only even numbers, log each filtered number, and sum all the numbers that remain in the flow:

package com.alexandreesl.examples

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import mu.KotlinLogging
import java.time.LocalDateTime

class FlowExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {

        val result = flow {
            for (number in 1..10) {
                logger.info { "I am processing the number $number at ${LocalDateTime.now()}" }
                delay(2000)
                emit(number)
            }
        }.filter { it % 2 == 0 }
            .map {
                logger.info { "processing number $it" }
                it
            }
            .reduce { accumulator, value ->
                accumulator + value
            }
        logger.info { "result: $result" }

    }

}

Notice that we didn’t need to call await here. reduce is a suspending terminal operator: calling it collects the flow and suspends until the result is available.
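This also points to an important property of flows: they are cold. Building a flow and chaining intermediate operators such as map runs nothing. The code only executes when a terminal operator such as collect, reduce or toList is called, and it executes again on every call. The following sketch, with a hypothetical counter, shows this:

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Shows that a flow is cold: nothing runs until a terminal operator collects it,
// and every collection runs the flow builder again.
fun coldFlowDemo(): Triple<Int, List<Int>, Int> = runBlocking {
    var builderRuns = 0
    val tens = flow {
        builderRuns++                  // counts how many times the builder block executes
        for (n in 1..3) emit(n)
    }.map { it * 10 }                  // intermediate operator: still nothing executes

    val runsBeforeCollecting = builderRuns
    val collected = tens.toList()      // terminal operator: the flow runs now
    tens.toList()                      // a second terminal call runs the builder again
    Triple(runsBeforeCollecting, collected, builderRuns)
}
```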

We can also turn simple collections into flows with the asFlow() function, like this:

(1..5).asFlow().collect {
    logger.info { "item: $it" }
}

This will produce:

[main] INFO com.alexandreesl.examples.FlowExample - item: 1
[main] INFO com.alexandreesl.examples.FlowExample - item: 2
[main] INFO com.alexandreesl.examples.FlowExample - item: 3
[main] INFO com.alexandreesl.examples.FlowExample - item: 4
[main] INFO com.alexandreesl.examples.FlowExample - item: 5

One last thing to learn before wrapping up flows is the flowOn() function. With it, we can define the coroutine context in which the upstream part of the flow runs, meaning everything before the flowOn call. For example, if we want our flow to emit its values from a specific thread, we can do this:

       val context1 = newSingleThreadContext("MyOwnThread1")

        val result = flow {
            for (number in 1..10) {
                logger.info { "I am processing the number $number at ${LocalDateTime.now()}" }
                delay(2000)
                emit(number)
            }
        }
            .flowOn(context1)
            .filter { it % 2 == 0 }
            .map {
                logger.info { "processing number $it" }
                it
            }
            .reduce { accumulator, value ->
                accumulator + value
            }
        logger.info { "result: $result" }

This produces the following output:

[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 1 at 2020-09-24T20:22:36.840
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 2 at 2020-09-24T20:22:38.854
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 3 at 2020-09-24T20:22:40.855
[main] INFO com.alexandreesl.examples.FlowExample - processing number 2
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 4 at 2020-09-24T20:22:42.860
[main] INFO com.alexandreesl.examples.FlowExample - processing number 4
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 5 at 2020-09-24T20:22:44.866
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 6 at 2020-09-24T20:22:46.871
[main] INFO com.alexandreesl.examples.FlowExample - processing number 6
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 7 at 2020-09-24T20:22:48.877
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 8 at 2020-09-24T20:22:50.882
[main] INFO com.alexandreesl.examples.FlowExample - processing number 8
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 9 at 2020-09-24T20:22:52.887
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 10 at 2020-09-24T20:22:54.893
[main] INFO com.alexandreesl.examples.FlowExample - processing number 10
[main] INFO com.alexandreesl.examples.FlowExample - result: 30

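This shows that our configuration worked. Notice that the logs from map ran on the main thread. flowOn only changes the context of the code upstream of it (here, the flow builder), while the operators after it run in the collector’s context. The production and processing logs also interleave instead of strictly alternating, because flowOn runs the upstream part in a separate coroutine and buffers its emissions.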
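We can also check this upstream-only behavior programmatically. This sketch is a hypothetical helper, and it uses Dispatchers.Default instead of a dedicated thread so that there is nothing to close. Each element records the thread that emitted it and the thread that mapped it.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns the collecting thread plus, for each element,
// (thread that emitted it, thread that mapped it).
fun flowOnThreads(): Pair<Thread, List<Pair<Thread, Thread>>> = runBlocking {
    val collector = Thread.currentThread()
    val observed = flow {
        repeat(3) { emit(Thread.currentThread()) }          // upstream of flowOn
    }
        .flowOn(Dispatchers.Default)                        // affects only the builder above
        .map { producer -> producer to Thread.currentThread() } // runs in the collector's context
        .toList()
    collector to observed
}
```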

Communication between coroutines

Sometimes we need two coroutines to work together. For example, we may have to send data to an API only after some heavy IO processing is done on that same data. Each of these operations should run in a different coroutine, so that they can execute in parallel.

One way of doing this is by chaining the coroutines together, and another one is using channels.

Channels, as the name implies, allow coroutines to share data with each other. Let’s see an example.

Suppose we have two coroutines. One delays for some seconds, to represent the IO processing, and then sends the data to a channel, where the other coroutine processes it:

package com.alexandreesl.examples

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import mu.KotlinLogging

class ChannelExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {

        val channel = Channel<Int>()

        CoroutineScope(Dispatchers.IO).launch {
            for (item in 1..10) {
                // delay to represent IO
                delay(2000)
                logger.info { "Sending: $item" }
                channel.send(item)
            }
            // closing the channel, we are done
            channel.close()
        }

        val deferred = CoroutineScope(Dispatchers.IO).async {
            try {
                while (true) {
                    val received = channel.receive()
                    logger.info { "Received: $received" }
                }
            } catch (e: ClosedReceiveChannelException) {
                // this happens when the channel is closed
            }
            //signalling the process has ended
            true
        }

        deferred.await()

    }


}

This will produce the following:

[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 1
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 1
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 2
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 2
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 3
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 3
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 4
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 4
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 5
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 5
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 6
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 6
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 7
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 7
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 8
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Received: 8
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Sending: 9
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Received: 9
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Sending: 10
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Received: 10

Notice that sometimes both receiving and sending happen on the same thread. This is a good example of how Kotlin keeps reusing the same threads to run different coroutines, in order to optimize execution.

As we can see, the channel ran successfully, sending data from one coroutine to another.
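As a side note, a receiving loop does not need to catch ClosedReceiveChannelException. A for loop over a channel suspends between elements and ends on its own once the channel is closed. Here is a minimal sketch of the same producer/consumer idea written that way (the names are illustrative):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Producer/consumer over a channel, consumed with a for loop instead of
// receive() plus catching ClosedReceiveChannelException.
fun squaresOverChannel(): List<Int> = runBlocking {
    val channel = Channel<Int>()
    launch(Dispatchers.Default) {
        for (item in 1..5) channel.send(item * item)
        channel.close()              // ends the consumer's for loop below
    }
    val received = mutableListOf<Int>()
    for (value in channel) {         // suspends between elements, exits when closed
        received += value
    }
    received
}
```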

Conclusion

And so we conclude our tour of coroutines. With a simple interface but powerful features, coroutines are an excellent tool for developing robust solutions in Kotlin. All code from our lab can be found here.

Thank you for following my blog. Until next time.

ReactiveX: implementing reactive solutions in the JVM


Hello, dear readers! Welcome to my blog. In this post, we will talk about ReactiveX, a library (available on the JVM as RxJava) that allows us to easily develop and deploy reactive solutions. But what is the reactive paradigm, and why should we use it? Let’s find out!

What is the reactive paradigm?

According to Wikipedia:

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g., arrays) or dynamic (e.g., event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow.

What this means is that the reactive paradigm is about handling data as streams. The data typically flows asynchronously and is processed as events, at the rate it is produced (or consumed, depending on whether we are using back-pressure). It is a different way of thinking about development, outside of the traditional request-response model. But why use reactive programming?

Why use reactive?

The main advantages of using reactive programming are non-blocking IO and parallelism. Reactive frameworks provide a plug-and-play set of operators that are designed to be easily switched to asynchronous execution. This lets us develop solutions that scale well against massive amounts of data to process.

Non-blocking IO refers to the internals of reactive programming frameworks, which use buffers and other techniques to let each part of the code operate independently. Let’s use an example to explain this better.

Let’s imagine a classic API. Clients make requests to a server, which has an HTTP thread pool to process them. In the traditional approach, all processing, from the controller to the DB IO operations, happens on the HTTP thread.

Now imagine a ton of requests arriving at near-light speed. Since each HTTP thread stays busy for the whole request, including the slow DB IO, the HTTP thread pool will soon be exhausted and the server will start rejecting requests.

Now, let’s see the same use case developed using the reactive paradigm. With reactive, we could break the solution into more components, for example by adding another thread pool to process the DB IO operations, with a buffer in the middle to decouple the slow IO code from the HTTP code, which needs to be fast. The following picture illustrates the solution:

Keep in mind that we are describing the solution at a very low level of detail: typically, we won’t need to make these arrangements ourselves, but will rely on the facilities of a reactive framework, such as ReactiveX.

With this approach, an HTTP thread only needs to receive the request and deliver it to the buffer, being freed immediately to accept the next request. There are normally two possible ways to produce the response for the client:

  1. Fire-and-forget: the HTTP thread just returns an OK message to the client and leaves the rest to the DB thread;
  2. The HTTP thread sends to the buffer, alongside the data itself, an observer that wraps the client’s connection. This way, the DB thread is responsible for emitting the response to the client after doing its processing.
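To make the two options concrete, here is a minimal sketch in plain Java (standard library only, no ReactiveX) of the hand-off described above. HandOffSketch, its Request class and the “saved:” response are hypothetical names for illustration: the “HTTP” side only offers the request to a bounded buffer and returns, while a separate “DB” thread drains the buffer. The CompletableFuture carried by each request plays the role of the observer from option 2.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (standard library only, not ReactiveX): a bounded buffer
// decouples the fast "HTTP" side from the slow "DB" side.
class HandOffSketch {

    // A request plus the "observer" that will carry the response back (option 2).
    static final class Request {
        final String payload;
        final CompletableFuture<String> response = new CompletableFuture<>();

        Request(String payload) {
            this.payload = payload;
        }
    }

    private final BlockingQueue<Request> buffer = new ArrayBlockingQueue<>(100);

    HandOffSketch() {
        // The "DB" worker drains the buffer and completes each request's observer.
        Thread dbWorker = new Thread(() -> {
            try {
                while (true) {
                    Request request = buffer.take();
                    request.response.complete("saved:" + request.payload); // stands in for slow DB IO
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "db-worker");
        dbWorker.setDaemon(true); // do not keep the JVM alive just for this worker
        dbWorker.start();
    }

    // Runs on an "HTTP" thread: hand the request to the buffer and return at once.
    CompletableFuture<String> handle(String payload) {
        Request request = new Request(payload);
        if (!buffer.offer(request)) {
            // Buffer full: fail fast instead of blocking the HTTP thread.
            request.response.completeExceptionally(new IllegalStateException("server busy"));
        }
        return request.response;
    }

    public static void main(String[] args) {
        HandOffSketch server = new HandOffSketch();
        // Option 2: the caller is notified through the future.
        // Option 1 (fire-and-forget) would ignore it and answer "OK" right away.
        System.out.println(server.handle("order-1").join()); // saved:order-1
    }
}
```

The buffer is bounded on purpose: when it is full, offer fails fast, so the HTTP side can reject a request instead of blocking, which is exactly the pressure point a reactive framework manages for us.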

Another powerful reactive technique is back-pressure. With back-pressure, the client side of the stream dictates the pace of processing: the client asks the server (or a middle-man buffer, depending on the framework’s implementation) for the next data to be processed.

This way we can, for example, protect the DB from being crushed under a massive load of writes. I wrote an article about Akka Streams, another good reactive framework written in Scala, which explains back-pressure in more detail. You can find the article here.

ReactiveX

So, now that we have learned about reactive programming, let’s start learning about ReactiveX. Written in Java, with additional libraries for better Kotlin support, it is one of the most popular reactive frameworks in the Java ecosystem.

For this lab, we will use Kotlin. Created by JetBrains, Kotlin is a good programming language that is becoming very popular in Java ecosystems across the globe. With much better support for functional programming than Java, it is an excellent language to learn today. All code will run on Java 11 and Kotlin 1.3.72.

To keep focused, we will stick to standalone examples, since building an API, DB migrations, etc. would drive our focus away from learning the framework. If you want to see a more complex example, I suggest reading my Akka article after this one.

All code from the lab can be found here. So, without further delay, let’s get started!

Our first stream

We will use Gradle as our dependency manager. First, let’s add the ReactiveX dependencies to the build.gradle file:

...
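dependencies {
    // RxJava 2 core library
    implementation group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.19'
    // Kotlin extensions for RxJava 2 (needed at compile time, not only at runtime)
    implementation group: 'io.reactivex.rxjava2', name: 'rxkotlin', version: '2.4.0'
}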
...

We added the RxJava 2 core library, as well as the RxKotlin library for better Kotlin support.

Now, let’s create a main function. Just like in Java, this function is the entry point for running the application:

fun main(args: Array<String>) {
}

Now, let’s make a very simple stream. We will take a group of words, map each word to its length and print the lengths to the console:

import io.reactivex.Observable

fun main(args: Array<String>) {
    Observable.just("I",
        "Love",
        "Ana",
        "Carolina")
        .map(String::length)
        .subscribe(
            {println("length $it")},
            { println("error $it")},
            { println("stream ended!")}
        )
}

When running this code, the following is printed to the console:

length 1
length 4
length 3
length 8
stream ended!

We covered some ground with this simple example, so let’s talk about the code:

  • First, we created an Observable. In ReactiveX, streams come in two main types: Observables and Flowables. The main difference between the two is that Flowables support back-pressure. We will learn about Flowables later in the lab;
  • Second, we used just to initialise the Observable. This method is useful for simple examples where we know exactly which items we want to process;
  • Then, we used map. Map is a classic stream operation, where we read each item and produce another item as output. In our case, we read each word and produce its length;
  • Finally, we subscribed to the stream. The subscribe method is where we supply the code that sits at the bottom of the stream, the last code to be executed. In this case, we print each word length, or an error message in case anything goes wrong (nothing should go wrong here, but you get the idea 🙂 ). We also provide a message for when the stream ends.

One last interesting concept to grasp before we move on is the stream event cycle. Once the stream starts, the following sequence of events can happen:

  • The items are sent, one by one, downstream to the bottom. The first lambda we supplied to subscribe is the one fired in this case;
  • If anything goes wrong with the stream, an error event is sent to the bottom. This is what runs our second lambda. When this happens, the stream stops, and the completed event is not fired;
  • Finally, when all items are processed, the completed event is fired and handled by our third lambda. Keep in mind that there are cases where the stream is infinite, such as when streaming from a topic, so this event may never fire.
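The event cycle can be summarised as a contract: zero or more next events, followed by at most one terminal event, either error or complete. The following sketch models that contract in plain Java; SimpleObserver, emit and run are hypothetical, simplified stand-ins, not RxJava’s Observer API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of the event cycle (not RxJava's real classes):
// zero or more onNext calls, then exactly one terminal event, onError or onComplete.
class EventCycleSketch {

    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    static <T, R> void emit(List<T> source, Function<T, R> mapper, SimpleObserver<R> observer) {
        for (T item : source) {
            R mapped;
            try {
                mapped = mapper.apply(item);
            } catch (RuntimeException e) {
                observer.onError(e); // an error terminates the stream...
                return;              // ...so no more items and no onComplete
            }
            observer.onNext(mapped);
        }
        observer.onComplete(); // only reached when every item was delivered
    }

    // Records every event as text, mirroring the three lambdas given to subscribe.
    static List<String> run(List<String> words) {
        List<String> log = new ArrayList<>();
        emit(words, String::length, new SimpleObserver<Integer>() {
            public void onNext(Integer length) { log.add("length " + length); }
            public void onError(Throwable error) { log.add("error " + error.getMessage()); }
            public void onComplete() { log.add("stream ended!"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("I", "Love", "Ana", "Carolina")));
        // [length 1, length 4, length 3, length 8, stream ended!]
    }
}
```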

So that concludes our first stream. Yes, it is as simple as that! Now, let’s learn some more things we can do with our streams.

Stream operations

Now, let’s take a tour of some of the basic operations we can do with streams. Of course, this won’t be an exhaustive list of every operation, since that would turn our lab into a dull showcase of tons of methods; it is just a quick glance to get us started with the framework.

First, let’s move our first example into its own file and change Main to import and call it:

package com.alexandreesl.examples

import io.reactivex.Observable

fun firstExample() {
    Observable.just("I",
        "Love",
        "Ana",
        "Carolina")
        .map(String::length)
        .subscribe(
            {println("length $it")},
            { println("error $it")},
            { println("stream ended!")}
        )
}

import com.alexandreesl.examples.firstExample

fun main(args: Array<String>) {
    firstExample()
}

From now on, we will keep adding each example as a function and run everything from Main.

Now, let’s make another quick example. We will take a single word, add a prefix to it and print the result to the console:

package com.alexandreesl.examples

import io.reactivex.Single

fun prefixExample() {
    Single.just("Alexandre")
        .map { "I am $it" }
        .subscribe(
            { println(it) },
            { println("error $it") }
        )
}

After running, we get this output:

I am Alexandre

This example introduces us to Single. Single is a specialised observable whose primary purpose is to provide a simplified version of Observable for scenarios where we emit just a single item.

For this reason, subscribe only takes success and error handlers: an onComplete event wouldn’t make much sense when there is just one item to process.
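If it helps to compare with something in the JDK, a CompletableFuture has a similar shape to a Single: it ends with one value or one error. This plain-Java analogy (not RxJava; SingleAnalogy is a hypothetical name) mirrors the prefix example, with thenApply in place of map and a single whenComplete callback that receives either the value or the error.

```java
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy (not RxJava): like a Single, a CompletableFuture ends with
// exactly one value or one error, so there is no separate "complete" callback.
class SingleAnalogy {

    static String prefix(String name) {
        StringBuilder out = new StringBuilder();
        // The future is already complete, so the callbacks below run immediately on this thread.
        CompletableFuture.completedFuture(name)
                .thenApply(it -> "I am " + it)       // plays the role of map
                .whenComplete((value, error) -> {     // success OR error, never both
                    if (error != null) {
                        out.append("error ").append(error);
                    } else {
                        out.append(value);
                    }
                });
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(prefix("Alexandre")); // I am Alexandre
    }
}
```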

Now let’s make another example. We will generate a list of random numbers, keep just the even ones and print them:

package com.alexandreesl.examples

import io.reactivex.Observable
import java.util.*

fun evenNumbersExample() {

    val random = Random() // one generator, reused for all 50 numbers
    Observable.fromIterable((1..50).map { random.nextInt() })
        .filter { it % 2 == 0 }
        .subscribe(
            {println("$it")},
            { println("error $it")}
        )

}

This will produce something like the following (of course, every run will produce a different result):

-925639320
-1367241566
-1993909882
786568802
1215269342
-480862456
-1166646146
1171590568
682849868
-1321837860
-1021426524
-196824578
858851372
-213227382
966926184
-1095918914
-950267778
-2128264986
-1770877918
227807366
1692840490
1622475038
-308642896
-1907658182

In this example, we see another basic operator, filter. As the name implies, this operator filters the items emitted by the stream, based on a predicate.
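One detail about the predicate used above: Random.nextInt() also returns negative numbers, as the output shows, and in both Kotlin and Java the remainder operator keeps the sign of the dividend (-3 % 2 is -1). So it % 2 == 0 is safe for negative numbers, but the mirror-image test for odd numbers, it % 2 == 1, would silently drop every negative odd number. A small Java illustration (ParityFilter and its methods are hypothetical names):

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration of the parity predicate used in the filter example.
// The remainder keeps the sign of the dividend: -3 % 2 == -1.
class ParityFilter {

    static List<Integer> evens(List<Integer> numbers) {
        return numbers.stream().filter(n -> n % 2 == 0).collect(Collectors.toList());
    }

    // Buggy "odd" filter: misses negative odd numbers, whose remainder is -1.
    static List<Integer> oddsBuggy(List<Integer> numbers) {
        return numbers.stream().filter(n -> n % 2 == 1).collect(Collectors.toList());
    }

    // Sign-safe "odd" filter.
    static List<Integer> odds(List<Integer> numbers) {
        return numbers.stream().filter(n -> n % 2 != 0).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> sample = List.of(-4, -3, 0, 7, 10);
        System.out.println(evens(sample));     // [-4, 0, 10]
        System.out.println(oddsBuggy(sample)); // [7]
        System.out.println(odds(sample));      // [-3, 7]
    }
}
```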

Moving on, let’s make one more example to wrap up our quick tour. Let’s take a list of numbers (with duplicates) and use distinct and reduce to sum all the unique numbers in the list:

package com.alexandreesl.examples

import io.reactivex.Observable

fun distinctSumExample() {

    val numbers = listOf(1,2,3,4,5,5,6,12,14,18,22,25,25)

    Observable.fromIterable(numbers)
        .distinct()
        .reduce(0) { total,next -> total + next }
        .subscribe(
            {println("$it")},
            {println("error $it")}
        )
    
}

As expected, we will receive the number 112 as the result.
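The arithmetic checks out: the distinct values are 1, 2, 3, 4, 5, 6, 12, 14, 18, 22 and 25, which add up to 112. Note also that reduce with a seed emits a single value (in RxJava 2 it returns a Single), which is why subscribe only takes success and error handlers here. For comparison, the same pipeline with plain Java streams (an analogy, not RxJava code; DistinctSum is a hypothetical name):

```java
import java.util.List;

// Plain-Java Stream analogue of the distinct + reduce example (no RxJava):
// drop duplicates, then fold the remaining numbers into a sum starting from 0.
class DistinctSum {

    static int distinctSum(List<Integer> numbers) {
        return numbers.stream()
                .distinct()                 // 5 and 25 are kept only once
                .reduce(0, Integer::sum);   // same seed and accumulator as reduce(0) { total, next -> total + next }
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 5, 6, 12, 14, 18, 22, 25, 25);
        System.out.println(distinctSum(numbers)); // 112
    }
}
```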

And this concludes our quick tour of ReactiveX operations. As we can see, it is very powerful and easy to use. I suggest exploring the documentation to discover much more!

Handling events

As we saw before, there are three types of events in the ReactiveX event cycle:

  • Next;
  • Error;
  • Complete.

Let’s say we want to run code snippets at different stages of the stream, for example, to print each item before filtering. We can do this using doOnNext:

package com.alexandreesl.examples

import io.reactivex.Observable
import java.util.*

fun evenNumbersWithPrintExample() {

    val random = Random() // one generator, reused for all 50 numbers
    Observable.fromIterable((1..50).map { random.nextInt() })
        .doOnNext { println("item: $it") }
        .filter { it % 2 == 0 }
        .subscribe(
            {println("$it")},
            { println("error $it")}
        )

}

Running the code, we receive the following output (truncated for brevity):

...
item: -1075891450
-1075891450
item: 994876337
item: 297766092
297766092
item: -858967930
-858967930
item: 1633354503
item: -1792369156
-1792369156
item: 1255904419
item: 1325177149
item: 1773095686
1773095686
...

Just as we have doOnNext, we also have doOnError and doOnComplete, which do the same for the error and complete events.
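Java streams have a close cousin of doOnNext, Stream.peek, which runs a side effect for each item without changing it. Since items travel through the pipeline one at a time, the log lines interleave just like the output above. A plain-Java analogy (PeekAnalogy is a hypothetical name, not RxJava code):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogue of doOnNext: Stream.peek runs a side effect per item
// without changing it. Items flow one at a time, so the log lines interleave.
class PeekAnalogy {

    static List<String> trace(List<Integer> numbers) {
        List<String> log = new ArrayList<>();
        numbers.stream()
                .peek(n -> log.add("item: " + n))          // like doOnNext, before the filter
                .filter(n -> n % 2 == 0)
                .forEach(n -> log.add(String.valueOf(n)));  // like the subscribe lambda
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(List.of(3, 4, 7, 8)));
        // [item: 3, item: 4, 4, item: 7, item: 8, 8]
    }
}
```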

There are two additional methods worth learning before we move on to the next subject. Let’s revisit our reduce example. Suppose that something can go wrong and we want to return 0 as a default in case of an error. We can do this as follows (we also added code to “force” an error):

package com.alexandreesl.examples

import io.reactivex.Observable
import java.lang.RuntimeException

fun distinctSumWithDefaultExample() {

    val numbers = listOf(1,2,3,4,5,5,6,12,14,18,22,25,25)

    var counter = 0

    Observable.fromIterable(numbers)
        .distinct()
        .reduce(0) { total,next ->
            counter++
            if (counter == 5)  throw RuntimeException("BOOOM!!!!")
            total + next }
        .onErrorReturn { 0 }
        .subscribe(
            {println("$it")},
            {println("error $it")}
        )

}

If we run this, we will receive 0, just as expected.
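The same “fallback value on error” idea exists in the JDK: CompletableFuture.exceptionally replaces a failure with a value. In this plain-Java analogy (not RxJava), ErrorReturnAnalogy and its failAtStep parameter are hypothetical, the latter standing in for the counter that forces the error in the Kotlin example:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java analogue of onErrorReturn (not RxJava): if the computation fails,
// exceptionally replaces the error with a fallback value.
class ErrorReturnAnalogy {

    // failAtStep is a hypothetical knob that "forces" an error, like the counter in the Kotlin example.
    static int sumWithDefault(List<Integer> numbers, int failAtStep) {
        return CompletableFuture.supplyAsync(() -> {
                    int total = 0;
                    int step = 0;
                    for (int n : numbers) {
                        step++;
                        if (step == failAtStep) {
                            throw new RuntimeException("BOOOM!!!!");
                        }
                        total += n;
                    }
                    return total;
                })
                .exceptionally(error -> 0) // like onErrorReturn { 0 }
                .join();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(sumWithDefault(numbers, 5));  // 0: the error was replaced
        System.out.println(sumWithDefault(numbers, -1)); // 21: no error happened
    }
}
```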

Another interesting method is onErrorResumeNext. This method allows us to provide another Observable to be executed in case the main Observable fails. Let’s see a simple example, where we divide 10 by a range of numbers and one of them is 0:

package com.alexandreesl.examples

import io.reactivex.Observable

fun divisionWithResumeNextExample() {

    Observable.just(1,2,3,4,0,8,2)
        .map {item -> 10 / item}
        .onErrorResumeNext(Observable.just(10).repeat(3))
        .subscribe(
            {println("item  $it")},
            {println("error $it")}
        )
}

This will produce the following. Notice that, as soon as the division by zero fails, the stream stops processing the original items and consumes the fallback stream instead, skipping the remaining items:

item  10
item  5
item  3
item  2
item  10
item  10
item  10
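To spell out the semantics, the hypothetical helper below (plain Java, not an RxJava API) reproduces what onErrorResumeNext did in this example: items are mapped until the first error, then the rest of the source is abandoned and the fallback items are emitted instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical helper (plain Java, not an RxJava API) mimicking onErrorResumeNext:
// on the first error, the rest of the source is abandoned and the fallback is emitted instead.
class ResumeNextSketch {

    static List<Integer> mapWithResumeNext(List<Integer> source, IntUnaryOperator mapper, List<Integer> fallback) {
        List<Integer> out = new ArrayList<>();
        for (int item : source) {
            try {
                out.add(mapper.applyAsInt(item));
            } catch (RuntimeException e) {
                out.addAll(fallback); // switch to the fallback stream...
                return out;           // ...and never return to the original one
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> result = mapWithResumeNext(
                List.of(1, 2, 3, 4, 0, 8, 2),
                item -> 10 / item,            // 10 / 0 throws ArithmeticException
                Collections.nCopies(3, 10));  // like Observable.just(10).repeat(3)
        System.out.println(result); // [10, 5, 3, 2, 10, 10, 10]
    }
}
```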

And that concludes our tour on event handling. Next, let’s learn about ReactiveX Flowables.

Flowables

When working with ReactiveX, there are two major ways of using the framework: Observables and Flowables. The major difference between them is that Flowable supports back-pressure, as stated before. Let’s see an example to better understand what that means.

Let’s imagine a stream that emits a very large range of numbers, each of which takes a little time to be processed. To see the problem, we will use a Scheduler (more on Schedulers later) to decouple the emitter thread from the receiver thread:

    Observable.fromIterable((1..5000000))
        .map { println("emitting $it")
            it }
        .observeOn(Schedulers.computation())
        .subscribe({
            Thread.sleep(5L)
            println(" receiving $it")
        })
    Thread.sleep(60000L)

When running, we would see something like this (of course, this is only a fragment):

...
emitting 1264
emitting 1265
emitting 1266
emitting 1267
emitting 1268
emitting 1269
emitting 1270
emitting 1271
emitting 1272
emitting 1273
emitting 1274
emitting 1275
emitting 1276
 receiving 3
emitting 1277
emitting 1278
emitting 1279
emitting 1280
emitting 1281
emitting 1282
emitting 1283
emitting 1284
emitting 1285
...

As we can see, the emitter is a lot faster than the receiver. In this case, observeOn buffers the pending items in an unbounded queue by default, so no data would be lost, but in a production environment the risk of running out of memory this way is very high.

So, how do we fix this? By changing to a Flowable, of course:

    Flowable.fromIterable((1..5000000))
        .map { println("emitting $it")
            it }
        .observeOn(Schedulers.computation())
        .subscribe({
            Thread.sleep(5L)
            println(" receiving $it")
        })
    Thread.sleep(60000L)

Yep, it is as simple as that to use a Flowable. Keep in mind that the operators we have seen on Observable are also available on Flowable, so all the previous examples work with Flowables as well. If we run it, we will see fragments like this:

... 
emitting 124
emitting 125
emitting 126
emitting 127
emitting 128
 receiving 1
 receiving 2
 receiving 3
 receiving 4
 receiving 5
 receiving 6
 receiving 7
 receiving 8
 receiving 9
 receiving 10
 receiving 11
 receiving 12
 receiving 13
 receiving 14
 receiving 15
 receiving 16
 receiving 17
 receiving 18
 receiving 19
 receiving 20
 receiving 21
 receiving 22
 receiving 23
...

We won’t go deep into the internals of Flowable, but, keeping it simple, the key here is that the downstream side (in this case, the observeOn step) requests data from the emitter in chunks, 128 items by default, and the emitter never sends more than was requested. So, after it hit item 128, the emitter stopped and the first chunk was delivered to be processed.

The main detail to note here is that the “emitting” messages stopped. This is because the emitter itself stopped: it is waiting for the subscriber to work through the chunk and request more, and only then will it start sending data again.
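The mechanism behind this can be modelled in a few lines. The following is a simplified, single-threaded simulation in plain Java (DemandSimulation is hypothetical, not RxJava’s implementation): the emitter may only push items while there is outstanding demand, and the subscriber grants demand in chunks of 128. For simplicity it asks for the next chunk only when the previous one is fully processed; real implementations may request earlier, but either way the number of items in flight stays capped by the chunk size.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded simulation of demand-driven back-pressure
// (a model of the idea, not RxJava's implementation).
class DemandSimulation {

    // Returns the peak number of items emitted but not yet processed.
    static int maxInFlight(int totalItems, int chunk) {
        Deque<Integer> buffer = new ArrayDeque<>();
        long demand = chunk; // the subscriber's initial request(chunk)
        int next = 1;
        int processed = 0;
        int peak = 0;
        while (processed < totalItems) {
            // Emitter: push items only while the subscriber has outstanding demand.
            while (demand > 0 && next <= totalItems) {
                buffer.addLast(next++);
                demand--;
                peak = Math.max(peak, buffer.size());
            }
            // Slow subscriber: process one buffered item per step.
            buffer.removeFirst();
            processed++;
            // Chunk fully processed: request the next one.
            if (buffer.isEmpty()) {
                demand += chunk;
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        // However large the source, at most one chunk is waiting to be processed.
        System.out.println(maxInFlight(5000, 128)); // 128
    }
}
```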

But what is that Scheduler we saw? Let’s find out!

Schedulers

Before we start learning about schedulers, I want to ask you a question: have you noticed that, when we added a scheduler previously, we also added a Thread.sleep after the Flowable? Can you guess why we did that?

The answer will probably come as a shock: by default, ReactiveX is not multithreaded!

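It is true: by default, all streams (Observables or Flowables) run on the thread that subscribes to them, which in our examples is the main thread. Some operators (like timeout, where we define a time limit for the upstream to emit an event and an error is thrown if none is emitted in time) do run on other threads, but only because, under the hood, they use a scheduler to run the operator. This also answers the question: once part of the stream runs on a scheduler’s threads, main has nothing left to do and would return immediately, and since ReactiveX’s scheduler threads are daemon threads, the JVM would exit before the stream finished. The Thread.sleep keeps the application alive long enough to see the results.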

So, what does Schedulers.computation() do? Basically, ReactiveX takes the stream we declared and hands its execution over to a thread pool. And what does computation mean? It is the type of scheduler we want to use. ReactiveX comes with several scheduler flavours, the main ones being:

  • computation: Computation is designed for streams that do CPU-intensive processing. It is backed by a fixed-size thread pool with as many threads as there are CPU cores on the machine where the application is running;
  • IO: IO is designed for IO-bound work, such as blocking network or file calls. IO’s thread pool is unbounded: as demand grows, ReactiveX instantiates more threads whenever the current ones are all busy (idle threads are reused). This is a point of attention: if you are unsure of the load the application will have, this option can be dangerous, as it can create more threads than the application can handle and, in more severe cases, result in a breakdown;
  • newThread: This works just like the name says: there is no thread pool, a new thread is created for each new execution. Of course, this is an even more dangerous option, since it can lead to the problem described above even more easily. It is only usable in situations where the processing is indeed extremely fast;
  • single: This is the opposite of newThread: it creates just one thread and keeps using it for all processing. This can be useful for simple streams, where you just need a little performance boost. Remember that creating threads is a costly operation, so for simple streams it can indeed be a better solution than IO, for example.
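For readers more familiar with the JDK’s executors, these flavours roughly correspond to the following standard pools. This is an analogy to show the pool shapes, not how RxJava builds its schedulers, and SchedulerShapes and its methods are hypothetical names:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Rough standard-library counterparts of the scheduler flavours (an analogy only).
class SchedulerShapes {

    // computation: fixed pool with one thread per available core
    static ExecutorService computationLike() {
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    // io: unbounded pool that creates threads on demand and reuses idle ones
    static ExecutorService ioLike() {
        return Executors.newCachedThreadPool();
    }

    // single: one thread reused for every task
    static ExecutorService singleLike() {
        return Executors.newSingleThreadExecutor();
    }

    // newThread: no pool at all, a brand new thread per task
    static void newThreadLike(Runnable task) {
        new Thread(task).start();
    }

    // Runs `tasks` small tasks on the pool, shuts it down and returns the distinct thread names used.
    static Set<String> threadNamesUsed(ExecutorService pool, int tasks) {
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> Thread.currentThread().getName()));
            }
            Set<String> names = new HashSet<>();
            for (Future<String> future : futures) {
                names.add(future.get());
            }
            return names;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("single used " + threadNamesUsed(singleLike(), 20).size() + " thread");
        System.out.println("computation used " + threadNamesUsed(computationLike(), 20).size()
                + " threads (cap: " + Runtime.getRuntime().availableProcessors() + ")");
        System.out.println("io used " + threadNamesUsed(ioLike(), 20).size() + " threads (no cap)");
        newThreadLike(() -> System.out.println("newThread ran on " + Thread.currentThread().getName()));
    }
}
```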

In my personal view, computation is the best all-around option, as it shields the application from the risks of “over-threading” and gives the thread pool what is usually the optimal size for CPU-bound work: the same number as there are cores (when there are more threads than cores, the CPU has to divide them between the cores, so the gain from multitasking shrinks).

So, when we added observeOn, did the whole stream run on a computation thread? Well, to tell you the truth, I simplified my first answer a little: ReactiveX has two methods for defining schedulers, subscribeOn and observeOn. So, in that case, only the receiving part of the stream, everything after observeOn, actually ran on a different thread.

So, what is the difference between what we did previously, and this:

    Flowable.fromIterable((1..5000000))
        .map { println("emitting $it")
            it }
        .subscribeOn(Schedulers.computation())
        .subscribe({
            Thread.sleep(5L)
            println(" receiving $it")
        })
    Thread.sleep(60000L)

With this version, it indeed does what I said before: the whole stream now runs on a computation thread. When using subscribeOn, we tell the whole stream to use a given scheduler, no matter where subscribeOn appears in the chain. If we use both subscribeOn and observeOn on a stream, ReactiveX uses the subscribeOn scheduler from the source until the first observeOn is reached; from there, that observeOn’s scheduler is used downstream, until the end of the stream or until another observeOn is reached.
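A rough JDK analogy may help picture these thread hops: with CompletableFuture, the executor given to supplyAsync decides where the source runs (loosely like subscribeOn), and each thenApplyAsync with another executor moves the following stage to that executor’s thread (loosely like an observeOn placed before it). ThreadHopAnalogy and the thread names are hypothetical, and this is not RxJava code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy (not RxJava): the executor passed to supplyAsync decides where the
// source runs (loosely like subscribeOn); thenApplyAsync with another executor moves the
// next stage to that executor's thread (loosely like an observeOn placed before it).
class ThreadHopAnalogy {

    static String run() {
        ExecutorService upstream = Executors.newSingleThreadExecutor(task -> new Thread(task, "upstream"));
        ExecutorService downstream = Executors.newSingleThreadExecutor(task -> new Thread(task, "downstream"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> "emitted on " + Thread.currentThread().getName(), upstream)
                    .thenApplyAsync(s -> s + ", received on " + Thread.currentThread().getName(), downstream)
                    .join();
        } finally {
            upstream.shutdown();
            downstream.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // emitted on upstream, received on downstream
    }
}
```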

Here is an example where we can see this thread switching in practice:

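package com.alexandreesl.examples

import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun schedulersExample() {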

    Flowable.fromIterable((1..10))
        .map { println("emitting $it on ${Thread.currentThread().getName()}")
            it }
        .observeOn(Schedulers.io())
        .map { println("first receiving $it on ${Thread.currentThread().getName()}")
            it }
        .observeOn(Schedulers.single())
        .map { println("secondly receiving $it on ${Thread.currentThread().getName()}")
            it }
        .subscribeOn(Schedulers.computation())
        .subscribe {
            println("finally receiving $it on ${Thread.currentThread().getName()}")
        }
    Thread.sleep(300L)

}

After running the code, we will see an output like this:

emitting 1 on RxComputationThreadPool-1
emitting 2 on RxComputationThreadPool-1
emitting 3 on RxComputationThreadPool-1
emitting 4 on RxComputationThreadPool-1
first receiving 1 on RxCachedThreadScheduler-1
emitting 5 on RxComputationThreadPool-1
first receiving 2 on RxCachedThreadScheduler-1
emitting 6 on RxComputationThreadPool-1
secondly receiving 1 on RxSingleScheduler-1
first receiving 3 on RxCachedThreadScheduler-1
finally receiving 1 on RxSingleScheduler-1
emitting 7 on RxComputationThreadPool-1
secondly receiving 2 on RxSingleScheduler-1
finally receiving 2 on RxSingleScheduler-1
first receiving 4 on RxCachedThreadScheduler-1
secondly receiving 3 on RxSingleScheduler-1
finally receiving 3 on RxSingleScheduler-1
emitting 8 on RxComputationThreadPool-1
emitting 9 on RxComputationThreadPool-1
secondly receiving 4 on RxSingleScheduler-1
first receiving 5 on RxCachedThreadScheduler-1
finally receiving 4 on RxSingleScheduler-1
emitting 10 on RxComputationThreadPool-1
secondly receiving 5 on RxSingleScheduler-1
finally receiving 5 on RxSingleScheduler-1
first receiving 6 on RxCachedThreadScheduler-1
first receiving 7 on RxCachedThreadScheduler-1
secondly receiving 6 on RxSingleScheduler-1
first receiving 8 on RxCachedThreadScheduler-1
finally receiving 6 on RxSingleScheduler-1
first receiving 9 on RxCachedThreadScheduler-1
secondly receiving 7 on RxSingleScheduler-1
first receiving 10 on RxCachedThreadScheduler-1
finally receiving 7 on RxSingleScheduler-1
secondly receiving 8 on RxSingleScheduler-1
finally receiving 8 on RxSingleScheduler-1
secondly receiving 9 on RxSingleScheduler-1
finally receiving 9 on RxSingleScheduler-1
secondly receiving 10 on RxSingleScheduler-1
finally receiving 10 on RxSingleScheduler-1

As we can see, the stream started on a computation thread, as defined by subscribeOn, and later switched threads as it reached each observeOn (RxCachedThreadScheduler is how ReactiveX names the threads of the IO scheduler).

And so we conclude our quick tour on Schedulers. Pretty cool, huh?

Conclusion

And so we conclude our first glimpse of ReactiveX. Of course, a single article is too small to cover everything there is to learn (I recommend looking into topics such as Disposables, Scheduler strategies and specialised test classes such as TestObserver and TestSubscriber), but I hope to have at least taught the basics for a good overall understanding of the framework.

ReactiveX is an excellent framework, well worth using. Happy coding!

References

Good ReactiveX book that I recommend

ReactiveX documentation

ReactiveX marbles (good site with diagrams that helps understand how the operators work)

Lab code

Java 9: Learning the new features – part 3


Hi, dear readers! Welcome to my blog. Continuing our series on the new features of Java 9, we will now talk about Reactive Streams, a new concept in parallel processing that promises to protect our applications from being overwhelmed by messages. So, without further delay, let’s begin!

What are Reactive Streams?

Let’s imagine an e-commerce system that has to send orders to a distribution center (DC). The e-commerce and DC systems are separate from each other and communicate through a REST service.

Normally, we could simply make a call from the e-commerce system to the DC. So, we implement the call and everything is fine.

However, one day we run into a problem with our integration: the e-commerce system floods the DC with calls from Black Friday sales, so the REST endpoint starts to fail and lose data.

This scenario illustrates a common integration problem: When a consumer has processing limitations to consume messages above a certain volume, we need to ensure the integration doesn’t end up overflowing the pipeline.

To tackle this problem, a pattern called Reactive Streams was designed. With Reactive Streams, the flow of processing is controlled by the Consumer, not the Publisher: the Consumer asks for more data as soon as it is ready, keeping its own pace. This is what we call back pressure: a kind of throttling that ensures the Publisher waits for the Consumer to be available before sending any more messages, so it cannot overflow the Consumer as in our previous example.

The diagram below shows the new Flow API in Java 9, which allows us to implement Reactive Streams. We can see our consumer (the subscriber) establishing a subscription with the producer and requesting n messages to consume, which are delivered for processing through the onNext method. Don’t worry about the rest of the details: we will see more in the next sections.

The reference site for this diagram, which also has another good tutorial on Reactive Streams, can be found in the references section at the end of the post.

Creating a Stream: the Publisher

First, let’s create our publisher. The simplest way to create a publisher is by using the SubmissionPublisher class.

Our lab will simulate the orders integration we talked about earlier. We will begin by creating a DTO to hold the data from our orders:

package com.alexandreesl.handson.model;

import java.math.BigDecimal;
import java.util.Date;
import java.util.List;

public class Order {

    private Long id;

    private List<String> products;

    private BigDecimal total;

    private Date orderDate;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public List<String> getProducts() {
        return products;
    }

    public void setProducts(List<String> products) {
        this.products = products;
    }

    public BigDecimal getTotal() {
        return total;
    }

    public void setTotal(BigDecimal total) {
        this.total = total;
    }

    public Date getOrderDate() {
        return orderDate;
    }

    public void setOrderDate(Date orderDate) {
        this.orderDate = orderDate;
    }
}

Next, let’s instantiate our publisher, passing the order DTO as the generic type:

public static void main(String[] args) {

    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();


}

That’s all we need to do for now with our publisher.

Creating a Stream: the Consumer

Now, let’s create our consumer, or subscriber in other words. For this, we will create a class called CDOrderConsumer that implements the Subscriber<T> interface:

package com.alexandreesl.handson.consumer;

import com.alexandreesl.handson.model.Order;

import static java.util.concurrent.Flow.Subscriber;
import static java.util.concurrent.Flow.Subscription;


public class CDOrderConsumer implements Subscriber<Order> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {

        this.subscription = subscription;
        subscription.request(1);

    }

    @Override
    public void onNext(Order item) {
        System.out.println("I am sending the Order to the CD!");
        subscription.request(1);

    }

    @Override
    public void onError(Throwable throwable) {

        throwable.printStackTrace();

    }

    @Override
    public void onComplete() {

        System.out.println("All the orders were processed!");

    }
}

In this class we implement several methods, which we saw in the previous diagram. We can explain each of them as follows:

  • onSubscribe: In this method, we receive an instance of Subscription, which we use to request messages from the publisher. In our example, we store the instance and request 1 message to be processed with the subscription.request(n) method call (it is also possible to request more than 1 message per call, allowing the subscriber to process batches of messages);
  • onNext(T): In this method, we process the messages received. In our example, we print a message symbolizing the REST call and ask the publisher for another message;
  • onError(Throwable throwable): In this method, we receive errors that can occur during message processing. In our example, we simply print the errors;
  • onComplete(): This method is called when the stream completes, after all messages have been delivered. In our example, we just print a message to signal the completion. It is important to note that the subscriber only receives as many messages as it requests: if our onNext(T) method didn’t ask for another message, the subscriber would stop after the first message, and any further messages would stay waiting in the publisher’s buffer.

Now that we understand how to implement our Subscriber, let’s try out our stream by subscribing the consumer to the publisher and sending a message:

public static void main(String[] args) throws IOException {


    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();
    submissionPublisher.subscribe(new CDOrderConsumer());

    Order order = new Order();
    order.setId(1L);
    order.setOrderDate(new Date());
    order.setTotal(BigDecimal.valueOf(123));
    order.setProducts(List.of("product1", "product2", "product3"));


    submissionPublisher.submit(order);

    submissionPublisher.close();

    System.out.println("Waiting for processing.......");
    System.in.read();


}

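In our code, we instantiate a publisher, subscribe our consumer to it, create an order and submit it for processing. Keep in mind that the submit call doesn’t mean the message was delivered to the subscriber: it is buffered and only delivered once the subscriber requests it with subscription.request(n) (and if the subscriber’s buffer is full, submit blocks until there is room, which is how back pressure reaches the publisher). Lastly, we close the publisher, as we will not send any more messages.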

Note: You may be wondering why we put that System.in.read() at the end. This is because all processing of the stream happens on a separate thread from the main one, so we need to make the program wait for the processing to complete, or else it could exit before the message is processed.
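As mentioned for onSubscribe, a subscriber does not have to ask for one message at a time. The sketch below (BatchingSubscriber and publishAndCollect are hypothetical names, not part of the lab) requests messages in batches and, as an alternative to System.in.read(), waits for the stream to finish with a CountDownLatch that is counted down in onComplete or onError:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical subscriber that requests messages in batches instead of one at a time.
class BatchingSubscriber<T> implements Flow.Subscriber<T> {

    private final int batchSize;
    private final List<T> received = Collections.synchronizedList(new ArrayList<>());
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private int leftInBatch;

    BatchingSubscriber(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.leftInBatch = batchSize;
        subscription.request(batchSize); // ask for a whole batch up front
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        if (--leftInBatch == 0) { // batch consumed: ask for the next one
            leftInBatch = batchSize;
            subscription.request(batchSize);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    // Publishes the items, then waits on the latch (instead of System.in.read()) until the stream ends.
    static <E> List<E> publishAndCollect(List<E> items, int batchSize) {
        BatchingSubscriber<E> subscriber = new BatchingSubscriber<>(batchSize);
        try (SubmissionPublisher<E> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit); // submit blocks if the subscriber's buffer is full
        } // leaving the try block calls close(), so onComplete follows the submitted items
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of(1, 2, 3, 4, 5, 6, 7), 3)); // [1, 2, 3, 4, 5, 6, 7]
    }
}
```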

If we execute our program, we will see an output like this:

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=50218:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main
Waiting for processing.......
I am sending the Order to the CD!
All the orders were processed!

Success!!! Now we have a fully functional reactive stream, allowing us to process our messages.

Processors on Reactive Streams

Sometimes, on a stream, there will be logic that can be placed between the publisher and the consumer, such as filtering, transforming, and more. For this purpose, we can implement processors. Processors are like subscribers that also publish messages after the logic is applied. This way, processors can be chained together on a stream, executing one after another, before finally passing the message to a subscriber.

Let’s expand our previous example. We detected a bug in our e-commerce system that sometimes places “phantom” orders with a total value of 0 on the stream. We haven’t identified the cause yet, but we need to prevent these fake orders from being sent to the DC system. We can use a processor to filter these fake orders out.

So, let’s implement the following class, OrderFilter, to accomplish this:

package com.alexandreesl.handson.processor;

import com.alexandreesl.handson.model.Order;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;


public class OrderFilter extends SubmissionPublisher<Order> implements Flow.Processor<Order, Order> {

    private Flow.Subscription subscription;


    @Override
    public void onSubscribe(Flow.Subscription subscription) {

        this.subscription = subscription;
        // Ask the upstream publisher for the first order.
        subscription.request(1);

    }

    @Override
    public void onNext(Order item) {

        // Only orders with a positive total are forwarded downstream.
        if (item.getTotal().signum() > 0) {

            submit(item);

        } else {

            System.out.println("INVALID ORDER! DISCARDING...");

        }

        // Request the next order whether or not this one was forwarded.
        subscription.request(1);

    }

    @Override
    public void onError(Throwable throwable) {

        throwable.printStackTrace();
        // Propagate the error to the downstream subscriber.
        closeExceptionally(throwable);

    }

    @Override
    public void onComplete() {

        // Propagate completion downstream, so the subscriber's onComplete()
        // runs after the last valid order has been delivered.
        close();

    }


}

In this class, we implement both sides of the stream: extending SubmissionPublisher gives us the publishing logic, while Flow.Processor<Order, Order> is itself both a Flow.Subscriber and a Flow.Publisher. The code is basically the same as our subscriber’s, except that in the onNext(T) method we check whether an order has a total value greater than 0. If it does, it is submitted to the subscriber; otherwise, it is discarded.
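One detail of onNext(T) is easy to miss: subscription.request(1) is called whether or not the order was forwarded. If it were called only inside the if branch, the first discarded order would leave no outstanding demand and the stream would stall. The sketch below generalizes the same filtering idea into a reusable, predicate-based processor. FilterProcessor and positiveTotals are hypothetical names, and plain BigDecimal totals stand in for the lab’s Order objects.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;

public class FilterProcessorDemo {

    // Hypothetical generalization of OrderFilter: forwards only the items
    // that match the predicate and drops the rest.
    static class FilterProcessor<T> extends SubmissionPublisher<T>
            implements Flow.Processor<T, T> {

        private final Predicate<? super T> predicate;
        private Flow.Subscription subscription;

        FilterProcessor(Predicate<? super T> predicate) {
            this.predicate = predicate;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            if (predicate.test(item)) {
                submit(item);
            }
            // Request the next item in BOTH cases: if this call were inside the
            // if branch, the first rejected item would stall the whole stream.
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            close();
        }
    }

    // Keeps only strictly positive totals, like the "phantom order" check.
    static List<BigDecimal> positiveTotals(List<BigDecimal> totals) {
        List<BigDecimal> kept = new CopyOnWriteArrayList<>();
        SubmissionPublisher<BigDecimal> publisher = new SubmissionPublisher<>();
        FilterProcessor<BigDecimal> filter = new FilterProcessor<>(t -> t.signum() > 0);
        publisher.subscribe(filter);
        // Terminal stage; the future completes when the filter signals onComplete.
        CompletableFuture<Void> finished = filter.consume(kept::add);

        totals.forEach(publisher::submit);
        publisher.close();
        finished.join(); // wait until the filtered stream completes
        return kept;
    }

    public static void main(String[] args) {
        List<BigDecimal> totals =
                List.of(BigDecimal.valueOf(123), BigDecimal.ZERO, BigDecimal.valueOf(5));
        System.out.println(positiveTotals(totals)); // [123, 5]
    }
}
```

Keeping the predicate outside the class means the same processor could filter by other rules without a new subclass, at the cost of losing the "INVALID ORDER!" log line that OrderFilter prints.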

Next, we modify our code, subscribing the processor to our stream and testing it with two orders, one valid and one fake:

public static void main(String[] args) throws IOException {

    // Wire the stream: publisher -> OrderFilter -> CDOrderConsumer.
    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();
    OrderFilter filter = new OrderFilter();
    submissionPublisher.subscribe(filter);
    filter.subscribe(new CDOrderConsumer());

    // A valid order, with a positive total.
    Order order = new Order();
    order.setId(1L);
    order.setOrderDate(new Date());
    order.setTotal(BigDecimal.valueOf(123));
    order.setProducts(List.of("product1", "product2", "product3"));

    submissionPublisher.submit(order);

    // A "phantom" order, with a total of 0, which the filter should discard.
    order = new Order();
    order.setId(2L);
    order.setOrderDate(new Date());
    order.setProducts(List.of("product1", "product2", "product3"));

    order.setTotal(BigDecimal.ZERO);

    submissionPublisher.submit(order);

    submissionPublisher.close();

    // Processing is asynchronous, so keep the JVM alive until Enter is pressed.
    System.out.println("Waiting for processing.......");
    System.in.read();

}

If we run the code, we will see a message indicating that one of the orders was discarded, confirming that our implementation works:

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=51469:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main
Waiting for processing.......
INVALID ORDER! DISCARDING...
I am sending the Order to the CD!
All the orders were processed!
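The main method above keeps the JVM alive with System.in.read(), so the program only ends once Enter is pressed. If the program should instead finish by itself when the stream completes, one option, assuming we are free to change the final subscriber, is to have that subscriber count down a CountDownLatch in onComplete() and let the caller await it. Here is a minimal sketch with a hypothetical LatchSubscriber that receives plain strings instead of the lab’s orders:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class LatchDemo {

    // Hypothetical terminal subscriber that releases a latch when the stream ends.
    static class LatchSubscriber implements Flow.Subscriber<String> {

        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            done.countDown(); // release the waiting thread on failure too
        }

        @Override
        public void onComplete() {
            done.countDown(); // every item has been delivered
        }
    }

    static List<String> run(List<String> items) {
        LatchSubscriber subscriber = new LatchSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items are delivered

        try {
            // Wait for onComplete instead of blocking on System.in.read().
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("order-1", "order-2"))); // [order-1, order-2]
    }
}
```

The five-second timeout is an arbitrary choice for the sketch; it simply prevents the caller from hanging forever if a stage never signals completion.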

The source code for our lab can be found here.

Conclusion

And so we conclude our learning on Reactive Streams. With their simple and intuitive approach, Reactive Streams are well worth trying out, especially in solutions that have capacity limitations. Join me next time for the last chapter of this series, where we will finally look at the much-anticipated new module system, Jigsaw. Thank you for following along in this post, and until next time.

References

Reactive Streams (Wikipedia)

Reactive Streams Tutorial (another good tutorial to serve as guide)