Compare commits

...

36 Commits

Author SHA1 Message Date
Timothy @aden 46b4651073 Merge pull request #6589 from aden-hive/fix/data-disclosure-gaps
Release / Create Release (push) Waiting to run
Fix data disclosure gaps, add worker run digests, clean up deprecated tools
2026-03-17 20:46:12 -07:00
Timothy 86dd5246c6 Merge remote-tracking branch 'origin/fix/resume-with-scheduler' into fix/data-disclosure-gaps 2026-03-17 20:44:28 -07:00
Timothy a1227c88ee Merge remote-tracking branch 'origin/fix/resume-with-scheduler' into fix/data-disclosure-gaps 2026-03-17 20:42:25 -07:00
Timothy 535d7ab568 fix: worker digest sub event 2026-03-17 20:41:56 -07:00
Richard Tang af10494b31 chore: ruff lint 2026-03-17 20:41:08 -07:00
Richard Tang 39c1042827 fix: fall back to queen-only session when worker load fails on cold restore 2026-03-17 20:38:41 -07:00
Richard Tang 16e7dc11f4 fix: don't overwrite meta in queen creation 2026-03-17 20:27:39 -07:00
Richard Tang 7a27babefd feat: track and resume the session by phase 2026-03-17 20:22:54 -07:00
Timothy d53ae9d51d fix: deprecated tests 2026-03-17 20:20:21 -07:00
Timothy 910cf7727d Merge remote-tracking branch 'origin/fix/resume-with-scheduler' into fix/data-disclosure-gaps 2026-03-17 20:14:25 -07:00
Timothy 1698605f15 chore: lint 2026-03-17 19:59:23 -07:00
Timothy eda124a123 chore: lint 2026-03-17 19:58:08 -07:00
Timothy 15e9ce8d2f Merge remote-tracking branch 'origin/feature/session-digest' into fix/data-disclosure-gaps 2026-03-17 19:45:07 -07:00
Timothy c01dd603d7 fix: digest invocation 2026-03-17 19:44:22 -07:00
Timothy 9d5157d69f feat: queen subscribe to worker digest 2026-03-17 19:23:43 -07:00
Timothy d78795bdf5 Merge remote-tracking branch 'origin/feature/session-digest' into fix/data-disclosure-gaps 2026-03-17 19:15:22 -07:00
Timothy ff2b7f473e fix: subagent execution 2026-03-17 19:15:07 -07:00
Timothy 73c9a91811 feat: add worker memory consolidation hooks 2026-03-17 19:14:07 -07:00
Timothy 27b765d902 Merge branch 'feature/session-digest' into fix/data-disclosure-gaps 2026-03-17 18:32:20 -07:00
Timothy fddba419be fix: minor issues 2026-03-17 18:30:57 -07:00
Timothy f42d6308e8 Merge branch 'main' into fix/data-disclosure-gaps 2026-03-17 17:50:36 -07:00
Timothy c167002754 fix: data disclosure gaps 2026-03-17 17:50:08 -07:00
Timothy @aden ea26ee7d0c Merge pull request #6568 from aden-hive/feature/node-focus-prompt
Inject execution-scope preamble into worker node system prompts
2026-03-17 17:38:49 -07:00
Richard Tang 5280e908b2 feat: change the agent last active time 2026-03-17 17:35:01 -07:00
RichardTang-Aden 1c5dd8c664 Merge pull request #5178 from Schlaflied/feat/sdr-agent-template
feat(templates): add SDR Agent sample template
2026-03-17 16:05:45 -07:00
Richard Tang 3aca153be5 fix: add missing flowchart and terminal nodes 2026-03-17 16:03:29 -07:00
Timothy 65c8e1653c chore: lint 2026-03-17 15:31:36 -07:00
Timothy 58e4fa918c feat: make worker node aware of boundaries 2026-03-17 15:28:41 -07:00
Timothy 3af13d3f90 feat: session digest for run scoped diary 2026-03-17 14:25:32 -07:00
Timothy @aden d2eb86e534 Merge pull request #6540 from sundaram2021/fix/make-windows-compatibility
fix make test compatibility on windows
2026-03-17 11:41:32 -07:00
Schlaflied 48747e20af fix: remove personal oauth credential entries from .gitignore
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 13:53:16 -04:00
Schlaflied 58af593af6 revert: remove unrelated changes from previous commit
Restore .claude/settings.json and revert .gitignore change
that were accidentally included in the sdr-agent refactor commit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 13:52:44 -04:00
Schlaflied 450575a927 refactor(sdr-agent): reuse agent.start() in tui command and fix mock mode
- Replace duplicated setup code in tui command with agent.start(mock_mode=mock)
- Fix mock mode to use MockLLMProvider instead of llm=None
- Add demo_contacts.json sample data for template testing
- Untrack .claude/settings.json and add to .gitignore

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 13:52:10 -04:00
Schlaflied eac2bb19b2 fix(sdr-agent): fix agent runtime lifecycle and mcp config
- Replace self._executor with self._agent_runtime (AgentRuntime | None)
- Import AgentRuntime for proper type annotation
- Add missing await self._agent_runtime.start() in start() — runtime
  was created but never started, causing silent failures at runtime
- Add self._agent_runtime = None reset in stop() for clean restart
- Remove redundant self._graph is None guard in trigger_and_wait()
- Update mcp_servers.json with hive-tools server config
- Add credential file patterns to .gitignore

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 13:50:29 -04:00
Schlaflied 756a815bf0 feat(templates): add SDR Agent sample template 2026-03-17 13:50:05 -04:00
Sundaram Kumar Jha ff7b5c7e27 fix: prepend ~/.local/bin to PATH so uv is found in Git Bash on Windows 2026-03-17 01:28:25 +05:30
37 changed files with 3049 additions and 1157 deletions
-1
View File
@@ -68,7 +68,6 @@ temp/
exports/*
.claude/settings.local.json
.claude/skills/ship-it/
.venv
+9 -2
View File
@@ -1,4 +1,11 @@
.PHONY: lint format check test install-hooks help frontend-install frontend-dev frontend-build
.PHONY: lint format check test test-tools test-live test-all install-hooks help frontend-install frontend-dev frontend-build
# ── Ensure uv is findable in Git Bash on Windows ──────────────────────────────
# uv installs to ~/.local/bin on Windows/Linux/macOS. Git Bash may not include
# this in PATH by default, so we prepend it here.
export PATH := $(HOME)/.local/bin:$(PATH)
# ── Targets ───────────────────────────────────────────────────────────────────
help: ## Show this help
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | \
@@ -46,4 +53,4 @@ frontend-dev: ## Start frontend dev server
cd core/frontend && npm run dev
frontend-build: ## Build frontend for production
cd core/frontend && npm run build
cd core/frontend && npm run build
+50 -19
View File
@@ -23,25 +23,56 @@ class AgentEntry:
last_active: str | None = None
def _get_last_active(agent_name: str) -> str | None:
"""Return the most recent updated_at timestamp across all sessions."""
sessions_dir = Path.home() / ".hive" / "agents" / agent_name / "sessions"
if not sessions_dir.exists():
return None
def _get_last_active(agent_path: Path) -> str | None:
"""Return the most recent updated_at timestamp across all sessions.
Checks both worker sessions (``~/.hive/agents/{name}/sessions/``) and
queen sessions (``~/.hive/queen/session/``) whose ``meta.json`` references
the same *agent_path*.
"""
from datetime import datetime
agent_name = agent_path.name
latest: str | None = None
for session_dir in sessions_dir.iterdir():
if not session_dir.is_dir() or not session_dir.name.startswith("session_"):
continue
state_file = session_dir / "state.json"
if not state_file.exists():
continue
try:
data = json.loads(state_file.read_text(encoding="utf-8"))
ts = data.get("timestamps", {}).get("updated_at")
if ts and (latest is None or ts > latest):
latest = ts
except Exception:
continue
# 1. Worker sessions
sessions_dir = Path.home() / ".hive" / "agents" / agent_name / "sessions"
if sessions_dir.exists():
for session_dir in sessions_dir.iterdir():
if not session_dir.is_dir() or not session_dir.name.startswith("session_"):
continue
state_file = session_dir / "state.json"
if not state_file.exists():
continue
try:
data = json.loads(state_file.read_text(encoding="utf-8"))
ts = data.get("timestamps", {}).get("updated_at")
if ts and (latest is None or ts > latest):
latest = ts
except Exception:
continue
# 2. Queen sessions
queen_sessions_dir = Path.home() / ".hive" / "queen" / "session"
if queen_sessions_dir.exists():
resolved = agent_path.resolve()
for d in queen_sessions_dir.iterdir():
if not d.is_dir():
continue
meta_file = d / "meta.json"
if not meta_file.exists():
continue
try:
meta = json.loads(meta_file.read_text(encoding="utf-8"))
stored = meta.get("agent_path")
if not stored or Path(stored).resolve() != resolved:
continue
ts = datetime.fromtimestamp(d.stat().st_mtime).isoformat()
if latest is None or ts > latest:
latest = ts
except Exception:
continue
return latest
@@ -169,7 +200,7 @@ def discover_agents() -> dict[str, list[AgentEntry]]:
node_count=node_count,
tool_count=tool_count,
tags=tags,
last_active=_get_last_active(path.name),
last_active=_get_last_active(path),
)
)
if entries:
+286
View File
@@ -0,0 +1,286 @@
"""Worker per-run digest (run diary).
Storage layout:
~/.hive/agents/{agent_name}/runs/{run_id}/digest.md
Each completed or failed worker run gets one digest file. The queen reads
these via get_worker_status(focus='diary') before digging into live runtime
logs — the diary is a cheap, persistent record that survives across sessions.
"""
from __future__ import annotations
import logging
import traceback
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from framework.runtime.event_bus import AgentEvent, EventBus
logger = logging.getLogger(__name__)
_DIGEST_SYSTEM = """\
You maintain run digests for a worker agent.
A run digest is a concise, factual record of a single task execution.
Write 3-6 sentences covering:
- What the worker was asked to do (the task/goal)
- What approach it took and what tools it used
- What the outcome was (success, partial, or failure and why if relevant)
- Any notable issues, retries, or escalations to the queen
Write in third person past tense. Be direct and specific.
Omit routine tool invocations unless the result matters.
Output only the digest prose — no headings, no code fences.
"""
def _worker_runs_dir(agent_name: str) -> Path:
return Path.home() / ".hive" / "agents" / agent_name / "runs"
def digest_path(agent_name: str, run_id: str) -> Path:
return _worker_runs_dir(agent_name) / run_id / "digest.md"
def _collect_run_events(bus: EventBus, run_id: str, limit: int = 2000) -> list[AgentEvent]:
    """Collect all events belonging to *run_id* from the bus history.

    Strategy: locate the EXECUTION_STARTED event carrying ``run_id``, take
    its ``execution_id``, and query the bus by that id — TOOL_CALL_*,
    EDGE_TRAVERSED, NODE_STALLED etc. carry execution_id but not run_id.

    Falls back to a full-scan run_id filter when no EXECUTION_STARTED event
    is found (e.g. the bus history was rotated).
    """
    from framework.runtime.event_bus import EventType

    # Pass 1: resolve run_id -> execution_id via EXECUTION_STARTED events.
    for evt in bus.get_history(event_type=EventType.EXECUTION_STARTED, limit=limit):
        if getattr(evt, "run_id", None) == run_id and evt.execution_id:
            return bus.get_history(execution_id=evt.execution_id, limit=limit)

    # Pass 2 (fallback): scan everything and match the run_id attribute directly.
    return [e for e in bus.get_history(limit=limit) if getattr(e, "run_id", None) == run_id]
def _build_run_context(
    events: list[AgentEvent],
    outcome_event: AgentEvent | None,
) -> str:
    """Assemble a plain-text run context string for the digest LLM call.

    Args:
        events: Bus events for the run, newest-first (as returned by
            ``_collect_run_events``).
        outcome_event: EXECUTION_COMPLETED / EXECUTION_FAILED event, or
            None for a mid-run snapshot.

    Returns:
        Newline-joined summary lines (task input, duration, outcome, node
        path, tool usage, issues, escalations, final LLM output). Empty
        string when *events* is empty.
    """
    from framework.runtime.event_bus import EventType

    # Reverse so events are in chronological order
    events_chron = list(reversed(events))
    lines: list[str] = []
    # Task input from EXECUTION_STARTED
    started = [e for e in events_chron if e.type == EventType.EXECUTION_STARTED]
    if started:
        inp = started[0].data.get("input", {})
        if inp:
            lines.append(f"Task input: {str(inp)[:400]}")
    # Duration (elapsed so far if no outcome yet)
    # NOTE(review): datetime.utcnow() is naive (and deprecated since 3.12);
    # presumably event timestamps are naive UTC too — confirm before
    # switching to timezone-aware datetimes, or the subtraction will raise.
    ref_ts = outcome_event.timestamp if outcome_event else datetime.utcnow()
    if started:
        elapsed = (ref_ts - started[0].timestamp).total_seconds()
        m, s = divmod(int(elapsed), 60)
        lines.append(f"Duration so far: {m}m {s}s" if m else f"Duration so far: {s}s")
    # Outcome
    if outcome_event is None:
        lines.append("Status: still running (mid-run snapshot)")
    elif outcome_event.type == EventType.EXECUTION_COMPLETED:
        out = outcome_event.data.get("output", {})
        out_str = f"Outcome: completed. Output: {str(out)[:300]}"
        lines.append(out_str if out else "Outcome: completed.")
    else:
        err = outcome_event.data.get("error", "")
        lines.append(f"Outcome: failed. Error: {str(err)[:300]}" if err else "Outcome: failed.")
    # Node path (edge traversals) — only the last 20 hops, to bound prompt size
    edges = [e for e in events_chron if e.type == EventType.EDGE_TRAVERSED]
    if edges:
        parts = [
            f"{e.data.get('source_node', '?')}->{e.data.get('target_node', '?')}"
            for e in edges[-20:]
        ]
        lines.append(f"Node path: {', '.join(parts)}")
    # Tools used (call counts, most frequent first)
    tool_events = [e for e in events_chron if e.type == EventType.TOOL_CALL_COMPLETED]
    if tool_events:
        names = [e.data.get("tool_name", "?") for e in tool_events]
        counts = Counter(names)
        summary = ", ".join(f"{name}×{n}" if n > 1 else name for name, n in counts.most_common())
        lines.append(f"Tools used: {summary}")
    # Note any tool errors
    errors = [e for e in tool_events if e.data.get("is_error")]
    if errors:
        err_names = Counter(e.data.get("tool_name", "?") for e in errors)
        lines.append(f"Tool errors: {dict(err_names)}")
    # Issues (stalls, doom loops, constraint violations, retries)
    issue_map = {
        EventType.NODE_STALLED: "stall",
        EventType.NODE_TOOL_DOOM_LOOP: "doom loop",
        EventType.CONSTRAINT_VIOLATION: "constraint violation",
        EventType.NODE_RETRY: "retry",
    }
    issue_parts: list[str] = []
    for evt_type, label in issue_map.items():
        n = sum(1 for e in events_chron if e.type == evt_type)
        if n:
            issue_parts.append(f"{n} {label}(s)")
    if issue_parts:
        lines.append(f"Issues: {', '.join(issue_parts)}")
    # Escalations to queen
    escalations = [e for e in events_chron if e.type == EventType.ESCALATION_REQUESTED]
    if escalations:
        lines.append(f"Escalations to queen: {len(escalations)}")
    # Final LLM output snippet (last LLM_TEXT_DELTA snapshot)
    text_events = [e for e in reversed(events_chron) if e.type == EventType.LLM_TEXT_DELTA]
    if text_events:
        snapshot = text_events[0].data.get("snapshot", "") or ""
        if snapshot:
            lines.append(f"Final LLM output: {snapshot[-400:].strip()}")
    return "\n".join(lines)
async def consolidate_worker_run(
    agent_name: str,
    run_id: str,
    outcome_event: AgentEvent | None,
    bus: EventBus,
    llm: Any,
) -> None:
    """Write (or overwrite) the digest for a worker run.

    Called fire-and-forget either:
    - After EXECUTION_COMPLETED / EXECUTION_FAILED (outcome_event set, final write)
    - Periodically during a run on a cooldown timer (outcome_event=None, mid-run snapshot)

    The digest file is always overwritten so each call produces the freshest view.
    The final completion/failure call supersedes any mid-run snapshot.

    Never raises: all failures are logged and persisted to a ``digest_error.txt``
    alongside the digest path.

    Args:
        agent_name: Worker agent directory name (determines storage path).
        run_id: The run ID.
        outcome_event: EXECUTION_COMPLETED or EXECUTION_FAILED event, or None for
            a mid-run snapshot.
        bus: The session EventBus (shared queen + worker).
        llm: LLMProvider with an acomplete() method.
    """
    try:
        # Gather this run's events and condense them into a plain-text prompt.
        events = _collect_run_events(bus, run_id)
        run_context = _build_run_context(events, outcome_event)
        if not run_context:
            logger.debug("worker_memory: no events for run %s, skipping digest", run_id)
            return
        is_final = outcome_event is not None
        logger.info(
            "worker_memory: generating %s digest for run %s ...",
            "final" if is_final else "mid-run",
            run_id,
        )
        # Local import — presumably to avoid a module-level import cycle
        # with the queen package; confirm against the package layout.
        from framework.agents.queen.config import default_config

        resp = await llm.acomplete(
            messages=[{"role": "user", "content": run_context}],
            system=_DIGEST_SYSTEM,
            # Cap digest length; never exceed the queen's configured budget.
            max_tokens=min(default_config.max_tokens, 512),
        )
        digest_text = (resp.content or "").strip()
        if not digest_text:
            logger.warning("worker_memory: LLM returned empty digest for run %s", run_id)
            return
        path = digest_path(agent_name, run_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        from framework.runtime.event_bus import EventType

        # Header timestamp: event time for final digests, wall clock
        # (naive utcnow) for mid-run snapshots.
        ts = (outcome_event.timestamp if outcome_event else datetime.utcnow()).strftime(
            "%Y-%m-%d %H:%M"
        )
        if outcome_event is None:
            status = "running"
        elif outcome_event.type == EventType.EXECUTION_COMPLETED:
            status = "completed"
        else:
            status = "failed"
        path.write_text(
            f"# {run_id}\n\n**{ts}** | {status}\n\n{digest_text}\n",
            encoding="utf-8",
        )
        logger.info(
            "worker_memory: %s digest written for run %s (%d chars)",
            status,
            run_id,
            len(digest_text),
        )
    except Exception:
        tb = traceback.format_exc()
        logger.exception("worker_memory: digest failed for run %s", run_id)
        # Persist the error so it's findable without log access
        error_path = _worker_runs_dir(agent_name) / run_id / "digest_error.txt"
        try:
            error_path.parent.mkdir(parents=True, exist_ok=True)
            error_path.write_text(
                f"run_id: {run_id}\ntime: {datetime.now().isoformat()}\n\n{tb}",
                encoding="utf-8",
            )
        except Exception:
            # Best-effort only — never let digest bookkeeping crash the caller.
            pass
def read_recent_digests(agent_name: str, max_runs: int = 5) -> list[tuple[str, str]]:
    """Return recent run digests as [(run_id, content), ...], newest first.

    Args:
        agent_name: Worker agent directory name.
        max_runs: Maximum number of digests to return.

    Returns:
        List of (run_id, digest_content) tuples, ordered newest first.
        Empty, unreadable, or missing digest files are skipped.
    """
    base = _worker_runs_dir(agent_name)
    if not base.exists():
        return []
    # Newest first by file modification time; each digest lives at
    # <runs>/<run_id>/digest.md so the parent dir name is the run id.
    by_recency = sorted(
        base.glob("*/digest.md"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    digests: list[tuple[str, str]] = []
    for path in by_recency[:max_runs]:
        try:
            text = path.read_text(encoding="utf-8").strip()
        except OSError:
            continue
        if text:
            digests.append((path.parent.name, text))
    return digests
