---
title: Event Streaming in Rails with Kafka
published: "2022-05-23"
publisher: Honeybadger
author: David Sanchez
category: Ruby articles
tags:
  - Ruby
  - Rails
description: "Do you need to process a lot of data in real time? Event streaming is a pattern that could help. David Sanchez walks us through how to do event streaming in Rails with Apache Kafka, the popular open-source event streaming platform."
url: "https://www.honeybadger.io/blog/event-streaming-rails-kafka/"
---

Companies need to process and share large amounts of data in real time to gain insights and create more engaging customer experiences. Traditional batch-oriented data processing often cannot keep up with that demand.

To achieve that, you need to process a lot of data as fast as possible and then send it to other services for more processing. In the middle of all these quick actions, consumers also need to be notified when an event occurs, and event streaming lets us do just that.

This is the [repo](https://github.com/Sanchezdav/karafka_example) in GitHub that we will be using.

## Events

Before talking about event streaming, let’s talk about what an event is. An event that happens within an application can be related to a user process or simply actions that affect the business.

An event records that a state change happened; it does not say how the application should react to it. Consider these as examples:

- A user logging into a service
- A payment transaction
- A writer publishing a post in a blog

In most cases, an event will trigger more events; for example, when a user signs up for a service, the app sends a notification to their device, inserts the record in the database, and sends a welcoming email.

## Event streaming

**Event Streaming** is a pattern for capturing data in real time from event sources such as databases. The main parts of event streaming are as follows:

- **Broker**: The system in charge of storing events
- **Topic**: A category of events
- **Producer**: Sends events to a broker on a specific topic
- **Consumer**: Reads the events
- **Events**: Data that producers want to communicate to consumers

At this point, we have to mention the **publish and subscribe architecture pattern ([pub/sub pattern](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern))**; event streaming is an implementation of that pattern with these differences:

- Events occur instead of messages.
- Events are ordered, typically by time.
- Consumers can read events from a specific point in the topic.
- The events have temporal durability.

The flow starts when the **producer** publishes a new **event** into a **topic** (as we saw previously, the topic is just the categorization for a specific type of event). Then, **consumers** interested in events of a particular category subscribe to that topic. Finally, the **broker** identifies the consumers of the topic and makes the desired events available.
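
The flow can be pictured with a toy, in-memory broker. This is only a sketch to illustrate topics, ordering, and reading from a specific point; the `ToyBroker` class and its methods are made up for this example and have nothing to do with Kafka's real API.

```ruby
# A toy, in-memory stand-in for a broker. Real brokers such as Kafka persist
# events to disk and replicate them; this sketch only keeps arrays in memory.
class ToyBroker
  def initialize
    # Each topic is an append-only list of events, created on first use.
    @topics = Hash.new { |topics, name| topics[name] = [] }
  end

  # Producer side: append an event and return its offset (position in the log).
  def publish(topic, event)
    log = @topics[topic]
    log << event
    log.size - 1
  end

  # Consumer side: read every event at or after the given offset.
  def read(topic, from_offset = 0)
    @topics[topic].drop(from_offset)
  end
end

broker = ToyBroker.new
broker.publish(:users, { event_name: "user_created", user_id: 1 })
broker.publish(:users, { event_name: "user_created", user_id: 2 })

# Two independent consumers read the same topic from different points.
mailer_events = broker.read(:users)     # starts from the beginning
audit_events  = broker.read(:users, 1)  # starts from offset 1

puts mailer_events.size # 2
puts audit_events.size  # 1
```

Because reading does not remove anything from the log, each consumer can keep its own position and act on the same events in its own way.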

## Advantages of event streaming

- **Decoupling** There's no dependency between publishers and consumers because they don't need to know each other. In addition, the events don't specify their actions, so many consumers could get the same event and perform different actions.
    
- **Low Latency** Consumers can pick up events as soon as they are published, often within milliseconds.
    
- **Independence** As we know, publishers and consumers are independent, so different teams can work with them using the same events for other actions or purposes.
    
- **Fault Tolerance** Some event streaming platforms help you deal with consumer failures; for example, consumers can save their position and start from there again if an error occurs.
    
- **Real-Time Handling** Feedback is received in real time, so the users don't need to wait minutes or hours to see the response of their events.
    
- **High Performance** Event platforms can handle a high volume of messages, for example, thousands of events per second.
    

## Disadvantages of event streaming

- **Monitoring** Some event streaming platforms lack complete built-in monitoring and need additional tools, such as Datadog or New Relic.
    
- **Configuration** The configuration in some tools can be overwhelming even for experienced people. There are many parameters, and sometimes, you need to know in depth about the subject to implement them.
    
- **Client libraries** It isn't always easy to work with Kafka from languages other than Java. Sometimes, the client libraries are not up to date, show instability, or don't offer many alternatives to choose from.
    

One of the most popular tools for event streaming is **Apache Kafka**. This tool allows users to send, store, and request data whenever and wherever they need it; let's talk about it.

## Apache Kafka

_"[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications."_

Being specifically designed for real-time log transmission, Apache Kafka is ideal for applications that require the following:

- Reliable data exchange between different components
- The ability to divide messaging workloads as application requirements change
- Real-time transmission for data processing

Let's use Kafka in a Rails application!

## Using Kafka with Rails

The most famous gem to use Kafka in Ruby is called [ruby-kafka](https://github.com/zendesk/ruby-kafka) by Zendesk, and it is great! Still, you need to do all the implementation manually, which is why we have some "frameworks" built with ruby-kafka. They also help us with all the configuration and execution steps.

[Karafka](https://github.com/karafka/karafka) is a framework that simplifies the development of Apache Kafka-based Ruby applications.

To work with Kafka, it is necessary to install [Java](https://www.java.com/es/), because Kafka is a Scala and Java application. Installing [Zookeeper](https://zookeeper.apache.org/) is also required.

Before the installation, I want to explain a bit about Zookeeper. Zookeeper is a centralized service essential for Kafka; it sends notifications in case of changes such as the creation of a new topic, crash of a broker, removal of a broker, deletion of topics, and so on.

Its main task is to manage Kafka brokers, maintain a list with their respective metadata, and facilitate health-checking mechanisms. In addition, it helps to select the leading broker for different partitions of the topics.

## Requirements

**For MacOS:**

Now, let's install Java and Zookeeper with the following commands:

```bash
brew install java
brew install zookeeper
```

Then, we can install Kafka by running this:

```bash
brew install kafka
```

Once we have Kafka and Zookeeper installed, it's necessary to start the services this way:

```bash
brew services start zookeeper
brew services start kafka
```

**For Windows and Linux:**

Instructions:

1. [Installing Java](https://java.com/en/download/help/windows_manual_download.html)
2. [Download Zookeeper](https://zookeeper.apache.org/releases.html#download)

## Setting Up Rails

Just create a simple Rails application as usual:

```bash
rails new karafka_example
```

and add the karafka gem within the Gemfile:

```ruby
gem 'karafka'
```

Then, run `bundle install` to install the newly added gem, and don't forget to run the following command to generate the Karafka files:

```bash
bundle exec karafka install
```

That command should generate some interesting files: `karafka.rb` in the root directory, `app/consumers/application_consumer.rb`, and `app/responders/application_responder.rb`.

## Karafka Initializer

The `karafka.rb` file is like an initializer, kept separate from the Rails config. It allows you to configure the Karafka application and draw some routes, with an API similar to Rails application routes. Here, though, the routes map topics to consumers.

## Producer

The **producer** is in charge of creating the events, and we can add them into the `app/responders` folder. Now, let’s make a simple producer for users:

```ruby
# app/responders/users_responder.rb
class UsersResponder < ApplicationResponder
  topic :users

  def respond(event_payload)
    respond_to :users, event_payload
  end
end
```

## Consumer

The **consumer** is responsible for reading all the events/messages sent from the producer. This is just a consumer that logs the received message.

```ruby
# app/consumers/users_consumer.rb
class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params}"
  end
end
```

We use `params` to get the event. If you read events in batches and have `config.batch_fetching` set to true, you should use `params_batch` instead.

## Testing

To run our Karafka service (the one that will be listening for events), go to the console, open a new tab, go to the Rails project, and run:

```bash
bundle exec karafka server
```

## Successful Event

Now, open another console tab, go to the Rails project, and type this:

```bash
rails c
```

There, let’s create an event with our responder:

```bash
> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })
```

If you check the Rails console, you will see this message after the event is created:

```bash
Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}
```

And in the Karafka service tab, you’ll see something like this:

```bash
New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092
```

But if you just want the message payload, you can add `params.payload` in your consumer and you will have something like this:

```bash
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer
```
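
Notice that the payload arrives with string keys even though the responder was called with symbol keys. The appended message in the Rails console output is a JSON string, so a plausible explanation is a JSON round trip between producer and consumer. The sketch below reproduces that effect with Ruby's standard `json` library only; `round_trip` is a hypothetical helper, not part of Karafka.

```ruby
require "json"

# Hypothetical helper: serialize an event to JSON and parse it back, which is
# roughly what happens between a producer and a consumer.
def round_trip(event)
  wire_format = event.to_json  # what actually travels: a string
  JSON.parse(wire_format)      # parsing yields string keys by default
end

received = round_trip({ event_name: "user_created", payload: { user_id: 1 } })

puts received["event_name"]          # user_created
puts received["payload"]["user_id"]  # 1
puts received[:event_name].inspect   # nil
```

This is why a consumer reads the payload with string keys such as `'user'` rather than `:user`.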

## Failed Event

You can create a User model with some attributes like `email`, `first_name` and `last_name` by running the following command:

```bash
rails g model User user_id email first_name last_name
```

Then, you can run the migration with this:

```bash
rails db:migrate
```

Now, add some validations like this:

```ruby
class User < ApplicationRecord
  validates :email, uniqueness: true
end
```

Finally, we can change the consumer:

```ruby
class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end
```

So, let's create two events with the same email:

```bash
UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } })
UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } })
```

With this, the first event is created in the database:

```bash
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.1ms)  BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (9.6ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "1"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.0ms)  COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer
```

But the second one will fail, because we have a validation that says the email is unique. If you try to add another record with an existing email, you will see something like this:

```bash
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.2ms)  BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Exists? (0.3ms)  SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2  [["email", "batman@mail.com"], ["LIMIT", 1]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (0.2ms)  ROLLBACK
↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
```

You can see the error in the last line `ActiveRecord::RecordInvalid: Validation failed: Email has already been taken`. But the interesting thing here is that Karafka will try to process the event again and again. Even if you restart the Karafka server, it will try to process the last event. How does it know where to start?

If you look at your console after the error, you will see this:

```bash
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42
```

It tells you which offset was being processed when the error occurred: in this case, offset 42. So, if you restart the Karafka service, it will start from that offset.

```bash
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches
```

It will still fail because we have the email validation in our User model. At this point, stop the Karafka server, remove or comment out that validation, and start your server again; you’ll see how the event is processed successfully:

```bash
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
TRANSACTION (0.2ms)  BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (3.8ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "2"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.5ms)  COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed
```

Finally, you can see this message in the last line: `Marking users/0:43 as processed`.
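
The retry behavior comes down to one rule: the offset only moves forward after a message has been processed without an error. Here is a minimal sketch of that rule in plain Ruby. `ToyConsumer` and the `strict` handler are hypothetical and only mimic what we saw in the logs; they are not how Karafka is implemented.

```ruby
# A toy consumer loop: the offset only advances after a message is processed
# without raising, so a failing message is retried on the next pass.
class ToyConsumer
  attr_reader :offset, :processed

  def initialize(log, start_offset: 0)
    @log = log
    @offset = start_offset
    @processed = []
  end

  # Processes messages until the log is exhausted or the handler raises.
  # Returns true when caught up, false when stopped by an error.
  def run
    while @offset < @log.size
      begin
        yield @log[@offset]
      rescue StandardError => e
        puts "Exception raised at offset #{@offset} -- #{e.message}"
        return false
      end
      @processed << @log[@offset]
      @offset += 1
    end
    true
  end
end

log = ["batman@mail.com", "batman@mail.com"]
seen = []

# Hypothetical handler that mimics the uniqueness validation.
strict = lambda do |email|
  raise "Email has already been taken" if seen.include?(email)
  seen << email
end

consumer = ToyConsumer.new(log)
consumer.run(&strict)  # processes offset 0, then stops at offset 1
consumer.run(&strict)  # fails at offset 1 again
puts consumer.offset   # 1

# Once the handler no longer rejects the message, the same offset succeeds.
consumer.run { |email| seen << email }
puts consumer.offset   # 2
```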

## Callbacks

This is something cool that Karafka offers: you can use callbacks in your Consumer. To do that, you only need to include the module and use them. Open your `UsersConsumer` and add this:

```ruby
class UsersConsumer < ApplicationConsumer
  include Karafka::Consumers::Callbacks

  before_poll do
    Karafka.logger.info "*** Checking something new for #{topic.name}"
  end

  after_poll do
    Karafka.logger.info '*** We just checked for new messages!'
  end

  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end
```

Polling is how we fetch records based on the current partition offset. The `before_poll` and `after_poll` callbacks, as their names suggest, run right before and right after that fetch. We are just logging a message, and you can see both in your Karafka server output, one before fetching and the other after:

```bash
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!
```

## Heartbeats

A heartbeat is just the way we, as consumers, say to Kafka we are alive; otherwise, Kafka will assume that the consumer is dead.

Karafka sends heartbeats periodically; the interval is set by `kafka.heartbeat_interval`, which defaults to 10 seconds. You can see this heartbeat in your Karafka server.

```bash
*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!
```

With `Sending heartbeat...`, Kafka knows that we are alive and still a valid member of its consumer group, so we can keep consuming records.
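
One rough way to picture the liveness check is a coordinator that remembers when each consumer last checked in. The `ToyCoordinator` class and the 30-second timeout below are assumptions made for illustration; they are not Kafka's or Karafka's actual implementation or defaults. Time is passed in as plain seconds to keep the sketch deterministic.

```ruby
# A toy group coordinator: it remembers when each consumer last sent a
# heartbeat and treats a consumer as dead after `session_timeout` seconds.
class ToyCoordinator
  def initialize(session_timeout:)
    @session_timeout = session_timeout
    @last_seen = {}
  end

  # Record a heartbeat from a consumer at time `now` (in seconds).
  def heartbeat(consumer_id, now)
    @last_seen[consumer_id] = now
  end

  # A consumer is alive if it has checked in within the session timeout.
  def alive?(consumer_id, now)
    last = @last_seen[consumer_id]
    !last.nil? && (now - last) <= @session_timeout
  end
end

coordinator = ToyCoordinator.new(session_timeout: 30)
coordinator.heartbeat("consumer-1", 0)

puts coordinator.alive?("consumer-1", 10)  # true
puts coordinator.alive?("consumer-1", 45)  # false

coordinator.heartbeat("consumer-1", 45)
puts coordinator.alive?("consumer-1", 50)  # true
```

Sending heartbeats more often than the timeout, as the 10-second interval does here, leaves room for a missed beat or two before the consumer is considered dead.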

## Commit

Marking an offset as consumed is called committing an offset. In Kafka, we record offset commits by writing to an internal Kafka topic called the offsets topic. A message is considered consumed only when its offset is committed to the offsets topic.

Karafka has a config to carry out this commit automatically at a regular interval; the config is `kafka.offset_commit_interval`, and its value is 10 seconds by default. With this, Karafka will do an offset commit every 10 seconds, and you can view that message in your Karafka server:

```bash
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!
```

The `Committing offsets: users/0:44` line tells us which offset is being committed; in my case, it told Kafka that it can commit offset 44 for partition 0 of the `users` topic. This way, if something happens to our service, Karafka can resume processing events from that offset.
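
Because commits happen on an interval, the committed offset can lag behind what has actually been processed. The sketch below models that gap in plain Ruby; `ToyOffsetTracker` is a hypothetical class for illustration, and real clients may also commit on other triggers.

```ruby
# Toy model of interval-based offset commits. `position` is the next offset to
# process; `committed` is the last position saved for recovery.
class ToyOffsetTracker
  attr_reader :position, :committed

  def initialize(commit_interval:)
    @commit_interval = commit_interval
    @position = 0
    @committed = 0
    @last_commit_at = 0
  end

  # Record that one message was processed at time `now` (in seconds) and
  # commit if the interval has elapsed since the previous commit.
  def mark_processed(now)
    @position += 1
    return unless now - @last_commit_at >= @commit_interval

    @committed = @position
    @last_commit_at = now
  end
end

tracker = ToyOffsetTracker.new(commit_interval: 10)
tracker.mark_processed(2)   # position 1, nothing committed yet
tracker.mark_processed(5)   # position 2, nothing committed yet
tracker.mark_processed(11)  # position 3, interval elapsed, commit 3
tracker.mark_processed(14)  # position 4, last commit was at t=11

puts tracker.position   # 4
puts tracker.committed  # 3
```

If the service crashed at this point, it would resume from offset 3 and process that message a second time, so consumers should be written to tolerate an occasional repeat.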

## Conclusion

Event streaming helps us to be faster, to make better use of data, and to design better user experiences. As a matter of fact, many companies are using this pattern to connect their services and to react to different events in real time. As I mentioned before, there are other alternatives apart from Karafka that you can use with Rails. You already have the basics; now, feel free to experiment with them.

## References

- [https://kafka.apache.org/](https://kafka.apache.org/)
- [https://github.com/karafka/karafka](https://github.com/karafka/karafka)
- [https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe\_pattern](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern)

