# Go SDK

The official Go SDK for Ironflow. Provides recorded execution (durable functions), event emission, real-time subscriptions, KV store, config management, entity streams, derived views (projections), and a full management API.

Terminal window
go get github.com/sahina/ironflow/sdk/go/ironflow

Requires Go 1.25+.

import "github.com/sahina/ironflow/sdk/go/ironflow"
var ProcessOrder = ironflow.CreateFunction(ironflow.FunctionConfig{
ID: "process-order",
Triggers: []ironflow.Trigger{{Event: "order.placed"}},
}, func(ctx ironflow.Context) (any, error) {
var order OrderData
if err := ctx.Event.Data(&order); err != nil {
return nil, err
}
// Run a step (automatically memoized)
validated, err := ironflow.Run(ctx, "validate", func() (any, error) {
return validateOrder(order)
})
if err != nil {
return nil, err
}
// Sleep (durable - survives restarts)
if err := ironflow.Sleep(ctx, "wait", 5*time.Minute); err != nil {
return nil, err
}
// Wait for external event
approval, err := ironflow.WaitForEvent[ApprovalEvent](ctx, "wait-approval", ironflow.EventFilter{
Event: "order.approved",
Match: "data.orderId",
Timeout: 24 * time.Hour,
})
if err != nil {
return nil, err
}
result, err := ironflow.Run(ctx, "process-payment", func() (any, error) {
return processPayment(validated, approval)
})
return result, err
})

For serverless deployments (Vercel, Lambda):

handler := ironflow.Serve(ironflow.ServeConfig{
Functions: []ironflow.Function{ProcessOrder},
SigningKey: os.Getenv("IRONFLOW_SIGNING_KEY"),
})
http.Handle("/api/ironflow", handler)
http.ListenAndServe(":3000", nil)

For long-running workers with no timeout limits:

worker := ironflow.NewWorker(ironflow.WorkerConfig{
Functions: []ironflow.Function{ProcessOrder},
MaxConcurrentJobs: 10,
})
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)
defer cancel()
if err := worker.Run(ctx); err != nil {
log.Fatal(err)
}

var MyFunction = ironflow.CreateFunction(ironflow.FunctionConfig{
ID: "my-function",
Name: "My Function",
// Event triggers
Triggers: []ironflow.Trigger{
{Event: "user.created"},
{Event: "order.*"},
{Event: "order.placed", Expression: `data.total > 100`}, // CEL filter
},
// Retry configuration
Retry: &ironflow.RetryConfig{
MaxAttempts: 5,
InitialDelay: 1 * time.Second,
BackoffFactor: 2.0,
MaxDelay: 5 * time.Minute,
},
// Execution timeout
Timeout: 30 * time.Minute,
StepTimeout: 60 * time.Second, // Default timeout for all steps
// Concurrency limit
Concurrency: &ironflow.ConcurrencyConfig{
Limit: 10,
Key: "event.data.customerId", // Group by customer
},
// Execution mode
Mode: ironflow.PullMode, // or ironflow.PushMode (default)
// Secrets required at runtime
Secrets: []string{"STRIPE_KEY", "DB_PASSWORD"},
// Declarative cancel-on-event.
// Auto-cancels in-flight runs when a matching event arrives.
// OR semantics across specs. Tenant-isolated by env_id.
// See how-to: ../../how-to-guides/workflows/cancel-on-event.md
CancelOn: []ironflow.CancelOnConfig{
{Event: "order.cancelled", Match: "orderId"},
},
}, func(ctx ironflow.Context) (any, error) {
return map[string]any{"status": "done"}, nil
})

The handler receives a Context:

type Context struct {
Event Event // Triggering event
Run RunInfo // Run info (ID, FunctionID, Attempt, StartedAt)
Secrets SecretsReader // Read-only access to resolved secrets
}
func(ctx ironflow.Context) (any, error) {
// Unmarshal event data into a struct
var order OrderData
if err := ctx.Event.Data(&order); err != nil {
return nil, err
}
// Access raw event fields
eventName := ctx.Event.Name // e.g. "order.placed"
eventID := ctx.Event.ID
eventVersion := ctx.Event.Version // Schema version (default: 1)
source := ctx.Event.Source // "api", "cron", "webhook"
// Access user-defined metadata attached when the event was emitted
traceID, _ := ctx.Event.Metadata["traceId"].(string)
eventSource, _ := ctx.Event.Metadata["source"].(string)
return nil, nil
}

Type-safe generic API with auto-generated function IDs:

type OrderEvent struct {
OrderID string `json:"orderId"`
Total float64 `json:"total"`
}
// Minimal config — ID auto-generated as "order-placed-handler"
var OrderHandler = ironflow.CreateHandler(ironflow.HandlerConfig[OrderEvent]{
Event: "order.placed",
Handler: func(event OrderEvent, ctx *ironflow.HandlerContext) (any, error) {
ctx.Logger.Info("Processing order", "id", event.OrderID)
return map[string]any{"processed": true}, nil
},
})
// With all options
var HighValueHandler = ironflow.CreateHandler(ironflow.HandlerConfig[OrderEvent]{
Event: "order.placed",
Handler: func(event OrderEvent, ctx *ironflow.HandlerContext) (any, error) {
result, err := ctx.Step.Run("process", func() (any, error) {
return processOrder(event)
})
return result, err
},
Options: &ironflow.HandlerOptions{
ID: "high-value-order-handler",
Filter: `data.total > 1000`,
Retry: &ironflow.RetryConfig{MaxAttempts: 5},
Concurrency: &ironflow.ConcurrencyConfig{
Limit: 10,
Key: "event.data.customerId",
},
},
})

