Skip to content

Python SDK

The official Python SDK for Ironflow. Provides a typed HTTP client covering all REST API endpoints for events, runs, projections, KV store, config management, entity streams, and the full management API.

Terminal window
pip install ironflow

Requires Python 3.9+. Zero external dependencies (uses only urllib and json from the standard library).

from ironflow import IronflowClient
# Create a client
client = IronflowClient(
server_url="http://localhost:9123",
api_key="ifkey_...",
)
# Emit an event (triggers any listening functions)
result = client.events_create(body={
"name": "order.placed",
"data": {"order_id": "123", "customer_id": "456"},
})
# List runs
runs = client.runs_list()
# Get a specific run
run = client.runs_get("run_abc123")
# Cancel a run
client.runs_cancel("run_abc123", body={"reason": "duplicate"})
client = IronflowClient(
server_url="http://localhost:9123", # Default
api_key="ifkey_...", # Optional; only needed for authenticated endpoints
timeout=30.0, # Request timeout in seconds
)

All methods follow the {group}_{action} naming pattern in snake_case.

# Emit an event — server expects "name", "data", optional "metadata"
client.events_create(body={"name": "user.created", "data": {"user_id": "123"}})
# List events
events = client.events_list()
runs = client.runs_list()
run = client.runs_get("run_id")
client.runs_cancel("run_id", body={"reason": "test"})
client.runs_list_steps("run_id")
client.runs_list_audit("run_id")
client.runs_resume(body={"run_id": "run_id"})
# Patch a step's output (POST /api/v1/steps/patch). Codegen groups this endpoint under "runs", hence the method name runs_create.
client.runs_create(body={"step_id": "step_xyz", "output": {"corrected": True}, "reason": "manual fix"})
projections = client.projections_list()
projection = client.projections_get("my-projection")
status = client.projections_list_status("my-projection")
client.projections_rebuild("my-projection")
client.projections_list_rebuild("my-projection") # rebuild job status
client.projections_cancel("my-projection") # cancel in-flight rebuild
client.projections_pause("my-projection")
client.projections_resume("my-projection")
client.projections_delete("my-projection")
# Buckets
client.kv_list_buckets()
client.kv_buckets(body={"name": "my-bucket"}) # Create
client.kv_get_buckets("my-bucket")
client.kv_delete_buckets("my-bucket")
# Keys
client.kv_list_buckets_keys("my-bucket")
client.kv_get_buckets_keys("my-bucket", "my-key")
client.kv_update_buckets_keys("my-bucket", "my-key", body={"value": "data"})
client.kv_delete_buckets_keys("my-bucket", "my-key")
# Watch (streaming endpoint — the Python client issues a plain GET and does not iterate over streamed updates)
client.kv_list_buckets_watch("my-bucket")
client.config_list()
client.config_get("my-config")
client.config_create("my-config", body={"key": "value"}) # POST — full replacement
client.config_patch("my-config", body={"key": "updated"}) # shallow merge
client.config_delete("my-config")
client.config_list_watch("my-config") # streaming endpoint — the Python client issues a plain GET and does not iterate over streamed updates
client.secrets_list()
client.secrets_get("my-secret")
client.secrets_create(body={"name": "my-secret", "value": "s3cret", "description": "optional"})
client.secrets_update("my-secret", body={"value": "new-value"}) # PUT — full replacement
client.secrets_patch("my-secret", body={"description": "updated"}) # PATCH — partial update
client.secrets_delete("my-secret")
client.streams_list()
client.streams_get("entity-123")
# Append event — server requires "entity_type" and "event_name"
client.streams_events("entity-123", body={
"entity_type": "order",
"event_name": "order.updated",
"data": {"status": "shipped"},
"expected_version": 4, # optional — optimistic concurrency
"idempotency_key": "ship-1", # optional
"version": 1, # optional — event schema version
})
client.streams_list_events("entity-123")
client.streams_list_history("entity-123")
# Snapshots
client.streams_snapshots("entity-123", body={
"entity_type": "order",
"entity_version": 10,
"state": {"status": "shipped"},
})
client.streams_list_snapshots("entity-123")
client.api_keys_list()
client.api_keys_create(body={"name": "my-key"})
client.api_keys_get("ak_123")
client.api_keys_delete("ak_123")
client.api_keys_rotate("ak_123")
client.projects_list()
client.projects_create(body={"name": "my-project"})
client.projects_update("proj_123", body={"name": "renamed"})
client.projects_delete("proj_123")
client.environments_list()
client.environments_create(body={"name": "staging", "project_id": "proj_123"})
client.environments_update("env_123", body={"name": "production"})
client.environments_delete("env_123")
client.functions_list()
# Direct invoke — Python is the only SDK that exposes this REST endpoint.
client.functions_invoke("process-order", body={"data": {"order_id": "123"}})
client.schemas_list()
client.schemas_get("order.placed") # latest version
client.schemas_get_by_name("order.placed", "2") # specific version
client.schemas_create(body={
"event_name": "order.placed",
"version": 1,
"schema_json": '{"type":"object","properties":{"orderId":{"type":"string"}}}',
})
client.schemas_delete("order.placed", "1")
# Test an upcast transformation between schema versions
client.schemas_create_upcast(body={
"event_name": "order.placed",
"from_version": 1,
"to_version": 2,
"data": {"orderId": "123"},
})
# Publish to a developer topic
client.pub_sub_create(body={"topic": "notifications", "data": {"type": "ping"}})

