Restate + Kafka: Quickstart
In this guide, you will learn how to connect your Restate service handler to a Kafka topic.
Restate lets you connect any handler to a Kafka topic, and invoke the handler for each event that arrives on the topic. This way, you can use Restate to process events in a lightweight, flexible, transactional way.
The purpose of this guide is to get your setup working. If you are interested in the benefits and the things you can do with Restate's Kafka integration, have a look at the Event Processing use case page.
You can choose any of the SDK languages for this quickstart. You do not need to adapt the handler code to be able to read from Kafka. Any handler can be connected to Kafka.
Start the Kafka cluster
Let's start a Kafka cluster with a single broker, and create a topic named greetings
.
You can run your Kafka cluster in your preferred way. Here, we will use Docker Compose to start a Kafka cluster.
Create a docker-compose.yaml
file with the following content:
version: '3'services:broker:image: confluentinc/cp-kafka:7.5.0container_name: brokerports:- "9092:9092"- "9101:9101"environment:KAFKA_BROKER_ID: 1KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1KAFKA_PROCESS_ROLES: broker,controllerKAFKA_NODE_ID: 1KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTKAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLERKAFKA_LOG_DIRS: /tmp/kraft-combined-logsCLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qkinit-kafka:image: confluentinc/cp-kafka:7.5.0depends_on:- brokerentrypoint: [ '/bin/sh', '-c' ]command: |"# blocks until kafka is reachablekafka-topics --bootstrap-server broker:29092 --listecho -e 'Creating kafka topics'kafka-topics --bootstrap-server broker:29092 --create --if-not-exists --topic greetings --replication-factor 1 --partitions 1echo -e 'Successfully created the following topics:'kafka-topics --bootstrap-server broker:29092 --list"
Start the Kafka broker:
docker compose up
Running Restate Server
Now, let's start the Restate Server and let it know about the Kafka cluster via the configuration file.
Store the following configuration in a file named restate.toml
:
[[ingress.kafka-clusters]]name = "my-cluster"brokers = ["PLAINTEXT://localhost:9092"]
Start the Restate Server from the same location as the configuration file:
restate-server --config-file restate.toml
Register the service
Let the Restate Server know about the Greeter service by registering it:
restate deployments register localhost:9080
Connect the handler to the topic
Now, we need to make Restate subscribe to the Kafka topics and tell it where it should push the events that arrive on the topic.
Execute the following curl command to create a subscription, and invoke the handler for each event:
curl localhost:9070/subscriptions -H 'content-type: application/json' \-d '{"source": "kafka://my-cluster/greetings","sink": "service://Greeter/greet","options": {"auto.offset.reset": "earliest"}}'
For Go, you need to capitalize the handler name: service://Greeter/Greet
.
This curl command calls the Admin API of the Restate Server and tells it to invoke the greet
handler of the Greeter
service for each event that arrives on the greetings
topic in the my-cluster
Kafka cluster.
Invoke the handler by publishing an event
Create a Kafka producer and publish an event to the greetings
topic:
docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic greetings
Then type a message and press enter. The greeter takes a String name as an input:
"Sarah"
We invoked a Restate service over Kafka
You now see your handler getting invoked. The greeter template implements a handler which probabilistically fails. You can see the retries and the eventual success in the logs.
The way this worked is that Restate reads the message off the Kafka topic and durably persisted it, similar to what it does for HTTP invocations.
It then pushed the message to the handler, as opposed to the pull mechanism that you would have with a Kafka consumer.
When the handler failed, Restate handled the retry requests and eventually succeeded.
When invoking services, Restates ignores the key of the message.
If your service would be a Virtual Object, Restate would use the key of the Kafka message to determine the Virtual Object ID. Whereas a Kafka partition contains multiple keys, Restate effectively keeps track of a queue per key.
Have a look at the Event Processing use case page to learn about what you can do with Restate and event processing.
Cleanup: removing the subscription
You can see the subscriptions that are active via the Admin API:
curl localhost:9070/subscriptions
{"subscriptions": [{"id": "sub_11XHoawrCiWtv8kzhEyGtsR","source": "kafka://my-cluster/my-topic","sink": "service://Greeter/greet","options": {"auto.offset.reset": "earliest","client.id": "restate","group.id": "sub_11XHoawrCiWtv8kzhEyGtsR"}}]}
As you can see, subscriptions have an ID that starts with sub_
.
Now you can use the subscription ID to delete the subscription:
curl -X DELETE localhost:9070/subscriptions/sub_11XHoawrCiWtv8kzhEyGtsR
Related resources
- Event processing use cases: Have a look at examples of how Restate gives you lightweight, transactional event processing.
- Have a look at the examples. There, you can find examples on Durable Execution and event processing.
- Reference docs for invoking over Kafka
- Blog post on Kafka integration