Event Streaming in Rails with Kafka

Do you need to process a lot of data in real time? Event streaming is a pattern that could help. David Sanchez walks us through how to do event streaming in Rails with Apache Kafka, the popular open-source event streaming platform.

Companies need to process and share large amounts of data in real time to gain insights and create more engaging customer experiences. Traditional data processing is often too slow for that.

To achieve that, you need to process a lot of data as fast as possible and then send it to other services for more processing. In the middle of all these quick actions, consumers must also be notified when an event occurs, and event streaming lets us do just that.

This is the repo in GitHub that we will be using.

Events

Before talking about event streaming, let’s talk about what an event is. An event is something that happens within an application; it can be related to a user process or simply to actions that affect the business.

Events represent a state change; they describe what happened, not how the application should react to it. Consider these examples:

  • A user logging into a service
  • A payment transaction
  • A writer publishing a post in a blog

In most cases, an event will trigger more events; for example, when a user signs up for a service, the app sends a notification to their device, inserts the record in the database, and sends a welcoming email.
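
As a rough illustration, an event can be modelled as an immutable record of something that has already happened. The sketch below is plain Ruby and not part of Karafka; the Event struct, its fields, and the to_wire method are hypothetical names chosen to resemble the payloads used later in this article.

```ruby
require 'time'

# Hypothetical value object: an event names a fact that already happened
# and carries the data describing it. It does not say what to do next.
Event = Struct.new(:name, :payload, :occurred_at, keyword_init: true) do
  # Build a plain hash that could be serialized and sent to a broker.
  def to_wire
    { event_name: name, payload: payload, occurred_at: occurred_at.utc.iso8601 }
  end
end

event = Event.new(
  name: 'user_created',
  payload: { user_id: 1 },
  occurred_at: Time.utc(2021, 3, 10, 4, 29, 14)
).freeze # frozen: a past fact should not be changed afterwards

puts event.to_wire
```

Each reaction mentioned above (notification, database insert, welcome email) would then be a separate consumer of this one record.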

Event streaming

Event streaming is a pattern for capturing data in real time from event sources such as databases. Its main parts are as follows:

  • Broker: The system in charge of storing events
  • Topic: A category of events
  • Producer: Sends events to a broker on a specific topic
  • Consumer: Reads the events
  • Events: Data that producers want to communicate to consumers

At this point, it is worth mentioning the publish/subscribe architecture pattern (pub/sub); event streaming is an implementation of that pattern with these changes:

  • It deals with events instead of messages.
  • Events are ordered, typically by time.
  • Consumers can read events from a specific point in the topic.
  • The events have temporal durability.

The flow starts when the producer publishes a new event into a topic (as we saw previously, the topic is just the categorization for a specific type of event). Then, consumers interested in events of a particular category subscribe to that topic. Finally, the broker identifies the consumers of the topic and makes the desired events available.
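
The flow above can be sketched with a toy, in-memory broker. This is only a mental model in plain Ruby, assuming a single process and no persistence; ToyBroker and its methods are hypothetical names and have nothing to do with Kafka's real protocol or any client API.

```ruby
# A toy stand-in for a broker. Each topic is an append-only array, and an
# event's offset is simply its index in that array.
class ToyBroker
  def initialize
    @topics = Hash.new { |topics, name| topics[name] = [] }
  end

  # Producer side: append the event and return the offset it was stored at.
  def publish(topic, event)
    log = @topics[topic]
    log << event
    log.size - 1
  end

  # Consumer side: read every event from a given offset onwards.
  # Events stay in the log after being read, so they can be replayed.
  def read(topic, from_offset: 0)
    @topics[topic].drop(from_offset)
  end
end

broker = ToyBroker.new
broker.publish(:users, { event_name: 'user_created', user_id: 1 })
broker.publish(:users, { event_name: 'user_created', user_id: 2 })
last_offset = broker.publish(:users, { event_name: 'user_deleted', user_id: 1 })

puts last_offset
puts broker.read(:users, from_offset: 1).length
```

Because reading does not remove anything, a consumer that joins late can still start from offset 0 and see the whole history in order.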

Advantages of event streaming

  • Decoupling: There's no dependency between publishers and consumers because they don't need to know about each other. In addition, events don't prescribe any action, so many consumers can receive the same event and perform different actions.

  • Low Latency: Because producers and consumers are decoupled, consumers can read events as soon as they are published, often within milliseconds.

  • Independence: Because publishers and consumers are independent, different teams can work on them separately and use the same events for different purposes.

  • Fault Tolerance: Some event streaming platforms help you deal with consumer failures; for example, consumers can save their position and start from there again if an error occurs.

  • Real-Time Handling: Feedback is received in real time, so users don't need to wait minutes or hours to see the response to their events.

  • High Performance: Event platforms can handle a high volume of messages with low latency, for example, thousands of events per second.
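
To make the decoupling and independence points more concrete, here is a minimal plain-Ruby sketch in which two consumers read the same events, apply different actions, and keep separate positions. FanOutConsumer and the sample events are hypothetical and only illustrate the idea.

```ruby
# One shared, ordered log of events (standing in for a topic).
EVENTS = [
  { event_name: 'user_created', email: 'batman@mail.com' },
  { event_name: 'user_created', email: 'robin@mail.com' }
].freeze

# Hypothetical consumer: it keeps its own offset and applies its own action.
class FanOutConsumer
  attr_reader :offset, :results

  def initialize(&action)
    @action = action
    @offset = 0
    @results = []
  end

  # Process everything not seen yet, then remember how far we got.
  def poll(log)
    log.drop(@offset).each { |event| @results << @action.call(event) }
    @offset = log.size
  end
end

mailer = FanOutConsumer.new { |event| "Welcome email to #{event[:email]}" }
stats  = FanOutConsumer.new { |event| event[:event_name] }

mailer.poll(EVENTS)          # the mailer has read both events
stats.poll(EVENTS.first(1))  # the slower stats consumer has only seen one

puts mailer.results
puts stats.offset
```

Neither consumer knows the other exists, and the producer of the events knows about neither of them.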

Disadvantages of event streaming

  • Monitoring: Some event streaming tools don't ship with complete monitoring; they require additional tools, such as Datadog or New Relic.

  • Configuration: The configuration of some tools can be overwhelming, even for experienced people. There are many parameters, and sometimes you need in-depth knowledge of the subject to set them correctly.

  • Client libraries: It isn't easy to work with Kafka in languages other than Java. Sometimes, the client libraries are not up to date, are unstable, or leave you with few alternatives to choose from.

One of the most popular tools for event streaming is Apache Kafka. This tool allows users to send, store, and request data whenever and wherever they need it; let's talk about it.

Apache Kafka

"Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications."

Being specifically designed for real-time log transmission, Apache Kafka is ideal for applications that require the following:

  • Reliable data exchange between different components
  • The ability to divide messaging workloads as application requirements change
  • Real-time transmission for data processing

Let's use Kafka in a Rails application!

Using Kafka with Rails

The most famous gem for using Kafka in Ruby is ruby-kafka by Zendesk, and it is great! Still, you need to do all the implementation manually, which is why there are some "frameworks" built on top of ruby-kafka. They also help us with all the configuration and execution steps.

Karafka is a framework used to simplify Apache Kafka-based Ruby applications development.

Kafka is a Scala and Java application, so you need to install Java to work with it. Installing Zookeeper is also required.

Before the installation, I want to explain a bit about Zookeeper. Zookeeper is a centralized service essential for Kafka; it sends notifications in case of changes such as the creation of a new topic, crash of a broker, removal of a broker, deletion of topics, and so on.

Its main tasks are to manage Kafka brokers, maintain a list with their respective metadata, and facilitate health-checking mechanisms. In addition, it helps to elect the leader broker for the different partitions of the topics.

Requirements

For macOS:

Now, let's install Java and Zookeeper with the following commands:

brew install java
brew install zookeeper

Then, we can continue by installing Kafka:

brew install kafka

Once we have Kafka and Zookeeper installed, it's necessary to start the services this way:

brew services start zookeeper
brew services start kafka

For Windows and Linux:

Instructions:

  1. Installing Java
  2. Download Zookeeper

Setting Up Rails

Just create a simple Rails application as usual:

rails new karafka_example

and add the karafka gem within the Gemfile:

gem 'karafka'

Then, run bundle install to install the newly added gem, and don't forget to run the following command to generate the Karafka files:

bundle exec karafka install

That command should generate some interesting files: karafka.rb in the root directory, app/consumers/application_consumer.rb, and app/responders/application_responder.rb.

Karafka Initializer

The karafka.rb file is like an initializer, kept separate from the Rails config. It allows you to configure the Karafka application and draw some routes, with an API similar to Rails application routes. But here, the routes are for topics and consumers.
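
For orientation, a karafka.rb for this example might look roughly like the following. This is a sketch in the Karafka 1.x style that the rest of this article relies on (responders and params); the broker address is an assumption for a local Kafka. The file that karafka install generates for you typically also loads the Rails environment and may contain more settings, which are omitted here.

```ruby
# karafka.rb (sketch, Karafka 1.x style)

class KarafkaApp < Karafka::App
  setup do |config|
    # Assumed address of a local Kafka broker; adjust it to your environment.
    config.kafka.seed_brokers = %w[kafka://127.0.0.1:9092]
    # Identifies this application to Kafka.
    config.client_id = 'karafka_example'
  end

  # Routes: map each topic to the consumer that handles it.
  consumer_groups.draw do
    topic :users do
      consumer UsersConsumer
    end
  end
end

KarafkaApp.boot!
```

Without a route for the users topic, the Karafka server has no way of knowing which consumer should receive the events sent to it.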

Producer

The producer is in charge of creating events, and we can add producers to the app/responders folder. Now, let’s make a simple producer for users:

# app/responders/users_responder.rb

class UsersResponder < ApplicationResponder
  topic :users

  def respond(event_payload)
    respond_to :users, event_payload
  end
end

Consumer

The consumer is responsible for reading all the events/messages sent from the producer. This is just a consumer that logs the received message.

# app/consumers/users_consumer.rb

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params}"
  end
end

We use params to get the event. But if you consume events in batches, with config.batch_consuming set to true, you should use params_batch instead.

Testing

To run our Karafka service (the one that will be listening for events), open a new console tab, go to the Rails project, and run:

bundle exec karafka server

Successful Event

Now, open another console tab, go to the Rails project, and type this:

rails c

There, let’s create an event with our responder:

> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })

If you check the Rails console, you will see this message after the event is created:

Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}

And in the Karafka service tab, you’ll see something like this:

New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092

But if you just want the message payload, you can log params.payload in your consumer instead, and you will get something like this:

Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer
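
The string keys in that output follow from serialization: the Rails console output earlier shows the hash being sent as a JSON string, and parsing JSON in Ruby yields string keys by default. The round trip can be reproduced with the standard library alone; this illustrates the effect and is not Karafka's internal code.

```ruby
require 'json'

# What we pass to the responder: a Ruby hash with symbol keys.
event = { event_name: 'user_created', payload: { user_id: 1 } }

# What travels through Kafka: a JSON string.
wire_message = JSON.generate(event)

# What the consumer gets back after parsing: string keys, not symbols.
received = JSON.parse(wire_message)

puts wire_message
puts received['payload']['user_id']
puts received[:payload].inspect # symbol keys no longer find anything
```

This is also why the consumer in the next section reads params.payload['user'] with a string key.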

Failed Event

You can create a User model with attributes such as user_id, email, first_name, and last_name (matching the payload we will send) by running the following command:

rails g model User user_id email first_name last_name

Then, you can run the migration with this:

rails db:migrate

Now, add some validations like this:

class User < ApplicationRecord
  validates :email, uniqueness: true
end

Finally, we can change the consumer:

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

So, let's create two events with the same email:

UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } } )

UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } } )

With this, the first event is created in the database:

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.1ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (9.6ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "1"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.0ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer

But the second one will fail because we have a validation that says the email must be unique. If you try to add another record with an existing email, you will see something like this:

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Exists? (0.3ms)  SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2  [["email", "batman@mail.com"], ["LIMIT", 1]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (0.2ms)  ROLLBACK
  ↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken

You can see the error in the last line: ActiveRecord::RecordInvalid: Validation failed: Email has already been taken. But the interesting thing here is that Karafka will try to process the event again and again. Even if you restart the Karafka server, it will try to process that same event. How does Karafka know where to start?

If you look at your console after the error, you will see this:

[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42

It tells you which offset was being processed when the error occurred: in this case, offset 42. So, if you restart the Karafka service, it will start at that offset.

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches

It will still fail because we have the email validation in our User model. At this point, stop the Karafka server, remove or comment out that validation, and start your server again; you’ll see how the event is processed successfully:

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (3.8ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "2"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.5ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed

Finally, you can see this message in the last line: Marking users/0:43 as processed.
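
The retry behaviour in this section can be approximated with a small plain-Ruby model: the consumer always starts at the committed offset, and the offset only moves forward after an event is processed without raising. ToyPartition, DuplicateEmail, and run_consumer are hypothetical names; real offset handling in Karafka and Kafka is more involved than this.

```ruby
# Stand-in for the validation error raised by the model.
class DuplicateEmail < StandardError; end

# Toy model of one partition plus the offset committed for our consumer.
class ToyPartition
  attr_reader :log
  attr_accessor :committed_offset

  def initialize(log)
    @log = log
    @committed_offset = 0
  end
end

# One "run" of the consumer: resume at the committed offset and stop at
# the first event that raises.
def run_consumer(partition, database, validate_unique: true)
  partition.log.drop(partition.committed_offset).each do |event|
    if validate_unique && database.any? { |row| row[:email] == event[:email] }
      raise DuplicateEmail, 'Email has already been taken'
    end

    database << event
    partition.committed_offset += 1
  end
  :caught_up
rescue DuplicateEmail => e
  # The failed event's offset was never committed, so the next run retries it.
  "failed at offset #{partition.committed_offset}: #{e.message}"
end

partition = ToyPartition.new([
  { user_id: 1, email: 'batman@mail.com' },
  { user_id: 2, email: 'batman@mail.com' }
])
database = []

first_run  = run_consumer(partition, database)                         # second event fails
second_run = run_consumer(partition, database)                         # "restart": same offset fails again
third_run  = run_consumer(partition, database, validate_unique: false) # validation removed

puts first_run
puts second_run
puts third_run
```

The model also shows the risk of this setup: an event that can never succeed blocks everything behind it until the consumer code or the data changes.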

Callbacks

This is something cool that Karafka offers: you can use callbacks in your consumer. To do that, you only need to include the module and use them. Open your UsersConsumer and add this:

class UsersConsumer < ApplicationConsumer
  include Karafka::Consumers::Callbacks

  before_poll do
    Karafka.logger.info "*** Checking something new for #{topic.name}"
  end

  after_poll do
    Karafka.logger.info '*** We just checked for new messages!'
  end

  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

Polling is the way we fetch records based on the current partition offset. So, the before_poll and after_poll callbacks, as their names suggest, are executed around that moment. We are just logging a message, and you can see both in your Karafka server: one before fetching and the other one after that:
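
If the callback mechanism feels magical, the general idea can be sketched in plain Ruby: class-level methods collect blocks, and the polling loop runs them before and after each fetch. PollingConsumer and UsersPollingConsumer are hypothetical classes that only mimic the shape of before_poll and after_poll; this is not how Karafka is implemented, and the exact ordering inside Karafka may differ.

```ruby
# Hypothetical mini-framework showing how poll callbacks can wrap a fetch.
class PollingConsumer
  class << self
    # Class-level DSL: store the given block for later.
    def before_poll(&block)
      before_poll_callbacks << block
    end

    def after_poll(&block)
      after_poll_callbacks << block
    end

    def before_poll_callbacks
      @before_poll_callbacks ||= []
    end

    def after_poll_callbacks
      @after_poll_callbacks ||= []
    end
  end

  attr_reader :journal

  def initialize
    @journal = []
  end

  # Run the "before" hooks, handle what was fetched, then run the "after" hooks.
  def poll(fetched_events)
    self.class.before_poll_callbacks.each { |callback| instance_exec(&callback) }
    fetched_events.each { |event| consume(event) }
    self.class.after_poll_callbacks.each { |callback| instance_exec(&callback) }
  end
end

class UsersPollingConsumer < PollingConsumer
  before_poll { journal << 'before_poll' }
  after_poll  { journal << 'after_poll' }

  def consume(event)
    journal << "consume #{event}"
  end
end

consumer = UsersPollingConsumer.new
consumer.poll([])               # hooks run even when nothing was fetched
consumer.poll(['user_created'])
puts consumer.journal
```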

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!

Heartbeats

A heartbeat is just the way we, as consumers, say to Kafka we are alive; otherwise, Kafka will assume that the consumer is dead.

In Karafka, we have a default config to do this periodically; it is kafka.heartbeat_interval, and the default is 10 seconds. You can see this heartbeat in your Karafka server.

*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!

With Sending heartbeat..., Kafka knows that we are alive and that we are a valid member of its consumer group, so we can keep consuming records.
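
The liveness check behind heartbeats can be pictured with a toy coordinator that remembers when each consumer was last heard from. This is a plain-Ruby sketch; ToyCoordinator and the 30-second SESSION_TIMEOUT are assumptions made for the example, and time is passed in as plain seconds to keep it deterministic.

```ruby
# Toy coordinator that tracks the last heartbeat time of each consumer.
class ToyCoordinator
  SESSION_TIMEOUT = 30 # hypothetical value for this sketch, in seconds

  def initialize
    @last_heartbeat = {}
  end

  def heartbeat(consumer_id, now)
    @last_heartbeat[consumer_id] = now
  end

  # A consumer counts as alive only if it sent a heartbeat recently enough.
  def alive?(consumer_id, now)
    last_seen = @last_heartbeat[consumer_id]
    !last_seen.nil? && (now - last_seen) <= SESSION_TIMEOUT
  end
end

coordinator = ToyCoordinator.new
coordinator.heartbeat('users-consumer', 0)   # first heartbeat at 0s
coordinator.heartbeat('users-consumer', 10)  # next one 10s later

puts coordinator.alive?('users-consumer', 20) # true: last heartbeat was 10s ago
puts coordinator.alive?('users-consumer', 45) # false: 35s of silence exceeds the timeout
```

A consumer that stops sending heartbeats, for example because it crashed, is eventually treated as dead.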

Commit

Marking an offset as consumed is called committing an offset. In Kafka, we record offset commits by writing to an internal Kafka topic called the offsets topic. A message is considered consumed only when its offset is committed to the offsets topic.

Karafka has a config to carry out this commit automatically at a regular interval; the config is kafka.offset_commit_interval, and its value is 10 seconds by default. With this, Karafka will do an offset commit every 10 seconds, and you can view that message in your Karafka server:

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!

The Committing offsets: users/0:44 line tells us which offset is being committed; in my case, it told Kafka that it can commit offset 44 for partition 0 of the users topic. In this way, if something happens with our service, Karafka can start processing events again from that offset.
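
One consequence of committing on an interval is that the committed position can lag behind what has actually been processed. The toy model below shows that gap in plain Ruby; IntervalCommitter is a hypothetical class, time is passed in as plain seconds, and the real commit logic in Karafka and ruby-kafka is more elaborate than this.

```ruby
# Toy model of interval-based committing.
class IntervalCommitter
  attr_reader :processed_offset, :committed_offset

  def initialize(interval:)
    @interval = interval
    @processed_offset = 0 # how many events have been processed
    @committed_offset = 0 # position a restarted consumer would resume from
    @last_commit_at = 0
  end

  # Mark one more event as processed; commit only if the interval has elapsed.
  def mark_processed(now)
    @processed_offset += 1
    return unless now - @last_commit_at >= @interval

    @committed_offset = @processed_offset
    @last_commit_at = now
  end
end

committer = IntervalCommitter.new(interval: 10)
committer.mark_processed(3)   # processed, not yet committed
committer.mark_processed(7)   # processed, not yet committed
committer.mark_processed(12)  # 10 seconds have passed: commit everything so far
committer.mark_processed(15)  # processed after the last commit

puts committer.processed_offset
puts committer.committed_offset
```

If the service crashed at this point, the fourth event would be processed again after a restart, so consumers should tolerate seeing the same event more than once.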

Conclusion

Event streaming helps us to be faster, to make better use of data, and to design better user experiences. As a matter of fact, many companies use this pattern to let their services communicate with each other and to react to different events in real time. As I mentioned before, there are other alternatives apart from Karafka that you can use with Rails. You already have the basics; now, feel free to experiment with them.

    David Sanchez

    Software Engineer and full-time Dad. I love dogs, soccer, and Ruby/Rails also I'm passionate about coding, learning, and teaching others new things.