The HandlerContext provides:

ctx.Event // Typed event data
ctx.EventMeta // Event metadata (ID, Name, Version, Timestamp, Source)
ctx.Run // Run info (ID, FunctionID, Attempt)
ctx.Secrets // SecretsReader
ctx.Step // StepClient (Run, Sleep, SleepUntil, WaitForEvent, Parallel, Map, Compensate)
ctx.Logger // Logger

Execute a memoized step. Results are cached — if the function restarts, completed steps are skipped.

result, err := ironflow.Run(ctx, "step-id", func() (any, error) {
return someApiCall()
})
// With timeout override
result, err := ironflow.Run(ctx, "step-id", func() (any, error) {
return someApiCall()
}, ironflow.WithTimeout(10*time.Second))

Durable pause. Survives function restarts and server upgrades.

if err := ironflow.Sleep(ctx, "wait", 5*time.Minute); err != nil {
return nil, err
}

Sleep until a specific time.

if err := ironflow.SleepUntil(ctx, "wait-until", time.Date(2025, 12, 25, 0, 0, 0, 0, time.UTC)); err != nil {
return nil, err
}

Wait for an external event matching a filter.

event, err := ironflow.WaitForEvent[ApprovalEvent](ctx, "wait-approval", ironflow.EventFilter{
Event: "order.approved",
Match: "data.orderId", // JSON path for matching
Timeout: 24 * time.Hour, // Default: 7 days
})

Execute multiple branches concurrently.

results, err := ironflow.Parallel(ctx, "fetch-all",
[]func(*ironflow.BranchContext) (any, error){
func(b *ironflow.BranchContext) (any, error) {
return ironflow.RunWithBranch(b, "fetch-user", func() (any, error) {
return fetchUser(userID)
})
},
func(b *ironflow.BranchContext) (any, error) {
return ironflow.RunWithBranch(b, "fetch-orders", func() (any, error) {
return fetchOrders(userID)
})
},
},
ironflow.ParallelOptions{Concurrency: 2, OnError: "failFast"},
)

Map over items with automatic parallelization.

results, err := ironflow.Map(ctx, "process-items", items,
func(item Item, b *ironflow.BranchContext, index int) (Result, error) {
return ironflow.RunWithBranch(b, fmt.Sprintf("process-%d", index), func() (Result, error) {
return processItem(item)
})
},
ironflow.ParallelOptions{Concurrency: 5},
)

Call another Ironflow function and wait for the result.

result, err := ironflow.Invoke[PaymentResult](ctx, "process-payment", map[string]any{
"orderId": "123",
"amount": 99.99,
})
// With custom timeout (default: 30s)
result, err := ironflow.Invoke[PaymentResult](ctx, "process-payment", input,
ironflow.WithInvokeTimeout(60*time.Second),
)

Call another function without waiting for the result.

asyncResult, err := ironflow.InvokeAsync(ctx, "send-email", map[string]any{
"to": "user@example.com",
"subject": "Order Confirmed",
})
fmt.Println("Child run:", asyncResult.RunID)

Register compensation handlers for rollback on failure.

_, err := ironflow.Run(ctx, "charge-payment", func() (any, error) {
return chargeCard(order.CardID, order.Total)
})
if err != nil {
return nil, err
}
// Register compensation — runs in reverse order if a later step fails
ironflow.Compensate(ctx, "charge-payment", func() error {
return refundCard(order.CardID, order.Total)
})
// If this step fails, "charge-payment" compensation runs automatically
_, err = ironflow.Run(ctx, "ship-order", func() (any, error) {
return shipOrder(order.ID)
})

Publish a message to a developer pub/sub topic as a durable step.

if err := ironflow.Publish(ctx, "notifications", map[string]any{
"type": "order.shipped",
"orderId": order.ID,
}); err != nil {
return nil, err
}

client := ironflow.NewClient(ironflow.ClientConfig{
ServerURL: "http://localhost:9123", // default: IRONFLOW_SERVER_URL or http://localhost:9123
APIKey: "optional-api-key", // default: IRONFLOW_API_KEY
Timeout: 30 * time.Second, // default: 30s
// Retry configuration (optional)
Retry: &ironflow.ClientRetryConfig{
MaxAttempts: 3,
InitialDelay: 100 * time.Millisecond,
MaxDelay: 10 * time.Second,
BackoffMultiplier: 2.0,
ConnectionRetryDelay: 2 * time.Second,
OnRetry: func(event ironflow.RetryEvent) {
log.Printf("Retry %d/%d: %v (waiting %s)", event.Attempt, event.MaxAttempts, event.Error, event.Delay)
},
},
// Custom logger (optional)
Logger: ironflow.NewNoopLogger(), // disable logging
})

