Hi, dear readers! Welcome to my blog. Continuing our series at the new features of Java 9, we will now talk about reactive streams, a new concept on parallel processing that promises to protect our applications from overflows of messages on processing. So, without further delay, let’s begin!
What is Reactive Streams
Let’s imagine a e-commerce that has to send some orders for a distribution center. The e-commerce and DC systems are apart from each other, been able to communicate by a REST service.
Normally, we could simply create a call from the e-commerce system to the DC. So, we implement the call and everything is fine.
However, some day we get a problem on our integration. We notice the e-commerce has overflowed the DC with lots of calls from a Black Friday’s sales, so the REST endpoint starts to fail and lose data.
This scenario illustrates a common integration problem: When a consumer has processing limitations to consume messages above a certain volume, we need to ensure the integration doesn’t end up overflowing the pipeline.
To tackle this problem, it was designed a pattern called Reactive Streams. With Reactive streams, the flow of processing is controlled by the Consumer, not the Publisher, that calls for more data to process as soon as it is ready, keeping his own pace. Not only that, we also have a feature called back pressure, that consists of a kind of throttling to ensure that the Publisher of the data will wait for the Consumer to be available before sending anymore messages and overflow the Consumer, just like we saw on our previous example.
The diagram bellow show us the new Flow API on Java 9, that allows us to implements Reactive Streams. We can see our consumer (the subscriber) establishing a subscription with the producer and requesting n messages to consume, which are delivered to processing by the onNext method. Don’t worry about the rest of the details: we will see more on the next sections.
The reference site for this diagram, with another good tutorial on Reactive Streams, can be found on the references section at the end of the post.
Creating a Stream: the Publisher
First, let’s create our publisher. the simplest way to create a publisher is by using the SubmissionPublisher class.
Our lab will simulate the orders integration we talked about earlier. We will begin by creating a DTO to hold the data from our orders:
package com.alexandreesl.handson.model; import java.math.BigDecimal; import java.util.Date; import java.util.List; public class Order { private Long id; private List<String> products; private BigDecimal total; private Date orderDate; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public List<String> getProducts() { return products; } public void setProducts(List<String> products) { this.products = products; } public BigDecimal getTotal() { return total; } public void setTotal(BigDecimal total) { this.total = total; } public Date getOrderDate() { return orderDate; } public void setOrderDate(Date orderDate) { this.orderDate = orderDate; } }
Next, let’s instantiate our publisher, passing the message object DTO as generic:
public static void main(String[] args) { SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>(); }
That’s all we need to do for now with our publisher.
Creating a Stream: the Consumer
Now, let’s create our consumer, or subscriber in other words. For this, we will create a class called CDOrderConsumer that implements the Subscriber<T> interface:
package com.alexandreesl.handson.consumer; import com.alexandreesl.handson.model.Order; import static java.util.concurrent.Flow.Subscriber; import static java.util.concurrent.Flow.Subscription; public class CDOrderConsumer implements Subscriber<Order> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(Order item) { System.out.println("I am sending the Order to the CD!"); subscription.request(1); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onComplete() { System.out.println("All the orders were processed!"); } }
On this class we can see several methods implemented, which we saw on the previous diagram. We can explain each of them as:
- onSubscribe: On this method, we receive a instance of subscription, which we use to request messages to the publisher. On our example, we stored the instance and requested 1 message to be processed – it is possible to establish a limit of more then 1 message per call, allowing the subscriber to process batches of messages – with the subscription.request(n) method call;
- onNext(T): On this method, we make the processing of the messages received. On our example, we print a message symbolizing the REST call and ask the publisher for another message;
- onError(Throwable throwable): On this method, we receive errors that can occur on the message processing. On our example, we simply print the errors;
- onComplete(): This method is called after all messages are processed. On our example, we just print a message to signal the completion. It is important to note that, if we didn’t make the onNext(T) method to ask for other messages, this method would be called after the first message, since no more messages would be asked from the publisher;
Now that we understand how to implement our Subscribe, let’s try it out our stream by subscribing with the publisher and sending a message:
public static void main(String[] args) throws IOException { SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>(); submissionPublisher.subscribe(new CDOrderConsumer()); Order order = new Order(); order.setId(1l); order.setOrderDate(new Date()); order.setTotal(BigDecimal.valueOf(123)); order.setProducts(List.of("product1", "product2", "product3")); submissionPublisher.submit(order); submissionPublisher.close(); System.out.println("Waiting for processing......."); System.in.read(); }
On our script, we instantiate a publisher, create a order and submit the message for processing. Keep in mind that the submit call doesn’t mean that the message was sent to the subscriber: this is only done when the subscriber calls subscription.request(n). Lastly, we close the publisher, as we will not send any more messages.
Note: You may be thinking about why we put it that System.in.read() at the end. this is because all processing of the stream is done on a separate thread from the main one, so we need to make the program wait for the processing to complete, or else it will exit before the message is processed.
If we execute our program, we will see a output like this:
/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=50218:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main Waiting for processing....... I am sending the Order to the CD! All the orders were processed!
Success!!! Now we have a fully functional reactive stream, allowing us to process our messages.
Processors on Reactive Streams
Sometimes, on a stream, there will be logic that can be placed between the publisher and the consumer, such as filtering, transforming, and more. For this purpose, we can implement processors. Processors are like subscribers that also publish messages after the logic is applied. This way, processors can be chained together on a stream, executing one after another, before finally passing the message to a subscriber.
Let’s expand our previous example. We detected a bug on our e-commerce that sometimes places “phantom” orders with 0 total value on the stream. We didn’t identified the cause yet, but it is necessary to prevent this fake orders from been sent to the CD system. We can use a processor to filter this fake orders.
So, let’s implement the following class, OrderFilter, to accomplish this:
package com.alexandreesl.handson.processor; import com.alexandreesl.handson.model.Order; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; public class OrderFilter extends SubmissionPublisher<Order> implements Flow.Processor<Order, Order> { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(Order item) { if (item.getTotal().doubleValue() > 0) { submit(item); } else { System.out.println("INVALID ORDER! DISCARDING..."); } subscription.request(1); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onComplete() { System.out.println("All the orders were processed!"); } }
On this class, we implement both publisher and subscriber interfaces. The code is basically the same from our subscriber, except that on the onNext(T) method we implement a logic that checks if a order has a total value bigger then 0. If it has, it is submitted to the subscriber, otherwise, it is discarded.
Next, we modify our code, subscribing the processor on our stream and testing it out with 2 orders, one valid and one fake:
public static void main(String[] args) throws IOException { SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>(); OrderFilter filter = new OrderFilter(); submissionPublisher.subscribe(filter); filter.subscribe(new CDOrderConsumer()); Order order = new Order(); order.setId(1l); order.setOrderDate(new Date()); order.setTotal(BigDecimal.valueOf(123)); order.setProducts(List.of("product1", "product2", "product3")); submissionPublisher.submit(order); order = new Order(); order.setId(2l); order.setOrderDate(new Date()); order.setProducts(List.of("product1", "product2", "product3")); order.setTotal(BigDecimal.ZERO); submissionPublisher.submit(order); submissionPublisher.close(); System.out.println("Waiting for processing......."); System.in.read(); }
If we run the code, we will a message indicating that one of the messages was discarded, reflecting that our implementation was a success:
/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=51469:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main Waiting for processing....... INVALID ORDER! DISCARDING... I am sending the Order to the CD! All the orders were processed!
The source code for our lab can be found here.
Conclusion
And so we conclude our learning on Reactive Streams. With a simple and intuitive approach, Reactive Streams are a good solution to try it out, specially on solutions that have capacity limitations. Please follow me next time for our last chapter on this series, where we will finally see the so famed new module system, Jigsaw. Thank you for following me on this post, until next time.
References
Reactive Streams Tutorial (another good tutorial to serve as guide)