Kafka: implementing stream architectures

Hi, dear readers! Welcome to my blog. In this post, we will learn about Apache Kafka, a distributed messaging system that is used in many streaming solutions.

This article is divided into several sections, so that the reader can not only understand the concepts behind Kafka but also exercise them in practice in a lab. So, without further delay, let’s begin!

Kafka architecture

Overview

Kafka is a distributed messaging system created by LinkedIn. In Kafka, we have stream data structures called topics, which can be consumed by several clients, organized in consumer groups. These topics are stored on a Kafka cluster, where each node is called a broker.

Kafka’s ecosystem also needs a Zookeeper cluster in order to run. Zookeeper is a key-value storage solution, which in Kafka’s context is used to store metadata. Several operations, such as topic creation, are done on Zookeeper instead of on the brokers.

The main difference between Kafka and other messaging solutions that use classic topic structures is that in Kafka we have offsets. Offsets act like cursors, pointing to the last position a consumer and/or producer has reached while consuming/producing messages for a given topic on a given partition.

Partitions in Kafka are like shards in some NoSQL databases: they divide the data, organizing it by partition and/or message keys (more about this when we talk about ingesting data into Kafka).

So, in Kafka, we have producers ingesting data, controlled by producer offsets, and consumers consuming data from topics, also with their own offsets. The main advantages of this approach are:

  • Data can be read and replayed by consumers, since there’s no link between consumed data and produced data. This also allows us to implement solutions with back-pressure, that is, solutions where consumers poll data according to their processing limits;
  • Data can be retained for longer, since in streams, unlike classic topics, data is not removed from the structure after being sent to all consumers. It is also possible to compress the data on the stream, allowing Kafka clusters to retain lots of data on their streams;
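
To make the idea of offsets as cursors more concrete, here is a minimal in-memory sketch in plain Java. It is not Kafka’s API: ToyPartition and its append, poll and seek methods are hypothetical names made up for this illustration, and the model ignores brokers, replication and retention entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of ONE partition: an append-only log plus one cursor (offset)
// per consumer group. Hypothetical names; this is not the Kafka client API.
class ToyPartition {
  private final List<String> log = new ArrayList<>();
  private final Map<String, Integer> groupOffsets = new HashMap<>();

  // Producer side: append at the head and return the offset of the new record.
  int append(String message) {
    log.add(message);
    return log.size() - 1;
  }

  // Consumer side: read up to maxRecords from the group's cursor and advance it.
  // Records stay in the log, so other groups (or a replay) can still read them.
  List<String> poll(String group, int maxRecords) {
    int from = groupOffsets.getOrDefault(group, 0);
    int to = Math.min(log.size(), from + maxRecords);
    groupOffsets.put(group, to);
    return new ArrayList<>(log.subList(from, to));
  }

  // Replay: move the group's cursor back to an earlier offset.
  void seek(String group, int offset) {
    groupOffsets.put(group, offset);
  }

  public static void main(String[] args) {
    ToyPartition partition = new ToyPartition();
    partition.append("a");
    partition.append("b");
    partition.append("c");
    System.out.println(partition.poll("billing", 2)); // billing reads at its own pace
    System.out.println(partition.poll("audit", 10));  // audit is unaffected by billing
    partition.seek("billing", 0);
    System.out.println(partition.poll("billing", 10)); // billing replays from the start
  }
}
```

Because each group only moves its own cursor, a slow group never holds back a fast one, and polling with a small maxRecords is all it takes to get back-pressure in this toy model.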

Kafka’s offsets explained

The following diagram illustrates a Kafka topic in action:

kafka-diagram-1

Kafka Topic producer/consumer offsets

In the diagram, we can see a topic with 2 partitions. Each little rectangle represents an offset pointing to a location on the topic. On the producer side, we can see 2 offsets pointing to the topic’s head, showing our producer ingesting data into the topic.

In Kafka, each partition is assigned to a broker, and that broker is responsible for serving production/consumption for that partition. The broker responsible for this is called the partition leader.

How many partitions are needed for a topic? The main factor here is the desired throughput for production/consumption. Several other factors also affect throughput, such as the producer ack type, the number of replicas etc.

Having too many partitions is also something to watch out for when planning a new topic, as too many partitions can hinder availability and end-to-end latency, as well as increase memory consumption on the client side (remember that both producer and consumer can operate with several partitions at the same time). This article is an excellent reference on this matter.

On the consumer side, we see some interesting features. Each consumer has its own offset, consuming data from just one partition. This is an important concept in Kafka: within a consumer group, each partition is consumed by exactly one consumer, and each consumer group consumes the data independently, that is, there is no relation between the consumption of one group and the others.

We can see this in the diagram, where the offsets from one group are at different positions from the others. Committed consumer offsets are stored in an internal topic inside Kafka’s cluster (__consumer_offsets), while the producer side simply appends at the end of each partition’s log. Offsets are committed (updated) on the cluster using auto-commit or by committing manually in code, analogous to relational database commits. We will see more about this when coding our own consumer.

What happens when there are more partitions than consumers? When this happens, Kafka delivers data from more than one partition to the same consumer, as we can see below. It is important to note that it is possible to increase the number of consumers in a group, avoiding this situation altogether:

kafka-diagram-2

The same consumer consuming from more than one partition on Kafka

One important thing to notice is what happens in the opposite situation, when there are fewer partitions than consumers configured:

kafka-diagram-3

Idle consumers on Kafka

As we can see, in this case we end up with idle consumers that won’t process any messages until a new partition is created. This is important to keep in mind when setting up a new consumer on Kafka, as increasing the number of consumers too much will just end up with idle resources not being used at all.
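
The two situations above (one consumer handling several partitions, and idle consumers) can be reproduced with a small sketch in plain Java. ToyAssignor is a hypothetical helper that spreads partitions round-robin; Kafka’s real assignment strategies are more elaborate, so take this only as an illustration of the rule that a partition goes to a single consumer of the group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified assignor: not one of Kafka's real assignment strategies.
class ToyAssignor {

  // Spreads partitions 0..numPartitions-1 over the consumers of ONE group.
  // Every partition goes to exactly one consumer; a consumer may get zero or many.
  static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    for (String consumer : consumers) {
      assignment.put(consumer, new ArrayList<>());
    }
    if (consumers.isEmpty()) {
      return assignment;
    }
    for (int partition = 0; partition < numPartitions; partition++) {
      String owner = consumers.get(partition % consumers.size());
      assignment.get(owner).add(partition);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // More partitions than consumers: one consumer reads both partitions.
    System.out.println(assign(2, List.of("consumer-1")));
    // More consumers than partitions: the third consumer stays idle.
    System.out.println(assign(2, List.of("consumer-1", "consumer-2", "consumer-3")));
  }
}
```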

One of the key features of Kafka is that it guarantees message ordering. This ordering applies to messages within the same partition, but not across the whole topic. That means that when we consume data from Kafka with our consumers, the data across partitions is read in parallel, but data from the same partition is read by a single thread, guaranteeing the order.

IMPORTANT: As stated in Kafka’s documentation, it is not recommended to process data coming from the same partition in parallel, as it will scramble the order of the messages. The recommended way to scale the solution is by adding more partitions, as this allows more threads to process data in parallel without losing the ordering inside the partitions.

Partitions in Kafka are always written to a single mount point on disk. They are written to files, called log segments, that are split when they reach a certain amount of data or a certain time period (1GB or 1 week of data respectively by default, whichever comes first). The most recent log segment, which represents the data ingested up to the head of the stream, is called the active segment and is never deleted. The older segments are removed from disk according to the configured retention policies.
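
The segment rolling and retention rules just described can be sketched with a toy model. ToySegmentedLog below is a hypothetical class written for this post, not Kafka code; in particular, it ages segments by their creation time, which is a simplification assumed only to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a partition's log segments. Hypothetical; not Kafka's implementation.
class ToySegmentedLog {

  private static final class Segment {
    final long createdAtMs;
    long bytes;

    Segment(long createdAtMs) {
      this.createdAtMs = createdAtMs;
    }
  }

  private final long maxSegmentBytes;
  private final long maxSegmentAgeMs;
  private final Deque<Segment> segments = new ArrayDeque<>();

  ToySegmentedLog(long maxSegmentBytes, long maxSegmentAgeMs, long nowMs) {
    this.maxSegmentBytes = maxSegmentBytes;
    this.maxSegmentAgeMs = maxSegmentAgeMs;
    segments.addLast(new Segment(nowMs)); // the first active segment
  }

  // Appends a record, rolling to a new active segment when the current one
  // would grow past the size limit or has reached the age limit.
  void append(long recordBytes, long nowMs) {
    Segment active = segments.peekLast();
    boolean tooBig = active.bytes > 0 && active.bytes + recordBytes > maxSegmentBytes;
    boolean tooOld = nowMs - active.createdAtMs >= maxSegmentAgeMs;
    if (tooBig || tooOld) {
      active = new Segment(nowMs);
      segments.addLast(active);
    }
    active.bytes += recordBytes;
  }

  // Removes closed segments older than the retention period and returns how
  // many were removed. The active (last) segment is never removed.
  int applyRetention(long retentionMs, long nowMs) {
    int removed = 0;
    while (segments.size() > 1 && nowMs - segments.peekFirst().createdAtMs > retentionMs) {
      segments.removeFirst();
      removed++;
    }
    return removed;
  }

  int segmentCount() {
    return segments.size();
  }
}
```

Note how applyRetention stops as soon as only one segment is left: that is the toy version of the rule that the active segment is never deleted.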

Replicas

In order to guarantee data availability, Kafka works with replicas. When creating a topic, we define how many replicas we want for each partition of the topic (the replication factor). The leader counts as one of them: if we configure 3 replicas, for example, a topic with 2 partitions will have 6 replicas in total, that is, the 2 partition leaders plus 4 followers.

Kafka replicates the data just like we would do by hand: the brokers that are responsible for maintaining the replicas, called partition followers, subscribe to the topic and keep reading data from the partition leader and writing it to their replicas. Followers whose data is up to date with the leader are called in-sync replicas (ISR). Replicas can fall out of sync, for example, due to network issues that slow down the syncing process and make it lag behind the leader to an unacceptable point.

Rebalance

A rebalance occurs, for example, when a broker goes down: all writing to/reading from the partitions for which that broker was the partition leader ceases. The cluster elects a new partition leader from one of the in-sync replicas, and then writing/reading is resumed.

During this period, applications that were using the old leader to publish data will receive a specific error when trying to use the partition, indicating that a rebalance is occurring (unless we configure the producer to just deliver the messages without any acknowledgment, which we will see in more detail in the next sections). On the consumer side, it is possible to implement a rebalance listener, which can clean up the work for when the partition is available again.

It is important to notice that, when a broker goes down, some offsets may not have been committed, causing messages to be processed twice when processing of the partition is resumed.

What happens if a broker is down and no in-sync replicas are available? That depends on what we configured on the cluster. If unclean leader election is disabled, then all processing is suspended on that partition until the broker that was down comes back. If unclean leader election is enabled, then one of the out-of-sync followers is elected as leader.

Of course, each option has its trade-offs: without unclean election, we can lose the partition if we can’t restart the lost broker, but with unclean election, we risk losing some messages, since their offsets will be overwritten by the new leader when new data arrives at the partition.

If the old leader comes back, it will rejoin the partition as a new follower, and it will not re-insert the lost messages in case of an unclean election.
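
The election decision described above can be summarized in a few lines of plain Java. This is only a sketch: ToyElection is a hypothetical helper, and picking the first broker of each list is an arbitrary simplification, not how Kafka actually chooses among candidates.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the leader election decision; not Kafka's controller code.
class ToyElection {

  // Returns the new leader for a partition whose leader went down, or empty
  // if the partition has to stay offline until the old leader comes back.
  static Optional<String> electLeader(
      List<String> inSyncFollowers,
      List<String> outOfSyncFollowers,
      boolean uncleanElectionEnabled) {

    if (!inSyncFollowers.isEmpty()) {
      // Clean election: an in-sync replica has all acknowledged data.
      return Optional.of(inSyncFollowers.get(0));
    }
    if (uncleanElectionEnabled && !outOfSyncFollowers.isEmpty()) {
      // Unclean election: the partition stays available, but whatever this
      // follower had not copied yet is lost.
      return Optional.of(outOfSyncFollowers.get(0));
    }
    // No safe candidate and unclean election disabled: partition is suspended.
    return Optional.empty();
  }
}
```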

Kafka’s producer explained

In this section, we will learn about the internals of a Kafka producer, which is responsible for sending messages to Kafka topics. When working with the producer, we create ProducerRecords, which we send to Kafka by using the producer.

Producer architecture

The Kafka producer’s internal structure is divided as shown in the following diagram:

kafka-producer

Kafka Producer internal details

As we can see, there is a lot going on when producing messages to Kafka. First, as said before, we create a ProducerRecord, which consists of 3 sections:

  • Partition Key: The partition key is an optional field. If it is passed, it indicates the partition to which the message must be sent;
  • Message Key: The message key is also optional, although it is usually provided. If no partition key is passed, the partitioner will use this field to determine to which partition it will send the message. Kafka guarantees that all messages with the same message key will always be sent to the same partition, as long as the number of partitions on the topic stays the same. If no message key is passed either, the default partitioner spreads the messages across the available partitions;
  • Value (payload): The value field carries the message itself that must be sent;

All the fields of the ProducerRecord must be serialized to byte arrays before being sent to Kafka, and that’s exactly what the Serializer does at the first step of our sending (we will see later in our lab that we always define a serializer for our keys and values). After that, the records are sent to the Partitioner, which determines the partition to send the message to.
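
As a rough illustration of how a key ends up on a partition, here is a minimal sketch in plain Java. ToyPartitioner is a hypothetical class, and it uses Arrays.hashCode as a stand-in hash; Kafka’s default partitioner uses a murmur2 hash of the serialized key, so the partition numbers produced here will not match a real cluster. The sketch also assumes a key is always present.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of key-based partitioning; the hash is a stand-in,
// not the murmur2 hash used by Kafka's default partitioner.
class ToyPartitioner {

  // explicitPartition may be null, meaning "let the key decide".
  static int partitionFor(Integer explicitPartition, String key, int numPartitions) {
    if (explicitPartition != null) {
      return explicitPartition; // an explicit partition always wins
    }
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // the "serializer" step
    int positiveHash = Arrays.hashCode(keyBytes) & 0x7fffffff; // drop the sign bit
    return positiveHash % numPartitions;
  }

  public static void main(String[] args) {
    // The same key maps to the same partition while the partition count is stable...
    System.out.println(partitionFor(null, "mysuperkey", 2));
    System.out.println(partitionFor(null, "mysuperkey", 2));
    // ...but may map elsewhere once the topic gets more partitions.
    System.out.println(partitionFor(null, "mysuperkey", 3));
  }
}
```

The modulo on the number of partitions is the reason the same-key guarantee only holds while the partition count stays the same.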

The Partitioner then sends the message to bulk processes, running on different threads, that “stack” the messages until a threshold is reached (a certain number of bytes or a certain time without new messages, whichever comes first). Finally, after the threshold is reached, the messages are sent to the Kafka broker.
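
The “stack until a threshold is reached” behavior can be sketched as follows. ToyBatcher is a hypothetical class, not the producer’s real accumulator, and it assumes the time threshold is counted from the first message of the current batch, with the caller passing the current time explicitly so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of size-or-time batching; not the Kafka producer's accumulator.
class ToyBatcher {
  private final int batchSizeBytes;
  private final long lingerMs;
  private final List<byte[]> pending = new ArrayList<>();
  private int pendingBytes = 0;
  private long firstRecordAtMs = 0;

  ToyBatcher(int batchSizeBytes, long lingerMs) {
    this.batchSizeBytes = batchSizeBytes;
    this.lingerMs = lingerMs;
  }

  // Stacks a record and returns the batch to send if a threshold was reached,
  // or an empty list if the record is still waiting.
  List<byte[]> add(byte[] record, long nowMs) {
    if (pending.isEmpty()) {
      firstRecordAtMs = nowMs; // the linger clock starts with the first record
    }
    pending.add(record);
    pendingBytes += record.length;
    return maybeFlush(nowMs);
  }

  // Meant to be called periodically, so a half-full batch still goes out
  // once it has waited long enough.
  List<byte[]> maybeFlush(long nowMs) {
    boolean full = pendingBytes >= batchSizeBytes;
    boolean lingered = !pending.isEmpty() && nowMs - firstRecordAtMs >= lingerMs;
    if (!full && !lingered) {
      return new ArrayList<>();
    }
    List<byte[]> batch = new ArrayList<>(pending);
    pending.clear();
    pendingBytes = 0;
    return batch;
  }
}
```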

Let’s keep in mind that, as we saw before, brokers are elected as partition leaders for the partitions of a topic, so when sending the messages, they are sent directly to the partition leader’s broker.

Acknowledgment types

Kafka’s producer works with 3 types of acks (acknowledgments) that a message has been successfully sent. The types are:

  • acks=0: The producer just sends the message and doesn’t wait for a confirmation, not even from the partition leader. Of course, this is the fastest option to deliver messages, but there is also a risk of message loss;
  • acks=1: The producer waits for the partition leader to reply that it wrote the message before moving on. This option is safer; however, there is still some degree of risk, since a partition leader can go down just after the acknowledgement without passing the message on to any replica;
  • acks=all: The producer waits for the partition leader and all in-sync replicas to write the message before moving on. This option is naturally the safest of all, but it has the disadvantage of possible performance issues, due to waiting for all the network replication to occur before continuing. It gets worse when there are not enough in-sync replicas at the moment: if fewer replicas are in sync than the topic’s min.insync.replicas setting requires, production is held back until enough replicas catch up;

Which one to use? That depends on the characteristics of the solution we are working with. acks=0 could be useful in a solution that works with lots of messages that are not critical in case of losses (monitoring events, for example, are short-lived information that could be lost to a certain degree), unlike, for example, bank account transactions, where acks=all is a must, since message losses are unacceptable in this kind of application.

Producer configurations

There are several configurations that can be set on the producer. Here are some of the more basic and interesting ones to know:

  • bootstrap.servers: A list of Kafka brokers for the producer to communicate with. The producer discovers the rest of the cluster automatically as brokers are added/removed, but it is advised to set at least 2 brokers, as the producer won’t start if just one broker is set and that broker is down;
  • key.serializer: The serializer that will be used when transforming the keys to byte arrays. Of course, the serializer class will depend on the key type being used;
  • value.serializer: The serializer that will be used to transform the message to a byte array. When using complex types such as Java objects, it is possible to use one of the several out-of-the-box serializers, or implement your own;
  • acks: This is where we define the acknowledgement type, as we saw previously;
  • batch.size: The amount of memory, in bytes, that the bulk process will fill with stacked messages before sending the message batch;
  • linger.ms: The amount of time, in milliseconds, the producer will wait for new messages before sending the messages it has buffered. Of course, if the batch.size is reached first, then the message batch is sent before reaching this threshold;
  • max.in.flight.requests.per.connection: This parameter defines how many requests the producer will send before waiting for responses from Kafka (if acks is not set to 0, of course). As stated in Kafka’s documentation, this configuration must be set to 1 to guarantee that messages are written to Kafka in the same order they were sent by the producer when retries are enabled;
  • client.id: This parameter can be set with any string value and identifies the producer on the Kafka cluster. It is used by the cluster to build metrics and logging;
  • compression.type: This parameter defines the compression to be used on messages before they are sent to Kafka. It supports the snappy, gzip and lz4 formats. By default, no compression is used;
  • retries: This parameter defines how many times the producer will retry sending a message to a broker before notifying the application that an error has occurred;
  • retry.backoff.ms: This parameter defines how many milliseconds the producer will wait between the retries. By default, the time is 100ms;
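
To see how these settings fit together, here is a small sketch that builds two producer configurations as plain java.util.Properties objects, one for loss-tolerant data and one for critical data. The ProducerProfiles class and all the values chosen are assumptions made for illustration only, not recommendations; only the property names come from the list above.

```java
import java.util.Properties;

// Hypothetical helper; the values are illustrative assumptions, not tuning advice.
class ProducerProfiles {

  private static Properties base(String bootstrapServers) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    props.setProperty("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }

  // For data such as monitoring events: favor throughput over safety.
  static Properties lossTolerant(String bootstrapServers) {
    Properties props = base(bootstrapServers);
    props.setProperty("acks", "0");               // do not wait for any confirmation
    props.setProperty("batch.size", "65536");     // bigger batches
    props.setProperty("linger.ms", "50");         // wait a little to fill them
    props.setProperty("compression.type", "lz4"); // compress before sending
    return props;
  }

  // For data such as account transactions: favor safety and ordering.
  static Properties durable(String bootstrapServers) {
    Properties props = base(bootstrapServers);
    props.setProperty("acks", "all");             // leader plus in-sync replicas
    props.setProperty("retries", "3");
    props.setProperty("retry.backoff.ms", "100");
    props.setProperty("max.in.flight.requests.per.connection", "1"); // keep order on retries
    return props;
  }
}
```

Either Properties object could then be passed to the producer constructor, as we will do in the lab.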

Kafka’s consumer explained

In this section, we will learn about the internals of a Kafka consumer, which is responsible for reading messages from Kafka topics.

Consumer architecture

The Kafka consumer’s internal structure is divided as shown in the following diagram:

kafka-consumer-3

Kafka consumer internal details

When we ask a Kafka broker to create a consumer group for one or more topics, the broker creates a Consumer Group Coordinator. Each broker has a group coordinator for the partitions of which it is the partition leader.

This component is responsible for deciding which consumer will be responsible for consuming which partitions, following the rules we talked about in the offsets section. It is also responsible for checking the consumers’ health, by establishing heartbeats to be sent at regular intervals. If a consumer fails to send heartbeats, it is considered unhealthy, so Kafka reassigns the partitions of that consumer to another one.

The consumer, in turn, uses a deserializer to convert the messages from byte arrays to the required types. As with the producer, we can use several different out-of-the-box deserializers, as well as create our own.

IMPORTANT: A Kafka consumer must always be used from a single thread. The consumer is not thread-safe, and if more than one thread tries to use the same consumer instance at the same time, a check inside the consumer will throw an error (a ConcurrentModificationException). The recommended way to scale an application that consumes from Kafka is by creating new application instances, each one running its own consumer in the same consumer group.

One important point to note is that, when a message is delivered to Kafka, it only becomes available for consumption after it is properly replicated to all in-sync replicas of its partition. This is important to ensure data availability, but it also means that messages can take some additional time to become available for consumption.

Kafka works with the concept of back-pressure. This means that applications are responsible for asking for new chunks of messages to process, allowing clients to process data at their own pace.

Commit strategies

Kafka works with 3 commit strategies, namely:

  • Auto-commit: With this strategy, offsets are committed automatically at a periodic interval as the consumer polls, whether or not the application has finished processing the messages. The downside of this approach is that messages that were not processed correctly could be lost, since they may already have been committed;
  • Synchronous manual commit: With this strategy, messages are manually committed synchronously. This is the safest option, but it has the downside of hindering performance, as commits become slower;
  • Asynchronous manual commit: With this strategy, messages are manually committed asynchronously. This option performs better than the previous one, as the commit does not block the poll loop, but there is also some risk that messages won’t be committed due to some problem, resulting in messages being processed more than once;

As when we talked about acknowledgement types, the best commit strategy to use depends on the characteristics of the solution being implemented.
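
The risks of each strategy become clearer with a small simulation in plain Java. ToyCommit is a hypothetical model: it assumes records arrive in fixed-size batches, the consumer crashes once while processing one record, and then restarts from the last committed offset. Committing as soon as a batch is received stands in for a commit that happens before processing has finished; committing after the batch stands in for a manual commit.

```java
// Hypothetical simulation of commit timing; it does not use the Kafka client.
class ToyCommit {

  // Returns how many times each record was fully processed, given a single
  // crash while processing record `crashAt` (use -1 for "no crash").
  static int[] run(int numRecords, int batchSize, int crashAt, boolean commitOnReceive) {
    int[] processed = new int[numRecords];
    int committed = 0;
    boolean crashed = false;

    for (int start = 0; start < numRecords && !crashed; start += batchSize) {
      int end = Math.min(numRecords, start + batchSize);
      if (commitOnReceive) {
        committed = end; // offsets committed before the work is done
      }
      for (int offset = start; offset < end; offset++) {
        if (offset == crashAt) {
          crashed = true; // the consumer dies before finishing this record
          break;
        }
        processed[offset]++;
      }
      if (!crashed && !commitOnReceive) {
        committed = end; // offsets committed only after the whole batch is done
      }
    }

    if (crashed) {
      // Restart: resume from the last committed offset, with no further crashes.
      for (int offset = committed; offset < numRecords; offset++) {
        processed[offset]++;
      }
    }
    return processed;
  }

  public static void main(String[] args) {
    // 4 records, batches of 2, crash while processing record 1.
    System.out.println(java.util.Arrays.toString(run(4, 2, 1, true)));
    System.out.println(java.util.Arrays.toString(run(4, 2, 1, false)));
  }
}
```

In the first run a record ends up with a count of zero (it was committed but never processed, so it is lost), while in the second run a record ends up with a count of two (it was processed again after the restart). That is the trade-off between the strategies in a nutshell.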

Consumer configurations

There are several configurations that can be set on the consumer. Here are some of the more basic and interesting ones to know:

  • fetch.min.bytes: This defines the minimum amount of bytes a consumer wants to receive in a batch of messages. The broker will wait for this minimum to be reached, or for a time limit to expire, as defined in another config;
  • max.partition.fetch.bytes: As opposed to the previous config, this defines the maximum size, in bytes, of the chunk of data we want to receive per partition when we ask Kafka for data. As before, if the time limit is reached first, Kafka will send the messages it has;
  • fetch.max.wait.ms: As mentioned in the previous configs, this is the property where we define the time limit, in milliseconds, for Kafka to wait for more messages to fetch before sending what it has to the consumer application;
  • auto.offset.reset: This defines what the consumer will do when first reading from a partition it has never read before, or when it has an invalid committed offset, for example if a consumer was down for so long that its last committed offset has already been purged from the partition. The default is latest, which means it will start reading from the newest records. The other option is earliest; in that case, the consumer will read all messages from the partition, from the beginning;
  • session.timeout.ms: This property defines the time limit within which a consumer must send a heartbeat to still be considered healthy. The default depends on the client version (it is 10 seconds in the 0.11 client used in our lab).

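IMPORTANT: in older clients, heartbeats are sent at each poll and/or commit made by the consumer. This means that, in the poll loop, we must be careful with the processing time: if it exceeds the session timeout period, Kafka will consider the consumer unhealthy and will redeliver the messages to another consumer. Since Kafka 0.10.1, heartbeats are sent from a background thread and the time allowed between polls is limited by max.poll.interval.ms instead, but the same care with processing time applies.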
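
As with the producer, these settings can be gathered in a plain java.util.Properties object. The sketch below is only an example: ConsumerSettings is a hypothetical helper, and every value in it is an assumption picked for illustration, so they should be tuned for each solution.

```java
import java.util.Properties;

// Hypothetical helper; the values are illustrative assumptions, not tuning advice.
class ConsumerSettings {

  static Properties build(String bootstrapServers, String groupId) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    props.setProperty("group.id", groupId);
    props.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("fetch.min.bytes", "1024");              // wait for at least 1 KB...
    props.setProperty("fetch.max.wait.ms", "500");             // ...or for half a second
    props.setProperty("max.partition.fetch.bytes", "1048576"); // at most 1 MB per partition
    props.setProperty("auto.offset.reset", "earliest");        // new groups read from the start
    props.setProperty("session.timeout.ms", "10000");          // heartbeat deadline
    return props;
  }
}
```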

Hands-on

Well, that was a lot to cover. Now that we have learned Kafka’s main concepts, let’s begin our hands-on and put what we talked about into practice!

Set up

Unfortunately, there is no official Kafka Docker image. So, for our lab, we will use the Zookeeper and Kafka images provided by wurstmeister (thanks, man!). At the end, we can find links to his images.

Also at the end of the article, we can find a repository with the sources for this lab. There is also a docker compose stack there to get a Kafka cluster up and running. This is the stack:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${MY_IP}
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

In order to run a cluster with 3 nodes, we can run the following commands:

export MY_IP=`ip route get 1 | awk '{print $NF;exit}'`
docker-compose up -d --scale kafka=3

To stop it, just run:

docker-compose stop

In our lab’s repo there is also a convenient bash script that sets up a 3-node Kafka cluster without the need to enter the commands above every time.

Coding the producer

Now that we have our environment, let’s begin our lab. First, we need to create our topic. To create a topic, we need to use a shell inside one of the brokers, passing the Zookeeper address as a parameter, alongside other parameters. Some operations, such as topic CRUD operations, are done pointing to Zookeeper instead of Kafka; there are plans to move all operations to the brokers directly in upcoming releases. Assuming we have a terminal with the MY_IP environment variable set, this can be done using the following command:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
  --create --zookeeper ${MY_IP}:2181 --replication-factor 1 \
  --partitions 2 --topic test

PS: All commands assume the names of the Kafka containers follow docker compose naming standards. If running from the lab repo, they will be created as kafkalab_kafka_1, kafkalab_kafka_2, etc.

With the previous command, we created a topic named test with a replication factor of 1 and 2 partitions. We can check if the topic was created by running the list topics command, as follows:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
  --list --zookeeper ${MY_IP}:2181

This will return a list of the topics that exist on Zookeeper, in this case, “test”.

Now, let’s create a producer. All code in this lab will be written in Java, using Kafka’s APIs. After creating a Java project, we will code our own producer wrapper. Let’s begin by creating the wrapper itself:

package com.alexandreesl.producer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {


  private KafkaProducer<String, String> producer;


  public MyProducer() throws UnknownHostException {

    InetAddress ip = InetAddress.getLocalHost();

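    // Build the bootstrap.servers list with two brokers ("host:port,host:port").
    // The port strings below are empty here: replace them with the ports that
    // docker compose assigned to the brokers in your environment.
    StringBuilder builder = new StringBuilder();
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");
    builder.append(",");
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");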

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", builder.toString());
    kafkaProps.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("acks", "all");
    producer = new KafkaProducer<String, String>(kafkaProps);

  }


  public void sendMessage(String topic, String key, String message)
      throws Exception {

    ProducerRecord<String, String> record =
        new ProducerRecord<>(topic, key, message);
    // send() is asynchronous; get() blocks until the broker acknowledges the
    // record and rethrows any send failure to the caller
    producer.send(record).get();

  }
}

The code is very simple. We just defined the addresses of 2 brokers of our cluster (docker compose will automatically define ports for the brokers, so we need to change the ports according to our environment first), the key and value serializers, and the acknowledgement type, in our case all, indicating that we want all replicas to be written before confirming the commit.

PS: Did you notice the get() method being called after send()? This is because the send method is asynchronous by default. As we want to wait for Kafka to write the message before ending, we call get() to make the call synchronous.
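
The difference between blocking on the result and reacting to it later can be tried out without a cluster. The sketch below uses a plain CompletableFuture as a stand-in for the Future returned by send(); ToyAsyncProducer is a hypothetical class, not the Kafka client, and it simply hands out increasing offsets.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a producer: send() completes asynchronously with
// the offset assigned to the record. This is NOT the Kafka client API.
class ToyAsyncProducer {
  private final AtomicLong nextOffset = new AtomicLong();

  // The value is ignored in this toy; only the returned offset matters.
  CompletableFuture<Long> send(String value) {
    return CompletableFuture.supplyAsync(nextOffset::getAndIncrement);
  }

  public static void main(String[] args) throws Exception {
    ToyAsyncProducer producer = new ToyAsyncProducer();

    // Synchronous style: block until the "broker" has answered.
    long offset = producer.send("first").get();
    System.out.println("written at offset " + offset);

    // Asynchronous style: register a callback and keep going.
    CompletableFuture<Long> pending = producer.send("second")
        .whenComplete((writtenOffset, error) -> {
          if (error != null) {
            System.err.println("send failed: " + error);
          } else {
            System.out.println("written at offset " + writtenOffset);
          }
        });

    pending.join(); // only so the demo does not exit before the callback runs
  }
}
```

The synchronous style is simpler to reason about, which is why our wrapper uses it, but it sends one message at a time; the callback style lets several sends be in flight while the application moves on.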

The main class that uses our wrapper class is as follows:

package com.alexandreesl;

import com.alexandreesl.producer.MyProducer;

public class Main {

  public static void main(String[] args) throws Exception {

    MyProducer producer = new MyProducer();

    producer.sendMessage("test", "mysuperkey", "my value");


  }

}

As we can see, it is a very simple class: we just instantiate the wrapper and use it. If we run it, we will see the following output on the terminal, with Kafka’s commit ID at the end, showing that our producer is correctly implemented:

[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    acks = all
    batch.size = 16384
    bootstrap.servers = [192.168.10.107:32813, 192.168.10.107:32814]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2

Process finished with exit code 0

Now that we have our producer implemented, let’s move on to the consumer.

Coding the consumer

Now, let’s code our consumer. First, we create a consumer wrapper, like the following:

package com.alexandreesl.consumer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

  private KafkaConsumer<String, String> consumer;

  public MyConsumer() throws UnknownHostException {

    InetAddress ip = InetAddress.getLocalHost();

    // Same broker list as in the producer: fill in the two empty port strings
    // with the ports of your brokers.
    StringBuilder builder = new StringBuilder();
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");
    builder.append(",");
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", builder.toString());
    kafkaProps.put("group.id", "MyConsumerGroup");
    kafkaProps.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProps
        .put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<String, String>(kafkaProps);

  }

  public void consume(String topic) {

    consumer.subscribe(Collections.singletonList(topic));

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Key: " + record.key());
          System.out.println("Value: " + record.value());
        }
      }
    } finally {
      consumer.close();
    }


  }

}

In the wrapper, we configured a consumer group ID and deserializers for our messages, and subscribed to our test topic. When we subscribe and start polling, the group coordinator on the brokers is updated, making the cluster allocate partitions to us on the topics we asked to consume, as long as there are no more consumers than partitions, like we talked about previously.

Then, we create the consume method, which has an infinite loop to keep consuming messages from the topic. In our case, we just keep calling the poll method with a timeout of 100 milliseconds, which returns a batch of records (with default settings, up to 500 records per call, as defined by max.poll.records), print the keys and values of the messages and keep polling. At the end, we close the connection.

In our example, we can notice that we didn’t explicitly commit the messages at any point. This is because we are using the default settings, so it is doing auto-commit. As we discussed previously, using auto-commit can be an option in some solutions, depending on the situation.

Now, let’s change our main class to allow us to produce and consume using the same program, and also to input the messages to produce. We do this by adding some input parameters, as follows:

package com.alexandreesl;

import com.alexandreesl.consumer.MyConsumer;
import com.alexandreesl.producer.MyProducer;
import java.util.Scanner;

public class Main {

  public static void main(String[] args) throws Exception {

    Scanner scanner = new Scanner(System.in);

    System.out.println("Please select operation (1 for producer, 2 for consumer) :");

    String operation = scanner.next();

    System.out.println("Please enter topic name :");

    String topic = scanner.next();

    if (operation.equals("1")) {

      MyProducer producer = new MyProducer();

      System.out.println("Please enter key :");

      String key = scanner.next();

      System.out.println("Please enter value :");

      String value = scanner.next();

      producer.sendMessage(topic, key, value);
    } else if (operation.equals("2")) {

      MyConsumer consumer = new MyConsumer();

      consumer.consume(topic);


    }


  }

}

If we run our code, we will see some interesting output on the console, such as the consumer joining the group through the group coordinator and being assigned partitions. At the end, it will print the messages we sent as the producer, proving our coding was successful.

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator 192.168.10.107:32814 (id: 2147482646 rack: null) for group MyConsumerGroup.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group MyConsumerGroup with generation 2
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [test-1, test-0] for group MyConsumerGroup
Key: mysuperkey
Value: my value

Manual committing

Now that we know the basics of producing to and consuming from Kafka streams, let’s dive into more details about Kafka’s consumer. We saw previously that our example used the default auto-commit to commit offsets after reading. To control when offsets are committed, we disable auto-commit and commit manually, by changing the code as follows:

package com.alexandreesl.consumer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

  private KafkaConsumer<String, String> consumer;

  public MyConsumer() throws UnknownHostException {

    InetAddress ip = InetAddress.getLocalHost();

    StringBuilder builder = new StringBuilder();
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");
    builder.append(",");
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", builder.toString());
    kafkaProps.put("group.id", "MyConsumerGroup");
    kafkaProps.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProps
        .put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProps.put("enable.auto.commit", "false");
    consumer = new KafkaConsumer<String, String>(kafkaProps);

  }

  public void consume(String topic) {

    consumer.subscribe(Collections.singletonList(topic));

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Key: " + record.key());
          System.out.println("Value: " + record.value());


        }

        consumer.commitSync();

      }
    } finally {
      consumer.close();
    }


  }

}

If we run our code, we will see that it will continue to consume messages, as expected:

Please select operation (1 for producer, 2 for consumer) :2
Please enter topic name :test
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - 
ConsumerConfig values:  auto.commit.interval.ms = 5000 
auto.offset.reset = latest 
bootstrap.servers = [192.168.10.107:32771, 192.168.10.107:32772] 
check.crcs = true client.id =  
connections.max.idle.ms = 540000 
enable.auto.commit = true 
exclude.internal.topics = true 
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
group.id = MyConsumerGroup 
heartbeat.interval.ms = 3000 
interceptor.classes = null 
internal.leave.group.on.close = true 
isolation.level = read_uncommitted 
key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 300000 
max.poll.records = 500 
metadata.max.age.ms = 300000 
metric.reporters = [] 
metrics.num.samples = 2 
metrics.recording.level = INFO 
metrics.sample.window.ms = 30000 
partition.assignment.strategy = 
[class org.apache.kafka.clients.consumer.RangeAssignor] 
receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 
reconnect.backoff.ms = 50 request.timeout.ms = 305000 
retry.backoff.ms = 100 sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.mechanism = GSSAPI security.protocol = PLAINTEXT 
send.buffer.bytes = 131072 session.timeout.ms = 10000 
ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.endpoint.identification.algorithm = null ssl.key.password = null 
ssl.keymanager.algorithm = SunX509 
ssl.keystore.location = null 
ssl.keystore.password = null 
ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null 
ssl.secure.random.implementation = null 
ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null 
ssl.truststore.password = null ssl.truststore.type = JKS 
value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka version : 0.11.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka commitId : 73be1e1168f91ee2
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32773 
(id: 2147482645 rack: null) for group MyConsumerGroup.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 6
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1, test-0] 
for group MyConsumerGroup
Key: key
Value: value
Key: my
Value: key

In our example, we used synchronous committing, that is, the main thread blocks waiting for the commit to complete before it starts reading the next batch of messages. We can make the commit asynchronous just by changing the commit method, as follows:

public void consume(String topic) {

  consumer.subscribe(Collections.singletonList(topic));

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());


      }

      consumer.commitAsync();

    }
  } finally {
    consumer.close();
  }


}

One last thing to check before we move on is committing specific offsets. In our previous examples, we committed all messages of a batch at once. If we wanted to do, for example, an asynchronous commit as each message is processed, we can do the following:

public void consume(String topic) {

  consumer.subscribe(Collections.singletonList(topic));

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());

        HashMap<TopicPartition, OffsetAndMetadata> offsets = 
new HashMap<>();

        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));

        consumer.commitAsync(offsets, null);


      }


    }
  } finally {
    consumer.close();
  }


}
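The detail that is easy to miss in the code above is the record.offset() + 1: a committed offset points to the next message to be read, not to the last one processed. The sketch below illustrates that bookkeeping with plain Java maps, without the Kafka client. The class OffsetTrackerSketch and its methods are hypothetical names made up for this illustration, not part of Kafka's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the Kafka client): keeps, per partition,
// the offset that a commit should carry after records are processed.
class OffsetTrackerSketch {

  // partition number -> next offset to read
  private final Map<Integer, Long> nextOffsets = new HashMap<>();

  // Call after a record has been fully processed.
  void markProcessed(int partition, long offset) {
    long candidate = offset + 1; // a commit points at the NEXT record to read
    Long current = nextOffsets.get(partition);
    // Never move backwards if records are reported out of order.
    if (current == null || candidate > current) {
      nextOffsets.put(partition, candidate);
    }
  }

  // Snapshot of what would be handed to a commit call.
  Map<Integer, Long> offsetsToCommit() {
    return new HashMap<>(nextOffsets);
  }

  public static void main(String[] args) {
    OffsetTrackerSketch tracker = new OffsetTrackerSketch();
    tracker.markProcessed(0, 41L);
    tracker.markProcessed(0, 42L);
    tracker.markProcessed(1, 7L);
    System.out.println(tracker.offsetsToCommit());
  }
}
```

With a tracker like this, a consumer could commit once per batch instead of once per record, trading a few more duplicates after a crash for far fewer commit requests.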

Assigning to specific partitions

In our examples, we let Kafka decide which partitions each consumer will consume. If we want to specify the partitions a consumer will be assigned to, we can use the assign method.

It is important to note that this approach is generally not recommended: consumers won’t be replaced automatically by others when they go down, and new partitions won’t be consumed until they are explicitly assigned to a consumer.

In the example below, we do this, stating that we want to consume messages from just one partition:

public void consume(String topic) {

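  // Needs java.util.List, java.util.ArrayList,
  // org.apache.kafka.common.PartitionInfo and org.apache.kafka.common.TopicPartition.
  List<TopicPartition> partitions = new ArrayList<>();

  // Ask the cluster for the topic's partitions and pick only the first one.
  List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
  if (partitionInfos != null && !partitionInfos.isEmpty()) {
    partitions.add(
        new TopicPartition(partitionInfos.get(0).topic(),
            partitionInfos.get(0).partition()));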

  }
  consumer.assign(partitions);

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);

      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());

        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));

        consumer.commitAsync(offsets, null);


      }


    }
  } finally {
    consumer.close();
  }


}

Consumer rebalance

When consuming from a topic, we can scale consumption by adding more instances of our application to the same consumer group, parallelizing the processing. Let’s see this in practice.

First, let’s start a consumer. After initializing, we can see it was assigned both partitions of our topic:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 (id: 2147482645 rack: null) 
for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] 
for group MyConsumerGroup [main] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
(Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 18 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1, test-0] for group 
MyConsumerGroup

Now, let’s start another consumer. We will see that, as soon as it joins the consumer group, it will be assigned to one of the partitions:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 
(id: 2147482645 rack: null) 
for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 19 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-0] for group MyConsumerGroup

And if we look at our old consumer, we will see that it is now reading from the other partition only:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 19 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1] for group MyConsumerGroup

This shows us the power of Kafka’s consumer group coordinator, which takes care of everything for us.

However, it is important to note that, in real scenarios, we can implement listeners that are invoked when partitions are revoked from a consumer due to a rebalance, and again before consumption of a partition starts on its new consumer. This can be done by implementing the ConsumerRebalanceListener interface, as follows:

package com.alexandreesl.listener;

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerRebalanceInterface implements 
ConsumerRebalanceListener {

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    System.out.println("I am losing the following partitions:");
    for (TopicPartition partition : partitions) {
      System.out.println(partition.partition());
    }
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    System.out.println("I am starting on the following partitions:");
    for (TopicPartition partition : partitions) {
      System.out.println(partition.partition());
    }
  }
}

Of course, this is just a mock implementation. In a real implementation, we would be doing tasks such as committing offsets (a necessity if we buffer our commits in blocks instead of committing one by one), closing connections, etc.

We register our new listener by passing it as a parameter to the subscribe() method, as follows:

public void consume(String topic) {

  consumer.subscribe(Collections.singletonList(topic), 
new MyConsumerRebalanceInterface());

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);

      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());

        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));

        consumer.commitAsync(offsets, null);


      }


    }
  } finally {
    consumer.close();
  }


}

Now, let’s terminate all our previously started consumers and start them again. When starting the first consumer, we will see the following output on the terminal:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 (id: 2147482645 rack: null) 
for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group 
MyConsumerGroup I am losing the following partitions: 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
I am starting on the following partitions:
 1 0 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Successfully joined group MyConsumerGroup with generation 21 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1, test-0] 
for group MyConsumerGroup

That shows our listener was invoked. Let’s now start the second consumer and see what happens:

I am losing the following partitions: 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 
(id: 2147482645 rack: null) for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group 
MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 22 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-0] 
for group MyConsumerGroup 
I am starting on the following partitions: 0

And finally, if we look at the first consumer, we will see that both revoked and reassigned partitions were printed on the console, showing our listener was implemented correctly:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
I am losing the following partitions: 1 0 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 22 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1] 
for group MyConsumerGroup 
I am starting on the following partitions: 1

PS: Kafka rebalances by revoking all partitions and redistributing them. That’s why we see the first consumer losing all its partitions before being reassigned one of the old ones.
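To make the redistribution more concrete, here is a minimal sketch of a range-style assignment for a single topic, loosely modeled on the RangeAssignor strategy that appears in our consumer logs. It is a simplified illustration in plain Java, not Kafka's actual implementation; the class name and the consumer ids are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of a range-style assignment for one topic.
class RangeAssignmentSketch {

  static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
    // Consumers are sorted so every group member computes the same result.
    List<String> sorted = new ArrayList<>(consumers);
    Collections.sort(sorted);

    Map<String, List<Integer>> result = new LinkedHashMap<>();
    if (sorted.isEmpty()) {
      return result;
    }

    int perConsumer = partitionCount / sorted.size();
    int extra = partitionCount % sorted.size(); // first 'extra' consumers get one more
    int next = 0;
    for (int i = 0; i < sorted.size(); i++) {
      int count = perConsumer + (i < extra ? 1 : 0);
      List<Integer> owned = new ArrayList<>();
      for (int j = 0; j < count; j++) {
        owned.add(next++);
      }
      result.put(sorted.get(i), owned);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> group = new ArrayList<>();
    group.add("consumer-1");
    System.out.println(assign(group, 2)); // one consumer owns both partitions
    group.add("consumer-2");
    System.out.println(assign(group, 2)); // after the "rebalance", one partition each
  }
}
```

Note that the whole assignment is recomputed from scratch whenever the group membership changes, which matches what we saw in the logs: everything is revoked first, then handed out again.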

Log compaction

Log compaction is a powerful cleanup feature of Kafka. With log compaction, we define a point from which messages with the same key on the same partition are compacted, so that only the most recent message is retained.

This is done by setting configurations that establish a compaction entry point and a retention entry point. These entry points consist of time periods, during which Kafka allows messages to keep coming from the producers while, at the same time, removing old messages that don’t matter anymore. The following diagram explains the system in practice:

Figure: Kafka log compaction explained

In order to configure log compaction, we need to introduce some configurations both on the cluster and on the topic. For the cluster, we change our Docker Compose YAML as follows:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${MY_IP}
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_LOG_CLEANER_ENABLE: "true"

    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

This change makes sure the log cleaner, the component that performs compaction, is running on the brokers; it was not enabled by default on older Kafka versions. Then, we change our topic configuration with the following new entries:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-configs.sh \
  --zookeeper ${MY_IP}:2181 --entity-type topics --entity-name test --alter \
  --add-config min.compaction.lag.ms=1800000,delete.retention.ms=172800000,cleanup.policy=compact

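In the command above, we set the compaction entry point, min.compaction.lag.ms, to 30 minutes, so every message stays uncompacted for at least 30 minutes after being written; this recent part of the stream is the dirty section. Older messages become eligible for the clean section, where compaction occurs and only the latest message of each key is kept. The other config, delete.retention.ms, is set to 48 hours and controls how long tombstones (delete markers, covered at the end of this section) are kept before being removed from the stream. Note that, with cleanup.policy=compact alone, the latest message of each key is not removed by age; for that, the policy would have to be compact,delete, together with retention.ms.

Lastly, we configured the cleanup policy, enabling compaction. We can check that our configs were successfully set by using the following command: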

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-configs.sh \
  --zookeeper ${MY_IP}:2181 --entity-type topics --entity-name test --describe

Which will produce the following output:

Configs for topic 'test' are 
min.compaction.lag.ms=1800000,delete.retention.ms=172800000,
cleanup.policy=compact

One last thing we need to know before moving on to our next topic is that compaction also allows messages to be removed. If we want the messages of a key to be completely removed from our stream, all we need to do is send a message with that key, but with null as the value. When sent this way with compaction enabled, all messages with that key will be removed from the stream. This kind of message is called a tombstone in Kafka.
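The effect of compaction and tombstones on a partition can be pictured with a small simulation. The sketch below is plain Java and only models the end result (latest value per key, tombstoned keys gone); it ignores the timing settings and the fact that Kafka keeps the tombstone itself around for a while. CompactionSketch is a hypothetical name used only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation of what a compacted partition ends up holding.
class CompactionSketch {

  // Each message is a {key, value} pair; a null value is a tombstone.
  static Map<String, String> compact(String[][] log) {
    Map<String, String> latest = new LinkedHashMap<>();
    for (String[] message : log) {
      String key = message[0];
      String value = message[1];
      // Drop any older entry for this key first, so the survivor keeps
      // the position of the most recent write.
      latest.remove(key);
      if (value != null) {
        latest.put(key, value);
      }
      // A null value (tombstone) leaves the key removed.
    }
    return latest;
  }

  public static void main(String[] args) {
    String[][] log = {
        {"user-1", "created"},
        {"user-2", "created"},
        {"user-1", "updated"},
        {"user-2", null} // tombstone for user-2
    };
    System.out.println(compact(log));
  }
}
```

In this run only the latest message for user-1 survives, while every message for user-2 is gone because of the tombstone.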

Kafka connect

Kafka Connect is an integration framework, like others such as Apache Camel, that ships with Kafka (but runs on a cluster of its own) and allows us to quickly develop integrations between Kafka and other systems. It is maintained by Confluent.

This framework deserves an article of its own, so it won’t be covered here. If the reader wants to know more about it, please go to the following link:

https://docs.confluent.io/current/connect/intro.html

Kafka Streams

Kafka Streams is a framework shipped with Kafka that allows us to implement stream applications using Kafka. By stream applications, we mean applications that have streams as both input and output, typically consisting of operations such as aggregation, reduction, etc.

A typical example of a stream application is reading data from 2 different streams and producing an aggregated result from the two on a third stream.

This framework deserves an article of its own, so it won’t be covered here. If the reader wants to know more about it, please go to the following link:

https://kafka.apache.org/documentation/streams/

Kafka MirrorMaker

Kafka MirrorMaker is a tool that allows us to mirror Kafka clusters, by copying messages from a source cluster to a target cluster as they come in. As with Kafka Connect and Streams, it is a tool that deserves its own article, so it won’t be covered here. More information about it can be found at the following link:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330

Kafka administration

Now that we have covered most of the code needed to use Kafka, let’s see how to administer a Kafka cluster. All Kafka administration commands are run through its shell scripts, like we did previously in our study.

Kafka CRUD topic operations

Let’s begin with basic topic operations. Like we saw before, topics can be created with the following command:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
  --create --zookeeper ${MY_IP}:2181 --replication-factor 1 --partitions 2 \
  --topic test

Changing topics (not configurations, like we saw on log compaction, but the topic itself, such as the number of partitions) is done with the same script, just changing some options:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
  --alter --zookeeper ${MY_IP}:2181 --partitions 4 --topic test

IMPORTANT: changing the number of partitions of a topic can also change the partitioning logic, meaning messages that were always sent to a partition A may now always be sent to a partition B. This is important to watch out for, as it can lead to message ordering issues if not handled with care.
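The reason is that a keyed message is routed by hashing its key and taking the remainder by the number of partitions. The sketch below shows the effect of going from 2 to 4 partitions. It is only an illustration: Kafka's default partitioner hashes the serialized key bytes with murmur2, while here String.hashCode() is used as a stand-in, and KeyPartitionSketch is a made-up name.

```java
// Hypothetical illustration of hash-based partitioning. String.hashCode()
// stands in for the murmur2 hash used by Kafka's default partitioner.
class KeyPartitionSketch {

  static int partitionFor(String key, int partitionCount) {
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(key.hashCode(), partitionCount);
  }

  public static void main(String[] args) {
    String[] keys = {"a", "b", "c", "d"};
    for (String key : keys) {
      System.out.println(key + ": partition " + partitionFor(key, 2)
          + " with 2 partitions, partition " + partitionFor(key, 4)
          + " with 4 partitions");
    }
  }
}
```

In this run, key "b" moves from partition 0 to partition 2 and key "c" from partition 1 to partition 3, so messages produced for them after the change land on a different partition than the older ones, and ordering between old and new messages of the same key is no longer guaranteed.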

We can search for topics by using the list command, like we did before:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
  --list --zookeeper ${MY_IP}:2181

If we want to delete a topic, we issue the following command. Take note that, if the configuration delete.topic.enable is not set, the topic will just be marked for deletion, not removed:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
  --delete --zookeeper ${MY_IP}:2181 --topic test

Other Kafka admin operations

Let’s now see other Kafka admin operations. First, let’s create a new topic to test it out:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
  --create --zookeeper ${MY_IP}:2181 --replication-factor 1 \
  --partitions 3 --topic mytopic

The first operation we will see is preferred replica election. When Kafka creates a topic, at first, the partition leaders are spread out as evenly as possible, reducing the impact of nodes going down. However, after some time, this distribution can be compromised, due to nodes going down and up several times, causing several leadership changes. This can be especially problematic on a small cluster.

The preferred replica election operation tries to bring a topic back as close as possible to its original partition leader distribution, solving the distribution problem. This is done with the following command:

docker exec -t -i kafkalab_kafka_1 \
  /opt/kafka/bin/kafka-preferred-replica-election.sh \
  --zookeeper ${MY_IP}:2181

IMPORTANT: This command triggers rebalances on all topics from the cluster, so it must be used with care.
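To picture what the election corrects: the preferred leader of a partition is the first broker in its replica list, and the election moves leadership back to that broker wherever it has drifted. The sketch below finds the partitions whose current leader is not the preferred one. It is plain Java over made-up data; the class name and the broker ids in main are hypothetical and not taken from our lab cluster.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: finds partitions whose leader has drifted
// away from the preferred replica (the first broker in the replica list).
class PreferredLeaderSketch {

  // replicas[p] lists the brokers holding partition p, preferred leader first.
  // leaders[p] is the broker currently leading partition p.
  static List<Integer> partitionsOffPreferredLeader(int[][] replicas, int[] leaders) {
    List<Integer> result = new ArrayList<>();
    for (int p = 0; p < replicas.length; p++) {
      if (replicas[p][0] != leaders[p]) {
        result.add(p);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    int[][] replicas = {{1001, 1002}, {1002, 1003}, {1003, 1001}};
    int[] leaders = {1001, 1003, 1003}; // partition 1 is led by a non-preferred broker
    System.out.println(partitionsOffPreferredLeader(replicas, leaders));
  }
}
```

Here broker 1003 leads two partitions while 1002 leads none; electing the preferred replica for partition 1 would spread the leadership evenly again.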

We can also trigger the election for just one topic, by writing a JSON file like this:

{
  "partitions": [
    {
      "partition": 0,
      "topic": "mytopic"
    },
    {
      "partition": 1,
      "topic": "mytopic"
    },
    {
      "partition": 2,
      "topic": "mytopic"
    }
  ]
}

And running the command like this:

docker exec -t -i kafkalab_kafka_1 \
  /opt/kafka/bin/kafka-preferred-replica-election.sh \
  --zookeeper ${MY_IP}:2181 \
  --path-to-json-file rebalance-example.json

PS: Before running the command, it is necessary to copy the file to the container where the command will run.

Another useful command is the reassignment of replicas. This is useful, for example, if we want to isolate a broker that will be removed from the cluster for maintenance, or if a new broker is added and needs to receive its share of topics in order to balance the cluster.

The first step is to generate a file that will be used to request a proposal for the partition moves. We write the following file, called “partition-req.json”:

{
 "topics": [
 {
 "topic": "mytopic"
 }
 ],
 "version": 1
}

In our stack, we have only 3 nodes, so the reassignment proposal can fail due to the cluster being so small. We change our cluster startup script as follows and run it again:

#!/usr/bin/env bash

export MY_IP=`ip route get 1 | awk '{print $NF;exit}'`
docker-compose up -d --scale kafka=6

We then execute the following command. Remember to copy the file to the container first:

docker exec -t -i kafkalab_kafka_1 \
  /opt/kafka/bin/kafka-reassign-partitions.sh \
  --zookeeper ${MY_IP}:2181 --generate \
  --topics-to-move-json-file partition-req.json \
  --broker-list 1004,1005,1006

IMPORTANT: We can copy the file as follows:

docker cp partition-req.json kafkalab_kafka_1:/partition-req.json

In the command above, we tell Kafka that we want to redistribute the replica set from the current brokers to brokers 1004, 1005 and 1006. We receive the following output, with the current distribution and a proposed one:

Current partition replica assignment 
{"version":1,"partitions":[{"topic":"mytopic","partition":2,"replicas":[1001]
,"log_dirs":["any"]},{"topic":"mytopic","partition":1,"replicas":[1003],
"log_dirs":["any"]},{"topic":"mytopic","partition":0,"replicas":[1002],
"log_dirs":["any"]}]}

Proposed partition reassignment configuration 
{"version":1,"partitions":[{"topic":"mytopic","partition":2,"replicas":[1005]
,"log_dirs":["any"]},
{"topic":"mytopic","partition":1,"replicas":[1004],
"log_dirs":["any"]},{"topic":"mytopic","partition":0,"replicas":[1006],
"log_dirs":["any"]}]}

The first JSON can be saved for rolling back, in case anything goes wrong. Let’s save the second JSON in a file called replica-proposal.json:

{"version":1,"partitions":[{"topic":"mytopic","partition":2,
"replicas":[1005],"log_dirs":["any"]},
{"topic":"mytopic","partition":1,"replicas":[1004]
,"log_dirs":["any"]},{"topic":"mytopic","partition":0
,"replicas":[1006],"log_dirs":["any"]}]}

Finally, we run the replica assignment command, using the proposed distribution file as a parameter (don’t forget to copy the file to the container first), as follows:

docker exec -t -i kafkalab_kafka_1 \
  /opt/kafka/bin/kafka-reassign-partitions.sh \
  --zookeeper ${MY_IP}:2181 --execute \
  --reassignment-json-file replica-proposal.json

We will receive an output like this:

Current partition replica assignment {"version":1,"partitions":
[{"topic":"mytopic","partition":2,"replicas":[1001],"log_dirs":["any"]}
,{"topic":"mytopic","partition":1,"replicas":[1003],"log_dirs":["any"]}
,{"topic":"mytopic","partition":0,"replicas":[1002],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions.

This means that the reassignment is being performed. During this phase, Kafka will redistribute the replicas and copy all data to the new brokers, so, depending on the amount of data, this operation can take a long time. We can check the status of the reassignment by running:

docker exec -t -i kafkalab_kafka_1 \
  /opt/kafka/bin/kafka-reassign-partitions.sh \
  --zookeeper ${MY_IP}:2181 --verify \
  --reassignment-json-file replica-proposal.json

When reassignment is finished, we will see the following:

Status of partition reassignment: 

Reassignment of partition mytopic-2 completed successfully

Reassignment of partition mytopic-1 completed successfully

Reassignment of partition mytopic-0 completed successfully

We can also check the status of our topics by running the describe command, as follows:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
  --zookeeper ${MY_IP}:2181 --describe

After our reassignment, it will output something like this:

Topic:mytopic  PartitionCount:3  ReplicationFactor:1  Configs:
  Topic: mytopic  Partition: 0  Leader: 1006  Replicas: 1006  Isr: 1006
  Topic: mytopic  Partition: 1  Leader: 1004  Replicas: 1004  Isr: 1004
  Topic: mytopic  Partition: 2  Leader: 1005  Replicas: 1005  Isr: 1005
Topic:test  PartitionCount:2  ReplicationFactor:1  Configs:
  Topic: test  Partition: 0  Leader: 1003  Replicas: 1003  Isr: 1003
  Topic: test  Partition: 1  Leader: 1001  Replicas: 1001  Isr: 1001

Kafka offset lag

Kafka’s offset lag refers to a situation where we have consumers lagging behind the head of a stream. Let’s revisit one of our diagrams from the section on offsets:

Figure: Consumers lagging behind on a stream

As we can see in the diagram above, we have 2 consumer groups on a stream. Consumer group 1 is 3 messages away from the stream’s head, while consumer group 2 is 8 messages away. This difference between the head and the current position of a consumer on a stream is called offset lag.
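In numbers, the lag of a consumer group on a partition is simply the offset of the stream’s head minus the group’s committed offset. The sketch below reproduces the two groups from the diagram, assuming, only for illustration, that the head is at offset 20; the class name and the offsets are made up.

```java
// Hypothetical illustration of how offset lag is computed.
class OffsetLagSketch {

  // Lag on one partition: how many messages the group still has to read.
  static long lag(long logEndOffset, long committedOffset) {
    return Math.max(0L, logEndOffset - committedOffset);
  }

  // Total lag of a group over all partitions of a topic.
  static long totalLag(long[] logEndOffsets, long[] committedOffsets) {
    long total = 0L;
    for (int p = 0; p < logEndOffsets.length; p++) {
      total += lag(logEndOffsets[p], committedOffsets[p]);
    }
    return total;
  }

  public static void main(String[] args) {
    long head = 20L; // assumed position of the stream's head
    System.out.println("Consumer group 1 lag: " + lag(head, 17L));
    System.out.println("Consumer group 2 lag: " + lag(head, 12L));
  }
}
```

A monitoring tool does essentially this calculation for every partition and group, and watches how the numbers evolve over time.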

The causes of offset lag vary, ranging from network problems to issues in the consumer application itself. It is important to keep this lag in check by monitoring it. One good tool for this is Burrow, provided by LinkedIn. More information about it can be found at the following link:

https://github.com/linkedin/Burrow

Testing the cluster

It is important to test our cluster configuration, in order to verify how the cluster will behave in several situations, such as when brokers go down (whether they hold partition leaderships or not), new brokers come in, etc.

We can code our own tests for this purpose using the VerifiableProducer and VerifiableConsumer classes from Apache Kafka’s tools. Their usage is essentially the same as the original ones we saw in our lab.

There is also a ready-to-use shell version of these tools, which can be used for some testing. For example, if we wanted to test our cluster by sending 200000 messages to mytopic, we can run something like this:

docker exec -t -i kafkalab_kafka_1 \
  /opt/kafka/bin/kafka-verifiable-producer.sh \
  --topic mytopic --max-messages 200000 \
  --broker-list ${MY_IP}:<a broker port>,${MY_IP}:<a broker port>

This will produce an output like the following:

{"timestamp":1516571263855,"name":"startup_complete"}

{"timestamp":1516571264213,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"3","offset":1,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"6","offset":2,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"9","offset":3,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"12","offset":4,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"15","offset":5,"topic":"mytopic","partition":1}

{"timestamp":1516571264217,"name":"producer_send_success","key":null,"value":"18","offset":6,"topic":"mytopic","partition":1}

{"timestamp":1516571264217,"name":"producer_send_success","key":null,"value":"21","offset":7,"topic":"mytopic","partition":1}

{"timestamp":1516571264218,"name":"producer_send_success","key":null,"value":"24","offset":8,"topic":"mytopic","partition":1}

{"timestamp":1516571264218,"name":"producer_send_success","key":null,"value":"27","offset":9,"topic":"mytopic","partition":1}

{"timestamp":1516571264218,"name":"producer_send_success","key":null,"value":"30","offset":10,"topic":"mytopic","partition":1}

{"timestamp":1516571264219,"name":"producer_send_success","key":null,"value":"33","offset":11,"topic":"mytopic","partition":1}

{"timestamp":1516571264220,"name":"producer_send_success","key":null,"value":"36","offset":12,"topic":"mytopic","partition":1}

{"timestamp":1516571264220,"name":"producer_send_success","key":null,"value":"39","offset":13,"topic":"mytopic","partition":1}

{"timestamp":1516571264220,"name":"producer_send_success","key":null,"value":"42","offset":14,"topic":"mytopic","partition":1}

{"timestamp":1516571264221,"name":"producer_send_success","key":null,"value":"45","offset":15,"topic":"mytopic","partition":1}

{"timestamp":1516571264224,"name":"producer_send_success","key":null,"value":"48","offset":16,"topic":"mytopic","partition":1}

{"timestamp":1516571264225,"name":"producer_send_success","key":null,"value":"51","offset":17,"topic":"mytopic","partition":1}

{"timestamp":1516571264225,"name":"producer_send_success","key":null,"value":"54","offset":18,"topic":"mytopic","partition":1}

...omitted...

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199980","offset":66660,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199983","offset":66661,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199986","offset":66662,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199989","offset":66663,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199992","offset":66664,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199995","offset":66665,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199998","offset":66666,"topic":"mytopic","partition":1}

{"timestamp":1516571272803,"name":"shutdown_complete"}

{"timestamp":1516571272805,"name":"tool_data","sent":200000,"acked":200000,"target_throughput":-1,"avg_throughput":22346.368715083798}

And similarly, we can test the consumer by running something like this:

docker exec -t -i kafkalab_kafka_1 \
/opt/kafka/bin/kafka-verifiable-consumer.sh \
--topic mytopic --max-messages 1000 \
--group-id testing \
--broker-list ${MY_IP}:<a broker port>,${MY_IP}:<a broker port>

Which will output something like this:

{"timestamp":1516571973384,"name":"startup_complete"}

{"timestamp":1516571973534,"name":"partitions_revoked","partitions":[]}

{"timestamp":1516571973557,"name":"partitions_assigned","partitions":[{"topic":"mytopic","partition":2},{"topic":"mytopic","partition":1},{"topic":"mytopic","partition":0}]}

{"timestamp":1516571973669,"name":"records_consumed","count":500,"partitions":[{"topic":"mytopic","partition":1,"count":500,"minOffset":66667,"maxOffset":67166}]}

{"timestamp":1516571973680,"name":"offsets_committed","offsets":[{"topic":"mytopic","partition":1,"offset":67167}],"success":true}

{"timestamp":1516571973687,"name":"records_consumed","count":500,"partitions":[{"topic":"mytopic","partition":1,"count":500,"minOffset":67167,"maxOffset":67666}]}

{"timestamp":1516571973690,"name":"offsets_committed","offsets":[{"topic":"mytopic","partition":1,"offset":67667}],"success":true}

{"timestamp":1516571973692,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973692,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973694,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973694,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973696,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973696,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973697,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973697,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973698,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973699,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973700,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973700,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973701,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973702,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973702,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973703,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973704,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973704,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973705,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973705,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973706,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973706,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973708,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973708,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973709,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973709,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973710,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973711,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973714,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973714,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973715,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973715,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973716,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973716,"name":"offsets_committed","offsets":[],"success":true}
...omitted...

Conclusion

And that concludes our study of Apache Kafka. I hope to have given the reader a solid explanation of Kafka’s core concepts, as well as directions for further study of its different uses and applications. Thank you for following me on this post, until next time.

Java 9: Learning the new features – part 4

Hi, dear readers! Welcome to my blog. In this post, the last of our series, we will finally talk about the best-known new feature of Java 9: Jigsaw. But after all, why do we need a module system? Let’s find out!

In the beginning

Since Java’s early days, we have had several ways to package applications. There is the most generic unit, known as the JAR, and there are also more specific formats, such as WARs for web applications and EARs for Enterprise JavaBeans (EJB) applications.

These applications typically do not consist only of code written by the development teams themselves: there is also a plethora of imported libraries and frameworks, such as logging libraries, ORM frameworks, web frameworks, etc.

Generally speaking, each of these libraries and frameworks is packaged as a JAR as well, and their dependencies are also packaged as JARs. This results in a scenario where a really big number of dependencies is included in a single application, just to make the whole thing work. The picture below shows a typical Spring Boot application’s classpath. Note the overwhelming mountain of dependencies:

Fragment of a typical Spring Boot application’s dependency list. It is 267 items long!

Jar hell

The situation described above leads us to the infamous JAR hell. This term refers to the problems developers have suffered across more than 20 years of Java, such as ClassNotFoundException, when the application can’t find a certain class at run time; NoClassDefFoundError, when a class that was available at compile time can’t be loaded at run time; and version conflicts, when multiple versions of the same class are on the classpath and the JVM simply loads whichever one it finds first.
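
To see the first of these errors in isolation, here is a minimal sketch (not part of the original lab) that asks the JVM to load classes by name at run time. The class name com.example.missing.Widget is made up for the illustration and is assumed to be absent from the classpath:

```java
public class MissingClassDemo {

    // Tries to load a class by name and reports what happened.
    static String tryLoad(String className) {
        try {
            Class<?> loaded = Class.forName(className);
            return "loaded " + loaded.getName();
        } catch (ClassNotFoundException e) {
            // Thrown when no class with that name can be found
            return "not found: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A class that every JVM ships with
        System.out.println(tryLoad("java.lang.String"));
        // Hypothetical name, assumed to be missing from the classpath
        System.out.println(tryLoad("com.example.missing.Widget"));
    }
}
```

In a real application the missing class usually belongs to a JAR that was left out of the deployment, which is why the error only shows up at run time.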

Encapsulation problems

Another problem we have is encapsulation. Once a JAR is on the classpath, all of its public classes are accessible to any other code. Even if we declare a class with default (package-private) visibility, it is still possible to access it just by declaring a class in a package with the same name as the one we want to reach (don’t try this at home, folks!).

This limits our options for interface design, since we can’t really prevent certain classes from being exposed to the outside world.

Performance degradation

Another big problem is performance. This is especially felt on Java EE containers, since servers need to support a big list of features provided to applications. It is true that there were efforts in the past to improve this situation, such as the EAP profiles on the JBoss server, but still, the situation was far from resolved.

This results in heavy, clunky servers that can be slow to operate and especially to initialize, besides being intensely memory demanding.

Enter the modules

To solve all the problems we saw in the previous sections, Java 9 brought us Jigsaw, the new module system for Java.

With Jigsaw, we can create modules from packages inside an application, allowing a much more coherent and organized structure. Not only that, with modules we have to explicitly declare what we want to expose from a module, so we also eliminate the encapsulation problems we talked about earlier.

This also helps with the performance degradation we just saw, since with modules the number of classes and packages to be loaded by the servers can be significantly reduced, resulting in leaner servers.

So, let’s see how modules work in practice!

Creating a module

Let’s start by creating a simple project. The source code for this lab is on this link, and the project was created using IntelliJ IDEA.

To create a module, all we have to do is create a Java file called module-info.java and place it at the root of the package structure we want to encapsulate in the module. The result is something like the image below:

jigsaw1

Inside the file, we define a module, which looks something like this:

module com.alexandreesl.application {
}

Now, module is a restricted keyword in Java: it has a special meaning only inside module-info.java. In the code above we defined a module named after the package it contains, which is the usual convention. That’s it! Our first module! Now, let’s see how to make this module talk to other modules.
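
As a side note, once code runs on Java 9 or later, every class belongs to some module, and we can ask for it at run time. The sketch below is only an illustration and is not part of the lab project; the class ModuleNameDemo and its helper method are hypothetical names. It prints the module of a JDK class and of the demo class itself:

```java
public class ModuleNameDemo {

    // Returns the name of the module a class belongs to,
    // or "<unnamed>" for classes loaded from the classpath.
    static String moduleNameOf(Class<?> type) {
        Module module = type.getModule();
        return module.isNamed() ? module.getName() : "<unnamed>";
    }

    public static void main(String[] args) {
        // JDK classes live in named modules such as java.base
        System.out.println(moduleNameOf(String.class));
        // The result here depends on how the demo is launched
        System.out.println(moduleNameOf(ModuleNameDemo.class));
    }
}
```

When launched from the classpath, the demo class lands in the unnamed module. If it were packaged inside a module such as com.alexandreesl.application and launched from the module path, that module name would be reported instead.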

Separating an application into different modules

Our sample application will consist of 4 modules: a main module, a dao module, a service module and a model module.

To create the different modules, all we have to do is create the different packages and module definitions (the module-info.java files), building up the whole module structure.

The image below shows the structure:

jigsaw2

And the new module definitions are:

module com.alexandreesl.dao {
}
module com.alexandreesl.model {
}
module com.alexandreesl.service {
}

Exposing a module

Now that we have the modules defined, let’s start coding our project. Our project will implement simple CRUD operations on books, for a bookstore system.

Let’s start by coding the model module. We will create a Book class to represent books in the system.

The code for the class is shown below:

package com.alexandreesl.model;

public class Book {

    private Long id;

    private String name;

    private String author;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }
}

Then, we modify the module, to expose the model class:

module com.alexandreesl.model {

    exports com.alexandreesl.model;

}

Next, we code the DAO module. We will create an interface and an implementation for the module, keeping them apart in separate packages. We will also create an object factory.

This is the code for the interface, implementation and object factory of the DAO module:

package com.alexandreesl.dao.interfaces;

import com.alexandreesl.model.Book;

public interface IBookDAO {

    void create(Book book);

    void update(Book book);

    Book find(Long id);


}
package com.alexandreesl.dao.impl;

import com.alexandreesl.dao.interfaces.IBookDAO;
import com.alexandreesl.model.Book;

public class BookDAOImpl implements IBookDAO {
    @Override
    public void create(Book book) {

        System.out.println("INSERTED THE BOOK!");

    }

    @Override
    public void update(Book book) {

        System.out.println("UPDATED THE BOOK!");

    }

    @Override
    public Book find(Long id) {

        Book book = new Book();
        book.setId(id);
        book.setName("Elasticsearch: Consuming real-time data with ELK");
        book.setAuthor("Alexandre Eleutério Santos Lourenço");

        return book;
    }


}
package com.alexandreesl.dao.interfaces;

import com.alexandreesl.dao.impl.BookDAOImpl;

public class BookDAOFactory {

    public static IBookDAO getBookDAO() {

        return new BookDAOImpl();

    }

}

The image below shows the final structure of the module with the classes:

jigsaw3

To expose the interfaces package and also use the Book class from the model module, we add the following lines to the module definition:

module com.alexandreesl.dao {

    requires com.alexandreesl.model;
    exports com.alexandreesl.dao.interfaces;

}

Here we can see an important advantage of modules: since we didn’t export the impl package, the implementation won’t be exposed to code outside the module.
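
To make the split between exported and hidden packages more tangible, the sketch below rebuilds the dao module definition in memory with the JDK’s java.lang.module.ModuleDescriptor builder and lists which packages stay concealed. It is only an illustration under assumptions: the class DaoModuleSketch and its helper methods are hypothetical, and the lab itself only needs the module-info.java file shown above:

```java
import java.lang.module.ModuleDescriptor;
import java.util.Set;
import java.util.TreeSet;

public class DaoModuleSketch {

    // In-memory description equivalent to the dao module-info.java
    static ModuleDescriptor daoDescriptor() {
        return ModuleDescriptor.newModule("com.alexandreesl.dao")
                .requires("com.alexandreesl.model")
                .exports("com.alexandreesl.dao.interfaces")
                // impl belongs to the module but is not exported
                .packages(Set.of("com.alexandreesl.dao.impl"))
                .build();
    }

    // Packages that are part of the module but hidden from other modules
    static Set<String> concealedPackages(ModuleDescriptor descriptor) {
        Set<String> concealed = new TreeSet<>(descriptor.packages());
        for (ModuleDescriptor.Exports export : descriptor.exports()) {
            concealed.remove(export.source());
        }
        return concealed;
    }

    public static void main(String[] args) {
        ModuleDescriptor dao = daoDescriptor();
        System.out.println("exports:   " + dao.exports());
        System.out.println("concealed: " + concealedPackages(dao));
    }
}
```

Only the interfaces package ends up in the exported set, which is why code in another module cannot reference BookDAOImpl directly and has to go through the factory.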

Now we code the service module. To keep things simple, we won’t use an interface-implementation approach this time, just a class that delegates to the DAO layer. The code for the service class is shown below:

package com.alexandreesl.service;

import com.alexandreesl.dao.interfaces.BookDAOFactory;
import com.alexandreesl.dao.interfaces.IBookDAO;
import com.alexandreesl.model.Book;

public class BookService {

    private IBookDAO bookDAO;

    public BookService() {

        bookDAO = BookDAOFactory.getBookDAO();

    }

    public void create(Book book) {
        bookDAO.create(book);
    }

    public void update(Book book) {
        bookDAO.update(book);
    }

    public Book find(Long id) {
        return bookDAO.find(id);
    }


}

And the module changes are as follows:

module com.alexandreesl.service {

    requires com.alexandreesl.model;
    requires com.alexandreesl.dao;
    exports com.alexandreesl.service;

}

Finally, we code the main module, which is simply a main method where we test out our structure:

package com.alexandreesl.application;

import com.alexandreesl.model.Book;
import com.alexandreesl.service.BookService;

public class Main {

    public static void main(String[] args) {

        Book book = new Book();

        book.setAuthor("Stephen King");
        book.setId(1L);
        book.setName("IT - The thing");

        BookService service = new BookService();

        service.create(book);

        book.setName("IT");

        service.update(book);

        Book searchedBook = service.find(2L);

        System.out.println(searchedBook.getName());
        System.out.println(searchedBook.getAuthor());


    }

}

If we run our code, we will see that everything works, just as designed:

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=50683:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -p /Users/alexandrelourenco/Applications/git/JigsawLab9/out/production/application:/Users/alexandrelourenco/Applications/git/JigsawLab9/out/production/service:/Users/alexandrelourenco/Applications/git/JigsawLab9/out/production/dao:/Users/alexandrelourenco/Applications/git/JigsawLab9/out/production/model -m com.alexandreesl.application/com.alexandreesl.application.Main
INSERTED THE BOOK!
UPDATED THE BOOK!
Elasticsearch: Consuming real-time data with ELK
Alexandre Eleutério Santos Lourenço

Process finished with exit code 0

Remember that the code of this project is available on GitHub, on this link.

Static dependencies

One thing the reader may notice in our code is that we needed to require the model module in each of the other modules of our system. This is because a dependency required by one module is not automatically inherited by the modules that depend on it. All requirements must be explicitly declared to be linked.

However, in this case, if we wanted to declare the dependency on just one module and tell Java on the other modules that the dependency will be provided later, we could use the static keyword. A requires static dependency is mandatory at compile time but optional at run time. It is analogous to the provided scope on Maven, where a dependency is needed just for compilation and is assumed to be there when the code runs.

To make the changes so that the model module is fully required by just one module, we change the module definitions to the following:

module com.alexandreesl.application {

    requires com.alexandreesl.model;
    requires com.alexandreesl.service;

}
module com.alexandreesl.dao {

    requires static com.alexandreesl.model;
    exports com.alexandreesl.dao.interfaces;

}
module com.alexandreesl.service {

    requires static com.alexandreesl.model;
    requires com.alexandreesl.dao;
    exports com.alexandreesl.service;

}

If we run our code again, we will see that it runs successfully, just like before.
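
For readers who want to inspect how such a declaration is represented, the sketch below models the service module with the JDK’s ModuleDescriptor builder and checks which of its dependencies carry the static modifier. It is a hedged illustration only: StaticRequiresSketch and its helpers are hypothetical names, not part of the lab project:

```java
import java.lang.module.ModuleDescriptor;
import java.lang.module.ModuleDescriptor.Requires;
import java.util.Set;

public class StaticRequiresSketch {

    // In-memory equivalent of the service module-info.java shown above
    static ModuleDescriptor serviceDescriptor() {
        return ModuleDescriptor.newModule("com.alexandreesl.service")
                // same as: requires static com.alexandreesl.model;
                .requires(Set.of(Requires.Modifier.STATIC), "com.alexandreesl.model")
                .requires("com.alexandreesl.dao")
                .exports("com.alexandreesl.service")
                .build();
    }

    // True when the named dependency is declared with "requires static"
    static boolean isStaticDependency(ModuleDescriptor descriptor, String moduleName) {
        return descriptor.requires().stream()
                .filter(r -> r.name().equals(moduleName))
                .anyMatch(r -> r.modifiers().contains(Requires.Modifier.STATIC));
    }

    public static void main(String[] args) {
        ModuleDescriptor service = serviceDescriptor();
        System.out.println(isStaticDependency(service, "com.alexandreesl.model"));
        System.out.println(isStaticDependency(service, "com.alexandreesl.dao"));
    }
}
```

In our lab the model module is still present at run time because the application module requires it without the static modifier.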

Package manager support

Since modules are a newly introduced concept, there is still some work in progress on Java’s build and package management tools, such as Maven and Gradle. Keep in mind that the objective of Jigsaw is not to replace package management systems.

Think of it more as a complement to these systems, with Jigsaw managing exposure and internal dependencies and the package systems managing issues such as packaging artifacts, running tests, etc.

If the reader is familiar with Gradle, there are some plugins already developed that integrate Jigsaw with it, like chainsaw:

https://github.com/zyxist/chainsaw

Conclusion

And so we conclude our Java 9 series. With several interesting new features, this new edition of Java proves not only that Java is still relevant in the market, but also that it can keep evolving with the most modern practices in use. Thank you for following me on this post, until next time.

Java 9: Learning the new features – part 3

Hi, dear readers! Welcome to my blog. Continuing our series on the new features of Java 9, we will now talk about reactive streams, a new concept in asynchronous processing that promises to protect our applications from being flooded with more messages than they can process. So, without further delay, let’s begin!

What is Reactive Streams

Let’s imagine an e-commerce system that has to send orders to a distribution center (DC). The e-commerce and DC systems are separate from each other, communicating through a REST service.

Normally, we could simply create a call from the e-commerce system to the DC. So, we implement the call and everything is fine.

However, one day we get a problem on our integration. We notice that the e-commerce system has flooded the DC with calls during a Black Friday sale, so the REST endpoint starts to fail and lose data.

This scenario illustrates a common integration problem: when a consumer can’t process messages above a certain volume, we need to ensure the integration doesn’t end up overflowing the pipeline.

To tackle this problem, a pattern called Reactive Streams was designed. With Reactive Streams, the flow of processing is controlled by the consumer, not the publisher: the consumer asks for more data as soon as it is ready, keeping its own pace. This mechanism is called back pressure, a kind of throttling which ensures that the publisher waits for the consumer to be available before sending any more messages, instead of overflowing it as in our previous example.

The diagram below shows the new Flow API of Java 9, which allows us to implement Reactive Streams. We can see our consumer (the subscriber) establishing a subscription with the producer and requesting n messages to consume, which are delivered for processing through the onNext method. Don’t worry about the rest of the details: we will see more in the next sections.

The reference site for this diagram, with another good tutorial on Reactive Streams, can be found in the references section at the end of the post.

Creating a Stream: the Publisher

First, let’s create our publisher. The simplest way to create a publisher is by using the SubmissionPublisher class.

Our lab will simulate the orders integration we talked about earlier. We will begin by creating a DTO to hold the data from our orders:

package com.alexandreesl.handson.model;

import java.math.BigDecimal;
import java.util.Date;
import java.util.List;

public class Order {

    private Long id;

    private List<String> products;

    private BigDecimal total;

    private Date orderDate;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public List<String> getProducts() {
        return products;
    }

    public void setProducts(List<String> products) {
        this.products = products;
    }

    public BigDecimal getTotal() {
        return total;
    }

    public void setTotal(BigDecimal total) {
        this.total = total;
    }

    public Date getOrderDate() {
        return orderDate;
    }

    public void setOrderDate(Date orderDate) {
        this.orderDate = orderDate;
    }
}

Next, let’s instantiate our publisher, passing our DTO class as the generic type:

public static void main(String[] args) {

    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();


}

That’s all we need to do for now with our publisher.

Creating a Stream: the Consumer

Now, let’s create our consumer, or subscriber in other words. For this, we will create a class called CDOrderConsumer that implements the Subscriber<T> interface:

package com.alexandreesl.handson.consumer;

import com.alexandreesl.handson.model.Order;

import static java.util.concurrent.Flow.Subscriber;
import static java.util.concurrent.Flow.Subscription;


public class CDOrderConsumer implements Subscriber<Order> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {

        this.subscription = subscription;
        subscription.request(1);

    }

    @Override
    public void onNext(Order item) {
        System.out.println("I am sending the Order to the CD!");
        subscription.request(1);

    }

    @Override
    public void onError(Throwable throwable) {

        throwable.printStackTrace();

    }

    @Override
    public void onComplete() {

        System.out.println("All the orders were processed!");

    }
}

In this class we can see several methods implemented, the same ones we saw on the previous diagram. We can explain each of them as:

  • onSubscribe: In this method, we receive an instance of Subscription, which we use to request messages from the publisher. In our example, we store the instance and request 1 message to be processed with the subscription.request(n) method call. It is possible to request more than 1 message per call, allowing the subscriber to process batches of messages;
  • onNext(T): In this method, we process the messages received. In our example, we print a message symbolizing the REST call and ask the publisher for another message;
  • onError(Throwable throwable): In this method, we receive errors that occur during message processing. In our example, we simply print the errors;
  • onComplete(): This method is called after the publisher is closed and all pending messages have been delivered. In our example, we just print a message to signal the completion. It is important to note that, if the onNext(T) method didn’t ask for more messages, the subscriber would stop receiving messages after the first one, since the publisher only delivers what was requested;
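
The batch idea mentioned for onSubscribe can be sketched as follows. This is a minimal illustration under assumptions, not the lab’s code: BatchSubscriber and the run helper are hypothetical names, and integers stand in for orders. The subscriber asks for batchSize items at a time and only asks again once the whole batch has arrived:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BatchSubscriberSketch {

    // Subscriber that requests items in batches of batchSize
    static class BatchSubscriber<T> implements Flow.Subscriber<T> {
        private final int batchSize;
        private final List<T> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int pendingInBatch;

        BatchSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            pendingInBatch = batchSize;
            subscription.request(batchSize); // ask for the first batch
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            if (--pendingInBatch == 0) {
                // Whole batch processed: ask for the next one
                pendingInBatch = batchSize;
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the numbers 1..items and returns what the subscriber received
    static List<Integer> run(int batchSize, int items) {
        BatchSubscriber<Integer> subscriber = new BatchSubscriber<>(batchSize);
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        for (int i = 1; i <= items; i++) {
            publisher.submit(i);
        }
        publisher.close();
        try {
            // Wait for onComplete, which runs on another thread
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(3, 10));
    }
}
```

Larger batches mean fewer round trips to the publisher, at the cost of the subscriber committing to more work up front.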

Now that we understand how to implement our subscriber, let’s try out our stream by subscribing it to the publisher and sending a message:

public static void main(String[] args) throws IOException {


    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();
    submissionPublisher.subscribe(new CDOrderConsumer());

    Order order = new Order();
    order.setId(1L);
    order.setOrderDate(new Date());
    order.setTotal(BigDecimal.valueOf(123));
    order.setProducts(List.of("product1", "product2", "product3"));


    submissionPublisher.submit(order);

    submissionPublisher.close();

    System.out.println("Waiting for processing.......");
    System.in.read();


}

In our code, we instantiate a publisher, create an order and submit the message for processing. Keep in mind that the submit call doesn’t mean that the message was delivered to the subscriber: this only happens after the subscriber calls subscription.request(n). Lastly, we close the publisher, as we will not send any more messages.

Note: You may be wondering why we put that System.in.read() at the end. This is because all processing of the stream is done on a separate thread from the main one, so we need to make the program wait for the processing to complete, or else it will exit before the message is processed.
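
As an alternative to blocking on keyboard input, SubmissionPublisher also offers a consume method that returns a CompletableFuture, which completes when the publisher signals completion. The sketch below is a simplified illustration, not the lab’s code: WaitForCompletionSketch is a hypothetical name and plain strings stand in for orders:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

public class WaitForCompletionSketch {

    // Publishes the messages and waits until all of them were processed
    static List<String> publishAndWait(List<String> messages) {
        List<String> processed = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // consume() subscribes a simple consumer and returns a future
        // that completes once the publisher has signalled onComplete
        CompletableFuture<Void> finished = publisher.consume(processed::add);

        messages.forEach(publisher::submit);
        publisher.close();

        // Blocks the calling thread until processing is over
        finished.join();
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(publishAndWait(List.of("order-1", "order-2")));
    }
}
```

The trade-off is that consume requests items without limit, so it suits simple cases like this one better than subscribers that need to control their own pace.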

If we execute our program, we will see an output like this:

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=50218:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main
Waiting for processing.......
I am sending the Order to the CD!
All the orders were processed!

Success!!! Now we have a fully functional reactive stream, allowing us to process our messages.

Processors on Reactive Streams

Sometimes, on a stream, there will be logic that can be placed between the publisher and the consumer, such as filtering, transforming, and more. For this purpose, we can implement processors. Processors are like subscribers that also publish messages after the logic is applied. This way, processors can be chained together on a stream, executing one after another, before finally passing the message to a subscriber.

Let’s expand our previous example. We detected a bug in our e-commerce system that sometimes places “phantom” orders with 0 total value on the stream. We haven’t identified the cause yet, but it is necessary to prevent these fake orders from being sent to the CD system. We can use a processor to filter out these fake orders.

So, let’s implement the following class, OrderFilter, to accomplish this:

package com.alexandreesl.handson.processor;

import com.alexandreesl.handson.model.Order;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;


public class OrderFilter extends SubmissionPublisher<Order> implements Flow.Processor<Order, Order> {

    private Flow.Subscription subscription;


    @Override
    public void onSubscribe(Flow.Subscription subscription) {

        this.subscription = subscription;
        subscription.request(1);

    }

    @Override
    public void onNext(Order item) {

        // signum() checks the sign without converting the BigDecimal to a double
        if (item.getTotal().signum() > 0) {

            submit(item);

        } else {

            System.out.println("INVALID ORDER! DISCARDING...");

        }

        subscription.request(1);


    }

    @Override
    public void onError(Throwable throwable) {

        throwable.printStackTrace();

    }

    @Override
    public void onComplete() {

        System.out.println("All the orders were processed!");

    }


}

In this class, we implement both the publisher and subscriber interfaces. The code is basically the same as our subscriber’s, except that in the onNext(T) method we check whether an order has a total value greater than 0. If it has, the order is submitted to the downstream subscriber; otherwise, it is discarded.
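
One detail worth keeping in mind: a SubmissionPublisher only signals onComplete to its own subscribers when close() is called, so a processor that wants the end of the stream to reach the next stage has to close itself. The sketch below is a generic variation on the idea, written under assumptions: FilterProcessor and positiveOnly are hypothetical names, and integers stand in for order totals:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;

public class FilterProcessorSketch {

    // Forwards only the items that match the predicate
    static class FilterProcessor<T> extends SubmissionPublisher<T>
            implements Flow.Processor<T, T> {

        private final Predicate<T> predicate;
        private Flow.Subscription subscription;

        FilterProcessor(Predicate<T> predicate) {
            this.predicate = predicate;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            if (predicate.test(item)) {
                submit(item); // pass the item on to the next stage
            }
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // propagate the failure
        }

        @Override
        public void onComplete() {
            close(); // propagate completion to the next stage
        }
    }

    // Sends the totals through the filter and returns what came out
    static List<Integer> positiveOnly(List<Integer> totals) {
        List<Integer> delivered = new CopyOnWriteArrayList<>();
        FilterProcessor<Integer> filter = new FilterProcessor<>(total -> total > 0);
        CompletableFuture<Void> finished = filter.consume(delivered::add);

        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        source.subscribe(filter);
        totals.forEach(source::submit);
        source.close();

        finished.join(); // completes only because the filter closes itself
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(positiveOnly(List.of(123, 0, 45)));
    }
}
```

Without the close() call in onComplete, the future returned by consume would never complete and the final subscriber would not learn that the stream has ended.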

Next, we modify our code, subscribing the processor to our stream and testing it out with 2 orders, one valid and one fake:

public static void main(String[] args) throws IOException {

    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();
    OrderFilter filter = new OrderFilter();
    submissionPublisher.subscribe(filter);
    filter.subscribe(new CDOrderConsumer());

    Order order = new Order();
    order.setId(1L);
    order.setOrderDate(new Date());
    order.setTotal(BigDecimal.valueOf(123));
    order.setProducts(List.of("product1", "product2", "product3"));

    submissionPublisher.submit(order);

    order = new Order();
    order.setId(2L);
    order.setOrderDate(new Date());
    order.setProducts(List.of("product1", "product2", "product3"));

    order.setTotal(BigDecimal.ZERO);

    submissionPublisher.submit(order);

    submissionPublisher.close();

    System.out.println("Waiting for processing.......");
    System.in.read();

}

If we run the code, we will see a message indicating that one of the orders was discarded, showing that our implementation was a success:

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=51469:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main
Waiting for processing.......
INVALID ORDER! DISCARDING...
I am sending the Order to the CD!
All the orders were processed!

The source code for our lab can be found here.

Conclusion

And so we conclude our lesson on Reactive Streams. With a simple and intuitive approach, Reactive Streams are a good solution to try out, especially on systems that have capacity limitations. Please join me next time for the last chapter of this series, where we will finally see the much-famed new module system, Jigsaw. Thank you for following me on this post, until next time.

References

Reactive Streams (Wikipedia)

Reactive Streams Tutorial (another good tutorial to serve as guide)

Java 9: Learning the new features – part 2

Hi, dear readers! Welcome to my blog. In this post, we will continue our tour of Java 9, now focusing on what changed in Streams and Optionals.

Creating collections

Before Java 9, when we wanted to populate a collection with some data, we would commonly do this:

Map<Long,String> tasks = new HashMap<>();

tasks.put(1L, "Put trash on the street");
tasks.put(2L, "Buy bread");
tasks.put(3L, "Walk with the dog");
tasks.put(4L, "make dinner");

Of course, we could also create the collection like this:

Map<Long, String> tasks = new HashMap<>() {{

    put(1L, "Put trash on the street");
    put(2L, "Buy bread");
    put(3L, "Walk with the dog");
    put(4L, "make dinner");

}};

Still, it is quite a verbose way to create a collection. Finally, in Java 9, we can create a collection in a much cleaner way:

Map<Long, String> tasks = Map.of(
        1L, "Put trash on the street",
        2L, "Buy bread",
        3L, "Walk with the dog",
        4L, "make dinner"

);

Two points are worth noting about the of method, however:

  • There’s no way to choose which implementation will be used;
  • If there are any null values in the data, the creation will fail with a NullPointerException;
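
Both points can be checked with a small experiment. The sketch below is illustrative only (MapOfSketch and failureOf are hypothetical names); besides the null check, it also shows that the map returned by Map.of cannot be modified afterwards:

```java
import java.util.Map;

public class MapOfSketch {

    // Returns the simple name of the exception thrown by the action, or "none"
    static String failureOf(Runnable action) {
        try {
            action.run();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        Map<Long, String> tasks = Map.of(1L, "Buy bread");

        // The implementation class is chosen by the JDK, not by us
        System.out.println(tasks.getClass().getName());

        // Null values are rejected at creation time
        System.out.println(failureOf(() -> Map.of(1L, (String) null)));

        // The resulting map is unmodifiable
        System.out.println(failureOf(() -> tasks.put(2L, "make dinner")));
    }
}
```

If a mutable map is needed, the result can be copied into one, for example with new HashMap<>(Map.of(...)).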

New collectors

Another good addition was the new collectors, such as Collectors.filtering. With them, we can now apply filters to streams as part of the collecting step. Let’s see some examples.

Let’s use the same tasks map from before. Let’s suppose we want a list of task values, keeping only tasks that don’t contain the word dog. With the new collectors, we can accomplish this by doing:

tasks.values().stream().collect(Collectors.filtering(w -> !w.contains("dog"), Collectors.toList())).forEach(System.out::println);

If we execute the code, we will see that it prints all the tasks except the one about walking with the dog, as expected.

Now, let’s see another example, this time with Collectors.mapping, a sibling collector that has been available since Java 8. Let’s suppose we want to create a list with only the first word of each task. This can be done by using the following code:

tasks.values().stream().collect(Collectors.mapping(w -> w.split(" ")[0], Collectors.toList())).forEach(System.out::println);

If we run the code, we will see that it will print a list with just the first words from the tasks, as we expected.
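These collectors show their value when used as downstream collectors of a grouping. The sketch below is only an illustration under my own assumptions (the class name and the choice of grouping by word count are mine): it groups the tasks by their number of words and compares filtering inside the collector with filtering the stream beforehand.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class FilteringCollectorDemo {

    static final List<String> TASKS = List.of(
            "Put trash on the street", "Buy bread", "Walk with the dog", "make dinner");

    // Groups tasks by word count, filtering inside the collector:
    // a group whose tasks are all filtered out is kept, with an empty list.
    static Map<Integer, List<String>> filterInsideCollector() {
        return TASKS.stream().collect(Collectors.groupingBy(
                t -> t.split(" ").length,
                TreeMap::new,
                Collectors.filtering(t -> !t.contains("dog"), Collectors.toList())));
    }

    // Same grouping, but filtering the stream first: the emptied group disappears.
    static Map<Integer, List<String>> filterBeforeCollector() {
        return TASKS.stream()
                .filter(t -> !t.contains("dog"))
                .collect(Collectors.groupingBy(
                        t -> t.split(" ").length, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(filterInsideCollector());
        System.out.println(filterBeforeCollector());
    }
}
```

In the first result the group for 4-word tasks is still present, just empty, while in the second it is missing. Which behavior is preferable depends on whether the consumer of the map needs to know that the group existed.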

Iterating with streams

Another interesting new feature is the dropWhile and takeWhile operations. By using them, we can iterate sequentially over a stream, discarding or including items for as long as a predicate is satisfied. Let’s see some examples.

Let’s begin by creating a collection for our tests:

List<String> words = List.of("we", "are", "testing", 
        "new", "features", "of", "Java", "9");

Now, let’s try dropWhile:

words.stream().dropWhile(e -> !e.equals("new")).forEach(System.out::println);

The result if we execute our stream will be as follows:

new
features
of
Java
9

As we can see, it is the correct result, since we told the stream to drop items for as long as they are not equal to “new”.

If we try the takeWhile operation with the same predicate, we will see that the stream keeps the items until an item equal to “new” is found, exactly as expected. This is the code modified for the new example:

words.stream().takeWhile(e -> !e.equals("new")).forEach(System.out::println);

And this is the new result:

we
are
testing
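One detail that is easy to miss is that both operations only look at the beginning of the stream: once the predicate fails for the first time, it is not evaluated again. The following sketch (the class name and the sample numbers are my own) compares them with filter on a list where matching items appear again later.

```java
import java.util.List;
import java.util.stream.Collectors;

class WhileOperationsDemo {

    static final List<Integer> NUMBERS = List.of(1, 2, 3, 1, 2);

    // Keeps items only until the predicate fails for the first time.
    static List<Integer> taken() {
        return NUMBERS.stream().takeWhile(n -> n < 3).collect(Collectors.toList());
    }

    // Discards items only until the predicate fails for the first time.
    static List<Integer> dropped() {
        return NUMBERS.stream().dropWhile(n -> n < 3).collect(Collectors.toList());
    }

    // Evaluates the predicate for every item of the stream.
    static List<Integer> filtered() {
        return NUMBERS.stream().filter(n -> n < 3).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(taken());    // [1, 2]
        System.out.println(dropped());  // [3, 1, 2]
        System.out.println(filtered()); // [1, 2, 1, 2]
    }
}
```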

New features on Optionals

Optionals also got their share of improvements. Let’s begin with our previous example from the mapping collector.

Let’s suppose our tasks map uses Optionals for values instead of literal strings:

Map<Long, Optional<String>> tasks = Map.of(
        1L, Optional.ofNullable("Put trash on the street"),
        2L, Optional.ofNullable("Buy bread"),
        3L, Optional.ofNullable("Walk with the dog"),
        4L, Optional.ofNullable("make dinner")

);

If we wanted to use the same map to implement the previous stream, we would have to “extract” all the values from the Optionals before using them in the stream. That is, until Java 9.

Now, we can implement the previous stream on this new scenario by doing this:

tasks.values().stream().flatMap(Optional::stream).collect(Collectors.mapping(w -> w.split(" ")[0], Collectors.toList())).forEach(System.out::println);

If we run our code, we will see that it will print the list with just the first words from the tasks, just like before.
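The reason this works is that Optional::stream turns a present value into a stream of one element and an empty Optional into an empty stream, so flatMap simply skips the empty ones. A minimal sketch, with a class and method name of my own choosing:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class OptionalStreamDemo {

    // Keeps only the values that are present, preserving their order.
    static List<String> presentValues(List<Optional<String>> values) {
        return values.stream()
                .flatMap(Optional::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Optional<String>> values = List.of(
                Optional.of("Buy bread"), Optional.empty(), Optional.of("make dinner"));
        System.out.println(presentValues(values));
    }
}
```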

Another good addition is the ifPresentOrElse method. Now, if we need to implement logic that depends on whether an Optional is empty or not, we can just do:

myOptional.ifPresentOrElse(present -> System.out.println(present), () -> {
    System.out.println("nothing to do");
});

Even more interesting, Optionals now support the or method, which allows us to chain multiple fallbacks! Each fallback is only evaluated if everything before it was empty. We can see the method in action in the example below:

myOptional.or(() -> Optional.ofNullable("this is my first callback"))
        .or(() -> Optional.ofNullable("this is my second callback"))
        .or(() -> Optional.ofNullable("this is my third callback"))
        .or(() -> Optional.ofNullable("this is my fourth callback"));
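To make the behavior of a chain of or calls more visible, here is a small sketch that counts how many fallback suppliers are actually invoked. The class, the counter and the fallback helper are hypothetical names created just for this illustration.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

class OptionalOrDemo {

    // Counts how many fallback suppliers were actually invoked.
    static final AtomicInteger CALLS = new AtomicInteger();

    static Optional<String> fallback(String value) {
        CALLS.incrementAndGet();
        return Optional.of(value);
    }

    // Returns the primary value if present, else the first non-empty fallback.
    static String resolve(Optional<String> primary) {
        return primary
                .or(() -> fallback("first fallback"))
                .or(() -> fallback("second fallback"))
                .orElse("default");
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of("primary")) + ", calls: " + CALLS.get());
        System.out.println(resolve(Optional.empty()) + ", calls: " + CALLS.get());
    }
}
```

When the primary value is present, no supplier runs at all. When it is empty, only the first supplier runs, since its result is already non-empty.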

Conclusion

And so we conclude another post in our series on the new features of Java 9. Please stay tuned for the rest of the series, where we will talk about other features, such as the long-awaited Jigsaw. Thank you for following me on another post, until next time.

Java 9: Learning the new features – part 1


Hi, dear readers! Welcome to my blog. This is the first post in a series focused on studying the new features of Java 9.

After waiting so long for features like Jigsaw, the so-called Java module system, Java 9 is finally upon us. Let’s begin our journey by exploring the new REPL console for the language, JShell!

Installing Java 9

To install Java 9, I recommend following the instructions on this link.

REPL

REPL is an acronym that stands for Read-Eval-Print Loop. A REPL is a terminal where we can input commands and receive immediate feedback about the code we just entered.

The code is read, its syntax is evaluated and the code is executed, the results are printed on the console and finally the terminal loops back to wait for the next command, just like the acronym dictates.

Starting JShell

To start JShell, we just open a terminal and enter:

jshell

This will initialize the shell, as we can see below:

|  Welcome to JShell -- Version 9

|  For an introduction type: /help intro

jshell>

Just to finish our first glance at basic JShell commands, to exit the console, we just type:

jshell> /exit

|  Goodbye

Running commands

Now, let’s enter some commands. First, let’s create a String variable:

jshell> String myString = "Welcome to my JShell!"

myString ==> "Welcome to my JShell!"

jshell>

There are two things we can notice in the code above: first, we don’t need to use a semicolon. Second, we can see the REPL in motion, as the code was processed and the result was printed on the console. If we just type the variable name, we can see that it will print its contents:

jshell> myString

myString ==> "Welcome to my JShell!"

jshell>

We can also use other types of commands, such as loops:

jshell> for (int i = 0;i < 10; i++)
   ...> System.out.println(i)
0
1
2
3
4
5
6
7
8
9

jshell>

It is also possible to make simple arithmetical operations. Let’s try a simple addition:

jshell> 1 + 2

$1 ==> 3

jshell>

Did you notice we didn’t define a variable? When we don’t include one, JShell does this for us, in this case, $1. The name is a $ followed by the command’s index, since JShell stores the commands of our session in an array-like structure.

We can see the command’s structure with the /list command, as follows:

jshell> /list

   1 : 1 + 2
   2 : String myString = "Welcome to my JShell!";
   3 : myString
   4 : for (int i = 0;i < 10; i++)
       System.out.println(i);

jshell>

Of course, implicitly defined variables can also be used in other commands, as follows:

jshell> int i = $1 + 1

i ==> 4

jshell>

Editing scripts

JShell also allows us to edit and save scripts – snippets – of code, allowing us to create classes this way. Let’s see how to do it.

JShell comes with an editor, but it is also possible to change the editor to another of your choice. I will change my editor to Vim, using the following command:

jshell> /set editor vim

|  Editor set to: vim

jshell>

Now that our editor is changed, let’s begin by opening the command with the for loop in the editor – in my case, it is the command at index 4:

jshell> /edit 4

This will open the snippet in the Vim editor. Let’s edit the code as follows and save:

public class MyObject {

    public static void myMethod() {
        for (int i = 0; i < 10; i++)
            System.out.println(i);
    }

}

After saving, we will see a message indicating that the class was created:

jshell> /edit 4
|  created class MyObject
0
1
2
3
4
5
6
7
8
9

We can also discard the old code with the /drop command:

/drop 4

Now, let’s try to use our class on the shell:

jshell> MyObject.myMethod()
0
1
2
3
4
5
6
7
8
9

jshell>

As we can see, the code was correctly executed, proving that our class creation was a success.

Importing & Exporting

Exporting and importing are done with the /save and /open commands, respectively. Let’s run the following command:

/save <path-to-save>/backup.txt

The result will be a file like the following:

1 + 2
String myString = "Welcome to my JShell!";
myString
int i = $1 + 1;
public class MyObject {

public static void myMethod() {

for (int i = 0;i < 10; i++)
System.out.println(i);

}

}
MyObject.myMethod()

Now, let’s close the shell with the /exit command and open it again, which cleans our session.

Now, let’s run the /open command to import our previous commands:

/open <path-to-save>/backup.txt

And finally, let’s run the /list command to see if the commands from our backup were imported:

jshell> /list

   1 : 1 + 2
   2 : String myString = "Welcome to my JShell!";
   3 : myString
   4 : int i = $1 + 1;
   5 : public class MyObject {

       public static void myMethod() {

       for (int i = 0;i < 10; i++)
       System.out.println(i);

       }

       }
   6 : MyObject.myMethod()

jshell>

We can see that our import was a success, restoring the commands from the previous session.

Other commands

Of course, there are other commands besides the ones shown in this post. A complete list of all the commands in JShell can be found in JShell’s documentation.

Conclusion

And so we conclude our first glimpse of the new features of Java 9. JShell is an interesting new addition to the Java platform, allowing us to quickly test and run Java code. It is not a tool for production use, in my opinion, but it is a good tool for development and learning purposes. Thank you for following me on this post, until next time.

 

 

Apache Camel: integrating systems with Java


Hi, dear readers! Welcome to my blog. In this post, we will talk about Apache Camel, a robust solution for deploying system integrations across various technologies, such as REST, WS, JMS, JDBC, AWS products, LDAP, SMTP, Elasticsearch, etc.

So, let’s get started!

Origin

Apache Camel was created within Apache ServiceMix. Apache ServiceMix was a project powered by the Spring Framework and implemented following the JBI specification. The Java Business Integration specification defines a plug-and-play platform for system integration, following the Enterprise Integration Patterns (EIP).

Terminology

Exchange

Exchanges are like frames in which we transport our data across the integrations on Camel. An Exchange can hold 2 messages, one representing the input and another one representing the output of an integration, and it carries a MEP (Message Exchange Pattern) that tells whether a response is expected.

The output message on Camel is optional, since we could have an integration that doesn’t have a response. Also, an Exchange can have properties, represented as key-value entries, which hold data that can be used across the whole route (we will see more about routes very soon).

Message

Messages are the data itself that is transferred inside a Camel route. A Message has a body, which is the payload, and headers, which are, like properties on an Exchange, key-value entries that can be used along the processing.

One important aspect to keep in mind, however, is that along a Camel route our Messages are changed – when we convert the body with a Type converter, for instance – and when this happens, we may lose our headers. So, Message headers must be seen as ephemeral data that will not necessarily be available through the whole route. For that type of data, it is better to use Exchange properties.

The Message body can be made of several types of data, such as binaries, JSON, etc.

Camel context

The Camel context is the runtime container where Camel runs. It initializes type converters, routes, endpoints, EIPs etc.

A Camel context has 3 possible statuses: started, suspended and stopped. When started, the context will serve the routes’ processing as normal.

When in the suspended status, the Camel context will stop the processing – after the Exchanges already being processed are completed – but keep all the caches, resources etc. still loaded. A suspended context can be resumed.

Finally, there’s the stopped status. When stopped, the context will stop the processing as in the suspended status, but it will also release all the resources, caches etc., making a complete shutdown. As with the suspended status, Camel guarantees that all the Exchanges being processed will be finished before the shutdown.

Route

Routes are the heart of the processing on Camel. A route consists of a flow that starts on an endpoint, passes through a stream of processors/converters and finishes on another endpoint. It is possible to chain routes by calling another route as the final endpoint of a previous route.

A route can also use other features, such as EIPs, asynchronous and parallel processing.

Channel

When Camel executes a route, the controller that executes it is called a Channel.

A Channel is responsible for chaining the processors’ execution, passing the Exchange from one to another, as well as monitoring the route execution. It also allows us to implement interceptors to run logic on some of the route’s events, such as when an Exchange is going to a specific Endpoint.

Processor

Processors are the primary extension points on Camel. By creating classes that implement the org.apache.camel.Processor interface, we create programming units that we can use to include our own code in a Camel route, inside a convenient process method.

Component

A Component acts like a factory that instantiates Endpoints for our use. We don’t use a Component directly; instead, we reference it by defining an Endpoint URI, from which Camel infers the Component it needs to use in order to create the Endpoint.

Camel provides dozens of Components, from file and JMS to connections to AWS products, and so on.

Registry

In order to use beans from IoC systems, such as OSGi, Spring and JNDI, Camel supplies us with a Bean Registry. The Registry’s mission is to resolve the beans referred to on Camel routes to the ones created in its associated context, such as an OSGi container, a Spring context etc.

Type converter

Type converters, as the name implies, are used in order to convert the body of a message from one type to another. The uses for a converter are varied, ranging from converting a binary format to a String to converting XML to JSON.

We can create our own Type Converter by implementing the org.apache.camel.TypeConverter interface. After creating our own converter, we need to register it on the Type Converter Registry.

Endpoint

An Endpoint is the entity responsible for moving data in or out of a Camel Route. Endpoints cover several types of sources and destinations as mentioned before, such as SQS, files, relational databases and so on. An Endpoint is instantiated and configured by providing a URI to a Camel Route, following the pattern below:

component:option?option1=value1&option2=value2
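To make the pattern more concrete, the sketch below breaks a URI into its three parts using plain Java. This is not how Camel parses URIs internally; it is just an illustration written for this post. The class name, the method names and the sample URI are my own assumptions, and the code expects a well-formed URI and does no URL decoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class EndpointUriDemo {

    // The text before the first ':' names the component (e.g. "file").
    static String component(String uri) {
        return uri.substring(0, uri.indexOf(':'));
    }

    // The text between the first ':' and the '?' is the component-specific path.
    static String path(String uri) {
        int start = uri.indexOf(':') + 1;
        int end = uri.indexOf('?');
        return end < 0 ? uri.substring(start) : uri.substring(start, end);
    }

    // Everything after '?' is a list of key=value options separated by '&'.
    static Map<String, String> options(String uri) {
        Map<String, String> options = new LinkedHashMap<>();
        int query = uri.indexOf('?');
        if (query < 0) {
            return options;
        }
        for (String pair : uri.substring(query + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    public static void main(String[] args) {
        // Hypothetical URI, used only for this example.
        String uri = "file:/tmp/inbox?delay=1000&delete=true";
        System.out.println(component(uri));
        System.out.println(path(uri));
        System.out.println(options(uri));
    }
}
```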

We can create our own Endpoints by implementing the org.apache.camel.Endpoint interface. When implementing the interface, we need to provide 3 methods, where we supply the logic to create a polling consumer, a passive (event-driven) consumer and a producer.

Lab

So, without further delay, let’s start our lab! In this lab, we will create a route that polls a folder for access-log-style files, backs up each file on S3 and sends the log entries to an SQS queue.

Setup

The setup for our lab is pretty simple: it is a Spring Boot application, configured to work with Camel. Our build.gradle file is as follows:

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'maven'
apply plugin: 'idea'

jar {
    baseName = 'apache-camel-handson'
    version = '1.0'
}

project.ext {
    springBootVersion = '1.5.4.RELEASE'
    camelVersion = '2.18.3'

}

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}

bootRun {
    systemProperties = System.properties
}


dependencies {

    compile group: 'org.apache.camel', name: 'camel-spring-boot-starter', version: camelVersion
    compile group: 'org.apache.camel', name: 'camel-commands-spring-boot', version: camelVersion
    compile group: 'org.apache.camel',name: 'camel-aws', version: camelVersion
    compile group: 'org.apache.camel',name: 'camel-mail', version: camelVersion
    compile group: 'org.springframework.boot', name: 'spring-boot-autoconfigure', version: springBootVersion
    

}
group 'com.alexandreesl.handson'
version '1.0'

buildscript {
    repositories {
        mavenLocal()
        maven {
            url "https://plugins.gradle.org/m2/"
        }
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.4.RELEASE")
    }
}

And the Java main file is a simple Java Spring Boot Application file, as follows:

package com.alexandreesl.handson;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * Created by alexandrelourenco on 28/06/17.
 */

@ComponentScan(basePackages = {"com.alexandreesl.handson"})
@SpringBootApplication
@EnableAutoConfiguration
public class ApacheCamelHandsonApp {

    public static void main(String[] args) {
        SpringApplication.run(ApacheCamelHandsonApp.class, args);
    }

}

We also create a configuration class, where we will register a type converter that we will create in the next section:

package com.alexandreesl.handson.configuration;

import org.apache.camel.CamelContext;
import org.apache.camel.spring.boot.CamelContextConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CamelConfiguration {


    @Bean
    public CamelContextConfiguration camelContextConfiguration() {

        return new CamelContextConfiguration() {

            @Override
            public void beforeApplicationStart(CamelContext camelContext) {
               

            }

            @Override
            public void afterApplicationStart(CamelContext camelContext) {

            }

        };

    }

}

We also create a configuration class that creates an AmazonS3Client and an AmazonSQSClient, which will be used by the AWS-S3 and AWS-SQS Camel endpoints:

package com.alexandreesl.handson.configuration;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.sqs.AmazonSQSClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

@Configuration
public class AWSConfiguration {


    @Autowired
    private Environment environment;

    @Bean(name = "s3Client")
    public AmazonS3Client s3Client() {
        return new AmazonS3Client(staticCredentialsProvider()).withRegion(Regions.fromName("us-east-1"));
    }

    @Bean(name = "sqsClient")
    public AmazonSQSClient sqsClient() {
        return new AmazonSQSClient(staticCredentialsProvider()).withRegion(Regions.fromName("us-east-1"));
    }

    @Bean
    public StaticCredentialsProvider staticCredentialsProvider() {
        return new StaticCredentialsProvider(new BasicAWSCredentials("<access key>", "<secret access key>"));
    }

}

PS: this lab assumes that the reader is familiar with AWS and already has an account. For the lab, a bucket called “apache-camel-handson” and an SQS queue called “MyInputQueue” were created.

Configuring the route

Now that we have our Camel environment set up, let’s begin creating our route. First, we create a type converter called “StringToAccessLogDTOConverter” with the following code:

package com.alexandreesl.handson.converters;

import com.alexandreesl.handson.dto.AccessLogDTO;
import org.apache.camel.Converter;
import org.apache.camel.TypeConverters;

import java.util.StringTokenizer;

/**
 * Created by alexandrelourenco on 30/06/17.
 */

public class StringToAccessLogDTOConverter implements TypeConverters {

    @Converter
    public AccessLogDTO convert(String row) {

        AccessLogDTO dto = new AccessLogDTO();

        StringTokenizer tokens = new StringTokenizer(row);

        dto.setIp(tokens.nextToken());
        dto.setUrl(tokens.nextToken());
        dto.setHttpMethod(tokens.nextToken());
        dto.setDuration(Long.parseLong(tokens.nextToken()));

        return dto;

    }

}

Next, we change our Camel configuration, registering the converter:

package com.alexandreesl.handson.configuration;

import com.alexandreesl.handson.converters.StringToAccessLogDTOConverter;
import org.apache.camel.CamelContext;
import org.apache.camel.spring.boot.CamelContextConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CamelConfiguration {


    @Bean
    public CamelContextConfiguration camelContextConfiguration() {

        return new CamelContextConfiguration() {

            @Override
            public void beforeApplicationStart(CamelContext camelContext) {

                camelContext.getTypeConverterRegistry().addTypeConverters(new StringToAccessLogDTOConverter());

            }

            @Override
            public void afterApplicationStart(CamelContext camelContext) {

            }

        };

    }

}

Our converter reads a String and converts it to a DTO, with the following attributes:

package com.alexandreesl.handson.dto;

/**
 * Created by alexandrelourenco on 30/06/17.
 */
public class AccessLogDTO {

    private String ip;

    private String url;

    private String httpMethod;

    private long duration;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getHttpMethod() {
        return httpMethod;
    }

    public void setHttpMethod(String httpMethod) {
        this.httpMethod = httpMethod;
    }

    public long getDuration() {
        return duration;
    }

    public void setDuration(long duration) {
        this.duration = duration;
    }

    @Override
    public String toString() {

        StringBuffer buffer = new StringBuffer();
        buffer.append("[");
        buffer.append(ip);
        buffer.append(",");
        buffer.append(url);
        buffer.append(",");
        buffer.append(httpMethod);
        buffer.append(",");
        buffer.append(duration);
        buffer.append("]");

        return buffer.toString();

    }
}

Finally, we have our route, defined on our RouteBuilder:

package com.alexandreesl.handson.routes;

import com.alexandreesl.handson.dto.AccessLogDTO;
import org.apache.camel.LoggingLevel;
import org.apache.camel.spring.SpringRouteBuilder;
import org.springframework.context.annotation.Configuration;

/**
 * Created by alexandrelourenco on 30/06/17.
 */

@Configuration
public class MyFirstCamelRoute extends SpringRouteBuilder {


    @Override
    public void configure() throws Exception {

        from("file:/Users/alexandrelourenco/Documents/apachecamelhandson?delay=1000&charset=utf-8&delete=true")
                .setHeader("CamelAwsS3Key", header("CamelFileName"))
                .to("aws-s3:arn:aws:s3:::apache-camel-handson?amazonS3Client=#s3Client")
                .convertBodyTo(String.class)
                .split().tokenize("\n")
                    .convertBodyTo(AccessLogDTO.class)
                    .log(LoggingLevel.INFO, "${body}")
                    .to("aws-sqs://MyInputQueue?amazonSQSClient=#sqsClient");

    }
}

In the route above, we define a file endpoint that polls a folder for files every second and removes each file if its processing completes successfully. Then we send the file to Amazon S3 as a backup.

Next, we split the file using a splitter, which generates a string for each line of the file. We convert each line to a DTO, log the data and finally send it to an SQS queue.
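The split and convert steps can also be sketched in plain Java, without Camel, which can be handy to test the parsing logic in isolation. The AccessLog class below is a simplified, hypothetical stand-in for the AccessLogDTO of the lab, and skipping blank lines is an assumption of this sketch, not necessarily what the Camel splitter does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

class SplitAndConvertDemo {

    // Hypothetical, simplified stand-in for the lab's AccessLogDTO.
    static final class AccessLog {
        final String ip;
        final String url;
        final String httpMethod;
        final long duration;

        AccessLog(String ip, String url, String httpMethod, long duration) {
            this.ip = ip;
            this.url = url;
            this.httpMethod = httpMethod;
            this.duration = duration;
        }
    }

    // Same tokenizing logic as the converter: four whitespace-separated fields.
    static AccessLog convert(String row) {
        StringTokenizer tokens = new StringTokenizer(row);
        return new AccessLog(tokens.nextToken(), tokens.nextToken(),
                tokens.nextToken(), Long.parseLong(tokens.nextToken()));
    }

    // Splits the file content on line breaks and converts each non-blank line.
    static List<AccessLog> splitAndConvert(String fileContent) {
        List<AccessLog> logs = new ArrayList<>();
        for (String line : fileContent.split("\n")) {
            if (!line.trim().isEmpty()) {
                logs.add(convert(line));
            }
        }
        return logs;
    }

    public static void main(String[] args) {
        String content = "10.12.64.3 /api/v1/test1 POST 123\n"
                + "10.12.67.3 /api/v1/test2 PATCH 125\n";
        for (AccessLog log : splitAndConvert(content)) {
            System.out.println(log.ip + " " + log.httpMethod + " " + log.duration);
        }
    }
}
```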

Now that we have our code done, let’s run it!

Running

First, we start our Camel route. To do this, we simply run the main Spring Boot class, as we would do with any common Java program.

After firing up Spring Boot, we will see on our console that the route was successfully started:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.4.RELEASE)

2017-07-01 12:52:02.224 INFO 3042 --- [ main] c.a.handson.ApacheCamelHandsonApp : Starting ApacheCamelHandsonApp on Alexandres-MacBook-Pro.local with PID 3042 (/Users/alexandrelourenco/Applications/git/apache-camel-handson/build/classes/main started by alexandrelourenco in /Users/alexandrelourenco/Applications/git/apache-camel-handson)
2017-07-01 12:52:02.228 INFO 3042 --- [ main] c.a.handson.ApacheCamelHandsonApp : No active profile set, falling back to default profiles: default
2017-07-01 12:52:02.415 INFO 3042 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@475e586c: startup date [Sat Jul 01 12:52:02 BRT 2017]; root of context hierarchy
2017-07-01 12:52:03.325 INFO 3042 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.apache.camel.spring.boot.CamelAutoConfiguration' of type [org.apache.camel.spring.boot.CamelAutoConfiguration$$EnhancerBySpringCGLIB$$72a2a9b] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2017-07-01 12:52:09.520 INFO 3042 --- [ main] o.a.c.i.converter.DefaultTypeConverter : Loaded 192 type converters
2017-07-01 12:52:09.612 INFO 3042 --- [ main] roperties$SimpleAuthenticationProperties :

Using default password for shell access: b738eab1-6577-4f9b-9a98-2f12eae59828




2017-07-01 12:52:15.463 WARN 3042 --- [ main] tarterDeprecatedWarningAutoConfiguration : spring-boot-starter-remote-shell is deprecated as of Spring Boot 1.5 and will be removed in Spring Boot 2.0
2017-07-01 12:52:15.511 INFO 3042 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2017-07-01 12:52:15.519 INFO 3042 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2017-07-01 12:52:15.615 INFO 3042 --- [ main] o.a.camel.spring.boot.RoutesCollector : Loading additional Camel XML routes from: classpath:camel/*.xml
2017-07-01 12:52:15.615 INFO 3042 --- [ main] o.a.camel.spring.boot.RoutesCollector : Loading additional Camel XML rests from: classpath:camel-rest/*.xml
2017-07-01 12:52:15.616 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : Apache Camel 2.18.3 (CamelContext: camel-1) is starting
2017-07-01 12:52:15.618 INFO 3042 --- [ main] o.a.c.m.ManagedManagementStrategy : JMX is enabled
2017-07-01 12:52:25.695 INFO 3042 --- [ main] o.a.c.i.DefaultRuntimeEndpointRegistry : Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2017-07-01 12:52:25.810 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2017-07-01 12:52:27.853 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route1 started and consuming from: file:///Users/alexandrelourenco/Documents/apachecamelhandson?charset=utf-8&delay=1000&delete=true
2017-07-01 12:52:27.854 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : Total 1 routes, of which 1 are started.
2017-07-01 12:52:27.854 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : Apache Camel 2.18.3 (CamelContext: camel-1) started in 12.238 seconds
2017-07-01 12:52:27.858 INFO 3042 --- [ main] c.a.handson.ApacheCamelHandsonApp : Started ApacheCamelHandsonApp in 36.001 seconds (JVM running for 36.523)

PS: Don’t forget to replace the access key and secret with your own!

Now, to test it, we place a file on the polling folder. For testing, we create a file like the following:

10.12.64.3 /api/v1/test1 POST 123
10.12.67.3 /api/v1/test2 PATCH 125
10.15.64.3 /api/v1/test3 GET 166
10.120.64.23 /api/v1/test1 POST 100

We put a file with the content on the folder and after 1 second, the file is gone! Where did it go?

If we check the Amazon S3 bucket interface, we will see that the file was created on the storage:

 

[Screenshot: the file listed in the Amazon S3 bucket]

And if we check the Amazon SQS interface, we will see 4 messages on the queue, proving that our integration is a success:

[Screenshot: the 4 messages on the Amazon SQS queue]

If we check the messages, we will see that Camel correctly parsed the information from the file, as we can see in the example below:

[10.12.64.3,/api/v1/test1,POST,123]

Implementing Error Handling

On Camel, we can implement logic designed for handling errors. This is also done by defining routes, whose inputs are the exceptions thrown by other routes.

In our lab, let’s implement error handling. First, we add an option to the file endpoint that makes the file be moved to a .error folder when an error occurs, and then we send an email to ourselves to alert us of the failure. We can do this by changing the route as follows:

package com.alexandreesl.handson.routes;

import com.alexandreesl.handson.dto.AccessLogDTO;
import org.apache.camel.LoggingLevel;
import org.apache.camel.spring.SpringRouteBuilder;
import org.springframework.context.annotation.Configuration;

/**
 * Created by alexandrelourenco on 30/06/17.
 */

@Configuration
public class MyFirstCamelRoute extends SpringRouteBuilder {


    @Override
    public void configure() throws Exception {

        onException(Exception.class)
                .handled(false)
                .log(LoggingLevel.ERROR, "An Error processing the file!")
                .to("smtps://smtp.gmail.com:465?password=xxxxxxxxxxxxxxxx&username=alexandreesl@gmail.com&subject=A error has occurred!");

        from("file:/Users/alexandrelourenco/Documents/apachecamelhandson?delay=1000&charset=utf-8&delete=true&moveFailed=.error")
                .setHeader("CamelAwsS3Key", header("CamelFileName"))
                .to("aws-s3:arn:aws:s3:::apache-camel-handson?amazonS3Client=#s3Client")
                .convertBodyTo(String.class)
                .split().tokenize("\n")
                    .convertBodyTo(AccessLogDTO.class)
                    .log(LoggingLevel.INFO, "${body}")
                    .to("aws-sqs://MyInputQueue?amazonSQSClient=#sqsClient");

    }
}

Then, we restart the route and feed it a file like the following, which will cause a parse exception:

10.12.64.3 /api/v1/test1 POST 123
10.12.67.3 /api/v1/test2 PATCH 125
10.15.64.3 /api/v1/test3 GET 166
10.120.64.23 /api/v1/test1 POST 10a
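The problem is in the last line: the duration field is “10a”, which our converter passes to Long.parseLong. The tiny sketch below (the class and method names are mine) shows the same failure outside Camel:

```java
class ParseFailureDemo {

    // Returns the exception message, or null when the token is a valid long.
    static String parseError(String token) {
        try {
            Long.parseLong(token);
            return null;
        } catch (NumberFormatException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("100 -> " + parseError("100"));
        System.out.println("10a -> " + parseError("10a"));
    }
}
```

Inside the route, this NumberFormatException surfaces wrapped by Camel’s own type conversion exceptions.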

After the processing, we can check the console and see how the error was handled:

2017-07-01 14:18:48.695  INFO 3230 --- [           main] o.a.camel.spring.SpringCamelContext      : Apache Camel 2.18.3 (CamelContext: camel-1) started in 11.899 seconds
2017-07-01 14:18:48.699  INFO 3230 --- [           main] c.a.handson.ApacheCamelHandsonApp        : Started ApacheCamelHandsonApp in 35.612 seconds (JVM running for 36.052)
2017-07-01 14:18:52.737  WARN 3230 --- [checamelhandson] c.amazonaws.services.s3.AmazonS3Client   : No content length specified for stream data.  Stream contents will be buffered in memory and could result in out of memory errors.
2017-07-01 14:18:53.105  INFO 3230 --- [checamelhandson] route1                                   : [10.12.64.3,/api/v1/test1,POST,123]
2017-07-01 14:18:53.294  INFO 3230 --- [checamelhandson] route1                                   : [10.12.67.3,/api/v1/test2,PATCH,125]
2017-07-01 14:18:53.504  INFO 3230 --- [checamelhandson] route1                                   : [10.15.64.3,/api/v1/test3,GET,166]
2017-07-01 14:18:53.682 ERROR 3230 --- [checamelhandson] route1                                   : An Error processing the file!
2017-07-01 14:19:02.058 ERROR 3230 --- [checamelhandson] o.a.camel.processor.DefaultErrorHandler  : Failed delivery for (MessageId: ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-9 on ExchangeId: ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-10). Exhausted after delivery attempt: 1 caught: org.apache.camel.InvalidPayloadException: No body available of type: com.alexandreesl.handson.dto.AccessLogDTO but has value: 10.120.64.23 /api/v1/test1 POST 10a of type: java.lang.String on: Message[ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-9]. 
Caused by: Error during type conversion from type: java.lang.String to the required type: com.alexandreesl.handson.dto.AccessLogDTO with value 10.120.64.23 /api/v1/test1 POST 10a due java.lang.NumberFormatException: For input string: "10a". Exchange[ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-10]. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: java.lang.String to the required type: com.alexandreesl.handson.dto.AccessLogDTO with value 10.120.64.23 /api/v1/test1 POST 10a due java.lang.NumberFormatException: For input string: "10a"]. Processed by failure processor: FatalFallbackErrorHandler[Pipeline[[Channel[Log(route1)[An Error processing the file!]], Channel[sendTo(smtps://smtp.gmail.com:465?password=xxxxxx&subject=A+error+has+occurred%21&username=alexandreesl%40gmail.com)]]]]
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [file:///Users/alexandrelourenco/Documents/apachecamelhandson?charset=utf-8&del] [      9323]
[route1            ] [convertBodyTo2    ] [convertBodyTo[com.alexandreesl.handson.dto.AccessLogDTO]                      ] [      8370]
[route1            ] [log1              ] [log                                                                           ] [         1]
[route1            ] [to1               ] [smtps://smtp.gmail.com:xxxxxx@gmail.com&subject=A error ha                    ] [      8366]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.InvalidPayloadException: No body available of type: com.alexandreesl.handson.dto.AccessLogDTO but has value: 10.120.64.23 /api/v1/test1 POST 10a of type: java.lang.String on: Message[ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-9]. Caused by: Error during type conversion from type: java.lang.String to the required type: com.alexandreesl.handson.dto.AccessLogDTO with value 10.120.64.23 /api/v1/test1 POST 10a due java.lang.NumberFormatException: For input string: "10a". Exchange[ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-10]. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: java.lang.String to the required type: com.alexandreesl.handson.dto.AccessLogDTO with value 10.120.64.23 /api/v1/test1 POST 10a due java.lang.NumberFormatException: For input string: "10a"]
 at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:107) ~[camel-core-2.18.3.jar:2.18.3]
 at org.apache.camel.processor.ConvertBodyProcessor.process(ConvertBodyProcessor.java:91) ~[camel-core-2.18.3.jar:2.18.3]
 at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.18.3.jar:2.18.3]
 at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.3.jar:2.18.3]
 at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.3.jar:2.18.3]

If we look at the folder, we will see that a .error folder was created and the file was moved into it:

Screen Shot 2017-07-01 at 14.25.18

And if we check the mailbox, we will see that we received the failure alert:

Screen Shot 2017-07-01 at 14.27.40

Conclusion

And so we conclude our tour through Apache Camel. With an easy-to-use architecture and dozens of components, it is a highly pluggable and robust option for integration development. Thank you for following me on this post, until next time.

Refactoring: improving the design of existing code (book review)

Hi, dear readers! Welcome to my blog. In this post, I will review a famous book by Martin Fowler, which focuses on refactoring techniques. But after all, why does refactoring matter?

Definition

According to Fowler, refactoring consists of modifying code in order to improve its readability and its capacity to change, without changing its behavior. When refactoring, our objective is to make the code easier for humans to read and to improve its structure and design, so that changes motivated by business rules become easier to implement. Other benefits are that cleaner code makes it easier to spot bugs, and that new code can be developed faster on top of well-organized production code.

When to refactor?

Fowler argues that refactoring should be done in three situations:

  • When you add a new functionality;
  • When you find a bug;
  • When you do a code review;

In these situations, you are forced to make changes to the code structure, which makes them ideal moments for refactoring.

Pitfalls

When refactoring, there are some common pitfalls that can hinder the work. The most common ones are databases and the code’s interfaces.

Database schemas can be hard to change, especially if the database is old and holds millions of rows. This produces a ripple effect on the code that manipulates the database, making that code harder to change. Interfaces (APIs, libraries or even a single class inside a component) can also be a challenge for refactoring, since a change to an interface can cascade into changes in lots of client code.

To solve these problems, the best approach for databases is to isolate the database logic in its own layer, allowing the “dirtier” code to evolve in a more controlled manner. As for interfaces, the best approach is to let the old and new interfaces coexist while the migration work is carried out.

When not to refactor?

According to Fowler, there is one situation in which you shouldn’t refactor: when the code is so bad that it is better to rewrite it from scratch. It is hard to measure when code is bad enough to be rewritten. Some good hints are code that is infested with bugs, or code that needs so many refactorings that fixing it up would end up rewriting most of it.

Refactoring and performance

Sometimes a refactoring causes some performance degradation. Of course, it is up to the business to decide how much degradation is acceptable while still meeting the requirements, but as a general rule we can assume that better-organized code is easier to fine-tune. So, if we refactor first and improve the code’s readability and design, it will be easier to tune its performance later.

Unit tests

Another key point defended by Fowler is the need to develop unit tests for the code. With unit tests, we can refactor in small steps (“baby steps”) and receive rapid feedback from the tests, so if anything breaks we can fix it quickly and easily during the refactoring process.

Refactoring catalog

Here are brief descriptions of some of the refactoring patterns that I found most interesting. Complete descriptions with examples can be found in Fowler’s book, which you can find through the link at the end of this post.

Extract Method

This refactoring consists of taking a fragment of code that can be grouped together and extracting it into its own method, which improves readability.
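
As a minimal sketch of Extract Method (written in Python for brevity, while the book’s examples are in Java; the invoice scenario and all names are hypothetical), a long printing routine can be split so that each step gets its own named function:

```python
def print_invoice(customer, orders):
    """After the refactoring: each step is a small, named function."""
    print_banner()
    outstanding = calculate_outstanding(orders)
    print_details(customer, outstanding)


def print_banner():
    # Extracted: the banner used to be printed inline in print_invoice.
    print("*** Customer Owes ***")


def calculate_outstanding(orders):
    # Extracted: sums the amount of every order.
    return sum(order["amount"] for order in orders)


def print_details(customer, outstanding):
    # Extracted: prints the final summary lines.
    print(f"name: {customer}")
    print(f"amount: {outstanding}")
```

A side benefit is that the extracted calculation can now be tested on its own, without capturing printed output.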

Introduce Explaining Variable

This refactoring consists of taking a big, complex conditional and simplifying it by assigning its parts to well-named variables, which makes the conditional more self-explanatory.
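
A small illustration of Introduce Explaining Variable, sketched in Python with a hypothetical free-shipping rule (the thresholds and country codes are made up for the example):

```python
def qualifies_for_free_shipping(order_total, item_count, country):
    # Before the refactoring, everything lived in one expression:
    #   return order_total >= 100 and item_count <= 10 and country in ("BR", "US")
    is_large_order = order_total >= 100
    fits_one_box = item_count <= 10
    ships_to_supported_country = country in ("BR", "US")
    return is_large_order and fits_one_box and ships_to_supported_country
```

The behavior is the same as the one-line version; only the names of the intermediate results are new.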

Replace Method with Method Object

This refactoring applies when you have a method from which it would be better to extract some code into its own method, but the method uses so many local variables that the extraction is hindered. In this case, we move the method and all of its variables to a new object, creating an easier environment in which to apply the Extract Method refactoring.

Move Method

This refactoring consists of moving a method from one class to another. This makes sense when the old class uses the method less than the class it is moved to.

Extract Class

This refactoring applies when a class is doing work that would be better organized if it were divided in two. In this case, we take the related behavior and data (methods and fields) that could form a new class and move them there, delegating from the old class to the new one.

Remove Middle Man

This refactoring applies when a class has lots of methods that simply delegate to another class, which introduces unnecessary code. In this case, we create an accessor for the delegate object, so that callers can call its methods directly, and then we remove all the delegating methods.

Consolidate Conditional Expression

This refactoring applies when you have several conditionals that return the same value. In this case, we combine them into a single conditional, commonly by extracting it into a method, which makes the code clearer and simpler.
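
To make Consolidate Conditional Expression concrete, here is a Python sketch with a hypothetical disability benefit rule (the rule, the flat amount of 100 and the function names are assumptions for illustration only):

```python
def disability_amount_before(seniority, months_disabled, is_part_time):
    # Three separate checks that all lead to the same result.
    if seniority < 2:
        return 0
    if months_disabled > 12:
        return 0
    if is_part_time:
        return 0
    return 100  # hypothetical flat benefit


def is_not_eligible_for_disability(seniority, months_disabled, is_part_time):
    # The consolidated condition, extracted into a method with a telling name.
    return seniority < 2 or months_disabled > 12 or is_part_time


def disability_amount(seniority, months_disabled, is_part_time):
    if is_not_eligible_for_disability(seniority, months_disabled, is_part_time):
        return 0
    return 100  # hypothetical flat benefit
```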

Remove Control Flag

This refactoring applies when you have a flag variable that controls the behavior inside a loop. By using control statements such as break and continue, we can remove the control flag and simplify the code.
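
The sketch below shows Remove Control Flag in Python; the list of suspects and the function names are hypothetical. The first version keeps iterating while guarded by a flag, and the second one simply leaves the loop with break:

```python
SUSPECTS = ("Don", "John")  # hypothetical names to look for


def find_suspect_with_flag(people):
    found = False
    result = None
    for person in people:
        if not found:  # the control flag guards every iteration
            if person in SUSPECTS:
                result = person
                found = True
    return result


def find_suspect(people):
    result = None
    for person in people:
        if person in SUSPECTS:
            result = person
            break  # leaving the loop replaces the flag
    return result
```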

Replace Conditional with Polymorphism

This refactoring applies when a method on a class has conditional behavior that depends on the type of the object. In this case, we create a subclass for each leg of the conditional and move that leg’s behavior into it, until the original method is empty, at which point we make it abstract on what is now the superclass of the hierarchy. We may also have to replace the class’s constructor with a factory method.
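
As an illustration of Replace Conditional with Polymorphism, the following Python sketch uses a hypothetical payroll example. The only conditional that survives is the one inside the factory function; each subclass carries its own leg of the original if/else:

```python
from abc import ABC, abstractmethod


class Employee(ABC):
    def __init__(self, salary):
        self.salary = salary

    @abstractmethod
    def pay_amount(self):
        """Formerly one method with an if/elif on the employee type."""


class Engineer(Employee):
    def pay_amount(self):
        return self.salary


class Salesperson(Employee):
    def __init__(self, salary, commission):
        super().__init__(salary)
        self.commission = commission

    def pay_amount(self):
        return self.salary + self.commission


def create_employee(kind, salary, commission=0):
    # Factory function standing in for the old constructor.
    if kind == "engineer":
        return Engineer(salary)
    if kind == "salesperson":
        return Salesperson(salary, commission)
    raise ValueError(f"unknown employee kind: {kind}")
```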

Introduce Null Object

This refactoring applies when the callers of an object make various null checks on its data. In this case, we create an object to represent null, which returns all the default data that should be used when the data is missing. This way, the callers no longer have to check for null, since the behavior of the null object covers those cases.
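
A minimal Python sketch of Introduce Null Object, assuming a hypothetical Site that may or may not have a Customer (the default values “occupant” and “basic” are made up for the example):

```python
class Customer:
    def __init__(self, name, plan):
        self.name = name
        self.plan = plan

    def is_null(self):
        return False


class NullCustomer(Customer):
    """Stands in for a missing customer and supplies the default data."""

    def __init__(self):
        super().__init__(name="occupant", plan="basic")

    def is_null(self):
        return True


class Site:
    def __init__(self, customer=None):
        self._customer = customer

    @property
    def customer(self):
        # Callers always receive an object, never None.
        return self._customer if self._customer is not None else NullCustomer()
```

Callers can then write site.customer.name directly, without first checking whether the customer exists.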

Preserve Whole Object

This refactoring applies when a method call is preceded by several calls to data accessors on another object, just to collect data to pass as parameters. In this case, we change the method to receive the object itself, moving the accessor calls inside the method. This refactoring not only simplifies the code on the caller’s side, but also simplifies future changes if the method comes to need more data from the object.
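
Preserve Whole Object can be sketched in Python as follows; the temperature range scenario and the class names are hypothetical. The “before” method takes the individual values, while the “after” method takes the whole object:

```python
class TempRange:
    def __init__(self, low, high):
        self.low = low
        self.high = high


class HeatingPlan:
    def __init__(self, plan_range):
        self._range = plan_range

    def within_range_before(self, low, high):
        # Before: the caller had to pull low and high out of its own object.
        return low >= self._range.low and high <= self._range.high

    def within_range(self, room_range):
        # After: the caller passes the whole object.
        return (room_range.low >= self._range.low
                and room_range.high <= self._range.high)
```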

Replace Constructor with Factory Method

This refactoring applies when we want a constructor to do more than simple construction. This is especially true in class hierarchies, where the object that is built must be an instance of the subclass that corresponds to the type of the object. In this case, we restrict the access of the constructors (private or protected) and create a static factory method at the top of the hierarchy, allowing the objects to be created dynamically.

Replace Error Code with Exception

This refactoring comes in handy when we have code that returns error codes when something breaks. Error codes are common in Unix and C programs, but in Java we have a much more powerful tool: exceptions. With exceptions, we can easily separate the code that handles the errors from the normal code. So in this case, our best approach is to replace the returned error codes with thrown exceptions, which makes for a much more readable and organized structure.
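
The idea carries over to any language with exceptions. As a sketch in Python rather than Java, with a hypothetical bank account and a made-up error code of -1:

```python
class InsufficientFundsError(Exception):
    """Raised when a withdrawal exceeds the available balance."""


class Account:
    def __init__(self, balance):
        self.balance = balance

    def withdraw_with_code(self, amount):
        # Before: the caller must remember to inspect the returned code.
        if amount > self.balance:
            return -1
        self.balance -= amount
        return 0

    def withdraw(self, amount):
        # After: the failure cannot be silently ignored.
        if amount > self.balance:
            raise InsufficientFundsError(
                f"cannot withdraw {amount}, balance is {self.balance}")
        self.balance -= amount
```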

Replace Exception with Test

This refactoring applies when we have code in a try-catch block whose catch block contains logic that could be performed before the error occurs. This is typically found when we have predictable errors that can occur in some cases, and we use the catch block as part of our program’s logic. By replacing the logic in the catch block with a test (an if) before the code that breaks, we can remove the try-catch altogether, producing more readable and consistent code that doesn’t rely on errors to work.
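
A Python sketch of Replace Exception with Test, assuming a hypothetical resource pool that hands out a new resource when none is available (the string “new-resource” is a placeholder for whatever would really be created):

```python
class ResourcePool:
    def __init__(self, available):
        self._available = list(available)

    def acquire_with_exception(self):
        # Before: an expected situation is handled through the error path.
        try:
            return self._available.pop()
        except IndexError:
            return "new-resource"

    def acquire(self):
        # After: the expected situation is tested for up front.
        if not self._available:
            return "new-resource"
        return self._available.pop()
```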

Extract Subclass

This refactoring applies when some methods and fields are used only by some instances of a class. In this case, we move those methods and fields to a new subclass, where they can be better organized and maintained.

Extract Superclass

This refactoring is the opposite of the previous one, since we create a superclass instead of a subclass. If we identify common behavior in two different classes, we create a superclass with that common behavior and make both classes subclasses of it.

Form Template Method

This refactoring applies when two classes have methods with equal or very similar logic that need to be called in a certain order. In this case, we make the signatures of the methods in both classes equal and create a superclass, to which we move the common methods and in which we create an orchestration method that fixes the order of the calls. This improves code reuse and the organization of the hierarchy.
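
The following Python sketch illustrates Form Template Method with a hypothetical rental statement that can be rendered as text or HTML. The render method is the orchestration method; the subclasses only supply the steps:

```python
from abc import ABC, abstractmethod


class Statement(ABC):
    def render(self, customer, rentals):
        """Template method: fixes the order of the steps for every format."""
        lines = [self.header(customer)]
        for title, charge in rentals:
            lines.append(self.rental_line(title, charge))
        total = sum(charge for _, charge in rentals)
        lines.append(self.footer(total))
        return "\n".join(lines)

    @abstractmethod
    def header(self, customer): ...

    @abstractmethod
    def rental_line(self, title, charge): ...

    @abstractmethod
    def footer(self, total): ...


class TextStatement(Statement):
    def header(self, customer):
        return f"Rental record for {customer}"

    def rental_line(self, title, charge):
        return f"  {title}: {charge}"

    def footer(self, total):
        return f"Amount owed: {total}"


class HtmlStatement(Statement):
    def header(self, customer):
        return f"<h1>Rental record for {customer}</h1>"

    def rental_line(self, title, charge):
        return f"<p>{title}: {charge}</p>"

    def footer(self, total):
        return f"<p>Amount owed: {total}</p>"
```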

Conclusion

And so we conclude our introduction to Martin Fowler’s Refactoring book. With a good didactic approach and good examples, the book is a must-read that I highly recommend!

Thank you for following me on this post, until next time.

Buy the book now!

Curator: Implementing purge routines on your Elasticsearch cluster

Hi, dear readers! Welcome to my blog. In this post, we will learn how to use the Curator project to create purge routines on an Elasticsearch cluster.

When we have a cluster crunching logs and other types of data from our systems, we need to configure processes that manage this data, performing actions like purges and backups. For this purpose, the Curator project comes in handy.

Curator is a Python tool that supports several types of actions. In this post, we will focus on two of them: purge and backup. To install Curator, we can use pip, as in the command below:

sudo pip install elasticsearch-curator

Once it is installed, let’s begin preparing our cluster to make the backups by creating a backup repository. A backup repository is an Elasticsearch feature that processes backups and saves them to a persistent store. In this case, we will configure the backups to be stored in an Amazon S3 bucket. First, let’s install the AWS Cloud plugin for Elasticsearch by running the following command on each of the cluster’s nodes:

bin/plugin install cloud-aws

Before we restart our nodes, we configure the AWS credentials that the cluster will use to connect to AWS, by setting them in the elasticsearch.yml file:

cloud:
  aws:
    access_key: <access key>
    secret_key: <secret key>

Finally, let’s configure our backup repository, using the Elasticsearch REST API:

PUT /_snapshot/elasticsearch_backups
{
  "type": "s3",
  "settings": {
    "bucket": "elastic-bckup",
    "region": "us-east-1"
  }
}

With the command above, we created a new backup repository called “elasticsearch_backups” and defined the bucket where the backups will be stored. With our repository created, let’s write the YAML files that configure Curator.

The first YAML is “curator-config.yml”, where we configure details such as the cluster address. A configuration example could be as follows:

client:
  hosts:
    - localhost
  port: 9200
  url_prefix:
  use_ssl: False
  certificate:
  client_cert:
  client_key:
  aws_key:
  aws_secret_key:
  aws_region:
  ssl_no_validate: False
  http_auth:
  timeout: 240
  master_only: False
logging:
  loglevel: INFO
  logfile:
  logformat: default
  blacklist: ['elasticsearch', 'urllib3']

The other YAML is “curator-action.yml”, where we configure a list of actions to be executed by Curator. In this example, we have indices with data from Twitter, with the prefix “twitter”. We first create a backup of the indices that are more than 2 days old and, after the backup, we purge the data:

actions:
  1:
    action: snapshot
    description: >-
      Make backups of indices older than 2 days.
    options:
      repository: elasticsearch_backups
      name: twitter-%Y.%m.%d
      ignore_unavailable: False
      include_global_state: True
      partial: False
      wait_for_completion: True
      skip_repo_fs_check: False
      timeout_override:
      continue_if_exception: False
      disable_action: False
    filters:
    - filtertype: age
      source: creation_date
      direction: older
      unit: days
      unit_count: 2
      exclude:
  2:
    action: delete_indices
    description: >-
      Delete indices older than 2 days (based on index name).
    options:
      ignore_empty_list: True
      timeout_override:
      continue_if_exception: False
      disable_action: False
    filters:
    - filtertype: pattern
      kind: prefix
      value: twitter-
      exclude:
    - filtertype: age
      source: name
      direction: older
      timestring: '%Y.%m.%d'
      unit: days
      unit_count: 2
      exclude:
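
To make the semantics of the pattern and age filters of the second action more tangible, here is a small Python sketch of the selection logic. This is not Curator’s implementation, only an approximation using the standard library; the function name and parameters are hypothetical, and Curator’s exact boundary handling may differ:

```python
from datetime import datetime, timedelta


def select_old_indices(index_names, prefix, timestring, unit_count, now):
    """Return the names that start with prefix and whose date, parsed
    from the rest of the name, is more than unit_count days before now."""
    cutoff = now - timedelta(days=unit_count)
    selected = []
    for name in index_names:
        if not name.startswith(prefix):
            continue  # pattern filter: wrong prefix
        try:
            index_date = datetime.strptime(name[len(prefix):], timestring)
        except ValueError:
            continue  # the suffix does not match the timestring
        if index_date < cutoff:  # age filter: direction "older"
            selected.append(name)
    return selected


if __name__ == "__main__":
    names = ["twitter-2016.08.14", "twitter-2016.08.25",
             "twitter-2016.08.26", "logs-2016.08.01"]
    print(select_old_indices(names, "twitter-", "%Y.%m.%d", 2,
                             now=datetime(2016, 8, 27, 16, 14)))
```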

With the YAMLs configured, we can execute Curator, with the following command:

curator --config curator-config.yml curator-action.yml

The command will generate a log from the actions performed, showing that our configurations were a success:

2016-08-27 16:14:36,576 INFO Action #1: snapshot
2016-08-27 16:14:40,814 INFO Creating snapshot "twitter-2016.08.27" from indices: [u'twitter-2016.08.14', u'twitter-2016.08.25']
2016-08-27 16:15:34,725 INFO Snapshot twitter-2016.08.27 successfully completed.
2016-08-27 16:15:34,725 INFO Action #1: completed
2016-08-27 16:15:34,725 INFO Action #2: delete_indices
2016-08-27 16:15:34,769 INFO Deleting selected indices: [u'twitter-2016.08.14', u'twitter-2016.08.25']
2016-08-27 16:15:34,769 INFO ---deleting index twitter-2016.08.14
2016-08-27 16:15:34,769 INFO ---deleting index twitter-2016.08.25
2016-08-27 16:15:34,860 INFO Action #2: completed
2016-08-27 16:15:34,861 INFO Job completed.

That’s it! Now we just need to schedule this script to run from time to time (once per day, for example) and we will have automated backups and purges.

Thank you for following me on this post, until next time.

Docker: using containers to implement a Microservices architecture

Hello, dear readers! Welcome to another post from my blog. In this post, we will talk about the virtualization technology called Docker, which has gained a ton of popularity these days, especially when we talk about Microservices architectures. But after all, what is virtualization?

Virtualization

Virtualization, as the name implies, consists of using software to emulate any part of an environment, ranging from hardware components to an entire OS. In this world, there are several traditional technologies, such as VMware and VirtualBox. These technologies, however, use hypervisors, which create an intermediate layer responsible for isolating the virtual machine from the physical machine.

With containers, however, we don’t have this hypervisor layer. Instead, containers run directly on top of the kernel of the host machine. This leads to a much more lightweight form of virtualization, allowing us to run a lot more containers on a single host than we could run VMs with a hypervisor.

Of course, virtualization with hypervisors has its own benefits over containers, such as more flexibility (you can’t, for example, run a Windows container on a Linux machine) and security, since the shared kernel approach leads to a scenario where, if an attacker compromises the kernel, they automatically get access to all the containers served by that kernel. That said, we can say the same if the attacker compromises the hypervisor of a machine running some VMs. This is a very heated debate, about which we can find a lot of discussion on the Internet.

The fact is that containers are here to stay, and their lightweight implementation provides a very good foundation for a lot of applications, such as a Microservices architecture. So, without further delay, let’s dive into Docker, one of the most popular container engines on the market today.

Docker Architecture

Docker was developed by Docker Inc. (formerly dotCloud Inc.) as an open-source engine for deploying applications in containers. Because it provides a logical layer that manages the lifecycle of the containers, we can focus our development on the applications themselves and leave the container management to Docker.

The Docker architecture follows a client-server model. The Docker client can be the command-line client provided by Docker, or any consumer of the RESTful API that is also part of the toolset. The Docker server, also known as the Docker daemon, receives requests to create, start and stop containers and is responsible for container management. The diagram below illustrates this architecture:

docker-architecture

In the image above, we can see the Docker clients connecting to the daemon and making requests to create, start and stop containers. We can also see that the daemon communicates with an image repository and uses images while processing the requests. What are those images for? That’s what we will find out in the next section.

Containers & Images

When we talk about containers on Docker, we talk about processes running from instructions that were set on images. We can understand images as the building blocks from which containers are built, which organizes the way containers are created in a Docker environment.

These images are distributed through repositories hosted on a Docker registry, where they are versioned with tags in a way that resembles git. The main registry for the distribution of Docker images is, of course, the Docker Hub, managed by Docker itself.

An interesting thing is that the code of the Docker registry is open source, so it is perfectly possible to host your own image repository. To know more about this, please visit:

github.com/docker/distribution

Docker Union File System

If the reader is familiar with traditional virtualization with hypervisors, they could be thinking: “How does Docker manage the file system so that the images don’t get ‘dirty’ with the files generated by all the containers created from them?”. This could be even worse when we consider that images can inherit from other images, forming a whole image tree.

The answer lies in the kind of file system Docker uses, called a Union File System. In this system, each image is stored as a read-only layer. The layers are stacked on top of each other and, finally, a read-write layer is created at the top of the chain for the container to use. This way, none of the images’ file systems are altered, keeping the images clean for use across multiple containers.
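
As a rough mental model of this layering (not how Docker implements it; the dictionaries, paths and the start_container helper are all hypothetical), Python’s collections.ChainMap behaves similarly: lookups fall through a stack of mappings, while writes only ever touch the first one:

```python
from collections import ChainMap

# Hypothetical read-only image layers, modeled as path -> content mappings.
base_image = {"/etc/os-release": "ubuntu", "/bin/sh": "shell"}
tomcat_image = {"/usr/local/tomcat/conf/server.xml": "default config"}


def start_container(*image_layers):
    """Stack the image layers (base first) under a fresh writable layer."""
    # ChainMap searches its first mapping first and only writes to it,
    # so the empty dict plays the role of the container's read-write layer.
    return ChainMap({}, *reversed(image_layers))


if __name__ == "__main__":
    first = start_container(base_image, tomcat_image)
    first["/tmp/app.log"] = "started"      # new file, container layer only
    first["/etc/os-release"] = "patched"   # shadows the image's file

    second = start_container(base_image, tomcat_image)
    print(first["/etc/os-release"], second["/etc/os-release"])
    print("/tmp/app.log" in second)
```

The second container does not see anything written by the first, and the image dictionaries themselves are never modified.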

Using Docker on other operating systems

Docker is designed for use on Linux distributions. However, for use as a development environment, Docker Inc. released the Docker Toolbox, which makes it possible to run Docker on OSX and Windows as well.

In order to do this, the Toolbox installs VirtualBox as part of its distribution, as well as a micro Linux VM that only has the minimum kernel packages necessary to launch containers. This way, when we launch the Toolbox terminal, the launcher starts the VM on VirtualBox and starts Docker inside the VM.

One important thing to notice is that, when we launch Docker this way, we can no longer use localhost to access Docker from the host it is running on, since it is bound to a specific IP. We can see the IP Docker is bound to in the initialization messages, as in the image below, from a Docker installation on OSX:

Another thing to notice is that, when we use Docker on OSX or Windows, we don’t need sudo to execute the commands, while on Linux we need to execute the commands as root. Another way to use Docker without sudo is to have a group called ‘docker’ on the machine and add our users to it: Docker grants this group the permissions necessary to run its commands without sudo. Notice, however, that users in this group effectively have root permissions, so caution is required.

Docker commands

Well, now that we have the concepts out of the way, let’s start using Docker!

As said before, Docker uses image repositories to resolve the images it needs to run the containers. Let’s begin by running a simple container, to make a famous ‘hello world!’, Docker style.

To do this, let’s open a terminal (or the Docker QuickStart Terminal on other operating systems) and type:

docker run ubuntu echo 'hello, Docker!'

This is the run command, which creates a new container each time it is called. With the command above, we simply asked Docker to create a new container using the ubuntu image. If Docker doesn’t have the image locally, it will search Docker Hub for the latest version of the image and download it. If we wanted a specific version, we could specify it like this: ‘ubuntu:14.04’. At the end, we specify a command for the container to run, in this case a simple “echo 'hello, Docker!'”.

The image below shows the command in action, downloading the image and executing:

To list the containers created, we run the command ‘docker ps’. If we run the command this way now, however, we won’t see anything, because by default the command only shows the active containers, and in our case the container stopped right after running its command. If we run the command with the -a flag, we can then see the stopped containers, as in the image below:

However, there’s a problem: our hello world container was just a test, so we don’t want to keep it. Let’s correct this by removing the container, with the command:

docker rm 2f134296daa6

PS: The hash in the command is part of the ID of the container, which you can see when you list the containers with docker ps.

Now, what if we wanted to run our hello world container again, or any other container that we need just once, without having to manually remove it afterwards? We can do this with the --rm flag, like this:

docker run --rm ubuntu echo 'hello, Docker!'

If we run docker ps -a again after running the command above, we can see that there are no containers left from our previous execution, proving that the flag worked correctly.

Let’s now try a slightly more complex example, starting a container with a Tomcat server. First, let’s search for the name of the image we want on Docker Hub. To do this, open a browser and go to:

https://hub.docker.com

Once inside the Docker Hub, let’s search for the image by typing ‘tomcat’ in the search box in the top right corner of the site. We will be sent to a screen like the one below. You can see the images with some classifiers like ‘official’ and ‘automated’.

Official means that the image is maintained by Docker itself, while automated means that the image is maintained by a CI workload. In the section ‘Publishing on Docker Hub’ we will understand more about the options to publish our own images to the Docker Hub.

From the Docker Hub we get that the name of the official image is ‘tomcat’, so let’s use this image. To use it, we simply run the following command, which will start a new container with the tomcat image:

docker run --name mytomcat -p 8888:8080 tomcat:8.0

You can see that we used some new flags in this command. First, we used the --name flag, which gives our container a name, so we can refer to it by this name when running commands afterwards.

The -p flag is used to bind a port on the host machine to a port on the container. In the next sections, we will talk about creating our own images and we will see that we can expose ports from the container to be accessed by clients. In this case, we bound the port Tomcat serves on inside the container (8080) to port 8888 of the host.

Lastly, when declaring the image we will use, we specified the version, 8.0, meaning that we want to use Tomcat 8.0. Note that we didn’t pass any command to the container, since the image is already configured to start the Tomcat server when the container starts. After running the command, we can see that Tomcat is running inside our container:

The problem is that now our terminal is occupied by the Tomcat process, so we can’t issue more commands. Let’s press Ctrl+C and type the docker ps command. The container is not active! The reason is that, by default, Docker doesn’t run a container in the background when we create it. To do that, we use the -d flag, so all we had to do in the previous command was include this flag to make the container start in the background.

Let’s start the container again with the command docker start:

docker start mytomcat

After the start, we can see that the container is running, if we run docker ps:

Before we open the server on the browser, let’s just check if the container is exposed on the port we defined. To do this, we use the command:

docker port mytomcat

This command will return the following result:

8080/tcp -> 0.0.0.0:8888

Which means that the container is exposed on port 8888, as defined. If we open the browser on localhost:8888 (or on the IP bound by Docker in a non-Linux environment) we will see the following screen:

Excellent! Now we have our own dockerized Tomcat! However, since the container started in the background, we couldn’t see the server’s log to check whether there was any problem during the startup of the web server. Let’s see the log of our container by entering the command:

docker logs mytomcat

This will show what our container has written to its standard output. We could also use the command:

docker attach mytomcat

This command has an effect similar to the tail command on a file, making it possible to watch the logs of the container. Take notice that, after attaching to a container, pressing Ctrl+C will kill it!

Before ending our container, let’s see more detailed information about our container, such as the Java version, network mappings etc. To do this, we run the following command:

docker inspect mytomcat

This will produce a result like the following, in JSON format:

[
  {
    "Id": "14598a44a3b4fe2a5b987ea365b2c83a0399f4fda476ad1754acee23c96fcc22",
    "Created": "2016-01-03T17:59:49.632050403Z",
    "Path": "catalina.sh",
    "Args": [
      "run"
    ],
    "State": {
      "Status": "running",
      "Running": true,
      "Paused": false,
      "Restarting": false,
      "OOMKilled": false,
      "Dead": false,
      "Pid": 4811,
      "ExitCode": 0,
      "Error": "",
      "StartedAt": "2016-01-03T17:59:49.728287604Z",
      "FinishedAt": "0001-01-01T00:00:00Z"
    },
    "Image": "af28fa31b54b2e45d53e80c5a7cbfd2693f198fdb8ba53d44d8a432832ad1012",
    "ResolvConfPath": "/mnt/sda1/var/lib/docker/containers/14598a44a3b4fe2a5b987ea365b2c83a0399f4fda476ad1754acee23c96fcc22/resolv.conf",
    "HostnamePath": "/mnt/sda1/var/lib/docker/containers/14598a44a3b4fe2a5b987ea365b2c83a0399f4fda476ad1754acee23c96fcc22/hostname",
    "HostsPath": "/mnt/sda1/var/lib/docker/containers/14598a44a3b4fe2a5b987ea365b2c83a0399f4fda476ad1754acee23c96fcc22/hosts",
    "LogPath": "/mnt/sda1/var/lib/docker/containers/14598a44a3b4fe2a5b987ea365b2c83a0399f4fda476ad1754acee23c96fcc22/14598a44a3b4fe2a5b987ea365b2c83a0399f4fda476ad1754acee23c96fcc22-json.log",
    "Name": "/mytomcat",
    "RestartCount": 0,
    "Driver": "aufs",
    "ExecDriver": "native-0.2",
    "MountLabel": "",
    "ProcessLabel": "",
    "AppArmorProfile": "",
    "ExecIDs": null,
    "HostConfig": {
      "Binds": null,
      "ContainerIDFile": "",
      "LxcConf": [],
      "Memory": 0,
      "MemoryReservation": 0,
      "MemorySwap": 0,
      "KernelMemory": 0,
      "CpuShares": 0,
      "CpuPeriod": 0,
      "CpusetCpus": "",
      "CpusetMems": "",
      "CpuQuota": 0,
      "BlkioWeight": 0,
      "OomKillDisable": false,
      "MemorySwappiness": -1,
      "Privileged": false,
      "PortBindings": {
        "8080/tcp": [
          {
            "HostIp": "",
            "HostPort": "8888"
          }
        ]
      },
      "Links": null,
      "PublishAllPorts": false,
      "Dns": [],
      "DnsOptions": [],
      "DnsSearch": [],
      "ExtraHosts": null,
      "VolumesFrom": null,
      "Devices": [],
      "NetworkMode": "default",
      "IpcMode": "",
      "PidMode": "",
      "UTSMode": "",
      "CapAdd": null,
      "CapDrop": null,
      "GroupAdd": null,
      "RestartPolicy": {
        "Name": "no",
        "MaximumRetryCount": 0
      },
      "SecurityOpt": null,
      "ReadonlyRootfs": false,
      "Ulimits": null,
      "LogConfig": {
        "Type": "json-file",
        "Config": {}
      },
      "CgroupParent": "",
      "ConsoleSize": [
        0,
        0
      ],
      "VolumeDriver": ""
    },
    "GraphDriver": {
      "Name": "aufs",
      "Data": null
    },
    "Mounts": [],
    "Config": {
      "Hostname": "14598a44a3b4",
      "Domainname": "",
      "User": "",
      "AttachStdin": false,
      "AttachStdout": false,
      "AttachStderr": false,
      "ExposedPorts": {
        "8080/tcp": {}
      },
      "Tty": false,
      "OpenStdin": false,
      "StdinOnce": false,
      "Env": [
        "PATH=/usr/local/tomcat/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
        "LANG=C.UTF-8",
        "JAVA_VERSION=7u91",
        "JAVA_DEBIAN_VERSION=7u91-2.6.3-1~deb8u1",
        "CATALINA_HOME=/usr/local/tomcat",
        "TOMCAT_MAJOR=8",
        "TOMCAT_VERSION=8.0.30",
        "TOMCAT_TGZ_URL=https://www.apache.org/dist/tomcat/tomcat-8/v8.0.30/bin/apache-tomcat-8.0.30.tar.gz"
      ],
      "Cmd": [
        "catalina.sh",
        "run"
      ],
      "Image": "tomcat:8.0",
      "Volumes": null,
      "WorkingDir": "/usr/local/tomcat",
      "Entrypoint": null,
      "OnBuild": null,
      "Labels": {},
      "StopSignal": "SIGTERM"
    },
    "NetworkSettings": {
      "Bridge": "",
      "SandboxID": "37fe977f6a45f4495eae95260a67f0e99a8168c73930151926ad521d211b4ac6",
      "HairpinMode": false,
      "LinkLocalIPv6Address": "",
      "LinkLocalIPv6PrefixLen": 0,
      "Ports": {
        "8080/tcp": [
          {
            "HostIp": "0.0.0.0",
            "HostPort": "8888"
          }
        ]
      },
      "SandboxKey": "/var/run/docker/netns/37fe977f6a45",
      "SecondaryIPAddresses": null,
      "SecondaryIPv6Addresses": null,
      "EndpointID": "09477abf22e8019e2f8d3e0deeb18d9362136954a09e336f4a93286cb3b8b027",
      "Gateway": "172.17.0.2",
      "GlobalIPv6Address": "",
      "GlobalIPv6PrefixLen": 0,
      "IPAddress": "172.17.0.3",
      "IPPrefixLen": 16,
      "IPv6Gateway": "",
      "MacAddress": "02:42:ac:11:00:03",
      "Networks": {
        "bridge": {
          "EndpointID": "09477abf22e8019e2f8d3e0deeb18d9362136954a09e336f4a93286cb3b8b027",
          "Gateway": "172.17.0.2",
          "IPAddress": "172.17.0.3",
          "IPPrefixLen": 16,
          "IPv6Gateway": "",
          "GlobalIPv6Address": "",
          "GlobalIPv6PrefixLen": 0,
          "MacAddress": "02:42:ac:11:00:03"
        }
      }
    }
  }
]

Now, let’s stop our container, by entering:

docker stop mytomcat

Finally, let’s remove the container, since we won’t use it anymore in this practice:

docker rm mytomcat

Let’s also remove the image, since we won’t use it either on our next steps:

docker rmi tomcat:8.0

And that concludes our crash course on Docker’s commands! There are other useful commands as well, of course, such as:

  • docker build: Builds an image from a Dockerfile (see next sections);
  • docker commit: Creates an image from a container;
  • docker push: Pushes an image to a registry (Docker Hub by default);
  • docker exec: Runs a command inside a running container;
  • docker export: Exports the file system of a container as a tar file;
  • docker images: Lists the images inside Docker;
  • docker kill: Force-kills a running container;
  • docker restart: Restarts a container;
  • docker network: Manages Docker networks (see next sections);
  • docker volume: Manages Docker volumes (see next sections).

Creating your own images

In the previous section, we used an image from the Docker Hub that creates a container with a Tomcat web server. This image is implemented by a script with build instructions, called a Dockerfile. In our lab we will create a Dockerfile of our own, but for now, let’s just examine the Dockerfile of the tomcat image in order to learn some of the instructions available:

FROM java:7-jre

ENV CATALINA_HOME /usr/local/tomcat
ENV PATH $CATALINA_HOME/bin:$PATH
RUN mkdir -p "$CATALINA_HOME"
WORKDIR $CATALINA_HOME

# see https://www.apache.org/dist/tomcat/tomcat-8/KEYS
RUN gpg --keyserver pool.sks-keyservers.net --recv-keys \
    05AB33110949707C93A279E3D3EFE6B686867BA6 \
    07E48665A34DCAFAE522E5E6266191C37C037D42 \
    47309207D818FFD8DCD3F83F1931D684307A10A5 \
    541FBE7D8F78B25E055DDEE13C370389288584E7 \
    61B832AC2F1C5A90F0F9B00A1C506407564C17A3 \
    79F7026C690BAA50B92CD8B66A3AD3F4F22C4FED \
    9BA44C2621385CB966EBA586F72C284D731FABEE \
    A27677289986DB50844682F8ACB77FC2E86E29AC \
    A9C5DF4D22E99998D9875A5110C01C5A2F6059E7 \
    DCFD35E0BF8CA7344752DE8B6FB21E8933C60243 \
    F3A04C595DB5B6A5F1ECA43E3B7BBB100D811BBE \
    F7DA48BB64BCB84ECBA7EE6935CD23C10D498E23

ENV TOMCAT_MAJOR 8
ENV TOMCAT_VERSION 8.0.30
ENV TOMCAT_TGZ_URL https://www.apache.org/dist/tomcat/tomcat-$TOMCAT_MAJOR/v$TOMCAT_VERSION/bin/apache-tomcat-$TOMCAT_VERSION.tar.gz

RUN set -x \
    && curl -fSL "$TOMCAT_TGZ_URL" -o tomcat.tar.gz \
    && curl -fSL "$TOMCAT_TGZ_URL.asc" -o tomcat.tar.gz.asc \
    && gpg --verify tomcat.tar.gz.asc \
    && tar -xvf tomcat.tar.gz --strip-components=1 \
    && rm bin/*.bat \
    && rm tomcat.tar.gz*

EXPOSE 8080
CMD ["catalina.sh", "run"]

As we can see, the first instruction is called FROM. This instruction defines the base image upon which our image will be built. In this case, the java:7-jre image provides a basic Linux environment, with Java 7 configured.

Then we see some ENV instructions. These instructions set environment variables in the image, as part of Tomcat’s configuration. We can also see a WORKDIR instruction, which defines the directory that, from that point on, Docker will use to run the commands.

We also see some RUN instructions. These instructions, as the name implies, run commands while the image is being built. In the case of our image, they carry out Tomcat’s installation process.

Next we see the EXPOSE instruction, which declares that the container listens on port 8080. The port only becomes reachable from the host when it is published, as we did with the port mapping on docker run. Finally, we see the CMD instruction, which defines how the server is started. One important distinction between CMD and RUN is that, while the RUN instructions are executed only in the build phase, the CMD instruction is executed at the container’s startup, making it the command that runs when a container is created and started. For this reason, only one CMD takes effect per Dockerfile: if more than one is declared, only the last one is used.

One important side note about CMD is that it can be overridden when starting a container, simply by passing another command after the image name on docker run, which could lead to security holes. For this reason, it is often recommended to use the ENTRYPOINT instruction instead. Like CMD, it defines the command the container will run at startup, but anything passed after the image name on docker run is appended to it as arguments, instead of replacing it. Take note that ENTRYPOINT can still be replaced explicitly, with the --entrypoint flag of docker run, so it makes the override harder, not impossible.

We could also use ENTRYPOINT and CMD combined. In this case, CMD supplies the default arguments for the command defined by ENTRYPOINT, so the user normally only passes flags and/or arguments to the running command.
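To make the combination of ENTRYPOINT and CMD easier to picture, here is a minimal sketch in Java of how the final startup command is assembled when both use the JSON (exec) form. It is only a model for illustration, not Docker’s code: the class StartupCommandSketch, the method resolve and the sample values (java, -version, /data/app.jar) are all assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model (not part of Docker) of how exec-form ENTRYPOINT and CMD
// are combined into the command a container runs at startup.
final class StartupCommandSketch {

    // entrypoint: the ENTRYPOINT array; cmd: the CMD array (default arguments);
    // runArgs: whatever was typed after the image name on "docker run".
    static List<String> resolve(List<String> entrypoint, List<String> cmd, List<String> runArgs) {
        List<String> argv = new ArrayList<>(entrypoint);
        // Arguments passed on "docker run" replace CMD entirely; ENTRYPOINT stays.
        argv.addAll(runArgs.isEmpty() ? cmd : runArgs);
        return argv;
    }

    public static void main(String[] args) {
        List<String> entrypoint = Arrays.asList("java");
        List<String> cmd = Arrays.asList("-version");

        // No arguments on docker run: the CMD defaults are used.
        System.out.println(resolve(entrypoint, cmd, Collections.<String>emptyList()));
        // prints [java, -version]

        // Arguments on docker run: they take the place of CMD.
        System.out.println(resolve(entrypoint, cmd, Arrays.asList("-jar", "/data/app.jar")));
        // prints [java, -jar, /data/app.jar]
    }
}
```

In both cases the first element comes from ENTRYPOINT, which is why the user ends up only choosing the arguments.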

Of course, those are not the only instructions we can use in a Dockerfile. Other instructions are:

  • MAINTAINER: Defines the author of the image;
  • LABEL: Adds metadata to an image, in a key=value format;
  • ADD: Adds a file, folder or remote file URL to a folder inside the image. Source paths are resolved relative to the build context, which normally is the directory of the Dockerfile;
  • COPY: Analogous to ADD, this instruction also copies files and folders from the host to the image. The two major differences are that COPY doesn’t allow the use of URLs and COPY doesn’t automatically extract local tar archives, like ADD can;
  • VOLUME: Creates a volume. On the section “creating the base image” we will learn in more detail about what volumes are on Docker;
  • USER: Defines the user that runs the RUN, CMD and ENTRYPOINT instructions that follow it;
  • ARG: Defines build-time variables, which can be used to pass parameters to the image’s build, using the --build-arg flag;
  • ONBUILD: Defines an instruction to be triggered later, when the image is used as a base image for other images;
  • STOPSIGNAL: Defines the signal sent to the container to stop it, as a number or in the SIGNAME format, for instance SIGKILL.

Publishing on Docker Hub

As we saw in the previous section, in order to create our own images, all we have to do is create a “Dockerfile” file, with the instructions necessary for the build. However, in order to distribute our images, we need to register them on an image registry, like the Docker Hub, for example.

The simplest way to publish images is with the commit and push commands. For example, if we had made changes to our Tomcat container and wanted to save those changes as a new image, we could do it with this command:

docker commit mytomcat alexandreesl/mytomcat

Where the second argument is the <repository ID>/<image name>. In order to push the images to Docker Hub, the repository ID must be equal to the username of our Docker Hub account.

After committing the changes, all we have to do is push the changes to the Hub, using the following command:

docker push alexandreesl/mytomcat

And that’s it! After the push, if we check our Docker Hub account, we can see that our image was successfully created:

NOTE: before pushing, you may have to run the command docker login to provide your Docker Hub credentials.

Another way of publishing is by linking our images with a git repository. This way, the Docker Hub will rebuild the image each time a new version of the Dockerfile is pushed to the repository. It is this method of publishing that generates automated build images, since it lets Docker Hub establish a CI workflow with the git repository. We will see this method in action in the section ‘Creating the base image’.

Practice: Using Docker to implement a microservices architecture

In our lab, we will take a step forward from my previous post about microservices with Spring Boot (haven’t seen it? You can find it here, I would be really grateful if you read that post as well!), by using a Service Registry called Eureka, designed by Netflix.

In the Service Registry pattern, we have a registry where microservices can register/unregister themselves and also find the addresses of other services dynamically, this way decoupling them from each other. We will dockerize (run inside a container) the services of that lab and make them use Eureka, which will also run inside a container. In order to integrate with Eureka, we will modify our applications to use the Spring Cloud project, which, according to the project’s description:

Spring Cloud provides tools for developers to quickly build some of the common patterns in distributed systems (e.g. configuration management, service discovery, circuit breakers, intelligent routing, micro-proxy, control bus, one-time tokens, global locks, leadership election, distributed sessions, cluster state). Coordination of distributed systems leads to boiler plate patterns, and using Spring Cloud developers can quickly stand up services and applications that implement those patterns. They will work well in any distributed environment, including the developer’s own laptop, bare metal data centres, and managed platforms such as Cloud Foundry.
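Before moving on, the Service Registry pattern described above can be pictured with a tiny in-memory sketch. This is not how Eureka is implemented, just the three operations of the pattern (register, unregister and lookup) in plain Java; the class ServiceRegistrySketch and its method names are hypothetical, invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory registry: maps an application ID to the addresses
// of the instances currently registered under it.
final class ServiceRegistrySketch {

    private final Map<String, List<String>> instances = new ConcurrentHashMap<>();

    // Called by a service instance when it starts up.
    void register(String serviceId, String address) {
        instances.computeIfAbsent(serviceId, key -> new CopyOnWriteArrayList<>()).add(address);
    }

    // Called by a service instance when it shuts down.
    void unregister(String serviceId, String address) {
        List<String> addresses = instances.get(serviceId);
        if (addresses != null) {
            addresses.remove(address);
        }
    }

    // Called by consumers to discover where a service lives right now.
    List<String> lookup(String serviceId) {
        List<String> addresses = instances.get(serviceId);
        if (addresses == null) {
            return Collections.emptyList();
        }
        return new ArrayList<>(addresses); // defensive copy
    }

    public static void main(String[] args) {
        ServiceRegistrySketch registry = new ServiceRegistrySketch();
        registry.register("PRODUCTSERVICE", "product1:8080");
        registry.register("PRODUCTSERVICE", "product2:8080");
        System.out.println(registry.lookup("PRODUCTSERVICE")); // prints [product1:8080, product2:8080]

        registry.unregister("PRODUCTSERVICE", "product1:8080");
        System.out.println(registry.lookup("PRODUCTSERVICE")); // prints [product2:8080]
    }
}
```

The consumer only knows the application ID, never a fixed address, and that is the decoupling the pattern gives us.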

Also, to “Springfy” our example even more, we will change our implementation from the previous post, which uses pure JAX-RS, to the RestController implementation, present on the Spring Web project.

Creating the base image

Well, so let’s begin our practice! First, we will create a Docker Network to accommodate the containers.

If we look at the network adapters of the host machine when the Docker Daemon is up, we will see that it creates a bridge adapter on the host, subsequently attaching the containers to network interfaces inside the bridge adapter. This forms a subnet where the containers can see each other. Docker also makes the work easier for us, by mapping the containers’ IPs to their names on the /etc/hosts files inside them.

In our scenario, we will use this feature so the microservices can easily find our Eureka registry, by mapping the address to the container’s name. Of course, in a real scenario we would have a cluster of Eurekas with a load balancer on separate hosts, but for simplicity’s sake, in our lab we will just use one Eureka instance.

So, in order to accommodate our architecture, first we create a network, by running the following command:

docker network create microservicesnet

After running the command we will see a hash ID indicating our network was created. We can see the details of our network by running the following command:

docker network inspect microservicesnet

Which will produce a result like the following:

[
  {
    "Name": "microservicesnet",
    "Id": "e8d401a00de26b74f4f2461c13dbf848c14f37127b3337a3e40465eff5897910",
    "Scope": "local",
    "Driver": "bridge",
    "IPAM": {
      "Driver": "default",
      "Config": [
        {}
      ]
    },
    "Containers": {},
    "Options": {}
  }
]

Notice that, for now, the Containers object is empty. This is because we haven’t added any containers to the network yet, but this will soon change.

Now we will create the Dockerfile. Create a folder in the directory of your choice and create a file called ‘Dockerfile’ (without any extension). In my case, I will push my Dockerfile to a Github repository, in order to create the image on Docker Hub as an automated build image. You can find the repositories with the source for this lab at the end of the post.

So, without further delay, let’s code the Dockerfile. The code for our image will be the following:

FROM java:8-jre

MAINTAINER Alexandre Lourenco <alexandreesl@example.com>

VOLUME [ "/data" ]

WORKDIR /data

EXPOSE 8080
ENTRYPOINT [ "java" ]
CMD ["-?"]

 

As we can see, it is a simple Dockerfile. We use Java 8’s official image as the base, define a volume and set its folder as our workdir, expose the 8080 port, which is the default for Spring Boot, and finally combine the ENTRYPOINT and CMD instructions, meaning that anything we pass when we start a container will be treated as parameters for the java command. But what is this volume we have spoken of?

The reader must remember what we saw about the Union File System and how each image is layered as a read-only layer. Volumes on Docker are a technique that bypasses the Union File System, by defining a mount point on the containers that points to a shared space on the host machine. This technique is used to provide a place where the container can read/write information that needs to be persisted, as well as to share data between containers.

In our scenario, we will use the volume to point to a place where the jars of our microservices will be generated, so when a container runs a microservice, it runs the latest version of the software. In a real-world scenario, this place could be the output of a CI workflow, for example a Jenkins job.

Now that we have our Dockerfile, let’s build the image to check that it is correct:

docker build -t alexandreesl/microservices-spring-boot-base .

At the end, we will see a message that our build was successful, validating the Dockerfile. Out of curiosity, if you watch the build process, you will notice that Docker creates a temporary container for each instruction of our script and saves the result as an intermediate layer. These layers are cached, so if we have to rebuild the image after a failure, Docker reuses the layers of the instructions that already succeeded and resumes from the failed point. We can disable this feature with the flag --no-cache if we want.
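The idea behind this build cache can be sketched in a few lines of Java. This is a deliberately simplified model and not Docker’s real implementation (which, for instance, also considers the content of files added with ADD and COPY): each layer is looked up by its parent layer plus the text of the instruction, so a change in one instruction invalidates every layer after it. The class BuildCacheSketch and the layer names are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a layer cache: a layer is identified by
// the pair (parent layer, instruction text).
final class BuildCacheSketch {

    private final Map<String, String> cache = new HashMap<>();
    private int layerCounter = 0;

    // "Builds" the instructions in order and returns, for each one,
    // whether a cached layer was reused (true) or a new one was created (false).
    List<Boolean> build(List<String> instructions) {
        List<Boolean> cacheHits = new ArrayList<>();
        String parent = "scratch";
        for (String instruction : instructions) {
            String key = parent + "|" + instruction;
            String layer = cache.get(key);
            cacheHits.add(layer != null);
            if (layer == null) {
                // Cache miss: run the instruction and store the resulting layer.
                layer = "layer-" + (++layerCounter);
                cache.put(key, layer);
            }
            parent = layer; // the next instruction builds on top of this layer
        }
        return cacheHits;
    }

    public static void main(String[] args) {
        BuildCacheSketch builder = new BuildCacheSketch();
        List<String> dockerfile = Arrays.asList("FROM java:8-jre", "WORKDIR /data", "EXPOSE 8080");

        System.out.println(builder.build(dockerfile)); // prints [false, false, false]
        System.out.println(builder.build(dockerfile)); // prints [true, true, true]

        // Changing the second instruction also invalidates the third one.
        List<String> changed = Arrays.asList("FROM java:8-jre", "WORKDIR /app", "EXPOSE 8080");
        System.out.println(builder.build(changed)); // prints [true, false, false]
    }
}
```

This is also why it usually pays off to put the instructions that change most often at the end of a Dockerfile.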

Now, let’s register the image on Docker Hub. I have created a Github repository here, so I will register the image directly on the site. To do this, we log in to our account and click on the “create automated build” menu item, as shown on the screen below:

After clicking, if we haven’t linked a Github account yet, we will be prompted to do so. After linking the account, we will see a page with the list of our git repositories. We select the one with the Dockerfile and finally create the image, entering the name and a description for the image, as shown on the screen below:

After clicking on the create button, we have completed our step, successfully registering our image on the Docker Hub! You can see my image on this link.

Lastly, just to test if our Docker Hub upload was really successful, we will remove the image from the local cache (where it was added by our docker build command) and download it using the docker pull command. First, let’s remove the image with the command:

docker rmi alexandreesl/microservices-spring-boot-base

Afterwards, let’s pull the image with the command:

docker pull alexandreesl/microservices-spring-boot-base

After the download, we will have the image from the Docker Hub repository.

Preparing the service registry image

For the service registry image, we actually don’t have to do any coding, since we will use a ready-made image. We will start the image on the “Launching the containers” section, but the curious reader can see the image’s page here.

Preparing the services to use the registry

Now, let’s prepare the services for registering/deregistering themselves, alongside service discovery when a service has dependencies.

To focus on the explanation, I will omit some details like the pom configuration, since the reader can easily get the configuration on the Github repository of the lab. In this lab, we have 3 microservices: a Customer service, a Product service and an Order service. The Order service depends on the other 2 services in order to assemble an Order.

On the 3 projects we will have the same configuration for Spring Boot’s main class, called Application, as follows:

@SpringBootApplication
@Configuration
@EnableAutoConfiguration
@EnableEurekaClient
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

The key line here is the annotation @EnableEurekaClient, which configures the Spring Boot application to work with Eureka. With this annotation alone, we configure Spring Boot to connect to Eureka at startup and register itself with it, send heartbeats during its lifecycle to keep the registration alive and finally unregister itself when the process is terminated. Also, it instantiates a RestTemplate with a Ribbon load balancer, also made by Netflix, that can easily look up service addresses from the registry. All of this with just one annotation!
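To illustrate why the heartbeats keep the registration alive, here is a minimal sketch of a heartbeat lease in Java. It is not Eureka’s code: the class HeartbeatLeaseSketch, its methods and the lease value used in the example are hypothetical, and the time is passed explicitly only to keep the example deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical lease table: an instance is considered alive only while its
// last heartbeat is not older than the lease duration.
final class HeartbeatLeaseSketch {

    private final long leaseMillis;
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    HeartbeatLeaseSketch(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    // Registration and renewal are the same operation: record the time.
    void heartbeat(String instanceId, long nowMillis) {
        lastHeartbeat.put(instanceId, nowMillis);
    }

    // Explicit unregistration, as done on a clean shutdown.
    void cancel(String instanceId) {
        lastHeartbeat.remove(instanceId);
    }

    boolean isAlive(String instanceId, long nowMillis) {
        Long last = lastHeartbeat.get(instanceId);
        return last != null && nowMillis - last <= leaseMillis;
    }

    public static void main(String[] args) {
        // The 90 second lease is an arbitrary value chosen for this example.
        HeartbeatLeaseSketch leases = new HeartbeatLeaseSketch(90_000);

        leases.heartbeat("order1", 0);
        System.out.println(leases.isAlive("order1", 30_000));  // prints true
        System.out.println(leases.isAlive("order1", 90_001));  // prints false (lease expired)

        leases.heartbeat("order1", 100_000);                   // a new heartbeat renews the lease
        System.out.println(leases.isAlive("order1", 150_000)); // prints true

        leases.cancel("order1");
        System.out.println(leases.isAlive("order1", 150_000)); // prints false
    }
}
```

With a scheme like this, an instance that crashes without unregistering simply stops renewing its lease and eventually disappears from the registry.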

In order to connect to Eureka, we have to provide the registry’s address. This is done by configuring a YAML file, called application.yml, that we put on the resources source folder. We also configure an application name on this file, in order to inform Eureka which application ID we would like to use for the microservice.

So, in order to make this configuration, we create the files on our projects. As an example, for the Order service, we have the following YAML configuration:

spring:
  application:
    name: OrderService
eureka:
  client:
    serviceUrl:
      defaultZone: http://eureka:8761/eureka/

Notice that when we configured Eureka’s address, we used “eureka” as the host name. This is the name of the container that we will use to deploy the Eureka server on our Docker Network, so on the /etc/hosts files of our containers this name will be mapped to the IP of Eureka’s container, making it possible to resolve the address dynamically.

Now let’s see how our Microservices were implemented. For the Customer service, we have the following code:

@RestController
@RequestMapping("/")
public class CustomerRest {

    private static List<Customer> clients = new ArrayList<Customer>();

    static {
        Customer customer1 = new Customer();
        customer1.setId(1);
        customer1.setName("Cliente 1");
        customer1.setEmail("customer1@gmail.com");

        Customer customer2 = new Customer();
        customer2.setId(2);
        customer2.setName("Cliente 2");
        customer2.setEmail("customer2@gmail.com");

        Customer customer3 = new Customer();
        customer3.setId(3);
        customer3.setName("Cliente 3");
        customer3.setEmail("customer3@gmail.com");

        Customer customer4 = new Customer();
        customer4.setId(4);
        customer4.setName("Cliente 4");
        customer4.setEmail("customer4@gmail.com");

        Customer customer5 = new Customer();
        customer5.setId(5);
        customer5.setName("Cliente 5");
        customer5.setEmail("customer5@gmail.com");

        clients.add(customer1);
        clients.add(customer2);
        clients.add(customer3);
        clients.add(customer4);
        clients.add(customer5);
    }

    @RequestMapping(method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
    public List<Customer> getClientes() {
        return clients;
    }

    @RequestMapping(value = "customer/{id}", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
    public Customer getCliente(@PathVariable long id) {

        Customer cli = null;

        for (Customer c : clients) {
            if (c.getId() == id)
                cli = c;
        }

        return cli;
    }
}

As we can see, nothing out of the ordinary, just a very simple usage of Spring’s REST Controllers. Now, on to the Product service:

@RestController
@RequestMapping("/")
public class ProductRest {

    private static List<Product> products = new ArrayList<Product>();

    static {
        Product product1 = new Product();
        product1.setId(1);
        product1.setSku("abcd1");
        product1.setDescription("Produto1");

        Product product2 = new Product();
        product2.setId(2);
        product2.setSku("abcd2");
        product2.setDescription("Produto2");

        Product product3 = new Product();
        product3.setId(3);
        product3.setSku("abcd3");
        product3.setDescription("Produto3");

        Product product4 = new Product();
        product4.setId(4);
        product4.setSku("abcd4");
        product4.setDescription("Produto4");

        products.add(product1);
        products.add(product2);
        products.add(product3);
        products.add(product4);
    }

    @RequestMapping(method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
    public List<Product> getProdutos() {
        return products;
    }

    @RequestMapping(value = "product/{id}", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
    public Product getProduto(@PathVariable long id) {

        Product prod = null;

        for (Product p : products) {
            if (p.getId() == id)
                prod = p;
        }

        return prod;
    }
}

Again, nothing unusual in the code. Now, let’s see the Order service, where we will see the registry being used for consuming microservices:

@RestController
@RequestMapping("/")
public class OrderRest {

    // Simple in-memory sequence for the order IDs (not thread-safe, but enough for this lab)
    private static long id = 1;

    // Created automatically by Spring Cloud
    @Autowired
    @LoadBalanced
    private RestTemplate restTemplate;

    private Logger logger = Logger.getLogger(OrderRest.class);

    @RequestMapping(value = "order/{idCustomer}/{idProduct}/{amount}", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
    public Order submitOrder(@PathVariable long idCustomer, @PathVariable long idProduct, @PathVariable long amount) {

        Order order = new Order();

        // URI variables used to fill the {id} placeholder of each call
        Map<String, Long> map = new HashMap<String, Long>();
        map.put("id", idCustomer);

        // CUSTOMERSERVICE is the application ID registered on Eureka, not a real host name
        ResponseEntity<Customer> customer = restTemplate.exchange("http://CUSTOMERSERVICE/customer/{id}",
                HttpMethod.GET, null, Customer.class, map);

        map = new HashMap<String, Long>();
        map.put("id", idProduct);

        ResponseEntity<Product> product = restTemplate.exchange("http://PRODUCTSERVICE/product/{id}",
                HttpMethod.GET, null, Product.class, map);

        order.setCustomer(customer.getBody());
        order.setProduct(product.getBody());
        order.setId(id);
        order.setAmount(amount);
        order.setOrderDate(new Date());

        logger.warn("The order " + id + " for the client " + customer.getBody().getName() + " with the product "
                + product.getBody().getSku() + " with the amount " + amount + " was created!");

        id++;

        return order;
    }
}

The first thing we notice is the autowired RestTemplate, that also has a @LoadBalanced annotation. This RestTemplate is automatically instantiated by Spring Cloud, creating an interface which we can use to communicate with the services registered on Eureka. The annotation informs that we want to use Ribbon to make the load balancing, in case we have a cluster of instances of the same service.

The exchange method we use inside of the submitOrder method is where the magic happens. When we make our calls using this method, internally a lookup is made on the Eureka Server, the addresses of the service are passed to Ribbon in order to load balance the calls, and then the call is made. Notice that we used names like “CUSTOMERSERVICE” as the host address on our URIs. This pattern tells the framework the ID of the application we really want to call, being replaced at runtime with the address of one of the instances of the service we want to call.
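The two steps described above, choosing one instance and swapping the application ID for its address, can be sketched in plain Java as follows. This is only an illustration under simple assumptions: Ribbon’s real rules are more elaborate than the plain round-robin used here, and the class ClientSideBalancerSketch and its methods are hypothetical names, not part of Ribbon or Spring Cloud.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client-side balancer: rotates over the known instances and
// rewrites the logical URI so it points to the chosen one.
final class ClientSideBalancerSketch {

    private final AtomicInteger next = new AtomicInteger();

    // Plain round-robin over the addresses returned by the registry.
    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no instances registered");
        }
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    // Replaces the application ID used as host with a real host:port.
    static URI rewrite(URI logical, String hostAndPort) {
        String query = logical.getRawQuery() == null ? "" : "?" + logical.getRawQuery();
        return URI.create(logical.getScheme() + "://" + hostAndPort + logical.getRawPath() + query);
    }

    public static void main(String[] args) {
        ClientSideBalancerSketch balancer = new ClientSideBalancerSketch();
        List<String> servers = Arrays.asList("product1:8080", "product2:8080");
        URI logical = URI.create("http://PRODUCTSERVICE/product/4");

        System.out.println(rewrite(logical, balancer.choose(servers))); // prints http://product1:8080/product/4
        System.out.println(rewrite(logical, balancer.choose(servers))); // prints http://product2:8080/product/4
        System.out.println(rewrite(logical, balancer.choose(servers))); // prints http://product1:8080/product/4
    }
}
```

Since the choice is made on the caller’s side, no extra load balancer has to sit between the Order service and the other two.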

And that concludes our quick explanation of the Java code involved in our lab. Again, if the reader wants to get the full code, just head for the Github repositories at the end of this post. In order to run the lab, I recommend that you clone the whole repository and execute a mvn package command on each of the 3 projects. If you don’t have Maven, you can get it from here.

Launching the containers

Now that we are almost finished with the lab, it is time for the fun part: running our containers! To do this, we will use docker-compose. With docker-compose, we can start, stop, kill, etc. full stacks of containers, without having to instantiate everything by hand. In order to make our docker-compose stack, let’s create a YAML file called docker-compose.yml and include the following configuration:

eureka:
 image: springcloud/eureka
 container_name: eureka
 ports:
 - "8761:8761"
 net: "microservicesnet"
customer1:
 image: alexandreesl/microservices-spring-boot-base
 container_name: customer1
 hostname: customer1
 net: "microservicesnet"
 ports:
 - "8080:8080"
 volumes:
 - /Users/alexandrelourenco/Applications/git/docker-handson:/data
 command: -jar /data/Customer-backend/target/Customer-backend-1.0.jar
product1:
 image: alexandreesl/microservices-spring-boot-base
 container_name: product1
 hostname: product1
 net: "microservicesnet"
 ports:
 - "8081:8080"
 volumes:
 - /Users/alexandrelourenco/Applications/git/docker-handson:/data
 command: -jar /data/Product-backend/target/Product-backend-1.0.jar
order1:
 image: alexandreesl/microservices-spring-boot-base
 container_name: order1
 hostname: order1
 net: "microservicesnet"
 ports:
 - "8082:8080"
 volumes:
 - /Users/alexandrelourenco/Applications/git/docker-handson:/data
 command: -jar /data/Order-backend/target/Order-backend-1.0.jar

Some things to notice from our configuration:

  • In all the containers we configured our “microservicesnet” network, in order to use the Docker Network feature to resolve the containers’ names;
  • On the microservices’ containers, we also defined the hostname, to force Spring Cloud’s registration control to properly register the correct alias for the service’s addresses;
  • We have published Eureka’s port on the physical host at 8761, so we can see the web interface from outside the Docker environment;
  • On the volumes sections, I have mapped the base folder where the Maven projects of my microservices are placed, binding it to the /data folder inside the container, which is defined as the workdir on the image we created previously. This way, on the command section, when we inform the location of the microservice’s Spring Boot jar, we inform it starting from the /data folder.

Finally, after saving our file, we start the stack by simply running the following command, from the same folder of the YAML file:

docker-compose up

We will see some information about our containers starting up and, at the end, some log information from all the containers mixed, with the name of the container as a prefix for identification:

After waiting some moments we will see that our containers are up. But are the microservices up as well? Let’s open a browser and point it to the port 8761, using localhost or the IP used by Docker in case you are on OSX/Windows:

Excellent! Not only has Eureka booted up successfully, but also all of our services have registered with it. Let’s now toy a little with our stack, to test it out.

Let’s begin by making a search for a customer of ID 1 on the Customer’s service:

curl -XGET 'http://<your ip>:8080/customer/1'

This will produce a JSON response like the following:

{"id":1,"name":"Cliente 1","email":"customer1@gmail.com"}

Next, let’s test out the Product service, with a call for the details of the Product of ID 4:

curl -XGET 'http://<your ip>:8081/product/4'

This time it will produce a response like the following:

{"id":4,"sku":"abcd4","description":"Produto4"}

Finally, the moment of truth: let’s call the Order service, which uses our other 2 services, to see if the services’ addresses are resolved with Eureka’s help. To test it, let’s make the following call:

curl -XGET 'http://<your ip>:8082/order/2/1/4'

If everything goes well, we will see a response like:

{"id":1,"amount":4,"orderDate":1452216090392,"customer":{"id":2,"name":"Cliente 2","email":"customer2@gmail.com"},"product":{"id":1,"sku":"abcd1","description":"Produto1"}}

Success! But can we have further proof? If we see the logs from the Order1 container, we can see some logs showing Ribbon in action, searching for the services we need on the call and initializing load balancers, in order to serve our needs:

Let’s make one last test before wrapping up: let’s add one more instance of the Product service, to see if the new instance is registered under the same application ID. To do this, let’s run the following command. Notice that we didn’t specify a port to publish the service, since we just want to add the instance to load balance the internal calls from the Order service:

docker run --rm --name product2 --hostname=product2 --net=microservicesnet -v /Users/alexandrelourenco/Applications/git/docker-handson:/data alexandreesl/microservices-spring-boot-base -jar /data/Product-backend/target/Product-backend-1.0.jar

If we open the Eureka web interface again, we will see that the second address is mapped:

Finally, to test the load balancing, let’s call the previous URL of the Order service again. After the call, we will see that our service is aware of both addresses, proving that the load balancing is in place, as we can see on the log fragment below:

[2016-01-08 14:14:09.703] boot – 1  INFO [http-nio-8080-exec-1] — ChainedDynamicProperty: Flipping property: PRODUCTSERVICE.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647

[2016-01-08 14:14:09.709] boot – 1  INFO [http-nio-8080-exec-1] — BaseLoadBalancer: Client:PRODUCTSERVICE instantiated a LoadBalancer:DynamicServerListLoadBalancer:{NFLoadBalancer:name=PRODUCTSERVICE,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null

[2016-01-08 14:14:09.714] boot – 1  INFO [http-nio-8080-exec-1] — ChainedDynamicProperty: Flipping property: PRODUCTSERVICE.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647

[2016-01-08 14:14:09.718] boot – 1  INFO [http-nio-8080-exec-1] — DynamicServerListLoadBalancer: DynamicServerListLoadBalancer for client PRODUCTSERVICE initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=PRODUCTSERVICE,current list of Servers=[product1:8080, product2:8080],Load balancer stats=Zone stats: {defaultzone=[Zone:defaultzone; Instance count:2; Active connections count: 0; Circuit breaker tripped count: 0; Active connections per server: 0.0;]

},Server stats: [[Server:product2:8080; Zone:defaultZone; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 00:00:00 UTC 1970; First connection made: Thu Jan 01 00:00:00 UTC 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0]

, [Server:product1:8080; Zone:defaultZone; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 00:00:00 UTC 1970; First connection made: Thu Jan 01 00:00:00 UTC 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0]

]}ServerList:org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList@2561f098

[2016-01-08 14:14:09.738] boot – 1  INFO [http-nio-8080-exec-1] — ConnectionPoolCleaner: Initializing ConnectionPoolCleaner for NFHttpClient:PRODUCTSERVICE

And that concludes our lab. As we can see, we built a robust stack to deploy our microservices and their dependencies with little effort.

Clustering container hosts

In all of our examples, we have worked with a single Docker host, in this case our own machine. In a real production environment, we can have dozens or even hundreds of Docker hosts. To make all these hosts work together, we need a layer that manages them and makes them appear as a single cluster to consumers.

This is the goal of Docker Swarm. With Docker Swarm, we can connect multiple Docker hosts across a network and use them as a single cluster through Docker Swarm’s interface.

For further information about Docker Swarm, along with a good usage example, I suggest consulting the excellent “The Docker Book”, which I describe in the next section.

References

This post was inspired by my studies of Docker with The Docker Book, written by James Turnbull. It is an excellent source of information, and I highly recommend buying it! The book is available for purchase online at:

www.dockerbook.com

Conclusion

And this concludes our tour of the world of Docker. With a simple interface, Docker lets us use all the power of the container world, allowing us to quickly deploy and scale our applications. It has never been so simple to deploy our applications! Thank you for following me on another post, until next time.

TDD: Designing our code, test-first

Hi, dear readers! Welcome to my blog. In this post, we will talk about TDD, a methodology that preaches focusing first on tests, instead of our production code. I have already talked about test technologies in my post about Mockito, so in this post we will focus more on the theoretical aspects of this subject. So, without further delay, let’s begin talking about TDD!

Test-driven development

TDD, also known as test-driven development, is a development technique created by Kent Beck, who also co-created JUnit, a widely known Java test library.

TDD, as the name implies, dictates that development must be guided by tests, which must be written even before the “real” code that implements the requirements! Let’s see in more detail how the steps of TDD work in the next section.

TDD steps

As we can see in the picture above, the following steps must be followed when using the TDD paradigm:

  1. Write a test (that fails): Represented by the red circle, it means that before we implement the code (a method on a class, for example) we implement a test case, such as a JUnit method, that invokes our code, which at this point is just an empty method. Of course, in this scenario the test will fail, since there is no implementation, which leads to our next step;
  2. Write just enough code to pass: Represented by the green circle, it means that we write just enough code in our method to make our test case pass. The principle behind this step is that we write code with simplicity in mind, in other words, keeping it simple, like the famous KISS principle;
  3. Refactor the code, improving its quality: Represented by the blue circle, it means that, after we make our code pass the test, we analyse it, looking for chances to improve its quality. Is there duplicate code? Remove the duplication! Are there hard-coded constants? Consider moving them to an enum or a properties table, for instance.

An important thing to notice is that the focus of the third step and the previous one is to not implement more functionality than the test code is testing. The reason is simple: since our focus is to implement the tests for our scenarios first, any functionality that we implement without a test reflecting the scenario is automatically code without test coverage.

After we conclude the third step, the construction returns to the first step, restarting the cycle. In the next iteration, we implement another test case for our method, representing another scenario. We then write the minimum code needed to implement the new scenario and conclude the process by refactoring the code. This keeps going until all the scenarios are met, leaving at the end not only our final code, but also all the test cases necessary to effectively test our method, with all the functionality it is intended to implement.
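
To make the cycle described above a bit more concrete, here is a minimal sketch in plain Java. The names InvoiceCalculator and InvoiceCalculatorTest are hypothetical, invented just for this illustration, and the tests use a small hand-written check helper instead of JUnit only so that the example runs without any dependency. It shows how the code could look after two iterations of the cycle.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical production class, created only for this illustration.
class InvoiceCalculator {
    // Green step: just enough code to satisfy the two tests below.
    // Refactor step: something like an explicit loop could be replaced
    // by this stream, as long as both tests keep passing.
    static int total(List<Integer> itemPrices) {
        return itemPrices.stream().mapToInt(Integer::intValue).sum();
    }
}

// Hypothetical test class; in TDD each test is written before the code it exercises.
class InvoiceCalculatorTest {
    // Iteration 1 (written first, fails until total() exists): no items, total is zero.
    static void emptyInvoiceTotalsZero() {
        check(InvoiceCalculator.total(Collections.<Integer>emptyList()) == 0,
              "empty invoice should total 0");
    }

    // Iteration 2 (written next): the total is the sum of the item prices.
    static void totalIsTheSumOfItemPrices() {
        check(InvoiceCalculator.total(Arrays.asList(10, 20, 5)) == 35,
              "total should be the sum of the prices");
    }

    // Minimal stand-in for a JUnit assertion.
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        emptyInvoiceTotalsZero();
        totalIsTheSumOfItemPrices();
        System.out.println("all tests passed");
    }
}
```

In a real project the two test methods would normally be JUnit test methods; the order in which things are written (test, then code, then refactoring) is the part that matters here.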

The reader may be asking: “but couldn’t we get the same result by coding the tests after we write our production code?”. The answer is yes, we could get the same result by adding our tests at the end of the process, but then we wouldn’t have something called feedback from our code.

Feedback

When we talk about feedback, we mean the perception we have about how our code will perform. If we think about it, the test code is the first client of our code: it prepares the data necessary for the invocation, invokes our code and then collects the results. By implementing the tests first, we receive this feedback from our implementation sooner, not only in terms of correctness, but also at the design level.

For instance, imagine that we are building a test case and realize that the test is getting difficult to implement because of the complex class structure we have to populate just to invoke our test’s target. This could mean that our class model is too complex and a refactoring is necessary.

By getting this feedback when we are just beginning the development (remember that we always code just enough for the test to pass!), it is a lot cheaper and less complex to refactor the model than if we receive this feedback only at the end of the construction, when a lot of time and effort has already been spent! From this point of view, we can easily see the benefits of the TDD approach.

Types of tests

When we talked about tests in the previous sections, we talked about a type of test called a unit test. Unit tests are tests focused on testing a single program unit, like a class, without testing the access to external resources, for example. Of course, there are other types of tests we can implement, as follows:

Unit tests: Unit tests focus on testing each program unit that composes the system individually, without external interference.

Integration tests: Integration tests also focus on testing program units, but in this case on testing the integration of the system with external resources, like a database, for example. A good example of an integration test is a set of test cases for a DAO class, testing the code that implements insertions, updates, etc.

System tests: System tests, as the name implies, are tests that focus on testing the whole system, across all its layers. In a web application, for example, that means an automated test that starts a server, executes some HTTP requests against the web application and analyses the responses. A good example of a technology that could be used to test the web tier is a tool called Selenium.

Acceptance tests: Acceptance tests are commonly tests performed by the end user of the system. The focus of this kind of test is to measure how well the requirements specified by the user were implemented. Other requirements, such as usability, are also evaluated by this kind of test.

An important thing to notice is that this kind of test can also be written as an automated system test, with the objective of testing the requirement as it is, for example:

  • Requirement: the invoice must be inserted into the system through the web interface;
  • Test: create a system test that executes the insertion through the web interface and verifies that the data is correctly inserted;

This technique, called ATDD (Acceptance Test-Driven Development), preaches that an acceptance test must be created first, and then the TDD technique is applied until the acceptance test is satisfied. The diagram below shows this technique in practice:

Mock objects and unit tests

When we talk about unit tests, as said before, it is important to isolate the program unit we are testing, avoiding interference from other tiers and dependencies that the program unit uses. To do so, we can replace those dependencies with mock objects. In the Java world, a good framework we can use to create mocks is Mockito, which we talked about in my previous post.

With mocks, following the principles of TDD, we can, for example, create just the interfaces our code depends on and mock those interfaces, thus already establishing the communication channel that will be created, without shifting our focus away from the program unit we are creating.

Another benefit of this approach lies in the creation of the interfaces themselves: since our focus is always to implement the minimum necessary for the tests to pass, the resulting interfaces will be simple and concise, improving their quality.
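
As a rough illustration of the idea of creating only the interface and mocking it, here is a small sketch in plain Java. All the names (CustomerRepository, GreetingService, MockCustomerRepository) are hypothetical, and the mock is written by hand instead of being created with Mockito, only to keep the example free of dependencies. With Mockito the mock class would not need to be written at all.

```java
// Hypothetical interface: the only part of the dependency that exists so far.
interface CustomerRepository {
    String findNameById(long customerId);
}

// Hypothetical unit under test; it only knows the interface, not an implementation.
class GreetingService {
    private final CustomerRepository repository;

    GreetingService(CustomerRepository repository) {
        this.repository = repository;
    }

    String greetingFor(long customerId) {
        return "Welcome, " + repository.findNameById(customerId) + "!";
    }
}

// Hand-written mock: returns a canned answer and records how it was called.
class MockCustomerRepository implements CustomerRepository {
    long requestedId = -1;

    @Override
    public String findNameById(long customerId) {
        requestedId = customerId;
        return "Ana";
    }
}

class GreetingServiceTest {
    public static void main(String[] args) {
        MockCustomerRepository mock = new MockCustomerRepository();
        GreetingService service = new GreetingService(mock);

        String greeting = service.greetingFor(42L);

        // Checks the result produced by the unit under test...
        if (!"Welcome, Ana!".equals(greeting)) {
            throw new AssertionError("unexpected greeting: " + greeting);
        }
        // ...and the way it talked to its dependency.
        if (mock.requestedId != 42L) {
            throw new AssertionError("repository was called with " + mock.requestedId);
        }
        System.out.println("all tests passed");
    }
}
```

Notice that the interface ended up with a single method, because that is all the test needed so far.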

Considerations

When do I use mocks?

An important note about mocks is that it is not always good to use them. Take for example a DAO class, which essentially is just a class that implements the code necessary to interact with a database. Here the use of mocks won’t bring any value, since the code of the class itself is too simple to benefit from a unit test. In these cases, typically just an integration test is enough, using for example an in-memory database such as HSQLDB to act as the database.

Should I code more than one test case at a time?

In general, it is considered bad practice to write more than one test case at once before running them and seeing them fail. The reason is that the point of the technique is to test one piece of functionality at a time, which of course is “ruined” by coding more than one test at once.

How much functionality do I code in each iteration?

In TDD terminology, the “amounts” of functionality we code in each iteration are called baby steps. There is no universal convention for how much must be implemented in each of these baby steps, but it is good advice to follow common sense. If the developer is experienced and the functionality is simple, it might even be possible to implement almost the whole code in just one iteration.

If the developer is less experienced and/or the functionality is more complex, it makes more sense to take more baby steps, since this will generate more feedback for the developer, making it easier to implement the complex requirements.

Should I test private code?

Private code, as the name implies, is code (a method, for instance) that is accessible only inside the same program unit, like a class, for example. Since test cases are normally implemented in another program unit (class), the test code can’t access this code, which in turn raises the question: should I write any code to test that private code?

Generally speaking, the common consensus is that private code is intended to implement small things, like a portion of code that would otherwise be duplicated across multiple methods. So if you have, for instance, an enormous private method in a Java class with lots of functionality, then maybe that method should be made public, or even moved to a different class, like a utility class.

If that is not the case, then it is necessary to design test cases that efficiently test the method by invoking it indirectly through its public consumer. Speaking specifically of the Java world, one way to test the code without the “barrier” of the public consumer is by using reflection.
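
The two options mentioned above can be sketched as follows. The class InvoiceFormatter and its private method normalize are hypothetical, created only for this example; the reflection calls themselves (getDeclaredMethod, setAccessible and invoke) belong to the standard java.lang.reflect API. Treat this as a sketch of the idea and not as a recommendation to reach for reflection by default.

```java
import java.lang.reflect.Method;
import java.util.Locale;

// Hypothetical class with a small private helper.
class InvoiceFormatter {
    public String format(String customer, int amount) {
        return normalize(customer) + ": " + amount;
    }

    private String normalize(String name) {
        return name.trim().toUpperCase(Locale.ROOT);
    }
}

class InvoiceFormatterTest {
    // Calls the private method through reflection. Checked reflection errors
    // are wrapped so that callers do not need a throws clause.
    static String invokeNormalize(InvoiceFormatter target, String input) {
        try {
            Method normalize =
                InvoiceFormatter.class.getDeclaredMethod("normalize", String.class);
            normalize.setAccessible(true); // bypasses the private modifier
            return (String) normalize.invoke(target, input);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not invoke normalize", e);
        }
    }

    public static void main(String[] args) {
        InvoiceFormatter formatter = new InvoiceFormatter();

        // Option 1: test the private code indirectly, through its public consumer.
        if (!"ACME: 10".equals(formatter.format("  acme ", 10))) {
            throw new AssertionError("format should normalize the customer name");
        }

        // Option 2: test the private method directly, using reflection.
        if (!"ACME".equals(invokeNormalize(formatter, "  acme "))) {
            throw new AssertionError("normalize should trim and upper-case");
        }
        System.out.println("all tests passed");
    }
}
```

One trade-off of the reflection option is that the method name is passed as a string, so renaming the private method breaks the test only at runtime.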

My target code has too many test cases, is this normal?

When implementing the test cases for a piece of production code (a method, for example), the developer could end up in a scenario where lots of test cases are created just to cover all the functionality that this single piece of code contains. When this kind of situation happens, it is good practice for the developer to analyse whether the code has too much functionality implemented in it, which leads to what in OO we call low cohesion.

To avoid this situation, the code needs to be refactored, maybe by splitting it into more methods, or even into more classes in Java’s case.
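
As a small, hypothetical sketch of such a split, imagine a single method that used to validate an order, calculate its total and describe the result, forcing every test case to deal with all three concerns at once. After the refactoring, each concern lives in its own class (the names OrderValidator, OrderPricing and OrderSummary are invented for this example) and can be covered by a few focused test cases:

```java
// Hypothetical result of the refactoring: one responsibility per class.
class OrderValidator {
    static boolean isValid(int quantity, int unitPriceCents) {
        return quantity > 0 && unitPriceCents >= 0;
    }
}

class OrderPricing {
    static int totalCents(int quantity, int unitPriceCents) {
        return quantity * unitPriceCents;
    }
}

// Thin coordinator that only combines the two units above.
class OrderSummary {
    static String describe(int quantity, int unitPriceCents) {
        if (!OrderValidator.isValid(quantity, unitPriceCents)) {
            return "invalid order";
        }
        return "total=" + OrderPricing.totalCents(quantity, unitPriceCents);
    }
}

class OrderRefactoringTest {
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // Each unit now needs only a few focused test cases.
        check(!OrderValidator.isValid(0, 100), "zero quantity is invalid");
        check(OrderValidator.isValid(2, 100), "positive quantity is valid");
        check(OrderPricing.totalCents(3, 250) == 750, "3 x 250 = 750");
        check("total=750".equals(OrderSummary.describe(3, 250)), "valid order summary");
        check("invalid order".equals(OrderSummary.describe(0, 250)), "invalid order summary");
        System.out.println("all tests passed");
    }
}
```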

Conclusion

And this concludes our post about TDD. By shifting the focus from implementing the code first to implementing the tests first, we can easily make our code more robust, simple and testable.

In a time when software is more important than ever, being present in places like airplanes, cars and even inside human beings, testing the code efficiently is really important, since the consequences of bad code are getting more and more devastating. Thank you for following me on another post, until next time.