Fire-and-forget event emission:

result, err := client.Emit(ctx, "order.placed", map[string]any{
"orderId": "123",
"total": 99.99,
})
fmt.Println("Event ID:", result.EventID)
fmt.Println("Run IDs:", result.RunIDs)

Emit options:

// With idempotency key (deduplication)
result, err := client.Emit(ctx, "payment.processed", data,
ironflow.WithEmitIdempotencyKey("payment-abc"),
)
// With event version
result, err := client.Emit(ctx, "order.placed", data,
ironflow.WithEmitVersion(2),
)
// With metadata
result, err := client.Emit(ctx, "order.placed", data,
ironflow.WithEmitMetadata(map[string]any{"source": "api"}),
)

Emit an event and wait for the result:

result, err := client.EmitSync(ctx, "order.placed", map[string]any{
"orderId": "123",
}, 30*time.Second)
fmt.Println("Status:", result.Status) // "completed", "failed"
fmt.Println("Output:", result.Output)
fmt.Println("Duration:", result.DurationMs, "ms")

run, err := client.GetRun(ctx, "run_xyz789")
fmt.Println("Status:", run.Status) // "pending", "running", "completed", "failed", "cancelled", "paused"
result, err := client.ListRuns(ctx, &ironflow.ListRunsOptions{
FunctionID: "process-order",
Status: "completed",
Limit: 50,
Cursor: "abc123", // pagination
})
for _, run := range result.Runs {
fmt.Printf("Run %s: %s\n", run.ID, run.Status)
}
fmt.Println("Next page:", result.NextCursor)
run, err := client.CancelRun(ctx, "run_xyz789", "no longer needed")
run, err := client.RetryRun(ctx, "run_xyz789", "") // empty string = retry from beginning
run, err := client.RetryRun(ctx, "run_xyz789", "validate") // retry from specific step

Hot-patch a step’s output to fix a failed run:

err := client.PatchStep(ctx, "step_abc123", map[string]any{
"correctedValue": 42,
}, "fixed bad API response")

Pause running workflows at step boundaries, inspect and modify step outputs, then resume:

// Pause a running workflow at the next step boundary
status, err := client.PauseRun(ctx, "run_abc123")
fmt.Println("Status:", status) // "paused"
// Get the paused state with completed steps
state, err := client.GetPausedState(ctx, "run_abc123")
for _, step := range state.Steps {
fmt.Printf("Step %s: injected=%v output=%s\n", step.Name, step.Injected, step.Output)
}
fmt.Println("Next step:", state.NextStepHint)
// Inject modified output for a step
prevOutput, err := client.InjectStepOutput(ctx, "run_abc123", "step_xyz",
json.RawMessage(`{"corrected": true}`), "Manual correction")
fmt.Println("Previous:", string(prevOutput))
// Resume the run
run, err := client.ResumeRun(ctx, "run_abc123", "")

Configure PauseBehavior on functions to control concurrency lane handling during pause:

fn := ironflow.CreateFunction(ironflow.FunctionConfig{
ID: "process-order",
PauseBehavior: "hold", // "hold" (default) or "release"
})

## History Navigation (Time-Travel Debugging)

Inspect historical run state at any timestamp. Requires Recording: true on the function.

// Get run state at a specific point in time
snapshot, err := client.GetRunStateAt(ctx, "run_abc123", time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC))
fmt.Println("Status at that time:", snapshot.Status)
for _, step := range snapshot.Steps {
fmt.Printf("Step %s: status=%s patched=%v\n", step.Name, step.Status, step.Patched)
}
// Get full timeline of audit events for a run
events, err := client.GetRunTimeline(ctx, "run_abc123")
for _, evt := range events {
fmt.Printf("[%s] %s: %s (significant=%v)\n", evt.Timestamp, evt.EventType, evt.Summary, evt.Significant)
}
// Get a specific step's output at a point in time
stepOutput, err := client.GetStepOutputAt(ctx, "run_abc123", "step_xyz",
time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC))
fmt.Printf("Output: %s (patched=%v, injected=%v)\n", stepOutput.Output, stepOutput.Patched, stepOutput.Injected)
err := client.RegisterFunction(ctx, ProcessOrder)

Create an HTTP handler for push mode:

handler := ironflow.Serve(ironflow.ServeConfig{
// Required
Functions: []ironflow.Function{fn1, fn2, fn3},
// Optional: signing key for request verification
SigningKey: os.Getenv("IRONFLOW_SIGNING_KEY"),
// Optional: skip verification (dev only)
SkipVerification: true,
// Optional: projections to register
Projections: []ironflow.Projection{OrderStats},
// Optional: webhook sources
Webhooks: []ironflow.Webhook{StripeWebhook},
// Optional: server URL for emitting webhook events
ServerURL: "http://localhost:9123",
// Optional: event schema upcasters
Upcasters: registry,
})
http.Handle("/api/ironflow", handler)
http.ListenAndServe(":3000", nil)

