Skip to main content

Serialization

Restate sends data over the network for storing state, journaling actions, awakeables, etc. There are multiple ways to specify which (de)serializers should be used.

Default Serialization

By default, payloads are serialized with bytes(json.dumps(obj), "utf-8") and deserialized with json.loads(buf). If this does not work for your data type, then you need to specify a custom serializer, as shown below.

Custom Serialization

To write a custom serializer, you implement the Serde interface.

For example a custom JSON serializer could look like this:

class MyData(typing.TypedDict):
"""Represents a response from the GPT model."""
some_value: str
my_number: int
class MySerde(Serde[MyData]):
def deserialize(self, buf: bytes) -> typing.Optional[MyData]:
if not buf:
return None
data = json.loads(buf)
return MyData(some_value=data["some_value"], my_number=data["some_number"])
def serialize(self, obj: typing.Optional[MyData]) -> bytes:
if obj is None:
return bytes()
data = {
"some_value": obj["some_value"],
"some_number": obj["my_number"]
}
return bytes(json.dumps(data), "utf-8")

You then use this serializer in your handlers, as follows:

# For the input/output serialization of your handlers
@my_object.handler(input_serde=MySerde(), output_serde=MySerde())
async def my_handler(ctx: ObjectContext, greeting: str) -> str:
# To serialize state
await ctx.get("my_state", serde=MySerde())
ctx.set("my_state", MyData(some_value="value", my_number=123), serde=MySerde())
# To serialize awakeable payloads
ctx.awakeable(serde=MySerde())
# To serialize the results of actions
await ctx.run("some-task", some_task, serde=MySerde())
# etc.
return "some-output"