Blockchain: transferring & storing value securely

Standard

Hello, my friends! Welcome to my blog. In this post, we will delve into Blockchain, a technology that is pretty famous for its use with digital money. But is that all that it can be done with Blockchain? Let’s find out!

Bitcoin

Satoshi Nakamoto, at Budapest (credit)

The year was 2008. The global financial crisis brings down financial companies such as the Lehman Brothers. The crisis has spawned several discussions about economic systems, such as wealth centralization.

In a traditional economic system, we have money centralized in governments, moreover in banks, without real control of the capital’s owners, that is the people, from whom the capital was obtained.

With this in mind,  Satoshi Nakamoto – a person, or group of persons, since was never discovered the true identity – launched a paper proposing a peer-to-peer network payment system, where value (wealth) would be transferred by a decentralized, irreversible transaction network. With decentralized, it means there is no universal authority responsible for validating transactions, using a consensus mechanism, where the network itself validates the transactions.

And by irreversible, it means that, once a transaction is written on the network, it can’t be changed anymore. The database system, where the transactions are persisted, it is called Blockchain. A Blockchain is composed of blocks of transactions, linked together as a chain, hence the name. A block, in this context, consists of a data structure, like a row in an accounting book, or a register in a database, for example.

Also, since transactions can’t be tampered with, the Blockchain can be easily accessed, in an open, public way, typically with applications developed as browsers for the network, called Blockchain Explorers.

On a Blockchain, each user is identified by an address. An address consists of a hash id (more on that later), which can send and receive transactions, using a wallet. The currency inside the Blockchain has the same name as the technology, Bitcoin, with the “cents” equivalent being called Satoshis. Bitcoin uses 8 decimal places for value, allowing a much more precise value definition than traditional currencies.

On January 2009, Bitcoin was launched, with the first block in the network being mined by Nakamoto. Several people that were creators of digital currencies before Bitcoin have received coins, like Wei Dai and Nick Szabo. The first commercial transaction using Bitcoin was done in 2010 when two pizzas were bought in the Papa John’s Pizza food chain.

Picture of the historical day

Today, Bitcoin is the oldest and more valuable digital currency in the world, evaluating about 17,413.90 US dollars by today’s value (12-15-2022).

For achieving validation, Bitcoin uses a consensus mechanism called PoW (Proof-of-Work). This mechanism uses very CPU-intensive computations for defining the validator responsible for writing the next block on the chain (we will learn more about this soon). These computations are very hardware-demanding, incurring more processing power (hardware) and consequently, more energy to run that hardware. This incurs Bitcoin receiving heavy criticism about its environmental impact since lots of energy was spent on hardware to improve the chances of receiving transaction fees from the Blockchain.

For some years, Bitcoin reigned alone in the digital currencies ecosystem. However, as we will see next, that was about to change.

Ethereum

Vitalik Buterin, Ethereum’s founder (credit)

Ethereum was conceived in 2013, by russian-canadian programmer  Vitalik Buterin, alongside other founders such as Charles Hoskinson. The network was developed in 2014 and went live on July 2015. It is the second most valuable digital coin in the world, second only to Bitcoin.

Ethereum operates a Blockchain network following the same principles as Bitcoin, such as decentralized validation and immutability. However, Ethereum not only implements a digital asset, but also a framework for the implementation and deployment of Smart Contracts. Smart Contracts, as we will learn, consist of a schema for developing dApps (Decentralized Apps), that is, apps that run in the Blockchain. These apps manipulate tokens, allowing users to transfer and receive value in a secure and auditable way.

Recently in Ethereum’s history, a major event changed Ethereum’s technical architecture, called The Merge. This event made the change in the consensus mechanism used by Ethereum from PoW to PoS (Proof-of-Stake). This mechanism, which implements a much more eco-friendly approach, is claimed to have reduced Ethereum energy costs by 99,95%.

Ethereum revolutionized the way digital assets are implemented, allowing a booming of literal thousands of other coins, as we will see next.

AltCoins

Cardano, one of the most famous AltCoins (credit)

Altcoins, as the name implies, is used for referring to basically every coin alternative to Bitcoin. By supplying a tokenizing method, Ethereum opened the door to a digital coins explosion, with literally thousands of coins being offered at exchanges. According to coinmarketcap, there are about 22,088 coins currently in operation.

Is it true that some coins were created from Bitcoin forks, such as Monero, but is undeniable that Ethereum was the major player in this revolution. Famous coins that run on Ethereum are Tether, Cardano, and Solana. Cardano is one of the most valuable of Ethereum’s “children“, created by Charles Hoskinson, one of Ethereum’s co-founders.

Altcoins use different strategies, according to their “philosophies“. We have coins that are called private coins, that implement extra features focused on obfuscating the origin and destinations of Blockchain’s transactions, focusing on keeping the user’s privacies. These coins are particularly in check by governments, as this sadly also facilitates these coins to be used in illegal activities, such as money laundering. Examples of these coins are Monero and PIVX.

Other coins are called stablecoins, which implement different mechanisms with the promise of being “stable investments”. One example is Tether, a coin that promises to keep parity of 1 Tether for 1 US Dollar. A catastrophic event occurred in 2022, where the coin Terra Luna “stablecoin” crashed, losing much of its value. This is an alert for investment in cryptocurrencies, which must be done with caution and care, as it is a highly speculative business.

Blockchain concepts

As we learned before, a Blockchain consists of a block list, where each block has a link (actually, a hash, as we will learn later) for its previous block in the chain. This way, we ensure data can’t be changed once is properly written in a block.

Now, let’s make a tour of the computing concepts behind Blockchain, to get a deeper view of the technology.

Asymmetric cryptography

In computing, cryptography is used for encrypting data, in order to secure information from being read by malicious and/or unauthorized entities.

Asymmetric cryptography is a category of algorithms where we have two keys, a public key, and a private key. In these algorithms, the public key is used for encryption, and the private key is used for decryption. These keys are created at the same time, forming a key pair. Examples of algorithms that use this scheme are RSA, DSA, ECC, and ECDSA (the one used by Bitcoin).

Is not possible to decrypt data using a private key which encryption was not done by the correspondent public key, even if it is a valid private key, according to the algorithm. This way, only by using the correct key pair is it possible to make the encryption.

Another use case for asymmetric cryptography is for digital signatures. On algorithms like ECDSA, the private key is used for signing (generating a hash, based on data passed to the signature), and it can only be considered valid upon a validation method, using the public key. This way, only the correct private key can generate signatures that the correspondent public key can validate.

Now that we understood how asymmetric cryptography works, let’s learn how Blockchain uses it to leverage security for the users to send and receive their coins.

Wallet concepts

A wallet inside Bitcoin is a user’s identification and storage, allowing him to send and receive coins. A wallet consists of the following components:

  • A Blockchain address;
  • A private key, for generating signatures;
  • A PubKey script, for locking the coins from being spent without authorization.
Schematics of a Bitcoin wallet

When we open a Bitcoin client for the first time and ask to create a wallet for us, these components are created. As we learned before, private keys are used for signing transactions, and public keys are used for validating the signatures.

The reader is probably thinking “where is the public key?”. Bitcoin uses a slightly different approach. Instead of using just one private key and one public key for signing all wallet transactions, each transaction has its own private and public keys.

The private key created at the wallet creation is called the master private key. She is used for creating the key pairs used on the wallet transactions. The master private key can be restored by passing a seed, that consists of a string sequence used to recreate the key. These seeds are the famous secret passphrases supplied when the wallet is created.

If the secret passphrase is lost and the wallet is in a locked state, there is no way to recover the coins, resulting in the famous cases of people that lost thousands of dollars on irrecoverable bitcoins.

When the master private key is created for the first time, of course, there is also a public key generated. This public key is used for just one step before being thrown away, and that is to generate the Blockchain address. On Bitcoin’s implementation, the Blockchain address is basically just a compressed version of the public key, prefaced with “00“. The compression is done by using a base58check function.

Lastly, let’s talk about the PubKey script. Wallets implement a locking mechanism in order to secure the coins from being just “left open” for spending. When we want to spend coins from our wallet, the private key is supplied for the PubKey Script, which validates if the coins can be unlocked or not. The script uses a combination of the private key with all public keys from received transactions in the wallet, this way ensuring that only the owner of the wallet’s private key can unlock the coins for the transaction.

So basically, this is a wallet, an address, a group of private keys (the one created with the seed and all the ones used for sending coins), and a PubKey script for securing the coins from being spent from anyone else than the owner. To know the wallet’s balance, the wallet calculates the transactions received that contain unspent coins (we will know more about unspent transactions when learning about UTXO).

Now that we know about how the wallet works, let’s learn about how the blocks in the chain are formed.

Block structure

Block hashes & linkage

So, as we learned previously, Bitcoin’s wallet uses hashes to sign that their transactions are legit. To validate and link the blocks, the same principle is applied.

Bitcoin’s blocks are linked using the block’s header. A block header has two hashes, calculated using SHA-256. The first hash, called hashPrevBlock, as the name implies, is the hash of the previous block.

The block hash, which is used as a reference in the field, is calculated by applying a double SHA-256 on the block header data, that is the previous hash – the block hash from the previous block -, the Merkle root – more on that next – and other data, such as the timestamp from when the block was created. If a single bit of this data is changed, the resulting hash will be completely different from the original hash.

The other hash is the hashMerkleRoot. This hash is also a double SHA256, made by all transaction data, including the signature hashes from the transactions processed on this block.

The following diagram illustrates how the blockchain and the hashes work in practice:

Blockchain blocks connected by the hashes

So, as we can see, the link is established by hash references, as the next block in the chain has a reference to the previous one. This is what guarantees there is no tampering on the blockchain. Why? Let’s see an example.

Let’s suppose we want to overwrite one transaction on our middle block from our previous example. This would be the result of this tampering:

Blockchain blocks disconnected due to tampering

As we can see, when we change transaction data, we change the Merkle root, and also, as consequence, we change our block’s hash. And since the next block has a reference to the original block, not the new one, the tampering can’t enter the network, as it is not recognized by the network’s nodes. The process of making a block valid on Bitcoin is called mining, as we will see in the next sections, and involves computations to make it harder to achieve since is a remunerable activity.

And so, since is hard to achieve for just one block, imagine achieving the mining rights not only for the tampered block but also for all blocks upstream, all the way up to the chain’s head. Not to mention, new blocks are created about every 10 minutes, making it also a race to catch up with the head in order to successfully achieve the transaction adulteration.

All this effort to change the data is what makes Blockchain virtually immune to adulteration, making it a secure storage for immutable data, in this case, financial value.

Block transactions

A block on the Blockchain also has a transaction section. This section has a list of transactions, composed of inputs (the coins that will be used for sending in the block), outputs (the coins receivers), and hash signatures, from the addresses that are sending their coins, in order to validate their authenticity.

The following image shows an example transaction section of a Bitcoin block:

A block transaction section

We can see other information as well, such as the transaction fee, gained by the miner, and the number of inputs and outputs each transaction has. Finally, the coindays destroyed metric is used to measure how many days the coins remained on the input address before being spent. This is used to help track long-term investors and big players in the network’s economy.

Now that we have a general understanding of what a block is, how they are connected, and their structure, let’s understand what happens when we ask to send some coins from our addresses to another on Bitcoin, in order to understand the inner workings of the network in practice.

What happens when you ask to send a Bitcoin

Mining

Bitcoin’s blockchain network is composed of nodes. Nodes are responsible for validating the transactions inside a block, as well as agreeing upon a new block to be added to the chain. The act of generating a new block is called mining, hence the nodes are also called miners.

New blocks are mined every ten minutes. To generate a new block, all a miner has to do is generate a valid block hash, which must be a number below the target. SHA-256 hashes can be transformed into hexadecimal numbers, this way allowing the target’s comparison

The target is a global network parameter that defines the difficulty to generate a new block. This number dynamically fluctuates automatically, as the network increases or decreases in node numbers. This ensures the 10-minute block generation is maintained, and also ensures the increases won’t result in blocks being mined easier. This is important, due to mining results being a paid activity since the miner receives a fee for creating a new block.

Miners validate UTXO transactions, which are a special type of transaction that protects the system from a double spending exploit.

UTXO transactions

In the previous section, we talked about the double spending exploit. This exploit consists of using a delay in transaction validation on a financial system, making an account send the same value twice or more times to a given destination, making the multiple sents to be validated, by making the transactions quicker than the system can validate them.

Blockchain protects the system from this exploit by using UTXO (Unspent Transaction Output) transactions. UTXO transactions are transactions that contain unspent coins, that is, transactions that were received by one address and not sent to another.

The Blockchain network has a monitor, called UTXO Monitor, which is controlled by the nodes to keep track of the unspent transactions. Only transactions in this state can be accepted as inputs in a block. let’s see a real-life example to better understand how this works.

Let’s suppose Ana wants to pay $70 to Alexandre. Ana only has two money bills, each of $50. So, in order to pay Alexandre, Ana gives the two bills to Alexandre, as we can see below:

Ana gives the two $50 bills to Alexandre

However, since Ana wants to pay $70, not $100, Alexandre needs to give change to Ana, in this case, $30:

Alexandre gives back $30 to Ana as change

The same thing applies to UTXO transactions in Bitcoin. Suppose Ana wants to send 1 BTC to Alexandre. She has a UTXO transaction of 2 Bitcoins that she can supply as input and is willing to pay 0.01 BTC as the transaction fee. In this case, two UTXO transactions are generated as outputs, one of 1 BTC to Alexandre and one of 0.99 BTC to herself. This ensures that there is no double spending since the wallet’s balance can only be spent by resources that are monitored and agreed upon by the whole network.

The following image illustrates the UTXOs dynamic from the previous example:

UTXOs transactions in action

Transaction fees are defined by the clients and can be set on the wallet and changed from one transaction to another. Of course, miners can prioritize the transactions that offer the highest fees, so the higher the fee, the better the chance that the transaction will be chosen to be processed on the next block. All pending transactions are maintained on a pool in the network. The miners calculate how much fee they will receive on a transaction by summing it up all inputs and outputs involved in the transaction.

Inside Bitcoin’s architecture, there is a special event that happens every few years called the halving. In this event, a parameter is introduced in the miner’s fees calculation, in order to diminish the amount of the fees (on the next one, which will happen in a few years, it will drop by half the transaction fee values). This measure is made in order to ensure Bitcoin’s value, in order to slow down the coin’s supply. Think like measures done by governments to control inflation on fiat currencies.

So that’s how transactions work. But how the blocks itself are validated in order to enter the Blockchain?

How the block is validated

As we learned before, miners constantly try to generate valid hashes, that must also be lower than the global target setting. When a miner finds a valid hash for mining, it sends a broadcast of his valid block, with the block hash and all data it composes (both header and transactions section).

As miners receive the proposed new block, they validate not only the block hash but also all transactions added to the block. Is during this step that the miners validate if the transaction hashes (signatures) are valid, by using the public key.

This ensures that the coins are really sent by someone who has access to the wallet’s private keys since the public key and hash provided can only be generated by the wallet’s private key.

We could be thinking “what happens if more than one miner generates a valid block at the same time?”. In order to fix this problem, when a miner broadcasts a block, the miners on the network start signaling that accept the block, as they validate it.

If multiple miners broadcast valid blocks at the same time, we enter a race condition, where different blocks are validated by different miners at different paces. The first block that is validated by 50% of the nodes in the network is considered the winner of the “race”, so only he is accepted, and the miner receives his fee.

All the other blocks that are disputed to be valid are discarded, where they enter a state called orphan since the block created a chain that is not accepted by the network.

So, in a nutshell, this is what happens when we send Bitcoin to someone on the Blockchain:

  1. we send from our wallet UTXO transactions as input funds, alongside the amount to be sent, the destination, and finally, the proposed output UTXO transaction that is the “change” to give back to us, and sign the transaction;
  2. The transaction goes to a pool, where miners scan for transactions to process;
  3. A miner generates a valid block with our transaction, and broadcasts it to the network for validation;
  4. If sufficient miners validate the block, he is accepted and added to the chain;
  5. The funds are delivered to the destination. If there are more coins on the input than the amount sent plus the transaction fee, the change is delivered back to us as the output UTXO transaction, minus the miner fee, which is kept for the miner.

This comic stripe explains, in a humorous way, the process of a Bitcoin transaction:

Sending Bitcoin to someone (comic stripe)

As we saw before, this PoW consensus model used by Bitcoin is very hardware demanding, making it one the most detractive points of the technology. However, as new coins are born, new consensus models are been developed.

Consensus models

As new coins and Blockchain systems were born, so it emerged new consensus models. Let’s delve a little into some of the other ones that are currently in use.

The second most known consensus model is called PoS, or Proof of Stake. In this model, instead of generating hashes, the validators (on PoS blockchains, the nodes are called validators, as there is no mining in this case) are randomly chosen by the network to create the new block, so no hardware-intensive competition is done.

This model has other types of criticism, however, as each Blockchain tends to add randomizing parameters that are not standard to all networks, and not always is clear how it works. Another common criticism is that it incentives the eternal “division of class” inside the network, as the most common influence parameter to be selected to create a block is the number of coins inside the validator (hence the term stake, referring to the coins deposited inside the validator).

Another consensus model is PoH (Proof of History), where the validators must answer problems from the history of the Blockchain, like in a “quiz show” (a transaction is randomly selected and is questioned if it happened before or after another transaction. Whoever answer correctly first, wins the right to create the block). As this kind of problem is more memory-intensive than CPU-intensive, the energy cost that plagues the PoW model is not as problematic in this case.

As the technology evolves more and more, new models will appear, improving the robustness and solving the problems of today’s implementations.

Now that we take a grasp of Blockchain technology architecture, let’s move on to learn more about Ethereum, and its proposal to revolutionize secure applications. It is important to notice that, aside from differences here and there – Ethereum for example uses a different hash algorithm than Bitcoin – most of the concepts and design on all other crypto coins follow closely to what Bitcoin pioneered, making an excellent central point to learn the technology.

Ethereum & blockchain revolution

Smart Contracts (dApps)

Architecture of a Web3 application (credit)

Up until now, we have learned about Bitcoin primarily. Bitcoin is an architecture designed for a single purpose, which is for creating a cryptocurrency.

Ethereum, on the other hand, has a more diverse proposal. Not only is it a cryptocurrency, but also a platform for developing applications that run on top of the Blockchain. These apps are called dApps, or Decentralized Apps.

The term refers to the Blockchain architecture, which has decentralized processing, as we saw during our study.

These applications are developed by deploying a Smart Contract. A Smart Contract is a definition, much like an interface, that defines the functions that will be implemented by the application. The contract, alongside his implementation, is deployed on the Blockchain, much like an application package. dApps are developed using the Solidity language.

Considerations must be taken when developing a dApp. dApps doesn’t have access to external resources, that is, resources outside the network (also referred to as off-chain resources). All persistence is done at the Blockchain, which is persisted as tokens. The persistence on dApps is a major point to pay attention to. As Blockchain data can’t be changed after is validated, one mistake done at the persistence layer can only be solved without the implication of correcting historical data, as there is no possible way of changing already persisted data.

Another hyped term in this field is Web3. On Web3, we have dApps serving as the back-end to web applications. This expands tremendously the applications of Blockchain, by becoming a lot more accessible to users around the world.

dApps are a subject that deserves an article of their own. Stay tuned for an article specially focused on dApps in the future!

Tokens

Last section we talked about tokens. Tokens on Ethereum are the data that can be generated and/or distributed across addresses on the network. So, with this context in mind, we can say the coins themselves – called ether in Ethereum – are tokens, since, in the end, coins on the network are data inside the network, being transferred from address to address.

Ethereum introduces a new standard for tokens, called ERC-20. This standard defines a pattern for Fungible tokens. Fungible tokens are tokens that represent value, such as Ether – but they are not the same, as Ether is the standard token used for payment in the network, while fungible tokens are a representation of value built on top of the network -, that can be interchanged between addresses.

Many AltCoins that circulate today are, from the inside, Fungible Tokens inside Ethereum. Examples are:

  • Tether USD;
  • USD Coin;
  • Shiba Inu;
  • BNB;
  • etc.

NFTs

The Bored Ape Yacht Club NFT (credit)

Probably all of us have heard of NFTs. Traded at very high prices, we see people across the world paying for buying things like a monkey gif set, for example. But what are NFTs, after all?

NFT stands for Non-Fungible Token. That’s right, NFTs are nothing more than tokens inside Ethereum’s network. They are implemented by the ERC-721 standard. What makes them different from the ERC-20 ones is that they are the antithesis of the ERC-20 tokens: they are unique.

By unique, it means that they have properties that make them 100% different from any other token that can be created on Ethereum’s network. The standard establishes that metadata can be persisted at the Token’s creation and that no other token can be created with that same metadata.

These characteristics are what makes the NFTs evaluated so high: the exclusive ownership of something that no one else in the entire world can have.

Is not in the scope of this article to discuss the morality or meaning of the NFT market, so this subject won’t be touched. However, one thing to keep in mind is the applications that NFTs open for business, as the uniqueness of NFTs aligned with the secure and immutable nature of the Blockchain, allows us to build solutions where these types of tokens can be used to persist information, like documents, for example. One example is the mortgage company LoanSnap, which adopted NFTs to secure its mortgage contracts.

Private Blockchains

Kaleido

Up until now, we just talked about Ethereum’s public network. But that is not the only possible way to run and use the Ethereum network.

Ethereum is open-source software. That means that anyone can take some computers and run their own network – given time to study the complexities of running a network, of course.

As it is not the goal of a company to know the entrails of Blockchain’s network and some companies may have regulations that forbid the data to be available to the public, as much as Ethereum’s network is secure as it is, a private network – and consequently, Blockchain – is made necessary.

Some companies, like Kaleido, have cloud Blockchains available. Using cloud computing vendors such as AWS, allows the creation of private Blockchains, with NFTs and ERC-20 tokens as we see fit.

Consortiums

Consortiums are industry-wide groups, intended to establish standards for the adoption and integration of Blockchain applications across companies in the same business field. This reminds us of another good application for Blockchain: as a secure and auditable global network, it is possible to use Blockchain as an integration layer across companies.

There are several consortia from different business groups across the globe, such as BiTA (Logistics) and Hashed Health (HealthCare).

Conclusion

Phew, that was a lot to cover! Blockchain is a technology in his youth, full of concepts and potential. This article aimed to make a general view of the technology, as the basis for future specific studies in this field.

But this is not the end of our journey. In the future, we will revisit this world, this time focused on the dApps ecosystem and development. Thank you for following me one more time, until next time!

Acknowledgments

Thank you for following me and supporting this blog, by visiting and sharing the knowledge, it means very much to me! Also thanks to Loggi, which supported me in my Blockchain courses, it helped a lot in my studies, thank you!

And as we talked about blockchain and cryptocurrencies, If you kindly wanna support me on my adventures in producing content, feel free to donate a crypto coin to me. If you prefer to donate Neblios (NEBL), please donate to:

Ni239Wqwvp12eHNnbXGbo6BwFbBhwF13L5

And if you prefer to donate Ethereum (ETH), please donate to:

0x2D9A0781872aCe96365d2b20b3ce905E0473981D

Please keep in mind that this is entirely optional and it won’t stop me from producing content whatsoever, it is just a possible way to express recognition of my effort, and I will be very grateful on my part!

Thank you for following me on another post, until next time!

References

Bitcoin basics

What is UTXO

What are ERC-20 tokens

Bitcoin transactions validation

Akka & Akka Streams: Construa sistemas distribuídos com atores

Standard

Hello, dear readers! Welcome to my blog. In this post, I would like to share with you my new personal project: a new book! This book comes to join my first child, where I talk about Elasticsearch.

In this book, you will find all information necessary to learn about concurrency and Akka frameworks, that allow us to develop applications using the Actor model to handle massive loads of data, using techniques such as asynchronous processing and back-pressure.

The book uses real-life examples to demonstrate the concepts, making you develop applications such as integrations using Kafka and relational databases, and an E-commerce API.

The book can be found at:

Thank you for following me and supporting this blog, by visiting and sharing the knowledge, means very much to me!

If you kindly wanna support me on my adventures in producing content, feel free to donate a crypto coin to me. Blockchain is one of my new fields of interest right now, which I promise I will tackle in a future article!

If you prefer to donate Neblios (NEBL), please donate to:

Ni239Wqwvp12eHNnbXGbo6BwFbBhwF13L5

If you prefer to donate Ethereum (ETH), please donate to:

0x2D9A0781872aCe96365d2b20b3ce905E0473981D

Please keep in mind that this is entirely optional and it won’t stop me from producing content whatsoever, it is just a possible way to express recognition to my effort, and it will be very grateful on my part!

Thank you for following me on another post, until next time!

MongoDB aggregations: making powerful data-driven applications on MongoDB

Standard

Hello, dear readers! Welcome to my blog. On this post, we will talk about MongoDB aggregations, and how it could help us on developing powerful applications.

NoSQL

NoSQL (non-SQL) stands for databases that uses mechanisms that differ from traditional relational DBs. NoSQL DBs implement eventual consistency, which means there’s no read-after-write guarantee – but with clustering techniques, it is not typically a big deal – but with good clustering support, allows us to deploy robust databases with horizontal scalability and excellent performance, specially for data retrieval.

MongoDB

MongoDB is one of these databases that are classified as as NoSQL. It is document-based, meaning the data is stored as JSON-like objects (called BSONs, that internally are serialized as bytes to storage compression) in groups called collections, that are kind like tables in a database.

Used on thousands of companies across the globe, is one of the most popular NoSQL solutions around the world. It also provides a PAAS solution, called MongoDB Atlas.

Aggregations

Aggregations allows us to make operations upon data, like filtering, grouping, sums, counting, etc. At the end of the aggregation pipeline, we have our final, transformed, dataset with the results we want.

A good analogy to think about MongoDB aggregations is to imagine a conveyor belt in a factory. In this analogy, data are like parts been constructed in a factory: they got different construction stages, such as soldering, post manufactoring, etc. In the end, we have a finished part, ready to be shipped.

Now that we have the concepts effectively grasped, let’s begin to dive on the code!

Lab Setup

First, we need a dataset to make our aggregations. For this lab, we will use a dataset from fivethirtyeight, a popular site with public datasets for use in machine learning projects.

We will use a dataset containing lists of comic book characters from DC and Marvel up to 2014, compiled with their first appearance month/year and number of appearances since their debuts. The dataset is uploaded at the lab’s Github repository, and also can be found here.

The dataset comes in the format of two csv files. To import to MongoDB, we will use mongoimport. This link from Mongo’s documentation instruct how to install Mongo’s database tools.

