@ironflow/node

Node.js SDK for Ironflow. Provides workers, serve handlers, step execution, and a client for server interactions.

The package is public on npm and installs without authentication. For building from source, see the Local Development guide.

npm install @ironflow/node

import { ironflow } from '@ironflow/node';

const processOrder = ironflow.createFunction(
  {
    id: 'process-order',
    triggers: [{ event: 'order.placed' }],
  },
  async ({ event, step }) => {
    // Run a step (automatically memoized)
    const validated = await step.run('validate', async () => {
      return validateOrder(event.data);
    });

    // Sleep (durable - survives restarts)
    await step.sleep('wait-for-inventory', '5m');

    // Wait for external event
    const approval = await step.waitForEvent('wait-approval', {
      event: 'order.approved',
      match: 'data.orderId', // JSON path for correlating events
      timeout: '24h',
    });

    // Process payment
    const result = await step.run('process-payment', async () => {
      return processPayment(validated, approval);
    });

    return { success: true, receiptId: result.id };
  }
);

For serverless environments (Next.js, Vercel, AWS Lambda):

// app/api/ironflow/route.ts (Next.js App Router)
import { serve } from '@ironflow/node';

export const POST = serve({
  functions: [processOrder],
  signingKey: process.env.IRONFLOW_SIGNING_KEY,
});

For long-running workers with no timeout limits:

import { createWorker } from '@ironflow/node';

const worker = createWorker({
  serverUrl: 'http://localhost:9123',
  functions: [processOrder],
  maxConcurrentJobs: 10,
});

// Start the worker (blocks until stopped)
await worker.start();

// Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.drain();
});

Create an Ironflow function.

const myFunction = ironflow.createFunction(
  {
    // Required: unique function ID
    id: 'my-function',
    // Optional: display name
    name: 'My Function',
    // Optional: event triggers
    triggers: [
      { event: 'user.created' },
      { event: 'order.*' },
      { cron: '0 0 * * *' }, // Daily at midnight
    ],
    // Optional: retry configuration
    retry: {
      maxAttempts: 3,
      initialDelayMs: 1000,
      backoffFactor: 2,
      maxDelayMs: 60000,
    },
    // Optional: execution timeout in milliseconds (default: 600000)
    timeout: 1800000,
    // Optional: concurrency control
    concurrency: { limit: 10, key: 'event.data.customerId' },
    // Optional: pause behavior for history editing (scoped injection)
    // 'hold' (default) retains the concurrency lane slot while paused
    // 'release' frees the slot for other runs
    pauseBehavior: 'hold',
    // Optional: declarative cancel-on-event
    // Auto-cancels in-flight runs when a matching event arrives.
    // OR semantics across specs. Tenant-isolated by env_id.
    // See how-to: ../../how-to-guides/workflows/cancel-on-event.md
    cancelOn: [
      { event: 'order.cancelled', match: 'orderId' },
    ],
  },
  async ({ event, step, run, logger }) => {
    // Function implementation
    return { result: 'success' };
  }
);
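The retry settings above imply an exponential backoff schedule capped at maxDelayMs. A minimal sketch of that schedule (illustrative only — the engine's exact timing, jitter, and rounding may differ):

```typescript
// Illustrative sketch of the backoff schedule implied by the retry config;
// not the engine's actual implementation.
interface RetryConfig {
  maxAttempts: number;
  initialDelayMs: number;
  backoffFactor: number;
  maxDelayMs: number;
}

function retryDelayMs(attempt: number, cfg: RetryConfig): number {
  // attempt is 1-based: the first retry waits initialDelayMs
  const delay = cfg.initialDelayMs * cfg.backoffFactor ** (attempt - 1);
  return Math.min(delay, cfg.maxDelayMs);
}

const cfg = { maxAttempts: 3, initialDelayMs: 1000, backoffFactor: 2, maxDelayMs: 60000 };
console.log([1, 2, 3].map((a) => retryDelayMs(a, cfg))); // [1000, 2000, 4000]
```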

The handler receives a context object:

interface FunctionContext {
  // Triggering event
  event: {
    id: string;
    name: string;
    version: number;
    data: unknown;
    timestamp: Date;
    idempotencyKey?: string;
    source?: string;
    metadata?: Record<string, unknown>;
  };
  // Step client for recorded execution
  step: StepClient;
  // Run information
  run: {
    id: string;
    functionId: string;
    attempt: number;
    startedAt: Date;
  };
  // Logger instance
  logger: Logger;
  // Resolved environment secrets (read-only)
  secrets: SecretsClient;
}

Execute a step. Results are memoized—if the function restarts, completed steps are skipped.

const result = await step.run('step-id', async () => {
  // This only runs once, even if the function retries
  return await someApiCall();
});

Sleep for a duration. Durable—survives function restarts.

// Sleep for 5 minutes
await step.sleep('wait', '5m');
// Supported formats: '30s', '5m', '2h', '1d'
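A minimal sketch of how those duration strings map to milliseconds (illustrative only — the SDK's own parser may accept additional formats):

```typescript
// Illustrative parser for the duration strings listed above ('30s', '5m', '2h', '1d').
// Not part of the SDK; the SDK's own parsing may be more permissive.
const UNIT_MS: Record<string, number> = {
  s: 1_000,
  m: 60_000,
  h: 3_600_000,
  d: 86_400_000,
};

function durationToMs(duration: string): number {
  const match = /^(\d+)([smhd])$/.exec(duration);
  if (!match) throw new Error(`unsupported duration: ${duration}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

console.log(durationToMs('5m')); // 300000
```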

Sleep until a specific time.

await step.sleepUntil('wait-until', new Date('2024-12-25T00:00:00Z'));

Wait for an external event.

const approval = await step.waitForEvent('wait-approval', {
  // Event name pattern
  event: 'order.approved',
  // Optional: JSON path for correlating events (e.g. "data.orderId")
  match: 'data.orderId',
  // Optional: timeout
  timeout: '24h',
});

Execute multiple branches in parallel.

const [user, orders, payments] = await step.parallel(
  'fetch-all',
  [
    (step) => step.run('fetch-user', () => fetchUser(userId)),
    (step) => step.run('fetch-orders', () => fetchOrders(userId)),
    (step) => step.run('fetch-payments', () => fetchPayments(userId)),
  ],
  { concurrency: 2, onError: 'failFast' } // Optional
);

Map over items with automatic parallelization.

const results = await step.map(
  'process-items',
  items,
  async (item, step, index) => {
    return await step.run(`process-${index}`, () => processItem(item));
  },
  { concurrency: 5 } // Optional: limit parallel execution
);

Register a compensation handler for saga rollbacks. If the function fails after this step, the compensation runs automatically.

await step.run('reserve-inventory', async () => {
  await reserveInventory(orderId);
});

step.compensate('reserve-inventory', async () => {
  await releaseInventory(orderId);
});

Synchronously invoke another function and wait for its result.

const result = await step.invoke<PaymentResult>('process-payment', {
  orderId,
  amount,
}, { timeout: '30s' });

Invoke another function asynchronously. Returns the run ID without waiting for completion.

const { runId } = await step.invokeAsync('send-notifications', {
  userId,
  message,
});

Publish a message to a topic.

const result = await step.publish('order.events', {
  type: 'order.shipped',
  orderId,
});

See Step Primitives Guide for detailed documentation and examples.


async ({ event, step }) => {
  // event.data is typed as unknown — cast to your type
  const order = event.data as OrderData;

  // Access raw event fields
  const eventName = event.name; // e.g. "order.placed"
  const eventId = event.id;
  const eventVersion = event.version; // Schema version (default: 1)

  // Access user-defined metadata attached when the event was emitted
  const traceId = event.metadata?.traceId as string | undefined;
  const source = event.metadata?.source as string | undefined;

  return { orderId: order.id };
}

Create a client for server interactions:

import { createClient } from '@ironflow/node';

const client = createClient({
  serverUrl: 'http://localhost:9123', // default: IRONFLOW_SERVER_URL or http://localhost:9123
  apiKey: 'optional-api-key', // for authenticated servers
  timeout: 30000, // request timeout in ms (default: 30000)
  onError: (error, context) => { // global error handler (optional)
    console.error(`[${context.method}] ${error.message}`, context);
  },
});

The createClient() function is the recommended way to create an Ironflow client instance.

The onError callback fires on every client error before the error is re-thrown. It provides centralized error observation for logging, metrics, and alerting without interfering with normal error propagation.

import { createClient, type ErrorContext, type OnErrorHandler } from '@ironflow/node';
const onError: OnErrorHandler = async (error, context) => {
  // Log to your observability stack
  await metrics.increment('ironflow.client.error', {
    method: context.method,
    statusCode: String(context.statusCode ?? 'network'),
  });
  logger.error('Ironflow client error', {
    method: context.method, // e.g. "emit", "streams.append", "apiKeys.get"
    endpoint: context.endpoint, // e.g. "/ironflow.v1.IronflowService/Trigger"
    statusCode: context.statusCode, // HTTP status or undefined for network errors
    error: error.message,
  });
};

const client = createClient({ onError });

ErrorContext fields:

| Field | Type | Description |
| --- | --- | --- |
| method | string | Client method name (e.g. "emit", "streams.append", "apiKeys.get") |
| endpoint | string | ConnectRPC or REST endpoint path |
| statusCode | number \| undefined | HTTP status code (undefined for network/timeout errors) |

Behavior:

  • The callback is awaited before the error is re-thrown
  • If the callback throws, the callback error is swallowed (logged to stderr) and the original error is still thrown
  • Both sync and async callbacks are supported (() => void | Promise<void>)
  • Propagates to sub-clients: client.kv() and client.config() inherit the handler

Method naming convention: Top-level methods use their name directly (e.g. "emit", "health"). Namespaced methods use dot notation (e.g. "streams.append", "apiKeys.create", "kv.bucket.get", "config.set").
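Because of that convention, an onError handler can group errors by namespace. A small sketch — namespaceOf is an illustrative helper, not part of the SDK, and ErrorContext is re-declared locally so the snippet is self-contained:

```typescript
// Illustrative only: group client errors by method namespace.
// The real ErrorContext type comes from '@ironflow/node'; it is re-declared
// here so this sketch stands alone.
interface ErrorContext {
  method: string; // e.g. "emit" or "streams.append"
  endpoint: string;
  statusCode?: number;
}

// "streams.append" -> "streams"; top-level methods map to "core".
function namespaceOf(method: string): string {
  const dot = method.indexOf('.');
  return dot === -1 ? 'core' : method.slice(0, dot);
}

const onError = (error: Error, context: ErrorContext): void => {
  console.error(`[ironflow:${namespaceOf(context.method)}] ${context.method}: ${error.message}`);
};
```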


Emit an event to trigger workflows:

const result = await client.emit('order.placed', {
  orderId: '123',
  amount: 99.99,
});

console.log('Event ID:', result.eventId);
console.log('Run IDs:', result.runIds);

Emit options:

// Emit with a specific event version
const result = await client.emit('order.placed', data, {
version: 2,
});
// Emit with idempotency key (deduplication)
const result = await client.emit('payment.processed', data, {
idempotencyKey: 'payment-abc',
});
// Emit with metadata
const result = await client.emit('order.placed', data, {
metadata: { source: 'api' },
});

Emit an event and wait synchronously for the triggered run to complete. Uses the server-side TriggerSync endpoint, which blocks until the run finishes or the timeout elapses.

Throws RunFailedError if the run fails, RunCancelledError if it is cancelled.

import { RunFailedError, RunCancelledError } from '@ironflow/node';

try {
  const result = await client.emitSync('order.placed', {
    orderId: '123',
    amount: 99.99,
  });
  console.log('Run ID:', result.runId);
  console.log('Function:', result.functionId);
  console.log('Status:', result.status);
  console.log('Output:', result.output);
  console.log('Duration:', result.durationMs, 'ms');
} catch (err) {
  if (err instanceof RunFailedError) {
    console.error('Run failed:', err.runId, err.message);
  } else if (err instanceof RunCancelledError) {
    console.error('Run was cancelled:', err.runId);
  }
}

Options:

| Parameter | Type | Description |
| --- | --- | --- |
| eventName | string | Event name to emit |
| data | unknown | Event payload |
| options.timeout | number | Timeout in ms before the request is aborted (default: 30000) |

Returns: EmitSyncResult

| Field | Type | Description |
| --- | --- | --- |
| runId | string | Run ID |
| functionId | string | Function ID that handled the event |
| status | string | Final run status |
| output | unknown | Function return value |
| durationMs | number | Total run duration in milliseconds |

Publish a message to a developer pub/sub topic. Unlike emit(), this does not trigger functions — it delivers messages directly to topic subscribers.

const result = await client.publish('notifications', {
  userId: '123',
  message: 'Your order has shipped!',
});

console.log('Event ID:', result.eventId);
console.log('Sequence:', result.sequence);

Options:

const data = { userId: 'user-123', message: 'Hello!' };
const result = await client.publish('notifications', data, {
  idempotencyKey: 'notif-abc-123', // Optional deduplication key
});

Returns: PublishResult

| Field | Type | Description |
| --- | --- | --- |
| eventId | string | Unique ID for this published message |
| sequence | number | JetStream sequence number |

List all active developer pub/sub topics.

const topics = await client.listTopics();
for (const t of topics) {
  console.log(`${t.name}: ${t.messageCount} messages, ${t.consumerCount} consumers`);
}

Returns: TopicInfo[]

| Field | Type | Description |
| --- | --- | --- |
| name | string | Topic name |
| messageCount | number | Number of messages in the topic |
| consumerCount | number | Number of active consumers |
| firstMessageAt | string \| undefined | Timestamp of the first message |
| lastMessageAt | string \| undefined | Timestamp of the most recent message |

Get detailed statistics for a specific topic, including consumer lag.

const stats = await client.getTopicStats('notifications');
console.log('Messages:', stats.messageCount);
console.log('Consumers:', stats.consumerCount);
console.log('Lag:', stats.lag);
console.log('Sequence range:', stats.firstSeq, '-', stats.lastSeq);

Returns: TopicStats

| Field | Type | Description |
| --- | --- | --- |
| name | string | Topic name |
| messageCount | number | Number of messages |
| consumerCount | number | Number of active consumers |
| lag | number | Messages pending delivery (consumer lag) |
| firstSeq | number | First sequence number |
| lastSeq | number | Last sequence number |

Get run details:

const run = await client.getRun('run_xyz789');
console.log('Status:', run.status);

List runs with filtering:

const result = await client.listRuns({
  functionId: 'process-order',
  status: 'completed',
  limit: 50,
  cursor: 'abc123', // pagination
});

for (const run of result.runs) {
  console.log(`Run ${run.id}: ${run.status}`);
}
console.log('Next page:', result.nextCursor);

Cancel a running workflow:

const run = await client.cancelRun('run_xyz789', 'no longer needed');

Retry a failed run:

const run = await client.retryRun('run_xyz789');

Resume a paused or failed run:

const run = await client.resumeRun('run_xyz789');

// Pause a running workflow at the next step boundary
const result = await client.pauseRun("run_abc123");
// result.status: "pause_requested" or "paused"

// View completed steps while paused
const state = await client.getPausedState("run_abc123");
for (const step of state.steps) {
  console.log(step.name, step.output, step.injected);
}

// Inject modified output for a step
const prev = await client.injectStepOutput(
  "run_abc123",
  "step_xyz",
  { corrected: true },
  "Manual correction"
);

// Resume the workflow
await client.resumeRun("run_abc123");

Hot-patch a step’s output to fix a failed run:

await client.patchStep('step_abc123', { correctedValue: 42 }, 'fixed bad API response');

Register a function with the server:

await client.registerFunction({
  id: 'my-function',
  name: 'My Function',
  triggers: [{ event: 'my.event' }],
  endpointUrl: 'http://localhost:3000/api/ironflow',
  preferredMode: 'push',
});

Reconstruct run state at a point in time, inspect step outputs, and walk through a run’s event timeline. Useful for debugging failed or unexpected runs after the fact.

Get the reconstructed state of a run at a specific timestamp.

const state = await client.getRunStateAt('run_xyz789', new Date('2024-06-01T12:00:00Z'));
console.log('Run ID:', state.runId);
console.log('Status at timestamp:', state.status);
for (const step of state.steps) {
  console.log(`  ${step.name} (${step.id}): ${step.status}`, step.output);
}

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| runId | string | Run ID to query |
| timestamp | Date | Point in time to reconstruct state at |

Returns: TimeTravelRunState

| Field | Type | Description |
| --- | --- | --- |
| runId | string | Run ID |
| status | string | Run status at the given timestamp |
| steps | Array<{ id, name, status, output }> | Step states at the given timestamp |
| timestamp | string | ISO-8601 timestamp of the query |

Get the timeline of audit events for a run, ordered chronologically. Includes step transitions, status changes, and other significant events.

const events = await client.getRunTimeline('run_xyz789');
for (const e of events) {
  console.log(`[${e.timestamp}] ${e.eventType}: ${e.summary}`);
}

Returns: TimeTravelTimelineEvent[]

| Field | Type | Description |
| --- | --- | --- |
| id | string | Event ID |
| eventType | string | Type of timeline event (e.g. step.completed) |
| stepId | string | Associated step ID (empty if not step-related) |
| stepName | string | Associated step name |
| summary | string | Human-readable description |
| significant | boolean | Whether this event is considered a key milestone |
| timestamp | Date | When this event occurred |

Get the output of a specific step at a point in time. Useful for comparing step outputs before and after a patch or injection.

const output = await client.getStepOutputAt(
  'run_xyz789',
  'step_abc123',
  new Date('2024-06-01T12:05:00Z')
);
console.log('Step ID:', output.stepId);
console.log('Output at time:', output.output);
console.log('Timestamp:', output.timestamp);

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| runId | string | Run ID |
| stepId | string | Step ID to query |
| timestamp | Date | Point in time to query |

Returns: TimeTravelStepOutput

| Field | Type | Description |
| --- | --- | --- |
| stepId | string | Step ID |
| output | unknown | Step output at the given timestamp |
| timestamp | string | ISO-8601 timestamp of the query |

Get the audit trail for a run — a list of recorded events capturing what happened at each stage of execution.

const entries = await client.getAuditTrail('run_xyz789');
for (const entry of entries) {
  console.log(`[${entry.timestamp}] ${entry.type}`, entry.data);
}

Returns: AuditTrailEntry[]

| Field | Type | Description |
| --- | --- | --- |
| id | string | Entry ID |
| type | string | Audit event type (e.g. run.started, step.completed) |
| timestamp | string | ISO-8601 timestamp |
| data | unknown | Event-specific payload |

Create an HTTP handler for push mode.

import { serve } from '@ironflow/node';

const handler = serve({
  // Required: functions to serve
  functions: [fn1, fn2, fn3],
  // Optional: signing key for request verification
  signingKey: process.env.IRONFLOW_SIGNING_KEY,
  // Optional: event definitions for automatic upcasting
  eventDefinitions: registry,
  // Optional: custom logger
  logger: console,
  // Optional: Ironflow server URL for emitting webhook events
  serverUrl: 'http://localhost:9123',
  // Optional: webhook sources to handle
  webhooks: [stripeWebhook],
});

Next.js App Router:

// app/api/ironflow/route.ts
import { serve } from '@ironflow/node';

export const POST = serve({ functions: [myFunction] });

Express:

import express from 'express';
import { createHandler } from '@ironflow/node';

const app = express();

// createHandler is an alias for serve()
app.post('/api/ironflow', createHandler({ functions: [myFunction] }));

Create a pull mode worker.

import { createWorker } from '@ironflow/node';

const worker = createWorker({
  // Optional: server URL (default: IRONFLOW_SERVER_URL or http://localhost:9123)
  serverUrl: 'http://localhost:9123',
  // Required: functions to execute
  functions: [fn1, fn2],
  // Optional: API key for authentication (default: IRONFLOW_API_KEY env var)
  apiKey: process.env.IRONFLOW_API_KEY,
  // Optional: event definitions for automatic upcasting
  eventDefinitions: registry,
  // Optional: max concurrent jobs (default: 10)
  maxConcurrentJobs: 10,
  // Optional: worker labels for routing
  labels: { region: 'us-east-1' },
  // Optional: heartbeat interval in ms (default: 30000)
  heartbeatInterval: 30000,
  // Optional: reconnect delay in ms (default: 5000)
  reconnectDelay: 5000,
  // Optional: custom logger
  logger: console,
});

// Start the worker (blocks until stopped)
await worker.start();

// Graceful drain (wait for active jobs to complete)
await worker.drain();

// Force stop
worker.stop();

Create a worker using gRPC bidirectional streaming (lower latency).

Note: Import from the separate entry point to avoid loading protobuf dependencies.

import { createStreamingWorker } from '@ironflow/node/worker-streaming';

const worker = createStreamingWorker({
  serverUrl: 'http://localhost:9123',
  functions: [myFunction],
});

await worker.start();

Workers can run projection runners alongside functions. The worker automatically starts a ProjectionRunner for each projection, using streaming mode with automatic fallback to polling.

import { createWorker, createProjection } from '@ironflow/node';

const orderStats = createProjection({
  name: 'order-stats',
  events: ['order.created', 'order.completed'],
  initialState: () => ({ total: 0, completed: 0 }),
  handler: (state, event) => {
    if (event.name === 'order.created') return { ...state, total: state.total + 1 };
    if (event.name === 'order.completed') return { ...state, completed: state.completed + 1 };
    return state;
  },
});

const worker = createWorker({
  functions: [processOrder],
  projections: [orderStats],
});

await worker.start();
// Projection runners start automatically with streaming-first fallback

Define a projection for use with a worker.

import { createProjection } from '@ironflow/node';

// Managed projection (pure reducer, Ironflow stores state)
const stats = createProjection({
  name: 'order-stats',
  events: ['order.created', 'order.completed'],
  initialState: () => ({ total: 0 }),
  handler: (state, event, ctx) => {
    return { ...state, total: state.total + 1 };
  },
  // Optional:
  batchSize: 100, // Events per batch (default: 100)
  partitionKey: '$.data.customerId', // JSONPath for partitioning
});

// External projection (side effects, you manage storage)
const searchSync = createProjection({
  name: 'search-sync',
  events: ['product.*'],
  handler: async (event, ctx) => {
    await searchIndex.upsert(event.data);
  },
  // No initialState → external mode
});

For advanced use cases, create a standalone projection runner outside of a worker:

import { createProjectionRunner, StreamingUnsupportedError } from '@ironflow/node';

const runner = createProjectionRunner({
  projection: myProjection,
  baseUrl: 'http://localhost:9123',
  headers: { 'x-ironflow-api-key': 'my-key' },
  logger: console,
  signal: abortController.signal,
});

try {
  // Try streaming first (real-time, lower latency)
  await runner.startStreaming();
} catch (err) {
  if (err instanceof StreamingUnsupportedError) {
    // Server doesn't support streaming — fall back to polling
    await runner.start();
  } else {
    throw err;
  }
}

Thrown by startStreaming() when the server doesn’t support the StreamProjectionEvents RPC (returns 404 or 501). Catch this to fall back to polling mode.

import { StreamingUnsupportedError } from '@ironflow/node';

interface ProjectionRunnerConfig {
  projection: IronflowProjection; // Projection definition
  baseUrl: string; // Ironflow server URL
  headers: Record<string, string>; // Request headers (auth, etc.)
  logger: Logger; // Logger instance
  signal?: AbortSignal; // For graceful shutdown
}

See the Projections Guide for full documentation on managed vs external projections, partitioning, and real-time subscriptions.


Get a KV client for bucket management and key operations:

const kv = client.kv();

const info = await kv.createBucket({
  name: "sessions",
  description: "User session data",
  ttlSeconds: 3600,
  maxValueSize: 65536,
  history: 3,
});

await kv.deleteBucket("sessions");

const buckets = await kv.listBuckets();
for (const b of buckets) {
  console.log(`${b.name}: ${b.values} keys, ${b.bytes} bytes`);
}

const info = await kv.getBucketInfo("sessions");
// info.name, info.values, info.bytes, info.history, info.created_at

Get a bucket handle for key operations:

const bucket = kv.bucket("sessions");
const entry = await bucket.get("user:123");
// entry.key, entry.value, entry.revision, entry.created_at, entry.operation

Unconditional write. Returns the new revision:

const { revision } = await bucket.put("user:123", { name: "Alice" });

Write only if the key does not exist. Throws on conflict (HTTP 412):

const { revision } = await bucket.create("user:123", { name: "Alice" });

Write only if the revision matches. Throws on mismatch (HTTP 412):

const { revision } = await bucket.update("user:123", newValue, entry.revision);

Soft-delete (tombstone):

await bucket.delete("user:123");

Permanently remove key and all history:

await bucket.purge("user:123");

List keys with optional wildcard filter:

const keys = await bucket.listKeys("user.*");
// Pass no argument for all keys

See the KV Store Guide for detailed usage patterns.


Get a config client for environment-scoped configuration:

const config = client.config();

Full replacement of a config document:

const { revision } = await config.set("app-settings", {
  theme: "dark",
  maxRetries: 3,
});

const entry = await config.get("app-settings");
// entry.name, entry.data, entry.revision, entry.updatedAt

Shallow merge — only specified keys are updated:

const { revision } = await config.patch("app-settings", {
  maxRetries: 5,
  timeout: 30,
});

const configs = await config.list();
for (const entry of configs) {
  console.log(`${entry.name}: rev ${entry.revision}`);
}
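"Shallow merge" here means top-level keys in the patch replace those in the stored document, while other keys are preserved and nested objects are replaced wholesale rather than deep-merged. In TypeScript terms, the result corresponds to a spread (a sketch of the semantics, not the server implementation):

```typescript
// Sketch of the shallow-merge semantics described above: the patch's
// top-level keys win; untouched keys survive. Not the server's actual code.
function shallowMerge(
  current: Record<string, unknown>,
  patch: Record<string, unknown>,
): Record<string, unknown> {
  return { ...current, ...patch };
}

console.log(shallowMerge({ theme: 'dark', maxRetries: 3 }, { maxRetries: 5, timeout: 30 }));
```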

Delete a config. Idempotent — succeeds silently if the config does not exist.

await config.delete("app-settings");

Watch a config document for real-time changes via WebSocket. Calls onUpdate each time the document is created or updated on the server. Returns a ConfigWatcher handle with a stop() method to close the connection.

Requires Node.js 20+ (uses the built-in global WebSocket).

const watcher = config.watch("app-settings", {
  onUpdate: (event) => {
    console.log("Config updated:", event.name);
    console.log("New data:", event.data);
    console.log("Revision:", event.revision);
    console.log("Updated at:", event.updatedAt);
  },
  onError: (err) => {
    console.error("Watch error:", err.message);
  },
  onClose: () => {
    console.log("Watch connection closed");
  },
});

// Stop watching when done
watcher.stop();

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | Config document name to watch |
| callbacks.onUpdate | (event: ConfigWatchEvent) => void | Called when the config is created or updated |
| callbacks.onError | (error: Error) => void | Optional. Called when an error occurs |
| callbacks.onClose | () => void | Optional. Called when the WebSocket connection closes |

ConfigWatchEvent fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Always "config_update" for data events |
| name | string | Config document name |
| data | Record<string, unknown> | Current config data |
| revision | number | KV revision number after this update |
| updatedAt | string | ISO-8601 timestamp of the update |

Returns: ConfigWatcher — call watcher.stop() to close the connection.

See the Config Management Guide for detailed usage patterns.


Secrets are encrypted, environment-scoped values (API keys, passwords, tokens) that can only be read at runtime inside functions. They are managed via the CLI or REST API and accessed through the SDK’s read-only interface.

Declare which secrets a function needs in the function config:

import { ironflow } from '@ironflow/node';

const myFunction = ironflow.createFunction(
  {
    id: 'my-function',
    triggers: [{ event: 'my.event' }],
    secrets: ['API_KEY', 'DB_PASSWORD'],
  },
  async ({ event, step, secrets }) => {
    // Access resolved secret values
    const apiKey = secrets.get('API_KEY');
    // Use apiKey...
  }
);

The engine resolves only the declared secrets at execution time. Undeclared secrets are not available.

The secrets parameter in the function handler implements the SecretsClient interface:

| Method | Signature | Description |
| --- | --- | --- |
| get | get(name: string): string | Get a secret's value. Throws if not found. |
| has | has(name: string): boolean | Check if a secret was resolved. |
async ({ event, step, secrets }) => {
  // Check if an optional secret exists
  if (secrets.has('OPTIONAL_KEY')) {
    const key = secrets.get('OPTIONAL_KEY');
    // Use key...
  }

  // Get a required secret (throws if not set)
  const password = secrets.get('DB_PASSWORD');
}

See the Secrets Management Guide for detailed usage patterns.


Real-time event subscriptions over WebSocket. Subscribe to system events (runs, functions, secrets) or user events with pattern matching.

Create a subscription client instance.

import { createSubscriptionClient } from "@ironflow/node";

const client = createSubscriptionClient({
  serverUrl: "http://localhost:9123",
  // Optional:
  apiKey: "my-api-key", // API key when auth is enabled
  environment: "production", // Environment name
  autoReconnect: true, // Auto-reconnect on disconnect (default: true)
  reconnectDelay: 1000, // Initial reconnect delay in ms (default: 1000)
  maxReconnectDelay: 30000, // Max reconnect delay in ms (default: 30000)
});

await client.connect();
// ... use subscriptions ...
client.close();

Subscribe to events matching a pattern. Returns a Subscription handle.

const sub = await client.subscribe("system.run.>", {
  onEvent: (event) => {
    console.log(event.topic, event.data);
  },
  onError: (err) => {
    console.error(err.code, err.message);
  },
  // Optional:
  replay: 10, // Replay last N events
  includeMetadata: true, // Include event metadata
  filter: "status:failed", // Server-side filter expression
});

// Unsubscribe when done
sub.unsubscribe();

For consumer groups with manual acknowledgment:

const sub = await client.subscribe("order.*", {
  ackMode: "manual",
  consumerGroup: "processors",
  onEvent: async (event) => {
    try {
      await processOrder(event.data);
      await sub.ack(event.eventId);
    } catch (err) {
      await sub.nak(event.eventId, 5000); // Redeliver after 5s
    }
  },
});

Pattern helpers for common subscription topics:

import { patterns } from "@ironflow/node";

patterns.allRuns()                 // "system.run.>"
patterns.run("run_123")            // "system.run.run_123.>"
patterns.allFunctions()            // "system.function.>"
patterns.function("my-fn")         // "system.function.my-fn.>"
patterns.allSecrets()              // "system.secret.*"
patterns.secret("API_KEY")         // "system.secret.API_KEY.*"
patterns.secretAction("updated")   // "system.secret.*.updated"
patterns.allUserEvents()           // "events:>"
patterns.userEvent("order.placed") // "events:order.placed"

client.onConnectionChange((state) => {
  // state: "disconnected" | "connecting" | "connected" | "reconnecting"
  console.log("Connection:", state);
});

client.onError((err) => {
  console.error("Global error:", err.code, err.message);
});

console.log(client.isConnected); // boolean
console.log(client.connectionState); // ConnectionState

Appends an event to an entity stream.

const result = await client.streams.append("order-123", {
  name: "order.item_added",
  data: { itemId: "item-789" },
  entityType: "order",
}, { expectedVersion: 4 });

// result.entityVersion = 5
// result.eventId = "evt-..."

Reads events from an entity stream.

const { events, totalCount } = await client.streams.read("order-123", {
  fromVersion: 1,
  limit: 50,
  direction: "forward",
});

Returns stream metadata, or null if no events have been written to this stream yet.

const info = await client.streams.getInfo("order-123");
// info.entityId, info.entityType, info.version, info.eventCount
// info is null for streams with no events — safe to pass expectedVersion: 0 to append()

Returns: Promise<StreamInfo | null>
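Because getInfo() returns null for an empty stream, the expectedVersion for an optimistic-concurrency append can be derived directly from it. A sketch under that contract — nextExpectedVersion is an illustrative helper, not part of the SDK, and StreamInfo is re-declared locally:

```typescript
// Illustrative helper: derive the expectedVersion for append() from getInfo().
// Assumes the contract described above: null means the stream has no events,
// so the next append should expect version 0.
interface StreamInfo {
  entityId: string;
  entityType: string;
  version: number;
  eventCount: number;
}

function nextExpectedVersion(info: StreamInfo | null): number {
  return info === null ? 0 : info.version;
}

// Usage (sketch):
// const info = await client.streams.getInfo("order-123");
// await client.streams.append("order-123", event, {
//   expectedVersion: nextExpectedVersion(info),
// });
```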

Create a snapshot of the materialized state at a specific stream version. Use snapshots to speed up state reconstruction for long-lived entity streams by avoiding a full event replay.

const { snapshotId } = await client.streams.createSnapshot("order-123", {
  entityType: "order",
  entityVersion: 42,
  state: { status: "shipped", items: [...] },
});

console.log("Snapshot ID:", snapshotId);

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| entityId | string | Entity ID |
| input.entityType | string | Entity type (e.g. "order") |
| input.entityVersion | number | Stream version this snapshot represents |
| input.state | Record<string, unknown> | Materialized state at this version |

Returns: { snapshotId: string }

Get the latest snapshot at or before a given version. Returns the closest snapshot without exceeding the requested version, so you can replay only the delta of events from the snapshot forward.

const snapshot = await client.streams.getSnapshot("order-123");
// Or get snapshot at or before a specific version:
const snapshotBeforeVersion = await client.streams.getSnapshot("order-123", { beforeVersion: 40 });
console.log("Snapshot version:", snapshot.entityVersion);
console.log("State:", snapshot.state);
console.log("Created at:", snapshot.createdAt);

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| entityId | string | Entity ID |
| options.beforeVersion | number | Return snapshot at or before this version (default: latest) |

Returns: StreamSnapshot

| Field | Type | Description |
| --- | --- | --- |
| snapshotId | string | Snapshot ID |
| entityId | string | Entity ID |
| entityType | string | Entity type |
| entityVersion | number | Stream version this snapshot represents |
| state | Record<string, unknown> | Materialized state |
| createdAt | string | ISO-8601 timestamp |
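Combining getSnapshot() with streams.read() gives the delta replay the paragraph above describes: start from the snapshot state and fold on only the newer events. A sketch under the field shapes above — replayFromSnapshot and the reducer are illustrative, not SDK APIs:

```typescript
// Illustrative delta replay: fold events newer than the snapshot version
// onto the snapshot state. Field shapes follow the tables above.
type State = Record<string, unknown>;

interface Snapshot {
  entityVersion: number;
  state: State;
}

interface StreamEvent {
  name: string;
  data: unknown;
  version: number;
}

function replayFromSnapshot(
  snapshot: Snapshot,
  events: StreamEvent[],
  apply: (state: State, event: StreamEvent) => State,
): State {
  return events
    .filter((e) => e.version > snapshot.entityVersion) // skip already-snapshotted events
    .reduce(apply, snapshot.state);
}
```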

List all entity streams registered in the system.

const streams = await client.streams.listStreams();
for (const s of streams) {
  console.log(`${s.entityType}/${s.entityId}: v${s.version}, ${s.eventCount} events`);
}

Returns: StreamListEntry[]

| Field | Type | Description |
| --- | --- | --- |
| entityId | string | Entity ID |
| entityType | string | Entity type |
| version | number | Current stream version |
| eventCount | number | Total number of events |
| lastEventAt | string | ISO-8601 timestamp of the last event |

Get the full event history for an entity as a flat array. Unlike streams.read(), this returns a simplified view without pagination options.

const history = await client.streams.getEntityHistory("order-123");
for (const entry of history) {
  console.log(`v${entry.version} [${entry.timestamp}] ${entry.eventName}`, entry.data);
}

Returns: EntityHistoryEntry[]

| Field | Type | Description |
| --- | --- | --- |
| eventName | string | Event name (e.g. "order.item_added") |
| data | unknown | Event payload |
| version | number | Stream version at which this event was appended |
| timestamp | string | ISO-8601 timestamp |

Pass an event definition registry to serve() or createWorker() to automatically upcast old events before they reach your handler:

import { defineEvent, createEventDefinitionRegistry } from '@ironflow/core';
import { serve } from '@ironflow/node';
const registry = createEventDefinitionRegistry();
registry.register(defineEvent({ name: 'order.created', version: 1 }));
registry.register(defineEvent({
  name: 'order.created',
  version: 2,
  upcast: (data) => ({ ...data, currency: 'USD' }),
}));
const handler = serve({
  functions: [myFunction],
  eventDefinitions: registry,
});

Handlers automatically receive upcasted event data. See @ironflow/core docs for defineEvent() and createUpcasterRegistry() details.
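Conceptually, upcasting chains each version's `upcast` function from the stored version up to the latest. A simplified sketch of that walk (illustrative shapes, not the actual @ironflow/core internals):

```typescript
// Sketch: walk event data from its stored version to the latest by applying
// each intermediate version's upcast function. Illustrative shapes only.
type Upcaster = (data: Record<string, unknown>) => Record<string, unknown>;

function upcastToLatest(
  data: Record<string, unknown>,
  storedVersion: number,
  latestVersion: number,
  upcasters: Map<number, Upcaster>, // keyed by the version they produce
): Record<string, unknown> {
  let result = data;
  for (let v = storedVersion + 1; v <= latestVersion; v++) {
    const up = upcasters.get(v);
    if (up) result = up(result);
  }
  return result;
}
```

With the `order.created` example above, a v1 payload of `{ amount: 10 }` would pass through the v2 upcaster and gain `currency: 'USD'` before reaching the handler.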


The Node client exposes the same admin namespaces as the browser client. See the browser package reference for full method signatures and examples — the shapes are identical.

| Namespace | Methods | Purpose |
| --- | --- | --- |
| client.apiKeys | create, list, get, delete, rotate | Environment API key management |
| client.orgs | create, list, get, update, delete | Organization CRUD (enterprise) |
| client.roles | create, list, get, update, delete, assignPolicy, removePolicy | RBAC roles (enterprise) |
| client.policies | create, list, get, update, delete | Authorization policies (enterprise) |
| client.users | create, list, get, update, delete | Dashboard users (enterprise) |
| client.tenants | list | Tenant listing (enterprise) |
| client.projects | list, create, update, delete | Project resource hierarchy |
| client.environments | list, create, update, delete | Environment CRUD |
| client.secrets | get, set, update, list, delete | Programmatic secrets CRUD |
| client.schemas | register, list, get, getVersion, delete, testUpcast | Event schema registry |
| client.webhooks | create, listSources, deleteSource, listDeliveries | Webhook source management |
| client.projections | get, list, getStatus, rebuild | Projection control plane |
| client.sqlProjections | create, query | SQL-backed projection helpers |

const client = createClient();
// Create an environment API key
const key = await client.apiKeys.create({ name: 'ci-runner' });
// Register a new event schema version
const schema = await client.schemas.register({
  name: 'order.placed',
  version: 2,
  schema: { type: 'object', properties: { /* ... */ } },
});

List registered functions:

const functions = await client.listFunctions();
for (const fn of functions) {
  console.log(fn);
}

List connected workers:

const workers = await client.listWorkers();
for (const w of workers) {
  console.log(w);
}

Check server health:

const status = await client.health();
console.log('Server status:', status); // "ok"

Use the onError option on createClient() to observe all client errors in one place. See Global Error Handler (onError) in the Client section above.

import {
  IronflowError,
  StepError,
  TimeoutError,
  NonRetryableError,
  isRetryable,
} from '@ironflow/node';
try {
  await step.run('risky-operation', async () => {
    // ...
  });
} catch (error) {
  if (error instanceof StepError) {
    console.log('Step failed:', error.stepId);
  }
  if (isRetryable(error)) {
    // Will be automatically retried
  }
}

Mark errors as non-retryable to prevent automatic retries:

import { NonRetryableError } from '@ironflow/node';
await step.run('validate', async () => {
  if (!isValid(data)) {
    throw new NonRetryableError('Invalid data - do not retry');
  }
  return data;
});

import { parseDuration } from '@ironflow/node';
parseDuration('30s'); // 30000 (ms)
parseDuration('5m'); // 300000
parseDuration('2h'); // 7200000
parseDuration('1d'); // 86400000
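For reference, a hand-rolled sketch of this parsing, assuming only the integer-plus-suffix forms shown above are accepted (the real parseDuration may support more formats):

```typescript
// Sketch of parseDuration-style parsing: integer + s/m/h/d suffix to ms.
// Not the SDK implementation — just enough to reproduce the examples above.
function parseDurationSketch(input: string): number {
  const match = /^(\d+)(s|m|h|d)$/.exec(input);
  if (!match) throw new Error(`Unparseable duration: ${input}`);
  const unitMs: Record<string, number> = {
    s: 1_000,
    m: 60_000,
    h: 3_600_000,
    d: 86_400_000,
  };
  return Number(match[1]) * unitMs[match[2]];
}
```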
import { generateId, createRunId, createStepId } from '@ironflow/node';
const id = generateId('prefix'); // 'prefix_abc123...'
const runId = createRunId('run_abc123'); // Branded RunId type
const stepId = createStepId('step_def'); // Branded StepId type
import { createLogger, createNoopLogger } from '@ironflow/node';
const logger = createLogger({
  prefix: '[myapp]',
  level: 'debug', // 'debug' | 'info' | 'warn' | 'error'
});
logger.info('Starting...');
logger.debug('Debug data', { foo: 'bar' });
// Disable logging
const noopLogger = createNoopLogger();

import {
  DEFAULT_SERVER_URL, // 'http://localhost:9123'
  DEFAULT_PORT, // 9123
  DEFAULT_HOST, // 'localhost'
  DEFAULT_TIMEOUTS, // { CLIENT, FUNCTION, TRIGGER_SYNC }
  DEFAULT_RETRY, // { MAX_ATTEMPTS, INITIAL_DELAY_MS, BACKOFF_FACTOR, MAX_DELAY_MS }
  DEFAULT_WORKER, // { MAX_CONCURRENT_JOBS, HEARTBEAT_INTERVAL_MS, RECONNECT_DELAY_MS }
  ENV_VARS, // Environment variable names
} from '@ironflow/node';
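The DEFAULT_RETRY constants describe an exponential backoff with a cap. A sketch of the implied delay curve — the concrete numbers below (1s initial delay, factor 2, 30s cap) are assumptions for illustration; read the real values from DEFAULT_RETRY at runtime:

```typescript
// Sketch: exponential backoff delay per retry attempt, capped at a maximum.
// The values below are illustrative, not the SDK's actual DEFAULT_RETRY.
const retryDefaults = {
  INITIAL_DELAY_MS: 1_000,
  BACKOFF_FACTOR: 2,
  MAX_DELAY_MS: 30_000,
};

// attempt is 1-based: the first retry waits INITIAL_DELAY_MS
function backoffDelayMs(attempt: number): number {
  const delay =
    retryDefaults.INITIAL_DELAY_MS *
    retryDefaults.BACKOFF_FACTOR ** (attempt - 1);
  return Math.min(delay, retryDefaults.MAX_DELAY_MS);
}
```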

| Variable | Description | Default |
| --- | --- | --- |
| IRONFLOW_SERVER_URL | Server URL | http://localhost:9123 |
| IRONFLOW_SIGNING_KEY | Request signing key | - |
| IRONFLOW_API_KEY | API key for authentication | - |
| IRONFLOW_LOG_LEVEL | Log level | info |
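Resolution is straightforward: use the environment variable when set, otherwise fall back to the documented default. A hand-rolled sketch of that lookup (an illustration, not the SDK's actual config loader):

```typescript
// Sketch: resolve client config from the env vars documented above,
// falling back to the documented defaults. Illustrative only.
function resolveConfig(env: Record<string, string | undefined>) {
  return {
    serverUrl: env.IRONFLOW_SERVER_URL ?? 'http://localhost:9123',
    signingKey: env.IRONFLOW_SIGNING_KEY, // no default — needed for signed requests
    apiKey: env.IRONFLOW_API_KEY, // no default — optional API-key auth
    logLevel: env.IRONFLOW_LOG_LEVEL ?? 'info',
  };
}
```

Passing `process.env` gives the same precedence the table describes.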

  • Node.js 20+

import { ironflow, serve, createClient } from '@ironflow/node';
// Define a function
const processOrder = ironflow.createFunction(
  {
    id: 'process-order',
    name: 'Process Order',
    triggers: [{ event: 'order.placed' }],
  },
  async ({ event, step }) => {
    const order = event.data as { orderId: string; amount: number };
    // Step 1: Validate
    await step.run('validate', async () => {
      if (order.amount <= 0) {
        throw new Error('Invalid amount');
      }
      return { valid: true };
    });
    // Step 2: Sleep
    await step.sleep('delay', '5s');
    // Step 3: Process
    const result = await step.run('process', async () => {
      return {
        orderId: order.orderId,
        status: 'processed',
      };
    });
    return result;
  }
);
// Register function and emit events from server-side code
const client = createClient();
await client.registerFunction({
  id: 'process-order',
  triggers: [{ event: 'order.placed' }],
  endpointUrl: 'http://localhost:3001/api/ironflow',
  preferredMode: 'push',
});
// Serve as push mode handler
export const POST = serve({
  functions: [processOrder],
});