Serialization
Restate sends data over the network for storing state, journaling actions, awakeables, etc. There are multiple ways to specify which (de)serializers should be used.
Default Serialization
By default, payloads are serialized with bytes(json.dumps(obj), "utf-8") and deserialized with json.loads(buf).
If this does not work for your data type, then you need to specify a custom serializer, as shown below.
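To make the default behaviour concrete, the sketch below reproduces the two expressions above using only the standard library, with no Restate imports. The helper names default_serialize and default_deserialize are hypothetical and exist only for this illustration. It shows a plain dict round-tripping, a tuple coming back as a list, and a datetime value failing, which is the kind of case where a custom serializer is needed.

```python
import json
from datetime import datetime


def default_serialize(obj) -> bytes:
    # Same expression as the default described above.
    return bytes(json.dumps(obj), "utf-8")


def default_deserialize(buf: bytes):
    # Same expression as the default described above.
    return json.loads(buf)


# JSON-compatible values round-trip unchanged.
payload = {"status": "delivered", "dimensions": [2, 3]}
assert default_deserialize(default_serialize(payload)) == payload

# Tuples come back as lists, because JSON has no tuple type.
assert default_deserialize(default_serialize((2, 3))) == [2, 3]

# Values that json.dumps cannot encode, such as datetime, raise TypeError.
try:
    default_serialize({"timestamp": datetime(2024, 1, 1)})
    datetime_supported = True
except TypeError:
    datetime_supported = False
```

If your handlers only exchange dicts, lists, strings, numbers, booleans, and None, the default is likely sufficient; richer types need one of the options described in the following sections.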
Pydantic
Pydantic is a data validation and parsing library for Python.
You can use Pydantic models to define the structure of your data: handler input/output, state, etc.
To serialize and deserialize Pydantic models, you can use the PydanticJsonSerde included in the Restate SDK:
    from datetime import datetime

    from pydantic import BaseModel
    from restate import ObjectContext
    from restate.serde import PydanticJsonSerde


    class Delivery(BaseModel):
        timestamp: datetime
        dimensions: tuple[int, int]


    class CompletedDelivery(BaseModel):
        status: str
        timestamp: datetime


    # For the input/output serialization of your handlers
    @my_object.handler()
    async def deliver(ctx: ObjectContext, delivery: Delivery) -> CompletedDelivery:
        # To serialize state
        await ctx.get("delivery", serde=PydanticJsonSerde(Delivery))
        ctx.set("delivery", delivery, serde=PydanticJsonSerde(Delivery))

        # To serialize awakeable payloads
        ctx.awakeable(serde=PydanticJsonSerde(Delivery))

        # To serialize the results of actions
        await ctx.run("some-task", some_task, serde=PydanticJsonSerde(Delivery))

        # etc.

        return CompletedDelivery(status="delivered", timestamp=datetime.now())
Pydantic integrates well with OpenAPI. Restate generates the OpenAPI specifications for you. If you use Pydantic, you can use the OpenAPI-generated clients to interact with your services. You can find example clients in the UI playground (click on your service in the overview and then on playground).
Custom Serialization
To write a custom serializer, implement the Serde interface. For example, a custom JSON serializer could look like this:
    import json
    import typing

    from restate.serde import Serde


    class MyData(typing.TypedDict):
        """Example payload type handled by the custom serializer."""

        some_value: str
        my_number: int


    class MySerde(Serde[MyData]):
        def deserialize(self, buf: bytes) -> typing.Optional[MyData]:
            # An empty buffer means there is no value.
            if not buf:
                return None
            data = json.loads(buf)
            # The wire format stores my_number under the key "some_number".
            return MyData(some_value=data["some_value"], my_number=data["some_number"])

        def serialize(self, obj: typing.Optional[MyData]) -> bytes:
            # No value is encoded as an empty buffer.
            if obj is None:
                return bytes()
            data = {"some_value": obj["some_value"], "some_number": obj["my_number"]}
            return bytes(json.dumps(data), "utf-8")
You then use this serializer in your handlers, as follows:
    # For the input/output serialization of your handlers.
    # The handler takes and returns MyData so that it matches MySerde.
    @my_object.handler(input_serde=MySerde(), output_serde=MySerde())
    async def my_handler(ctx: ObjectContext, data: MyData) -> MyData:
        # To serialize state
        await ctx.get("my_state", serde=MySerde())
        ctx.set("my_state", MyData(some_value="value", my_number=123), serde=MySerde())

        # To serialize awakeable payloads
        ctx.awakeable(serde=MySerde())

        # To serialize the results of actions
        await ctx.run("some-task", some_task, serde=MySerde())

        # etc.

        return MyData(some_value="some-output", my_number=123)
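As a further illustration, here is a minimal sketch of a serde for a value that the default JSON path cannot encode directly: a dict with a datetime field. So that the snippet runs without the SDK installed, it declares a local stand-in Serde base class with the same two methods used above; in a real service you would subclass the SDK's Serde instead. The names Event and EventSerde are hypothetical, and storing the timestamp as an ISO 8601 string is one possible design choice, not a requirement.

```python
import abc
import json
import typing
from datetime import datetime, timezone

T = typing.TypeVar("T")


class Serde(typing.Generic[T], abc.ABC):
    """Local stand-in for the SDK's Serde interface (assumption: same two methods)."""

    @abc.abstractmethod
    def deserialize(self, buf: bytes) -> typing.Optional[T]: ...

    @abc.abstractmethod
    def serialize(self, obj: typing.Optional[T]) -> bytes: ...


class Event(typing.TypedDict):
    name: str
    at: datetime


class EventSerde(Serde[Event]):
    def deserialize(self, buf: bytes) -> typing.Optional[Event]:
        # An empty buffer means there is no value.
        if not buf:
            return None
        data = json.loads(buf)
        # Parse the ISO 8601 string back into a datetime.
        return Event(name=data["name"], at=datetime.fromisoformat(data["at"]))

    def serialize(self, obj: typing.Optional[Event]) -> bytes:
        # No value is encoded as an empty buffer.
        if obj is None:
            return bytes()
        # Encode the datetime as an ISO 8601 string so json.dumps accepts it.
        data = {"name": obj["name"], "at": obj["at"].isoformat()}
        return bytes(json.dumps(data), "utf-8")


serde = EventSerde()
event = Event(name="delivered", at=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc))
round_tripped = serde.deserialize(serde.serialize(event))
```

A round-trip check like the last three lines is a cheap way to test a serde before wiring it into handlers, state access, awakeables, or ctx.run.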