Swift 6.2 · iOS 26+ · macOS 26+ · MIT License

Hive Documentation

Deterministic graph runtime for agent workflows in Swift. Same input, same output, every time — golden-testable, checkpoint-resumable, and built entirely on Swift concurrency.

1. Overview

Hive is the Swift equivalent of LangGraph — a deterministic graph runtime for building agent workflows. It executes workflows as superstep graphs where frontier nodes run concurrently, writes commit atomically, and routers schedule the next frontier.

Why Hive?

  - Deterministic: the same input produces the same output every run, making workflows golden-testable.
  - Concurrent: frontier nodes run in parallel within each superstep, with writes committed atomically.
  - Resumable: checkpoints capture full execution state for interrupt/resume and forking.
  - Pure Swift: HiveCore has zero external dependencies and is built entirely on Swift concurrency.

Quick Start

swift build                              # Build all targets
swift test                               # Run all tests
swift test --filter HiveCoreTests        # Single test target
swift run HiveTinyGraphExample           # Run example

30-Second Example

import HiveDSL

let workflow = Workflow<MySchema> {
    Node("classify") { input in
        let text = try input.store.get(MySchema.text)
        Effects {
            Set(MySchema.category, classify(text))
            UseGraphEdges()
        }
    }.start()

    Node("respond") { _ in Effects { End() } }
    Node("escalate") { _ in Effects { End() } }

    Branch(from: "classify") {
        Branch.case(name: "urgent", when: {
            (try? $0.get(MySchema.category)) == "urgent"
        }) { GoTo("escalate") }
        Branch.default { GoTo("respond") }
    }
}

let graph = try workflow.compile()
let runtime = HiveRuntime(graph: graph, environment: env)

2. Architecture

HiveCore (zero external deps — pure Swift)
├── HiveDSL            result-builder workflow DSL
├── HiveConduit        Conduit model client adapter
├── HiveCheckpointWax  Wax-backed checkpoint store
└── HiveRAGWax         Wax-backed RAG snippets
Hive        (umbrella — re-exports Core + DSL + Conduit + CheckpointWax)
HiveMacros  @HiveSchema / @Channel / @WorkflowBlueprint
| Module | Dependencies | Purpose |
| --- | --- | --- |
| HiveCore | None | Schema, graph, runtime, store |
| HiveDSL | HiveCore | Result-builder workflow DSL |
| HiveConduit | HiveCore, Conduit | LLM provider adapter |
| HiveCheckpointWax | HiveCore, Wax | Persistent checkpoints |
| HiveRAGWax | HiveCore, Wax | Vector RAG persistence |
| Hive | All above | Umbrella re-export |

Execution Flow

Schema defines channels → Graph compiled from DSL/builder → Runtime executes supersteps:

  1. Frontier nodes execute concurrently (lexicographic order)
  2. Writes collected, reduced, committed atomically
  3. Routers run on fresh post-commit state
  4. Next frontier scheduled
  5. Repeat until End() or Interrupt()
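The superstep loop can be modeled in a few lines of plain Swift. The sketch below is a deliberately simplified stand-in (single integer channel, sum as the only reducer, routing returned directly by nodes), not the Hive API:

```swift
// Simplified BSP superstep loop: execute the frontier, commit writes, schedule next.
struct ToyNode {
    let run: (Int) -> (write: Int?, next: [String])  // reads state, emits write + successors
}

func runToyGraph(nodes: [String: ToyNode], start: [String], state initial: Int) -> Int {
    var state = initial
    var frontier = start.sorted()                    // lexicographic, like Hive
    while !frontier.isEmpty {
        // Phase 1: all frontier nodes see the same pre-commit snapshot.
        let results = frontier.compactMap { nodes[$0]?.run(state) }
        // Phase 2: commit writes atomically (here: sum-reduce in deterministic order).
        for r in results { if let w = r.write { state += w } }
        // Phase 3: assemble the next frontier from routing decisions.
        frontier = Array(Set(results.flatMap { $0.next })).sorted()
    }
    return state
}

let nodes: [String: ToyNode] = [
    "A": ToyNode { _ in (write: 1, next: ["B", "C"]) },
    "B": ToyNode { _ in (write: 10, next: []) },
    "C": ToyNode { _ in (write: 100, next: []) },
]
let finalState = runToyGraph(nodes: nodes, start: ["A"], state: 0)
// finalState == 111: A runs in superstep 1, then B and C together in superstep 2.
```

The key property mirrored here is that execution within a superstep reads a frozen snapshot, so parallel node order cannot change the committed result.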

3. Schema System

HiveSchema Protocol

public protocol HiveSchema: Sendable {
    associatedtype Context: Sendable = Void
    associatedtype Input: Sendable = Void
    associatedtype InterruptPayload: Codable & Sendable = String
    associatedtype ResumePayload: Codable & Sendable = String

    static var channelSpecs: [AnyHiveChannelSpec<Self>] { get }
    static func inputWrites(_ input: Input, inputContext: HiveInputContext) throws -> [AnyHiveWrite<Self>]
}

Channel Scopes & Persistence

| Scope | Store | Visibility |
| --- | --- | --- |
| .global | HiveGlobalStore | Shared across all tasks |
| .taskLocal | HiveTaskLocalStore | Isolated per spawned task |

| Persistence | Checkpointed? | Lifetime |
| --- | --- | --- |
| .checkpointed | Yes | Survives interrupt/resume |
| .untracked | No | Preserved across supersteps |
| .ephemeral | No | Reset each superstep |

Built-in Reducers

| Reducer | Behavior |
| --- | --- |
| .lastWriteWins() | Replaces current with update |
| .append() | Appends update to current collection |
| .setUnion() | Union of current and update sets |
| .dictionaryMerge(valueReducer:) | Merges dictionaries with conflict resolution |
| .sum() | Numeric addition |
| .min() / .max() | Comparable selection |
| .binaryOp(_:) | Custom binary operator |
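Conceptually, every reducer is a binary function that folds one update into the channel's current value, and committing a batch of writes is a left fold over that function. A minimal sketch (illustrative types, not the Hive API):

```swift
// A reducer folds one update into a channel's current value: (current, update) -> new.
typealias ToyReducer<V> = (V, V) -> V

let lastWriteWins: ToyReducer<String> = { _, update in update }
let appendAll: ToyReducer<[Int]> = { current, update in current + update }
let sumValues: ToyReducer<Int> = (+)

// Committing a batch of writes is a deterministic left fold over the reducer.
func commit<V>(_ initial: V, writes: [V], with reduce: ToyReducer<V>) -> V {
    writes.reduce(initial, reduce)
}

let merged = commit([1], writes: [[2], [3, 4]], with: appendAll)  // [1, 2, 3, 4]
let last = commit("a", writes: ["b", "c"], with: lastWriteWins)   // "c"
let total = commit(0, writes: [5, 7], with: sumValues)            // 12
```

Because the fold is deterministic, the committed value depends only on the write order, which Hive fixes by sorting writes before reduction.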

Complete Schema Example

enum MySchema: HiveSchema {
    typealias Input = String

    enum Channels {
        static let messages = HiveChannelKey<MySchema, [String]>(HiveChannelID("messages"))
    }

    static var channelSpecs: [AnyHiveChannelSpec<MySchema>] {
        [AnyHiveChannelSpec(HiveChannelSpec(
            key: Channels.messages,
            scope: .global,
            reducer: .append(),
            updatePolicy: .multi,
            initial: { [] },
            codec: HiveAnyCodec(HiveJSONCodec<[String]>()),
            persistence: .checkpointed
        ))]
    }

    static func inputWrites(_ input: String, inputContext: HiveInputContext) throws -> [AnyHiveWrite<MySchema>] {
        [AnyHiveWrite(Channels.messages, [input])]
    }
}

@HiveSchema Macro

@HiveSchema
enum MySchema: HiveSchema {
    @Channel(reducer: "lastWriteWins()", persistence: "untracked")
    static var _answer: String = ""

    @TaskLocalChannel(reducer: "append()", persistence: "checkpointed")
    static var _logs: [String] = []
}

4. Store Model

HiveSchemaRegistry (channel specs)
               |
    HiveInitialCache (defaults)
        /              \
HiveGlobalStore    HiveTaskLocalStore
   (global)           (task-local)
        \              /
        HiveStoreView
(read-only merged — given to nodes)

HiveGlobalStore

Holds current value for every global channel. All tasks in a superstep see the same pre-commit snapshot. During commit, writes are sorted by (taskOrdinal, emissionIndex) and reduced through each channel's reducer.
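The sort key is what makes commits deterministic: writes may arrive in any task-completion order, but they are always reduced in (taskOrdinal, emissionIndex) order. A pure-Swift sketch of that ordering step (not the Hive types):

```swift
// Writes are sorted by (taskOrdinal, emissionIndex) before reduction, so
// concurrent task completion order never affects the committed value.
struct ToyWrite {
    let taskOrdinal: Int    // position of the emitting task in the frontier
    let emissionIndex: Int  // order in which the task emitted its writes
    let value: String
}

func commitDeterministically(_ writes: [ToyWrite]) -> [String] {
    writes
        .sorted { ($0.taskOrdinal, $0.emissionIndex) < ($1.taskOrdinal, $1.emissionIndex) }
        .map(\.value)  // here the "reducer" is a simple append
}

// Writes arrive in arbitrary completion order...
let arrived = [
    ToyWrite(taskOrdinal: 1, emissionIndex: 0, value: "b1"),
    ToyWrite(taskOrdinal: 0, emissionIndex: 1, value: "a2"),
    ToyWrite(taskOrdinal: 0, emissionIndex: 0, value: "a1"),
]
// ...but always commit in the same order.
let committed = commitDeterministically(arrived)  // ["a1", "a2", "b1"]
```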

HiveTaskLocalStore

Sparse overlay for task-local channels. Each spawned task carries its own instance. get() returns Optional (nil = use initial value).

HiveStoreView

Read-only merged view given to nodes via input.store. Global channels delegate to HiveGlobalStore. Task-local channels check overlay first, fall back to HiveInitialCache.
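The merged read path can be sketched with plain dictionaries. This is an illustrative model of the lookup order only (overlay first, then initial cache; globals delegate to the global store), not the Hive types:

```swift
// Sketch of the merged read path behind a store view.
struct ToyStoreView {
    var globals: [String: String]           // committed global channel values
    var taskLocalOverlay: [String: String]  // sparse per-task values
    var initials: [String: String]          // channel defaults

    func get(taskLocal key: String) -> String? {
        taskLocalOverlay[key] ?? initials[key]  // nil overlay = fall back to initial
    }
    func get(global key: String) -> String? {
        globals[key]
    }
}

let view = ToyStoreView(
    globals: ["messages": "hello"],
    taskLocalOverlay: ["item": "a"],
    initials: ["item": "", "log": "<empty>"]
)
let item = view.get(taskLocal: "item")  // "a"        (overlay hit)
let log = view.get(taskLocal: "log")    // "<empty>"  (no overlay entry, initial used)
let msg = view.get(global: "messages")  // "hello"    (delegated to globals)
```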

Fingerprinting

HiveTaskLocalFingerprint computes SHA-256 digests of task-local state for deduplication and caching.

5. Graph Compilation

var builder = HiveGraphBuilder<Schema>(start: [HiveNodeID("A")])

builder.addNode(HiveNodeID("A")) { input in
    HiveNodeOutput(writes: [AnyHiveWrite(key, value)], next: .useGraphEdges)
}
builder.addEdge(from: HiveNodeID("A"), to: HiveNodeID("B"))
builder.addJoinEdge(parents: [HiveNodeID("W1"), HiveNodeID("W2")], target: HiveNodeID("Gate"))
builder.addRouter(from: HiveNodeID("A")) { view in .nodes([HiveNodeID("B")]) }

let graph = try builder.compile()

compile() validates: no duplicate nodes, all edge targets exist, no static-edge cycles (router cycles allowed), and start nodes exist. Produces CompiledHiveGraph with SHA-256 versioning and Mermaid export.
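The static-cycle check is a standard depth-first search with visit marking; router edges are exempt because they are resolved at runtime. A self-contained sketch of that validation step (not the Hive implementation):

```swift
// DFS cycle detection over static edges: a node revisited while still
// "visiting" means a back edge, i.e. a cycle.
func hasStaticCycle(edges: [String: [String]]) -> Bool {
    enum Mark { case visiting, done }
    var marks: [String: Mark] = [:]
    func dfs(_ node: String) -> Bool {
        if marks[node] == .visiting { return true }   // back edge found
        if marks[node] == .done { return false }      // already fully explored
        marks[node] = .visiting
        for next in edges[node, default: []] {
            if dfs(next) { return true }
        }
        marks[node] = .done
        return false
    }
    return edges.keys.contains { dfs($0) }
}

let acyclic = ["A": ["B"], "B": ["C"], "C": []]
let cyclic = ["A": ["B"], "B": ["A"]]
// hasStaticCycle(edges: acyclic) == false
// hasStaticCycle(edges: cyclic) == true
```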

6. Runtime Engine

HiveRuntime Actor

public actor HiveRuntime<Schema: HiveSchema>: Sendable {
    public func run(threadID:, input:, options:) -> HiveRunHandle
    public func resume(threadID:, interruptID:, payload:, options:) -> HiveRunHandle
    public func fork(threadID:, fromCheckpointID:, into:, options:) -> HiveRunHandle
    public func getLatestStore(threadID:) -> HiveGlobalStore?
}

BSP Superstep Model

| Phase | Action |
| --- | --- |
| 1. Execute | All frontier nodes run concurrently in TaskGroup |
| 2. Commit | Writes sorted, reduced, applied atomically |
| 3. Schedule | Routers run on post-commit state; next frontier assembled |

Determinism Guarantees

  - Frontier nodes within a superstep execute in lexicographic order.
  - Writes are committed sorted by (taskOrdinal, emissionIndex), so task completion order never affects the result.
  - Routers observe only fresh post-commit state.
  - Compiled graphs carry SHA-256 versioning; tool calls follow a configurable toolCallOrder.

Event Streaming

| Category | Events |
| --- | --- |
| Run | runStarted, runFinished, runInterrupted, runCancelled |
| Step | stepStarted, stepFinished |
| Task | taskStarted, taskFinished, taskFailed |
| Writes | writeApplied |
| Model | modelInvocationStarted, modelToken, modelInvocationFinished |

7. HiveDSL

DSL Grammar

Workflow<Schema> {
    Node("id") { input -> HiveNodeOutput }.start()
    ModelTurn("id", model:, messages:).tools(.environment).start()

    Edge("from", to: "to")
    Join(parents: ["a", "b"], to: "target")
    Chain { .start("A"); .then("B") }
    Branch(from: "node") {
        Branch.case(name:, when:) { GoTo("x") }
        Branch.default { End() }
    }
    FanOut(from: "src", to: ["a","b"], joinTo: "merge")
}

// Inside nodes:
Effects {
    Set(key, value)
    Append(key, elements: [...])
    GoTo("node")
    UseGraphEdges()
    End()
    Interrupt(payload)
    SpawnEach(items, node: "worker") { item in localStore }
}

Effects Primitives

| Primitive | Purpose |
| --- | --- |
| Set(key, value) | Write a value to a channel |
| Append(key, elements:) | Append to collection channel |
| GoTo("node") | Route to specific node |
| UseGraphEdges() | Follow static edges |
| End() | Terminate workflow |
| Interrupt(payload) | Pause, save checkpoint |
| SpawnEach(items, node:) | Fan-out parallel tasks |

Workflow Patching

var patch = WorkflowPatch<Schema>()
patch.replaceNode("B") { input in Effects { End() } }
patch.insertProbe("monitor", between: "A", and: "B") { input in
    Effects { Set(probeKey, "observed"); UseGraphEdges() }
}
let result = try patch.apply(to: graph)

8. Checkpointing

Checkpoints capture: global store, frontier tasks, join barriers, superstep index, version counters, and pending interrupts.

Checkpoint Policy

| Policy | Behavior |
| --- | --- |
| .disabled | No checkpoints |
| .everyStep | Save after every superstep |
| .every(steps: N) | Save every N steps |
| .onInterrupt | Save only when interrupted |
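The policy table maps directly onto a small decision function evaluated after each superstep. The sketch below uses illustrative names (ToyPolicy, shouldSave), not the Hive enum:

```swift
// Sketch: decide whether to persist a checkpoint after a superstep.
enum ToyPolicy {
    case disabled
    case everyStep
    case every(steps: Int)
    case onInterrupt
}

func shouldSave(_ policy: ToyPolicy, step: Int, interrupted: Bool) -> Bool {
    switch policy {
    case .disabled:         return false
    case .everyStep:        return true
    case .every(let n):     return step % n == 0
    case .onInterrupt:      return interrupted
    }
}

// e.g. shouldSave(.every(steps: 3), step: 6, interrupted: false) == true
```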

Store Protocol

public protocol HiveCheckpointStore: Sendable {
    func save(_ checkpoint: HiveCheckpoint) async throws
    func loadLatest(threadID: HiveThreadID) async throws -> HiveCheckpoint?
}

9. Interrupt/Resume Protocol

Flow

Interrupt:

  1. Node returns interrupt: HiveInterruptRequest(payload: ...)
  2. Runtime selects interrupt from lowest-ordinal task
  3. Checkpoint saved
  4. Outcome: .interrupted(interruption:)

Resume:

  1. Caller invokes runtime.resume(threadID:, interruptID:, payload:)
  2. Checkpoint loaded, interrupt ID verified
  3. HiveResume delivered to nodes via input.run.resume
  4. Execution continues from saved frontier

// Interrupt
Node("review") { _ in Effects { Interrupt("Approve?") } }

// Resume
let resumed = await runtime.resume(
    threadID: tid,
    interruptID: interruption.interrupt.id,
    payload: "approved",
    options: opts
)

10. Hybrid Inference

Model Client

public protocol HiveModelClient: Sendable {
    func complete(_ request: HiveChatRequest) async throws -> HiveChatResponse
    func stream(_ request: HiveChatRequest) -> AsyncThrowingStream<HiveChatStreamChunk, Error>
}

ReAct Loop

HiveModelToolLoop implements bounded ReAct: send to model → if tool calls, execute tools → loop. Configurable maxModelInvocations and toolCallOrder for determinism.

ModelTurn("chat", model: "gpt-4", messages: [...])
    .tools(.environment)
    .agentLoop(.init(maxModelInvocations: 8))
    .writes(to: answerKey)
    .start()
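The bounded loop itself is simple: invoke the model, run any requested tools in a fixed order, feed results back, and stop on a final answer or when the invocation budget runs out. A self-contained sketch with stand-in types (ToyModelReply, toyAgentLoop are illustrative, not the Hive protocols):

```swift
// Sketch of a bounded ReAct loop with deterministic tool ordering.
enum ToyModelReply {
    case toolCalls([String])  // names of tools the model wants to run
    case answer(String)       // final answer, terminates the loop
}

func toyAgentLoop(
    maxModelInvocations: Int,
    model: (inout [String]) -> ToyModelReply,  // transcript in, reply out
    tool: (String) -> String                   // executes one tool by name
) -> String? {
    var transcript: [String] = []
    for _ in 0..<maxModelInvocations {
        switch model(&transcript) {
        case .answer(let text):
            return text
        case .toolCalls(let calls):
            // Execute tools in sorted order for determinism (cf. toolCallOrder).
            for name in calls.sorted() { transcript.append(tool(name)) }
        }
    }
    return nil  // invocation budget exhausted
}

// A model that asks for one tool, then answers once it sees the result.
let answer = toyAgentLoop(
    maxModelInvocations: 8,
    model: { transcript in
        if transcript.isEmpty { return .toolCalls(["weather"]) }
        return .answer("Sunny: \(transcript[0])")
    },
    tool: { _ in "72F" }
)
// answer == "Sunny: 72F"
```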

11. Memory System

public protocol HiveMemoryStore: Sendable {
    func remember(namespace:, key:, text:, metadata:) async throws
    func get(namespace:, key:) async throws -> HiveMemoryItem?
    func recall(namespace:, query:, limit:) async throws -> [HiveMemoryItem]
    func delete(namespace:, key:) async throws
}

InMemoryHiveMemoryStore provides BM25-based recall via HiveInvertedIndex for testing and development.

12. Adapter Modules

HiveConduit

Bridges Conduit LLM providers to HiveModelClient. Handles message conversion, tool definition mapping, and streaming.

HiveCheckpointWax

Wax-backed persistent checkpoints. JSON-encodes checkpoints as Wax frames. Supports history browsing via HiveCheckpointQueryableStore.

HiveRAGWax

Wax-backed HiveMemoryStore. Stores text as frames, performs keyword matching for recall.

13. Data Structures

HiveBitset

Fixed-size dynamic bitset backed by [UInt64]. O(1) insert/contains. Used for join barrier tracking.
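A word-packed bitset like this is a natural fit for join barriers: each parent node owns a bit, and the barrier fires once all bits are set. A minimal sketch (illustrative, not the HiveBitset API):

```swift
// Bitset over [UInt64] words: bit i lives at word i/64, position i%64.
struct ToyBitset {
    private var words: [UInt64]
    let count: Int

    init(count: Int) {
        self.count = count
        self.words = Array(repeating: 0, count: (count + 63) / 64)
    }
    mutating func insert(_ i: Int) {
        words[i / 64] |= UInt64(1) << UInt64(i % 64)
    }
    func contains(_ i: Int) -> Bool {
        words[i / 64] & (UInt64(1) << UInt64(i % 64)) != 0
    }
    var isFull: Bool {  // e.g. every join parent has arrived
        (0..<count).allSatisfy { contains($0) }
    }
}

var barrier = ToyBitset(count: 3)
barrier.insert(0)
barrier.insert(2)
// barrier.isFull == false here: parent 1 has not arrived yet.
barrier.insert(1)
// barrier.isFull == true: the join can fire.
```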

HiveInvertedIndex

BM25-style inverted index (k1=1.2, b=0.75). Supports upsert, remove, and ranked query. Used by InMemoryHiveMemoryStore.
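The per-term score combines an inverse-document-frequency weight with a length-normalized term frequency. A sketch of the scoring formula using the stated constants (k1 = 1.2, b = 0.75); this is the standard BM25 math, not the HiveInvertedIndex API:

```swift
import Foundation

// BM25 score of one query term against one document.
func bm25Score(
    termFreq: Double,      // occurrences of the term in the document
    docLength: Double,     // tokens in the document
    avgDocLength: Double,  // mean document length in the corpus
    docCount: Double,      // total documents indexed
    docsWithTerm: Double   // documents containing the term
) -> Double {
    let k1 = 1.2, b = 0.75
    let idf = log((docCount - docsWithTerm + 0.5) / (docsWithTerm + 0.5) + 1)
    let lengthNorm = 1 - b + b * (docLength / avgDocLength)
    return idf * (termFreq * (k1 + 1)) / (termFreq + k1 * lengthNorm)
}

// A rare term outscores a common one at equal term frequency:
let rare = bm25Score(termFreq: 2, docLength: 100, avgDocLength: 100,
                     docCount: 1000, docsWithTerm: 5)
let common = bm25Score(termFreq: 2, docLength: 100, avgDocLength: 100,
                       docCount: 1000, docsWithTerm: 500)
// rare > common
```

A ranked query sums these per-term scores across the query's terms for each candidate document.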

14. Error Handling

| Category | Errors |
| --- | --- |
| Store | unknownChannelID, scopeMismatch, channelTypeMismatch, storeValueMissing |
| Writes | updatePolicyViolation, taskLocalWriteNotAllowed |
| Checkpoint | checkpointStoreMissing, checkpointVersionMismatch, checkpointDecodeFailed |
| Interrupt | interruptPending, noCheckpointToResume, resumeInterruptMismatch |
| Model | modelClientMissing, modelStreamInvalid, toolRegistryMissing |
| Graph | duplicateChannelID, staticGraphCycleDetected |

15. Testing Guide

Framework

Swift Testing: @Test, #expect, #require. Tests are async throws free functions.

Pattern

  1. Define inline HiveSchema enum with minimal channels
  2. Build graph imperatively via HiveGraphBuilder
  3. Collect events from AsyncThrowingStream
  4. Assert exact event ordering (not just presence)

@Test("Two parallel writers produce deterministic order")
func twoWriters() async throws {
    enum Schema: HiveSchema {
        static var channelSpecs: [AnyHiveChannelSpec<Schema>] {
            // ... inline schema
        }
    }

    var builder = HiveGraphBuilder<Schema>(start: [HiveNodeID("A"), HiveNodeID("B")])
    // ... add nodes, compile, run

    guard case let .finished(output, _) = outcome else { return }
    #expect(try store.get(valuesKey) == [1, 2, 3])
}

16. Examples

Hello World

let graph = try Workflow<Schema> {
    Node("greet") { _ in
        Effects { Set(messageKey, "Hello from Hive!"); End() }
    }.start()
}.compile()

Branching

Branch(from: "check") {
    Branch.case(name: "high", when: { ($0.get(scoreKey) ?? 0) >= 70 }) {
        GoTo("pass")
    }
    Branch.default { GoTo("fail") }
}

Fan-Out + Join + Interrupt

Node("dispatch") { _ in
    Effects {
        SpawnEach(["a", "b", "c"], node: "worker") { item in
            var local = HiveTaskLocalStore<Schema>.empty
            try! local.set(itemKey, item)
            return local
        }
        End()
    }
}.start()

Join(parents: ["worker"], to: "review")
Node("review") { _ in Effects { Interrupt("Approve?") } }

Agent Loop

ModelTurn("chat", model: "gpt-4", messages: [
    HiveChatMessage(id: "u1", role: .user, content: "Weather in SF?")
])
.tools(.environment)
.agentLoop(.init(maxModelInvocations: 8))
.writes(to: answerKey)
.start()

TinyGraph Executable

swift run HiveTinyGraphExample

Demonstrates fan-out, join barriers, interrupt/resume, and checkpoint persistence. See Sources/Hive/Examples/TinyGraph/main.swift.


Generated from the Hive codebase. See HIVE_SPEC.md for the normative specification.