Orchestration API Reference¶
Market-Based Allocation¶
MarketAllocator¶
End-to-end allocation pipeline: capability filter → bid solicitation → scoring → award.
from grampus.orchestration.market import (
MarketAllocator, CapabilityRegistry, TaskBoard,
BidScorer, ReputationTracker,
)
registry = CapabilityRegistry(max_candidates=5)
board = TaskBoard()
reputation = ReputationTracker()
scorer = BidScorer(reputation)
allocator = MarketAllocator(
registry=registry,
board=board,
scorer=scorer,
reputation=reputation,
model_client=client, # any Grampus ModelClient; used for bid solicitation
tracer=None, # optional GrampusTracer; emits market.allocate / market.award spans
)
result = await allocator.allocate(spec) # AllocationResult
await allocator.report_outcome(outcome) # updates board + reputation
grampus.orchestration.market.allocator.MarketAllocator
¶
End-to-end market allocation pipeline.
Steps:
1. Filter to capable agents (CapabilityRegistry.filter_capable).
2. Solicit bids from each capable agent via LLM call.
3. Score all bids (BidScorer.score_all).
4. Award to highest final_score above min_success_threshold.
5. If no bids pass threshold → AllocationStatus.REJECTED.
6. Update TaskBoard status.

OTEL spans emitted:
- market.allocate (parent): task_id, num_candidates, num_bids
- market.bid_solicitation (child per agent): agent_id
- market.award: winner_agent_id, final_score, calibrated_success_prob
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| registry | CapabilityRegistry | Agent capability registry. | required |
| board | TaskBoard | Task board for posting and status updates. | required |
| scorer | BidScorer | Bid scoring engine. | required |
| reputation | ReputationTracker | Reputation tracker for calibration data. | required |
| model_client | Any | ModelClient for bid solicitation LLM calls. | required |
| tracer | Any \| None | Optional GrampusTracer for OTEL span emission. | None |
allocate(spec)
async
¶
Run the full allocation pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spec | TaskSpec | The task specification to allocate. | required |

Returns:
| Type | Description |
|---|---|
| AllocationResult | AllocationResult with status and winning bid (if any). |
report_outcome(outcome)
async
¶
Update board status and reputation after a task completes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| outcome | TaskOutcome | The completed task outcome. | required |
CapabilityRegistry¶
Stores worker capability profiles with capability-first filtering (COALESCE, arXiv 2506.01900).
from grampus.orchestration.market import CapabilityRegistry, CapabilityProfile, AgentTier
registry = CapabilityRegistry(max_candidates=5) # default 5
profile = CapabilityProfile(
agent_id="researcher",
agent_name="Web Researcher",
skill_tags=["web_search", "summarize"],
model_tier=AgentTier.BALANCED,
cost_per_step_usd=0.002,
max_steps=10,
)
await registry.register(profile)
await registry.deregister("researcher")
capable = registry.filter_capable(
required_skills=["web_search"],
preferred_skills=["summarize"],
) # → list[CapabilityProfile], ranked by preferred matches, capped at max_candidates
grampus.orchestration.market.registry.CapabilityRegistry
¶
Stores worker agent capability profiles with in-memory fast lookup.
Profiles are persisted to Dapr state (namespace: "market:capability") and cached in memory for sub-millisecond capability filtering without a Dapr roundtrip on every allocation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_store | Any \| None | Optional DaprStateStore for persistence. When None, profiles are in-memory only (useful for testing). | None |
| max_candidates | int | Maximum agents returned by filter_capable(). Limits bid solicitation cost (COALESCE insight, arXiv 2506.01900). | _DEFAULT_MAX_CANDIDATES |
register(profile)
async
¶
Register or update a worker agent's capability profile.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| profile | CapabilityProfile | The agent's CapabilityProfile to register. | required |
deregister(agent_id)
async
¶
Remove a worker agent from the registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_id | str | The agent to remove. | required |
filter_capable(required_skills, preferred_skills)
¶
Return agents that satisfy all required skills, ranked by preferred matches.
Implements COALESCE capability-first filtering (arXiv 2506.01900): filter before bid solicitation to avoid wasting LLM calls on agents that cannot do the task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| required_skills | list[str] | Skills every returned agent must have. | required |
| preferred_skills | list[str] | Skills used to rank capable agents (more = better). | required |

Returns:
| Type | Description |
|---|---|
| list[CapabilityProfile] | Up to max_candidates profiles sorted descending by preferred skill count. |
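The hard-filter-then-rank behaviour described for filter_capable() can be sketched roughly as below. This is an illustration only, not the library's implementation: `Profile` is a hypothetical stand-in for CapabilityProfile with just the fields used here, `filter_capable_sketch` is an invented name, and the tie-breaking rule (registration order) is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class Profile:
    """Hypothetical stand-in for CapabilityProfile (only the fields used here)."""
    agent_id: str
    skill_tags: list[str] = field(default_factory=list)


def filter_capable_sketch(profiles, required_skills, preferred_skills, max_candidates=5):
    required = set(required_skills)
    preferred = set(preferred_skills)
    # Hard filter: keep only agents whose tags cover every required skill.
    capable = [p for p in profiles if required <= set(p.skill_tags)]
    # Soft ranking: more preferred-skill matches first; the stable sort keeps
    # the original order among ties (an assumption about tie-breaking).
    capable.sort(key=lambda p: len(preferred & set(p.skill_tags)), reverse=True)
    # Cap the list so bid solicitation cost stays bounded.
    return capable[:max_candidates]


profiles = [
    Profile("a", ["web_search"]),
    Profile("b", ["web_search", "summarize"]),
    Profile("c", ["summarize"]),
]
ranked = filter_capable_sketch(profiles, ["web_search"], ["summarize"])
print([p.agent_id for p in ranked])  # ['b', 'a']
```

Agent "c" is dropped before any bid is solicited because it lacks the required skill, which is the point of filtering first.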
load_all()
async
¶
Reload all profiles from Dapr state into memory.
Called on startup to warm the in-memory cache from durable storage.
list_agents()
¶
Return all currently registered profiles.
TaskBoard¶
Durable task and bid store. Backed by Dapr state when a state_store is provided.
from grampus.orchestration.market import TaskBoard, TaskSpec, AllocationStatus
board = TaskBoard(state_store=None) # in-memory; pass DaprStateStore for persistence
task_id = await board.post_task(spec)
await board.submit_bid(bid)
bids = await board.get_bids_for_task(task_id)
await board.update_task_status(task_id, AllocationStatus.ALLOCATED)
await board.mark_outcome(outcome) # → COMPLETED or FAILED
ReputationTracker¶
UCB-based per-agent reputation (DRF, arXiv 2509.05764). Persists to Dapr state.
from grampus.orchestration.market import ReputationTracker, TaskOutcome
tracker = ReputationTracker(state_store=None)
record = await tracker.get("agent-id") # ReputationRecord
record = await tracker.update(outcome) # → updated ReputationRecord
cal_factor = await tracker.calibration_factor("agent-id") # float
ucb = await tracker.ucb_bonus("agent-id") # float; decays with history
tracker.record_self_report("agent-id", 0.85) # feed bid data for calibration
UCB formula: sqrt(2 × ln(max(2, N)) / max(1, n_i))
New agents always receive a positive exploration bonus.
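As a rough illustration, the formula above can be written as a small function. The reference does not define N and n_i; the sketch assumes the usual UCB1 reading, where N is the total number of tasks recorded across all agents and n_i is the number recorded for the agent in question. The function and parameter names are hypothetical.

```python
import math


def ucb_bonus_sketch(total_tasks: int, agent_tasks: int) -> float:
    """sqrt(2 * ln(max(2, N)) / max(1, n_i)), with N and n_i as assumed above."""
    return math.sqrt(2 * math.log(max(2, total_tasks)) / max(1, agent_tasks))


# A brand-new agent in an empty market still gets a positive bonus,
# because both max() guards keep the expression well-defined.
print(ucb_bonus_sketch(0, 0))

# For a fixed N, the bonus shrinks as the agent accumulates history.
print(ucb_bonus_sketch(100, 5), ucb_bonus_sketch(100, 50))
```

The max(2, N) guard keeps the logarithm positive and max(1, n_i) avoids division by zero, which is why new agents always receive a positive bonus.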
BidScorer¶
Composite scoring with calibration discount.
from grampus.orchestration.market import BidScorer, ReputationTracker
scorer = BidScorer(
reputation_tracker=ReputationTracker(),
alpha=0.35, # reputation weight
beta=0.45, # calibrated success weight
gamma=0.20, # cost efficiency weight
# alpha + beta + gamma must equal 1.0 (ValueError on mismatch)
)
score = await scorer.score(bid, task_spec) # BidScore
scores = await scorer.score_all(bids, task_spec) # list[BidScore], sorted desc
Formula:
calibrated_success = clamp(raw_prob × calibration_factor, 0, 1)
cost_score = 1 / (1 + estimated_cost / budget)
composite = α×reputation + β×calibrated_success + γ×cost_score
final_score = composite + ucb_bonus
Bids with calibrated_success < min_success_threshold receive final_score = -1.0 (moral hazard guard).
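The formula and threshold guard can be sketched as a plain function. This is a minimal illustration under stated assumptions, not BidScorer itself: `score_bid_sketch` is a hypothetical name, all inputs are passed in as floats rather than looked up from a ReputationTracker, and `budget` is assumed to be a positive number (how the library handles budget_usd=None is not covered here).

```python
def clamp(x: float, lo: float = 0.0, hi: float = 1.0) -> float:
    return max(lo, min(hi, x))


def score_bid_sketch(raw_prob, calibration_factor, reputation, estimated_cost,
                     budget, ucb_bonus, *, alpha=0.35, beta=0.45, gamma=0.20,
                     min_success_threshold=0.5):
    # The weights must sum to 1.0, mirroring the ValueError noted above.
    if abs(alpha + beta + gamma - 1.0) > 1e-9:
        raise ValueError("alpha + beta + gamma must equal 1.0")
    # Discount the self-reported probability by the agent's calibration factor.
    calibrated = clamp(raw_prob * calibration_factor)
    # Guard: bids below the threshold are marked with -1.0.
    if calibrated < min_success_threshold:
        return -1.0
    cost_score = 1.0 / (1.0 + estimated_cost / budget)
    composite = alpha * reputation + beta * calibrated + gamma * cost_score
    return composite + ucb_bonus


# An over-confident agent (factor 0.8) that still clears the threshold.
print(score_bid_sketch(0.9, 0.8, 0.5, 0.01, 0.05, 0.1))
# A bid whose calibrated probability (0.3) falls below the 0.5 threshold.
print(score_bid_sketch(0.6, 0.5, 0.5, 0.01, 0.05, 0.1))
```

Because the exploration bonus is added after the weighted blend, a new agent with little history can outrank an established one with a slightly higher composite.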
MarketCrew¶
Crew subclass with opt-in market allocation.
from grampus.orchestration.market import MarketCrew
crew = MarketCrew(
members=members,
session_id="session-1",
pattern=CrewPattern.SEQUENTIAL, # used when use_market=False
allocator=allocator, # required when use_market=True
use_market=True, # default False — zero overhead when off
)
# Market path: post → allocate → run → report
result = await crew.run_task_with_market(
task_description="Summarise the latest AI papers.",
required_skills=["web_search"],
preferred_skills=["summarize"],
budget_usd=0.05,
)
# Standard path (use_market=False): identical to Crew.run()
result = await crew.run(initial_input="Summarise the latest AI papers.")
grampus.orchestration.market.crew.MarketCrew
¶
Bases: Crew
Extends Crew with optional market-based task allocation.
When use_market=True, each task is posted to the TaskBoard, MarketAllocator selects the best-fit worker agent, the selected agent executes via its AgentRunner, and the outcome is reported back to ReputationTracker.
When use_market=False (default), falls back to standard Crew execution with no market overhead — suitable for small crews.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| members | list[CrewMember] | List of crew members (each has its own AgentRunner). | required |
| pattern | CrewPattern | Execution pattern (sequential, parallel, hierarchical). | SEQUENTIAL |
| shared_state_store | Any \| None | Optional Dapr state store for cross-agent shared state. | None |
| lock | Any \| None | Optional DaprLock for coordinating shared state writes. | None |
| session_id | str | Shared session identifier for the crew run. | required |
| allocator | MarketAllocator \| None | MarketAllocator instance. Required when use_market=True. | None |
| use_market | bool | Enable market-based allocation. Default False. | False |
run_task_with_market(task_description, required_skills, budget_usd=None, **kwargs)
async
¶
Post task to board, allocate best worker, run it, report outcome.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_description | str | Natural language task description. | required |
| required_skills | list[str] | Skills required for capability filtering. | required |
| budget_usd | float \| None | Hard cost cap for the task, or None for unlimited. | None |
| **kwargs | Any | Extra metadata forwarded to TaskSpec. | {} |

Returns:
| Type | Description |
|---|---|
| ExecutionResult | ExecutionResult from the winning agent. |

Raises:
| Type | Description |
|---|---|
| MarketAllocationError | When no capable agent wins allocation. |
market_node¶
Graph node factory that runs market allocation as a graph step.
from grampus.orchestration.nodes import market_node
handler = market_node(
allocator=allocator,
required_skills=["web_search"],
budget_usd=0.05,
node_name="market_allocate", # used in log messages
)
Reads from state:
- state.metadata["task_description"] — task description string
Writes to state:
- state.metadata["market_winner"] — winning agent_id string (or None)
- state.metadata["market_result"] — serialized AllocationResult dict
- state.status = AgentStatus.FAILED when allocation is REJECTED
Types¶
CapabilityProfile¶
| Field | Type | Default | Description |
|---|---|---|---|
| agent_id | str | required | Unique identifier |
| agent_name | str | required | Human-readable name |
| skill_tags | list[str] | required | Capability labels used for filtering |
| model_tier | AgentTier | BALANCED | fast / balanced / powerful |
| cost_per_step_usd | float | 0.0 | Self-reported step cost (used in fallback bid) |
| max_steps | int | 20 | Maximum steps the agent will attempt |
| latency_sla_ms | int \| None | None | Optional latency SLA commitment |
TaskSpec¶
| Field | Type | Default | Description |
|---|---|---|---|
| task_id | str | required | Unique task identifier |
| description | str | required | Natural language task description |
| required_skills | list[str] | required | Must-have skills (hard filter) |
| preferred_skills | list[str] | [] | Skills used for ranking (soft filter) |
| budget_usd | float \| None | None | Hard cost cap; None = unlimited |
| min_success_threshold | float | 0.5 | Minimum calibrated probability to accept a bid |
| deadline_ms | int \| None | None | Optional wall-clock budget in milliseconds |
| allow_partial | bool | False | Whether PARTIAL outcome counts as success |
Bid¶
| Field | Type | Description |
|---|---|---|
| bid_id | str | Unique bid ID (auto-generated UUID) |
| task_id | str | Task this bid is for |
| agent_id | str | Bidding agent |
| self_reported_success_prob | float | Agent's own estimate (0–1); will be discounted |
| estimated_cost_usd | float | Self-reported cost |
| estimated_steps | int | Estimated number of steps |
| rationale | str | One-sentence explanation (from bid solicitation prompt) |
BidScore¶
| Field | Type | Description |
|---|---|---|
| raw_success_prob | float | Self-reported, before calibration |
| calibrated_success_prob | float | After calibration_factor discount |
| reputation_score | float | success_rate from ReputationTracker (0.5 for new agents) |
| cost_score | float | 1 / (1 + normalized_cost) |
| composite | float | Weighted blend |
| ucb_bonus | float | UCB1 exploration bonus |
| final_score | float | composite + ucb_bonus; -1.0 when below threshold |
AllocationResult¶
| Field | Type | Description |
|---|---|---|
| task_id | str | The allocated task |
| status | AllocationStatus | ALLOCATED, REJECTED, BIDDING, etc. |
| winning_agent_id | str \| None | Agent that won (None when REJECTED) |
| winning_bid | Bid \| None | The winning Bid |
| winning_score | BidScore \| None | The winning BidScore |
| all_scores | list[BidScore] | All computed scores, sorted descending |
| capability_filtered_out | list[str] | Agent IDs filtered before bid solicitation |
| reject_reason | str \| None | Human-readable reason when REJECTED |
AllocationStatus¶
from grampus.orchestration.market import AllocationStatus
AllocationStatus.PENDING # task posted, no bids yet
AllocationStatus.BIDDING # bid solicitation in progress
AllocationStatus.ALLOCATED # winner selected
AllocationStatus.REJECTED # no capable bidders or all below threshold
AllocationStatus.COMPLETED # task finished successfully
AllocationStatus.FAILED # task execution failed
ReputationRecord¶
| Field | Type | Default | Description |
|---|---|---|---|
| total_tasks | int | 0 | Tasks completed (success or failure) |
| successful_tasks | int | 0 | Successful completions |
| success_rate | float | 0.0 | successful / total |
| cost_accuracy | float | 1.0 | EMA of actual_cost / estimated_cost; 1.0 = perfect |
| calibration_factor | float | 1.0 | EMA multiplier for future bid discounting |
| ucb_confidence | float | 1.0 | Current UCB exploration bonus |
TaskOutcome¶
| Field | Type | Description |
|---|---|---|
| task_id | str | The completed task |
| agent_id | str | The agent that executed it |
| actual_success | bool | Whether it succeeded |
| actual_cost_usd | float | Actual cost incurred |
| actual_steps | int | Actual steps taken |
See the Market-Based Allocation guide for full usage and research citations.
Long-Horizon Planning¶
PlanningRunner¶
Top-level orchestrator for structured multi-step task execution.
from grampus.orchestration import PlanningRunner, PlanningConfig
planner = PlanningRunner(
agent_runner=agent_runner, # AgentRunner instance
model_client=client, # LLM client for planning calls
model_id="claude-opus-4-7", # model for planner/verifier/synthesizer
config=PlanningConfig(
complexity_threshold=4,
max_subgoals=12,
max_replans=3,
enable_lookahead=True,
enable_parallel_subgoals=True,
),
cost_tracker=None, # optional CostTracker
tracer=None, # optional GrampusTracer or any span(name, **attrs) tracer
)
result = await planner.run(task, agent_def, tool_names=["web_search"], memory_context="")
grampus.orchestration.planning.runner.PlanningRunner
¶
Orchestrates long-horizon task execution via structured planning.
Architecture:
1. Estimate complexity. Below threshold → delegate to AgentRunner directly.
2. Call Planner.create_plan() to get a SubGoal DAG.
3. Topological sort → execution waves.
4. Execute each wave: independent subgoals run via asyncio.gather.
5. After each subgoal: verify. On FAIL: try fallback, then trigger Replanner.
6. On replan: re-sort new plan, continue from first new wave.
7. After all subgoals complete: synthesize final output via one LLM call.
8. Return PlanResult.
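The "topological sort → execution waves" step can be pictured with a small sketch. It is not the runner's actual code: `execution_waves` is a hypothetical helper that takes a plain mapping from subgoal ID to dependency IDs (mirroring SubGoal.dependencies) and groups subgoals so that each wave depends only on earlier waves.

```python
def execution_waves(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group subgoal IDs into waves; every wave depends only on earlier waves."""
    remaining = {sid: set(deps) for sid, deps in dependencies.items()}
    waves: list[list[str]] = []
    while remaining:
        # A subgoal is ready once all of its dependencies have been scheduled.
        ready = sorted(sid for sid, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("dependency cycle or unknown dependency ID")
        waves.append(ready)
        for sid in ready:
            del remaining[sid]
        for deps in remaining.values():
            deps.difference_update(ready)
    return waves


plan = {
    "search": [],
    "fetch": [],
    "summarize": ["search", "fetch"],
    "report": ["summarize"],
}
print(execution_waves(plan))  # [['fetch', 'search'], ['summarize'], ['report']]
```

Subgoals inside one wave have no dependencies on each other, so they are the ones that can be run concurrently when enable_parallel_subgoals is on.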
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_runner | Any | AgentRunner instance for subgoal execution (duck-typed). | required |
| model_client | Any | LLM client for planning/verification/synthesis calls. | required |
| model_id | str | Model identifier for planning calls. | required |
| config | PlanningConfig \| None | PlanningConfig tuning parameters. | None |
| cost_tracker | Any \| None | Optional cost tracker shared across all calls. | None |
| tracer | Any \| None | Optional OTEL tracer for planning.run spans. | None |
run(task, agent_def, *, tool_names=None, memory_context='')
async
¶
Execute the full planning + execution pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task | str | User task description. | required |
| agent_def | Any | AgentDefinition for subgoal execution. | required |
| tool_names | list[str] \| None | Available tool names for planning hints. | None |
| memory_context | str | Optional memory context for the planner. | '' |

Returns:
| Type | Description |
|---|---|
| PlanResult | PlanResult with outcome, subgoal results, token usage, and timing. |
PlanningConfig¶
| Field | Type | Default | Description |
|---|---|---|---|
| max_subgoals | int | 12 | Hard cap on subgoals per plan |
| max_replans | int | 3 | Maximum replan cycles before PlanningError |
| complexity_threshold | int | 4 | Skip planning when estimated steps ≤ this |
| enable_lookahead | bool | True | FLARE-style path simulation before each subgoal |
| lookahead_paths | int | 2 | Candidate paths generated per lookahead call |
| enable_parallel_subgoals | bool | True | Run independent subgoals via asyncio.gather |
| cost_budget_usd | float \| None | None | Hard cost cap across all planning calls |
| planner_model_tier | str | "powerful" | Model tier for plan generation |
| executor_model_tier | str | "balanced" | Model tier for subgoal execution |
| verifier_model_tier | str | "fast" | Model tier for postcondition verification |
Plan¶
from grampus.orchestration import Plan, SubGoal
plan = Plan(
task="original task",
subgoals=[SubGoal(...)],
total_estimated_steps=6,
version=1, # increments on each replan
)
| Field | Type | Description |
|---|---|---|
| task | str | Original user task |
| subgoals | list[SubGoal] | Ordered list; DAG implied by dependencies |
| total_estimated_steps | int | Planner's estimate of total tool calls |
| created_at | datetime | UTC timestamp of plan creation |
| version | int | Increments on each replan (starts at 1) |
SubGoal¶
| Field | Type | Default | Description |
|---|---|---|---|
| id | str | required | Short snake_case slug, ≤ 20 chars |
| description | str | required | What this step should accomplish |
| success_criterion | str | required | Verifiable completion condition |
| dependencies | list[str] | [] | IDs of subgoals that must complete first |
| tool_hints | list[str] | [] | Suggested tool names (advisory) |
| fallback_strategy | str | "" | Alternative approach if primary fails |
| max_retries | int | 2 | PARTIAL retries before declaring FAIL |
| status | SubGoalStatus | PENDING | Current execution status |
| output_summary | str | "" | 1-2 sentence summary filled after completion |
| attempts | int | 0 | Total execution attempts so far |
| failure_reason | str | "" | Last failure reason (filled on FAIL) |
SubGoalStatus¶
from grampus.orchestration import SubGoalStatus
SubGoalStatus.PENDING # not yet started
SubGoalStatus.RUNNING # currently executing
SubGoalStatus.COMPLETED # success criterion met
SubGoalStatus.FAILED # could not be completed after all retries
SubGoalStatus.SKIPPED # skipped (e.g. dependency failed)
VerificationResult¶
Returned by PostconditionVerifier.verify() after each subgoal execution:
from grampus.orchestration import VerificationResult
VerificationResult.PASS # criterion clearly met
VerificationResult.PARTIAL # progress made; criterion not fully met (retry)
VerificationResult.FAIL # criterion not met; retry unlikely to help
PlanResult¶
Returned by PlanningRunner.run():
| Field | Type | Description |
|---|---|---|
| task | str | Original user task |
| plan | Plan | Final plan version executed |
| final_output | str | Synthesized answer from all completed subgoals |
| completed_subgoals | list[str] | IDs of successfully completed subgoals |
| failed_subgoals | list[str] | IDs of subgoals that could not be completed |
| replans_triggered | int | Number of replan cycles that occurred |
| total_token_usage | TokenUsage \| None | Accumulated token usage |
| duration_seconds | float | Wall-clock duration |
| success | bool | True when all subgoals completed |
planning_node¶
Graph node factory wrapping a PlanningRunner:
from grampus.orchestration import planning_node, Graph, human_node
handler = planning_node(
planning_runner=planner,
agent_def=agent_def,
tool_names=["web_search", "write_file"],
memory_context_key="memory_context", # reads from state.metadata
)
async def route(state):
plan = state.metadata.get("plan_result", {})
return "review" if not plan.get("success") else "end"
graph = (
Graph(graph_id="pipeline")
.add_node("plan", handler, entry=True)
.add_conditional_edge("plan", route, {"review": "review", "end": None})
.add_node("review", human_node("Planning failed — please review."))
)
The ASSISTANT message appended by the node carries:
message.metadata["plan_result"] # full PlanResult serialised as dict
message.metadata["replans_triggered"] # int
message.metadata["subgoals_completed"] # int
See the Long-Horizon Planning guide for full usage and research citations.
Multi-Agent Debate¶
DebateOrchestrator¶
from grampus.orchestration.debate import DebateOrchestrator, DebateConfig, DebaterConfig
orch = DebateOrchestrator(
config=DebateConfig(...),
cost_tracker=None, # optional CostTracker
tracer=None, # optional GrampusTracer or any span(name, **attrs) tracer
)
result = await orch.run("question text")
grampus.orchestration.debate.orchestrator.DebateOrchestrator
¶
Runs multi-agent debate and returns a DebateResult.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | DebateConfig | DebateConfig specifying debaters, rounds, aggregation, and budgets. | required |
| cost_tracker | Any \| None | Optional CostTracker; | None |
| tracer | Any \| None | Optional tracer with a | None |
run(question)
async
¶
Execute the full debate pipeline and return a DebateResult.
Steps:
1. Adaptive routing check: skip to single-agent if confidence is high.
2. Round 1: all debaters answer independently (asyncio.gather).
3. Convergence check → stop early if threshold met.
4. Rounds 2..max_rounds: debaters respond to peers (asyncio.gather per round).
5. Aggregation via configured strategy.
6. Act-vs-escalate: escalate_to_human when convergence < escalate_threshold.
DebateConfig¶
from grampus.orchestration.debate import DebateConfig, AggregationStrategy
cfg = DebateConfig(
debaters=[...], # min 2 DebaterConfig entries
max_rounds=3,
aggregation=AggregationStrategy.WEIGHTED_VOTE,
convergence_threshold=0.8, # stop early when this fraction agrees
adaptive_routing=True, # skip debate if first model is confident
routing_confidence_threshold=0.85,
escalate_threshold=0.5, # set escalate_to_human when below this
)
| Field | Type | Default |
|---|---|---|
| debaters | list[DebaterConfig] | required (≥ 2) |
| max_rounds | int | 3 |
| aggregation | AggregationStrategy | WEIGHTED_VOTE |
| convergence_threshold | float | 0.8 |
| adaptive_routing | bool | True |
| routing_confidence_threshold | float | 0.85 |
| routing_model_client | ModelClient \| None | None → debaters[0] |
| routing_model_id | str | "" → debaters[0] |
| judge_config | DebaterConfig \| None | None |
| cost_budget_usd | float \| None | None |
| escalate_threshold | float | 0.5 |
DebaterConfig¶
from grampus.orchestration.debate import DebaterConfig
cfg = DebaterConfig(
model_client=client, # any Grampus ModelClient
model_id="claude-sonnet-4-6",
temperature=0.7,
role_hint="You are a skeptical devil's advocate.", # optional persona
weight=1.0, # vote weight for WEIGHTED_VOTE aggregation
)
AggregationStrategy¶
from grampus.orchestration.debate import AggregationStrategy
AggregationStrategy.MAJORITY_VOTE # largest Jaccard cluster, highest-confidence rep
AggregationStrategy.WEIGHTED_VOTE # clusters scored by debater.weight × confidence
AggregationStrategy.JUDGE # separate judge model synthesises all positions
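To make the WEIGHTED_VOTE comment concrete, here is a rough sketch of clustering answers by Jaccard similarity and scoring each cluster by weight × confidence. Several details are assumptions rather than documented behaviour: similarity is computed over lowercase whitespace tokens, clustering is greedy against each cluster's first member, the 0.5 similarity cutoff is arbitrary, and positions are plain (answer, confidence, weight) tuples instead of library types.

```python
def jaccard(a: str, b: str) -> float:
    """Token-set Jaccard similarity between two answers."""
    ta, tb = set(a.lower().split()), set(b.lower().split())
    if not ta and not tb:
        return 1.0
    return len(ta & tb) / len(ta | tb)


def weighted_vote_sketch(positions, threshold=0.5):
    """positions: list of (answer, confidence, weight) tuples."""
    clusters = []  # each cluster is a list of positions
    for pos in positions:
        for cluster in clusters:
            # Greedy assignment: compare against the cluster's first member.
            if jaccard(pos[0], cluster[0][0]) >= threshold:
                cluster.append(pos)
                break
        else:
            clusters.append([pos])
    # Score each cluster by the sum of weight x confidence of its members.
    best = max(clusters, key=lambda c: sum(conf * w for _, conf, w in c))
    # Return the highest-confidence answer in the winning cluster.
    return max(best, key=lambda p: p[1])[0]


positions = [
    ("paris is the capital", 0.9, 1.0),
    ("the capital is paris", 0.7, 1.0),
    ("lyon", 0.95, 1.0),
]
print(weighted_vote_sketch(positions))  # paris is the capital
```

With equal weights this behaves much like a confidence-weighted majority; raising one debater's weight lets a single strong position outvote a larger cluster.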
DebateResult¶
Returned by DebateOrchestrator.run():
| Field | Type | Description |
|---|---|---|
| final_answer | str | Aggregated winning answer |
| final_reasoning | str | Reasoning from the winning position |
| confidence | float | Aggregated confidence (0–1) |
| escalate_to_human | bool | True when final_convergence_score < escalate_threshold |
| rounds | list[DebateRound] | Full per-round transcript |
| routing_decision | RoutingDecision | "debate" or "single_agent" |
| total_rounds_run | int | Rounds actually completed |
| converged | bool | Whether early stopping triggered |
| final_convergence_score | float | Convergence score in the final round |
| total_token_usage | TokenUsage | Cumulative tokens across all rounds |
| total_cost_usd | float | Total spend |
| duration_seconds | float | Wall-clock time |
debate_node¶
Graph node factory that wraps a DebateOrchestrator:
from grampus.orchestration import debate_node, Graph, human_node
handler = debate_node(
orchestrator,
question_extractor=None, # defaults to last USER message content
on_escalate="human_review", # metadata flag written when escalate_to_human=True
)
graph = (
Graph(graph_id="qa")
.add_node("debate", handler, entry=True)
.add_conditional_edge("debate", route_fn, {"escalate": "human_review", "end": None})
.add_node("human_review", human_node("Low confidence — please review."))
)
The ASSISTANT message appended by the node carries:
message.metadata["debate_result"] # full DebateResult serialised as dict
message.metadata["debate_confidence"] # float
message.metadata["debate_escalate"] # bool
message.metadata["debate_rounds"] # int
message.metadata["debate_routing"] # "debate" | "single_agent"
Uncertainty Quantification¶
UncertaintyMonitor¶
Session-level uncertainty tracker implementing Dual-Process AUQ. Attach to AgentRunner via uncertainty_monitor=monitor.
from grampus.orchestration import UncertaintyMonitor, UncertaintyPolicy, UncertaintyEstimator
policy = UncertaintyPolicy(
low_threshold=0.80,
medium_threshold=0.60,
high_threshold=0.40,
enable_p_true=True,
enable_semantic_sampling=False,
irreversible_tool_names=["send_email", "delete", "deploy"],
inject_reflection_on_high=True,
)
monitor = UncertaintyMonitor(policy=policy)
runner = AgentRunner(client, executor, uncertainty_monitor=monitor)
grampus.orchestration.uncertainty.monitor.UncertaintyMonitor
¶
Session-level uncertainty tracking implementing Dual-Process AUQ.
System 1 (fast, always runs): P(True) + verbalized fusion → SAUP propagation → classify → decide.
System 2 (slow, runs only in the uncertain zone): semantic entropy sampling when the fused confidence falls inside the trigger zone (semantic_trigger_low to semantic_trigger_high), plus reflection injection when HIGH uncertainty is detected.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| estimator | UncertaintyEstimator \| None | UncertaintyEstimator instance; defaults to UncertaintyEstimator(). | None |
| propagator | UncertaintyPropagator \| None | UncertaintyPropagator instance; defaults to UncertaintyPropagator(). | None |
| policy | UncertaintyPolicy \| None | UncertaintyPolicy instance; defaults to UncertaintyPolicy(). | None |
| tracer | Any \| None | Optional GrampusTracer for OTEL spans (duck-typed). | None |
initialize(session_id, agent_id)
¶
Reset belief state for a new session. Call this at the start of each run().
observe_llm_response(response_text, step_id, *, prompt_messages=None, model_client=None, model_id='', step_type='llm_call')
async
¶
Full estimation pipeline for one LLM response.
System 1 always runs; System 2 runs only when the fused confidence falls in the trigger zone.
Returns:
| Type | Description |
|---|---|
| tuple[StepUncertainty, UncertaintyAction] | (StepUncertainty record, UncertaintyAction to take). |
observe_tool_call(tool_name, step_id)
async
¶
Check uncertainty before executing a tool using current cumulative state.
Does not make any new LLM calls — uses existing cumulative_confidence.
Returns:
| Type | Description |
|---|---|
| tuple[StepUncertainty, UncertaintyAction] | (StepUncertainty record, UncertaintyAction to take). |
get_belief_state()
¶
Return the current session belief state.
summary_metadata()
¶
Return compact metadata dict for state.metadata["uncertainty"].
reset()
¶
Reset belief state to empty.
UncertaintyPolicy¶
| Field | Type | Default | Description |
|---|---|---|---|
| low_threshold | float | 0.80 | Confidence floor for LOW → PROCEED |
| medium_threshold | float | 0.60 | Floor for MEDIUM → PROCEED_WITH_LOG |
| high_threshold | float | 0.40 | Floor for HIGH → PAUSE_FOR_HUMAN |
| enable_p_true | bool | True | Run P(True) follow-up call |
| enable_semantic_sampling | bool | False | Enable adaptive semantic entropy slow path |
| irreversible_tool_names | list[str] | [] | Tool name substrings triggering PAUSE at MEDIUM |
| inject_reflection_on_high | bool | True | Inject System-2 reflection before PAUSE |
UncertaintyEstimator¶
| Field | Type | Default | Description |
|---|---|---|---|
| verbalized_weight | float | 0.4 | Fusion weight for verbalized signal |
| p_true_weight | float | 0.6 | Fusion weight for P(True) signal |
| verbalized_calibration_bias | float | 0.25 | ECE correction for verbalized (documented ECE ≥ 0.377) |
| p_true_calibration_bias | float | 0.10 | ECE correction for P(True) |
| min_samples | int | 2 | Adaptive entropy: start sample count |
| max_samples | int | 5 | Adaptive entropy: extend on disagreement |
| early_stop_jaccard | float | 0.60 | First-pair agreement threshold for early stop |
| semantic_trigger_low | float | 0.50 | Lower bound of sampling trigger zone |
| semantic_trigger_high | float | 0.72 | Upper bound of sampling trigger zone |
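One plausible reading of min_samples, max_samples, and early_stop_jaccard is sketched below: draw the starting samples, stop if the first pair already agrees, otherwise extend to the maximum. This is an interpretation of the field descriptions, not the estimator's actual algorithm; `draw` is a hypothetical zero-argument callable standing in for an LLM sampling call, and agreement is measured with a simple token-set Jaccard.

```python
from typing import Callable


def token_jaccard(a: str, b: str) -> float:
    ta, tb = set(a.lower().split()), set(b.lower().split())
    if not ta and not tb:
        return 1.0
    return len(ta & tb) / len(ta | tb)


def adaptive_samples(draw: Callable[[], str], min_samples: int = 2,
                     max_samples: int = 5,
                     early_stop_jaccard: float = 0.60) -> list[str]:
    # Start with the minimum number of samples.
    samples = [draw() for _ in range(min_samples)]
    # Early stop: the first pair agrees closely enough, so sampling ends here.
    if len(samples) >= 2 and token_jaccard(samples[0], samples[1]) >= early_stop_jaccard:
        return samples
    # Disagreement: extend up to the maximum sample count.
    while len(samples) < max_samples:
        samples.append(draw())
    return samples


agreeing = iter(["the answer is 42", "the answer is 42", "unused"])
print(len(adaptive_samples(lambda: next(agreeing))))  # 2
```

The early stop keeps the slow path cheap in the common case where the model answers consistently.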
UncertaintyLevel¶
from grampus.orchestration import UncertaintyLevel
UncertaintyLevel.LOW # confidence ≥ low_threshold
UncertaintyLevel.MEDIUM # medium_threshold ≤ confidence < low_threshold
UncertaintyLevel.HIGH # high_threshold ≤ confidence < medium_threshold
UncertaintyLevel.CRITICAL # confidence < high_threshold
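The threshold bands can be illustrated with a small classifier. This is a sketch using the default UncertaintyPolicy values; `Level` is a hypothetical stand-in for UncertaintyLevel and `classify` is not a library function.

```python
from enum import Enum


class Level(Enum):
    """Hypothetical stand-in for UncertaintyLevel."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


def classify(confidence: float, low: float = 0.80, medium: float = 0.60,
             high: float = 0.40) -> Level:
    # Higher confidence means lower uncertainty, so check the top band first.
    if confidence >= low:
        return Level.LOW
    if confidence >= medium:
        return Level.MEDIUM
    if confidence >= high:
        return Level.HIGH
    return Level.CRITICAL


for value in (0.85, 0.60, 0.45, 0.10):
    print(value, classify(value).name)
```

Note that the level names describe uncertainty, not confidence: a confidence of 0.85 maps to LOW uncertainty.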
UncertaintyAction¶
from grampus.orchestration import UncertaintyAction
UncertaintyAction.PROCEED # run continues
UncertaintyAction.PROCEED_WITH_LOG # run continues; warning logged
UncertaintyAction.PAUSE_FOR_HUMAN # status=WAITING_FOR_HUMAN
UncertaintyAction.ABORT # UncertaintyError raised
StepUncertainty¶
Returned by observe_llm_response() and observe_tool_call():
| Field | Type | Description |
|---|---|---|
| step_id | str | Unique step identifier |
| step_type | str | "llm_call", "tool_call", "memory_read", "decision" |
| verbalized_confidence | float | Raw extracted confidence (before calibration) |
| p_true_confidence | float | P(True) result; -1.0 when not run |
| fused_confidence | float | Calibrated weighted fusion |
| propagated_confidence | float | After SAUP propagation through prior steps |
| level | UncertaintyLevel | Classified tier |
| action | UncertaintyAction | Control action taken |
| samples_used | int | Semantic entropy samples drawn (0 = not run) |
| reflection_injected | bool | Whether System-2 reflection was injected |
uncertainty_guard_node¶
Graph node factory for explicit uncertainty checkpoints:
from grampus.orchestration import uncertainty_guard_node, Graph, human_node
handler = uncertainty_guard_node(
monitor,
step_type="decision", # SAUP weight lookup
escalate_node="human_review", # sets metadata["uncertainty_escalate"]=True on PAUSE
)
async def route(state):
return "human_review" if state.metadata.get("uncertainty_escalate") else "continue"
graph = (
Graph(graph_id="safe-qa")
.add_node("llm", llm_handler, entry=True)
.add_node("guard", handler)
.add_conditional_edge("guard", route, {"human_review": "human_review", "continue": "end"})
.add_node("human_review", human_node("Low confidence — please review."))
)
See the Uncertainty Quantification guide for full usage and research citations.
AgentRunner¶
The main agent execution loop implementing the ReAct (Reason+Act) pattern.
grampus.orchestration.runner.AgentRunner
¶
Main agent execution loop implementing the ReAct pattern.
Observe → Think (LLM) → Act (tools) → repeat until done or max_iterations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| model_client | Any | LLM client for completions (duck-typed as Any). | required |
| tool_executor | ToolExecutor | Executor for tool calls. | required |
| memory_manager | MemoryManager \| None | Optional memory facade. When None, memory is disabled. | None |
| cost_tracker | CostTracker \| None | Optional cost tracker. When None, cost is not tracked. | None |
| state_store | Any \| None | Optional Dapr state store for persisting AgentState between turns. When None, state is only in-memory. | None |
| config | RunnerConfig \| None | Tuning parameters. | None |
run(agent_def, user_input, *, session_id, user_id=None, agent_state=None, _handoff_depth=0, _prefix_messages=None)
async
¶
Execute the agent loop for one user turn.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_def | AgentDefinition | Blueprint describing model, tools, and behaviour config. | required |
| user_input | str | The user's message or task. | required |
| session_id | str | Unique identifier for this session. | required |
| agent_state | AgentState \| None | Pre-existing state to restore, or None to build fresh. | None |
Returns:
| Type | Description |
|---|---|
| ExecutionResult | ExecutionResult with output, messages, costs, and timing. |
Raises:
| Type | Description |
|---|---|
| OrchestrationError | code="MAX_ITERATIONS_EXCEEDED" when loop limit is reached without a final answer. |
| BudgetExceededError | Propagated from CostTracker when budget is hit. |
resume(agent_id, session_id, human_response)
async
¶
Resume a paused agent (status=WAITING_FOR_HUMAN).
Loads state from state_store, appends human_response as a user message, then re-enters the ReAct loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_id | str | The agent name used as key (matches agent_def.name). | required |
| session_id | str | Session to restore. | required |
| human_response | str | Human reply to inject as a user message. | required |
Raises:
| Type | Description |
|---|---|
| OrchestrationError | code="NO_STATE_FOUND" when state not in store. |
| OrchestrationError | code="AGENT_NOT_WAITING" when not paused. |
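The resume checks described above (load state, verify the paused status, append the human reply) might look roughly like the sketch below. It uses a plain dict in place of the Dapr state store; `ResumeError`, `State`, the `resume_sketch` function, and the `agent_id:session_id` key layout are all assumptions made for illustration, not the library's implementation.

```python
from dataclasses import dataclass, field


class ResumeError(Exception):
    """Hypothetical stand-in for OrchestrationError; only `code` mirrors the reference."""

    def __init__(self, code):
        super().__init__(code)
        self.code = code


@dataclass
class State:
    status: str = "RUNNING"
    messages: list = field(default_factory=list)


def resume_sketch(store, agent_id, session_id, human_response):
    key = agent_id + ":" + session_id            # assumed key layout
    state = store.get(key)
    if state is None:
        raise ResumeError("NO_STATE_FOUND")
    if state.status != "WAITING_FOR_HUMAN":
        raise ResumeError("AGENT_NOT_WAITING")
    state.messages.append(("user", human_response))
    state.status = "RUNNING"                     # a real runner would re-enter the loop here
    return state


store = {"support-bot:s1": State(status="WAITING_FOR_HUMAN")}
resumed = resume_sketch(store, "support-bot", "s1", "Approved, go ahead.")
print(resumed.status, resumed.messages)
```

Checking for missing state before checking the status keeps the two error codes distinguishable, so a caller can tell an expired session apart from an agent that was never paused.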
cost_summary()
¶
Return cost accumulation summary, or None when no tracker is configured.
RunnerConfig¶
from grampus.orchestration.runner import RunnerConfig
config = RunnerConfig(
    max_iterations=10,   # max ReAct iterations before OrchestrationError
    memory_top_k=5,      # episodic/semantic results per recall query
    enable_memory=True,  # enable memory read/write during runs
    react_pattern=True,  # use ReAct loop (vs. single-shot)
)
| Field | Type | Default | Description |
|---|---|---|---|
| max_iterations | int | 10 | Abort with OrchestrationError if exceeded |
| memory_top_k | int | 5 | Results per MemoryManager.recall() call |
| enable_memory | bool | True | Whether to read/write memory during the loop |
| react_pattern | bool | True | Use ReAct; set False for single-shot LLM calls |
Crew¶
Multi-agent orchestration with sequential, parallel, and hierarchical patterns.
grampus.orchestration.crew.Crew
¶
Coordinates multiple AgentRunner instances.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| members | list[CrewMember] | List of crew members (each has its own AgentRunner). | required |
| pattern | CrewPattern | Execution pattern (sequential, parallel, hierarchical). | SEQUENTIAL |
| shared_state_store | Any \| None | Optional Dapr state store for cross-agent shared state. | None |
| lock | Any \| None | Optional DaprLock for coordinating shared state writes. | None |
| session_id | str | Shared session identifier for the crew run. | required |
run(initial_input)
async
¶
Execute the crew with the configured pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| initial_input | str | Prompt or task passed to the first agent (sequential), to all agents (parallel), or to the supervisor (hierarchical). | required |
Returns:
| Type | Description |
|---|---|
| CrewResult | CrewResult with per-agent outputs, total cost, duration, and pattern. |
Raises:
| Type | Description |
|---|---|
| OrchestrationError | code="CREW_MEMBER_FAILED" if any member raises. |
CrewPattern¶
from grampus.orchestration.crew import CrewPattern
CrewPattern.SEQUENTIAL # agents run one after another; outputs accumulate
CrewPattern.PARALLEL # agents run concurrently on the same input
CrewPattern.HIERARCHICAL # first member (role="supervisor") delegates to workers
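To make the difference between the first two patterns concrete, here is a small self-contained sketch using plain coroutines in place of agents. It assumes that in the sequential pattern each agent receives the previous agent's output; the reference only says outputs accumulate, so treat that chaining as an assumption. `researcher`, `writer`, `run_sequential`, and `run_parallel` are hypothetical names.

```python
import asyncio


async def researcher(text):
    return "notes(" + text + ")"


async def writer(text):
    return "draft(" + text + ")"


async def run_sequential(agents, initial_input):
    outputs, current = {}, initial_input
    for name, agent in agents:
        current = await agent(current)   # each agent consumes the previous output
        outputs[name] = current
    return outputs


async def run_parallel(agents, initial_input):
    # All agents see the same input and run concurrently.
    results = await asyncio.gather(*(agent(initial_input) for _, agent in agents))
    return {name: out for (name, _), out in zip(agents, results)}


agents = [("researcher", researcher), ("writer", writer)]
sequential = asyncio.run(run_sequential(agents, "topic"))
parallel = asyncio.run(run_parallel(agents, "topic"))
print(sequential)
print(parallel)
```

In the sequential run the writer's output wraps the researcher's notes, while in the parallel run both agents work from the raw input. Both return a name-to-output mapping, the same shape as the `outputs` field of CrewResult.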
CrewMember¶
from grampus.core.types import AgentDefinition
from grampus.orchestration.crew import CrewMember
from grampus.orchestration.runner import AgentRunner
member = CrewMember(
    agent_def=AgentDefinition(...),
    runner=AgentRunner(...),
    role="researcher",  # semantic label; "supervisor" triggers hierarchical delegation
)
CrewResult¶
Returned by Crew.run():
@dataclass
class CrewResult:
    outputs: dict[str, str]  # agent_name → output string
    total_cost_usd: float
    duration_seconds: float
    pattern: CrewPattern
Graph engine¶
grampus.orchestration.graph.Graph
¶
Directed graph of async node handlers with checkpoint/restore support.
Build with the builder methods, then call execute().
grampus.orchestration.nodes.llm_node(model_client, *, model, system_prompt='', extract_tool_calls=True)
¶
Return a handler that calls the LLM with the current message window.
Appends the assistant response as a new Message to state.messages. Accumulates token usage in state.total_token_usage. Sets state.status = AgentStatus.RUNNING.
grampus.orchestration.nodes.tool_node(executor)
¶
Return a handler that executes all pending tool calls in state.
Reads tool calls from the last assistant message. Executes each, appends a TOOL message containing the ToolResult objects. Sets state.status = AgentStatus.RUNNING when tool calls are found. Passes state through unchanged when no tool calls are pending.
grampus.orchestration.nodes.conditional_node(condition_fn)
¶
Return a pass-through handler marking a conditional decision point.
The handler returns state unchanged — routing is handled via add_conditional_edge() on the parent Graph using the same condition_fn.
grampus.orchestration.nodes.human_node(prompt='Waiting for human input...')
¶
Return a handler that pauses execution for human review.
Sets state.status = AgentStatus.WAITING_FOR_HUMAN. Appends a system message with the prompt text. Returns immediately — the caller is responsible for resuming.
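The interplay between pass-through handlers, conditional edges, and a pausing human node can be illustrated with a toy executor. `TinyGraph` below is a hypothetical stand-in written for this example; it borrows the `add_node`, `add_conditional_edge`, and `execute` names from the reference, while `add_edge`, the `State` shape, and the string statuses are assumptions.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class State:
    messages: list = field(default_factory=list)
    status: str = "RUNNING"


class TinyGraph:
    def __init__(self):
        self.nodes, self.edges, self.cond, self.entry = {}, {}, {}, None

    def add_node(self, name, handler, entry=False):
        self.nodes[name] = handler
        if entry:
            self.entry = name
        return self

    def add_edge(self, src, dst):
        self.edges[src] = dst
        return self

    def add_conditional_edge(self, src, condition_fn, mapping):
        self.cond[src] = (condition_fn, mapping)
        return self

    async def execute(self, state):
        current = self.entry
        while current in self.nodes:              # unknown targets such as "end" stop the walk
            state = await self.nodes[current](state)
            if state.status == "WAITING_FOR_HUMAN":
                break                             # paused; the caller resumes later
            if current in self.cond:
                condition_fn, mapping = self.cond[current]
                current = mapping[await condition_fn(state)]
            else:
                current = self.edges.get(current)
        return state


async def draft(state):
    state.messages.append(("assistant", "I am not sure about this."))
    return state


async def check(state):                           # pass-through decision point
    return state


async def route(state):
    return "review" if "not sure" in state.messages[-1][1] else "end"


async def review(state):
    state.status = "WAITING_FOR_HUMAN"
    state.messages.append(("system", "Please review."))
    return state


graph = (
    TinyGraph()
    .add_node("draft", draft, entry=True)
    .add_node("check", check)
    .add_node("review", review)
    .add_edge("draft", "check")
    .add_conditional_edge("check", route, {"review": "review", "end": "end"})
)
final = asyncio.run(graph.execute(State()))
print(final.status, final.messages[-1])
```

The decision node does no work itself; the routing function attached to its outgoing edge picks the next node, and the review handler returns immediately after marking the state as paused.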
Model router¶
grampus.orchestration.model_router.ModelRouter
¶
Routes execution steps to the cheapest capable model.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| models | list[ModelSpec] | Registered model specs; must cover at least one tier in use. | required |
| clients | dict[str, Any] | Map of model_id -> ModelClient instance (duck-typed as Any). | required |
| routing_rules | list[RoutingRule] \| None | Ordered rules; first substring match wins. Falls back to BALANCED when no rule matches. | None |
| fallback_chain | list[ModelTier] \| None | Ordered tiers to attempt on failure. Defaults to [BALANCED, FAST, POWERFUL]. | None |
Model tiers¶
| Tier | Use case | Example models |
|---|---|---|
| fast | Simple classification, routing | claude-haiku-4-5 |
| balanced | Standard agent tasks | claude-sonnet-4-6 |
| powerful | Complex reasoning, synthesis | claude-opus-4-7 |
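The "first substring match wins" rule and the BALANCED default can be sketched as follows. This is an illustration only: the `(substring, tier)` tuple is a hypothetical shape for a routing rule, `Tier` stands in for ModelTier, and the way `attempt_order` combines the chosen tier with the fallback chain (chosen tier first, then the remaining chain in order) is an assumption the reference does not spell out.

```python
from enum import Enum


class Tier(str, Enum):
    FAST = "fast"
    BALANCED = "balanced"
    POWERFUL = "powerful"


# Ordered (substring, tier) pairs; earlier entries take precedence.
RULES = [("classify", Tier.FAST), ("synthes", Tier.POWERFUL), ("class", Tier.POWERFUL)]
DEFAULT_CHAIN = [Tier.BALANCED, Tier.FAST, Tier.POWERFUL]


def pick_tier(step_name, rules=RULES):
    for needle, tier in rules:
        if needle in step_name:      # first substring match wins
            return tier
    return Tier.BALANCED             # no rule matched


def attempt_order(step_name, chain=DEFAULT_CHAIN):
    first = pick_tier(step_name)
    # Assumed behaviour: try the routed tier, then the rest of the chain in order.
    return [first] + [tier for tier in chain if tier != first]


print(pick_tier("classify_intent"))
print(pick_tier("summarize_thread"))
print(attempt_order("synthesize_report"))
```

Because rules are checked in order, `"classify_intent"` resolves to the fast tier even though the later `"class"` rule would also match, so more specific rules belong earlier in the list.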
Cost tracker¶
grampus.orchestration.cost_tracker.CostTracker
¶
Tracks token usage and cost per agent/session/step/model.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_id | str | Agent being tracked. | required |
| session_id | str | Current session. | required |
| budget_usd | float \| None | Hard spend limit. None means unlimited. | None |
| pubsub | Any \| None | Optional pub/sub object; must expose | None |
| cost_topic | str | Topic name for cost events. | 'grampus.cost.events' |
record(usage, *, step_name, model_spec)
async
¶
Record usage from one LLM call.
Accumulates totals, publishes a CostEvent, then checks the budget. Budget check happens after recording so the event is always emitted.
Raises:
| Type | Description |
|---|---|
| BudgetExceededError | code="BUDGET_EXCEEDED" when total exceeds budget. |
summary()
¶
Return current accumulated totals. Pure computation, no I/O.
check_budget(estimated_cost_usd=0.0)
¶
Pre-flight budget check before an LLM call.
Raises:
| Type | Description |
|---|---|
| BudgetExceededError | When current + estimated exceeds the budget limit. |
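The ordering documented for record() (accumulate, emit the event, then check) and the pre-flight check can be sketched with a small synchronous stand-in. `TinyCostTracker` and `BudgetExceeded` are hypothetical names; the real record() is async and takes usage and model_spec arguments, which this sketch replaces with a plain dollar amount.

```python
class BudgetExceeded(Exception):
    """Hypothetical stand-in for BudgetExceededError."""


class TinyCostTracker:
    def __init__(self, budget_usd=None):
        self.budget_usd = budget_usd
        self.total_cost_usd = 0.0
        self.events = []                          # stands in for published cost events

    def check_budget(self, estimated_cost_usd=0.0):
        if self.budget_usd is None:               # None means unlimited
            return
        if self.total_cost_usd + estimated_cost_usd > self.budget_usd:
            raise BudgetExceeded("budget of %.2f USD exceeded" % self.budget_usd)

    def record(self, cost_usd, step_name):
        self.total_cost_usd += cost_usd
        self.events.append((step_name, cost_usd))  # emitted before the budget check
        self.check_budget()


tracker = TinyCostTracker(budget_usd=0.10)
tracker.record(0.06, step_name="plan")             # within budget
tracker.check_budget(estimated_cost_usd=0.03)      # projected 0.09, passes
over_budget = False
try:
    tracker.record(0.06, step_name="answer")       # total 0.12 exceeds 0.10
except BudgetExceeded:
    over_budget = True
print(over_budget, len(tracker.events))
```

Even though the second record() call raises, its event is already in `events`, which is the point of checking the budget after recording: spend that has happened is always reported.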
CostSummary¶
@dataclass
class CostSummary:
    total_cost_usd: float
    total_tokens: int
    by_model: dict[str, float]  # model → USD
    by_step: list[StepCost]     # per-iteration breakdown
    budget_usd: float | None
    budget_remaining_usd: float | None
ExecutionResult¶
Returned by AgentRunner.run():
grampus.core.types.ExecutionResult
¶
Bases: BaseModel
The final result of an agent execution.
AgentDefinition¶
Blueprint passed to AgentRunner.run():
grampus.core.types.AgentDefinition
¶
Bases: BaseModel
Blueprint for an agent — its model, tools, and behaviour config.
AgentState¶
Mutable runtime state maintained by the runner:
grampus.core.types.AgentState
¶
Bases: BaseModel
Mutable runtime state of an executing agent.