Listing topics and retrieving topic stats are ConnectRPC-only operations and are not exposed in the Python SDK.

# Run a SQL query against projection tables
client.sql_create(body={"query": "SELECT * FROM order_stats LIMIT 10"})
# Global audit feed
client.audit_list()
client.users_list()
client.users_get("user_123")
client.users_create(body={"email": "alice@example.com", "password": "s3cret", "roles": ["admin"]})
client.users_patch("user_123", body={"name": "Alice"})
client.users_patch_password("user_123", body={"password": "new-s3cret"})
client.users_delete("user_123")
# Organizations — full CRUD
client.orgs_list()
client.orgs_get("org_123")
client.orgs_create(body={"name": "my-org"})
client.orgs_patch("org_123", body={"name": "renamed"})
client.orgs_delete("org_123")
# Roles — full CRUD plus policy attach/detach
client.roles_list()
client.roles_get("role_123")
client.roles_create(body={"name": "editor", "org_id": "org_123"})
client.roles_patch("role_123", body={"name": "senior-editor"})
client.roles_delete("role_123")
client.roles_policies("role_123", body={"policy_id": "policy_456"}) # attach
client.roles_delete_policies("role_123", "policy_456") # detach
# Policies — full CRUD
client.policies_list()
client.policies_get("policy_456")
client.policies_create(body={
"name": "allow-emit",
"effect": "allow",
"actions": "emit:*",
"resources": "*",
})
client.policies_patch("policy_456", body={"name": "allow-all-emit"})
client.policies_delete("policy_456")
# Tenants
client.tenants_list()
client.tenants_provision(body={"name": "new-tenant"})
from ironflow.client import IronflowError
try:
run = client.runs_get("nonexistent")
except IronflowError as e:
print(f"Status: {e.status_code}") # 404
print(f"Code: {e.code}") # "NOT_FOUND"
print(f"Message: {e}") # "run not found"

The Python SDK covers 103 REST API methods across 22 endpoint groups. Run `make sdk-health` to see the full coverage report.

This SDK is auto-generated from the Ironflow server’s route manifest using `cmd/sdk-gen`. To regenerate after server changes:

Terminal window
make sdk-manifest # Extract endpoint manifest from running server
make sdk-gen-python # Regenerate Python client
make test-python # Run tests