Kafka
You can invoke handlers via Kafka events by following the steps below.
Make sure to first register the handler you want to invoke.
Develop an event handler
You can invoke any handler via Kafka events. The event payload will be (de)serialized as JSON.
- When invoking Virtual Object handlers via Kafka, the key of the Kafka record will be used to determine the Virtual Object key. The key needs to be a valid UTF-8 string. For each Virtual Object, the events are delivered in the order in which they arrived on the topic partition.
- When invoking Service handlers over Kafka, events are delivered in parallel without ordering guarantees.
Since you can invoke any handler via Kafka events, a single handler can be invoked both by RPC and via Kafka.
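The ordering difference between the two handler types can be pictured with a small, self-contained simulation. This is only an illustration of the semantics described above, not how Restate implements delivery; KafkaRecord and groupByKey are hypothetical names and no Restate SDK is involved.

```typescript
// Hypothetical shape of a consumed record; this is not an SDK type.
interface KafkaRecord {
  key: string;    // Kafka record key, used as the Virtual Object key
  offset: number; // position within the topic partition
  value: string;  // JSON-encoded event payload
}

// Groups records by key while preserving arrival order within each key.
// This mirrors the per-Virtual-Object guarantee: events for the same key
// are handled in partition order, while different keys are independent.
function groupByKey(records: KafkaRecord[]): Map<string, KafkaRecord[]> {
  const queues = new Map<string, KafkaRecord[]>();
  for (const record of records) {
    const queue = queues.get(record.key) ?? [];
    queue.push(record);
    queues.set(record.key, queue);
  }
  return queues;
}

// Made-up records, listed in the order they arrived on one partition.
const partitionRecords: KafkaRecord[] = [
  { key: "cart-1", offset: 0, value: '{"item":"apple"}' },
  { key: "cart-2", offset: 1, value: '{"item":"pear"}' },
  { key: "cart-1", offset: 2, value: '{"item":"plum"}' },
];

const perObject = groupByKey(partitionRecords);
const cart1Offsets = (perObject.get("cart-1") ?? []).map((r) => r.offset);
```

Here cart1Offsets keeps the arrival order of the two cart-1 events. For a Service handler there is no such per-key queue, so the three events could be processed concurrently in any order.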
Configure Restate to connect to a Kafka cluster
Define the Kafka cluster that Restate needs to connect to in the Restate configuration file:
[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://broker:9092"]
Then make sure the Restate Server loads this file by starting it with restate-server --config-file restate.toml.
Check the configuration docs for more details.
Configuring Kafka clusters via environment variables
You can also configure the Kafka clusters via the RESTATE_INGRESS__KAFKA_CLUSTERS environment variable:
RESTATE_INGRESS__KAFKA_CLUSTERS=[{name="my-cluster",brokers=["PLAINTEXT://broker:9092"]}]
Register the service you want to invoke.
Subscribe the event handler to the Kafka topic
Let Restate forward events from the Kafka topic to the event handler by creating a subscription using the Admin API:
curl localhost:9070/subscriptions -H 'content-type: application/json' \
  -d '{
    "source": "kafka://my-cluster/my-topic",
    "sink": "service://MyService/handle",
    "options": {"auto.offset.reset": "earliest"}
  }'
Once you've created a subscription, Restate immediately starts consuming events from Kafka. The handler will be invoked for each event received from Kafka.
The options field is optional and accepts any configuration parameter from the librdkafka configuration.
Have a look at the invocation docs for more commands to manage subscriptions.
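If you create subscriptions from code rather than from the shell, the request body from the curl command above can be assembled programmatically. The following is a minimal sketch: SubscriptionRequest and buildSubscription are hypothetical names, and the only fields used are the ones shown in the curl example (source, sink, and the optional options).

```typescript
// JSON body for the /subscriptions endpoint, limited to the fields that
// appear in the curl example. The type name is hypothetical.
interface SubscriptionRequest {
  source: string;
  sink: string;
  options?: Record<string, string>;
}

// Hypothetical helper: builds the request body from its parts.
function buildSubscription(
  cluster: string,
  topic: string,
  service: string,
  handler: string,
  options?: Record<string, string>,
): SubscriptionRequest {
  const request: SubscriptionRequest = {
    source: `kafka://${cluster}/${topic}`,
    sink: `service://${service}/${handler}`,
  };
  // Only include "options" when the caller supplied some, since the
  // field is optional.
  if (options !== undefined) {
    request.options = options;
  }
  return request;
}

// Same subscription as in the curl example.
const requestBody = JSON.stringify(
  buildSubscription("my-cluster", "my-topic", "MyService", "handle", {
    "auto.offset.reset": "earliest",
  }),
);
```

The resulting requestBody string can then be sent to localhost:9070/subscriptions with the content-type: application/json header, using whichever HTTP client you prefer.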
Kafka connection configuration
You can pass arbitrary Kafka cluster options in restate.toml. These options are applied to all subscriptions to that cluster. For example:
[[ingress.kafka-clusters]]
name = "my-cluster"
brokers = ["PLAINTEXT://broker:9092"]
sasl.username = "me"
sasl.password = "pass"
For the full list of options, check librdkafka configuration.
Multiple Kafka clusters support
You can configure multiple Kafka clusters in the restate.toml file:
[[ingress.kafka-clusters]]
name = "my-cluster-1"
brokers = ["PLAINTEXT://localhost:9092"]

[[ingress.kafka-clusters]]
name = "my-cluster-2"
brokers = ["PLAINTEXT://localhost:9093"]
Then, when creating the subscriptions, refer to the specific cluster by name:
# Subscription to my-cluster-1
curl localhost:9070/subscriptions -H 'content-type: application/json' \
  -d '{
    "source": "kafka://my-cluster-1/topic-1",
    "sink": "service://MyService/handleCluster1"
  }'

# Subscription to my-cluster-2
curl localhost:9070/subscriptions -H 'content-type: application/json' \
  -d '{
    "source": "kafka://my-cluster-2/topic-2",
    "sink": "service://MyService/handleCluster2"
  }'
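With several clusters configured, it can help to check that a subscription source names one of them before sending the request. The sketch below assumes the kafka://<cluster>/<topic> form used in the examples above; parseKafkaSource and refersToKnownCluster are hypothetical helpers, not part of Restate, and the cluster names are the ones from the example configuration.

```typescript
// Hypothetical helper: splits a subscription source such as
// "kafka://my-cluster-1/topic-1" into the cluster name and the topic.
function parseKafkaSource(source: string): { cluster: string; topic: string } {
  const prefix = "kafka://";
  if (!source.startsWith(prefix)) {
    throw new Error(`not a Kafka source: ${source}`);
  }
  const rest = source.slice(prefix.length);
  const slash = rest.indexOf("/");
  // Reject a missing separator, an empty cluster name, or an empty topic.
  if (slash <= 0 || slash === rest.length - 1) {
    throw new Error(`expected kafka://<cluster>/<topic>, got: ${source}`);
  }
  return { cluster: rest.slice(0, slash), topic: rest.slice(slash + 1) };
}

// Cluster names as configured in the restate.toml example.
const configuredClusters = new Set<string>(["my-cluster-1", "my-cluster-2"]);

// Returns true when the source refers to one of the configured clusters.
function refersToKnownCluster(source: string): boolean {
  return configuredClusters.has(parseKafkaSource(source).cluster);
}
```

A check like refersToKnownCluster("kafka://my-cluster-1/topic-1") catches a misspelled cluster name locally, before the subscription request reaches the Admin API.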
- TypeScript
- Java
- Go
Raw event support
By default, handlers deserialize the event payload as JSON.
You can override this behaviour by using the restate.serde.binary serde. Check TypeScript SDK > Serialization for more details.
Event metadata
In TypeScript, each event carries the following entries in the ctx.request().headers map:
- restate.subscription.id: The subscription identifier, as shown by the Admin API.
- kafka.offset: The record offset.
- kafka.partition: The record partition.
- kafka.timestamp: The record timestamp.
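As an illustration, the entries listed above can be collected into a single object. This is a sketch under assumptions: the headers parameter is typed as a plain ReadonlyMap of strings so the example runs without the SDK, KafkaEventMetadata and readKafkaMetadata are hypothetical names, and the header values are made up.

```typescript
// Hypothetical container for the Kafka-related header entries.
interface KafkaEventMetadata {
  subscriptionId: string;
  offset: string;
  partition: string;
  timestamp: string;
}

// Reads the four entries from a headers map and fails loudly if one is
// missing. Inside a handler you would pass ctx.request().headers,
// assuming it behaves like a string-to-string map.
function readKafkaMetadata(
  headers: ReadonlyMap<string, string>,
): KafkaEventMetadata {
  const get = (name: string): string => {
    const value = headers.get(name);
    if (value === undefined) {
      throw new Error(`missing header: ${name}`);
    }
    return value;
  };
  return {
    subscriptionId: get("restate.subscription.id"),
    offset: get("kafka.offset"),
    partition: get("kafka.partition"),
    timestamp: get("kafka.timestamp"),
  };
}

// Example values are invented for illustration only.
const exampleHeaders = new Map<string, string>([
  ["restate.subscription.id", "sub_example"],
  ["kafka.offset", "42"],
  ["kafka.partition", "0"],
  ["kafka.timestamp", "1700000000000"],
]);
const eventMetadata = readKafkaMetadata(exampleHeaders);
```

Values are kept as strings here; convert them (for example with Number) only where your handler actually needs numeric offsets or timestamps.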
Raw event support
By default, handlers deserialize the event payload as JSON.
In Java, if you declare the handler input parameter as byte[] and annotate it with @Raw, JSON deserialization is skipped and the event payload is passed as is.
Event metadata
In Java, each event carries the following entries in the ctx.request().headers() map:
- restate.subscription.id: The subscription identifier, as shown by the Admin API.
- kafka.offset: The record offset.
- kafka.partition: The record partition.
- kafka.timestamp: The record timestamp.
Event metadata
In Go, each event carries the following entries in the ctx.Request().Headers map:
- restate.subscription.id: The subscription identifier, as shown by the Admin API.
- kafka.offset: The record offset.
- kafka.partition: The record partition.
- kafka.timestamp: The record timestamp.