We will use Docker to instantiate MongoDB database. In the repository we have a Docker Compose file ready to use to create the DB, by running:

docker-compose up -d

After that, we can connect to Mongo using Mongo Shell, by typing:

mongo

Next, let’s add the csvs to a new database called comic-book by inputing (from the project’s root folder):

mongoimport --db comic-book --collection DC --type csv --headerline --ignoreBlanks --file ./data/dc_wiki.csv
mongoimport --db comic-book --collection Marvel --type csv --headerline --ignoreBlanks --file ./data/marvel_wiki.csv

After running, in the shell, we can check everything was imported by running the following commands:

use comic-book
db.DC.findOne()
db.Marvel.findOne()

This would produce results like the following:

Let’s now understanding the meaning of our dataset’s fields. As stated in the dataset’s repository, the fields are:

VariableDefinition
page_idThe unique identifier for that characters page within the wikia
nameThe name of the character
urlslugThe unique url within the wikia that takes you to the character
IDThe identity status of the character (Secret Identity, Public identity, [on marvel only: No Dual Identity])
ALIGNIf the character is Good, Bad or Neutral
EYEEye color of the character
HAIRHair color of the character
SEXSex of the character (e.g. Male, Female, etc.)
GSMIf the character is a gender or sexual minority (e.g. Homosexual characters, bisexual characters)
ALIVEIf the character is alive or deceased
APPEARANCESThe number of appareances of the character in comic books (as of Sep. 2, 2014. Number will become increasingly out of date as time goes on.)
FIRST APPEARANCEThe month and year of the character’s first appearance in a comic book, if available
YEARThe year of the character’s first appearance in a comic book, if available

Now that we have our data ready, let’s begin aggregating!

Our first aggregations

For this lab, we will use Python 3.6. As a first example, let’s search for all Flashes there is on DC comics. For this, we create a generic runner structure, composed of a main script:

import re

from aggregations.aggregations import run


def main():
    run('DC', [{
        '$match': {
            'name': re.compile(r"Flash")
        }
    }])


if __name__ == "__main__":
    main()

And a generic aggregation runner, where we pass the pipeline to be executed as a parameter:

from pymongo import MongoClient
import pprint

client = MongoClient('mongodb://localhost:27017/?readPreference=primary&appname=MongoDB%20Compass&ssl=false')


def run(collection, pipeline):
    result = client['comic-book'][collection].aggregate(
        pipeline
    )

    pprint.pprint(list(result))

After running it, we will get the following results:

[{'ALIGN': 'Good Characters',
  'ALIVE': 'Living Characters',
  'APPEARANCES': 1028,
  'EYE': 'Blue Eyes',
  'FIRST APPEARANCE': '1956, October',
  'HAIR': 'Blond Hair',
  'ID': 'Secret Identity',
  'SEX': 'Male Characters',
  'YEAR': 1956,
  '_id': ObjectId('60358560856a9437c0892dfa'),
  'name': 'Flash (Barry Allen)',
  'page_id': 1380,
  'urlslug': '\\/wiki\\/Flash_(Barry_Allen)'},
 {'ALIGN': 'Good Characters',
  'ALIVE': 'Living Characters',
  'APPEARANCES': 14,
  'FIRST APPEARANCE': '2008, August',
  'SEX': 'Male Characters',
  'YEAR': 2008,
  '_id': ObjectId('60358560856a9437c08934ac'),
  'name': 'Well-Spoken Sonic Lightning Flash (New Earth)',
  'page_id': 87335,
  'urlslug': '\\/wiki\\/Well-Spoken_Sonic_Lightning_Flash_(New_Earth)'},
 {'ALIGN': 'Neutral Characters',
  'ALIVE': 'Living Characters',
  'APPEARANCES': 12,
  'FIRST APPEARANCE': '1998, June',
  'SEX': 'Male Characters',
  'YEAR': 1998,
  '_id': ObjectId('60358560856a9437c08935ac'),
  'name': 'Black Flash (New Earth)',
  'page_id': 22026,
  'urlslug': '\\/wiki\\/Black_Flash_(New_Earth)'},
 {'ALIGN': 'Bad Characters',
  'ALIVE': 'Living Characters',
  'APPEARANCES': 3,
  'FIRST APPEARANCE': '2007, November',
  'ID': 'Secret Identity',
  'SEX': 'Male Characters',
  'YEAR': 2007,
  '_id': ObjectId('60358560856a9437c0893f36'),
  'name': 'Bizarro Flash (New Earth)',
  'page_id': 32435,
  'urlslug': '\\/wiki\\/Bizarro_Flash_(New_Earth)'},
 {'ALIGN': 'Good Characters',
  'ALIVE': 'Living Characters',
  'EYE': 'Green Eyes',
  'FIRST APPEARANCE': '1960, January',
  'HAIR': 'Red Hair',
  'ID': 'Secret Identity',
  'SEX': 'Male Characters',
  'YEAR': 1960,
  '_id': ObjectId('60358560856a9437c08948d3'),
  'name': 'Flash (Wally West)',
  'page_id': 1383,
  'urlslug': '\\/wiki\\/Flash_(Wally_West)'}]

Lol, that’s a lot of Flashes! However, we also got some villains together, such as the strange Bizarro Flash. let’s improve our pipeline by filtering to only show the good guys:

[{
        '$match': {
            'name': re.compile(r"Flash ", flags=re.IGNORECASE),
            'ALIGN': 'Good Characters'
        }
    }]

If we run again, we can see characters such as Bizarro Flash no longer exist in our results.

We also had our first glance at our first pipeline stage, $match. This stage allows us to filter our results, by using the same kind of filtering expressions we can use in normal searches.

In MongoDB’s aggregations, an aggregation pipeline is composed of stages, which are the items inside an array that will be executed by Mongo. The stages are executed from first to last item in the array, with the output of one stage servings as input for the next.

As an example of how a pipeline can have multiple stages and how each stage handles the data from the previous one, let’s use another stage, $project. Project allows us to create new fields by applying operations to it (more in a minute) and also allow us to control which fields we want on our results. Let’s remove all other fields except for the name and first appearance to see this in practice:

[{
        '$match': {
            'name': re.compile(r"Flash ", flags=re.IGNORECASE),
            'ALIGN': 'Good Characters'
        }
    },
    {
        '$project': {
            'FIRST APPEARANCE': 1, 'name': 1, '_id': 0
        }
    }]

When defining which fields to maintain, we use 1 to define a field to be maintained and 0 to define fields that won’t be maintained (only the defined fields will be maintained, the only field we need to use 0 to define we don’t want is the object’s ID field, so we set 0 only for him).

This produces the following:

[{'FIRST APPEARANCE': '1956, October', 'name': 'Flash (Barry Allen)'},
 {'FIRST APPEARANCE': '2008, August',
  'name': 'Well-Spoken Sonic Lightning Flash (New Earth)'},
 {'FIRST APPEARANCE': '1960, January', 'name': 'Flash (Wally West)'}]

Now, let’s make another aggregation. In our dataset, the first appearance is a string following this pattern:

<year> , <month>

Not only that, we also have some “dirty” data, as there is records without this field, or records with just the year in numerical format. We need to do some cleaning, using just characters we have the information and transform the data to a standard numerical field, in order to make it more useful for our use cases.

Now, let’s say we want to know the names of all characters created in the comics silver age (a time-period that goes from 1956 to 1970). To do this, first we create a field with the first appearance’s year, like we said before, and then use the new field to filter only silver age characters.

We already have a field with the year properly defined on the dataset, but for exercise’s sake, let’s suppose we had only this string-formatted field available, in order to examine more features and simulate real-world scenarios where many times data has formatting and correctness issues.

Let’s begin by creating our new field:

[
        {
        '$match': {
            'FIRST APPEARANCE': {'$exists': True}
        }
        },
        {
            '$project': {
                '_id': 0,
                'name': 1,
                'year': {'$toInt': {'$arrayElemAt': [{'$split': [{'$toString': '$FIRST APPEARANCE'}, ","]}, 0]}}
            }
        }
    ]

Lol, that’s some transforming! We filter the records to just use the ones that have the field, and in project stage we make conversions to allows us to extract the year from the field and convert him to a number. This produces something like this fragment:

 ...
 {'name': 'Tempus (New Earth)', 'year': 1997},
 {'name': 'Valkyra (New Earth)', 'year': 1997},
 {'name': 'Spider (New Earth)', 'year': 1997},
 {'name': 'Vayla (New Earth)', 'year': 1997},
 {'name': 'Widow (New Earth)', 'year': 1997},
 {'name': "William O'Neil (New Earth)", 'year': 1997},
 {'name': 'Arzaz (New Earth)', 'year': 1996},
 {'name': 'Download II (New Earth)', 'year': 1996}
...

With everything transformed, we only need to filter the year to get our silver age characters:

        [{
            '$match': {
                'FIRST APPEARANCE': {'$exists': True}
            }
        },
        {
            '$project': {
                '_id': 0,
                'name': 1,
                'year': {'$toInt': {'$arrayElemAt': [{'$split': [{'$toString': '$FIRST APPEARANCE'}, ","]}, 0]}}
            }
        },
        {
            '$match': {
                'year': {"$gte": 1956, "$lte": 1970}
            }
        }]

This produces the following list (just a fragment, due to size):