+6 -2
View File
@@ -612,6 +612,11 @@ class NodeConversation:
continue # never prune errors
if msg.content.startswith("[Pruned tool result"):
continue # already pruned
# Tiny results (set_output acks, confirmations) — pruning
# saves negligible space but makes the LLM think the call
# failed, causing costly retries.
if len(msg.content) < 100:
continue
# Phase-aware: protect current phase messages
if self._current_phase and msg.phase_id == self._current_phase:
@@ -901,8 +906,7 @@ class NodeConversation:
full_path = str((spill_path / conv_filename).resolve())
ref_parts.append(
f"[Previous conversation saved to '{full_path}'. "
f"Use load_data('{conv_filename}'), read_file('{full_path}'), "
f"or run_command('cat \"{full_path}\"') to review if needed.]"
f"Use load_data('{conv_filename}') to review if needed.]"
)
elif not collapsed_msgs:
ref_parts.append("[Previous freeform messages compacted.]")
+97 -61
View File
@@ -243,7 +243,7 @@ class LoopConfig:
# Maximum seconds a delegate_to_sub_agent call may run before being
# killed. Subagents run a full event-loop so they naturally take
# longer than a single tool call — default is 10 minutes. 0 = no timeout.
subagent_timeout_seconds: float = 300.0
subagent_timeout_seconds: float = 600.0
# --- Lifecycle hooks ---
# Hooks are async callables keyed by event name. Supported events:
@@ -293,13 +293,26 @@ class OutputAccumulator:
Values are stored in memory and optionally written through to a
ConversationStore's cursor data for crash recovery.
When *spillover_dir* and *max_value_chars* are set, large values are
automatically saved to files and replaced with lightweight file
references. This guarantees auto-spill fires on **every** ``set()``
call regardless of code path (resume, checkpoint restore, etc.).
"""
values: dict[str, Any] = field(default_factory=dict)
store: ConversationStore | None = None
spillover_dir: str | None = None
max_value_chars: int = 0 # 0 = disabled
async def set(self, key: str, value: Any) -> None:
"""Set a key-value pair, persisting immediately if store is available."""
"""Set a key-value pair, auto-spilling large values to files.
When the serialised value exceeds *max_value_chars*, the data is
saved to ``<spillover_dir>/output_<key>.<ext>`` and *value* is
replaced with a compact file-reference string.
"""
value = self._auto_spill(key, value)
self.values[key] = value
if self.store:
cursor = await self.store.read_cursor() or {}
@@ -308,6 +321,39 @@ class OutputAccumulator:
cursor["outputs"] = outputs
await self.store.write_cursor(cursor)
def _auto_spill(self, key: str, value: Any) -> Any:
"""Save large values to a file and return a reference string."""
if self.max_value_chars <= 0 or not self.spillover_dir:
return value
val_str = json.dumps(value, ensure_ascii=False) if not isinstance(value, str) else value
if len(val_str) <= self.max_value_chars:
return value
spill_path = Path(self.spillover_dir)
spill_path.mkdir(parents=True, exist_ok=True)
ext = ".json" if isinstance(value, (dict, list)) else ".txt"
filename = f"output_{key}{ext}"
write_content = (
json.dumps(value, indent=2, ensure_ascii=False)
if isinstance(value, (dict, list))
else str(value)
)
(spill_path / filename).write_text(write_content, encoding="utf-8")
file_size = (spill_path / filename).stat().st_size
logger.info(
"set_output value auto-spilled: key=%s, %d chars → %s (%d bytes)",
key,
len(val_str),
filename,
file_size,
)
return (
f"[Saved to '{filename}' ({file_size:,} bytes). "
f"Use load_data(filename='{filename}') "
f"to access full data.]"
)
def get(self, key: str) -> Any | None:
"""Get a value by key, or None if not present."""
return self.values.get(key)
@@ -467,7 +513,11 @@ class EventLoopNode(NodeProtocol):
conversation._output_keys = (
ctx.cumulative_output_keys or ctx.node_spec.output_keys or None
)
accumulator = OutputAccumulator(store=self._conversation_store)
accumulator = OutputAccumulator(
store=self._conversation_store,
spillover_dir=self._config.spillover_dir,
max_value_chars=self._config.max_output_value_chars,
)
start_iteration = 0
_restored_recent_responses: list[str] = []
_restored_tool_fingerprints: list[list[tuple[str, str]]] = []
@@ -504,9 +554,21 @@ class EventLoopNode(NodeProtocol):
_restored_tool_fingerprints = []
# Fresh conversation: either isolated mode or first node in continuous mode.
from framework.graph.prompt_composer import _with_datetime
from framework.graph.prompt_composer import (
EXECUTION_SCOPE_PREAMBLE,
_with_datetime,
)
system_prompt = _with_datetime(ctx.node_spec.system_prompt or "")
# Prepend execution-scope preamble for worker nodes so the
# LLM knows it is one step in a pipeline and should not try
# to perform work that belongs to other nodes.
if (
not ctx.is_subagent_mode
and ctx.node_spec.node_type in ("event_loop", "gcu")
and ctx.node_spec.output_keys
):
system_prompt = f"{EXECUTION_SCOPE_PREAMBLE}\n\n{system_prompt}"
# Prepend GCU browser best-practices prompt for gcu nodes
if ctx.node_spec.node_type == "gcu":
from framework.graph.gcu import GCU_BROWSER_SYSTEM_PROMPT
@@ -573,7 +635,11 @@ class EventLoopNode(NodeProtocol):
# Stamp phase for first node in continuous mode
if _is_continuous:
conversation.set_current_phase(ctx.node_id)
accumulator = OutputAccumulator(store=self._conversation_store)
accumulator = OutputAccumulator(
store=self._conversation_store,
spillover_dir=self._config.spillover_dir,
max_value_chars=self._config.max_output_value_chars,
)
start_iteration = 0
# Add initial user message from input data
@@ -2197,58 +2263,24 @@ class EventLoopNode(NodeProtocol):
pass
key = tc.tool_input.get("key", "")
# Auto-spill: save large values to data files and
# replace with a lightweight file reference so shared
# memory / adapt.md / transition markers stay small.
spill_dir = self._config.spillover_dir
max_val = self._config.max_output_value_chars
if max_val > 0 and spill_dir:
val_str = (
json.dumps(value, ensure_ascii=False)
if not isinstance(value, str)
else value
)
if len(val_str) > max_val:
spill_path = Path(spill_dir)
spill_path.mkdir(parents=True, exist_ok=True)
ext = ".json" if isinstance(value, (dict, list)) else ".txt"
filename = f"output_{key}{ext}"
write_content = (
json.dumps(value, indent=2, ensure_ascii=False)
if isinstance(value, (dict, list))
else str(value)
)
(spill_path / filename).write_text(write_content, encoding="utf-8")
file_size = (spill_path / filename).stat().st_size
logger.info(
"set_output value auto-spilled: key=%s, "
"%d chars → %s (%d bytes)",
key,
len(val_str),
filename,
file_size,
)
# Replace value with reference
value = (
f"[Saved to '{filename}' ({file_size:,} bytes). "
f"Use load_data(filename='{filename}') "
f"to access full data.]"
)
# Update tool result to inform the LLM
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=(
f"Output '{key}' was large "
f"({len(val_str):,} chars) — data saved "
f"to '{filename}' ({file_size:,} bytes). "
f"The next phase will see the file "
f"reference and can load full data."
),
is_error=False,
)
# Auto-spill happens inside accumulator.set()
# — it fires on every code path (fresh, resume,
# restore) and prevents overwrite regression.
await accumulator.set(key, value)
self._record_learning(key, value)
stored = accumulator.get(key)
# If the accumulator spilled, update the tool
# result so the LLM knows data was saved to a file.
if isinstance(stored, str) and stored.startswith("[Saved to '"):
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=(
f"Output '{key}' auto-saved to file "
f"(value was too large for inline). "
f"{stored}"
),
is_error=False,
)
self._record_learning(key, stored)
outputs_set_this_turn.append(key)
await self._publish_output_key_set(stream_id, node_id, key, execution_id)
logged_tool_calls.append(
@@ -2627,6 +2659,11 @@ class EventLoopNode(NodeProtocol):
content=raw.content,
is_error=raw.is_error,
)
# Route through _truncate_tool_result so large
# subagent results are saved to spillover files
# and survive pruning (instead of being "cleared
# from context" with no recovery path).
result = self._truncate_tool_result(result, "delegate_to_sub_agent")
results_by_id[tc.tool_use_id] = result
logged_tool_calls.append(
{
@@ -4287,17 +4324,14 @@ class EventLoopNode(NodeProtocol):
)
parts.append(
"CONVERSATION HISTORY (freeform messages saved during compaction — "
"use load_data('<filename>'), read_file('<full_path>'), "
"or run_command('cat \"<full_path>\"') to review earlier dialogue):\n"
+ conv_list
"use load_data('<filename>') to review earlier dialogue):\n" + conv_list
)
if data_files:
file_list = "\n".join(
f" - {f} (full path: {data_dir / f})" for f in data_files[:30]
)
parts.append(
"DATA FILES (use load_data('<filename>'), read_file('<full_path>'), "
"or run_command('cat \"<full_path>\"') to read):\n" + file_list
"DATA FILES (use load_data('<filename>') to read):\n" + file_list
)
if not all_files:
parts.append(
@@ -4363,6 +4397,8 @@ class EventLoopNode(NodeProtocol):
return None
accumulator = await OutputAccumulator.restore(self._conversation_store)
accumulator.spillover_dir = self._config.spillover_dir
accumulator.max_value_chars = self._config.max_output_value_chars
cursor = await self._conversation_store.read_cursor()
start_iteration = cursor.get("iteration", 0) + 1 if cursor else 0
+7 -1
View File
@@ -1420,6 +1420,7 @@ class GraphExecutor:
next_spec = graph.get_node(current_node_id)
if next_spec and next_spec.node_type == "event_loop":
from framework.graph.prompt_composer import (
EXECUTION_SCOPE_PREAMBLE,
build_accounts_prompt,
build_narrative,
build_transition_marker,
@@ -1459,9 +1460,14 @@ class GraphExecutor:
)
# Compose new system prompt (Layer 1 + 2 + 3 + accounts)
# Prepend scope preamble to focus so the LLM stays
# within this node's responsibility.
_focus = next_spec.system_prompt
if next_spec.output_keys and _focus:
_focus = f"{EXECUTION_SCOPE_PREAMBLE}\n\n{_focus}"
new_system = compose_system_prompt(
identity_prompt=getattr(graph, "identity_prompt", None),
focus_prompt=next_spec.system_prompt,
focus_prompt=_focus,
narrative=narrative,
accounts_prompt=_node_accounts,
)
+43 -3
View File
@@ -26,6 +26,16 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Injected into every worker node's system prompt so the LLM understands
# it is one step in a multi-node pipeline and should not overreach.
EXECUTION_SCOPE_PREAMBLE = (
"EXECUTION SCOPE: You are one node in a multi-step workflow graph. "
"Focus ONLY on the task described in your instructions below. "
"Call set_output() for each of your declared output keys, then stop. "
"Do NOT attempt work that belongs to other nodes — the framework "
"routes data between nodes automatically."
)
def _with_datetime(prompt: str) -> str:
"""Append current datetime with local timezone to a system prompt."""
@@ -267,7 +277,9 @@ def build_transition_marker(
sections.append(f"\nCompleted: {previous_node.name}")
sections.append(f" {previous_node.description}")
# Outputs in memory
# Outputs in memory — use file references for large values so the
# next node loads full data from disk instead of seeing truncated
# inline previews that look deceptively complete.
all_memory = memory.read_all()
if all_memory:
memory_lines: list[str] = []
@@ -275,7 +287,29 @@ def build_transition_marker(
if value is None:
continue
val_str = str(value)
if len(val_str) > 300:
if len(val_str) > 300 and data_dir:
# Auto-spill large transition values to data files
import json as _json
data_path = Path(data_dir)
data_path.mkdir(parents=True, exist_ok=True)
ext = ".json" if isinstance(value, (dict, list)) else ".txt"
filename = f"output_{key}{ext}"
try:
write_content = (
_json.dumps(value, indent=2, ensure_ascii=False)
if isinstance(value, (dict, list))
else str(value)
)
(data_path / filename).write_text(write_content, encoding="utf-8")
file_size = (data_path / filename).stat().st_size
val_str = (
f"[Saved to '{filename}' ({file_size:,} bytes). "
f"Use load_data(filename='{filename}') to access.]"
)
except Exception:
val_str = val_str[:300] + "..."
elif len(val_str) > 300:
val_str = val_str[:300] + "..."
memory_lines.append(f" {key}: {val_str}")
if memory_lines:
@@ -292,7 +326,7 @@ def build_transition_marker(
]
if file_lines:
sections.append(
"\nData files (use read_file to access):\n" + "\n".join(file_lines)
"\nData files (use load_data to access):\n" + "\n".join(file_lines)
)
# Agent working memory
@@ -306,6 +340,12 @@ def build_transition_marker(
# Next phase
sections.append(f"\nNow entering: {next_node.name}")
sections.append(f" {next_node.description}")
if next_node.output_keys:
sections.append(
f"\nYour ONLY job in this phase: complete the task above and call "
f"set_output() for {next_node.output_keys}. Do NOT do work that "
f"belongs to later phases."
)
# Reflection prompt (engineered metacognition)
sections.append(
@@ -69,6 +69,7 @@ async def create_queen(
QueenPhaseState,
register_queen_lifecycle_tools,
)
from framework.tools.queen_memory_tools import register_queen_memory_tools
hive_home = Path.home() / ".hive"
@@ -122,6 +123,9 @@ async def create_queen(
phase_state=phase_state,
)
# ---- Episodic memory tools (always registered) ---------------------
register_queen_memory_tools(queen_registry)
# ---- Monitoring tools (only when worker is loaded) ----------------
if session.worker_runtime:
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
+296 -53
View File
@@ -47,6 +47,8 @@ class Session:
worker_handoff_sub: str | None = None
# Memory consolidation subscription (fires on CONTEXT_COMPACTED)
memory_consolidation_sub: str | None = None
# Worker run digest subscription (fires on EXECUTION_COMPLETED / EXECUTION_FAILED)
worker_digest_sub: str | None = None
# Trigger definitions loaded from agent's triggers.json (available but inactive)
available_triggers: dict[str, TriggerDefinition] = field(default_factory=dict)
# Active trigger tracking (IDs currently firing + their asyncio tasks)
@@ -177,6 +179,31 @@ class SessionManager:
agent_path = Path(agent_path)
resolved_worker_id = agent_id or agent_path.name
# When cold-restoring, check meta.json for the phase — if the agent
# was still being built we must NOT try to load the worker (the code
# is incomplete and will fail to import).
if queen_resume_from:
_resume_phase = None
_meta_path = (
Path.home() / ".hive" / "queen" / "session" / queen_resume_from / "meta.json"
)
if _meta_path.exists():
try:
_meta = json.loads(_meta_path.read_text(encoding="utf-8"))
_resume_phase = _meta.get("phase")
except (json.JSONDecodeError, OSError):
pass
if _resume_phase in ("building", "planning"):
# Fall back to queen-only session — cold resume handler in
# _start_queen will set phase_state.agent_path and switch to
# the correct phase.
return await self.create_session(
session_id=session_id,
model=model,
initial_prompt=initial_prompt,
queen_resume_from=queen_resume_from,
)
# Reuse the original session ID when cold-restoring so the frontend
# sees one continuous session instead of a new one each time.
session = await self._create_session_core(
@@ -193,6 +220,9 @@ class SessionManager:
model=model,
)
# Restore active triggers from persisted state (cold restore)
await self._restore_active_triggers(session, session.id)
# Start queen with worker profile + lifecycle + monitoring tools
worker_identity = (
build_worker_profile(session.worker_runtime, agent_path=agent_path)
@@ -204,7 +234,23 @@ class SessionManager:
)
except Exception:
# If anything fails, tear down the session
if queen_resume_from:
# Cold restore: worker load failed (e.g. incomplete code from a
# building session). Fall back to queen-only so the user can
# continue the conversation and fix / rebuild the agent.
logger.warning(
"Cold restore: worker load failed for '%s', falling back to queen-only",
agent_path,
exc_info=True,
)
await self.stop_session(session.id)
return await self.create_session(
session_id=session_id,
model=model,
initial_prompt=initial_prompt,
queen_resume_from=queen_resume_from,
)
# If anything fails (non-cold-restore), tear down the session
await self.stop_session(session.id)
raise
return session
@@ -297,6 +343,9 @@ class SessionManager:
session.worker_runtime = runtime
session.worker_info = info
# Subscribe to execution completion for per-run digest generation
self._subscribe_worker_digest(session)
async with self._lock:
self._loading.discard(session.id)
@@ -399,6 +448,51 @@ class SessionManager:
return False
return True
    async def _restore_active_triggers(self, session: "Session", session_id: str) -> None:
        """Restore previously active triggers from persisted session state.

        Called after worker loading to restart any timer/webhook triggers
        that were active before a server restart.

        Args:
            session: Live session whose worker runtime was just loaded.
            session_id: ID used to read persisted state from the session store.
        """
        # Nothing to restore without trigger definitions or a loaded worker.
        if not session.available_triggers or not session.worker_runtime:
            return
        try:
            # NOTE(review): reaches into the runtime's private _session_store —
            # assumes its read_state() contract; confirm if the store API changes.
            store = session.worker_runtime._session_store
            state = await store.read_state(session_id)
            if state and state.active_triggers:
                # Imported here (not at module top) — presumably to avoid an
                # import cycle with the tools module; verify before hoisting.
                from framework.tools.queen_lifecycle_tools import (
                    _start_trigger_timer,
                    _start_trigger_webhook,
                )

                # Older persisted states may lack trigger_tasks; default to {}.
                saved_tasks = getattr(state, "trigger_tasks", {}) or {}
                for tid in state.active_triggers:
                    tdef = session.available_triggers.get(tid)
                    if tdef:
                        # Restore user-configured task override
                        saved_task = saved_tasks.get(tid, "")
                        if saved_task:
                            tdef.task = saved_task
                        tdef.active = True
                        session.active_trigger_ids.add(tid)
                        if tdef.trigger_type == "timer":
                            await _start_trigger_timer(session, tid, tdef)
                            logger.info("Restored trigger timer '%s'", tid)
                        elif tdef.trigger_type == "webhook":
                            await _start_trigger_webhook(session, tid, tdef)
                            logger.info("Restored webhook trigger '%s'", tid)
                    else:
                        logger.warning(
                            "Saved trigger '%s' not found in worker entry points, skipping",
                            tid,
                        )
            # Restore worker_configured flag
            if state and getattr(state, "worker_configured", False):
                session.worker_configured = True
        except Exception as e:
            # Best-effort: a failed trigger restore must not block worker loading.
            logger.warning("Failed to restore active triggers: %s", e)
async def load_worker(
self,
session_id: str,
@@ -447,44 +541,7 @@ class SessionManager:
except OSError:
pass
# Restore previously active triggers from persisted session state
if session.available_triggers and session.worker_runtime:
try:
store = session.worker_runtime._session_store
state = await store.read_state(session_id)
if state and state.active_triggers:
from framework.tools.queen_lifecycle_tools import (
_start_trigger_timer,
_start_trigger_webhook,
)
saved_tasks = getattr(state, "trigger_tasks", {}) or {}
for tid in state.active_triggers:
tdef = session.available_triggers.get(tid)
if tdef:
# Restore user-configured task override
saved_task = saved_tasks.get(tid, "")
if saved_task:
tdef.task = saved_task
tdef.active = True
session.active_trigger_ids.add(tid)
if tdef.trigger_type == "timer":
await _start_trigger_timer(session, tid, tdef)
logger.info("Restored trigger timer '%s'", tid)
elif tdef.trigger_type == "webhook":
await _start_trigger_webhook(session, tid, tdef)
logger.info("Restored webhook trigger '%s'", tid)
else:
logger.warning(
"Saved trigger '%s' not found in worker entry points, skipping",
tid,
)
# Restore worker_configured flag
if state and getattr(state, "worker_configured", False):
session.worker_configured = True
except Exception as e:
logger.warning("Failed to restore active triggers: %s", e)
await self._restore_active_triggers(session, session_id)
# Emit SSE event so the frontend can update UI
await self._emit_worker_loaded(session)
@@ -526,6 +583,13 @@ class SessionManager:
await self._emit_trigger_events(session, "removed", session.available_triggers)
session.available_triggers.clear()
if session.worker_digest_sub is not None:
try:
session.event_bus.unsubscribe(session.worker_digest_sub)
except Exception:
pass
session.worker_digest_sub = None
worker_id = session.worker_id
session.worker_id = None
session.worker_path = None
@@ -563,6 +627,13 @@ class SessionManager:
pass
session.worker_handoff_sub = None
if session.worker_digest_sub is not None:
try:
session.event_bus.unsubscribe(session.worker_digest_sub)
except Exception:
pass
session.worker_digest_sub = None
# Stop queen and memory consolidation subscription
if session.memory_consolidation_sub is not None:
try:
@@ -647,6 +718,134 @@ class SessionManager:
else:
logger.warning("Worker handoff received but queen node not ready")
    def _subscribe_worker_digest(self, session: Session) -> None:
        """Subscribe to worker events to write per-run digests.

        Three triggers:
        - NODE_LOOP_ITERATION: write a mid-run snapshot, throttled to at most
          once every _DIGEST_COOLDOWN seconds per execution.
        - TOOL_CALL_COMPLETED for delegate_to_sub_agent: same throttled snapshot.
          Orchestrator nodes often run all subagent calls in a single LLM turn,
          so NODE_LOOP_ITERATION only fires once at the end. Subagent
          completions provide intermediate checkpoints.
        - EXECUTION_COMPLETED / EXECUTION_FAILED: always write the final digest,
          bypassing the cooldown.
        """
        import time as _time

        from framework.runtime.event_bus import EventType as _ET

        _DIGEST_COOLDOWN = 300.0  # seconds between mid-run snapshots

        # Drop any previous subscription first so re-loading a worker never
        # leaves two digest handlers attached to the bus.
        if session.worker_digest_sub is not None:
            try:
                session.event_bus.unsubscribe(session.worker_digest_sub)
            except Exception:
                pass
            session.worker_digest_sub = None

        # Digests are keyed by the worker agent's directory name; without a
        # worker path there is nowhere to write them.
        agent_name = session.worker_path.name if session.worker_path else None
        if not agent_name:
            return

        # Bind what the closures need now, so they don't depend on later
        # mutations of the session object.
        _agent_name = agent_name
        _llm = session.llm
        _bus = session.event_bus
        # per-execution_id monotonic timestamp of last mid-run digest
        _last_digest: dict[str, float] = {}

        def _resolve_run_id(exec_id: str) -> str | None:
            """Look up the run_id for a given execution_id via EXECUTION_STARTED history."""
            for e in _bus.get_history(event_type=_ET.EXECUTION_STARTED, limit=200):
                if e.execution_id == exec_id and getattr(e, "run_id", None):
                    return e.run_id
            return None

        async def _inject_digest_to_queen(run_id: str) -> None:
            """Read the written digest and push it into the queen's conversation."""
            from framework.agents.worker_memory import digest_path

            try:
                content = digest_path(_agent_name, run_id).read_text(encoding="utf-8").strip()
            except OSError:
                # Digest file missing/unreadable — nothing to inject.
                return
            if not content:
                return
            executor = session.queen_executor
            if executor is None:
                return
            node = executor.node_registry.get("queen")
            if node is None or not hasattr(node, "inject_event"):
                return
            await node.inject_event(f"[WORKER_DIGEST]\n{content}")

        async def _consolidate_and_notify(run_id: str, outcome_event: Any) -> None:
            """Write the digest then push it to the queen."""
            from framework.agents.worker_memory import consolidate_worker_run

            await consolidate_worker_run(_agent_name, run_id, outcome_event, _bus, _llm)
            await _inject_digest_to_queen(run_id)

        async def _on_worker_event(event: Any) -> None:
            # Only worker streams matter; the queen's own events are skipped.
            if event.stream_id == "queen":
                return
            exec_id = event.execution_id
            if event.type == _ET.EXECUTION_STARTED:
                # New run on this execution_id — reset cooldown so the first
                # iteration always produces a mid-run snapshot.
                if exec_id:
                    _last_digest.pop(exec_id, None)
            elif event.type in (
                _ET.EXECUTION_COMPLETED,
                _ET.EXECUTION_FAILED,
                _ET.EXECUTION_PAUSED,
            ):
                # Final digest — always fire, ignore cooldown.
                # EXECUTION_PAUSED covers cancellation (queen re-triggering the
                # worker cancels the previous execution, emitting paused).
                run_id = getattr(event, "run_id", None) or _resolve_run_id(exec_id)
                if run_id:
                    # Fire-and-forget so digest writing never blocks dispatch.
                    asyncio.create_task(
                        _consolidate_and_notify(run_id, event),
                        name=f"worker-digest-final-{run_id}",
                    )
            elif event.type in (_ET.NODE_LOOP_ITERATION, _ET.TOOL_CALL_COMPLETED):
                # Mid-run snapshot — respect 300 s cooldown per execution.
                # TOOL_CALL_COMPLETED is only interesting for subagent calls;
                # regular tool completions are too frequent and too cheap.
                if event.type == _ET.TOOL_CALL_COMPLETED:
                    tool_name = (event.data or {}).get("tool_name", "")
                    if tool_name != "delegate_to_sub_agent":
                        return
                if not exec_id:
                    return
                now = _time.monotonic()
                if now - _last_digest.get(exec_id, 0.0) < _DIGEST_COOLDOWN:
                    return
                run_id = _resolve_run_id(exec_id)
                if run_id:
                    # Stamp the cooldown only when a run_id resolved, so an
                    # unresolvable event doesn't suppress the next snapshot.
                    _last_digest[exec_id] = now
                    asyncio.create_task(
                        _consolidate_and_notify(run_id, None),
                        name=f"worker-digest-{run_id}",
                    )

        session.worker_digest_sub = session.event_bus.subscribe(
            event_types=[
                _ET.EXECUTION_STARTED,
                _ET.NODE_LOOP_ITERATION,
                _ET.TOOL_CALL_COMPLETED,
                _ET.EXECUTION_COMPLETED,
                _ET.EXECUTION_FAILED,
                _ET.EXECUTION_PAUSED,
            ],
            handler=_on_worker_event,
        )