worker := ironflow.NewWorker(ironflow.WorkerConfig{
ServerURL: "http://localhost:9123",
Functions: []ironflow.Function{fn1, fn2},
Projections: []ironflow.Projection{OrderStats},
MaxConcurrentJobs: 10, // default: 10
Labels: map[string]string{"region": "us-east-1"},
HeartbeatInterval: 30 * time.Second, // default: 30s
ReconnectDelay: 5 * time.Second, // default: 5s
Logger: ironflow.NewNoopLogger(),
Upcasters: registry,
})
// Start the worker (blocks until stopped)
err := worker.Run(ctx)
// Graceful drain (wait for active jobs to complete)
worker.Drain()
// Force stop
worker.Stop()

Uses ConnectRPC bidirectional streaming instead of HTTP polling. Lower latency for step delivery and real-time step lifecycle visibility (StepStarted/Completed/Failed events streamed to the server).

worker := ironflow.NewStreamingWorker(ironflow.WorkerConfig{
ServerURL: "http://localhost:9123",
Functions: []ironflow.Function{fn1, fn2},
Projections: []ironflow.Projection{OrderStats},
MaxConcurrentJobs: 10,
Labels: map[string]string{"region": "us-east-1"},
HeartbeatInterval: 30 * time.Second,
ReconnectDelay: 5 * time.Second,
})

Same WorkerConfig, same Run/Drain/Stop methods as NewWorker. The only difference is the transport: a single persistent HTTP/2 connection instead of polling.

When to use streaming vs polling:

  • Use NewStreamingWorker when you need lower step delivery latency or real-time step visibility in the dashboard.
  • Use NewWorker when you want the simplest deployment (HTTP/1.1 compatible, no HTTP/2 requirement).

Both support h2c (HTTP/2 cleartext) for development and TLS for production.

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)
defer cancel()
go func() {
<-ctx.Done()
worker.Drain()
}()
if err := worker.Run(ctx); err != nil {
log.Fatal(err)
}

Both NewWorker and NewStreamingWorker use at-least-once delivery semantics. If a worker disconnects while executing a step, the server requeues the job. Completed steps are memoized (not re-executed), but the currently-running step may execute again.

This means steps with side effects (sending emails, charging cards, calling external APIs) must be idempotent. Use idempotency keys, database upserts, or check-before-act patterns to prevent duplicate actions.

This is the same guarantee used by Temporal, Inngest, and every other durable execution engine.


kv := client.KV()
info, err := kv.CreateBucket(ctx, ironflow.BucketConfig{
Name: "sessions",
Description: "User session data",
TTL: time.Hour,
MaxValueSize: 65536,
MaxBytes: 1 << 30, // 1GB
History: 5,
})
err := kv.DeleteBucket(ctx, "sessions")
buckets, err := kv.ListBuckets(ctx)
for _, b := range buckets {
fmt.Printf("%s: %d keys, %d bytes\n", b.Name, b.Values, b.Bytes)
}
info, err := kv.GetBucketInfo(ctx, "sessions")
bucket := kv.Bucket("sessions")
entry, err := bucket.Get(ctx, "user:123")
// entry.Key, entry.Value ([]byte), entry.Revision, entry.CreatedAt, entry.Operation

Unconditional write. Returns the new revision:

revision, err := bucket.Put(ctx, "user:123", []byte(`{"name":"Alice"}`))

Write only if the key does not exist:

revision, err := bucket.Create(ctx, "user:456", []byte(`{"name":"Bob"}`))

Write only if the revision matches:

revision, err := bucket.Update(ctx, "user:123", []byte(`{"name":"Alice Updated"}`), entry.Revision)

Soft-delete (tombstone):

err := bucket.Delete(ctx, "user:123")

Permanently remove key and all history:

err := bucket.Purge(ctx, "user:123")
keys, err := bucket.ListKeys(ctx, "user.*")

cfg := client.Config()

Full replacement of a config document:

result, err := cfg.Set(ctx, "app-settings", map[string]any{
"theme": "dark",
"maxRetries": 3,
})
// result.Revision
entry, err := cfg.Get(ctx, "app-settings")
// entry.Name, entry.Data, entry.Revision, entry.UpdatedAt

Shallow merge — only specified keys are updated:

result, err := cfg.Patch(ctx, "app-settings", map[string]any{
"maxRetries": 5,
})
configs, err := cfg.List(ctx)
for _, entry := range configs {
fmt.Printf("%s: rev %d\n", entry.Name, entry.Revision)
}
err := cfg.Delete(ctx, "app-settings")

Stream real-time updates to a config document. Returns a *ConfigWatcher; call Stop() to end the watch.

watcher, err := cfg.Watch(ctx, "app-settings", ironflow.ConfigWatchCallbacks{
OnUpdate: func(event ironflow.ConfigWatchEvent) {
fmt.Printf("Config %s updated to rev %d\n", event.Name, event.Revision)
},
OnError: func(err error) {
log.Printf("Watch error: %v", err)
},
OnClose: func() {
log.Println("Watch closed")
},
})
defer watcher.Stop()