[{'name': 'Dinah Laurel Lance (New Earth)', 'year': 1969},
 {'name': 'Flash (Barry Allen)', 'year': 1956},
 {'name': 'GenderTest', 'year': 1956},
 {'name': 'Barbara Gordon (New Earth)', 'year': 1967},
 {'name': 'Green Lantern (Hal Jordan)', 'year': 1959},
 {'name': 'Raymond Palmer (New Earth)', 'year': 1961},
 {'name': 'Guy Gardner (New Earth)', 'year': 1968},
 {'name': 'Garfield Logan (New Earth)', 'year': 1965},
 {'name': 'Ralph Dibny (New Earth)', 'year': 1960},
...

Talking about big lists, one feature that is good to keep in mind when aggregating datasets, is that there is a memory limit imposed by MongoDB for doing in-memory aggregation (the default behavior). To allow MongoDB to use disk swap when aggregating, we add the following option when running the pipeline:

result = client['comic-book'][collection].aggregate(
        pipeline, allowDiskUse=True
    )

By running again, we can see everything is still working as expected.

We also can notice the results are not sorted. To sort by the name, we can add a $sort stage:

        [{
            '$match': {
                'FIRST APPEARANCE': {'$exists': True}
            }
        },
        {
            '$project': {
                '_id': 0,
                'name': 1,
                'year': {'$toInt': {'$arrayElemAt': [{'$split': [{'$toString': '$FIRST APPEARANCE'}, ","]}, 0]}}
            }
        },
        {
            '$match': {
                'year': {"$gte": 1956, "$lte": 1970}
            }
        },
        {
            '$sort': {
                'name': 1
            }
        }]

The $sort stage receives fields definitions, where 1 means ascending order and -1 means descending. After running, we can see the results were sorted:

[{'name': 'Abel (New Earth)', 'year': 1969},
 {'name': 'Abel Tarrant (New Earth)', 'year': 1963},
 {'name': 'Abin Sur (New Earth)', 'year': 1959},
 {'name': 'Abnegazar (New Earth)', 'year': 1962},
 {'name': 'Abner Krill (New Earth)', 'year': 1962},
 {'name': 'Ace Arn (New Earth)', 'year': 1965},
 {'name': 'Ace Chance (New Earth)', 'year': 1966},
 {'name': 'Achilles Milo (New Earth)', 'year': 1957},
 {'name': 'Adam Strange (New Earth)', 'year': 1958},
 {'name': 'Agantha (New Earth)', 'year': 1964},
 {'name': 'Ahk-Ton (New Earth)', 'year': 1965},
 {'name': 'Alanna Strange (New Earth)', 'year': 1958},
 {'name': 'Albert Desmond (New Earth)', 'year': 1958},
 {'name': 'Albrecht Raines (New Earth)', 'year': 1958},
 {'name': 'Alpheus Hyatt (New Earth)', 'year': 1962},
 {'name': 'Aluminum (New Earth)', 'year': 1963},
 {'name': 'Amazo (New Earth)', 'year': 1960},
 {'name': 'Amos Fortune (New Earth)', 'year': 1961},
 {'name': 'Anais Guillot (New Earth)', 'year': 1959},
...

Now, let’s suppose we just wanted to count how many characters from silver age are on DC, not know their names or any other data. We could do this by adding a $count stage:

[{
            '$match': {
                'FIRST APPEARANCE': {'$exists': True}
            }
        },
        {
            '$project': {
                '_id': 0,
                'name': 1,
                'year': {'$toInt': {'$arrayElemAt': [{'$split': [{'$toString': '$FIRST APPEARANCE'}, ","]}, 0]}}
            }
        },
        {
            '$match': {
                'year': {"$gte": 1956, "$lte": 1970}
            }
        },
        {
            '$count': 'total'
        }]

Count only ask us to define a name for the count. This would produce the following:

[{'total': 556}]

Grouping data together

Let’s start using Marvel collection now. To know how many characters there are in Marvel according to their aligns (neutral, good or bad), we use a $group stage to group the data by the align and use a accumulator to count the characters:

[
        {
            '$match': {
                'ALIGN': {'$exists': True}
            }
        },
        {
            '$group': {
                '_id': '$ALIGN',
                'count': {'$sum': 1}
            }
        }
    ]

Yes, is simple as that! This produces the following:

[{'_id': 'Good Characters', 'count': 4636},
 {'_id': 'Bad Characters', 'count': 6720},
 {'_id': 'Neutral Characters', 'count': 2208}]

Now, let’s try a different example. We want to count all characters created by Marvel, breaking down by decade.

To make this grouping, we will use the $bucket stage, as follows:

[{
            '$bucket': {
                'groupBy': "$Year",
                'boundaries': [1930, 1940, 1950, 1960, 1970, 1980, 1990, 2000, 2010, 2020],
                'default': "Unknown",
                'output': {
                    'count': {'$sum': 1}
                }
            }
        }]

Here we have a groupBy field to define which field to use as bucket selector. We define boundaries for the buckets and a default in case a bucket can’t be defined. Finally, we have an output, where we define aggregators to be executed for each bucket.

This produces the following results:

[{'_id': 1930, 'count': 69},
 {'_id': 1940, 'count': 1441},
 {'_id': 1950, 'count': 302},
 {'_id': 1960, 'count': 1306},
 {'_id': 1970, 'count': 2234},
 {'_id': 1980, 'count': 2425},
 {'_id': 1990, 'count': 3657},
 {'_id': 2000, 'count': 3086},
 {'_id': 2010, 'count': 1041},
 {'_id': 'Unknown', 'count': 815}]

Another type of bucketing is by using the $bucketAuto stage. This stage allows us to let MongoDB do the grouping for us, without needing to define the boundaries. Let’s try it out with DC:

[{
            '$bucketAuto': {
                'groupBy': "$YEAR",
                'buckets': 10,
                'output': {
                    'count': {'$sum': 1}
                }
            }
        }]

This produces:

[{'_id': {'max': 1965, 'min': None}, 'count': 702},
 {'_id': {'max': 1981, 'min': 1965}, 'count': 703},
 {'_id': {'max': 1987, 'min': 1981}, 'count': 779},
 {'_id': {'max': 1990, 'min': 1987}, 'count': 806},
 {'_id': {'max': 1994, 'min': 1990}, 'count': 707},
 {'_id': {'max': 1998, 'min': 1994}, 'count': 779},
 {'_id': {'max': 2004, 'min': 1998}, 'count': 791},
 {'_id': {'max': 2008, 'min': 2004}, 'count': 752},
 {'_id': {'max': 2011, 'min': 2008}, 'count': 716},
 {'_id': {'max': 2013, 'min': 2011}, 'count': 161}]

By default, Mongo will try to spread the buckets the more evenly as possible. We can define a field called granularity to better restrict how we want to group:

[{
            '$match': {
                'YEAR': {'$exists': True}
            }
        },
        {
            '$bucketAuto': {
                'groupBy': '$YEAR',
                'buckets': 10,
                'granularity': 'E192',
                'output': {
                    'count': {'$sum': 1}
                }
            }
        }]

This defines a preferred time series to round the buckets and calculate the edges. More info can be found at the documentation. One important thing to note is that granularity must only be used on numeric buckets and must not contain data without the field (that’s why we introduced the filter).

This produces:

[{'_id': {'max': 1980.0, 'min': 1930.0}, 'count': 1300},
 {'_id': {'max': 2000.0, 'min': 1980.0}, 'count': 3429},
 {'_id': {'max': 2030.0, 'min': 2000.0}, 'count': 2098}]

Faceting & persisting the results

Now, let’s make a report where we have some of our previous aggregations grouped in a single result, like a report. We can do this by creating facet stages, which act like independent pipelines that will be grouped as a single result at the end.

This can be achieved by adding the following facets:

[
        {
            '$facet': {
                'Silver age characters': [
                    {
                        '$match': {
                            'FIRST APPEARANCE': {'$exists': True}
                        }
                    },
                    {
                        '$project': {
                            '_id': 0,
                            'name': 1,
                            'year': {
                                '$toInt': {'$arrayElemAt': [{'$split': [{'$toString': '$FIRST APPEARANCE'}, ","]}, 0]}}
                        }
                    },
                    {
                        '$match': {
                            'year': {"$gte": 1956, "$lte": 1970}
                        }
                    },
                    {
                        '$sort': {
                            'name': 1
                        }
                    }
                ],
                'Characters by decade': [
                    {
                        '$bucket': {
                            'groupBy': "$YEAR",
                            'boundaries': [1930, 1940, 1950, 1960, 1970, 1980, 1990, 2000, 2010, 2020],
                            'default': "Unknown",
                            'output': {
                                'count': {'$sum': 1}
                            }
                        }
                    }
                ]
            }
        }
    ]

This produces results like the following, running for DC:

[{'Characters by decade': [{'_id': 1930, 'count': 42},
                           {'_id': 1940, 'count': 268},
                           {'_id': 1950, 'count': 121},
                           {'_id': 1960, 'count': 453},
                           {'_id': 1970, 'count': 416},
                           {'_id': 1980, 'count': 1621},
                           {'_id': 1990, 'count': 1808},
                           {'_id': 2000, 'count': 1658},
                           {'_id': 2010, 'count': 440},
                           {'_id': 'Unknown', 'count': 69}],
  'Silver age characters': [{'name': 'Abel (New Earth)', 'year': 1969},
                            {'name': 'Abel Tarrant (New Earth)', 'year': 1963},
                            {'name': 'Abin Sur (New Earth)', 'year': 1959},
                            {'name': 'Abnegazar (New Earth)', 'year': 1962},
                            {'name': 'Abner Krill (New Earth)', 'year': 1962},
                            {'name': 'Ace Arn (New Earth)', 'year': 1965},
...

Now, what if we wanted to persist this report, without having to resort to -re-run the aggregation? For this, we add a $out stage, which persist the aggregation’s results on a collection. Let’s change our pipeline like this:

[
        {
            '$facet': {
                'Silver age characters': [
                    {
                        '$match': {
                            'FIRST APPEARANCE': {'$exists': True}
                        }
                    },
                    {
                        '$project': {
                            '_id': 0,
                            'name': 1,
                            'year': {
                                '$toInt': {'$arrayElemAt': [{'$split': [{'$toString': '$FIRST APPEARANCE'}, ","]}, 0]}}
                        }
                    },
                    {
                        '$match': {
                            'year': {"$gte": 1956, "$lte": 1970}
                        }
                    },
                    {
                        '$sort': {
                            'name': 1
                        }
                    }
                ],
                'Characters by decade': [
                    {
                        '$bucket': {
                            'groupBy': "$YEAR",
                            'boundaries': [1930, 1940, 1950, 1960, 1970, 1980, 1990, 2000, 2010, 2020],
                            'default': "Unknown",
                            'output': {
                                'count': {'$sum': 1}
                            }
                        }
                    }
                ]
            }
        },
        {'$out': 'DC-reports'}
    ]

After running, we can check the DB for the persisted data:

IMPORTANT: All data on the collection will be deleted and replaced by the aggregation’s results upon re-running!

Conclusion

And that concludes our quick tour on MongoDB’s aggregations. Of course, this is just a taste of what we are capable of with the framework. I suggest reading the documentation for learning more features, such as $lookup, that allows us to left-join collections.

With a simple and intuitive interface, is a very robust and powerful solution, that must be explored. Thank you for following me on one more article, until next time.

Coroutines: making scalable code in Kotlin

Standard

Hello, dear readers! Welcome to my blog. In this post, we will talk about Coroutines and how it helps our Kotlin applications to run with more scalability and robustness.

The Kotlin language

Created by Intellij, Kotlin is a robust language, that quickly stablished as the standard language for Android programming.

In this article we won’t learn about the language itself as the focus is learning about coroutines. If the reader wants to learn about the language itself, I recommend this good book.

Reactive programming

Another important concept to grasp is reactive programming. Reactive programming encompasses a paradigm where the parts of a program are uncoupled by using asynchronous technology, so slow operations such as IO are not blocking the execution. My article about ReactiveX talks in more detail about this concept.

Now that we have a better understand about what is Kotlin and reactive programming, let’s begin our tour in coroutines. Let’s first understand what are threads and how coroutines are not threads.

What are threads

Threads are programming units that run inside a process. A process, typically, is translated to the application itself, with memory and CPU allocated for her to consume.

Threads are like subprocesses, that a computing process allocates to do task loads. Threads have their own share of resources, like memory, for example (in Java, for example, each thread has his own object stack), so they are expensive and frequently re-used by thread pools.

An important concept to keep it in mind is that threads are not necessarily parallel: when a process allocates threads, they are been executed by CPU cores.

If we have just one core, for example, and create two threads, they won’t really run in parallel. What will happen is that the core will switch between threads doing a little of execution in each one, but a lot of times extremely fast, making it look like is really running at the same time. This becomes apparent when one of the threads has to run an IO-blocking operation: this makes the whole processing to freeze, including the work on the other thread.

We only achieve parallelism when running inside an application with multiple cores: in this case, each thread can be run inside one core, and indeed, in this case, the CPU cores are really running in parallel.

Normally we don’t need to make any low-level code to make threads run on different cores, but it is an important thing to know, because if for example, we create a lot more threads then cores are available, we could end up with an application that is slower then a single-threaded, sequential one.

That’s because all execution control to switch executions and overburden to create a massive amount of threads will end up demanding more from the hardware than simply running everything in one thread. So, it is important to remember that, like everything in life, threads are not a silver bullet for everything.

So now we know what threads are. What about coroutines?

What are coroutines

Coroutines are a light-weighted programming unit, available by Kotlin. What that means is that coroutines are not threads, but actually functions (like sets of instructions) that run inside thread pools.

One thread runs multiple coroutines, and a coroutine can start processing in one thread, be suspended (more later), and resumed in another thread.

This is achieved by Kotlin’s inner coroutine support, where dispatchers are responsible for controlling how the coroutine will be executed if it will running everything in just one thread, a thread pool, or unconfined.

So, as we can see, coroutines are kind like threads, but more light and without the burden of resource consumption threads have. This also means that we can allocate a lot more coroutines then we would typically do with threads since the dispatcher will control the number of threads used.

Coroutine suspension

We talked briefly about coroutine suspension. What did we mean by that?

When running our code, sometimes our task will need to wait for operations, such as I/O, to be performed before continuing. When a coroutine reaches a point where it has to wait for something to runs, it enters a suspension state. In this state, the code will wait for the operation to complete before continuing to execute the coroutine.

One very important detail to keep in mind is that this state is not blocking: once the coroutine stops to wait for the operation, her thread is freed to continue running another thread meanwhile the operation is done.

Well, but let’s stop with all theory and get to the point we are all waiting for: begin the coding!

Our first coroutine

Let’s begin with a simple example. All code from the lab can be found here or at the end of the article. I strongly recommend using Intellij IDEA as IDE as it will make it really simple to run the examples (just click in the application file and run it), importing as a Gradle project. The project runs with Kotlin 1.3 and JDK 8.

In order to use coroutines, we need to add a library to build.gradle, to add his support:

dependencies {
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"

    compile "io.github.microutils:kotlin-logging:1.8.3"
    compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5"
    compile "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.5"
    compile "org.slf4j:slf4j-simple:1.7.29"
}

Let’s suppose we have a task that it takes some time to complete – we will simulate this by using a delay – and this task is necessary to run a second task. Let’s implement a simple code, with a coroutine:

package com.alexandreesl

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import mu.KotlinLogging

object Application {

    private val logger = KotlinLogging.logger {}

    @JvmStatic
    fun main(args: Array<String>) {


        val deferred = CoroutineScope(IO).async {
            logger.info { "running on ${Thread.currentThread().name}" }
            delay(2000)
            123
        }

        logger.info { "running on ${Thread.currentThread().name}" }

        val resp = deferred.getCompleted()

        logger.info { "the result is $resp" }


    }

}

We have a first glance on some concepts, such as coroutine scopes and dispatchers – the IO passed to the scope. We will talk more about scopes later, but lets, for now, understand what are dispatchers.

Dispatchers, as talked before, are responsible for starting and controlling coroutine executions. The most commonly used ones are Main (used in Android development, it makes the code run inside the UI thread, a requirement from the Android platform to certain operations), and IO.

The IO dispatcher uses a thread-poll with the same size as the machine’s cores, making them ideal for parallel tasks. There are other ones such as previously mentioned Unconfined, which keeps switching from thread to thread as new scopes with other dispatchers are created, but commonly Main and IO are the ones we will use most of the time.

The code inside the braces is the coroutine itself. The receiver method, in this case async, is called a coroutine builder, as it instantiates the coroutine and supplies it to the dispatcher. There are other builders, such as launch, designed for fire-and-forget operations.

Async is used when we want to run something that returns a result, as it supplies a Deferred. Think of a deferred to be like a promise or future: It allows us an interface for receiving a result from asynchronous processing.

When we run the code, this happens:


Exception in thread "main" java.lang.IllegalStateException: This job has not completed yet
	at kotlinx.coroutines.JobSupport.getCompletedInternal$kotlinx_coroutines_core(JobSupport.kt:1198)
	at kotlinx.coroutines.DeferredCoroutine.getCompleted(Builders.common.kt:98)
	at com.alexandreesl.Application.main(Application.kt:26)

Process finished with exit code 1

What happened?! The problem is, we ran our coroutine in another thread, but our main code is running inside the main thread, which keeps running and tried to get the result from the deferred before is completed – getCompleted() is also an experimental method marked for removal, so it is not recommended. Let’s refactor our code to a better way of running our coroutine:

package com.alexandreesl

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging

object Application {

    private val logger = KotlinLogging.logger {}

    @JvmStatic
    fun main(args: Array<String>) {


        val deferred = CoroutineScope(IO).async {
            logger.info { "running on ${Thread.currentThread().name}" }
            delay(2000)
            123
        }

        runBlocking {

            logger.info { "running on ${Thread.currentThread().name}" }

            val resp = deferred.await()

            logger.info { "the result is $resp" }

        }



    }

}

Notice we added a runBlocking code block. This special coroutine builder has the purpose of creating a coroutine which will block the current thread (making it possible to run simple sample codes and test cases such as these). It is important to keep in mind that in real production code we need to avoid using this as much as possible, since we are making blocking code this way.

We also changed to use await to get the result of the coroutine. This method is at the heart of coroutines usage: is by calling this that we make our coroutine to wait for the completion of the task before continuing. As talked before, this operation is not blocking.

When we run the code, we receive something like this in the console:

[DefaultDispatcher-worker-1] INFO com.alexandreesl.Application - running on DefaultDispatcher-worker-1
[main] INFO com.alexandreesl.Application - running on main
[main] INFO com.alexandreesl.Application - the result is 123

Process finished with exit code 0

As we can see, we have logs in different threads, showing our coroutine indeed ran on another thread.

Now, there is just one thing missing from the explanation: what is that CoroutineScope thing? Let’s talk about this now.

Coroutine scopes

When passing our coroutine builders, each one of them creates a coroutine. Let’s take this code snippet as an example:

runBlocking {

            val deferred = CoroutineScope(IO).async {
                logger.info { "I am the first Coroutine, I am running on ${Thread.currentThread().name}" }
                delay(2000)

                val job = CoroutineScope(IO).launch {
                    logger.info { "I am the second Coroutine, I am running on ${Thread.currentThread().name}" }
                    delay(2000)
                }

                job.join()
                123

            }

            logger.info { "running on ${Thread.currentThread().name}" }

            val resp = deferred.await()

            logger.info { "the result is $resp" }

        }

When running, we will get the following from the console:

[main] INFO com.alexandreesl.Application - running on main
[DefaultDispatcher-worker-1] INFO com.alexandreesl.Application - I am the first Coroutine, I am running on DefaultDispatcher-worker-1
[DefaultDispatcher-worker-3] INFO com.alexandreesl.Application - I am the second Coroutine, I am running on DefaultDispatcher-worker-3
[main] INFO com.alexandreesl.Application - the result is 123

As we can see it, each coroutine runs on his own thread, showing that indeed each builder created a coroutine.

Let’s suppose that we have some IO code that, due to some reason, we have scenarios where we will have to cancel the execution – for example, if a database session hangs and the code is forever hang in an non-terminal state – , can we do this with coroutines? Let’s find it out.

We change our code like this and run again:

package com.alexandreesl

import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import mu.KotlinLogging

object Application {

    private val logger = KotlinLogging.logger {}

    @JvmStatic
    fun main(args: Array<String>) {

        runBlocking {

            val deferred = CoroutineScope(IO).async {
                logger.info { "I am the first Coroutine, I am running on ${Thread.currentThread().name}" }
                delay(2000)

                val job = CoroutineScope(IO).launch {
                    logger.info { "I am the second Coroutine, I am running on ${Thread.currentThread().name}" }
                    delay(2000)
                }

                job.join()
                123

            }

            logger.info { "running on ${Thread.currentThread().name}" }

            deferred.cancelAndJoin()
            

        }

    }

}

After running, we see something like this on the terminal:

[main] INFO com.alexandreesl.Application - running on main
[DefaultDispatcher-worker-1] INFO com.alexandreesl.Application - I am the first Coroutine, I am running on DefaultDispatcher-worker-1

Only the first coroutine got the chance to start, and immediately after we got to the cancellation. But what would happen if we didn’t declared the two in the same scope?

We change the code and run again:

       runBlocking {

            val deferred = CoroutineScope(IO).async {
                logger.info { "I am the first Coroutine, I am running on ${Thread.currentThread().name}" }
                delay(2000)
                123
            }

            val job = CoroutineScope(IO).launch {
                logger.info { "I am the second Coroutine, I am running on ${Thread.currentThread().name}" }
                delay(2000)
            }

            logger.info { "running on ${Thread.currentThread().name}" }

            deferred.cancelAndJoin()
            
        }

This leads to:

[DefaultDispatcher-worker-1] INFO com.alexandreesl.Application - I am the first Coroutine, I am running on DefaultDispatcher-worker-1
[DefaultDispatcher-worker-3] INFO com.alexandreesl.Application - I am the second Coroutine, I am running on DefaultDispatcher-worker-3
[main] INFO com.alexandreesl.Application - running on main

Wait, the second one got to run? That’s because she ran outside the scope. In order to make the other coroutine to cancel as well, we would need to manually cancel her too.

That’s the main goal behind coroutine scopes: it allows us to group coroutines together, so we can make operations such as cancellations in a grouped manner, instead of making all the work one by one.

Suspending functions

Now let’s organize our code, moving our first example to another class. We create the following class:

package com.alexandreesl.examples

import kotlinx.coroutines.*
import mu.KotlinLogging

class BasicExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {
        

            val deferred = CoroutineScope(Dispatchers.IO).async {
                logger.info { "I am the first Coroutine, I am running on ${Thread.currentThread().name}" }
                delay(2000)

                val job = CoroutineScope(Dispatchers.IO).launch {
                    logger.info { "I am the second Coroutine, I am running on ${Thread.currentThread().name}" }
                    delay(2000)
                }

                job.join()
                123

            }

            logger.info { "running on ${Thread.currentThread().name}" }

            val resp = deferred.await()

            logger.info { "the result is $resp" }

        
    }

}

We created a class that has a suspending function. Suspending functions are functions that, in her nature, their whole body is a coroutine. That’s why we can declare operations such as await without the need of a runBlocking code block, for example.

Now we change main to this:

package com.alexandreesl

import com.alexandreesl.examples.BasicExample
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import mu.KotlinLogging

object Application {

    private val logger = KotlinLogging.logger {}

    @JvmStatic
    fun main(args: Array<String>) {

        runBlocking {

          val basicExample =  BasicExample()

            basicExample.runExample()

        }

    }

}

And after running again, we see everything is running as before. In a nutshell, suspending functions are a neat way of running most of our code inside coroutines.

Coroutine contexts

Another concept to grasp is coroutine contexts. Basically, a context is the composition of two things: the scope and the dispatcher.

Kotlin’s coroutines have another neat feature that is a builder called withContext. Using this feature, we can switch between contexts at will. Let’s see a example:

package com.alexandreesl.examples

import kotlinx.coroutines.delay
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import mu.KotlinLogging

class ContextExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {

        val context1 = newSingleThreadContext("MyOwnThread1")
        val context2 = newSingleThreadContext("MyOwnThread2")

        withContext(context1) {

            logger.info { "I am a coroutine, I am running on ${Thread.currentThread().name}" }

            withContext(context2) {

                logger.info { "I am also a coroutine, I am running on ${Thread.currentThread().name}" }
                delay(3000)

            }

            logger.info { "I am the previous coroutine, I am running on ${Thread.currentThread().name}" }

            delay(3000)

        }

    }

}

Here we created two single-thread contexts and switched between then inside two nested coroutines. If we run and look at the console, we will see something like this:

[MyOwnThread1] INFO com.alexandreesl.examples.ContextExample - I am a coroutine, I am running on MyOwnThread1
[MyOwnThread2] INFO com.alexandreesl.examples.ContextExample - I am also a coroutine, I am running on MyOwnThread2
[MyOwnThread1] INFO com.alexandreesl.examples.ContextExample - I am the previous coroutine, I am running on MyOwnThread1

Showing that we successfully ran our coroutines with different contexts.

Processing streams with coroutines

In all our previous examples, we just returned a single value. What if we need to emit multiple values, that will be processed as they are generated?

For this purpose, Kotlin coroutines have flows. With flows, we can make data streams to emit data at the rate it is produced. Let’s see a example.

Let’s suppose we have a sequence of 10 numbers, that have IO operations between then and only after the operations the numbers can be emitted. The following code simulates this:

package com.alexandreesl.examples

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import mu.KotlinLogging
import java.time.LocalDateTime

class FlowExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {

        flow {
            for (number in 1..10) {
                logger.info { "I am processing the number $number at ${LocalDateTime.now()}" }
                delay(2000)
                emit(number)
            }
        }.collect { item ->
            logger.info { "Receiving number $item at ${LocalDateTime.now()}" }
        }

    }

}

When running, we will get logs like this:

[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 1 at 2020-09-24T19:55:35.579
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 1 at 2020-09-24T19:55:37.592
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 2 at 2020-09-24T19:55:37.592
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 2 at 2020-09-24T19:55:39.597
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 3 at 2020-09-24T19:55:39.597
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 3 at 2020-09-24T19:55:41.600
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 4 at 2020-09-24T19:55:41.600
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 4 at 2020-09-24T19:55:43.605
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 5 at 2020-09-24T19:55:43.606
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 5 at 2020-09-24T19:55:45.608
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 6 at 2020-09-24T19:55:45.608
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 6 at 2020-09-24T19:55:47.611
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 7 at 2020-09-24T19:55:47.611
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 7 at 2020-09-24T19:55:49.616
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 8 at 2020-09-24T19:55:49.616
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 8 at 2020-09-24T19:55:51.621
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 9 at 2020-09-24T19:55:51.622
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 9 at 2020-09-24T19:55:53.627
[main] INFO com.alexandreesl.examples.FlowExample - I am processing the number 10 at 2020-09-24T19:55:53.627
[main] INFO com.alexandreesl.examples.FlowExample - Receiving number 10 at 2020-09-24T19:55:55.628

Showing how the data is been processed as it is generated. Flows are like other collections we have in Kotlin: We can use operations such as map, reduce, filter etc. Let’s improve our example to demonstrate this.

We change our code to filter only even numbers, log each filtered number and sum it all numbers from the flow:

package com.alexandreesl.examples

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import mu.KotlinLogging
import java.time.LocalDateTime

class FlowExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {

        val result = flow {
            for (number in 1..10) {
                logger.info { "I am processing the number $number at ${LocalDateTime.now()}" }
                delay(2000)
                emit(number)
            }
        }.filter { it % 2 == 0 }
            .map {
                logger.info { "processing number $it" }
                it
            }
            .reduce { accumulator, value ->
                accumulator + value
            }
        logger.info { "result: $result" }

    }

}

Notice that we didn’t need to add a await for the operation: since reduce is a terminal operation, Kotlin implicitly suspends the execution for us to receive the result.

We can also transform simple collections to execute as flows, using the asFlow() function, like this:

(1..5).asFlow().collect {
            logger.info { "item: $it" }
        }

This will produce:

[main] INFO com.alexandreesl.examples.FlowExample - item: 1
[main] INFO com.alexandreesl.examples.FlowExample - item: 2
[main] INFO com.alexandreesl.examples.FlowExample - item: 3
[main] INFO com.alexandreesl.examples.FlowExample - item: 4
[main] INFO com.alexandreesl.examples.FlowExample - item: 5

One last thing for us to learn before wrapping it up on Flows is the flowOn() function. With this function, we can define the coroutine context in which the flow will run. For example, if we want our flow to run in a specific thread, we can do this:

       val context1 = newSingleThreadContext("MyOwnThread1")

        val result = flow {
            for (number in 1..10) {
                logger.info { "I am processing the number $number at ${LocalDateTime.now()}" }
                delay(2000)
                emit(number)
            }
        }
            .flowOn(context1)
            .filter { it % 2 == 0 }
            .map {
                logger.info { "processing number $it" }
                it
            }
            .reduce { accumulator, value ->
                accumulator + value
            }
        logger.info { "result: $result" }

This produces the following output:

[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 1 at 2020-09-24T20:22:36.840
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 2 at 2020-09-24T20:22:38.854
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 3 at 2020-09-24T20:22:40.855
[main] INFO com.alexandreesl.examples.FlowExample - processing number 2
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 4 at 2020-09-24T20:22:42.860
[main] INFO com.alexandreesl.examples.FlowExample - processing number 4
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 5 at 2020-09-24T20:22:44.866
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 6 at 2020-09-24T20:22:46.871
[main] INFO com.alexandreesl.examples.FlowExample - processing number 6
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 7 at 2020-09-24T20:22:48.877
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 8 at 2020-09-24T20:22:50.882
[main] INFO com.alexandreesl.examples.FlowExample - processing number 8
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 9 at 2020-09-24T20:22:52.887
[MyOwnThread1] INFO com.alexandreesl.examples.FlowExample - I am processing the number 10 at 2020-09-24T20:22:54.893
[main] INFO com.alexandreesl.examples.FlowExample - processing number 10
[main] INFO com.alexandreesl.examples.FlowExample - result: 30

Proving our configuration was a success – notice that some logs were on main thread, since only the flow code is running in other context.

Communication between coroutines

Sometimes, there are scenarios where we need to make two coroutines to work with each other, like when we have to send data to a API only after some heavy IO processing is done with that same data. Each one of this operations must be done in different coroutines, to allow parallel execution.

One way of doing this is by chaining the coroutines together, and another one is using channels.

Channels, as the name implies, allows coroutines to share data between them. Let’s see an example.

Suppose we have two coroutines, where one of then will delay by some seconds, to represent the IO processing, and after that will send the data to a channel to be processed by another coroutine:

package com.alexandreesl.examples

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import mu.KotlinLogging

class ChannelExample {

    private val logger = KotlinLogging.logger {}

    suspend fun runExample() {

        val channel = Channel<Int>()

        CoroutineScope(Dispatchers.IO).launch {
            for (item in 1..10) {
                // delay to represent IO
                delay(2000)
                logger.info { "Sending: $item" }
                channel.send(item)
            }
            // closing the channel, we are done
            channel.close()
        }

        val deferred = CoroutineScope(Dispatchers.IO).async {
            try {
                while (true) {
                    val received = channel.receive()
                    logger.info { "Received: $received" }
                }
            } catch (e: ClosedReceiveChannelException) {
                // this happens when the channel is closed
            }
            //signalling the process has ended
            true
        }

        deferred.await()

    }


}

This will produce the following:

[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 1
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 1
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 2
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 2
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 3
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 3
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 4
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 4
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 5
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 5
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 6
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 6
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 7
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Received: 7
[DefaultDispatcher-worker-3] INFO com.alexandreesl.examples.ChannelExample - Sending: 8
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Received: 8
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Sending: 9
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Received: 9
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Sending: 10
[DefaultDispatcher-worker-2] INFO com.alexandreesl.examples.ChannelExample - Received: 10

Notice that sometimes both receiving and sending are done in the same thread. This is a good example to show how Kotlin keeps re-using the same threads to run different coroutines, in order to optimize the execution

As we can see, the channel ran successfully, sending data from one coroutine to another.

Conclusion

And so we concluded our tour about coroutines. With a simple interface, but powerful features, coroutines are a excellent tool for developing robust solutions in Kotlin. All code from our lab can be found here.

Thank you for following me on my blog, until next time.

What should I write next?

Standard

Hello, dear readers! Welcome to my blog. On this post, I would like to suggest something different. I would like to leave to you, the reader, the option to decide our next theme! Please, vote on the options bellow, after one month, we will see the results and I will start preparing the new post.

So, without further delay, let’s begin our voting. See ya later!

ReactiveX: implementing reactive solutions in the JVM

Standard

Hello, dear readers! Welcome to my blog. In this post, we will talk about ReactiveX, a Java framework that allows us to easily develop and deploy reactive solutions. But what is the reactive paradigm and why should we want to use it? Let’s find out!

What is the reactive paradigm?

According to wikipedia, reactive programming is

In computingreactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g., arrays) or dynamic (e.g., event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow.

What this means is that the reactive paradigm is about handling data as streams, which flows, typically in an asynchronous manner, processing data as events, on the frequency they are produced (or consumed, depending on if we are using back-pressure or not). Is a different way of development thinking, outside of the traditional request-response. But why use reactive programming?

Why use reactive?

The main advantages on using reactive are non-blocking IO and parallelism. By providing a plug-and-play set of operators, designed to be easily transmuted to asynchronous running, we can develop solutions that scale well against massive chunks of data to process.

The non-blocking IO refers to the inners of reactive programming frameworks, that provide buffers and other types of techniques to make the code to operate independently. Let’s use an example to better explain.

Let’s imagine a classic API. Requests are made by clients to a server, which have a HTTP thread pool to process the requests. In traditional computing, all processing, from the controller up to the DB IO operations, are made on the HTTP thread.

Now imagine if we have a ton of requests arriving at near-light speed. Soon, the HTTP thread pool will be exhausted and the server will start rejecting requests.

Now, let’s see the same use case developed using the reactive paradigm. With reactive, we could break the solution in more components, for example by putting another thread pool to process the DB IO operations, with a buffer in the middle to make a decoupling of the slow IO code and the more need-to-be-fast HTTP code. The following picture illustrates the solution:

Keep it in mind that we are talking about the solution at a very low-level detail: typically, we won’t need to make this arrangements directly, but using all facilities from an reactive framework, such as ReactiveX.

With this approach, an HTTP thread only needs to receive the request and deliver to the buffer, immediately been freed to accept the next request. Normally there’s two possible solutions to produce the response for the client:

  1. By using fire-and-forget, that is, the HTTP thread just returns a ok message for the client and leave it to the DB thread;
  2. The HTTP thread sends to the buffer, alongside the data itself, an observer, which wraps the client’s connection. This way, the DB thread is responsible for emitting a response for the client, after making her processing.

Another powerful reactive technique is back-pressure. In back-pressure, we make the client-side of the stream responsible for dictating the frequency of the processing: the client requests for the server – or an middle-man buffer, depending on the framework’s implementation – for the next data to be processed.

This way we can, for example, protect the DB from been crushed under a massive load of writes coming his way. I wrote a good article about Akka streams, another good reactive framework made in Scala, which already explains in more detail about back-pressure. You can find the article here.

ReactiveX

So, now that we learned about reactive programming, let’s start learning about ReactiveX. Made in Java, with libraries for better Kotlin support, is one of the most popular reactive frameworks on the Java ecosystem.

For this lab, we will use Kotlin. Created by JetBrains, Kotlin is a good programming language, becoming very popular on Java ecosystems across the globe. With a much better support to functional programming than Java’s, is an excellent choice of language to learn today. All code will run on Java 11 and Kotlin 1.3.72.

To keep focused, we will stick to standalone examples, since making an API, DB migrations etc will drive our focus away from learning about the framework. If you want to read a more complex example, I suggest reading my Akka article after reading this.

All code from the lab can be found here. So, without further delay, let’s get started!

Our first stream

We will use Gradle as our dependency manager. First, let’s add ReactiveX dependencies on build.gradle file:

...
dependencies {
    runtime group: 'io.reactivex.rxjava2', name: 'rxkotlin', version: '2.4.0'
    compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.19'
}
...

We added RxJava2 core library, as well as Kotlin Rx library to add more Kotlin support.

Now, let’s create a Main function. Just like in Java, this function is a entry point for running the application:

fun main(args: Array<String>) {
}

Now, let’s make a very simple stream. We will just take a group of words, map the length of the words and print on the console:

import io.reactivex.Observable

fun main(args: Array<String>) {
    Observable.just("I",
        "Love",
        "Ana",
        "Carolina")
        .map(String::length)
        .subscribe(
            {println("length $it")},
            { println("error $it")},
            { println("stream ended!")}
        )
}

When runnig the following script, the following is print on the console:

length 1
length 4
length 3
length 8
stream ended!

We covered some ground with this simples example, let’s talk about the code:

  • First, we created an Observable. In ReactiveX, streams are divided in two types: Observables and Flowables. The main difference between the two is that Flowables allows us to use back-pressure. We will learn about Flowables further on the lab;
  • Second, we used just to initialise the Observable. This method is useful for simple examples where we know exactly the items we want to process;
  • Then, we used map. Map is a traditional stream operation, where we read each item and produce another item as output. In our case, we read each item and produce the word length;
  • Finally, we subscribe in the stream. The subscribe method is where we supply the code that will make the bottom of the stream, the last code to be processed. In this case, we print all word lengths, or a message error in case anything goes wrong (probably nothing will happen in this case, but you got the idea 🙂 ). We also provide a message for when the stream ends.

One last interesting concept to grasp before we move on is the stream event cycle. After the stream is started, we got the following possible sequence of actions:

  • The events are sent, one by one, downstream to the bottom. The code we supplied on first lambda of subscribe is the one that will be fired on this case;
  • In case anything goes wrong with the stream, an error event is sent to the bottom. This is what runs on our second lambda. When this happens, the stream stops;
  • Finally, when all events are processed, the completed event is fired, and processed by our third lambda. Keep it in mind that there’s cases where the stream can be infinite, such as streaming from an topic for example, so this event can be never fired up.

So that’s concludes our first stream. Yes, it is simple as that! Now, let’s learn some more things we can do with our streams.

Stream operations

Now, let’s make a tour on some of the basic operations we can do with streams. Of course, this won’t be a exhaustive list of every operation, since this would make our lab a dull boring showcase of tons of methods, but just a quick glance to allow us to first dive in the framework.

First, let’s move our first example to another script and change the Main script to run as an import:

package com.alexandreesl.examples

import io.reactivex.Observable

fun firstExample() {
    Observable.just("I",
        "Love",
        "Ana",
        "Carolina")
        .map(String::length)
        .subscribe(
            {println("length $it")},
            { println("error $it")},
            { println("stream ended!")}
        )
}
import com.alexandreesl.examples.firstExample

fun main(args: Array<String>) {
    firstExample()
}

From now on, we will keep adding examples to functions and run everything from Main.

Now, let’s make another quick example. We will take a single word, append a prefix and print to the console:

package com.alexandreesl.examples

import io.reactivex.Single

fun prefixExample() {
    Single.just("Alexandre")
        .map { "I am $it" }
        .subscribe(
            {println("$it")},
            { println("error $it")}
        )


}

After running, we get this output:

I am Alexandre

This example introduces us to Single. Single is a specialised observable, which primary concern is to provide a simplified version of the original Observable, focused on scenarios where we will emit just a single item.

For this reason, on subscribe we only have success and error event handlers that can be defined, since a onComplete event won’t make much sense since there’s just one item to process.

Now let’s make another example. We will get a random generated array of numbers, filter just the even ones and print:

package com.alexandreesl.examples

import io.reactivex.Observable
import java.util.*

fun evenNumbersExample() {

    Observable.fromIterable((1..50).map { Random().nextInt() })
        .filter { it % 2 == 0 }
        .subscribe(
            {println("$it")},
            { println("error $it")}
        )

}

This will produce something like the following (of course, every run will produce a different result):

-925639320
-1367241566
-1993909882
786568802
1215269342
-480862456
-1166646146
1171590568
682849868
-1321837860
-1021426524
-196824578
858851372
-213227382
966926184
-1095918914
-950267778
-2128264986
-1770877918
227807366
1692840490
1622475038
-308642896
-1907658182

On this example, we see another basic operator, filter. As the name implies, this operator will filter items emitted from the stream, based on a predicate.

Moving on, let’s make another example to wrap it up our quick tour. Let’s take a list of numbers (with duplicates) and use distinct and reduce, in order to sum all unique numbers on the array:

package com.alexandreesl.examples

import io.reactivex.Observable

fun distinctSumExample() {

    val numbers = listOf(1,2,3,4,5,5,6,12,14,18,22,25,25)

    Observable.fromIterable(numbers)
        .distinct()
        .reduce(0) { total,next -> total + next }
        .subscribe(
            {println("$it")},
            {println("error $it")}
        )
    
}

As expected, we will receive the number 112 as the result.

And this concludes our quick tour in ReactiveX operations. As we can see, is very powerful and easy to use. I suggest exploring the documentation, to discover much more from where this come from!

Handling events

As we saw before, there is three types of events on ReactiveX event cycle:

  • Next;
  • Error;
  • Complete.

Let’s say we want to add code snippets to be run on different stages of the stream, for example, to print the item before filtering. We can do this using doOnNext:

package com.alexandreesl.examples

import io.reactivex.Observable
import java.util.*

fun evenNumbersWithPrintExample() {

    Observable.fromIterable((1..50).map { Random().nextInt() })
        .doOnNext { println("item: $it") }
        .filter { it % 2 == 0 }
        .subscribe(
            {println("$it")},
            { println("error $it")}
        )

}

Running the code, we receive the following output (truncated for brevity):

...
item: -1075891450
-1075891450
item: 994876337
item: 297766092
297766092
item: -858967930
-858967930
item: 1633354503
item: -1792369156
-1792369156
item: 1255904419
item: 1325177149
item: 1773095686
1773095686
...

As we have doOnNext, we also have doOnError and doOnComplete for the same kind of effect.

There is two additional methods that are interesting to learn before we move on to the next subject. Let’s revisit our reduce example. Let’s suppose that something can go wrong and we want to return 0 as default in case of an error. We can do this as follows (we also added code to “force” a error):

package com.alexandreesl.examples

import io.reactivex.Observable
import java.lang.RuntimeException

fun distinctSumWithDefaultExample() {

    val numbers = listOf(1,2,3,4,5,5,6,12,14,18,22,25,25)

    var counter = 0

    Observable.fromIterable(numbers)
        .distinct()
        .reduce(0) { total,next ->
            counter++
            if (counter == 5)  throw RuntimeException("BOOOM!!!!")
            total + next }
        .onErrorReturn { 0 }
        .subscribe(
            {println("$it")},
            {println("error $it")}
        )

}

If we run this, we will receive 0, just as expected.

Another interesting method is onErrorResumeNext. This method allow us to return another Observable to be executed, in case the main Observable fails. Let’s see a simple example, where we divide 10 by a range of numbers and one of them is 0:

package com.alexandreesl.examples

import io.reactivex.Observable

fun divisionWithResumeNextExample() {

    Observable.just(1,2,3,4,0,8,2)
        .map {item -> 10 / item}
        .onErrorResumeNext(Observable.just(10).repeat(3))
        .subscribe(
            {println("item  $it")},
            {println("error $it")}
        )
}

This will produce the following. Notice that the stream stops immediately to process the original stream items and consumes the fallback stream, skipping the last items:

item  10
item  5
item  3
item  2
item  10
item  10
item  10

And that concludes our tour on event handling. Next, let’s learn about ReactiveX Flowables.

Flowables

When working with ReactiveX, there’s two major ways of using the framework: Observables and Flowables. The major difference between Observable and Flowable is that Flowable uses back-pressure, as stated before. Let’s see a example to better understand what that means.

Let’s imagine a stream that emits events from a very big range of numbers, that takes a little time to be processed. To see the problem, we will use a Scheduler (more on Schedulers later) to disconnect the emitter thread from the receiver one:

    Observable.fromIterable((1..5000000))
        .map { println("emitting $it")
            it }
        .observeOn(Schedulers.computation())
        .subscribe({
            Thread.sleep(5L)
            println(" receiving $it")
        })
    Thread.sleep(60000L)

When running, we would see something like this (of course, is only a fragment):

...
emitting 1264
emitting 1265
emitting 1266
emitting 1267
emitting 1268
emitting 1269
emitting 1270
emitting 1271
emitting 1272
emitting 1273
emitting 1274
emitting 1275
emitting 1276
 receiving 3
emitting 1277
emitting 1278
emitting 1279
emitting 1280
emitting 1281
emitting 1282
emitting 1283
emitting 1284
emitting 1285
...

As we can see, the emitter was a lot faster than the receiver. In this case, the scheduler is using by default a unbounded buffer so no data would be lost, but imagine that the risks on a production environment of incurring on memory overflows is very high.

So, how do we fix this? By changing to a Flowable, of course:

 Flowable.fromIterable((1..5000000))
        .map { println("emitting $it")
            it }
        .observeOn(Schedulers.computation())
        .subscribe({
            Thread.sleep(5L)
            println(" receiving $it")
        })
    Thread.sleep(60000L)

Yep, is simple as that to use a Flowable. Keep it in mind, all operators available to Observable are also available to Flowable, so all examples from before are valid for Flowables as well. If we run, we would see fragments like this:

... 
emitting 124
emitting 125
emitting 126
emitting 127
emitting 128
 receiving 1
 receiving 2
 receiving 3
 receiving 4
 receiving 5
 receiving 6
 receiving 7
 receiving 8
 receiving 9
 receiving 10
 receiving 11
 receiving 12
 receiving 13
 receiving 14
 receiving 15
 receiving 16
 receiving 17
 receiving 18
 receiving 19
 receiving 20
 receiving 21
 receiving 22
 receiving 23
...

We won’t enter deeply on the internals of Flowable implementation, but keeping it simple, the key here is that Flowable buffers data to the subscriber (the stream’s bottom). So after it hits the 128 row, it delivered the first chunk to be processed.

The main detail here is to note that we stopped receiving emitting messages. This is because the emitter stopped: is waiting for the subscriber to process the chunk, and only after that, it will start sending data again.

But what is that Scheduler we saw? Let’s find out!

Schedulers

Before start learning about schedulers, I want to ask you a question: have you noticed that, when we added a scheduler previously, we added a Thread.sleep after the Flowable? Did you guess why we did that?

The answer to this will probably be a shock: By default, ReactiveX is not multithread!

It is true: by default, all streams (Observables or Flowables) all run inside the same thread it created them, on our examples, the main thread. It is true that some operators (like timeout for example, where we define a time limit for the upstream to emit an event and throws a error if no event is emitted) are not single-thread, but the reason for that is that actually, from under the hood, they are defining a scheduler to run the operator.

So, what that Schedulers.computation() do? Basically, ReactiveX is getting the stream we declared and delivering his execution for a thread pool to be executed. What computation means? Is the type of scheduler we want to use. ReactiveX comes with several scheduler flavours, the main ones been:

  • computation: Computation is designed for streams that do CPU-intensive processing. When an application using ReactiveX is started, the framework creates a thread pool with the size equivalent to the number of cores on the CPU of the machine where the application is running;
  • IO: IO is designed for streams that do simple IO operations, that won’t take too much time to run. IO’s thread pool is unbounded: as the demand grows, ReactiveX will instantiate more threads, if the current pool is exhausted. This is a point of attention: if you are unsure of the load the application will have, using this option can be dangerous, as it can add more threads then the application can handle and result in an breakdown in more severe cases;
  • newThread: This works just like the name says: no thread pool, just keep creating new threads and deliver the new executions for them. Of course this is an even more dangerous one, since can even more easily result on the problem described above. Usable only on situations that the processing is indeed extremely fast;
  • single: This is like the opposite of newThread: it just creates one thread and keeps using it for all processing. This can be useful on simple streams, where you just need a little boost on performance. Remember that create threads is a costly operation, so indeed, in simple streams, it can be a better solution then IO, for example.

In my personal view, I think computation is the best all-around option, as it shields the risks of “over-threading” the application and creates the most optimal number for a thread pool, the same number of cores (where there is more threads then cores, the CPU divides them between the cores, so the multitask improvement is shrunk)

So, when we added observeOn, does the whole stream ran on a computation thread? Well, to tell you the true, I simplified my first answer a little: ReactiveX uses two methods for defining schedulers: subscribeOn and observeOn. So actually, in that case, just the receiver part of the stream ran on a different thread.

So, what is the difference between what we did previously, and this:

    Flowable.fromIterable((1..5000000))
        .map { println("emitting $it")
            it }
        .subscribeOn(Schedulers.computation())
        .subscribe({
            Thread.sleep(5L)
            println(" receiving $it")
        })
    Thread.sleep(60000L)

Well, actually, when using this, it indeed does what I said before: The whole stream will now run inside a computation thread. When using subscribeOn, we tell the whole stream to use a given scheduler. If we use both subscribeOn and observeOn on an stream, ReactiveX will use the scheduler defined on subscribeOn to upstream until the first observeOn is reached, and observeOn will be used for downstream, until the stream’s end is reached or another observeOn

Here is a example where we can see this thread manipulation running in practice:

fun schedulersExample() {

    Flowable.fromIterable((1..10))
        .map { println("emitting $it on ${Thread.currentThread().getName()}")
            it }
        .observeOn(Schedulers.io())
        .map { println("first receiving $it on ${Thread.currentThread().getName()}")
            it }
        .observeOn(Schedulers.single())
        .map { println("secondly receiving $it on ${Thread.currentThread().getName()}")
            it }
        .subscribeOn(Schedulers.computation())
        .subscribe {
            println("finally receiving $it on ${Thread.currentThread().getName()}")
        }
    Thread.sleep(300L)

}

After running the code, we will see an output like this:

emitting 1 on RxComputationThreadPool-1
emitting 2 on RxComputationThreadPool-1
emitting 3 on RxComputationThreadPool-1
emitting 4 on RxComputationThreadPool-1
first receiving 1 on RxCachedThreadScheduler-1
emitting 5 on RxComputationThreadPool-1
first receiving 2 on RxCachedThreadScheduler-1
emitting 6 on RxComputationThreadPool-1
secondly receiving 1 on RxSingleScheduler-1
first receiving 3 on RxCachedThreadScheduler-1
finally receiving 1 on RxSingleScheduler-1
emitting 7 on RxComputationThreadPool-1
secondly receiving 2 on RxSingleScheduler-1
finally receiving 2 on RxSingleScheduler-1
first receiving 4 on RxCachedThreadScheduler-1
secondly receiving 3 on RxSingleScheduler-1
finally receiving 3 on RxSingleScheduler-1
emitting 8 on RxComputationThreadPool-1
emitting 9 on RxComputationThreadPool-1
secondly receiving 4 on RxSingleScheduler-1
first receiving 5 on RxCachedThreadScheduler-1
finally receiving 4 on RxSingleScheduler-1
emitting 10 on RxComputationThreadPool-1
secondly receiving 5 on RxSingleScheduler-1
finally receiving 5 on RxSingleScheduler-1
first receiving 6 on RxCachedThreadScheduler-1
first receiving 7 on RxCachedThreadScheduler-1
secondly receiving 6 on RxSingleScheduler-1
first receiving 8 on RxCachedThreadScheduler-1
finally receiving 6 on RxSingleScheduler-1
first receiving 9 on RxCachedThreadScheduler-1
secondly receiving 7 on RxSingleScheduler-1
first receiving 10 on RxCachedThreadScheduler-1
finally receiving 7 on RxSingleScheduler-1
secondly receiving 8 on RxSingleScheduler-1
finally receiving 8 on RxSingleScheduler-1
secondly receiving 9 on RxSingleScheduler-1
finally receiving 9 on RxSingleScheduler-1
secondly receiving 10 on RxSingleScheduler-1
finally receiving 10 on RxSingleScheduler-1

As we can see, it started using an computation thread, as we defined on subscribeOn and later switched as reached different observeOns (RxCachedThreadScheduler is how ReactiveX names the threads from the IO scheduler).

And so we conclude our quick tour on Schedulers. Pretty cool, huh?

Conclusion

And so we conclude our first glimpse on ReactiveX. Of course, one article is too small to possibly feature everything that there is to learn about (I recommend to search about more topics such as Disposables, Scheduler strategies and specialised test classes such as TestObserver and TestSubscriber), but I hope to have at least teached the basics for a good overall understanding of the framework.

ReactiveX is a excellent framework, that it should be used. Happy coding!

References

Good ReactiveX book that I recommend

ReactiveX documentation

ReactiveX marbles (good site with diagrams that helps understand how the operators work)

Lab code

Gatling: making performance tests with Scala

Standard

Hello, dear readers! Welcome to my blog. In this post, we will talk about Gatling, a Scala library designed for developing performance/stress tests. But why do we need to make such tests? Let’s find it out!

Measuring our code

It is no mystery to anyone that performance is key to any architecture, in today’s ever-growing necessity to crunch more and more massive chunks of data at high speed. So, in order to supply this demand, we need to keep an eye out for aspects concerning performance, such as network throughputs and I/O costs.

There is some debate about when to start thinking about performance, with people both defending that it should be the first thing to do, or that you should think only at later stages when the need arises.

Wherever is your opinion on this subject, the point is that, at some point, you will be prompted to think about your application performance. When this happens, a good tool that you could use for helping you in measuring the performance are performance tests, such as stress and load tests.

The difference between stress and load tests is essentially their objectives. Stress tests objective is to check how much “punishment” an application can take it before it will breakdown, or become very close to doing so. This can help to define scaling thresholds to add more instances before the breaking point is met, for example.

Load tests, on the other hand, has the objective to test how the application will behave with different loads of data, up to a massive amount that reflects a peak situation, even more than a normal peak. This can help to identify bottlenecks, such as a search endpoint which have a more complex query that won’t withstand an access peak and will become very slow at high traffic, for example.

Well, so now that we know why is good to do performance tests, why we need a tool to do that?

Why use a library for that?

Of course, we could just write a simple script that loads a chunk of threads and start firing at will at our application – let’s assume we are talking about a REST API at this point, which is the focus of our lab later on -, and just see if it breaks after a wave of HTTP requests. Easy enough, right?

The problem with this approach is that test suite features are not that simple as it seems. Not only we need to make more complex testing scenarios, such as ramping up users (to simulate increasing usage across a timeline), distributed calls on a time frame, parallelism, etc but also we need to think about reading the results itself, as it would be useful to calculate things such as percentiles, average call durations, etc.

When we use a tool such as Gatling to this end, we get all these features out-of-box, making our life much easier. Besides, since it is already made in a way that’s easy to reuse, we can make scripts that will be used across several different applications, and can even be used on CI pipelines.

What about monitoring platforms?

You could be thinking about some monitoring platforms, such as NewRelic, which already use technologies such as Java profiling to make real-time performance monitoring, pointing out issues at specific layers such as databases, for example. These platforms are excellent and should be used, no doubt about it.

However, if possible, for applications that really are performance-critical, it could pay off to make use of a tool such as Gatling, since as said before, it could be integrated to the CI pipeline, making it possible to test his performance on critical operations even before the new code is sent to Production.

So, now that we talked about the importance of performance and about what Gatling is for and why it is a good tool, let’s start with a simple lab to show his usefulness in practice.

Lab

For this lab, we will create a simple Spring Boot API, that uses Postgres as a database. It will be a simple user CRUD service, since our primary focus is not on showing how to develop an API, but on how to use Gatling to measure it.

So, for our lab we will have an API with the following endpoints:

The whole project is dockerized, creating an API alongside a populated database. Gatling will also run inside a container. For convenience, there is also a Makefile with the necessary commands to execute the stack. To run it, you need to have Docker and Java 11 installed and use the Makefile included on the project, as shown:

make run

If the reader doesn’t have – or want – Java installed in your machine, there is also a convenient Docker image provided on Docker hub. Just use run-docker instead of run on make and it will run everything with just Docker.

Since our focus is on Gatling, we will not enter in more discussions about Spring Boot itself and other API details. If the reader doesn’t know Spring Boot, I suggest reading this article.

To run Gatling, we will use a Docker container that will run our simulations (Gatling’s terminology for his test suites). All simulations are inside the project, on a folder called src/gatling/simulations.

Inside the gatling folder, we can also find another two folders, the first one being conf. Gatling ships with a good set of default values for configuration, so we just use this folder to set a default value for our simulations on the reports. If we don’t do this, it will keep asking for a description every time we run it. We can find all possible settings to be used on this link.

The last folder is the reports one. This folder contains the reports generated for us at every run. We will start analyzing the reports soon enough.

All coding from our lab can be found here. Now, let’s begin our simulations!

Working with Gatling

Let’s begin with our first simulation. The simulations are written on Scala. If the reader is not familiar with Scala, I suggest reading my series of articles here.  Our first simulation simply makes a call for each operation of the API, as follows:

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

class CrudSimulation extends Simulation {

  val httpProtocol = http // 1
    .baseUrl("http://api:8080/user") // 2
    .acceptHeader("application/json") // 3
    .userAgentHeader("Mozilla/5.0 (Windows NT 5.1; rv:31.0) " +
    "Gecko/20100101 Firefox/31.0") // 4

  val scn = scenario("CrudSimulation") // 5
    .exec(http("request_get") // 6
    .get("/1")) // 7
    .pause(5) // 8
    .exec(http("request_post")
    .post("/")
    .body(StringBody(
      """{ "name": "MytestDummy",
        | "phone":11938284334 }""".stripMargin)).asJson)
    .pause(5)
    .exec(http("request_patch")
      .patch("/")
      .body(StringBody(
        """{ "id":1, "name": "MytestDummy2",
          |"phone":11938284123 }""".stripMargin)).asJson)
    .pause(5)
    .exec(http("request_get_name")
      .get("/name/MytestDummy"))
    .pause(5)
    .exec(http("request_delete")
      .delete("/1"))
    .pause(5)


  setUp( // 9
    scn.inject(atOnceUsers(1)) // 10
  ).protocols(httpProtocol) // 11
}

Let’s introduce the simulation structure by analyzing the code:

  1. First, we define an http object, which will be used to set the default values for our simulation;
  2. Here we define the base URL for all our calls on the simulation;
  3. Here we define the media header that defines that all our calls will be on JSON;
  4. Here we define the user-agent, this is not so important on an API test, but could be useful for a website that requires testing for different browsers, for example;
  5. Here we create our scenario. Simulations are composed of scenarios, where calls are made;
  6. Here we prepare our first call. By creating an http object, we start defining the call we wish to make;
  7. Here we define we want to make a GET call. On the next lines, we can also see how to make POST and PATCH calls, where a request body is provided;
  8. Here we define a pause of 5 seconds before making the next call;
  9. Here we invoke setUp, which will initialize the scenario to run;
  10. Here we tell how we want to run the scenario. For this simple first simulation, we just want one user to make the calls;
  11. Here we define the protocol used in the scenario. It is here that we pass the first object we created, with all the default values we defined at first.

Gatling also offers a recorder, where you can record a browser session to simulate user iterations. This is useful when we want to create tests for a website, instead of a API. The recorder can be found here.

As we can see, is very easy to create simulations. To run the simulation, just use the following make command:

make run

Like we talked about the run command, there is also a run-docker command, if the reader just want to use Docker

After running Gatling, in the end, we will see a table like the following:

Screen Shot 2020-02-09 at 20.51.44

The table has information summarizing the execution. Some interesting assets are like the average, min and max response times – in milliseconds -, and percentiles. Percentiles are calculations that show for a given data load, the percentage a given value occurs. For example, on the table below, we can see that, for our 5 count requests, 50% of the requests have 24 milliseconds response time. You can read more about percentiles here.

After running, Gatling also generated an HTML report inside the aforementioned reports folder. If we open index.html, we can see not only the data we just talked about but also other information, such as active users during the simulation, response time distribution, etc.

Screen Shot 2020-02-09 at 21.20.09

Now, let’s make another two scenarios. We will start to really make performance tests, both on writing and reading API operations.

We start by making a refactoring. We create some traits – Traits are more or less the equivalent of Java interfaces in Scala, but with some powerful differences – to reuse code and group the scenarios together in a single simulation. First, we create GatlingProtocol trait:

import io.gatling.core.Predef._
import io.gatling.http.Predef._

trait GatlingProtocol {

  val httpProtocol = http
    .baseUrl("http://api:8080/user")
    .acceptHeader("application/json")
    .userAgentHeader("Mozilla/5.0 (Windows NT 5.1; rv:31.0) " +
      "Gecko/20100101 Firefox/31.0")

}

Next, we create a NumberUtils trait, to reuse code we will use on both new scenarios:

trait NumberUtils {

  val leftLimit = 1L
  val rightLimit = 10L
  def generateLong = leftLimit + (Math.random * (rightLimit - leftLimit)).toLong

}

After this refactoring, this is how we code our previously simulation:

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

trait CrudSimulation {

  val crudScn = scenario("CrudScenario")
    .exec(http("request_get")
      .get("/1"))
    .pause(5)
    .exec(http("request_post")
      .post("/")
      .body(StringBody(
        """{ "name": "MytestDummy",
          | "phone":11938284334 }""".stripMargin)).asJson)
    .pause(5)
    .exec(http("request_patch")
      .patch("/")
      .body(StringBody(
        """{ "id":11, "name": "MytestDummy2",
          |"phone":11938284123 }""".stripMargin)).asJson)
    .pause(5)
    .exec(http("request_get_name")
      .get("/name/MytestDummy"))
    .pause(5)
    .exec(http("request_delete")
      .delete("/11"))
    .pause(5)


}

Then, we create two new scenarios, one with reading operations from the API, and another with the writing ones:

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

trait WriteOperationsSimulation extends NumberUtils {

  val writeScn = scenario("WriteScenario")
    .exec(http("request_post")
    .post("/")
    .body(StringBody(
      """{ "name": "MytestDummy",
        | "phone":11938284334 }""".stripMargin)).asJson)
    .pause(5)
    .exec(http("request_patch")
      .patch("/")
      .body(StringBody(
        s"""{ "id":$generateLong, "name": "MytestDummy$generateLong",
          |"phone":11938284123 }""".stripMargin)).asJson)

}
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

trait ReadOperationsSimulation extends NumberUtils {

  val readScn = scenario("ReadScenario")
    .exec(http("request_get")
      .get("/" + generateLong))
    .pause(5)
    .exec(http("request_get_name")
      .get("/name/Alexandre"))
    .pause(5)


}

Finally, we create a runner, which will set all scenarios in a simulation to run:

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

class SimulationRunner extends Simulation with CrudSimulation with ReadOperationsSimulation with WriteOperationsSimulation with GatlingProtocol {

  setUp(
    crudScn.inject(atOnceUsers(1)), 
    readScn.inject(constantUsersPerSec(50) during (5 minutes)), // 1
    writeScn.inject(rampUsers(200) during (2 minutes)) // 2
  ).protocols(httpProtocol)

}

Other new features we can see here are new forms of setting how Gatling will distribute the users to fire our scenarios on the simulation. In our case, we are telling Gatling to:

  1. Start 50 users per second along with the execution, for 5 minutes;
  2. Start users up to 200, evenly distributed across a 2-minute time range.

More examples of configuration scenarios can be found here.

After running the tests again, we can see we get a lot more requests made – and of course, the test gets a lot more to run – and lots of data to analyze:

Screen Shot 2020-02-10 at 20.29.41

Analyzing for bottlenecks

Now, let’s see if we can use our tests to check for possible performance bottlenecks.

If we check the get by name endpoint report, we will see that the max response time is more then 1 second:

Screen Shot 2020-02-10 at 20.38.50

Now let’s imagine that this is not an acceptable response time for our needs since our business requirements declare that this endpoint will be heavily used by the clients. By using Gatling, we could detect the problem, before it was dispatched to production.

In our case, our most likely culprit is the database, since the API is so simple. Let’s try to improve the performance of the search by creating an index on the name field.  After creating the index and re-running the tests, we can see that our performance has improved:

Screen Shot 2020-02-10 at 21.35.24

We can see that now the max response time is below 1 second, solving our performance problem.

Stressing the API

Let’s do one last test before wrapping it up. Let’s do a stress test and see how much more punishment our API can take before starting getting errors from the load.

First we increase the users injected in the scenarios:

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

class SimulationRunner extends Simulation with CrudSimulation with ReadOperationsSimulation with WriteOperationsSimulation with GatlingProtocol {

  setUp(
    crudScn.inject(atOnceUsers(1)),
    readScn.inject(constantUsersPerSec(150) during (5 minutes)),
    writeScn.inject(rampUsers(500) during (2 minutes))
  ).protocols(httpProtocol)

}

Then we run:

Screen Shot 2020-02-11 at 21.57.27

Wow, still no failures! But of course, the degradation is perceptible. Let’s increase a little more:

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

class SimulationRunner extends Simulation with CrudSimulation with ReadOperationsSimulation with WriteOperationsSimulation with GatlingProtocol {

  setUp(
    crudScn.inject(atOnceUsers(1)),
    readScn.inject(constantUsersPerSec(250) during (5 minutes)),
    writeScn.inject(rampUsers(600) during (2 minutes))
  ).protocols(httpProtocol)


}

Screen Shot 2020-02-11 at 22.23.07Now we got some errors! The cause, as we can see from the report, are timeouts from server threads been exhausted due to the massive amount of requests. In a real scenario, we should think of options such as horizontal balancing, reactive streams and such. But since our focus is not on API performance in the post, we will not continue for now. The main point for this little test is to show how Gatling can help us in testing the capacity of our applications.

Conclusion

And so we conclude our lesson. With simple code, we created a powerful tool, that it will help us in testing our API for performance and load tests. Thank you for following me on another article, until next time.

Continue reading

Akka Streams: developing robust applications using Scala

Standard

Hi, dear readers! Welcome to my blog. A long time ago, I wrote a post about the actor model and how to use Akka to implement solutions using actors. If the reader doesn’t read the post, it can be found here. Now, more than 4 years later – how fast time goes! – it is time to revisit this world, with a much better understanding and maturity. At the time, I used good old Java to do the task.

There’s nothing wrong with using Java, but if you really want to delve on Akka, then Scala is the language of choice, especially if we want to use Akka, a project specially tailored to develop data flows that could do tasks such as system integrations.

With non-blocking IO and parallelism embedded at his core – and encouraged to be used on our custom code by following their good practices! – Akka streams allow us to develop really fast applications that can easily scale. From my personal experience, it is a  specially good option for integrating with Apache Kafka.

So, without further delay, let’s begin our journey!

Actor Model

The actor model was already explained in my previous post so we won’t waste much time with this explanation. To sum it up, we have a system where actors work with each other asynchronously, creating a system where tasks are broken down on multiple steps by different actors, each one of them communicating by a personal queue (mailbox) that enqueues messages to be processed by the actors. This way, we have a scalable solution, where tasks are done in parallel.

Akka quick recap

Actors

Actors are the core components of an actor system. An actor consists of a program unit that implements logic based on messages it receives from his mailbox.

When developing Akka applications on Scala, an actor must implement a receive method, where will create logic for different types of messages it can receive. Each time a message arrives at the mailbox, the dispatcher delivers the message to the actor. It is important to notice it, however, that it is the actor which asks for the next message, as it completes processing the current message – by default, each actor process just one message a time -, this way avoiding an actor to be overloaded. This technique is called back-pressure, which we will talk more about it in the next sections.

ActorSystem

The actor system comprehends the whole actor solution. It is composed of actors, dispatchers, and mailboxes.

Applications can have multiple actor systems inside. Also, it is possible to define actor systems to be linked together remotely, forming a cluster.

Execution Context (Dispatchers)

Execution contexts, also known as dispatchers, are responsible for serving actors with messages by delivering the messages to the mailboxes.

Dispatchers are also responsible for allocating the actors themselves, including details such as parallel actor execution, using strategies such as thread pools, for example. A dispatcher can be defined globally for the whole system or defined at actor level.

An important note regarding performance with dispatchers is that they run the actors inside thinner layers that are memory-optimized, so memory consumption inside Akka solutions is lower than in traditional Java applications.

One interesting thing to notice on actor instantiation is how Akka treats actor references. When asking for an actor to be created inside a system, Akka will create an actor reference, which can be used to send messages to it.

These messages are sent using remote calls, even when the actor system is been used all locally. This guarantees that when using actor systems remotely, such as in a cluster, for example, there will be no difference in the code.

Mailboxes are, like the name suggests, repositories to messages that it will be processed by actors. Mailboxes can have different strategies to treat messages, such as unbounded lists, single and multi-consumer, priority queues and more.

Actor Supervisors & Lifecycle

When creating actors, we can create them at the system level or create them inside another actor. When creating inside an actor, we call the parent actor a Supervisor and the actors created inside are called Child Actors (when creating an actor at the system level, also known as Top-level Actors, they also are child actors, in this case from a reserved actor from Akka itself).

Every actor has a lifecycle: it can be started, on which case is running, stopped when an unrecoverable failure occurs and restarted or resumed, depending on the circumstances of the failure.

Supervisors are actors responsible for deciding for their child actors what to do when one of them faces a failure. It is possible to simply stop the actor, restart him, or resume (the main difference between restart and resume is that resources are freed on a restart, while on a resume the actor simply resumes his execution).

These decisions are called Supervisor Policies. These policies can also be set to behavior as one-for-one or all-for-one, meaning that when an error occurs on one child actor, the policy will be applied to all actors bellow (for example, all actors would restart) or just to the failed actor.

And that concludes our quick recap of Akka. Now, let’s begin our talk about Akka Stream.

Stream concepts

A stream is composed of tasks that must be done – continuously or not – in order to do a process. Each stream must have a Source, which is the beginning, a Flow composed of multiple tasks that can run at parallel depending on the needs and a Sink, which is the stream’s end.

Actor materialization

Akka Streams runs on top of Akka. That means when a stream is started, internally Akka Streams creates an actor system with actors responsible for running the tasks of the stream.

The responsibility for doing this task is of the Actor Materializer, that creates (materializes) the resources need to run the stream. One interesting thing is that it is possible to explicitly define materializing points on our flow.

These points are used by Akka streams to define points where it will group the tasks from the flow to run on separated actors, so it is a good technique to keep it in mind when doing stream tunning.

Sources

Sources are flow’s beginnings. A source is used for defining an entrypoint for data, be a finite datasource, such as a file, to an infinite one, such as a Kafka topic, for example. It is possible to zip multiple source definitions on a single combined source for processing, but still, a flow can have only one source.

Flows

Flows are the middle of the stream. One flow can have an infinite number of tasks (steps), that range from data transformation to enrichment by calling external resources.

Sinks

Sinks are flow’s endings. Analogous to sources, sinks can have multiple types of destinations, such as files, Kafka topics, REST endpoints, etc. Likewise the source, flows can also have only one sink.

Graphs

When modeling an Akka stream, as seen previously, we define a source, a sink and several flows in between. All of this generates a graph, where each node represents a task on the stream.

When instantiating a stream, a runnable graph is created, which represents a blueprint for executions. After executing the stream with the run() method for example (there’s also a runWith(Sink) method that accepts a sink as a parameter) the runnable graph is materialized and executed.

During our lab, we will see Graph Stages. Graph stages are like “boxes” that group tasks together, making them look like a single node in the final graph.

Back-pressure

One very important concept when learning about Akka streams is back-pressure. Typically, on a producer-consumer architecture, the producer will keep sending data to the consumer, without really knowing if the consumer is capable of keeping it up with the load or not. This can create a problem where a producer overloads a consumer, generating all kinds of errors and slowness.

With back-pressure, this approach is reversed. Now, it is the consumer that dictates when to receive a new message, by pulling new data at his rhythm. while no new message is asked, the producer keeps waiting for a signal and only then it starts pushing messages again.

The image below illustrates the concept in action:

graph_stage_conceptual

Stream error handling

Of course, just like with an actor system, streams also can fail. Just like with actors, error handling is also done with supervisors, that defines policies for a stream to resume, stop or restart depending on the error.

Streams also support a recovery configuration, that allows us, for example, to chain another stream execution in case of error after several retries.

Alpakka project

The Alpakka project is an integration library composed of several components that allow us to quickly deploy integrations between several technologies, such as files, REST endpoints and even AWS technologies such as Amazon Kinesis. During the course of our lab, we will use resources from this project, so stay tuned for more!

The project documentation can be found in:

https://developer.lightbend.com/docs/alpakka/current/

Lab

Pre-requisites

So, without further delay, let’s begin! This lab requires that the reader already have some knowledge of Scala and Akka. On my blog, it is possible to read my previous post on Akka, alongside my series about the Scala language.

Creating the project & infrastructure code

To create the project, let’s begin by just creating the sbt file that will hold our project’s dependencies.  We will start by creating a folder that will hold our project (all sources for the lab can be found here) and type the following on a file called build.sbt:

name := "Akka-stream-lab"
version := "1.0"
scalaVersion := "2.12.5"

enablePlugins(JavaAppPackaging)

mainClass in Compile := Some("Main")

lazy val Versions = new {
  val akkaVersion = "2.5.11"
}

lazy val akkaDependencies = Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "0.8",
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
  "com.typesafe.akka" %% "akka-actor" % Versions.akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % Versions.akkaVersion,
  "com.typesafe.akka" %% "akka-slf4j" % Versions.akkaVersion,
  "com.typesafe.akka" %% "akka-testkit" % Versions.akkaVersion % Test,
  "com.typesafe.akka" %% "akka-stream-testkit" % Versions.akkaVersion % Test,
  "com.typesafe.akka" %% "akka-testkit" % Versions.akkaVersion % Test
)

lazy val testDependencies = Seq(
  "org.scalacheck" %% "scalacheck" % "1.13.4" % Test,
  "org.scalamock" %% "scalamock" % "4.1.0" % Test,
  "org.mockito" % "mockito-core" % "2.19.0" % Test,
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

lazy val loggingDependencies = Seq(
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-api" % "1.7.25"
)

lazy val otherDependencies = Seq(
  "io.spray" %% "spray-json" % "1.3.5"
)

libraryDependencies ++= (
  akkaDependencies++
  loggingDependencies++
  testDependencies++
  otherDependencies
  )

As can be seen above, not only we defined a sbt project, but also included dependencies for Akka and logging, alongside Akka Streams itself. We also added a packaging plugin to simplify our use when running the project from command-line.

In order to use the plugin, we need to add it to sbt project’s definition. To do that, we create a project folder and inside create an plugins.sbt file, with the following:


logLevel := Level.Warn

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.2")

Finally, we create our main Scala object that will be the App launcher for our project. We create a Scala source folder and add a Main.scala file, containing the following:


import akka.actor.ActorSystem

object Main extends App {
implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
}

Our first version is very simple: It simply creates a new actor system. Of course, during our lab, it will receive more code to evolve to our final solution.

Concluding the setup, this would be the first structure of our project when seen (some other files, such as sbt’s build.properties, are created automatically when running the project using sbt):

Screen Shot 2018-12-23 at 15.44.04

This image is taken from Intellij, which I recommend as the IDE for the lab.

Without spoiling too much, as we can see in our next section, we will need some infrastructure code that will set it up a Kafka cluster for our use in the lab. For this, we will use Docker Compose.

So, let’s begin by creating a docker-compose.yml file, that will create our Kafka cluster. The file will be as follows:


version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - 2181:2181
  kafka:
    image: wurstmeister/kafka:1.1.0
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "accounts:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

In this simple docker-compose file, we create an embedded cluster with a broker node and a Zookeeper node and also create a accounts topic at startup. To test it out our code, with docker up and running, we can start a cluster by running:


docker-compose up -d

Finally, let’s create a logback config file for our logging. To do this, let’s create a file called logback.xml inside resources folder and enter the following:

%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} – %msg%n

That’s it! now that we have our infrastructure code ready, let’s begin the actual coding.

Lab’s use case

In our lab, we will create 2 streams. One of them will read data from a file and publish to a Kafka topic. The other one will read from this same topic and save the message on other files, this way demonstrating the flow of data from a point A to a point B.

Creating the first stream

Let’s begin by creating the first stream, which is a simple stream that will read data (accounts in our case) from a file and send to Kafka. At first, let’s code the stream in the main code itself and evolve as we develop.

First, we create a file called input1.csv and add the following:

“COD”,”NAME”,”DOCUMENT”,”AGE”,”CIVIL_STATUS”,”PHONE”,”BIRTHDAY”,”HOME_COUNTRY”,”HOME_STATE”,”HOME_CITY”,”HOME_STREET”,”HOME_STREETNUM”,”HOME_NEIGHBORHOOD”
1,”alexandre eleuterio santos lourenco 1″,”43456754356″,36,”Single”,”+5511234433443″,”21/06/1982″,”Brazil”,”São Paulo”,”São Paulo”,”false st”,3134,”neigh 1″
2,”alexandre eleuterio santos lourenco 2″,”43456754376″,37,”Single”,”+5511234433444″,”21/06/1983″,”Brazil”,”São Paulo”,”São Paulo”,”false st”,3135,”neigh 1″

That will be the csv input file for our first stream. Next, we create a case class Account to encapsulate the data from the file:


package com.alexandreesl.model

case class Account(cod: Long, name: String, document: String,
age: Int, civilStatus: String,
                   phone: String, birthday: String,
country: String, state: String,
                   city: String, street: String,
streetNum: Long, neighBorhood: String)

We also create an object to allow JSON marshaling/unmarshalling when sending/receiving messages in Kafka:


package com.alexandreesl.json

import com.alexandreesl.model.Account
import spray.json.DefaultJsonProtocol

object JsonParsing extends DefaultJsonProtocol {

  implicit val accountFormat = jsonFormat13(Account)

}

Finally, we code the stream that reads from the file and publishes to Kafka (don’t you worry, we will refactor later):


import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.ByteString
import com.alexandreesl.model.Account
import com.typesafe.scalalogging.Logger
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.LoggerFactory
import spray.json._
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.producer.ProducerRecord
object Main extends App {

implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))
implicit val ec = system.dispatcher
private val logger = Logger(LoggerFactory.getLogger("Main"))
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")

logger.info("Starting streams...")

private def convertToClass(csv: Array[String]): Account = {
Account(csv(0).toLong,
csv(1), csv(2),
csv(3).toInt, csv(4),
csv(5), csv(6),
csv(7), csv(8),
csv(9), csv(10),
csv(11).toLong, csv(12))
}

private val flow = Flow[String].filter(s => !s.contains("COD"))
.map(line => {
convertToClass(line.split(","))
})

FileIO.fromPath(Paths.get("input1.csv"))
.via(Framing.delimiter(ByteString("\n"), 4096)
.map(_.utf8String))
.via(flow)
.map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
.runWith(Producer.plainSink(producerSettings))

logger.info("Stream system is initialized!")

}

As we can see, the code is pretty straight-forward. We just added a stream that reads from a csv file and dispatches the lines to Kafka. To see the messages on Kafka, first, we start the application – don’t forget to run docker compose up first! -, and them we can use a shell inside the broker’s container, as follows:

docker exec -t -i akka-stream-lab_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh –bootstrap-server :9092 –topic accounts –from-beginning

This will produce an output as follows:

{“age”:36,”birthday”:”\”21/06/1982\””,”city”:”\”São Paulo\””,”civilStatus”:”\”Single\””,”cod”:1,”country”:”\”Brazil\””,”document”:”\”43456754356\””,”name”:”\”alexandre eleuterio santos lourenco 1\””,”neighBorhood”:”\”neigh 1\””,”phone”:”\”+5511234433443\””,”state”:”\”São Paulo\””,”street”:”\”false st\””,”streetNum”:3134}
{“age”:37,”birthday”:”\”21/06/1983\””,”city”:”\”São Paulo\””,”civilStatus”:”\”Single\””,”cod”:2,”country”:”\”Brazil\””,”document”:”\”43456754376\””,”name”:”\”alexandre eleuterio santos lourenco 2\””,”neighBorhood”:”\”neigh 1\””,”phone”:”\”+5511234433444\””,”state”:”\”São Paulo\””,”street”:”\”false st\””,”streetNum”:3135}

Now that we have coded our first stream, let’s create our second stream. Don’t you worry about the messing code right now, we will refactor later when implementing error handling using actors.

Creating the second Stream

Now, let’s create our second stream. This stream will read from Kafka and generate two files, one with personal data and another with address data.

For this, we will use a graph stage, that will make the file write in parallel. We will disable autocommit for Kafka consumption and commit only at the end.

Before creating the stage, let’s do our first refactoring, by moving the Account case class to a GraphMessages object, which will hold all case classes we will use on our coding:

package com.alexandreesl.graph

import akka.kafka.ConsumerMessage

object GraphMessages {

  case class Account(cod: Long, name: String, document: String,
                     age: Int, civilStatus: String,
                     phone: String, birthday: String,
                     country: String, state: String,
                     city: String, street: String,
                     streetNum: Long, neighBorhood: String)

  case class InputMessage(acc: Account,
                          offset: ConsumerMessage.CommittableOffset)

  case class AccountPersonalData(cod: Long, name: String,
                                 document: String, age: Int,
                                 civilStatus: String,
                                 phone: String, birthday: String)

  case class AccountAddressData(cod: Long, country: String,
                                state: String, city: String,
                                street: String, streetNum: Long,
                                neighBorhood: String)

}

We also update our Json protocol accordingly, since we will use JSON marshaling for the other classes as well:

package com.alexandreesl.json

import com.alexandreesl.graph.GraphMessages.{Account, AccountAddressData, AccountPersonalData}
import spray.json.DefaultJsonProtocol

object JsonParsing extends DefaultJsonProtocol {

  implicit val accountFormat = jsonFormat13(Account)
  implicit val accountPersonalFormat = jsonFormat7(AccountPersonalData)
  implicit val accountAddressFormat = jsonFormat7(AccountAddressData)

}

Finally, let’s create our graph stage. Notice the ~> symbol? That symbol is used inside the graph stage builder to create the stage flow. This allows us to code flows in a visual manner, making a lot simpler to design stream flows.

package com.alexandreesl.graph

import java.nio.file.{Paths, StandardOpenOption}

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, FileIO, Flow, GraphDSL, Source, Zip}
import akka.util.ByteString
import com.alexandreesl.graph.GraphMessages.{Account, AccountAddressData, AccountPersonalData, InputMessage}
import spray.json._
import com.alexandreesl.json.JsonParsing._

