
Entity Streams

An Entity Stream is an ordered sequence of events scoped to a specific ID (e.g., user_456). Ironflow uses these streams to maintain a “source of truth” for every change in your system.

[Figure: Entity event stream, showing an append-only log with version tracking, optimistic concurrency, and schema evolution]

Appending Events

Appending an event is the primary way to update an entity. The stream is automatically created on the first append.

import { createClient } from "@ironflow/node";

const ironflow = createClient({ apiKey: "..." });

// The first append to "order-123" creates the stream.
const { entityVersion, eventId } = await ironflow.streams.append("order-123", {
  entityType: "order",
  name: "order.placed",
  data: { total: 100 },
});

Optimistic Concurrency

To prevent “lost updates” when multiple workers try to update the same entity, use the expectedVersion option.

| Value | Behavior |
| --- | --- |
| 0 | Create Only: Succeeds only if the stream doesn’t exist yet. |
| N | Exact Version: Succeeds only if the current version is exactly N. |
| -1 | No Guard: Append regardless of version (default). |

// Fetch current version (returns null if the stream has no events yet)
const info = await ironflow.streams.getInfo("order-123");
const version = info?.version ?? 0;

// Try to append with a guard
try {
  await ironflow.streams.append("order-123", { ... }, {
    expectedVersion: version,
  });
} catch (err) {
  // If another write happened first the server returns ConnectRPC code
  // `aborted` (HTTP 409 over the connect protocol). The thrown error
  // message is "version conflict", so match on that string, or inspect
  // `err.code` if you're using a typed ConnectRPC client.
  if (err instanceof Error && /version conflict/i.test(err.message)) {
    // re-read, rebuild, retry
  } else {
    throw err;
  }
}
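
To make the three expectedVersion rules and the "re-read, rebuild, retry" step concrete, here is a minimal in-memory sketch. It is not the Ironflow implementation: `InMemoryStreams` and `appendWithRetry` are hypothetical names, and the sketch assumes a stream's version equals the number of events appended so far (so the first append produces version 1).

```typescript
// In-memory model of the expectedVersion rules. Illustrative only:
// InMemoryStreams and appendWithRetry are hypothetical, not Ironflow APIs.
type StoredEvent = { name: string; data: unknown; version: number };

class InMemoryStreams {
  private streams = new Map<string, StoredEvent[]>();

  // Assumption: version = number of events so far (0 if the stream does not exist).
  version(id: string): number {
    return this.streams.get(id)?.length ?? 0;
  }

  append(id: string, name: string, data: unknown, expectedVersion = -1): number {
    const current = this.version(id);
    // -1 disables the guard; 0 means "create only"; N means "exactly N".
    if (expectedVersion !== -1 && expectedVersion !== current) {
      throw new Error(`version conflict: expected ${expectedVersion}, actual ${current}`);
    }
    const events = this.streams.get(id) ?? [];
    events.push({ name, data, version: current + 1 });
    this.streams.set(id, events);
    return current + 1;
  }
}

// Bounded retry loop around a guarded append: re-read the version, then try again.
function appendWithRetry(
  store: InMemoryStreams,
  id: string,
  name: string,
  data: unknown,
  maxAttempts = 3,
): number {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const version = store.version(id); // re-read before every attempt
    try {
      return store.append(id, name, data, version);
    } catch (err) {
      const isConflict = err instanceof Error && /version conflict/i.test(err.message);
      if (!isConflict || attempt === maxAttempts) throw err;
    }
  }
  throw new Error("maxAttempts must be at least 1");
}
```

Against a real server the retry loop would be asynchronous and would usually rebuild the event from freshly read state before retrying; bounding the number of attempts keeps a hot entity from retrying indefinitely.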

Reading Stream History

You can read the entire history of an entity to reconstruct its state.

// Forward (oldest first)
const { events } = await ironflow.streams.read("order-123", {
  direction: "forward",
  limit: 50,
});

// Backward (latest first)
const { events: latest } = await ironflow.streams.read("order-123", {
  direction: "backward",
  limit: 1,
});
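
Reconstructing state from history amounts to folding the events, oldest first, into an accumulator. The sketch below shows one way this might look. The event names other than order.placed, the payload fields, and the `OrderState` shape are all hypothetical, and the events are assumed to expose `name` and `data` as in the append example.

```typescript
// Hypothetical event and state shapes; only "order.placed" with { total }
// appears in the examples above.
type OrderEvent =
  | { name: "order.placed"; data: { total: number } }
  | { name: "order.paid"; data: { amount: number } }
  | { name: "order.cancelled"; data: { reason: string } };

type OrderState = {
  status: "none" | "placed" | "paid" | "cancelled";
  total: number;
  paid: number;
};

const initialState: OrderState = { status: "none", total: 0, paid: 0 };

// Pure reducer: returns the next state without mutating the previous one.
function applyEvent(state: OrderState, event: OrderEvent): OrderState {
  switch (event.name) {
    case "order.placed":
      return { ...state, status: "placed", total: event.data.total };
    case "order.paid":
      return { ...state, status: "paid", paid: state.paid + event.data.amount };
    case "order.cancelled":
      return { ...state, status: "cancelled" };
  }
}

// Fold the history (oldest first) into the current state.
function rehydrate(events: OrderEvent[]): OrderState {
  return events.reduce(applyEvent, initialState);
}
```

The input would be events read with direction: "forward"; a stream longer than the read limit would need to be paged through before folding.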

Real-Time Subscriptions

You can subscribe to an entity stream to receive live updates as new events are appended.

import { ironflow } from "@ironflow/browser";

const sub = await ironflow.streams.subscribe("order-123", {
  entityType: "order",
  onEvent: (e) => console.log(`New event: ${e.name}`),
  replay: 10, // Replay last 10 events before going live
});

Projections

If you find yourself reading streams frequently to calculate state (e.g., “Total Balance”), use Projections instead.

Projections subscribe to the global event log across many streams, so they are a different fold from aggregate rehydration. Entity-stream snapshots accelerate aggregate load, but they do not accelerate a projection rebuild. See “Projections read the global event log” for why.
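
To illustrate the difference, the sketch below folds a global log containing events from several streams into one read model, a "Total Balance" per account. It is a conceptual model, not Ironflow's projection API: `LoggedEvent`, `projectBalances`, and the account event names are hypothetical.

```typescript
// Hypothetical global-log record; entityId and the event names are illustrative.
type LoggedEvent = {
  entityId: string;
  name: "account.deposited" | "account.withdrawn";
  data: { amount: number };
};

// A projection folds events from many streams into a single read model,
// here a balance per account keyed by entity ID.
function projectBalances(log: LoggedEvent[]): Map<string, number> {
  const balances = new Map<string, number>();
  for (const event of log) {
    const current = balances.get(event.entityId) ?? 0;
    const delta = event.name === "account.deposited" ? event.data.amount : -event.data.amount;
    balances.set(event.entityId, current + delta);
  }
  return balances;
}
```

Because the fold consumes the interleaved global log rather than a single stream, a snapshot of any one entity does not shorten it, which is the point made above.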

Domain-Driven Design

Entity streams implement the Aggregate pattern from DDD: each stream is a consistency boundary with its own identity and event history. See “Aggregates & Entity Streams” for the full DDD mapping.