Declare secrets in the function config, then access them at runtime:

var MyFunction = ironflow.CreateFunction(ironflow.FunctionConfig{
ID: "my-function",
Triggers: []ironflow.Trigger{{Event: "my.event"}},
Secrets: []string{"API_KEY", "DB_PASSWORD"},
}, func(ctx ironflow.Context) (any, error) {
// Get a required secret (returns error if not found)
var apiKey string
if err := ctx.Secrets.Get("API_KEY", &apiKey); err != nil {
return nil, err
}
// Check if an optional secret exists
if ctx.Secrets.Has("OPTIONAL_KEY") {
var optional string
ctx.Secrets.Get("OPTIONAL_KEY", &optional)
}
return nil, nil
})

Real-time event subscriptions over WebSocket or gRPC.

// From an existing client
subClient := client.CreateSubscriptionClient() // WebSocket
grpcClient := client.CreateGrpcSubscriptionClient() // gRPC/HTTP streaming
// Or create directly
subClient := ironflow.NewSubscriptionClient(ironflow.SubscriptionClientConfig{
WSURL: "ws://localhost:9123/ws",
AutoReconnect: true,
ReconnectDelay: 1 * time.Second,
MaxReconnectDelay: 30 * time.Second,
ReconnectBackoff: 2.0,
})
if err := subClient.Connect(ctx); err != nil {
log.Fatal(err)
}
defer subClient.Close()
sub, err := subClient.Subscribe(ctx, "events:order.*", &ironflow.SubscribeOptions{
Replay: 10, // Replay last 10 events
IncludeMetadata: true,
Filter: "data.total > 100", // CEL expression
ConsumerGroup: "processors",
Namespace: "production",
})
for event := range sub.Events() {
fmt.Printf("Received: %s %v\n", event.Topic, event.Data)
}

For consumer groups with manual acknowledgment:

sub, err := subClient.SubscribeAckable(ctx, "order.*", &ironflow.SubscribeOptions{
AckMode: ironflow.AckModeManual,
ConsumerGroup: "order-processors",
})
for event := range sub.Events() {
if err := processOrder(event); err != nil {
sub.Nak(event.ID, 5*time.Second) // Redeliver after 5s
continue
}
sub.Ack(event.ID)
}
sub, err := subClient.SubscribeEntityStream(ctx, "order-123", ironflow.EntitySubscribeOptions{
EntityType: "order",
Replay: 10,
})
ironflow.Patterns.AllRuns() // "system.run.>"
ironflow.Patterns.Run("run_123") // "system.run.run_123.>"
ironflow.Patterns.RunLifecycle("run_123") // "system.run.run_123.*"
ironflow.Patterns.RunSteps("run_123") // "system.run.run_123.step.>"
ironflow.Patterns.AllFunctions() // "system.function.>"
ironflow.Patterns.Function("my-fn") // "system.function.my-fn.>"
ironflow.Patterns.UserEvent("order.placed") // "events:order.placed"
ironflow.Patterns.AllUserEvents() // "events:>"
ironflow.Patterns.AllSecrets() // "system.secret.*"
ironflow.Patterns.Secret("API_KEY") // "system.secret.API_KEY.*"
ironflow.Patterns.SecretAction("updated") // "system.secret.*.updated"
ironflow.Patterns.Topic("my-topic") // "topic:my-topic"
ironflow.Patterns.AllTopics() // "topic:>"
subClient.SetConnectionCallback(func(connected bool) {
if connected {
fmt.Println("Connected")
} else {
fmt.Println("Disconnected")
}
})
fmt.Println(subClient.IsConnected())
fmt.Println(subClient.State()) // "connecting", "connected", "disconnected", "reconnecting"

Load-balanced event delivery across multiple consumers.

group, err := client.CreateConsumerGroup(ctx, ironflow.ConsumerGroupConfig{
Name: "order-processors",
Pattern: "order.*",
AckMode: ironflow.AckModeManual,
Backpressure: ironflow.BackpressureBuffer,
MaxInflight: 100,
MaxRedeliveries: 3,
RedeliverDelayMs: 5000,
})
sub, err := client.JoinConsumerGroup(ctx, "order-processors")
for event := range sub.Events() {
if err := processOrder(event); err != nil {
sub.Nak(event.ID, 5*time.Second)
continue
}
sub.Ack(event.ID)
}
groups, err := client.ListConsumerGroups(ctx)
group, err := client.GetConsumerGroup(ctx, "order-processors")
err := client.DeleteConsumerGroup(ctx, "order-processors")

Append domain events to per-entity streams with optimistic concurrency.

