Skip to main content

Serialization

Restate sends data over the network for storing state, journaling actions, awakeables, etc. There are multiple ways to specify which (de)serializers should be used.

Default Serialization

By default, payloads are serialized with bytes(json.dumps(obj), "utf-8") and deserialized with json.loads(buf). If this does not work for your data type, then you need to specify a custom serializer, as shown below.

Pydantic

Pydantic is a data validation and parsing library for Python.

You can use Pydantic models to define the structure of your data: handler input/output, state, etc.

To serialize and deserialize Pydantic models, you can use the PydanticJsonSerde included in the Restate SDK:

class Delivery(BaseModel):
timestamp: datetime
dimensions: tuple[int, int]
class CompletedDelivery(BaseModel):
status: str
timestamp: datetime
# For the input/output serialization of your handlers
@my_object.handler()
async def deliver(ctx: ObjectContext, delivery: Delivery) -> CompletedDelivery:
# To serialize state
await ctx.get("delivery", serde=PydanticJsonSerde(Delivery))
ctx.set("delivery", delivery, serde=PydanticJsonSerde(Delivery))
# To serialize awakeable payloads
ctx.awakeable(serde=PydanticJsonSerde(Delivery))
# To serialize the results of actions
await ctx.run("some-task", some_task, serde=PydanticJsonSerde(Delivery))
# etc.
return CompletedDelivery(status="delivered", timestamp=datetime.now())
Pydantic & OpenAPI

Pydantic integrates well with OpenAPI. Restate generates the OpenAPI specifications for you. If you use Pydantic, you can use the OpenAPI-generated clients to interact with your services. You can find example clients in the UI playground (click on your service in the overview and then on playground).

Custom Serialization

To write a custom serializer, you implement the Serde interface.

For example a custom JSON serializer could look like this:

class MyData(typing.TypedDict):
"""Represents a response from the GPT model."""
some_value: str
my_number: int
class MySerde(Serde[MyData]):
def deserialize(self, buf: bytes) -> typing.Optional[MyData]:
if not buf:
return None
data = json.loads(buf)
return MyData(some_value=data["some_value"], my_number=data["some_number"])
def serialize(self, obj: typing.Optional[MyData]) -> bytes:
if obj is None:
return bytes()
data = {"some_value": obj["some_value"], "some_number": obj["my_number"]}
return bytes(json.dumps(data), "utf-8")

You then use this serializer in your handlers, as follows:

# For the input/output serialization of your handlers
@my_object.handler(input_serde=MySerde(), output_serde=MySerde())
async def my_handler(ctx: ObjectContext, greeting: str) -> str:
# To serialize state
await ctx.get("my_state", serde=MySerde())
ctx.set("my_state", MyData(some_value="value", my_number=123), serde=MySerde())
# To serialize awakeable payloads
ctx.awakeable(serde=MySerde())
# To serialize the results of actions
await ctx.run("some-task", some_task, serde=MySerde())
# etc.
return "some-output"