Kafka on Ruby

Laszlo Papp
Zendesk Engineering
8 min read · Feb 17, 2020


Kafka is written in Scala and Java, and you can get great support and tooling if you’re using those languages. On the other hand, if you use another language like Ruby, you can run into immature libraries and small developer communities. I want to show you some tools in Ruby that can help you start using Kafka, and which library or framework might be the best choice if you’re on the fence about which tool fits your requirements. I’ll show how you can start producing and consuming messages quickly with the Kafka libraries, and how the different frameworks can help when your system grows.

What is Kafka?

Kafka is a distributed pub-sub messaging system that keeps track of streams of events. It’s very similar to log files, where applications log events as a stream written to disk, usually into the /var/log folder on Unix systems. Multiple processes of an application can log messages into the same file, while on the other side, log processors convert these messages into a readable format, for instance converting plain text into JSON and storing it in Elasticsearch as a document, or sending an alert based on the log severity. In this example, the applications behave as producers and the converters behave as consumers.

What Kafka does is very similar to what we do in Unix shells when we write or read a stream of lines in a file:

  • producer: echo "hello world" >> /var/log/log_file
  • consumer: tail -f /var/log/log_file | consumer_app

Ruby Libraries and Frameworks

Right now there are two popular open-source Kafka libraries for Ruby: ruby-kafka and rdkafka (also known as ruby-rdkafka). ruby-kafka from Zendesk is written entirely in Ruby, while rdkafka is a wrapper around the librdkafka C/C++ library. Both libraries support writing and reading Kafka streams, so you can use either to produce and consume events. There are also some open-source frameworks, Racecar, Phobos and Karafka, that help you manage multiple producers and consumers and organize them into groups, application units, and processes. These frameworks are currently built on top of the ruby-kafka library, but some of them are moving to rdkafka in their early-version releases.

Kafka Libraries and Frameworks in Ruby

Producers

Producing messages is really like writing into a file. I’ll show you how you can do that with ruby-kafka and rdkafka.

Rdkafka

rdkafka is a wrapper around librdkafka, so using this library requires a C compiler toolchain in your environment to build the native extension. To set up a producer, you need to specify some configuration details, for instance, the address of your bootstrap servers where the Kafka brokers are running. In my case, it’s running on the kafka.docker host, on the default 9092 port. If you use a Docker image, your broker address is probably localhost:9092. To start sending messages, we create a producer from our configuration and call the produce method on that instance, which emits events to Kafka.

require 'rdkafka'

config = { "bootstrap.servers": "kafka.docker:9092" }
producer = Rdkafka::Config.new(config).producer
producer.produce(topic: "test", payload: "Hello World!")

rdkafka emits events asynchronously: the message is written to an internal queue, and produce returns a delivery handle. If you need a synchronous request, call wait on that handle:

producer.produce(topic: "test", payload: "Hello World!").wait
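If you produce many messages, you can keep the asynchronous behavior and still confirm delivery by collecting the handles and waiting on them once at the end. A minimal sketch, reusing the producer instance from above:

# Queue several messages without blocking, then block once for all of them.
handles = 10.times.map do |i|
  producer.produce(topic: "test", payload: "message ##{i}")
end
handles.each(&:wait)

This way the producer can batch messages internally instead of paying for a round trip on every call.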

Ruby-kafka

ruby-kafka is very similar to rdkafka: the constructor of Kafka expects the list of brokers, and calling deliver_message writes to the stream.

require "kafka"kafka = Kafka.new(["kafka.docker:9092"])
kafka.deliver_message("Hello World!", topic: "test")

deliver_message is a synchronous operation; the function won’t return until the message is written to the wire. If you don’t need to know the result of deliver_message, you can send the message asynchronously, in which case ruby-kafka maintains a background thread that manages a queue of pending messages.

kafka.async_producer.produce("Hello World", topic: "test")

Note: async sending happens in the background; ruby-kafka maintains a queue for pending messages and writes them to the wire in a background thread. Make sure you don’t have unsent messages in the queue when your process terminates.
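To reduce the chance of losing messages at shutdown, you can flush the async producer explicitly before the process exits. A minimal sketch, assuming the async producer’s shutdown method described in the ruby-kafka README, which delivers any buffered messages before stopping the background thread:

require "kafka"

kafka = Kafka.new(["kafka.docker:9092"])
producer = kafka.async_producer

# Drain the pending queue and stop the delivery thread on exit.
at_exit { producer.shutdown }

producer.produce("Hello World!", topic: "test")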

Consumers

Both ruby-kafka and rdkafka provide solutions for consuming messages. There are multiple frameworks that wrap these libraries into a complete platform and make it easy to add and scale consumers, but let’s first look at some basic consumers in rdkafka and ruby-kafka.

Rdkafka

The configuration of the consumer is similar to the producer’s: the bootstrap.servers option tells the client where the Kafka brokers are located, but there’s an additional group.id setting we need to specify. The group ID defines the Kafka consumer group the consumer will join. When your consumer starts processing messages, the Kafka broker keeps track of the last message the consumer group processed successfully. As long as you use the same group ID, you can stop your consumer at any time; the next time it starts, it continues from the first unprocessed message, regardless of how long it was stopped.

require 'rdkafka'

config = {
  "bootstrap.servers": "kafka.docker:9092",
  "group.id": "ruby-test"
}
consumer = Rdkafka::Config.new(config).consumer

Now that there’s a consumer instance, we just need to subscribe to the topic it will read from and write a basic iterator that is yielded each time a message is written to the topic.

consumer.subscribe('test')

consumer.each do |message|
  puts "Message received: #{message}"
end
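The each iterator blocks forever, so a long-running consumer needs a way to exit cleanly, for example on SIGTERM. One option is to drive the loop yourself; a minimal sketch, assuming rdkafka’s poll(timeout_ms) and close methods:

# Stop the loop from a signal handler instead of killing the process.
running = true
trap("TERM") { running = false }

while running
  message = consumer.poll(250) # wait up to 250 ms for a message
  puts "Message received: #{message}" if message
end

# Closing the consumer leaves the group so the broker can rebalance.
consumer.close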

Ruby-kafka

The configuration is similar to the producer’s: in the constructor we need to pass the Docker hostname and port number.

require "kafka"kafka = Kafka.new(["kafka.docker:9092"])consumer = kafka.consumer(group_id: "ruby-test")

Consuming messages is very similar to rdkafka: the consumer needs to subscribe to a topic and iterate on the messages with the each_message iterator.

consumer.subscribe('test')

consumer.each_message do |message|
  puts "Message received: #{message}"
end
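ruby-kafka can also hand you messages in batches, which helps when you want to amortize per-message overhead such as database writes. A minimal sketch, assuming ruby-kafka’s each_batch iterator:

consumer.subscribe('test')

# Process one fetched batch at a time instead of message by message.
consumer.each_batch do |batch|
  puts "Got #{batch.messages.count} messages from #{batch.topic}/#{batch.partition}"
  batch.messages.each do |message|
    puts "Message received: #{message.value}"
  end
end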

Frameworks

Using only the libraries can get you processing messages from a topic quickly, especially when you’re working on a small script that needs some data from Kafka. But when you use the each or each_message methods above, keep in mind that they block the execution flow, so you need threads or background processes if you want to consume multiple topics concurrently. And once you work in a multi-threaded environment, there are things you have to deal with yourself, e.g. graceful shutdown and backoff strategies. The following frameworks help you avoid some headaches by putting the basic consumer for-each loops into threads and processes and providing configs to manage them in an easy way. Since producing messages is quite simple with the libraries alone, the following sections focus on consuming messages only.
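To see why this gets tedious, here is roughly what consuming two topics concurrently looks like with plain ruby-kafka: one thread per consumer, plus signal handling so both loops stop cleanly. A minimal sketch, assuming ruby-kafka’s Consumer#stop, which makes each_message return (the topic names are just examples):

require "kafka"

kafka = Kafka.new(["kafka.docker:9092"])

consumers = %w[test user_updates].map do |topic|
  consumer = kafka.consumer(group_id: "ruby-test")
  consumer.subscribe(topic)
  consumer
end

# Ask both consumers to leave their poll loops on SIGTERM.
trap("TERM") { consumers.each(&:stop) }

threads = consumers.map do |consumer|
  Thread.new do
    consumer.each_message do |message|
      puts "#{message.topic}: #{message.value}"
    end
  end
end

threads.each(&:join)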

Racecar

It’s one of the simplest Kafka frameworks. Racecar wraps a single consumer into a process, and the framework handles everything you need in a production environment, like instrumentation and graceful termination when the process gets a SIGTERM signal. With Racecar you don’t need to worry about Kafka consumer groups; everything happens under the hood.

To start playing with Racecar, you need to add the racecar gem into your project’s Gemfile:

# Gemfile
gem 'racecar'

and implement a class that inherits from Racecar::Consumer. Here is an example consumer that consumes the test topic. If you are using Rails, it’s recommended to put your consumers into the /app/consumers folder.

# /app/consumers/test_consumer.rb
class TestConsumer < Racecar::Consumer
  subscribes_to "test"

  def process(message)
    puts message.inspect
  end
end

When Racecar boots up, it creates a single instance of your class and calls the process method on it every time a message is read from the topic. Because only one consumer instance is created during boot, instance variables are shared between messages, which is why it’s strongly recommended not to store state in instance variables on a multi-tenant system.
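To make the pitfall concrete, here is a hypothetical consumer that leaks state between messages: the @seen hash lives as long as the process, so data from one message (or tenant) bleeds into the next.

class LeakyConsumer < Racecar::Consumer
  subscribes_to "test"

  def process(message)
    @seen ||= {}
    # BUG: @seen is never reset, so it grows forever and mixes
    # data from every message this process has handled.
    @seen[message.key] = message.value
  end
end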

You can start consuming messages by passing the class name to the racecar application:

bundle exec racecar TestConsumer
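To verify the consumer is wired up, you can publish a message to the test topic from an IRB session, reusing the ruby-kafka producer snippet from earlier, and watch the Racecar process print it:

require "kafka"

# The running Racecar process should print this message.
kafka = Kafka.new(["kafka.docker:9092"])
kafka.deliver_message("Hello Racecar!", topic: "test")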

Phobos

When you have multiple consumers, you might want to run them in the same process, especially when you work with Rails and need to load the whole application into memory first. Putting consumers into separate processes can multiply the memory usage of your application, which can add extra cost to your cloud budget.

The big advantage of Phobos compared to Racecar is that you can specify which consumers to execute in the same process and how many concurrent consumers to run, which is very useful if your topic has multiple partitions.

To add Phobos into your project, add the phobos gem into your Gemfile.

# Gemfile
gem 'phobos'

You can initialize it and generate the configuration by executing phobos init in your terminal. A Phobos consumer is a plain Ruby class that includes Phobos::Handler and implements a consume method:

# /app/consumers/test_consumer.rb
class TestConsumer
  include Phobos::Handler

  def consume(payload, metadata)
    puts metadata
    puts payload
  end
end

phobos init creates a config file for you at /app/config/phobos.yml if you’re using Rails. The following configuration will create two TestConsumers; they will consume messages from the test topic and join the test-consumer-group Kafka consumer group.

# /app/config/phobos.yml
kafka:
  seed_brokers:
    - kafka.docker:9092

listeners:
  - handler: TestConsumer
    topic: test
    group_id: test-consumer-group
    max_concurrency: 2 # run it on 2 threads

If you want to add more consumers to the same process, you add a new handler with a configuration similar to the TestConsumer’s, as sketched below. The same process will then run a set of threads for each consumer.
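For example, a hypothetical second listener (the handler, topic, and group names below are invented for illustration) would be another entry in the listeners list:

# /app/config/phobos.yml (listeners section)
listeners:
  - handler: TestConsumer
    topic: test
    group_id: test-consumer-group
    max_concurrency: 2
  - handler: UserEventsConsumer
    topic: user_events
    group_id: user-events-group
    max_concurrency: 4 # e.g. one thread per partition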

You can start the consumers with:

bundle exec phobos start

If you want to split consumers into separate processes to balance load in your production environment, you create a different config file for each group and pass the path of the config to phobos start:

bundle exec phobos start -c config/test_consumers.yml
bundle exec phobos start -c config/user_events.yml

Karafka

Phobos and Racecar are very small frameworks. If I compared them to Ruby web frameworks, I would say Racecar is like Sinatra, Phobos is like Padrino, and Karafka is like Rails.

To start using Karafka in your project, add the karafka gem to the Gemfile.

# Gemfile
gem 'karafka'

After executing bundle install, run the following command to set up your environment and get the default configuration file.

bundle exec karafka install

In the root folder of your application you should now find a karafka.rb file, the configuration file that describes your environment and your routing definitions.

# /karafka.rb
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://kafka.docker:9092]
    config.client_id = 'example_app'
  end

  consumer_groups.draw do
    topic :test do
      consumer TestConsumer
    end

    consumer_group :user_events do
      topic :user_updates do
        consumer UserUpdates
      end

      topic :user_notifications do
        consumer UserNotifications
      end
    end
  end
end

consumer_groups.draw describes topics and consumer groups. Each topic and consumer_group in the consumer_groups.draw block runs on its own thread; in the example above there will be two threads, one for the TestConsumer and another for the user_events consumer group. If you want to scale out and run on multiple processes, you need to start multiple Karafka apps.

Let’s see how you can add a basic TestConsumer to your project. Under /app/consumers, create a file named test_consumer.rb:

# /app/consumers/test_consumer.rb
class TestConsumer < Karafka::BaseConsumer
  def consume
    # You can reach the message in params, like in Rails
    puts params.inspect
  end
end

Now there’s nothing left to do but start your Karafka application:

bundle exec karafka server

If you want to start only certain consumer groups, you can pass the consumer group names as extra parameters:

bundle exec karafka server --consumer_groups user_events

In this example, only the consumers that belong to the user_events group are going to be executed.

Karafka is a massive framework with lots of configuration options and consumer features, you can find more details in their documentation.

Summary

These libraries and frameworks can help you integrate Kafka with your application and start producing and consuming messages quickly. I didn’t dive into the finer details, but they also provide rich configuration options for optimizing your producers and consumers. The Racecar framework is a nice lightweight tool for starting with Kafka quickly, but as the number of consumers increases, you might want to consider Phobos or Karafka, because they manage consumer groups and pipelines better.

Please like the page if you want to hear more about Kafka and Ruby; in another post I can dive into the details. Thanks.