result, err := client.AppendStreamEvent(ctx, "order-123", ironflow.AppendEventInput{
Name: "order.item_added",
Data: map[string]any{"itemId": "item-789"},
EntityType: "order",
},
ironflow.WithExpectedVersion(4), // Optimistic concurrency
ironflow.WithAppendIdempotencyKey("add-item-789"),
ironflow.WithEventVersion(2), // Event schema version
)
fmt.Println("New version:", result.EntityVersion) // 5
fmt.Println("Event ID:", result.EventID)
events, err := client.ReadStream(ctx, "order-123",
ironflow.ReadStreamOpts{
FromVersion: 1,
Limit: 50,
Direction: "forward", // or "backward"
},
)
for _, event := range events {
fmt.Printf("v%d: %s %v\n", event.EntityVersion, event.Name, event.Data)
}
info, err := client.GetStreamInfo(ctx, "order-123")
// info.EntityID, info.EntityType, info.Version, info.EventCount

Build read models from event streams. Two modes: managed (pure reducer, server stores state) and external (side-effect handler, server tracks position).

var OrderStats = ironflow.CreateProjection(ironflow.ProjectionConfig{
Name: "order-stats",
Events: []string{"order.*"},
InitialState: func() map[string]any {
return map[string]any{"total": 0.0, "count": 0}
},
Handler: func(state map[string]any, event ironflow.ProjectionEvent, ctx ironflow.ProjectionContext) (map[string]any, error) {
amount, _ := event.Data["amount"].(float64)
state["total"] = state["total"].(float64) + amount
state["count"] = state["count"].(int) + 1
return state, nil
},
})
var EmailNotifier = ironflow.CreateProjection(ironflow.ProjectionConfig{
Name: "email-notifier",
Events: []string{"order.completed"},
Handler: func(_ map[string]any, event ironflow.ProjectionEvent, ctx ironflow.ProjectionContext) (map[string]any, error) {
sendEmail(event.Data["email"].(string), "Order complete!")
return nil, nil
},
})

Register projections with a worker:

worker := ironflow.NewWorker(ironflow.WorkerConfig{
Functions: []ironflow.Function{ProcessOrder},
Projections: []ironflow.Projection{OrderStats, EmailNotifier},
})
worker.Run(ctx)
| Field | Type | Description |
| --- | --- | --- |
| Name | string | Unique projection name |
| Events | []string | Event names to subscribe (supports wildcards) |
| Mode | ProjectionMode | "managed" or "external" (auto-detected from InitialState) |
| Handler | ProjectionHandler | Handler function |
| InitialState | func() map[string]any | Required for managed mode |
| PartitionKey | string | JSONPath for per-partition state |
| MaxRetries | int | Default: 3 |
| BatchSize | int | Default: 100 |

Migrate event schemas using an upcaster registry.

registry := ironflow.NewUpcasterRegistry()
// Register v1 → v2 transformer
registry.Register("order.created", 1, 2, func(data json.RawMessage) (json.RawMessage, error) {
var v1 map[string]any
json.Unmarshal(data, &v1)
v1["currency"] = "USD" // Add default currency
return json.Marshal(v1)
})
// Register v2 → v3 transformer
registry.Register("order.created", 2, 3, func(data json.RawMessage) (json.RawMessage, error) {
var v2 map[string]any
json.Unmarshal(data, &v2)
v2["version"] = "v3"
return json.Marshal(v2)
})
// Get latest version
latest := registry.LatestVersion("order.created") // 3
// Upcast from v1 → v3 (chains automatically)
upcasted, err := registry.UpcastToLatest("order.created", rawData, 1)

Pass the registry to Serve or NewWorker for automatic upcasting:

handler := ironflow.Serve(ironflow.ServeConfig{
Functions: []ironflow.Function{ProcessOrder},
Upcasters: registry,
})
worker := ironflow.NewWorker(ironflow.WorkerConfig{
Functions: []ironflow.Function{ProcessOrder},
Upcasters: registry,
})

Transform incoming webhooks into Ironflow events.

var StripeWebhook = ironflow.CreateWebhook(ironflow.WebhookConfig{
ID: "stripe",
Verify: func(req *ironflow.WebhookRequest) error {
// Verify Stripe signature
sig := req.Header.Get("Stripe-Signature")
return verifyStripeSignature(req.Body, sig)
},
Transform: func(payload []byte) (*ironflow.WebhookEvent, error) {
var stripeEvent StripeEvent
if err := json.Unmarshal(payload, &stripeEvent); err != nil {
return nil, err
}
return &ironflow.WebhookEvent{
Name: "stripe." + stripeEvent.Type,
Data: payload,
IdempotencyKey: stripeEvent.ID,
}, nil
},
})
// Register with Serve
handler := ironflow.Serve(ironflow.ServeConfig{
Functions: []ironflow.Function{ProcessPayment},
Webhooks: []ironflow.Webhook{StripeWebhook},
ServerURL: "http://localhost:9123",
})
// Webhook endpoint: POST /webhooks/stripe

Developer pub/sub for topic-based messaging (separate from workflow event triggers).

result, err := client.Publish(ctx, "notifications", map[string]any{
"type": "order.shipped",
"orderId": "123",
},
ironflow.WithPublishIdempotencyKey("ship-123"),
)
fmt.Println("Event ID:", result.EventID)
topics, err := client.ListTopics(ctx)
for _, t := range topics {
fmt.Printf("%s: %d messages, %d consumers\n", t.Name, t.MessageCount, t.ConsumerCount)
}
stats, err := client.GetTopicStats(ctx, "notifications")
fmt.Printf("Messages: %d, Lag: %d\n", stats.MessageCount, stats.Lag)