object AccountWriterGraphStage {

  val personal = Paths.get("personal.csv")
  val address = Paths.get("address.csv")

  def graph(implicit system: ActorSystem, materializer: ActorMaterializer) =
Flow.fromGraph(GraphDSL.create() { implicit builder =>

    import GraphDSL.Implicits._

    val flowPersonal = Flow[InputMessage].map(msg => {
      Source.single(AccountPersonalData(msg.acc.cod,
        msg.acc.name, msg.acc.document, msg.acc.age,
msg.acc.civilStatus, msg.acc.phone,
msg.acc.birthday).toJson.compactPrint + "\n")
        .map(t => ByteString(t))
        .runWith(FileIO.toPath(personal,
Set(StandardOpenOption.CREATE, StandardOpenOption.APPEND)))
      msg.acc
    })

    val flowAddress = Flow[InputMessage].map(msg => {
      Source.single(AccountAddressData(msg.acc.cod,
        msg.acc.country, msg.acc.state,
msg.acc.city, msg.acc.street, msg.acc.streetNum,
msg.acc.neighBorhood).toJson.compactPrint + "\n")
        .map(t => ByteString(t))
        .runWith(FileIO.toPath(address,
Set(StandardOpenOption.CREATE, StandardOpenOption.APPEND)))
      msg.offset
    })

    val bcastJson = builder.add(Broadcast[InputMessage](2))
    val zip = builder.add(Zip[Account, ConsumerMessage.CommittableOffset])

    bcastJson ~> flowPersonal ~> zip.in0
    bcastJson ~> flowAddress ~> zip.in1

    FlowShape(bcastJson.in, zip.out)

  })

}

