AWS Lambda: building a serverless back-end on Amazon


Hello, dear readers! Welcome to my blog. In this post, we will learn about AWS Lambda, a serverless compute service that lets us quickly deploy serverless back-end infrastructures. But why use this service instead of good old EC2 instances? Let's find out!

Motivation behind AWS Lambda

Alongside the benefits of developing a back-end using the serverless paradigm – covered in more detail in another post of mine – another good reason to use AWS Lambda is pricing.

When deploying an application on EC2 – whether on-demand, spot or reserved instances – we are charged by the hour. This is true even if our application is not called at all during that hour, resulting in wasted resources and money.

With AWS Lambda, Amazon charges by processing time, so we only pay for the time spent executing our lambdas. This results in a much leaner architecture, where fewer resources and less money are spent. This post details the comparison further.

AWS Lambda development is based on functions. When developing a lambda, we write a function that can run as a REST endpoint – served by Amazon API Gateway – or as an event-processing function, triggered by events such as a file being uploaded to an S3 bucket.

Limitations

However, not everything about this service is simple. When developing with AWS Lambda, two things must be kept in mind: cold starts and resource restrictions.

A cold start happens the first time a lambda is called, or when enough time has passed that the server used to run the lambda – behind the scenes there are, of course, servers running the functions, but this is hidden from the user – has been shut down due to inactivity. Amazon keeps the server up and serving as long as there is a consistent frequency of client calls, but of course, from time to time, there will be idle periods.

When a cold start occurs, the request gets a slower response, since it has to wait for a server to be up and running before the function can execute. This is made worse if clients have low timeout configurations, resulting in failed requests. It should be taken into account when developing lambdas that act as APIs.
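
A simple way to see this behaviour in practice is to flag whether an invocation reused an existing container. The sketch below is a hypothetical handler (not part of this lab's code) and relies on the fact that module-level code runs only once, when the container is created:

import time

# Module-level code runs only on a cold start, when the container is created
COLD_START = True
INIT_TIME = time.time()


def handler(event, context):
    global COLD_START
    if COLD_START:
        print('Cold start! Container initialized at {}'.format(INIT_TIME))
        COLD_START = False
    else:
        print('Warm invocation, container reused')
    return {'statusCode': 200, 'body': 'ok'}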

Another important aspect to note is resource restrictions. Being designed for small functions ("microservices"), lambdas have several limits, such as the amount of memory, disk and CPU available. These limits can be increased, but only by a small amount. This link in the AWS docs details the limits.

One important limit is the running time of the lambda itself: an AWS Lambda can run for at most 5 minutes. This limit makes clear what lambdas are meant to be: simple functions that do not run for long periods of time.

If any of these limits is reached, the lambda execution will fail.

Lab

For this lab, we will use a framework called Serverless. Serverless automates some tasks that are tedious to do by hand when developing with AWS Lambda, such as creating a zip file with all our sources to upload to S3 and creating/configuring all the AWS resources. Serverless uses CloudFormation under the hood, managing resource creation and updates for us. As the programming language, we will use Python 3.6.

To install Serverless, please follow this guide.

With Serverless installed, let’s begin our lab! First, let’s create a project, by running:

serverless create --template aws-python3 --path MyAmazingAWSLambdaService 

This command creates a new Serverless project, using an initial template for our first Python lambda. Let's open the project – I will be using PyCharm, but any IDE or editor will do – and see what the framework created for us.

Project structure

Serverless created a simple project structure, consisting of a serverless.yml file and a Python script. It is in the YAML that we declare our functions, the cloud provider, IAM permissions, resources to be created and so on.

The file created by the command is as follows:

# Welcome to Serverless!
#
# This file is the main config file for your service.
# It's very minimal at this point and uses default values.
# You can always add more config options for more control.
# We've included some commented out config examples here.
# Just uncomment any of them to get that config option.
#
# For full config options, check the docs:
#    docs.serverless.com
#
# Happy Coding!

service: MyAmazingAWSLambdaService

# You can pin your service to only deploy with a specific Serverless version
# Check out our docs for more details
# frameworkVersion: "=X.X.X"

provider:
  name: aws
  runtime: python3.6

# you can overwrite defaults here
#  stage: dev
#  region: us-east-1

# you can add statements to the Lambda function's IAM Role here
#  iamRoleStatements:
#    - Effect: "Allow"
#      Action:
#        - "s3:ListBucket"
#      Resource: { "Fn::Join" : ["", ["arn:aws:s3:::", { "Ref" : "ServerlessDeploymentBucket" } ] ]  }
#    - Effect: "Allow"
#      Action:
#        - "s3:PutObject"
#      Resource:
#        Fn::Join:
#          - ""
#          - - "arn:aws:s3:::"
#            - "Ref" : "ServerlessDeploymentBucket"
#            - "/*"

# you can define service wide environment variables here
#  environment:
#    variable1: value1

# you can add packaging information here
#package:
#  include:
#    - include-me.py
#    - include-me-dir/**
#  exclude:
#    - exclude-me.py
#    - exclude-me-dir/**

functions:
  hello:
    handler: handler.hello

#    The following are a few example events you can configure
#    NOTE: Please make sure to change your handler code to work with those events
#    Check the event documentation for details
#    events:
#      - http:
#          path: users/create
#          method: get
#      - s3: ${env:BUCKET}
#      - schedule: rate(10 minutes)
#      - sns: greeter-topic
#      - stream: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
#      - alexaSkill
#      - alexaSmartHome: amzn1.ask.skill.xx-xx-xx-xx
#      - iot:
#          sql: "SELECT * FROM 'some_topic'"
#      - cloudwatchEvent:
#          event:
#            source:
#              - "aws.ec2"
#            detail-type:
#              - "EC2 Instance State-change Notification"
#            detail:
#              state:
#                - pending
#      - cloudwatchLog: '/aws/lambda/hello'
#      - cognitoUserPool:
#          pool: MyUserPool
#          trigger: PreSignUp

#    Define function environment variables here
#    environment:
#      variable2: value2

# you can add CloudFormation resource templates here
#resources:
#  Resources:
#    NewResource:
#      Type: AWS::S3::Bucket
#      Properties:
#        BucketName: my-new-bucket
#  Outputs:
#     NewOutput:
#       Description: "Description for the output"
#       Value: "Some output value"

For now, let's just remove all the comments to have a cleaner file. The other file is a Python script, which contains our first function. Let's see it:

import json


def hello(event, context):
    body = {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "input": event
    }

    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }

    return response

    # Use this code if you don't use the http event with the LAMBDA-PROXY
    # integration
    """
    return {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "event": event
    }
    """

As we can see, it is a pretty simple script. All we have to do is create a function that receives two parameters, event and context. The event carries the input data the lambda will work on. The context is used by AWS to pass information about the environment in which the lambda is running. For example, if we wanted to know how much time is left before the running time limit is reached, we could make the following call:

print("Time remaining (MS):", context.get_remaining_time_in_millis())

The dictionary returned by the function is the standard response format for a lambda that acts as an API, proxied by AWS API Gateway.
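
For reference, a proxy response may also carry response headers; a slightly extended sketch (the header value is just an example, not part of the generated template) would be:

import json


def hello(event, context):
    body = {'message': 'Hello from Lambda!'}
    return {
        'statusCode': 200,
        'headers': {'Content-Type': 'application/json'},  # optional response headers
        'body': json.dumps(body)  # the body must be a string, hence json.dumps
    }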

For now, let's leave the script as it is, since we will add more functions to the project. Let's begin by adding the DynamoDB table we will use in our lab, alongside other configurations.

Creating the DynamoDB table

In order to create the table, we modify our YAML as follows:

service: MyAmazingAWSLambdaService

provider:
  name: aws
  runtime: python3.6
  stage: ${opt:stage, 'dev'}
  region: ${opt:region, 'us-east-1'}
  profile: personal

functions:
  hello:
    handler: handler.hello

resources:
  Resources:
    product:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: product
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1

We added a resources section, where we defined a DynamoDB table called product, with an attribute called id as the key of the table's items. We also defined the stage and region to be collected as command-line options – stages are used as application environments, such as QA and Production. Finally, we defined that deploys should use an AWS credentials profile called personal, which is useful when several accounts are configured on the same machine.

Let’s deploy the stack by entering:

serverless deploy --stage prod

After some time, we will see that our stack was successfully deployed, as shown in the console output:

Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Creating Stack...
Serverless: Checking Stack create progress...
.....
Serverless: Stack create finished...
Serverless: Uploading CloudFormation file to S3...
Serverless: Uploading artifacts...
Serverless: Uploading service .zip file to S3 (3.38 KB)...
Serverless: Validating template...
Serverless: Updating Stack...
Serverless: Checking Stack update progress...
..................
Serverless: Stack update finished...
Service Information
service: MyAmazingAWSLambdaService
stage: prod
region: us-east-1
stack: MyAmazingAWSLambdaService-prod
api keys:
 None
endpoints:
 None
functions:
 hello: MyAmazingAWSLambdaService-prod-hello

During the deployment, Serverless generated a zip file with all our sources, uploaded it to a bucket, created a CloudFormation stack and deployed our lambda with it, alongside the permissions it needs to run. It also created our DynamoDB table, as requested.

Now that we have our stack and table, let's create a group of lambdas implementing CRUD operations on the table.

Creating CRUD lambdas

Now, let's create our lambdas. First, let's create a Python class to encapsulate the operations on our DynamoDB table:

# -*- coding: UTF-8 -*-

import boto3

dynamodb = boto3.resource('dynamodb')


class DynamoDbHelper:

    def __init__(self, **kwargs):
        self.table = dynamodb.Table(kwargs.get('table', """missing"""))

    def save(self, entity):
        """
           Save the given entity to the DynamoDB table
        """
        saved = self.table.put_item(Item=entity)
        print('Saving result: ({})'.format(saved))

    def get(self, entity_id):
        """
           Get an entity from the DynamoDB table by its id
        """
        entity = self.table.get_item(Key={'id': entity_id})

        if 'Item' in entity:
            return entity['Item']
        else:
            return None

    def delete(self, entity_id):
        """
           Delete an entity from the DynamoDB table by its id
        """
        deleted = self.table.delete_item(Key={'id': entity_id})
        print('Deleting result: ({})'.format(deleted))
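
As a quick usage sketch (assuming AWS credentials are configured and the product table created earlier exists), the helper could be exercised like this:

from helpers.DynamoDbHelper import DynamoDbHelper

helper = DynamoDbHelper(table='product')
helper.save({'id': '1', 'name': 'product 1', 'description': 'desc 1', 'price': '9.99'})
print(helper.get('1'))   # prints the saved item, or None if nothing is found
helper.delete('1')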

Then, we create another script with a converter. It will be used to transform the input data from lambda calls into the format stored in DynamoDB:

# -*- coding: UTF-8 -*-
import json


class ProductConverter:

    def convert(self, event):
        """
           Convert the input event into a dictionary to be saved in DynamoDB
        """
        if isinstance(event['body'], dict):
            data = event['body']
        else:
            data = json.loads(event['body'])
        return {
            'id': data['id'],
            'name': data.get('name', ''),
            'description': data.get('description', ''),
            'price': data.get('price', '')
        }
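
To illustrate what the converter does: it accepts the body either as a JSON string or as an already-parsed dict, and fills missing fields with empty defaults:

from helpers.ProductConverter import ProductConverter

converter = ProductConverter()
# body as a JSON string, as API Gateway delivers it with the Lambda proxy integration
print(converter.convert({'body': '{"id": "1", "name": "product 1"}'}))
# body already parsed into a dict, as in a local test event
print(converter.convert({'body': {'id': '1', 'price': '9.99'}}))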

Next, we change the script created by Serverless, adding the lambda handlers:

# -*- coding: UTF-8 -*-

import json

from helpers.DynamoDbHelper import DynamoDbHelper
from helpers.ProductConverter import ProductConverter


def get_path_parameters(event, name):
    return event['pathParameters'][name]


dynamodb_helper = DynamoDbHelper(table='product')
product_converter = ProductConverter()


def save(event, context):
    # Validate the payload before converting it, so a missing id results
    # in a 422 response instead of an error inside the converter
    if 'id' not in event['body']:
        return {
            "statusCode": 422,
            "body": json.dumps({'message': 'Product ID is required!'})
        }

    product = product_converter.convert(event)

    try:
        dynamodb_helper.save(product)
        response = {
            "statusCode": 200,
            "body": json.dumps({'message': 'saved successfully'})
        }
    except Exception as e:
        return {
            "statusCode": 500,
            "body": json.dumps({'message': str(e)})
        }

    return response


def get(event, context):
    product = dynamodb_helper.get(get_path_parameters(event, 'product_id'))

    if product is not None:
        response = {
            "statusCode": 200,
            "body": json.dumps(product)
        }
    else:
        response = {
            "statusCode": 404,
            "body": json.dumps({'message': 'not found'})
        }
    return response


def delete(event, context):
    try:
        dynamodb_helper.delete(get_path_parameters(event, 'product_id'))
        response = {
            "statusCode": 200,
            "body": json.dumps({'message': 'deleted successfully'})
        }
    except Exception as e:
        return {
            "statusCode": 500,
            "body": json.dumps({'message': str(e)})
        }

    return response

Finally, we change serverless.yml, defining our lambdas to be deployed on AWS:

service: MyAmazingAWSLambdaService

provider:
  name: aws
  runtime: python3.6
  stage: ${opt:stage, 'dev'}
  region: ${opt:region, 'us-east-1'}
  profile: personal

  iamRoleStatements: # permissions for all of your functions can be set here
      - Effect: Allow
        Action:
          - dynamodb:*
        Resource: "arn:aws:dynamodb:us-east-1:<your_account_id>:table/product"

functions:
  save_product:
    handler: handler.save
    events:
      - http:
          path: product/save
          method: post
  update_product:
    handler: handler.save
    events:
      - http:
          path: product/update
          method: patch
  get_product:
    handler: handler.get
    events:
      - http:
          path: product/{product_id}
          method: get
  delete_product:
    handler: handler.delete
    events:
      - http:
          path: product/{product_id}
          method: delete

resources:
  Resources:
    product:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: product
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1

After running the deployment again, we can see that the functions were deployed, as shown in the terminal:

Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Uploading CloudFormation file to S3...
Serverless: Uploading artifacts...
Serverless: Uploading service .zip file to S3 (4.92 KB)...
Serverless: Validating template...
Serverless: Updating Stack...
Serverless: Checking Stack update progress...
........................................................................................
Serverless: Stack update finished...
Service Information
service: MyAmazingAWSLambdaService
stage: prod
region: us-east-1
stack: MyAmazingAWSLambdaService-prod
api keys:
 None
endpoints:
 POST - https://xxxxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/product/save
 PATCH - https://xxxxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/product/update
 GET - https://xxxxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/product/{product_id}
 DELETE - https://xxxxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/product/{product_id}
functions:
 save_product: MyAmazingAWSLambdaService-prod-save_product
 update_product: MyAmazingAWSLambdaService-prod-update_product
 get_product: MyAmazingAWSLambdaService-prod-get_product
 delete_product: MyAmazingAWSLambdaService-prod-delete_product

PS: the REST API id was intentionally masked for security reasons.

In the terminal, we can also see the URLs used to call our lambdas. For lambdas exposed through API Gateway, the URLs follow this pattern:

https://{restapi_id}.execute-api.{region}.amazonaws.com/{stage_name}/

Later in the lab we will learn how to test our lambdas. For now, let's create our last lambda, the one that will react to S3 events.

Creating an S3 lambda to bulk insert into DynamoDB

Now, let's implement a lambda that will bulk process product inserts. This lambda will take a CSV file as input, reading it through boto3's S3 interface and saving products as it parses them. To make parsing easier, we will use the pandas Python library to read the CSV. The lambda code is as follows:

# -*- coding: UTF-8 -*-

import boto3
import pandas as pd
import io

from helpers.DynamoDbHelper import DynamoDbHelper

s3 = boto3.client('s3')
dynamodb_helper = DynamoDbHelper(table='product')


def bulk_insert(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        print('Bucket: ' + bucket)
        print('Key: ' + key)
        obj = s3.get_object(Bucket=bucket, Key=key)
        products = pd.read_csv(io.BytesIO(obj['Body'].read()))
        products_records = products.fillna('').to_dict('records')
        for product in products_records:
            record = {
                'id': str(product['id']),
                'name': product.get('name', ''),
                'description': product.get('description', ''),
                'price': str(product.get('price', '0'))
            }
            dynamodb_helper.save(record)

You may notice that we iterate over a list of records. That's because AWS can batch several S3 changes into a single lambda invocation, so we need to process every object reference that is sent to us.
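
For reference, the relevant part of an S3 event looks roughly like this (abridged; the bucket and key values are just an example), which is also handy when testing the bulk lambda locally with serverless invoke local:

{
  "Records": [
    {
      "s3": {
        "bucket": { "name": "myamazingbuckets3" },
        "object": { "key": "products.csv" }
      }
    }
  ]
}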

Before moving on to the serverless.yml changes, we need to install a plugin called serverless-python-requirements. This plugin is responsible for adding our pip requirements to the lambda package.

First, let's create a file called package.json with the following content. It can be generated with the npm init command:

{
  "name": "myamazingawslambdaservice",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "author": "",
  "license": "ISC"
}

Next, let’s install the plugin, using npm:

npm install --save serverless-python-requirements
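
The plugin packages whatever is listed in a requirements.txt file at the project root, so we also need to create one. Since boto3 is already provided by the Lambda Python runtime, for this lab the file only needs pandas (pinning a specific version is a good idea, but omitted here):

pandas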

And finally, we add the plugin to serverless.yml. Here are all the changes needed for the new lambda:

service: MyAmazingAWSLambdaService

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: non-linux

provider:
  name: aws
  runtime: python3.6
  stage: ${opt:stage, 'dev'}
  region: ${opt:region, 'us-east-1'}
  profile: personal

  iamRoleStatements: # permissions for all of your functions can be set here
      - Effect: Allow
        Action:
          - dynamodb:*
        Resource: "arn:aws:dynamodb:us-east-1:<your_account_id>:table/product"
      - Effect: Allow
        Action:
          - s3:GetObject
        Resource: "arn:aws:s3:::myamazingbuckets3/*"

functions:
  bulk_insert:
    handler: bulk_handler.bulk_insert
    events:
      - s3:
          bucket: myamazingbuckets3
          event: s3:ObjectCreated:*
  save_product:
    handler: handler.save
    events:
      - http:
          path: product/save
          method: post
  update_product:
    handler: handler.save
    events:
      - http:
          path: product/update
          method: patch
  get_product:
    handler: handler.get
    events:
      - http:
          path: product/{product_id}
          method: get
  delete_product:
    handler: handler.delete
    events:
      - http:
          path: product/{product_id}
          method: delete

resources:
  Resources:
    product:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: product
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1

PS: because of the plugin, Docker now needs to be running during deployment. This is because the plugin uses Docker to build Python packages that require OS binaries. The first time you run it, you may notice the process 'hangs' at the Docker step; this is because it is downloading the Docker image, which is quite sizeable (about 600 MB).

All we had to do was add IAM permissions for the bucket and define the lambda, adding an event that fires on object creation in the bucket. There is no need to add the bucket to the resources section, as Serverless creates the bucket automatically once we declare it as an event source for a lambda in the project.

Now, let's test the bulk insert. After redeploying, let's create a CSV like the following:

id,name,description,price
1,product 1,description 1,125.23
2,product 2,description 2,133.43
3,product 3,description 3,142.24

Let's save the file and upload it to our bucket; after a few moments, we will see the DynamoDB table populated with the data, as shown below.
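
One way to do the upload, assuming the file was saved as products.csv and the AWS CLI is configured with the same personal profile, is:

aws s3 cp products.csv s3://myamazingbuckets3/ --profile personal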

[Image: DynamoDB table populated with CSV bulk data]

Now that we have all our lambdas developed, let's learn how to test them locally and invoke them in different ways.

Testing locally

Let's test our insert product lambda locally. First, we need to create a JSON file to hold our test data. Let's call it example_json.json:

{
  "body": {
    "id": "123",
    "name": "product 1",
    "description": "description 1",
    "price": "123.24"
  }
}

Next, let’s run locally by using Serverless, with the following command:

serverless invoke local --function save_product --path example_json.json

PS: before running, don't forget to set the AWS_PROFILE environment variable to your profile.

After a few moments, we will see output like the following in the terminal:

Serverless: Invoke invoke:local
Saving result: ({'ResponseMetadata': {'RequestId': '0CDUDLGJEU7UBLLSCQ82SF0B53VV4KQNSO5AEMVJF66Q9ASUAAJG', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Mon, 14 May 2018 15:13:22 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '0CDUDLGJEU7UBLLSCQ82SF0B53VV4KQNSO5AEMVJF66Q9ASUAAJG', 'x-amz-crc32': '2745614147'}, 'RetryAttempts': 0}})

{
 "statusCode": 200,
 "body": "{\"message\": \"saved successfully\"}"
}

Under the hood, Serverless creates an emulated environment as close as possible to the AWS Lambda environment, using the permissions described in the YAML to emulate the permissions set for the function.

It is important to note that the framework doesn't guarantee 100% fidelity with a real Lambda environment, so further testing in a separate stage – QA, for example – is still necessary before going to production.

Testing on AWS API Gateway

Testing through the AWS API Gateway interface is pretty simple: navigate to API Gateway inside the AWS console, select the prod-MyAmazingAWSLambdaService API, navigate to the POST endpoint, for example, and select the TEST link.

Provide a JSON like the one we used in our local test – but without the body attribute, moving its fields to the root – and run it. The API will run successfully, as we can see in the picture below:

[Image: AWS Lambda running on API Gateway]

Testing as a consumer

Finally, let's test the way a consumer would call our API. For that, we will use curl. Open a terminal and run:

curl -d '{ "id": "123", "name": "product 1", "description": "description 1", "price": "123.24" }' -H "Content-Type: application/json" -X POST https://<your_rest_api_id>.execute-api.us-east-1.amazonaws.com/prod/product/save

This will produce the following output:

{"message": "saved successfully"}%  

This proves our API is successfully deployed and ready to be consumed!

Adding security (API keys)

In our previous example, our API is exposed to the open world without any security. Of course, in a real scenario, this is not acceptable. It is possible to integrate Lambda with several security solutions, such as AWS Cognito, to improve security. In our lab, we will use the basic API key authentication provided by AWS API Gateway.

Let’s change our YAML as follows:

...omitted....
functions:
  bulk_insert:
    handler: bulk_handler.bulk_insert
    events:
      - s3:
          bucket: myamazingbuckets3
          event: s3:ObjectCreated:*
  save_product:
    handler: handler.save
    events:
      - http:
          path: product/save
          method: post
          private: true
  update_product:
    handler: handler.save
    events:
      - http:
          path: product/update
          method: patch
          private: true
  get_product:
    handler: handler.get
    events:
      - http:
          path: product/{product_id}
          method: get
          private: true
  delete_product:
    handler: handler.delete
    events:
      - http:
          path: product/{product_id}
          method: delete
          private: true
...omitted....

Now, after redeploying, if we try our previous curl call, we will receive the following error:

{"message":"Forbidden"}% 

So, how do we call it now? First, we need to open API Gateway and generate an API key, as below:

[Image: API Gateway keys]

Second, we need to create a usage plan to associate the key with our API. We do this by following the steps below inside the API Gateway interface:

[Images: usage plan creation steps]

With our key in hand, let's try our curl call again, adding the header that passes the key:

curl -d '{ "id": "123", "name": "product 1", "description": "description 1", "price": "123.24" }' -H "Content-Type: application/json" -H "x-api-key: <your_API_key>" -X POST https://<your_rest_api_id>.execute-api.us-east-1.amazonaws.com/prod/product/save

After the call, we will receive the saved successfully response again, proving our configuration works.

Lambda Logs (CloudWatch)

One last thing we will talk about is logging on AWS Lambda. The reader may have noticed the use of Python's print function in our code. On AWS Lambda, everything printed by Python is collected and organised in another AWS service, called CloudWatch. We can access CloudWatch from the AWS console, as follows:

[Image: CloudWatch logs list]

In the list above, each function appears as a separate link. If we drill down into one of them, we will see the log output of that function's executions; an example from one of our lambda's executions is shown in the screenshot further below.
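
While print statements work fine, Python's standard logging module also writes to CloudWatch and adds log levels and timestamps. A minimal sketch of how a handler could use it (this is not part of the lab's code):

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)  # the Lambda runtime pre-configures a handler; we only set the level


def hello(event, context):
    logger.info('Received event: %s', event)
    return {'statusCode': 200, 'body': 'ok'}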

[Image: Lambda execution log]

Conclusion

And so we conclude our tour through AWS Lambda. With its simple and intuitive approach, it is a good option for deploying application back-ends following the microservices paradigm. Thank you for following me on this post, until next time.

Sources used in this lab


Kafka: implementing stream architectures


Hi, dear readers! Welcome to my blog. In this post, we will learn about Apache Kafka, a distributed messaging system used in many streaming solutions.

This article is divided into several sections, allowing the reader not only to understand the concepts behind Kafka but also to exercise them in a hands-on lab. So, without further delay, let's begin!

Kafka architecture

Overview

Kafka is a distributed messaging system created by LinkedIn. In Kafka, we have stream data structures called topics, which can be consumed by several clients organized in consumer groups. These topics are stored in a Kafka cluster, where each node is called a broker.

Kafka's ecosystem also needs a Zookeeper cluster in order to run. Zookeeper is a key-value storage solution, which in Kafka's context is used to store metadata. Several operations, such as topic creation, are done through Zookeeper instead of the brokers.

The main difference between Kafka and other messaging solutions that use classic topic structures is that in Kafka we have offsets. Offsets act like cursors, pointing to the last location a consumer or producer has reached consuming or producing messages for a given topic on a given partition.

Partitions in Kafka are like shards in some NoSQL databases: they divide the data, organizing it by partition and/or message keys (more about this when we talk about ingesting data into Kafka).

So, in Kafka, we have producers ingesting data, controlled by producer offsets, and consumers consuming data from topics, each with their own offsets. The main advantages of this approach are:

  • Data can be read and replayed by consumers, since there's no link between consumed data and produced data. This also allows us to implement solutions with back-pressure, that is, solutions where consumers poll data according to their processing limits;
  • Data can be retained for longer, since in streams, unlike classic topics, data is not removed from the structure after being delivered to all consumers. It is also possible to compress the data, allowing Kafka clusters to retain large amounts of data in their streams;

Kafka’s offsets explained

The following diagram illustrates a Kafka topic in action:

[Diagram: Kafka topic producer/consumer offsets]

In the diagram, we can see a topic with 2 partitions. Each little rectangle represents an offset pointing to a location in the topic. On the producer side, we can see 2 offsets pointing to the topic's head, showing our producer ingesting data into the topic.

In Kafka, each partition is assigned to a broker, and that broker is responsible for serving production and consumption for the partition. The broker responsible for this is called the partition leader.

How many partitions does a topic need? The main factor is the desired throughput for production and consumption. Several factors influence throughput, such as the producer ack type, the number of replicas and so on.

Too many partitions is also something to watch out for when planning a new topic, as it can hinder availability and end-to-end latency, as well as increase memory consumption on the client side – remember that both producer and consumer can operate with several partitions at the same time. This article is an excellent reference on the matter.

On the consumer side, we see some interesting features. Each consumer has its own offset and consumes data from just one partition. This is an important concept in Kafka: each consumer is responsible for consuming one partition, and each consumer group consumes the data independently, that is, there is no relation between the consumption of one group and the others.

We can see this in the diagram, where the offsets of one group are at different positions from the others. All Kafka offsets, both producer and consumer, are stored in an internal topic inside the cluster. Offsets are committed (updated) on the cluster using auto-commit or by committing manually in code, analogous to relational database commits. We will see more about this when coding our own consumer.

What happens when there are more partitions than consumers? In that case, Kafka delivers data from more than one partition to the same consumer, as we can see below. It is important to note that it is possible to increase the number of consumers in a group, avoiding this situation altogether:

[Diagram: the same consumer consuming from more than one partition on Kafka]

One important thing to notice is what happens in the opposite situation, when there are fewer partitions than configured consumers:

[Diagram: idle consumers on Kafka]

As we can see, in this case we end up with idle consumers that won't process any messages until a new partition is created. This is important to keep in mind when setting up a new consumer, as increasing the number of consumers too much will just leave idle resources unused.

One of the key features of Kafka is that it guarantees message ordering. This ordering applies to messages within the same partition, but not across the whole topic. That means that when we consume data with our consumers, data across partitions is read in parallel, but data from the same partition is read by a single thread, guaranteeing the order.

IMPORTANT: As stated in Kafka's documentation, it is not recommended to process the data of a single partition in parallel, as this scrambles the message order. The recommended way to scale the solution is to add more partitions, which adds more threads processing data in parallel without losing the ordering inside each partition.

Partitions in Kafka are always written to a single mount point on disk. They are written as files, called log segments, which are rolled when they reach a certain amount of data or a certain age – 1 GB or 1 week of data by default, whichever comes first. The most recent log segment, which holds the data ingested up to the head of the stream, is called the active segment and is never deleted. Older segments are removed from disk according to the configured retention policies.

Replicas

In order to guarantee data availability, Kafka works with replicas. When creating a topic, we define how many replicas we want for each partition. If we configure a replication factor of 3, for example, then a topic with 2 partitions will have 6 replicas in total: the 2 partition leaders plus 4 followers.

Kafka replicates the data just like we would do by hand: the brokers responsible for maintaining the replicas – called partition followers – subscribe to the topic and keep reading data from the partition leader and writing it to their replicas. Followers whose data is up to date with the leader are called in-sync (IS) replicas. Replicas can fall out of sync, for example due to network issues that slow down the syncing process and make it lag behind the leader to an unacceptable point.

Rebalance

When a rebalance occurs, for example because a broker goes down, all writing and reading on the partitions for which that broker was the leader stops. The cluster elects a new partition leader from one of the IS (in-sync) replicas, and writing/reading resumes.

During this period, applications that were using the old leader to publish data will receive a specific error when trying to use the partition, indicating that a rebalance is occurring (unless we configure the producer to just deliver messages without any acknowledgment, which we will see in more detail in the next sections). On the consumer side, it is possible to implement a rebalance listener, which can clean up work and prepare for when the partition becomes available again.

It is important to notice that, with a broker down, it is possible that some offsets were not committed, causing messages to be processed twice when processing of the partition resumes.

What happens if a broker goes down and no IS replicas are available? That depends on how the cluster is configured. If unclean leader election is disabled, all processing on that partition is suspended until the broker that went down comes back. If unclean election is enabled, one of the out-of-sync followers is elected as the new leader.

Of course, each option has its trade-offs: without unclean election, we can lose the partition entirely if we can't restart the lost broker; with unclean election, we risk losing some messages, since their offsets will be overwritten by the new leader when new data arrives at the partition.

If the old leader comes back, it resumes work on the partition as a follower, and in the case of an unclean election it does not reinsert the lost messages.

Kafka’s producer explained

In this section, we will learn about the internals of the Kafka producer, responsible for sending messages to Kafka topics. When working with the producer, we create ProducerRecords, which we send to Kafka using the producer.

Producer architecture

The Kafka producer's internal structure is divided as shown in the following diagram:

[Diagram: Kafka producer internal details]

As we can see, there is a lot going on when producing messages to Kafka. First, as said before, we create a ProducerRecord, which consists of 3 parts:

  • Partition: an optional field. If it is set, it indicates the specific partition the message must be sent to;
  • Message key: an optional field. When no partition is explicitly set, the partitioner uses this field to decide which partition the message goes to. Kafka guarantees that all messages with the same key are sent to the same partition – as long as the number of partitions in the topic stays the same (a rough sketch of this idea appears at the end of this section);
  • Value (payload): a required field; as the name implies, it is the message itself to be sent;

All the fields of the ProducerRecord must be serialized to byte arrays before being sent to Kafka, and that is exactly what the serializer does as the first step of the send – we will see later in our lab that we always define a serializer for our keys and values. After that, the records are handed to the partitioner, which determines the partition to send each message to.

The partitioner then hands the message to a batching process, running on a separate thread, which "stacks" messages until a threshold is reached – a certain number of bytes or a certain time without new messages, whichever comes first – and finally, once the threshold is reached, the batch of messages is sent to the Kafka broker.

Let's keep in mind that, as we saw before, brokers are elected as partition leaders, so messages are sent directly to the broker that leads the target partition.
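
To make the key-to-partition mapping concrete, here is a rough Python sketch of what a hash-based partitioner does – the real Java client uses murmur2 hashing, so this is only an illustration of the idea:

import zlib


def choose_partition(message_key, num_partitions):
    # A deterministic hash of the key, modulo the partition count: the same key
    # always lands on the same partition, as long as the partition count is unchanged
    return zlib.crc32(message_key.encode('utf-8')) % num_partitions


print(choose_partition('customer-42', 2))  # always prints the same partition number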

Acknowledgment types

Kafka's producer works with 3 types of acks (acknowledgments) that confirm a message has been successfully sent. The types are:

  • acks=0: the producer just sends the message and doesn't wait for confirmation, not even from the partition leader. Of course, this is the fastest option for delivering messages, but it also carries the risk of message loss;
  • acks=1: the producer waits for the partition leader to reply that it wrote the message before moving on. This option is safer; however, there is still some risk, since the partition leader can go down right after the acknowledgment, without having passed the message on to any replica;
  • acks=all: the producer waits for the partition leader and all IS replicas to write the message before moving on. This is naturally the safest option of all, but it comes with possible performance costs, since all network replication must occur before continuing. This gets worse when there are no IS replicas at the moment, as production is held until at least one replica catches up;

Which one to use? That depends on the characteristics of the solution we are working on. acks=0 can be useful for solutions that handle lots of messages whose loss is not critical – monitoring events, for example, are short-lived information that can tolerate some loss – unlike, say, bank account transactions, where acks=all is a must, since message loss is unacceptable in that kind of application.

Producer configurations

There are several configurations that can be set on the producer. Here are some of the more basic and interesting ones to know:

  • bootstrap.servers: a list of Kafka brokers for the producer to communicate with. The broker list is refreshed automatically from the cluster after the initial connection, but it is advisable to set at least 2 brokers, since the producer won't start if a single broker is configured and that broker is down;
  • key.serializer: the serializer used to transform keys into byte arrays. Of course, the serializer class depends on the key type being used;
  • value.serializer: the serializer used to transform the message into a byte array. When using complex types such as Java objects, it is possible to use one of several out-of-the-box serializers, or implement your own;
  • acks: where we define the acknowledgment type, as we saw previously;
  • batch.size: the maximum amount of data, in bytes, that the batching process accumulates for a partition before sending the batch;
  • linger.ms: the amount of time, in milliseconds, the producer waits for new messages before sending what it has buffered. Of course, if batch.size is reached first, the batch is sent before this threshold;
  • max.in.flight.requests.per.connection: defines how many requests the producer sends without waiting for responses from Kafka (if acks is not 0, of course). As stated in Kafka's documentation, this must be set to 1 to guarantee that messages are written to Kafka in the same order they were sent by the producer;
  • client.id: can be set to any string value and identifies the producer to the Kafka cluster. It is used by the cluster for metrics and logging;
  • compression.type: defines the compression applied to messages before they are sent to Kafka. It supports the snappy, gzip and lz4 formats. By default, no compression is used;
  • retries: defines how many times the producer retries sending a message to a broker before notifying the application that an error occurred;
  • retry.backoff.ms: defines how many milliseconds the producer waits between retries. The default is 100 ms;

Kafka’s consumer explained

In this section, we will learn about the internals of the Kafka consumer, responsible for reading messages from Kafka topics.

Consumer architecture

The Kafka consumer's internal structure is divided as shown in the following diagram:

[Diagram: Kafka consumer internal details]

When we ask a Kafka broker to create a consumer group for one or more topics, a consumer group coordinator is created. Each broker acts as group coordinator for a subset of the cluster's consumer groups.

This component is responsible for deciding which consumer consumes which partitions, following the rules we discussed in the offsets section. It is also responsible for checking consumer health, by requiring heartbeats to be sent at regular intervals. If a consumer fails to send heartbeats, it is considered unhealthy and Kafka reassigns its partitions to another consumer.

The consumer, in turn, uses a deserializer to convert messages from byte arrays into the required types. As with the producer, we can use several out-of-the-box deserializers, as well as create our own.

IMPORTANT: the Kafka consumer is not thread-safe and must always be used from a single thread. If you try to access the same consumer from more than one thread, an internal check will throw an error. The recommended way to scale an application that consumes from Kafka is to create new application instances, each running its own consumer in the same consumer group.

One important point to note is that, when a message is delivered to Kafka, it only becomes available for consumption after it has been replicated to all IS replicas of its partition. This is important to ensure data availability, but it also means that messages can take a noticeable amount of time to become available for consumption.

Kafka works with the concept of back-pressure. This means applications are responsible for asking for new chunks of messages to process, allowing clients to consume data at their own pace.

Commit strategies

Kafka works with 3 commit strategies, namely:

  • Auto-commit: with this strategy, offsets are committed automatically, at a regular interval, as messages are consumed from the broker. The downside of this approach is that messages that were not processed correctly can be lost, because their offsets have already been committed;
  • Synchronous manual commit: with this strategy, offsets are committed manually and synchronously. This is the safest option, but it hurts performance, since the consumer blocks until each commit completes;
  • Asynchronous manual commit: with this strategy, offsets are committed manually and asynchronously. This performs better than the previous option, since the consumer does not block waiting for the commit to complete, but there is also some risk that offsets won't be committed due to a failure, resulting in messages being processed more than once;

As with acknowledgment types, the best commit strategy depends on the characteristics of the solution being implemented.

Consumer configurations

There are several configurations that can be set on the consumer. Here are some of the more basic and interesting ones to know:

  • fetch.min.bytes: defines the minimum amount of bytes the consumer wants to receive in a batch of messages. The broker waits until this minimum is reached, or until a time limit defined by another config, before answering the fetch;
  • max.partition.fetch.bytes: the opposite of the previous config, this defines the maximum size, in bytes, per partition, of the chunk of data we ask Kafka for. As before, if the time limit is reached first, Kafka sends whatever messages it has;
  • fetch.max.wait.ms: as mentioned in the previous configs, this is the property that defines the time limit, in milliseconds, for Kafka to wait for more messages to fetch before sending what it has to the consumer application;
  • auto.offset.reset: defines what the consumer does when it first reads from a partition it has never read before, or when its committed offset is invalid, for example because the consumer was down for so long that its last committed offset was already purged from the partition. The default is latest, which means it starts reading from the newest records. The other option is earliest, in which case the consumer reads all messages in the partition from the beginning;
  • session.timeout.ms: defines the time window within which the consumer must send a heartbeat in order to still be considered healthy. The default is 10 seconds.

IMPORTANT: heartbeats are tied to the consumer's poll/commit cycle. This means that, in the poll loop, we must be careful with processing time: if it exceeds the allowed period, Kafka will consider the consumer unhealthy and redeliver its partitions' messages to another consumer.

Hands-on

Well, that was a lot to cover. Now that we have learned Kafka's main concepts, let's get hands-on and put what we discussed into practice!

Set up

Unfortunately, there is no official Kafka Docker image. So, for our lab, we will use the Zookeeper and Kafka images provided by wurstmeister (thanks, man!). At the end of the article, there are links to his images.

Also at the end of the article is a repository with the sources for this lab, including a Docker Compose stack to get a Kafka cluster up and running. This is the stack:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${MY_IP}
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

In order to run a cluster with 3 nodes, we can run the following commands:

export MY_IP=`ip route get 1 | awk '{print $NF;exit}'`
docker-compose up -d --scale kafka=3

To stop it, just run:

docker-compose stop

In the lab repo there is also a convenient bash script that sets up a 3-node Kafka cluster without the need to enter the commands above every time.

Coding the producer

Now that we have our environment, let's begin the lab. First, we need to create our topic. To create a topic, we use a shell inside one of the brokers, passing the Zookeeper address as a parameter – some operations, such as topic CRUD, are done against Zookeeper instead of Kafka, and there are plans to move all operations to the brokers in future releases – alongside other parameters. Assuming we have a terminal with the MY_IP environment variable set, this can be done with the following command:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
--create --zookeeper ${MY_IP}:2181 --replication-factor 1 \
--partitions 2 --topic test

PS: all commands assume the Kafka container names follow Docker Compose naming standards. If running from the lab repo, they will be created as kafkalab_kafka_1, kafkalab_kafka_2, etc.

With the previous command, we created a topic named test with a replication factor of 1 and 2 partitions. We can check that the topic was created by running the list topics command, as follows:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
--list --zookeeper ${MY_IP}:2181

This returns the list of topics registered in Zookeeper, in this case, "test".
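
We can also inspect the partition assignments and in-sync replicas of the topic with the describe option (assuming the same container naming as before):

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh \
--describe --zookeeper ${MY_IP}:2181 --topic test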

Now, let's create a producer. All code in this lab is written in Java, using Kafka's client APIs. After creating a Java project, we will code our own producer wrapper. Let's begin with the wrapper itself:

package com.alexandreesl.producer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {


  private KafkaProducer<String, String> producer;


  public MyProducer() throws UnknownHostException {

    InetAddress ip = InetAddress.getLocalHost();

    StringBuilder builder = new StringBuilder();
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");
    builder.append(",");
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", builder.toString());
    kafkaProps.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("acks", "all");
    producer = new KafkaProducer<String, String>(kafkaProps);

  }


  public void sendMessage(String topic, String key, String message) 
throws Exception {

    ProducerRecord<String, String> record = 
new ProducerRecord<>(topic,
        key, message);
    try {
      producer.send(record).get();
    } catch (Exception e) {
      throw e;
    }

  }
}

The code is very simple. We just defined the addresses of 2 brokers of our cluster – Docker Compose assigns the broker ports automatically, so we need to fill in the ports according to our environment first – the key and value serializers, and the acknowledgment type, in our case all, meaning we want all replicas to be written before the send is confirmed.

PS: did you notice the get() method being called after send()? That is because the send method is asynchronous by default. Since we want to wait for Kafka to write the message before finishing, we call get() to make the call synchronous.

The main class that uses our wrapper class is as follows:

package com.alexandreesl;

import com.alexandreesl.producer.MyProducer;

public class Main {

  public static void main(String[] args) throws Exception {

    MyProducer producer = new MyProducer();

    producer.sendMessage("test", "mysuperkey", "my value");


  }

}

As we can see, it is a very simple class: we just instantiate the wrapper and use it. If we run it, we will see the following output in the terminal, with Kafka's commitId at the end, showing our producer is correctly implemented:

[main] INFO org.apache.kafka.clients.producer.ProducerConfig - 
ProducerConfig values: [main] 
INFO org.apache.kafka.clients.producer.ProducerConfig - 
ProducerConfig values:  
acks = all batch.size = 16384 
bootstrap.servers = [192.168.10.107:32813, 192.168.10.107:32814] 
buffer.memory = 33554432 client.id =  
compression.type = none 
connections.max.idle.ms = 540000 
enable.idempotence = false 
interceptor.classes = null 
key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer 
linger.ms = 0 max.block.ms = 60000 
max.in.flight.requests.per.connection = 5 
max.request.size = 1048576 
metadata.max.age.ms = 300000 
metric.reporters = [] 
metrics.num.samples = 2 
metrics.recording.level = INFO 
metrics.sample.window.ms = 30000 
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner 
receive.buffer.bytes = 32768 
reconnect.backoff.max.ms = 1000 
reconnect.backoff.ms = 50 
request.timeout.ms = 30000 
retries = 0 
retry.backoff.ms = 100 
sasl.jaas.config = null 
sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.mechanism = GSSAPI 
security.protocol = PLAINTEXT 
send.buffer.bytes = 131072 
ssl.cipher.suites = null 
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.endpoint.identification.algorithm = null 
ssl.key.password = null ssl.keymanager.algorithm = SunX509 
ssl.keystore.location = null ssl.keystore.password = null 
ssl.keystore.type = JKS 
ssl.protocol = TLS ssl.provider = null 
ssl.secure.random.implementation = null 
ssl.trustmanager.algorithm = PKIX 
ssl.truststore.location = null 
ssl.truststore.password = null 
ssl.truststore.type = JKS 
transaction.timeout.ms = 60000 
transactional.id = null value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka version : 0.11.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka commitId : 73be1e1168f91ee2

Process finished with exit code 0

Now that we have our producer implemented, let’s move on to the consumer.

Coding the consumer

Now, let’s code our consumer. First, we create a consumer wrapper, like the following:

package com.alexandreesl.consumer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

  private KafkaConsumer<String, String> consumer;

  public MyConsumer() throws UnknownHostException {

    InetAddress ip = InetAddress.getLocalHost();

    StringBuilder builder = new StringBuilder();
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");
    builder.append(",");
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", builder.toString());
    kafkaProps.put("group.id", "MyConsumerGroup");
    kafkaProps.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProps
        .put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<String, String>(kafkaProps);

  }

  public void consume(String topic) {

    consumer.subscribe(Collections.singletonList(topic));

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Key: " + record.key());
          System.out.println("Value: " + record.value());
        }
      }
    } finally {
      consumer.close();
    }


  }

}

In the wrapper, we subscribe to our test topic, configuring a consumer group ID and deserializers for our messages. When we call the subscribe method, the ConsumerGroupCoordinator on the brokers is updated, making the cluster allocate partitions for us on the topics we asked to consume, as long as there are no more consumers than partitions, as we discussed previously.

Then, we create the consume method, which has an infinite loop to keep consuming messages from the topic. In our case, we just keep calling the poll method – the argument is the maximum time, in milliseconds, to block waiting for records – which returns a batch of messages (up to max.poll.records per call, 500 by default), print the keys and values of the messages and keep polling. At the end, we close the connection.

In our example, we can notice that we didn't explicitly commit the messages at any point. This is because we are using the default settings, so the consumer is auto-committing offsets. As we talked about previously, auto-commit can be an option for some solutions, depending on the situation.
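For reference, this behaviour is controlled by a couple of consumer properties, shown here with their default values (the same ones that appear in the consumer config dump later in this post) – a small snippet of how they could be set explicitly on our kafkaProps:

kafkaProps.put("enable.auto.commit", "true"); // auto-commit is on by default
kafkaProps.put("auto.commit.interval.ms", "5000"); // offsets are committed every 5 seconds
kafkaProps.put("max.poll.records", "500"); // maximum number of records returned by a single poll()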

Now, let's change our main class to allow us to produce and consume using the same program, and also to input the messages we want to produce. We do this by adding some input parameters, as follows:

package com.alexandreesl;

import com.alexandreesl.consumer.MyConsumer;
import com.alexandreesl.producer.MyProducer;
import java.util.Scanner;

public class Main {

  public static void main(String[] args) throws Exception {

    Scanner scanner = new Scanner(System.in);

    System.out.println("Please select operation" + " 
(1 for producer, 2 for consumer) :");

    String operation = scanner.next();

    System.out.println("Please enter topic name :");

    String topic = scanner.next();

    if (operation.equals("1")) {

      MyProducer producer = new MyProducer();

      System.out.println("Please enter key :");

      String key = scanner.next();

      System.out.println("Please enter value :");

      String value = scanner.next();

      producer.sendMessage(topic, key, value);
    } else if (operation.equals("2")) {

      MyConsumer consumer = new MyConsumer();

      consumer.consume(topic);


    }


  }

}

If we run our code, we will see some interesting output on the console, such as the consumer joining the ConsumerGroupCoordinator and being assigned to partitions. At the end, it will print the messages we sent as the producer, proving our coding was successful.

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32814 (id: 2147482646 rack: null) 
for group MyConsumerGroup.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Revoking previously assigned partitions [] for group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
(Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Successfully joined group MyConsumerGroup with generation 2 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Setting newly assigned partitions [test-1, test-0] for group MyConsumerGroup 
Key: mysuperkey 
Value: my value

Manual committing

Now that we know the basics of producing and consuming with Kafka, let's dive into more details about Kafka's consumer. We saw previously that our example relied on the default auto-commit to commit offsets after reading. Let's now disable auto-commit and commit the offsets ourselves, by changing the code as follows:

package com.alexandreesl.consumer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

  private KafkaConsumer<String, String> consumer;

  public MyConsumer() throws UnknownHostException {

    InetAddress ip = InetAddress.getLocalHost();

    StringBuilder builder = new StringBuilder();
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");
    builder.append(",");
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", builder.toString());
    kafkaProps.put("group.id", "MyConsumerGroup");
    kafkaProps.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProps
        .put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProps.put("enable.auto.commit", "false");
    consumer = new KafkaConsumer<String, String>(kafkaProps);

  }

  public void consume(String topic) {

    consumer.subscribe(Collections.singletonList(topic));

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Key: " + record.key());
          System.out.println("Value: " + record.value());


        }

        consumer.commitSync();

      }
    } finally {
      consumer.close();
    }


  }

}

If we run our code, we will see that it will continue to consume messages, as expected:

Please select operation (1 for producer, 2 for consumer) :2
Please enter topic name :test
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [192.168.10.107:32771, 192.168.10.107:32772]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = MyConsumerGroup
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32773 
(id: 2147482645 rack: null) for group MyConsumerGroup.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 6
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1, test-0] 
for group MyConsumerGroup
Key: key
Value: value
Key: my
Value: key

In our example, we used synchronous committing, that is, the main thread is blocked waiting for the commit before starting to read the next batch of messages. We can change this just by changing the commit method, as follows:

public void consume(String topic) {

  consumer.subscribe(Collections.singletonList(topic));

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());


      }

      consumer.commitAsync();

    }
  } finally {
    consumer.close();
  }


}
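Since commitAsync returns immediately, commit failures would pass unnoticed with the call above. If we want to at least log them, commitAsync also accepts a callback – a small sketch of this variation:

consumer.commitAsync((offsets, exception) -> {
  if (exception != null) {
    System.out.println("Commit failed for offsets " + offsets + ": " + exception.getMessage());
  }
});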

One last thing to check before we move on is committing specific offsets. In our previous examples, we committed all the offsets of a batch at once. If we wanted to do, for example, an asynchronous commit as each message is processed, we can do the following:

public void consume(String topic) {

  consumer.subscribe(Collections.singletonList(topic));

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());

        HashMap<TopicPartition, OffsetAndMetadata> offsets = 
new HashMap<>();

        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));

        consumer.commitAsync(offsets, null);


      }


    }
  } finally {
    consumer.close();
  }


}

Assigning to specific partitions

In our examples, we delegated to Kafka the decision of which partitions each consumer will consume. If we want to specify the partitions a consumer will be assigned to, we can use the assign method.

It is important to notice that this approach is not generally recommended, as consumers won't be replaced automatically by others when they go down, and new partitions won't be consumed until they are explicitly assigned to a consumer.

In the example below, we do this by marking that we want to consume messages from only one partition:

public void consume(String topic) {

  // requires imports for java.util.ArrayList, java.util.List,
  // org.apache.kafka.common.PartitionInfo and org.apache.kafka.common.TopicPartition
  List<TopicPartition> partitions = new ArrayList<>();

  List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
  if (partitionInfos != null) {
    partitions.add(
        new TopicPartition(partitionInfos.get(0).topic(), 
partitionInfos.get(0).partition()));

  }
  consumer.assign(partitions);

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);

      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());

        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));

        consumer.commitAsync(offsets, null);


      }


    }
  } finally {
    consumer.close();
  }


}

Consumer rebalance

When consuming from a topic, we can scale consumption by adding more instances of our application, parallelizing the processing. Let's see this in practice.

First, let’s start a consumer. After initializing, we can see it joined both partitions from our topic:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 (id: 2147482645 rack: null) 
for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] 
for group MyConsumerGroup [main] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
(Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 18 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1, test-0] for group 
MyConsumerGroup

Now, let's start another consumer. We will see that, as soon as it joins the ConsumerGroupCoordinator, it will be assigned to one of the partitions:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 
(id: 2147482645 rack: null) 
for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 19 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-0] for group MyConsumerGroup

And if we look at our old consumer, we will see that it is now reading from the other partition only:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 19 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1] for group MyConsumerGroup

This shows us the power of Kafka's ConsumerGroupCoordinator, which takes care of everything for us.

But it is important to notice that, in real scenarios, we can implement listeners that are invoked when partitions are revoked from a consumer due to a rebalance, and before a partition starts being consumed by its new consumer. This can be done by implementing the ConsumerRebalanceListener interface, as follows:

package com.alexandreesl.listener;

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerRebalanceInterface implements 
ConsumerRebalanceListener {

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    System.out.println("I am losing the following partitions:");
    for (TopicPartition partition : partitions) {
      System.out.println(partition.partition());
    }
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    System.out.println("I am starting on the following partitions:");
    for (TopicPartition partition : partitions) {
      System.out.println(partition.partition());
    }
  }
}

Of course, this is just a mock implementation. In a real implementation, we would be doing tasks such as committing offsets – if we buffered our commits in blocks instead of committing one by one, that would become a necessity – closing connections, and so on.
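Just to illustrate the idea, here is a minimal sketch – hypothetical, not part of our lab code – of a listener that commits buffered offsets before its partitions are taken away:

package com.alexandreesl.listener;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittingRebalanceListener implements ConsumerRebalanceListener {

  private final KafkaConsumer<String, String> consumer;
  private final Map<TopicPartition, OffsetAndMetadata> bufferedOffsets = new ConcurrentHashMap<>();

  public CommittingRebalanceListener(KafkaConsumer<String, String> consumer) {
    this.consumer = consumer;
  }

  // the consuming loop would call this after processing each record
  public void markProcessed(TopicPartition partition, long offset) {
    bufferedOffsets.put(partition, new OffsetAndMetadata(offset + 1, "no metadata"));
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // commit synchronously: once this callback returns, the partitions belong to another consumer
    consumer.commitSync(bufferedOffsets);
    bufferedOffsets.clear();
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // nothing to do on assignment in this sketch
  }
}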

We add our new listener by passing it as a parameter to the subscribe() method, as follows:

public void consume(String topic) {

  consumer.subscribe(Collections.singletonList(topic), 
new MyConsumerRebalanceInterface());

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);

      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());

        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));

        consumer.commitAsync(offsets, null);


      }


    }
  } finally {
    consumer.close();
  }


}

Now, let's terminate all our previously started consumers and start them again. When starting the first consumer, we will see the following output on the terminal:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 (id: 2147482645 rack: null) 
for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group 
MyConsumerGroup I am losing the following partitions: 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
I am starting on the following partitions:
 1 0 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Successfully joined group MyConsumerGroup with generation 21 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1, test-0] 
for group MyConsumerGroup

That shows our listener was invoked. Let’s now start the second consumer and see what happens:

I am losing the following partitions: 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 
(id: 2147482645 rack: null) for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group 
MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 22 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-0] 
for group MyConsumerGroup 
I am starting on the following partitions: 0

And finally, if we look at the first consumer, we will see that both the revoked and the reassigned partitions were printed on the console, showing our listener was implemented correctly:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
I am losing the following partitions: 1 0 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 22 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1] 
for group MyConsumerGroup 
I am starting on the following partitions: 1

PS: Kafka handles rebalancing by revoking all partitions and redistributing them. That's why we see the first consumer losing all of its partitions before being reassigned to one of the old ones.

Log compaction

Log compaction is a powerful cleanup feature of Kafka. With log compaction, we define a point from which messages with the same key on the same partition are compacted, so only the most recent message is retained.

This is done by setting configurations that establish a compaction entry point and a retention entry point. These entry points consist of time periods during which Kafka allows messages to keep coming from the producers, while at the same time removing old messages that don't matter anymore. The following diagram explains the mechanism in practice:

kafka-compaction

Kafka log compaction explained

In order to configure log compaction, we need to introduce some configurations both on cluster and topic. For the cluster, we change our docker compose YAML as follows:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${MY_IP}
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_LOG_CLEANER_ENABLED: "true"

    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

This change is needed because log cleaning is not enabled on our setup out of the box. Then, we change our topic configuration with the following new entries:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-configs.sh 
--zookeeper ${MY_IP}:2181 --entity-type topics --entity-name test --alter 
--add-config min.compaction.lag.ms=1800000,delete.retention.ms=172800000,
cleanup.policy=compact

In the command above, we set the compaction entry point – min.compaction.lag.ms – to 30 minutes, so all messages from the head of the stream up to 30 minutes old will be in the dirty section. The other config established a retention period of 48 hours, so from 30 minutes up to 48 hours all messages will be in the clean section, where compaction occurs. Messages older than 48 hours will be removed from the stream.

Lastly, we configured the cleanup policy, enabling compaction for the topic. We can check that our configs were successfully set by using the following command:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-configs.sh 
--zookeeper ${MY_IP}:2181 --entity-type topics --entity-name test --describe

Which will produce the following output:

Configs for topic 'test' are 
min.compaction.lag.ms=1800000,delete.retention.ms=172800000,
cleanup.policy=compact

One last thing we need to know before moving on to our next topic is that compaction also allows messages to be removed. If we want a message to be completely removed from our stream, all we need to do is send a message with its key, but with null as the value. When sent this way with compaction enabled, it will remove all previous messages with that key from the stream. This kind of message is called a tombstone in Kafka.
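In code, a tombstone is just a regular record with a null value. A minimal sketch using the plain producer API – assuming a KafkaProducer<String, String> called producer, configured like the one in our lab, and reusing the topic and key from our earlier tests:

import org.apache.kafka.clients.producer.ProducerRecord;

ProducerRecord<String, String> tombstone = new ProducerRecord<>("test", "mysuperkey", null);
producer.send(tombstone); // once compaction runs, previous messages with key "mysuperkey" are removed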

Kafka connect

Kafka Connect is an integration framework, like others such as Apache Camel, that ships with Kafka – but runs on a cluster of its own – and allows us to quickly develop integrations between Kafka and other systems. It is maintained by Confluent.

This framework deserves an article of its own, so it won't be covered here. If the reader wants to know more about it, please go to the following link:

https://docs.confluent.io/current/connect/intro.html

Kafka Streams

Kafka Streams is a framework shipped with Kafka that allows us to implement stream applications using Kafka. By stream applications, we mean applications that have streams as input and as output, typically performing operations such as aggregations, reductions, etc.

A typical example of a stream application is reading data from 2 different streams and producing an aggregated result from the two on a third stream.

This framework deserves an article of its own, so it won't be covered here. If the reader wants to know more about it, please go to the following link:

https://kafka.apache.org/documentation/streams/

Kafka MirrorMaker

Kafka MirrorMaker is a tool that allows us to mirror Kafka clusters, by making copies from a source cluster to a target cluster as messages come in. As with Kafka Connect and Kafka Streams, it is a tool that deserves an article of its own, so it won't be covered here. More information about it can be found at the following link:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330

Kafka administration

Now that we have covered most of the code needed to use Kafka, let's see how to administer a Kafka cluster. All Kafka administration commands are issued through its shell scripts, like we did previously in our study.

Kafka CRUD topic operations

Let’s begin with basic topic operations. Like we saw before, topics can be created with the following command:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--create --zookeeper ${MY_IP}:2181 --replication-factor 1 --partitions 2 
--topic test

Changing topics – not configurations, like we saw on log compaction, but the topic itself, such as the number of partitions – is done with the same shell script, just changing some options:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--alter --zookeeper ${MY_IP}:2181 --partitions 4 --topic test

IMPORTANT: changing the number of partitions of a topic can also change the partitioning logic, meaning messages that were always sent to a partition A can now always be sent to a partition B. This is important to watch out for, as it can lead to message ordering issues if not handled with care.
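The reason is that, for keyed messages, the default partitioner picks the partition essentially as a hash of the key modulo the number of partitions (Kafka's DefaultPartitioner uses a murmur2 hash of the serialized key). A simplified sketch of the idea, for illustration only:

// simplified illustration – not the actual DefaultPartitioner code
static int partitionFor(String key, int numPartitions) {
  return (key.hashCode() & 0x7fffffff) % numPartitions;
}

// partitionFor("mysuperkey", 2) and partitionFor("mysuperkey", 4) will usually differ,
// which is why the same key can land on a different partition after the change.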

We can search for topics by using the list command, like we did before:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--list --zookeeper ${MY_IP}:2181

If we want to delete a topic, we issue the following command. Take note that, if the broker configuration delete.topic.enable is not set, the topic will just be marked for deletion, not removed:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--delete --zookeeper ${MY_IP}:2181 --topic test

Other Kafka admin operations

Let’s now see other Kafka admin operations. First, let’s create a new topic to test it out:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--create --zookeeper ${MY_IP}:2181 --replication-factor 1 
--partitions 3 --topic mytopic

The first operation we will see is the preferred replica election. When Kafka creates a topic, the partition leaders are initially spread out as evenly as possible, reducing the impact of nodes going down. However, after some time this distribution can be compromised, as nodes go down and up several times, inducing several rebalances. This can be especially problematic on a small cluster.

The preferred replica election operation tries to rebalance a topic to as close as possible to its original partition leader distribution, solving the distribution problem. This is done with the following command:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-preferred-replica-election.sh 
--zookeeper ${MY_IP}:2181

IMPORTANT: this command triggers preferred replica elections for all topics on the cluster, so it must be used with care.

We can also trigger rebalance for just one topic, by writing a JSON file like this:

{
  "partitions": [
    {
      "partition": 0,
      "topic": "mytopic"
    },
    {
      "partition": 1,
      "topic": "mytopic"
    },
    {
      "partition": 2,
      "topic": "mytopic"
    }
  ]
}

And running the command like this:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-preferred-replica-election.sh 
--zookeeper ${MY_IP}:2181 
--path-to-json-file rebalance-example.json

PS: before running the command, it is necessary to copy the JSON file to the container where the command will run.

Another useful operation is the reassignment of replicas. This is useful, for example, if we want to isolate a broker from the cluster because it will be removed for maintenance, or if a new broker is added and needs to receive its share of topics in order to balance the cluster.

The first step is to generate a file that will be used to request a proposal for the partition moves. We write the following file, calling it "partition-req.json":

{
  "topics": [
    {
      "topic": "mytopic"
    }
  ],
  "version": 1
}

On our stack, we have only 3 nodes, so the reassignment proposal could fail due to the cluster being so small. We change our cluster startup script as follows and run it again:

#!/usr/bin/env bash

export MY_IP=`ip route get 1 | awk '{print $NF;exit}'`
docker-compose up -d --scale kafka=6

We then execute the following command. Remember to copy the file to the container first:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-reassign-partitions.sh 
--zookeeper ${MY_IP}:2181 --generate 
--topics-to-move-json-file partition-req.json 
--broker-list 1004,1005,1006

IMPORTANT: We can copy the file as follows:

docker cp partition-req.json kafkalab_kafka_1:/partition-req.json

In the command above, we tell Kafka that we want to redistribute the replica set from the current brokers to the brokers 1004, 1005 and 1006. We receive the following output, with the current distribution and a proposed one:

Current partition replica assignment 
{"version":1,"partitions":[{"topic":"mytopic","partition":2,"replicas":[1001]
,"log_dirs":["any"]},{"topic":"mytopic","partition":1,"replicas":[1003],
"log_dirs":["any"]},{"topic":"mytopic","partition":0,"replicas":[1002],
"log_dirs":["any"]}]}

Proposed partition reassignment configuration 
{"version":1,"partitions":[{"topic":"mytopic","partition":2,"replicas":[1005]
,"log_dirs":["any"]},
{"topic":"mytopic","partition":1,"replicas":[1004],
"log_dirs":["any"]},{"topic":"mytopic","partition":0,"replicas":[1006],
"log_dirs":["any"]}]}

The first JSON can be saved for rolling back, in case anything goes wrong. Let's save the second JSON to a file called replica-proposal.json:

{"version":1,"partitions":[{"topic":"mytopic","partition":2,
"replicas":[1005],"log_dirs":["any"]},
{"topic":"mytopic","partition":1,"replicas":[1004]
,"log_dirs":["any"]},{"topic":"mytopic","partition":0
,"replicas":[1006],"log_dirs":["any"]}]}

Finally, we run the replica assignment command, using the proposed distribution file as parameter – don’t forget to copy the file to the container first -, as follows:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-reassign-partitions.sh 
--zookeeper ${MY_IP}:2181 --execute 
--reassignment-json-file replica-proposal.json

We will receive an output like this:

Current partition replica assignment {"version":1,"partitions":
[{"topic":"mytopic","partition":2,"replicas":[1001],"log_dirs":["any"]}
,{"topic":"mytopic","partition":1,"replicas":[1003],"log_dirs":["any"]}
,{"topic":"mytopic","partition":0,"replicas":[1002],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions.

This means the reassignment is being performed. During this phase, Kafka will redistribute the replicas and copy all the data across the new brokers, so depending on the amount of data, this operation can take a lot of time. We can check the status of the reassignment by running:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-reassign-partitions.sh 
--zookeeper ${MY_IP}:2181 --verify 
--reassignment-json-file replica-proposal.json

When reassignment is finished, we will see the following:

Status of partition reassignment: 

Reassignment of partition mytopic-2 completed successfully

Reassignment of partition mytopic-1 completed successfully

Reassignment of partition mytopic-0 completed successfully

We can also check the status of our topics by running the describe command, as follows:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--zookeeper ${MY_IP}:2181 --describe

After our reassignment, it will output something like this:

Topic: mytopic  PartitionCount: 3  ReplicationFactor: 1  Configs:

Topic: mytopic  Partition: 0  Leader: 1006  Replicas: 1006  Isr: 1006

Topic: mytopic  Partition: 1  Leader: 1004  Replicas: 1004  Isr: 1004

Topic: mytopic  Partition: 2  Leader: 1005  Replicas: 1005  Isr: 1005

Topic: test  PartitionCount: 2  ReplicationFactor: 1  Configs:

Topic: test  Partition: 0  Leader: 1003  Replicas: 1003  Isr: 1003

Topic: test  Partition: 1  Leader: 1001  Replicas: 1001  Isr: 1001

Kafka offset lag

Kafka’s offset lag refers to a situation where we have consumers lagging behind the head of a stream. Let’s revisit one of our diagrams from the offsets explained section:

kafka-diagram-3

Consumers lagging behind on a stream

As we can see in the diagram above, we have 2 consumer groups on a stream. Consumer group 1 is 3 messages behind the stream's head, while consumer group 2 is 8 messages behind. This difference between the head and the current position of a consumer on a stream is called offset lag.

The causes for offset lag may vary, ranging from network problems to issues on the consumer application itself. It is important to keep this lag in check by monitoring it. One good tool for this is Burrow, provided by LinkedIn. More information about it can be found at the following link:

https://github.com/linkedin/Burrow
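Burrow aside, we can also get a rough view of the lag from inside our own consumer code – a minimal sketch, assuming the consumer has already joined the group (for example, after a first poll() call) and that java.util.Collections is imported:

// for each partition assigned to this consumer, lag = log end offset - current position
for (TopicPartition partition : consumer.assignment()) {
  long position = consumer.position(partition);
  long endOffset = consumer.endOffsets(Collections.singletonList(partition)).get(partition);
  System.out.println("Lag for " + partition + ": " + (endOffset - position));
}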

Testing the cluster

It is important to test our cluster configuration, in order to verify how the cluster behaves in several situations, such as brokers going down – holding partition leaderships or not – new brokers joining, and so on.

We can code our own tests for this purpose using the VerifiableProducer and VerifiableConsumer classes from Apache Kafka's APIs. Their usage is essentially the same as that of the producer and consumer we saw in our lab.

There are also ready-to-use shell versions of these tools that can be used for some quick testing. For example, if we wanted to test our cluster by sending 200000 messages to mytopic, we could run something like this:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-verifiable-producer.sh 
--topic mytopic --max-messages 200000 
--broker-list 
 ${MY_IP}:<a broker port>,${MY_IP}:<a broker port>

This will produce an output like the following:

{"timestamp":1516571263855,"name":"startup_complete"}

{"timestamp":1516571264213,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"3","offset":1,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"6","offset":2,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"9","offset":3,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"12","offset":4,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"15","offset":5,"topic":"mytopic","partition":1}

{"timestamp":1516571264217,"name":"producer_send_success","key":null,"value":"18","offset":6,"topic":"mytopic","partition":1}

{"timestamp":1516571264217,"name":"producer_send_success","key":null,"value":"21","offset":7,"topic":"mytopic","partition":1}

{"timestamp":1516571264218,"name":"producer_send_success","key":null,"value":"24","offset":8,"topic":"mytopic","partition":1}

{"timestamp":1516571264218,"name":"producer_send_success","key":null,"value":"27","offset":9,"topic":"mytopic","partition":1}

{"timestamp":1516571264218,"name":"producer_send_success","key":null,"value":"30","offset":10,"topic":"mytopic","partition":1}

{"timestamp":1516571264219,"name":"producer_send_success","key":null,"value":"33","offset":11,"topic":"mytopic","partition":1}

{"timestamp":1516571264220,"name":"producer_send_success","key":null,"value":"36","offset":12,"topic":"mytopic","partition":1}

{"timestamp":1516571264220,"name":"producer_send_success","key":null,"value":"39","offset":13,"topic":"mytopic","partition":1}

{"timestamp":1516571264220,"name":"producer_send_success","key":null,"value":"42","offset":14,"topic":"mytopic","partition":1}

{"timestamp":1516571264221,"name":"producer_send_success","key":null,"value":"45","offset":15,"topic":"mytopic","partition":1}

{"timestamp":1516571264224,"name":"producer_send_success","key":null,"value":"48","offset":16,"topic":"mytopic","partition":1}

{"timestamp":1516571264225,"name":"producer_send_success","key":null,"value":"51","offset":17,"topic":"mytopic","partition":1}

{"timestamp":1516571264225,"name":"producer_send_success","key":null,"value":"54","offset":18,"topic":"mytopic","partition":1}

...omitted...

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199980","offset":66660,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199983","offset":66661,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199986","offset":66662,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199989","offset":66663,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199992","offset":66664,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199995","offset":66665,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199998","offset":66666,"topic":"mytopic","partition":1}

{"timestamp":1516571272803,"name":"shutdown_complete"}

{"timestamp":1516571272805,"name":"tool_data","sent":200000,"acked":200000,"target_throughput":-1,"avg_throughput":22346.368715083798}

And similarly, we can test the consumer by running something like this:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-verifiable-consumer.sh 
--topic mytopic --max-messages 1000 
--group-id testing 
--broker-list ${MY_IP}:<a broker port>,${MY_IP}:<a broker port>

Which will output something like this:

{"timestamp":1516571973384,"name":"startup_complete"}

{"timestamp":1516571973534,"name":"partitions_revoked","partitions":[]}

{"timestamp":1516571973557,"name":"partitions_assigned","partitions":[{"topic":"mytopic","partition":2},{"topic":"mytopic","partition":1},{"topic":"mytopic","partition":0}]}

{"timestamp":1516571973669,"name":"records_consumed","count":500,"partitions":[{"topic":"mytopic","partition":1,"count":500,"minOffset":66667,"maxOffset":67166}]}

{"timestamp":1516571973680,"name":"offsets_committed","offsets":[{"topic":"mytopic","partition":1,"offset":67167}],"success":true}

{"timestamp":1516571973687,"name":"records_consumed","count":500,"partitions":[{"topic":"mytopic","partition":1,"count":500,"minOffset":67167,"maxOffset":67666}]}

{"timestamp":1516571973690,"name":"offsets_committed","offsets":[{"topic":"mytopic","partition":1,"offset":67667}],"success":true}

{"timestamp":1516571973692,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973692,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973694,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973694,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973696,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973696,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973697,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973697,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973698,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973699,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973700,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973700,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973701,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973702,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973702,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973703,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973704,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973704,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973705,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973705,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973706,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973706,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973708,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973708,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973709,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973709,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973710,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973711,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973714,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973714,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973715,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973715,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973716,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973716,"name":"offsets_committed","offsets":[],"success":true}
...omitted...

Conclusion

And that concludes our study of Apache Kafka. I hope to have given the reader a solid explanation of Kafka's core concepts, as well as directions for complementary studies on its different usages and applications. Thank you for following me on this post, until next time.


Java 9: Learning the new features – part 4

Standard

Hi, dear readers! Welcome to my blog. In this post, the last of our series, we will finally talk about the best-known new feature of Java 9: Jigsaw. But after all, why do we need a module system? Let's find out!

In the beginning

Since Java's beginnings, we have had several ways to package applications. There is the most generic unit, known as the JAR, and there are also more specific formats, such as WARs for web applications and EARs for Enterprise Java Beans (EJB) applications.

These applications, typically speaking, do not consist only of code written by the development teams themselves: there is also a plethora of libraries and frameworks that are imported as well, such as logging libraries, ORM frameworks, web frameworks, etc.

Generally speaking, each of these libraries and frameworks is packaged as a JAR as well, and their dependencies are also packaged as JARs. This results in a scenario where a really big amount of dependencies is included in a single application, just to make the whole thing work. The picture below shows a typical Spring Boot application's classpath. It is possible to note the overwhelming mountain of dependencies:

Screen Shot 2017-11-01 at 22.38.00

Fragment of a typical Spring Boot Application dependencies list. It is 267 items long!

Jar hell

The situation stated previously leads us to the infamous JAR hell. This term refers to all the problems developers have suffered across more than 20 years of Java, such as ClassNotFoundExceptions, when the application can't find a certain class, or errors like NoClassDefFoundError and NoSuchMethodError, which typically surface when there are multiple versions of the same library on the classpath and the wrong one gets picked up.

Encapsulation problems

Another problem we have is encapsulation. Once a dependency is declared, all the classes from the imported packages are accessible to the importer. Even if we declare a class with default (package-private) visibility, it is still possible to access it, just by declaring our own class with the same package name as the class we want to use – don't try this at home, folks!

This leads to poor possibilities in interface design, since we can't really prevent certain classes from being exposed to the outside world.
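To make the trick above concrete, here is a hedged sketch – all names are hypothetical – of how default visibility could be bypassed before modules, simply by compiling our own class into a package with the same name as the library's internal one:

// inside some-library.jar (hypothetical library code):
package com.somelibrary.internal;

class HiddenHelper { // package-private: "not meant" to be used by anyone else
    static String secret() {
        return "internal state";
    }
}

// inside our own application, in a different JAR, before Java 9:
package com.somelibrary.internal; // same package name, so package-private members are visible

public class NotSoHidden {
    public static void main(String[] args) {
        System.out.println(HiddenHelper.secret()); // compiles and runs fine on the classpath
    }
}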

Performance degradation

Another big problem is performance. This is especially felt on Java EE containers, since servers need to support a big list of features provided for applications. It is true that we had efforts in the past to improve this situation, such as EAP profiles on the JBoss server, but still, the situation was far from resolved.

This results in heavy, clunky servers that can be slow to operate and especially to initialize, besides being intensely memory demanding.

Enter the modules

To solve all the problems we saw in the previous sections, Java 9 brings us Jigsaw, the new module system for Java.

With Jigsaw, we can create modules from packages inside an application, allowing a much more coherent and organized structure. Not only that: with modules we have to explicitly declare what we want to expose from a module, so we also eliminate the encapsulation problems we talked about earlier.

This also helps with the performance degradation we just saw, since with modules the amount of classes and packages to be loaded by the servers can be significantly reduced, resulting in thinner servers.

So, let's see how modules work in practice!

Creating a module

Let's start by creating a simple project. The source code for this lab is on this link; the project was created using IntelliJ IDEA.

To create a module, all we have to do is create a Java file called module-info.java and place it at the root of the package structure we want to encapsulate in a module. The result is something like the image below:

jigsaw1

Inside the file, we define a module, which looks something like this:

module com.alexandreesl.application {
}

The keyword module is now reserved in Java. In the code above we defined a module whose name, by convention, matches the name of its root package. That's it! Our first module! Now, let's see how to make this module talk with other modules.

Separating a application in different modules

Our sample application will consist of 4 modules: a main module, a dao module, a service module and a model module.

To create the different modules, all we have to do is create the different packages and module definitions – the module-info.java files – building up the whole module structure.

The image below shows the structure:

jigsaw2

And the new module definitions are:

module com.alexandreesl.dao {
}
module com.alexandreesl.model {
}
module com.alexandreesl.service {
}

Exposing a module

Now that we have the modules defined, let’s start coding our project. Our project will represent a simple CRUD of books, for a Bookstore system.

Let’s start by coding the Model module. We will create a Book class, to represent books from the system.

The code for the class is shown below:

package com.alexandreesl.model;

public class Book {

    private Long id;

    private String name;

    private String author;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }
}

Then, we modify the module, to expose the model class:

module com.alexandreesl.model {

    exports com.alexandreesl.model;

}

Next, we code the DAO module. We will create an interface and an implementation for the module, separated from each other by package segregation. We will also create an object factory.

This is the code for the interface, implementation and object factory of the dao module:

package com.alexandreesl.dao.interfaces;

import com.alexandreesl.model.Book;

public interface IBookDAO {

    void create(Book book);

    void update(Book book);

    Book find(Long id);


}
package com.alexandreesl.dao.impl;

import com.alexandreesl.dao.interfaces.IBookDAO;
import com.alexandreesl.model.Book;

public class BookDAOImpl implements IBookDAO {
    @Override
    public void create(Book book) {

        System.out.println("INSERTED THE BOOK!");

    }

    @Override
    public void update(Book book) {

        System.out.println("UPDATED THE BOOK!");

    }

    @Override
    public Book find(Long id) {

        Book book = new Book();
        book.setId(id);
        book.setName("Elasticsearch: Consuming real-time data with ELK");
        book.setAuthor("Alexandre Eleutério Santos Lourenço");

        return book;
    }


}
package com.alexandreesl.dao.interfaces;

import com.alexandreesl.dao.impl.BookDAOImpl;

public class BookDAOFactory {

    public static IBookDAO getBookDAO() {

        return new BookDAOImpl();

    }

}

The image below shows the final structure of the module with the classes:

jigsaw3

To expose the interfaces package and also use the Book class from the Model module, we add the following lines to the module definition:

module com.alexandreesl.dao {

    requires com.alexandreesl.model;
    exports com.alexandreesl.dao.interfaces;

}

Here we can see an important advantage of modules: since we didn't export the impl package, the implementation won't be exposed to code outside the module.

Now we code the service module. To simplify things, we won't use an interface-implementation approach this time, just a delegation class to the DAO layer. The code for the service class is shown below:

package com.alexandreesl.service;

import com.alexandreesl.dao.interfaces.BookDAOFactory;
import com.alexandreesl.dao.interfaces.IBookDAO;
import com.alexandreesl.model.Book;

public class BookService {

    private IBookDAO bookDAO;

    public BookService() {

        bookDAO = BookDAOFactory.getBookDAO();

    }

    public void create(Book book) {
        bookDAO.create(book);
    }

    public void update(Book book) {
        bookDAO.update(book);
    }

    public Book find(Long id) {
        return bookDAO.find(id);
    }


}

And the module changes are as follows:

module com.alexandreesl.service {

    requires com.alexandreesl.model;
    requires com.alexandreesl.dao;
    exports com.alexandreesl.service;

}

Finally, we code the main module, which is simply a main method where we test out our structure:

package com.alexandreesl.application;

import com.alexandreesl.model.Book;
import com.alexandreesl.service.BookService;

public class Main {

    public static void main(String[] args) {

        Book book = new Book();

        book.setAuthor("Stephen King");
        book.setId(1l);
        book.setName("IT - The thing");

        BookService service = new BookService();

        service.create(book);

        book.setName("IT");

        service.update(book);

        Book searchedBook = service.find(2l);

        System.out.println(searchedBook.getName());
        System.out.println(searchedBook.getAuthor());


    }

}

If we run our code, we will see that everything works, just as designed:

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=50683:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -p /Users/alexandrelourenco/Applications/git/JigsawLab9/out/production/application:/Users/alexandrelourenco/Applications/git/JigsawLab9/out/production/service:/Users/alexandrelourenco/Applications/git/JigsawLab9/out/production/dao:/Users/alexandrelourenco/Applications/git/JigsawLab9/out/production/model -m com.alexandreesl.application/com.alexandreesl.application.Main
INSERTED THE BOOK!
UPDATED THE BOOK!
Elasticsearch: Consuming real-time data with ELK
Alexandre Eleutério Santos Lourenço

Process finished with exit code 0

Please remember that, if the reader wants it, the code of this project is on Github, on this link.

Static dependencies

One thing the reader may notice from our code is that we needed to require the model module on each of the other modules of our system. This is because, as said before, no dependency required by a module is automatically inherited by another module in the hierarchy. All the requirements must be explicitly declared in order to be linked.

However, in this case, if we wanted to declare the dependency in just one module and tell Java on the other modules that the dependency will be met later, we could use the static keyword. Static dependencies on Jigsaw are analogous to the provided scope on Maven, where a dependency is marked just for compilation and it is assumed it will be there when the code runs.

To make the changes so the model module is imported on just one module, we change all module definitions to the following:

module com.alexandreesl.application {

    requires com.alexandreesl.model;
    requires com.alexandreesl.service;

}
module com.alexandreesl.dao {

    requires static com.alexandreesl.model;
    exports com.alexandreesl.dao.interfaces;

}
module com.alexandreesl.service {

    requires static com.alexandreesl.model;
    requires com.alexandreesl.dao;
    exports com.alexandreesl.service;

}

If we run our code again, we will see that it runs successfully, just like before.

Package manager support

Since this is a concept introduced only now, there is still work in progress on Java's build and package management tools, such as Maven and Gradle, to support it. Keep in mind that the objective of Jigsaw is not to replace package management systems.

Think of it more as a complement to those systems, with Jigsaw managing exposure and internal dependencies and the package systems managing issues such as packaging artifacts, running tests, etc.

If the reader is familiar with Gradle, there are already some plugins that integrate Jigsaw with it, like chainsaw:

https://github.com/zyxist/chainsaw

Conclusion

And so we conclude our Java 9 series. With several interesting new features, this new edition of Java proves not only that Java still has relevance on the market, but also that it can still evolve with the most modern practices in use. Thank you for following me on this post, until next time.

Java 9: Learning the new features – part 3

Standard

Hi, dear readers! Welcome to my blog. Continuing our series on the new features of Java 9, we will now talk about Reactive Streams, a concept in asynchronous processing that promises to protect our applications from being overflowed with messages. So, without further delay, let's begin!

What is Reactive Streams

Let's imagine an e-commerce system that has to send orders to a distribution center (DC). The e-commerce and DC systems are separate from each other, able to communicate through a REST service.

Normally, we could simply create a call from the e-commerce system to the DC. So, we implement the call and everything is fine.

However, one day we get a problem on our integration. We notice the e-commerce system has overflowed the DC with lots of calls from a Black Friday sale, so the REST endpoint starts to fail and lose data.

This scenario illustrates a common integration problem: When a consumer has processing limitations to consume messages above a certain volume, we need to ensure the integration doesn’t end up overflowing the pipeline.

To tackle this problem, a pattern called Reactive Streams was designed. With Reactive Streams, the flow of processing is controlled by the Consumer, not the Publisher: the Consumer asks for more data to process as soon as it is ready, keeping its own pace. Not only that, we also have a feature called back pressure, which consists of a kind of throttling that makes the Publisher wait for the Consumer to be available before sending any more messages, so the Consumer is not overflowed, just like we saw in our previous example.

The diagram below shows the new Flow API of Java 9, which allows us to implement Reactive Streams. We can see our consumer (the subscriber) establishing a subscription with the producer and requesting n messages to consume, which are delivered for processing by the onNext method. Don't worry about the rest of the details: we will see more in the next sections.

The reference site for this diagram, with another good tutorial on Reactive Streams, can be found on the references section at the end of the post.

Creating a Stream: the Publisher

First, let's create our publisher. The simplest way to create a publisher is by using the SubmissionPublisher class.

Our lab will simulate the orders integration we talked about earlier. We will begin by creating a DTO to hold the data from our orders:

package com.alexandreesl.handson.model;

import java.math.BigDecimal;
import java.util.Date;
import java.util.List;

public class Order {

    private Long id;

    private List<String> products;

    private BigDecimal total;

    private Date orderDate;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public List<String> getProducts() {
        return products;
    }

    public void setProducts(List<String> products) {
        this.products = products;
    }

    public BigDecimal getTotal() {
        return total;
    }

    public void setTotal(BigDecimal total) {
        this.total = total;
    }

    public Date getOrderDate() {
        return orderDate;
    }

    public void setOrderDate(Date orderDate) {
        this.orderDate = orderDate;
    }
}

Next, let’s instantiate our publisher, passing the message object DTO as generic:

public static void main(String[] args) {

    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();


}

That’s all we need to do for now with our publisher.

Creating a Stream: the Consumer

Now, let’s create our consumer, or, in other words, our subscriber. For this, we will create a class called CDOrderConsumer that implements the Subscriber<T> interface:

package com.alexandreesl.handson.consumer;

import com.alexandreesl.handson.model.Order;

import static java.util.concurrent.Flow.Subscriber;
import static java.util.concurrent.Flow.Subscription;


public class CDOrderConsumer implements Subscriber<Order> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {

        this.subscription = subscription;
        subscription.request(1);

    }

    @Override
    public void onNext(Order item) {
        System.out.println("I am sending the Order to the CD!");
        subscription.request(1);

    }

    @Override
    public void onError(Throwable throwable) {

        throwable.printStackTrace();

    }

    @Override
    public void onComplete() {

        System.out.println("All the orders were processed!");

    }
}

In this class we can see several methods implemented, which we saw in the previous diagram. We can explain each of them as follows:

  • onSubscribe: In this method, we receive an instance of Subscription, which we use to request messages from the publisher. In our example, we store the instance and request 1 message to be processed – it is possible to request more than 1 message per call, allowing the subscriber to process batches of messages – with the subscription.request(n) method call;
  • onNext(T): In this method, we process the messages received. In our example, we print a message symbolizing the REST call and ask the publisher for another message;
  • onError(Throwable throwable): In this method, we receive errors that can occur during message processing. In our example, we simply print the errors;
  • onComplete(): This method is called after all messages are processed. In our example, we just print a message to signal the completion. It is important to note that, if we didn’t make the onNext(T) method ask for more messages, this method would be called after the first message, since no more messages would be requested from the publisher;

Now that we understand how to implement our Subscriber, let’s try out our stream by subscribing it to the publisher and sending a message:

public static void main(String[] args) throws IOException {


    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();
    submissionPublisher.subscribe(new CDOrderConsumer());

    Order order = new Order();
    order.setId(1l);
    order.setOrderDate(new Date());
    order.setTotal(BigDecimal.valueOf(123));
    order.setProducts(List.of("product1", "product2", "product3"));


    submissionPublisher.submit(order);

    submissionPublisher.close();

    System.out.println("Waiting for processing.......");
    System.in.read();


}

In our script, we instantiate a publisher, create an order and submit the message for processing. Keep in mind that the submit call doesn’t mean the message was delivered to the subscriber: that only happens when the subscriber calls subscription.request(n). Lastly, we close the publisher, as we will not send any more messages.

Note: you may be wondering why we put that System.in.read() at the end. This is because all processing of the stream is done on a separate thread from the main one, so we need to make the program wait for the processing to complete, or else it would exit before the message is processed.

If we execute our program, we will see an output like this:

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=50218:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main
Waiting for processing.......
I am sending the Order to the CD!
All the orders were processed!

Success!!! Now we have a fully functional reactive stream, allowing us to process our messages.

Processors on Reactive Streams

Sometimes, on a stream, there will be logic that can be placed between the publisher and the consumer, such as filtering, transforming, and more. For this purpose, we can implement processors. Processors are like subscribers that also publish messages after the logic is applied. This way, processors can be chained together on a stream, executing one after another, before finally passing the message to a subscriber.

Let’s expand our previous example. We detected a bug in our e-commerce system that sometimes places “phantom” orders with a total value of 0 on the stream. We haven’t identified the cause yet, but we need to prevent these fake orders from being sent to the CD system. We can use a processor to filter them out.

So, let’s implement the following class, OrderFilter, to accomplish this:

package com.alexandreesl.handson.processor;

import com.alexandreesl.handson.model.Order;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;


public class OrderFilter extends SubmissionPublisher<Order> implements Flow.Processor<Order, Order> {

    private Flow.Subscription subscription;


    @Override
    public void onSubscribe(Flow.Subscription subscription) {

        this.subscription = subscription;
        subscription.request(1);

    }

    @Override
    public void onNext(Order item) {

        if (item.getTotal().doubleValue() > 0) {

            submit(item);

        } else {

            System.out.println("INVALID ORDER! DISCARDING...");

        }

        subscription.request(1);


    }

    @Override
    public void onError(Throwable throwable) {

        throwable.printStackTrace();

    }

    @Override
    public void onComplete() {

        System.out.println("All the orders were processed!");

    }


}

In this class, we implement both the publisher and subscriber interfaces. The code is basically the same as our subscriber, except that in the onNext(T) method we implement logic that checks whether an order has a total value bigger than 0. If it does, it is submitted to the subscriber; otherwise, it is discarded.

Next, we modify our code, wiring the processor into our stream – the publisher subscribes the processor, and the processor subscribes the consumer – and testing it with 2 orders, one valid and one fake:

public static void main(String[] args) throws IOException {

    SubmissionPublisher<Order> submissionPublisher = new SubmissionPublisher<>();
    OrderFilter filter = new OrderFilter();
    submissionPublisher.subscribe(filter);
    filter.subscribe(new CDOrderConsumer());

    Order order = new Order();
    order.setId(1l);
    order.setOrderDate(new Date());
    order.setTotal(BigDecimal.valueOf(123));
    order.setProducts(List.of("product1", "product2", "product3"));

    submissionPublisher.submit(order);

    order = new Order();
    order.setId(2l);
    order.setOrderDate(new Date());
    order.setProducts(List.of("product1", "product2", "product3"));

    order.setTotal(BigDecimal.ZERO);

    submissionPublisher.submit(order);

    submissionPublisher.close();

    System.out.println("Waiting for processing.......");
    System.in.read();

}

If we run the code, we will see a message indicating that one of the orders was discarded, showing that our implementation was a success:

/Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=51469:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/alexandrelourenco/Applications/git/ReactiveStreamsJava9/out/production/ReactiveStreamsJava9Lab com.alexandreesl.handson.Main
Waiting for processing.......
INVALID ORDER! DISCARDING...
I am sending the Order to the CD!
All the orders were processed!

The source code for our lab can be found here.

Conclusion

And so we conclude our learning on Reactive Streams. With a simple and intuitive API, Reactive Streams are a good solution to try out, especially on systems with capacity limitations. Please join me next time for the last chapter of this series, where we will finally see the much-anticipated new module system, Jigsaw. Thank you for following me on this post, until next time.

References

Reactive Streams (Wikipedia)

Reactive Streams Tutorial (another good tutorial to serve as guide)

Java 9: Learning the new features – part 2

Standard

Hi, dear readers! Welcome to my blog. In this post, we will continue our tour of Java 9, now focusing on what has changed in Streams and Optionals.

Creating collections

Before Java 9, when we wanted to populate a collection with some data, we commonly would do this:

Map<Long,String> tasks = new HashMap<>();

tasks.put(1l,"Put trash on the street");
tasks.put(2l,"Buy bread");
tasks.put(3l,"Walk with the dog");
tasks.put(4l,"make dinner");

Of course, we could also create the collection like this:

Map<Long, String> tasks = new HashMap<>() {{

    put(1l, "Put trash on the street");
    put(2l, "Buy bread");
    put(3l, "Walk with the dog");
    put(4l, "make dinner");

}};

Still, it is quite a verbose way to create a collection. Finally, in Java 9, we can create a collection this way, which is much cleaner:

Map<Long, String> tasks = Map.of(
        1l, "Put trash on the street",
        2l, "Buy bread",
        3l, "Walk with the dog",
        4l, "make dinner"

);

Two points are worth noting about the of method, however:

  • There’s no way to choose which implementation will be used;
  • If there are any null values in the data, the creation will fail with a NullPointerException, as shown in the snippet below;
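A quick sketch of the second point (a toy example of mine, not from the original post):

import java.util.Map;

public class MapOfNullExample {

    public static void main(String[] args) {
        try {
            // Map.of rejects null keys and null values at creation time
            Map<Long, String> tasks = Map.of(1l, null);
            System.out.println(tasks);
        } catch (NullPointerException e) {
            System.out.println("Creation failed: " + e);
        }
    }
}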

New collectors

Another good addition was the new collectors. With them, we can now apply filters or mappings inside the collecting itself. Let’s see some examples.

Let’s use the same tasks map from before. Suppose we want a list of task values, filtered to only the tasks that don’t contain the word “dog”. With the new collectors, we can accomplish this by doing:

tasks.values().stream().collect(Collectors.filtering(w -> !w.contains("dog"), Collectors.toList())).forEach(System.out::println);

If we execute the code, we will see that it prints all the tasks except the one about walking with the dog, as we expected.

Now, let’s see another example. Let’s suppose we want to create a list with only the first word of each task. This can be done by using the following code:

tasks.values().stream().collect(Collectors.mapping(w -> w.split(" ")[0], Collectors.toList())).forEach(System.out::println);

If we run the code, we will see that it will print a list with just the first words from the tasks, as we expected.

Iterating with streams

Another interesting addition is the dropWhile and takeWhile operations. Using them, we can walk a stream sequentially, dropping or taking items while a predicate holds. Let’s see some examples.

Let’s begin by creating a collection for our tests:

List<String> words = List.of("we", "are", "testing", 
        "new", "features", "of", "Java", "9");

Now, let’s try the dropwhile:

words.stream().dropWhile(e -> !e.equals("new")).forEach(System.out::println);

The result if we execute our stream will be as follows:

new
features
of
Java
9

As we can see, this is the correct result, since we told the stream to drop items while they are not equal to “new”.

If we try the takeWhile operation with the same predicate, we will see that the stream keeps items until an item equal to “new” is found, exactly as expected. This is the code modified for the new example:

words.stream().takeWhile(e -> !e.equals("new")).forEach(System.out::println);

And this is the new result:

we
are
testing

New features on Optionals

Optionals also get their share of improvements. Let’s begin with our previous example from the mapping collector.

Let’s suppose our tasks map uses Optionals for values instead of literal strings:

Map<Long, Optional<String>> tasks = Map.of(
        1l, Optional.ofNullable("Put trash on the street"),
        2l, Optional.ofNullable("Buy bread"),
        3l, Optional.ofNullable("Walk with the dog"),
        4l, Optional.ofNullable("make dinner")

);

If we wanted to use the same map to implement the previous stream, we would have to “extract” all the values from the Optionals before using them in the stream. That is, until Java 9.

Now, we can implement the previous stream on this new scenario by doing this:

tasks.values().stream().flatMap(Optional::stream).collect(Collectors.mapping(w -> w.split(" ")[0], Collectors.toList())).forEach(System.out::println);

If we run our code, we will see that it will print the list with just the first words from the tasks, just like before.

Another good addition is the ifPresentOrElse method. Now, if we need logic that depends on whether an Optional is empty or not, we can just do:

myOptional.ifPresentOrElse(present -> System.out.println(present), () -> {
    System.out.println("nothing to do");
});

And even more interesting, Optional now supports the or method, which allows us to chain fallback values. We can see the method in action in the example below:

myOptional.or(() -> Optional.ofNullable("this is my first callback"))
        .or(() -> Optional.ofNullable("this is my second callback"))
        .or(() -> Optional.ofNullable("this is my third callback"))
        .or(() -> Optional.ofNullable("this is my fourth callback"));
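One detail worth noting: as soon as one of the or steps produces a non-empty Optional, the remaining suppliers are not even evaluated. A small, self-contained sketch (a toy example of mine, not from the original post):

import java.util.Optional;

public class OptionalOrExample {

    public static void main(String[] args) {
        Optional<String> myOptional = Optional.empty();

        String value = myOptional
                .or(() -> Optional.of("this is my first callback"))
                .or(() -> Optional.of("this is my second callback")) // never evaluated
                .orElse("no callback produced a value");

        System.out.println(value); // prints "this is my first callback"
    }
}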

Conclusion

And so we conclude another post in our series on the new features of Java 9. Please stay tuned for the next posts in the series, where we will talk about other features, such as the long-awaited Jigsaw. Thank you for following me on another post, until next time.

Java 9: Learning the new features – part 1

Standard

Hi, dear readers! Welcome to my blog. This is the first post from a series focused on studying the new features from Java 9.

After a long wait for features like Jigsaw, the Java module system, Java 9 is finally upon us. Let’s begin our journey by exploring the language’s new REPL console, JShell!

Installing Java 9

To install Java 9, I recommend following the instructions on this link.

REPL

REPL is an acronym that stands for Read-Eval-Print-Loop. A REPL is a terminal where we can input commands and receive immediate feedback about the code we just entered.

The code is read, its syntax is evaluated, the code is executed, the results are printed on the console and finally the terminal loops to wait for the next command, hence the acronym.

Starting JShell

To start JShell, we just open a terminal and enter:

jshell

This will initialize the shell, as we can see below:

|  Welcome to JShell -- Version 9

|  For an introduction type: /help intro

jshell>

Just to finish our first glance at basic JShell commands, to exit the console, we just type:

jshell> /exit

|  Goodbye

Running commands

Now, let’s enter some commands. First, let’s create a String variable:

jshell> String myString = "Welcome to my JShell!"

myString ==> "Welcome to my JShell!"

jshell>

There are two things we can notice in the code above: first, we don’t need to use a semicolon. Second, we can see the REPL in motion, as the code was processed and the results were printed on the console. If we just type the variable name, we can see that it prints its contents:

jshell> myString

myString ==> "Welcome to my JShell!"

jshell>

We can also run other kinds of code, such as loops:

jshell> for (int i = 0;i < 10; i++)

   ...> System.out.println(i)

0

1

2

3

4

5

6

7

8

9

jshell>

It is also possible to perform simple arithmetic operations. Let’s try a simple addition:

jshell> 1 + 2

$1 ==> 3

jshell>

Did you notice we didn’t define a variable? When we don’t include one, JShell defines it for us, in this case $1. The name is a $ followed by the command’s index, since JShell stores the commands of our session in an array-like structure.

We can see this list of commands with the /list command, as follows:

jshell> /list

   1 : 1 + 2

   2 : String myString = "Welcome to my JShell!";

   3 : myString

   4 : for (int i = 0;i < 10; i++)

       System.out.println(i);

jshell>

Of course, implicitly defined variables can also be used in other commands, as follows:

jshell> int i = $1 + 1

i ==> 4

jshell>

Editing scripts

JShell also allows us to edit and save snippets of code, which we can use to create classes. Let’s see how to do it.

JShell comes with an editor, but it is also possible to change it to one of your choice. I will change my editor to Vim, using the following command:

jshell> /set editor vim

|  Editor set to: vim

jshell>

Now that our editor is changed, let’s begin by opening the command with the for loop in the editor – in my case, it is the command at index 4:

jshell> /edit 4

This will open the snippet in the Vim editor. Let’s edit the code as follows and save:

public class MyObject {

    public static void myMethod() {

        for (int i = 0; i < 10; i++)
            System.out.println(i);

    }

}

After saving, we will see a message indicating that the class was created:

jshell> /edit 4

|  created class MyObject

0

1

2

3

4

5

6

7

8

9

We can also discard the old snippet with the /drop command:

/drop 4

Now, let’s try to use our class on the shell:

jshell> MyObject.myMethod()

0

1

2

3

4

5

6

7

8

9

jshell>

As we can see, the code was correctly executed, proving that our class creation was a success.

Importing & Exporting

Exporting and importing are done with the /save and /open commands, respectively. Let’s run the following command:

/save <path-to-save>/backup.txt

The result will be a file like the following:

1 + 2
String myString = "Welcome to my JShell!";
myString
int i = $1 + 1;
public class MyObject {

public static void myMethod() {

for (int i = 0;i < 10; i++)
System.out.println(i);

}

}
MyObject.myMethod()

Now, let’s close the shell with the /exit command and open it again, clearing our session.

Now, let’s run the /open command to import our previous commands:

/open <path-to-save>/backup.txt

And finally, let’s run the /list command to see if the commands from our backup were imported:

jshell> /list

   1 : 1 + 2

   2 : String myString = "Welcome to my JShell!";

   3 : myString

   4 : int i = $1 + 1;

   5 : public class MyObject {

       

       public static void myMethod() {

       

       for (int i = 0;i < 10; i++)

       System.out.println(i);

       

       }

       

       }

   6 : MyObject.myMethod()

jshell>

We can see that the import was a success, restoring the commands from our previous session.

Other commands

Of course, there are other commands besides the ones shown in this post. A complete list of JShell commands can be found in JShell’s documentation.

Conclusion

And so we conclude our first glimpse at the new features of Java 9. JShell is an interesting new addition to the Java platform, allowing us to quickly test and run Java code. In my opinion it is not a tool for production use, but it is a good tool for development and learning purposes. Thank you for following me on this post, until next time.

Apache Camel: integrating systems with Java

Standard

Hi, dear readers! Welcome to my blog. In this post, we will talk about Apache Camel, a robust solution for implementing system integrations across various technologies, such as REST, WS, JMS, JDBC, AWS products, LDAP, SMTP, Elasticsearch etc.

So, let’s get started!

Origin

Apache Camel was created within the Apache ServiceMix project, which was powered by the Spring Framework and implemented following the JBI specification. The Java Business Integration specification defines a plug-and-play platform for system integrations, following the EIP (Enterprise Integration Patterns) catalog.

Terminology

Exchange

Exchanges – which carry a MEP (Message Exchange Pattern) – are like frames in which we transport our data across integrations in Camel. An Exchange can have 2 messages inside, one representing the input and another one representing the output of an integration.

The output message in Camel is optional, since we could have an integration that doesn’t produce a response. Also, an Exchange can have properties, represented as key-value entries, which hold data that is available across the whole route (we will see more about routes very soon).

Message

Messages are the data that is transferred inside a Camel route. A Message has a body, which is the data itself, and headers, which are, like the properties of an Exchange, key-value entries that can be used along the processing.

One important aspect to keep in mind, however, is that along a Camel route our Messages are replaced – when we convert the body with a type converter, for instance – and when this happens, we can lose our headers. So, Message headers should be seen as ephemeral data, not meant to survive the whole route. For data of that kind, it is better to use Exchange properties.

The Message body can be made of several types of data, such as binaries, JSON, etc.

Camel context

The Camel context is the runtime container in which Camel runs. It initializes type converters, routes, endpoints, EIPs etc.

A Camel context has 3 possible statuses: started, suspended and stopped. When started, the context serves the routes’ processing as normal.

When suspended, the Camel context stops the processing – after the Exchanges already in flight are completed – but keeps all the caches, resources etc. loaded. A suspended context can be resumed.

Finally, there’s the stopped status. When stopped, the context halts the processing like in the suspended status, but also releases all the resources, caches etc., performing a complete shutdown. As with the suspended status, Camel guarantees that all the Exchanges being processed are finished before the shutdown.
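Just to illustrate the lifecycle described above, here is a minimal sketch (my own toy example, assuming the Camel 2.x API used later in this lab, not code from the original post):

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

public class ContextLifecycleSketch {

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        // routes would be registered here, e.g. context.addRoutes(new MyFirstCamelRoute());

        context.start();   // started: routes are consuming and processing Exchanges

        context.suspend(); // suspended: processing paused, caches and resources kept loaded
        context.resume();  // back to the started status

        context.stop();    // stopped: in-flight Exchanges finished, resources released
    }
}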

Route

Routes are the heart of Camel’s processing. A route consists of a flow that starts on an endpoint, passes through a stream of processors/converters and finishes on another endpoint. It is possible to chain routes by calling another route as the final endpoint of a previous route.

A route can also use other features, such as EIPs, asynchronous and parallel processing.
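As a quick illustration of route chaining (a toy sketch of mine with hypothetical folder names, not part of this lab’s code, assuming the direct: component from camel-core):

import org.apache.camel.builder.RouteBuilder;

public class ChainedRoutes extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        // the first route finishes on direct:enrich...
        from("file:/tmp/camel-inbox?delete=true")
                .convertBodyTo(String.class)
                .to("direct:enrich");

        // ...which is the input endpoint of the second route, chaining them
        from("direct:enrich")
                .log("processing ${body}")
                .to("mock:out");
    }
}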

Channel

When Camel executes a route, the controller that executes it is called a Channel.

A Channel is responsible for chaining the processors’ execution, passing the Exchange from one to another, as well as monitoring the route execution. It also allows us to implement interceptors to run logic on certain route events, such as when an Exchange is sent to a specific Endpoint.

Processor

Processors are the primary extension points of Camel. By creating classes that implement the org.apache.camel.Processor interface, we create programming units that we can use to include our own code in a Camel route, inside a convenient process method.
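A minimal sketch of a custom processor (a toy example of mine, not part of this lab), which also illustrates the header versus property distinction discussed in the Message section:

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class AuditProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        // an Exchange property stays available for the whole route
        exchange.setProperty("receivedAt", System.currentTimeMillis());

        // a header lives on the current Message and should be treated as ephemeral
        exchange.getIn().setHeader("processedBy", "AuditProcessor");
    }
}

Such a processor would be plugged into a route with .process(new AuditProcessor()).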

Component

A Component acts like a factory that instantiates Endpoints for our use. We don’t use a Component directly; instead, we reference it by defining an Endpoint URI, from which Camel infers the Component it needs to use in order to create the Endpoint.

Camel provides dozens of Components, from file and JMS to connectors for AWS products and so on.

Registry

In order to use beans from IoC systems, such as OSGi, Spring and JNDI, Camel supplies us with a bean Registry. The Registry’s mission is to resolve the beans referenced in Camel routes against the ones created in its associated context, such as an OSGi container, a Spring context etc.

Type converter

Type converters, as the name implies, are used in order to convert the body of a message from one type to another. The uses for a converter are varied, ranging from converting a binary format to a String to converting XML to JSON.

We can create our own type converter by implementing the org.apache.camel.TypeConverter interface – or, as we will do in the lab, by annotating methods with @Converter in a class that implements TypeConverters. After creating the converter, we need to register it in the type converter registry.

Endpoint

An Endpoint is the entity responsible for getting data into or out of a Camel route. It covers several types of sources and destinations, as mentioned before, such as SQS, files, relational databases and so on. An Endpoint is instantiated and configured by providing a URI to a Camel route, following the pattern below:

component:option?option1=value1&option2=value2

We can create our own Endpoints by implementing the org.apache.camel.Endpoint interface. When doing so, we supply, among other things, the logic to create a polling consumer, an event-driven consumer and a producer for the Endpoint.

Lab

So, without further delay, let’s start our lab! In this lab, we will create a route that polls a folder for access-log-style files, sends the log lines to an SQS queue and backs up the file on S3.

Setup

The setup for our lab is pretty simple: it is a Spring Boot application, configured to work with Camel. Our build.gradle file is as follows:

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'maven'
apply plugin: 'idea'

jar {
    baseName = 'apache-camel-handson'
    version = '1.0'
}

project.ext {
    springBootVersion = '1.5.4.RELEASE'
    camelVersion = '2.18.3'

}

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}

bootRun {
    systemProperties = System.properties
}


dependencies {

    compile group: 'org.apache.camel', name: 'camel-spring-boot-starter', version: camelVersion
    compile group: 'org.apache.camel', name: 'camel-commands-spring-boot', version: camelVersion
    compile group: 'org.apache.camel',name: 'camel-aws', version: camelVersion
    compile group: 'org.apache.camel',name: 'camel-mail', version: camelVersion
    compile group: 'org.springframework.boot', name: 'spring-boot-autoconfigure', version: springBootVersion
    

}
group 'com.alexandreesl.handson'
version '1.0'

buildscript {
    repositories {
        mavenLocal()
        maven {
            url "https://plugins.gradle.org/m2/"
        }
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.4.RELEASE")
    }
}

And the main class is a simple Spring Boot application class, as follows:

package com.alexandreesl.handson;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * Created by alexandrelourenco on 28/06/17.
 */

@ComponentScan(basePackages = {"com.alexandreesl.handson"})
@SpringBootApplication
@EnableAutoConfiguration
public class ApacheCamelHandsonApp {

    public static void main(String[] args) {
        SpringApplication.run(ApacheCamelHandsonApp.class, args);
    }

}

We also create a configuration class, where we will register a type converter that we will create in the next section:

package com.alexandreesl.handson.configuration;

import org.apache.camel.CamelContext;
import org.apache.camel.spring.boot.CamelContextConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CamelConfiguration {


    @Bean
    public CamelContextConfiguration camelContextConfiguration() {

        return new CamelContextConfiguration() {

            @Override
            public void beforeApplicationStart(CamelContext camelContext) {
               

            }

            @Override
            public void afterApplicationStart(CamelContext camelContext) {

            }

        };

    }

}

We also create a configuration class that provides an AmazonS3Client and an AmazonSQSClient, which will be used by the AWS-S3 and AWS-SQS Camel endpoints:

package com.alexandreesl.handson.configuration;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.sqs.AmazonSQSClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

@Configuration
public class AWSConfiguration {


    @Autowired
    private Environment environment;

    @Bean(name = "s3Client")
    public AmazonS3Client s3Client() {
        return new AmazonS3Client(staticCredentialsProvider()).withRegion(Regions.fromName("us-east-1"));
    }

    @Bean(name = "sqsClient")
    public AmazonSQSClient sqsClient() {
        return new AmazonSQSClient(staticCredentialsProvider()).withRegion(Regions.fromName("us-east-1"));
    }

    @Bean
    public StaticCredentialsProvider staticCredentialsProvider() {
        return new StaticCredentialsProvider(new BasicAWSCredentials("<access key>", "<secret access key>"));
    }

}

PS: this lab assumes that the reader is familiar with AWS and already has an account. For the lab, a bucket called “apache-camel-handson” and an SQS queue called “MyInputQueue” were created.

Configuring the route

Now that we have our Camel environment set up, let’s begin creating our route. First, we create a type converter called “StringToAccessLogDTOConverter” with the following code:

package com.alexandreesl.handson.converters;

import com.alexandreesl.handson.dto.AccessLogDTO;
import org.apache.camel.Converter;
import org.apache.camel.TypeConverters;

import java.util.StringTokenizer;

/**
 * Created by alexandrelourenco on 30/06/17.
 */

public class StringToAccessLogDTOConverter implements TypeConverters {

    @Converter
    public AccessLogDTO convert(String row) {

        AccessLogDTO dto = new AccessLogDTO();

        StringTokenizer tokens = new StringTokenizer(row);

        dto.setIp(tokens.nextToken());
        dto.setUrl(tokens.nextToken());
        dto.setHttpMethod(tokens.nextToken());
        dto.setDuration(Long.parseLong(tokens.nextToken()));

        return dto;

    }

}

Next, we change our Camel configuration, registering the converter:

package com.alexandreesl.handson.configuration;

import com.alexandreesl.handson.converters.StringToAccessLogDTOConverter;
import org.apache.camel.CamelContext;
import org.apache.camel.spring.boot.CamelContextConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CamelConfiguration {


    @Bean
    public CamelContextConfiguration camelContextConfiguration() {

        return new CamelContextConfiguration() {

            @Override
            public void beforeApplicationStart(CamelContext camelContext) {

                camelContext.getTypeConverterRegistry().addTypeConverters(new StringToAccessLogDTOConverter());

            }

            @Override
            public void afterApplicationStart(CamelContext camelContext) {

            }

        };

    }

}

Our converter reads a String and converts it to a DTO, with the following attributes:

package com.alexandreesl.handson.dto;

/**
 * Created by alexandrelourenco on 30/06/17.
 */
public class AccessLogDTO {

    private String ip;

    private String url;

    private String httpMethod;

    private long duration;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getHttpMethod() {
        return httpMethod;
    }

    public void setHttpMethod(String httpMethod) {
        this.httpMethod = httpMethod;
    }

    public long getDuration() {
        return duration;
    }

    public void setDuration(long duration) {
        this.duration = duration;
    }

    @Override
    public String toString() {

        StringBuffer buffer = new StringBuffer();
        buffer.append("[");
        buffer.append(ip);
        buffer.append(",");
        buffer.append(url);
        buffer.append(",");
        buffer.append(httpMethod);
        buffer.append(",");
        buffer.append(duration);
        buffer.append("]");

        return buffer.toString();

    }
}

Finally, we have our route, defined in our RouteBuilder:

package com.alexandreesl.handson.routes;

import com.alexandreesl.handson.dto.AccessLogDTO;
import org.apache.camel.LoggingLevel;
import org.apache.camel.spring.SpringRouteBuilder;
import org.springframework.context.annotation.Configuration;

/**
 * Created by alexandrelourenco on 30/06/17.
 */

@Configuration
public class MyFirstCamelRoute extends SpringRouteBuilder {


    @Override
    public void configure() throws Exception {

        from("file:/Users/alexandrelourenco/Documents/apachecamelhandson?delay=1000&charset=utf-8&delete=true")
                .setHeader("CamelAwsS3Key", header("CamelFileName"))
                .to("aws-s3:arn:aws:s3:::apache-camel-handson?amazonS3Client=#s3Client")
                .convertBodyTo(String.class)
                .split().tokenize("\n")
                    .convertBodyTo(AccessLogDTO.class)
                    .log(LoggingLevel.INFO, "${body}")
                    .to("aws-sqs://MyInputQueue?amazonSQSClient=#sqsClient");

    }
}

In the route above, we define a file endpoint that polls a folder for files every second and removes the file if the processing completes successfully. Then we send the file to Amazon, using S3 as backup storage.

Next, we split the file using a splitter, which generates a string for each line of the file. For each line, we convert it to a DTO, log the data and finally send it to the SQS queue.

Now that we have our code done, let’s run it!

Running

First, we start our Camel route. To do this, we simply run the main Spring Boot class, as we would do with any common Java program.

After firing up Spring Boot, we will see on the console the output indicating that the route was successfully started:

. ____ _ __ _ _
 /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/ ___)| |_)| | | | | || (_| | ) ) ) )
 ' |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot :: (v1.5.4.RELEASE)

2017-07-01 12:52:02.224 INFO 3042 --- [ main] c.a.handson.ApacheCamelHandsonApp : Starting ApacheCamelHandsonApp on Alexandres-MacBook-Pro.local with PID 3042 (/Users/alexandrelourenco/Applications/git/apache-camel-handson/build/classes/main started by alexandrelourenco in /Users/alexandrelourenco/Applications/git/apache-camel-handson)
2017-07-01 12:52:02.228 INFO 3042 --- [ main] c.a.handson.ApacheCamelHandsonApp : No active profile set, falling back to default profiles: default
2017-07-01 12:52:02.415 INFO 3042 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@475e586c: startup date [Sat Jul 01 12:52:02 BRT 2017]; root of context hierarchy
2017-07-01 12:52:03.325 INFO 3042 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.apache.camel.spring.boot.CamelAutoConfiguration' of type [org.apache.camel.spring.boot.CamelAutoConfiguration$$EnhancerBySpringCGLIB$$72a2a9b] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2017-07-01 12:52:09.520 INFO 3042 --- [ main] o.a.c.i.converter.DefaultTypeConverter : Loaded 192 type converters
2017-07-01 12:52:09.612 INFO 3042 --- [ main] roperties$SimpleAuthenticationProperties :

Using default password for shell access: b738eab1-6577-4f9b-9a98-2f12eae59828




2017-07-01 12:52:15.463 WARN 3042 --- [ main] tarterDeprecatedWarningAutoConfiguration : spring-boot-starter-remote-shell is deprecated as of Spring Boot 1.5 and will be removed in Spring Boot 2.0
2017-07-01 12:52:15.511 INFO 3042 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2017-07-01 12:52:15.519 INFO 3042 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2017-07-01 12:52:15.615 INFO 3042 --- [ main] o.a.camel.spring.boot.RoutesCollector : Loading additional Camel XML routes from: classpath:camel/*.xml
2017-07-01 12:52:15.615 INFO 3042 --- [ main] o.a.camel.spring.boot.RoutesCollector : Loading additional Camel XML rests from: classpath:camel-rest/*.xml
2017-07-01 12:52:15.616 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : Apache Camel 2.18.3 (CamelContext: camel-1) is starting
2017-07-01 12:52:15.618 INFO 3042 --- [ main] o.a.c.m.ManagedManagementStrategy : JMX is enabled
2017-07-01 12:52:25.695 INFO 3042 --- [ main] o.a.c.i.DefaultRuntimeEndpointRegistry : Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2017-07-01 12:52:25.810 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2017-07-01 12:52:27.853 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route1 started and consuming from: file:///Users/alexandrelourenco/Documents/apachecamelhandson?charset=utf-8&delay=1000&delete=true
2017-07-01 12:52:27.854 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : Total 1 routes, of which 1 are started.
2017-07-01 12:52:27.854 INFO 3042 --- [ main] o.a.camel.spring.SpringCamelContext : Apache Camel 2.18.3 (CamelContext: camel-1) started in 12.238 seconds
2017-07-01 12:52:27.858 INFO 3042 --- [ main] c.a.handson.ApacheCamelHandsonApp : Started ApacheCamelHandsonApp in 36.001 seconds (JVM running for 36.523)

PS: Don’t forget to replace the access key and secret with your own!

Now, to test it, we place a file in the polling folder. For testing, we create a file like the following:

10.12.64.3 /api/v1/test1 POST 123
10.12.67.3 /api/v1/test2 PATCH 125
10.15.64.3 /api/v1/test3 GET 166
10.120.64.23 /api/v1/test1 POST 100

We put a file with this content in the folder and, after 1 second, the file is gone! Where did it go?

If we check the Amazon S3 bucket interface, we will see that the file was created on the storage:

 

(screenshot: the uploaded file listed in the S3 bucket)

And if we check the Amazon SQS interface, we will see 4 messages on the queue, proving that our integration is a success:

(screenshot: the 4 messages visible in the SQS console)

If we check the messages, we will see that Camel correctly parsed the information from the file, as we can see in the example below:

[10.12.64.3,/api/v1/test1,POST,123]

Implementing Error Handling

In Camel, we can implement logic designed for handling errors. This is also done by defining routes, whose inputs are the exceptions thrown by the other routes.

In our lab, let’s implement some error handling. First, we add an option to the file endpoint that moves the file to a .error folder when an error occurs, and then we send an email to ourselves to alert us of the failure. We can do this by changing the route as follows:

package com.alexandreesl.handson.routes;

import com.alexandreesl.handson.dto.AccessLogDTO;
import org.apache.camel.LoggingLevel;
import org.apache.camel.spring.SpringRouteBuilder;
import org.springframework.context.annotation.Configuration;

/**
 * Created by alexandrelourenco on 30/06/17.
 */

@Configuration
public class MyFirstCamelRoute extends SpringRouteBuilder {


    @Override
    public void configure() throws Exception {

        onException(Exception.class)
                .handled(false)
                .log(LoggingLevel.ERROR, "An Error processing the file!")
                .to("smtps://smtp.gmail.com:465?password=xxxxxxxxxxxxxxxx&username=alexandreesl@gmail.com&subject=A error has occurred!");

        from("file:/Users/alexandrelourenco/Documents/apachecamelhandson?delay=1000&charset=utf-8&delete=true&moveFailed=.error")
                .setHeader("CamelAwsS3Key", header("CamelFileName"))
                .to("aws-s3:arn:aws:s3:::apache-camel-handson?amazonS3Client=#s3Client")
                .convertBodyTo(String.class)
                .split().tokenize("\n")
                    .convertBodyTo(AccessLogDTO.class)
                    .log(LoggingLevel.INFO, "${body}")
                    .to("aws-sqs://MyInputQueue?amazonSQSClient=#sqsClient");

    }
}

Then, we restart the route and feed it a file like the following, which will cause a parsing exception:

10.12.64.3 /api/v1/test1 POST 123
10.12.67.3 /api/v1/test2 PATCH 125
10.15.64.3 /api/v1/test3 GET 166
10.120.64.23 /api/v1/test1 POST 10a

After the processing, we can check the console and see how the error was handled:

2017-07-01 14:18:48.695  INFO 3230 --- [           main] o.a.camel.spring.SpringCamelContext      : Apache Camel 2.18.3 (CamelContext: camel-1) started in 11.899 seconds
2017-07-01 14:18:48.699  INFO 3230 --- [           main] c.a.handson.ApacheCamelHandsonApp        : Started ApacheCamelHandsonApp in 35.612 seconds (JVM running for 36.052)
2017-07-01 14:18:52.737  WARN 3230 --- [checamelhandson] c.amazonaws.services.s3.AmazonS3Client   : No content length specified for stream data.  Stream contents will be buffered in memory and could result in out of memory errors.
2017-07-01 14:18:53.105  INFO 3230 --- [checamelhandson] route1                                   : [10.12.64.3,/api/v1/test1,POST,123]
2017-07-01 14:18:53.294  INFO 3230 --- [checamelhandson] route1                                   : [10.12.67.3,/api/v1/test2,PATCH,125]
2017-07-01 14:18:53.504  INFO 3230 --- [checamelhandson] route1                                   : [10.15.64.3,/api/v1/test3,GET,166]
2017-07-01 14:18:53.682 ERROR 3230 --- [checamelhandson] route1                                   : An Error processing the file!
2017-07-01 14:19:02.058 ERROR 3230 --- [checamelhandson] o.a.camel.processor.DefaultErrorHandler  : Failed delivery for (MessageId: ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-9 on ExchangeId: ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-10). Exhausted after delivery attempt: 1 caught: org.apache.camel.InvalidPayloadException: No body available of type: com.alexandreesl.handson.dto.AccessLogDTO but has value: 10.120.64.23 /api/v1/test1 POST 10a of type: java.lang.String on: Message[ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-9]. Caused by: Error during type conversion from type: java.lang.String to the required type: com.alexandreesl.handson.dto.AccessLogDTO with value 10.120.64.23 /api/v1/test1 POST 10a due java.lang.NumberFormatException: For input string: "10a". Exchange[ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-10]. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: java.lang.String to the required type: com.alexandreesl.handson.dto.AccessLogDTO with value 10.120.64.23 /api/v1/test1 POST 10a due java.lang.NumberFormatException: For input string: "10a"]. Processed by failure processor: FatalFallbackErrorHandler[Pipeline[[Channel[Log(route1)[An Error processing the file!]], Channel[sendTo(smtps://smtp.gmail.com:465?password=xxxxxx&subject=A+error+has+occurred%21&username=alexandreesl%40gmail.com)]]]]

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [file:///Users/alexandrelourenco/Documents/apachecamelhandson?charset=utf-8&del] [      9323]
[route1            ] [convertBodyTo2    ] [convertBodyTo[com.alexandreesl.handson.dto.AccessLogDTO]                      ] [      8370]
[route1            ] [log1              ] [log                                                                           ] [         1]
[route1            ] [to1               ] [smtps://smtp.gmail.com:xxxxxx@gmail.com&subject=A error ha                    ] [      8366]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.InvalidPayloadException: No body available of type: com.alexandreesl.handson.dto.AccessLogDTO but has value: 10.120.64.23 /api/v1/test1 POST 10a of type: java.lang.String on: Message[ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-9]. Caused by: Error during type conversion from type: java.lang.String to the required type: com.alexandreesl.handson.dto.AccessLogDTO with value 10.120.64.23 /api/v1/test1 POST 10a due java.lang.NumberFormatException: For input string: "10a". Exchange[ID-Alexandres-MacBook-Pro-local-52251-1498929510223-0-10]. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: java.lang.String to the required type: com.alexandreesl.handson.dto.AccessLogDTO with value 10.120.64.23 /api/v1/test1 POST 10a due java.lang.NumberFormatException: For input string: "10a"]
	at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:107) ~[camel-core-2.18.3.jar:2.18.3]
	at org.apache.camel.processor.ConvertBodyProcessor.process(ConvertBodyProcessor.java:91) ~[camel-core-2.18.3.jar:2.18.3]
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.18.3.jar:2.18.3]
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) [camel-core-2.18.3.jar:2.18.3]
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) [camel-core-2.18.3.jar:2.18.3]

If we look at the folder, we will see that a .error folder was created and the file was moved there:

(screenshot: the .error folder containing the failed file)

And if we check the mailbox, we will see that we received the failure alert:

(screenshot: the failure alert email in the mailbox)

Conclusion

And so we conclude our tour through Apache Camel. With an easy-to-use architecture and dozens of components, it is a highly pluggable and robust option for integration development. Thank you for following me on this post, until next time.