// Create
key, err := client.CreateAPIKey(ctx, ironflow.CreateAPIKeyInput{
Name: "ci-deploy",
RoleIDs: []string{"role_admin"},
ExpiresIn: "90d",
})
fmt.Println("Key:", key.Key) // Only available on create
// List
keys, err := client.ListAPIKeys(ctx)
// Get
info, err := client.GetAPIKey(ctx, "key_id")
// Rotate
newKey, err := client.RotateAPIKey(ctx, "key_id")
// Delete
err := client.DeleteAPIKey(ctx, "key_id")
org, err := client.CreateOrganization(ctx, ironflow.CreateOrgInput{Name: "Acme Corp"})
orgs, err := client.ListOrganizations(ctx)
org, err := client.GetOrganization(ctx, "org_id")
org, err := client.UpdateOrganization(ctx, "org_id", ironflow.UpdateOrgInput{Name: "Acme Inc"})
err := client.DeleteOrganization(ctx, "org_id")
role, err := client.CreateRole(ctx, ironflow.CreateRoleInput{Name: "deployer"})
roles, err := client.ListRoles(ctx)
role, err := client.GetRole(ctx, "role_id")
role, err := client.UpdateRole(ctx, "role_id", ironflow.UpdateRoleInput{Name: "deploy-admin"})
err := client.DeleteRole(ctx, "role_id")
// Assign/remove policies
err := client.AssignPolicyToRole(ctx, "role_id", "policy_id")
err := client.RemovePolicyFromRole(ctx, "role_id", "policy_id")
policy, err := client.CreatePolicy(ctx, ironflow.CreatePolicyInput{
Name: "allow-emit",
Effect: "allow",
Actions: "emit:*",
Resources: "*",
Condition: `request.namespace == "production"`, // optional CEL
})
policies, err := client.ListPolicies(ctx)
policy, err := client.GetPolicy(ctx, "policy_id")
policy, err := client.UpdatePolicy(ctx, "policy_id", ironflow.UpdatePolicyInput{Name: "allow-all-emit"})
err := client.DeletePolicy(ctx, "policy_id")

result, err := client.GetAuditTrail(ctx, "run_xyz789",
ironflow.GetAuditTrailOpts{
EventType: "step.completed",
FromTimestamp: "2025-01-01T00:00:00Z",
Limit: 100,
Cursor: "", // pagination
},
)
for _, event := range result.Events {
fmt.Printf("[%s] %s: %v\n", event.CreatedAt, event.EventType, event.Payload)
}
fmt.Println("Total:", result.TotalCount)

functions, err := client.ListFunctions(ctx)
for _, fn := range functions {
fmt.Printf("%s (%s): %s\n", fn.ID, fn.Name, fn.Status)
}
workers, err := client.ListWorkers(ctx)
for _, w := range workers {
fmt.Printf("Worker %s: %d active jobs, functions: %v\n", w.ID, w.ActiveJobs, w.FunctionIDs)
}
status, err := client.Health(ctx)
fmt.Println("Server status:", status) // "healthy"
caps, err := client.GetCapabilities(ctx)
fmt.Println("Version:", caps.Version)
fmt.Println("Transports:", caps.Transports)
fmt.Println("Features:", caps.Features)
transport, err := client.DetectTransport(ctx)
// "grpc" or "websocket"

Verify webhook request signatures.

// Generate a signature
signature := ironflow.SignPayload(payloadString, secret)
// "t=1234567890,v1=abcdef..."
// Verify a signature
err := ironflow.VerifySignature(payload, signature, secret, ironflow.DefaultSignatureTolerance)
// Boolean check
valid := ironflow.IsValidSignature(payload, signature, secret, 5*time.Minute)
// Parse signature header
params, err := ironflow.ParseSignature(header)
// params.Timestamp, params.Signatures["v1"]
// Compute expected signature
expected := ironflow.ComputeSignature(payload, secret, timestamp)

import "github.com/sahina/ironflow/sdk/go/ironflow"
// Check error types
var stepErr *ironflow.StepError
if errors.As(err, &stepErr) {
fmt.Println("Step failed:", stepErr.StepID, stepErr.StepName)
}
var timeoutErr *ironflow.StepTimeoutError
if errors.As(err, &timeoutErr) {
fmt.Println("Step timed out:", timeoutErr.StepName, timeoutErr.Timeout)
}
var invokeErr *ironflow.InvokeError
if errors.As(err, &invokeErr) {
fmt.Println("Invoke failed:", invokeErr.FunctionID, invokeErr.ChildRunID)
}
// Check if retryable
if ironflow.IsRetryable(err) {
// Will be automatically retried
}

Mark errors as non-retryable to prevent automatic retries:

