Hive Documentation
Deterministic graph runtime for agent workflows in Swift. Same input, same output, every time — golden-testable, checkpoint-resumable, and built entirely on Swift concurrency.
1. Overview
Hive is the Swift equivalent of LangGraph — a deterministic graph runtime for building agent workflows. It executes workflows as superstep graphs where frontier nodes run concurrently, writes commit atomically, and routers schedule the next frontier.
Why Hive?
- Deterministic — BSP supersteps with lexicographic ordering. Every run produces identical event traces.
- Swift-native — Actors, Sendable, async/await, result builders. No Python, no YAML.
- Agent-ready — Tool calling, bounded agent loops, streaming tokens, fan-out/join, hybrid inference.
- Resumable — Interrupt for human approval. Checkpoint state. Resume with typed payloads.
Quick Start
swift build # Build all targets
swift test # Run all tests
swift test --filter HiveCoreTests # Single test target
swift run HiveTinyGraphExample # Run example
30-Second Example
import HiveDSL
let workflow = Workflow<MySchema> {
Node("classify") { input in
let text = try input.store.get(MySchema.text)
Effects {
Set(MySchema.category, classify(text))
UseGraphEdges()
}
}.start()
Node("respond") { _ in Effects { End() } }
Node("escalate") { _ in Effects { End() } }
Branch(from: "classify") {
Branch.case(name: "urgent", when: {
(try? $0.get(MySchema.category)) == "urgent"
}) { GoTo("escalate") }
Branch.default { GoTo("respond") }
}
}
let graph = try workflow.compile()
let runtime = HiveRuntime(graph: graph, environment: env)
2. Architecture
| Module | Dependencies | Purpose |
|---|---|---|
| HiveCore | None | Schema, graph, runtime, store |
| HiveDSL | HiveCore | Result-builder workflow DSL |
| HiveConduit | HiveCore, Conduit | LLM provider adapter |
| HiveCheckpointWax | HiveCore, Wax | Persistent checkpoints |
| HiveRAGWax | HiveCore, Wax | Vector RAG persistence |
| Hive | All above | Umbrella re-export |
Execution Flow
Input writes → superstep loop (execute frontier → commit writes → schedule next frontier) → outcome (finished, interrupted, or cancelled).
3. Schema System
HiveSchema Protocol
public protocol HiveSchema: Sendable {
associatedtype Context: Sendable = Void
associatedtype Input: Sendable = Void
associatedtype InterruptPayload: Codable & Sendable = String
associatedtype ResumePayload: Codable & Sendable = String
static var channelSpecs: [AnyHiveChannelSpec<Self>] { get }
static func inputWrites(_ input: Input, inputContext: HiveInputContext) throws -> [AnyHiveWrite<Self>]
}
Channel Scopes & Persistence
| Scope | Store | Visibility |
|---|---|---|
| .global | HiveGlobalStore | Shared across all tasks |
| .taskLocal | HiveTaskLocalStore | Isolated per spawned task |
| Persistence | Checkpointed? | Reset behavior |
|---|---|---|
| .checkpointed | Yes | Survives interrupt/resume |
| .untracked | No | Preserved across supersteps |
| .ephemeral | No | Reset each superstep |
Built-in Reducers
| Reducer | Behavior |
|---|---|
| .lastWriteWins() | Replaces current with update |
| .append() | Appends update to current collection |
| .setUnion() | Union of current and update sets |
| .dictionaryMerge(valueReducer:) | Merges dictionaries with conflict resolution |
| .sum() | Numeric addition |
| .min() / .max() | Comparable selection |
| .binaryOp(_:) | Custom binary operator |
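Conceptually, each reducer is a binary function that folds an incoming update into the channel's current value. The following self-contained sketch (illustrative only; not Hive's actual reducer types) shows the semantics:

```swift
// Illustrative only: Hive-style reducers modeled as plain binary functions.
struct Reducer<Value> {
    let combine: (Value, Value) -> Value
}

let lastWriteWins = Reducer<Int> { _, update in update }      // keep the update
let sum = Reducer<Int> { current, update in current + update } // numeric addition
let append = Reducer<[String]> { current, update in current + update }

// A commit folds every queued update into the current value, in order.
func commit<V>(_ initial: V, updates: [V], with reducer: Reducer<V>) -> V {
    updates.reduce(initial, reducer.combine)
}
```

With this model, `commit(0, updates: [1, 2, 3], with: sum)` yields 6, while the same updates through `lastWriteWins` yield 3.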
Complete Schema Example
enum MySchema: HiveSchema {
typealias Input = String
enum Channels {
static let messages = HiveChannelKey<MySchema, [String]>(HiveChannelID("messages"))
}
static var channelSpecs: [AnyHiveChannelSpec<MySchema>] {
[AnyHiveChannelSpec(HiveChannelSpec(
key: Channels.messages,
scope: .global,
reducer: .append(),
updatePolicy: .multi,
initial: { [] },
codec: HiveAnyCodec(HiveJSONCodec<[String]>()),
persistence: .checkpointed
))]
}
static func inputWrites(_ input: String, inputContext: HiveInputContext) throws -> [AnyHiveWrite<MySchema>] {
[AnyHiveWrite(Channels.messages, [input])]
}
}
@HiveSchema Macro
@HiveSchema
enum MySchema: HiveSchema {
@Channel(reducer: "lastWriteWins()", persistence: "untracked")
static var _answer: String = ""
@TaskLocalChannel(reducer: "append()", persistence: "checkpointed")
static var _logs: [String] = []
}
4. Store Model
HiveGlobalStore
Holds current value for every global channel. All tasks in a superstep see the same pre-commit snapshot. During commit, writes are sorted by (taskOrdinal, emissionIndex) and reduced through each channel's reducer.
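The sort key is what makes concurrent execution deterministic: however the scheduler interleaves tasks, sorting by (taskOrdinal, emissionIndex) recovers one stable order before reduction. A minimal self-contained sketch (not the Hive implementation):

```swift
// Each write records which frontier task produced it (taskOrdinal) and
// its position within that task's output (emissionIndex).
struct Write {
    let taskOrdinal: Int
    let emissionIndex: Int
    let value: String
}

// Sorting by the (taskOrdinal, emissionIndex) pair yields the same commit
// order regardless of how the tasks were actually scheduled.
func commitOrder(_ writes: [Write]) -> [String] {
    writes
        .sorted { ($0.taskOrdinal, $0.emissionIndex) < ($1.taskOrdinal, $1.emissionIndex) }
        .map { $0.value }
}
```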
HiveTaskLocalStore
Sparse overlay for task-local channels. Each spawned task carries its own instance. get() returns Optional (nil = use initial value).
HiveStoreView
Read-only merged view given to nodes via input.store. Global channels delegate to HiveGlobalStore. Task-local channels check overlay first, fall back to HiveInitialCache.
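The overlay-first read path can be sketched with plain dictionaries standing in for Hive's typed stores (names here are illustrative, not the real API):

```swift
// Task-local reads check the sparse overlay first, then fall back to the
// channel's initial value; global reads go straight to the global store.
struct StoreView {
    let global: [String: Int]
    let taskLocal: [String: Int]   // sparse per-task overlay
    let initials: [String: Int]    // stands in for HiveInitialCache

    func get(taskLocalChannel id: String) -> Int? {
        taskLocal[id] ?? initials[id]
    }
    func get(globalChannel id: String) -> Int? {
        global[id]
    }
}
```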
Fingerprinting
HiveTaskLocalFingerprint computes SHA-256 digests of task-local state for deduplication and caching.
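A hypothetical sketch of such a fingerprint, assuming CryptoKit (Apple platforms) and a flat string-keyed snapshot: serialize channels in sorted order so the digest is independent of iteration order, then hash.

```swift
import CryptoKit
import Foundation

// Illustrative only: deterministic SHA-256 digest over channel values.
// Sorting the keys ensures equal state always produces an equal digest.
func fingerprint(_ channels: [String: String]) -> String {
    var hasher = SHA256()
    for key in channels.keys.sorted() {
        hasher.update(data: Data("\(key)=\(channels[key]!);".utf8))
    }
    return hasher.finalize().map { String(format: "%02x", $0) }.joined()
}
```

Two stores with the same contents hash identically, which is what enables deduplication of spawned tasks.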
5. Graph Compilation
var builder = HiveGraphBuilder<Schema>(start: [HiveNodeID("A")])
builder.addNode(HiveNodeID("A")) { input in
HiveNodeOutput(writes: [AnyHiveWrite(key, value)], next: .useGraphEdges)
}
builder.addEdge(from: HiveNodeID("A"), to: HiveNodeID("B"))
builder.addJoinEdge(parents: [HiveNodeID("W1"), HiveNodeID("W2")], target: HiveNodeID("Gate"))
builder.addRouter(from: HiveNodeID("A")) { view in .nodes([HiveNodeID("B")]) }
let graph = try builder.compile()
compile() validates: no duplicate nodes, all edge targets exist, no static-edge cycles (router cycles allowed), and start nodes exist. Produces CompiledHiveGraph with SHA-256 versioning and Mermaid export.
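The static-cycle check can be done with a standard three-color depth-first search. A self-contained sketch under that assumption (not Hive's actual code):

```swift
// Returns true if the static edge set contains a cycle.
// Marks: unvisited → inProgress (on the current DFS path) → done.
func hasCycle(nodes: [String], edges: [String: [String]]) -> Bool {
    enum Mark { case unvisited, inProgress, done }
    var marks = Dictionary(uniqueKeysWithValues: nodes.map { ($0, Mark.unvisited) })

    func visit(_ n: String) -> Bool {
        switch marks[n] {
        case .inProgress: return true   // back edge: cycle found
        case .done: return false        // already cleared
        default:
            marks[n] = .inProgress
            for next in edges[n, default: []] {
                if visit(next) { return true }
            }
            marks[n] = .done
            return false
        }
    }
    return nodes.contains(where: visit)
}
```

Router edges are excluded from this check, which is why router cycles (e.g., agent loops) remain legal.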
6. Runtime Engine
HiveRuntime Actor
public actor HiveRuntime<Schema: HiveSchema>: Sendable {
public func run(threadID:, input:, options:) -> HiveRunHandle
public func resume(threadID:, interruptID:, payload:, options:) -> HiveRunHandle
public func fork(threadID:, fromCheckpointID:, into:, options:) -> HiveRunHandle
public func getLatestStore(threadID:) -> HiveGlobalStore?
}
BSP Superstep Model
| Phase | Action |
|---|---|
| 1. Execute | All frontier nodes run concurrently in TaskGroup |
| 2. Commit | Writes sorted, reduced, applied atomically |
| 3. Schedule | Routers run on post-commit state; next frontier assembled |
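The three phases can be rendered as a toy sequential loop (illustrative only; the real runtime executes the frontier concurrently in a TaskGroup and commits typed writes through reducers):

```swift
// Nodes map the pre-commit state to (a write, the next nodes to schedule).
typealias Node = (Int) -> (write: Int, next: [String])

func runSupersteps(start: [String], nodes: [String: Node], state: Int) -> Int {
    var frontier = start.sorted()        // lexicographic node ordering
    var state = state
    while !frontier.isEmpty {
        // 1. Execute: every frontier node sees the same snapshot.
        let results = frontier.compactMap { nodes[$0]?(state) }
        // 2. Commit: apply all writes atomically, in stable order.
        state = results.reduce(state) { $0 + $1.write }
        // 3. Schedule: assemble and sort the next frontier.
        frontier = Array(Set(results.flatMap { $0.next })).sorted()
    }
    return state
}
```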
Determinism Guarantees
- Lexicographic ordering of nodes by HiveNodeID
- Deterministic task/interrupt/checkpoint IDs via SHA-256
- Atomic superstep commits with stable write ordering
- Sorted channel iteration
- Optional deterministic token streaming (buffer per task, replay in order)
Event Streaming
| Category | Events |
|---|---|
| Run | runStarted, runFinished, runInterrupted, runCancelled |
| Step | stepStarted, stepFinished |
| Task | taskStarted, taskFinished, taskFailed |
| Writes | writeApplied |
| Model | modelInvocationStarted, modelToken, modelInvocationFinished |
7. HiveDSL
DSL Grammar
Workflow<Schema> {
Node("id") { input -> HiveNodeOutput }.start()
ModelTurn("id", model:, messages:).tools(.environment).start()
Edge("from", to: "to")
Join(parents: ["a", "b"], to: "target")
Chain { .start("A"); .then("B") }
Branch(from: "node") {
Branch.case(name:, when:) { GoTo("x") }
Branch.default { End() }
}
FanOut(from: "src", to: ["a","b"], joinTo: "merge")
}
// Inside nodes:
Effects {
Set(key, value)
Append(key, elements: [...])
GoTo("node")
UseGraphEdges()
End()
Interrupt(payload)
SpawnEach(items, node: "worker") { item in localStore }
}
Effects Primitives
| Primitive | Purpose |
|---|---|
| Set(key, value) | Write a value to a channel |
| Append(key, elements:) | Append to collection channel |
| GoTo("node") | Route to specific node |
| UseGraphEdges() | Follow static edges |
| End() | Terminate workflow |
| Interrupt(payload) | Pause, save checkpoint |
| SpawnEach(items, node:) | Fan-out parallel tasks |
Workflow Patching
var patch = WorkflowPatch<Schema>()
patch.replaceNode("B") { input in Effects { End() } }
patch.insertProbe("monitor", between: "A", and: "B") { input in
Effects { Set(probeKey, "observed"); UseGraphEdges() }
}
let result = try patch.apply(to: graph)
8. Checkpointing
Checkpoints capture: global store, frontier tasks, join barriers, superstep index, version counters, and pending interrupts.
Checkpoint Policy
| Policy | Behavior |
|---|---|
| .disabled | No checkpoints |
| .everyStep | Save after every superstep |
| .every(steps: N) | Save every N steps |
| .onInterrupt | Save only when interrupted |
Store Protocol
public protocol HiveCheckpointStore: Sendable {
func save(_ checkpoint: HiveCheckpoint) async throws
func loadLatest(threadID: HiveThreadID) async throws -> HiveCheckpoint?
}
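A hypothetical in-memory conformance sketch, with stand-in HiveThreadID/HiveCheckpoint shapes (the real types live in HiveCore and carry the full state listed above):

```swift
// Stand-in types for illustration; not the HiveCore definitions.
struct HiveThreadID: Hashable, Sendable { let raw: String }
struct HiveCheckpoint: Sendable {
    let threadID: HiveThreadID
    let superstep: Int
}

// An actor gives the Sendable, data-race-free semantics the protocol implies.
actor InMemoryCheckpointStore {
    private var latest: [HiveThreadID: HiveCheckpoint] = [:]

    func save(_ checkpoint: HiveCheckpoint) {
        latest[checkpoint.threadID] = checkpoint   // keep only the newest per thread
    }
    func loadLatest(threadID: HiveThreadID) -> HiveCheckpoint? {
        latest[threadID]
    }
}
```

A persistent store (like HiveCheckpointWax) would additionally encode each checkpoint for durable storage; only the newest-per-thread behavior is shown here.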
9. Interrupt/Resume Protocol
Flow
- Node returns interrupt: HiveInterruptRequest(payload: ...)
- Runtime selects the interrupt from the lowest-ordinal task
- Checkpoint saved
- Outcome: .interrupted(interruption:)
- Caller invokes runtime.resume(threadID:, interruptID:, payload:)
- Checkpoint loaded, interrupt ID verified
- HiveResume delivered to nodes via input.run.resume
- Execution continues from the saved frontier
// Interrupt
Node("review") { _ in Effects { Interrupt("Approve?") } }
// Resume
let resumed = await runtime.resume(
threadID: tid,
interruptID: interruption.interrupt.id,
payload: "approved",
options: opts
)
10. Hybrid Inference
Model Client
public protocol HiveModelClient: Sendable {
func complete(_ request: HiveChatRequest) async throws -> HiveChatResponse
func stream(_ request: HiveChatRequest) -> AsyncThrowingStream<HiveChatStreamChunk, Error>
}
ReAct Loop
HiveModelToolLoop implements bounded ReAct: send to model → if tool calls, execute tools → loop. Configurable maxModelInvocations and toolCallOrder for determinism.
ModelTurn("chat", model: "gpt-4", messages: [...])
.tools(.environment)
.agentLoop(.init(maxModelInvocations: 8))
.writes(to: answerKey)
.start()
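The loop's control flow can be sketched with plain closures standing in for the model client and tool registry (names and shapes here are illustrative, not HiveModelToolLoop's real signatures):

```swift
// A model reply either requests tool calls or produces a final answer.
enum ModelReply {
    case toolCalls([String])
    case answer(String)
}

// Bounded ReAct: ask the model; if it wants tools, run them and loop.
func agentLoop(
    maxModelInvocations: Int,
    complete: ([String]) -> ModelReply,   // stands in for the model client
    runTool: (String) -> String           // stands in for the tool registry
) -> String? {
    var transcript: [String] = []
    for _ in 0..<maxModelInvocations {
        switch complete(transcript) {
        case .answer(let text):
            return text
        case .toolCalls(let calls):
            // Sorting mirrors a deterministic toolCallOrder policy.
            transcript += calls.sorted().map(runTool)
        }
    }
    return nil   // invocation budget exhausted
}
```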
11. Memory System
public protocol HiveMemoryStore: Sendable {
func remember(namespace:, key:, text:, metadata:) async throws
func get(namespace:, key:) async throws -> HiveMemoryItem?
func recall(namespace:, query:, limit:) async throws -> [HiveMemoryItem]
func delete(namespace:, key:) async throws
}
InMemoryHiveMemoryStore provides BM25-based recall via HiveInvertedIndex for testing and development.
12. Adapter Modules
HiveConduit
Bridges Conduit LLM providers to HiveModelClient. Handles message conversion, tool definition mapping, and streaming.
HiveCheckpointWax
Wax-backed persistent checkpoints. JSON-encodes checkpoints as Wax frames. Supports history browsing via HiveCheckpointQueryableStore.
HiveRAGWax
Wax-backed HiveMemoryStore. Stores text as frames, performs keyword matching for recall.
13. Data Structures
HiveBitset
Fixed-size dynamic bitset backed by [UInt64]. O(1) insert/contains. Used for join barrier tracking.
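The word-array layout can be sketched in a few lines (illustrative; not HiveBitset's actual API). Each bit lives at word `i / 64`, position `i % 64`:

```swift
// Minimal [UInt64]-backed bitset with O(1) insert/contains.
struct Bitset {
    private var words: [UInt64]

    init(capacity: Int) {
        words = Array(repeating: 0, count: (capacity + 63) / 64)
    }
    mutating func insert(_ i: Int) {
        words[i / 64] |= 1 << UInt64(i % 64)
    }
    func contains(_ i: Int) -> Bool {
        words[i / 64] & (1 << UInt64(i % 64)) != 0
    }
    // Population count across all words, e.g. "how many parents have arrived".
    var count: Int { words.reduce(0) { $0 + $1.nonzeroBitCount } }
}
```

For a join barrier, each parent task sets its bit on arrival; the join fires when `count` equals the number of parents.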
HiveInvertedIndex
BM25-style inverted index (k1=1.2, b=0.75). Supports upsert, remove, and ranked query. Used by InMemoryHiveMemoryStore.
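The per-term BM25 score with those constants can be written out directly; this is a self-contained sketch of the formula, not HiveInvertedIndex itself:

```swift
import Foundation

// BM25 contribution of one term to one document's score.
// tf: term frequency in the doc; df: number of docs containing the term.
func bm25(termFreq tf: Double, docLength: Double, avgDocLength: Double,
          docCount n: Double, docsWithTerm df: Double,
          k1: Double = 1.2, b: Double = 0.75) -> Double {
    let idf = log((n - df + 0.5) / (df + 0.5) + 1)   // +1 keeps IDF non-negative
    let lengthNorm = k1 * (1 - b + b * docLength / avgDocLength)
    return idf * tf * (k1 + 1) / (tf + lengthNorm)
}
```

The score rises with term frequency (with diminishing returns), falls for terms that appear in many documents, and penalizes longer-than-average documents.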
14. Error Handling
| Category | Errors |
|---|---|
| Store | unknownChannelID, scopeMismatch, channelTypeMismatch, storeValueMissing |
| Writes | updatePolicyViolation, taskLocalWriteNotAllowed |
| Checkpoint | checkpointStoreMissing, checkpointVersionMismatch, checkpointDecodeFailed |
| Interrupt | interruptPending, noCheckpointToResume, resumeInterruptMismatch |
| Model | modelClientMissing, modelStreamInvalid, toolRegistryMissing |
| Graph | duplicateChannelID, staticGraphCycleDetected |
15. Testing Guide
Framework
Swift Testing: @Test, #expect, #require. Tests are async throws free functions.
Pattern
- Define an inline HiveSchema enum with minimal channels
- Build the graph imperatively via HiveGraphBuilder
- Collect events from the AsyncThrowingStream
- Assert exact event ordering (not just presence)
@Test("Two parallel writers produce deterministic order")
func twoWriters() async throws {
enum Schema: HiveSchema {
static var channelSpecs: [AnyHiveChannelSpec<Schema>] {
// ... inline schema
}
}
var builder = HiveGraphBuilder<Schema>(start: [HiveNodeID("A"), HiveNodeID("B")])
// ... add nodes, compile, run
guard case let .finished(store, _) = outcome else {
    Issue.record("Expected .finished outcome")
    return
}
#expect(try store.get(valuesKey) == [1, 2, 3])
}
16. Examples
Hello World
let graph = try Workflow<Schema> {
Node("greet") { _ in
Effects { Set(messageKey, "Hello from Hive!"); End() }
}.start()
}.compile()
Branching
Branch(from: "check") {
Branch.case(name: "high", when: { ((try? $0.get(scoreKey)) ?? 0) >= 70 }) {
GoTo("pass")
}
Branch.default { GoTo("fail") }
}
Fan-Out + Join + Interrupt
Node("dispatch") { _ in
Effects {
SpawnEach(["a", "b", "c"], node: "worker") { item in
var local = HiveTaskLocalStore<Schema>.empty
try! local.set(itemKey, item)
return local
}
End()
}
}.start()
Join(parents: ["worker"], to: "review")
Node("review") { _ in Effects { Interrupt("Approve?") } }
Agent Loop
ModelTurn("chat", model: "gpt-4", messages: [
HiveChatMessage(id: "u1", role: .user, content: "Weather in SF?")
])
.tools(.environment)
.agentLoop(.init(maxModelInvocations: 8))
.writes(to: answerKey)
.start()
TinyGraph Executable
swift run HiveTinyGraphExample
Demonstrates fan-out, join barriers, interrupt/resume, and checkpoint persistence. See Sources/Hive/Examples/TinyGraph/main.swift.
Generated from the Hive codebase. See HIVE_SPEC.md for the normative specification.