ReactiveX: implementing reactive solutions in the JVM


Hello, dear readers! Welcome to my blog. In this post, we will talk about ReactiveX, a Java framework that allows us to easily develop and deploy reactive solutions. But what is the reactive paradigm and why should we want to use it? Let’s find out!

What is the reactive paradigm?

According to Wikipedia:

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g., arrays) or dynamic (e.g., event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow.

What this means is that the reactive paradigm is about handling data as streams that flow, typically asynchronously, with each item processed as an event at the rate at which it is produced (or consumed, depending on whether we are using back-pressure). It is a different way of thinking about development, outside of the traditional request-response model. But why use reactive programming?

Why use reactive?

The main advantages of using reactive programming are non-blocking IO and parallelism. By providing a plug-and-play set of operators, designed to be easily switched to asynchronous execution, we can develop solutions that scale well when there are massive amounts of data to process.

Non-blocking IO refers to the internals of reactive programming frameworks, which provide buffers and other techniques that let different parts of the code operate independently. Let’s use an example to explain this better.

Let’s imagine a classic API. Requests are made by clients to a server, which has an HTTP thread pool to process the requests. In traditional computing, all processing, from the controller down to the DB IO operations, is done on the HTTP thread.

Now imagine if we have a ton of requests arriving at near-light speed. Soon, the HTTP thread pool will be exhausted and the server will start rejecting requests.

Now, let’s see the same use case developed using the reactive paradigm. With reactive, we could break the solution into more components, for example by adding another thread pool to process the DB IO operations, with a buffer in the middle to decouple the slow IO code from the HTTP code that needs to be fast. The following picture illustrates the solution:

Keep in mind that we are describing the solution at a very low level of detail: typically, we won’t need to make these arrangements directly, but will instead use the facilities of a reactive framework, such as ReactiveX.

With this approach, an HTTP thread only needs to receive the request and deliver it to the buffer, and is immediately freed to accept the next request. Normally there are two possible ways to produce the response for the client:

  1. By using fire-and-forget, that is, the HTTP thread just returns an OK message to the client and leaves the rest to the DB thread;
  2. The HTTP thread sends to the buffer, alongside the data itself, an observer that wraps the client’s connection. This way, the DB thread is responsible for emitting a response to the client after doing its processing.
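To make the second option more concrete, here is a minimal sketch in plain Java (no ReactiveX involved). It assumes a hypothetical saveOrder method standing in for the slow DB write, and uses the executor’s internal work queue as the buffer; the names HandOffSketch, saveOrder and handleRequest are invented for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical names throughout: HandOffSketch, saveOrder, handleRequest.
class HandOffSketch {

    // Stand-in for a slow, blocking database write.
    static String saveOrder(String order) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "saved " + order;
    }

    // The caller gets back a handle right away; a DB thread completes it later.
    static CompletableFuture<String> handleRequest(String order, ExecutorService dbPool) {
        // The executor's internal work queue plays the role of the buffer.
        return CompletableFuture.supplyAsync(() -> saveOrder(order), dbPool);
    }

    public static void main(String[] args) {
        ExecutorService dbPool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> response = handleRequest("order-1", dbPool);
            System.out.println("request thread is free again");
            System.out.println(response.join());
        } finally {
            dbPool.shutdown();
        }
    }
}
```

The request thread returns as soon as the work is queued; whoever holds the CompletableFuture plays the role of the observer that eventually receives the response.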

Another powerful reactive technique is back-pressure. With back-pressure, we make the client side of the stream (the consumer) responsible for dictating the rate of processing: the client asks the server (or a buffer in the middle, depending on the framework’s implementation) for the next data to be processed.

This way we can, for example, protect the DB from being crushed under a massive load of writes coming its way. I wrote a good article about Akka Streams, another good reactive framework, made in Scala, which explains back-pressure in more detail. You can find the article here.

ReactiveX

So, now that we have learned about reactive programming, let’s start learning about ReactiveX. Its JVM implementation, RxJava, is written in Java, with libraries for better Kotlin support, and is one of the most popular reactive frameworks in the Java ecosystem.

For this lab, we will use Kotlin. Created by JetBrains, Kotlin is a good programming language that is becoming very popular in Java ecosystems across the globe. With much better support for functional programming than Java’s, it is an excellent choice of language to learn today. All code will run on Java 11 and Kotlin 1.3.72.

To stay focused, we will stick to standalone examples, since building an API, DB migrations, etc. would take our focus away from learning the framework. If you want to read a more complex example, I suggest reading my Akka article after reading this.

All code from the lab can be found here. So, without further delay, let’s get started!

Our first stream

We will use Gradle as our dependency manager. First, let’s add the ReactiveX dependencies to the build.gradle file:

...
dependencies {
    runtime group: 'io.reactivex.rxjava2', name: 'rxkotlin', version: '2.4.0'
    compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.19'
}
...

We added the RxJava 2 core library, as well as the RxKotlin library for better Kotlin support.

Now, let’s create a main function. Just like in Java, this function is the entry point for running the application:

fun main(args: Array<String>) {
}

Now, let’s make a very simple stream. We will just take a group of words, map each word to its length and print the lengths to the console:

import io.reactivex.Observable

fun main(args: Array<String>) {
    Observable.just("I",
        "Love",
        "Ana",
        "Carolina")
        .map(String::length)
        .subscribe(
            {println("length $it")},
            { println("error $it")},
            { println("stream ended!")}
        )
}

When running this script, the following is printed to the console:

length 1
length 4
length 3
length 8
stream ended!

We covered some ground with this simple example. Let’s talk about the code:

  • First, we created an Observable. In ReactiveX, streams are divided into two types: Observables and Flowables. The main difference between the two is that Flowables allow us to use back-pressure. We will learn about Flowables later in the lab;
  • Second, we used just to initialise the Observable. This method is useful for simple examples where we know exactly the items we want to process;
  • Then, we used map. Map is a traditional stream operation, where we read each item and produce another item as output. In our case, we read each word and produce its length;
  • Finally, we subscribed to the stream. The subscribe method is where we supply the code that forms the bottom of the stream, the last code to be processed. In this case, we print all word lengths, or an error message in case anything goes wrong (probably nothing will go wrong in this case, but you get the idea 🙂 ). We also provide a message for when the stream ends.

One last interesting concept to grasp before we move on is the stream event cycle. After the stream is started, we have the following possible sequence of actions:

  • The events are sent, one by one, downstream to the bottom. The code we supplied in the first lambda of subscribe is the one that will be fired in this case;
  • In case anything goes wrong with the stream, an error event is sent to the bottom. This is what runs in our second lambda. When this happens, the stream stops;
  • Finally, when all events are processed, the completed event is fired and processed by our third lambda. Keep in mind that there are cases where the stream can be infinite, such as streaming from a topic, so this event may never be fired.
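The event cycle above can be imitated with a few lines of plain Java. This is only a hand-rolled illustration of the next/error/complete contract, not how ReactiveX is implemented; the class and method names (EventCycleSketch, run) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of the next / error / complete contract.
class EventCycleSketch {

    // Runs every item through the mapper and records the events a subscriber would see.
    static <T, R> List<String> run(List<T> items, Function<T, R> mapper) {
        List<String> events = new ArrayList<>();
        try {
            for (T item : items) {
                events.add("next " + mapper.apply(item));   // first lambda of subscribe
            }
        } catch (RuntimeException e) {
            events.add("error " + e.getMessage());          // second lambda; the stream stops here
            return events;
        }
        events.add("complete");                             // third lambda, only on a clean finish
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("I", "Love", "Ana", "Carolina"), String::length));
        // prints [next 1, next 4, next 3, next 8, complete]
    }
}
```

Note that a run ends with either an error event or a complete event, never both.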

So that concludes our first stream. Yes, it is as simple as that! Now, let’s learn some more things we can do with our streams.

Stream operations

Now, let’s take a tour of some of the basic operations we can do with streams. Of course, this won’t be an exhaustive list of every operation, since that would turn our lab into a dull showcase of tons of methods, but just a quick glance to allow us to take a first dive into the framework.

First, let’s move our first example to another script and change the Main script to import and run it:

package com.alexandreesl.examples

import io.reactivex.Observable

fun firstExample() {
    Observable.just("I",
        "Love",
        "Ana",
        "Carolina")
        .map(String::length)
        .subscribe(
            {println("length $it")},
            { println("error $it")},
            { println("stream ended!")}
        )
}

import com.alexandreesl.examples.firstExample

fun main(args: Array<String>) {
    firstExample()
}

From now on, we will keep adding examples as functions and run everything from Main.

Now, let’s make another quick example. We will take a single word, add a prefix to it and print it to the console:

package com.alexandreesl.examples

import io.reactivex.Single

fun prefixExample() {
    Single.just("Alexandre")
        .map { "I am $it" }
        .subscribe(
            {println("$it")},
            { println("error $it")}
        )


}

After running, we get this output:

I am Alexandre

This example introduces us to Single. Single is a specialised observable, whose primary concern is to provide a simplified version of the original Observable, focused on scenarios where we will emit just a single item.

For this reason, on subscribe we can only define success and error event handlers, since an onComplete event wouldn’t make much sense when there is just one item to process.

Now let’s make another example. We will take a randomly generated list of numbers, filter just the even ones and print them:

package com.alexandreesl.examples

import io.reactivex.Observable
import java.util.*

fun evenNumbersExample() {

    Observable.fromIterable((1..50).map { Random().nextInt() })
        .filter { it % 2 == 0 }
        .subscribe(
            {println("$it")},
            { println("error $it")}
        )

}

This will produce something like the following (of course, every run will produce a different result):

-925639320
-1367241566
-1993909882
786568802
1215269342
-480862456
-1166646146
1171590568
682849868
-1321837860
-1021426524
-196824578
858851372
-213227382
966926184
-1095918914
-950267778
-2128264986
-1770877918
227807366
1692840490
1622475038
-308642896
-1907658182

In this example, we see another basic operator, filter. As the name implies, this operator filters the items emitted by the stream, based on a predicate.

Moving on, let’s make another example to wrap up our quick tour. Let’s take a list of numbers (with duplicates) and use distinct and reduce to sum all the unique numbers in the list:

package com.alexandreesl.examples

import io.reactivex.Observable

fun distinctSumExample() {

    val numbers = listOf(1,2,3,4,5,5,6,12,14,18,22,25,25)

    Observable.fromIterable(numbers)
        .distinct()
        .reduce(0) { total,next -> total + next }
        .subscribe(
            {println("$it")},
            {println("error $it")}
        )
    
}

As expected, we will receive the number 112 as the result.
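As a quick cross-check of that result, the same distinct-then-sum computation can be written with the standard Java Stream API (not ReactiveX). The class name DistinctSumCheck is made up for this sketch.

```java
import java.util.List;

// Cross-check of the distinct + reduce example using java.util.stream.
class DistinctSumCheck {

    // Drops duplicates, then folds the remaining numbers into a sum starting from 0.
    static int distinctSum(List<Integer> numbers) {
        return numbers.stream().distinct().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(distinctSum(List.of(1, 2, 3, 4, 5, 5, 6, 12, 14, 18, 22, 25, 25)));
        // prints 112
    }
}
```

The unique values are 1, 2, 3, 4, 5, 6, 12, 14, 18, 22 and 25, which add up to 112.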

And this concludes our quick tour of ReactiveX operations. As we can see, the framework is very powerful and easy to use. I suggest exploring the documentation to discover much more where this came from!

Handling events

As we saw before, there are three types of events in the ReactiveX event cycle:

  • Next;
  • Error;
  • Complete.

Let’s say we want to add code snippets to be run on different stages of the stream, for example, to print the item before filtering. We can do this using doOnNext:

package com.alexandreesl.examples

import io.reactivex.Observable
import java.util.*

fun evenNumbersWithPrintExample() {

    Observable.fromIterable((1..50).map { Random().nextInt() })
        .doOnNext { println("item: $it") }
        .filter { it % 2 == 0 }
        .subscribe(
            {println("$it")},
            { println("error $it")}
        )

}

Running the code, we receive the following output (truncated for brevity):

...
item: -1075891450
-1075891450
item: 994876337
item: 297766092
297766092
item: -858967930
-858967930
item: 1633354503
item: -1792369156
-1792369156
item: 1255904419
item: 1325177149
item: 1773095686
1773095686
...

Just as we have doOnNext, we also have doOnError and doOnComplete for the same kind of effect.

There are two additional methods that are interesting to learn before we move on to the next subject. Let’s revisit our reduce example. Let’s suppose that something can go wrong and we want to return 0 as a default in case of an error. We can do this as follows (we also added code to “force” an error):

package com.alexandreesl.examples

import io.reactivex.Observable
import java.lang.RuntimeException

fun distinctSumWithDefaultExample() {

    val numbers = listOf(1,2,3,4,5,5,6,12,14,18,22,25,25)

    var counter = 0

    Observable.fromIterable(numbers)
        .distinct()
        .reduce(0) { total,next ->
            counter++
            if (counter == 5)  throw RuntimeException("BOOOM!!!!")
            total + next }
        .onErrorReturn { 0 }
        .subscribe(
            {println("$it")},
            {println("error $it")}
        )

}

If we run this, we will receive 0, just as expected.

Another interesting method is onErrorResumeNext. This method allows us to return another Observable to be executed in case the main Observable fails. Let’s see a simple example, where we divide 10 by each number in a sequence and one of them is 0:

package com.alexandreesl.examples

import io.reactivex.Observable

fun divisionWithResumeNextExample() {

    Observable.just(1,2,3,4,0,8,2)
        .map {item -> 10 / item}
        .onErrorResumeNext(Observable.just(10).repeat(3))
        .subscribe(
            {println("item  $it")},
            {println("error $it")}
        )
}

This will produce the following. Notice that the stream immediately stops processing the original items and consumes the fallback stream instead, skipping the remaining items:

item  10
item  5
item  3
item  2
item  10
item  10
item  10
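The switch-over behaviour seen in this output can be reproduced with a small plain-Java sketch. It only imitates the semantics on lists (map until the first failure, then emit the fallback) and is not the ReactiveX implementation; ResumeNextSketch and mapOrResume are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical list-based imitation of the fallback semantics.
class ResumeNextSketch {

    // Maps items in order; on the first failure, drops the rest and appends the fallback items.
    static <T, R> List<R> mapOrResume(List<T> source, Function<T, R> mapper, List<R> fallback) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            try {
                out.add(mapper.apply(item));
            } catch (RuntimeException e) {
                out.addAll(fallback);   // switch to the fallback sequence
                return out;             // remaining source items are never processed
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> result = mapOrResume(
                List.of(1, 2, 3, 4, 0, 8, 2),
                n -> 10 / n,
                List.of(10, 10, 10));
        System.out.println(result);
        // prints [10, 5, 3, 2, 10, 10, 10]
    }
}
```

The items 8 and 2 never reach the mapper, which matches the output shown above.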

And that concludes our tour on event handling. Next, let’s learn about ReactiveX Flowables.

Flowables

When working with ReactiveX, there are two major ways of using the framework: Observables and Flowables. The major difference between Observable and Flowable is that Flowable uses back-pressure, as stated before. Let’s see an example to better understand what that means.

Let’s imagine a stream that emits events from a very big range of numbers, where each event takes a little time to be processed. To see the problem, we will use a Scheduler (more on Schedulers later) to decouple the emitter thread from the receiver thread:

    Observable.fromIterable((1..5000000))
        .map { println("emitting $it")
            it }
        .observeOn(Schedulers.computation())
        .subscribe({
            Thread.sleep(5L)
            println(" receiving $it")
        })
    Thread.sleep(60000L)

When running, we would see something like this (of course, this is only a fragment):

...
emitting 1264
emitting 1265
emitting 1266
emitting 1267
emitting 1268
emitting 1269
emitting 1270
emitting 1271
emitting 1272
emitting 1273
emitting 1274
emitting 1275
emitting 1276
 receiving 3
emitting 1277
emitting 1278
emitting 1279
emitting 1280
emitting 1281
emitting 1282
emitting 1283
emitting 1284
emitting 1285
...

As we can see, the emitter was a lot faster than the receiver. In this case, the stream uses an unbounded buffer by default, so no data is lost, but in a production environment the risk of running out of memory this way is very high.

So, how do we fix this? By changing to a Flowable, of course:

    Flowable.fromIterable((1..5000000))
        .map { println("emitting $it")
            it }
        .observeOn(Schedulers.computation())
        .subscribe({
            Thread.sleep(5L)
            println(" receiving $it")
        })
    Thread.sleep(60000L)

Yep, it is as simple as that to use a Flowable. Keep in mind that all operators available to Observable are also available to Flowable, so all the examples from before are valid for Flowables as well. If we run it, we will see fragments like this:

... 
emitting 124
emitting 125
emitting 126
emitting 127
emitting 128
 receiving 1
 receiving 2
 receiving 3
 receiving 4
 receiving 5
 receiving 6
 receiving 7
 receiving 8
 receiving 9
 receiving 10
 receiving 11
 receiving 12
 receiving 13
 receiving 14
 receiving 15
 receiving 16
 receiving 17
 receiving 18
 receiving 19
 receiving 20
 receiving 21
 receiving 22
 receiving 23
...

We won’t go deep into the internals of the Flowable implementation, but keeping it simple, the key here is that Flowable delivers data to the subscriber (the stream’s bottom) through a bounded buffer. So after the emitter reached item 128, the first chunk was delivered to be processed.

The main detail to note here is that we stopped receiving emitting messages. This is because the emitter stopped: it is waiting for the subscriber to process the chunk, and only after that will it start sending data again.
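The same idea (a producer that pauses while a bounded buffer is full) can be sketched with the JDK’s own SubmissionPublisher from java.util.concurrent, available since Java 9. This is an analogy rather than ReactiveX code: the class name BoundedBufferSketch and the choice of a single-thread executor are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical sketch: a publisher whose submit() blocks while the subscriber's buffer is full.
class BoundedBufferSketch {

    static List<Integer> run(int count, int bufferSize) {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            try (SubmissionPublisher<Integer> publisher =
                         new SubmissionPublisher<>(executor, bufferSize)) {
                publisher.subscribe(new Flow.Subscriber<Integer>() {
                    private Flow.Subscription subscription;

                    @Override public void onSubscribe(Flow.Subscription s) {
                        subscription = s;
                        s.request(1);                 // ask for the first item
                    }
                    @Override public void onNext(Integer item) {
                        received.add(item);
                        subscription.request(1);      // ask for the next one only when ready
                    }
                    @Override public void onError(Throwable t) { done.countDown(); }
                    @Override public void onComplete() { done.countDown(); }
                });
                for (int i = 1; i <= count; i++) {
                    publisher.submit(i);              // blocks while the buffer is saturated
                }
            }                                         // close() leads to onComplete after delivery
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 128).size());    // prints 1000
    }
}
```

No item is dropped and the buffer never grows without bound: the emitting loop simply waits whenever the subscriber falls behind.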

But what is that Scheduler we saw? Let’s find out!

Schedulers

Before we start learning about schedulers, I want to ask you a question: have you noticed that, when we added a scheduler previously, we added a Thread.sleep after the Flowable? Did you guess why we did that?

The answer to this will probably be a shock: by default, ReactiveX is not multithreaded!

It is true: by default, all streams (Observables or Flowables) run on the same thread that subscribes to them, which in our examples is the main thread. It is true that some operators (like timeout, for example, where we define a time limit for the upstream to emit an event and throw an error if no event is emitted) are not single-threaded, but the reason is that, under the hood, they define a scheduler to run the operator.

So, what does that Schedulers.computation() do? Basically, ReactiveX takes the stream we declared and hands its execution over to a thread pool. What does computation mean? It is the type of scheduler we want to use. ReactiveX comes with several scheduler flavours, the main ones being:

  • computation: Computation is designed for streams that do CPU-intensive processing. When an application using ReactiveX is started, the framework creates a thread pool with a size equal to the number of cores on the CPU of the machine where the application is running;
  • IO: IO is designed for streams that do simple IO operations that won’t take too much time to run. IO’s thread pool is unbounded: as the demand grows, ReactiveX will instantiate more threads if the current pool is exhausted. This is a point of attention: if you are unsure of the load the application will have, using this option can be dangerous, as it can add more threads than the application can handle and result in a breakdown in more severe cases;
  • newThread: This works just like the name says: no thread pool, it just keeps creating new threads and handing new executions to them. Of course this is an even more dangerous one, since it can even more easily result in the problem described above. It is usable only in situations where the processing is indeed extremely fast;
  • single: This is like the opposite of newThread: it just creates one thread and keeps using it for all processing. This can be useful in simple streams, where you just need a little boost in performance. Remember that creating threads is a costly operation, so in simple streams it can indeed be a better solution than IO, for example.
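For readers who know java.util.concurrent, the scheduler flavours above resemble the standard executor factories. The mapping below is a rough analogy written for this article, not how ReactiveX builds its schedulers; the class SchedulerAnalogies and its method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Rough JDK analogies for the scheduler flavours (an assumption for illustration only).
class SchedulerAnalogies {

    // Similar in spirit to "computation": a fixed pool with one thread per core.
    static ExecutorService computationLike() {
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    // Similar in spirit to "IO": an unbounded pool that grows on demand and reuses idle threads.
    static ExecutorService ioLike() {
        return Executors.newCachedThreadPool();
    }

    // Similar in spirit to "single": one thread shared by all work.
    static ExecutorService singleLike() {
        return Executors.newSingleThreadExecutor();
    }

    // Runs two tasks on the pool, shuts it down, and reports whether one thread ran both.
    static boolean sameThreadForTwoTasks(ExecutorService pool) {
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            String first = pool.submit(whoAmI).get();
            String second = pool.submit(whoAmI).get();
            return first.equals(second);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sameThreadForTwoTasks(singleLike()));   // prints true
    }
}
```

With the single-thread variant both tasks always land on the same thread; with the other pools the answer depends on how the pool schedules the work.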

In my personal view, computation is the best all-around option, as it shields us from the risk of “over-threading” the application and creates the optimal number of threads for a pool, the same as the number of cores (when there are more threads than cores, the CPU divides them between the cores, so the multitasking improvement shrinks).

So, when we added observeOn, did the whole stream run on a computation thread? Well, to tell you the truth, I simplified my first answer a little: ReactiveX uses two methods for defining schedulers, subscribeOn and observeOn. So actually, in that case, just the receiver part of the stream ran on a different thread.

So, what is the difference between what we did previously and this:

    Flowable.fromIterable((1..5000000))
        .map { println("emitting $it")
            it }
        .subscribeOn(Schedulers.computation())
        .subscribe({
            Thread.sleep(5L)
            println(" receiving $it")
        })
    Thread.sleep(60000L)

Well, actually, when using this, it indeed does what I said before: the whole stream will now run inside a computation thread. When using subscribeOn, we tell the whole stream to use a given scheduler. If we use both subscribeOn and observeOn on a stream, ReactiveX will use the scheduler defined in subscribeOn for the upstream until the first observeOn is reached, and the observeOn scheduler will be used downstream, until the stream’s end or another observeOn is reached.

Here is an example where we can see this thread manipulation running in practice:

fun schedulersExample() {

    Flowable.fromIterable((1..10))
        .map { println("emitting $it on ${Thread.currentThread().getName()}")
            it }
        .observeOn(Schedulers.io())
        .map { println("first receiving $it on ${Thread.currentThread().getName()}")
            it }
        .observeOn(Schedulers.single())
        .map { println("secondly receiving $it on ${Thread.currentThread().getName()}")
            it }
        .subscribeOn(Schedulers.computation())
        .subscribe {
            println("finally receiving $it on ${Thread.currentThread().getName()}")
        }
    Thread.sleep(300L)

}

After running the code, we will see an output like this:

emitting 1 on RxComputationThreadPool-1
emitting 2 on RxComputationThreadPool-1
emitting 3 on RxComputationThreadPool-1
emitting 4 on RxComputationThreadPool-1
first receiving 1 on RxCachedThreadScheduler-1
emitting 5 on RxComputationThreadPool-1
first receiving 2 on RxCachedThreadScheduler-1
emitting 6 on RxComputationThreadPool-1
secondly receiving 1 on RxSingleScheduler-1
first receiving 3 on RxCachedThreadScheduler-1
finally receiving 1 on RxSingleScheduler-1
emitting 7 on RxComputationThreadPool-1
secondly receiving 2 on RxSingleScheduler-1
finally receiving 2 on RxSingleScheduler-1
first receiving 4 on RxCachedThreadScheduler-1
secondly receiving 3 on RxSingleScheduler-1
finally receiving 3 on RxSingleScheduler-1
emitting 8 on RxComputationThreadPool-1
emitting 9 on RxComputationThreadPool-1
secondly receiving 4 on RxSingleScheduler-1
first receiving 5 on RxCachedThreadScheduler-1
finally receiving 4 on RxSingleScheduler-1
emitting 10 on RxComputationThreadPool-1
secondly receiving 5 on RxSingleScheduler-1
finally receiving 5 on RxSingleScheduler-1
first receiving 6 on RxCachedThreadScheduler-1
first receiving 7 on RxCachedThreadScheduler-1
secondly receiving 6 on RxSingleScheduler-1
first receiving 8 on RxCachedThreadScheduler-1
finally receiving 6 on RxSingleScheduler-1
first receiving 9 on RxCachedThreadScheduler-1
secondly receiving 7 on RxSingleScheduler-1
first receiving 10 on RxCachedThreadScheduler-1
finally receiving 7 on RxSingleScheduler-1
secondly receiving 8 on RxSingleScheduler-1
finally receiving 8 on RxSingleScheduler-1
secondly receiving 9 on RxSingleScheduler-1
finally receiving 9 on RxSingleScheduler-1
secondly receiving 10 on RxSingleScheduler-1
finally receiving 10 on RxSingleScheduler-1

As we can see, it started on a computation thread, as we defined in subscribeOn, and later switched threads as it reached the different observeOn calls (RxCachedThreadScheduler is how ReactiveX names the threads from the IO scheduler).
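The idea of starting work on one pool and continuing on another can also be shown with plain CompletableFuture from the JDK. This is only an analogy for the subscribeOn/observeOn hand-off, not ReactiveX code, and the names ThreadHopSketch, named, upstream and downstream are invented for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical analogy: stage 1 runs on one executor, stage 2 hops to another.
class ThreadHopSketch {

    // Single-thread executor whose only thread carries the given name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Returns "<thread of stage 1> -> <thread of stage 2>".
    static String run() {
        ExecutorService upstream = named("upstream");
        ExecutorService downstream = named("downstream");
        try {
            return CompletableFuture
                    // where the work starts (comparable to subscribeOn)
                    .supplyAsync(() -> Thread.currentThread().getName(), upstream)
                    // where later stages continue (comparable to observeOn)
                    .thenApplyAsync(first -> first + " -> " + Thread.currentThread().getName(),
                            downstream)
                    .join();
        } finally {
            upstream.shutdown();
            downstream.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints upstream -> downstream
    }
}
```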

And so we conclude our quick tour on Schedulers. Pretty cool, huh?

Conclusion

And so we conclude our first glimpse of ReactiveX. Of course, one article is too short to feature everything there is to learn (I recommend reading more about topics such as Disposables, Scheduler strategies and specialised test classes such as TestObserver and TestSubscriber), but I hope to have at least taught the basics for a good overall understanding of the framework.

ReactiveX is an excellent framework that deserves to be used. Happy coding!

References

Good ReactiveX book that I recommend

ReactiveX documentation

ReactiveX marbles (good site with diagrams that helps understand how the operators work)

Lab code

Java 9: Learning the new features – part 3


Hi, dear readers! Welcome to my blog. Continuing our series on the new features of Java 9, we will now talk about reactive streams, a new concept in parallel processing that promises to protect our applications from being overflowed with messages to process. So, without further delay, let’s begin!

What is Reactive Streams

Let’s imagine an e-commerce system that has to send orders to a distribution center (DC). The e-commerce and DC systems are separate from each other and communicate through a REST service.

Normally, we could simply create a call from the e-commerce system to the DC. So, we implement the call and everything is fine.

However, one day we get a problem with our integration. We notice the e-commerce system has overflowed the DC with lots of calls from Black Friday sales, so the REST endpoint starts to fail and lose data.

This scenario illustrates a common integration problem: when a consumer cannot process messages above a certain volume, we need to ensure the integration doesn’t end up overflowing the pipeline.

To tackle this problem, a pattern called Reactive Streams was designed. With Reactive Streams, the flow of processing is controlled by the Consumer, not the Publisher: the Consumer asks for more data to process as soon as it is ready, keeping its own pace. Not only that, we also have a feature called back pressure, a kind of throttling that ensures the Publisher of the data will wait for the Consumer to be available before sending any more messages, so that the Consumer is not overflowed as in our previous example.

The diagram below shows the new Flow API in Java 9, which allows us to implement Reactive Streams. We can see our consumer (the subscriber) establishing a subscription with the producer and requesting n messages to consume, which are delivered for processing by the onNext method. Don’t worry about the rest of the details: we will see more in the next sections.

The reference site for this diagram, with another good tutorial on Reactive Streams, can be found on the references section at the end of the post.

Creating a Stream: the Publisher

First, let’s create our publisher. The simplest way to create a publisher is by using the SubmissionPublisher class.

Our lab will simulate the orders integration we talked about earlier. We will begin by creating a DTO to hold the data from our orders:

package com.alexandreesl.handson.model;

import java.math.BigDecimal;
import java.util.Date;
import java.util.List;

public class Order {

    private Long id;

    private List<String> products;

    private BigDecimal total;

    private Date orderDate;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public List<String> getProducts() {
        return products;
    }

    public void setProducts(List<String> products) {
        this.products = products;
    }

    public BigDecimal getTotal() {
        return total;
    }

    public void setTotal(BigDecimal total) {
        this.total = total;
    }

    public Date getOrderDate() {
        return orderDate;
    }

    public void setOrderDate(Date orderDate) {
        this.orderDate = orderDate;
    }
}

Next, let’s instantiate our publisher, passing the message object DTO as generic:

public static void main(String[] args) {

    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();


}

That’s all we need to do for now with our publisher.

Creating a Stream: the Consumer

Now, let’s create our consumer, or subscriber in other words. For this, we will create a class called CDOrderConsumer that implements the Subscriber<T> interface:

package com.alexandreesl.handson.consumer;

import com.alexandreesl.handson.model.Order;

import static java.util.concurrent.Flow.Subscriber;
import static java.util.concurrent.Flow.Subscription;


public class CDOrderConsumer implements Subscriber<Order> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {

        this.subscription = subscription;
        subscription.request(1);

    }

    @Override
    public void onNext(Order item) {
        System.out.println("I am sending the Order to the CD!");
        subscription.request(1);

    }

    @Override
    public void onError(Throwable throwable) {

        throwable.printStackTrace();

    }

    @Override
    public void onComplete() {

        System.out.println("All the orders were processed!");

    }
}

In this class we can see several methods implemented, which we saw in the previous diagram. We can explain each of them as follows:

  • onSubscribe: In this method, we receive an instance of subscription, which we use to request messages from the publisher. In our example, we stored the instance and requested 1 message to be processed with the subscription.request(n) method call (it is possible to request more than 1 message per call, allowing the subscriber to process batches of messages);
  • onNext(T): In this method, we process the messages received. In our example, we print a message symbolizing the REST call and ask the publisher for another message;
  • onError(Throwable throwable): In this method, we receive errors that can occur during message processing. In our example, we simply print the errors;
  • onComplete(): This method is called after all messages are processed. In our example, we just print a message to signal the completion. It is important to note that, if the onNext(T) method didn’t ask for more messages, the subscriber would stop receiving messages after the first one, since it would never signal demand for any more to the publisher;
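To illustrate the batch option mentioned under onSubscribe, here is a minimal sketch of a subscriber that requests messages in batches of n and asks for the next batch only after the current one has been consumed. The names BatchSubscriberSketch and BatchSubscriber are hypothetical, and the sketch publishes plain integers instead of Order objects to stay self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical sketch of a subscriber that pulls messages in batches.
class BatchSubscriberSketch {

    static class BatchSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new ArrayList<>();
        int requests = 0;                        // how many times request(n) was called
        private final int batchSize;
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int pending;                     // items still expected from the current batch

        BatchSubscriber(int batchSize) { this.batchSize = batchSize; }

        private void requestBatch() {
            pending = batchSize;
            requests++;
            subscription.request(batchSize);
        }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch();
        }
        @Override public void onNext(T item) {
            received.add(item);
            if (--pending == 0) requestBatch();  // batch consumed, ask for the next one
        }
        @Override public void onError(Throwable throwable) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        void await() {
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    static BatchSubscriber<Integer> run(int count, int batchSize) {
        BatchSubscriber<Integer> subscriber = new BatchSubscriber<>(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) publisher.submit(i);
        }
        subscriber.await();
        return subscriber;
    }

    public static void main(String[] args) {
        BatchSubscriber<Integer> s = run(10, 5);
        System.out.println(s.received.size() + " items in " + s.requests + " requests");
        // prints 10 items in 3 requests
    }
}
```

With 10 messages and a batch size of 5, request is called three times: once on subscription and once after each completed batch (the last request simply goes unanswered because the publisher is closed).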

Now that we understand how to implement our Subscriber, let’s try out our stream by subscribing to the publisher and sending a message:

public static void main(String[] args) throws IOException {


    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();
    submissionPublisher.subscribe(new CDOrderConsumer());

    Order order = new Order();
    order.setId(1L);
    order.setOrderDate(new Date());
    order.setTotal(BigDecimal.valueOf(123));
    order.setProducts(List.of("product1", "product2", "product3"));


    submissionPublisher.submit(order);

    submissionPublisher.close();

    System.out.println("Waiting for processing.......");
    System.in.read();


}

In our script, we instantiate a publisher, create an order and submit the message for processing. Keep in mind that the submit call doesn’t mean that the message was sent to the subscriber: this is only done when the subscriber calls subscription.request(n). Lastly, we close the publisher, as we will not send any more messages.

Note: You may be wondering why we put that System.in.read() at the end. This is because all processing of the stream is done on a separate thread from the main one, so we need to make the program wait for the processing to complete, or else it will exit before the message is processed.

If we execute our program, we will see an output like this:
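An alternative to waiting on System.in.read() is to block until the subscriber signals completion. The sketch below does this with a CountDownLatch; it is one possible approach, not part of the original lab, and the names LatchWaitSketch and processAll are hypothetical. Plain strings stand in for Order objects to keep the example self-contained.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: wait for onComplete with a latch instead of System.in.read().
class LatchWaitSketch {

    // Publishes every order and returns how many the subscriber processed.
    static int processAll(List<String> orders) {
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1);
                }
                @Override public void onNext(String order) {
                    processed.incrementAndGet();
                    subscription.request(1);
                }
                @Override public void onError(Throwable throwable) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            orders.forEach(publisher::submit);
        }   // leaving the block calls close(), which leads to onComplete

        try {
            // Block until the terminal signal arrives, with a timeout as a safety net.
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("order-1", "order-2", "order-3")));   // prints 3
    }
}
```

The program now exits on its own once every message has been processed, without needing a key press.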

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=50218:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main
Waiting for processing.......
I am sending the Order to the CD!
All the orders were processed!

Success!!! Now we have a fully functional reactive stream, allowing us to process our messages.

Processors on Reactive Streams

Sometimes, on a stream, there will be logic that can be placed between the publisher and the consumer, such as filtering, transforming, and more. For this purpose, we can implement processors. Processors are like subscribers that also publish messages after the logic is applied. This way, processors can be chained together on a stream, executing one after another, before finally passing the message to a subscriber.

Let’s expand our previous example. We detected a bug in our e-commerce system that sometimes places “phantom” orders with a total value of 0 on the stream. We haven’t identified the cause yet, but we need to prevent these fake orders from being sent to the CD system. We can use a processor to filter them out.

So, let’s implement the following class, OrderFilter, to accomplish this:

package com.alexandreesl.handson.processor;

import com.alexandreesl.handson.model.Order;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;


public class OrderFilter extends SubmissionPublisher<Order> implements Flow.Processor<Order, Order> {

    private Flow.Subscription subscription;


    @Override
    public void onSubscribe(Flow.Subscription subscription) {

        this.subscription = subscription;
        subscription.request(1);

    }

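    @Override
    public void onNext(Order item) {

        // signum() compares the BigDecimal with zero without converting to double
        if (item.getTotal().signum() > 0) {

            submit(item);

        } else {

            System.out.println("INVALID ORDER! DISCARDING...");

        }

        subscription.request(1);

    }

    @Override
    public void onError(Throwable throwable) {

        throwable.printStackTrace();
        // Propagate the failure to the downstream subscribers
        closeExceptionally(throwable);

    }

    @Override
    public void onComplete() {

        // Propagate completion, so the downstream subscriber's onComplete() is called
        close();

    }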


}

This class acts as both a publisher and a subscriber: it extends SubmissionPublisher and implements Flow.Processor. The code is basically the same as our subscriber’s, except that in the onNext(T) method we check whether an order has a total value greater than 0. If it does, the order is submitted to the downstream subscriber; otherwise, it is discarded.
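Filtering is only one kind of processor. As a complementary sketch, not part of the lab, a processor can also transform each item before passing it on. The MapProcessorDemo and MapProcessor names below are hypothetical, and integers stand in for order totals so the example is self-contained. This version also forwards completion and errors downstream with close() and closeExceptionally(Throwable), which a processor generally needs to do so that the final subscriber learns the stream has ended.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

public class MapProcessorDemo {

    // Hypothetical processor that applies a function to every item it receives.
    static class MapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {

        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription subscription;

        MapProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // publish the transformed item downstream
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // forward the failure
        }

        @Override
        public void onComplete() {
            close(); // forward completion
        }
    }

    static List<String> run() {
        List<String> results = new CopyOnWriteArrayList<>();
        MapProcessor<Integer, String> toLabel =
                new MapProcessor<>(total -> "total=" + total);

        // consume() subscribes a simple consumer and reports completion via a future
        CompletableFuture<Void> done = toLabel.consume(results::add);

        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(toLabel);
            source.submit(123);
            source.submit(0);
        }

        done.join(); // wait until completion has travelled through the processor
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [total=123, total=0]
    }
}
```

Because the processor is itself a publisher, several of them can be chained: each one subscribes to the previous stage, and the final subscriber attaches to the last one.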

Next, we modify our code, subscribing the processor on our stream and testing it out with 2 orders, one valid and one fake:

public static void main(String[] args) throws IOException {

    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();
    OrderFilter filter = new OrderFilter();
    submissionPublisher.subscribe(filter);
    filter.subscribe(new CDOrderConsumer());

    Order order = new Order();
    order.setId(1L);
    order.setOrderDate(new Date());
    order.setTotal(BigDecimal.valueOf(123));
    order.setProducts(List.of("product1", "product2", "product3"));

    submissionPublisher.submit(order);

    order = new Order();
    order.setId(2L);
    order.setOrderDate(new Date());
    order.setProducts(List.of("product1", "product2", "product3"));

    order.setTotal(BigDecimal.ZERO);

    submissionPublisher.submit(order);

    submissionPublisher.close();

    System.out.println("Waiting for processing.......");
    System.in.read();

}

If we run the code, we will see a message indicating that one of the orders was discarded, showing that our implementation was a success:

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=51469:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main
Waiting for processing.......
INVALID ORDER! DISCARDING...
I am sending the Order to the CD!
All the orders were processed!

The source code for our lab can be found here.

Conclusion

And so we conclude our study of Reactive Streams. With a simple and intuitive approach, Reactive Streams are a good option to try out, especially on solutions that have capacity limitations. Please join me next time for the last chapter of this series, where we will finally see the much-anticipated new module system, Jigsaw. Thank you for following me on this post, until next time.

References

Reactive Streams (Wikipedia)

Reactive Streams Tutorial (another good tutorial to serve as guide)