Merge remote-tracking branch 'origin/feature/queen-worker-comm' into feature/queen-worker-comm

This commit is contained in:
Richard Tang
2026-03-02 21:27:22 -08:00
9 changed files with 873 additions and 80 deletions
+26 -5
View File
@@ -107,17 +107,38 @@ _TC_ARG_LIMIT = 200 # max chars per tool_call argument after compaction
def _compact_tool_calls(tool_calls: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Truncate tool_call arguments to save context tokens during compaction.
Preserves ``id``, ``type``, and ``function.name`` exactly. Truncates
``function.arguments`` (a JSON string) to at most ``_TC_ARG_LIMIT`` chars
so that large payloads (e.g. set_output with full findings) don't survive
compaction and defeat the purpose of context reduction.
Preserves ``id``, ``type``, and ``function.name`` exactly. When arguments
exceed ``_TC_ARG_LIMIT``, replaces the full JSON string with a compact
**valid** JSON summary. The Anthropic API parses tool_call arguments and
rejects requests with malformed JSON (e.g. unterminated strings), so we
must never produce broken JSON here.
"""
compact = []
for tc in tool_calls:
func = tc.get("function", {})
args = func.get("arguments", "")
if len(args) > _TC_ARG_LIMIT:
args = args[:_TC_ARG_LIMIT] + "…[truncated]"
# Build a valid JSON summary instead of slicing mid-string.
# Try to extract top-level keys for a meaningful preview.
try:
parsed = json.loads(args)
if isinstance(parsed, dict):
# Preserve key names, truncate values
summary_parts = []
for k, v in parsed.items():
v_str = str(v)
if len(v_str) > 60:
v_str = v_str[:60] + "..."
summary_parts.append(f"{k}={v_str}")
summary = ", ".join(summary_parts)
if len(summary) > _TC_ARG_LIMIT:
summary = summary[:_TC_ARG_LIMIT] + "..."
args = json.dumps({"_compacted": summary})
else:
args = json.dumps({"_compacted": str(parsed)[:_TC_ARG_LIMIT]})
except (json.JSONDecodeError, TypeError):
# Args were already invalid JSON — wrap the preview safely
args = json.dumps({"_compacted": args[:_TC_ARG_LIMIT]})
compact.append(
{
"id": tc.get("id", ""),
+4
View File
@@ -338,6 +338,10 @@ class AsyncEntryPointSpec(BaseModel):
max_concurrent: int = Field(
default=10, description="Maximum concurrent executions for this entry point"
)
max_resurrections: int = Field(
default=3,
description="Auto-restart on non-fatal failure (0 to disable)",
)
model_config = {"extra": "allow"}
+69
View File
@@ -511,6 +511,7 @@ class EventLoopNode(NodeProtocol):
# 5. Stall / doom loop detection state (restored from cursor if resuming)
recent_responses: list[str] = _restored_recent_responses
recent_tool_fingerprints: list[list[tuple[str, str]]] = _restored_tool_fingerprints
_consecutive_empty_turns: int = 0
# 6. Main loop
for iteration in range(start_iteration, self._config.max_iterations):
@@ -649,6 +650,22 @@ class EventLoopNode(NodeProtocol):
error=str(e)[:500],
execution_id=execution_id,
)
# For malformed tool call errors, inject feedback into
# the conversation before retrying. Retrying with the
# same messages is futile — the LLM will reproduce the
# same truncated JSON. The nudge tells it to shorten
# its arguments.
error_str = str(e).lower()
if "failed to parse tool call" in error_str:
await conversation.add_user_message(
"[System: Your previous tool call had malformed "
"JSON arguments (likely truncated). Keep your "
"tool call arguments shorter and simpler. Do NOT "
"repeat the same long argument — summarize or "
"split into multiple calls.]"
)
await asyncio.sleep(delay)
continue # retry same iteration
@@ -774,6 +791,57 @@ class EventLoopNode(NodeProtocol):
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
else:
# Ghost empty stream: LLM returned nothing and outputs
# are still missing. The conversation hasn't changed, so
# repeating the same call will produce the same empty
# result. Inject a nudge to break the cycle.
_consecutive_empty_turns += 1
logger.warning(
"[%s] iter=%d: empty response with missing outputs %s (consecutive=%d)",
node_id,
iteration,
missing,
_consecutive_empty_turns,
)
if _consecutive_empty_turns >= self._config.stall_detection_threshold:
# Persistent ghost stream — fail the node.
error_msg = (
f"Ghost empty stream: {_consecutive_empty_turns} "
f"consecutive empty responses with missing "
f"outputs {missing}"
)
latency_ms = int((time.time() - start_time) * 1000)
if ctx.runtime_logger:
ctx.runtime_logger.log_node_complete(
node_id=node_id,
node_name=ctx.node_spec.name,
node_type="event_loop",
success=False,
error=error_msg,
total_steps=iteration + 1,
tokens_used=total_input_tokens + total_output_tokens,
input_tokens=total_input_tokens,
output_tokens=total_output_tokens,
latency_ms=latency_ms,
exit_status="ghost_stream",
accept_count=_accept_count,
retry_count=_retry_count,
escalate_count=_escalate_count,
continue_count=_continue_count,
)
raise RuntimeError(error_msg)
# First nudge — inject a system message to break the
# empty-response cycle.
await conversation.add_user_message(
"[System: Your response was empty. You have required "
f"outputs that are not yet set: {missing}. Review "
"your task and call the appropriate tools to make "
"progress.]"
)
continue
else:
_consecutive_empty_turns = 0
# 6f. Stall detection
recent_responses.append(assistant_text)
@@ -2502,6 +2570,7 @@ class EventLoopNode(NodeProtocol):
"service unavailable",
"bad gateway",
"overloaded",
"failed to parse tool call",
]
return any(kw in error_str for kw in transient_keywords)
+28 -30
View File
@@ -237,6 +237,11 @@ def _is_stream_transient_error(exc: BaseException) -> bool:
Transient errors (recoverable=True): network issues, server errors, timeouts.
Permanent errors (recoverable=False): auth, bad request, context window, etc.
NOTE: "Failed to parse tool call arguments" (malformed LLM output) is NOT
transient at the stream level — retrying with the same messages produces the
same malformed output. This error is handled at the EventLoopNode level
where the conversation can be modified before retrying.
"""
try:
from litellm.exceptions import (
@@ -917,30 +922,6 @@ class LiteLLMProvider(LLMProvider):
# and we skip the retry path — nothing was yielded in vain.)
has_content = accumulated_text or tool_calls_acc
if not has_content:
# If the conversation ends with an assistant or tool
# message, an empty stream is expected — the LLM has
# nothing new to say. Don't burn retries on this;
# let the caller (EventLoopNode) decide what to do.
# Typical case: client_facing node where the LLM set
# all outputs via set_output tool calls, and the tool
# results are the last messages.
last_role = next(
(m["role"] for m in reversed(full_messages) if m.get("role") != "system"),
None,
)
if last_role in ("assistant", "tool"):
logger.warning(
"[stream] %s returned empty stream after %s message "
"(no text, no tool calls). Treating as a no-op turn. "
"If this repeats, the agent may be stuck — check for "
"ghost empty assistant messages in conversation history.",
self.model,
last_role,
)
for event in tail_events:
yield event
return
# finish_reason=length means the model exhausted
# max_tokens before producing content. Retrying with
# the same max_tokens will never help.
@@ -958,10 +939,16 @@ class LiteLLMProvider(LLMProvider):
yield event
return
# Empty stream after a user message — use short fixed
# retries, not the rate-limit backoff. This is likely
# a deterministic conversation-structure issue, so long
# exponential waits don't help.
# Empty stream — always retry regardless of last message
# role. Ghost empty streams after tool results are NOT
# expected no-ops; they create infinite loops when the
# conversation doesn't change between iterations.
# After retries, return the empty result and let the
# caller (EventLoopNode) decide how to handle it.
last_role = next(
(m["role"] for m in reversed(full_messages) if m.get("role") != "system"),
None,
)
if attempt < EMPTY_STREAM_MAX_RETRIES:
token_count, token_method = _estimate_tokens(
self.model,
@@ -974,7 +961,8 @@ class LiteLLMProvider(LLMProvider):
attempt=attempt,
)
logger.warning(
f"[stream-retry] {self.model} returned empty stream "
f"[stream-retry] {self.model} returned empty stream "
f"after {last_role} message — "
f"~{token_count} tokens ({token_method}). "
f"Request dumped to: {dump_path}. "
f"Retrying in {EMPTY_STREAM_RETRY_DELAY}s "
@@ -983,7 +971,17 @@ class LiteLLMProvider(LLMProvider):
await asyncio.sleep(EMPTY_STREAM_RETRY_DELAY)
continue
# Success (or final attempt) — flush remaining events.
# All retries exhausted — log and return the empty
# result. EventLoopNode's empty response guard will
# accept if all outputs are set, or handle the ghost
# stream case if outputs are still missing.
logger.error(
f"[stream] {self.model} returned empty stream after "
f"{EMPTY_STREAM_MAX_RETRIES} retries "
f"(last_role={last_role}). Returning empty result."
)
# Success (or empty after exhausted retries) — flush events.
for event in tail_events:
yield event
return
+1
View File
@@ -1274,6 +1274,7 @@ class AgentRunner:
isolation_level=async_ep.isolation_level,
priority=async_ep.priority,
max_concurrent=async_ep.max_concurrent,
max_resurrections=async_ep.max_resurrections,
)
entry_points.append(ep)
+3
View File
@@ -130,6 +130,9 @@ class EventType(StrEnum):
WORKER_ESCALATION_TICKET = "worker_escalation_ticket"
QUEEN_INTERVENTION_REQUESTED = "queen_intervention_requested"
# Execution resurrection (auto-restart on non-fatal failure)
EXECUTION_RESURRECTED = "execution_resurrected"
# Worker lifecycle (session manager → frontend)
WORKER_LOADED = "worker_loaded"
CREDENTIALS_REQUIRED = "credentials_required"
+130 -38
View File
@@ -108,6 +108,7 @@ class EntryPointSpec:
isolation_level: str = "shared" # "isolated" | "shared" | "synchronized"
priority: int = 0
max_concurrent: int = 10 # Max concurrent executions for this entry point
max_resurrections: int = 3 # Auto-restart on non-fatal failure (0 to disable)
def get_isolation_level(self) -> IsolationLevel:
"""Convert string isolation level to enum."""
@@ -510,8 +511,37 @@ class ExecutionStream:
logger.debug(f"Queued execution {execution_id} for stream {self.stream_id}")
return execution_id
# Errors that indicate a fundamental configuration or environment problem.
# Resurrecting after these is pointless — the same error will recur.
# NOTE(review): matching is substring-based against the lowercased error
# text, so broad words like "credential" or "forbidden" will also match
# incidental mentions in unrelated error messages — confirm acceptable.
_FATAL_ERROR_PATTERNS: tuple[str, ...] = (
"credential",
"authentication",
"unauthorized",
"forbidden",
"api key",
"import error",
"module not found",
"no module named",
"permission denied",
"invalid api",
"configuration error",
)
@classmethod
def _is_fatal_error(cls, error: str | None) -> bool:
"""Return True if *error* matches a fatal pattern (no point resurrecting).

Fatal errors are configuration/environment problems (auth, credentials,
missing modules, permissions) that would recur identically on every
restart, so the resurrection loop in ``_run_execution`` skips them.
``None`` or an empty string is treated as non-fatal.
"""
if not error:
return False
# Case-insensitive substring match against the known fatal patterns.
error_lower = error.lower()
return any(pat in error_lower for pat in cls._FATAL_ERROR_PATTERNS)
async def _run_execution(self, ctx: ExecutionContext) -> None:
"""Run a single execution within the stream."""
"""Run a single execution within the stream.
Supports automatic resurrection: when the execution fails with a
non-fatal error, it restarts from the failed node up to
``entry_spec.max_resurrections`` times (default 3).
"""
execution_id = ctx.id
# When sharing a session with another entry point (resume_session_id),
@@ -519,6 +549,11 @@ class ExecutionStream:
# owns the state.json and _write_progress() keeps memory up-to-date.
_is_shared_session = bool(ctx.session_state and ctx.session_state.get("resume_session_id"))
max_resurrections = self.entry_spec.max_resurrections
_resurrection_count = 0
_current_session_state = ctx.session_state
_current_input_data = ctx.input_data
# Acquire semaphore to limit concurrency
async with self._semaphore:
ctx.status = "running"
@@ -559,12 +594,6 @@ class ExecutionStream:
store=self._runtime_log_store, agent_id=self.graph.id
)
# Create executor for this execution.
# Each execution gets its own storage under sessions/{exec_id}/
# so conversations, spillover, and data files are all scoped
# to this execution. The executor sets data_dir via execution
# context (contextvars) so data tools and spillover share the
# same session-scoped directory.
# Derive storage from session_store (graph-specific for secondary
# graphs) so that all files — conversations, state, checkpoints,
# data — land under the graph's own sessions/ directory, not the
@@ -573,43 +602,106 @@ class ExecutionStream:
exec_storage = self._session_store.sessions_dir / execution_id
else:
exec_storage = self._storage.base_path / "sessions" / execution_id
executor = GraphExecutor(
runtime=runtime_adapter,
llm=self._llm,
tools=self._tools,
tool_executor=self._tool_executor,
event_bus=self._scoped_event_bus,
stream_id=self.stream_id,
execution_id=execution_id,
storage_path=exec_storage,
runtime_logger=runtime_logger,
loop_config=self.graph.loop_config,
accounts_prompt=self._accounts_prompt,
accounts_data=self._accounts_data,
tool_provider_map=self._tool_provider_map,
)
# Track executor so inject_input() can reach EventLoopNode instances
self._active_executors[execution_id] = executor
# Write initial session state
if not _is_shared_session:
await self._write_session_state(execution_id, ctx)
# Create modified graph with entry point
# We need to override the entry_node to use our entry point
modified_graph = self._create_modified_graph()
# Execute
result = await executor.execute(
graph=modified_graph,
goal=self.goal,
input_data=ctx.input_data,
session_state=ctx.session_state,
checkpoint_config=self._checkpoint_config,
)
# Write initial session state
if not _is_shared_session:
await self._write_session_state(execution_id, ctx)
# Clean up executor reference
self._active_executors.pop(execution_id, None)
# --- Resurrection loop ---
# Each iteration creates a fresh executor. On non-fatal failure,
# the executor's session_state (memory + resume_from) carries
# forward so the next attempt resumes at the failed node.
while True:
# Create executor for this execution.
# Each execution gets its own storage under sessions/{exec_id}/
# so conversations, spillover, and data files are all scoped
# to this execution. The executor sets data_dir via execution
# context (contextvars) so data tools and spillover share the
# same session-scoped directory.
executor = GraphExecutor(
runtime=runtime_adapter,
llm=self._llm,
tools=self._tools,
tool_executor=self._tool_executor,
event_bus=self._scoped_event_bus,
stream_id=self.stream_id,
execution_id=execution_id,
storage_path=exec_storage,
runtime_logger=runtime_logger,
loop_config=self.graph.loop_config,
accounts_prompt=self._accounts_prompt,
accounts_data=self._accounts_data,
tool_provider_map=self._tool_provider_map,
)
# Track executor so inject_input() can reach EventLoopNode instances
self._active_executors[execution_id] = executor
# Execute
result = await executor.execute(
graph=modified_graph,
goal=self.goal,
input_data=_current_input_data,
session_state=_current_session_state,
checkpoint_config=self._checkpoint_config,
)
# Clean up executor reference
self._active_executors.pop(execution_id, None)
# Check if resurrection is appropriate
if (
not result.success
and not result.paused_at
and _resurrection_count < max_resurrections
and result.session_state
and not self._is_fatal_error(result.error)
):
_resurrection_count += 1
logger.warning(
"Execution %s failed (%s) — resurrecting (%d/%d) from node '%s'",
execution_id,
(result.error or "unknown")[:200],
_resurrection_count,
max_resurrections,
result.session_state.get("resume_from", "?"),
)
# Emit resurrection event
if self._scoped_event_bus:
from framework.runtime.event_bus import AgentEvent, EventType
await self._scoped_event_bus.publish(
AgentEvent(
type=EventType.EXECUTION_RESURRECTED,
stream_id=self.stream_id,
execution_id=execution_id,
data={
"attempt": _resurrection_count,
"max_resurrections": max_resurrections,
"error": (result.error or "")[:500],
"resume_from": result.session_state.get("resume_from"),
},
)
)
# Resume from the failed node with preserved memory
_current_session_state = {
**result.session_state,
"resume_session_id": execution_id,
}
# On resurrection, input_data is already in memory —
# pass empty so we don't overwrite intermediate results.
_current_input_data = {}
# Brief cooldown before resurrection
await asyncio.sleep(2.0)
continue
break # success, fatal failure, or resurrections exhausted
# Store result with retention
self._record_execution_result(execution_id, result)
+600
View File
@@ -0,0 +1,600 @@
FULL CALL PATH: FRONTEND SESSION START TO AGENT EXECUTION
===================================================================
STEP 1: FRONTEND HTTP REQUEST (API ENTRY POINT)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/server/routes_sessions.py
ENDPOINT: POST /api/sessions (line 103)
FUNCTION: async def handle_create_session(request: web.Request) -> web.Response
- Accepts optional "agent_path" in request body
- If agent_path provided: calls manager.create_session_with_worker()
- If no agent_path: calls manager.create_session()
- Returns 201 with session details
CALL CHAIN:
handle_create_session (line 103)
├─ validate_agent_path(agent_path) [line 128]
├─ manager.create_session_with_worker() [line 135] OR manager.create_session() [line 143]
└─ _session_to_live_dict(session) [line 169]
===================================================================
STEP 2: SESSION CREATION (MANAGER LAYER)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/server/session_manager.py
FLOW A: Create Session with Worker (Single Step)
─────────────────────────────────────────────────
FUNCTION: async def create_session_with_worker() (line 128)
- Creates session infrastructure (EventBus, LLM)
- Loads worker agent
- Starts queen
CALL SEQUENCE:
create_session_with_worker (line 128)
├─ _create_session_core(model=model) [line 150]
│ │ Creates RuntimeConfig, LiteLLMProvider, EventBus
│ │ Creates Session dataclass with event_bus and llm
│ │ Stores in self._sessions[resolved_id]
│ └─ returns Session object
├─ _load_worker_core(session, agent_path, worker_id) [line 153]
│ │ Loads AgentRunner (blocking I/O via executor)
│ │ Calls runner._setup(event_bus=session.event_bus)
│ │ Starts worker_runtime if not already running
│ │ Cleans up stale sessions on disk
│ │ Updates session.runner, session.worker_runtime, etc.
│ └─ returns None (modifies session in-place)
├─ build_worker_profile(session.worker_runtime) [line 162]
│ └─ returns worker identity string for queen
└─ _start_queen(session, worker_identity) [line 166]
(See STEP 3 below)
FLOW B: Create Queen-Only Session
─────────────────────────────────
FUNCTION: async def create_session() (line 109)
CALL SEQUENCE:
create_session (line 109)
├─ _create_session_core(session_id, model) [line 120]
│ └─ (same as above)
└─ _start_queen(session, worker_identity=None) [line 123]
(See STEP 3 below)
===================================================================
STEP 3: WORKER AGENT LOADING (AGENT RUNNER LAYER)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runner/runner.py
FUNCTION: AgentRunner.load() (line 789) - Static method
CALLED BY: _load_worker_core() via loop.run_in_executor() (line 213-220)
LOAD SEQUENCE:
load(agent_path, model, interactive, skip_credential_validation) (line 789)
├─ Tries agent.py path first:
│ └─ agent_py = agent_path / "agent.py"
│ ├─ _import_agent_module(agent_path) [line 823]
│ │ (Dynamically imports agent Python module)
│ │
│ ├─ Extract goal, nodes, edges from module [line 825-827]
│ ├─ Build GraphSpec from module variables [line 854-876]
│ └─ return AgentRunner(...) [line 889]
└─ Fallback to agent.json if no agent.py:
└─ load_agent_export(agent_json_path) [line 911]
└─ return AgentRunner(...) [line 913]
RETURN: AgentRunner instance (NOT YET STARTED)
AgentRunner.__init__() (line 609) - Constructor
├─ Stores graph, goal, model, storage_path
├─ _validate_credentials() [line 684]
│ (Checks required credentials are available)
├─ Auto-discover tools from tools.py [line 687-689]
│ └─ _tool_registry.discover_from_module(tools_path)
└─ Auto-discover MCP servers from mcp_servers.json [line 697-699]
└─ _load_mcp_servers_from_config(mcp_config_path)
NOTE: __init__ does NOT call _setup() yet — that happens later.
===================================================================
STEP 4: WORKER RUNTIME SETUP (AFTER LOAD)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runner/runner.py
FUNCTION: runner._setup(event_bus=None) (line 1012)
CALLED BY: _load_worker_core() via loop.run_in_executor() (line 225-227)
SETUP SEQUENCE:
_setup(event_bus=session.event_bus) (line 1012)
├─ Configure logging [line 1015-1017]
│ └─ configure_logging(level="INFO", format="auto")
├─ Create LLM provider [line 1031-1145]
│ ├─ Check for mock mode → MockLLMProvider
│ ├─ Check for Claude Code subscription → LiteLLMProvider with OAuth
│ ├─ Check for Codex subscription → LiteLLMProvider with Codex API
│ ├─ Fallback to environment variables or credential store
│ └─ self._llm = <LLMProvider instance>
├─ Auto-register GCU MCP server if needed [line 1148-1170]
├─ Auto-register file tools MCP server [line 1173-1192]
├─ Get all tools from registry [line 1195-1196]
│ └─ tools = list(self._tool_registry.get_tools().values())
└─ _setup_agent_runtime(tools, tool_executor, accounts_prompt, event_bus) [line 1215]
(See STEP 5 below)
===================================================================
STEP 5: AGENT RUNTIME CREATION (CORE RUNTIME INSTANTIATION)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runner/runner.py
(method _setup_agent_runtime, line 1299)
& /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py
(function create_agent_runtime, line 1642)
FUNCTION: runner._setup_agent_runtime() (line 1299)
CALLED BY: runner._setup() [line 1215]
SETUP SEQUENCE:
_setup_agent_runtime(tools, tool_executor, accounts_prompt, event_bus) (line 1299)
├─ Convert AsyncEntryPointSpec to EntryPointSpec [line 1310-1323]
├─ Create primary entry point for entry_node [line 1328-1338]
├─ Create RuntimeLogStore [line 1341]
├─ Create CheckpointConfig [line 1346-1352]
│ (Enables checkpointing by default for resumable sessions)
└─ create_agent_runtime(
graph=self.graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_points,
llm=self._llm,
tools=tools,
tool_executor=tool_executor,
runtime_log_store=log_store,
checkpoint_config=checkpoint_config,
event_bus=event_bus,
) [line 1364]
NEXT: create_agent_runtime() in agent_runtime.py
FUNCTION: create_agent_runtime() (line 1642)
CREATION SEQUENCE:
create_agent_runtime(...) (line 1642)
├─ Auto-create RuntimeLogStore if needed [line 1689-1694]
├─ Create AgentRuntime instance [line 1696]
│ └─ runtime = AgentRuntime(
│ graph=graph,
│ goal=goal,
│ storage_path=storage_path,
│ llm=llm,
│ tools=tools,
│ tool_executor=tool_executor,
│ runtime_log_store=runtime_log_store,
│ checkpoint_config=checkpoint_config,
│ event_bus=event_bus, # <-- SHARED WITH QUEEN/JUDGE
│ ) [line 1696]
├─ Register each entry point [line 1713-1714]
│ └─ runtime.register_entry_point(spec) for each spec
└─ return runtime [line 1716]
RETURN: AgentRuntime instance (NOT YET STARTED)
===================================================================
STEP 6: AGENT RUNTIME INITIALIZATION (RUNTIME CLASS)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py
FUNCTION: AgentRuntime.__init__() (line 118)
INITIALIZATION:
AgentRuntime.__init__(...) (line 118)
├─ Initialize storage (ConcurrentStorage) [line 175-179]
├─ Initialize SessionStore for unified sessions [line 182]
├─ Initialize shared components:
│ ├─ SharedStateManager [line 185]
│ ├─ EventBus (or use shared one) [line 186]
│ └─ OutcomeAggregator [line 187]
├─ Store LLM, tools, tool_executor [line 190-195]
├─ Initialize entry points dict [line 198]
├─ Initialize execution streams dict [line 199]
└─ Set state to NOT running [line 211: self._running = False]
RETURN: Unstarted AgentRuntime instance
NEXT: register_entry_point() for each entry point
FUNCTION: AgentRuntime.register_entry_point() (line 218)
├─ Validate entry node exists [line 236-237]
└─ Store spec in self._entry_points[spec.id] [line 239]
===================================================================
STEP 7: QUEEN STARTUP (CONCURRENT WITH WORKER)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/server/session_manager.py
FUNCTION: _start_queen() (line 394)
CALLED BY: create_session() OR create_session_with_worker()
QUEEN STARTUP SEQUENCE:
_start_queen(session, worker_identity, initial_prompt) (line 394)
├─ Create queen directory [line 410-411]
│ └─ ~/.hive/queen/session/{session.id}/
├─ Register MCP coding tools [line 414-424]
│ └─ Load from hive_coder/mcp_servers.json
├─ Register lifecycle tools [line 428-436]
│ └─ register_queen_lifecycle_tools()
├─ Register worker monitoring tools if worker exists [line 438-448]
│ └─ register_worker_monitoring_tools()
├─ Build queen graph with adjusted prompt [line 454-478]
│ ├─ Add worker_identity to system prompt
│ └─ Filter tools to available ones
├─ Create queen executor task [line 482-519]
│ └─ async def _queen_loop():
│ ├─ Create GraphExecutor [line 484]
│ ├─ Call executor.execute(graph=queen_graph, goal=queen_goal, ...) [line 501]
│ └─ (Queen stays alive forever unless error)
└─ session.queen_task = asyncio.create_task(_queen_loop()) [line 519]
RESULT: Queen task starts in background, never awaited
===================================================================
STEP 8: WORKER RUNTIME START
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py
FUNCTION: AgentRuntime.start() (line 263)
CALLED BY: _load_worker_core() [line 234 in session_manager.py]
START SEQUENCE:
await runtime.start() (line 263)
├─ Mark as running [line 266: self._running = True]
├─ Create ExecutionStream for each registered entry point [loop in start()]
│ └─ stream = ExecutionStream(
│ stream_id=entry_point.id,
│ entry_spec=entry_point_spec,
│ graph=self.graph,
│ goal=self.goal,
│ state_manager=self._state_manager,
│ storage=self._storage,
│ outcome_aggregator=self._outcome_aggregator,
│ event_bus=self._event_bus, # <-- SHARED
│ llm=self._llm,
│ tools=self._tools,
│ tool_executor=self._tool_executor,
│ )
├─ Start each stream [await stream.start() for each stream]
├─ Setup webhook server if configured [line ~350]
├─ Register event-driven entry points (timers, webhooks) [line ~400]
└─ self._running = True [line 266]
RESULT: AgentRuntime ready to execute
===================================================================
STEP 9: TRIGGER EXECUTION (MANUAL VIA ENTRY POINT)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py
FUNCTION: async def trigger() (line 790)
CALLED BY: Frontend API, timers, webhooks, manual calls
TRIGGER SEQUENCE:
await runtime.trigger(entry_point_id, input_data, session_state) (line 790)
├─ Verify runtime is running [line 818]
├─ Resolve stream for entry point [line 821]
│ └─ stream = self._resolve_stream(entry_point_id)
└─ return await stream.execute(input_data, correlation_id, session_state) [line 825]
(See STEP 10 below)
RETURNS: execution_id (non-blocking)
===================================================================
STEP 10: EXECUTION STREAM MANAGEMENT
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/execution_stream.py
FUNCTION: ExecutionStream.execute() (line 426)
CALLED BY: AgentRuntime.trigger() [line 825]
EXECUTE SEQUENCE:
await stream.execute(input_data, correlation_id, session_state) (line 426)
├─ Verify stream is running [line 445]
├─ Cancel any existing running executions [line 453-467]
│ (Only one execution per stream at a time)
├─ Generate execution_id [line 473-487]
│ ├─ If resuming: use resume_session_id [line 474]
│ ├─ Otherwise: generate from SessionStore [line 476]
│ └─ Format: session_{timestamp}_{uuid}
├─ Create ExecutionContext [line 493]
│ └─ ctx = ExecutionContext(
│ id=execution_id,
│ correlation_id=correlation_id,
│ stream_id=stream_id,
│ input_data=input_data,
│ session_state=session_state,
│ )
├─ Store context in self._active_executions [line 504]
├─ Create completion event [line 505]
├─ Start async execution task [line 508]
│ └─ task = asyncio.create_task(self._run_execution(ctx))
└─ return execution_id [line 512] (non-blocking)
RESULT: Execution queued, _run_execution() runs in background
===================================================================
STEP 11: EXECUTION RUNNER (BACKGROUND TASK)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/execution_stream.py
FUNCTION: ExecutionStream._run_execution() (line 538)
CALLED BY: asyncio.create_task() [line 508]
RUNS IN BACKGROUND: Yes, non-blocking
EXECUTION SEQUENCE:
await _run_execution(ctx) (line 538)
├─ Acquire semaphore for concurrency control [line 558]
├─ Mark status as "running" [line 559]
├─ Create execution-scoped memory [line 572-576]
│ └─ self._state_manager.create_memory(execution_id, stream_id, isolation)
├─ Start runtime adapter [line 579-586]
│ └─ runtime_adapter.start_run(goal_id, goal_description, input_data)
├─ Create RuntimeLogger [line 589-595]
├─ Determine storage location [line 601-604]
│ └─ exec_storage = self._session_store.sessions_dir / execution_id
├─ Write initial session state [line 611-612]
├─ RESURRECTION LOOP [line 618]
│ └─ while True:
│ ├─ Create GraphExecutor [line 625-639]
│ │ └─ executor = GraphExecutor(
│ │ runtime=runtime_adapter,
│ │ llm=self._llm,
│ │ tools=self._tools,
│ │ tool_executor=self._tool_executor,
│ │ event_bus=self._scoped_event_bus, # <-- SHARED
│ │ storage_path=exec_storage,
│ │ checkpoint_config=self._checkpoint_config,
│ │ )
│ │
│ ├─ Execute graph [line 644]
│ │ └─ result = await executor.execute(
│ │ graph=modified_graph,
│ │ goal=self.goal,
│ │ input_data=_current_input_data,
│ │ session_state=_current_session_state,
│ │ checkpoint_config=self._checkpoint_config,
│ │ )
│ │
│ └─ Check for resurrection [line 656-707]
│ (On non-fatal error, retry from failed node)
├─ Record result [line 710]
│ └─ self._record_execution_result(execution_id, result)
├─ Emit completion event [line 730-754]
│ ├─ execution_completed (if success)
│ ├─ execution_paused (if paused)
│ └─ execution_failed (if error)
└─ Mark completion event [line 774]
└─ self._completion_events[execution_id].set()
RESULT: Execution complete, event emitted, task ends
===================================================================
STEP 12: GRAPH EXECUTION (THE ACTUAL AGENT LOGIC)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/graph/executor.py
FUNCTION: GraphExecutor.execute() (line 289)
CALLED BY: ExecutionStream._run_execution() [line 644]
RUNS IN BACKGROUND: Yes, as part of _run_execution task
EXECUTION SEQUENCE:
await executor.execute(graph, goal, input_data, session_state, checkpoint_config) (line 289)
├─ Validate graph [line 312-318]
├─ Validate tool availability [line 320-332]
├─ Initialize SharedMemory for session [line 335]
├─ Restore session state if resuming [line 353-369]
│ └─ Load memory from previous session
├─ Restore checkpoints if available [line 412-463]
├─ Determine entry point (normal or resume) [line 464-492]
├─ Start run in observability system [line 567-579]
├─ MAIN EXECUTION LOOP [line 596]
│ └─ while steps < graph.max_steps:
│ │
│ ├─ Check for pause requests [line 599-636]
│ │
│ ├─ Get current node spec [line 648-650]
│ │ └─ node_spec = graph.get_node(current_node_id)
│ │
│ ├─ Enforce max_node_visits [line 652-678]
│ │
│ ├─ Append node to execution path [line 680]
│ │
│ ├─ Clear stale nullable outputs [line 682-695]
│ │
│ ├─ Create node context [line 730-745]
│ │ └─ ctx = self._build_context(node_spec, memory, goal, ...)
│ │
│ ├─ Get/create node implementation [line 760]
│ │ └─ node_impl = self._get_node_implementation(node_spec, ...)
│ │
│ ├─ Validate inputs [line 762-769]
│ │
│ ├─ Create checkpoints [line 771-790]
│ │
│ ├─ EXECUTE NODE [line 800-802]
│ │ └─ result = await node_impl.execute(ctx)
│ │ (Executes LLM call, tool calls, or other logic)
│ │
│ ├─ Handle success [line 825-876]
│ │ ├─ Validate output [line 836-850]
│ │ └─ Write to memory [line 874-876]
│ │
│ ├─ Handle failure and retries [line 884-934]
│ │ ├─ Track retry count [line 886-888]
│ │ ├─ Check max_retries [line 906-934]
│ │ └─ Sleep with exponential backoff before retry
│ │
│ ├─ Update progress in state.json [line 941]
│ │ └─ self._write_progress(current_node_id, path, memory, ...)
│ │
│ ├─ FOLLOW EDGES [line 942+]
│ │ └─ next_node = await self._follow_edges(
│ │ graph, goal, current_node_id,
│ │ node_spec, result, memory
│ │ )
│ │ Evaluates conditional edges, determines next node
│ │
│ └─ Transition to next node [line steps += 1]
│ (Loop continues with next node)
├─ Handle timeout/max_steps [line 596: while steps < graph.max_steps]
└─ Return ExecutionResult [line 1100+]
└─ ExecutionResult(
success=success,
output=final_output,
error=error_message,
paused_at=paused_node_id,
session_state={memory, path, ...},
)
RESULT: ExecutionResult returned to ExecutionStream._run_execution()
===================================================================
DATA FLOW SUMMARY
===================================================================
Shared Component: EventBus
├─ Created in Session (line 95 in session_manager.py)
├─ Passed to AgentRuntime.__init__ (line 186 in agent_runtime.py)
├─ Stored and used by ExecutionStream (line 219 in execution_stream.py)
├─ Wrapped as GraphScopedEventBus (line 254 in execution_stream.py)
├─ Passed to GraphExecutor (line 630 in execution_stream.py)
└─ Used for event publishing during execution
Shared Component: LLM Provider
├─ Created in Session._create_session_core() (line 89-94 in session_manager.py)
├─ Passed to AgentRuntime.__init__ (line 123 in agent_runtime.py)
├─ Stored and used by ExecutionStream (line 220 in execution_stream.py)
├─ Passed to GraphExecutor (line 627 in execution_stream.py)
└─ Used by node implementations for LLM calls
Memory Flow:
├─ Each execution has ExecutionContext with input_data
├─ SharedMemory created per execution (line 572-576 in execution_stream.py)
├─ Session state restored if resuming (line 354-369 in executor.py)
├─ Each node reads from memory via input_keys
├─ Each node writes to memory via output_keys
├─ Memory checkpoints created for resumability
└─ Final memory returned in ExecutionResult
===================================================================
KEY FILE PATHS AND LINE NUMBERS
===================================================================
1. API Entry: /Users/timothy/repo/hive/core/framework/server/routes_sessions.py:103
2. Session Manager: /Users/timothy/repo/hive/core/framework/server/session_manager.py:128
3. Agent Runner Load: /Users/timothy/repo/hive/core/framework/runner/runner.py:789
4. Agent Runner Setup: /Users/timothy/repo/hive/core/framework/runner/runner.py:1012
5. Runtime Creation: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py:1642
6. Runtime Class: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py:66
7. Trigger Method: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py:790
8. Execution Stream: /Users/timothy/repo/hive/core/framework/runtime/execution_stream.py:134
9. Graph Executor: /Users/timothy/repo/hive/core/framework/graph/executor.py:102
10. Main Loop: /Users/timothy/repo/hive/core/framework/graph/executor.py:596
@@ -161,7 +161,7 @@ Only include the jobs the user explicitly selected.
customize_node = NodeSpec(
id="customize",
name="Customize",
description="For each selected job, generate resume customization list and cold outreach email as HTML",
description="For each selected job, generate resume customization list and cold outreach email, create Gmail drafts",
node_type="event_loop",
client_facing=True,
max_node_visits=1,
@@ -169,7 +169,7 @@ customize_node = NodeSpec(
output_keys=["application_materials"],
success_criteria=(
"Resume customization list and cold outreach email generated "
"for each selected job, saved as a single HTML file and opened for the user."
"for each selected job, saved as HTML, and Gmail drafts created in user's inbox."
),
system_prompt="""\
You are a career coach creating personalized application materials.
@@ -223,8 +223,8 @@ append_data(filename="application_materials.html", data="</body>\\n</html>")
```
**Step 4 — Serve the file:**
Call serve_file_to_user(filename="application_materials.html", open_in_browser=true)
Print the file_path from the result so the user can click it later.
Call serve_file_to_user(filename="application_materials.html")
Print the file_path from the result so the user can access it later.
**Step 5 — Create Gmail Drafts (in batches of 5):**
IMPORTANT: Do NOT create all drafts in one turn. Create at most 5 gmail_create_draft calls \
@@ -234,12 +234,17 @@ drafts, then create the remaining drafts in the next turn.
For each selected job, call gmail_create_draft with:
- to: hiring manager email if available, otherwise "hiring@company-domain.com"
- subject: the cold email subject line
- html: the cold email body as HTML
- body: the cold email body as plain text
- draft: true (create as draft, not send immediately)
If gmail_create_draft errors (e.g. credentials not configured), skip ALL remaining drafts and tell the user:
"Gmail drafts could not be created (Gmail not connected). You can copy the emails from the HTML report instead."
**Step 6 — Finish:**
Call set_output("application_materials", "Created application_materials.html with materials for {N} jobs")
**Step 6 — Confirm Gmail Drafts Created:**
After all drafts are created, tell the user: "Created {N} draft emails in your Gmail inbox. You can review and send them when ready."
**Step 7 — Finish:**
Call set_output("application_materials", "Created application_materials.html with materials for {N} jobs and {N} Gmail drafts")
**IMPORTANT:**
- Only suggest truthful resume changes — enhance presentation, never fabricate