Kafka on Ruby
Kafka is written in Scala and Java, and you can get great support and tools if you’re using these languages. On the other hand, if you use another language like Ruby, you can run into immature libraries and small developer communities. I want to show you some tools in Ruby that can help you start using Kafka, and which library or framework might be the best choice if you’re on the fence about which tool fits your requirements. I’ll show how you can start producing and consuming messages quickly with the Kafka libraries, and how different frameworks can help you when your system grows bigger.
What is Kafka?
Kafka is a distributed pub-sub messaging system that keeps track of streams of events. It is very similar to log files where applications append events as a stream written to disk, usually into the /var/log folder on Unix systems. Multiple processes of an application can log messages into the same file, while on the other side, log processors convert these messages into a readable format, for instance converting plain text into JSON and storing it in Elasticsearch as a document, or sending an alert when the log severity indicates a problem. In this example, the applications behave as producers and the converters behave as consumers.
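That conversion step can be sketched in a few lines of Ruby. Here is a toy log processor playing the consumer role (the `SEVERITY message` line format is a made-up example, not a real standard), turning a plain-text log line into a JSON document:

```ruby
require "json"

# A toy "consumer" that converts a plain-text log line into a JSON document,
# the kind of record you might index in Elasticsearch.
def process_log_line(line)
  severity, message = line.split(" ", 2)
  { severity: severity, message: message.strip }.to_json
end

puts process_log_line("ERROR disk is full")
```

A real log processor would of course read a stream of lines, but the shape of the work is the same: consume raw events, emit structured ones.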
What Kafka actually does is something very similar to what we do in Unix shells when we write or read a stream of lines in a file:
```shell
# producer:
echo "hello world" >> /var/log/log_file

# consumer:
tail -f /var/log/log_file | consumer_app
```
Ruby Libraries and Frameworks
Right now there are two popular open-source Kafka libraries for Ruby: `ruby-kafka` and `ruby-rdkafka`. `ruby-kafka` from Zendesk is fully written in Ruby, while `ruby-rdkafka` is a wrapper around the `librdkafka` C library. Both support writing and reading Kafka streams, so you can use either to produce and consume events. There are also some open-source frameworks, Racecar, Phobos, and Karafka, that help you manage multiple producers and consumers, and organize them into groups, application units, and processes. These frameworks are currently built on top of the `ruby-kafka` library, but some of them are moving to `ruby-rdkafka` in their early-version releases.
Producers
Producing messages is really like writing to a file. I’ll show you how you can do that with `ruby-kafka` and `rdkafka`.
Rdkafka
`rdkafka` is a Ruby wrapper around the `librdkafka` C library, therefore using it requires a C compiler installed in your environment. To set up a producer, you need to specify some configuration details, for instance the address of the bootstrap servers where the Kafka brokers are running. In my case, the broker is running on the `kafka.docker` host, on the default port `9092`. If you use a Docker image, your broker address is probably `localhost:9092`. To start sending messages, we need to create a producer according to our configuration and call the `produce` method on this instance, which emits events to Kafka.
```ruby
require 'rdkafka'

config = { "bootstrap.servers": "kafka.docker:9092" }
producer = Rdkafka::Config.new(config).producer
producer.produce(topic: "test", payload: "Hello World!")
```
`ruby-rdkafka` emits events asynchronously: the message is written to a queue, and you need to call `wait` on the handle returned by `produce` if you need a synchronous request:

```ruby
producer.produce(topic: "test", payload: "Hello World!").wait
```
Ruby-kafka
In `ruby-kafka` it’s very similar to `rdkafka`: the constructor of `Kafka` expects the list of Kafka brokers, and then calling `deliver_message` will write to the stream.
```ruby
require "kafka"

kafka = Kafka.new(["kafka.docker:9092"])
kafka.deliver_message("Hello World!", topic: "test")
```
`deliver_message` is a synchronous operation; the function won’t return until the message is written to the wire. If you don’t necessarily need to know the result of `deliver_message`, you can send the message asynchronously; in this case `ruby-kafka` maintains a thread that manages a queue in the background.

```ruby
kafka.async_producer.produce("Hello World", topic: "test")
```
Note: async sending happens in the background. `ruby-kafka` maintains a queue for pending messages and writes them to the wire in a background thread. Make sure you don’t have unsent messages in the queue when your process terminates.
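To see why that matters, here is a minimal pure-Ruby sketch of the same idea (a toy model, not `ruby-kafka`’s internals): produced messages land in an in-memory queue, a background thread drains it, and an explicit shutdown step flushes the queue before the process exits.

```ruby
# Toy async producer: a queue plus a background delivery thread.
class ToyAsyncProducer
  attr_reader :delivered

  def initialize
    @queue = Queue.new
    @delivered = []
    @worker = Thread.new do
      while (message = @queue.pop) # nil signals shutdown
        @delivered << message      # stands in for writing to the wire
      end
    end
  end

  def produce(message)
    @queue << message # returns immediately, delivery happens later
  end

  # Drain the queue before the process terminates.
  def shutdown
    @queue << nil
    @worker.join
  end
end

producer = ToyAsyncProducer.new
producer.produce("Hello World!")
producer.shutdown
puts producer.delivered.inspect
```

If the process exited without the `shutdown` call, anything still sitting in the queue would simply be lost, which is exactly the pitfall the note above warns about.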
Consumers
Both `ruby-kafka` and `rdkafka` provide solutions for consuming messages. There are multiple frameworks that wrap these libraries into a complete platform and make it easy to add and scale consumers. Let’s start with some basic consumers in `rdkafka` and `ruby-kafka`.
Rdkafka
The configuration of the consumer is similar to the producer’s config: the `bootstrap.servers` option needs to be specified so the consumer knows where the Kafka server is located, but there’s an additional `group.id` setting that we need to provide. The group ID defines the Kafka consumer group that the consumer will join. When your consumer starts processing messages, the Kafka broker keeps track of the last message that the consumer group processed successfully. If you use the same group ID, you can stop your consumer at any time; the next time it starts, it will continue with the next unprocessed message, regardless of how long it was stopped.
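That offset bookkeeping can be sketched in plain Ruby (a toy model, not the real broker protocol): the broker stores the last committed offset per group ID, so a restarted consumer with the same `group.id` resumes where it left off.

```ruby
# Toy model of per-group offset tracking on a broker.
class ToyBroker
  def initialize(messages)
    @messages = messages
    @committed = Hash.new(0) # group.id => next offset to read
  end

  # Read one message for a group and commit the offset.
  def poll(group_id)
    offset = @committed[group_id]
    return nil if offset >= @messages.size

    message = @messages[offset]
    @committed[group_id] = offset + 1
    message
  end
end

broker = ToyBroker.new(%w[m1 m2 m3])
puts broker.poll("ruby-test") # m1
puts broker.poll("ruby-test") # m2
# ...the consumer stops here; a restart with the same group.id continues at m3
puts broker.poll("ruby-test") # m3
```

A consumer joining under a different group ID would start from the beginning, because the broker tracks offsets per group, not per process.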
```ruby
require 'rdkafka'

config = {
  "bootstrap.servers": "kafka.docker:9092",
  "group.id": "ruby-test"
}

consumer = Rdkafka::Config.new(config).consumer
```
Now that there’s a consumer instance, we just need to specify the topic it will read from, plus a basic iterator block that is yielded every time a message is written to the topic.
```ruby
consumer.subscribe('test')

consumer.each do |message|
  puts "Message received: #{message}"
end
```
Ruby-kafka
The configuration is similar to the producer’s: in the constructor we need to pass the broker hostname and port number.
```ruby
require "kafka"

kafka = Kafka.new(["kafka.docker:9092"])
consumer = kafka.consumer(group_id: "ruby-test")
```
Consuming messages is very similar to the way it works in `rdkafka`: the consumer needs to subscribe to a topic and iterate over the messages with the `each_message` iterator.
```ruby
consumer.subscribe('test')

consumer.each_message do |message|
  puts "Message received: #{message}"
end
```
Frameworks
Using only the libraries may help you start processing messages from a topic quickly, especially when you’re working on a small script that requires some data from Kafka. When you use the `each` or `each_message` methods shown above, you need to take into consideration that they block the execution flow, therefore you need to use threads or background processes if you want to consume multiple topics concurrently. When you work in a multi-threaded environment, there are certain things you need to deal with, e.g. graceful shutdown and backoff strategies. The following frameworks can help you avoid some headaches by putting the basic consumer for-each loops into threads and processes and providing configs to manage them in an easy way. Since producing messages is quite simple using the libraries, in the following sections I’ll focus on consuming messages only.
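Here is a small sketch of the thread juggling those frameworks save you from, using in-memory queues as stand-in topics (a toy model, not real Kafka consumption): each blocking consume loop runs on its own thread, and a shutdown signal stops them gracefully.

```ruby
# Stand-in "topics": thread-safe in-memory queues.
topics = { "test" => Queue.new, "user_events" => Queue.new }
processed = Queue.new

# One blocking consume loop per topic, each on its own thread,
# just like running each_message for two topics concurrently.
threads = topics.map do |name, queue|
  Thread.new do
    while (message = queue.pop) # blocks until a message (or nil) arrives
      processed << "#{name}: #{message}"
    end
  end
end

topics["test"] << "Hello World!"
topics["user_events"] << "user updated"

# Graceful shutdown: tell every loop to finish, then wait for the threads.
topics.each_value { |queue| queue << nil }
threads.each(&:join)

puts processed.size
```

Even this toy version needs a shutdown protocol and a join step; add reconnects, backoff, and signal handling and the appeal of a framework becomes obvious.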
Racecar
It’s one of the simplest Kafka frameworks. Racecar wraps exactly one consumer per process, and the framework handles everything you need in a production environment, like instrumentation and graceful termination when the process gets a SIGTERM signal. With Racecar you don’t need to worry about Kafka consumer groups; everything happens under the hood.
To start playing with Racecar, you need to add the `racecar` gem to your project’s Gemfile:

```ruby
# Gemfile
gem 'racecar'
```
and implement a class that inherits from `Racecar::Consumer`. Here is an example of a consumer that consumes the `test` topic. If you are using Rails, it’s recommended to put your consumers into the `/app/consumers` folder.
```ruby
# /app/consumers/test_consumer.rb
class TestConsumer < Racecar::Consumer
  def process(message)
    puts message.inspect
  end
end
```
When Racecar boots up, it creates an instance of your class and calls the `process` method on it every time a message is read from the topic. Because only one consumer instance is created during boot, instance variables are shared between requests; that’s why it’s strongly recommended not to store state in instance variables on a multi-tenant system.
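A toy illustration of that pitfall (the consumer class here is hypothetical, not Racecar’s API): because one instance handles every message, an instance variable silently carries state from one message to the next.

```ruby
# A toy consumer reused for every message, like a Racecar consumer instance.
class LeakyConsumer
  def process(message)
    @seen ||= []  # instance state survives between messages!
    @seen << message
    @seen.size
  end
end

consumer = LeakyConsumer.new
puts consumer.process("first")  # 1
puts consumer.process("second") # 2 -- state leaked from the previous message
```

On a multi-tenant system that leaked state could belong to a different tenant, so keep anything per-message in local variables.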
You can start consuming messages by passing the class name to the racecar application:

```shell
bundle exec racecar TestConsumer
```
Phobos
When you have multiple consumers, you might want to add them to the same process, especially when you work with Rails and need to load the whole application into memory beforehand. Putting consumers into separate processes can multiply the memory usage of your application, which could add extra cost to your cloud budget.
The big advantage of Phobos compared to Racecar is that you can specify which consumers you want to execute in the same process and how many concurrent consumers to run, which is very useful if your topic contains multiple partitions.
To add Phobos to your project, add the `phobos` gem to your Gemfile:

```ruby
# Gemfile
gem 'phobos'
```
You can initialize it and generate the configuration by executing `phobos init` in your terminal. A consumer is a handler class that includes `Phobos::Handler`:
```ruby
# /app/consumers/test_consumer.rb
class TestConsumer
  include Phobos::Handler

  def consume(payload, metadata)
    puts metadata
    puts payload
  end
end
```
`phobos init` will create a config file for you in `/app/config/phobos.yml` if you’re using Rails. The following configuration will create two `TestConsumer`s; they will consume messages from the `test` topic and join the `test-consumer-group` Kafka consumer group.
```yaml
# /app/config/phobos.yml
kafka:
  seed_brokers:
    - kafka.docker:9092

listeners:
  - handler: TestConsumer
    topic: test
    group_id: test-consumer-group
    max_concurrency: 2 # run it on 2 threads
```
If you want to add more consumers to the same process, you need to create a new handler and list it with a configuration similar to the `TestConsumer`’s. In this case, the same process will execute multiple threads for each consumer instance.
You can start the consumers with:

```shell
bundle exec phobos start
```
If you want to split consumers into separate processes to balance the load in your production environment, you need to create a different config file for each group and pass the path of the config to `phobos start`:

```shell
bundle exec phobos start -c config/test_consumers.yml
bundle exec phobos start -c config/user_events.yml
```
Karafka
Phobos and Racecar are very small frameworks; if I wanted to compare them to Ruby web frameworks, I would say Racecar is like Sinatra, Phobos is like Padrino, and Karafka is like Rails.
To start using Karafka in your project, add the `karafka` gem to your Gemfile:

```ruby
# Gemfile
gem 'karafka'
```
After executing `bundle install`, just run the following command to set up your environment and get the default configuration file:

```shell
bundle exec karafka install
```
In the root folder of your application you should get a `karafka.rb` file, the configuration file that describes your environment and the routing definitions.
```ruby
# /karafka.rb
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://kafka.docker:9092]
    config.client_id = 'example_app'
  end

  consumer_groups.draw do
    topic :test do
      consumer TestConsumer
    end

    consumer_group :user_events do
      topic :user_updates do
        consumer UserUpdates
      end

      topic :user_notifications do
        consumer UserNotifications
      end
    end
  end
end
```
`consumer_groups.draw` describes topics and additional consumer groups. Each `topic` and `consumer_group` in the `consumer_groups.draw` block is executed on its own thread; in the example above there will be two threads, one for the `TestConsumer` and another for the `user_events` consumer group. If you want to scale out and run on multiple processes, you need to start multiple Karafka apps.
Let’s see how you can add a basic `TestConsumer` to your project. Under `/app/consumers`, create a file named `test_consumer.rb`:
```ruby
# /app/consumers/test_consumer.rb
class TestConsumer < Karafka::BaseConsumer
  def consume
    # You can reach the message in the params like in Rails
    puts params.inspect
  end
end
```
Now there’s nothing left to do but start your Karafka application:

```shell
bundle exec karafka server
```
If you want to start only certain consumer groups, you can pass the consumer group names as extra parameters:

```shell
bundle exec karafka server --consumer_groups user_events
```

In this example, only consumers that belong to the `user_events` group are going to be executed.
Karafka is a massive framework with lots of configuration options and consumer features, you can find more details in their documentation.
Summary
These libraries and frameworks can help you integrate Kafka with your application and start producing and consuming messages quickly. I didn’t dive into the finer details, but they also provide rich configuration options for optimizing your producers and consumers. The Racecar framework is a nice lightweight tool for starting with Kafka quickly, but as soon as the number of consumers increases, you might want to consider Phobos or Karafka, because they can manage consumer groups and pipelines better.
Please like the page if you want to hear more about Kafka and Ruby and in another post, I can dive into the details. Thanks.