Aggregates & Entity Streams
Entity streams are Ironflow’s implementation of the Aggregate pattern. Each entity stream is a consistency boundary — an ordered log of events scoped to a single identity, with optimistic concurrency protecting its invariants.
The Mapping
Section titled “The Mapping”| DDD Concept | Ironflow Implementation |
|---|---|
| Aggregate root identity | Entity stream ID (e.g., order-123) |
| Applying a domain event | appendToStream() |
| Optimistic concurrency | expectedVersion parameter |
| Aggregate event history | streams.read() — full ordered log |
| Consistency boundary | One stream per entity, version-guarded writes |
Creating an Aggregate
Section titled “Creating an Aggregate”When you append the first event to an entity stream, the aggregate is born. Use expectedVersion: 0 to ensure you’re creating a new aggregate, not accidentally appending to an existing one:
import { createClient } from "@ironflow/node";
const ironflow = createClient({ apiKey: process.env.IRONFLOW_API_KEY });
// Create a new Order aggregateconst { entityVersion } = await ironflow.streams.append("order-123", { entityType: "order", name: "order.placed", data: { customerId: "cust-456", items: [{ sku: "WIDGET-1", qty: 2, price: 29.99 }], total: 59.98, },}, { expectedVersion: 0 }); // 0 = must be a new streamEnforcing Invariants
Section titled “Enforcing Invariants”In DDD, aggregates enforce business rules (invariants) before accepting changes. With Ironflow, the pattern is: read current state → validate → append.
// Read the aggregate's current stateconst { events } = await ironflow.streams.read("order-123");const info = await ironflow.streams.getInfo("order-123");const version = info?.version ?? 0;
// Rebuild state from eventsconst order = events.reduce((state, event) => { switch (event.name) { case "order.placed": return { ...event.data, status: "placed" }; case "order.paid": return { ...state, status: "paid" }; case "order.cancelled": return { ...state, status: "cancelled" }; default: return state; }}, {});
// Enforce invariant: can't ship a cancelled orderif (order.status === "cancelled") { throw new Error("Cannot ship a cancelled order");}
// Append with version guard — fails if another write happenedawait ironflow.streams.append("order-123", { entityType: "order", name: "order.shipped", data: { trackingNumber: "1Z999AA10123456784" },}, { expectedVersion: version });The expectedVersion guard is critical — it implements optimistic concurrency, ensuring no other process modified the aggregate between your read and write. If the version doesn’t match, Ironflow returns an ABORTED error (ConnectRPC/gRPC code 10), and you retry with fresh state.
One Stream Per Aggregate
Section titled “One Stream Per Aggregate”Each entity stream represents exactly one aggregate instance. Don’t mix different aggregate types in the same stream:
// ✓ Good — each aggregate has its own streamawait ironflow.streams.append("order-123", { entityType: "order", ... });await ironflow.streams.append("customer-456", { entityType: "customer", ... });
// ✗ Bad — don't mix aggregates in one streamawait ironflow.streams.append("mixed-stream", { entityType: "order", ... });await ironflow.streams.append("mixed-stream", { entityType: "customer", ... });Key Takeaways
Section titled “Key Takeaways”- Entity streams are aggregates — each stream is a consistency boundary with its own identity and event history.
- Use
expectedVersionto enforce invariants — optimistic concurrency prevents conflicting writes. - Read → validate → append — the standard pattern for aggregate commands.
For the full Entity Streams API (appending, reading, subscriptions, concurrency options), see the Entity Streams guide.