On our stage, we created two flows that executed in parallel a broadcast, each one passing through one value from the original input message. In the end, we use zip to generate a tuple from the two objects that will be passed to the next stage. Finally, let’s create our stream, which will be using the graph stage as part of the stream. I promise this will be the last time we will see that big messy main object: next section we will start refactoring.

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.{FileIO, Flow, Framing, Sink}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.ByteString
import com.alexandreesl.graph.AccountWriterGraphStage
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import com.typesafe.scalalogging.Logger
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.slf4j.LoggerFactory
import spray.json._
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord

import scala.concurrent.Future

object Main extends App {

  implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
  implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))
  implicit val ec = system.dispatcher
  private val logger = Logger(LoggerFactory.getLogger("Main"))
  val configProducer = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(configProducer, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
  val configConsumer = system.settings.config.getConfig("akka.kafka.consumer")
  val consumerSettings =
    ConsumerSettings(configConsumer, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  logger.info("Starting streams...")

  private def convertToClass(csv: Array[String]): Account = {
    Account(csv(0).toLong,
      csv(1), csv(2),
      csv(3).toInt, csv(4),
      csv(5), csv(6),
      csv(7), csv(8),
      csv(9), csv(10),
      csv(11).toLong, csv(12))
  }

  private val flow = Flow[String].filter(s => !s.contains("COD"))
    .map(line => {
      convertToClass(line.split(","))
    })

  FileIO.fromPath(Paths.get("input1.csv"))
    .via(Framing.delimiter(ByteString("\n"), 4096)
      .map(_.utf8String))
    .via(flow)
    .map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
    .runWith(Producer.plainSink(producerSettings))

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("accounts"))
    .mapAsync(10)(msg =>
      Future.successful(InputMessage(msg.record.value.parseJson.convertTo[Account], msg.committableOffset))
    ).via(AccountWriterGraphStage.graph)
    .mapAsync(10) { tuple =>
      val acc = tuple._1
      logger.info(s"persisted Account: $acc")
      tuple._2.commitScaladsl()
    }.runWith(Sink.ignore)

  logger.info("Stream system is initialized!")

}

That’s it! since we started both streams in the main application, it is possible to test the stream simply by running the application, which will make the first stream to enqueue 2 messages, that it will be dequeued by the other stream. If we see the logs, we will see the following:

[INFO] [12/27/2018 23:59:52.395] [akka-streams-lab-akka.actor.default-dispatcher-3] [SingleSourceLogic(akka://akka-streams-lab)] Assigned partitions: Set(accounts-0). All partitions: Set(accounts-0)
23:59:52.436 [akka-streams-lab-akka.actor.default-dispatcher-2] INFO Main – persisted Account: Account(1,”alexandre eleuterio santos lourenco 1″,”43456754356″,36,”Single”,”+5511234433443″,”21/06/1982″,”Brazil”,”São Paulo”,”São Paulo”,”false st”,3134,”neigh 1″)
23:59:52.449 [akka-streams-lab-akka.actor.default-dispatcher-2] INFO Main – persisted Account: Account(2,”alexandre eleuterio santos lourenco 2″,”43456754376″,37,”Single”,”+5511234433444″,”21/06/1983″,”Brazil”,”São Paulo”,”São Paulo”,”false st”,3135,”neigh 1″)

And if we inspect the files, we will see that it wrote the data accordingly, as we can see below:

personal.csv:

{“age”:36,”birthday”:”\”21/06/1982\””,”civilStatus”:”\”Single\””,”cod”:1,”document”:”\”43456754356\””,”name”:”\”alexandre eleuterio santos lourenco 1\””,”phone”:”\”+5511234433443\””}
{“age”:37,”birthday”:”\”21/06/1983\””,”civilStatus”:”\”Single\””,”cod”:2,”document”:”\”43456754376\””,”name”:”\”alexandre eleuterio santos lourenco 2\””,”phone”:”\”+5511234433444\””}

address.csv:

{“city”:”\”São Paulo\””,”cod”:1,”country”:”\”Brazil\””,”neighBorhood”:”\”neigh 1\””,”state”:”\”São Paulo\””,”street”:”\”false st\””,”streetNum”:3134}
{“city”:”\”São Paulo\””,”cod”:2,”country”:”\”Brazil\””,”neighBorhood”:”\”neigh 1\””,”state”:”\”São Paulo\””,”street”:”\”false st\””,”streetNum”:3135}

Now, let’s refactor this code to be more maintainable and introduce error handling.

Implementing error handling

In order to easier error handling, we will move our streams to actors. Them we will create a supervisor, defining what to do when an error occurs.

Let’s begin by creating the first actor, called KafkaImporterActor  and move the stream:


package com.alexandreesl.actor

import java.nio.file.Paths

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.util.ByteString
import com.alexandreesl.actor.KafkaImporterActor.Start
import com.alexandreesl.graph.GraphMessages.Account
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import spray.json._

import scala.concurrent.ExecutionContextExecutor

class KafkaImporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

private def convertToClass(csv: Array[String]): Account = {
Account(csv(0).toLong,
csv(1), csv(2),
csv(3).toInt, csv(4),
csv(5), csv(6),
csv(7), csv(8),
csv(9), csv(10),
csv(11).toLong, csv(12))
}

private val flow = Flow[String].filter(s => !s.contains("COD"))
.map(line => {
convertToClass(line.split(","))
})
private val configProducer = actorSystem.settings.config.getConfig("akka.kafka.producer")
private val producerSettings =
ProducerSettings(configProducer, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
FileIO.fromPath(Paths.get("input1.csv"))
.via(Framing.delimiter(ByteString("\n"), 4096)
.map(_.utf8String))
.via(flow)
.map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
.runWith(Producer.plainSink(producerSettings))

}

}

object KafkaImporterActor {

val name = "Kafka-Importer-actor"

def props = Props(new KafkaImporterActor)

case object Start

}

On this actor, we just moved our configurations and created a receive method. On that method, we created a message case that it is fired at actor startup, making the actor starting up the stream as soon it is instantiated.

Now, let’s do the same to the other stream:


package com.alexandreesl.actor

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.alexandreesl.actor.KafkaExporterActor.Start
import com.alexandreesl.graph.AccountWriterGraphStage
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import spray.json._
import com.alexandreesl.json.JsonParsing._

import scala.concurrent.{ExecutionContextExecutor, Future}

class KafkaExporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher
private val configConsumer = actorSystem.settings.config.getConfig("akka.kafka.consumer")
private val consumerSettings =
ConsumerSettings(configConsumer, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
Consumer
.committableSource(consumerSettings, Subscriptions.topics("accounts"))
.mapAsync(10)(msg =>
Future.successful(InputMessage(msg.record.value.parseJson.convertTo[Account], msg.committableOffset))
).via(AccountWriterGraphStage.graph)
.mapAsync(10) { tuple =>
val acc = tuple._1
log.info(s"persisted Account: $acc")
tuple._2.commitScaladsl()
}.runWith(Sink.ignore)

}

}

object KafkaExporterActor {

val name = "Kafka-Exporter-actor"

def props = Props(new KafkaExporterActor)

case object Start

}

Finally, as we moved the code to actors, we now can simplify the main object to be just this:


import akka.actor.ActorSystem
import com.alexandreesl.actor.{KafkaExporterActor, KafkaImporterActor}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

object Main extends App {

implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
private val logger = Logger(LoggerFactory.getLogger("Main"))

logger.info("Starting streams...")

system.actorOf(KafkaImporterActor.props, KafkaImporterActor.name)
system.actorOf(KafkaExporterActor.props, KafkaExporterActor.name)

logger.info("Stream system is initialized!")

}

If we run the application again, we will see that it runs just like before, proving our refactoring was a success.

Now that our code is better organized, let’s introduce a supervisor to guarantee error handling. We will define a supervisor strategy in our actors and backoff policies to make the actors restart gradually slower as errors repeat, for example, to wait for Kafka to recover from a shutdown.

To do this, we change our main object code like this:


import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import com.alexandreesl.actor.{KafkaExporterActor, KafkaImporterActor}
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

import scala.concurrent.duration._

object Main extends App {

implicit val system: ActorSystem = ActorSystem("akka-streams-lab")
private val logger = Logger(LoggerFactory.getLogger("Main"))

logger.info("Starting streams...")

private val supervisorStrategy = OneForOneStrategy() {
case ex: Exception =>
logger.info(s"exception: $ex")
SupervisorStrategy.Restart

}
private val importerProps: Props = BackoffSupervisor.props(
Backoff.onStop(
childProps = KafkaImporterActor.props,
childName = KafkaImporterActor.name,
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
).withSupervisorStrategy(supervisorStrategy)
)
private val exporterProps: Props = BackoffSupervisor.props(
Backoff.onStop(
childProps = KafkaExporterActor.props,
childName = KafkaExporterActor.name,
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
).withSupervisorStrategy(supervisorStrategy)
)

system.actorOf(importerProps, "Kafka-importer")
system.actorOf(exporterProps, "Kafka-exporter")

logger.info("Stream system is initialized!")

}

On our code, we now defined backoff policies that start restarting after 3 seconds, all up to 30 seconds, randomly scaling up the time between retries after each retry. As supervisor policy, we defined a OneForOne strategy, meaning that if one of the actors restart, only the faulty actor will be affected by the policy.

Finally, we define a simple policy where any errors that occur will be logged and the actor will be restarted. Since errors in the stream will also escalate to the encapsulating actor, this means that errors in the stream will also make the actor fail, causing a restart.

To make this escalation to work,  we need to change our actors to make the errors inside the streams to propagate. To do this, we change the actors as follows, adding code to check the status from the stream’s futures:

KafkaImporterActor


package com.alexandreesl.actor

import java.nio.file.Paths

import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.util.ByteString
import com.alexandreesl.actor.KafkaImporterActor.Start
import com.alexandreesl.graph.GraphMessages.Account
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import spray.json._

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

class KafkaImporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

private def convertToClass(csv: Array[String]): Account = {
Account(csv(0).toLong,
csv(1), csv(2),
csv(3).toInt, csv(4),
csv(5), csv(6),
csv(7), csv(8),
csv(9), csv(10),
csv(11).toLong, csv(12))
}

private val flow = Flow[String].filter(s => !s.contains("COD"))
.map(line => {
convertToClass(line.split(","))
})
private val configProducer = actorSystem.settings.config.getConfig("akka.kafka.producer")
private val producerSettings =
ProducerSettings(configProducer, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
val done = FileIO.fromPath(Paths.get("input1.csv"))
.via(Framing.delimiter(ByteString("\n"), 4096)
.map(_.utf8String))
.via(flow)
.map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
.runWith(Producer.plainSink(producerSettings))
done onComplete {
case Success(_) =>
log.info("I completed successfully, I am so happy :)")
case Failure(ex) =>
log.error(ex, "I received a error! Goodbye cruel world!")
self ! PoisonPill
}

}

}

object KafkaImporterActor {

val name = "Kafka-Importer-actor"

def props = Props(new KafkaImporterActor)

case object Start

}

KafkaExporterActor


package com.alexandreesl.actor

import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.alexandreesl.actor.KafkaExporterActor.Start
import com.alexandreesl.graph.AccountWriterGraphStage
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import spray.json._
import com.alexandreesl.json.JsonParsing._

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

class KafkaExporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher
private val configConsumer = actorSystem.settings.config.getConfig("akka.kafka.consumer")
private val consumerSettings =
ConsumerSettings(configConsumer, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
val done = Consumer
.committableSource(consumerSettings, Subscriptions.topics("accounts"))
.mapAsync(10)(msg =>
Future.successful(InputMessage(msg.record.value.parseJson.convertTo[Account], msg.committableOffset))
).via(AccountWriterGraphStage.graph)
.mapAsync(10) { tuple =>
val acc = tuple._1
log.info(s"persisted Account: $acc")
tuple._2.commitScaladsl()
}.runWith(Sink.ignore)
done onComplete {
case Success(_) =>
log.info("I completed successfully, I am so happy :)")
case Failure(ex) =>
log.error(ex, "I received a error! Goodbye cruel world!")
self ! PoisonPill
}

}

}

object KafkaExporterActor {

val name = "Kafka-Exporter-actor"

def props = Props(new KafkaExporterActor)

case object Start

}

Finally, let’s test it out. We begin by shutting down Kafka without stopping our application, by running:

docker-compose down

If we look at the logs, we will see the streams will start complaining about not connecting to Kafka. After some time, we will get an actor terminated error, caused by the poison pill we make it swallow:

