@ironflow/node
Node.js SDK for Ironflow. Provides workers, serve handlers, step execution, and a client for server interactions.
Installation

The package is public on npm and installs without authentication. For building from source, see the Local Development guide.

```shell
npm install @ironflow/node
```

Quick Start
Define a Function

```typescript
import { ironflow } from '@ironflow/node';

const processOrder = ironflow.createFunction(
  {
    id: 'process-order',
    triggers: [{ event: 'order.placed' }],
  },
  async ({ event, step }) => {
    // Run a step (automatically memoized)
    const validated = await step.run('validate', async () => {
      return validateOrder(event.data);
    });

    // Sleep (durable - survives restarts)
    await step.sleep('wait-for-inventory', '5m');

    // Wait for external event
    const approval = await step.waitForEvent('wait-approval', {
      event: 'order.approved',
      match: 'data.orderId', // JSON path for correlating events
      timeout: '24h',
    });

    // Process payment
    const result = await step.run('process-payment', async () => {
      return processPayment(validated, approval);
    });

    return { success: true, receiptId: result.id };
  });
```

Push Mode (Serverless)
For serverless environments (Next.js, Vercel, AWS Lambda):

```typescript
// app/api/ironflow/route.ts (Next.js App Router)
import { serve } from '@ironflow/node';

export const POST = serve({
  functions: [processOrder],
  signingKey: process.env.IRONFLOW_SIGNING_KEY,
});
```

Pull Mode (Worker)
For long-running workers with no timeout limits:

```typescript
import { createWorker } from '@ironflow/node';

const worker = createWorker({
  serverUrl: 'http://localhost:9123',
  functions: [processOrder],
  maxConcurrentJobs: 10,
});

// Start the worker (blocks until stopped)
await worker.start();

// Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.drain();
});
```

Function Definition
createFunction

Create an Ironflow function.

```typescript
const myFunction = ironflow.createFunction(
  {
    // Required: unique function ID
    id: 'my-function',

    // Optional: display name
    name: 'My Function',

    // Optional: event triggers
    triggers: [
      { event: 'user.created' },
      { event: 'order.*' },
      { cron: '0 0 * * *' }, // Daily at midnight
    ],

    // Optional: retry configuration
    retry: {
      maxAttempts: 3,
      initialDelayMs: 1000,
      backoffFactor: 2,
      maxDelayMs: 60000,
    },

    // Optional: execution timeout in milliseconds (default: 600000)
    timeout: 1800000,

    // Optional: concurrency control
    concurrency: { limit: 10, key: 'event.data.customerId' },

    // Optional: pause behavior for history editing (scoped injection)
    // 'hold' (default) retains the concurrency lane slot while paused
    // 'release' frees the slot for other runs
    pauseBehavior: 'hold',

    // Optional: declarative cancel-on-event
    // Auto-cancels in-flight runs when a matching event arrives.
    // OR semantics across specs. Tenant-isolated by env_id.
    // See how-to: ../../how-to-guides/workflows/cancel-on-event.md
    cancelOn: [
      { event: 'order.cancelled', match: 'orderId' },
    ],
  },
  async ({ event, step, run, logger }) => {
    // Function implementation
    return { result: 'success' };
  });
```

Function Context
The handler receives a context object:

```typescript
interface FunctionContext {
  // Triggering event
  event: {
    id: string;
    name: string;
    version: number;
    data: unknown;
    timestamp: Date;
    idempotencyKey?: string;
    source?: string;
    metadata?: Record<string, unknown>;
  };

  // Step client for recorded execution
  step: StepClient;

  // Run information
  run: {
    id: string;
    functionId: string;
    attempt: number;
    startedAt: Date;
  };

  // Logger instance
  logger: Logger;

  // Resolved environment secrets (read-only)
  secrets: SecretsClient;
}
```

Step Primitives
run

Execute a step. Results are memoized—if the function restarts, completed steps are skipped.

```typescript
const result = await step.run('step-id', async () => {
  // This only runs once, even if function retries
  return await someApiCall();
});
```

sleep

Sleep for a duration. Durable—survives function restarts.

```typescript
// Sleep for 5 minutes
await step.sleep('wait', '5m');

// Supported formats: '30s', '5m', '2h', '1d'
```

sleepUntil

Sleep until a specific time.

```typescript
await step.sleepUntil('wait-until', new Date('2024-12-25T00:00:00Z'));
```

waitForEvent
Wait for an external event.

```typescript
const approval = await step.waitForEvent('wait-approval', {
  // Event name pattern
  event: 'order.approved',

  // Optional: JSON path for correlating events (e.g. "data.orderId")
  match: 'data.orderId',

  // Optional: timeout
  timeout: '24h',
});
```

parallel
Execute multiple branches in parallel.

```typescript
const [user, orders, payments] = await step.parallel(
  'fetch-all',
  [
    (step) => step.run('fetch-user', () => fetchUser(userId)),
    (step) => step.run('fetch-orders', () => fetchOrders(userId)),
    (step) => step.run('fetch-payments', () => fetchPayments(userId)),
  ],
  { concurrency: 2, onError: 'failFast' } // Optional
);
```

map

Map over items with automatic parallelization.

```typescript
const results = await step.map(
  'process-items',
  items,
  async (item, step, index) => {
    return await step.run(`process-${index}`, () => processItem(item));
  },
  { concurrency: 5 } // Optional: limit parallel execution
);
```

compensate
Register a compensation handler for saga rollbacks. If the function fails after this step, the compensation runs automatically.

```typescript
await step.run('reserve-inventory', async () => {
  await reserveInventory(orderId);
});

step.compensate('reserve-inventory', async () => {
  await releaseInventory(orderId);
});
```

invoke
Synchronously invoke another function and wait for its result.

```typescript
const result = await step.invoke<PaymentResult>('process-payment', {
  orderId,
  amount,
}, { timeout: '30s' });
```

invokeAsync
Invoke another function asynchronously. Returns the run ID without waiting for completion.

```typescript
const { runId } = await step.invokeAsync('send-notifications', {
  userId,
  message,
});
```

publish
Publish a message to a topic.

```typescript
const result = await step.publish('order.events', {
  type: 'order.shipped',
  orderId,
});
```

See the Step Primitives Guide for detailed documentation and examples.
Event Data
Parsing Event Data

```typescript
async ({ event, step }) => {
  // event.data is typed as unknown — cast to your type
  const order = event.data as OrderData;

  // Access raw event fields
  const eventName = event.name; // e.g. "order.placed"
  const eventId = event.id;
  const eventVersion = event.version; // Schema version (default: 1)

  // Access user-defined metadata attached when the event was emitted
  const traceId = event.metadata?.traceId as string | undefined;
  const source = event.metadata?.source as string | undefined;

  return { orderId: order.id };
}
```

Client
createClient

Create a client for server interactions:

```typescript
import { createClient } from '@ironflow/node';

const client = createClient({
  serverUrl: 'http://localhost:9123', // default: IRONFLOW_SERVER_URL or http://localhost:9123
  apiKey: 'optional-api-key',         // for authenticated servers
  timeout: 30000,                     // request timeout in ms (default: 30000)
  onError: (error, context) => {
    // global error handler (optional)
    console.error(`[${context.method}] ${error.message}`, context);
  },
});
```

The createClient() function is the recommended way to create an Ironflow client instance.
Global Error Handler (onError)
The onError callback fires on every client error before the error is re-thrown. It provides centralized error observation for logging, metrics, and alerting without interfering with normal error propagation.

```typescript
import { createClient, type ErrorContext, type OnErrorHandler } from '@ironflow/node';

const onError: OnErrorHandler = async (error, context) => {
  // Log to your observability stack
  await metrics.increment('ironflow.client.error', {
    method: context.method,
    statusCode: String(context.statusCode ?? 'network'),
  });

  logger.error('Ironflow client error', {
    method: context.method,         // e.g. "emit", "streams.append", "apiKeys.get"
    endpoint: context.endpoint,     // e.g. "/ironflow.v1.IronflowService/Trigger"
    statusCode: context.statusCode, // HTTP status or undefined for network errors
    error: error.message,
  });
};

const client = createClient({ onError });
```

ErrorContext fields:
| Field | Type | Description |
|---|---|---|
| method | string | Client method name (e.g. "emit", "streams.append", "apiKeys.get") |
| endpoint | string | ConnectRPC or REST endpoint path |
| statusCode | number \| undefined | HTTP status code (undefined for network/timeout errors) |
Behavior:

- The callback is awaited before the error is re-thrown
- If the callback throws, the callback error is swallowed (logged to stderr) and the original error is still thrown
- Both sync and async callbacks are supported (`() => void | Promise<void>`)
- Propagates to sub-clients: client.kv() and client.config() inherit the handler

Method naming convention: Top-level methods use their name directly (e.g. "emit", "health"). Namespaced methods use dot notation (e.g. "streams.append", "apiKeys.create", "kv.bucket.get", "config.set").
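The dot-notation convention makes it easy to aggregate errors by client namespace inside an onError handler. A minimal sketch — `recordError` and the counter are illustrative, not SDK exports; only the method strings follow the documented naming convention:

```typescript
// Group client errors by namespace using the dot-notation method names.
// recordError and errorCounts are illustrative; only the method strings
// follow the SDK's documented naming convention.
const errorCounts = new Map<string, number>();

function recordError(context: { method: string }): string {
  // "streams.append" -> "streams"; top-level names like "emit" pass through
  const namespace = context.method.split('.')[0];
  errorCounts.set(namespace, (errorCounts.get(namespace) ?? 0) + 1);
  return namespace;
}

recordError({ method: 'emit' });
recordError({ method: 'streams.append' });
recordError({ method: 'kv.bucket.get' });
// errorCounts now holds one entry each for "emit", "streams", and "kv"
```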
Event Emission
Emit an event to trigger workflows:

```typescript
const result = await client.emit('order.placed', {
  orderId: '123',
  amount: 99.99,
});

console.log('Event ID:', result.eventId);
console.log('Run IDs:', result.runIds);
```

Emit options:

```typescript
// Emit with a specific event version
const result = await client.emit('order.placed', data, {
  version: 2,
});

// Emit with idempotency key (deduplication)
const result = await client.emit('payment.processed', data, {
  idempotencyKey: 'payment-abc',
});

// Emit with metadata
const result = await client.emit('order.placed', data, {
  metadata: { source: 'api' },
});
```

emitSync
Emit an event and wait synchronously for the triggered run to complete. Uses the server-side TriggerSync endpoint, which blocks until the run finishes or the timeout elapses.

Throws RunFailedError if the run fails, RunCancelledError if it is cancelled.

```typescript
import { RunFailedError, RunCancelledError } from '@ironflow/node';

try {
  const result = await client.emitSync('order.placed', {
    orderId: '123',
    amount: 99.99,
  });

  console.log('Run ID:', result.runId);
  console.log('Function:', result.functionId);
  console.log('Status:', result.status);
  console.log('Output:', result.output);
  console.log('Duration:', result.durationMs, 'ms');
} catch (err) {
  if (err instanceof RunFailedError) {
    console.error('Run failed:', err.runId, err.message);
  } else if (err instanceof RunCancelledError) {
    console.error('Run was cancelled:', err.runId);
  }
}
```

Options:
| Parameter | Type | Description |
|---|---|---|
| eventName | string | Event name to emit |
| data | unknown | Event payload |
| options.timeout | number | Timeout in ms before the request is aborted (default: 30000) |
Returns: EmitSyncResult
| Field | Type | Description |
|---|---|---|
| runId | string | Run ID |
| functionId | string | Function ID that handled the event |
| status | string | Final run status |
| output | unknown | Function return value |
| durationMs | number | Total run duration in milliseconds |
Pub/Sub
publish

Publish a message to a developer pub/sub topic. Unlike emit(), this does not trigger functions — it delivers messages directly to topic subscribers.

```typescript
const result = await client.publish('notifications', {
  userId: '123',
  message: 'Your order has shipped!',
});

console.log('Event ID:', result.eventId);
console.log('Sequence:', result.sequence);
```

Options:

```typescript
const data = { userId: 'user-123', message: 'Hello!' };
const result = await client.publish('notifications', data, {
  idempotencyKey: 'notif-abc-123', // Optional deduplication key
});
```

Returns: PublishResult
| Field | Type | Description |
|---|---|---|
| eventId | string | Unique ID for this published message |
| sequence | number | JetStream sequence number |
listTopics
List all active developer pub/sub topics.

```typescript
const topics = await client.listTopics();
for (const t of topics) {
  console.log(`${t.name}: ${t.messageCount} messages, ${t.consumerCount} consumers`);
}
```

Returns: TopicInfo[]
| Field | Type | Description |
|---|---|---|
| name | string | Topic name |
| messageCount | number | Number of messages in the topic |
| consumerCount | number | Number of active consumers |
| firstMessageAt | string \| undefined | Timestamp of the first message |
| lastMessageAt | string \| undefined | Timestamp of the most recent message |
getTopicStats
Get detailed statistics for a specific topic, including consumer lag.

```typescript
const stats = await client.getTopicStats('notifications');
console.log('Messages:', stats.messageCount);
console.log('Consumers:', stats.consumerCount);
console.log('Lag:', stats.lag);
console.log('Sequence range:', stats.firstSeq, '→', stats.lastSeq);
```

Returns: TopicStats
| Field | Type | Description |
|---|---|---|
| name | string | Topic name |
| messageCount | number | Number of messages |
| consumerCount | number | Number of active consumers |
| lag | number | Messages pending delivery (consumer lag) |
| firstSeq | number | First sequence number |
| lastSeq | number | Last sequence number |
Run Management
getRun

Get run details:

```typescript
const run = await client.getRun('run_xyz789');
console.log('Status:', run.status);
```

listRuns

List runs with filtering:

```typescript
const result = await client.listRuns({
  functionId: 'process-order',
  status: 'completed',
  limit: 50,
  cursor: 'abc123', // pagination
});

for (const run of result.runs) {
  console.log(`Run ${run.id}: ${run.status}`);
}
console.log('Next page:', result.nextCursor);
```

cancelRun

Cancel a running workflow:

```typescript
const run = await client.cancelRun('run_xyz789', 'no longer needed');
```

retryRun

Retry a failed run:

```typescript
const run = await client.retryRun('run_xyz789');
```

resumeRun

Resume a paused or failed run:

```typescript
const run = await client.resumeRun('run_xyz789');
```

History Editing (Scoped Injection)
```typescript
// Pause a running workflow at the next step boundary
const result = await client.pauseRun("run_abc123");
// result.status: "pause_requested" or "paused"

// View completed steps while paused
const state = await client.getPausedState("run_abc123");
for (const step of state.steps) {
  console.log(step.name, step.output, step.injected);
}

// Inject modified output for a step
const prev = await client.injectStepOutput(
  "run_abc123",
  "step_xyz",
  { corrected: true },
  "Manual correction"
);

// Resume the workflow
await client.resumeRun("run_abc123");
```

patchStep
Hot-patch a step’s output to fix a failed run:

```typescript
await client.patchStep('step_abc123', { correctedValue: 42 }, 'fixed bad API response');
```

registerFunction

Register a function with the server:

```typescript
await client.registerFunction({
  id: 'my-function',
  name: 'My Function',
  triggers: [{ event: 'my.event' }],
  endpointUrl: 'http://localhost:3000/api/ironflow',
  preferredMode: 'push',
});
```

Time-Travel Debugging
Reconstruct run state at a point in time, inspect step outputs, and walk through a run’s event timeline. Useful for debugging failed or unexpected runs after the fact.

getRunStateAt

Get the reconstructed state of a run at a specific timestamp.

```typescript
const state = await client.getRunStateAt('run_xyz789', new Date('2024-06-01T12:00:00Z'));

console.log('Run ID:', state.runId);
console.log('Status at timestamp:', state.status);
for (const step of state.steps) {
  console.log(`  ${step.name} (${step.id}): ${step.status}`, step.output);
}
```

Parameters:
| Parameter | Type | Description |
|---|---|---|
| runId | string | Run ID to query |
| timestamp | Date | Point in time to reconstruct state at |
Returns: TimeTravelRunState
| Field | Type | Description |
|---|---|---|
| runId | string | Run ID |
| status | string | Run status at the given timestamp |
| steps | Array<{ id, name, status, output }> | Step states at the given timestamp |
| timestamp | string | ISO-8601 timestamp of the query |
getRunTimeline
Get the timeline of audit events for a run, ordered chronologically. Includes step transitions, status changes, and other significant events.

```typescript
const events = await client.getRunTimeline('run_xyz789');
for (const e of events) {
  console.log(`[${e.timestamp}] ${e.eventType}: ${e.summary}`);
}
```

Returns: TimeTravelTimelineEvent[]
| Field | Type | Description |
|---|---|---|
| id | string | Event ID |
| eventType | string | Type of timeline event (e.g. step.completed) |
| stepId | string | Associated step ID (empty if not step-related) |
| stepName | string | Associated step name |
| summary | string | Human-readable description |
| significant | boolean | Whether this event is considered a key milestone |
| timestamp | Date | When this event occurred |
getStepOutputAt
Get the output of a specific step at a point in time. Useful for comparing step outputs before and after a patch or injection.

```typescript
const output = await client.getStepOutputAt(
  'run_xyz789',
  'step_abc123',
  new Date('2024-06-01T12:05:00Z')
);

console.log('Step ID:', output.stepId);
console.log('Output at time:', output.output);
console.log('Timestamp:', output.timestamp);
```

Parameters:
| Parameter | Type | Description |
|---|---|---|
| runId | string | Run ID |
| stepId | string | Step ID to query |
| timestamp | Date | Point in time to query |
Returns: TimeTravelStepOutput
| Field | Type | Description |
|---|---|---|
| stepId | string | Step ID |
| output | unknown | Step output at the given timestamp |
| timestamp | string | ISO-8601 timestamp of the query |
Audit Trail
getAuditTrail

Get the audit trail for a run — a list of recorded events capturing what happened at each stage of execution.

```typescript
const entries = await client.getAuditTrail('run_xyz789');
for (const entry of entries) {
  console.log(`[${entry.timestamp}] ${entry.type}`, entry.data);
}
```

Returns: AuditTrailEntry[]
| Field | Type | Description |
|---|---|---|
| id | string | Entry ID |
| type | string | Audit event type (e.g. run.started, step.completed) |
| timestamp | string | ISO-8601 timestamp |
| data | unknown | Event-specific payload |
Serve Handler
Create an HTTP handler for push mode.

```typescript
import { serve } from '@ironflow/node';

const handler = serve({
  // Required: functions to serve
  functions: [fn1, fn2, fn3],

  // Optional: signing key for request verification
  signingKey: process.env.IRONFLOW_SIGNING_KEY,

  // Optional: event definitions for automatic upcasting
  eventDefinitions: registry,

  // Optional: custom logger
  logger: console,

  // Optional: Ironflow server URL for emitting webhook events
  serverUrl: 'http://localhost:9123',

  // Optional: webhook sources to handle
  webhooks: [stripeWebhook],
});
```

Framework Integration
Next.js App Router:

```typescript
import { serve } from '@ironflow/node';

export const POST = serve({ functions: [myFunction] });
```

Express:

```typescript
import express from 'express';
import { createHandler } from '@ironflow/node';

const app = express();

// createHandler is an alias for serve()
app.post('/api/ironflow', createHandler({ functions: [myFunction] }));
```

Worker
createWorker

Create a pull mode worker.

```typescript
import { createWorker } from '@ironflow/node';

const worker = createWorker({
  // Optional: server URL (default: IRONFLOW_SERVER_URL or http://localhost:9123)
  serverUrl: 'http://localhost:9123',

  // Required: functions to execute
  functions: [fn1, fn2],

  // Optional: API key for authentication (default: IRONFLOW_API_KEY env var)
  apiKey: process.env.IRONFLOW_API_KEY,

  // Optional: event definitions for automatic upcasting
  eventDefinitions: registry,

  // Optional: max concurrent jobs (default: 10)
  maxConcurrentJobs: 10,

  // Optional: worker labels for routing
  labels: { region: 'us-east-1' },

  // Optional: heartbeat interval in ms (default: 30000)
  heartbeatInterval: 30000,

  // Optional: reconnect delay in ms (default: 5000)
  reconnectDelay: 5000,

  // Optional: custom logger
  logger: console,
});
```

Worker Methods
```typescript
// Start the worker (blocks until stopped)
await worker.start();

// Graceful drain (wait for active jobs to complete)
await worker.drain();

// Force stop
worker.stop();
```

createStreamingWorker
Create a worker using gRPC bidirectional streaming (lower latency).

Note: Import from the separate entry point to avoid loading protobuf dependencies.

```typescript
import { createStreamingWorker } from '@ironflow/node/worker-streaming';

const worker = createStreamingWorker({
  serverUrl: 'http://localhost:9123',
  functions: [myFunction],
});

await worker.start();
```

Worker with Projections
Workers can run projection runners alongside functions. The worker automatically starts a ProjectionRunner for each projection, using streaming mode with automatic fallback to polling.

```typescript
import { createWorker, createProjection } from '@ironflow/node';

const orderStats = createProjection({
  name: 'order-stats',
  events: ['order.created', 'order.completed'],
  initialState: () => ({ total: 0, completed: 0 }),
  handler: (state, event) => {
    if (event.name === 'order.created') return { ...state, total: state.total + 1 };
    if (event.name === 'order.completed') return { ...state, completed: state.completed + 1 };
    return state;
  },
});

const worker = createWorker({
  functions: [processOrder],
  projections: [orderStats],
});

await worker.start();
// Projection runners start automatically with streaming-first fallback
```

Projections
createProjection

Define a projection for use with a worker.

```typescript
import { createProjection } from '@ironflow/node';

// Managed projection (pure reducer, Ironflow stores state)
const stats = createProjection({
  name: 'order-stats',
  events: ['order.created', 'order.completed'],
  initialState: () => ({ total: 0 }),
  handler: (state, event, ctx) => {
    return { ...state, total: state.total + 1 };
  },
  // Optional:
  batchSize: 100,                    // Events per batch (default: 100)
  partitionKey: '$.data.customerId', // JSONPath for partitioning
});

// External projection (side effects, you manage storage)
const searchSync = createProjection({
  name: 'search-sync',
  events: ['product.*'],
  handler: async (event, ctx) => {
    await searchIndex.upsert(event.data);
  },
  // No initialState → external mode
});
```

createProjectionRunner
For advanced use cases, create a standalone projection runner outside of a worker:

```typescript
import { createProjectionRunner, StreamingUnsupportedError } from '@ironflow/node';

const runner = createProjectionRunner({
  projection: myProjection,
  baseUrl: 'http://localhost:9123',
  headers: { 'x-ironflow-api-key': 'my-key' },
  logger: console,
  signal: abortController.signal,
});

try {
  // Try streaming first (real-time, lower latency)
  await runner.startStreaming();
} catch (err) {
  if (err instanceof StreamingUnsupportedError) {
    // Server doesn't support streaming — fall back to polling
    await runner.start();
  } else {
    throw err;
  }
}
```

StreamingUnsupportedError

Thrown by startStreaming() when the server doesn’t support the StreamProjectionEvents RPC (returns 404 or 501). Catch this to fall back to polling mode.

```typescript
import { StreamingUnsupportedError } from '@ironflow/node';
```

ProjectionRunnerConfig

```typescript
interface ProjectionRunnerConfig {
  projection: IronflowProjection;   // Projection definition
  baseUrl: string;                  // Ironflow server URL
  headers: Record<string, string>;  // Request headers (auth, etc.)
  logger: Logger;                   // Logger instance
  signal?: AbortSignal;             // For graceful shutdown
}
```

See the Projections Guide for full documentation on managed vs external projections, partitioning, and real-time subscriptions.
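Because managed projection handlers are pure reducers, they can be unit-tested without a server. A sketch using the order-stats shape from the example above (the event list is illustrative):

```typescript
// A managed projection handler is just (state, event) => state, so it can be
// exercised with a plain reduce. Mirrors the order-stats example above.
type Stats = { total: number; completed: number };

const initialState = (): Stats => ({ total: 0, completed: 0 });

const handler = (state: Stats, event: { name: string }): Stats => {
  if (event.name === 'order.created') return { ...state, total: state.total + 1 };
  if (event.name === 'order.completed') return { ...state, completed: state.completed + 1 };
  return state;
};

// Illustrative event stream
const events = [
  { name: 'order.created' },
  { name: 'order.created' },
  { name: 'order.completed' },
];

const finalState = events.reduce(handler, initialState());
// finalState: { total: 2, completed: 1 }
```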
KV Store
client.kv()

Get a KV client for bucket management and key operations:

```typescript
const kv = client.kv();
```

createBucket

```typescript
const info = await kv.createBucket({
  name: "sessions",
  description: "User session data",
  ttlSeconds: 3600,
  maxValueSize: 65536,
  history: 3,
});
```

deleteBucket

```typescript
await kv.deleteBucket("sessions");
```

listBuckets

```typescript
const buckets = await kv.listBuckets();
for (const b of buckets) {
  console.log(`${b.name}: ${b.values} keys, ${b.bytes} bytes`);
}
```

getBucketInfo

```typescript
const info = await kv.getBucketInfo("sessions");
// info.name, info.values, info.bytes, info.history, info.created_at
```

Bucket Handle

Get a bucket handle for key operations:

```typescript
const bucket = kv.bucket("sessions");
const entry = await bucket.get("user:123");
// entry.key, entry.value, entry.revision, entry.created_at, entry.operation
```

put

Unconditional write. Returns the new revision:

```typescript
const { revision } = await bucket.put("user:123", { name: "Alice" });
```

create (If-Not-Exists)

Write only if the key does not exist. Throws on conflict (HTTP 412):

```typescript
const { revision } = await bucket.create("user:123", { name: "Alice" });
```

update (Compare-and-Swap)

Write only if the revision matches. Throws on mismatch (HTTP 412):

```typescript
const { revision } = await bucket.update("user:123", newValue, entry.revision);
```

delete

Soft-delete (tombstone):

```typescript
await bucket.delete("user:123");
```

purge

Permanently remove the key and all its history:

```typescript
await bucket.purge("user:123");
```

listKeys

List keys with an optional wildcard filter:

```typescript
const keys = await bucket.listKeys("user.*");
// Pass no argument for all keys
```

See the KV Store Guide for detailed usage patterns.
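The get/update pair supports an optimistic-concurrency retry loop. A sketch under stated assumptions: `BucketLike` and `casUpdate` are illustrative names, not SDK exports, and conflict detection is simplified to "any thrown update error retries" (a real caller should narrow this to the HTTP 412 case):

```typescript
// Compare-and-swap retry loop over the documented get/update bucket methods.
// BucketLike mirrors their shapes; conflict handling is deliberately coarse.
interface BucketLike {
  get(key: string): Promise<{ value: unknown; revision: number }>;
  update(key: string, value: unknown, revision: number): Promise<{ revision: number }>;
}

async function casUpdate(
  bucket: BucketLike,
  key: string,
  fn: (current: unknown) => unknown,
  maxAttempts = 5,
): Promise<number> {
  let lastErr: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const entry = await bucket.get(key);
    try {
      // Succeeds only if no other writer bumped the revision in between
      const { revision } = await bucket.update(key, fn(entry.value), entry.revision);
      return revision;
    } catch (err) {
      lastErr = err; // assumed revision mismatch: re-read and retry
    }
  }
  throw lastErr;
}
```

With a real bucket handle this would look like `await casUpdate(kv.bucket('sessions'), 'user:123', (v) => ({ ...(v as Record<string, unknown>), seen: true }))`.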
Config Management
client.config()

Get a config client for environment-scoped configuration:

```typescript
const config = client.config();
```

set / get

Full replacement of a config document:

```typescript
const { revision } = await config.set("app-settings", {
  theme: "dark",
  maxRetries: 3,
});

const entry = await config.get("app-settings");
// entry.name, entry.data, entry.revision, entry.updatedAt
```

patch

Shallow merge — only specified keys are updated:

```typescript
const { revision } = await config.patch("app-settings", {
  maxRetries: 5,
  timeout: 30,
});
```

list

```typescript
const configs = await config.list();
for (const entry of configs) {
  console.log(`${entry.name}: rev ${entry.revision}`);
}
```

delete

Delete a config. Idempotent — succeeds silently if the config does not exist.

```typescript
await config.delete("app-settings");
```

watch

Watch a config document for real-time changes via WebSocket. Calls onUpdate each time the document is created or updated on the server. Returns a ConfigWatcher handle with a stop() method to close the connection.

Requires Node.js 20+ (uses the built-in global WebSocket).

```typescript
const watcher = config.watch("app-settings", {
  onUpdate: (event) => {
    console.log("Config updated:", event.name);
    console.log("New data:", event.data);
    console.log("Revision:", event.revision);
    console.log("Updated at:", event.updatedAt);
  },
  onError: (err) => {
    console.error("Watch error:", err.message);
  },
  onClose: () => {
    console.log("Watch connection closed");
  },
});

// Stop watching when done
watcher.stop();
```

Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | string | Config document name to watch |
| callbacks.onUpdate | (event: ConfigWatchEvent) => void | Called when the config is created or updated |
| callbacks.onError | (error: Error) => void | Optional. Called when an error occurs |
| callbacks.onClose | () => void | Optional. Called when the WebSocket connection closes |
ConfigWatchEvent fields:
| Field | Type | Description |
|---|---|---|
| type | string | Always "config_update" for data events |
| name | string | Config document name |
| data | Record<string, unknown> | Current config data |
| revision | number | KV revision number after this update |
| updatedAt | string | ISO-8601 timestamp of the update |
Returns: ConfigWatcher — call watcher.stop() to close the connection.
See the Config Management Guide for detailed usage patterns.
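The watch callback pairs naturally with a local cache. A sketch that applies ConfigWatchEvent payloads and drops stale deliveries by revision — `applyUpdate` and the cache are illustrative, not SDK APIs; the event fields (name, data, revision) follow the documented shape:

```typescript
// Keep a local cache in sync from ConfigWatchEvent payloads.
// applyUpdate and the cache are illustrative; only the event field
// names mirror the documented ConfigWatchEvent shape.
interface ConfigWatchEventLike {
  name: string;
  data: Record<string, unknown>;
  revision: number;
}

const cache = new Map<string, { data: Record<string, unknown>; revision: number }>();

function applyUpdate(event: ConfigWatchEventLike): boolean {
  const current = cache.get(event.name);
  // Only apply strictly newer revisions, ignoring out-of-order deliveries
  if (current && current.revision >= event.revision) return false;
  cache.set(event.name, { data: event.data, revision: event.revision });
  return true;
}

// With a real watcher: config.watch("app-settings", { onUpdate: applyUpdate });
```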
Secrets
Secrets are encrypted, environment-scoped values (API keys, passwords, tokens) that can only be read at runtime inside functions. They are managed via the CLI or REST API and accessed through the SDK’s read-only interface.
Declaring Secrets
Declare which secrets a function needs in the function config:

```typescript
import { ironflow } from '@ironflow/node';

const myFunction = ironflow.createFunction(
  {
    id: 'my-function',
    triggers: [{ event: 'my.event' }],
    secrets: ['API_KEY', 'DB_PASSWORD'],
  },
  async ({ event, step, secrets }) => {
    // Access resolved secret values
    const apiKey = secrets.get('API_KEY');
    // Use apiKey...
  });
```

The engine resolves only the declared secrets at execution time. Undeclared secrets are not available.
SecretsClient
The secrets parameter in the function handler implements the SecretsClient interface:

| Method | Signature | Description |
|---|---|---|
| get | get(name: string): string | Get a secret’s value. Throws if not found. |
| has | has(name: string): boolean | Check if a secret was resolved. |

```typescript
async ({ event, step, secrets }) => {
  // Check if an optional secret exists
  if (secrets.has('OPTIONAL_KEY')) {
    const key = secrets.get('OPTIONAL_KEY');
    // Use key...
  }

  // Get a required secret (throws if not set)
  const password = secrets.get('DB_PASSWORD');
}
```

See the Secrets Management Guide for detailed usage patterns.
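The has()/get() pair composes into a small fallback helper. A sketch — `SecretsLike` and `getOrDefault` are illustrative names mirroring the documented interface, not SDK exports:

```typescript
// Optional secret with a fallback, built on the documented has()/get() pair.
// SecretsLike mirrors the SecretsClient interface; getOrDefault is illustrative.
interface SecretsLike {
  get(name: string): string;
  has(name: string): boolean;
}

function getOrDefault(secrets: SecretsLike, name: string, fallback: string): string {
  return secrets.has(name) ? secrets.get(name) : fallback;
}

// Inside a handler: const region = getOrDefault(secrets, 'REGION', 'us-east-1');
```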
Subscriptions
Real-time event subscriptions over WebSocket. Subscribe to system events (runs, functions, secrets) or user events with pattern matching.
createSubscriptionClient
Create a subscription client instance.

```typescript
import { createSubscriptionClient } from "@ironflow/node";

const client = createSubscriptionClient({
  serverUrl: "http://localhost:9123",
  // Optional:
  apiKey: "my-api-key",      // API key when auth is enabled
  environment: "production", // Environment name
  autoReconnect: true,       // Auto-reconnect on disconnect (default: true)
  reconnectDelay: 1000,      // Initial reconnect delay in ms (default: 1000)
  maxReconnectDelay: 30000,  // Max reconnect delay in ms (default: 30000)
});
```

connect / close

```typescript
await client.connect();
// ... use subscriptions ...
client.close();
```

subscribe
Subscribe to events matching a pattern. Returns a Subscription handle.

```typescript
const sub = await client.subscribe("system.run.>", {
  onEvent: (event) => {
    console.log(event.topic, event.data);
  },
  onError: (err) => {
    console.error(err.code, err.message);
  },
  // Optional:
  replay: 10,              // Replay last N events
  includeMetadata: true,   // Include event metadata
  filter: "status:failed", // Server-side filter expression
});

// Unsubscribe when done
sub.unsubscribe();
```

Ackable Subscriptions
For consumer groups with manual acknowledgment:

```typescript
const sub = await client.subscribe("order.*", {
  ackMode: "manual",
  consumerGroup: "processors",
  onEvent: async (event) => {
    try {
      await processOrder(event.data);
      await sub.ack(event.eventId);
    } catch (err) {
      await sub.nak(event.eventId, 5000); // Redeliver after 5s
    }
  },
});
```

patterns
Pattern helpers for common subscription topics:

```typescript
import { patterns } from "@ironflow/node";

patterns.allRuns()                 // "system.run.>"
patterns.run("run_123")            // "system.run.run_123.>"
patterns.allFunctions()            // "system.function.>"
patterns.function("my-fn")         // "system.function.my-fn.>"
patterns.allSecrets()              // "system.secret.*"
patterns.secret("API_KEY")         // "system.secret.API_KEY.*"
patterns.secretAction("updated")   // "system.secret.*.updated"
patterns.allUserEvents()           // "events:>"
patterns.userEvent("order.placed") // "events:order.placed"
```

Connection State
```ts
client.onConnectionChange((state) => {
  // state: "disconnected" | "connecting" | "connected" | "reconnecting"
  console.log("Connection:", state);
});

client.onError((err) => {
  console.error("Global error:", err.code, err.message);
});

console.log(client.isConnected);     // boolean
console.log(client.connectionState); // ConnectionState
```

## Entity Streams
### streams.append

Appends an event to an entity stream.

```ts
const result = await client.streams.append("order-123", {
  name: "order.item_added",
  data: { itemId: "item-789" },
  entityType: "order",
}, { expectedVersion: 4 });

// result.entityVersion = 5
// result.eventId = "evt-..."
```

### streams.read
Reads events from an entity stream.

```ts
const { events, totalCount } = await client.streams.read("order-123", {
  fromVersion: 1,
  limit: 50,
  direction: "forward",
});
```

### streams.getInfo
Returns stream metadata, or `null` if no events have been written to this stream yet.

```ts
const info = await client.streams.getInfo("order-123");
// info.entityId, info.entityType, info.version, info.eventCount
// info is null for streams with no events — safe to pass expectedVersion: 0 to append()
```

Returns: `Promise<StreamInfo | null>`
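Because `getInfo()` returns `null` for a stream with no events, a first append can derive its `expectedVersion` from the result. A minimal sketch of that null handling; the `nextExpectedVersion` helper is illustrative, not an SDK export:

```ts
interface StreamInfoLike {
  version: number;
}

// Derive the expectedVersion for an optimistic append:
// a stream with no events (null info) starts at version 0.
function nextExpectedVersion(info: StreamInfoLike | null): number {
  return info === null ? 0 : info.version;
}

console.log(nextExpectedVersion(null));           // 0
console.log(nextExpectedVersion({ version: 4 })); // 4
```

Passing the result as `expectedVersion` to `append()` means a concurrent writer that raced ahead will be detected, which is the usual optimistic-concurrency use of an expected version.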
### streams.createSnapshot

Creates a snapshot of the materialized state at a specific stream version. Use snapshots to speed up state reconstruction for long-lived entity streams by avoiding a full event replay.

```ts
const { snapshotId } = await client.streams.createSnapshot("order-123", {
  entityType: "order",
  entityVersion: 42,
  state: { status: "shipped", items: [...] },
});

console.log("Snapshot ID:", snapshotId);
```

Parameters:
| Parameter | Type | Description |
|---|---|---|
| `entityId` | string | Entity ID |
| `input.entityType` | string | Entity type (e.g. `"order"`) |
| `input.entityVersion` | number | Stream version this snapshot represents |
| `input.state` | `Record<string, unknown>` | Materialized state at this version |

Returns: `{ snapshotId: string }`
### streams.getSnapshot

Gets the latest snapshot at or before a given version. Returns the closest snapshot without exceeding the requested version, so you can replay only the delta of events from the snapshot forward.
```ts
const snapshot = await client.streams.getSnapshot("order-123");

// Or get the snapshot at or before a specific version:
const snapshotBeforeVersion = await client.streams.getSnapshot("order-123", {
  beforeVersion: 40,
});

console.log("Snapshot version:", snapshot.entityVersion);
console.log("State:", snapshot.state);
console.log("Created at:", snapshot.createdAt);
```

Parameters:
| Parameter | Type | Description |
|---|---|---|
| `entityId` | string | Entity ID |
| `options.beforeVersion` | number | Return snapshot at or before this version (default: latest) |

Returns: `StreamSnapshot`
| Field | Type | Description |
|---|---|---|
| `snapshotId` | string | Snapshot ID |
| `entityId` | string | Entity ID |
| `entityType` | string | Entity type |
| `entityVersion` | number | Stream version this snapshot represents |
| `state` | `Record<string, unknown>` | Materialized state |
| `createdAt` | string | ISO-8601 timestamp |
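The snapshot-plus-delta pattern can be sketched as a pure fold: start from the snapshot's state and apply only events appended after the snapshot's version. The reducer and the in-memory event shape below are illustrative stand-ins, not part of the SDK:

```ts
interface StoredEvent {
  version: number;
  name: string;
  data: Record<string, unknown>;
}

type OrderState = { items: string[] };

// Hypothetical reducer for an "order" entity.
function applyEvent(state: OrderState, event: StoredEvent): OrderState {
  if (event.name === "order.item_added") {
    return { items: [...state.items, event.data.itemId as string] };
  }
  return state;
}

// Rebuild current state: snapshot state + events after snapshot.entityVersion.
function rebuild(
  snapshot: { entityVersion: number; state: OrderState },
  events: StoredEvent[],
): OrderState {
  return events
    .filter((e) => e.version > snapshot.entityVersion)
    .reduce(applyEvent, snapshot.state);
}

const state = rebuild(
  { entityVersion: 2, state: { items: ["a", "b"] } },
  [
    { version: 2, name: "order.item_added", data: { itemId: "b" } }, // already in snapshot
    { version: 3, name: "order.item_added", data: { itemId: "c" } },
  ],
);
console.log(state.items); // ["a", "b", "c"]
```

With the real client, the delta would come from `client.streams.read(entityId, { fromVersion: snapshot.entityVersion + 1 })`.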
### streams.listStreams

Lists all entity streams registered in the system.

```ts
const streams = await client.streams.listStreams();

for (const s of streams) {
  console.log(`${s.entityType}/${s.entityId}: v${s.version}, ${s.eventCount} events`);
}
```

Returns: `StreamListEntry[]`
| Field | Type | Description |
|---|---|---|
| `entityId` | string | Entity ID |
| `entityType` | string | Entity type |
| `version` | number | Current stream version |
| `eventCount` | number | Total number of events |
| `lastEventAt` | string | ISO-8601 timestamp of the last event |
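The `lastEventAt` field lends itself to simple housekeeping, such as flagging streams that have gone quiet. A sketch that assumes only the fields from the table above; the helper and the staleness policy are illustrative, not part of the SDK:

```ts
interface StreamListEntryLike {
  entityId: string;
  entityType: string;
  lastEventAt: string; // ISO-8601
}

// Return streams whose last event is older than maxAgeMs, relative to `now`.
// `now` is injectable so the logic is deterministic and testable.
function staleStreams(
  entries: StreamListEntryLike[],
  maxAgeMs: number,
  now: Date = new Date(),
): StreamListEntryLike[] {
  return entries.filter(
    (e) => now.getTime() - Date.parse(e.lastEventAt) > maxAgeMs,
  );
}

const entries = [
  { entityId: "order-1", entityType: "order", lastEventAt: "2024-01-01T00:00:00Z" },
  { entityId: "order-2", entityType: "order", lastEventAt: "2024-06-01T00:00:00Z" },
];

// With "now" fixed to 2024-06-02, only order-1 is older than 7 days.
const stale = staleStreams(entries, 7 * 24 * 3600 * 1000, new Date("2024-06-02T00:00:00Z"));
console.log(stale.map((s) => s.entityId)); // ["order-1"]
```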
### streams.getEntityHistory

Gets the full event history for an entity as a flat array. Unlike `streams.read()`, this returns a simplified view without pagination options.

```ts
const history = await client.streams.getEntityHistory("order-123");

for (const entry of history) {
  console.log(`v${entry.version} [${entry.timestamp}] ${entry.eventName}`, entry.data);
}
```

Returns: `EntityHistoryEntry[]`
| Field | Type | Description |
|---|---|---|
| `eventName` | string | Event name (e.g. `"order.item_added"`) |
| `data` | unknown | Event payload |
| `version` | number | Stream version at which this event was appended |
| `timestamp` | string | ISO-8601 timestamp |
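Since each entry carries its stream version, a fetched history can be sanity-checked for gaps before replaying it. A small sketch, assuming versions increment by one per append (as the `append()` example suggests); the helper is illustrative, not an SDK export:

```ts
interface HistoryEntryLike {
  version: number;
  eventName: string;
}

// Versions should be contiguous and ascending; return any missing versions.
function missingVersions(history: HistoryEntryLike[]): number[] {
  const gaps: number[] = [];
  for (let i = 1; i < history.length; i++) {
    for (let v = history[i - 1].version + 1; v < history[i].version; v++) {
      gaps.push(v);
    }
  }
  return gaps;
}

const history = [
  { version: 1, eventName: "order.created" },
  { version: 2, eventName: "order.item_added" },
  { version: 4, eventName: "order.shipped" }, // version 3 missing
];
console.log(missingVersions(history)); // [3]
```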
## Event Versioning

### Automatic Upcasting

Pass an event definition registry to `serve()` or `createWorker()` to automatically upcast old events before they reach your handler:

```ts
import { defineEvent, createEventDefinitionRegistry } from '@ironflow/core';
import { serve } from '@ironflow/node';

const registry = createEventDefinitionRegistry();

registry.register(defineEvent({ name: 'order.created', version: 1 }));
registry.register(defineEvent({
  name: 'order.created',
  version: 2,
  upcast: (data) => ({ ...data, currency: 'USD' }),
}));

const handler = serve({
  functions: [myFunction],
  eventDefinitions: registry,
});
```

Handlers automatically receive upcasted event data. See the @ironflow/core docs for `defineEvent()` and `createUpcasterRegistry()` details.
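Conceptually, upcasting is function composition: each version's `upcast` transforms the previous version's payload, applied in order from the stored version up to the latest. A standalone sketch of that chain (the v3 upcaster and the `upcastTo` helper are hypothetical, not the SDK's actual registry internals):

```ts
type Upcast = (data: Record<string, unknown>) => Record<string, unknown>;

// Hypothetical upcasters, keyed by the version they upgrade *to*.
const upcasters: Record<number, Upcast> = {
  2: (data) => ({ ...data, currency: 'USD' }),      // v1 -> v2, as in the example above
  3: (data) => ({ ...data, placedVia: 'unknown' }), // v2 -> v3, illustrative
};

// Apply every upcaster between the stored version and the target version, in order.
function upcastTo(
  data: Record<string, unknown>,
  fromVersion: number,
  toVersion: number,
): Record<string, unknown> {
  let result = data;
  for (let v = fromVersion + 1; v <= toVersion; v++) {
    const fn = upcasters[v];
    if (fn) result = fn(result);
  }
  return result;
}

console.log(upcastTo({ amount: 42 }, 1, 3));
// { amount: 42, currency: 'USD', placedVia: 'unknown' }
```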
## Admin & Tenant Operations

The Node client exposes the same admin namespaces as the browser client. See the browser package reference for full method signatures and examples; the shapes are identical.
| Namespace | Methods | Purpose |
|---|---|---|
| `client.apiKeys` | create, list, get, delete, rotate | Environment API key management |
| `client.orgs` | create, list, get, update, delete | Organization CRUD (enterprise) |
| `client.roles` | create, list, get, update, delete, assignPolicy, removePolicy | RBAC roles (enterprise) |
| `client.policies` | create, list, get, update, delete | Authorization policies (enterprise) |
| `client.users` | create, list, get, update, delete | Dashboard users (enterprise) |
| `client.tenants` | list | Tenant listing (enterprise) |
| `client.projects` | list, create, update, delete | Project resource hierarchy |
| `client.environments` | list, create, update, delete | Environment CRUD |
| `client.secrets` | get, set, update, list, delete | Programmatic secrets CRUD |
| `client.schemas` | register, list, get, getVersion, delete, testUpcast | Event schema registry |
| `client.webhooks` | create, listSources, deleteSource, listDeliveries | Webhook source management |
| `client.projections` | get, list, getStatus, rebuild | Projection control plane |
| `client.sqlProjections` | create, query | SQL-backed projection helpers |
```ts
import { createClient } from '@ironflow/node';

const client = createClient();

// Create an environment API key
const key = await client.apiKeys.create({ name: 'ci-runner' });

// Register a new event schema version
const schema = await client.schemas.register({
  name: 'order.placed',
  version: 2,
  schema: { type: 'object', properties: { /* ... */ } },
});
```

## Server Introspection
### listFunctions

List registered functions:

```ts
const functions = await client.listFunctions();

for (const fn of functions) {
  console.log(fn);
}
```

### listWorkers
List connected workers:

```ts
const workers = await client.listWorkers();

for (const w of workers) {
  console.log(w);
}
```

### health
Check server health:

```ts
const status = await client.health();
console.log('Server status:', status); // "ok"
```

## Error Handling
### Global Error Handler

Use the `onError` option on `createClient()` to observe all client errors in one place. See Global Error Handler (`onError`) in the Client section above.

### Built-in Errors
```ts
import {
  IronflowError,
  StepError,
  TimeoutError,
  NonRetryableError,
  isRetryable,
} from '@ironflow/node';

try {
  await step.run('risky-operation', async () => {
    // ...
  });
} catch (error) {
  if (error instanceof StepError) {
    console.log('Step failed:', error.stepId);
  }

  if (isRetryable(error)) {
    // Will be automatically retried
  }
}
```

### Non-Retryable Errors
Mark errors as non-retryable to prevent automatic retries:

```ts
import { NonRetryableError } from '@ironflow/node';

await step.run('validate', async () => {
  if (!isValid(data)) {
    throw new NonRetryableError('Invalid data - do not retry');
  }
  return data;
});
```

## Utilities
### Duration Parsing

```ts
import { parseDuration } from '@ironflow/node';

parseDuration('30s'); // 30000 (ms)
parseDuration('5m');  // 300000
parseDuration('2h');  // 7200000
parseDuration('1d');  // 86400000
```

### ID Generation
```ts
import { generateId, createRunId, createStepId } from '@ironflow/node';

const id = generateId('prefix');         // 'prefix_abc123...'
const runId = createRunId('run_abc123'); // Branded RunId type
const stepId = createStepId('step_def'); // Branded StepId type
```

### Logging
```ts
import { createLogger, createNoopLogger } from '@ironflow/node';

const logger = createLogger({
  prefix: '[myapp]',
  level: 'debug', // 'debug' | 'info' | 'warn' | 'error'
});

logger.info('Starting...');
logger.debug('Debug data', { foo: 'bar' });

// Disable logging
const noopLogger = createNoopLogger();
```

## Constants
```ts
import {
  DEFAULT_SERVER_URL, // 'http://localhost:9123'
  DEFAULT_PORT,       // 9123
  DEFAULT_HOST,       // 'localhost'
  DEFAULT_TIMEOUTS,   // { CLIENT, FUNCTION, TRIGGER_SYNC }
  DEFAULT_RETRY,      // { MAX_ATTEMPTS, INITIAL_DELAY_MS, BACKOFF_FACTOR, MAX_DELAY_MS }
  DEFAULT_WORKER,     // { MAX_CONCURRENT_JOBS, HEARTBEAT_INTERVAL_MS, RECONNECT_DELAY_MS }
  ENV_VARS,           // Environment variable names
} from '@ironflow/node';
```

## Environment Variables
| Variable | Description | Default |
|---|---|---|
| `IRONFLOW_SERVER_URL` | Server URL | `http://localhost:9123` |
| `IRONFLOW_SIGNING_KEY` | Request signing key | - |
| `IRONFLOW_API_KEY` | API key for authentication | - |
| `IRONFLOW_LOG_LEVEL` | Log level | `info` |
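A typical way these variables combine with their documented defaults; the `resolveConfig` helper is illustrative, not an SDK export:

```ts
// Resolve client settings from an environment map, falling back to the
// defaults listed in the table above. `env` (e.g. process.env) is passed
// in explicitly so the logic is deterministic and testable.
function resolveConfig(env: Record<string, string | undefined>) {
  return {
    serverUrl: env.IRONFLOW_SERVER_URL ?? 'http://localhost:9123',
    signingKey: env.IRONFLOW_SIGNING_KEY, // optional, no default
    apiKey: env.IRONFLOW_API_KEY,         // optional, no default
    logLevel: env.IRONFLOW_LOG_LEVEL ?? 'info',
  };
}

console.log(resolveConfig({}).serverUrl);                               // 'http://localhost:9123'
console.log(resolveConfig({ IRONFLOW_LOG_LEVEL: 'debug' }).logLevel);   // 'debug'
```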
## Requirements

- Node.js 20+

## Complete Example
```ts
import { ironflow, serve, createClient } from '@ironflow/node';

// Define a function
const processOrder = ironflow.createFunction(
  {
    id: 'process-order',
    name: 'Process Order',
    triggers: [{ event: 'order.placed' }],
  },
  async ({ event, step }) => {
    const order = event.data as { orderId: string; amount: number };

    // Step 1: Validate
    await step.run('validate', async () => {
      if (order.amount <= 0) {
        throw new Error('Invalid amount');
      }
      return { valid: true };
    });

    // Step 2: Sleep
    await step.sleep('delay', '5s');

    // Step 3: Process
    const result = await step.run('process', async () => {
      return {
        orderId: order.orderId,
        status: 'processed',
      };
    });

    return result;
  }
);

// Register function and emit events from server-side code
const client = createClient();
await client.registerFunction({
  id: 'process-order',
  triggers: [{ event: 'order.placed' }],
  endpointUrl: 'http://localhost:3001/api/ironflow',
  preferredMode: 'push',
});

// Serve as push mode handler
export const POST = serve({
  functions: [processOrder],
});
```

## See Also
- JavaScript SDK Overview
- Browser Package - Browser client
- Core Package - Shared types and utilities
- Step Primitives Guide - Detailed step documentation
- Execution Modes Guide - Push vs pull mode
- Projections Guide - Managed and external projections
- Event Sourcing Guide - Entity streams and versioning
- KV Store Guide - Key-value storage
- Config Management Guide - Environment-scoped configuration
- Secrets Management Guide - Encrypted secrets