Protocols

Ironflow supports two streaming protocols for real-time event subscriptions:

| Protocol | Endpoint | Use Case |
| --- | --- | --- |
| WebSocket | ws://localhost:9123/ws | Browser clients, bidirectional communication |
| ConnectRPC | http://localhost:9123/ironflow.v1.PubSubService/* | Server-to-server, gRPC-compatible clients |

Both protocols support pattern subscriptions, replay, CEL filtering, and consumer groups. Manual acknowledgment is supported only over WebSocket; the ConnectRPC transport does not support manual ack and returns an error if it is requested.


Connect directly to ws://localhost:9123/ws for browser clients or custom integrations.

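| Type | Direction | Description |
| --- | --- | --- |
| subscribe | Client → Server | Request subscription |
| unsubscribe | Client → Server | Cancel subscription |
| ack | Client → Server | Event acknowledgment |
| subscription_result | Server → Client | Subscription confirmation |
| subscribed | Server → Client | Legacy subscription confirmation |
| event | Server → Client | Event delivery |
| subscription_error | Server → Client | Subscription-specific error |

Subscribe request:

{
  "type": "subscribe",
  "subscription": {
    "pattern": "system.run.>",
    "options": {
      "replay": 10,
      "includeMetadata": true,
      "filter": "data.status == \"completed\"",
      "consumerGroup": "my-group",
      "ackMode": "manual"
    }
  }
}

Subscription result:

{
  "type": "subscription_result",
  "results": [
    {
      "pattern": "system.run.>",
      "status": "ok",
      "subscriptionId": "sub_abc123"
    }
  ]
}

Event delivery:

{
  "type": "event",
  "subscriptionId": "sub_abc123",
  "eventId": "evt_xyz789",
  "topic": "system.run.xyz.created",
  "data": {
    "runId": "xyz",
    "functionId": "process-order",
    "status": "running"
  },
  "meta": {
    "timestamp": "2024-01-15T10:30:00Z",
    "sequence": 12345
  }
}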
{
"type": "subscribe",
"subscription": {
"pattern": "system.run.>",
"options": {
"replay": 10,
"includeMetadata": true,
"filter": "data.status == \"completed\"",
"consumerGroup": "my-group",
"ackMode": "manual"
}
}
}
{
"type": "subscription_result",
"results": [
{
"pattern": "system.run.>",
"status": "ok",
"subscriptionId": "sub_abc123"
}
]
}
{
"type": "event",
"subscriptionId": "sub_abc123",
"eventId": "evt_xyz789",
"topic": "system.run.xyz.created",
"data": {
"runId": "xyz",
"functionId": "process-order",
"status": "running"
},
"meta": {
"timestamp": "2024-01-15T10:30:00Z",
"sequence": 12345
}
}
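
The subscribe, result, and event messages above can be modeled with a few types. The following is a minimal sketch that assumes only the field names shown in the JSON examples; `buildSubscribe` and `describeServerMessage` are hypothetical helpers rather than part of any Ironflow SDK, and treating every option as optional is an assumption.

```typescript
// Shapes inferred from the JSON examples; which fields are optional is an assumption.
interface SubscribeOptions {
  replay?: number;
  includeMetadata?: boolean;
  filter?: string;
  consumerGroup?: string;
  ackMode?: string; // only "manual" appears in the example above
}

interface ServerMessage {
  type: string;
  results?: { pattern: string; status: string; subscriptionId?: string }[];
  subscriptionId?: string;
  eventId?: string;
  topic?: string;
  code?: string;
  message?: string;
}

// Hypothetical helper: serializes a client → server subscribe message.
function buildSubscribe(pattern: string, options?: SubscribeOptions): string {
  const subscription = options ? { pattern, options } : { pattern };
  return JSON.stringify({ type: "subscribe", subscription });
}

// Hypothetical helper: summarizes a server → client message by its type.
function describeServerMessage(raw: string): string {
  const msg = JSON.parse(raw) as ServerMessage;
  switch (msg.type) {
    case "subscription_result":
      return (msg.results || [])
        .map((r) => r.pattern + " -> " + r.status)
        .join(", ");
    case "subscribed":
      return "legacy subscription confirmation";
    case "event":
      return "event " + msg.eventId + " on " + msg.topic;
    case "subscription_error":
      return "error " + msg.code + ": " + msg.message;
    default:
      return "unhandled message type: " + msg.type;
  }
}
```

In a browser, the string returned by `buildSubscribe` would be passed to `WebSocket.send`, and `describeServerMessage` would be called from the socket's `message` handler.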

When includeMetadata: true is set in the subscribe options, the meta field is included in each event delivery. For user-emitted events, meta also contains a custom field with any metadata attached at emit time:

{
  "type": "event",
  "subscriptionId": "sub_abc123",
  "eventId": "evt_xyz789",
  "topic": "events:order.placed",
  "data": { "orderId": "123" },
  "meta": {
    "timestamp": "2024-01-15T10:30:00Z",
    "sequence": 12346,
    "custom": {
      "source": "checkout",
      "traceId": "abc-123"
    }
  }
}
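
Because `meta` is present only when `includeMetadata` is set, and `custom` only for user-emitted events, a reader of these fields has to tolerate both being absent. A minimal sketch, assuming the event shape shown above; `customMeta` is a hypothetical helper name.

```typescript
// Event shape inferred from the examples above.
interface WsEvent {
  type: "event";
  subscriptionId: string;
  eventId: string;
  topic: string;
  data: unknown;
  meta?: {
    timestamp: string;
    sequence: number;
    custom?: { [key: string]: unknown };
  };
}

// Hypothetical helper: returns a custom metadata value, or undefined when
// metadata was not requested or the event carries no custom metadata.
function customMeta(event: WsEvent, key: string): unknown {
  if (!event.meta || !event.meta.custom) {
    return undefined;
  }
  return event.meta.custom[key];
}
```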

Unsubscribe request:

{
  "type": "unsubscribe",
  "subscriptionId": "sub_abc123"
}

Acknowledgment:

{
  "type": "ack",
  "eventId": "evt_xyz789",
  "ackType": "ack"
}

| ackType | Description |
| --- | --- |
| ack | Successfully processed |
| nak | Failed, request redelivery |
| term | Failed permanently, do not redeliver |
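
The three `ackType` values map naturally onto the outcome of an event handler. The sketch below assumes a handler that reports one of three outcomes; the `Outcome` type and both function names are hypothetical and only illustrate how an ack message could be chosen.

```typescript
type AckType = "ack" | "nak" | "term";

// Hypothetical handler outcomes, introduced for this example only.
type Outcome = "success" | "retryable_failure" | "permanent_failure";

// Maps a handler outcome to the ackType values listed in the table above.
function ackTypeFor(outcome: Outcome): AckType {
  switch (outcome) {
    case "success":
      return "ack"; // successfully processed
    case "retryable_failure":
      return "nak"; // failed, request redelivery
    case "permanent_failure":
      return "term"; // failed permanently, do not redeliver
  }
}

// Serializes the client → server ack message for a given event.
function buildAck(eventId: string, outcome: Outcome): string {
  return JSON.stringify({ type: "ack", eventId, ackType: ackTypeFor(outcome) });
}
```

Reserving `term` for failures that can never succeed (for example, a malformed payload) avoids redelivering an event that will fail again.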

Subscription error:

{
  "type": "subscription_error",
  "subscriptionId": "sub_abc123",
  "code": "INVALID_PATTERN",
  "message": "Pattern must start with 'system.', 'events:', 'entity:', 'public.', or 'topic:'",
  "retrying": false
}
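
The error message above lists the prefixes a pattern may start with, so a client can reject obviously invalid patterns before sending a subscribe request. This is a minimal sketch based only on that message; the server may enforce further rules, and `hasAllowedPrefix` is a hypothetical helper name.

```typescript
// Prefixes taken from the INVALID_PATTERN message shown above.
const ALLOWED_PREFIXES = ["system.", "events:", "entity:", "public.", "topic:"];

// Returns true when the pattern begins with one of the documented prefixes.
// This is a client-side pre-check only; it does not validate the rest of the pattern.
function hasAllowedPrefix(pattern: string): boolean {
  return ALLOWED_PREFIXES.some((prefix) => pattern.indexOf(prefix) === 0);
}
```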

ConnectRPC provides gRPC-compatible streaming over HTTP. Use the PubSubService for subscriptions.

service PubSubService {
  // Emit an event (alias for Trigger with emit semantics)
  rpc Emit(EmitRequest) returns (EmitResponse);

  // Server-streaming subscription (auto-ack)
  rpc Subscribe(SubscribeRequest) returns (stream SubscriptionEvent);

  // Bidirectional streaming (manual ack)
  rpc SubscribeBidirectional(stream SubscriptionAck) returns (stream SubscriptionEvent);

  // Consumer group management
  rpc CreateConsumerGroup(CreateConsumerGroupRequest) returns (ConsumerGroup);
  rpc GetConsumerGroup(GetConsumerGroupRequest) returns (ConsumerGroup);
  rpc ListConsumerGroups(ListConsumerGroupsRequest) returns (ListConsumerGroupsResponse);
  rpc UpdateConsumerGroup(UpdateConsumerGroupRequest) returns (ConsumerGroup);
  rpc DeleteConsumerGroup(DeleteConsumerGroupRequest) returns (google.protobuf.Empty);

  // Consumer group streaming
  rpc JoinConsumerGroup(JoinConsumerGroupRequest) returns (stream SubscriptionEvent);

  // Developer pub/sub (does NOT trigger workflow functions)
  rpc Publish(PublishRequest) returns (PublishResponse);
  rpc ListTopics(ListTopicsRequest) returns (ListTopicsResponse);
  rpc GetTopicStats(GetTopicStatsRequest) returns (GetTopicStatsResponse);
}

Endpoint: POST /ironflow.v1.PubSubService/Subscribe

Request:

{
  "pattern": "system.run.>",
  "options": {
    "replay": 10,
    "include_metadata": true,
    "filter": "data.status == \"completed\"",
    "namespace": "default",
    "consumer_group": "my-group",
    "ack_mode": "ACK_MODE_AUTO",
    "backpressure": "BACKPRESSURE_MODE_BUFFER"
  }
}
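
The ConnectRPC request uses snake_case option names and enum strings, whereas the WebSocket subscribe message uses camelCase. The sketch below shows one way to translate WebSocket-style options into this request body. It covers only the options that appear in both examples, rejects manual ack because this transport does not support it, and assumes every other ack mode maps to `ACK_MODE_AUTO`; `toRpcSubscribeRequest` is a hypothetical helper.

```typescript
// WebSocket-style options (camelCase), as in the WebSocket subscribe example.
interface WsOptions {
  replay?: number;
  includeMetadata?: boolean;
  filter?: string;
  consumerGroup?: string;
  ackMode?: string;
}

// ConnectRPC request body (snake_case), limited to fields shown in the request above.
interface RpcSubscribeRequest {
  pattern: string;
  options: {
    replay?: number;
    include_metadata?: boolean;
    filter?: string;
    consumer_group?: string;
    ack_mode: string;
  };
}

// Hypothetical helper: converts WebSocket-style options to a ConnectRPC request body.
function toRpcSubscribeRequest(pattern: string, ws: WsOptions): RpcSubscribeRequest {
  if (ws.ackMode === "manual") {
    // Manual ack is documented as unsupported over ConnectRPC, so fail early.
    throw new Error("manual ack is not supported over ConnectRPC");
  }
  // Assumption: any non-manual ack mode corresponds to ACK_MODE_AUTO.
  const options: RpcSubscribeRequest["options"] = { ack_mode: "ACK_MODE_AUTO" };
  if (ws.replay !== undefined) options.replay = ws.replay;
  if (ws.includeMetadata !== undefined) options.include_metadata = ws.includeMetadata;
  if (ws.filter !== undefined) options.filter = ws.filter;
  if (ws.consumerGroup !== undefined) options.consumer_group = ws.consumerGroup;
  return { pattern, options };
}
```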

Response (streaming):

{
  "subscription_id": "sub_abc123",
  "event_id": "evt_xyz789",
  "topic": "system.run.xyz.created",
  "data": {
    "runId": "xyz",
    "status": "running"
  },
  "metadata": {
    "timestamp": "2024-01-15T10:30:00.000Z",
    "source": "system",
    "namespace": "default"
  },
  "sequence": "12345",
  "delivery_attempt": 1
}

When include_metadata: true is set and the event has user-defined metadata (attached at emit time), it is included in the metadata object under the custom key:

{
  "subscription_id": "sub_abc123",
  "event_id": "evt_xyz789",
  "topic": "events:order.placed",
  "data": { "orderId": "123" },
  "metadata": {
    "timestamp": "2024-01-15T10:30:00.000Z",
    "source": "user",
    "namespace": "default",
    "custom": {
      "traceId": "abc-123",
      "source": "checkout"
    }
  },
  "sequence": "12346",
  "delivery_attempt": 1
}
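
Unlike the WebSocket event, the streamed response carries `sequence` as a JSON string (the protobuf JSON mapping encodes 64-bit integers this way, which is presumably the reason) and adds a `delivery_attempt` counter. The sketch below shows how a client might normalize these fields. Treating `delivery_attempt > 1` as a redelivery is an assumption based on the field name, and both helpers are hypothetical.

```typescript
// Streamed event shape inferred from the examples above.
interface RpcEvent {
  subscription_id: string;
  event_id: string;
  topic: string;
  data: unknown;
  metadata?: {
    timestamp?: string;
    source?: string;
    namespace?: string;
    custom?: { [key: string]: string };
  };
  sequence: string;
  delivery_attempt: number;
}

const MAX_SAFE = 9007199254740991; // 2^53 - 1, largest integer a JS number holds exactly

// Parses the string-encoded sequence into a number, refusing values that
// are not plain digits or that would lose precision.
function parseSequence(sequence: string): number {
  if (!/^\d+$/.test(sequence)) {
    throw new Error("sequence is not a non-negative integer: " + sequence);
  }
  const n = Number(sequence);
  if (n > MAX_SAFE) {
    throw new Error("sequence exceeds the safe integer range: " + sequence);
  }
  return n;
}

// Assumption: the first delivery has delivery_attempt 1, so higher values are redeliveries.
function isRedelivery(event: RpcEvent): boolean {
  return event.delivery_attempt > 1;
}
```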

Endpoint: POST /ironflow.v1.PubSubService/JoinConsumerGroup

Request:

{
  "namespace": "default",
  "group_name": "order-processors",
  "consumer_id": "consumer-1",
  "client_id": "my-app-instance"
}

Response: Same streaming format as Subscribe.

Endpoint: POST /ironflow.v1.PubSubService/SubscribeBidirectional

Ack Types (for future use):

| Value | Description |
| --- | --- |
| ACK_TYPE_ACK | Successfully processed |
| ACK_TYPE_NAK | Failed, request redelivery |
| ACK_TYPE_TERM | Failed permanently, do not redeliver |

# Subscribe to events (streaming response)
curl -X POST http://localhost:9123/ironflow.v1.PubSubService/Subscribe \
  -H "Content-Type: application/json" \
  -d '{
    "pattern": "system.run.>",
    "options": {
      "replay": 10,
      "include_metadata": true
    }
  }'

# Join a consumer group
curl -X POST http://localhost:9123/ironflow.v1.PubSubService/JoinConsumerGroup \
  -H "Content-Type: application/json" \
  -d '{
    "group_name": "order-processors"
  }'

Both protocols use the same error codes:

| Code | Description | Recoverable |
| --- | --- | --- |
| INVALID_PATTERN | Pattern syntax error or invalid namespace | No |
| INVALID_NAMESPACE | Invalid or unknown namespace | No |
| LIMIT_EXCEEDED | Too many subscriptions or replay limit exceeded | No |
| NATS_DISCONNECT | Server lost connection to NATS | Yes (auto-retry) |
| INTERNAL_ERROR | Server-side failure | Maybe |
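
The Recoverable column can drive a simple client-side policy. The sketch below is one possible interpretation, not documented behavior: it assumes that `retrying: true` in a `subscription_error` message means the server is already retrying, treats "Maybe" as worth a bounded number of client retries, and gives up on unknown codes. The `Action` type and `nextAction` are hypothetical.

```typescript
type Recoverable = "no" | "yes" | "maybe";
type Action = "wait" | "retry" | "give_up";

// Recoverability per error code, copied from the table above.
const ERROR_RECOVERABILITY: { [code: string]: Recoverable } = {
  INVALID_PATTERN: "no",
  INVALID_NAMESPACE: "no",
  LIMIT_EXCEEDED: "no",
  NATS_DISCONNECT: "yes",
  INTERNAL_ERROR: "maybe",
};

// Decides what the client should do after an error.
// attempt is the number of client retries already made for this subscription.
function nextAction(code: string, retrying: boolean, attempt: number, maxAttempts: number): Action {
  if (retrying) {
    return "wait"; // assumption: the server is retrying on its own
  }
  const recoverable = ERROR_RECOVERABILITY[code];
  if (recoverable === "yes") {
    return "wait"; // listed as auto-retry, so no client action is needed
  }
  if (recoverable === "maybe" && attempt < maxAttempts) {
    return "retry";
  }
  return "give_up"; // non-recoverable, unknown code, or retries exhausted
}
```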

| Consideration | WebSocket | ConnectRPC |
| --- | --- | --- |
| Browser support | Native | Requires library |
| Bidirectional | Built-in | Separate RPC |
| gRPC ecosystem | No | Yes |
| Proxy compatibility | May need configuration | Standard HTTP/2 |
| SDK support | Browser SDK | Go SDK, Node SDK |

Recommendation:

  • Use WebSocket for browser clients (via @ironflow/browser)
  • Use ConnectRPC for server-to-server communication (via Go SDK or direct HTTP)