[INFO] [12/28/2018 22:23:54.236] [akka-streams-lab-akka.actor.default-dispatcher-4] [akka://akka-streams-lab/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Stop$] without sender to Actor[akka://akka-streams-lab/system/kafka-consumer-1#1868012916] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://akka-streams-lab/system/kafka-consumer-1#1868012916]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters’ and ‘akka.log-dead-letters-during-shutdown’.
[ERROR] [12/28/2018 22:23:54.236] [akka-streams-lab-akka.actor.default-dispatcher-5] [akka://akka-streams-lab/user/Kafka-exporter/Kafka-Exporter-actor] I received a error! Goodbye cruel world!
akka.kafka.ConsumerFailed: Consumer actor terminated
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:66)
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:53)
at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:230)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:198)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:198)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:454)
at

If we just keep watching, we will see this cycle endless repeating, as streams are restarted, they fail to connect to Kafka and the poison pill is swallowed again.

To make the application come back again, let’s restart our Kafka cluster with:

docker-compose up -d

We will see that after Kafka returns, the streams will resume to normal.

And that concludes our error handling code. Of course, that it is not all we can do in this field. Another interesting error handling technique that can be used in some cases is recovering, where we can define another stream to be executed in case of a failure, as a circuit breaker. This can be seen in more detail here.

Finally, let’s test our Package plugin, by running the code in the terminal. Let’s open a terminal and run the following:

sbt stage

This will prepare our application, including a shell script to run the application. We can run it by typing:

./target/universal/stage/bin/akka-stream-lab

After entering, we will see that our application will run just like in Intellij:

Screen Shot 2019-01-01 at 12.12.15

Automated testing the stream

Finally, to wrap it up, we will see how to test our streams. Automated tests are important to code’s sturdiness, also allowing CI pipelines to be implemented efficiently.

Streams can be tested by using probes to run the streams and check the results. Let’s start by creating a test for the converter flow that generates accounts from csv lines – the rest of the code would just be testing third-party libraries so we will focus on our own code only  – and next, we will test our graph stage.

On our tests, we will use several traits to add support to several features we will/can use. It is good practice to join all traits inside a single one so our test classes won’t have a big single line of trait declarations.

So let’s begin by creating our trait:


package com.alexandreesl.test

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Matchers, WordSpecLike}

trait TestEnvironment extends WordSpecLike with Matchers 

with BeforeAndAfter with BeforeAndAfterAll

Before coding the test, we will move the flow on KafkaImporterActor to the object companion, this way allowing us to reference in the test:


package com.alexandreesl.actor

import java.nio.file.Paths

import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.util.ByteString
import com.alexandreesl.actor.KafkaImporterActor.Start
import com.alexandreesl.graph.GraphMessages.Account
import com.alexandreesl.json.JsonParsing._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import spray.json._

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

class KafkaImporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

private val configProducer = actorSystem.settings.config.getConfig("akka.kafka.producer")
private val producerSettings =
ProducerSettings(configProducer, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
val done = FileIO.fromPath(Paths.get("input1.csv"))
.via(Framing.delimiter(ByteString("\n"), 4096)
.map(_.utf8String))
.via(KafkaImporterActor.flow)
.map(value => new ProducerRecord[String, String]("accounts", value.toJson.compactPrint))
.runWith(Producer.plainSink(producerSettings))
done onComplete {
case Success(_) =>
log.info("I completed successfully, I am so happy :)")
case Failure(ex) =>
log.error(ex, "I received a error! Goodbye cruel world!")
self ! PoisonPill
}

}

}

object KafkaImporterActor {

private def convertToClass(csv: Array[String]): Account = {
Account(csv(0).toLong,
csv(1), csv(2),
csv(3).toInt, csv(4),
csv(5), csv(6),
csv(7), csv(8),
csv(9), csv(10),
csv(11).toLong, csv(12))
}

val flow = Flow[String].filter(s => !s.contains("COD"))
.map(line => {
convertToClass(line.split(","))
})

val name = "Kafka-Importer-actor"

def props = Props(new KafkaImporterActor)

case object Start

}

Next, we will implement an different equals method on Account class so it will work properly in test assertations:


package com.alexandreesl.graph

import akka.kafka.ConsumerMessage

object GraphMessages {

case class Account(cod: Long, name: String, document: String, age: Int, civilStatus: String,
phone: String, birthday: String, country: String, state: String,
city: String, street: String, streetNum: Long, neighBorhood: String) {
override def equals(that: Any): Boolean =
that match {
case that: Account => that.canEqual(this) && that.cod == this.cod
case _ => false
}
}

case class InputMessage(acc: Account, offset: ConsumerMessage.CommittableOffset)

case class AccountPersonalData(cod: Long, name: String, document: String, age: Int, civilStatus: String,
phone: String, birthday: String)

case class AccountAddressData(cod: Long, country: String, state: String,
city: String, street: String, streetNum: Long, neighBorhood: String)

}

Finally, let’s code the test:


package com.alexandreesl.test.importer

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.{TestKit, TestProbe}
import com.alexandreesl.actor.KafkaImporterActor
import com.alexandreesl.graph.GraphMessages.Account
import com.alexandreesl.test.TestEnvironment

import scala.concurrent.duration._

class KafkaImporterActorSpec extends TestKit(ActorSystem("MyTestSystem")) with TestEnvironment {
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

override def afterAll() = TestKit.shutdownActorSystem(system)

"A csv line in a file " when {
val csv = "1,\"alexandre eleuterio santos lourenco 1\",\"43456754356\",36,\"Single\",\"+5511234433443\",\"21/06/1982\",\"Brazil\",\"São Paulo\",\"São Paulo\",\"false st\",3134,\"neigh 1\""

" should convert to Account case class " in {

val probe = TestProbe()

Source.single(csv)
.via(KafkaImporterActor.flow)
.to(Sink.actorRef(probe.ref, "completed"))
.run()

probe.expectMsg(2.seconds, Account(1, "alexandre eleuterio santos lourenco 1",
"43456754356", 36, "Single", "+5511234433443", "21/06/1982",
"Brazil", "São Paulo", "São Paulo", "false st", 3134, "neigh 1"))
probe.expectMsg(2.seconds, "completed")

}

}

}

On this code, we create a probe that waits for a message containing the Account converted from the flow and a “completed” message, that the sink will emit at the end. The 2 seconds timeout is to control how much time the probe will wait for a message to come.

Now, let’s code our second test. Before writing the test itself, let’s make a little refactoring on KafkaExporterActor, by exposing a part of the stream to the spec. This way we will test all our custom code:


package com.alexandreesl.actor

import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink}
import com.alexandreesl.actor.KafkaExporterActor.Start
import com.alexandreesl.graph.AccountWriterGraphStage
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import spray.json._
import com.alexandreesl.json.JsonParsing._
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

class KafkaExporterActor extends Actor with ActorLogging {

implicit val actorSystem: ActorSystem = context.system
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher
private val configConsumer = actorSystem.settings.config.getConfig("akka.kafka.consumer")
private val consumerSettings =
ConsumerSettings(configConsumer, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

override def preStart(): Unit = {
self ! Start
}

override def receive: Receive = {
case Start =>
val done = Consumer
.committableSource(consumerSettings, Subscriptions.topics("accounts"))
.mapAsync(10)(msg =>
Future.successful(InputMessage(msg.record.value.parseJson.convertTo[Account], msg.committableOffset))
)
.via(KafkaExporterActor.flow)
.runWith(Sink.ignore)
done onComplete {
case Success(_) =>
log.info("I completed successfully, I am so happy :)")
case Failure(ex) =>
log.error(ex, "I received a error! Goodbye cruel world!")
self ! PoisonPill
}

}

}

object KafkaExporterActor {

private val logger = Logger(LoggerFactory.getLogger("KafkaExporterActor"))

def flow()(implicit actorSystem: ActorSystem,
materializer: ActorMaterializer) =
Flow[InputMessage].via(AccountWriterGraphStage.graph)
.mapAsync(10) { tuple =>
val acc = tuple._1
logger.info(s"persisted Account: $acc")
tuple._2.commitScaladsl()
}

val name = "Kafka-Exporter-actor"

def props = Props(new KafkaExporterActor)

case object Start

}

Finally, let’s code our test:


package com.alexandreesl.test.exporter

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffset
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.{TestKit, TestProbe}
import com.alexandreesl.actor.KafkaExporterActor
import com.alexandreesl.graph.GraphMessages.{Account, InputMessage}
import com.alexandreesl.test.TestEnvironment
import org.mockito.Mockito.{times, verify, when}
import org.scalatest.mockito.MockitoSugar.mock

import scala.concurrent.Future
import scala.concurrent.duration._

class KafkaExporterActorSpec extends TestKit(ActorSystem("MyTestSystem")) with TestEnvironment {
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

override def afterAll() = TestKit.shutdownActorSystem(system)

"A Account object " when {

val offset = mock[CommittableOffset]
when(offset commitScaladsl) thenReturn Future.successful(Done)
val account = Account(1, "alexandre eleuterio santos lourenco 1",
"43456754356", 36, "Single", "+5511234433443", "21/06/1982",
"Brazil", "São Paulo", "São Paulo", "false st", 3134, "neigh 1")
val inputMessage = InputMessage(account, offset)

" should pass through the graph stage " in {

val probe = TestProbe()
Source.single(inputMessage)
.via(KafkaExporterActor.flow)
.to(Sink.actorRef(probe.ref, "completed"))
.run()

probe.expectMsg(2.seconds, Done)
probe.expectMsg(2.seconds, "completed")
verify(offset, times(1)) commitScaladsl

}

}

}

On this code, again we wait for the probe to receive messages. In this case, we first receive the Done object from Kafka commit – in this case, we create a mock object in order to allow us to run tests without Kafka – and next receive our good old “completed” message. Finally, we test if our mock was called, to assure that the flow is committing messages back to Kafka after processing.

Of course, this was just a tiny taste of what can be done with Akka’s Testkit. The probe we used just for sinks can also be used for sources as well and just like we test streams, it is also possible to test actors communicating with each other in an actor solution, for example.

In the references section, it is possible to get links to all documentation supplied to this and other subjects observed on this article.

Going beyond

Of course, this brief article can’t possibly talk about every detail of Akka and Akka Streams. From subjects not talked here we can spotlight a few:

  • Akka FSM (Finite State Machine): It allows us to implement state machines on actors solutions;
  • Akka HTTP: Allows us to call and expose HTTP endpoints;
  • Akka persistence: Allows us to implement a persistence layer on messages flowing through Akka, in order to implement better recoverability in case of failures;
  • Akka Event Bus: A built-in publisher-subscriber event bus inside Akka, Allows us to implement broadcasting of messages inside actor solutions.

In the references section, it is possible to find links for this and more!

Thanks

Special thanks to Iago David Santos (LinkedIn), which revised this article and pointed some things. Thanks, Iago!

Conclusion

And that concludes our article. With a great toolkit and sturdiness, Akka Streams is a great tool to be considered when coding integrations, APIs and more. Thank you for following me again on this article, until next time!

Continue reading

AWS Lambda: building a serverless back-end on Amazon

Standard

Hello, dear readers! welcome to my blog. On this post, we will learn about AWS Lambda, a serverless architectural solution that enables us to quickly deploy serverless back-end infrastructures. But why using this service, instead of good old EC2s?Let’s find it out!

Motivation behind AWS Lambda

Alongside the benefits of developing a back-end using the serverless paradigm – which can be learned on more detail on this other post of mine – another good point on using AWS is pricing.

When deploying your application with a EC2, be a on-demand, spot or reserved one, we are charged by hour. This is true even if our application is not called at all during that hour, resulting on wasted resources and money.

With AWS lambda, Amazon charge us by processing time, as such, it only charges us the time spent on lambdas execution. This results on a much cleaner architecture, where less resources and money are spent. This post details the case on more detail.

AWS Lambda development is based on functions. When developing a lambda, we develop a function that can run as a REST endpoint – served by Amazon API Gateway – or a event processing function, running on events such as a file been uploaded to a S3 bucket.

Limitations

However, not all is simple on this service. When developing with AWS Lambda, two things must be kept in mind: cold starts and resource restrictions.

Cold starts consist of the first time a lambda is called, or after some time is passed and the server – behind the scenes, obviously there are servers that runs the functions, but this is hidden from the user – used to run the lambda is already down due to inactivity. Amazon has algorithms that make the server be up and serving as long as there is a consistent frequency of client calls, but off course, from time to time, there will be idle times.

When a cold start is made, this causes the requests to have a more slow response, since it will wait for a server to be up and running to run the function. This can be worsen if clients have low timeout configurations, resulting on requests failing. This should be taking on account when developing lambdas that act as APIs.

Another important aspect to take note are resource restrictions. Been designed to be used for small functions (“microservices”), lambdas have several limitations, such as amount of memory, disk and cpu. This limits can be increased, but only by a small amount. This link on AWS docs details more about the limits.

One important limit is the running time of the lambda itself. A AWS Lambda can run at most 5 minutes. This is a important limit to understand the nature of what lambdas must be in nature: simple functions, that must not run by long periods of time.

If any of this limits are reached, the lambda will fail his execution.

Lab

For this lab, we will use a framework called Serverless. Serverless is a framework that automates for us some tasks that are a little boring to do if developing with AWS Lambda by hand, such as creating a zip file with all our sources to be uploaded to S3 and creating/configuring all AWS resources. Serverless uses CloudFormation under the hood, managing resource creation and updates for us. For programming language, we will use Python 3.6.

To install Serverless, please follow this guide.

With Serverless installed, let’s begin our lab! First, let’s create a project, by running:

serverless create --template aws-python3 --path MyAmazingAWSLambdaService 

This command will create a new Serverless project, using a initial template for our first Python lambda. Let’s open the project – I will be using PyCharm, but any IDE or editor of choice will suffice – and see what the framework created for us.

Project structure

Serverless created a simple project structure, consisting of a serverless YAML file and a Python script. It is on the YAML that we declare our functions, the cloud provider, IAM permissions, resources to be created etc.

The file created by the command is as follows:

# Welcome to Serverless!
#
# This file is the main config file for your service.
# It's very minimal at this point and uses default values.
# You can always add more config options for more control.
# We've included some commented out config examples here.
# Just uncomment any of them to get that config option.
#
# For full config options, check the docs:
#    docs.serverless.com
#
# Happy Coding!

service: MyAmazingAWSLambdaService

# You can pin your service to only deploy with a specific Serverless version
# Check out our docs for more details
# frameworkVersion: "=X.X.X"

provider:
  name: aws
  runtime: python3.6

# you can overwrite defaults here
#  stage: dev
#  region: us-east-1

# you can add statements to the Lambda function's IAM Role here
#  iamRoleStatements:
#    - Effect: "Allow"
#      Action:
#        - "s3:ListBucket"
#      Resource: { "Fn::Join" : ["", ["arn:aws:s3:::", { "Ref" : "ServerlessDeploymentBucket" } ] ]  }
#    - Effect: "Allow"
#      Action:
#        - "s3:PutObject"
#      Resource:
#        Fn::Join:
#          - ""
#          - - "arn:aws:s3:::"
#            - "Ref" : "ServerlessDeploymentBucket"
#            - "/*"

# you can define service wide environment variables here
#  environment:
#    variable1: value1

# you can add packaging information here
#package:
#  include:
#    - include-me.py
#    - include-me-dir/**
#  exclude:
#    - exclude-me.py
#    - exclude-me-dir/**

functions:
  hello:
    handler: handler.hello

#    The following are a few example events you can configure
#    NOTE: Please make sure to change your handler code to work with those events
#    Check the event documentation for details
#    events:
#      - http:
#          path: users/create
#          method: get
#      - s3: ${env:BUCKET}
#      - schedule: rate(10 minutes)
#      - sns: greeter-topic
#      - stream: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
#      - alexaSkill
#      - alexaSmartHome: amzn1.ask.skill.xx-xx-xx-xx
#      - iot:
#          sql: "SELECT * FROM 'some_topic'"
#      - cloudwatchEvent:
#          event:
#            source:
#              - "aws.ec2"
#            detail-type:
#              - "EC2 Instance State-change Notification"
#            detail:
#              state:
#                - pending
#      - cloudwatchLog: '/aws/lambda/hello'
#      - cognitoUserPool:
#          pool: MyUserPool
#          trigger: PreSignUp

#    Define function environment variables here
#    environment:
#      variable2: value2

# you can add CloudFormation resource templates here
#resources:
#  Resources:
#    NewResource:
#      Type: AWS::S3::Bucket
#      Properties:
#        BucketName: my-new-bucket
#  Outputs:
#     NewOutput:
#       Description: "Description for the output"
#       Value: "Some output value"

For now, let’s just remove all comments in order to have a cleaner file. The other file is a Python script, which have our first function. Let’s see it:

import json


def hello(event, context):
    body = {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "input": event
    }

    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }

    return response

    # Use this code if you don't use the http event with the LAMBDA-PROXY
    # integration
    """
    return {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "event": event
    }
    """

As we can see, is a pretty simple script. All we have to do is create a function that receives 2 parameters, context and event. Event is used to pass the input data on which the lambda will work. Context is used by AWS to pass information about the environment on which the lambda is running. For example, if we wanted to know how much time is left before our running time limit is reached, we could do the following call:

print("Time remaining (MS):", context.get_remaining_time_in_millis())

The dictionary returned by the function is the standard response for a lambda that acts as a API, proxied by AWS API Gateway.

For now, let’s leave the script as it is, as we will add more functions to the project. Let’s begin by adding the Dynamodb table we will use on our lab, alongside other configurations.

Creating Dynamodb table

In order to create the table, we modify our YAML as follows:

service: MyAmazingAWSLambdaService

provider:
  name: aws
  runtime: python3.6
  stage: ${opt:stage, 'dev'}
  region: ${opt:region, 'us-east-1'}
  profile: personal

functions:
  hello:
    handler: handler.hello

resources:
  Resources:
    product:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: product
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1

We added a resources section, where we defined a dynamodb table called product and defined a atribute called id to be key in table’s items. We also defined the stage and region to be collected as command-line options – stages are used as application environments, such as QA and Production. Finally, we defined that we want the deploys to use a IAM profile called personal. This is useful when having several accounts on the same machine.

Let’s deploy the stack by entering:

serverless deploy --stage prod

After some time, we will see that our stack was successfully deployed, as we can see on the console:

Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Creating Stack...
Serverless: Checking Stack create progress...
.....
Serverless: Stack create finished...
Serverless: Uploading CloudFormation file to S3...
Serverless: Uploading artifacts...
Serverless: Uploading service .zip file to S3 (3.38 KB)...
Serverless: Validating template...
Serverless: Updating Stack...
Serverless: Checking Stack update progress...
..................
Serverless: Stack update finished...
Service Information
service: MyAmazingAWSLambdaService
stage: prod
region: us-east-1
stack: MyAmazingAWSLambdaService-prod
api keys:
 None
endpoints:
 None
functions:
 hello: MyAmazingAWSLambdaService-prod-hello

During the deployment, Serverless generated a zip file with all our resources, uploaded to a bucket, created a CloudFormation stack and deployed our lambda with it, alongside the necessary permissions to run. It also created our dynamodb table, as required.

Now that we have our stack and table, let’s begin by creating a group of lambdas to implement CRUD operations on our table.

Creating CRUD lambdas

Now, let’s create our lambdas. First, let’s create a Python class to encapsulate the operations on our dynamodb table:

# -*- coding: UTF-8 -*-

import boto3

dynamodb = boto3.resource('dynamodb')


class DynamoDbHelper:

    def __init__(self, **kwargs):
        self.table = dynamodb.Table(kwargs.get('table', """missing"""))

    def save(self, entity):
        """
           save entity from DYNAMODB_self.table
        """
        saved = self.table.put_item(Item=entity)
        print('Saving result: ({})'.format(saved))

    def get(self, entity_id):
        """
         get entity from DYNAMODB_self.table
      """
        entity = self.table.get_item(Key={'id': entity_id})

        if 'Item' in entity:
            return entity['Item']
        else:
            return None

    def delete(self, entity_id):
        """
           delete entity from DYNAMODB_self.table
        """
        deleted = self.table.delete_item(Key={'id': entity_id})
        print('Deleting result: ({})'.format(deleted))

Then, we create another script to make a converter. This will be used to read the input data from lambda calls to dynamodb’s data:

# -*- coding: UTF-8 -*-
import json


class ProductConverter:

    def convert(self, event):
        """
           convert entity from input to dictionary to be saved on dynamodb
        """
        if isinstance(event['body'], dict):
            data = event['body']
        else:
            data = json.loads(event['body'])
        return {
            'id': data['id'],
            'name': data.get('name', ''),
            'description': data.get('description', ''),
            'price': data.get('price', '')
        }

Next, we change the script created by Serverless, creating the lambda handlers:

# -*- coding: UTF-8 -*-

import json

from helpers.DynamoDbHelper import DynamoDbHelper
from helpers.ProductConverter import ProductConverter


def get_path_parameters(event, name):
    return event['pathParameters'][name]


dynamodb_helper = DynamoDbHelper(table='product')
product_converter = ProductConverter()


def save(event, context):
    product = product_converter.convert(event)

    if 'id' not in event['body']:
        return {
            "statusCode": 422,
            "body": json.dumps({'message': 'Product ID is required!'})
        }

    try:
        dynamodb_helper.save(product)
        response = {
            "statusCode": 200,
            "body": json.dumps({'message': 'saved successfully'})
        }
    except Exception as e:
        return {
            "statusCode": 500,
            "body": json.dumps({'message': str(e)})
        }

    return response


def get(event, context):
    product = dynamodb_helper.get(get_path_parameters(event, 'product_id'))

    if product is not None:
        response = {
            "statusCode": 200,
            "body": json.dumps(product)
        }
    else:
        response = {
            "statusCode": 404,
            "body": json.dumps({'message': 'not found'})
        }
    return response


def delete(event, context):
    try:
        dynamodb_helper.delete(get_path_parameters(event, 'product_id'))
        response = {
            "statusCode": 200,
            "body": json.dumps({'message': 'deleted successfully'})
        }
    except Exception as e:
        return {
            "statusCode": 500,
            "body": json.dumps({'message': str(e)})
        }

    return response

Finally, we change serverless.yml, defining our lambdas to be deployed on AWS:

service: MyAmazingAWSLambdaService

provider:
  name: aws
  runtime: python3.6
  stage: ${opt:stage, 'dev'}
  region: ${opt:region, 'us-east-1'}
  profile: personal

  iamRoleStatements: # permissions for all of your functions can be set here
      - Effect: Allow
        Action:
          - dynamodb:*
        Resource: "arn:aws:dynamodb:us-east-1:<your_account_id>:table/product"

functions:
  save_product:
    handler: handler.save
    events:
      - http:
          path: product/save
          method: post
  update_product:
    handler: handler.save
    events:
      - http:
          path: product/update
          method: patch
  get_product:
    handler: handler.get
    events:
      - http:
          path: product/{product_id}
          method: get
  delete_product:
    handler: handler.delete
    events:
      - http:
          path: product/{product_id}
          method: delete

resources:
  Resources:
    product:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: product
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1

After running the deployment again, we can see the functions were deployed, as seen on terminal:

Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Uploading CloudFormation file to S3...
Serverless: Uploading artifacts...
Serverless: Uploading service .zip file to S3 (4.92 KB)...
Serverless: Validating template...
Serverless: Updating Stack...
Serverless: Checking Stack update progress...
........................................................................................
Serverless: Stack update finished...
Service Information
service: MyAmazingAWSLambdaService
stage: prod
region: us-east-1
stack: MyAmazingAWSLambdaService-prod
api keys:
 None
endpoints:
 POST - https://xxxxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/product/save
 PATCH - https://xxxxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/product/update
 GET - https://xxxxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/product/{product_id}
 DELETE - https://xxxxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/product/{product_id}
functions:
 save_product: MyAmazingAWSLambdaService-prod-save_product
 update_product: MyAmazingAWSLambdaService-prod-update_product
 get_product: MyAmazingAWSLambdaService-prod-get_product
 delete_product: MyAmazingAWSLambdaService-prod-delete_product

PS: the rest api id was intentionally masked by me for security reasons.

On terminal, we can also see the URLs to call our lambdas. On AWS lambda, the URLs follows this pattern:

https://{restapi_id}.execute-api.{region}.amazonaws.com/{stage_name}/

Later on our lab we will learn how to test our lambdas. For now, let’s learn how to create our last lambda, the one that will read from S3 events.

Creating S3 lambda to bulk create to Dynamodb

Now, let’s implement a lambda that will bulk process product inserts. This lambda will use a csv file as parameter, receiving chunks of data. The lambda will process the data as a stream, using the streaming interface from boto3 behind the hood, saving products as it reads them. To facilitate, we will use Pandas Python library to read the csv . The lambda code is as follows:

# -*- coding: UTF-8 -*-

import boto3
import pandas as pd
import io

from helpers.DynamoDbHelper import DynamoDbHelper

s3 = boto3.client('s3')
dynamodb_helper = DynamoDbHelper(table='product')


