
Kafka

You can invoke handlers via Kafka events by doing the following:

Info: Make sure to first register the handler you want to invoke.

1. Develop an event handler

You can invoke any handler via Kafka events. The event payload will be (de)serialized as JSON.

  • When invoking Virtual Object handlers via Kafka, the key of the Kafka record will be used to determine the Virtual Object key. The key needs to be a valid UTF-8 string. For each Virtual Object, the events are delivered in the order in which they arrived on the topic partition.
  • When invoking Service handlers over Kafka, events are delivered in parallel without ordering guarantees.
Combining RPC and Kafka within a service/handler

Since you can invoke any handler via Kafka events, a single handler can be invoked both by RPC and via Kafka.
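
For example, here is a minimal sketch of such handlers with the TypeScript SDK. The object name MyObject and the event types are illustrative; any handler shape supported by the SDK works, and the same handlers remain callable via RPC.

import * as restate from "@restatedev/restate-sdk";

// A Service handler: events from its subscription are delivered in parallel,
// without ordering guarantees.
const myService = restate.service({
  name: "MyService",
  handlers: {
    handle: async (ctx: restate.Context, event: { userId: string; action: string }) => {
      // The Kafka record value was deserialized as JSON into `event`.
    },
  },
});

// A Virtual Object handler: the Kafka record key becomes the object key, and
// events for the same key arrive in the order they appeared on the topic partition.
const myObject = restate.object({
  name: "MyObject",
  handlers: {
    handle: async (ctx: restate.ObjectContext, event: { action: string }) => {
      // ...
    },
  },
});

restate.endpoint().bind(myService).bind(myObject).listen(9080);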

2. Configure Restate to connect to a Kafka cluster

Define the Kafka cluster that Restate needs to connect to in the Restate configuration file:

restate.toml

[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://broker:9092"]

And make sure the Restate Server uses it via restate-server --config-file restate.toml.

Check the configuration docs for more details.

Configuring Kafka clusters via environment variables

You can also configure the Kafka clusters via the RESTATE_INGRESS__KAFKA_CLUSTERS environment variable:


RESTATE_INGRESS__KAFKA_CLUSTERS=[{name="my-cluster",brokers=["PLAINTEXT://broker:9092"]}]

3. Register the service you want to invoke
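
For example, assuming the service endpoint from step 1 listens on http://localhost:9080, you can register it through the Admin API:

curl localhost:9070/deployments -H 'content-type: application/json' \
  -d '{"uri": "http://localhost:9080"}'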

4. Subscribe the event handler to the Kafka topic

Let Restate forward events from the Kafka topic to the event handler by creating a subscription using the Admin API:


curl localhost:9070/subscriptions -H 'content-type: application/json' \
  -d '{
    "source": "kafka://my-cluster/my-topic",
    "sink": "service://MyService/handle",
    "options": {"auto.offset.reset": "earliest"}
  }'

Once you've created a subscription, Restate immediately starts consuming from the topic and invokes the handler for each event it receives.

The options field is optional and accepts any configuration parameter from the librdkafka configuration.

Managing subscriptions

Have a look at the invocation docs for more commands to manage subscriptions.
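
For example, two commonly used Admin API calls are listing subscriptions and deleting one by its identifier (a sketch; replace <subscription_id> with the id shown when listing):

# List all subscriptions
curl localhost:9070/subscriptions

# Delete a subscription by its identifier
curl -X DELETE localhost:9070/subscriptions/<subscription_id>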

Kafka connection configuration

You can pass arbitrary Kafka cluster options in restate.toml; these options are applied to all subscriptions to that cluster. For example:

restate.toml

[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://broker:9092"]
sasl.username = "me"
sasl.password = "pass"

For the full list of options, check librdkafka configuration.

Multiple Kafka clusters support

You can configure multiple Kafka clusters in the restate.toml file:

restate.toml

[[ingress.kafka-clusters]]
name = "my-cluster-1"
brokers = ["PLAINTEXT://localhost:9092"]

[[ingress.kafka-clusters]]
name = "my-cluster-2"
brokers = ["PLAINTEXT://localhost:9093"]

Then, when creating the subscriptions, refer to the specific cluster by name:


# Subscription to my-cluster-1
curl localhost:9070/subscriptions -H 'content-type: application/json' \
  -d '{
    "source": "kafka://my-cluster-1/topic-1",
    "sink": "service://MyService/handleCluster1"
  }'

# Subscription to my-cluster-2
curl localhost:9070/subscriptions -H 'content-type: application/json' \
  -d '{
    "source": "kafka://my-cluster-2/topic-2",
    "sink": "service://MyService/handleCluster2"
  }'

Raw event support

By default, handlers deserialize the event payload as JSON. You can override this behaviour by using the restate.serde.binary serde. Check TypeScript SDK > Serialization for more details.
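
For example, a sketch of a handler that receives the raw bytes of the Kafka record instead of JSON. The handler-options wrapper shown here (restate.handlers.handler) is how the TypeScript SDK exposes per-handler serdes; check the SDK serialization docs for the exact API of your SDK version.

import * as restate from "@restatedev/restate-sdk";

const rawEvents = restate.service({
  name: "RawEvents",
  handlers: {
    // Override the default JSON serde: the handler receives the raw record value.
    handle: restate.handlers.handler(
      { input: restate.serde.binary },
      async (ctx: restate.Context, payload: Uint8Array) => {
        // Decode the bytes yourself, e.g. with Avro, Protobuf, or a custom format.
      }
    ),
  },
});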

Event metadata

Each event carries the following entries within the ctx.request().headers map:

  • restate.subscription.id: The subscription identifier, as shown by the Admin API.
  • kafka.offset: The record offset.
  • kafka.partition: The record partition.
  • kafka.timestamp: The record timestamp.
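
Inside a handler, you can read these entries from the request headers. A minimal sketch with the TypeScript SDK, assuming the headers are exposed as a string map as described above:

import * as restate from "@restatedev/restate-sdk";

const eventService = restate.service({
  name: "EventService",
  handlers: {
    handle: async (ctx: restate.Context, event: unknown) => {
      // Kafka metadata is exposed alongside the other request headers.
      const headers = ctx.request().headers;
      const partition = headers.get("kafka.partition");
      const offset = headers.get("kafka.offset");
      // e.g. use partition/offset for logging or deduplication
    },
  },
});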