Async tasks
Flexible, durable scheduling across processes and time.
Restate lets you write resilient scheduling logic with the flexibility of code: delay execution, re-attach to ongoing tasks, fan out/in, etc. Restate runs your tasks to completion exactly once.
Durable timers
(Delayed) task queue
Switch between async and sync
Async tasks with Restate
Schedule tasks for now or later with the Restate SDK.
- TypeScript
- Java
- Kotlin
- Python
TypeScript
Execute any handler async
Every handler in Restate is executed asynchronously and can be treated as a reliable asynchronous task, whether it is a simple function or a complex workflow. Restate persists the requests to this handler and makes sure they run to completion, handling retries and recovery upon failures.
Schedule tasks reliably
Schedule tasks asynchronously by using Restate as a message queue. Restate queues them reliably, even under backpressure or load.
Handlers can be called asynchronously from anywhere. The call returns a task handle once it is enqueued.
Idempotent task submission
Use an idempotency key to ensure that the task is only scheduled once. Restate will deduplicate the request and return the previous response.
Latch on to the task
For requests with an idempotency key, you can use this task handle to latch on to the task later and retrieve the result, or wait for it to finish.
This works across processes, so you can have a separate process latch on to the task later.
const asyncTaskService = restate.service({
  name: "taskWorker",
  handlers: {
    runTask: async (ctx: Context, params: TaskOpts) => {
      return someHeavyWork(params);
    },
  },
});

export type AsyncTaskService = typeof asyncTaskService;

restate.endpoint().bind(asyncTaskService).listen(9080);

async function submitAndAwaitTask(task: TaskOpts) {
  const restateClient = restate.connect({ url: RESTATE_URL });
  const taskHandle = await restateClient
    .serviceSendClient<AsyncTaskService>({ name: "taskWorker" })
    .runTask(task, SendOpts.from({ idempotencyKey: "dQw4w9WgXcQ" }));

  // await the handler's result
  const result = await restateClient.result(taskHandle);
}

async function attachToTask(taskHandle: string) {
  const rs = restate.connect({ url: RESTATE_URL });
  const result = await rs.result<string>(JSON.parse(taskHandle));
}
Java
Execute any handler async
Every handler in Restate is executed asynchronously and can be treated as a reliable asynchronous task, whether it is a simple function or a complex workflow. Restate persists the requests to this handler and makes sure they run to completion, handling retries and recovery upon failures.
Schedule tasks reliably
Schedule tasks asynchronously by using Restate as a message queue. Restate queues them reliably, even under backpressure or load.
Handlers can be called asynchronously from anywhere. The call returns a task handle once it is enqueued.
Idempotent task submission
Use an idempotency key to ensure that the task is only scheduled once. Restate will deduplicate the request and return the previous response.
Latch on to the task
For requests with an idempotency key, you can use this task handle to latch on to the task later and retrieve the result, or wait for it to finish.
This works across processes, so you can have a separate process latch on to the task later.
@Service
public class AsyncTaskService {

  @Handler
  public String runTask(Context ctx, TaskOpts params) {
    return someHeavyWork(params);
  }
}

public class TaskSubmitter {

  private static final Client rs = Client.connect("http://localhost:8080");

  public void submitAndAwaitTasks(TaskOpts taskOpts) {
    SendResponse handle =
        AsyncTaskServiceClient.fromClient(rs)
            .send()
            .runTask(taskOpts, CallRequestOptions.DEFAULT.withIdempotency("dQw4w9WgXcQ"));

    // await the handler's result
    String result =
        rs.invocationHandle(handle.getInvocationId(), JsonSerdes.STRING).attach();
  }
}
Kotlin
Execute any handler async
Every handler in Restate is executed asynchronously and can be treated as a reliable asynchronous task, whether it is a simple function or a complex workflow. Restate persists the requests to this handler and makes sure they run to completion, handling retries and recovery upon failures.
Schedule tasks reliably
Schedule tasks asynchronously by using Restate as a message queue. Restate queues them reliably, even under backpressure or load.
Handlers can be called asynchronously from anywhere. The call returns a task handle once it is enqueued.
Idempotent task submission
Use an idempotency key to ensure that the task is only scheduled once. Restate will deduplicate the request and return the previous response.
Latch on to the task
For requests with an idempotency key, you can use this task handle to latch on to the task later and retrieve the result, or wait for it to finish.
This works across processes, so you can have a separate process latch on to the task later.
@Service
class AsyncTaskService {

  @Handler
  suspend fun runTask(ctx: Context, params: TaskOpts): String {
    return someHeavyWork(params)
  }
}
Python
Execute any handler async
Every handler in Restate can be treated as a reliable asynchronous task, whether it is a simple function or a complex workflow. Restate persists the requests to this handler and makes sure they run to completion, handling retries and recovery upon failures.
Schedule tasks reliably
Schedule async tasks by using Restate as a message queue. Restate queues them reliably, even under backpressure or load.
Handlers can be called asynchronously from anywhere. The call returns a task handle once it is enqueued.
Idempotent task submission
Use an idempotency key to ensure that the task is only scheduled once. Restate will deduplicate the request and return the previous response.
Latch on to the task
For requests with an idempotency key, you can use this task handle to latch on to the task later and retrieve the result, or wait for it to finish.
This works across processes, so you can have a separate process latch on to the task later.
async_task_service = Service("taskWorker")


@async_task_service.handler("runTask")
async def run_task(ctx: Context, params: TaskOpts):
    return some_heavy_work(params)


app = restate.app([async_task_service])

def submit_and_await_task(task: TaskOpts):
    idempotency_key = task["id"]
    headers = {
        "idempotency-key": idempotency_key,
        "Content-Type": "application/json",
    }
    url = f"{RESTATE_URL}/taskWorker/runTask/send"
    # json= serializes the payload itself; pre-encoding it with json.dumps would double-encode it
    requests.post(url, json=task, headers=headers)

    # Do something else, with task running in the background

    # Attach back to the task to retrieve the result
    attach_url = f"{RESTATE_URL}/restate/invocation/taskWorker/runTask/{idempotency_key}/attach"
    response = requests.get(attach_url)
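The deduplication and latch-on behaviour described above can be modelled with a small in-memory sketch. This is only an illustration of the semantics, not how Restate implements them: the class `InMemoryTaskQueue` and its methods `submit` and `attach` are hypothetical names, the task runs synchronously instead of in the background, and nothing is persisted.

```python
from typing import Any, Callable, Dict


class InMemoryTaskQueue:
    """Hypothetical toy queue: one execution per idempotency key, results kept in memory."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}  # idempotency key -> stored result
        self.executions = 0                 # how many tasks actually ran

    def submit(self, idempotency_key: str, task: Callable[[], Any]) -> str:
        """Run the task only if this key has not been seen before."""
        if idempotency_key not in self._results:
            self.executions += 1
            self._results[idempotency_key] = task()
        # In this sketch the key doubles as the task handle.
        return idempotency_key

    def attach(self, handle: str) -> Any:
        """Latch on to a previously submitted task and return its stored result."""
        return self._results[handle]


queue = InMemoryTaskQueue()
first_handle = queue.submit("order-42", lambda: "done")
# A retry with the same key is deduplicated: the second callable never runs.
second_handle = queue.submit("order-42", lambda: "done again")
result = queue.attach(second_handle)
```

Because the handle is just a string, any other part of the program (or, with a durable store, any other process) could call `attach` with it later.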
LOW-LATENCY
Restate’s event-driven foundation built in Rust lets you queue events. Restate pushes them to your functions at high speed.
DURABLE EXECUTION
Restate makes sure all tasks run to completion. It keeps track of timers, handles retries and recovery upon failures, and ensures that tasks are executed exactly once.
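One way to picture how completed steps are not repeated when a task is retried is a journal that records each step's result and replays it on the next attempt. The sketch below is a simplified mental model under that assumption, not Restate's actual mechanism; `Journal`, `charge`, `task`, and `run_to_completion` are hypothetical names, and the crash is simulated with an exception.

```python
from typing import Any, Callable, Dict, List


class Journal:
    """Hypothetical step journal: results of completed steps survive a retry."""

    def __init__(self) -> None:
        self.entries: Dict[str, Any] = {}

    def run(self, name: str, action: Callable[[], Any]) -> Any:
        if name in self.entries:
            # Step finished in an earlier attempt: replay the recorded result.
            return self.entries[name]
        result = action()
        self.entries[name] = result
        return result


side_effects: List[str] = []


def charge() -> str:
    side_effects.append("charged")
    return "receipt-1"


def task(journal: Journal, crash_after_charge: bool) -> str:
    receipt = journal.run("charge", charge)
    if crash_after_charge:
        raise RuntimeError("simulated crash")
    return journal.run("confirm", lambda: f"confirmed {receipt}")


def run_to_completion(journal: Journal, max_attempts: int = 3) -> str:
    for attempt in range(max_attempts):
        try:
            # Only the first attempt crashes in this simulation.
            return task(journal, crash_after_charge=(attempt == 0))
        except RuntimeError:
            continue
    raise RuntimeError("gave up after retries")


journal = Journal()
outcome = run_to_completion(journal)
```

The first attempt fails after charging, the retry replays the recorded receipt instead of charging again, and the task then finishes.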
Parallelizing work with Restate
Write flexible scheduling logic via durable building blocks.
Restate makes it easy to parallelize async work by fanning out tasks. Afterwards, you can collect the result by fanning in the partial results. Durable Execution ensures that the fan-out and fan-in steps happen reliably exactly once.
- TypeScript
- Java
- Kotlin
- Python
TypeScript
Fan out
Fan out tasks by calling the subtask handler for each subtask. Every handler is an asynchronous task, for which Restate serves as the queue.
The subtasks might run in different processes if the service is deployed in a parallel setup.
Fan in
Invocations produce durable promises that can be awaited and combined. Fan in by simply awaiting the combined promise. Invocation promises recover from failures and re-connect to running subtasks.
Server(less)
Deploy this service on a platform like Kubernetes or AWS Lambda to automatically get parallel scale-out.
const workerService = restate.service({
  name: "worker",
  handlers: {
    run: async (ctx: Context, task: Task) => {
      // Split the task into subtasks
      const subtasks: SubTask[] = await ctx.run("split task", () => split(task));

      const resultPromises = [];
      for (const subtask of subtasks) {
        const subResultPromise = ctx.serviceClient(workerService).runSubtask(subtask);
        resultPromises.push(subResultPromise);
      }

      const results = await CombineablePromise.all(resultPromises);
      return aggregate(results);
    },

    runSubtask: async (ctx: Context, subtask: SubTask) => {
      // Processing logic goes here ...
      // Can be moved to a separate service to scale independently
    },
  },
});

export const handler = restate.endpoint().bind(workerService).handler();
Java
Fan out
Fan out tasks by calling the subtask handler for each subtask. Every handler is an asynchronous task, for which Restate serves as the queue.
The subtasks might run in different processes if the service is deployed in a parallel setup.
Fan in
Invocations produce durable promises that can be awaited and combined. Fan in by simply awaiting the combined promise. Invocation promises recover from failures and re-connect to running subtasks.
Server(less)
Deploy this service on a platform like Kubernetes or AWS Lambda to automatically get parallel scale-out.
@Service
public class FanOutWorker {

  @Handler
  public Result run(Context ctx, Task task) {
    // Split the task into subtasks
    SubTask[] subTasks = ctx.run(JacksonSerdes.of(new TypeReference<>() {}), () -> split(task));

    List<Awaitable<?>> resultFutures = new ArrayList<>();
    for (SubTask subTask : subTasks) {
      Awaitable<SubTaskResult> subResultFuture =
          FanOutWorkerClient.fromContext(ctx).runSubtask(subTask);
      resultFutures.add(subResultFuture);
    }

    Awaitable.all(resultFutures).await();

    // Collect into a typed array; casting the Object[] returned by toArray() would fail at runtime
    SubTaskResult[] results =
        resultFutures.stream().map(Awaitable::await).toArray(SubTaskResult[]::new);
    return aggregate(results);
  }

  @Handler
  public SubTaskResult runSubtask(Context ctx, SubTask subTask) {
    // Processing logic goes here ...
    // Can be moved to a separate service to scale independently
    return new SubTaskResult();
  }
}
Kotlin
Fan out
Fan out tasks by calling the subtask handler for each subtask. Every handler is an asynchronous task, for which Restate serves as the queue.
The subtasks might run in different processes if the service is deployed in a parallel setup.
Fan in
Invocations produce durable promises that can be awaited and combined. Fan in by simply awaiting the combined promise. Invocation promises recover from failures and re-connect to running subtasks.
Server(less)
Deploy this service on a platform like Kubernetes or AWS Lambda to automatically get parallel scale-out.
@Service
class FanOutWorker {

  @Handler
  suspend fun run(ctx: Context, task: Task): Result {
    val subTasks = ctx.runBlock { split(task) }

    val resultFutures: MutableList<Awaitable<SubTaskResult>> = mutableListOf()
    for (subTask in subTasks) {
      val subResultFuture = FanOutWorkerClient.fromContext(ctx).runSubtask(subTask)
      resultFutures.add(subResultFuture)
    }

    val results = resultFutures.awaitAll()
    return aggregate(results)
  }

  @Handler
  suspend fun runSubtask(ctx: Context?, subTask: SubTask?): SubTaskResult {
    // Processing logic goes here ...
    // Can be moved to a separate service to scale independently
    return SubTaskResult()
  }
}
Python
Fan out
Fan out tasks by calling the subtask handler for each subtask. Every handler is an asynchronous task, for which Restate serves as the queue.
The subtasks might run in different processes if the service is deployed in a parallel setup.
Fan in
Invocations produce durable promises that can be awaited and combined. Fan in by simply awaiting the combined promise. Invocation promises recover from failures and re-connect to running subtasks.
Server(less)
Deploy this service on a platform like Kubernetes or AWS Lambda to automatically get parallel scale-out.
worker_service = Service("worker")


@worker_service.handler()
async def run(ctx: Context, task: Task):
    subtasks = await ctx.run("split task", lambda: split(task))

    result_promises = []
    for subtask in subtasks:
        sub_result_promise = ctx.service_call(run_subtask, arg=subtask)
        result_promises.append(sub_result_promise)

    results = [await promise for promise in result_promises]
    return aggregate(results)


@worker_service.handler()
async def run_subtask(ctx: Context, subtask: Subtask):
    # Processing logic goes here...
    # Can be moved to a separate service to scale independently
    pass


app = restate.app([worker_service])
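The fan-out/fan-in shape itself can be tried out without any infrastructure. The following sketch uses only `asyncio` from the standard library, so it has none of the durability described above; the chunking rule in `split` and the summing in `run_subtask` are placeholder assumptions chosen to keep the example checkable.

```python
import asyncio
from typing import List


def split(task: List[int], chunk_size: int) -> List[List[int]]:
    """Placeholder split step: cut the input into fixed-size chunks."""
    return [task[i:i + chunk_size] for i in range(0, len(task), chunk_size)]


async def run_subtask(subtask: List[int]) -> int:
    await asyncio.sleep(0)  # yield control, standing in for real work
    return sum(subtask)


async def run(task: List[int]) -> int:
    subtasks = split(task, chunk_size=3)
    # Fan out: start every subtask before waiting on any of them.
    pending = [asyncio.create_task(run_subtask(s)) for s in subtasks]
    # Fan in: wait for all partial results, then aggregate them.
    partial_results = await asyncio.gather(*pending)
    return sum(partial_results)


total = asyncio.run(run(list(range(10))))
```

Here the subtasks share one process and one event loop; in the handlers above, each call may instead land on a different process.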
Restate as sophisticated task queue
Restate is built as an event-driven foundation, and therefore supports task queues by design.
Async tasks run like any other function in your infrastructure: on K8S, FaaS, or mix-and-match.
No need to spin up extra infrastructure or message queues.
Switch between async and sync with Restate
- TypeScript
- Java
- Kotlin
- Python
TypeScript
Imagine a data preparation workflow that creates an S3 bucket, uploads a file to it, and then returns the URL.
Let's now kick off this workflow from another process.
- Connect to the Restate server and create a client for the data preparation workflow.
- Kick off a new data preparation workflow. This is idempotent per workflow ID.
- Wait for the result for 30 seconds.
- If it takes longer, rewire the workflow to send an email instead. If it returns within 30 seconds, process the URL directly.
- This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email.
const dataPreparationService = restate.workflow({
  name: "dataPrep",
  handlers: {
    run: async (ctx: WorkflowContext) => {
      const url = await ctx.run(() => createS3Bucket());
      await ctx.run(() => uploadData(url));
      await ctx.promise<URL>("url").resolve(url);
      return url;
    },

    resultAsEmail: async (ctx: WorkflowSharedContext, req: { email: string }) => {
      const url = await ctx.promise<URL>("url");
      await ctx.run(() => sendEmail(url, req.email));
    },
  },
});

export type DataPrepService = typeof dataPreparationService;

const rs = restate.connect({ url: RESTATE_URL });
const dataPrepService: DataPrepService = { name: "dataPrep" };

async function downloadData(user: { id: string, email: string }) {
  const dataPrep = rs.workflowClient(dataPrepService, user.id);
  await dataPrep.workflowSubmit();

  const result = await withTimeout(dataPrep.workflowAttach(), 30_000);

  if (result === Timeout) {
    // Hit timeout... Mail us the link later
    await dataPrep.resultAsEmail({ email: user.email });
    return;
  }

  // ... process directly ...
}
Java
Imagine a data preparation workflow that creates an S3 bucket, uploads a file to it, and then returns the URL.
Let's now kick off this workflow from another process.
- Connect to the Restate server and create a client for the data preparation workflow.
- Kick off a new data preparation workflow. This is idempotent per workflow ID.
- Wait for the result for 30 seconds.
- If it takes longer, rewire the workflow to send an email instead. If it returns within 30 seconds, process the URL directly.
- This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email.
@Workflow
public class DataPreparationService {

  private static final DurablePromiseKey<URL> URL_PROMISE =
      DurablePromiseKey.of("url", JacksonSerdes.of(URL.class));

  @Workflow
  public URL run(WorkflowContext ctx) {
    URL url = ctx.run(JacksonSerdes.of(URL.class), () -> createS3Bucket());
    ctx.run(() -> uploadData(url));
    ctx.promiseHandle(URL_PROMISE).resolve(url);
    return url;
  }

  @Shared
  public void resultAsEmail(SharedWorkflowContext ctx, Email email) {
    URL url = ctx.promise(URL_PROMISE).awaitable().await();
    ctx.run(() -> sendEmail(url, email));
  }
}

public void downloadData(String userId, Email email) {
  Client rs = Client.connect("http://localhost:8080");
  IngressClient uploadClient = DataPreparationServiceClient.fromClient(rs, userId);
  uploadClient.submit();

  try {
    uploadClient.workflowHandle().attachAsync().orTimeout(30, TimeUnit.SECONDS).join();
  } catch (Exception e) {
    if (e.getCause() instanceof TimeoutException) {
      // Hit timeout... Mail us the link later
      uploadClient.resultAsEmail(email);
      return;
    }
    throw e;
  }

  // ... process directly ...
}
Kotlin
Imagine a data preparation workflow that creates an S3 bucket, uploads a file to it, and then returns the URL.
Let's now kick off this workflow from another process.
- Connect to the Restate server and create a client for the data preparation workflow.
- Kick off a new data preparation workflow. This is idempotent per workflow ID.
- Wait for the result for 30 seconds.
- If it takes longer, rewire the workflow to send an email instead. If it returns within 30 seconds, process the URL directly.
- This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email.
@Workflow
class DataPreparationService {

  companion object {
    private val URL_PROMISE = DurablePromiseKey.of("url", KtSerdes.json<URL>())
  }

  @Workflow
  suspend fun run(ctx: WorkflowContext): URL {
    val url: URL = ctx.runBlock { createS3Bucket() }
    ctx.runBlock { uploadData(url) }
    ctx.promiseHandle(URL_PROMISE).resolve(url)
    return url
  }

  @Shared
  suspend fun resultAsEmail(ctx: SharedWorkflowContext, email: Email) {
    val url: URL = ctx.promise(URL_PROMISE).awaitable().await()
    ctx.runBlock { sendEmail(url, email) }
  }
}

suspend fun downloadData(userId: String, email: Email) {
  val rs: Client = Client.connect("http://localhost:8080")
  val client = DataPreparationServiceClient.fromClient(rs, userId)
  client.submit()

  try {
    client.workflowHandle().attachAsync().orTimeout(30, TimeUnit.SECONDS).join()
  } catch (e: Exception) {
    // Hit timeout... Mail us the link later
    client.resultAsEmail(email)
    return
  }

  // ... process directly ...
}
Python
Send a request to Restate to kick off a new data preparation workflow. This is idempotent per workflow ID.
Wait for the result for 30 seconds.
If it takes too long, rewire the workflow to send an email instead. If it returns within 30 seconds, process the URL directly.
This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email.
def upload_data(user: User):
    headers = {"Content-Type": "application/json"}
    try:
        # Call the workflow without the /send suffix so the request waits for the result
        url = f"{RESTATE_URL}/dataPrep/{user.id}/run"
        data_prep = requests.post(url, headers=headers, timeout=30)
    except requests.exceptions.Timeout:
        # Hit timeout... Mail us the link later
        email_url = f"{RESTATE_URL}/dataPrep/{user.id}/resultAsEmail/send"
        requests.post(email_url, json=user.email, headers=headers)
        return

    # ... process result directly ...

data_preparation = Workflow("dataPrep")


@data_preparation.main()
async def run(ctx: WorkflowContext) -> str:
    url = await ctx.run("bucket creation", lambda: create_s3_bucket())
    await ctx.run("upload", lambda: upload_data(url))
    await ctx.promise("url").resolve(url)
    return url


@data_preparation.handler("resultAsEmail")
async def result_as_email(ctx: WorkflowSharedContext, email: str):
    url = await ctx.promise("url").value()
    await ctx.run("email", lambda: send_email(url, email))
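The "wait briefly, then switch to asynchronous delivery" pattern can also be sketched in-process with `asyncio`. This is an analogy under simplifying assumptions rather than Restate code: `data_prep`, `result_as_email`, and `download_data` are hypothetical stand-ins, the delays are fractions of a second instead of 30 seconds, the URL is a made-up example value, and "sending an email" just appends to a list.

```python
import asyncio
from typing import List, Optional

sent_emails: List[str] = []
background: List[asyncio.Task] = []


async def data_prep(delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for bucket creation and upload
    return "s3://example-bucket/data.csv"  # made-up example URL


async def result_as_email(workflow: asyncio.Task, email: str) -> None:
    url = await workflow  # resumes once the workflow has finished
    sent_emails.append(f"{email}: {url}")


async def download_data(delay: float, timeout: float, email: str) -> Optional[str]:
    workflow = asyncio.create_task(data_prep(delay))
    try:
        # shield() keeps the workflow running if the wait itself times out.
        return await asyncio.wait_for(asyncio.shield(workflow), timeout)
    except asyncio.TimeoutError:
        # Too slow for a synchronous answer: deliver the result by email later.
        background.append(asyncio.create_task(result_as_email(workflow, email)))
        return None


async def main():
    fast = await download_data(delay=0.01, timeout=0.5, email="a@example.com")
    slow = await download_data(delay=0.2, timeout=0.02, email="b@example.com")
    await asyncio.gather(*background)  # let the pending email go out
    return fast, slow


fast, slow = asyncio.run(main())
```

The fast request gets its URL directly, while the slow one returns `None` to the caller and the URL arrives through the email path once the workflow completes.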
What you can build with Async Tasks and Restate
Job scheduler
A job scheduler that can handle both immediate and delayed jobs. The scheduler makes sure job submissions are idempotent and that jobs run to completion exactly once. You can attach back to jobs to retrieve their result later.
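To make the combination of immediate jobs, delayed jobs, and idempotent submission concrete, here is a minimal single-process sketch. It is not built on Restate and offers no durability; `JobScheduler`, `submit`, and `run_due` are hypothetical names, and time is passed in explicitly as `now` so the behaviour is deterministic.

```python
import heapq
from typing import Any, Callable, Dict, List, Tuple


class JobScheduler:
    """Hypothetical in-memory scheduler with delays and idempotent submission."""

    def __init__(self) -> None:
        self._queue: List[Tuple[float, int, str]] = []  # (due time, sequence, job id)
        self._jobs: Dict[str, Callable[[], Any]] = {}   # jobs waiting to run
        self.results: Dict[str, Any] = {}               # finished jobs
        self._sequence = 0                              # tie-breaker for equal due times

    def submit(self, job_id: str, job: Callable[[], Any],
               now: float, delay: float = 0.0) -> bool:
        """Schedule a job once per id; return False for a duplicate submission."""
        if job_id in self._jobs or job_id in self.results:
            return False
        self._jobs[job_id] = job
        heapq.heappush(self._queue, (now + delay, self._sequence, job_id))
        self._sequence += 1
        return True

    def run_due(self, now: float) -> List[str]:
        """Run every job whose due time has passed and return the ids that ran."""
        ran = []
        while self._queue and self._queue[0][0] <= now:
            _, _, job_id = heapq.heappop(self._queue)
            self.results[job_id] = self._jobs.pop(job_id)()
            ran.append(job_id)
        return ran


scheduler = JobScheduler()
scheduler.submit("report", lambda: "report ready", now=0.0, delay=60.0)
scheduler.submit("email", lambda: "email sent", now=0.0)
duplicate_accepted = scheduler.submit("email", lambda: "email sent twice", now=1.0)
ran_at_1 = scheduler.run_due(now=1.0)
ran_at_60 = scheduler.run_due(now=60.0)
```

Looking up `scheduler.results["email"]` afterwards plays the role of attaching back to a job to retrieve its result.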