def bulk_insert(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        print('Bucket: ' + bucket)
        print('Key: ' + key)
        obj = s3.get_object(Bucket=bucket, Key=key)
        products = pd.read_csv(io.BytesIO(obj['Body'].read()))
        products_records = products.fillna('').to_dict('records')
        for product in products_records:
            record = {
                'id': str(product['id']),
                'name': product.get('name', ''),
                'description': product.get('description', ''),
                'price': str(product.get('price', '0'))
            }
            dynamodb_helper.save(record)

You may notice that we iterate over a list to read bucket files. That’s because AWS can bulk S3 changes to call our lambda fewer times, so we need to read all references that are sent to us.

Before moving on to serverless.yml changes, we need to install a plugin called serverless-python-requirements. This plugin is responsible for adding our pip requirements to lambda packaging.

First, let’s create a file called package.json, with the following. This file must be created by the npm init command:

{
  "name": "myamazingawslambdaservice",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "author": "",
  "license": "ISC"
}

Next, let’s install the plugin, using npm:

npm install --save serverless-python-requirements

And finally, add the plugin to serverless.yml. Here it is all the changes needed for the new lambda:

service: MyAmazingAWSLambdaService

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: non-linux

provider:
  name: aws
  runtime: python3.6
  stage: ${opt:stage, 'dev'}
  region: ${opt:region, 'us-east-1'}
  profile: personal

  iamRoleStatements: # permissions for all of your functions can be set here
      - Effect: Allow
        Action:
          - dynamodb:*
        Resource: "arn:aws:dynamodb:us-east-1:<your_account_id>:table/product"
      - Effect: Allow
        Action:
          - s3:GetObject
        Resource: "arn:aws:s3:::myamazingbuckets3/*"

functions:
  bulk_insert:
    handler: bulk_handler.bulk_insert
    events:
      - s3:
          bucket: myamazingbuckets3
          event: s3:ObjectCreated:*
  save_product:
    handler: handler.save
    events:
      - http:
          path: product/save
          method: post
  update_product:
    handler: handler.save
    events:
      - http:
          path: product/update
          method: patch
  get_product:
    handler: handler.get
    events:
      - http:
          path: product/{product_id}
          method: get
  delete_product:
    handler: handler.delete
    events:
      - http:
          path: product/{product_id}
          method: delete

resources:
  Resources:
    product:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: product
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1

PS: because of the plugin, is now needed to have Docker running on deployment. This is because the plugin uses Docker to compile Python packages that requires OS binaries to be installed. The first time you run it, you may notice the process ‘hangs’ at docker step. This is because is downloading the docker image, which is quite sizeable (about 600Mb). 

All we had to do is add IAM permissions to the bucket and define the lambda, adding a event to fire at object creations on the bucket. It is not needed to add the bucket to the resource creation section, as Serverless will already create the bucket as we defined that will be used by a lambda on the project.

Now, let’s test the bulk insert. After redeploying, let’s create a csv like following:

id,name,description,price
1,product 1,description 1,125.23
2,product 2,description 2,133.43
3,product 3,description 3,142.24

Let’s save the file and upload to our bucket. After some instants, we will see that our DynamoDB table will be populated with the data, as we can see bellow:

Screen Shot 2018-05-07 at 23.45.56

DynamoDB table populated with csv bulk data

Now that we have all our lambdas developed, let’s learn how to test locally and invoke our lambdas from different locations.

Testing local

Let’s test locally our insert product lambda. First, we need to create a json file to hold our test data. Let’s call it example_json.json :

{
  "body": {
    "id": "123",
    "name": "product 1",
    "description": "description 1",
    "price": "123.24"
  }
}

Next, let’s run locally by using Serverless, with the following command:

serverless invoke local --function save_product --path example_json.json

PS: Before running, don’t forget to set the AWS_PROFILE env variable to your profile.

After some instants, we will see the terminal outputting as follows:

Serverless: Invoke invoke:local
Saving result: ({'ResponseMetadata': {'RequestId': '0CDUDLGJEU7UBLLSCQ82SF0B53VV4KQNSO5AEMVJF66Q9ASUAAJG', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Mon, 14 May 2018 15:13:22 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '0CDUDLGJEU7UBLLSCQ82SF0B53VV4KQNSO5AEMVJF66Q9ASUAAJG', 'x-amz-crc32': '2745614147'}, 'RetryAttempts': 0}})

{
 "statusCode": 200,
 "body": "{\"message\": \"saved successfully\"}"
}

Behind the hood, Serverless is creating a emulated environment as close as it gets to AWS lambda environment, using the permissions described on the YAML to emulate the permissions set for the function.

It is important to notice that the framework doesn’t guarantee 100% accuracy with a real lambda environment, so more testing in a separate stage – QA, for example – is still necessary before going to production.

Testing on AWS API gateway

To test with AWS API gateway interface, is pretty simple: simply navigate to API Gateway inside Amazon console, then select prod-MyAmazingAWSLambdaService API, navigate to the POST endpoint for example, and select the TEST link.

Provide a JSON like the one we used on our local test – but without the body atribute, moving the attributes to the root – and run it. The API will run successfully, as we can see on the picture bellow:

Screen Shot 2018-05-14 at 16.12.59

AWS Lambda running on API Gateway

Testing as a consumer

Finally, let’s test like a consumer would call our API. For that, we will use curl. We open a terminal and run:

curl -d '{ "id": "123", "name": "product 1", "description": "description 1", "price": "123.24" }' -H "Content-Type: application/json" -X POST https://<your_rest_api_id>.execute-api.us-east-1.amazonaws.com/prod/product/save

This will produce the following output:

{"message": "saved successfully"}%  

Proving our API is successfully deployed for consuming!

Adding security (API keys)

In our previous example, our API is exposed without security to the open world. Of course, on a real scenario, this is not good. It is possible to integrate lambda with several security solutions such as AWS Cognito, to improve security. In our lab, we will use basic API token authentication provided by AWS API gateway.

Let’s change our YAML as follows:

...omitted....
functions:
  bulk_insert:
    handler: bulk_handler.bulk_insert
    events:
      - s3:
          bucket: myamazingbuckets3
          event: s3:ObjectCreated:*
  save_product:
    handler: handler.save
    events:
      - http:
          path: product/save
          method: post
          private: true
  update_product:
    handler: handler.save
    events:
      - http:
          path: product/update
          method: patch
          private: true
  get_product:
    handler: handler.get
    events:
      - http:
          path: product/{product_id}
          method: get
          private: true
  delete_product:
    handler: handler.delete
    events:
      - http:
          path: product/{product_id}
          method: delete
          private: true
...omitted....

Now, after redeploying, if we try to make our previous curl, we will receive the following error:

{"message":"Forbidden"}% 

So, how do we call it now? First, we need to enter API Gateway and generate a API key, as bellow:

Screen Shot 2018-05-14 at 16.35.10

API gateway keys

Secondly, we need to create a usage plan, to associate the key with our API. We do this by following the steps bellow inside API gateway interface:

usage1

usage2

usage3

With our key in hand, let’s try again our curl call, adding the header to pass our key:

curl -d '{ "id": "123", "name": "product 1", "description": "description 1", "price": "123.24" }' -H "Content-Type: application/json" -H "x-api-key: <your_API_key>" -X POST https://udw8q0957h.execute-api.us-east-1.amazonaws.com/prod/product/save

After the call, we will receive again the saved successfully response, proving our configuration was successful.

Lambda Logs (CloudWatch)

One last thing we will talk about is logging on AWS Lambda. The reader may noticed the use of Python’s print function in our code. On AWS Lambda, the prints done by Python are collected and organised inside another AWS service, called CloudWatch. We can access CloudWatch on the Amazon Console, as follows:

Screen Shot 2018-05-14 at 17.05.56

CloudWatch logs list

On the list above, we have each function separated as a link. If we drill down inside one of the links, we will see another link list of each execution made by that function. The print bellow is a example of one of our lambda’s executions:

Screen Shot 2018-05-14 at 17.06.11

Lambda execution log

Conclusion

And so we concluded our tour through AWS Lambda. With a simple and intuitive approach, it is a good option to deploy applications back-ends following the microservices paradigm. Thank you for following me on this post, until next time.

sources used on this lab

Kafka: implementing stream architectures

Standard

Hi, dear readers! Welcome to my blog. On this post, we will learn about Apache Kafka, a distributed messaging system which is being used on lots of streaming solutions.

This article will be divided on several sections, allowing the reader to not only understand the concepts behind but also a lab to exercise the concepts in practice. So, without further delay, let’s begin!

Kafka architecture

Overview

Kafka is a distributed messaging system created by Linkedin. On Kafka, we have stream data structures called topics, which can be consumed by several clients, organized on consumer groups. This topics are stored on a Kafka cluster, where which node is called a broker.

Kafka’s ecosystem also need a Zookeeper cluster in order to run. Zookeeper is a key-value  storage solution, which on Kafka’s context is used to store metadata. Several operations such as topic creation are done on Zookeeper, instead of in the brokers.

The main difference from Kafka to other messaging solutions that utilizes classic topic structures is that on Kafka we have offsets. Offsets act like cursors, pointing to the last location a consumer and/or producer has reached consuming/producing messages for a given topic on a given partition.

Partitions on Kafka are like shards on some NOSQL databases: they divide the data, organizing by partition and/or message keys (more about this when we talk about ingesting data on Kafka).

So, on Kafka, we have producers ingesting data, controlled by producer offsets, while we have consumers consuming data from topics, also with their offsets. The main advantages on this approach are:

  • Data can be read and replayed by consumers, since there’s no link between consumed data and produced data. This also allows to implement solutions with back-pressure, that is, solutions where consumers can poll data according to their processing limits;
  • Data can be retained for more time, since on streams, different from classic topics, data is not removed from the structure after been sent to all consumers. It is also possible to compress the data on the stream, allowing Kafka clusters to retain lots of data on their streams;

Kafka’s offsets explained

The following  diagram illustrates a Kafka topic on the run:

kafka-diagram-1

Kafka Topic producer/consumer offsets

On the diagram, we can see a topic with 2 partitions. Each little rectangle represents a offset pointing to a location on the topic. On the producer side, we can see 2 offsets pointing to the topic’s head, showing our producer ingesting data on topic.

On Kafka, each partition is assigned to a broker and each broker is responsible for delivering production/consumption for that partition. The broker responsible for this is called a partition leader on Kafka.

How many partitions are needed for a topic? The main factor for this point is the desired throughput for production/consumption. Several factors are key for the throughput, such as the producer ack type, number of replicas etc.

Too much partitions are also something to take care when planning a new topic, as too much partitions can hinder availability and end-to-end latency, alongside memory consumption on the client side – remember that both producer and consumer can operate with several partitions at the same time. This article is a excellent reference for this matter.

On the consumer side, we see some interesting features. Each consumer has his own offset, consuming data from just one partition. This is a important concept on Kafka: each consumer is responsible for consuming one partition on Kafka and each consumer group consumes the data individually, that is, there is no relation between the consumption of one group and the others.

We can see this on the diagram, where the offsets from one group are on different positions from the others. All Kafka offsets are stored on a internal topic inside Kafka’s cluster, both producer and consumer offsets. Offsets are committed (updated) on the cluster using auto-commit or by committing manually on code, analogous as relational database commits. We will see more about this when coding our own consumer.

What happens when there’s more partitions than consumers? When this happens, Kafka’s delivers data from more then one partition to the same consumer, as we can see bellow. It is important to note that it is possible to increase the number of consumers on a group, avoiding this situation altogether:

kafka-diagram-2

The same consumer consuming from more then one partition on Kafka

One important thing to notice is what happens on the opposite situation, when there is less partitions than consumers configured:

kafka-diagram-3

Idle consumers on Kafka

As we can see, on this case, we end up with idle consumers, that won’t process any messages until a new partition is created. This is important to keep in mind when setting a new consumer on Kafka, as increasing too much the number of consumers will just end up with idle resources not been used at all.

One of the key features on Kafka is that it guarantees message ordering. This ordering is done on the messages within the same partition, but not on the whole topic. That means that when we consume data from Kafka with our consumers, the data across the partitions is read on parallel, but data from the same partition is read with a single thread, guaranteeing the order.

IMPORTANT: As stated on Kafka’s documentation, it is not recommended to process data coming from Kafka in parallel, as it will scramble the messages order. The recommended way to scale the solution is by adding more partitions, as it will add more threads to process data on parallel, without losing the ordering inside the partitions.

Partitions on Kafka are always written on a single mount point on disk. They are written on files, that are splitted when they reach a certain amount of data, or a certain time period – 1GB or 1 week of data respectively by default, whatever  it comes first – that are called log segments. The more recent log segment, that represents the data ingested up to the head of the stream is called active segment and it is never deleted. The older segments are removed from disk according to the retention policies configured.

Replicas

In order to guarantee data availability, Kafka works with replicas. When creating a topic, we define how much replicas we want  to have for each partition on the topic. If we configure we want 3 replicas, for example, that means that for a topic with 2 partitions, we will have 6 replicas from that topic, plus the 2 active partitions.

Kafka replicates the data just like we would do by hand: brokers that are responsible for maintaining the replicas – called partition followers – will subscribe for the topic and keep reading data from the partition leader and writing to their replicas. Followers that have data up-to-date with the leader are called In-Synch replicas. Replicas can become out of synch for example due to network issues, that causes the synching process to be slow and lag behind the leader to a unacceptable point.

Rebalance

When a rebalance occurs, for example if a broker is down, all the writing/reading from partitions that broker was a partition leader are ceased. The cluster elects a new partition leader, from one of the IS (In-Synch) replicas, and so the writing/reading is resumed.

During this period, applications that were using the old leader to publish data will receive a specific error when trying to use the partition, indicating that a rebalance is occurring (unless we configure the producer to just deliver the messages without any acknowledgment, which we will see in more detail on the next sections). On the consumer side, it is possible to implement a rebalance listener, which can clean up the work for when the partition is available again.

It is important to notice that, as a broker is down, it could be possible that some messages won’t be committed, causing messages to be processed twice when the partition processing is resumed.

What happens if a broker is down and no IS replicas are available? That depends on what we configured on the cluster. If unclean election is disabled, then all processing is suspended on that partition, until the broker that was down comes back again. If unclean election is enabled, then one of the brokers that were a follower is elected as leader.

Off course, each option has his advantages: without unclean election, we can lose the partition in case we can’t restart the lost broker, but with unclean election, we risk losing some messages, since their offsets will be overwritten by the new leader, when new data arrives at the partition.

If the old leader comes back again, it will resume the partition’s processing as a new follower, and it will not insert the lost messages in case of a unclean election.

Kafka’s producer explained

On this section, we will learn the internals that compose a Kafka producer, responsible for sending messages to Kafka topics. When working with the producer, we create ProducerRecords, that we send to Kafka by using the producer.

Producer architecture

Kafka producer internal structure is divided as we can see on the following diagram:

kafka-producer

Kafka Producer internal details

As we can see, there is a lot going on when producing messages to Kafka. First, as said before, we create a ProducerRecord, that consist of 3 sections:

  • Partition Key: The partition key is a optional field. If it is passed, it indicates the partition that it must be sent the message too;
  • Message Key: The message key is a required field. If no partition key is passed, the partitioner will use this field to determine on which partition it will send the message. Kafka guarantees that all messages for a same given message key will always be sent to the same partition – as long as the number of partitions on a topic stay the same;
  • Value (payload): The value field is a required field and, as obvious, is the message itself that must be sended;

All the fields from the ProducerRecord must be serialized to byte arrays before sent to Kafka, so that’s exactly what is done by the Serializer at the first step of our sending – we will see later on our lab that we always define a serializer for our keys and  value – , after that, the records are sent to the Partitioner, that determines the partition to send the message.

The Partitioner then send the message to bulk processes, running on different threads, that “stack” the messages until a threshold is reached – a certain number of bytes or a certain time without new messages, whatever  it comes first – and finally, after the threshold is reached, the messages are sent to the Kafka broker.

Let’s keep in mind  that, as we saw before, brokers are elected as partition leaders for partitions on topics, so when sending the messages, they are sent directly to the partition leader’s broker.

Acknowledgment types

Kafka’s producer works with 3 types of acks (acknowledgments) that a message has been successfully sent. The types are:

  • ack=0: The producer just send the message and don’t wait for a confirmation, even from the partition leader. Of course, this is fastest option to deliver messages, but there is also risk of message loss;
  • ack=1: The producer waits for the partition leader to reply that wrote the message before moving on. This option is more safe, however, there is also some degree of risk, since a partition leader can go down just after the acknowledgement without repassing the message to any replica;
  • ack=all: The producer waits for the partition leader and all IS replicas to write before moving on. This option is naturally the safest of all, but there is also the disadvantage of possible performance issues, due to waiting for all network replication to occur before continuing. This aggravates when there is no IS replicas at the moment, as it will hold the production until at least one replica is made;

Which one to use? That depends on the characteristics of the solution we are working with. A ack=0 could be useful on a solution that works with lots of messages that are not critic in case of losses – monitoring events, for example, are short-lived information that could be lost at certain degree – unlike, for example, bank account transactions, where ack=all is a must, since message losses are unacceptable on this kind of application.

Producer configurations

There are several configurations that could be made on the producer. Here we have some of the more basic and interesting ones to know:

  • bootstrap.servers: A list of Kafka brokers for the producer to communicate with. This list is updated automatically when brokers are added/removed from the cluster, but it is advised to set at least 2 brokers, as the producer won’t start if just one broker is set and the broker is down;
  • key.serializer: The serializer that it will be used when transforming the keys to byte arrays. Of course, the serializer class will depend on the keys type been used;
  • value.serializer: The serializer that it will be used to transform the message to a byte array. When using complex types such as Java objects, it is possible to use one of the several out-of-box serializers, or implement your own;
  • acks: This is where we define the acknowledgement type, as we saw previously;
  • batch.size: This is the amount of memory the bulk process will wait to stack it up until reached to send the message batches;
  • linger.ms: The amount of time, in milliseconds, the producer will wait for new messages, before sending the messages it has buffered. Of course, if the batch.size is reached first, then the message batch is sent before reaching this threshold;
  • max.in.flight.requests.per.connection: This parameters defines how many messages the producer will send before waiting for responses from Kafka (if ack is not set as 0, of course). As stated on Kafka’s documentation, this configuration must be set to 1 to guarantee the messages on Kafka will be written at the same order  they are sent by the producer;
  • client.id: This parameter can be set with any string value and identifies the producer on the Kafka cluster. It is used by the cluster to build metrics and logging;
  • compression.type: This parameter define a compression to be used on messages, before they are sent to Kafka. It supports snappy, gzip and lz4 formats. By default, no compression is used;
  • retries: This parameter defines how many times the producer will retry sending a message to a broker, before notifying the application that a error has occurred;
  • retry.backoff.ms: This parameter defines how many milliseconds the producer will wait between the retries. By default, the time is 100ms;

Kafka’s consumer explained

On this section, we will learn the internals that compose a Kafka consumer, responsible for reading messages from Kafka topics.

Consumer architecture

Kafka consumer internal structure is divided as we can see on the following diagram:

kafka-consumer-3

Kafka consumer internal details

When we request a Kafka broker to create a consumer group for one or more topics, the broker creates a Consumer Group Coordinator. Each broker has a group coordinator for the partitions it is the partition leader.

This component is responsible for deciding which consumer will be responsible for consuming which partitions, by the rules we talked about on the offsets section. It is also responsible for checking consumers health, by establishing heartbeat frequencies to be sent at intervals. If a consumer fails to send heartbeats, it is considered unhealthy, so Kafka delegates the partitions assigned to that consumer to another one.

The consumer, on his turn, uses a deserializer to convert the messages from byte arrays to the required types. Like with the producer, we can also use several different types of out-of-box deserializers, as well as creating our own.

IMPORTANT: Kafka consumer must always run on the main thread. If you try to create a consumer and delegate to run on another thread, there’s a check on the consumer that will thrown a error. This is due to Kafka consumer not been thread safe. The recommended way to scale a application that consumes from Kafka is by creating new application instances, each one running his own consumer on the same consumer group.

One important point to take note is that, when a message is delivered to Kafka, it only becomes available to consume after it is properly replicated to all IS replicas for his respective partition. This is important to ensure data availability, but it also means that messages can take a significant amount of time to be delivered for consuming.

Kafka works with the concept of back-pressure. This means that applications are responsible for asking for new chunks of messages to process, allowing clients to process data at their paces.

Commit strategies

kafka works with 3 commit strategies, to know:

  • Auto-commit: On this strategy, messages are marked as committed as soon as they are successfully consumed from the broker. The downside of this approach is that messages that were not processed correctly could be lost due to already been committed;
  • Synchronous manual commit: On this strategy, messages are manually committed synchronously. This is the safest option, but has the downside of hindering the performance, as commits become more slow;
  • Asynchronous manual commit: On this strategy, messages are manually committed asynchronously. This option has better performance then the previous one as commits are done on a separate thread, but there is also some level of risk that messages won’t been committed due to some problem, resulting on messages been processed more then once;

Like when we talked about acknowledgement types, the best commit strategy to be used depends on the characteristics of the solution been implemented.

Consumer configurations

There are several configurations that could be made on the consumer. Here we have some of the more basic and interesting ones to know:

  • fetch.min.bytes: This defines the minimum amount of bytes a consumer wants to receive from a bulk of messages. The consumer will wait for this minimum to be reached, or a time limit to process messages, as defined on other config;
  • max.partition.fetch.bytes: As opposite to the previous config, this defines the maximum size, in bytes, that we want to receive on the chunk of data we asked for Kafka. As previously, if the time limit is reached first, Kafka will sent the messages it have;
  • fetch.max.wait.ms: As we talked on previous configs, this is the property that we define the time limit, on milliseconds, for Kafka to wait for more messages to fetch, before sending what it have to the consumer application;
  • auto.offset.reset: This defines what the consumer will do when first reading from a partition it never readed before or it has a invalid commit offset, for example if a consumer was down for so long that his last committed offset has already been purged from the partition. The default is latest, which means it will start reading from the newest records. The other option is earliest, on that case, the consumer will read all messages from the partition, since the beginning;
  • session.timeout.ms: This property defines the time limit for which a consumer must sent a heartbeat to still be considered healthy. The default is 3 seconds.

IMPORTANT: heartbeats are sent at each polling and/or commits made by the consumer. This means that, on the poll loop, we must be careful with the processing time, as if it passes the session timeout period, Kafka will consider the consumer unhealthy and it will redeliver the messages to another consumer.

Hands-on

Well, that was a lot to cover. Now that we learned Kafka main concepts, let’s begin our hands-on Kafka and learn what we talked in practice!

Set up

Unfortunately, there is no official  Kafka Docker image. So, for our lab, we will use Zookeeper and Kafka images provided by wurstmeister (thanks, man!). At the end, we can see links for his images.

Also at the end of the article, we can find a repository with the sources for this lab. There is also a docker compose stack that could be found there to get a Kafka cluster up and running. This is the stack:

version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: ${MY_IP}
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_DELETE_TOPIC_ENABLE: "true"

volumes:
- /var/run/docker.sock:/var/run/docker.sock

In order to run a cluster with 3 nodes, we can run the following commands:

export MY_IP=`ip route get 1 | awk '{print $NF;exit}'`
docker-compose up -d --scale kafka=3

To stop it, just run:

docker-compose stop

On our repo’s lab there is also a convenient bash script that set up a 3 node Kafka cluster without the need to enter the commands above every time.

Coding the producer

Now that we have our environment, let’s begin our lab. First, we need to create our topic. To create a topic, we need to use a shell inside one of the brokers, pointing Zookeeper address as a parameter – some operations, such as topic CRUD operations, are done pointing to Zookeeper instead of Kafka. There is plans to move all operations to be done on brokers directly on next releases – alongside other parameters. Assuming we have a terminal with MY_IP environment variable set, this can be done using the following command:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--create --zookeeper ${MY_IP}:2181 --replication-factor 1 
--partitions 2 --topic test

PS: All commands assume the name of the Kafka containers follows docker compose naming standards. If running on the lab repo, it will be created as kafkalab_kafka_1,kafkalab_kafka_2,etc

On the previous command, we created a topic named test with replication factor of 1 and 2 partitions. We can check if the topic was created by running the list topics command, as follows:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--list --zookeeper ${MY_IP}:2181

This will return a list of topics that exist on Zookeeper, on this case, “test”.

Now, let’s create a producer. All code on this lab will be done on Java, using Kafka’s APIs. After creating a Java project, we will code our own producer wrapper. Let’s begin by creating the wrapper itself:

package com.alexandreesl.producer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {


  private KafkaProducer producer;


  public MyProducer() throws UnknownHostException {

    InetAddress ip = InetAddress.getLocalHost();

    StringBuilder builder = new StringBuilder();
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");
    builder.append(",");
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", builder.toString());
    kafkaProps.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("acks", "all");
    producer = new KafkaProducer<String, String>(kafkaProps);

  }


  public void sendMessage(String topic, String key, String message) 
throws Exception {

    ProducerRecord<String, String> record = 
new ProducerRecord<>(topic,
        key, message);
    try {
      producer.send(record).get();
    } catch (Exception e) {
      throw e;
    }

  }
}

The code is very simple. We just defined the addresses from 2 brokers of our cluster – docker composer will automatically define ports for the brokers, so we need to change the ports accordingly to our environment first – ,  key and value serializers and set the acknowledgement type, on our case all, marking that we want all  replicas to be made before confirming the commit.

PS: Did you noticed the get() method been called after send()? This is because the send method is asynchronous by default. As we want to wait for Kafka to write the message before ending, we call get() to make the call synchronous.

The main class that uses our wrapper class is as follows:

package com.alexandreesl;

import com.alexandreesl.producer.MyProducer;

public class Main {

  public static void main(String[] args) throws Exception {

    MyProducer producer = new MyProducer();

    producer.sendMessage("test", "mysuperkey", "my value");


  }

}

As we can see, is a very simple class, just instantiate the class and use it.  If we run it, we will see the following output on terminal, with Kafka’s commit Id at the end, showing our producer is correctly implemented:

[main] INFO org.apache.kafka.clients.producer.ProducerConfig - 
ProducerConfig values: [main] 
INFO org.apache.kafka.clients.producer.ProducerConfig - 
ProducerConfig values:  
acks = all batch.size = 16384 
bootstrap.servers = [192.168.10.107:32813, 192.168.10.107:32814] 
buffer.memory = 33554432 client.id =  
compression.type = none 
connections.max.idle.ms = 540000 
enable.idempotence = false 
interceptor.classes = null 
key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer 
linger.ms = 0 max.block.ms = 60000 
max.in.flight.requests.per.connection = 5 
max.request.size = 1048576 
metadata.max.age.ms = 300000 
metric.reporters = [] 
metrics.num.samples = 2 
metrics.recording.level = INFO 
metrics.sample.window.ms = 30000 
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner 
receive.buffer.bytes = 32768 
reconnect.backoff.max.ms = 1000 
reconnect.backoff.ms = 50 
request.timeout.ms = 30000 
retries = 0 
retry.backoff.ms = 100 
sasl.jaas.config = null 
sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.mechanism = GSSAPI 
security.protocol = PLAINTEXT 
send.buffer.bytes = 131072 
ssl.cipher.suites = null 
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.endpoint.identification.algorithm = null 
ssl.key.password = null ssl.keymanager.algorithm = SunX509 
ssl.keystore.location = null ssl.keystore.password = null 
ssl.keystore.type = JKS 
ssl.protocol = TLS ssl.provider = null 
ssl.secure.random.implementation = null 
ssl.trustmanager.algorithm = PKIX 
ssl.truststore.location = null 
ssl.truststore.password = null 
ssl.truststore.type = JKS 
transaction.timeout.ms = 60000 
transactional.id = null value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka version : 0.11.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka commitId : 73be1e1168f91ee2

Process finished with exit code 0

Now that we have our producer implemented, let’s move on to the consumer.

Coding the consumer

Now, let’s code our consumer. First, we create a consumer wrapper, like the following:

