Create a test data generator using Kafka Connect

Shiyan
Zendesk Engineering
4 min read · Jul 10, 2018


In some recent projects, I have worked on a few data pipelines with Apache Kafka. When it came to performance testing, data generation always introduced boilerplate code: creating client instances, writing control flow to send data, randomizing payloads based on business logic, and so on.

During test setup, it would be nice to have a framework that handles the heavy lifting, so that only two essential questions need to be answered:

  • What should the data look like? (schema)
  • How much data to generate? (volume)

It turns out that, with Kafka Connect, implementing a custom source connector can achieve exactly this. Here is an example property list for generating test data.

topic.name=generated.events
poll.size=10
poll.interval.ms=5000
message.template={"status":"foo","direction":"up"}
random.fields=status:foo|bar|baz, direction:up|down|left|right

These properties are self-explanatory. To answer the two essential questions mentioned above: message.template and random.fields control the schema, while poll.size and poll.interval.ms control the volume.
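
For example, with the template and candidate values above, the connector would publish a batch of 10 messages every 5 seconds, each looking something like:

{"status":"bar","direction":"left"}
{"status":"foo","direction":"down"}
{"status":"baz","direction":"up"}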

Based on these properties, I created a custom source connector named “kafka-connect-datagen” (or “datagen” for short), available on GitHub.

In the next section, I will briefly walk through some implementation details.

Implement a custom Connector

A Kafka Connect source connector copies data from a data store into Kafka, while a sink connector does the opposite. Although datagen is a source connector, it does not link to any data store; it generates data from within itself. The rest of its implementation follows the standard Kafka Connect development guide: it extends SourceConnector and SourceTask, and implements a handful of lifecycle method hooks. The snippets below show the two classes in abbreviated form.
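
This is a condensed sketch rather than the verbatim datagen source; names are simplified, and the field randomization driven by random.fields is omitted for brevity.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class DatagenConnector extends SourceConnector {
  private Map<String, String> props;

  @Override
  public void start(Map<String, String> props) { this.props = props; }

  @Override
  public Class<? extends Task> taskClass() { return DatagenTask.class; }

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    // Every Task receives the same configuration in this simple case.
    List<Map<String, String>> configs = new ArrayList<>();
    for (int i = 0; i < maxTasks; i++) configs.add(new HashMap<>(props));
    return configs;
  }

  @Override
  public void stop() { }

  @Override
  public ConfigDef config() { return DatagenConnectorConfig.CONFIG_DEF; } // defined in the next section

  @Override
  public String version() { return "0.1.0"; }
}

public class DatagenTask extends SourceTask {
  private String topic;
  private int pollSize;
  private long pollIntervalMs;
  private String template;

  @Override
  public void start(Map<String, String> props) {
    topic = props.get("topic.name");
    pollSize = Integer.parseInt(props.get("poll.size"));
    pollIntervalMs = Long.parseLong(props.get("poll.interval.ms"));
    template = props.get("message.template");
  }

  @Override
  public List<SourceRecord> poll() throws InterruptedException {
    Thread.sleep(pollIntervalMs); // wait one interval between batches
    List<SourceRecord> records = new ArrayList<>(pollSize);
    for (int i = 0; i < pollSize; i++) {
      // The actual connector also randomizes fields per random.fields before sending.
      records.add(new SourceRecord(
          Collections.emptyMap(), Collections.emptyMap(), // no meaningful source partition/offset
          topic, Schema.STRING_SCHEMA, template));
    }
    return records;
  }

  @Override
  public void stop() { }

  @Override
  public String version() { return "0.1.0"; }
}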

As illustrated in the code, Connector defines what kind of Task to run and what configurations to set for the Task, while Task is the working unit that executes custom logic. Both Connector and Task instances run in a Worker process. The Confluent documentation explains these concepts in more detail.

In addition to implementing these two classes, there is one more step before running a demo: defining the list of configurations (ConfigDef) exposed to users. After that, these classes are ready to be packaged as a Connector plugin. The full implementation can be found on GitHub.
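
A sketch of that configuration definition might look like the following (the class name, defaults, and descriptions here are illustrative rather than copied from datagen):

import org.apache.kafka.common.config.ConfigDef;

public class DatagenConnectorConfig {
  // Declares the user-facing properties shown in the earlier example list.
  public static final ConfigDef CONFIG_DEF = new ConfigDef()
      .define("topic.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
              "Topic that generated messages are written to")
      .define("poll.size", ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM,
              "Number of messages to generate per poll")
      .define("poll.interval.ms", ConfigDef.Type.LONG, 5000L, ConfigDef.Importance.MEDIUM,
              "Interval in milliseconds between polls")
      .define("message.template", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
              "JSON template for each generated message")
      .define("random.fields", ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM,
              "Comma-separated field:value1|value2|... candidates used to randomize template fields");
}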

In the next section, I will demonstrate how to use the plugin with a Dockerized local cluster setup.

A QuickStart demo

In this QuickStart example, we use docker-compose to manage all the required services, such as ZooKeeper, Kafka, and Kafka Connect. To bring them all up, run docker-compose up -d, then run docker-compose ps to print each service's state, as shown below.

Name                            State   Ports
---------------------------------------------------------------------
quickstart_broker_1             Up      0.0.0.0:9092->9092/tcp
quickstart_connect_1            Up      0.0.0.0:8083->8083/tcp, ...
quickstart_kafka-connect-ui_1   Up      0.0.0.0:8001->8000/tcp
quickstart_kafka-rest-proxy_1   Up      0.0.0.0:8082->8082/tcp
quickstart_kafka-topics-ui_1    Up      0.0.0.0:8000->8000/tcp
quickstart_zookeeper_1          Up      0.0.0.0:2181->2181/tcp, ...

Kafka and Kafka Connect will take longer to start. Thanks to Landoop Ltd, we have some nice UI tools: open http://localhost:8000 for the Kafka topics UI and http://localhost:8001 for the Kafka Connect UI. You may also run docker-compose logs -f to view the logs.

Since the Kafka Connect service is usually the last to finish starting up, we can watch its logs by running docker-compose logs -f connect and wait for the up-and-running indicators below.

INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

When all services are fully up, it is time to create the “datagen” Connector instance. Below is an example configuration for this demo: it tells the “datagen” task to generate 10 messages every 5 seconds, each based on the defined JSON message template with some fields randomized. Save it as connect.source.datagen.json and run the following command to instantiate the Connector and Task.
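
The request body (connect.source.datagen.json) would look something like the following; note that the connector.class value shown here is illustrative and should match the class name (or fully qualified name) registered by the plugin.

{
  "name": "connect.source.datagen",
  "config": {
    "connector.class": "DatagenConnector",
    "tasks.max": "1",
    "topic.name": "generated.events",
    "poll.size": "10",
    "poll.interval.ms": "5000",
    "message.template": "{\"status\":\"foo\",\"direction\":\"up\"}",
    "random.fields": "status:foo|bar|baz, direction:up|down|left|right"
  }
}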

curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @connect.source.datagen.json

Now, in the Kafka topics UI, we can see randomized JSON messages being published to the topic generated.events at the defined rate.

(Screenshot: kafka-connect-datagen messages published to generated.events, as shown in the Kafka topics UI)
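
Besides the UI, we could also tail the topic from the command line, assuming the Kafka CLI tools are available inside the broker container:

docker-compose exec broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic generated.events \
  --from-beginning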

To stop the generation, we can go to the Kafka Connect UI and pause or delete the connector. Equivalently, we can use the REST API as shown below to achieve the same outcome. Check out the Confluent documentation for more operations.

# pause connector (empty response if succeeds)
curl -X PUT http://localhost:8083/connectors/connect.source.datagen/pause

# delete connector (empty response if succeeds)
curl -X DELETE http://localhost:8083/connectors/connect.source.datagen

In summary, we are able to leverage Kafka Connect, an off-the-shelf tool that integrates nicely with Kafka, to achieve random data generation with a minimal amount of boilerplate code. The custom Connector plugin, kafka-connect-datagen, is highly portable and can be extended further to support features such as integration tests and different message formats.
