Protocols
Ironflow supports two streaming protocols for real-time event subscriptions:
| Protocol | Endpoint | Use Case |
|---|---|---|
| WebSocket | ws://localhost:9123/ws | Browser clients, bidirectional communication |
| ConnectRPC | http://localhost:9123/ironflow.v1.PubSubService/* | Server-to-server, gRPC-compatible clients |
Both protocols support pattern subscriptions, replay, CEL filtering, and consumer groups. Manual acknowledgment is supported only over WebSocket; a manual-ack request made over the ConnectRPC transport fails with an error.
WebSocket Protocol
Connect directly to ws://localhost:9123/ws for browser clients or custom integrations.
Message Types
| Type | Direction | Description |
|---|---|---|
| subscribe | Client → Server | Request subscription |
| unsubscribe | Client → Server | Cancel subscription |
| ack | Client → Server | Event acknowledgment |
| subscription_result | Server → Client | Subscription confirmation |
| subscribed | Server → Client | Legacy subscription confirmation |
| event | Server → Client | Event delivery |
| subscription_error | Server → Client | Subscription-specific error |
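A client typically routes each incoming frame on its `type` field. The following is a minimal Python sketch of that routing, using only the server-to-client types listed in the table above. The `dispatch` function and the handler mapping are hypothetical names for illustration and are not part of any Ironflow SDK.

```python
import json
from typing import Any, Callable, Dict

# Message types from the table above, grouped by direction.
CLIENT_TO_SERVER = {"subscribe", "unsubscribe", "ack"}
SERVER_TO_CLIENT = {"subscription_result", "subscribed", "event", "subscription_error"}

Handler = Callable[[Dict[str, Any]], Any]


def dispatch(raw: str, handlers: Dict[str, Handler]) -> Any:
    """Decode one server frame and pass it to the handler registered for its type."""
    message = json.loads(raw)
    kind = message.get("type")
    if kind not in SERVER_TO_CLIENT:
        # Client-only or unknown types should never arrive from the server.
        raise ValueError(f"unexpected message type from server: {kind!r}")
    handler = handlers.get(kind)
    if handler is None:
        return None  # no handler registered for this type; ignore the frame
    return handler(message)
```

Registering a handler only for `event` is enough for a read-only consumer; frames of other known types are then silently ignored.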
Subscribe Request
    {
      "type": "subscribe",
      "subscription": {
        "pattern": "system.run.>",
        "options": {
          "replay": 10,
          "includeMetadata": true,
          "filter": "data.status == \"completed\"",
          "consumerGroup": "my-group",
          "ackMode": "manual"
        }
      }
    }

Subscription Result
    {
      "type": "subscription_result",
      "results": [
        {
          "pattern": "system.run.>",
          "status": "ok",
          "subscriptionId": "sub_abc123"
        }
      ]
    }

Event Delivery
    {
      "type": "event",
      "subscriptionId": "sub_abc123",
      "eventId": "evt_xyz789",
      "topic": "system.run.xyz.created",
      "data": {
        "runId": "xyz",
        "functionId": "process-order",
        "status": "running"
      },
      "meta": {
        "timestamp": "2024-01-15T10:30:00Z",
        "sequence": 12345
      }
    }

When includeMetadata: true is set in the subscribe options, the meta field is included in each event delivery. For user-emitted events, meta also contains a custom field with any metadata attached at emit time:

    {
      "type": "event",
      "subscriptionId": "sub_abc123",
      "eventId": "evt_xyz789",
      "topic": "events:order.placed",
      "data": {
        "orderId": "123"
      },
      "meta": {
        "timestamp": "2024-01-15T10:30:00Z",
        "sequence": 12346,
        "custom": {
          "source": "checkout",
          "traceId": "abc-123"
        }
      }
    }

Unsubscribe Request
    {
      "type": "unsubscribe",
      "subscriptionId": "sub_abc123"
    }

Acknowledgment (Manual Ack Mode)
    {
      "type": "ack",
      "eventId": "evt_xyz789",
      "ackType": "ack"
    }

| ackType | Description |
|---|---|
| ack | Successfully processed |
| nak | Failed, request redelivery |
| term | Failed permanently, do not redeliver |
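The three ack types map naturally onto the outcome of processing an event. The sketch below shows one way a manual-ack consumer might pick the ack type and build the `ack` frame in Python. The message shape follows the example above; `PermanentFailure`, `build_ack`, and `process_and_ack` are hypothetical names, and treating every other exception as retryable is an assumed policy rather than documented Ironflow behavior.

```python
import json
from typing import Any, Callable, Dict

ACK_TYPES = {"ack", "nak", "term"}  # values from the ackType table


class PermanentFailure(Exception):
    """Hypothetical marker for errors that should never be redelivered."""


def build_ack(event_id: str, ack_type: str = "ack") -> str:
    """Serialize an ack frame in the shape shown in the example above."""
    if ack_type not in ACK_TYPES:
        raise ValueError(f"unknown ackType: {ack_type!r}")
    return json.dumps({"type": "ack", "eventId": event_id, "ackType": ack_type})


def process_and_ack(event: Dict[str, Any], handler: Callable[[Dict[str, Any]], None]) -> str:
    """Run the handler on the event data and return the ack frame to send back."""
    try:
        handler(event["data"])
    except PermanentFailure:
        return build_ack(event["eventId"], "term")  # do not redeliver
    except Exception:
        return build_ack(event["eventId"], "nak")   # request redelivery
    return build_ack(event["eventId"], "ack")       # processed successfully
```

The returned string would be sent over the same WebSocket connection that delivered the event.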
Error Response
    {
      "type": "subscription_error",
      "subscriptionId": "sub_abc123",
      "code": "INVALID_PATTERN",
      "message": "Pattern must start with 'system.', 'events:', 'entity:', 'public.', or 'topic:'",
      "retrying": false
    }

ConnectRPC Protocol
ConnectRPC provides gRPC-compatible streaming over HTTP. Use the PubSubService for subscriptions.
Service Definition
    service PubSubService {
      // Emit an event (alias for Trigger with emit semantics)
      rpc Emit(EmitRequest) returns (EmitResponse);

      // Server-streaming subscription (auto-ack)
      rpc Subscribe(SubscribeRequest) returns (stream SubscriptionEvent);

      // Bidirectional streaming (manual ack)
      rpc SubscribeBidirectional(stream SubscriptionAck) returns (stream SubscriptionEvent);

      // Consumer group management
      rpc CreateConsumerGroup(CreateConsumerGroupRequest) returns (ConsumerGroup);
      rpc GetConsumerGroup(GetConsumerGroupRequest) returns (ConsumerGroup);
      rpc ListConsumerGroups(ListConsumerGroupsRequest) returns (ListConsumerGroupsResponse);
      rpc UpdateConsumerGroup(UpdateConsumerGroupRequest) returns (ConsumerGroup);
      rpc DeleteConsumerGroup(DeleteConsumerGroupRequest) returns (google.protobuf.Empty);

      // Consumer group streaming
      rpc JoinConsumerGroup(JoinConsumerGroupRequest) returns (stream SubscriptionEvent);

      // Developer pub/sub (does NOT trigger workflow functions)
      rpc Publish(PublishRequest) returns (PublishResponse);
      rpc ListTopics(ListTopicsRequest) returns (ListTopicsResponse);
      rpc GetTopicStats(GetTopicStatsRequest) returns (GetTopicStatsResponse);
    }

Subscribe Request
Endpoint: POST /ironflow.v1.PubSubService/Subscribe

Request:

    {
      "pattern": "system.run.>",
      "options": {
        "replay": 10,
        "include_metadata": true,
        "filter": "data.status == \"completed\"",
        "namespace": "default",
        "consumer_group": "my-group",
        "ack_mode": "ACK_MODE_AUTO",
        "backpressure": "BACKPRESSURE_MODE_BUFFER"
      }
    }

Response (streaming):

    {
      "subscription_id": "sub_abc123",
      "event_id": "evt_xyz789",
      "topic": "system.run.xyz.created",
      "data": {
        "runId": "xyz",
        "status": "running"
      },
      "metadata": {
        "timestamp": "2024-01-15T10:30:00.000Z",
        "source": "system",
        "namespace": "default"
      },
      "sequence": "12345",
      "delivery_attempt": 1
    }

When include_metadata: true is set and the event has user-defined metadata (attached at emit time), it is included in the metadata object under the custom key:

    {
      "subscription_id": "sub_abc123",
      "event_id": "evt_xyz789",
      "topic": "events:order.placed",
      "data": {
        "orderId": "123"
      },
      "metadata": {
        "timestamp": "2024-01-15T10:30:00.000Z",
        "source": "user",
        "namespace": "default",
        "custom": {
          "traceId": "abc-123",
          "source": "checkout"
        }
      },
      "sequence": "12346",
      "delivery_attempt": 1
    }

Join Consumer Group
Endpoint: POST /ironflow.v1.PubSubService/JoinConsumerGroup

Request:

    {
      "namespace": "default",
      "group_name": "order-processors",
      "consumer_id": "consumer-1",
      "client_id": "my-app-instance"
    }

Response: Same streaming format as Subscribe.
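For callers that talk to the endpoint over plain HTTP rather than through an SDK, the request can be assembled with the Python standard library. This is a sketch only: `join_consumer_group_request` is a hypothetical helper, the base URL is the local address used on this page, and treating `consumer_id` and `client_id` as optional is an assumption based on the cURL example on this page, which sends only `group_name`.

```python
import json
import urllib.request
from typing import Optional

BASE_URL = "http://localhost:9123"  # local development address used on this page


def join_consumer_group_request(
    group_name: str,
    namespace: str = "default",
    consumer_id: Optional[str] = None,  # assumed optional
    client_id: Optional[str] = None,    # assumed optional
    base_url: str = BASE_URL,
) -> urllib.request.Request:
    """Build (but do not send) a JoinConsumerGroup request."""
    body = {"namespace": namespace, "group_name": group_name}
    if consumer_id is not None:
        body["consumer_id"] = consumer_id
    if client_id is not None:
        body["client_id"] = client_id
    return urllib.request.Request(
        url=f"{base_url}/ironflow.v1.PubSubService/JoinConsumerGroup",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the result to `urllib.request.urlopen` would send it. Reading the streamed response is left out here because its wire framing is not described on this page.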
Bidirectional Streaming (Manual Ack)
Endpoint: POST /ironflow.v1.PubSubService/SubscribeBidirectional
Ack Types (for future use):
| Value | Description |
|---|---|
| ACK_TYPE_ACK | Successfully processed |
| ACK_TYPE_NAK | Failed, request redelivery |
| ACK_TYPE_TERM | Failed permanently, do not redeliver |
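These enum values line up one-to-one with the WebSocket `ackType` strings. Code that may later need to target both transports could keep the correspondence in one place, as in this small Python sketch. The mapping is inferred from the matching descriptions in the two tables, and `to_connect_ack_type` is a hypothetical helper name.

```python
# WebSocket ackType -> ConnectRPC enum name, inferred from the matching table descriptions.
WS_TO_CONNECT_ACK = {
    "ack": "ACK_TYPE_ACK",
    "nak": "ACK_TYPE_NAK",
    "term": "ACK_TYPE_TERM",
}


def to_connect_ack_type(ws_ack_type: str) -> str:
    """Translate a WebSocket ackType string into the ConnectRPC enum name."""
    try:
        return WS_TO_CONNECT_ACK[ws_ack_type]
    except KeyError:
        raise ValueError(f"unknown ackType: {ws_ack_type!r}") from None
```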
Using with cURL
    # Subscribe to events (streaming response)
    curl -X POST http://localhost:9123/ironflow.v1.PubSubService/Subscribe \
      -H "Content-Type: application/json" \
      -d '{
        "pattern": "system.run.>",
        "options": {
          "replay": 10,
          "include_metadata": true
        }
      }'

    # Join a consumer group
    curl -X POST http://localhost:9123/ironflow.v1.PubSubService/JoinConsumerGroup \
      -H "Content-Type: application/json" \
      -d '{
        "group_name": "order-processors"
      }'

Error Codes
Both protocols use the same error codes:
| Code | Description | Recoverable |
|---|---|---|
| INVALID_PATTERN | Pattern syntax error or invalid namespace | No |
| INVALID_NAMESPACE | Invalid or unknown namespace | No |
| LIMIT_EXCEEDED | Too many subscriptions or replay limit exceeded | No |
| NATS_DISCONNECT | Server lost connection to NATS | Yes (auto-retry) |
| INTERNAL_ERROR | Server-side failure | Maybe |
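The Recoverable column suggests a simple client-side policy. The sketch below, in Python, turns a `subscription_error` message into one of three actions. It rests on several assumptions: that `retrying: true` means the server is already retrying on the client's behalf, that "Maybe" is best handled by resubscribing, and that unknown codes are treated as fatal. `next_action` and its return values are hypothetical names.

```python
from typing import Any, Dict

# Recoverable column from the table above: "no", "auto" (auto-retry), or "maybe".
RECOVERABILITY = {
    "INVALID_PATTERN": "no",
    "INVALID_NAMESPACE": "no",
    "LIMIT_EXCEEDED": "no",
    "NATS_DISCONNECT": "auto",
    "INTERNAL_ERROR": "maybe",
}


def next_action(error: Dict[str, Any]) -> str:
    """Return "wait", "resubscribe", or "give_up" for a subscription_error message."""
    if error.get("retrying") is True:
        return "wait"  # assumed: the server is already retrying
    recoverability = RECOVERABILITY.get(error.get("code"), "no")  # unknown codes: fatal
    if recoverability == "auto":
        return "wait"
    if recoverability == "maybe":
        return "resubscribe"  # assumed policy; pair with backoff in real code
    return "give_up"
```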
Choosing a Protocol
| Consideration | WebSocket | ConnectRPC |
|---|---|---|
| Browser support | Native | Requires library |
| Bidirectional | Built-in | Separate RPC |
| gRPC ecosystem | No | Yes |
| Proxy compatibility | May need configuration | Standard HTTP/2 |
| SDK support | Browser SDK | Go SDK, Node SDK |
Recommendation:
- Use WebSocket for browser clients (via @ironflow/browser)
- Use ConnectRPC for server-to-server communication (via Go SDK or direct HTTP)
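One practical difference when moving a subscription between the two transports is option spelling: the WebSocket examples on this page use camelCase (`includeMetadata`, `consumerGroup`), while the ConnectRPC examples use snake_case (`include_metadata`, `consumer_group`). The Python sketch below translates one into the other. `to_connect_options` is a hypothetical helper, it covers only the option names that appear in the examples on this page, and it assumes any non-manual `ackMode` corresponds to `ACK_MODE_AUTO`.

```python
from typing import Any, Dict

# camelCase (WebSocket) -> snake_case (ConnectRPC), as spelled in the examples on this page.
FIELD_NAMES = {
    "replay": "replay",
    "includeMetadata": "include_metadata",
    "filter": "filter",
    "consumerGroup": "consumer_group",
}


def to_connect_options(ws_options: Dict[str, Any]) -> Dict[str, Any]:
    """Translate WebSocket subscribe options into ConnectRPC field names."""
    if ws_options.get("ackMode") == "manual":
        # Manual ack is only supported over WebSocket.
        raise ValueError("manual ack is not supported over ConnectRPC")
    unknown = set(ws_options) - set(FIELD_NAMES) - {"ackMode"}
    if unknown:
        raise ValueError(f"no known ConnectRPC equivalent for: {sorted(unknown)}")
    options = {FIELD_NAMES[key]: value for key, value in ws_options.items() if key in FIELD_NAMES}
    options["ack_mode"] = "ACK_MODE_AUTO"  # assumed equivalent of non-manual ack
    return options
```

Failing early on `ackMode: "manual"` keeps the transport limitation visible at the call site instead of surfacing later as a server error.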