package com.alexandreesl.consumer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

  private KafkaConsumer<String, String> consumer;

  public MyConsumer() throws UnknownHostException {

    InetAddress ip = InetAddress.getLocalHost();

    StringBuilder builder = new StringBuilder();
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");
    builder.append(",");
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", builder.toString());
    kafkaProps.put("group.id", "MyConsumerGroup");
    kafkaProps.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProps
        .put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<String, String>(kafkaProps);

  }

  public void consume(String topic) {

    consumer.subscribe(Collections.singletonList(topic));

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Key: " + record.key());
          System.out.println("Value: " + record.value());
        }
      }
    } finally {
      consumer.close();
    }


  }

}

On wrapper, we subscribed to our test topic, configuring a ConsumerGroup ID and deserializers for our messages. When we call the subscribe method,  ConsumerGroupCoordinators are updated on the brokers, making the cluster allocate partitions for us on topics we asked for consumption, as long as there is no more consumers than partitions, like we talked about previously.

Then, we create the consume method, which has a infinite loop to keep consuming messages from topic. On our case, we just keep calling the poll method, which returns a List of messages – on default settings, up to 100 messages -, print keys and values of messages and keep polling. At the end, we close the connection.

On our example, we can notice we didn’t explicit commit the messages at any point. This is because we are using default settings, so it is doing auto-commit. As we talked previously, using auto-commit can be a option on some solutions, depending on the situation.

Now, let’s change our main class to allow us to produce and consume using the same program and also allowing to input messages to produce. We do this by adding some input parameters, as follows:

package com.alexandreesl;

import com.alexandreesl.consumer.MyConsumer;
import com.alexandreesl.producer.MyProducer;
import java.util.Scanner;

public class Main {

  public static void main(String[] args) throws Exception {

    Scanner scanner = new Scanner(System.in);

    System.out.println("Please select operation" + " 
(1 for producer, 2 for consumer) :");

    String operation = scanner.next();

    System.out.println("Please enter topic name :");

    String topic = scanner.next();

    if (operation.equals("1")) {

      MyProducer producer = new MyProducer();

      System.out.println("Please enter key :");

      String key = scanner.next();

      System.out.println("Please enter value :");

      String value = scanner.next();

      producer.sendMessage(topic, key, value);
    } else if (operation.equals("2")) {

      MyConsumer consumer = new MyConsumer();

      consumer.consume(topic);


    }


  }

}

If we run our code, we will see some interesting output on console, such as the consumer joining the ConsumerGroupCoordinator and been assigned to partitions. At the end it will print the messages we send as the producer, proving our coding was successful.

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32814 (id: 2147482646 rack: null) 
for group MyConsumerGroup.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Revoking previously assigned partitions [] for group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
(Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Successfully joined group MyConsumerGroup with generation 2 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Setting newly assigned partitions [test-1, test-0] for group MyConsumerGroup 
Key: mysuperkey 
Value: my value

Manual committing

Now that we know the basis to producing/consuming Kafka streams, let’s dive in on more details about Kafka’s consumer. We saw previously that our example used default auto-commit to commit offsets after reading. We do this by changing the code as follows:

package com.alexandreesl.consumer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

  private KafkaConsumer<String, String> consumer;

  public MyConsumer() throws UnknownHostException {

    InetAddress ip = InetAddress.getLocalHost();

    StringBuilder builder = new StringBuilder();
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");
    builder.append(",");
    builder.append(ip.getHostAddress());
    builder.append(":");
    builder.append("");

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", builder.toString());
    kafkaProps.put("group.id", "MyConsumerGroup");
    kafkaProps.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProps
        .put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProps.put("enable.auto.commit", "false");
    consumer = new KafkaConsumer<String, String>(kafkaProps);

  }

  public void consume(String topic) {

    consumer.subscribe(Collections.singletonList(topic));

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Key: " + record.key());
          System.out.println("Value: " + record.value());


        }

        consumer.commitSync();

      }
    } finally {
      consumer.close();
    }


  }

}

If we run our code, we will see that it will continue to consume messages, as expected:

Please select operation (1 for producer, 2 for consumer) :2
Please enter topic name :test
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - 
ConsumerConfig values:  auto.commit.interval.ms = 5000 
auto.offset.reset = latest 
bootstrap.servers = [192.168.10.107:32771, 192.168.10.107:32772] 
check.crcs = true client.id =  
connections.max.idle.ms = 540000 
enable.auto.commit = true 
exclude.internal.topics = true 
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
group.id = MyConsumerGroup 
heartbeat.interval.ms = 3000 
interceptor.classes = null 
internal.leave.group.on.close = true 
isolation.level = read_uncommitted 
key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 300000 
max.poll.records = 500 
metadata.max.age.ms = 300000 
metric.reporters = [] 
metrics.num.samples = 2 
metrics.recording.level = INFO 
metrics.sample.window.ms = 30000 
partition.assignment.strategy = 
[class org.apache.kafka.clients.consumer.RangeAssignor] 
receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 
reconnect.backoff.ms = 50 request.timeout.ms = 305000 
retry.backoff.ms = 100 sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.mechanism = GSSAPI security.protocol = PLAINTEXT 
send.buffer.bytes = 131072 session.timeout.ms = 10000 
ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.endpoint.identification.algorithm = null ssl.key.password = null 
ssl.keymanager.algorithm = SunX509 
ssl.keystore.location = null 
ssl.keystore.password = null 
ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null 
ssl.secure.random.implementation = null 
ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null 
ssl.truststore.password = null ssl.truststore.type = JKS 
value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka version : 0.11.0.2[main] INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka commitId : 73be1e1168f91ee2
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32773 
(id: 2147482645 rack: null) for group MyConsumerGroup.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 6
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1, test-0] 
for group MyConsumerGroup
Key: key
Value: value
Key: my
Value: key

On our example, we used synch committing, that is, the main thread is blocked waiting for the commit before start reading the next batch of messages. We can change this just by changing the commit method, as follows:

public void consume(String topic) {

  consumer.subscribe(Collections.singletonList(topic));

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());


      }

      consumer.commitAsync();

    }
  } finally {
    consumer.close();
  }


}

One last thing to check before we move on is committing specific offsets. On our previous examples, we committed all messages at once. If we wanted to do, for example, a asynch commit as messages are processed, we can do the following:

public void consume(String topic) {

  consumer.subscribe(Collections.singletonList(topic));

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());

        HashMap<TopicPartition, OffsetAndMetadata> offsets = 
new HashMap<>();

        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));

        consumer.commitAsync(offsets, null);


      }


    }
  } finally {
    consumer.close();
  }


}

Assigning to specific partitions

On our examples, we delegate to Kafka which partitions the consumers will consume. If we want to specify the partitions a consumer will be assigned to, we can use the assign method.

It is important to notice that this approach is not very recommended, as consumers won’t be replaced automatically by others when going down, neither new partitions will be added for consuming before been explicit assigned to a consumer.

On the example bellow, we do this, by marking that we want just to consume messages from one partition:

public void consume(String topic) {

  List partitions = new ArrayList<>();

  List partitionInfos = null;
  partitionInfos = consumer.partitionsFor(topic);
  if (partitionInfos != null) {
    partitions.add(
        new TopicPartition(partitionInfos.get(0).topic(), 
partitionInfos.get(0).partition()));

  }
  consumer.assign(partitions);

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);

      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());

        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));

        consumer.commitAsync(offsets, null);


      }


    }
  } finally {
    consumer.close();
  }


}

Consumer rebalance

When consuming from a topic, we can scale consumption by adding more instances of our application, by parallelizing the processing. Let’s see this on practice.

First, let’s start a consumer. After initializing, we can see it joined both partitions from our topic:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 (id: 2147482645 rack: null) 
for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] 
for group MyConsumerGroup [main] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
(Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 18 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1, test-0] for group 
MyConsumerGroup

Now, let’s start another consumer. We will see that, as soon it joins the ConsumerGroupCoordinator, it will be assigned to one of the partitions:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 
(id: 2147482645 rack: null) 
for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 19 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-0] for group MyConsumerGroup

And if we see our old consumer, we will see that will be now reading from the other partition only:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 19 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1] for group MyConsumerGroup

This show us the power of Kafka ConsumerGroup Coordinator, that takes care of everything for us.

But, it is important to notice that, on real scenarios, we can implement listeners that are invoked when partitions are revoked to other consumers due to rebalance and before a partition starts consumption on his new consumer. This can be done by implementing the ConsumerRebalanceListener interface, as follows:

package com.alexandreesl.listener;

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerRebalanceInterface implements 
ConsumerRebalanceListener {

  @Override
  public void onPartitionsRevoked(Collection partitions) {
    System.out.println("I am losing the following partitions:");
    for (TopicPartition partition : partitions) {
      System.out.println(partition.partition());
    }
  }

  @Override
  public void onPartitionsAssigned(Collection partitions) {
    System.out.println("I am starting on the following partitions:");
    for (TopicPartition partition : partitions) {
      System.out.println(partition.partition());
    }
  }
}

Of course, this is just a mock implementation. On a real implementation, we would be doing tasks such as committing offsets – if we buffered our commits on blocks before committing instead of committing one by one, that would turn out to be a necessity -, closing connections, etc.

We add our new listener by passing him as parameter to the subscribe() method, as follows:

public void consume(String topic) {

  consumer.subscribe(Collections.singletonList(topic), 
new MyConsumerRebalanceInterface());

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);

      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Key: " + record.key());
        System.out.println("Value: " + record.value());

        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));

        consumer.commitAsync(offsets, null);


      }


    }
  } finally {
    consumer.close();
  }


}

Now, let’s terminate all our previously started consumers and start them again. When starting the first consumer, we will see the following outputs on terminal:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 (id: 2147482645 rack: null) 
for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group 
MyConsumerGroup I am losing the following partitions: 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
I am starting on the following partitions:
 1 0 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Successfully joined group MyConsumerGroup with generation 21 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1, test-0] 
for group MyConsumerGroup

That shows our listener was invoked. Let’s now start the second consumer and see what happens:

I am losing the following partitions: 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Discovered coordinator 192.168.10.107:32772 
(id: 2147482645 rack: null) for group MyConsumerGroup. 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Revoking previously assigned partitions [] for group 
MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 22 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-0] 
for group MyConsumerGroup 
I am starting on the following partitions: 0

And finally, if we see the first consumer, we will see that both revoked and reassigned partitions were printed on console, showing our listener was implemented correctly:

[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 (Re-)joining group MyConsumerGroup 
I am losing the following partitions: 1 0 
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
 Successfully joined group MyConsumerGroup with generation 22 
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
 Setting newly assigned partitions [test-1] 
for group MyConsumerGroup 
I am starting on the following partitions: 1

PS: Kafka works rebalancing by revoking all partitions and redistributing them. That’s why we see the first consumer losing all partitions before been reassigned to one of the old ones.

Log compaction

Log compaction is a powerful cleanup feature of Kafka. With log compaction, we define a  point from which messages from a same key on a same partition are compacted so only the more recent message is retained.

This is done by setting configurations that establish a compaction entry point and a retention entry point. This entry points consists of time periods, from which Kafka allow messages to keep coming from the producers, but at the same time removing old messages that doesn’t matter anymore. The following diagram explain the system on practice:

kafka-compaction

Kafka log compaction explained

In order to configure log compaction, we need to introduce some configurations both on cluster and topic. For the cluster, we change our docker compose YAML as follows:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${MY_IP}
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_LOG_CLEANER_ENABLED: "true"

    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

This change is needed due to log cleaning not been enabled by default on Kafka. Then, we change our topic configuration with the following new entries:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-configs.sh 
--zookeeper ${MY_IP}:2181 --entity-type topics --entity-name test --alter 
--add-config min.compaction.lag.ms=1800000,delete.retention.ms=172800000,
cleanup.policy=compact

On the command above, we set the compaction entry point – min.compaction.lag.ms – to 30 minutes, so all messages from the head of the stream to 30 minutes after will be on the dirty section. The other config stablished a retention period of 48 hours, so from 30 minutes up to 48 hours, all messages will be on the clean section, where compaction will occur. Messages older than 48 hours will be removed from the stream.

Lastly, we configured the cleanup policy, making compaction enabled. We can check our configs were successfully set by using the following command:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-configs.sh 
--zookeeper ${MY_IP}:2181 --entity-type topics --entity-name test --describe

Which will produce the following output:

Configs for topic 'test' are 
min.compaction.lag.ms=1800000,delete.retention.ms=172800000,
cleanup.policy=compact

One last thing we need to know before moving on to our next topic is that compaction also allows messages to be removed. If we want a message to be completely removed from our stream, all we need to do is send a message with his key, but with null as value. When sent this way with compaction enabled, it will remove all messages from the stream. This kind of messages are called tombstones on Kafka.

Kafka connect

Kafka connect is a integration framework, like others such as Apache Camel, that ships with Kafka – but runs on a cluster of his own – and allows us to quickly develop integrations from/to Kafka to other systems. It is maintained by Confluence.

This framework deserves a article of his own, so it won’t be covered here. If the reader wants to know more about it, please go to the following link:

https://docs.confluent.io/current/connect/intro.html

Kafka Streams

Kafka Streams is a framework shipped with Kafka that allows us to implement stream applications using Kafka. By stream applications, that means applications that have streams as input and output as well, consisting typically of operations such as aggregation, reduction, etc.

A typical example of a stream application is reading data from 2 different streams and producing a aggregated result from the two on a third stream.

This framework deserves a article of his own, so it won’t be covered here. If the reader wants to know more about it, please go to the following link:

https://kafka.apache.org/documentation/streams/

Kafka MirrorMaker

Kafka MirrorMaker is a tool that allows us to mirror Kafka clusters, by making copies from a source cluster to a target cluster, as messages goes in. As with Kafka connect and Streams, is a tool that deserves his own article, so it won’t be covered here. More information about it could be found on the following link:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330

Kafka administration

Now that we covered most of the developing code to use Kafka, let’s see how to administrate a Kafka cluster. All commands for Kafka administration are done by their shell scripts, like we did previously on our study.

Kafka CRUD topic operations

Let’s begin with basic topic operations. Like we saw before, topics can be created with the following command:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--create --zookeeper ${MY_IP}:2181 --replication-factor 1 --partitions 2 
--topic test

Changing topics – not configurations, like we saw on log compaction, but the topic itself, such as the number of partitions  – are done with the same shell, just changing some options:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--alter --zookeeper ${MY_IP}:2181 --partitions 4 --topic test

IMPORTANT: changing partition numbers on topics also can change partition logic, meaning messages that always were sent to a same partition A can be now always sent to a partition B. This is important to watch out as can lead to message ordering issues if not taken with care.

We can search for topics by using the list command, like we did before:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--list --zookeeper ${MY_IP}:2181

If we want to delete a topic, we issue the following command. Take note that, if the configuration delete.topic.enabled is not set, the topic will just be marked for deletion, not removed:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--delete --zookeeper ${MY_IP}:2181 --topic test

Other Kafka admin operations

Let’s now see other Kafka admin operations. First, let’s create a new topic to test it out:

docker exec -t -i kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--create --zookeeper ${MY_IP}:2181 --replication-factor 1 
--partitions 3 --topic mytopic

The first operation we will see is preferred replica election. When Kafka creates a topic, at first, the partition leaders are spread out as evenly as possible, reducing impact risks on nodes going down. However, after some time, this distribution could be compromised, due to nodes going down and up several times, inducing on several rebalances. This could be specially problematic on a small cluster.

The preferred replica election operation tries to rebalance a topic to as closest as possible to his original partition leader distribution, solving the distribution problem. This is done with the following command:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-preferred-replica-election.sh 
--zookeeper ${MY_IP}:2181

IMPORTANT: This command triggers rebalances on all topics from the cluster, so it must be used with care.

We can also trigger rebalance for just one topic, by writing a JSON file like this:

{
  "partitions": [
    {
      "partition": 1,
      "topic": "mytopic"
    },
    {
      "partition": 2,
      "topic": "mytopic"
    },
    {
      "partition": 3,
      "topic": "mytopic"
    }
  ]
}

And running the command like this:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-preferred-replica-election.sh 
--zookeeper ${MY_IP}:2181 
--path-to-json-file rebalance-example.json

PS: Before running the command, it is necessary to copy the file to the container where it will run the command.

Another useful command is reassigning of replicas. This is useful, for example, if we want to isolate a broker from the cluster, that it will be removed for maintenance, or if a new broker is added and need to receive his share of topics in order to balance the cluster.

The first step is to generate a file that will be used to request a proposal to partition moves. We write the following file, calling “partition-req.json”:

{
 "topics": [
 {
 "topic": "mytopic"
 }
 ],
 "version": 1
}

On our stack, we have only 3 nodes, so reassign proposal can fail due to the cluster been so small. We change our start cluster shell as follows and run again:

#!/usr/bin/env bash

export MY_IP=`ip route get 1 | awk '{print $NF;exit}'`
docker-compose up -d --scale kafka=6

We then execute the following command. Remember to copy the file to the container first:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-reassign-partitions.sh 
--zookeeper ${MY_IP}:2181 --generate 
--topics-to-move-json-file partition-req.json 
--broker-list 1004,1005,1006

IMPORTANT: We can copy the file as follows:

docker cp partition-req.json kafkalab_kafka_1:/partition-req.json

On the command above, we ask Kafka that we want to redistribute the replica set from the current brokers to the brokers 1004,1005 and 1006. We receive the following output, with the actual distribution and a proposed one:

Current partition replica assignment 
{"version":1,"partitions":[{"topic":"mytopic","partition":2,"replicas":[1001]
,"log_dirs":["any"]},{"topic":"mytopic","partition":1,"replicas":[1003],
"log_dirs":["any"]},{"topic":"mytopic","partition":0,"replicas":[1002],
"log_dirs":["any"]}]}

Proposed partition reassignment configuration 
{"version":1,"partitions":[{"topic":"mytopic","partition":2,"replicas":[1005]
,"log_dirs":["any"]},
{"topic":"mytopic","partition":1,"replicas":[1004],
"log_dirs":["any"]},{"topic":"mytopic","partition":0,"replicas":[1006],
"log_dirs":["any"]}]}

The first JSON can be saved for rolling back, in case anything goes wrong. Let’s save the second JSON on a file called replica-proposal.json:

{"version":1,"partitions":[{"topic":"mytopic","partition":2,
"replicas":[1005],"log_dirs":["any"]},
{"topic":"mytopic","partition":1,"replicas":[1004]
,"log_dirs":["any"]},{"topic":"mytopic","partition":0
,"replicas":[1006],"log_dirs":["any"]}]}

Finally, we run the replica assignment command, using the proposed distribution file as parameter – don’t forget to copy the file to the container first -, as follows:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-reassign-partitions.sh 
--zookeeper ${MY_IP}:2181 --execute 
--reassignment-json-file replica-proposal.json

We will receive a output like this:

Current partition replica assignment {"version":1,"partitions":
[{"topic":"mytopic","partition":2,"replicas":[1001],"log_dirs":["any"]}
,{"topic":"mytopic","partition":1,"replicas":[1003],"log_dirs":["any"]}
,{"topic":"mytopic","partition":0,"replicas":[1002],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions.

This means that reassigning is been performed. During this phase, Kafka will redistribute the replicas and copy all data across the new brokers, so depending on the amount of data, this operation can take a lot of time. We can check the status of reassignment by running:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-reassign-partitions.sh 
--zookeeper ${MY_IP}:2181 --verify 
--reassignment-json-file replica-proposal.json

When reassignment is finished, we will see the following:

Status of partition reassignment: 

Reassignment of partition mytopic-2 completed successfully

Reassignment of partition mytopic-1 completed successfully

Reassignment of partition mytopic-0 completed successfully

We can also check the status of our topics by running the describe command, as follows:

docker exec -t -i kafkalab_kafkadocker exec -t -i 
kafkalab_kafka_1 /opt/kafka/bin/kafka-topics.sh 
--zookeeper ${MY_IP}:2181 --describe

After our reassignment, it will output something like this:

Topic:mytopic PartitionCount:3 ReplicationFactor:1 Configs:

Topic: mytopicPartition: 0Leader: 1006Replicas: 1006Isr: 1006

Topic: mytopicPartition: 1Leader: 1004Replicas: 1004Isr: 1004

Topic: mytopicPartition: 2Leader: 1005Replicas: 1005Isr: 1005

Topic:test PartitionCount:2 ReplicationFactor:1 Configs:

Topic: testPartition: 0Leader: 1003Replicas: 1003Isr: 1003

Topic: testPartition: 1Leader: 1001Replicas: 1001Isr: 1001

Kafka offset lag

Kafka’s offset lag refers to a situation where we have consumers lagging behind the head of a stream. Let’s revisit one of our diagrams from the offsets explained section:

kafka-diagram-3

Consumers lagging behind on a stream

As we can see on the diagram above, we have 2 consumers groups in a stream. Consumer group 1 is 3 messages from the stream’s head, while consumer group 2 is 8 messages away. This difference between head and current position of a consumer on a stream is called offset lag.

The causes for a offset lag may vary, ranging from network problems to issues on the consumer application itself. It is important to keep this lag in check by monitoring it. One good tool for this is Burrow, provided by Linkedin. More information about it could be found on the following link:

https://github.com/linkedin/Burrow

Testing the cluster

It is important to test our cluster configuration, in order to verify how the cluster will behave on several situations, such as when brokers goes down – with partition leaderships or not -, new brokers goes in, etc.

We can code our own tests for this intent using the VerifiableProducer and VerifiableConsumer interfaces on Apache Kafka’s APIs. The usage for this interfaces are essentially the same as the original ones we saw on our lab.

There is also a read-to-use bash version of this interfaces, that can be used to make some testing. For example, if we wanted to test our cluster by sending 200000 messages to mytopic, we can something like this:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-verifiable-producer.sh 
--topic mytopic --max-messages 200000 
--broker-list 
 ${MY_IP}:<a broker port>,${MY_IP}:<a broker port>

This will produce a output like the following:

{"timestamp":1516571263855,"name":"startup_complete"}

{"timestamp":1516571264213,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"3","offset":1,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"6","offset":2,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"9","offset":3,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"12","offset":4,"topic":"mytopic","partition":1}

{"timestamp":1516571264216,"name":"producer_send_success","key":null,"value":"15","offset":5,"topic":"mytopic","partition":1}

{"timestamp":1516571264217,"name":"producer_send_success","key":null,"value":"18","offset":6,"topic":"mytopic","partition":1}

{"timestamp":1516571264217,"name":"producer_send_success","key":null,"value":"21","offset":7,"topic":"mytopic","partition":1}

{"timestamp":1516571264218,"name":"producer_send_success","key":null,"value":"24","offset":8,"topic":"mytopic","partition":1}

{"timestamp":1516571264218,"name":"producer_send_success","key":null,"value":"27","offset":9,"topic":"mytopic","partition":1}

{"timestamp":1516571264218,"name":"producer_send_success","key":null,"value":"30","offset":10,"topic":"mytopic","partition":1}

{"timestamp":1516571264219,"name":"producer_send_success","key":null,"value":"33","offset":11,"topic":"mytopic","partition":1}

{"timestamp":1516571264220,"name":"producer_send_success","key":null,"value":"36","offset":12,"topic":"mytopic","partition":1}

{"timestamp":1516571264220,"name":"producer_send_success","key":null,"value":"39","offset":13,"topic":"mytopic","partition":1}

{"timestamp":1516571264220,"name":"producer_send_success","key":null,"value":"42","offset":14,"topic":"mytopic","partition":1}

{"timestamp":1516571264221,"name":"producer_send_success","key":null,"value":"45","offset":15,"topic":"mytopic","partition":1}

{"timestamp":1516571264224,"name":"producer_send_success","key":null,"value":"48","offset":16,"topic":"mytopic","partition":1}

{"timestamp":1516571264225,"name":"producer_send_success","key":null,"value":"51","offset":17,"topic":"mytopic","partition":1}

{"timestamp":1516571264225,"name":"producer_send_success","key":null,"value":"54","offset":18,"topic":"mytopic","partition":1}

...omitted...

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199980","offset":66660,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199983","offset":66661,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199986","offset":66662,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199989","offset":66663,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199992","offset":66664,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199995","offset":66665,"topic":"mytopic","partition":1}

{"timestamp":1516571272789,"name":"producer_send_success","key":null,"value":"199998","offset":66666,"topic":"mytopic","partition":1}

{"timestamp":1516571272803,"name":"shutdown_complete"}

{"timestamp":1516571272805,"name":"tool_data","sent":200000,"acked":200000,"target_throughput":-1,"avg_throughput":22346.368715083798}

And similarly, we can test the consumer by running something like this:

docker exec -t -i kafkalab_kafka_1 
/opt/kafka/bin/kafka-verifiable-consumer.sh 
--topic mytopic --max-messages 1000 
--group-id testing 
--broker-list ${MY_IP}:<a broker port>,${MY_IP}:<a broker port>

Which will output something like this:

{"timestamp":1516571973384,"name":"startup_complete"}

{"timestamp":1516571973534,"name":"partitions_revoked","partitions":[]}

{"timestamp":1516571973557,"name":"partitions_assigned","partitions":[{"topic":"mytopic","partition":2},{"topic":"mytopic","partition":1},{"topic":"mytopic","partition":0}]}

{"timestamp":1516571973669,"name":"records_consumed","count":500,"partitions":[{"topic":"mytopic","partition":1,"count":500,"minOffset":66667,"maxOffset":67166}]}

{"timestamp":1516571973680,"name":"offsets_committed","offsets":[{"topic":"mytopic","partition":1,"offset":67167}],"success":true}

{"timestamp":1516571973687,"name":"records_consumed","count":500,"partitions":[{"topic":"mytopic","partition":1,"count":500,"minOffset":67167,"maxOffset":67666}]}

{"timestamp":1516571973690,"name":"offsets_committed","offsets":[{"topic":"mytopic","partition":1,"offset":67667}],"success":true}

{"timestamp":1516571973692,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973692,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973694,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973694,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973696,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973696,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973697,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973697,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973698,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973699,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973700,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973700,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973701,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973702,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973702,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973703,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973704,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973704,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973705,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973705,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973706,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973706,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973708,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973708,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973709,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973709,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973710,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973711,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973714,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973714,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973715,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973715,"name":"offsets_committed","offsets":[],"success":true}

{"timestamp":1516571973716,"name":"records_consumed","count":500,"partitions":[]}

{"timestamp":1516571973716,"name":"offsets_committed","offsets":[],"success":true}
...omitted...

Conclusion

And that concludes our study of Apache Kafka. I hope to have passed for the reader a solid explanation of Kafka core concepts, as well as directions for complementary studies on his different usages and applications. Thank you for following me on this post, until next time.

Continue reading