Parallelizing work
This guide shows how to use the Restate to execute a list of tasks in parallel and then gather their result, also known as fan-out, fan-in.
Fan out: You can fan out tasks with Restate by creating a handler that processes a single subtask, and then scheduling it repeatedly from another handler. Restate guarantees and manages the execution of all the tasks across failures.
Fan in: You can fan in the results of the subtasks by using Restate's Promise Combinators to wait for all promises to resolve.
Example
The example implements a worker service:
- It splits a task into subtasks.
- It schedules all the subtasks. Each subtask results in a promise that gets added to a list.
- The result is gathered by waiting for all promises to resolve.
You can run this on FaaS infrastructure, like AWS Lambda, and it will scale automatically.
The run
handler will then suspend while it waits for all subtasks to finish.
Restate will then resume the handler when all subtasks are done.
- TypeScript
- Java
- Python
- Go
const fanOutWorker = restate.service({name: "worker",handlers: {run: async (ctx: Context, task: Task) => {// Split the task in subtasksconst subtasks: SubTask[] = await ctx.run("split task", () =>split(task));// Fan out the subtasks - run them in parallelconst resultPromises = [];for (const subtask of subtasks) {const subResultPromise = ctx.serviceClient(fanOutWorker).runSubtask(subtask);resultPromises.push(subResultPromise);}// Fan in - Aggregate the resultsconst results = await CombineablePromise.all(resultPromises);return aggregate(results);},// Can also run on FaaSrunSubtask: async (ctx: Context, subtask: SubTask) => {// Processing logic goes here ...// Can be moved to a separate service to scale independently},},});export const handler = restate.endpoint().bind(fanOutWorker).handler();
@Servicepublic class FanOutWorker {public record Task(){}public record SubTask(){}public record SubTaskResult(){}public record Result(){}@Handlerpublic Result run(Context ctx, Task task) {// Split the task in subtasksList<SubTask> subTasks = ctx.run(JacksonSerdes.of(new TypeReference<>() {}),() -> split(task));// Fan out the subtasks - run them in parallelList<Awaitable<?>> resultFutures = new ArrayList<>();for (SubTask subTask : subTasks) {resultFutures.add(FanOutWorkerClient.fromContext(ctx).runSubtask(subTask));}Awaitable.all(resultFutures).await();// Fan in - Aggregate the resultsvar results = (SubTaskResult[]) resultFutures.stream().map(Awaitable::await).toArray();return aggregate(results);}// Can also run on FaaS@Handlerpublic SubTaskResult runSubtask(Context ctx, SubTask subTask) {// Processing logic goes here ...// Can be moved to a separate service to scale independentlyreturn new SubTaskResult();}public static void main(String[] args) {RestateHttpEndpointBuilder.builder().bind(new FanOutWorker()).buildAndListen(9082);}}
fanout_worker = Service("FanOutWorker")# Restate makes it easy to parallelize async work by fanning out tasks.# Afterwards, you can collect the result by fanning in the partial results.# Durable Execution ensures that the fan-out and fan-in steps happen reliably exactly once.@fanout_worker.handler()async def run(ctx: Context, task: Task):# Split the task in subtaskssubtasks = await ctx.run("split task", lambda: split(task))# Fan out the subtasks - run them in parallelresult_promises = []for subtask in subtasks:sub_result_promise = ctx.service_call(run_subtask, arg=subtask)result_promises.append(sub_result_promise)# Fan in - Aggregate the resultsresults = [await promise for promise in result_promises]return aggregate(results)# Can also run on FaaS@fanout_worker.handler()async def run_subtask(ctx: Context, subtask: Subtask):# Processing logic goes here...# Can be moved to a separate service to scale independentlypassapp = restate.app([fanout_worker])
type FanOutWorker struct{}func (FanOutWorker) Run(ctx restate.Context, task Task) (Result, error) {// Split the task into subtaskssubtasks, err := split(task)if err != nil {return Result{}, err}// Fan out the subtasks - run them in parallelsubtaskFutures := make([]restate.Selectable, 0, len(subtasks))for _, subtask := range subtasks {subtaskFutures = append(subtaskFutures,restate.Service[SubTaskResult](ctx, "FanOutWorker", "RunSubtask").RequestFuture(subtask))}selector := restate.Select(ctx, subtaskFutures...)// Fan in - Aggregate the resultssubResults := make([]SubTaskResult, 0, len(subtasks))for selector.Remaining() {response, err := selector.Select().(restate.ResponseFuture[SubTaskResult]).Response()if err != nil {return Result{}, err}subResults = append(subResults, response)}// Fan in - Aggregate the resultsreturn aggregate(subResults)}// RunSubtask can also run on FaaSfunc (FanOutWorker) RunSubtask(ctx restate.Context, subtask SubTask) (SubTaskResult, error) {// Processing logic goes here ...// Can be moved to a separate service to scale independentlyreturn executeSubtask(ctx, subtask)}func main() {server := server.NewRestate().Bind(restate.Reflect(FanOutWorker{}))if err := server.Start(context.Background(), ":9080"); err != nil {slog.Error("application exited unexpectedly", "err", err.Error())os.Exit(1)}}/*Try running the FanOutWorker service with the following command:curl -X POST http://localhost:8080/FanOutWorker/Run -H "Content-Type: application/json" -d '{"description": "get out of bed,shower,make coffee,have breakfast"}'*/
Running the example
Download the example
- ts
- java
- python
- go
restate example typescript-patterns-use-cases && cd typescript-patterns-use-cases
restate example java-patterns-use-cases && cd java-patterns-use-cases
restate example python-patterns-use-cases && cd python-patterns-use-cases
restate example go-patterns-use-cases && cd go-patterns-use-cases
Start the Restate Server
restate-server
Start the Service
- ts
- java
- python
- go
npx tsx watch ./src/parallelizework/fan_out_worker.ts
./gradlew -PmainClass=my.example.parallelizework.FanOutWorker run
python -m hypercorn --config hypercorn-config.toml src/parallelizework/fan_out_worker:app
go run ./src/parallelizework
Register the services
restate deployments register localhost:9080
Send a request
- ts
- java
- python
- go
curl localhost:8080/worker/run \--json '{"description": "get out of bed,shower,make coffee,have breakfast"}'
curl localhost:8080/FanOutWorker/run \--json '{"description": "get out of bed,shower,make coffee,have breakfast"}'
curl localhost:8080/FanOutWorker/run \--json '{"description": "get out of bed,shower,make coffee,have breakfast"}'
curl localhost:8080/FanOutWorker/Run \--json '{"description": "get out of bed,shower,make coffee,have breakfast"}'
Check the service logs
See how all tasks get spawned in parallel, finish at different times, and then get aggregated.
- ts
- java
- python
- go
[restate] [worker/runSubtask][inv_17jBqoqRG0TN3msVqHEpZn2aQMOX5kSKrf][2025-01-17T08:51:44.993Z] INFO: Started executing subtask: get out of bed[restate] [worker/runSubtask][inv_1f8R1NuF0LF27EdQ0R6s7PR8hld245OM8h][2025-01-17T08:51:44.995Z] INFO: Started executing subtask: shower[restate] [worker/runSubtask][inv_101oPhGwxQqZ0sQebkQnpGyV9Rp3oj9CSJ][2025-01-17T08:51:44.997Z] INFO: Started executing subtask: make coffee[restate] [worker/runSubtask][inv_1eKDShaxMCEB6DXasrR5OtRXJEvA2je33X][2025-01-17T08:51:44.998Z] INFO: Started executing subtask: have breakfast[restate] [worker/runSubtask][inv_17jBqoqRG0TN3msVqHEpZn2aQMOX5kSKrf][2025-01-17T08:51:47.003Z] INFO: Execution subtask finished: get out of bed[restate] [worker/runSubtask][inv_101oPhGwxQqZ0sQebkQnpGyV9Rp3oj9CSJ][2025-01-17T08:51:48.007Z] INFO: Execution subtask finished: make coffee[restate] [worker/runSubtask][inv_1f8R1NuF0LF27EdQ0R6s7PR8hld245OM8h][2025-01-17T08:51:48.999Z] INFO: Execution subtask finished: shower[restate] [worker/runSubtask][inv_1eKDShaxMCEB6DXasrR5OtRXJEvA2je33X][2025-01-17T08:51:49.001Z] INFO: Execution subtask finished: have breakfast[restate] [worker/run][inv_18QHSeAYfvim1oNXRl9I5105veQcTW3BEl][2025-01-17T08:51:49.007Z] INFO: Aggregated result: get out of bed: DONE,shower: DONE,make coffee: DONE,have breakfast: DONE
2025-01-17 10:00:58 INFO [FanOutWorker/run][inv_1jNoSMJtWluo4Ir43OUyDAxD9weMAQ4OeR] dev.restate.sdk.core.InvocationStateMachine - Start invocation2025-01-17 10:00:58 INFO [FanOutWorker/runSubtask][inv_1kdpBvVXdqyo3saU6KThul6Jgkfot6LcRP] dev.restate.sdk.core.InvocationStateMachine - Start invocation2025-01-17 10:00:58 INFO [FanOutWorker/runSubtask][inv_1kdpBvVXdqyo3saU6KThul6Jgkfot6LcRP] my.example.parallelizework.utils.Utils - Started executing subtask: get out of bed2025-01-17 10:00:58 INFO [FanOutWorker/runSubtask][inv_162MCD5ertQ65pdG0uDIYRMgLBFYZkNPnb] dev.restate.sdk.core.InvocationStateMachine - Start invocation2025-01-17 10:00:58 INFO [FanOutWorker/runSubtask][inv_162MCD5ertQ65pdG0uDIYRMgLBFYZkNPnb] my.example.parallelizework.utils.Utils - Started executing subtask: shower2025-01-17 10:00:58 INFO [FanOutWorker/runSubtask][inv_10bPiFTjBUXX35qtzOTPNr0vfgoYYVehpf] dev.restate.sdk.core.InvocationStateMachine - Start invocation2025-01-17 10:00:58 INFO [FanOutWorker/runSubtask][inv_10bPiFTjBUXX35qtzOTPNr0vfgoYYVehpf] my.example.parallelizework.utils.Utils - Started executing subtask: make coffee2025-01-17 10:00:58 INFO [FanOutWorker/runSubtask][inv_1115lzidXq7M7CtLZn0aEyUvMC4zkXYLWF] dev.restate.sdk.core.InvocationStateMachine - Start invocation2025-01-17 10:00:58 INFO [FanOutWorker/runSubtask][inv_1115lzidXq7M7CtLZn0aEyUvMC4zkXYLWF] my.example.parallelizework.utils.Utils - Started executing subtask: have breakfast2025-01-17 10:00:59 INFO [FanOutWorker/runSubtask][inv_162MCD5ertQ65pdG0uDIYRMgLBFYZkNPnb] my.example.parallelizework.utils.Utils - Execution subtask finished: shower2025-01-17 10:00:59 INFO [FanOutWorker/runSubtask][inv_162MCD5ertQ65pdG0uDIYRMgLBFYZkNPnb] dev.restate.sdk.core.InvocationStateMachine - End invocation2025-01-17 10:01:00 INFO [FanOutWorker/runSubtask][inv_1kdpBvVXdqyo3saU6KThul6Jgkfot6LcRP] my.example.parallelizework.utils.Utils - Execution subtask finished: get out of bed2025-01-17 10:01:00 INFO [FanOutWorker/runSubtask][inv_1kdpBvVXdqyo3saU6KThul6Jgkfot6LcRP] dev.restate.sdk.core.InvocationStateMachine - End invocation2025-01-17 10:01:04 INFO [FanOutWorker/runSubtask][inv_10bPiFTjBUXX35qtzOTPNr0vfgoYYVehpf] my.example.parallelizework.utils.Utils - Execution subtask finished: make coffee2025-01-17 10:01:04 INFO [FanOutWorker/runSubtask][inv_10bPiFTjBUXX35qtzOTPNr0vfgoYYVehpf] dev.restate.sdk.core.InvocationStateMachine - End invocation2025-01-17 10:01:05 INFO [FanOutWorker/runSubtask][inv_1115lzidXq7M7CtLZn0aEyUvMC4zkXYLWF] my.example.parallelizework.utils.Utils - Execution subtask finished: have breakfast2025-01-17 10:01:05 INFO [FanOutWorker/runSubtask][inv_1115lzidXq7M7CtLZn0aEyUvMC4zkXYLWF] dev.restate.sdk.core.InvocationStateMachine - End invocation2025-01-17 10:01:05 INFO [FanOutWorker/run][inv_1jNoSMJtWluo4Ir43OUyDAxD9weMAQ4OeR] my.example.parallelizework.utils.Utils - Aggregated result: get out of bed: DONE, shower: DONE, make coffee: DONE, have breakfast: DONE2025-01-17 10:01:05 INFO [FanOutWorker/run][inv_1jNoSMJtWluo4Ir43OUyDAxD9weMAQ4OeR] dev.restate.sdk.core.InvocationStateMachine - End invocation
[2025-01-17 12:00:05,183] [12245] [INFO] - Started executing subtask: get out of bed[2025-01-17 12:00:05,184] [12247] [INFO] - Started executing subtask: shower[2025-01-17 12:00:05,184] [12245] [INFO] - Started executing subtask: make coffee[2025-01-17 12:00:05,185] [12245] [INFO] - Started executing subtask: have breakfast[2025-01-17 12:00:05,188] [12245] [INFO] - Execution subtask finished: make coffee[2025-01-17 12:00:08,193] [12245] [INFO] - Execution subtask finished: get out of bed[2025-01-17 12:00:10,194] [12247] [INFO] - Execution subtask finished: shower[2025-01-17 12:00:15,196] [12245] [INFO] - Execution subtask finished: have breakfast[2025-01-17 12:00:15,198] [12245] [INFO] - Aggregated result: get out of bed: DONE,shower: DONE,make coffee: DONE,have breakfast: DONE
2025/01/16 16:41:22 INFO Handling invocation method=FanOutWorker/Run invocationID=inv_1lkcVTBmCorR3fSPhE0pNTiO8XFXoV34C52025/01/16 16:41:22 INFO Handling invocation method=FanOutWorker/RunSubtask invocationID=inv_1jpZWOrDK45b2ZwWCapl68GgXzNfoOh0BP2025/01/16 16:41:22 Started executing subtask: get out of bed2025/01/16 16:41:22 INFO Handling invocation method=FanOutWorker/RunSubtask invocationID=inv_10eVGnmjP1ET4PgI3z82rvXcVlCnSOep3P2025/01/16 16:41:22 Started executing subtask: shower2025/01/16 16:41:22 INFO Handling invocation method=FanOutWorker/RunSubtask invocationID=inv_1i3RduoDMNnb4ideAtWaWCLRDaIO62eghP2025/01/16 16:41:22 Started executing subtask: make coffee2025/01/16 16:41:22 INFO Handling invocation method=FanOutWorker/RunSubtask invocationID=inv_142WnXnWDxfy6k4JanZ7DQVqAL6zmktuxP2025/01/16 16:41:22 Started executing subtask: have breakfast2025/01/16 16:41:24 Execution subtask finished: get out of bed2025/01/16 16:41:24 INFO Invocation completed successfully method=FanOutWorker/RunSubtask invocationID=inv_1jpZWOrDK45b2ZwWCapl68GgXzNfoOh0BP2025/01/16 16:41:25 Execution subtask finished: shower2025/01/16 16:41:25 INFO Invocation completed successfully method=FanOutWorker/RunSubtask invocationID=inv_10eVGnmjP1ET4PgI3z82rvXcVlCnSOep3P2025/01/16 16:41:25 Execution subtask finished: have breakfast2025/01/16 16:41:25 INFO Invocation completed successfully method=FanOutWorker/RunSubtask invocationID=inv_142WnXnWDxfy6k4JanZ7DQVqAL6zmktuxP2025/01/16 16:41:26 Execution subtask finished: make coffee2025/01/16 16:41:26 INFO Invocation completed successfully method=FanOutWorker/RunSubtask invocationID=inv_1i3RduoDMNnb4ideAtWaWCLRDaIO62eghP2025/01/16 16:41:26 Aggregated result: get out of bed: DONE,shower: DONE,have breakfast: DONE,make coffee: DONE2025/01/16 16:41:26 INFO Invocation completed successfully method=FanOutWorker/Run invocationID=inv_1lkcVTBmCorR3fSPhE0pNTiO8XFXoV34C5