Entity Streams
An Entity Stream is an ordered sequence of events scoped to a specific ID (e.g., user_456). Ironflow uses these streams to maintain a “source of truth” for every change in your system.
Appending Events
Appending an event is the primary way to update an entity. The stream is automatically created on the first append.
import { createClient } from "@ironflow/node";
const ironflow = createClient({ apiKey: "..." });
const { entityVersion, eventId } = await ironflow.streams.append("order-123", { entityType: "order", name: "order.placed", data: { total: 100 },});res, err := client.AppendStreamEvent(ctx, "order-123", ironflow.AppendEventInput{ EntityType: "order", Name: "order.placed", Data: map[string]any{"total": 100},})Optimistic Concurrency
To prevent “lost updates” when multiple workers try to update the same entity, use the expectedVersion option.
| Value | Behavior |
|---|---|
0 | Create Only: Succeeds only if the stream doesn’t exist yet. |
N | Exact Version: Succeeds only if the current version is exactly N. |
-1 | No Guard: Append regardless of version (default). |
// Fetch current version (returns null if the stream has no events yet)const info = await ironflow.streams.getInfo("order-123");const version = info?.version ?? 0;
// Try to append with a guardtry { await ironflow.streams.append("order-123", { ... }, { expectedVersion: version });} catch (err) { // If another write happened first the server returns ConnectRPC code // `aborted` (HTTP 409 over the connect protocol). The thrown error // message is "version conflict" — match on that string, or inspect // `err.code` if you're using a typed ConnectRPC client. if (err instanceof Error && /version conflict/i.test(err.message)) { // re-read, rebuild, retry } else { throw err; }}Reading Stream History
You can read the entire history of an entity to reconstruct its state.
// Forward (oldest first)const { events } = await ironflow.streams.read("order-123", { direction: "forward", limit: 50,});
// Backward (latest first)const { events: latest } = await ironflow.streams.read("order-123", { direction: "backward", limit: 1,});events, err := client.ReadStream(ctx, "order-123", ironflow.ReadStreamOpts{ Direction: "forward",})Real-Time Subscriptions
You can subscribe to an entity stream to receive live updates as new events are appended.
import { ironflow } from "@ironflow/browser";
const sub = await ironflow.streams.subscribe("order-123", { entityType: "order", onEvent: (e) => console.log(`New event: ${e.name}`), replay: 10, // Replay last 10 events before going live});Projections
If you find yourself reading streams frequently to calculate state (e.g., “Total Balance”), use Projections instead.
Projections subscribe to the global event log across many streams and are a different fold from aggregate rehydration — entity-stream snapshots accelerate aggregate load, but they do not accelerate projection rebuild. See Projections read the global event log for why.
Domain-Driven Design
Entity streams implement the Aggregate pattern from DDD — each stream is a consistency boundary with its own identity and event history. See Aggregates & Entity Streams for the full DDD mapping.