Sagas
When building distributed systems, it is crucial that the system remains consistent even in the presence of failures. One way to achieve this is the Saga pattern.
Sagas are a way to manage transactions that span multiple services. When a workflow fails partway through, a saga runs compensations that undo the steps already completed, so the system ends up in a consistent state.
Implementing Sagas in Restate
Let’s assume we want to build a travel booking application. The core of our application is a workflow that first tries to book the flight, then rents a car, and finally processes the customer’s payment before confirming the flight and car rental. When the payment fails, we want to undo the flight booking and car rental.
Restate lets us implement this purely in user code:
- Wrap your business logic in a try-block, and throw a terminal error for cases where you want to compensate and finish.
- For each step you do in your try-block, add a compensation to a list.
- In the catch block, in case of a terminal error, you run the compensations in reverse order, and rethrow the error.
Restate guarantees us that all code will execute. So if a terminal error is thrown, all compensations will run:
TypeScript:

    const bookingWorkflow = restate.workflow({
      name: "BookingWorkflow",
      handlers: {
        run: async (ctx: restate.WorkflowContext, req: BookingRequest) => {
          const { flights, car, paymentInfo } = req;

          // Create a list of undo actions
          const compensations = [];

          try {
            // Reserve the flights and let Restate remember the reservation ID
            // This sends an HTTP request via Restate to the Restate flights service
            const flightBookingId = await ctx.serviceClient(FlightsService).reserve(flights);
            // Use the flightBookingId to register the undo action for the flight reservation,
            // or later confirm the reservation.
            compensations.push(() => ctx.serviceClient(FlightsService).cancel({ flightBookingId }));

            // Reserve the car and let Restate remember the reservation ID
            const carBookingId = await ctx.serviceClient(CarRentalService).reserve(car);
            // Register the undo action for the car rental.
            compensations.push(() => ctx.serviceClient(CarRentalService).cancel({ carBookingId }));

            // Generate an idempotency key for the payment; stable on retries
            const paymentId = ctx.rand.uuidv4();
            // Register the refund as a compensation, using the idempotency key
            compensations.push(() => ctx.run(() => paymentClient.refund({ paymentId })));
            // Do the payment using the idempotency key (sometimes throws Terminal Errors)
            await ctx.run(() => paymentClient.charge({ paymentInfo, paymentId }));

            // Confirm the flight and car reservations
            await ctx.serviceClient(FlightsService).confirm({ flightBookingId });
            await ctx.serviceClient(CarRentalService).confirm({ carBookingId });
          } catch (e) {
            // Terminal errors tell Restate not to retry, but to compensate and fail the workflow
            if (e instanceof restate.TerminalError) {
              // Undo all the steps up to this point by running the compensations
              // Restate guarantees that all compensations are executed
              for (const compensation of compensations.reverse()) {
                await compensation();
              }
            }
            // Rethrow error to fail this workflow
            throw e;
          }
        },
      },
    });

    restate
      .endpoint()
      .bind(bookingWorkflow)
      .bind(carRentalService)
      .bind(flightsService)
      .listen(9080);
Java:

    @Workflow
    public class BookingWorkflow {

      public record BookingRequest(
          FlightService.FlightBookingRequest flights,
          CarRentalService.CarRentalRequest car,
          PaymentClient.PaymentInfo paymentInfo) {}

      @Workflow
      public void run(WorkflowContext ctx, BookingRequest req) throws TerminalException {
        // Create a list of undo actions
        List<Runnable> compensations = new ArrayList<>();

        try {
          // Reserve the flights; Restate remembers the reservation ID
          // This sends an HTTP request via Restate to the Restate flights service
          var flightsRpcClient = FlightServiceClient.fromContext(ctx);
          String flightBookingId = flightsRpcClient.reserve(req.flights()).await();
          // Use the flightBookingId to register the undo action for the flight reservation,
          // or later confirm the reservation.
          compensations.add(() -> flightsRpcClient.cancel(flightBookingId).await());

          // Reserve the car; Restate remembers the reservation ID
          var carRentalRpcClient = CarRentalServiceClient.fromContext(ctx);
          String carBookingId = carRentalRpcClient.reserve(req.car()).await();
          // Register the undo action for the car rental.
          compensations.add(() -> carRentalRpcClient.cancel(carBookingId).await());

          // Charge the payment; generate a payment ID and store it in Restate
          String paymentId = ctx.random().nextUUID().toString();
          // Register the refund as a compensation, using the idempotency key
          compensations.add(() -> ctx.run(() -> PaymentClient.refund(paymentId)));
          // Do the payment using the paymentId as idempotency key
          ctx.run(() -> PaymentClient.charge(req.paymentInfo(), paymentId));

          // Confirm the flight and car reservations
          flightsRpcClient.confirm(flightBookingId).await();
          carRentalRpcClient.confirm(carBookingId).await();
        }
        // Terminal errors tell Restate not to retry, but to compensate and fail the workflow
        catch (TerminalException e) {
          // Undo all the steps up to this point by running the compensations in reverse order
          // Restate guarantees that all compensations are executed
          for (int i = compensations.size() - 1; i >= 0; i--) {
            compensations.get(i).run();
          }
          // Rethrow error to fail this workflow
          throw e;
        }
      }

      public static void main(String[] args) {
        RestateHttpEndpointBuilder.builder()
            .bind(new BookingWorkflow())
            .bind(new CarRentalService())
            .bind(new FlightService())
            .buildAndListen();
      }
    }
Kotlin:

    @Workflow
    class BookingWorkflow {

        @Workflow
        suspend fun run(ctx: WorkflowContext, req: TravelBookingRequest) {
            // Create a list of compensations to run in case of a failure or cancellation.
            val compensations: MutableList<suspend () -> Unit> = mutableListOf()

            try {
                // Reserve the flights and let Restate remember the reservation ID
                // This sends an HTTP request via Restate to the Restate flights service
                val flightsRpcClient = FlightServiceClient.fromContext(ctx)
                val flightBookingId = flightsRpcClient.reserve(req.flights).await()
                // Use the flightBookingId to register the undo action for the flight reservation,
                // or later confirm the reservation.
                compensations.add { flightsRpcClient.cancel(flightBookingId).await() }

                // Reserve the car and let Restate remember the reservation ID
                val carRentalRpcClient = CarRentalServiceClient.fromContext(ctx)
                val carBookingId = carRentalRpcClient.reserve(req.car).await()
                // Register the compensation to undo the car rental reservation.
                compensations.add { carRentalRpcClient.cancel(carBookingId).await() }

                // Charge the payment; generate a payment ID and store it in Restate
                val paymentId = ctx.random().nextUUID().toString()
                // Register the payment refund using the paymentId
                compensations.add { ctx.runBlock { refundCustomer(paymentId) } }
                // Do the payment using the paymentId as idempotency key
                ctx.runBlock { chargeCustomer(req.paymentInfo, paymentId) }

                // Confirm flight and car rental after payment done
                flightsRpcClient.confirm(flightBookingId).await()
                carRentalRpcClient.confirm(carBookingId).await()
            }
            // Terminal errors tell Restate not to retry, but to compensate and fail the workflow
            catch (e: TerminalException) {
                // Undo all the steps up to this point by running the compensations
                // Restate guarantees that all compensations are executed
                compensations.reversed().forEach { it() }

                throw TerminalException("Failed to reserve the trip: ${e.message}. Ran ${compensations.size} compensations.")
            }
        }
    }

    fun main() {
        RestateHttpEndpointBuilder.builder()
            .bind(BookingWorkflow())
            .bind(FlightService())
            .bind(CarRentalService())
            .buildAndListen()
    }
Python:

    booking_workflow = Workflow("BookingWorkflow")


    @booking_workflow.main()
    async def run(ctx: restate.WorkflowContext, req: BookingRequest):
        # Create a list of undo actions
        compensations = []

        try:
            # Reserve the flights; Restate remembers the reservation ID
            # This sends an HTTP request via Restate to the Restate flights service
            flight_booking_id = await ctx.service_call(flight_service.reserve, arg=req.flights)
            # Use the flight_booking_id to register the undo action for the flight reservation,
            # or later confirm the reservation.
            compensations.append(lambda: ctx.service_call(flight_service.cancel, arg=flight_booking_id))

            # Reserve the car and let Restate remember the reservation ID
            car_booking_id = await ctx.service_call(car_rental_service.reserve, arg=req.car)
            # Register the undo action for the car rental.
            compensations.append(lambda: ctx.service_call(car_rental_service.cancel, arg=car_booking_id))

            # Generate an idempotency key for the payment
            payment_id = await ctx.run("payment_id", lambda: str(uuid.uuid4()))

            # Register the refund as a compensation, using the idempotency key
            async def refund():
                return await payment_client.refund(payment_id)

            compensations.append(lambda: ctx.run("refund", refund))

            # Do the payment using the idempotency key
            async def charge():
                return await payment_client.charge(req.payment_info, payment_id)

            await ctx.run("charge", charge)

            # Confirm the flight and car reservations
            await ctx.service_call(flight_service.confirm, arg=flight_booking_id)
            await ctx.service_call(car_rental_service.confirm, arg=car_booking_id)

        # Terminal errors tell Restate not to retry, but to compensate and fail the workflow
        except TerminalError as e:
            # Undo all the steps up to this point by running the compensations
            # Restate guarantees that all compensations are executed
            for compensation in reversed(compensations):
                await compensation()

            # Rethrow error to fail this workflow
            raise e


    app = restate.app([booking_workflow, car_rental_service.service, flight_service.service])
Go:

    type BookingWorkflow struct{}

    func (BookingWorkflow) Run(ctx restate.WorkflowContext, req BookingRequest) error {
        // Create a list of undo actions
        compensations := make([]func() error, 0, 3)
        addCompensation := func(compensation func() error) {
            compensations = append(compensations, compensation)
        }

        handleError := func(err error) error {
            // Terminal errors tell Restate not to retry, but to compensate and fail the workflow
            if restate.IsTerminalError(err) {
                // Undo all the steps up to this point by running the compensations in reverse order
                // Restate guarantees that all compensations are executed
                for i := len(compensations) - 1; i >= 0; i-- {
                    if err := compensations[i](); err != nil {
                        return err
                    }
                }
            }
            return err
        }

        // Reserve the flights; Restate remembers the reservation ID
        // This sends an HTTP request via Restate to the Restate flight service
        flightBookingID, err := restate.Service[string](ctx, "FlightService", "Reserve").Request(req.Flights)
        if err != nil {
            // Nothing has been registered yet, so there is nothing to compensate
            return err
        }
        // Use the flightBookingID to register the undo action for the flight reservation,
        // or later confirm the reservation.
        addCompensation(func() error {
            _, err := restate.Service[restate.Void](ctx, "FlightService", "Cancel").Request(flightBookingID)
            return err
        })

        // Reserve the car and let Restate remember the reservation ID
        carBookingID, err := restate.Service[string](ctx, "CarRentalService", "Reserve").Request(req.Car)
        if err != nil {
            return handleError(err)
        }
        // Register the undo action for the car rental.
        addCompensation(func() error {
            _, err := restate.Service[restate.Void](ctx, "CarRentalService", "Cancel").Request(carBookingID)
            return err
        })

        // Charge the payment; generate a payment ID and store it in Restate
        paymentID := restate.Rand(ctx).UUID().String()
        // Register the refund as a compensation, using the idempotency key
        addCompensation(func() error {
            // Use a local err so the compensation does not overwrite the handler's err
            _, err := restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
                return restate.Void{}, activities.PaymentClient{}.Refund(paymentID)
            })
            return err
        })
        // Do the payment using the idempotency key
        if _, err = restate.Run(ctx, func(ctx restate.RunContext) (restate.Void, error) {
            return restate.Void{}, activities.PaymentClient{}.Charge(paymentID, req.PaymentInfo)
        }); err != nil {
            return handleError(err)
        }

        // Confirm the flight and car reservations
        if _, err = restate.Service[restate.Void](ctx, "FlightService", "Confirm").Request(flightBookingID); err != nil {
            return handleError(err)
        }
        if _, err = restate.Service[restate.Void](ctx, "CarRentalService", "Confirm").Request(carBookingID); err != nil {
            return handleError(err)
        }

        return nil
    }

    func main() {
        server := server.NewRestate().
            Bind(restate.Reflect(BookingWorkflow{})).
            Bind(restate.Reflect(activities.CarRentalService{})).
            Bind(restate.Reflect(activities.FlightService{}))

        if err := server.Start(context.Background(), ":9080"); err != nil {
            slog.Error("application exited unexpectedly", "err", err.Error())
            os.Exit(1)
        }
    }
When to use Sagas
Restate runs invocations to completion, with infinite retries and recovery of partial progress. In that sense, you do not need to run compensations between retries: Restate starts each retry attempt from the point where the invocation failed.
However, there can still be cases in your business logic where you want to stop a handler from executing any further and run compensations for the work done so far.
You will also need sagas to end up in a consistent state when you cancel an invocation (via the CLI or programmatically). For example, if an invocation gets stuck because an external system is not responding, you might want to stop executing the invocation while keeping the overall system state consistent.
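The difference between a retried failure and a compensated one can be sketched in plain Python, without the Restate SDK. This is a simulation under stated assumptions: `TransientError`, `TerminalError`, `retry_until_done`, and `book_trip` are hypothetical names invented for illustration, and the retry loop only mimics the behavior described above; it does not show how Restate is implemented.

```python
class TransientError(Exception):
    """Hypothetical: a failure that is worth retrying, such as a timeout."""


class TerminalError(Exception):
    """Hypothetical stand-in for a terminal error: stop and compensate."""


def retry_until_done(action):
    """Retry transient failures indefinitely; let terminal errors through."""
    while True:
        try:
            return action()
        except TransientError:
            # Only the failing step is retried. Earlier steps are neither
            # repeated nor undone, so no compensation runs here.
            continue


def book_trip(reserve_flight, reserve_car, charge, log):
    compensations = []
    try:
        retry_until_done(reserve_flight)
        compensations.append(lambda: log.append("cancel flight"))
        retry_until_done(reserve_car)
        compensations.append(lambda: log.append("cancel car"))
        # The refund is registered before the charge, as in the examples above.
        compensations.append(lambda: log.append("refund"))
        retry_until_done(charge)
        log.append("confirmed")
    except TerminalError:
        # A business-level failure: undo completed work in reverse order.
        for compensation in reversed(compensations):
            compensation()
        raise


def demo():
    log = []
    car_attempts = []

    def reserve_flight():
        log.append("reserve flight")

    def flaky_reserve_car():
        # Fails twice with a transient error, then succeeds.
        car_attempts.append(1)
        if len(car_attempts) < 3:
            raise TransientError("car service timed out")
        log.append("reserve car")

    def declined_charge():
        raise TerminalError("card declined")

    try:
        book_trip(reserve_flight, flaky_reserve_car, declined_charge, log)
    except TerminalError as e:
        log.append(f"failed: {e}")
    return log
```

In this sketch the two car-service timeouts leave no trace in the log, because they are simply retried. Only the declined payment triggers the compensations.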
Registering compensations
Because this is all implemented in pure user code, there are no restrictions on what you can do in compensations, as long as they are idempotent.
For example, you can reset the state of the service, call other services to undo previously executed calls, or run ctx.run actions to delete previously inserted rows in a database.
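As an illustration of what idempotent means for a compensation, here is a minimal sketch in plain Python. `ReservationStore` and `delete_if_present` are hypothetical names, and the in-memory dictionary stands in for a real database. The point is only that running the undo action a second time, for example after a retry, has no further effect and raises no error.

```python
class ReservationStore:
    """Hypothetical in-memory stand-in for a reservations table."""

    def __init__(self):
        self.rows = {}

    def insert(self, reservation_id, details):
        self.rows[reservation_id] = details

    def delete_if_present(self, reservation_id):
        """Idempotent undo: deleting a missing row is a no-op, not an error.

        Returns True if a row was removed, False if there was nothing to do.
        """
        return self.rows.pop(reservation_id, None) is not None


store = ReservationStore()
store.insert("car-42", {"model": "compact"})

first = store.delete_if_present("car-42")   # removes the row
second = store.delete_if_present("car-42")  # repeated call: nothing left to remove
```

A compensation written like `del rows[reservation_id]` would instead raise on the second call, which is the kind of behavior to avoid.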
Adding compensations
Depending on the characteristics of the API, adding the compensation might look different:
- The flight and car rental APIs require you to first reserve, and then use the ID you get back to confirm or cancel. In this case, we add the compensation after creating the reservation (because we need the ID).
- The payment API in the example requires you to generate an idempotency key yourself, and executes in one shot. Here, we add the compensation before performing the action, using the same UUID. This way, even if the charge throws a terminal error, the refund still runs, so we can be sure that the payment did not go through.
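The two orderings can be compared side by side in a small plain-Python sketch. Everything here is hypothetical: `FakeFlightService`, `FakePaymentClient`, and `book` are invented for illustration and are not part of the Restate SDK. The sketch also assumes one particular failure scenario, a charge that takes effect and then reports a terminal error, to show why the refund is registered first.

```python
import uuid


class TerminalError(Exception):
    """Hypothetical stand-in for a terminal error."""


class FakeFlightService:
    """Hypothetical two-step API: reserve returns an ID used to confirm or cancel."""

    def __init__(self):
        self.reserved = set()
        self.confirmed = set()

    def reserve(self):
        booking_id = f"flight-{uuid.uuid4()}"
        self.reserved.add(booking_id)
        return booking_id

    def cancel(self, booking_id):
        self.reserved.discard(booking_id)  # no-op if already cancelled

    def confirm(self, booking_id):
        self.confirmed.add(booking_id)


class FakePaymentClient:
    """Hypothetical one-shot API keyed by a caller-supplied payment ID."""

    def __init__(self):
        self.charged = set()

    def charge(self, payment_id, fail_after_charging=False):
        self.charged.add(payment_id)
        if fail_after_charging:
            # Assumed scenario: the charge took effect, but the caller sees an error.
            raise TerminalError("payment failed")

    def refund(self, payment_id):
        self.charged.discard(payment_id)  # no-op for unknown or refunded IDs


def book(flights, payments, charge_fails):
    compensations = []
    try:
        # Two-step API: the ID only exists after the call returns,
        # so the compensation is registered afterwards.
        booking_id = flights.reserve()
        compensations.append(lambda: flights.cancel(booking_id))

        # One-shot API: we generate the key ourselves, so the refund
        # can be registered before the charge is attempted.
        payment_id = str(uuid.uuid4())
        compensations.append(lambda: payments.refund(payment_id))
        payments.charge(payment_id, fail_after_charging=charge_fails)

        flights.confirm(booking_id)
        return "booked"
    except TerminalError:
        for compensation in reversed(compensations):
            compensation()
        return "compensated"
```

If the refund were registered only after a successful `charge`, the failing call in this sketch would leave the payment ID in `charged` with nothing to undo it.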