Restate + Kafka: Quickstart

In this guide, you will learn how to connect your Restate service handler to a Kafka topic.

Restate lets you connect any handler to a Kafka topic, and invoke the handler for each event that arrives on the topic. This way, you can use Restate to process events in a lightweight, flexible, transactional way.

info

The purpose of this guide is to get your setup working. If you are interested in the benefits and the things you can do with Restate's Kafka integration, have a look at the Event Processing use case page.

You can choose any of the SDK languages for this quickstart. You do not need to adapt the handler code to be able to read from Kafka. Any handler can be connected to Kafka.

2
Start the Kafka cluster

Let's start a Kafka cluster with a single broker, and create a topic named greetings.

You can run your Kafka cluster in your preferred way. Here, we will use Docker Compose to start a Kafka cluster.

Create a docker-compose.yaml file with the following content:

docker-compose.yaml
version: '3'
services:
  broker:
    image: confluentinc/cp-kafka:7.5.0
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  init-kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - broker
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "# blocks until kafka is reachable
      kafka-topics --bootstrap-server broker:29092 --list
      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic greetings --replication-factor 1 --partitions 1
      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server broker:29092 --list"

Start the Kafka broker:

docker compose up
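
Before moving on, it can help to confirm that the greetings topic was actually created. The sketch below is one way to do that from a second terminal. The helper name topic_exists is hypothetical (it is not part of Kafka or Restate); it only checks whether a topic name appears in a list read from standard input.

```shell
# Hypothetical helper: succeeds if the exact topic name given as $1
# appears as a full line on standard input (one topic per line).
topic_exists() {
  grep -Fxq -- "$1"
}

# Usage (assumes the broker container from the compose file is running):
#   docker exec broker kafka-topics --bootstrap-server broker:29092 --list \
#     | topic_exists greetings && echo "topic is ready"
```

The kafka-topics invocation in the usage comment is the same one the init-kafka container runs; only the wrapper function is new.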

3
Start the Restate Server

Now, let's start the Restate Server and let it know about the Kafka cluster via the configuration file.

Store the following configuration in a file named restate.toml:

restate.toml
[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://localhost:9092"]

Start the Restate Server from the same location as the configuration file:

restate-server --config-file restate.toml

4
Register the service

Let the Restate Server know about the Greeter service by registering it:

restate deployments register localhost:9080

5
Connect the handler to the topic

Now, we need to make Restate subscribe to the Kafka topic and tell it which handler should receive the events that arrive on the topic.

Execute the following curl command to create a subscription that invokes the handler for each event:

curl localhost:9070/subscriptions -H 'content-type: application/json' \
  -d '{
    "source": "kafka://my-cluster/greetings",
    "sink": "service://Greeter/greet",
    "options": {"auto.offset.reset": "earliest"}
  }'

For Go, you need to capitalize the handler name: service://Greeter/Greet.

This curl command calls the Admin API of the Restate Server and tells it to invoke the greet handler of the Greeter service for each event that arrives on the greetings topic in the my-cluster Kafka cluster.
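
The request body follows a simple pattern: the source is kafka://<cluster>/<topic> and the sink is service://<service>/<handler>. As a minimal sketch, the function below builds that body from four arguments so the same command can be reused for other topics or handlers. The name subscription_payload is hypothetical, and the sketch assumes the arguments contain no characters that need JSON escaping.

```shell
# Hypothetical helper: print the subscription request body.
# Arguments: cluster name, topic, service name, handler name.
# Assumes none of the arguments contain quotes or backslashes.
subscription_payload() {
  printf '{"source": "kafka://%s/%s", "sink": "service://%s/%s", "options": {"auto.offset.reset": "earliest"}}\n' \
    "$1" "$2" "$3" "$4"
}

# Usage (same request as above, against the Admin API on port 9070):
#   curl localhost:9070/subscriptions -H 'content-type: application/json' \
#     -d "$(subscription_payload my-cluster greetings Greeter greet)"
```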

6
Invoke the handler by publishing an event

Create a Kafka producer and publish an event to the greetings topic:

docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic greetings

Then type a message and press enter. The greeter takes a String name as an input:

"Sarah"

🥳
We invoked a Restate service over Kafka

You should now see your handler being invoked. The greeter template implements a handler that fails with some probability. You can see the retries and the eventual success in the logs.

The way this worked is that Restate read the message off the Kafka topic and durably persisted it, similar to what it does for HTTP invocations.

It then pushed the message to the handler, as opposed to the pull mechanism that you would have with a Kafka consumer.

When the handler failed, Restate retried the invocation until it eventually succeeded.

When invoking services, Restate ignores the key of the message.

If your service were a Virtual Object, Restate would use the key of the Kafka message to determine the Virtual Object ID. While a Kafka partition contains messages for multiple keys, Restate effectively keeps track of a queue per key.
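
To try this out, you would need to publish messages that carry a key. The sketch below shows one way to do that with the console producer, which can split each input line into a key and a value when parse.key is enabled. The helper name keyed_record is hypothetical, and the example assumes that your subscription's sink points at a Virtual Object handler, which this quickstart does not set up.

```shell
# Hypothetical helper: print one "key:value" line for the console producer,
# wrapping the name in double quotes so the value is a JSON string.
# Assumes the key contains no ':' and the name no quotes or backslashes.
keyed_record() {
  printf '%s:"%s"\n' "$1" "$2"
}

# Usage (assumes the broker container from the compose file is running):
#   keyed_record sarah Sarah | docker exec -i broker kafka-console-producer \
#     --bootstrap-server broker:29092 --topic greetings \
#     --property parse.key=true --property key.separator=:
```

With this input, the message key is sarah and the value is "Sarah", so all messages with the key sarah would be routed to the same Virtual Object.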

Have a look at the Event Processing use case page to learn about what you can do with Restate and event processing.

🏁
Cleanup: removing the subscription

You can see the subscriptions that are active via the Admin API:

curl localhost:9070/subscriptions
{
  "subscriptions": [
    {
      "id": "sub_11XHoawrCiWtv8kzhEyGtsR",
      "source": "kafka://my-cluster/greetings",
      "sink": "service://Greeter/greet",
      "options": {
        "auto.offset.reset": "earliest",
        "client.id": "restate",
        "group.id": "sub_11XHoawrCiWtv8kzhEyGtsR"
      }
    }
  ]
}

As you can see, subscriptions have an ID that starts with sub_.

Now you can use the subscription ID to delete the subscription:

curl -X DELETE localhost:9070/subscriptions/sub_11XHoawrCiWtv8kzhEyGtsR
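
If you have created several subscriptions while experimenting, copying each ID by hand gets tedious. As a rough sketch, the function below pulls the subscription IDs out of the listing shown above using only grep and sed. The name subscription_ids is hypothetical, and the pattern matching assumes the response keeps the "id": "sub_..." shape from the example output; a JSON-aware tool would be more robust.

```shell
# Hypothetical helper: read the subscriptions listing on standard input
# and print one subscription ID per line.
# Matches only the "id" field, not "group.id", which holds the same value.
subscription_ids() {
  grep -o '"id"[[:space:]]*:[[:space:]]*"sub_[^"]*"' \
    | sed 's/.*"\(sub_[^"]*\)"/\1/'
}

# Usage (deletes every subscription returned by the Admin API):
#   curl -s localhost:9070/subscriptions | subscription_ids \
#     | while read -r id; do
#         curl -X DELETE "localhost:9070/subscriptions/$id"
#       done
```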