def _subscribe_worker_handoffs(self, session: Session, executor: Any) -> None:
"""Subscribe queen to worker/subagent escalation handoff events."""
from framework.runtime.event_bus import EventType as _ET
@@ -700,16 +899,21 @@ class SessionManager:
else None
)
)
_meta_path.write_text(
json.dumps(
{
"agent_name": _agent_name,
"agent_path": str(session.worker_path) if session.worker_path else None,
"created_at": time.time(),
}
),
encoding="utf-8",
)
# Merge into existing meta.json to preserve fields written by
# _update_meta_json (e.g. phase, agent_path set during building).
_existing_meta: dict = {}
if _meta_path.exists():
try:
_existing_meta = json.loads(_meta_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
pass
_new_meta: dict = {"created_at": time.time()}
if _agent_name is not None:
_new_meta["agent_name"] = _agent_name
if session.worker_path is not None:
_new_meta["agent_path"] = str(session.worker_path)
_existing_meta.update(_new_meta)
_meta_path.write_text(json.dumps(_existing_meta), encoding="utf-8")
except OSError:
pass
@@ -762,11 +966,27 @@ class SessionManager:
try:
_meta = json.loads(meta_path.read_text(encoding="utf-8"))
_agent_path = _meta.get("agent_path")
_phase = _meta.get("phase")
if _agent_path and Path(_agent_path).exists():
await self.load_worker(session.id, _agent_path)
if session.phase_state:
await session.phase_state.switch_to_staging(source="auto")
logger.info("Cold restore: auto-loaded worker from %s", _agent_path)
if _phase in ("staging", "running", None):
# Agent fully built — load worker and resume
await self.load_worker(session.id, _agent_path)
if session.phase_state:
await session.phase_state.switch_to_staging(source="auto")
# Emit flowchart overlay so frontend can display it
await self._emit_flowchart_on_restore(session, _agent_path)
logger.info("Cold restore: auto-loaded worker from %s", _agent_path)
elif _phase == "building":
# Agent folder exists but incomplete — resume building
if session.phase_state:
session.phase_state.agent_path = _agent_path
await session.phase_state.switch_to_building(source="auto")
logger.info("Cold restore: resumed BUILDING phase for %s", _agent_path)
elif _phase == "planning":
if session.phase_state:
session.phase_state.agent_path = _agent_path
logger.info("Cold restore: PLANNING phase for %s", _agent_path)
except Exception:
logger.warning("Cold restore: failed to auto-load worker", exc_info=True)
@@ -841,6 +1061,29 @@ class SessionManager:
)
)
async def _emit_flowchart_on_restore(self, session: Session, agent_path: str | Path) -> None:
"""Emit FLOWCHART_MAP_UPDATED from persisted flowchart file on cold restore."""
from framework.runtime.event_bus import AgentEvent, EventType
from framework.tools.flowchart_utils import load_flowchart_file
original_draft, flowchart_map = load_flowchart_file(agent_path)
if original_draft is None:
return
# Cache in phase_state so the REST endpoint also returns it
if session.phase_state:
session.phase_state.original_draft_graph = original_draft
session.phase_state.flowchart_map = flowchart_map
await session.event_bus.publish(
AgentEvent(
type=EventType.FLOWCHART_MAP_UPDATED,
stream_id="queen",
data={
"map": flowchart_map,
"original_draft": original_draft,
},
)
)
async def _notify_queen_worker_unloaded(self, session: Session) -> None:
"""Notify the queen that the worker has been unloaded."""
executor = session.queen_executor
+110 -12
View File
@@ -727,6 +727,25 @@ def _dissolve_planning_nodes(
return converted, flowchart_map
def _update_meta_json(session_manager, manager_session_id, updates: dict) -> None:
"""Merge updates into the queen session's meta.json."""
if session_manager is None or not manager_session_id:
return
srv_session = session_manager.get_session(manager_session_id)
if not srv_session:
return
storage_sid = getattr(srv_session, "queen_resume_from", None) or srv_session.id
meta_path = Path.home() / ".hive" / "queen" / "session" / storage_sid / "meta.json"
try:
existing = {}
if meta_path.exists():
existing = json.loads(meta_path.read_text(encoding="utf-8"))
existing.update(updates)
meta_path.write_text(json.dumps(existing), encoding="utf-8")
except OSError:
pass
def register_queen_lifecycle_tools(
registry: ToolRegistry,
session: Any = None,
@@ -975,6 +994,7 @@ def register_queen_lifecycle_tools(
# Switch to building phase
if phase_state is not None:
await phase_state.switch_to_building()
_update_meta_json(session_manager, manager_session_id, {"phase": "building"})
result = json.loads(stop_result)
result["phase"] = "building"
@@ -1559,12 +1579,22 @@ def register_queen_lifecycle_tools(
# Find edges where this leaf node is the source
out_edges = [e for e in validated_edges if e["source"] == leaf_id]
in_edges = [e for e in validated_edges if e["target"] == leaf_id]
if not out_edges:
continue # already a proper leaf
# Identify the parent (predecessor that connects IN)
parent_ids = [e["source"] for e in in_edges]
if not out_edges:
# Already a proper leaf — still ensure sub_agents is set
for pid in parent_ids:
parent = node_by_id_v.get(pid)
if parent is None:
continue
existing = parent.get("sub_agents") or []
if leaf_id not in existing:
existing.append(leaf_id)
parent["sub_agents"] = existing
continue
# Strip all outgoing edges from the leaf node that
# don't go back to a parent (report edges are OK)
illegal_targets: list[str] = []
@@ -1978,6 +2008,17 @@ def register_queen_lifecycle_tools(
"type": "string",
"description": "What success looks like for this node",
},
"sub_agents": {
"type": "array",
"items": {"type": "string"},
"description": (
"IDs of GCU/browser sub-agent nodes managed by this node. "
"At build time, sub-agent nodes are dissolved into this list. "
"Set this on the PARENT node — e.g. the orchestrator that "
"delegates to GCU leaves. Visual delegation edges are "
"synthesized automatically."
),
},
"decision_clause": {
"type": "string",
"description": (
@@ -2095,8 +2136,22 @@ def register_queen_lifecycle_tools(
phase_state.draft_graph = converted
phase_state.flowchart_map = fmap
# Note: flowchart file is persisted later, in initialize_and_build_agent
# (after the agent folder is scaffolded) or in load_built_agent.
# Create agent folder early so flowchart and agent_path are available
# throughout the entire BUILDING phase.
_agent_name = phase_state.draft_graph.get("agent_name", "").strip()
if _agent_name:
_agent_folder = Path("exports") / _agent_name
_agent_folder.mkdir(parents=True, exist_ok=True)
_save_flowchart_file(_agent_folder, original_copy, fmap)
phase_state.agent_path = str(_agent_folder)
_update_meta_json(
session_manager,
manager_session_id,
{
"agent_path": str(_agent_folder),
"agent_name": _agent_name.replace("_", " ").title(),
},
)
dissolved_count = len(original_nodes) - len(converted.get("nodes", []))
decision_count = sum(1 for n in original_nodes if n.get("flowchart_type") == "decision")
@@ -2228,6 +2283,7 @@ def register_queen_lifecycle_tools(
if fallback_path:
phase_state.agent_path = str(fallback_path)
await phase_state.switch_to_building(source="tool")
_update_meta_json(session_manager, manager_session_id, {"phase": "building"})
if phase_state.inject_notification:
await phase_state.inject_notification(
"[PHASE CHANGE] Switched to BUILDING phase. "
@@ -2270,8 +2326,13 @@ def register_queen_lifecycle_tools(
if parsed.get("success", True):
if phase_state is not None:
# Set agent_path so the frontend can query credentials
phase_state.agent_path = str(Path("exports") / agent_name)
phase_state.agent_path = phase_state.agent_path or str(
Path("exports") / agent_name
)
await phase_state.switch_to_building(source="tool")
_update_meta_json(
session_manager, manager_session_id, {"phase": "building"}
)
# Reset draft state after successful scaffolding
phase_state.build_confirmed = False
# Persist flowchart now that the agent folder exists
@@ -2319,6 +2380,7 @@ def register_queen_lifecycle_tools(
# Switch to staging phase
if phase_state is not None:
await phase_state.switch_to_staging()
_update_meta_json(session_manager, manager_session_id, {"phase": "staging"})
result = json.loads(stop_result)
result["phase"] = "staging"
@@ -2347,6 +2409,30 @@ def register_queen_lifecycle_tools(
"""Get the session's event bus for querying history."""
return getattr(session, "event_bus", None)
def _get_worker_name() -> str | None:
"""Return the worker agent directory name, used for diary lookups."""
p = getattr(session, "worker_path", None)
return p.name if p else None
def _format_diary(max_runs: int) -> str:
"""Read recent run digests from disk — no EventBus required."""
agent_name = _get_worker_name()
if not agent_name:
return "No worker loaded — diary unavailable."
from framework.agents.worker_memory import read_recent_digests
entries = read_recent_digests(agent_name, max_runs)
if not entries:
return (
f"No run digests for '{agent_name}' yet. "
"Digests are written at the end of each completed run."
)
lines = [f"Worker '{agent_name}'{len(entries)} recent run digest(s):", ""]
for _run_id, content in entries:
lines.append(content)
lines.append("")
return "\n".join(lines).rstrip()
# Tiered cooldowns: summary is free, detail has short cooldown, full keeps 30s
_COOLDOWN_FULL = 30.0
_COOLDOWN_DETAIL = 10.0
@@ -2949,16 +3035,17 @@ def register_queen_lifecycle_tools(
import time as _time
# --- Tiered cooldown ---
# diary is free (file reads only), summary is free, detail has 10s, full has 30s
now = _time.monotonic()
if focus == "full":
cooldown = _COOLDOWN_FULL
tier = "full"
elif focus is not None:
elif focus == "diary" or focus is None:
cooldown = 0.0
tier = focus or "summary"
else:
cooldown = _COOLDOWN_DETAIL
tier = "detail"
else:
cooldown = 0.0
tier = "summary"
elapsed_since = now - _status_last_called.get(tier, 0.0)
if elapsed_since < cooldown:
@@ -2974,6 +3061,10 @@ def register_queen_lifecycle_tools(
)
_status_last_called[tier] = now
# --- Diary: pure file reads, no runtime required ---
if focus == "diary":
return _format_diary(last_n)
# --- Runtime check ---
runtime = _get_runtime()
if runtime is None:
@@ -3023,7 +3114,7 @@ def register_queen_lifecycle_tools(
else:
return (
f"Unknown focus '{focus}'. "
"Valid options: activity, memory, tools, issues, progress, full."
"Valid options: diary, activity, memory, tools, issues, progress, full."
)
except Exception as exc:
logger.exception("get_worker_status error")
@@ -3034,6 +3125,8 @@ def register_queen_lifecycle_tools(
description=(
"Check on the worker. Returns a brief prose summary by default. "
"Use 'focus' to drill into specifics:\n"
"- diary: persistent run digests from past executions — read this first "
"before digging into live runtime logs\n"
"- activity: current node, transitions, latest LLM output\n"
"- memory: worker's accumulated knowledge and state\n"
"- tools: running and recent tool calls\n"
@@ -3046,8 +3139,11 @@ def register_queen_lifecycle_tools(
"properties": {
"focus": {
"type": "string",
"enum": ["activity", "memory", "tools", "issues", "progress", "full"],
"description": ("Aspect to inspect. Omit for a brief summary."),
"enum": ["diary", "activity", "memory", "tools", "issues", "progress", "full"],
"description": (
"Aspect to inspect. Omit for a brief summary. "
"Use 'diary' to read persistent run history before checking live logs."
),
},
"last_n": {
"type": "integer",
@@ -3446,6 +3542,7 @@ def register_queen_lifecycle_tools(
if phase_state is not None:
phase_state.agent_path = str(resolved_path)
await phase_state.switch_to_staging()
_update_meta_json(session_manager, manager_session_id, {"phase": "staging"})
worker_name = info.name if info else updated_session.worker_id
return json.dumps(
@@ -3565,6 +3662,7 @@ def register_queen_lifecycle_tools(
# Switch to running phase
if phase_state is not None:
await phase_state.switch_to_running()
_update_meta_json(session_manager, manager_session_id, {"phase": "running"})
return json.dumps(
{
+8 -1
View File
@@ -27,7 +27,14 @@ export default function MyAgents() {
agentsApi
.discover()
.then((result) => {
setAgents(result["Your Agents"] || []);
const entries = result["Your Agents"] || [];
entries.sort((a, b) => {
if (!a.last_active && !b.last_active) return 0;
if (!a.last_active) return 1;
if (!b.last_active) return -1;
return b.last_active.localeCompare(a.last_active);
});
setAgents(entries);
})
.catch((err) => {
setError(err.message || "Failed to load agents");
+59 -3
View File
@@ -252,6 +252,10 @@ function truncate(s: string, max: number): string {
type SessionRestoreResult = {
messages: ChatMessage[];
restoredPhase: "planning" | "building" | "staging" | "running" | null;
/** Last flowchart map from events — used to restore flowchart overlay on cold resume. */
flowchartMap: Record<string, string[]> | null;
/** Last original draft from events — used to restore flowchart overlay on cold resume. */
originalDraft: DraftGraphData | null;
};
/**
@@ -268,6 +272,8 @@ async function restoreSessionMessages(
if (events.length > 0) {
const messages: ChatMessage[] = [];
let runningPhase: ChatMessage["phase"] = undefined;
let flowchartMap: Record<string, string[]> | null = null;
let originalDraft: DraftGraphData | null = null;
for (const evt of events) {
// Track phase transitions so each message gets the phase it was created in
const p = evt.type === "queen_phase_changed" ? evt.data?.phase as string
@@ -276,6 +282,12 @@ async function restoreSessionMessages(
if (p && ["planning", "building", "staging", "running"].includes(p)) {
runningPhase = p as ChatMessage["phase"];
}
// Track last flowchart state for cold restore
if (evt.type === "flowchart_map_updated" && evt.data) {
const mapData = evt.data as { map?: Record<string, string[]>; original_draft?: DraftGraphData };
flowchartMap = mapData.map ?? null;
originalDraft = mapData.original_draft ?? null;
}
const msg = sseEventToChatMessage(evt, thread, agentDisplayName);
if (!msg) continue;
if (evt.stream_id === "queen") {
@@ -284,12 +296,12 @@ async function restoreSessionMessages(
}
messages.push(msg);
}
return { messages, restoredPhase: runningPhase ?? null };
return { messages, restoredPhase: runningPhase ?? null, flowchartMap, originalDraft };
}
} catch {
// Event log not available — session will start fresh.
}
return { messages: [], restoredPhase: null };
return { messages: [], restoredPhase: null, flowchartMap: null, originalDraft: null };
}
// --- Per-agent backend state (consolidated) ---
@@ -799,6 +811,8 @@ export default function Workspace() {
}
let restoredPhase: "planning" | "building" | "staging" | "running" | null = null;
let restoredFlowchartMap: Record<string, string[]> | null = null;
let restoredOriginalDraft: DraftGraphData | null = null;
if (!liveSession) {
// Fetch conversation history from disk BEFORE creating the new session.
// SKIP if messages were already pre-populated by handleHistoryOpen.
@@ -810,9 +824,22 @@ export default function Workspace() {
const restored = await restoreSessionMessages(restoreFrom, agentType, "Queen Bee");
preRestoredMsgs.push(...restored.messages);
restoredPhase = restored.restoredPhase;
restoredFlowchartMap = restored.flowchartMap;
restoredOriginalDraft = restored.originalDraft;
} catch {
// Not available — will start fresh
}
} else if (restoreFrom && alreadyHasMessages) {
// Messages already cached in localStorage — still fetch events for
// non-message state (phase, flowchart) that isn't cached.
try {
const restored = await restoreSessionMessages(restoreFrom, agentType, "Queen Bee");
restoredPhase = restored.restoredPhase;
restoredFlowchartMap = restored.flowchartMap;
restoredOriginalDraft = restored.originalDraft;
} catch {
// Not critical — UI will still show cached messages
}
}
// Suppress the queen's intro cycle whenever we are about to restore a
@@ -835,7 +862,7 @@ export default function Workspace() {
}));
}
restoredMessageCount = preRestoredMsgs.length;
} else if (restoreFrom && activeId) {
} else if (restoreFrom && activeId && !alreadyHasMessages) {
// We had a stored session but no messages on disk — wipe stale localStorage cache
setSessionsByAgent(prev => ({
...prev,
@@ -889,6 +916,9 @@ export default function Workspace() {
queenReady: true,
queenPhase: qPhase,
queenBuilding: qPhase === "building",
// Restore flowchart overlay from persisted events
...(restoredFlowchartMap ? { flowchartMap: restoredFlowchartMap } : {}),
...(restoredOriginalDraft ? { originalDraft: restoredOriginalDraft, draftGraph: null } : {}),
});
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err);
@@ -963,6 +993,8 @@ export default function Workspace() {
// Track the last queen phase seen in the event log for cold restore
let restoredPhase: "planning" | "building" | "staging" | "running" | null = null;
let restoredFlowchartMap: Record<string, string[]> | null = null;
let restoredOriginalDraft: DraftGraphData | null = null;
if (!liveSession) {
// Reconnect failed — clear stale cached messages from localStorage restore.
@@ -990,6 +1022,19 @@ export default function Workspace() {
const restored = await restoreSessionMessages(coldRestoreId, agentType, displayNameTemp);
preQueenMsgs = restored.messages;
restoredPhase = restored.restoredPhase;
restoredFlowchartMap = restored.flowchartMap;
restoredOriginalDraft = restored.originalDraft;
} else if (coldRestoreId && alreadyHasMessages) {
// Messages already cached — still fetch events for non-message state (phase, flowchart)
try {
const displayNameTemp = formatAgentDisplayName(agentPath);
const restored = await restoreSessionMessages(coldRestoreId, agentType, displayNameTemp);
restoredPhase = restored.restoredPhase;
restoredFlowchartMap = restored.flowchartMap;
restoredOriginalDraft = restored.originalDraft;
} catch {
// Not critical — UI will still show cached messages
}
}
// Suppress intro whenever we are about to restore a previous conversation.
@@ -1070,6 +1115,9 @@ export default function Workspace() {
displayName,
queenPhase: initialPhase,
queenBuilding: initialPhase === "building",
// Restore flowchart overlay from persisted events
...(restoredFlowchartMap ? { flowchartMap: restoredFlowchartMap } : {}),
...(restoredOriginalDraft ? { originalDraft: restoredOriginalDraft, draftGraph: null } : {}),
});
// Update the session label + backendSessionId. Also set historySourceId
@@ -1107,6 +1155,11 @@ export default function Workspace() {
if (historyId && !coldRestoreId) {
const restored = await restoreSessionMessages(historyId, agentType, displayName);
restoredMsgs.push(...restored.messages);
// Use flowchart from event log if not already set
if (restored.flowchartMap && !restoredFlowchartMap) {
restoredFlowchartMap = restored.flowchartMap;
restoredOriginalDraft = restored.originalDraft;
}
// Check worker status (needed for isWorkerRunning flag)
try {
@@ -1149,6 +1202,9 @@ export default function Workspace() {
loading: false,
queenReady: !!(isResumedSession || hasRestoredContent),
...(isWorkerRunning ? { workerRunState: "running" } : {}),
// Restore flowchart overlay from persisted events
...(restoredFlowchartMap ? { flowchartMap: restoredFlowchartMap } : {}),
...(restoredOriginalDraft ? { originalDraft: restoredOriginalDraft, draftGraph: null } : {}),
});
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err);
+136
View File
@@ -0,0 +1,136 @@
# SDR Agent
An AI-powered sales development outreach automation template for [Hive](https://github.com/aden-hive/hive).
Score contacts by priority, filter suspicious profiles, generate personalized messages, and create Gmail drafts — all with human review before anything is sent.
## Overview
The SDR Agent automates the full outreach pipeline:
```
Intake → Score Contacts → Filter Contacts → Personalize → Send Outreach → Report
```
1. **Intake** — Accept a contact list and outreach goal; confirm strategy with user
2. **Score Contacts** — Rank contacts 0–100 using priority factors (alumni, degree, domain, etc.)
3. **Filter Contacts** — Detect and skip suspicious/fake profiles (risk score ≥ 7)
4. **Personalize** — Generate an 80–120 word personalized message per contact
5. **Send Outreach** — Create Gmail drafts for human review (never sends automatically)
6. **Report** — Summarize campaign: contacts scored, filtered, drafted
## Quickstart
```bash
cd examples/templates/sdr_agent
# Run interactively via TUI
python -m sdr_agent tui
# Run via CLI with a contacts JSON string
python -m sdr_agent run \
--contacts '[{"name":"Jane Doe","company":"Acme","title":"Engineer","connection_degree":"2nd","is_alumni":true}]' \
--goal "coffee chat" \
--background "Learning Technologist at UWO" \
--max-contacts 20
# Validate agent structure
python -m sdr_agent validate
```
## Contact Schema
Each contact in your list supports the following fields:
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `name` | string | ✅ | Contact's full name |
| `email` | string | ❌ | Email address (draft placeholder if missing) |
| `company` | string | ✅ | Current company |
| `title` | string | ✅ | Job title |
| `linkedin_url` | string | ❌ | LinkedIn profile URL |
| `connection_degree` | string | ❌ | `"1st"`, `"2nd"`, or `"3rd"` |
| `is_alumni` | boolean | ❌ | Shares school with user |
| `school_name` | string | ❌ | School name for alumni messaging |
| `connections_count` | integer | ❌ | Number of LinkedIn connections |
| `mutual_connections` | integer | ❌ | Count of mutual connections |
| `has_photo` | boolean | ❌ | Has a profile photo |
## Scoring Model
The `score-contacts` node ranks each contact 0–100:
| Factor | Points |
|--------|--------|
| Alumni | +30 |
| 1st degree | +25 |
| 2nd degree | +20 |
| 3rd degree | +10 |
| Domain verified | +10 |
| Mutual connections (×1, max 10) | +10 |
| Active job posting | +10 |
| Has profile photo | +5 |
| 500+ connections | +5 |
## Scam Detection
The `filter-contacts` node calculates a risk score and excludes contacts with risk ≥ 7:
| Red Flag | Risk |
|----------|------|
| Fewer than 50 connections | +3 |
| No profile photo | +2 |
| Fewer than 2 work positions | +2 |
| Generic title + few connections | +2 |
| Unverifiable company | +2 |
| AI-generated-looking profile | +2 |
| 5000+ connections, 0 mutual | +1 |
## Pipeline Output Files
Each run writes to `~/.hive/agents/sdr_agent/data/`:
| File | Contents |
|------|----------|
| `contacts.jsonl` | Raw contact list |
| `scored_contacts.jsonl` | Contacts with `priority_score` |
| `safe_contacts.jsonl` | Contacts passing scam filter |
| `personalized_contacts.jsonl` | Contacts with `outreach_message` |
| `drafts.jsonl` | Draft creation records |
## Safety Constraints
- **Never sends emails** — only `gmail_create_draft` is called; human must review and send
- **Batch limit** — processes at most `max_contacts` per run (default: 20)
- **Skip suspicious** — contacts with `risk_score ≥ 7` are always excluded
## Tools Required
- `gmail_create_draft` — create Gmail draft for each contact
- `load_data` — read JSONL data files
- `append_data` — write to JSONL data files
## Architecture
```
┌──────────────────────────────────────────────────────────────┐
│ SDR Agent │
│ │
│ ┌────────┐ ┌───────────────┐ ┌────────────────┐ │
│ │ Intake │──▶│ Score Contacts│──▶│ Filter Contacts│ │
│ └────────┘ └───────────────┘ └────────────────┘ │
│ ▲ │ │
│ │ ▼ │
│ ┌────────┐ ┌───────────────┐ ┌─────────────┐ │
│ │ Report │◀──│ Send Outreach │◀──│ Personalize │ │
│ └────────┘ └───────────────┘ └─────────────┘ │
│ │
│ ● client_facing nodes: intake, report │
│ ● automated nodes: score-contacts, filter-contacts, │
│ personalize, send-outreach │
└──────────────────────────────────────────────────────────────┘
```
## Inspiration
This template is inspired by real-world SDR automation patterns, including contact ranking, scam detection, and two-step personalization (hook extraction → message generation) — demonstrating how job-search and sales outreach workflows can be modeled as AI agent pipelines in Hive.
+45
View File
@@ -0,0 +1,45 @@
"""
SDR Agent - Automated sales development outreach pipeline.
Score contacts by priority, filter suspicious profiles, generate personalized
outreach messages, and create Gmail drafts for human review before sending.
"""
from .agent import (
SDRAgent,
default_agent,
goal,
nodes,
edges,
loop_config,
async_entry_points,
entry_node,
entry_points,
pause_nodes,
terminal_nodes,
conversation_mode,
identity_prompt,
)
from .config import RuntimeConfig, AgentMetadata, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"SDRAgent",
"default_agent",
"goal",
"nodes",
"edges",
"loop_config",
"async_entry_points",
"entry_node",
"entry_points",
"pause_nodes",
"terminal_nodes",
"conversation_mode",
"identity_prompt",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
+234
View File
@@ -0,0 +1,234 @@
"""
CLI entry point for SDR Agent.
Automates sales development outreach: score contacts, filter suspicious
profiles, generate personalized messages, and create Gmail drafts.
"""
import asyncio
import json
import logging
import sys
import click
from .agent import default_agent, SDRAgent
def setup_logging(verbose=False, debug=False):
"""Configure logging for execution visibility."""
if debug:
level, fmt = logging.DEBUG, "%(asctime)s %(name)s: %(message)s"
elif verbose:
level, fmt = logging.INFO, "%(message)s"
else:
level, fmt = logging.WARNING, "%(levelname)s: %(message)s"
logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
logging.getLogger("framework").setLevel(level)
@click.group()
@click.version_option(version="1.0.0")
def cli():
"""SDR Agent - Automated outreach with contact scoring and personalization."""
pass
@cli.command()
@click.option(
"--contacts",
"-c",
type=str,
required=True,
help="JSON string or file path of contacts list",
)
@click.option(
"--goal",
"-g",
type=str,
default="coffee chat",
help="Outreach goal (e.g. 'coffee chat', 'sales pitch')",
)
@click.option(
"--background",
"-b",
type=str,
default="",
help="Your background/role for personalization",
)
@click.option(
"--max-contacts",
"-m",
type=int,
default=20,
help="Max contacts to process per batch (default: 20)",
)
@click.option(
"--mock", is_flag=True, help="Run in mock mode without LLM or Gmail calls"
)
@click.option("--quiet", "-q", is_flag=True, help="Only output result JSON")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def run(contacts, goal, background, max_contacts, mock, quiet, verbose, debug):
"""Execute an SDR outreach campaign for the given contacts."""
if not quiet:
setup_logging(verbose=verbose, debug=debug)
context = {
"contacts": contacts,
"outreach_goal": goal,
"user_background": background,
"max_contacts": str(max_contacts),
}
result = asyncio.run(default_agent.run(context, mock_mode=mock))
output_data = {
"success": result.success,
"steps_executed": result.steps_executed,
"output": result.output,
}
if result.error:
output_data["error"] = result.error
click.echo(json.dumps(output_data, indent=2, default=str))
sys.exit(0 if result.success else 1)
@cli.command()
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def tui(mock, verbose, debug):
"""Launch the TUI dashboard for interactive SDR outreach."""
setup_logging(verbose=verbose, debug=debug)
try:
from framework.tui.app import AdenTUI
except ImportError:
click.echo(
"TUI requires the 'textual' package. Install with: pip install textual"
)
sys.exit(1)
async def run_with_tui():
agent = SDRAgent()
await agent.start(mock_mode=mock)
try:
app = AdenTUI(agent._agent_runtime)
await app.run_async()
finally:
await agent.stop()
asyncio.run(run_with_tui())
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
"""Show agent information."""
info_data = default_agent.info()
if output_json:
click.echo(json.dumps(info_data, indent=2))
else:
click.echo(f"Agent: {info_data['name']}")
click.echo(f"Version: {info_data['version']}")
click.echo(f"Description: {info_data['description']}")
click.echo(f"\nNodes: {', '.join(info_data['nodes'])}")
click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}")
click.echo(f"Entry: {info_data['entry_node']}")
click.echo(f"Terminal: {', '.join(info_data['terminal_nodes'])}")
@cli.command()
def validate():
"""Validate agent structure."""
validation = default_agent.validate()
if validation["valid"]:
click.echo("Agent is valid")
if validation["warnings"]:
for warning in validation["warnings"]:
click.echo(f" WARNING: {warning}")
else:
click.echo("Agent has errors:")
for error in validation["errors"]:
click.echo(f" ERROR: {error}")
sys.exit(0 if validation["valid"] else 1)
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def shell(verbose):
"""Interactive SDR outreach session (CLI, no TUI)."""
asyncio.run(_interactive_shell(verbose))
async def _interactive_shell(verbose=False):
"""Async interactive shell."""
setup_logging(verbose=verbose)
click.echo("=== SDR Agent ===")
click.echo("Automated contact scoring, filtering, and outreach personalization\n")
agent = SDRAgent()
await agent.start()
try:
while True:
try:
goal = await asyncio.get_event_loop().run_in_executor(
None, input, "Outreach goal (e.g. 'coffee chat')> "
)
if goal.lower() in ["quit", "exit", "q"]:
click.echo("Goodbye!")
break
contacts = await asyncio.get_event_loop().run_in_executor(
None, input, "Contacts (JSON)> "
)
background = await asyncio.get_event_loop().run_in_executor(
None, input, "Your background/role> "
)
if not contacts.strip():
continue
click.echo("\nRunning SDR campaign...\n")
result = await agent.trigger_and_wait(
"start",
{
"contacts": contacts,
"outreach_goal": goal,
"user_background": background,
"max_contacts": "20",
},
)
if result is None:
click.echo("\n[Execution timed out]\n")
continue
if result.success:
output = result.output
if "summary_report" in output:
click.echo("\n--- Campaign Report ---\n")
click.echo(output["summary_report"])
click.echo("\n")
else:
click.echo(f"\nCampaign failed: {result.error}\n")
except KeyboardInterrupt:
click.echo("\nGoodbye!")
break
except Exception as e:
click.echo(f"Error: {e}", err=True)
import traceback
traceback.print_exc()
finally:
await agent.stop()
if __name__ == "__main__":
cli()
+378
View File
@@ -0,0 +1,378 @@
{
"agent": {
"id": "sdr_agent",
"name": "SDR Agent",
"version": "1.0.0",
"description": "Automate sales development outreach using AI-powered contact scoring, scam detection, and personalized message generation. Score contacts by priority, filter suspicious profiles, generate personalized outreach messages, and create Gmail drafts for review — all without sending emails automatically."
},
"graph": {
"id": "sdr-agent-graph",
"goal_id": "sdr-agent",
"version": "1.0.0",
"entry_node": "intake",
"entry_points": {
"start": "intake"
},
"pause_nodes": [],
"terminal_nodes": ["complete"],
"conversation_mode": "continuous",
"identity_prompt": "You are an SDR (Sales Development Representative) assistant. You help users automate their outreach by scoring contacts, filtering suspicious profiles, generating personalized messages, and creating Gmail drafts — all with human review before anything is sent.",
"nodes": [
{
"id": "intake",
"name": "Intake",
"description": "Receive the contact list and outreach goal from the user. Confirm the strategy and batch size before proceeding.",
"node_type": "event_loop",
"input_keys": [
"contacts",
"outreach_goal",
"max_contacts",
"user_background"
],
"output_keys": [
"contacts",
"outreach_goal",
"max_contacts",
"user_background"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are an SDR (Sales Development Representative) assistant helping automate outreach.\n\n**STEP 1 — Respond to the user (text only, NO tool calls):**\n\nRead the user's input from context. Confirm your understanding of:\n- The contact list they provided (or ask them to provide one)\n- Their outreach goal (e.g. \"coffee chat\", \"sales pitch\", \"networking\")\n- Their background/role (used to personalize messages)\n- The batch size (max_contacts). Default to 20 if not specified.\n\nPresent a summary like:\n\"Here's what I'll do:\n1. Score and rank your contacts by priority (alumni status, connection degree, etc.)\n2. Filter out suspicious or low-quality profiles (risk score ≥ 7)\n3. Generate a personalized outreach message for each contact\n4. Create Gmail draft emails for your review — I never send automatically\n\nReady to proceed with [N] contacts for [goal]?\"\n\n**STEP 2 — After the user confirms, call set_output:**\n\n- set_output(\"contacts\", <the contact list as a JSON string>)\n- set_output(\"outreach_goal\", <the confirmed goal, e.g. \"coffee chat\">)\n- set_output(\"max_contacts\", <the confirmed batch size as a string, e.g. \"20\">)\n- set_output(\"user_background\", <user's background/role, e.g. \"Learning Technologist at UWO\">)",
"tools": [],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": true,
"success_criteria": null
},
{
"id": "score-contacts",
"name": "Score Contacts",
"description": "Score and rank each contact from 0 to 100 based on priority factors: alumni status, connection degree, domain verification, mutual connections, and active job postings.",
"node_type": "event_loop",
"input_keys": [
"contacts",
"outreach_goal"
],
"output_keys": [
"scored_contacts"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a contact prioritization engine. Score each contact from 0 to 100.\n\n**SCORING RULES (additive):**\n- Alumni of the user's school: +30 points\n- 1st degree connection: +25 points\n- 2nd degree connection: +20 points\n- 3rd degree connection: +10 points\n- Domain verified (company email matches LinkedIn company): +10 points\n- Has mutual connections (1 point each, max 10): up to +10 points\n- Active job posting at their company: +10 points\n- Has a profile photo: +5 points\n- Over 500 connections: +5 points\n\nCap the final score at 100.\n\n**STEP 1 — Load the contacts:**\nCall load_data(filename=\"contacts.jsonl\") to read the contact list.\nIf \"contacts\" in context is a JSON string (not a filename), write it first:\n- For each contact in the list, call append_data(filename=\"contacts.jsonl\", data=<JSON contact object>)\nThen read it back.\n\n**STEP 2 — Score each contact:**\nFor each contact, calculate the priority score using the rules above.\nAdd a \"priority_score\" field to each contact object.\n\n**STEP 3 — Write scored contacts and set output:**\n- Call append_data(filename=\"scored_contacts.jsonl\", data=<JSON contact with priority_score>) for each contact.\n- Sort contacts by priority_score (highest first) in your final output.\n- Call set_output(\"scored_contacts\", \"scored_contacts.jsonl\")",
"tools": [
"load_data",
"append_data"
],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": false,
"success_criteria": null
},
{
"id": "filter-contacts",
"name": "Filter Contacts",
"description": "Analyze each contact for authenticity and filter out suspicious profiles. Any contact with a risk score of 7 or higher is skipped.",
"node_type": "event_loop",
"input_keys": [
"scored_contacts"
],
"output_keys": [
"safe_contacts",
"filtered_count"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a profile authenticity analyzer. Your job is to detect suspicious or fake LinkedIn profiles.\n\n**RISK SCORING RULES (additive):**\n- Fewer than 50 connections: +3 points\n- No profile photo: +2 points\n- Fewer than 2 positions in work history: +2 points\n- Generic title (e.g. \"entrepreneur\", \"CEO\", \"consultant\") AND fewer than 100 connections: +2 points\n- Company name appears generic or unverifiable: +2 points\n- Profile text seems auto-generated or overly promotional: +2 points\n- Connection count over 5000 with no mutual connections: +1 point\n\n**DECISION RULE:**\n- risk_score < 4: SAFE — include in outreach\n- risk_score 46: CAUTION — include but flag\n- risk_score ≥ 7: SKIP — exclude from outreach\n\n**STEP 1 — Load scored contacts:**\nCall load_data(filename=<the \"scored_contacts\" value from context>).\nProcess contacts chunk by chunk if has_more=true.\n\n**STEP 2 — Analyze each contact:**\nFor each contact, calculate a risk_score using the rules above.\nDetermine: is_safe (risk_score < 7), recommendation (safe/caution/skip), flags (list of triggered rules).\n\n**STEP 3 — Write safe contacts and set output:**\n- For each contact where risk_score < 7: call append_data(filename=\"safe_contacts.jsonl\", data=<contact JSON with risk_score and flags added>)\n- Track how many contacts were filtered (risk_score ≥ 7)\n- Call set_output(\"safe_contacts\", \"safe_contacts.jsonl\")\n- Call set_output(\"filtered_count\", <number of skipped contacts as string>)",
"tools": [
"load_data",
"append_data"
],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": false,
"success_criteria": null
},
{
"id": "personalize",
"name": "Personalize",
"description": "Generate a personalized outreach message for each contact based on their profile, shared background, and the user's outreach goal.",
"node_type": "event_loop",
"input_keys": [
"safe_contacts",
"outreach_goal",
"user_background"
],
"output_keys": [
"personalized_contacts"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a professional outreach message writer. Generate personalized messages for each contact.\n\n**TWO-STEP PERSONALIZATION:**\n\nFor each contact, follow this two-step approach:\n\nSTEP A — Extract hooks (analyze the profile):\nLook for 2-3 specific talking points from the contact's profile:\n- Shared alumni connection\n- Specific role, company, or career transition worth mentioning\n- Any mutual interests aligned with the user's background\n\nSTEP B — Generate the message:\nWrite a warm, professional outreach message using the hooks.\n\n**MESSAGE REQUIREMENTS:**\n- 80-120 words (LinkedIn message length)\n- Start with a specific observation (\"I noticed you...\" or \"Fellow [school] alum here...\")\n- Mention the shared connection or interest naturally\n- State the outreach goal clearly but softly (e.g. \"Open to a brief 15-min chat?\")\n- Professional but warm tone — NOT templated or AI-sounding\n- Do NOT mention job postings directly unless the goal is job-related\n- Do NOT use generic openers like \"I hope this finds you well\"\n- End with a low-pressure ask\n\n**STEP 1 — Load safe contacts:**\nCall load_data(filename=<the \"safe_contacts\" value from context>).\n\n**STEP 2 — Generate message for each contact:**\nFor each contact: generate the personalized message using the two-step approach above.\nAdd \"outreach_message\" field to each contact object.\n\n**STEP 3 — Write output and set:**\n- Call append_data(filename=\"personalized_contacts.jsonl\", data=<contact JSON with outreach_message>) for each.\n- Call set_output(\"personalized_contacts\", \"personalized_contacts.jsonl\")",
"tools": [
"load_data",
"append_data"
],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": false,
"success_criteria": null
},
{
"id": "send-outreach",
"name": "Send Outreach",
"description": "Create Gmail draft emails for each contact using their personalized message. Drafts are created for human review — emails are never sent automatically.",
"node_type": "event_loop",
"input_keys": [
"personalized_contacts",
"outreach_goal"
],
"output_keys": [
"drafts_created"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are an outreach execution assistant. Create Gmail draft emails for each contact.\n\n**CRITICAL RULE: NEVER send emails automatically. Only create drafts.**\n\n**STEP 1 — Load personalized contacts:**\nCall load_data(filename=<the \"personalized_contacts\" value from context>).\nProcess chunk by chunk if has_more=true.\n\n**STEP 2 — Create Gmail draft for each contact:**\nFor each contact with an \"outreach_message\":\n- subject: \"Coffee Chat Request\" (or appropriate subject based on outreach_goal)\n- to: contact's email address (use LinkedIn profile URL if email not available — note this in body)\n- body: the \"outreach_message\" from the contact object\n\nCall gmail_create_draft(\n to=<contact email or linkedin_url as placeholder>,\n subject=<appropriate subject line>,\n body=<outreach_message>\n)\n\nRecord each draft: call append_data(\n filename=\"drafts.jsonl\",\n data=<JSON: {contact_name, contact_email, subject, status: \"draft_created\"}>\n)\n\n**STEP 3 — Set output:**\n- Call set_output(\"drafts_created\", \"drafts.jsonl\")\n\n**IMPORTANT:** If a contact has no email address, create the draft with their LinkedIn URL as a placeholder and add a note in the body: \"Note: Please find the recipient's email before sending.\"",
"tools": [
"gmail_create_draft",
"load_data",
"append_data"
],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": false,
"success_criteria": null
},
{
"id": "report",
"name": "Report",
"description": "Generate a summary report of the outreach campaign: contacts scored, filtered, messaged, and drafts created. Present to user for review.",
"node_type": "event_loop",
"input_keys": [
"drafts_created",
"filtered_count",
"outreach_goal"
],
"output_keys": [
"summary_report"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are an SDR assistant. Generate a clear campaign summary report and present it to the user.\n\n**STEP 1 — Load draft records:**\nCall load_data(filename=<the \"drafts_created\" value from context>) to read the draft records.\nIf has_more=true, load additional chunks until all records are loaded.\n\n**STEP 2 — Present the report (text only, NO tool calls):**\n\nPresent a clean summary:\n\n📊 **SDR Campaign Summary — [outreach_goal]**\n\n**Overview:**\n- Total contacts processed: [N]\n- Contacts filtered (suspicious profiles): [filtered_count]\n- Safe contacts messaged: [N - filtered_count]\n- Gmail drafts created: [N]\n\n**Drafts Created:**\nList each draft: Contact Name | Company | Subject\n\n**Next Steps:**\n\"Your Gmail drafts are ready for review. Please:\n1. Open Gmail and review each draft\n2. Personalize further if needed\n3. Send when ready\n\nCampaign complete!\"\n\n**STEP 3 — After the user responds, call set_output:**\n- set_output(\"summary_report\", <the formatted report text>)",
"tools": [
"load_data"
],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": true,
"success_criteria": null
},
{
"id": "complete",
"name": "Complete",
"description": "Terminal node - campaign complete.",
"node_type": "event_loop",
"input_keys": [
"summary_report"
],
"output_keys": [
"final_report"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "Campaign is complete. Set the final output.\n\nCall set_output(\"final_report\", <summary_report value from context>)",
"tools": [],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 1,
"output_model": null,
"max_validation_retries": 2,
"client_facing": false,
"success_criteria": null
}
],
"edges": [
{
"id": "intake-to-score",
"source": "intake",
"target": "score-contacts",
"condition": "on_success",
"condition_expr": null,
"priority": 1,
"input_mapping": {}
},
{
"id": "score-to-filter",
"source": "score-contacts",
"target": "filter-contacts",
"condition": "on_success",
"condition_expr": null,
"priority": 1,
"input_mapping": {}
},
{
"id": "filter-to-personalize",
"source": "filter-contacts",
"target": "personalize",
"condition": "on_success",
"condition_expr": null,
"priority": 1,
"input_mapping": {}
},
{
"id": "personalize-to-send",
"source": "personalize",
"target": "send-outreach",
"condition": "on_success",
"condition_expr": null,
"priority": 1,
"input_mapping": {}
},
{
"id": "send-to-report",
"source": "send-outreach",
"target": "report",
"condition": "on_success",
"condition_expr": null,
"priority": 1,
"input_mapping": {}
},
{
"id": "report-to-complete",
"source": "report",
"target": "complete",
"condition": "on_success",
"condition_expr": null,
"priority": 1,
"input_mapping": {}
}
],
"max_steps": 100,
"max_retries_per_node": 3,
"description": "Automated SDR outreach pipeline: score contacts by priority, filter suspicious profiles, generate personalized messages, and create Gmail drafts for human review."
},
"goal": {
"id": "sdr-agent",
"name": "SDR Agent",
"description": "Automate sales development outreach: score contacts by priority, filter suspicious profiles, generate personalized messages, and create Gmail drafts for human review.",
"status": "draft",
"success_criteria": [
{
"id": "contact-scoring-accuracy",
"description": "Contacts are correctly scored and ranked by priority factors (alumni status, connection degree, domain verification)",
"metric": "scoring_accuracy",
"target": ">=90%",
"weight": 0.30,
"met": false
},
{
"id": "scam-filter-effectiveness",
"description": "Suspicious profiles (risk_score >= 7) are correctly identified and excluded from outreach",
"metric": "filter_precision",
"target": ">=95%",
"weight": 0.25,
"met": false
},
{
"id": "message-personalization",
"description": "Generated messages reference specific profile details (alumni connection, role, company) and match the outreach goal",
"metric": "personalization_score",
"target": ">=80%",
"weight": 0.30,
"met": false
},
{
"id": "draft-creation",
"description": "Gmail drafts are created for all safe contacts without errors",
"metric": "draft_success_rate",
"target": "100%",
"weight": 0.15,
"met": false
}
],
"constraints": [
{
"id": "draft-not-send",
"description": "Agent creates Gmail drafts but NEVER sends emails automatically",
"constraint_type": "hard",
"category": "safety",
"check": ""
},
{
"id": "respect-batch-limit",
"description": "Must not process more contacts than the configured max_contacts parameter",
"constraint_type": "hard",
"category": "operational",
"check": ""
},
{
"id": "skip-suspicious",
"description": "Contacts with risk_score >= 7 must be excluded from outreach",
"constraint_type": "hard",
"category": "safety",
"check": ""
}
],
"context": {},
"required_capabilities": [],
"input_schema": {},
"output_schema": {},
"version": "1.0.0",
"parent_version": null,
"evolution_reason": null
},
"required_tools": [
"gmail_create_draft",
"load_data",
"append_data"
],
"metadata": {
"node_count": 7,
"edge_count": 6
}
}
+375
View File
@@ -0,0 +1,375 @@
"""Agent graph construction for SDR Agent."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.checkpoint_config import CheckpointConfig
from framework.graph.edge import AsyncEntryPointSpec, GraphSpec
from framework.graph.executor import ExecutionResult
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import (
intake_node,
score_contacts_node,
filter_contacts_node,
personalize_node,
send_outreach_node,
report_node,
)
# Goal definition
goal = Goal(
id="sdr-agent",
name="SDR Agent",
description=(
"Automate sales development outreach: score contacts by priority, "
"filter suspicious profiles, generate personalized messages, "
"and create Gmail drafts for human review."
),
success_criteria=[
SuccessCriterion(
id="contact-scoring-accuracy",
description=(
"Contacts are correctly scored and ranked by priority factors "
"(alumni status, connection degree, domain verification)"
),
metric="scoring_accuracy",
target=">=90%",
weight=0.30,
),
SuccessCriterion(
id="scam-filter-effectiveness",
description=(
"Suspicious profiles (risk_score >= 7) are correctly identified "
"and excluded from outreach"
),
metric="filter_precision",
target=">=95%",
weight=0.25,
),
SuccessCriterion(
id="message-personalization",
description=(
"Generated messages reference specific profile details "
"(alumni connection, role, company) and match the outreach goal"
),
metric="personalization_score",
target=">=80%",
weight=0.30,
),
SuccessCriterion(
id="draft-creation",
description="Gmail drafts are created for all safe contacts without errors",
metric="draft_success_rate",
target="100%",
weight=0.15,
),
],
constraints=[
Constraint(
id="draft-not-send",
description="Agent creates Gmail drafts but NEVER sends emails automatically",
constraint_type="hard",
category="safety",
),
Constraint(
id="respect-batch-limit",
description="Must not process more contacts than the configured max_contacts parameter",
constraint_type="hard",
category="operational",
),
Constraint(
id="skip-suspicious",
description="Contacts with risk_score >= 7 must be excluded from outreach",
constraint_type="hard",
category="safety",
),
],
)
# Node list
nodes = [
intake_node,
score_contacts_node,
filter_contacts_node,
personalize_node,
send_outreach_node,
report_node,
]
# Edge definitions
edges = [
EdgeSpec(
id="intake-to-score",
source="intake",
target="score-contacts",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="score-to-filter",
source="score-contacts",
target="filter-contacts",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="filter-to-personalize",
source="filter-contacts",
target="personalize",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="personalize-to-send",
source="personalize",
target="send-outreach",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="send-to-report",
source="send-outreach",
target="report",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="report-to-intake",
source="report",
target="intake",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
]
# Graph configuration
entry_node = "intake"
entry_points = {"start": "intake"}
async_entry_points: list[AsyncEntryPointSpec] = [] # SDR Agent is manually triggered
pause_nodes = []
terminal_nodes = []
loop_config = {
"max_iterations": 100,
"max_tool_calls_per_turn": 30,
"max_tool_result_chars": 8000,
"max_history_tokens": 32000,
}
conversation_mode = "continuous"
identity_prompt = (
"You are an SDR (Sales Development Representative) assistant. "
"You help users automate their outreach by scoring contacts, filtering "
"suspicious profiles, generating personalized messages, and creating "
"Gmail drafts — all with human review before anything is sent."
)
class SDRAgent:
"""
SDR Agent 6-node pipeline for automated outreach.
Flow: intake -> score-contacts -> filter-contacts -> personalize
-> send-outreach -> report -> intake (loop)
Pipeline:
1. intake: Receive contact list and outreach goal
2. score-contacts: Rank contacts 0-100 by priority factors
3. filter-contacts: Remove suspicious profiles (risk >= 7)
4. personalize: Generate personalized messages for each contact
5. send-outreach: Create Gmail drafts (never sends automatically)
6. report: Summarize campaign results and present to user
"""
def __init__(self, config=None):
self.config = config or default_config
self.goal = goal
self.nodes = nodes
self.edges = edges
self.entry_node = entry_node
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._agent_runtime: AgentRuntime | None = None
self._graph: GraphSpec | None = None
self._tool_registry: ToolRegistry | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
return GraphSpec(
id="sdr-agent-graph",
goal_id=self.goal.id,
version="1.0.0",
entry_node=self.entry_node,
entry_points=self.entry_points,
terminal_nodes=self.terminal_nodes,
pause_nodes=self.pause_nodes,
nodes=self.nodes,
edges=self.edges,
default_model=self.config.model,
max_tokens=self.config.max_tokens,
loop_config=loop_config,
conversation_mode=conversation_mode,
identity_prompt=identity_prompt,
)
def _setup(self, mock_mode=False) -> None:
"""Set up the agent runtime with sessions, checkpoints, and logging."""
self._storage_path = Path.home() / ".hive" / "agents" / "sdr_agent"
self._storage_path.mkdir(parents=True, exist_ok=True)
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
self._tool_registry.load_mcp_config(mcp_config_path)
tools_path = Path(__file__).parent / "tools.py"
if tools_path.exists():
self._tool_registry.discover_from_module(tools_path)
if mock_mode:
from framework.llm.mock import MockLLMProvider
llm = MockLLMProvider()
else:
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tool_executor = self._tool_registry.get_executor()
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
checkpoint_config = CheckpointConfig(
enabled=True,
checkpoint_on_node_start=False,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
)
entry_point_specs = [
EntryPointSpec(
id="default",
name="Default",
entry_node=self.entry_node,
trigger_type="manual",
isolation_level="shared",
),
]
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_point_specs,
llm=llm,
tools=tools,
tool_executor=tool_executor,
checkpoint_config=checkpoint_config,
)
async def start(self, mock_mode=False) -> None:
"""Set up and start the agent runtime."""
if self._agent_runtime is None:
self._setup(mock_mode=mock_mode)
if not self._agent_runtime.is_running:
await self._agent_runtime.start()
async def stop(self) -> None:
"""Stop the agent runtime and clean up."""
if self._agent_runtime and self._agent_runtime.is_running:
await self._agent_runtime.stop()
self._agent_runtime = None
async def trigger_and_wait(
self,
entry_point: str,
input_data: dict,
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""Execute the graph and wait for completion."""
if self._agent_runtime is None:
raise RuntimeError("Agent not started. Call start() first.")
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data,
timeout=timeout,
session_state=session_state,
)
async def run(
self, context: dict, mock_mode=False, session_state=None
) -> ExecutionResult:
"""Run the agent (convenience method for single execution)."""
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait(
"default", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
await self.stop()
def info(self):
"""Get agent information."""
return {
"name": metadata.name,
"version": metadata.version,
"description": metadata.description,
"goal": {
"name": self.goal.name,
"description": self.goal.description,
},
"nodes": [n.id for n in self.nodes],
"edges": [e.id for e in self.edges],
"entry_node": self.entry_node,
"entry_points": self.entry_points,
"pause_nodes": self.pause_nodes,
"terminal_nodes": self.terminal_nodes,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
}
def validate(self):
"""Validate agent structure."""
errors = []
warnings = []
node_ids = {node.id for node in self.nodes}
for edge in self.edges:
if edge.source not in node_ids:
errors.append(f"Edge {edge.id}: source '{edge.source}' not found")
if edge.target not in node_ids:
errors.append(f"Edge {edge.id}: target '{edge.target}' not found")
if self.entry_node not in node_ids:
errors.append(f"Entry node '{self.entry_node}' not found")
for terminal in self.terminal_nodes:
if terminal not in node_ids:
errors.append(f"Terminal node '{terminal}' not found")
for ep_id, node_id in self.entry_points.items():
if node_id not in node_ids:
errors.append(
f"Entry point '{ep_id}' references unknown node '{node_id}'"
)
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
}
# Create default instance.
# Module-level singleton: importers use `default_agent` directly instead of
# constructing their own SDRAgent.
default_agent = SDRAgent()
+30
View File
@@ -0,0 +1,30 @@
"""Runtime configuration for SDR Agent."""
from dataclasses import dataclass
from framework.config import RuntimeConfig
default_config = RuntimeConfig()
@dataclass
class AgentMetadata:
    """Static, human-facing metadata describing the SDR agent.

    Pure configuration — no runtime state. Consumed by UIs and by the
    agent's info() summary.
    """

    # Display name shown to users.
    name: str = "SDR Agent"
    # Version of the agent definition (semver string).
    version: str = "1.0.0"
    # Long-form capability description surfaced in agent listings.
    description: str = (
        "Automate sales development outreach using AI-powered contact scoring, "
        "scam detection, and personalized message generation. "
        "Score contacts by priority, filter suspicious profiles, generate "
        "personalized outreach messages, and create Gmail drafts for review."
    )
    # First message shown to the user when a session opens.
    intro_message: str = (
        "Hi! I'm your SDR (Sales Development Representative) assistant. "
        "Provide a list of contacts and your outreach goal, and I'll "
        "score them by priority, filter out suspicious profiles, generate "
        "personalized messages for each contact, and create Gmail drafts "
        "for your review. I never send emails automatically — you stay in control. "
        "To get started, share your contact list and tell me about your outreach goal!"
    )
metadata = AgentMetadata()
@@ -0,0 +1,97 @@
[
{
"name": "Sarah Chen",
"email": "sarah.chen@techcorp.io",
"company": "TechCorp",
"title": "Learning & Development Manager",
"linkedin_url": "https://linkedin.com/in/sarah-chen-ld",
"connection_degree": "2nd",
"is_alumni": true,
"school_name": "University of Western Ontario",
"connections_count": 843,
"mutual_connections": 7,
"has_photo": true,
"company_domain_verified": true
},
{
"name": "James Okafor",
"email": "james.okafor@edventure.co",
"company": "EdVenture",
"title": "Instructional Designer",
"linkedin_url": "https://linkedin.com/in/james-okafor-id",
"connection_degree": "1st",
"is_alumni": false,
"connections_count": 621,
"mutual_connections": 12,
"has_photo": true,
"company_domain_verified": true
},
{
"name": "Emily Zhao",
"email": "emily.zhao@univedu.ca",
"company": "UniEdu",
"title": "Director of Digital Learning",
"linkedin_url": "https://linkedin.com/in/emily-zhao-dl",
"connection_degree": "2nd",
"is_alumni": true,
"school_name": "University of Western Ontario",
"connections_count": 1204,
"mutual_connections": 3,
"has_photo": true,
"company_domain_verified": true,
"active_job_posting": true
},
{
"name": "Marcus Williams",
"email": "marcus@growthsales.io",
"company": "GrowthSales",
"title": "CEO",
"linkedin_url": "https://linkedin.com/in/marcus-williams-ceo",
"connection_degree": "3rd",
"is_alumni": false,
"connections_count": 6300,
"mutual_connections": 0,
"has_photo": true,
"company_domain_verified": false
},
{
"name": "Priya Patel",
"email": "",
"company": "FutureLearn Inc.",
"title": "EdTech Product Manager",
"linkedin_url": "https://linkedin.com/in/priya-patel-edtech",
"connection_degree": "2nd",
"is_alumni": false,
"connections_count": 512,
"mutual_connections": 5,
"has_photo": true,
"company_domain_verified": true
},
{
"name": "Alex Johnson",
"email": "alex@bizopp.biz",
"company": "Biz Opportunity Global",
"title": "Entrepreneur",
"linkedin_url": "https://linkedin.com/in/alex-johnson-biz",
"connection_degree": "3rd",
"is_alumni": false,
"connections_count": 38,
"mutual_connections": 0,
"has_photo": false,
"company_domain_verified": false
},
{
"name": "Natalie Brown",
"email": "natalie.brown@learningpro.com",
"company": "LearningPro",
"title": "HR Learning Specialist",
"linkedin_url": "https://linkedin.com/in/natalie-brown-hr",
"connection_degree": "1st",
"is_alumni": true,
"school_name": "University of Western Ontario",
"connections_count": 389,
"mutual_connections": 9,
"has_photo": true,
"company_domain_verified": true
}
]
+270
View File
@@ -0,0 +1,270 @@
{
"original_draft": {
"agent_name": "sdr_agent",
"goal": "Automate sales development outreach: score contacts by priority, filter suspicious profiles, generate personalized messages, and create Gmail drafts for human review.",
"description": "",
"success_criteria": [
"Contacts are correctly scored and ranked by priority factors (alumni status, connection degree, domain verification)",
"Suspicious profiles (risk_score >= 7) are correctly identified and excluded from outreach",
"Generated messages reference specific profile details (alumni connection, role, company) and match the outreach goal",
"Gmail drafts are created for all safe contacts without errors"
],
"constraints": [
"Agent creates Gmail drafts but NEVER sends emails automatically",
"Must not process more contacts than the configured max_contacts parameter",
"Contacts with risk_score >= 7 must be excluded from outreach"
],
"nodes": [
{
"id": "intake",
"name": "Intake",
"description": "Receive the contact list and outreach goal from the user. Confirm the strategy and batch size before proceeding.",
"node_type": "event_loop",
"tools": [
"load_contacts_from_file"
],
"input_keys": [
"contacts",
"outreach_goal",
"max_contacts",
"user_background"
],
"output_keys": [
"contacts",
"outreach_goal",
"max_contacts",
"user_background"
],
"success_criteria": "The user has confirmed the contact list, outreach goal, batch size, and their background. All four keys have been written via set_output.",
"sub_agents": [],
"flowchart_type": "start",
"flowchart_shape": "stadium",
"flowchart_color": "#8aad3f"
},
{
"id": "score-contacts",
"name": "Score Contacts",
"description": "Score and rank each contact from 0 to 100 based on priority factors: alumni status, connection degree, domain verification, mutual connections, and active job postings.",
"node_type": "event_loop",
"tools": [
"load_data",
"append_data"
],
"input_keys": [
"contacts",
"outreach_goal"
],
"output_keys": [
"scored_contacts"
],
"success_criteria": "Every contact has a priority_score field (0-100) and scored_contacts.jsonl has been written and referenced via set_output.",
"sub_agents": [],
"flowchart_type": "database",
"flowchart_shape": "cylinder",
"flowchart_color": "#508878"
},
{
"id": "filter-contacts",
"name": "Filter Contacts",
"description": "Analyze each contact for authenticity and filter out suspicious profiles. Any contact with a risk score of 7 or higher is skipped.",
"node_type": "event_loop",
"tools": [
"load_data",
"append_data"
],
"input_keys": [
"scored_contacts"
],
"output_keys": [
"safe_contacts",
"filtered_count"
],
"success_criteria": "Each contact has a risk_score and recommendation field. Contacts with risk_score >= 7 are excluded. safe_contacts.jsonl and filtered_count are set via set_output.",
"sub_agents": [],
"flowchart_type": "database",
"flowchart_shape": "cylinder",
"flowchart_color": "#508878"
},
{
"id": "personalize",
"name": "Personalize",
"description": "Generate a personalized outreach message for each contact based on their profile, shared background, and the user's outreach goal.",
"node_type": "event_loop",
"tools": [
"load_data",
"append_data"
],
"input_keys": [
"safe_contacts",
"outreach_goal",
"user_background"
],
"output_keys": [
"personalized_contacts"
],
"success_criteria": "Every safe contact has an outreach_message field of 80-120 words that references a specific hook from their profile. personalized_contacts.jsonl is set via set_output.",
"sub_agents": [],
"flowchart_type": "database",
"flowchart_shape": "cylinder",
"flowchart_color": "#508878"
},
{
"id": "send-outreach",
"name": "Send Outreach",
"description": "Create Gmail draft emails for each contact using their personalized message. Drafts are created for human review \u2014 emails are never sent automatically.",
"node_type": "event_loop",
"tools": [
"gmail_create_draft",
"load_data",
"append_data"
],
"input_keys": [
"personalized_contacts",
"outreach_goal"
],
"output_keys": [
"drafts_created"
],
"success_criteria": "A Gmail draft has been created for every safe contact. drafts.jsonl records each draft and drafts_created is set via set_output.",
"sub_agents": [],
"flowchart_type": "database",
"flowchart_shape": "cylinder",
"flowchart_color": "#508878"
},
{
"id": "report",
"name": "Report",
"description": "Generate a summary report of the outreach campaign: contacts scored, filtered, messaged, and drafts created. Present to user for review.",
"node_type": "event_loop",
"tools": [
"load_data"
],
"input_keys": [
"drafts_created",
"filtered_count",
"outreach_goal"
],
"output_keys": [
"summary_report"
],
"success_criteria": "A campaign summary has been presented to the user listing totals for contacts scored, filtered, messaged, and drafts created. summary_report is set via set_output.",
"sub_agents": [],
"flowchart_type": "terminal",
"flowchart_shape": "stadium",
"flowchart_color": "#b5453a"
}
],
"edges": [
{
"id": "edge-0",
"source": "intake",
"target": "score-contacts",
"condition": "on_success",
"description": "",
"label": ""
},
{
"id": "edge-1",
"source": "score-contacts",
"target": "filter-contacts",
"condition": "on_success",
"description": "",
"label": ""
},
{
"id": "edge-2",
"source": "filter-contacts",
"target": "personalize",
"condition": "on_success",
"description": "",
"label": ""
},
{
"id": "edge-3",
"source": "personalize",
"target": "send-outreach",
"condition": "on_success",
"description": "",
"label": ""
},
{
"id": "edge-4",
"source": "send-outreach",
"target": "report",
"condition": "on_success",
"description": "",
"label": ""
},
{
"id": "edge-5",
"source": "report",
"target": "intake",
"condition": "on_success",
"description": "",
"label": ""
}
],
"entry_node": "intake",
"terminal_nodes": [
"report"
],
"flowchart_legend": {
"start": {
"shape": "stadium",
"color": "#8aad3f"
},
"terminal": {
"shape": "stadium",
"color": "#b5453a"
},
"process": {
"shape": "rectangle",
"color": "#b5a575"
},
"decision": {
"shape": "diamond",
"color": "#d89d26"
},
"io": {
"shape": "parallelogram",
"color": "#d06818"
},
"document": {
"shape": "document",
"color": "#c4b830"
},
"database": {
"shape": "cylinder",
"color": "#508878"
},
"subprocess": {
"shape": "subroutine",
"color": "#887a48"
},
"browser": {
"shape": "hexagon",
"color": "#cc8850"
}
}
},
"flowchart_map": {
"intake": [
"intake"
],
"score-contacts": [
"score-contacts"
],
"filter-contacts": [
"filter-contacts"
],
"personalize": [
"personalize"
],
"send-outreach": [
"send-outreach"
],
"report": [
"report"
]
}
}
@@ -0,0 +1,14 @@
{
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": [
"run",
"python",
"mcp_server.py",
"--stdio"
],
"cwd": "../../../tools",
"description": "Hive tools MCP server"
}
}
@@ -0,0 +1,339 @@
"""Node definitions for SDR Agent."""
from framework.graph import NodeSpec
# Node 1: Intake (client-facing)
# Receives contact list and outreach goal, confirms with user before proceeding.
intake_node = NodeSpec(
    id="intake",
    name="Intake",
    description=(
        "Receive the contact list and outreach goal from the user. "
        "Confirm the strategy and batch size before proceeding."
    ),
    node_type="event_loop",
    client_facing=True,
    max_node_visits=0,
    input_keys=["contacts", "outreach_goal", "max_contacts", "user_background"],
    output_keys=["contacts", "outreach_goal", "max_contacts", "user_background"],
    success_criteria=(
        "The user has confirmed the contact list, outreach goal, batch size, and "
        "their background. All four keys have been written via set_output."
    ),
    # NOTE: the ">= 7" risk threshold and the step separators had been
    # garbled out of this prompt; restored so the model states the same
    # cutoff that success_criteria and the filter node enforce.
    system_prompt="""\
You are an SDR (Sales Development Representative) assistant helping automate outreach.
**STEP 1 - Understand the input (text only, NO tool calls):**
Read the user's input from context. Determine what they provided:
- If "contacts" is a **file path** (ends in .json or .jsonl), note that you'll load it in step 2.
- If "contacts" is a **JSON string**, you'll use it directly.
- Identify the outreach goal, background, and batch size (default 20).
**STEP 2 - Load contacts if needed:**
If the user provided a file path for contacts, call:
- load_contacts_from_file(file_path=<the path>)
This writes the contacts to contacts.jsonl in the session directory.
**STEP 3 - Confirm with the user (text only, NO tool calls):**
Present a summary like:
"Here's what I'll do:
1. Score and rank your contacts by priority (alumni status, connection degree, etc.)
2. Filter out suspicious or low-quality profiles (risk score >= 7)
3. Generate a personalized outreach message for each contact
4. Create Gmail draft emails for your review - I never send automatically
Ready to proceed with [N] contacts for [goal]?"
**STEP 4 - After the user confirms, call set_output:**
- set_output("contacts", <the contact list as a JSON string, or "contacts.jsonl" if loaded from file>)
- set_output("outreach_goal", <the confirmed goal, e.g. "coffee chat">)
- set_output("max_contacts", <the confirmed batch size as a string, e.g. "20">)
- set_output("user_background", <user's background/role, e.g. "Learning Technologist at UWO">)
""",
    tools=["load_contacts_from_file"],
)
# Node 2: Score Contacts
# Ranks contacts 0-100 based on alumni status, connection degree, domain, etc.
# Non-interactive batch step: reads contacts.jsonl, writes scored_contacts.jsonl.
score_contacts_node = NodeSpec(
    id="score-contacts",
    name="Score Contacts",
    description=(
        "Score and rank each contact from 0 to 100 based on priority factors: "
        "alumni status, connection degree, domain verification, mutual connections, "
        "and active job postings."
    ),
    node_type="event_loop",
    client_facing=False,
    # 0 presumably means "no revisit cap" — TODO confirm framework semantics.
    max_node_visits=0,
    input_keys=["contacts", "outreach_goal"],
    output_keys=["scored_contacts"],
    success_criteria=(
        "Every contact has a priority_score field (0-100) and scored_contacts.jsonl "
        "has been written and referenced via set_output."
    ),
    system_prompt="""\
You are a contact prioritization engine. Score each contact from 0 to 100.
**SCORING RULES (additive):**
- Alumni of the user's school: +30 points
- 1st degree connection: +25 points
- 2nd degree connection: +20 points
- 3rd degree connection: +10 points
- Domain verified (company email matches LinkedIn company): +10 points
- Has mutual connections (1 point each, max 10): up to +10 points
- Active job posting at their company: +10 points
- Has a profile photo: +5 points
- Over 500 connections: +5 points
Cap the final score at 100.
**STEP 1 Load the contacts:**
Call load_data(filename="contacts.jsonl") to read the contact list.
If "contacts" in context is a JSON string (not a filename), write it first:
- For each contact in the list, call append_data(filename="contacts.jsonl", data=<JSON contact object>)
Then read it back.
**STEP 2 Score each contact:**
For each contact, calculate the priority score using the rules above.
Add a "priority_score" field to each contact object.
**STEP 3 Write scored contacts and set output:**
- Call append_data(filename="scored_contacts.jsonl", data=<JSON contact with priority_score>) for each contact.
- Sort contacts by priority_score (highest first) in your final output.
- Call set_output("scored_contacts", "scored_contacts.jsonl")
""",
    tools=["load_data", "append_data"],
)
# Node 3: Filter Contacts (Scam Detection)
# Filters out suspicious or fake profiles using a risk scoring system.
filter_contacts_node = NodeSpec(
    id="filter-contacts",
    name="Filter Contacts",
    description=(
        "Analyze each contact for authenticity and filter out suspicious profiles. "
        "Any contact with a risk score of 7 or higher is skipped."
    ),
    node_type="event_loop",
    client_facing=False,
    max_node_visits=0,
    input_keys=["scored_contacts"],
    output_keys=["safe_contacts", "filtered_count"],
    success_criteria=(
        "Each contact has a risk_score and recommendation field. Contacts with "
        "risk_score >= 7 are excluded. safe_contacts.jsonl and filtered_count are "
        "set via set_output."
    ),
    # NOTE: the decision-rule thresholds in the prompt had been garbled
    # ("46", bare "7"); restored to "4-6" and ">= 7" so they agree with the
    # explicit "< 7" / ">= 7" logic in steps 2-3 and with success_criteria.
    system_prompt="""\
You are a profile authenticity analyzer. Your job is to detect suspicious or fake LinkedIn profiles.
**RISK SCORING RULES (additive):**
- Fewer than 50 connections: +3 points
- No profile photo: +2 points
- Fewer than 2 positions in work history: +2 points
- Generic title (e.g. "entrepreneur", "CEO", "consultant") AND fewer than 100 connections: +2 points
- Company name appears generic or unverifiable: +2 points
- Profile text seems auto-generated or overly promotional: +2 points
- Connection count over 5000 with no mutual connections: +1 point
**DECISION RULE:**
- risk_score < 4: SAFE - include in outreach
- risk_score 4-6: CAUTION - include but flag
- risk_score >= 7: SKIP - exclude from outreach
**STEP 1 Load scored contacts:**
Call load_data(filename=<the "scored_contacts" value from context>).
Process contacts chunk by chunk if has_more=true.
**STEP 2 Analyze each contact:**
For each contact, calculate a risk_score using the rules above.
Determine: is_safe (risk_score < 7), recommendation (safe/caution/skip), flags (list of triggered rules).
**STEP 3 Write safe contacts and set output:**
- For each contact where risk_score < 7: call append_data(filename="safe_contacts.jsonl", data=<contact JSON with risk_score and flags added>)
- Track how many contacts were filtered (risk_score >= 7)
- Call set_output("safe_contacts", "safe_contacts.jsonl")
- Call set_output("filtered_count", <number of skipped contacts as string>)
""",
    tools=["load_data", "append_data"],
)
# Node 4: Personalize Messages
# Generates personalized outreach messages for each safe contact.
# Reads safe_contacts.jsonl; writes personalized_contacts.jsonl.
personalize_node = NodeSpec(
    id="personalize",
    name="Personalize",
    description=(
        "Generate a personalized outreach message for each contact based on "
        "their profile, shared background, and the user's outreach goal."
    ),
    node_type="event_loop",
    client_facing=False,
    max_node_visits=0,
    input_keys=["safe_contacts", "outreach_goal", "user_background"],
    output_keys=["personalized_contacts"],
    success_criteria=(
        "Every safe contact has an outreach_message field of 80-120 words that "
        "references a specific hook from their profile. personalized_contacts.jsonl "
        "is set via set_output."
    ),
    system_prompt="""\
You are a professional outreach message writer. Generate personalized messages for each contact.
**TWO-STEP PERSONALIZATION:**
For each contact, follow this two-step approach:
STEP A Extract hooks (analyze the profile):
Look for 2-3 specific talking points from the contact's profile:
- Shared alumni connection
- Specific role, company, or career transition worth mentioning
- Any mutual interests aligned with the user's background
STEP B Generate the message:
Write a warm, professional outreach message using the hooks.
**MESSAGE REQUIREMENTS:**
- 80-120 words (LinkedIn message length)
- Start with a specific observation ("I noticed you..." or "Fellow [school] alum here...")
- Mention the shared connection or interest naturally
- State the outreach goal clearly but softly (e.g. "Open to a brief 15-min chat?")
- Professional but warm tone NOT templated or AI-sounding
- Do NOT mention job postings directly unless the goal is job-related
- Do NOT use generic openers like "I hope this finds you well"
- End with a low-pressure ask
**STEP 1 Load safe contacts:**
Call load_data(filename=<the "safe_contacts" value from context>).
**STEP 2 Generate message for each contact:**
For each contact: generate the personalized message using the two-step approach above.
Add "outreach_message" field to each contact object.
**STEP 3 Write output and set:**
- Call append_data(filename="personalized_contacts.jsonl", data=<contact JSON with outreach_message>) for each.
- Call set_output("personalized_contacts", "personalized_contacts.jsonl")
""",
    tools=["load_data", "append_data"],
)
# Node 5: Send Outreach (Create Gmail Drafts)
# Creates Gmail draft emails for each personalized contact. Never sends automatically.
send_outreach_node = NodeSpec(
id="send-outreach",
name="Send Outreach",
description=(
"Create Gmail draft emails for each contact using their personalized message. "
"Drafts are created for human review — emails are never sent automatically."
),
node_type="event_loop",
client_facing=False,
max_node_visits=0,
input_keys=["personalized_contacts", "outreach_goal"],
output_keys=["drafts_created"],
success_criteria=(
"A Gmail draft has been created for every safe contact. "
"drafts.jsonl records each draft and drafts_created is set via set_output."
),
system_prompt="""\
You are an outreach execution assistant. Create Gmail draft emails for each contact.
**CRITICAL RULE: NEVER send emails automatically. Only create drafts.**
**STEP 1 Load personalized contacts:**
Call load_data(filename=<the "personalized_contacts" value from context>).
Process chunk by chunk if has_more=true.
**STEP 2 Create Gmail draft for each contact:**
For each contact with an "outreach_message":
- subject: "Coffee Chat Request" (or appropriate subject based on outreach_goal)
- to: contact's email address (use LinkedIn profile URL if email not available — note this in body)
- body: the "outreach_message" from the contact object
Call gmail_create_draft(
to=<contact email or linkedin_url as placeholder>,
subject=<appropriate subject line>,
body=<outreach_message>
)
Record each draft: call append_data(
filename="drafts.jsonl",
data=<JSON: {contact_name, contact_email, subject, status: "draft_created"}>
)
**STEP 3 Set output:**
- Call set_output("drafts_created", "drafts.jsonl")
**IMPORTANT:** If a contact has no email address, create the draft with their LinkedIn URL as a placeholder
and add a note in the body: "Note: Please find the recipient's email before sending."
""",
tools=["gmail_create_draft", "load_data", "append_data"],
)
# Node 6: Report (client-facing)
# Summarizes results and presents to user for review.
# Terminal node of the pipeline; reads drafts.jsonl and the filtered count.
report_node = NodeSpec(
    id="report",
    name="Report",
    description=(
        "Generate a summary report of the outreach campaign: contacts scored, "
        "filtered, messaged, and drafts created. Present to user for review."
    ),
    node_type="event_loop",
    client_facing=True,
    max_node_visits=0,
    input_keys=["drafts_created", "filtered_count", "outreach_goal"],
    output_keys=["summary_report"],
    success_criteria=(
        "A campaign summary has been presented to the user listing totals for "
        "contacts scored, filtered, messaged, and drafts created. "
        "summary_report is set via set_output."
    ),
    system_prompt="""\
You are an SDR assistant. Generate a clear campaign summary report and present it to the user.
**STEP 1 Load draft records:**
Call load_data(filename=<the "drafts_created" value from context>) to read the draft records.
If has_more=true, load additional chunks until all records are loaded.
**STEP 2 Present the report (text only, NO tool calls):**
Present a clean summary:
📊 **SDR Campaign Summary [outreach_goal]**
**Overview:**
- Total contacts processed: [N]
- Contacts filtered (suspicious profiles): [filtered_count]
- Safe contacts messaged: [N - filtered_count]
- Gmail drafts created: [N]
**Drafts Created:**
List each draft: Contact Name | Company | Subject
**Next Steps:**
"Your Gmail drafts are ready for review. Please:
1. Open Gmail and review each draft
2. Personalize further if needed
3. Send when ready
Would you like to run another outreach batch or adjust the strategy?"
**STEP 3 After the user responds, call set_output:**
- set_output("summary_report", <the formatted report text>)
""",
    tools=["load_data"],
)
# Public API: the six NodeSpec definitions wired together by the agent graph.
__all__ = [
    "intake_node",
    "score_contacts_node",
    "filter_contacts_node",
    "personalize_node",
    "send_outreach_node",
    "report_node",
]
+132
View File
@@ -0,0 +1,132 @@
"""
Custom tool functions for SDR Agent.
Follows the ToolRegistry.discover_from_module() contract:
- TOOLS: dict[str, Tool] tool definitions
- tool_executor(tool_use) unified dispatcher
These tools provide SDR-specific utilities for loading contact data
from a JSON file and writing it to the session's data directory for
downstream nodes to process via the standard load_data/append_data tools.
"""
from __future__ import annotations
import json
from framework.llm.provider import Tool, ToolResult, ToolUse
from framework.runner.tool_registry import _execution_context
# ---------------------------------------------------------------------------
# Tool definitions (auto-discovered by ToolRegistry.discover_from_module)
# ---------------------------------------------------------------------------
# Maps tool name -> framework Tool spec; `parameters` is a JSON Schema object.
TOOLS = {
    "load_contacts_from_file": Tool(
        name="load_contacts_from_file",
        description=(
            "Load a contacts JSON file from an absolute or relative path "
            "and write its contents to contacts.jsonl in the session data directory. "
            "Returns the number of contacts loaded and the output filename."
        ),
        parameters={
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": (
                        "Absolute or relative path to a JSON file containing "
                        "a list of contact objects."
                    ),
                },
            },
            "required": ["file_path"],
        },
    ),
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_data_dir() -> str:
    """Return the session-scoped data directory from the tool execution context.

    Raises:
        RuntimeError: when no execution context is active or it has no
            ``data_dir`` entry (i.e. the tool is not running inside a
            GraphExecutor).
    """
    ctx = _execution_context.get()
    if ctx and "data_dir" in ctx:
        return ctx["data_dir"]
    raise RuntimeError(
        "data_dir not set in execution context. "
        "Is the tool running inside a GraphExecutor?"
    )
# ---------------------------------------------------------------------------
# Core implementation
# ---------------------------------------------------------------------------
def _load_contacts_from_file(file_path: str) -> dict:
    """Read a contacts JSON file and write it as contacts.jsonl to data_dir.

    Args:
        file_path: Path to the contacts JSON file.

    Returns:
        dict with ``filename`` (always ``"contacts.jsonl"``) and ``count``,
        or a dict with an ``error`` key describing why the load failed.
    """
    from pathlib import Path

    try:
        with open(file_path, encoding="utf-8") as f:
            contacts = json.load(f)
    except FileNotFoundError:
        return {"error": f"File not found: {file_path}"}
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON: {e}"}
    except OSError as e:
        # Directory paths, permission errors, etc. — keep the error-dict
        # contract instead of letting the exception escape.
        return {"error": f"Cannot read file: {e}"}

    # Accept a single contact object by treating it as a one-element list.
    if not isinstance(contacts, list):
        contacts = [contacts]

    # Create the output directory only after the input parsed successfully,
    # so a failed call leaves no side effects in the session directory.
    data_dir = Path(_get_data_dir())
    data_dir.mkdir(parents=True, exist_ok=True)
    output_path = data_dir / "contacts.jsonl"

    count = 0
    with open(output_path, "w", encoding="utf-8") as f:
        for contact in contacts:
            f.write(json.dumps(contact, ensure_ascii=False) + "\n")
            count += 1
    return {"filename": "contacts.jsonl", "count": count}
# ---------------------------------------------------------------------------
# Unified tool executor (auto-discovered by ToolRegistry.discover_from_module)
# ---------------------------------------------------------------------------
def tool_executor(tool_use: ToolUse) -> ToolResult:
    """Dispatch tool calls to their implementations.

    Unknown tool names and unexpected exceptions both produce an error
    ToolResult rather than raising.
    """
    # Guard clause: anything other than the one registered tool is an error.
    if tool_use.name != "load_contacts_from_file":
        return ToolResult(
            tool_use_id=tool_use.id,
            content=json.dumps({"error": f"Unknown tool: {tool_use.name}"}),
            is_error=True,
        )
    try:
        outcome = _load_contacts_from_file(
            file_path=tool_use.input.get("file_path", "")
        )
        return ToolResult(
            tool_use_id=tool_use.id,
            content=json.dumps(outcome),
            is_error="error" in outcome,
        )
    except Exception as e:
        return ToolResult(
            tool_use_id=tool_use.id,
            content=json.dumps({"error": str(e)}),
            is_error=True,
        )
+4 -4
View File
@@ -1058,7 +1058,7 @@ case $choice in
SELECTED_PROVIDER_ID="anthropic"
SELECTED_MODEL="claude-opus-4-6"
SELECTED_MAX_TOKENS=32768
SELECTED_MAX_CONTEXT_TOKENS=180000 # Claude — 200k context window
SELECTED_MAX_CONTEXT_TOKENS=960000 # Claude — 1M context window
echo ""
echo -e "${GREEN}${NC} Using Claude Code subscription"
fi
@@ -1070,7 +1070,7 @@ case $choice in
SELECTED_ENV_VAR="ZAI_API_KEY"
SELECTED_MODEL="glm-5"
SELECTED_MAX_TOKENS=32768
SELECTED_MAX_CONTEXT_TOKENS=120000 # GLM-5 — 128k context window
SELECTED_MAX_CONTEXT_TOKENS=180000 # GLM-5 — 200k context window
PROVIDER_NAME="ZAI"
echo ""
echo -e "${GREEN}${NC} Using ZAI Code subscription"
@@ -1128,7 +1128,7 @@ case $choice in
SELECTED_ENV_VAR="KIMI_API_KEY"
SELECTED_MODEL="kimi-k2.5"
SELECTED_MAX_TOKENS=32768
SELECTED_MAX_CONTEXT_TOKENS=120000 # Kimi K2.5 — 128k context window
SELECTED_MAX_CONTEXT_TOKENS=240000 # Kimi K2.5 — 256k context window
SELECTED_API_BASE="https://api.kimi.com/coding"
PROVIDER_NAME="Kimi"
SIGNUP_URL="https://www.kimi.com/code"
@@ -1142,7 +1142,7 @@ case $choice in
SELECTED_PROVIDER_ID="hive"
SELECTED_ENV_VAR="HIVE_API_KEY"
SELECTED_MAX_TOKENS=32768
SELECTED_MAX_CONTEXT_TOKENS=120000
SELECTED_MAX_CONTEXT_TOKENS=180000
SELECTED_API_BASE="$HIVE_LLM_ENDPOINT"
PROVIDER_NAME="Hive"
SIGNUP_URL="https://discord.com/invite/hQdU7QDkgR"
+1 -1
View File
@@ -12,7 +12,7 @@ import zlib
# Files beyond this size are skipped/rejected in hashline mode because
# hashline anchors are not practical on files this large (minified
# bundles, logs, data dumps). Shared by view_file, grep_search, and
# bundles, logs, data dumps). Shared by read_file, grep_search, and
# hashline_edit.
HASHLINE_MAX_FILE_BYTES = 10 * 1024 * 1024 # 10 MB
+1 -5
View File
@@ -70,8 +70,6 @@ from .file_system_toolkits.list_dir import register_tools as register_list_dir
from .file_system_toolkits.replace_file_content import (
register_tools as register_replace_file_content,
)
from .file_system_toolkits.view_file import register_tools as register_view_file
from .file_system_toolkits.write_to_file import register_tools as register_write_to_file
from .github_tool import register_tools as register_github
from .gitlab_tool import register_tools as register_gitlab
from .gmail_tool import register_tools as register_gmail
@@ -186,14 +184,12 @@ def _register_verified(
register_account_info(mcp, credentials=credentials)
# --- File system toolkits ---
register_view_file(mcp)
register_write_to_file(mcp)
register_list_dir(mcp)
register_replace_file_content(mcp)
register_apply_diff(mcp)
register_apply_patch(mcp)
register_grep_search(mcp)
# hashline_edit: anchor-based editing, pairs with view_file/grep_search hashline mode
# hashline_edit: anchor-based editing, pairs with read_file/grep_search hashline mode
register_hashline_edit(mcp)
register_execute_command(mcp)
register_data_tools(mcp)
@@ -75,7 +75,7 @@ def register_tools(mcp: FastMCP) -> None:
try:
if hashline:
# Use splitlines() for anchor consistency with
# view_file/hashline_edit (handles Unicode line
# read_file/hashline_edit (handles Unicode line
# separators like \u2028, \x85).
# Skip files > 10MB to avoid excessive memory use.
file_size = os.path.getsize(file_path)
@@ -6,11 +6,11 @@ Edit files using anchor-based line references for precise, hash-validated edits.
The `hashline_edit` tool enables file editing using short content-hash anchors (`N:hhhh`) instead of requiring exact text reproduction. Each line's anchor includes a 4-character hash of its content. If the file has changed since the model last read it, the hash won't match and the edit is cleanly rejected.
Use this tool together with `view_file(hashline=True)` and `grep_search(hashline=True)`, which return anchors for each line.
Use this tool together with `read_file(hashline=True)` and `grep_search(hashline=True)`, which return anchors for each line.
## Use Cases
- Making targeted edits after reading a file with `view_file(hashline=True)`
- Making targeted edits after reading a file with `read_file(hashline=True)`
- Replacing single lines, line ranges, or inserting new lines by anchor
- Batch editing multiple locations in a single atomic call
- Falling back to string replacement when anchors are not available
@@ -21,7 +21,7 @@ Use this tool together with `view_file(hashline=True)` and `grep_search(hashline
import json
# First, read the file with hashline mode to get anchors
content = view_file(path="app.py", hashline=True, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
content = read_file(path="app.py", hashline=True)
# Returns lines like: 1:a3b1|def main(): 2:f1c2| print("hello") ...
# Then edit using the anchors
@@ -29,25 +29,10 @@ hashline_edit(
path="app.py",
edits=json.dumps([
{"op": "set_line", "anchor": "2:f1c2", "content": ' print("goodbye")'}
    ])
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `edits` | str | Yes | - | JSON string containing a list of edit operations (see Operations below) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `auto_cleanup` | bool | No | `True` | Strip hashline prefixes and echoed context from content. Set to `False` to write content exactly as provided. |
| `encoding` | str | No | `"utf-8"` | File encoding. Must match the file's actual encoding. |
## Operations
The `edits` parameter is a JSON array of operation objects. Each object must have an `"op"` field:
@@ -61,62 +46,6 @@ The `edits` parameter is a JSON array of operation objects. Each object must hav
| `replace` | `old_content`, `new_content`, `allow_multiple` (optional) | Fallback string replacement; errors if 0 or 2+ matches (unless `allow_multiple: true`) |
| `append` | `content` | Append new lines to end of file (works for empty files too) |
## Returns
**Success:**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 2,
"content": "1:b2c4|def main():\n2:c4a1| print(\"goodbye\")\n..."
}
```
**Success (noop, content unchanged after applying edits):**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 0,
"note": "Content unchanged after applying edits",
"content": "1:b2c4|def main():\n..."
}
```
**Success (with auto-cleanup applied):**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 1,
"content": "...",
"cleanup_applied": ["prefix_strip"]
}
```
The `cleanup_applied` field is only present when cleanup actually modified content. Possible values: `prefix_strip`, `boundary_echo_strip`, `insert_echo_strip`.
**Success (replace with allow_multiple):**
```python
{
"success": True,
"path": "app.py",
"edits_applied": 1,
"content": "...",
"replacements": {"edit_1": 3}
}
```
The `replacements` field is only present when `allow_multiple: true` was used, showing the count per replace op.
**Error:**
```python
{
"error": "Edit #1 (set_line): Hash mismatch at line 2: expected 'f1c2', got 'a3b1'. Re-read the file to get current anchors."
}
```
## Error Handling
- Returns an error if the file doesn't exist
@@ -127,90 +56,11 @@ The `replacements` field is only present when `allow_multiple: true` was used, s
- Returns an error for unknown op types or invalid JSON
- All edits are validated before any writes occur (atomic): on any error the file is unchanged
## Examples
### Replacing a single line
```python
edits = json.dumps([
{"op": "set_line", "anchor": "5:a3b1", "content": " return result"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns: {"success": True, "path": "app.py", "edits_applied": 1, "content": "..."}
```
### Replacing a range of lines
```python
edits = json.dumps([{
"op": "replace_lines",
"start_anchor": "10:b1c2",
"end_anchor": "15:c2d3",
"content": " # simplified\n return x + y"
}])
result = hashline_edit(path="math.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Inserting new lines after
```python
edits = json.dumps([
{"op": "insert_after", "anchor": "3:d4e5", "content": "import os\nimport sys"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Inserting new lines before
```python
edits = json.dumps([
{"op": "insert_before", "anchor": "1:a1b2", "content": "#!/usr/bin/env python3"}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Batch editing
```python
edits = json.dumps([
{"op": "set_line", "anchor": "1:a1b2", "content": "#!/usr/bin/env python3"},
{"op": "insert_after", "anchor": "2:b2c3", "content": "import logging"},
{"op": "set_line", "anchor": "10:c3d4", "content": " logging.info('done')"},
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
```
### Replace all occurrences
```python
edits = json.dumps([
{"op": "replace", "old_content": "old_name", "new_content": "new_name", "allow_multiple": True}
])
result = hashline_edit(path="app.py", edits=edits, workspace_id="ws-1", agent_id="a-1", session_id="s-1")
# Returns: {..., "replacements": {"edit_1": 5}}
```
## Notes
- Anchors are generated by `read_file(hashline=True)` and `grep_search(hashline=True)`
- The hash is a CRC32-based 4-char hex digest of the line content (with trailing spaces and tabs stripped; leading whitespace is included so indentation changes invalidate anchors). Collision probability is ~0.0015% per changed line.
- All anchor-based ops are validated before any writes occur; if any op fails validation, the file is left unchanged
- String `replace` ops are applied after all anchor-based splices, so they match against post-splice content
- Original line endings (LF or CRLF) are preserved
- The response includes the updated file content in hashline format, so subsequent edits can use the new anchors without re-reading
## Auto-Cleanup Details
When `auto_cleanup=True` (the default), the tool strips hashline prefixes and echoed context that LLMs frequently include in edit content. Prefix stripping uses a **2+ non-empty line threshold** to avoid false positives. The prefix regex matches the `N:hhhh|` pattern (4-char hex hash).
**Why the threshold matters:** Single-line content matching the `N:hhhh|` pattern is ambiguous. It could be literal content (CSV data, config values, log format strings) that happens to match the pattern. With 2+ lines all matching, the probability of a false positive drops dramatically.
**Single-line example (NOT stripped):**
```python
# set_line with content "5:a3b1|hello" writes literally "5:a3b1|hello"
{"op": "set_line", "anchor": "2:f1c2", "content": "5:a3b1|hello"}
```
**Multi-line example (stripped):**
```python
# replace_lines where all lines match N:hhhh| pattern gets stripped
{"op": "replace_lines", "start_anchor": "2:f1c2", "end_anchor": "3:b2d3",
"content": "2:a3b1|BBB\n3:c4d2|CCC"}
# Writes "BBB\nCCC" (prefixes removed)
```
**Escape hatch:** Set `auto_cleanup=False` to write content exactly as provided, bypassing all cleanup heuristics.
@@ -39,7 +39,7 @@ def register_tools(mcp: FastMCP) -> None:
Edit a file using anchor-based line references (N:hash) for precise edits.
When to use
After reading a file with read_file(hashline=True), use the anchors to make
targeted edits without reproducing exact file content.
Rules & Constraints
@@ -1,106 +0,0 @@
# View File Tool
Reads the content of a file within the secure session sandbox.
## Description
The `view_file` tool allows you to read and retrieve the complete content of files within a sandboxed session environment. It provides metadata about the file along with its content.
## Use Cases
- Reading configuration files
- Viewing source code
- Inspecting log files
- Retrieving data files for processing
## Usage
```python
view_file(
path="config/settings.json",
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789"
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `encoding` | str | No | `"utf-8"` | The encoding to use for reading the file |
| `max_size` | int | No | `10485760` | Maximum size of file content to return in bytes (10 MB) |
| `hashline` | bool | No | `False` | If True, return content with `N:hhhh\|content` anchors for use with `hashline_edit` |
| `offset` | int | No | `1` | 1-indexed start line (only used when `hashline=True`) |
| `limit` | int | No | `0` | Max lines to return, 0 = all (only used when `hashline=True`) |
## Returns
Returns a dictionary with the following structure:
**Success (default mode):**
```python
{
"success": True,
"path": "config/settings.json",
"content": "{\"debug\": true}",
"size_bytes": 16,
"lines": 1
}
```
**Success (hashline mode):**
```python
{
"success": True,
"path": "app.py",
"content": "1:a3f2|def main():\n2:f1c4| print(\"hello\")",
"hashline": True,
"offset": 1,
"limit": 0,
"total_lines": 2,
"shown_lines": 2,
"size_bytes": 35
}
```
**Error:**
```python
{
"error": "File not found at config/settings.json"
}
```
## Error Handling
- Returns an error dict if the file doesn't exist
- Returns an error dict if the file cannot be read (permission issues, encoding errors, etc.)
- Handles binary files gracefully by returning appropriate error messages
## Examples
### Reading a text file
```python
result = view_file(
path="README.md",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "README.md", "content": "# My Project\n...", "size_bytes": 1024, "lines": 42}
```
### Handling missing files
```python
result = view_file(
path="nonexistent.txt",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"error": "File not found at nonexistent.txt"}
```
@@ -1,3 +0,0 @@
from .view_file import register_tools
__all__ = ["register_tools"]
@@ -1,134 +0,0 @@
import os
from mcp.server.fastmcp import FastMCP
from aden_tools.hashline import HASHLINE_MAX_FILE_BYTES, format_hashlines
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
    """Register the ``view_file`` tool with the given FastMCP server.

    Uses an attribute flag on the server instance so repeated calls do
    not register the same tool twice.
    """
    # Idempotency guard: bail out if this module already registered its tools.
    if getattr(mcp, "_file_tools_registered", False):
        return
    mcp._file_tools_registered = True
    @mcp.tool()
    def view_file(
        path: str,
        workspace_id: str,
        agent_id: str,
        session_id: str,
        encoding: str = "utf-8",
        max_size: int = HASHLINE_MAX_FILE_BYTES,
        hashline: bool = False,
        offset: int = 1,
        limit: int = 0,
    ) -> dict:
        """Read the content of a file within the session sandbox.

        Args:
            path: The path to the file (relative to the session root).
            workspace_id: The ID of the workspace.
            agent_id: The ID of the agent.
            session_id: The ID of the current session.
            encoding: The encoding used to read the file (default: "utf-8").
            max_size: Maximum size of file content to return, in bytes
                (default: 10MB).
            hashline: If True, return content as ``N:hhhh|content`` anchor
                lines for use with ``hashline_edit`` (default: False).
            offset: 1-indexed start line; only used when hashline=True
                (default: 1).
            limit: Max lines to return, 0 = all; only used when
                hashline=True (default: 0).

        Returns:
            Dict with file content and metadata on success, or a dict with
            a single ``"error"`` key describing the failure.
        """
        try:
            if max_size < 0:
                return {"error": f"max_size must be non-negative, got {max_size}"}
            # Resolve the path inside the sandboxed session directory.
            secure_path = get_secure_path(path, workspace_id, agent_id, session_id)
            if not os.path.exists(secure_path):
                return {"error": f"File not found at {path}"}
            if not os.path.isfile(secure_path):
                return {"error": f"Path is not a file: {path}"}
            with open(secure_path, encoding=encoding) as f:
                content_raw = f.read()
            # Paging parameters are only meaningful in hashline mode; reject
            # them otherwise instead of silently ignoring the arguments.
            if not hashline and (offset != 1 or limit != 0):
                return {
                    "error": "offset and limit are only supported when hashline=True. "
                    "Set hashline=True to use paging."
                }
            if hashline:
                if offset < 1:
                    return {"error": f"offset must be >= 1, got {offset}"}
                if limit < 0:
                    return {"error": f"limit must be >= 0, got {limit}"}
                # splitlines() also splits on Unicode separators (\u2028, \x85),
                # keeping line numbering consistent with hashline_edit anchors.
                all_lines = content_raw.splitlines()
                total_lines = len(all_lines)
                raw_size = len(content_raw.encode(encoding))
                # max(total_lines, 1) lets offset=1 succeed on an empty file.
                if offset > max(total_lines, 1):
                    return {"error": f"offset {offset} is beyond end of file ({total_lines} lines)"}
                # Check size after considering offset/limit. When paging
                # (offset or limit set), only check the formatted output size.
                # When reading the full file, check the raw size.
                is_paging = offset > 1 or limit > 0
                if not is_paging and raw_size > max_size:
                    return {
                        "error": f"File too large for hashline mode ({raw_size} bytes, "
                        f"max {max_size}). Use offset and limit to read a section at a time."
                    }
                formatted = format_hashlines(all_lines, offset=offset, limit=limit)
                shown_lines = len(formatted.splitlines()) if formatted else 0
                if is_paging and len(formatted.encode(encoding)) > max_size:
                    return {
                        "error": f"Requested section too large ({shown_lines} lines). "
                        f"Reduce limit to read a smaller section."
                    }
                return {
                    "success": True,
                    "path": path,
                    "content": formatted,
                    "hashline": True,
                    "offset": offset,
                    "limit": limit,
                    "total_lines": total_lines,
                    "shown_lines": shown_lines,
                    "size_bytes": raw_size,
                }
            content = content_raw
            # Default mode truncates (by characters) instead of erroring when
            # the file exceeds max_size.
            if len(content.encode(encoding)) > max_size:
                content = content[:max_size]
                content += "\n\n[... Content truncated due to size limit ...]"
            return {
                "success": True,
                "path": path,
                "content": content,
                "size_bytes": len(content.encode(encoding)),
                "lines": len(content.splitlines()),
            }
        except Exception as e:
            return {"error": f"Failed to read file: {str(e)}"}
@@ -1,92 +0,0 @@
# Write to File Tool
Writes content to a file within the secure session sandbox. Supports both overwriting and appending modes.
## Description
The `write_to_file` tool allows you to create new files or modify existing files within a sandboxed session environment. It automatically creates parent directories if they don't exist and provides flexible write modes.
## Use Cases
- Creating new configuration files
- Writing generated code or data
- Appending logs or output to existing files
- Saving processed results to disk
## Usage
```python
write_to_file(
path="config/settings.json",
content='{"debug": true}',
workspace_id="workspace-123",
agent_id="agent-456",
session_id="session-789",
append=False
)
```
## Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| `path` | str | Yes | - | The path to the file (relative to session root) |
| `content` | str | Yes | - | The content to write to the file |
| `workspace_id` | str | Yes | - | The ID of the workspace |
| `agent_id` | str | Yes | - | The ID of the agent |
| `session_id` | str | Yes | - | The ID of the current session |
| `append` | bool | No | False | Whether to append to the file instead of overwriting |
## Returns
Returns a dictionary with the following structure:
**Success:**
```python
{
"success": True,
"path": "config/settings.json",
"mode": "written", # or "appended"
"bytes_written": 18
}
```
**Error:**
```python
{
"error": "Failed to write to file: [error message]"
}
```
## Error Handling
- Returns an error dict if the file cannot be written (permission issues, invalid path, etc.)
- Automatically creates parent directories if they don't exist
- Handles encoding errors gracefully
## Examples
### Creating a new file
```python
result = write_to_file(
path="data/output.txt",
content="Hello, world!",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1"
)
# Returns: {"success": True, "path": "data/output.txt", "mode": "written", "bytes_written": 13}
```
### Appending to a file
```python
result = write_to_file(
path="logs/activity.log",
content="\n[INFO] Task completed",
workspace_id="ws-1",
agent_id="agent-1",
session_id="session-1",
append=True
)
# Returns: {"success": True, "path": "logs/activity.log", "mode": "appended", "bytes_written": 24}
```
@@ -1,3 +0,0 @@
from .write_to_file import register_tools
__all__ = ["register_tools"]
@@ -1,61 +0,0 @@
import os
from mcp.server.fastmcp import FastMCP
from ..security import get_secure_path
def register_tools(mcp: FastMCP) -> None:
    """Register file write tools with the MCP server."""

    @mcp.tool()
    def write_to_file(
        path: str,
        content: str,
        workspace_id: str,
        agent_id: str,
        session_id: str,
        append: bool = False,
    ) -> dict:
        """Create a file, overwrite it, or append content to it.

        The target is resolved inside the sandboxed session directory and
        any missing parent directories are created automatically.

        Args:
            path: The path to the file (relative to session root).
            workspace_id: The ID of the workspace.
            agent_id: The ID of the agent.
            session_id: The ID of the current session.
            append: Append to the file instead of overwriting (default: False).

        Returns:
            Dict with success status, write mode, and byte count, or an
            error dict if the write failed.
        """
        try:
            target = get_secure_path(path, workspace_id, agent_id, session_id)
            # Make sure the parent directory exists before opening the file.
            os.makedirs(os.path.dirname(target), exist_ok=True)
            file_mode = "a" if append else "w"
            with open(target, file_mode, encoding="utf-8") as fh:
                fh.write(content)
            return {
                "success": True,
                "path": path,
                "mode": "appended" if append else "written",
                "bytes_written": len(content.encode("utf-8")),
            }
        except Exception as exc:
            return {"error": f"Failed to write to file: {str(exc)}"}
+11 -433
View File
@@ -32,290 +32,42 @@ def mock_secure_path(tmp_path):
return os.path.join(tmp_path, path)
with patch(
"aden_tools.tools.file_system_toolkits.view_file.view_file.get_secure_path",
"aden_tools.tools.file_system_toolkits.list_dir.list_dir.get_secure_path",
side_effect=_get_secure_path,
):
with patch(
"aden_tools.tools.file_system_toolkits.write_to_file.write_to_file.get_secure_path",
"aden_tools.tools.file_system_toolkits.replace_file_content.replace_file_content.get_secure_path",
side_effect=_get_secure_path,
):
with patch(
"aden_tools.tools.file_system_toolkits.list_dir.list_dir.get_secure_path",
"aden_tools.tools.file_system_toolkits.apply_diff.apply_diff.get_secure_path",
side_effect=_get_secure_path,
):
with patch(
"aden_tools.tools.file_system_toolkits.replace_file_content.replace_file_content.get_secure_path",
"aden_tools.tools.file_system_toolkits.apply_patch.apply_patch.get_secure_path",
side_effect=_get_secure_path,
):
with patch(
"aden_tools.tools.file_system_toolkits.apply_diff.apply_diff.get_secure_path",
"aden_tools.tools.file_system_toolkits.grep_search.grep_search.get_secure_path",
side_effect=_get_secure_path,
):
with patch(
"aden_tools.tools.file_system_toolkits.apply_patch.apply_patch.get_secure_path",
side_effect=_get_secure_path,
"aden_tools.tools.file_system_toolkits.grep_search.grep_search.WORKSPACES_DIR",
str(tmp_path),
):
with patch(
"aden_tools.tools.file_system_toolkits.grep_search.grep_search.get_secure_path",
"aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.get_secure_path",
side_effect=_get_secure_path,
):
with patch(
"aden_tools.tools.file_system_toolkits.grep_search.grep_search.WORKSPACES_DIR",
"aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.WORKSPACES_DIR",
str(tmp_path),
):
with patch(
"aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.get_secure_path",
"aden_tools.tools.file_system_toolkits.hashline_edit.hashline_edit.get_secure_path",
side_effect=_get_secure_path,
):
with patch(
"aden_tools.tools.file_system_toolkits.execute_command_tool.execute_command_tool.WORKSPACES_DIR",
str(tmp_path),
):
with patch(
"aden_tools.tools.file_system_toolkits.hashline_edit.hashline_edit.get_secure_path",
side_effect=_get_secure_path,
):
yield
class TestViewFileTool:
    """Tests for the view_file tool (default, non-hashline mode)."""
    @pytest.fixture
    def view_file_fn(self, mcp):
        # Register the tool on the shared MCP fixture and return the raw
        # underlying function so tests can invoke it directly.
        from aden_tools.tools.file_system_toolkits.view_file import register_tools
        register_tools(mcp)
        return mcp._tool_manager._tools["view_file"].fn
    def test_view_existing_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """Viewing an existing file returns content and metadata."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("Hello, World!", encoding="utf-8")
        result = view_file_fn(path="test.txt", **mock_workspace)
        assert result["success"] is True
        assert result["content"] == "Hello, World!"
        assert result["size_bytes"] == len(b"Hello, World!")
        assert result["lines"] == 1
    def test_view_nonexistent_file(self, view_file_fn, mock_workspace, mock_secure_path):
        """Viewing a non-existent file returns an error."""
        result = view_file_fn(path="nonexistent.txt", **mock_workspace)
        assert "error" in result
        assert "not found" in result["error"].lower()
    def test_view_multiline_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """Viewing a multiline file returns correct line count."""
        test_file = tmp_path / "multiline.txt"
        content = "Line 1\nLine 2\nLine 3\nLine 4\n"
        test_file.write_text(content, encoding="utf-8")
        result = view_file_fn(path="multiline.txt", **mock_workspace)
        assert result["success"] is True
        assert result["content"] == content
        assert result["lines"] == 4
    def test_view_empty_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """Viewing an empty file returns empty content."""
        test_file = tmp_path / "empty.txt"
        test_file.write_text("", encoding="utf-8")
        result = view_file_fn(path="empty.txt", **mock_workspace)
        assert result["success"] is True
        assert result["content"] == ""
        assert result["size_bytes"] == 0
        assert result["lines"] == 0
    def test_view_file_with_unicode(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """Viewing a file with unicode characters works correctly."""
        test_file = tmp_path / "unicode.txt"
        content = "Hello 世界! 🌍 émoji"
        test_file.write_text(content, encoding="utf-8")
        result = view_file_fn(path="unicode.txt", **mock_workspace)
        assert result["success"] is True
        assert result["content"] == content
        # size_bytes reflects the encoded byte length, not character count.
        assert result["size_bytes"] == len(content.encode("utf-8"))
    def test_view_nested_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """Viewing a file in a nested directory works correctly."""
        nested = tmp_path / "nested" / "dir"
        nested.mkdir(parents=True)
        test_file = nested / "file.txt"
        test_file.write_text("nested content", encoding="utf-8")
        result = view_file_fn(path="nested/dir/file.txt", **mock_workspace)
        assert result["success"] is True
        assert result["content"] == "nested content"
    def test_view_file_with_max_size_truncation(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Viewing a file with max_size truncates content when exceeding limit."""
        test_file = tmp_path / "large.txt"
        content = "x" * 1000
        test_file.write_text(content, encoding="utf-8")
        result = view_file_fn(path="large.txt", max_size=100, **mock_workspace)
        assert result["success"] is True
        assert len(result["content"]) <= 100 + len(
            "\n\n[... Content truncated due to size limit ...]"
        )
        assert "[... Content truncated due to size limit ...]" in result["content"]
    def test_view_file_with_negative_max_size(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Viewing a file with negative max_size returns error."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("content", encoding="utf-8")
        result = view_file_fn(path="test.txt", max_size=-1, **mock_workspace)
        assert "error" in result
        assert "max_size must be non-negative" in result["error"]
    def test_view_file_with_custom_encoding(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Viewing a file with custom encoding works correctly."""
        test_file = tmp_path / "encoded.txt"
        content = "Hello 世界"
        test_file.write_text(content, encoding="utf-8")
        result = view_file_fn(path="encoded.txt", encoding="utf-8", **mock_workspace)
        assert result["success"] is True
        assert result["content"] == content
    def test_view_file_with_invalid_encoding(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Viewing a file with invalid encoding returns error."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("content", encoding="utf-8")
        result = view_file_fn(path="test.txt", encoding="invalid-encoding", **mock_workspace)
        assert "error" in result
        assert "Failed to read file" in result["error"]
    def test_offset_without_hashline_returns_error(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Passing offset without hashline=True returns error."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("aaa\nbbb\nccc\n")
        result = view_file_fn(path="test.txt", offset=5, **mock_workspace)
        assert "error" in result
        assert "hashline=True" in result["error"]
    def test_limit_without_hashline_returns_error(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Passing limit without hashline=True returns error."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("aaa\nbbb\nccc\n")
        result = view_file_fn(path="test.txt", limit=10, **mock_workspace)
        assert "error" in result
        assert "hashline=True" in result["error"]
    def test_offset_and_limit_without_hashline_returns_error(
        self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Passing both offset and limit without hashline=True returns error."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("aaa\nbbb\nccc\n")
        result = view_file_fn(path="test.txt", offset=2, limit=5, **mock_workspace)
        assert "error" in result
        assert "hashline=True" in result["error"]
class TestWriteToFileTool:
    """Tests for the write_to_file tool (overwrite and append modes)."""
    @pytest.fixture
    def write_to_file_fn(self, mcp):
        # Register the tool on the shared MCP fixture and return the raw
        # underlying function so tests can invoke it directly.
        from aden_tools.tools.file_system_toolkits.write_to_file import register_tools
        register_tools(mcp)
        return mcp._tool_manager._tools["write_to_file"].fn
    def test_write_new_file(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """Writing to a new file creates it successfully."""
        result = write_to_file_fn(path="new_file.txt", content="Test content", **mock_workspace)
        assert result["success"] is True
        assert result["mode"] == "written"
        assert result["bytes_written"] > 0
        # Verify file was created
        created_file = tmp_path / "new_file.txt"
        assert created_file.exists()
        assert created_file.read_text(encoding="utf-8") == "Test content"
    def test_write_append_mode(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
        """Writing with append=True appends to existing file."""
        test_file = tmp_path / "append_test.txt"
        test_file.write_text("Line 1\n", encoding="utf-8")
        result = write_to_file_fn(
            path="append_test.txt", content="Line 2\n", append=True, **mock_workspace
        )
        assert result["success"] is True
        assert result["mode"] == "appended"
        assert test_file.read_text(encoding="utf-8") == "Line 1\nLine 2\n"
    def test_write_overwrite_existing(
        self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Writing to existing file overwrites it by default."""
        test_file = tmp_path / "overwrite.txt"
        test_file.write_text("Original content", encoding="utf-8")
        result = write_to_file_fn(path="overwrite.txt", content="New content", **mock_workspace)
        assert result["success"] is True
        assert result["mode"] == "written"
        assert test_file.read_text(encoding="utf-8") == "New content"
    def test_write_creates_parent_directories(
        self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Writing creates parent directories if they don't exist."""
        result = write_to_file_fn(path="nested/dir/file.txt", content="Test", **mock_workspace)
        assert result["success"] is True
        created_file = tmp_path / "nested" / "dir" / "file.txt"
        assert created_file.exists()
        assert created_file.read_text(encoding="utf-8") == "Test"
    def test_write_empty_content(
        self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path
    ):
        """Writing empty content creates empty file."""
        result = write_to_file_fn(path="empty.txt", content="", **mock_workspace)
        assert result["success"] is True
        assert result["bytes_written"] == 0
        created_file = tmp_path / "empty.txt"
        assert created_file.exists()
        assert created_file.read_text(encoding="utf-8") == ""
yield
class TestListDirTool:
@@ -805,167 +557,6 @@ class TestApplyPatchTool:
assert test_file.read_text(encoding="utf-8") == modified
class TestViewFileHashlineMode:
"""Tests for view_file hashline mode."""
@pytest.fixture
def view_file_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.view_file import register_tools
register_tools(mcp)
return mcp._tool_manager._tools["view_file"].fn
def test_hashline_format(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""hashline=True returns N:hhhh|content format."""
test_file = tmp_path / "test.txt"
test_file.write_text("hello\nworld\n")
result = view_file_fn(path="test.txt", hashline=True, **mock_workspace)
assert result["success"] is True
assert result["hashline"] is True
lines = result["content"].split("\n")
assert lines[0].startswith("1:")
assert "|hello" in lines[0]
assert lines[1].startswith("2:")
assert "|world" in lines[1]
def test_hashline_offset(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""hashline with offset skips initial lines."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\nccc\n")
result = view_file_fn(path="test.txt", hashline=True, offset=2, **mock_workspace)
assert result["success"] is True
assert result["offset"] == 2
lines = result["content"].split("\n")
assert lines[0].startswith("2:")
assert "|bbb" in lines[0]
def test_hashline_limit(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""hashline with limit restricts number of lines."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\nccc\nddd\n")
result = view_file_fn(path="test.txt", hashline=True, limit=2, **mock_workspace)
assert result["success"] is True
assert result["limit"] == 2
assert result["shown_lines"] == 2
assert result["total_lines"] == 4
def test_hashline_total_and_shown_lines(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""total_lines and shown_lines are reported correctly."""
test_file = tmp_path / "test.txt"
test_file.write_text("a\nb\nc\nd\ne\n")
result = view_file_fn(path="test.txt", hashline=True, offset=2, limit=2, **mock_workspace)
assert result["total_lines"] == 5
assert result["shown_lines"] == 2
def test_default_mode_unchanged(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Default mode (hashline=False) returns the same format as before."""
test_file = tmp_path / "test.txt"
test_file.write_text("hello\n")
result = view_file_fn(path="test.txt", **mock_workspace)
assert result["success"] is True
assert "hashline" not in result
assert result["content"] == "hello\n"
assert result["lines"] == 1
def test_hashline_offset_zero_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline with offset=0 returns error (must be >= 1)."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\n")
result = view_file_fn(path="test.txt", hashline=True, offset=0, **mock_workspace)
assert "error" in result
assert "offset" in result["error"].lower()
def test_hashline_negative_offset_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline with negative offset returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\n")
result = view_file_fn(path="test.txt", hashline=True, offset=-1, **mock_workspace)
assert "error" in result
assert "offset" in result["error"].lower()
def test_hashline_negative_limit_returns_error(
self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""hashline with negative limit returns error."""
test_file = tmp_path / "test.txt"
test_file.write_text("aaa\nbbb\n")
result = view_file_fn(path="test.txt", hashline=True, limit=-1, **mock_workspace)
assert "error" in result
assert "limit" in result["error"].lower()
def test_hashline_truncated_file_returns_error(
    self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
    """An unpaginated hashline read of an oversize file fails and suggests paging."""
    big = tmp_path / "large.txt"
    big.write_text("line\n" * 100)  # 500 bytes — well past the max_size=50 we pass below
    res = view_file_fn(path="large.txt", hashline=True, max_size=50, **mock_workspace)
    assert "error" in res
    lowered = res["error"].lower()
    # The error both names the problem and directs the caller to offset/limit.
    assert "too large" in lowered
    assert "offset" in lowered
    assert "limit" in lowered
def test_hashline_offset_beyond_end_returns_error(
    self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
    """An offset past the last line errors out and names the file's true length."""
    (tmp_path / "test.txt").write_text("aaa\nbbb\n")
    res = view_file_fn(path="test.txt", hashline=True, offset=50, **mock_workspace)
    assert "error" in res
    assert "beyond" in res["error"].lower()
    # The message reports the actual line count so the caller can re-aim.
    assert "2 lines" in res["error"]
def test_hashline_large_file_with_offset_limit_works(
    self, view_file_fn, mock_workspace, mock_secure_path, tmp_path
):
    """Paging with offset/limit sidesteps the whole-file size guard on big files."""
    big = tmp_path / "large.txt"
    big.write_text("".join(f"line {i}\n" for i in range(1, 101)))
    # The file exceeds max_size=200, but a bounded window stays readable.
    res = view_file_fn(
        path="large.txt", hashline=True, offset=10, limit=5, max_size=200, **mock_workspace
    )
    assert res["success"] is True
    assert res["shown_lines"] == 5
    assert res["total_lines"] == 100
    # The window must open exactly at line 10, hash-prefixed then the raw text.
    head = res["content"].split("\n", 1)[0]
    assert head.startswith("10:")
    assert "|line 10" in head
class TestGrepSearchHashlineMode:
"""Tests for grep_search hashline mode."""
@@ -1047,13 +638,6 @@ class TestGrepSearchHashlineMode:
class TestHashlineCrossToolConsistency:
"""Cross-tool consistency tests for hashline workflows."""
@pytest.fixture
def view_file_fn(self, mcp):
    """Register the view_file tool on the mcp fixture and expose its raw callable."""
    from aden_tools.tools.file_system_toolkits.view_file import register_tools

    register_tools(mcp)
    tool = mcp._tool_manager._tools["view_file"]
    return tool.fn
@pytest.fixture
def grep_search_fn(self, mcp):
from aden_tools.tools.file_system_toolkits.grep_search import register_tools
@@ -1070,7 +654,6 @@ class TestHashlineCrossToolConsistency:
def test_unicode_line_separator_anchor_roundtrip(
self,
view_file_fn,
grep_search_fn,
hashline_edit_fn,
mock_workspace,
@@ -1081,11 +664,6 @@ class TestHashlineCrossToolConsistency:
test_file = tmp_path / "test.txt"
test_file.write_text("A\u2028B\nC\n", encoding="utf-8")
# Hashline view sees U+2028 as a line boundary via splitlines()
view_res = view_file_fn(path="test.txt", hashline=True, **mock_workspace)
assert view_res["success"] is True
assert view_res["total_lines"] == 3
# grep_search line iteration treats U+2028 as in-line content
grep_res = grep_search_fn(path="test.txt", pattern="B", hashline=True, **mock_workspace)
assert grep_res["success"] is True