Go SDK
The official Go SDK for Ironflow. It provides recorded execution (durable functions), event emission, real-time subscriptions, a KV store, config management, entity streams, derived views (projections), and a full management API.
Installation
Section titled “Installation”
go get github.com/sahina/ironflow/sdk/go/ironflow
Requires Go 1.25+.
Quick Start
Section titled “Quick Start”Define a Function
Section titled “Define a Function”import "github.com/sahina/ironflow/sdk/go/ironflow"
var ProcessOrder = ironflow.CreateFunction(ironflow.FunctionConfig{ ID: "process-order", Triggers: []ironflow.Trigger{{Event: "order.placed"}},}, func(ctx ironflow.Context) (any, error) { var order OrderData if err := ctx.Event.Data(&order); err != nil { return nil, err }
// Run a step (automatically memoized) validated, err := ironflow.Run(ctx, "validate", func() (any, error) { return validateOrder(order) }) if err != nil { return nil, err }
// Sleep (durable - survives restarts) if err := ironflow.Sleep(ctx, "wait", 5*time.Minute); err != nil { return nil, err }
// Wait for external event approval, err := ironflow.WaitForEvent[ApprovalEvent](ctx, "wait-approval", ironflow.EventFilter{ Event: "order.approved", Match: "data.orderId", Timeout: 24 * time.Hour, }) if err != nil { return nil, err }
result, err := ironflow.Run(ctx, "process-payment", func() (any, error) { return processPayment(validated, approval) }) return result, err})Push Mode (Serverless)
Section titled “Push Mode (Serverless)”For serverless deployments (Vercel, Lambda):
handler := ironflow.Serve(ironflow.ServeConfig{ Functions: []ironflow.Function{ProcessOrder}, SigningKey: os.Getenv("IRONFLOW_SIGNING_KEY"),})
http.Handle("/api/ironflow", handler)http.ListenAndServe(":3000", nil)Pull Mode (Worker)
Section titled “Pull Mode (Worker)”For long-running workers with no timeout limits:
worker := ironflow.NewWorker(ironflow.WorkerConfig{ Functions: []ironflow.Function{ProcessOrder}, MaxConcurrentJobs: 10,})
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)defer cancel()
if err := worker.Run(ctx); err != nil { log.Fatal(err)}Function Definition
Section titled “Function Definition”CreateFunction
Section titled “CreateFunction”var MyFunction = ironflow.CreateFunction(ironflow.FunctionConfig{ ID: "my-function", Name: "My Function",
// Event triggers Triggers: []ironflow.Trigger{ {Event: "user.created"}, {Event: "order.*"}, {Event: "order.placed", Expression: `data.total > 100`}, // CEL filter },
// Retry configuration Retry: &ironflow.RetryConfig{ MaxAttempts: 5, InitialDelay: 1 * time.Second, BackoffFactor: 2.0, MaxDelay: 5 * time.Minute, },
// Execution timeout Timeout: 30 * time.Minute, StepTimeout: 60 * time.Second, // Default timeout for all steps
// Concurrency limit Concurrency: &ironflow.ConcurrencyConfig{ Limit: 10, Key: "event.data.customerId", // Group by customer },
// Execution mode Mode: ironflow.PullMode, // or ironflow.PushMode (default)
// Secrets required at runtime Secrets: []string{"STRIPE_KEY", "DB_PASSWORD"},
// Declarative cancel-on-event. // Auto-cancels in-flight runs when a matching event arrives. // OR semantics across specs. Tenant-isolated by env_id. // See how-to: ../../how-to-guides/workflows/cancel-on-event.md CancelOn: []ironflow.CancelOnConfig{ {Event: "order.cancelled", Match: "orderId"}, },}, func(ctx ironflow.Context) (any, error) { return map[string]any{"status": "done"}, nil})Function Context
Section titled “Function Context”The handler receives a Context:
type Context struct { Event Event // Triggering event Run RunInfo // Run info (ID, FunctionID, Attempt, StartedAt) Secrets SecretsReader // Read-only access to resolved secrets}Event Data
Section titled “Event Data”func(ctx ironflow.Context) (any, error) { // Unmarshal event data into a struct var order OrderData if err := ctx.Event.Data(&order); err != nil { return nil, err }
// Access raw event fields eventName := ctx.Event.Name // e.g. "order.placed" eventID := ctx.Event.ID eventVersion := ctx.Event.Version // Schema version (default: 1) source := ctx.Event.Source // "api", "cron", "webhook"
// Access user-defined metadata attached when the event was emitted traceID, _ := ctx.Event.Metadata["traceId"].(string) eventSource, _ := ctx.Event.Metadata["source"].(string)
return nil, nil}CreateHandler (Simplified API)
Section titled “CreateHandler (Simplified API)”Type-safe generic API with auto-generated function IDs:
type OrderEvent struct { OrderID string `json:"orderId"` Total float64 `json:"total"`}
// Minimal config — ID auto-generated as "order-placed-handler"var OrderHandler = ironflow.CreateHandler(ironflow.HandlerConfig[OrderEvent]{ Event: "order.placed", Handler: func(event OrderEvent, ctx *ironflow.HandlerContext) (any, error) { ctx.Logger.Info("Processing order", "id", event.OrderID) return map[string]any{"processed": true}, nil },})
// With all optionsvar HighValueHandler = ironflow.CreateHandler(ironflow.HandlerConfig[OrderEvent]{ Event: "order.placed", Handler: func(event OrderEvent, ctx *ironflow.HandlerContext) (any, error) { result, err := ctx.Step.Run("process", func() (any, error) { return processOrder(event) }) return result, err }, Options: &ironflow.HandlerOptions{ ID: "high-value-order-handler", Filter: `data.total > 1000`, Retry: &ironflow.RetryConfig{MaxAttempts: 5}, Concurrency: &ironflow.ConcurrencyConfig{ Limit: 10, Key: "event.data.customerId", }, },})The HandlerContext provides:
ctx.Event // Typed event datactx.EventMeta // Event metadata (ID, Name, Version, Timestamp, Source)ctx.Run // Run info (ID, FunctionID, Attempt)ctx.Secrets // SecretsReaderctx.Step // StepClient (Run, Sleep, SleepUntil, WaitForEvent, Parallel, Map, Compensate)ctx.Logger // LoggerStep Primitives
Section titled “Step Primitives”Execute a memoized step. Results are cached — if the function restarts, completed steps are skipped.
result, err := ironflow.Run(ctx, "step-id", func() (any, error) { return someApiCall()})
// With timeout overrideresult, err := ironflow.Run(ctx, "step-id", func() (any, error) { return someApiCall()}, ironflow.WithTimeout(10*time.Second))Durable pause. Survives function restarts and server upgrades.
if err := ironflow.Sleep(ctx, "wait", 5*time.Minute); err != nil { return nil, err}SleepUntil
Section titled “SleepUntil”Sleep until a specific time.
if err := ironflow.SleepUntil(ctx, "wait-until", time.Date(2025, 12, 25, 0, 0, 0, 0, time.UTC)); err != nil { return nil, err}WaitForEvent
Section titled “WaitForEvent”Wait for an external event matching a filter.
event, err := ironflow.WaitForEvent[ApprovalEvent](ctx, "wait-approval", ironflow.EventFilter{ Event: "order.approved", Match: "data.orderId", // JSON path for matching Timeout: 24 * time.Hour, // Default: 7 days})Parallel
Section titled “Parallel”Execute multiple branches concurrently.
results, err := ironflow.Parallel(ctx, "fetch-all", []func(*ironflow.BranchContext) (any, error){ func(b *ironflow.BranchContext) (any, error) { return ironflow.RunWithBranch(b, "fetch-user", func() (any, error) { return fetchUser(userID) }) }, func(b *ironflow.BranchContext) (any, error) { return ironflow.RunWithBranch(b, "fetch-orders", func() (any, error) { return fetchOrders(userID) }) }, }, ironflow.ParallelOptions{Concurrency: 2, OnError: "failFast"},)Map over items with automatic parallelization.
results, err := ironflow.Map(ctx, "process-items", items, func(item Item, b *ironflow.BranchContext, index int) (Result, error) { return ironflow.RunWithBranch(b, fmt.Sprintf("process-%d", index), func() (Result, error) { return processItem(item) }) }, ironflow.ParallelOptions{Concurrency: 5},)Invoke
Section titled “Invoke”Call another Ironflow function and wait for the result.
result, err := ironflow.Invoke[PaymentResult](ctx, "process-payment", map[string]any{ "orderId": "123", "amount": 99.99,})
// With custom timeout (default: 30s)result, err := ironflow.Invoke[PaymentResult](ctx, "process-payment", input, ironflow.WithInvokeTimeout(60*time.Second),)InvokeAsync
Section titled “InvokeAsync”Call another function without waiting for the result.
asyncResult, err := ironflow.InvokeAsync(ctx, "send-email", map[string]any{ "to": "user@example.com", "subject": "Order Confirmed",})fmt.Println("Child run:", asyncResult.RunID)Compensate (Saga Pattern)
Section titled “Compensate (Saga Pattern)”Register compensation handlers for rollback on failure.
_, err := ironflow.Run(ctx, "charge-payment", func() (any, error) { return chargeCard(order.CardID, order.Total)})if err != nil { return nil, err}
// Register compensation — runs in reverse order if a later step failsironflow.Compensate(ctx, "charge-payment", func() error { return refundCard(order.CardID, order.Total)})
// If this step fails, "charge-payment" compensation runs automatically_, err = ironflow.Run(ctx, "ship-order", func() (any, error) { return shipOrder(order.ID)})Publish (Step)
Section titled “Publish (Step)”Publish a message to a developer pub/sub topic as a durable step.
if err := ironflow.Publish(ctx, "notifications", map[string]any{ "type": "order.shipped", "orderId": order.ID,}); err != nil { return nil, err}Client
Section titled “Client”NewClient
Section titled “NewClient”client := ironflow.NewClient(ironflow.ClientConfig{ ServerURL: "http://localhost:9123", // default: IRONFLOW_SERVER_URL or http://localhost:9123 APIKey: "optional-api-key", // default: IRONFLOW_API_KEY Timeout: 30 * time.Second, // default: 30s
// Retry configuration (optional) Retry: &ironflow.ClientRetryConfig{ MaxAttempts: 3, InitialDelay: 100 * time.Millisecond, MaxDelay: 10 * time.Second, BackoffMultiplier: 2.0, ConnectionRetryDelay: 2 * time.Second, OnRetry: func(event ironflow.RetryEvent) { log.Printf("Retry %d/%d: %v (waiting %s)", event.Attempt, event.MaxAttempts, event.Error, event.Delay) }, },
// Custom logger (optional) Logger: ironflow.NewNoopLogger(), // disable logging})Event Emission
Section titled “Event Emission”Fire-and-forget event emission:
result, err := client.Emit(ctx, "order.placed", map[string]any{ "orderId": "123", "total": 99.99,})fmt.Println("Event ID:", result.EventID)fmt.Println("Run IDs:", result.RunIDs)Emit options:
// With idempotency key (deduplication)result, err := client.Emit(ctx, "payment.processed", data, ironflow.WithEmitIdempotencyKey("payment-abc"),)
// With event versionresult, err := client.Emit(ctx, "order.placed", data, ironflow.WithEmitVersion(2),)
// With metadataresult, err := client.Emit(ctx, "order.placed", data, ironflow.WithEmitMetadata(map[string]any{"source": "api"}),)EmitSync
Section titled “EmitSync”Emit an event and wait for the result:
result, err := client.EmitSync(ctx, "order.placed", map[string]any{ "orderId": "123",}, 30*time.Second)
fmt.Println("Status:", result.Status) // "completed", "failed"fmt.Println("Output:", result.Output)fmt.Println("Duration:", result.DurationMs, "ms")Run Management
Section titled “Run Management”GetRun
Section titled “GetRun”run, err := client.GetRun(ctx, "run_xyz789")fmt.Println("Status:", run.Status) // "pending", "running", "completed", "failed", "cancelled", "paused"ListRuns
Section titled “ListRuns”result, err := client.ListRuns(ctx, &ironflow.ListRunsOptions{ FunctionID: "process-order", Status: "completed", Limit: 50, Cursor: "abc123", // pagination})
for _, run := range result.Runs { fmt.Printf("Run %s: %s\n", run.ID, run.Status)}fmt.Println("Next page:", result.NextCursor)CancelRun
Section titled “CancelRun”run, err := client.CancelRun(ctx, "run_xyz789", "no longer needed")RetryRun
Section titled “RetryRun”run, err := client.RetryRun(ctx, "run_xyz789", "") // empty string = retry from beginningrun, err := client.RetryRun(ctx, "run_xyz789", "validate") // retry from specific stepPatchStep
Section titled “PatchStep”Hot-patch a step’s output to fix a failed run:
err := client.PatchStep(ctx, "step_abc123", map[string]any{ "correctedValue": 42,}, "fixed bad API response")History Editing (Scoped Injection)
Section titled “History Editing (Scoped Injection)”Pause running workflows at step boundaries, inspect and modify step outputs, then resume:
// Pause a running workflow at the next step boundarystatus, err := client.PauseRun(ctx, "run_abc123")fmt.Println("Status:", status) // "paused"
// Get the paused state with completed stepsstate, err := client.GetPausedState(ctx, "run_abc123")for _, step := range state.Steps { fmt.Printf("Step %s: injected=%v output=%s\n", step.Name, step.Injected, step.Output)}fmt.Println("Next step:", state.NextStepHint)
// Inject modified output for a stepprevOutput, err := client.InjectStepOutput(ctx, "run_abc123", "step_xyz", json.RawMessage(`{"corrected": true}`), "Manual correction")fmt.Println("Previous:", string(prevOutput))
// Resume the runrun, err := client.ResumeRun(ctx, "run_abc123", "")Configure PauseBehavior on functions to control concurrency lane handling during pause:
fn := ironflow.CreateFunction(ironflow.FunctionConfig{ ID: "process-order", PauseBehavior: "hold", // "hold" (default) or "release"})History Navigation (Time-Travel Debugging)
Section titled “History Navigation (Time-Travel Debugging)”Inspect historical run state at any timestamp. Requires Recording: true on the function.
// Get run state at a specific point in timesnapshot, err := client.GetRunStateAt(ctx, "run_abc123", time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC))fmt.Println("Status at that time:", snapshot.Status)for _, step := range snapshot.Steps { fmt.Printf("Step %s: status=%s patched=%v\n", step.Name, step.Status, step.Patched)}
// Get full timeline of audit events for a runevents, err := client.GetRunTimeline(ctx, "run_abc123")for _, evt := range events { fmt.Printf("[%s] %s: %s (significant=%v)\n", evt.Timestamp, evt.EventType, evt.Summary, evt.Significant)}
// Get a specific step's output at a point in timestepOutput, err := client.GetStepOutputAt(ctx, "run_abc123", "step_xyz", time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC))fmt.Printf("Output: %s (patched=%v, injected=%v)\n", stepOutput.Output, stepOutput.Patched, stepOutput.Injected)RegisterFunction
Section titled “RegisterFunction”err := client.RegisterFunction(ctx, ProcessOrder)Serve Handler
Section titled “Serve Handler”Create an HTTP handler for push mode:
handler := ironflow.Serve(ironflow.ServeConfig{ // Required Functions: []ironflow.Function{fn1, fn2, fn3},
// Optional: signing key for request verification SigningKey: os.Getenv("IRONFLOW_SIGNING_KEY"),
// Optional: skip verification (dev only) SkipVerification: true,
// Optional: projections to register Projections: []ironflow.Projection{OrderStats},
// Optional: webhook sources Webhooks: []ironflow.Webhook{StripeWebhook},
// Optional: server URL for emitting webhook events ServerURL: "http://localhost:9123",
// Optional: event schema upcasters Upcasters: registry,})
http.Handle("/api/ironflow", handler)http.ListenAndServe(":3000", nil)Worker
Section titled “Worker”NewWorker
Section titled “NewWorker”worker := ironflow.NewWorker(ironflow.WorkerConfig{ ServerURL: "http://localhost:9123", Functions: []ironflow.Function{fn1, fn2}, Projections: []ironflow.Projection{OrderStats}, MaxConcurrentJobs: 10, // default: 10 Labels: map[string]string{"region": "us-east-1"}, HeartbeatInterval: 30 * time.Second, // default: 30s ReconnectDelay: 5 * time.Second, // default: 5s Logger: ironflow.NewNoopLogger(), Upcasters: registry,})Worker Methods
Section titled “Worker Methods”// Start the worker (blocks until stopped)err := worker.Run(ctx)
// Graceful drain (wait for active jobs to complete)worker.Drain()
// Force stopworker.Stop()NewStreamingWorker
Section titled “NewStreamingWorker”Uses ConnectRPC bidirectional streaming instead of HTTP polling. Lower latency for step delivery and real-time step lifecycle visibility (StepStarted/Completed/Failed events streamed to the server).
worker := ironflow.NewStreamingWorker(ironflow.WorkerConfig{ ServerURL: "http://localhost:9123", Functions: []ironflow.Function{fn1, fn2}, Projections: []ironflow.Projection{OrderStats}, MaxConcurrentJobs: 10, Labels: map[string]string{"region": "us-east-1"}, HeartbeatInterval: 30 * time.Second, ReconnectDelay: 5 * time.Second,})Same WorkerConfig, same Run/Drain/Stop methods as NewWorker. The only difference is the transport: a single persistent HTTP/2 connection instead of polling.
When to use streaming vs polling:
- Use `NewStreamingWorker` when you need lower step delivery latency or real-time step visibility in the dashboard.
- Use `NewWorker` when you want the simplest deployment (HTTP/1.1 compatible, no HTTP/2 requirement).
Both support h2c (HTTP/2 cleartext) for development and TLS for production.
Graceful Shutdown
Section titled “Graceful Shutdown”ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)defer cancel()
go func() { <-ctx.Done() worker.Drain()}()
if err := worker.Run(ctx); err != nil { log.Fatal(err)}Idempotency
Section titled “Idempotency”Both NewWorker and NewStreamingWorker use at-least-once delivery semantics. If a worker disconnects while executing a step, the server requeues the job. Completed steps are memoized (not re-executed), but the currently-running step may execute again.
This means steps with side effects (sending emails, charging cards, calling external APIs) must be idempotent. Use idempotency keys, database upserts, or check-before-act patterns to prevent duplicate actions.
This is the same at-least-once guarantee provided by Temporal, Inngest, and other durable execution engines.
KV Store
Section titled “KV Store”client.KV()
Section titled “client.KV()”kv := client.KV()CreateBucket
Section titled “CreateBucket”info, err := kv.CreateBucket(ctx, ironflow.BucketConfig{ Name: "sessions", Description: "User session data", TTL: time.Hour, MaxValueSize: 65536, MaxBytes: 1 << 30, // 1GB History: 5,})DeleteBucket
Section titled “DeleteBucket”err := kv.DeleteBucket(ctx, "sessions")ListBuckets
Section titled “ListBuckets”buckets, err := kv.ListBuckets(ctx)for _, b := range buckets { fmt.Printf("%s: %d keys, %d bytes\n", b.Name, b.Values, b.Bytes)}GetBucketInfo
Section titled “GetBucketInfo”info, err := kv.GetBucketInfo(ctx, "sessions")Bucket Handle
Section titled “Bucket Handle”bucket := kv.Bucket("sessions")entry, err := bucket.Get(ctx, "user:123")// entry.Key, entry.Value ([]byte), entry.Revision, entry.CreatedAt, entry.OperationUnconditional write. Returns the new revision:
revision, err := bucket.Put(ctx, "user:123", []byte(`{"name":"Alice"}`))Create (If-Not-Exists)
Section titled “Create (If-Not-Exists)”Write only if the key does not exist:
revision, err := bucket.Create(ctx, "user:456", []byte(`{"name":"Bob"}`))Update (Compare-and-Swap)
Section titled “Update (Compare-and-Swap)”Write only if the revision matches:
revision, err := bucket.Update(ctx, "user:123", []byte(`{"name":"Alice Updated"}`), entry.Revision)Delete
Section titled “Delete”Soft-delete (tombstone):
err := bucket.Delete(ctx, "user:123")Permanently remove key and all history:
err := bucket.Purge(ctx, "user:123")ListKeys
Section titled “ListKeys”keys, err := bucket.ListKeys(ctx, "user.*")Config Management
Section titled “Config Management”client.Config()
Section titled “client.Config()”cfg := client.Config()Full replacement of a config document:
result, err := cfg.Set(ctx, "app-settings", map[string]any{ "theme": "dark", "maxRetries": 3,})// result.Revisionentry, err := cfg.Get(ctx, "app-settings")// entry.Name, entry.Data, entry.Revision, entry.UpdatedAtShallow merge — only specified keys are updated:
result, err := cfg.Patch(ctx, "app-settings", map[string]any{ "maxRetries": 5,})configs, err := cfg.List(ctx)for _, entry := range configs { fmt.Printf("%s: rev %d\n", entry.Name, entry.Revision)}Delete
Section titled “Delete”err := cfg.Delete(ctx, "app-settings")Stream real-time updates to a config document. Returns a *ConfigWatcher; call Stop() to end the watch.
watcher, err := cfg.Watch(ctx, "app-settings", ironflow.ConfigWatchCallbacks{ OnUpdate: func(event ironflow.ConfigWatchEvent) { fmt.Printf("Config %s updated to rev %d\n", event.Name, event.Revision) }, OnError: func(err error) { log.Printf("Watch error: %v", err) }, OnClose: func() { log.Println("Watch closed") },})defer watcher.Stop()Secrets
Section titled “Secrets”Declare secrets in the function config, then access them at runtime:
var MyFunction = ironflow.CreateFunction(ironflow.FunctionConfig{ ID: "my-function", Triggers: []ironflow.Trigger{{Event: "my.event"}}, Secrets: []string{"API_KEY", "DB_PASSWORD"},}, func(ctx ironflow.Context) (any, error) { // Get a required secret (returns error if not found) var apiKey string if err := ctx.Secrets.Get("API_KEY", &apiKey); err != nil { return nil, err }
// Check if an optional secret exists if ctx.Secrets.Has("OPTIONAL_KEY") { var optional string ctx.Secrets.Get("OPTIONAL_KEY", &optional) }
return nil, nil})Subscriptions
Section titled “Subscriptions”Real-time event subscriptions over WebSocket or gRPC.
Creating a Subscription Client
Section titled “Creating a Subscription Client”// From an existing clientsubClient := client.CreateSubscriptionClient() // WebSocketgrpcClient := client.CreateGrpcSubscriptionClient() // gRPC/HTTP streaming
// Or create directlysubClient := ironflow.NewSubscriptionClient(ironflow.SubscriptionClientConfig{ WSURL: "ws://localhost:9123/ws", AutoReconnect: true, ReconnectDelay: 1 * time.Second, MaxReconnectDelay: 30 * time.Second, ReconnectBackoff: 2.0,})Connect / Close
Section titled “Connect / Close”if err := subClient.Connect(ctx); err != nil { log.Fatal(err)}defer subClient.Close()Subscribe
Section titled “Subscribe”sub, err := subClient.Subscribe(ctx, "events:order.*", &ironflow.SubscribeOptions{ Replay: 10, // Replay last 10 events IncludeMetadata: true, Filter: "data.total > 100", // CEL expression ConsumerGroup: "processors", Namespace: "production",})
for event := range sub.Events() { fmt.Printf("Received: %s %v\n", event.Topic, event.Data)}Ackable Subscriptions
Section titled “Ackable Subscriptions”For consumer groups with manual acknowledgment:
sub, err := subClient.SubscribeAckable(ctx, "order.*", &ironflow.SubscribeOptions{ AckMode: ironflow.AckModeManual, ConsumerGroup: "order-processors",})
for event := range sub.Events() { if err := processOrder(event); err != nil { sub.Nak(event.ID, 5*time.Second) // Redeliver after 5s continue } sub.Ack(event.ID)}Entity Stream Subscriptions
Section titled “Entity Stream Subscriptions”sub, err := subClient.SubscribeEntityStream(ctx, "order-123", ironflow.EntitySubscribeOptions{ EntityType: "order", Replay: 10,})Pattern Helpers
Section titled “Pattern Helpers”ironflow.Patterns.AllRuns() // "system.run.>"ironflow.Patterns.Run("run_123") // "system.run.run_123.>"ironflow.Patterns.RunLifecycle("run_123") // "system.run.run_123.*"ironflow.Patterns.RunSteps("run_123") // "system.run.run_123.step.>"ironflow.Patterns.AllFunctions() // "system.function.>"ironflow.Patterns.Function("my-fn") // "system.function.my-fn.>"ironflow.Patterns.UserEvent("order.placed") // "events:order.placed"ironflow.Patterns.AllUserEvents() // "events:>"ironflow.Patterns.AllSecrets() // "system.secret.*"ironflow.Patterns.Secret("API_KEY") // "system.secret.API_KEY.*"ironflow.Patterns.SecretAction("updated") // "system.secret.*.updated"ironflow.Patterns.Topic("my-topic") // "topic:my-topic"ironflow.Patterns.AllTopics() // "topic:>"Connection State
Section titled “Connection State”subClient.SetConnectionCallback(func(connected bool) { if connected { fmt.Println("Connected") } else { fmt.Println("Disconnected") }})
fmt.Println(subClient.IsConnected())fmt.Println(subClient.State()) // "connecting", "connected", "disconnected", "reconnecting"Consumer Groups
Section titled “Consumer Groups”Load-balanced event delivery across multiple consumers.
CreateConsumerGroup
Section titled “CreateConsumerGroup”group, err := client.CreateConsumerGroup(ctx, ironflow.ConsumerGroupConfig{ Name: "order-processors", Pattern: "order.*", AckMode: ironflow.AckModeManual, Backpressure: ironflow.BackpressureBuffer, MaxInflight: 100, MaxRedeliveries: 3, RedeliverDelayMs: 5000,})JoinConsumerGroup
Section titled “JoinConsumerGroup”sub, err := client.JoinConsumerGroup(ctx, "order-processors")for event := range sub.Events() { if err := processOrder(event); err != nil { sub.Nak(event.ID, 5*time.Second) continue } sub.Ack(event.ID)}ListConsumerGroups
Section titled “ListConsumerGroups”groups, err := client.ListConsumerGroups(ctx)GetConsumerGroup
Section titled “GetConsumerGroup”group, err := client.GetConsumerGroup(ctx, "order-processors")DeleteConsumerGroup
Section titled “DeleteConsumerGroup”err := client.DeleteConsumerGroup(ctx, "order-processors")Entity Streams
Section titled “Entity Streams”Append domain events to per-entity streams with optimistic concurrency.
AppendStreamEvent
Section titled “AppendStreamEvent”result, err := client.AppendStreamEvent(ctx, "order-123", ironflow.AppendEventInput{ Name: "order.item_added", Data: map[string]any{"itemId": "item-789"}, EntityType: "order",}, ironflow.WithExpectedVersion(4), // Optimistic concurrency ironflow.WithAppendIdempotencyKey("add-item-789"), ironflow.WithEventVersion(2), // Event schema version)
fmt.Println("New version:", result.EntityVersion) // 5fmt.Println("Event ID:", result.EventID)ReadStream
Section titled “ReadStream”events, err := client.ReadStream(ctx, "order-123", ironflow.ReadStreamOpts{ FromVersion: 1, Limit: 50, Direction: "forward", // or "backward" },)
for _, event := range events { fmt.Printf("v%d: %s %v\n", event.EntityVersion, event.Name, event.Data)}GetStreamInfo
Section titled “GetStreamInfo”info, err := client.GetStreamInfo(ctx, "order-123")// info.EntityID, info.EntityType, info.Version, info.EventCountProjections
Section titled “Projections”Build read models from event streams. Two modes: managed (pure reducer, server stores state) and external (side-effect handler, server tracks position).
Managed Projection
Section titled “Managed Projection”var OrderStats = ironflow.CreateProjection(ironflow.ProjectionConfig{ Name: "order-stats", Events: []string{"order.*"}, InitialState: func() map[string]any { return map[string]any{"total": 0.0, "count": 0} }, Handler: func(state map[string]any, event ironflow.ProjectionEvent, ctx ironflow.ProjectionContext) (map[string]any, error) { amount, _ := event.Data["amount"].(float64) state["total"] = state["total"].(float64) + amount state["count"] = state["count"].(int) + 1 return state, nil },})External Projection
Section titled “External Projection”var EmailNotifier = ironflow.CreateProjection(ironflow.ProjectionConfig{ Name: "email-notifier", Events: []string{"order.completed"}, Handler: func(_ map[string]any, event ironflow.ProjectionEvent, ctx ironflow.ProjectionContext) (map[string]any, error) { sendEmail(event.Data["email"].(string), "Order complete!") return nil, nil },})Running Projections
Section titled “Running Projections”Register projections with a worker:
worker := ironflow.NewWorker(ironflow.WorkerConfig{ Functions: []ironflow.Function{ProcessOrder}, Projections: []ironflow.Projection{OrderStats, EmailNotifier},})worker.Run(ctx)ProjectionConfig
Section titled “ProjectionConfig”| Field | Type | Description |
|---|---|---|
| `Name` | `string` | Unique projection name |
| `Events` | `[]string` | Event names to subscribe to (supports wildcards) |
| `Mode` | `ProjectionMode` | "managed" or "external" (auto-detected from InitialState) |
| `Handler` | `ProjectionHandler` | Handler function |
| `InitialState` | `func() map[string]any` | Required for managed mode |
| `PartitionKey` | `string` | JSONPath for per-partition state |
| `MaxRetries` | `int` | Default: 3 |
| `BatchSize` | `int` | Default: 100 |
Event Versioning
Section titled “Event Versioning”Migrate event schemas using an upcaster registry.
registry := ironflow.NewUpcasterRegistry()
// Register v1 → v2 transformerregistry.Register("order.created", 1, 2, func(data json.RawMessage) (json.RawMessage, error) { var v1 map[string]any json.Unmarshal(data, &v1) v1["currency"] = "USD" // Add default currency return json.Marshal(v1)})
// Register v2 → v3 transformerregistry.Register("order.created", 2, 3, func(data json.RawMessage) (json.RawMessage, error) { var v2 map[string]any json.Unmarshal(data, &v2) v2["version"] = "v3" return json.Marshal(v2)})
// Get latest versionlatest := registry.LatestVersion("order.created") // 3
// Upcast from v1 → v3 (chains automatically)upcasted, err := registry.UpcastToLatest("order.created", rawData, 1)Pass the registry to Serve or NewWorker for automatic upcasting:
handler := ironflow.Serve(ironflow.ServeConfig{ Functions: []ironflow.Function{ProcessOrder}, Upcasters: registry,})
worker := ironflow.NewWorker(ironflow.WorkerConfig{ Functions: []ironflow.Function{ProcessOrder}, Upcasters: registry,})Webhooks
Section titled “Webhooks”Transform incoming webhooks into Ironflow events.
var StripeWebhook = ironflow.CreateWebhook(ironflow.WebhookConfig{ ID: "stripe", Verify: func(req *ironflow.WebhookRequest) error { // Verify Stripe signature sig := req.Header.Get("Stripe-Signature") return verifyStripeSignature(req.Body, sig) }, Transform: func(payload []byte) (*ironflow.WebhookEvent, error) { var stripeEvent StripeEvent if err := json.Unmarshal(payload, &stripeEvent); err != nil { return nil, err } return &ironflow.WebhookEvent{ Name: "stripe." + stripeEvent.Type, Data: payload, IdempotencyKey: stripeEvent.ID, }, nil },})
// Register with Servehandler := ironflow.Serve(ironflow.ServeConfig{ Functions: []ironflow.Function{ProcessPayment}, Webhooks: []ironflow.Webhook{StripeWebhook}, ServerURL: "http://localhost:9123",})// Webhook endpoint: POST /webhooks/stripePub/Sub Topics
Section titled “Pub/Sub Topics”Developer pub/sub for topic-based messaging (separate from workflow event triggers).
Publish (Client)
Section titled “Publish (Client)”result, err := client.Publish(ctx, "notifications", map[string]any{ "type": "order.shipped", "orderId": "123",}, ironflow.WithPublishIdempotencyKey("ship-123"),)fmt.Println("Event ID:", result.EventID)ListTopics
Section titled “ListTopics”topics, err := client.ListTopics(ctx)for _, t := range topics { fmt.Printf("%s: %d messages, %d consumers\n", t.Name, t.MessageCount, t.ConsumerCount)}GetTopicStats
Section titled “GetTopicStats”stats, err := client.GetTopicStats(ctx, "notifications")fmt.Printf("Messages: %d, Lag: %d\n", stats.MessageCount, stats.Lag)Auth Management
Section titled “Auth Management”API Keys
Section titled “API Keys”// Createkey, err := client.CreateAPIKey(ctx, ironflow.CreateAPIKeyInput{ Name: "ci-deploy", RoleIDs: []string{"role_admin"}, ExpiresIn: "90d",})fmt.Println("Key:", key.Key) // Only available on create
// Listkeys, err := client.ListAPIKeys(ctx)
// Getinfo, err := client.GetAPIKey(ctx, "key_id")
// RotatenewKey, err := client.RotateAPIKey(ctx, "key_id")
// Deleteerr := client.DeleteAPIKey(ctx, "key_id")Organizations
```go
org, err := client.CreateOrganization(ctx, ironflow.CreateOrgInput{Name: "Acme Corp"})
orgs, err := client.ListOrganizations(ctx)
org, err := client.GetOrganization(ctx, "org_id")
org, err := client.UpdateOrganization(ctx, "org_id", ironflow.UpdateOrgInput{Name: "Acme Inc"})
err := client.DeleteOrganization(ctx, "org_id")
```

Roles

```go
role, err := client.CreateRole(ctx, ironflow.CreateRoleInput{Name: "deployer"})
roles, err := client.ListRoles(ctx)
role, err := client.GetRole(ctx, "role_id")
role, err := client.UpdateRole(ctx, "role_id", ironflow.UpdateRoleInput{Name: "deploy-admin"})
err := client.DeleteRole(ctx, "role_id")

// Assign/remove policies
err := client.AssignPolicyToRole(ctx, "role_id", "policy_id")
err := client.RemovePolicyFromRole(ctx, "role_id", "policy_id")
```

Policies
```go
policy, err := client.CreatePolicy(ctx, ironflow.CreatePolicyInput{
	Name:      "allow-emit",
	Effect:    "allow",
	Actions:   "emit:*",
	Resources: "*",
	Condition: `request.namespace == "production"`, // optional CEL
})
policies, err := client.ListPolicies(ctx)
policy, err := client.GetPolicy(ctx, "policy_id")
policy, err := client.UpdatePolicy(ctx, "policy_id", ironflow.UpdatePolicyInput{Name: "allow-all-emit"})
err := client.DeletePolicy(ctx, "policy_id")
```

History Inspection (Audit Trail)
```go
result, err := client.GetAuditTrail(ctx, "run_xyz789",
	ironflow.GetAuditTrailOpts{
		EventType:     "step.completed",
		FromTimestamp: "2025-01-01T00:00:00Z",
		Limit:         100,
		Cursor:        "", // pagination
	},
)

for _, event := range result.Events {
	fmt.Printf("[%s] %s: %v\n", event.CreatedAt, event.EventType, event.Payload)
}
fmt.Println("Total:", result.TotalCount)
```

Server Introspection
ListFunctions

```go
functions, err := client.ListFunctions(ctx)
for _, fn := range functions {
	fmt.Printf("%s (%s): %s\n", fn.ID, fn.Name, fn.Status)
}
```

ListWorkers
```go
workers, err := client.ListWorkers(ctx)
for _, w := range workers {
	fmt.Printf("Worker %s: %d active jobs, functions: %v\n", w.ID, w.ActiveJobs, w.FunctionIDs)
}
```

Health
```go
status, err := client.Health(ctx)
fmt.Println("Server status:", status) // "healthy"
```

GetCapabilities
```go
caps, err := client.GetCapabilities(ctx)
fmt.Println("Version:", caps.Version)
fmt.Println("Transports:", caps.Transports)
fmt.Println("Features:", caps.Features)
```

DetectTransport
```go
transport, err := client.DetectTransport(ctx)
// "grpc" or "websocket"
```

Signature Verification
Verify webhook request signatures.

```go
// Generate a signature
signature := ironflow.SignPayload(payloadString, secret)
// "t=1234567890,v1=abcdef..."

// Verify a signature
err := ironflow.VerifySignature(payload, signature, secret, ironflow.DefaultSignatureTolerance)

// Boolean check
valid := ironflow.IsValidSignature(payload, signature, secret, 5*time.Minute)

// Parse signature header
params, err := ironflow.ParseSignature(header)
// params.Timestamp, params.Signatures["v1"]

// Compute expected signature
expected := ironflow.ComputeSignature(payload, secret, timestamp)
```

Error Handling
Built-in Errors

```go
import "github.com/sahina/ironflow/sdk/go/ironflow"

// Check error types
var stepErr *ironflow.StepError
if errors.As(err, &stepErr) {
	fmt.Println("Step failed:", stepErr.StepID, stepErr.StepName)
}

var timeoutErr *ironflow.StepTimeoutError
if errors.As(err, &timeoutErr) {
	fmt.Println("Step timed out:", timeoutErr.StepName, timeoutErr.Timeout)
}

var invokeErr *ironflow.InvokeError
if errors.As(err, &invokeErr) {
	fmt.Println("Invoke failed:", invokeErr.FunctionID, invokeErr.ChildRunID)
}

// Check if retryable
if ironflow.IsRetryable(err) {
	// Will be automatically retried
}
```

Non-Retryable Errors
Mark errors as non-retryable to prevent automatic retries:

```go
result, err := ironflow.Run(ctx, "validate", func() (any, error) {
	if !isValid(data) {
		return nil, ironflow.NewNonRetryableError("invalid data - do not retry")
	}
	return data, nil
})

// Or wrap an existing error
return nil, ironflow.WrapNonRetryable(fmt.Errorf("permanent failure: %w", err))
```

Sentinel Errors
```go
ironflow.ErrFunctionNotFound
ironflow.ErrRunNotFound
ironflow.ErrInvalidSignature
ironflow.ErrSignatureExpired
ironflow.ErrMissingSignature
ironflow.ErrTimeout
ironflow.ErrValidation
ironflow.ErrUnauthorized
ironflow.ErrEnterpriseLicenseRequired
ironflow.ErrForbidden
```

Logging
```go
// Create a logger
logger := ironflow.NewLogger(ironflow.LoggerConfig{
	Level:  ironflow.LogLevelDebug, // debug, info, warn, error, silent
	Prefix: "[my-app]",
})

logger.Info("Starting...", "port", 9123)
logger.Debug("Debug data", "key", "value")
logger.Warn("Slow query", "duration", "500ms")
logger.Error("Failed", "err", err)

// Disable logging
noopLogger := ironflow.NewNoopLogger()

// Parse level from string
level := ironflow.ParseLogLevel("debug")

// Get level from IRONFLOW_LOG_LEVEL env var
level := ironflow.GetLogLevel()
```

The Logger interface can be implemented with any logger (slog, zap, zerolog):

```go
type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}
```

Constants & Environment Variables
Environment Variables

| Variable | Description | Default |
|---|---|---|
| IRONFLOW_SERVER_URL | Server URL | http://localhost:9123 |
| IRONFLOW_SIGNING_KEY | Request signing key | - |
| IRONFLOW_API_KEY | API key for authentication | - |
| IRONFLOW_LOG_LEVEL | Log level (debug, info, warn, error, silent) | info |
Helper Functions

```go
ironflow.GetServerURL()           // from IRONFLOW_SERVER_URL or default
ironflow.GetWebSocketURL(baseURL) // converts http→ws, adds /ws path
ironflow.GetSigningKey()          // from IRONFLOW_SIGNING_KEY
ironflow.GetAPIKey()              // from IRONFLOW_API_KEY
```

Default Constants
| Constant | Value |
|---|---|
| DefaultPort | 9123 |
| DefaultHost | "localhost" |
| DefaultServerURL | "http://localhost:9123" |
| DefaultWebSocketURL | "ws://localhost:9123/ws" |
| DefaultClientTimeout | 30s |
| DefaultFunctionTimeout | 10m |
| DefaultEmitSyncTimeout | 30s |
| DefaultRetryMaxAttempts | 3 |
| DefaultRetryInitialDelay | 1s |
| DefaultRetryBackoffFactor | 2.0 |
| DefaultRetryMaxDelay | 5m |
| DefaultWorkerMaxConcurrentJobs | 10 |
| DefaultWorkerHeartbeatInterval | 30s |
| DefaultWorkerReconnectDelay | 5s |
Complete Example

```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/sahina/ironflow/sdk/go/ironflow"
)

type OrderData struct {
	OrderID string  `json:"orderId"`
	Amount  float64 `json:"amount"`
	Email   string  `json:"email"`
}

var ProcessOrder = ironflow.CreateFunction(ironflow.FunctionConfig{
	ID:       "process-order",
	Name:     "Process Order",
	Triggers: []ironflow.Trigger{{Event: "order.placed"}},
}, func(ctx ironflow.Context) (any, error) {
	var order OrderData
	if err := ctx.Event.Data(&order); err != nil {
		return nil, err
	}

	// Step 1: Validate
	_, err := ironflow.Run(ctx, "validate", func() (any, error) {
		if order.Amount <= 0 {
			return nil, ironflow.NewNonRetryableError("invalid amount")
		}
		return map[string]any{"valid": true}, nil
	})
	if err != nil {
		return nil, err
	}

	// Step 2: Sleep
	if err := ironflow.Sleep(ctx, "delay", 5*time.Second); err != nil {
		return nil, err
	}

	// Step 3: Process
	result, err := ironflow.Run(ctx, "process", func() (any, error) {
		return map[string]any{
			"orderId": order.OrderID,
			"status":  "processed",
		}, nil
	})
	if err != nil {
		return nil, err
	}

	return result, nil
})

func main() {
	// Create a client (used below to register the function with the server)
	client := ironflow.NewClient(ironflow.ClientConfig{
		ServerURL: ironflow.GetServerURL(),
	})

	// Register function
	if err := client.RegisterFunction(context.Background(), ProcessOrder); err != nil {
		log.Printf("Registration: %v", err)
	}

	// Start as worker
	worker := ironflow.NewWorker(ironflow.WorkerConfig{
		Functions: []ironflow.Function{ProcessOrder},
	})

	// Stop on Ctrl+C (os.Interrupt) as well as SIGTERM
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer cancel()

	fmt.Println("Worker starting...")
	if err := worker.Run(ctx); err != nil {
		log.Fatal(err)
	}
}
```

Testing
Use NewContextForTest to unit test function handlers:
ctx := ironflow.NewContextForTest(&ironflow.PushRequest{ RunID: "test-run-1", FunctionID: "process-order", Event: ironflow.PushEvent{ ID: "evt-1", Name: "order.placed", Data: json.RawMessage(`{"orderId":"123","amount":99.99}`), },})
result, err := ProcessOrder.Handler(ctx)