result, err := ironflow.Run(ctx, "validate", func() (any, error) {
if !isValid(data) {
return nil, ironflow.NewNonRetryableError("invalid data - do not retry")
}
return data, nil
})
// Or wrap an existing error
return nil, ironflow.WrapNonRetryable(fmt.Errorf("permanent failure: %w", err))
ironflow.ErrFunctionNotFound
ironflow.ErrRunNotFound
ironflow.ErrInvalidSignature
ironflow.ErrSignatureExpired
ironflow.ErrMissingSignature
ironflow.ErrTimeout
ironflow.ErrValidation
ironflow.ErrUnauthorized
ironflow.ErrEnterpriseLicenseRequired
ironflow.ErrForbidden

// Create a logger
logger := ironflow.NewLogger(ironflow.LoggerConfig{
Level: ironflow.LogLevelDebug, // debug, info, warn, error, silent
Prefix: "[my-app]",
})
logger.Info("Starting...", "port", 9123)
logger.Debug("Debug data", "key", "value")
logger.Warn("Slow query", "duration", "500ms")
logger.Error("Failed", "err", err)
// Disable logging
noopLogger := ironflow.NewNoopLogger()
// Parse level from string
level := ironflow.ParseLogLevel("debug")
// Get level from IRONFLOW_LOG_LEVEL env var
level := ironflow.GetLogLevel()

The Logger interface can be implemented with any logger (slog, zap, zerolog):

type Logger interface {
Debug(msg string, args ...any)
Info(msg string, args ...any)
Warn(msg string, args ...any)
Error(msg string, args ...any)
}

| Variable | Description | Default |
| --- | --- | --- |
| IRONFLOW_SERVER_URL | Server URL | http://localhost:9123 |
| IRONFLOW_SIGNING_KEY | Request signing key | - |
| IRONFLOW_API_KEY | API key for authentication | - |
| IRONFLOW_LOG_LEVEL | Log level (debug, info, warn, error, silent) | info |
ironflow.GetServerURL() // from IRONFLOW_SERVER_URL or default
ironflow.GetWebSocketURL(baseURL) // converts http→ws, adds /ws path
ironflow.GetSigningKey() // from IRONFLOW_SIGNING_KEY
ironflow.GetAPIKey() // from IRONFLOW_API_KEY
| Constant | Value |
| --- | --- |
| DefaultPort | 9123 |
| DefaultHost | "localhost" |
| DefaultServerURL | "http://localhost:9123" |
| DefaultWebSocketURL | "ws://localhost:9123/ws" |
| DefaultClientTimeout | 30s |
| DefaultFunctionTimeout | 10m |
| DefaultEmitSyncTimeout | 30s |
| DefaultRetryMaxAttempts | 3 |
| DefaultRetryInitialDelay | 1s |
| DefaultRetryBackoffFactor | 2.0 |
| DefaultRetryMaxDelay | 5m |
| DefaultWorkerMaxConcurrentJobs | 10 |
| DefaultWorkerHeartbeatInterval | 30s |
| DefaultWorkerReconnectDelay | 5s |

package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/sahina/ironflow/sdk/go/ironflow"
)
type OrderData struct {
OrderID string `json:"orderId"`
Amount float64 `json:"amount"`
Email string `json:"email"`
}
var ProcessOrder = ironflow.CreateFunction(ironflow.FunctionConfig{
ID: "process-order",
Name: "Process Order",
Triggers: []ironflow.Trigger{{Event: "order.placed"}},
}, func(ctx ironflow.Context) (any, error) {
var order OrderData
if err := ctx.Event.Data(&order); err != nil {
return nil, err
}
// Step 1: Validate
_, err := ironflow.Run(ctx, "validate", func() (any, error) {
if order.Amount <= 0 {
return nil, ironflow.NewNonRetryableError("invalid amount")
}
return map[string]any{"valid": true}, nil
})
if err != nil {
return nil, err
}
// Step 2: Sleep
if err := ironflow.Sleep(ctx, "delay", 5*time.Second); err != nil {
return nil, err
}
// Step 3: Process
result, err := ironflow.Run(ctx, "process", func() (any, error) {
return map[string]any{
"orderId": order.OrderID,
"status": "processed",
}, nil
})
if err != nil {
return nil, err
}
return result, nil
})
func main() {
// Create client and emit events
client := ironflow.NewClient(ironflow.ClientConfig{
ServerURL: ironflow.GetServerURL(),
})
// Register function
if err := client.RegisterFunction(context.Background(), ProcessOrder); err != nil {
log.Printf("Registration: %v", err)
}
// Start as worker
worker := ironflow.NewWorker(ironflow.WorkerConfig{
Functions: []ironflow.Function{ProcessOrder},
})
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)
defer cancel()
fmt.Println("Worker starting...")
if err := worker.Run(ctx); err != nil {
log.Fatal(err)
}
}

Use NewContextForTest to unit test function handlers:

ctx := ironflow.NewContextForTest(&ironflow.PushRequest{
RunID: "test-run-1",
FunctionID: "process-order",
Event: ironflow.PushEvent{
ID: "evt-1",
Name: "order.placed",
Data: json.RawMessage(`{"orderId":"123","amount":99.99}`),
},
})
result, err := ProcessOrder.Handler(ctx)