Serialization
Restate sends data over the network when it stores state, journals actions, resolves awakeables, and so on. There are several ways to specify which (de)serializers are used for that data.
Default Serialization
By default, payloads are serialized with bytes(json.dumps(obj), "utf-8") and deserialized with json.loads(buf).
If this does not work for your data type, then you need to specify a custom serializer, as shown below.
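To see where the default falls short, the sketch below reproduces the default round trip using only the standard library. The helper names default_serialize and default_deserialize are hypothetical and exist only for this illustration; they are not SDK functions.

```python
import json
from datetime import datetime, timezone


def default_serialize(obj) -> bytes:
    # Same expression as the default described above.
    return bytes(json.dumps(obj), "utf-8")


def default_deserialize(buf: bytes):
    # Same expression as the default described above.
    return json.loads(buf)


# JSON-compatible values survive the round trip unchanged.
payload = {"some_value": "value", "my_number": 123}
assert default_deserialize(default_serialize(payload)) == payload

# Values that json.dumps cannot encode (here a datetime) raise a TypeError,
# which is the kind of data type that calls for a custom serializer.
try:
    default_serialize({"created_at": datetime.now(timezone.utc)})
    failed = False
except TypeError:
    failed = True
```

Dictionaries, lists, strings, numbers, booleans, and None work with the default; most other types need their own encoding.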
Custom Serialization
To write a custom serializer, you implement the Serde interface.
For example, a custom JSON serializer could look like this:
import json
import typing

from restate.serde import Serde


class MyData(typing.TypedDict):
    """Represents a response from the GPT model."""

    some_value: str
    my_number: int


class MySerde(Serde[MyData]):
    def deserialize(self, buf: bytes) -> typing.Optional[MyData]:
        # An empty buffer means there is no value.
        if not buf:
            return None
        data = json.loads(buf)
        # The wire format stores my_number under the key "some_number".
        return MyData(some_value=data["some_value"], my_number=data["some_number"])

    def serialize(self, obj: typing.Optional[MyData]) -> bytes:
        # A missing value is encoded as an empty buffer.
        if obj is None:
            return bytes()
        data = {"some_value": obj["some_value"], "some_number": obj["my_number"]}
        return bytes(json.dumps(data), "utf-8")
You then use this serializer in your handlers, as follows:
# my_object and some_task are assumed to be defined elsewhere in your service.

# For the input/output serialization of your handlers.
# Because both serdes are MySerde, the handler receives and returns MyData.
@my_object.handler(input_serde=MySerde(), output_serde=MySerde())
async def my_handler(ctx: ObjectContext, data: MyData) -> MyData:
    # To serialize state
    await ctx.get("my_state", serde=MySerde())
    ctx.set("my_state", MyData(some_value="value", my_number=123), serde=MySerde())

    # To serialize awakeable payloads
    ctx.awakeable(serde=MySerde())

    # To serialize the results of actions (some_task must return MyData)
    await ctx.run("some-task", some_task, serde=MySerde())

    # etc.
    return MyData(some_value="some-output", my_number=123)
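Before wiring a custom serde into handlers, it can help to check that serialize and deserialize are inverses of each other. The sketch below is one way to do that for a type the default cannot handle. So that it runs without the SDK installed, it defines a local stand-in for the Serde base class. That stand-in, along with Event and EventSerde, is hypothetical and used only for illustration; in a real service you would subclass the SDK's Serde instead.

```python
import abc
import json
import typing
from datetime import datetime, timezone

T = typing.TypeVar("T")


class Serde(typing.Generic[T], abc.ABC):
    """Local stand-in for the SDK's Serde interface (assumption, illustration only)."""

    @abc.abstractmethod
    def deserialize(self, buf: bytes) -> typing.Optional[T]: ...

    @abc.abstractmethod
    def serialize(self, obj: typing.Optional[T]) -> bytes: ...


class Event(typing.TypedDict):
    name: str
    created_at: datetime


class EventSerde(Serde[Event]):
    """Hypothetical serde that stores the datetime as an ISO 8601 string."""

    def deserialize(self, buf: bytes) -> typing.Optional[Event]:
        if not buf:
            return None
        data = json.loads(buf)
        return Event(
            name=data["name"],
            created_at=datetime.fromisoformat(data["created_at"]),
        )

    def serialize(self, obj: typing.Optional[Event]) -> bytes:
        if obj is None:
            return bytes()
        data = {"name": obj["name"], "created_at": obj["created_at"].isoformat()}
        return bytes(json.dumps(data), "utf-8")


serde = EventSerde()
event = Event(
    name="signup",
    created_at=datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
)

# Serializing and then deserializing should give back an equal value.
assert serde.deserialize(serde.serialize(event)) == event

# None maps to an empty buffer, and an empty buffer maps back to None.
assert serde.serialize(None) == b""
assert serde.deserialize(b"") is None
```

A check like this catches mismatched key names between the two methods, which would otherwise only surface at runtime inside a handler.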