Skip to content

Aggregates & Entity Streams

Entity streams are Ironflow’s implementation of the Aggregate pattern. Each entity stream is a consistency boundary — an ordered log of events scoped to a single identity, with optimistic concurrency protecting its invariants.

DDD ConceptIronflow Implementation
Aggregate root identityEntity stream ID (e.g., order-123)
Applying a domain eventappendToStream()
Optimistic concurrencyexpectedVersion parameter
Aggregate event historystreams.read() — full ordered log
Consistency boundaryOne stream per entity, version-guarded writes

When you append the first event to an entity stream, the aggregate is born. Use expectedVersion: 0 to ensure you’re creating a new aggregate, not accidentally appending to an existing one:

import { createClient } from "@ironflow/node";
const ironflow = createClient({ apiKey: process.env.IRONFLOW_API_KEY });
// Create a new Order aggregate
const { entityVersion } = await ironflow.streams.append("order-123", {
entityType: "order",
name: "order.placed",
data: {
customerId: "cust-456",
items: [{ sku: "WIDGET-1", qty: 2, price: 29.99 }],
total: 59.98,
},
}, { expectedVersion: 0 }); // 0 = must be a new stream

In DDD, aggregates enforce business rules (invariants) before accepting changes. With Ironflow, the pattern is: read current state → validate → append.

// Read the aggregate's current state
const { events } = await ironflow.streams.read("order-123");
const info = await ironflow.streams.getInfo("order-123");
const version = info?.version ?? 0;
// Rebuild state from events
const order = events.reduce((state, event) => {
switch (event.name) {
case "order.placed":
return { ...event.data, status: "placed" };
case "order.paid":
return { ...state, status: "paid" };
case "order.cancelled":
return { ...state, status: "cancelled" };
default:
return state;
}
}, {});
// Enforce invariant: can't ship a cancelled order
if (order.status === "cancelled") {
throw new Error("Cannot ship a cancelled order");
}
// Append with version guard — fails if another write happened
await ironflow.streams.append("order-123", {
entityType: "order",
name: "order.shipped",
data: { trackingNumber: "1Z999AA10123456784" },
}, { expectedVersion: version });

The expectedVersion guard is critical — it implements optimistic concurrency, ensuring no other process modified the aggregate between your read and write. If the version doesn’t match, Ironflow returns an ABORTED error (ConnectRPC/gRPC code 10), and you retry with fresh state.

Each entity stream represents exactly one aggregate instance. Don’t mix different aggregate types in the same stream:

// ✓ Good — each aggregate has its own stream
await ironflow.streams.append("order-123", { entityType: "order", ... });
await ironflow.streams.append("customer-456", { entityType: "customer", ... });
// ✗ Bad — don't mix aggregates in one stream
await ironflow.streams.append("mixed-stream", { entityType: "order", ... });
await ironflow.streams.append("mixed-stream", { entityType: "customer", ... });
  • Entity streams are aggregates — each stream is a consistency boundary with its own identity and event history.
  • Use expectedVersion to enforce invariants — optimistic concurrency prevents conflicting writes.
  • Read → validate → append — the standard pattern for aggregate commands.

For the full Entity Streams API (appending, reading, subscriptions, concurrency options), see the Entity Streams guide.