Akka Streams: developing robust applications using Scala

Hi, dear readers! Welcome to my blog. A long time ago, I wrote a post about the actor model and how to use Akka to implement solutions using actors. If you haven’t read that post, it can be found here. Now, more than 4 years later – how fast time goes! – it is time to revisit this world, with a much better understanding and maturity. At the time, I used good old Java to do the task.

There’s nothing wrong with using Java, but if you really want to delve into Akka, then Scala is the language of choice, especially if we want to use Akka Streams, a project specially tailored to developing data flows for tasks such as system integrations.

With non-blocking IO and parallelism embedded at its core – and encouraged in our own code by following its good practices! – Akka Streams allows us to develop really fast applications that can easily scale. From my personal experience, it is an especially good option for integrating with Apache Kafka.

So, without further delay, let’s begin our journey!

Actor Model

The actor model was already explained in my previous post, so we won’t spend much time on it. To sum it up, we have a system where actors work with each other asynchronously: tasks are broken down into multiple steps handled by different actors, each one communicating through a personal queue (mailbox) that holds the messages waiting to be processed by that actor. This way, we have a scalable solution, where tasks are done in parallel.

Akka quick recap

Actors

Actors are the core components of an actor system. An actor consists of a program unit that implements logic based on the messages it receives from its mailbox.

When developing Akka applications in Scala, an actor must implement a receive method, where we write the logic for the different types of messages it can receive. Each time a message arrives at the mailbox, the dispatcher schedules the actor to process it. It is important to notice, however, that the actor only takes the next message once it has finished processing the current one – by default, each actor processes just one message at a time – so an actor is never interrupted halfway through a message. Akka Streams builds on this idea of the consumer asking for more work to implement back-pressure, which we will talk more about in the next sections.
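As an illustration of the receive method, here is a minimal sketch of an actor that handles two message types. The names GreeterActor, Greet and Count are hypothetical, created only for this example, and it assumes the akka-actor dependency used later in the lab is on the classpath.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message types, used only in this illustration.
case class Greet(name: String)
case object Count

class GreeterActor extends Actor {
  // Internal state is safe to mutate: the actor handles one message at a time.
  private var greeted = 0

  override def receive: Receive = {
    case Greet(name) =>
      greeted += 1
      sender() ! s"Hello, $name!"
    case Count =>
      sender() ! greeted
  }
}

object GreeterExample {
  def run(): (String, Int) = {
    val system = ActorSystem("greeter-example")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val greeter = system.actorOf(Props(new GreeterActor), "greeter")
      // ask (?) sends a message and returns a Future with the reply.
      val reply = Await.result((greeter ? Greet("reader")).mapTo[String], 3.seconds)
      val count = Await.result((greeter ? Count).mapTo[Int], 3.seconds)
      (reply, count)
    } finally system.terminate()
  }
}
```

Calling GreeterExample.run() sends both messages to the same actor, which processes them one after the other in the order they were enqueued.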

ActorSystem

The actor system comprises the whole actor solution. It is composed of actors, dispatchers, and mailboxes.

An application can host multiple actor systems. It is also possible to link actor systems together remotely, forming a cluster.

Execution Context (Dispatchers)

Execution contexts, also known as dispatchers, are responsible for serving actors with messages by delivering the messages to the mailboxes.

Dispatchers are also responsible for scheduling the actors themselves, including details such as parallel actor execution, using strategies such as thread pools. A dispatcher can be defined globally for the whole system or per actor.

An important note regarding performance is that dispatchers multiplex many lightweight actors over a small number of threads, instead of giving each actor its own thread, so memory consumption in Akka solutions tends to be lower than in traditional thread-per-task Java applications.

One interesting thing to notice on actor instantiation is how Akka treats actor references. When we ask for an actor to be created inside a system, Akka returns an actor reference, which is then used to send messages to the actor.

Messages are always sent through this reference, never by calling the actor object directly, even when the whole actor system runs locally. This guarantees that when actor systems are used remotely, such as in a cluster, there will be no difference in the code.

Mailboxes are, as the name suggests, repositories for messages that will be processed by actors. Mailboxes can use different strategies to hold messages, such as unbounded lists, single- and multi-consumer queues, priority queues and more.

Actor Supervisors & Lifecycle

When creating actors, we can create them at the system level or inside another actor. When an actor creates another one, we call the parent actor a Supervisor and the actors it creates Child Actors (actors created at the system level, also known as Top-level Actors, are child actors too, in this case of a reserved actor provided by Akka itself).

Every actor has a lifecycle: it can be started, in which case it is running; stopped, when an unrecoverable failure occurs; and restarted or resumed, depending on the circumstances of the failure.

Supervisors are actors responsible for deciding what to do when one of their child actors faces a failure. It is possible to simply stop the actor, restart it, or resume it (the main difference between restart and resume is that a restart replaces the actor with a fresh instance, discarding its internal state, while on a resume the actor simply continues with the state it had).

These decisions are called Supervisor Policies (supervisor strategies, in Akka’s terms). A policy can also be set to behave as one-for-one or all-for-one, meaning that when an error occurs on one child actor, the policy is applied just to the failed actor or to all child actors of the supervisor (for example, all of them would restart).

And that concludes our quick recap of Akka. Now, let’s begin our talk about Akka Streams.
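To make the difference between resume and restart concrete, here is a small sketch of a one-for-one policy. CounterActor, ParentActor and the string messages are hypothetical names invented for this illustration, and the exception-to-decision mapping is an arbitrary choice, not a recommendation.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical child: keeps a counter and fails on demand.
class CounterActor extends Actor {
  private var count = 0

  override def receive: Receive = {
    case "inc"        => count += 1
    case "get"        => sender() ! count
    case "resume-me"  => throw new IllegalStateException("recoverable failure")
    case "restart-me" => throw new IllegalArgumentException("needs a fresh start")
  }
}

// Hypothetical supervisor: decides per exception type, only for the failed child.
class ParentActor extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: IllegalStateException    => SupervisorStrategy.Resume
      case _: IllegalArgumentException => SupervisorStrategy.Restart
      case _: Exception                => SupervisorStrategy.Stop
    }

  private val child: ActorRef = context.actorOf(Props(new CounterActor), "counter")

  // Every message is passed to the child, keeping the original sender.
  override def receive: Receive = { case msg => child.forward(msg) }
}

object SupervisionExample {
  def run(): (Int, Int) = {
    val system = ActorSystem("supervision-example")
    implicit val timeout: Timeout = Timeout(3.seconds)
    def currentCount(parent: ActorRef): Int =
      Await.result((parent ? "get").mapTo[Int], 3.seconds)
    try {
      val parent = system.actorOf(Props(new ParentActor), "parent")
      parent ! "inc"
      parent ! "inc"
      parent ! "resume-me"
      val afterResume = currentCount(parent) // resumed: the counter is kept
      parent ! "restart-me"
      val afterRestart = currentCount(parent) // restarted: a new instance starts from zero
      (afterResume, afterRestart)
    } finally system.terminate()
  }
}
```

Swapping OneForOneStrategy for AllForOneStrategy would apply the same decision to every child of the supervisor instead of only the one that failed.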

Stream concepts

A stream is composed of tasks that must be performed – continuously or not – in order to complete a process. Each stream must have a Source, which is the beginning; a Flow, composed of multiple tasks that can run in parallel depending on the needs; and a Sink, which is the stream’s end.

Actor materialization

Akka Streams runs on top of Akka. That means that when a stream is started, internally Akka Streams creates actors, inside an actor system, that are responsible for running the tasks of the stream.

The responsibility for doing this task belongs to the Actor Materializer, which creates (materializes) the resources needed to run the stream. One interesting thing is that it is possible to explicitly define materializing points (asynchronous boundaries) on our flow.

These points are used by Akka Streams to define where it will group the tasks from the flow to run on separate actors, so it is a good technique to keep in mind when doing stream tuning.
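Assuming the points described above correspond to what Akka Streams calls asynchronous boundaries, they can be marked with the async operator. The sketch below is only an illustration, with an object name made up for the example. The boundary changes where the steps run, not what the stream computes.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundaryExample {
  def run(): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("async-boundary-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to 3)
        .map(_ + 1) // steps before the boundary are grouped together
        .async      // asynchronous boundary between the two groups
        .map(_ * 2) // steps after the boundary run in a separate group
        .runWith(Sink.seq)
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }
}
```

Without the async call, all steps would be fused and executed together. Adding boundaries introduces message passing between the groups, so it tends to pay off only when the steps are expensive enough to benefit from running concurrently.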

Sources

Sources are the flow’s beginnings. A source defines an entry point for data, be it a finite data source, such as a file, or an infinite one, such as a Kafka topic. It is possible to zip multiple source definitions into a single combined source for processing, but still, a flow can have only one source.

Flows

Flows are the middle of the stream. One flow can have any number of tasks (steps), ranging from data transformation to enrichment by calling external resources.

Sinks

Sinks are the flow’s endings. Analogous to sources, sinks can have multiple types of destinations, such as files, Kafka topics, REST endpoints, etc. Like the source, a flow can have only one sink.
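A short sketch of combining sources, using in-memory lists as stand-ins for real data sources (the values and the object name are made up for the example). It shows zip, which pairs elements from two sources, and concat, which appends one source after another.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object CombinedSourceExample {
  def run(): (immutable.Seq[(Int, String)], immutable.Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("combined-source-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val ids = Source(List(1, 2, 3))
      val names = Source(List("a", "b", "c"))
      val moreIds = Source(List(4, 5))

      // zip pairs the n-th element of each source into a tuple.
      val zipped = Await.result(ids.zip(names).runWith(Sink.seq), 3.seconds)
      // concat emits all elements of the first source, then those of the second.
      val concatenated = Await.result(ids.concat(moreIds).runWith(Sink.seq), 3.seconds)
      (zipped, concatenated)
    } finally system.terminate()
  }
}
```

In both cases the result is still a single source from the point of view of the rest of the stream.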

Graphs

When modeling an Akka stream, as seen previously, we define a source, a sink and several flows in between. All of this generates a graph, where each node represents a task on the stream.

When instantiating a stream, a runnable graph is created, which represents a blueprint for execution. After calling the run() method (there’s also a runWith(Sink) method that accepts a sink as a parameter), the runnable graph is materialized and executed.
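The following sketch puts a source, a flow and a sink together and runs them both ways. The values and names are made up for the illustration; only the Akka Streams operators themselves come from the library.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BasicStreamExample {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("basic-stream-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val source: Source[Int, NotUsed] = Source(1 to 5)
      val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
      val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

      // The runnable graph is only a blueprint: nothing happens until run() is called.
      val graph: RunnableGraph[Future[Int]] = source.via(doubler).toMat(sum)(Keep.right)
      val viaRun = Await.result(graph.run(), 3.seconds)

      // runWith attaches the sink and runs the stream in one step.
      val viaRunWith = Await.result(source.via(doubler).runWith(sum), 3.seconds)

      require(viaRun == viaRunWith, "both ways of running should give the same sum")
      viaRun
    } finally system.terminate()
  }
}
```

Because the blueprint is immutable, the same source, flow and sink definitions can be reused and materialized more than once, as done here.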

During our lab, we will see Graph Stages. Graph stages are like “boxes” that group tasks together, making them look like a single node in the final graph.

Back-pressure

One very important concept when learning about Akka Streams is back-pressure. Typically, in a producer-consumer architecture, the producer will keep sending data to the consumer, without really knowing if the consumer is capable of keeping up with the load or not. This can create a problem where a producer overloads a consumer, generating all kinds of errors and slowness.

With back-pressure, this approach is reversed. Now, it is the consumer that dictates when to receive a new message, by pulling new data at its own rhythm. While no new message is asked for, the producer keeps waiting for a signal, and only then starts pushing messages again.

The image below illustrates the concept in action:

[Figure: graph_stage_conceptual]
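A small sketch of back-pressure in action: an endless producer is connected to a consumer that is deliberately slowed down with throttle. The counter and the object name are hypothetical, added only to observe how many elements the producer was actually asked to create.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object BackPressureExample {
  def run(): (immutable.Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("back-pressure-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // Counts how many elements the producer really generated.
    val produced = new AtomicInteger(0)
    try {
      // An endless producer: it could emit numbers forever if nobody slowed it down.
      val numbers = Source.fromIterator(() =>
        Iterator.from(1).map { n => produced.incrementAndGet(); n })

      val consumed = Await.result(
        numbers
          .throttle(1, 50.millis, 1, ThrottleMode.Shaping) // slow consumer: one element every 50 ms
          .take(5)                                         // stop after five elements
          .runWith(Sink.seq),
        5.seconds)

      (consumed, produced.get)
    } finally system.terminate()
  }
}
```

Even though the producer is infinite and much faster than the consumer, it only generates elements when they are requested, so the produced counter stays close to the number of elements consumed instead of growing without bound.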

Stream error handling

Of course, just like an actor system, streams can also fail. Just like with actors, error handling is done with supervision, which defines policies for a stream to resume, stop or restart depending on the error.

Streams also support a recovery configuration that allows us, for example, to chain another stream execution in case of error after several retries.
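Both mechanisms can be sketched with in-memory data. The first stream uses a supervision decider that skips elements causing an ArithmeticException. The second uses recoverWithRetries to switch to a fallback source when the main one fails. The data, the planB source and the object name are made up for the illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object StreamErrorHandlingExample {
  def run(): (Int, immutable.Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("stream-error-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Supervision: drop the failing element and keep going, stop on anything else.
      val decider: Supervision.Decider = {
        case _: ArithmeticException => Supervision.Resume
        case _                      => Supervision.Stop
      }
      val divide = Flow[Int]
        .map(100 / _) // fails for 0
        .withAttributes(ActorAttributes.supervisionStrategy(decider))
      val sum = Await.result(
        Source(0 to 5).via(divide).runWith(Sink.fold[Int, Int](0)(_ + _)),
        3.seconds)

      // Recovery: when the stream fails, continue with another source instead.
      val planB = Source(List("five", "six"))
      val recovered = Await.result(
        Source(0 to 10)
          .map(n => if (n < 5) n.toString else throw new RuntimeException("Boom!"))
          .recoverWithRetries(attempts = 1, { case _: RuntimeException => planB })
          .runWith(Sink.seq),
        3.seconds)

      (sum, recovered)
    } finally system.terminate()
  }
}
```

In the first stream the element 0 is skipped, so the sum covers only 100 divided by 1 to 5. In the second, the elements emitted before the failure are kept and the fallback source takes over from there.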

Alpakka project

The Alpakka project is an integration library composed of several components that allow us to quickly deploy integrations between several technologies, such as files, REST endpoints and even AWS technologies such as Amazon Kinesis. During the course of our lab, we will use resources from this project, so stay tuned for more!

The project documentation can be found in:

https://developer.lightbend.com/docs/alpakka/current/

Lab

Pre-requisites

So, without further delay, let’s begin! This lab requires that the reader already have some knowledge of Scala and Akka. On my blog, it is possible to read my previous post on Akka, alongside my series about the Scala language.

Creating the project & infrastructure code

To create the project, let’s begin by creating the sbt file that will hold our project’s dependencies. We will start by creating a folder for our project (all sources for the lab can be found here) and typing the following into a file called build.sbt:

name := "Akka-stream-lab"
version := "1.0"
scalaVersion := "2.12.5"

enablePlugins(JavaAppPackaging)

mainClass in Compile := Some("Main")

lazy val Versions = new {
  val akkaVersion = "2.5.11"
}

lazy val akkaDependencies = Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "0.8",
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
  "com.typesafe.akka" %% "akka-actor" % Versions.akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % Versions.akkaVersion,
  "com.typesafe.akka" %% "akka-slf4j" % Versions.akkaVersion,
  "com.typesafe.akka" %% "akka-testkit" % Versions.akkaVersion % Test,
  "com.typesafe.akka" %% "akka-stream-testkit" % Versions.akkaVersion % Test
)

lazy val testDependencies = Seq(
  "org.scalacheck" %% "scalacheck" % "1.13.4" % Test,
  "org.scalamock" %% "scalamock" % "4.1.0" % Test,
  "org.mockito" % "mockito-core" % "2.19.0" % Test,
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

lazy val loggingDependencies = Seq(
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-api" % "1.7.25"
)

lazy val otherDependencies = Seq(
  "io.spray" %% "spray-json" % "1.3.5"
)

libraryDependencies ++= (
  akkaDependencies ++
    loggingDependencies ++
    testDependencies ++
    otherDependencies
)

As can be seen above, not only did we define an sbt project, but we also included dependencies for Akka and logging, alongside Akka Streams itself. We also added a packaging plugin to simplify running the project from the command line.

In order to use the plugin, we need to add it to the sbt project’s definition. To do that, we create a project folder and inside it create a plugins.sbt file, with the following:


logLevel := Level.Warn

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.2")

Finally, we create our main Scala object that will be the App launcher for our project. We create a Scala source folder and add a Main.scala file, containing the following:


import akka.actor.ActorSystem

object Main extends App {
  implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
}

Our first version is very simple: It simply creates a new actor system. Of course, during our lab, it will receive more code to evolve to our final solution.

Concluding the setup, this is the initial structure of our project (some other files, such as sbt’s build.properties, are created automatically when running the project using sbt):

[Figure: Screen Shot 2018-12-23 at 15.44.04, the initial project structure]

This image is taken from IntelliJ, which I recommend as the IDE for the lab.

Without spoiling too much, as we will see in the next section, we will need some infrastructure code to set up a Kafka cluster for the lab. For this, we will use Docker Compose.

So, let’s begin by creating a docker-compose.yml file, that will create our Kafka cluster. The file will be as follows:


version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - 2181:2181
  kafka:
    image: wurstmeister/kafka:1.1.0
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "accounts:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

In this simple docker-compose file, we create a small cluster with one broker node and one Zookeeper node, and also create an accounts topic at startup. To test it out, with Docker up and running, we can start the cluster by running:


docker-compose up -d

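Finally, let’s create a logback config file for our logging. To do this, let’s create a file called logback.xml inside the resources folder and enter the following (a minimal console configuration around the pattern used in this lab; the appender name and the info root level are assumptions):

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="info">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

That’s it! Now that we have our infrastructure code ready, let’s begin the actual coding.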

Lab’s use case

In our lab, we will create 2 streams. One of them will read data from a file and publish it to a Kafka topic. The other one will read from this same topic and save the messages to other files, this way demonstrating the flow of data from a point A to a point B.

Creating the first stream

Let’s begin by creating the first stream, which is a simple stream that will read data (accounts in our case) from a file and send to Kafka. At first, let’s code the stream in the main code itself and evolve as we develop.

First, we create a file called input1.csv and add the following:

"COD","NAME","DOCUMENT","AGE","CIVIL_STATUS","PHONE","BIRTHDAY","HOME_COUNTRY","HOME_STATE","HOME_CITY","HOME_STREET","HOME_STREETNUM","HOME_NEIGHBORHOOD"
1,"alexandre eleuterio santos lourenco 1","43456754356",36,"Single","+5511234433443","21/06/1982","Brazil","São Paulo","São Paulo","false st",3134,"neigh 1"
2,"alexandre eleuterio santos lourenco 2","43456754376",37,"Single","+5511234433444","21/06/1983","Brazil","São Paulo","São Paulo","false st",3135,"neigh 1"

That will be the csv input file for our first stream. Next, we create a case class Account to encapsulate the data from the file:


package com.alexandreesl.model

case class Account(cod: Long, name: String, document: String,
                   age: Int, civilStatus: String,
                   phone: String, birthday: String,
                   country: String, state: String,
                   city: String, street: String,
                   streetNum: Long, neighBorhood: String)

We also create an object to allow JSON marshaling/unmarshalling when sending/receiving messages in Kafka:


package com.alexandreesl.json

import com.alexandreesl.model.Account
import spray.json.DefaultJsonProtocol

object JsonParsing extends DefaultJsonProtocol {

  implicit val accountFormat = jsonFormat13(Account)

}

Finally, we code the stream that reads from the file and publishes to Kafka (don’t you worry, we will refactor later):


import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.ByteString
import com.alexandreesl.model.Account
import com.typesafe.scalalogging.Logger
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.LoggerFactory
import spray.json._
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.producer.ProducerRecord

object Main extends App {

  implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
  implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))
  implicit val ec = system.dispatcher
  private val logger = Logger(LoggerFactory.getLogger("Main"))
  val config = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  logger.info("Starting streams...")

  // Maps the 13 CSV columns, in file order, to the Account fields.
  private def convertToClass(csv: Array[String]): Account = {
    Account(csv(0).toLong,
      csv(1), csv(2),
      csv(3).toInt, csv(4),
      csv(5), csv(6),
      csv(7), csv(8),
      csv(9), csv(10),
      csv(11).toLong, csv(12))
  }

  // Skips the header line and converts every other line into an Account.
  // Note: a plain split(",") keeps the surrounding double quotes in each text field.
  private val flow = Flow[String].filter(s => !s.contains("COD"))
    .map(line => {
      convertToClass(line.split(","))
    })

  // Reads the file line by line, serializes each Account as JSON
  // and publishes it to the accounts topic.
  FileIO.fromPath(Paths.get("input1.csv"))
    .via(Framing.delimiter(ByteString("\n"), 4096)
      .map(_.utf8String))
    .via(flow)
    .map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
    .runWith(Producer.plainSink(producerSettings))

  logger.info("Stream system is initialized!")

}

As we can see, the code is pretty straightforward. We just added a stream that reads from a CSV file and dispatches the lines to Kafka. To see the messages on Kafka, first we start the application – don’t forget to run docker-compose up first! – and then we can use a shell inside the broker’s container, as follows:

docker exec -t -i akka-stream-lab_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic accounts --from-beginning

This will produce an output as follows:

{"age":36,"birthday":"\"21/06/1982\"","city":"\"São Paulo\"","civilStatus":"\"Single\"","cod":1,"country":"\"Brazil\"","document":"\"43456754356\"","name":"\"alexandre eleuterio santos lourenco 1\"","neighBorhood":"\"neigh 1\"","phone":"\"+5511234433443\"","state":"\"São Paulo\"","street":"\"false st\"","streetNum":3134}
{"age":37,"birthday":"\"21/06/1983\"","city":"\"São Paulo\"","civilStatus":"\"Single\"","cod":2,"country":"\"Brazil\"","document":"\"43456754376\"","name":"\"alexandre eleuterio santos lourenco 2\"","neighBorhood":"\"neigh 1\"","phone":"\"+5511234433444\"","state":"\"São Paulo\"","street":"\"false st\"","streetNum":3135}
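The escaped quotes inside the values above appear because the lines are split on commas without removing the CSV quoting, so each text field keeps its surrounding double quotes. If cleaner values are wanted, a naive helper like the one below could be applied before building the Account. CsvLine is a hypothetical name, and the sketch assumes no field contains a comma, which holds for our sample file but not for CSV in general.

```scala
object CsvLine {
  // Splits on every comma (keeping empty trailing fields) and removes
  // one pair of surrounding double quotes from each field, if present.
  def parse(line: String): Array[String] =
    line.split(",", -1).map(_.trim.stripPrefix("\"").stripSuffix("\""))
}
```

For example, CsvLine.parse applied to the line 1,"Single","São Paulo",3134 yields the four fields 1, Single, São Paulo and 3134 without quotes. For real-world files, a proper CSV parser, such as the one in the Alpakka CSV module already listed in our build.sbt, would be a safer choice. The rest of this lab keeps the original split, so the outputs shown still contain the quotes.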

Now that we have coded our first stream, let’s create our second stream. Don’t you worry about the messy code right now, we will refactor it later when implementing error handling using actors.

Creating the second Stream

Now, let’s create our second stream. This stream will read from Kafka and generate two files, one with personal data and another with address data.

For this, we will use a graph stage, which will write the files in parallel. We will disable autocommit for Kafka consumption and commit only at the end.

Before creating the stage, let’s do our first refactoring, by moving the Account case class to a GraphMessages object, which will hold all case classes we will use on our coding:

package com.alexandreesl.graph

import akka.kafka.ConsumerMessage

object GraphMessages {

  case class Account(cod: Long, name: String, document: String,
                     age: Int, civilStatus: String,
                     phone: String, birthday: String,
                     country: String, state: String,
                     city: String, street: String,
                     streetNum: Long, neighBorhood: String)

  case class InputMessage(acc: Account,
                          offset: ConsumerMessage.CommittableOffset)

  case class AccountPersonalData(cod: Long, name: String,
                                 document: String, age: Int,
                                 civilStatus: String,
                                 phone: String, birthday: String)

  case class AccountAddressData(cod: Long, country: String,
                                state: String, city: String,
                                street: String, streetNum: Long,
                                neighBorhood: String)

}

We also update our Json protocol accordingly, since we will use JSON marshaling for the other classes as well:

package com.alexandreesl.json

import com.alexandreesl.graph.GraphMessages.{Account, AccountAddressData, AccountPersonalData}
import spray.json.DefaultJsonProtocol

object JsonParsing extends DefaultJsonProtocol {

  implicit val accountFormat = jsonFormat13(Account)
  implicit val accountPersonalFormat = jsonFormat7(AccountPersonalData)
  implicit val accountAddressFormat = jsonFormat7(AccountAddressData)

}

Finally, let’s create our graph stage. Notice the ~> symbol? That symbol is used inside the graph stage builder to create the stage flow. This allows us to code flows in a visual manner, making it a lot simpler to design stream flows.

package com.alexandreesl.graph

import java.nio.file.{Paths, StandardOpenOption}

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, FileIO, Flow, GraphDSL, Source, Zip}
import akka.util.ByteString
import com.alexandreesl.graph.GraphMessages.{Account, AccountAddressData, AccountPersonalData, InputMessage}
import spray.json._
import com.alexandreesl.json.JsonParsing._

object AccountWriterGraphStage {

  val personal = Paths.get("personal.csv")
  val address = Paths.get("address.csv")

  def graph(implicit system: ActorSystem, materializer: ActorMaterializer) =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>

      import GraphDSL.Implicits._

      // Appends the personal data as a JSON line and passes the account downstream.
      // Note: the file write runs as its own small stream and is not awaited here.
      val flowPersonal = Flow[InputMessage].map(msg => {
        Source.single(AccountPersonalData(msg.acc.cod,
          msg.acc.name, msg.acc.document, msg.acc.age,
          msg.acc.civilStatus, msg.acc.phone,
          msg.acc.birthday).toJson.compactPrint + "\n")
          .map(t => ByteString(t))
          .runWith(FileIO.toPath(personal,
            Set(StandardOpenOption.CREATE, StandardOpenOption.APPEND)))
        msg.acc
      })

      // Appends the address data as a JSON line and passes the Kafka offset downstream.
      val flowAddress = Flow[InputMessage].map(msg => {
        Source.single(AccountAddressData(msg.acc.cod,
          msg.acc.country, msg.acc.state,
          msg.acc.city, msg.acc.street, msg.acc.streetNum,
          msg.acc.neighBorhood).toJson.compactPrint + "\n")
          .map(t => ByteString(t))
          .runWith(FileIO.toPath(address,
            Set(StandardOpenOption.CREATE, StandardOpenOption.APPEND)))
        msg.offset
      })

      // Broadcast sends each message to both branches; Zip pairs their results.
      val bcastJson = builder.add(Broadcast[InputMessage](2))
      val zip = builder.add(Zip[Account, ConsumerMessage.CommittableOffset])

      bcastJson ~> flowPersonal ~> zip.in0
      bcastJson ~> flowAddress ~> zip.in1

      FlowShape(bcastJson.in, zip.out)

    })

}

On our stage, we created two flows that run in parallel after a broadcast, each one passing along one value from the original input message. In the end, we use zip to generate a tuple from the two objects, which is passed to the next stage. Finally, let’s create our stream, which will use the graph stage as one of its steps. I promise this will be the last time we will see that big messy main object: in the next section we will start refactoring.

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.{FileIO, Flow, Framing, Sink}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.ByteString
import com.alexandreesl.graph.AccountWriterGraphStage
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import com.typesafe.scalalogging.Logger
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.slf4j.LoggerFactory
import spray.json._
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord

import scala.concurrent.Future

object Main extends App {

  implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
  implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))
  implicit val ec = system.dispatcher
  private val logger = Logger(LoggerFactory.getLogger("Main"))
  val configProducer = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(configProducer, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
  val configConsumer = system.settings.config.getConfig("akka.kafka.consumer")
  val consumerSettings =
    ConsumerSettings(configConsumer, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  logger.info("Starting streams...")

  private def convertToClass(csv: Array[String]): Account = {
    Account(csv(0).toLong,
      csv(1), csv(2),
      csv(3).toInt, csv(4),
      csv(5), csv(6),
      csv(7), csv(8),
      csv(9), csv(10),
      csv(11).toLong, csv(12))
  }

  private val flow = Flow[String].filter(s => !s.contains("COD"))
    .map(line => {
      convertToClass(line.split(","))
    })

  FileIO.fromPath(Paths.get("input1.csv"))
    .via(Framing.delimiter(ByteString("\n"), 4096)
      .map(_.utf8String))
    .via(flow)
    .map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
    .runWith(Producer.plainSink(producerSettings))

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("accounts"))
    .mapAsync(10)(msg =>
      Future.successful(InputMessage(msg.record.value.parseJson.convertTo[Account], msg.committableOffset))
    ).via(AccountWriterGraphStage.graph)
    .mapAsync(10) { tuple =>
      val acc = tuple._1
      logger.info(s"persisted Account: $acc")
      tuple._2.commitScaladsl()
    }.runWith(Sink.ignore)

  logger.info("Stream system is initialized!")

}

That’s it! Since we started both streams in the main application, it is possible to test them simply by running the application, which will make the first stream enqueue 2 messages that will be dequeued by the other stream. If we look at the logs, we will see the following:

[INFO] [12/27/2018 23:59:52.395] [akka-streams-lab-akka.actor.default-dispatcher-3] [SingleSourceLogic(akka://akka-streams-lab)] Assigned partitions: Set(accounts-0). All partitions: Set(accounts-0)
23:59:52.436 [akka-streams-lab-akka.actor.default-dispatcher-2] INFO Main - persisted Account: Account(1,"alexandre eleuterio santos lourenco 1","43456754356",36,"Single","+5511234433443","21/06/1982","Brazil","São Paulo","São Paulo","false st",3134,"neigh 1")
23:59:52.449 [akka-streams-lab-akka.actor.default-dispatcher-2] INFO Main - persisted Account: Account(2,"alexandre eleuterio santos lourenco 2","43456754376",37,"Single","+5511234433444","21/06/1983","Brazil","São Paulo","São Paulo","false st",3135,"neigh 1")

And if we inspect the files, we will see that it wrote the data accordingly, as we can see below:

personal.csv:

{"age":36,"birthday":"\"21/06/1982\"","civilStatus":"\"Single\"","cod":1,"document":"\"43456754356\"","name":"\"alexandre eleuterio santos lourenco 1\"","phone":"\"+5511234433443\""}
{"age":37,"birthday":"\"21/06/1983\"","civilStatus":"\"Single\"","cod":2,"document":"\"43456754376\"","name":"\"alexandre eleuterio santos lourenco 2\"","phone":"\"+5511234433444\""}

address.csv:

{"city":"\"São Paulo\"","cod":1,"country":"\"Brazil\"","neighBorhood":"\"neigh 1\"","state":"\"São Paulo\"","street":"\"false st\"","streetNum":3134}
{"city":"\"São Paulo\"","cod":2,"country":"\"Brazil\"","neighBorhood":"\"neigh 1\"","state":"\"São Paulo\"","street":"\"false st\"","streetNum":3135}

Now, let’s refactor this code to be more maintainable and introduce error handling.

Implementing error handling

To make error handling easier, we will move our streams to actors. Then we will create a supervisor, defining what to do when an error occurs.

Let’s begin by creating the first actor, called KafkaImporterActor, and move the stream:


package com.alexandreesl.actor

import java.nio.file.Paths

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.util.ByteString
import com.alexandreesl.actor.KafkaImporterActor.Start
import com.alexandreesl.graph.GraphMessages.Account
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import spray.json._

import scala.concurrent.ExecutionContextExecutor

class KafkaImporterActor extends Actor with ActorLogging {

  implicit val actorSystem: ActorSystem = context.system
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

  private def convertToClass(csv: Array[String]): Account = {
    Account(csv(0).toLong,
      csv(1), csv(2),
      csv(3).toInt, csv(4),
      csv(5), csv(6),
      csv(7), csv(8),
      csv(9), csv(10),
      csv(11).toLong, csv(12))
  }

  private val flow = Flow[String].filter(s => !s.contains("COD"))
    .map(line => {
      convertToClass(line.split(","))
    })
  private val configProducer = actorSystem.settings.config.getConfig("akka.kafka.producer")
  private val producerSettings =
    ProducerSettings(configProducer, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  override def preStart(): Unit = {
    self ! Start
  }

  override def receive: Receive = {
    case Start =>
      FileIO.fromPath(Paths.get("input1.csv"))
        .via(Framing.delimiter(ByteString("\n"), 4096)
          .map(_.utf8String))
        .via(flow)
        .map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
        .runWith(Producer.plainSink(producerSettings))

  }

}

object KafkaImporterActor {

  val name = "Kafka-Importer-actor"

  def props = Props(new KafkaImporterActor)

  case object Start

}

In this actor, we just moved our configuration and created a receive method. In that method, we handle a message that is fired at actor startup, making the actor start the stream as soon as it is instantiated.

Now, let’s do the same to the other stream:


package com.alexandreesl.actor

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.alexandreesl.actor.KafkaExporterActor.Start
import com.alexandreesl.graph.AccountWriterGraphStage
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import spray.json._
import com.alexandreesl.json.JsonParsing._

import scala.concurrent.{ExecutionContextExecutor, Future}

class KafkaExporterActor extends Actor with ActorLogging {

  implicit val actorSystem: ActorSystem = context.system
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher
  private val configConsumer = actorSystem.settings.config.getConfig("akka.kafka.consumer")
  private val consumerSettings =
    ConsumerSettings(configConsumer, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  override def preStart(): Unit = {
    self ! Start
  }

  override def receive: Receive = {
    case Start =>
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("accounts"))
        .mapAsync(10)(msg =>
          Future.successful(InputMessage(msg.record.value.parseJson.convertTo[Account], msg.committableOffset))
        ).via(AccountWriterGraphStage.graph)
        .mapAsync(10) { tuple =>
          val acc = tuple._1
          log.info(s"persisted Account: $acc")
          tuple._2.commitScaladsl()
        }.runWith(Sink.ignore)

  }

}

object KafkaExporterActor {

  val name = "Kafka-Exporter-actor"

  def props = Props(new KafkaExporterActor)

  case object Start

}

Finally, since we moved the code into actors, we can now simplify the main object to just this:


import akka.actor.ActorSystem
import com.alexandreesl.actor.{KafkaExporterActor, KafkaImporterActor}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

object Main extends App {

implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
private val logger = Logger(LoggerFactory.getLogger("Main"))

logger.info("Starting streams...")

system.actorOf(KafkaImporterActor.props, KafkaImporterActor.name)
system.actorOf(KafkaExporterActor.props, KafkaExporterActor.name)

logger.info("Stream system is initialized!")

}

If we run the application again, we will see that it runs just like before, proving our refactoring was a success.

Now that our code is better organized, let’s introduce a supervisor to handle errors. We will define a supervisor strategy for our actors, plus backoff policies that make the actors restart progressively more slowly as errors repeat, for example while waiting for Kafka to recover from a shutdown.

To do this, we change our main object code like this:


import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import com.alexandreesl.actor.{KafkaExporterActor, KafkaImporterActor}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

import scala.concurrent.duration._

object Main extends App {

implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
private val logger = Logger(LoggerFactory.getLogger("Main"))

logger.info("Starting streams...")

private val supervisorStrategy = OneForOneStrategy() {
case ex: Exception =>
logger.info(s"exception: $ex")
SupervisorStrategy.Restart

}
private val importerProps: Props = BackoffSupervisor.props(
Backoff.onStop(
childProps = KafkaImporterActor.props,
childName = KafkaImporterActor.name,
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
).withSupervisorStrategy(supervisorStrategy)
)
private val exporterProps: Props = BackoffSupervisor.props(
Backoff.onStop(
childProps = KafkaExporterActor.props,
childName = KafkaExporterActor.name,
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
).withSupervisorStrategy(supervisorStrategy)
)

system.actorOf(importerProps, "Kafka-importer")
system.actorOf(exporterProps, "Kafka-exporter")

logger.info("Stream system is initialized!")

}

In this code, we defined backoff policies that wait 3 seconds before the first restart and progressively longer before each following one, up to a maximum of 30 seconds, with a random factor of 20% added to each delay so that the actors do not all restart at the same moment. As the supervisor policy, we defined a OneForOne strategy, meaning that when one of the actors fails, only the faulty actor is affected by the policy.
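To make the backoff arithmetic concrete, here is a minimal sketch in plain Java, kept independent of the Akka API on purpose. It assumes the delay doubles after every restart and is capped at the maximum before the random factor is applied; the class and method names are hypothetical, and this is an illustration of the idea rather than Akka’s actual implementation.

```java
import java.time.Duration;

final class BackoffSketch {

    // Delay before restart number `restartCount` (0-based), without jitter.
    // Assumption: the delay doubles on every restart and is capped at `max`.
    static Duration baseDelay(Duration min, Duration max, int restartCount) {
        double factor = Math.pow(2, restartCount);
        double millis = Math.min((double) max.toMillis(), min.toMillis() * factor);
        return Duration.ofMillis((long) millis);
    }

    // Stretches the delay by up to `randomFactor` (0.2 means up to 20% extra).
    // `rnd` is a random value in [0, 1), passed in so the result is testable.
    static Duration withJitter(Duration base, double randomFactor, double rnd) {
        return Duration.ofMillis((long) (base.toMillis() * (1.0 + rnd * randomFactor)));
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(3);
        Duration max = Duration.ofSeconds(30);
        for (int i = 0; i < 6; i++) {
            System.out.println("restart " + i + ": " + baseDelay(min, max, i).getSeconds() + "s");
        }
    }
}
```

Under these assumptions, the delays for the values used in our main object would grow as 3s, 6s, 12s, 24s and then stay at 30s, each one stretched by a random amount of up to 20%.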

The strategy itself is simple: any exception is logged and the actor is restarted. The goal is for errors inside a stream to reach the enclosing actor as well, so that a failing stream also brings the actor down and triggers a restart.

For this escalation to work, we need to change our actors so that errors inside the streams are propagated. We do this by checking the outcome of the future returned by each stream: when the future fails, the actor sends itself a PoisonPill and stops, and the backoff supervisor (configured with Backoff.onStop) starts it again. The actors are changed as follows:

KafkaImporterActor


package com.alexandreesl.actor

import java.nio.file.Paths

import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.util.ByteString
import com.alexandreesl.actor.KafkaImporterActor.Start
import com.alexandreesl.graph.GraphMessages.Account
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import spray.json._

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

class KafkaImporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

private def convertToClass(csv: Array[String]): Account = {
Account(csv(0).toLong,
csv(1), csv(2),
csv(3).toInt, csv(4),
csv(5), csv(6),
csv(7), csv(8),
csv(9), csv(10),
csv(11).toLong, csv(12))
}

private val flow = Flow[String].filter(s => !s.contains("COD"))
.map(line => {
convertToClass(line.split(","))
})
private val configProducer = actorSystem.settings.config.getConfig("akka.kafka.producer")
private val producerSettings =
ProducerSettings(configProducer, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
val done = FileIO.fromPath(Paths.get("input1.csv"))
.via(Framing.delimiter(ByteString("\n"), 4096)
.map(_.utf8String))
.via(flow)
.map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
.runWith(Producer.plainSink(producerSettings))
done onComplete {
case Success(_) =>
log.info("I completed successfully, I am so happy :)")
case Failure(ex) =>
log.error(ex, "I received a error! Goodbye cruel world!")
self ! PoisonPill
}

}

}

object KafkaImporterActor {

val name = "Kafka-Importer-actor"

def props = Props(new KafkaImporterActor)

case object Start

}

KafkaExporterActor


package com.alexandreesl.actor

import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.alexandreesl.actor.KafkaExporterActor.Start
import com.alexandreesl.graph.AccountWriterGraphStage
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import spray.json._
import com.alexandreesl.json.JsonParsing._

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

class KafkaExporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher
private val configConsumer = actorSystem.settings.config.getConfig("akka.kafka.consumer")
private val consumerSettings =
ConsumerSettings(configConsumer, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
val done = Consumer
.committableSource(consumerSettings, Subscriptions.topics("accounts"))
.mapAsync(10)(msg =>
Future.successful(InputMessage(msg.record.value.parseJson.convertTo[Account], msg.committableOffset))
).via(AccountWriterGraphStage.graph)
.mapAsync(10) { tuple =>
val acc = tuple._1
log.info(s"persisted Account: $acc")
tuple._2.commitScaladsl()
}.runWith(Sink.ignore)
done onComplete {
case Success(_) =>
log.info("I completed successfully, I am so happy :)")
case Failure(ex) =>
log.error(ex, "I received a error! Goodbye cruel world!")
self ! PoisonPill
}

}

}

object KafkaExporterActor {

val name = "Kafka-Exporter-actor"

def props = Props(new KafkaExporterActor)

case object Start

}

Finally, let’s test it out. We begin by shutting down Kafka without stopping our application, by running:

docker-compose down

If we look at the logs, we will see the streams start complaining that they cannot connect to Kafka. After some time, the stream fails with a “Consumer actor terminated” error, which our handler logs right before making the actor swallow the poison pill:

[INFO] [12/28/2018 22:23:54.236] [akka-streams-lab-akka.actor.default-dispatcher-4] [akka://akka-streams-lab/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Stop$] without sender to Actor[akka://akka-streams-lab/system/kafka-consumer-1#1868012916] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://akka-streams-lab/system/kafka-consumer-1#1868012916]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[ERROR] [12/28/2018 22:23:54.236] [akka-streams-lab-akka.actor.default-dispatcher-5] [akka://akka-streams-lab/user/Kafka-exporter/Kafka-Exporter-actor] I received a error! Goodbye cruel world!
akka.kafka.ConsumerFailed: Consumer actor terminated
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:66)
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:53)
at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:230)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:198)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:198)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:454)
at

If we keep watching, we will see this cycle repeat endlessly: the streams are restarted, they fail to connect to Kafka, and the poison pill is swallowed again.

To make the application come back again, let’s restart our Kafka cluster with:

docker-compose up -d

After Kafka comes back, we will see the streams return to normal.

And that concludes our error handling code. Of course, this is not all we can do in this area. Another interesting error handling technique, useful in some cases, is recovering, where we define another stream to be executed in case of a failure, similar to a circuit breaker. This can be seen in more detail here.

Finally, let’s test our packaging plugin by running the application from the terminal. Let’s open a terminal and run the following:

sbt stage

This will prepare our application, including a shell script to run the application. We can run it by typing:

./target/universal/stage/bin/akka-stream-lab

Once it starts, we will see that our application runs just like it did in IntelliJ.

Automated testing of the streams

Finally, to wrap it up, we will see how to test our streams. Automated tests are important for the robustness of the code and also allow CI pipelines to be implemented efficiently.

Streams can be tested by using probes to run the streams and check the results. Let’s start by creating a test for the converter flow that generates accounts from CSV lines, and next we will test our graph stage. The rest of the code would just be testing third-party libraries, so we will focus on our own code only.

In our tests, we will use several traits to add support for the features we need. It is good practice to combine all these traits into a single one, so our test classes won’t carry one long line of trait declarations.

So let’s begin by creating our trait:


package com.alexandreesl.test

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Matchers, WordSpecLike}

trait TestEnvironment extends WordSpecLike with Matchers
with BeforeAndAfter with BeforeAndAfterAll

Before coding the test, we will move the flow in KafkaImporterActor to the companion object, which allows us to reference it in the test:


package com.alexandreesl.actor

import java.nio.file.Paths

import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.util.ByteString
import com.alexandreesl.actor.KafkaImporterActor.Start
import com.alexandreesl.graph.GraphMessages.Account
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import spray.json._

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

class KafkaImporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

private val configProducer = actorSystem.settings.config.getConfig("akka.kafka.producer")
private val producerSettings =
ProducerSettings(configProducer, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
val done = FileIO.fromPath(Paths.get("input1.csv"))
.via(Framing.delimiter(ByteString("\n"), 4096)
.map(_.utf8String))
.via(KafkaImporterActor.flow)
.map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
.runWith(Producer.plainSink(producerSettings))
done onComplete {
case Success(_) =>
log.info("I completed successfully, I am so happy :)")
case Failure(ex) =>
log.error(ex, "I received a error! Goodbye cruel world!")
self ! PoisonPill
}

}

}

object KafkaImporterActor {

private def convertToClass(csv: Array[String]): Account = {
Account(csv(0).toLong,
csv(1), csv(2),
csv(3).toInt, csv(4),
csv(5), csv(6),
csv(7), csv(8),
csv(9), csv(10),
csv(11).toLong, csv(12))
}

val flow = Flow[String].filter(s => !s.contains("COD"))
.map(line => {
convertToClass(line.split(","))
})

val name = "Kafka-Importer-actor"

def props = Props(new KafkaImporterActor)

case object Start

}

Next, we will implement a different equals method on the Account class, comparing accounts by their code only, so that it works as we need in the test assertions:


package com.alexandreesl.graph

import akka.kafka.ConsumerMessage

object GraphMessages {

case class Account(cod: Long, name: String, document: String, age: Int, civilStatus: String,
phone: String, birthday: String, country: String, state: String,
city: String, street: String, streetNum: Long, neighBorhood: String) {
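override def equals(that: Any): Boolean =
that match {
case that: Account => that.canEqual(this) && that.cod == this.cod
case _ => false
}
// Keep hashCode consistent with the cod-only equals above.
override def hashCode(): Int = cod.hashCode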
}

case class InputMessage(acc: Account, offset: ConsumerMessage.CommittableOffset)

case class AccountPersonalData(cod: Long, name: String, document: String, age: Int, civilStatus: String,
phone: String, birthday: String)

case class AccountAddressData(cod: Long, country: String, state: String,
city: String, street: String, streetNum: Long, neighBorhood: String)

}

Finally, let’s code the test:


package com.alexandreesl.test.importer

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.{TestKit, TestProbe}
import com.alexandreesl.actor.KafkaImporterActor
import com.alexandreesl.graph.GraphMessages.Account
import com.alexandreesl.test.TestEnvironment

import scala.concurrent.duration._

class KafkaImporterActorSpec extends TestKit(ActorSystem("MyTestSystem")) with TestEnvironment {
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

override def afterAll() = TestKit.shutdownActorSystem(system)

"A csv line in a file " when {
val csv = "1,\"alexandre eleuterio santos lourenco 1\",\"43456754356\",36,\"Single\",\"+5511234433443\",\"21/06/1982\",\"Brazil\",\"São Paulo\",\"São Paulo\",\"false st\",3134,\"neigh 1\""

" should convert to Account case class " in {

val probe = TestProbe()

Source.single(csv)
.via(KafkaImporterActor.flow)
.to(Sink.actorRef(probe.ref, "completed"))
.run()

probe.expectMsg(2.seconds, Account(1, "alexandre eleuterio santos lourenco 1",
"43456754356", 36, "Single", "+5511234433443", "21/06/1982",
"Brazil", "São Paulo", "São Paulo", "false st", 3134, "neigh 1"))
probe.expectMsg(2.seconds, "completed")

}

}

}

In this code, we create a probe that waits for a message containing the Account converted by the flow, followed by a “completed” message that the sink emits when the stream ends. The 2-second timeout controls how long the probe will wait for each message to arrive.

Now, let’s code our second test. Before writing the test itself, let’s do a little refactoring on KafkaExporterActor, exposing part of the stream to the spec. This way we can test all of our custom code:


package com.alexandreesl.actor

import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink}
import com.alexandreesl.actor.KafkaExporterActor.Start
import com.alexandreesl.graph.AccountWriterGraphStage
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import spray.json._
import com.alexandreesl.json.JsonParsing._
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

class KafkaExporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher
private val configConsumer = actorSystem.settings.config.getConfig("akka.kafka.consumer")
private val consumerSettings =
ConsumerSettings(configConsumer, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
val done = Consumer
.committableSource(consumerSettings, Subscriptions.topics("accounts"))
.mapAsync(10)(msg =>
Future.successful(InputMessage(msg.record.value.parseJson.convertTo[Account], msg.committableOffset))
)
.via(KafkaExporterActor.flow)
.runWith(Sink.ignore)
done onComplete {
case Success(_) =>
log.info("I completed successfully, I am so happy :)")
case Failure(ex) =>
log.error(ex, "I received a error! Goodbye cruel world!")
self ! PoisonPill
}

}

}

object KafkaExporterActor {

private val logger = Logger(LoggerFactory.getLogger("KafkaExporterActor"))

def flow()(implicit actorSystem: ActorSystem,
materializer: ActorMaterializer) =
Flow[InputMessage].via(AccountWriterGraphStage.graph)
.mapAsync(10) { tuple =>
val acc = tuple._1
logger.info(s"persisted Account: $acc")
tuple._2.commitScaladsl()
}

val name = "Kafka-Exporter-actor"

def props = Props(new KafkaExporterActor)

case object Start

}

Finally, let’s code our test:


package com.alexandreesl.test.exporter

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffset
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.{TestKit, TestProbe}
import com.alexandreesl.actor.KafkaExporterActor
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import com.alexandreesl.test.TestEnvironment
import org.mockito.Mockito.{times, verify, when}
import org.scalatest.mockito.MockitoSugar.mock

import scala.concurrent.Future
import scala.concurrent.duration._

class KafkaExporterActorSpec extends TestKit(ActorSystem("MyTestSystem")) with TestEnvironment {
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

override def afterAll() = TestKit.shutdownActorSystem(system)

"A Account object " when {

val offset = mock[CommittableOffset]
when(offset commitScaladsl) thenReturn Future.successful(Done)
val account = Account(1, "alexandre eleuterio santos lourenco 1",
"43456754356", 36, "Single", "+5511234433443", "21/06/1982",
"Brazil", "São Paulo", "São Paulo", "false st", 3134, "neigh 1")
val inputMessage = InputMessage(account, offset)

" should pass through the graph stage " in {

val probe = TestProbe()
Source.single(inputMessage)
.via(KafkaExporterActor.flow)
.to(Sink.actorRef(probe.ref, "completed"))
.run()

probe.expectMsg(2.seconds, Done)
probe.expectMsg(2.seconds, "completed")
verify(offset, times(1)) commitScaladsl

}

}

}

In this code, we again wait for the probe to receive messages. First we receive the Done object from the Kafka commit (here we use a mock object so the tests can run without Kafka), and next we receive our good old “completed” message. Finally, we verify that our mock was called, to ensure that the flow commits messages back to Kafka after processing.

Of course, this was just a tiny taste of what can be done with Akka’s TestKit. The probe we used for sinks can also be used for sources, and just as we tested streams, it is also possible to test actors communicating with each other in an actor solution, for example.

In the references section, you can find links to the documentation for this and the other subjects covered in this article.

Going beyond

Of course, this brief article can’t possibly cover every detail of Akka and Akka Streams. Among the subjects not covered here, we can highlight a few:

  • Akka FSM (Finite State Machine): Allows us to implement state machines in actor solutions;
  • Akka HTTP: Allows us to call and expose HTTP endpoints;
  • Akka Persistence: Allows us to implement a persistence layer on messages flowing through Akka, in order to achieve better recoverability in case of failures;
  • Akka Event Bus: A built-in publisher-subscriber event bus inside Akka that allows us to implement broadcasting of messages inside actor solutions.

In the references section, you can find links for these and more!

Thanks

Special thanks to Iago David Santos (LinkedIn), who reviewed this article and pointed out some things. Thanks, Iago!

Conclusion

And that concludes our article. With a great toolkit and robustness, Akka Streams is a great tool to consider when coding integrations, APIs and more. Thank you for following me again in this article, until next time!


Java 9: Learning the new features – part 2


Hi, dear readers! Welcome to my blog. In this post, we will continue our tour of Java 9, now focusing on what changed in Streams and Optionals.

Creating collections

Before Java 9, when we wanted to populate a collection with some data, we commonly would do this:

Map<Long,String> tasks = new HashMap<>();

tasks.put(1l,"Put trash on the street");
tasks.put(2l,"Buy bread");
tasks.put(3l,"Walk with the dog");
tasks.put(4l,"make dinner");

Of course, we could also create the collection like this:

Map<Long, String> tasks = new HashMap<Long, String>() {{

    put(1l, "Put trash on the street");
    put(2l, "Buy bread");
    put(3l, "Walk with the dog");
    put(4l, "make dinner");

}};

Still, it is quite a verbose way to create a collection. Finally, with Java 9, we can create a collection in a much cleaner way:

Map<Long, String> tasks = Map.of(
        1l, "Put trash on the street",
        2l, "Buy bread",
        3l, "Walk with the dog",
        4l, "make dinner"

);

Two points are worth noting about the of method, however:

  • There’s no way to choose which implementation will be used;
  • If there are any null values in the data, the creation will fail with a NullPointerException.
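The null behavior can be checked with a small sketch like the one below. It also shows a related point from the JDK documentation: the map returned by Map.of is unmodifiable, so put fails as well. The class and method names are hypothetical, chosen only for this illustration.

```java
import java.util.Map;

final class MapOfSketch {

    // Returns true when Map.of rejects a null value with a NullPointerException.
    static boolean rejectsNull() {
        try {
            Map.of(1L, (String) null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Returns true when the map returned by Map.of refuses modification.
    static boolean isUnmodifiable() {
        Map<Long, String> tasks = Map.of(1L, "Buy bread");
        try {
            tasks.put(2L, "Walk with the dog");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejects null values: " + rejectsNull());
        System.out.println("is unmodifiable: " + isUnmodifiable());
    }
}
```

If a mutable map is needed, one option is to copy the result into a regular implementation, for example with new HashMap<>(Map.of(...)).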

New collectors

Another good addition was the new collectors. With them, we can now apply filters or mappers to streams as part of the collecting step. Let’s see some examples.

Let’s use the same tasks map from before. Let’s suppose we want a list of task values, filtered to only the tasks that don’t contain the word dog. With the new collectors, we can accomplish this by doing:

tasks.values().stream().collect(Collectors.filtering(w -> !w.contains("dog"), Collectors.toList())).forEach(System.out::println);

If we execute the code, we will see that it will print all the tasks except the one about walking with the dog, as we expected.

Now, let’s see another example. Let’s suppose we want to create a list with only the first word of each task. This can be done by using the following code:

tasks.values().stream().collect(Collectors.mapping(w -> w.split(" ")[0], Collectors.toList())).forEach(System.out::println);

If we run the code, we will see that it will print a list with just the first words from the tasks, as we expected.
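These collectors become more useful when nested inside another collector such as groupingBy. The sketch below, with hypothetical class and method names, groups the same task descriptions by their number of words and filters inside each group. Grouping by word count is just an arbitrary choice for the illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class CollectorsSketch {

    static final List<String> TASKS = List.of(
            "Put trash on the street", "Buy bread", "Walk with the dog", "make dinner");

    // Groups tasks by word count. The filter runs inside each group, so a
    // group whose tasks are all filtered out is kept with an empty list.
    static Map<Integer, List<String>> byWordCountWithoutDog() {
        return TASKS.stream().collect(Collectors.groupingBy(
                t -> t.split(" ").length,
                TreeMap::new, // sorted keys, so the printed order is predictable
                Collectors.filtering(t -> !t.contains("dog"), Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(byWordCountWithoutDog());
    }
}
```

Here the group for four-word tasks is still present in the result, but with an empty list. Had we called filter on the stream before collecting, that key would not appear at all.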

Iterating with streams

Another interesting new feature is the dropWhile and takeWhile operations. With them, we can iterate sequentially over a collection, discarding or including items for as long as a predicate holds. Let’s see some examples.

Let’s begin by creating a collection for our tests:

List<String> words = List.of("we", "are", "testing", 
        "new", "features", "of", "Java", "9");

Now, let’s try dropWhile:

words.stream().dropWhile(e -> !e.equals("new")).forEach(System.out::println);

The result if we execute our stream will be as follows:

new
features
of
Java
9

As we can see, this is the correct result, since we told the stream to drop items for as long as they are not equal to “new”.

If we try the takeWhile operation with the same predicate, we will see that the stream keeps the items until an item equal to “new” is found, exactly as expected. This is the code modified for the new example:

words.stream().takeWhile(e -> !e.equals("new")).forEach(System.out::println);

And this is the new result:

we
are
testing

New features on Optionals

Optionals also got their share of improvements. Let’s begin with our previous example from the mapping collector.

Let’s suppose our tasks map uses Optionals for values instead of literal strings:

Map<Long, Optional<String>> tasks = Map.of(
        1l, Optional.ofNullable("Put trash on the street"),
        2l, Optional.ofNullable("Buy bread"),
        3l, Optional.ofNullable("Walk with the dog"),
        4l, Optional.ofNullable("make dinner")

);

If we wanted to use this map to implement the previous stream, we would have to “extract” all the values from the Optionals before using them in the stream. That is, until Java 9.

Now, we can implement the previous stream in this new scenario by doing this:

tasks.values().stream().flatMap(Optional::stream).collect(Collectors.mapping(w -> w.split(" ")[0], Collectors.toList())).forEach(System.out::println);

If we run our code, we will see that it will print the list with just the first words from the tasks, just like before.

Another good addition was the ifPresentOrElse method. Now, if we need to implement logic that depends on whether an Optional is empty or not, we can just do:

myOptional.ifPresentOrElse(present -> System.out.println(present), () -> {
    System.out.println("nothing to do");
});

Even more interesting, Optional now supports the or method, which allows us to chain multiple fallback values! We can see the method in action in the example below:

myOptional.or(() -> Optional.ofNullable("this is my first callback"))
        .or(() -> Optional.ofNullable("this is my second callback"))
        .or(() -> Optional.ofNullable("this is my third callback"))
        .or(() -> Optional.ofNullable("this is my fourth callback"));
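In a chain like this, the suppliers are evaluated lazily: each one is only called if everything before it turned out empty, and the first non-empty result wins. The sketch below makes this visible by counting the calls. The class, the counter and the fallback helper are hypothetical names used only for this illustration.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

final class OptionalOrSketch {

    // Counts how many fallback suppliers were actually invoked.
    static final AtomicInteger CALLS = new AtomicInteger();

    static Optional<String> fallback(String value) {
        CALLS.incrementAndGet();
        return Optional.ofNullable(value);
    }

    static String resolve(Optional<String> start) {
        return start
                .or(() -> fallback(null))     // empty, so the chain continues
                .or(() -> fallback("second")) // first non-empty fallback wins
                .or(() -> fallback("third"))  // not invoked once a value exists
                .orElse("none");
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty()) + ", suppliers called: " + CALLS.get());
        CALLS.set(0);
        System.out.println(resolve(Optional.of("original")) + ", suppliers called: " + CALLS.get());
    }
}
```

Starting from an empty Optional, only the first two suppliers run. Starting from a present value, none of them do.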

Conclusion

And so we conclude another post in our series on the new features of Java 9. Please stay tuned for the rest of the series, where we will talk about other features, such as the long-awaited Jigsaw. Thank you for following me on another post!

Java 8: Knowing the new features – the new Date API


Hi, dear readers! Welcome to my blog. In this post, the last in the series, we talk about the new library for date and time manipulation, which was inspired by the Joda-Time library.

So, without further delay, let’s begin our journey through this feature!

Manipulating Dates & Time on Java

It is an old complaint in the Java community that the Java APIs for manipulating dates have their issues, such as limitations and being difficult to work with. With this in mind, Java 8 comes with a new API that brings simplicity and strength to the way we work with dates and times in Java. Let’s start by learning how to create instances of the new classes.

To create a new Date instance (without time), representing the current date, all we have to do is:

LocalDate date = LocalDate.now();

To create a new time instance, based on the time at which the instance was created, we do this:

LocalTime time = LocalTime.now();

And finally, to create a datetime, in other words, a date and time representation, we use this:

LocalDateTime dateTime = LocalDateTime.now();

The instance above has no timezone information; it simply reflects the local date and time. If we need to use a specific timezone, there is a class called ZonedDateTime. For example, if we wanted to create an instance in our timezone and then change it to Sydney’s timezone, we could do it like this:

ZonedDateTime zonedDateTime = ZonedDateTime.now();

System.out.println("Time at my timezone: " + zonedDateTime);

zonedDateTime = zonedDateTime.withZoneSameInstant(ZoneId
.of("Australia/Sydney"));

System.out.println("Time at Sydney: " + zonedDateTime);

The code above prints the following at my location:

Time at my timezone: 2015-06-04T14:42:30.850-03:00[America/Sao_Paulo]
Time at Sydney: 2015-06-05T03:42:30.850+10:00[Australia/Sydney]
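Note that withZoneSameInstant keeps the same moment in time and only changes the clock it is displayed on. The sketch below uses a fixed date (an assumption, chosen to be close to the output above so the result is reproducible) and compares it with withZoneSameLocal, which keeps the clock fields and therefore points to a different moment. The class name is hypothetical.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;

final class ZoneSketch {

    // A fixed moment, so the result does not depend on when the code runs.
    static final ZonedDateTime SAO_PAULO = ZonedDateTime.of(
            2015, 6, 4, 14, 42, 30, 0, ZoneId.of("America/Sao_Paulo"));

    // Same instant, shown on Sydney's clock.
    static ZonedDateTime sameInstantInSydney() {
        return SAO_PAULO.withZoneSameInstant(ZoneId.of("Australia/Sydney"));
    }

    // Same wall-clock fields, relabelled as Sydney time: a different instant.
    static ZonedDateTime sameLocalInSydney() {
        return SAO_PAULO.withZoneSameLocal(ZoneId.of("Australia/Sydney"));
    }

    public static void main(String[] args) {
        System.out.println("Original:     " + SAO_PAULO);
        System.out.println("Same instant: " + sameInstantInSydney());
        System.out.println("Same local:   " + sameLocalInSydney());
    }
}
```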

Another way of instantiating these classes is with a predefined date and/or time. We can do it like this:

 date = LocalDate.of(2015, Month.DECEMBER, 25);

 dateTime = LocalDateTime.of(2015, Month.DECEMBER, 25, 10, 30);

With all those classes, it is really simple to add or subtract days, months or years from a date, and to do the same with a time object. The code below illustrates this simplicity:

System.out.println("Date before adding days: " + date);

date = date.plusDays(10);

System.out.println("Date after adding days: " + date);

date = date.plusMonths(6);

System.out.println("Date after adding months: " + date);

date = date.plusYears(5);

System.out.println("Date after adding years: " + date);

date = date.minusDays(7);

System.out.println("Date after subtracting days: " + date);

date = date.minusMonths(6);

System.out.println("Date after subtracting months: " + date);

date = date.minusYears(10);

System.out.println("Date after subtracting years: " + date);

time = time.plusHours(12);

System.out.println("Time after adding hours: " + time);

time = time.plusMinutes(30);

System.out.println("Time after adding minutes: " + time);

time = time.plusSeconds(120);

System.out.println("Time after adding seconds: " + time);

time = time.minusHours(12);

System.out.println("Time after subtracting hours: " + time);

time = time.minusMinutes(30);

System.out.println("Time after subtracting minutes: " + time);

time = time.minusSeconds(120);

System.out.println("Time after subtracting seconds: " + time);

Running the above code prints:

Date before adding days: 2015-12-25
Date after adding days: 2016-01-04
Date after adding months: 2016-07-04
Date after adding years: 2021-07-04
Date after subtracting days: 2021-06-27
Date after subtracting months: 2020-12-27
Date after subtracting years: 2010-12-27
Time after adding hours: 09:28:24.380
Time after adding minutes: 09:58:24.380
Time after adding seconds: 10:00:24.380
Time after subtracting hours: 22:00:24.380
Time after subtracting minutes: 21:30:24.380
Time after subtracting seconds: 21:28:24.380

One important thing to notice is that in all methods we had to “catch” the return value of the operations. The reason is that, unlike the old classes such as Calendar, the instances of the new date API are immutable, so every operation returns a new value. This is useful in scenarios with concurrent access, for example, since the instances cannot be changed after creation.
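A common mistake that follows from this immutability is calling one of these methods and discarding the result. The short sketch below, with hypothetical class and method names, shows the difference between the two cases.

```java
import java.time.LocalDate;
import java.time.Month;

final class ImmutableDateSketch {

    static LocalDate ignoringResult() {
        LocalDate date = LocalDate.of(2015, Month.DECEMBER, 25);
        date.plusDays(10); // result discarded: date still holds 2015-12-25
        return date;
    }

    static LocalDate keepingResult() {
        LocalDate date = LocalDate.of(2015, Month.DECEMBER, 25);
        date = date.plusDays(10); // reassigned to the new instance: 2016-01-04
        return date;
    }

    public static void main(String[] args) {
        System.out.println("Ignoring the result: " + ignoringResult());
        System.out.println("Keeping the result:  " + keepingResult());
    }
}
```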

Getting values from a date or time is also simpler. In the old days, when we wanted the year or month from a Calendar, for example, we had to use the generic get method with a constant indicating the field we wanted, such as Calendar.YEAR. With the new API, we can use specific methods, like the following:

System.out.println("For the date: " + date);

System.out.println("The year from the date is: " + date.getYear());

System.out.println("The month from the date is: " + date.getMonth());

System.out.println("The day from the date is: " + date.getDayOfMonth());

System.out.println("The era from the date is: " + date.getEra());

System.out.println("The day of the week is: " + date.getDayOfWeek());

System.out.println("The day of the year is: " + date.getDayOfYear());

After we run the code above, the following result will be produced:

For the date: 2010-12-27
The year from the date is: 2010
The month from the date is: DECEMBER
The day from the date is: 27
The era from the date is: CE
The day of the week is: MONDAY
The day of the year is: 361
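
For comparison, the sketch below puts the old and the new way of reading a field side by side. The class and method names are hypothetical; the only detail worth noting is that Calendar.MONTH is zero-based, while LocalDate.getMonthValue() is one-based:

```java
import java.time.LocalDate;
import java.util.Calendar;
import java.util.GregorianCalendar;

class FieldAccessSketch {

    // Old API: generic get() plus a field constant; months start at 0.
    static int monthWithCalendar(Calendar calendar) {
        return calendar.get(Calendar.MONTH) + 1;
    }

    // New API: a dedicated accessor; months start at 1.
    static int monthWithLocalDate(LocalDate date) {
        return date.getMonthValue();
    }

    public static void main(String[] args) {
        Calendar calendar = new GregorianCalendar(2010, Calendar.DECEMBER, 27);
        LocalDate date = LocalDate.of(2010, 12, 27);

        System.out.println(calendar.get(Calendar.YEAR) + " / " + date.getYear());           // 2010 / 2010
        System.out.println(monthWithCalendar(calendar) + " / " + monthWithLocalDate(date)); // 12 / 12
    }
}
```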

Another simple thing to do is comparing dates with the API. If we code the following:

// comparing dates
LocalDate today = LocalDate.now();
LocalDate tomorrow = today.plusDays(1);

System.out.println("Is today before tomorrow? " + today.isBefore(tomorrow));

System.out.println("Is today after tomorrow? " + today.isAfter(tomorrow));

System.out.println("Is today equal tomorrow? " + today.isEqual(tomorrow));

In the code above, as expected, only the first statement prints true.

One interesting feature of the new API is its locale support. In the code below, for example, we print the month of a date in different languages:

System.out.println("English: "+today.getMonth().getDisplayName(TextStyle.FULL, Locale.ENGLISH));
		
System.out.println("Portuguese: "+today.getMonth().getDisplayName(TextStyle.FULL, Locale.forLanguageTag("pt")));
		
System.out.println("German: "+today.getMonth().getDisplayName(TextStyle.FULL, Locale.GERMAN));
		
System.out.println("Italian: "+today.getMonth().getDisplayName(TextStyle.FULL, Locale.ITALIAN));
		
System.out.println("Japanese: "+today.getMonth().getDisplayName(TextStyle.FULL, Locale.JAPANESE));
		
System.out.println("Chinese: "+today.getMonth().getDisplayName(TextStyle.FULL, Locale.CHINESE));

Running the above code, on my current date, we will get the following result:

English: June
Portuguese: Junho
German: Juni
Italian: giugno
Japanese: 6月
Chinese: 六月

Formatting dates is also an easy task with the new API. If we want to format a date in the “dd/MM/yyyy” format, all we have to do is pass a DateTimeFormatter with the desired pattern:

System.out.println(today.format(DateTimeFormatter
			.ofPattern("dd/MM/yyyy")));

One very common requirement we encounter from time to time is the need to calculate the time between two dates. With the new API, we can calculate this very easily, with the ChronoUnit enum:

LocalDateTime oneDate = LocalDateTime.now();
LocalDateTime anotherDate = LocalDateTime.of(1982, Month.JUNE, 21, 20, 0);

System.out.println("Days between the dates: "
        + ChronoUnit.DAYS.between(anotherDate, oneDate));

System.out.println("Months between the dates: "
        + ChronoUnit.MONTHS.between(anotherDate, oneDate));

System.out.println("Years between the dates: "
        + ChronoUnit.YEARS.between(anotherDate, oneDate));

System.out.println("Hours between the dates: "
        + ChronoUnit.HOURS.between(anotherDate, oneDate));

System.out.println("Minutes between the dates: "
        + ChronoUnit.MINUTES.between(anotherDate, oneDate));

System.out.println("Seconds between the dates: "
        + ChronoUnit.SECONDS.between(anotherDate, oneDate));

On my current day (08/06/2015), the above code produced:

Days between the dates: 12040
Months between the dates: 395
Years between the dates: 32
Hours between the dates: 288962
Minutes between the dates: 17337771
Seconds between the dates: 1040266275

One thing to note is that, if we call the same methods with the arguments swapped, we will receive negative numbers. If our logic needs the results to always be positive, we can use the Period and Duration classes to calculate the time between the dates; both offer the methods isNegative() and negated(), which can be combined to produce this effect.
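
A minimal sketch of that idea follows. The helper names (positiveDuration, positivePeriod) and the second date are assumptions made for the illustration, not part of the original code:

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.Month;
import java.time.Period;

class PositiveDifferenceSketch {

    // Time-based difference, flipped to positive when the arguments come in reverse order.
    static Duration positiveDuration(LocalDateTime start, LocalDateTime end) {
        Duration duration = Duration.between(start, end);
        return duration.isNegative() ? duration.negated() : duration;
    }

    // Date-based difference (years, months, days), flipped the same way.
    static Period positivePeriod(LocalDate start, LocalDate end) {
        Period period = Period.between(start, end);
        return period.isNegative() ? period.negated() : period;
    }

    public static void main(String[] args) {
        LocalDateTime earlier = LocalDateTime.of(1982, Month.JUNE, 21, 20, 0);
        LocalDateTime later = LocalDateTime.of(2015, Month.JUNE, 8, 12, 0);

        // Arguments swapped on purpose: the raw result is negative.
        System.out.println(Duration.between(later, earlier).isNegative()); // true
        System.out.println(positiveDuration(later, earlier).isNegative()); // false
        System.out.println(positivePeriod(later.toLocalDate(), earlier.toLocalDate()).getYears()); // 32
    }
}
```

Duration works with exact time (seconds and nanoseconds), while Period works with calendar fields, so which one to pick depends on whether the logic cares about elapsed time or about calendar dates.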

One final feature of the new API we will visit is the handling of invalid dates. With a Calendar in its default lenient mode, if we tried to input February 30 in a year where February has 28 days, the Calendar would adjust the date to March 2; in other words, it would roll past the date we entered without throwing any error. This is not always the desired effect, since it can lead to unpredictable behavior. With the new API, if we try, for example, to do the following:

LocalDate invalidDate = LocalDate.of(2014, Month.FEBRUARY, 30);
		
System.out.println(invalidDate);

We will receive an invalid date exception, which makes this kind of bug much easier to catch:

Exception in thread "main" java.time.DateTimeException: Invalid date 'FEBRUARY 30'
	at java.time.LocalDate.create(LocalDate.java:431)
	at java.time.LocalDate.of(LocalDate.java:249)
	at com.alexandreesl.handson.DateAPIShowcase.main(DateAPIShowcase.java:174)
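
The difference between the two behaviors can be sketched as below. The helper names (calendarMonthAndDay, isValidDate) are hypothetical; the sketch only relies on the default lenient Calendar and on the DateTimeException shown above:

```java
import java.time.DateTimeException;
import java.time.LocalDate;
import java.time.Month;
import java.util.Calendar;
import java.util.GregorianCalendar;

class InvalidDateSketch {

    // Old API: returns {month (1-based), day} after the lenient Calendar has adjusted the date.
    static int[] calendarMonthAndDay(int year, int zeroBasedMonth, int day) {
        Calendar calendar = new GregorianCalendar(year, zeroBasedMonth, day);
        return new int[] { calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH) };
    }

    // New API: an impossible date is rejected, so we can validate by catching the exception.
    static boolean isValidDate(int year, Month month, int day) {
        try {
            LocalDate.of(year, month, day);
            return true;
        } catch (DateTimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        int[] adjusted = calendarMonthAndDay(2014, Calendar.FEBRUARY, 30);
        System.out.println(adjusted[0] + "/" + adjusted[1]);           // 3/2
        System.out.println(isValidDate(2014, Month.FEBRUARY, 30));     // false
        System.out.println(isValidDate(2014, Month.FEBRUARY, 28));     // true
    }
}
```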

References

This series was inspired by a book from the publisher “Casa do Código”, which I used in my studies. Unfortunately the book is in Portuguese, but it is a good source for developers who want to quickly learn about the new features of Java 8:

Java 8 prático

Conclusion

And that concludes our series about the new features of Java 8. Of course, there are other subjects we didn’t talk about, like the end of the PermGen, which was replaced by another memory area called metaspace. If the reader wants to know more about this, this article is very interesting on the subject. However, with this series, the reader has a good base to start developing with Java 8.

In a programming language like Java, it is normal to have changes from time to time. For a language that has been around for so many years, it is impressive how Java can still evolve, reflecting the new trends from more modern languages. Will Java continue like this forever? Only time will tell.

Thank you for following me on another post from my blog, until next time.

Java 8: Knowing the new features – Streams

Hi, dear readers! Welcome to my blog. In this post, the second in the series, we talk about streams, a new way to manipulate collections.

So, without further delay, let’s begin our journey through this feature!

Streams

Streams were introduced in Java 8 as a new way of manipulating Collections. Normally, when we use a Collection, we prepare a list of items, apply several operations to it, like filtering, sums, etc., and finally we use a final result; the whole sequence could be seen as a single operation. That is exactly the goal of the streams API: to allow us to program our Collection’s logic as a single operation, using the functional programming paradigm.

So, let’s get started with the preparations for the examples.

First, we create a Client class, which we will use as the POJO for our examples:

import java.util.List;

public class Client {

    private String name;
    private Long phone;
    private String sex;
    private List<Order> orders;

    public List<Order> getOrders() {
        return orders;
    }

    public void setOrders(List<Order> orders) {
        this.orders = orders;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getPhone() {
        return phone;
    }

    public void setPhone(Long phone) {
        this.phone = phone;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public void markClientSpecial() {
        System.out.println("The client " + getName() + " is special! ");
    }

}

Our Client class this time has a reference to another POJO, the Order class, which we will use to enrich our examples:

public class Order {

    private Long id;
    private String description;
    private Double total;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public Double getTotal() {
        return total;
    }

    public void setTotal(Double total) {
        this.total = total;
    }

}

Finally, all the examples will use the same Collection data, so we create a utility class to populate it:

import java.util.ArrayList;
import java.util.List;

public class CollectionUtils {

    public static List<Client> getData() {

        List<Client> list = new ArrayList<>();
        List<Order> orders;
        Order order;

        // First client
        Client clientData = new Client();
        clientData.setName("Alexandre Eleuterio Santos Lourenco");
        clientData.setPhone(33455676L);
        clientData.setSex("M");
        list.add(clientData);

        orders = new ArrayList<>();

        order = new Order();
        order.setDescription("description 1");
        order.setId(1L);
        order.setTotal(32.33);
        orders.add(order);

        order = new Order();
        order.setDescription("description 2");
        order.setId(2L);
        order.setTotal(42.33);
        orders.add(order);

        order = new Order();
        order.setDescription("description 3");
        order.setId(3L);
        order.setTotal(72.54);
        orders.add(order);

        clientData.setOrders(orders);

        // Second client
        clientData = new Client();
        clientData.setName("Lucebiane Santos Lourenco");
        clientData.setPhone(456782387L);
        clientData.setSex("F");
        list.add(clientData);

        orders = new ArrayList<>();

        order = new Order();
        order.setDescription("description 4");
        order.setId(4L);
        order.setTotal(52.33);
        orders.add(order);

        order = new Order();
        order.setDescription("description 2");
        order.setId(5L);
        order.setTotal(102.33);
        orders.add(order);

        order = new Order();
        order.setDescription("description 5");
        order.setId(6L);
        order.setTotal(12.54);
        orders.add(order);

        clientData.setOrders(orders);

        // Third client
        clientData = new Client();
        clientData.setName("Ana Carolina Fernandes do Sim");
        clientData.setPhone(345622189L);
        clientData.setSex("F");
        list.add(clientData);

        orders = new ArrayList<>();

        order = new Order();
        order.setDescription("description 6");
        order.setId(7L);
        order.setTotal(12.43);
        orders.add(order);

        order = new Order();
        order.setDescription("description 7");
        order.setId(8L);
        order.setTotal(98.11);
        orders.add(order);

        order = new Order();
        order.setDescription("description 8");
        order.setId(9L);
        order.setTotal(130.22);
        orders.add(order);

        clientData.setOrders(orders);

        return list;
    }

}

So, let’s begin with the examples!

To use the streams API, all we have to do is call the stream() method on a Collection to get a stream ready for use. The stream() method is implemented as a default method on the Collection interface, so existing implementations don’t need to implement it themselves. Another good point of this approach is that, consequently, all Collections already support the streams feature, so if the reader has a favorite collections framework (like the Commons one from Apache), all you have to do is upgrade your projects to Java 8 and the support is added!

The first thing to notice about streams is that they don’t change the Collection. That means that if we do something like this:

import java.util.List;

public class StreamsExample {

    public static void main(String[] args) {

        List<Client> clients = CollectionUtils.getData();

        // The filtered stream is never consumed; the list itself is not modified
        clients.stream().filter(
                c -> c.getName().equals("Alexandre Eleuterio Santos Lourenco"));

        clients.forEach(c -> System.out.println(c.getName()));

    }

}

And run the code, we will see that it still prints the 3 clients from our test data, not just the one we filtered on our stream! This is an important concept to keep in mind, since it means we don’t have to populate multiple collections with different data to execute different logic.

So, how could we print the result of our previous filter? All we have to do is chain the methods, like this:

.

.

.

clients.stream()
.filter(c -> c.getName().equals(
"Alexandre Eleuterio Santos Lourenco"))
.forEach(c -> System.out.println(c.getName()));

If we run our code again, we will see that now it only prints the elements we filtered. In this example, as said before, we didn’t receive the filtered list. If we need to retrieve a Collection with the result of the transformations made on our stream, we can use the collect method. One version of this method receives 3 functional interfaces as parameters (a supplier, an accumulator and a combiner), but fortunately Java 8 also comes with a utility class called Collectors, which supplies ready-made Collector implementations that we can pass to the other version of collect. Using this feature, we could retrieve the Collection by coding like this:

.

.

.

List<Client> filteredList = clients
.stream()
.filter(c -> c.getName().equals(
"Alexandre Eleuterio Santos Lourenco"))
.collect(Collectors.toList());

filteredList.forEach(c -> System.out.println(c.getName()));
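
For the curious reader, the sketch below compares Collectors.toList() with the 3-parameter version of collect. It uses a plain list of names instead of the Client class to stay self-contained, and the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class CollectSketch {

    // The convenient form: a ready-made Collector.
    static List<String> withCollectors(List<String> names) {
        return names.stream()
                .filter(n -> n.startsWith("A"))
                .collect(Collectors.toList());
    }

    // The explicit form: supplier, accumulator and combiner passed by hand.
    static List<String> withThreeArguments(List<String> names) {
        return names.stream()
                .filter(n -> n.startsWith("A"))
                .<List<String>>collect(
                        () -> new ArrayList<>(),             // supplier: creates the result container
                        (list, name) -> list.add(name),      // accumulator: adds one element
                        (left, right) -> left.addAll(right)  // combiner: merges partial results
                );
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alexandre", "Lucebiane", "Ana");

        System.out.println(withCollectors(names));     // [Alexandre, Ana]
        System.out.println(withThreeArguments(names)); // [Alexandre, Ana]
    }
}
```

The combiner only comes into play when the stream is processed in parallel, which is one more reason to prefer the Collectors helpers in everyday code.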

In our previous examples, we retrieved whole Client objects from our filtering. But what if we wanted to retrieve only the names of the Clients that have orders with total > 90 and print them on the console? We could do this:

.

.

.

System.out.println("USING THE MAP METHOD!");

clients.stream()
.filter(c -> c.getOrders().stream()
.anyMatch(o -> o.getTotal() > 90))
.map(Client::getName)
.forEach(System.out::println);

The code above could seem a little strange at first, but if we imagine the amount of code we would write to do the same with traditional Java code (iterating over multiple Collections, creating another collection with just the names and iterating again for the prints), we can see that the new features really help to make simpler and cleaner code. We also see the use of the anyMatch method, which receives a predicate as parameter and returns true if any of the elements of the stream satisfies the predicate, and false otherwise.
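
To give an idea of the comparison, here is a sketch with both versions side by side. The nested Client and Order classes are simplified stand-ins for the POJOs of this post (only the fields needed here), and the method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ImperativeVersusStreamSketch {

    // Simplified stand-ins for the post's Order and Client classes.
    static final class Order {
        private final double total;
        Order(double total) { this.total = total; }
        double getTotal() { return total; }
    }

    static final class Client {
        private final String name;
        private final List<Order> orders;
        Client(String name, Order... orders) { this.name = name; this.orders = Arrays.asList(orders); }
        String getName() { return name; }
        List<Order> getOrders() { return orders; }
    }

    // Traditional version: nested loops and an intermediate collection.
    static List<String> namesImperative(List<Client> clients) {
        List<String> names = new ArrayList<>();
        for (Client client : clients) {
            for (Order order : client.getOrders()) {
                if (order.getTotal() > 90) {
                    names.add(client.getName());
                    break; // one matching order is enough, like anyMatch
                }
            }
        }
        return names;
    }

    // Stream version: same logic as the example above, collected into a list.
    static List<String> namesWithStreams(List<Client> clients) {
        return clients.stream()
                .filter(c -> c.getOrders().stream().anyMatch(o -> o.getTotal() > 90))
                .map(Client::getName)
                .collect(Collectors.toList());
    }

    // Same totals as the test data used in this post.
    static List<Client> sampleData() {
        return Arrays.asList(
                new Client("Alexandre Eleuterio Santos Lourenco", new Order(32.33), new Order(42.33), new Order(72.54)),
                new Client("Lucebiane Santos Lourenco", new Order(52.33), new Order(102.33), new Order(12.54)),
                new Client("Ana Carolina Fernandes do Sim", new Order(12.43), new Order(98.11), new Order(130.22)));
    }

    public static void main(String[] args) {
        System.out.println(namesImperative(sampleData()));
        System.out.println(namesWithStreams(sampleData()));
    }
}
```

Both methods return the last two clients of the sample data, since only they have an order above 90.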

Besides the all-purpose map method, there are also implementations specific for ints, longs and doubles (mapToInt, mapToLong and mapToDouble). The reason for this is to prevent the so-called “boxing effect”, where primitive values would be wrapped and unwrapped on the operations, causing a performance overhead. And since these implementations already know the type of value we are working with, they provide some interesting methods that return things like the average or the max value of our mapping. Let’s see an example. Imagine that we want to retrieve the max total from the orders of each client and print the name and the total on the console. We could do it like this:

.

.

.
clients.stream().forEach(
c -> System.out.println("Name: "
+ c.getName()
+ " Highest Order Total: "
+ c.getOrders().stream().mapToDouble(Order::getTotal)
.max().getAsDouble()));

The reader may notice that the max method doesn’t return the primitive itself, but an object. This object is an OptionalDouble which, together with other classes like java.util.Optional, allows us to provide a default behavior for the cases in which the operation (in our case, the max() method) has no value to return, which happens when the stream is empty. For example, if we want our previous operation to return 0 for a client without any orders, we could modify the code as follows:

.

.

.

clients.stream().forEach(
c -> System.out.println("Name: "
+ c.getName()
+ " Highest Order Total: "
+ c.getOrders().stream().mapToDouble(Order::getTotal)
.max().orElse(0)));
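
The sketch below isolates this behavior using plain lists of totals. The class and method names are hypothetical; it only shows what max() returns for an empty and for a non-empty stream:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

class OptionalDoubleSketch {

    // Highest value of the list, wrapped in an OptionalDouble.
    static OptionalDouble highest(List<Double> totals) {
        return totals.stream().mapToDouble(Double::doubleValue).max();
    }

    public static void main(String[] args) {
        List<Double> noOrders = Collections.emptyList();
        List<Double> someOrders = Arrays.asList(32.33, 42.33, 72.54);

        System.out.println(highest(noOrders).isPresent());      // false
        System.out.println(highest(noOrders).orElse(0));        // 0.0
        System.out.println(highest(someOrders).getAsDouble());  // 72.54
    }
}
```

Calling getAsDouble() on the empty result would throw a NoSuchElementException, which is why orElse is the safer choice when a client may have no orders.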

One interesting characteristic of streams is their lazy behavior. That means that when we create a flow (also called a pipeline) of stream operations, the operations only execute at the time they are really needed to produce the final result. We can see this behavior using a method called peek(). Let’s see an example that clearly shows this behavior:

.

.

.

clients.stream()

.filter(c -> c.getName().equals(
"Alexandre Eleuterio Santos Lourenco"))
.peek(System.out::println);

System.out.println("*********** SECOND PEEK TEST ******************");

clients.stream()
.filter(c -> c.getName().equals(
"Alexandre Eleuterio Santos Lourenco"))
.peek(System.out::println)
.forEach(c -> System.out.println(c.getName()));

If we run the example above, we can see that on the first stream the peek method doesn’t print anything. That’s because the filter operation was never executed, since the pipeline has no terminal operation to consume its result. On the second stream, we call the forEach operation afterwards, so the peek method prints the toString() of every object that passes the filter.
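
Another way to observe the laziness is to count how many times the filter predicate is invoked. The sketch below does that with an AtomicInteger; the class and method names are made up for the illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class LazyStreamSketch {

    // Builds a pipeline without a terminal operation and reports the predicate calls.
    static int callsWithoutTerminalOperation(List<String> names) {
        AtomicInteger calls = new AtomicInteger();
        names.stream().filter(n -> {
            calls.incrementAndGet();
            return n.startsWith("A");
        }); // no terminal operation: nothing is executed
        return calls.get();
    }

    // Same pipeline, now consumed by forEach.
    static int callsWithTerminalOperation(List<String> names) {
        AtomicInteger calls = new AtomicInteger();
        names.stream().filter(n -> {
            calls.incrementAndGet();
            return n.startsWith("A");
        }).forEach(n -> { /* consume the element */ });
        return calls.get();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alexandre", "Lucebiane", "Ana");

        System.out.println(callsWithoutTerminalOperation(names)); // 0
        System.out.println(callsWithTerminalOperation(names));    // 3
    }
}
```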

In our previous examples, we saw the max method, which returns the max value from a stream of numbers. That type of operation, which returns a single result from a stream, is called a reduce operation. We can make our own reduce operations by providing an initial value and the operation itself to the reduce method. For example, if we wanted to subtract the values from the stream:

.

.

.

clients.stream().forEach(
c -> System.out.println("Name: "
+ c.getName()
+ " TOTAL SUBTRACTED: "
+ c.getOrders().stream().mapToDouble(Order::getTotal)
.reduce(0, (a, b) -> a - b)));

This is a really useful feature to keep in mind when the default arithmetic operations don’t suffice.
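
To see how the initial value and the operation work together, here is a small sketch with round numbers (the class and method names are hypothetical). One caveat worth knowing: the documentation of reduce asks for an associative function, and subtraction is not associative, so a reduction like subtractAll should only be trusted on sequential streams:

```java
import java.util.stream.DoubleStream;

class ReduceSketch {

    // Starts from 0 and adds each element: ((0 + a) + b) + c ...
    static double sum(double... values) {
        return DoubleStream.of(values).reduce(0, (a, b) -> a + b);
    }

    // Starts from 0 and subtracts each element: ((0 - a) - b) - c ...
    // Not associative, so only meaningful on a sequential stream.
    static double subtractAll(double... values) {
        return DoubleStream.of(values).reduce(0, (a, b) -> a - b);
    }

    public static void main(String[] args) {
        System.out.println(sum(10, 20, 30));         // 60.0
        System.out.println(subtractAll(10, 20, 30)); // -60.0
    }
}
```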

Parallel Streams

At last, let’s talk about the last subject of our streams journey: parallel streams. When using parallel streams, we run all the operations we saw previously in parallel, instead of just on the main thread as usual. The JDK will choose the number of threads, how to split the data into segments and how to join the parts into the final result. The reader may be asking “what do I have to pass to help the JDK with these settings?” The answer is: nothing! That’s right, all we have to do to use parallel streams is change the beginning of our commands, like the example below:

.

.

.
clients.parallelStream()
.filter(c -> c.getOrders().stream()
.anyMatch(o -> o.getTotal() > 90)).map(Client::getName)
.forEach(System.out::println);

As we can see, all we have to do is change from stream() to parallelStream(). One important thing to keep in mind is when to use parallel streams. Since there is an overhead in managing the threads, splitting the data and joining the results, unless we have a really big volume of data or a really heavy operation to apply to the data, we will normally use single-threaded streams.
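
As a minimal sketch of the switch, the code below sums a list of numbers sequentially and in parallel. The class and method names, and the size of the list, are assumptions for the illustration. The result is the same in both cases; what is not guaranteed on a parallel stream is the order in which forEach visits the elements (forEachOrdered exists for when the order matters):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamSketch {

    // Builds the list 1..upTo to have some data to process.
    static List<Integer> numbers(int upTo) {
        return IntStream.rangeClosed(1, upTo).boxed().collect(Collectors.toList());
    }

    static long sequentialSum(List<Integer> values) {
        return values.stream().mapToLong(Integer::longValue).sum();
    }

    // Only the first call changes: parallelStream() instead of stream().
    static long parallelSum(List<Integer> values) {
        return values.parallelStream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> values = numbers(1_000_000);

        System.out.println(sequentialSum(values)); // 500000500000
        System.out.println(parallelSum(values));   // 500000500000
    }
}
```

Whether the parallel version is actually faster depends on the data volume and the machine, so it is worth measuring before adopting it.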

Other features

Of course, there are more features we could talk about in this post, like the sorted method, which, as the name implies, sorts the items of our streams. Another really powerful feature is the set of methods on Collectors, which offers impressive transformation options such as grouping, partitioning, joining and so on. However, with this post we made a very good start with the usage of the feature, paving the way for its adoption.
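
Just as a taste of these features, here is a short sketch using sorted, joining, groupingBy and partitioningBy. The nested Client class is a simplified stand-in for the POJO of this post (name and sex only), and the method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CollectorsSketch {

    // Simplified stand-in for the post's Client class.
    static final class Client {
        private final String name;
        private final String sex;
        Client(String name, String sex) { this.name = name; this.sex = sex; }
        String getName() { return name; }
        String getSex() { return sex; }
    }

    // sorted + joining: names in alphabetical order, separated by commas.
    static String sortedNames(List<Client> clients) {
        return clients.stream().map(Client::getName).sorted().collect(Collectors.joining(", "));
    }

    // groupingBy: one list of names per value of the sex field.
    static Map<String, List<String>> namesBySex(List<Client> clients) {
        return clients.stream().collect(
                Collectors.groupingBy(Client::getSex,
                        Collectors.mapping(Client::getName, Collectors.toList())));
    }

    // partitioningBy: splits the clients in two groups (true/false) and counts each one.
    static Map<Boolean, Long> countByFemale(List<Client> clients) {
        return clients.stream().collect(
                Collectors.partitioningBy(c -> "F".equals(c.getSex()), Collectors.counting()));
    }

    static List<Client> sampleData() {
        return Arrays.asList(
                new Client("Alexandre Eleuterio Santos Lourenco", "M"),
                new Client("Lucebiane Santos Lourenco", "F"),
                new Client("Ana Carolina Fernandes do Sim", "F"));
    }

    public static void main(String[] args) {
        System.out.println(sortedNames(sampleData()));
        System.out.println(namesBySex(sampleData()));
        System.out.println(countByFemale(sampleData()));
    }
}
```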

Conclusion 

And so we conclude another part of our series. As we can easily see, streams are a very powerful tool, which can help us a lot in keeping our code really short when processing collections. That is one of the keys, or maybe the master key, of the Java 8 philosophy. For years, Java was plagued with “accusations” of not being a simple language, since it is so verbose, especially with the appearance of languages like Python or Ruby, for example. With these new features, maybe the burden of “being complex” will finally be lifted from Java. I thank the reader for following me on another post and invite you to return for the last part of our series, when we will talk about the last of our pillars, the new Date API. Until next time.
