Step Primitives
Steps are the building blocks of workflows. Each step is memoized—if a workflow restarts, completed steps aren’t re-executed. This provides effectively-once execution via idempotent memoization (NATS JetStream is at-least-once; memoization makes step results stable across retries).
`step.run(name, fn)` — Execute and memoize
Executes a function and caches the result. If the workflow restarts, the cached result is returned instead of re-executing.
```typescript
const user = await step.run("fetch-user", async () => {
  return await db.users.find(event.data.userId);
});
```

```go
var d OrderEvent
if err := ctx.Event.Data(&d); err != nil {
	return nil, err
}
user, err := ironflow.Run(ctx, "fetch-user", func() (User, error) {
	return db.Users.Find(d.UserID)
})
if err != nil {
	return nil, err
}
```

Key behaviors:
- Results are persisted to the database
- On workflow restart, cached results are returned
- Step names must be unique within a function
- Return values must be JSON-serializable
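The replay behavior above can be sketched with a minimal in-memory cache; this is a hypothetical helper, not the SDK (the real engine persists results to the database rather than a `Map`):

```typescript
// Minimal sketch of step memoization: an in-memory map stands in for the
// engine's database-backed result store.
type MemoStore = Map<string, string>; // step name -> JSON-serialized result

async function runStep<T>(
  store: MemoStore,
  name: string,
  fn: () => Promise<T>,
): Promise<T> {
  const cached = store.get(name);
  if (cached !== undefined) {
    // Replayed run: return the persisted result instead of re-executing.
    return JSON.parse(cached) as T;
  }
  const result = await fn();
  store.set(name, JSON.stringify(result)); // results must be JSON-serializable
  return result;
}
```

On a replay, the second call with the same step name returns the stored value and never invokes the function again — this is what turns at-least-once delivery into effectively-once step execution.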
`step.sleep(name, duration)` — Pause execution
Pauses the workflow for a specified duration. The pause is durable—if the server restarts, the workflow resumes after the remaining time.
```typescript
await step.sleep("wait-24h", "24h"); // "1h", "30m", "7d"
```

Duration formats:
- `"30s"` — 30 seconds
- `"5m"` — 5 minutes
- `"2h"` — 2 hours
- `"7d"` — 7 days
```go
if err := ironflow.Sleep(ctx, "wait-24h", 24*time.Hour); err != nil {
	return nil, err
}
```

Duration examples:
- `30 * time.Second` — 30 seconds
- `5 * time.Minute` — 5 minutes
- `2 * time.Hour` — 2 hours
- `7 * 24 * time.Hour` — 7 days
`step.sleepUntil(name, until)` — Pause until a specific time
Pauses the workflow until a specific date/time. Like `step.sleep`, the pause is durable and survives server restarts.
```typescript
// Sleep until a specific ISO 8601 timestamp (must be in the future)
const target = new Date(Date.now() + 24 * 60 * 60 * 1000).toISOString(); // 24h from now
await step.sleepUntil("wait-24h", target);

// Or compute the target from a Date object and pass its ISO string
const tomorrow = new Date(Date.now() + 24 * 60 * 60 * 1000);
await step.sleepUntil("wait-until-tomorrow", tomorrow.toISOString());
```

```go
target := time.Now().Add(24 * time.Hour) // 24h from now
if err := ironflow.SleepUntil(ctx, "wait-24h", target); err != nil {
	return nil, err
}
```

Key behaviors:
- Accepts an ISO 8601 string (TypeScript) or `time.Time` (Go)
- The TS SDK throws synchronously if the target is in the past; the Go SDK currently lacks this check, so the caller is responsible
- Results are memoized like all other steps
`step.waitForEvent(name, filter)` — Wait for correlated event
Pauses the workflow until a matching event arrives or the timeout expires.
```typescript
const approval = await step.waitForEvent("wait-approval", {
  event: "order.approved",
  match: "data.orderId", // matches event.data.orderId
  timeout: "7d",
});

// The handler only resumes when a matching event arrives.
// If the timeout expires first, the run is failed server-side with
// "waitForEvent timed out" — this line is never reached on timeout.
console.log("Approved by:", approval.data.approvedBy);
```

```go
approval, err := ironflow.WaitForEvent[any](ctx, "wait-approval", ironflow.EventFilter{
	Event:   "order.approved",
	Match:   "data.orderId", // matches event.data.orderId
	Timeout: 7 * 24 * time.Hour,
})
if err != nil {
	return nil, err
}

// The handler only resumes when a matching event arrives. If the timeout
// expires first, the run is failed server-side with "waitForEvent timed out"
// — this code is never reached on timeout.
var data struct {
	ApprovedBy string `json:"approvedBy"`
}
if err := approval.Data(&data); err != nil {
	return nil, err
}
fmt.Printf("Approved by: %s\n", data.ApprovedBy)
```

Options:
| Option | Type | Required | Description |
|---|---|---|---|
| `event` | string | yes | Event name to wait for |
| `match` | string | no | Field path to correlate events (e.g., `"data.orderId"`) |
| `timeout` | TS: string or number (ms); Go: `time.Duration` | no | Maximum time to wait (default `"7d"`) |
`step.parallel(name, branches)` — Execute branches concurrently
Run multiple operations in parallel with isolated step contexts:
```typescript
const [userResult, orderResult, inventoryResult] = await step.parallel(
  "fetch-all-data",
  [
    async (s) => s.run("fetch-user", async () => fetchUser(userId)),
    async (s) => s.run("fetch-order", async () => fetchOrder(orderId)),
    async (s) => s.run("check-inventory", async () => checkInventory(productId)),
  ],
  {
    concurrency: 2, // Max 2 branches running at once
    onError: "failFast", // Stop on first error (default) or "allSettled"
  },
);
```

```go
// Assume User, Order, Inventory types are defined elsewhere.
results, err := ironflow.Parallel(ctx, "fetch-all-data",
	[]func(*ironflow.BranchContext) (any, error){
		func(b *ironflow.BranchContext) (any, error) {
			return ironflow.RunWithBranch(b, "fetch-user", func() (User, error) { return fetchUser(userID) })
		},
		func(b *ironflow.BranchContext) (any, error) {
			return ironflow.RunWithBranch(b, "fetch-order", func() (Order, error) { return fetchOrder(orderID) })
		},
		func(b *ironflow.BranchContext) (any, error) {
			return ironflow.RunWithBranch(b, "check-inventory", func() (Inventory, error) { return checkInventory(productID) })
		},
	},
	ironflow.ParallelOptions{
		Concurrency: 2,          // Max 2 branches running at once
		OnError:     "failFast", // "failFast" (default) or "allSettled"
	},
)
if err != nil {
	return nil, err
}

user := results[0].(User)
order := results[1].(Order)
inventory := results[2].(Inventory)
```

Key behaviors:
- Each branch receives a scoped step client for proper memoization
- Results are returned in order regardless of completion order
- Supports concurrency limits to control parallelism
- Two error modes: `failFast` (stop immediately) or `allSettled` (complete all)
Options:
| Option | Type | Default | Description |
|---|---|---|---|
| `concurrency` | number | unlimited | Maximum concurrent branches |
| `onError` | `"failFast"` \| `"allSettled"` | `"failFast"` | Error handling strategy |
`step.map(name, items, fn)` — Fan-out/fan-in pattern
Process an array of items in parallel:
```typescript
const userIds = ["user_1", "user_2", "user_3"];
const users = await step.map(
  "fetch-all-users",
  userIds,
  async (userId, s, index) => {
    return await s.run(`fetch-user-${index}`, async () => {
      return await fetchUserDetails(userId);
    });
  },
  {
    concurrency: 5, // Max 5 concurrent operations
    onError: "allSettled", // Complete all items even if some fail
  },
);
```

```go
userIDs := []string{"user_1", "user_2", "user_3"}
users, err := ironflow.Map(ctx, "fetch-all-users", userIDs,
	func(userID string, b *ironflow.BranchContext, index int) (User, error) {
		return ironflow.RunWithBranch(b, fmt.Sprintf("fetch-user-%d", index), func() (User, error) {
			return fetchUserDetails(userID)
		})
	},
	ironflow.ParallelOptions{
		Concurrency: 5,            // Max 5 concurrent operations
		OnError:     "allSettled", // Complete all items even if some fail
	},
)
if err != nil {
	return nil, err
}
```

Use cases:
- Processing multiple orders simultaneously
- Fetching data from multiple sources
- Sending notifications to multiple users
- Batch processing with controlled parallelism
Options:
| Option | Type | Default | Description |
|---|---|---|---|
| `concurrency` | number | unlimited | Maximum concurrent operations |
| `onError` | `"failFast"` \| `"allSettled"` | `"failFast"` | Error handling strategy |
Step Naming Best Practices
Step names must be unique within a function and should be descriptive:
```typescript
// Good: descriptive, unique names
await step.run("validate-order-items", ...);
await step.run("calculate-shipping-cost", ...);
await step.run("process-payment-stripe", ...);

// Bad: generic or duplicate names
await step.run("step1", ...);
await step.run("process", ...);
await step.run("process", ...); // Error: duplicate name
```

Guidelines:
- Use kebab-case for consistency
- Include the action and target (e.g., `fetch-user`, `send-email`)
- Make names unique even in loops (e.g., `process-item-${index}`)
Other step primitives
Covered on neighbouring pages:
- `step.compensate` — register an undo for a step; runs in reverse order on failure. See Sagas.
- `step.invoke` / `step.invokeAsync` — call another function from inside a step (sync awaits the result; async is fire-and-forget).
- `step.publish` — emit an event from inside a step (memoized, so the event fires exactly once even on retry).
What’s Next?
- Execution Modes — Learn about push and pull modes
- Error Handling — Handle errors with `NonRetryableError`