Compare commits

...

25 Commits

Author SHA1 Message Date
Richard Tang b80559df68 chore: ruff lint 2026-03-13 16:38:50 -07:00
Richard Tang def4f62a51 fix: update meta.json when loaded worker 2026-03-13 14:05:57 -07:00
bryan b0c5bcd210 chore: update tab management guidelines and add concurrent subagent patterns 2026-03-13 14:04:40 -07:00
bryan 2fe1343343 feat: inject unique browser profile per GCU subagent 2026-03-13 14:03:21 -07:00
bryan de0dcff50f feat: add tab origin/age metadata and per-subagent profile isolation 2026-03-13 14:02:15 -07:00
bryan 1fb5c6337a fix: anchor worker monitoring to queen's session ID on cold-restore 2026-03-13 12:50:50 -07:00
bryan 1e8b5b96eb Merge branch 'main' into feat/gcu-updates 2026-03-13 09:26:06 -07:00
bryan 2434c86cdf docs: clarify two-step escalation relay protocol in queen prompt 2026-03-12 16:50:17 -07:00
bryan c4a5e621aa docs: update GCU prompt with popup tracking and close_all guidance 2026-03-12 16:50:06 -07:00
bryan 0f5b83d86a feat: add browser_close_all tool for bulk tab cleanup 2026-03-12 16:49:55 -07:00
bryan b5aadcd51e feat: auto-track popup pages and improve session startup logging 2026-03-12 16:49:46 -07:00
bryan 290d2f6823 feat: add --no-startup-window to Chrome launch flags 2026-03-12 16:49:36 -07:00
bryan 9f3339650d chore: linter update 2026-03-12 14:27:17 -07:00
bryan d5e5d3e83d feat: add subagent activity tracking to queen status and instructions 2026-03-12 14:26:49 -07:00
bryan 5ea27dda09 refactor: update GCU system prompt for auto-snapshots and batching 2026-03-12 14:26:38 -07:00
bryan 6f9066ef20 feat: return auto-snapshot from browser interaction tools 2026-03-12 14:26:24 -07:00
bryan c37185732a feat: kill orphaned Chrome processes on GCU server shutdown 2026-03-12 14:26:05 -07:00
bryan 0c900fb50e refactor: clean session startup and add page lifecycle management 2026-03-12 14:25:16 -07:00
bryan 4d3ac28878 feat: launch Chrome on macOS via open -n to coexist with user's browser 2026-03-12 14:24:55 -07:00
bryan 270c1f8c50 fix: use lazy %-formatting in subagent completion log to avoid f-string in logger 2026-03-12 14:24:30 -07:00
bryan 3d0859d06a fix: stop clearing credentials_required on modal close to prevent infinite loop 2026-03-12 14:24:14 -07:00
bryan ffe47c0f71 fix: credential modal eating errors, banner stays open 2026-03-12 09:41:53 -07:00
bryan bf4652db4b fix: share event bus so tool events are visible to parent 2026-03-12 08:41:34 -07:00
bryan 2acd526b71 feat: dynamic viewport sizing and suppress Chrome warning bar 2026-03-12 08:40:49 -07:00
bryan df71834e4b refactor: switch from Playwright browser to system Chrome via CDP 2026-03-12 08:39:43 -07:00
25 changed files with 1511 additions and 205 deletions
+29 -4
View File
@@ -1035,6 +1035,19 @@ You wake up when:
If the user asks for progress, call get_worker_status() ONCE and report. \
If the summary mentions issues, follow up with get_worker_status(focus="issues").
## Subagent delegations (browser automation, GCU)
When the worker delegates to a subagent (e.g., GCU browser automation), expect it \
to take 2-5 minutes. During this time:
- Progress will show 0%; this is NORMAL. The subagent only calls set_output at the end.
- Check get_worker_status(focus="full") for "subagent_activity"; this shows the \
subagent's latest reasoning text and confirms it is making real progress.
- Do NOT conclude the subagent is stuck just because progress is 0% or because \
you see repeated browser_click/browser_snapshot calls; that is the expected \
pattern for web scraping.
- Only intervene if: the subagent has been running for 5+ minutes with no new \
subagent_activity updates, OR the judge escalates.
## Handling worker termination ([WORKER_TERMINAL])
When you receive a `[WORKER_TERMINAL]` event, the worker has finished:
@@ -1063,19 +1076,30 @@ IMPORTANT: Only auto-handle if the user has NOT explicitly told you how to handl
escalations. If the user gave you instructions (e.g., "just retry on errors", \
"skip any auth issues"), follow those instructions instead.
CRITICAL escalation relay protocol:
When an escalation requires user input (auth blocks, human review), the worker \
or its subagent is BLOCKED and waiting for your response. You MUST follow this \
exact two-step sequence:
Step 1: call ask_user() to get the user's answer.
Step 2: call inject_worker_message() with the user's answer IMMEDIATELY after.
If you skip Step 2, the worker/subagent stays blocked FOREVER and the task hangs. \
NEVER respond to the user without also calling inject_worker_message() to unblock \
the worker. Even if the user says "skip" or "cancel", you must still relay that \
decision via inject_worker_message() so the worker can clean up.
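The two-step relay can be made concrete with a short sketch. The `ask_user` and `inject_worker_message` bodies below are stand-ins (the real tools are provided by the framework); only the ordering they demonstrate is the point:

```python
relay_log: list[tuple[str, str]] = []

def ask_user(question: str, options: list[str]) -> str:
    # Stand-in: pretend the user picks the first option.
    relay_log.append(("ask_user", question))
    return options[0]

def inject_worker_message(message: str) -> None:
    relay_log.append(("inject_worker_message", message))

def relay_escalation(question: str, options: list[str]) -> str:
    # Step 1: get the user's answer.
    answer = ask_user(question, options)
    # Step 2: relay it IMMEDIATELY so the blocked worker/subagent resumes.
    # Even "skip" or "cancel" must be injected so the worker can clean up.
    inject_worker_message(answer)
    return answer

decision = relay_escalation(
    "Credentials for site A are invalid. How should I proceed?",
    ["Provide credentials", "Skip this task", "Stop and edit agent"],
)
```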
**Auth blocks / credential issues:**
- ALWAYS ask the user (unless user explicitly told you how to handle this).
- The worker cannot proceed without valid credentials.
- Explain which credential is missing or invalid.
- Step 1: ask_user for guidance: "Provide credentials", "Skip this task", "Stop and edit agent"
- Step 2: inject_worker_message() with the user's response to unblock the worker.
**Need human review / approval:**
- ALWAYS ask the user (unless user explicitly told you how to handle this).
- The worker is explicitly requesting human judgment.
- Present the context clearly (what decision is needed, what are the options).
- Step 1: ask_user with the actual decision options.
- Step 2: inject_worker_message() with the user's decision to unblock the worker.
**Errors / unexpected failures:**
- Explain what went wrong in plain terms.
@@ -1083,6 +1107,7 @@ escalations. If the user gave you instructions (e.g., "just retry on errors", \
- Or offer "Diagnose the issue": use stop_worker_and_plan() to investigate first.
- Or offer: "Retry as-is", "Skip this task", "Abort run"
- (Skip asking if user explicitly told you to auto-retry or auto-skip errors.)
- If the escalation had wait_for_response: inject_worker_message() with the decision.
**Informational / progress updates:**
- Acknowledge briefly and let the worker continue.
@@ -27,7 +27,9 @@
## GCU Errors
15. **Manually wiring browser tools on event_loop nodes** — Use `node_type="gcu"` which auto-includes browser tools. Do NOT manually list browser tool names.
16. **Using GCU nodes as regular graph nodes** — GCU nodes are subagents only. They must ONLY appear in `sub_agents=["gcu-node-id"]` and be invoked via `delegate_to_sub_agent()`. Never connect via edges or use as entry/terminal nodes.
17. **Reusing the same GCU node ID for parallel tasks** — Each concurrent browser task needs a distinct GCU node ID (e.g. `gcu-site-a`, `gcu-site-b`). Two `delegate_to_sub_agent` calls with the same `agent_id` share a browser profile and will interfere with each other's pages.
18. **Passing `profile=` in GCU tool calls** — Profile isolation for parallel subagents is automatic. The framework injects a unique profile per subagent via an asyncio `ContextVar`. Hardcoding `profile="default"` in a GCU system prompt breaks this isolation.
## Worker Agent Errors
19. **Adding client-facing intake node to workers** — The queen owns intake. Workers should start with an autonomous processing node. Client-facing nodes in workers are for mid-execution review/approval only.
20. **Putting `escalate` or `set_output` in NodeSpec `tools=[]`** — These are synthetic framework tools, auto-injected at runtime. Only list MCP tools from `list_agent_tools()`.
@@ -109,6 +109,45 @@ Key rules to bake into GCU node prompts:
- Keep tool calls per turn ≤10
- Tab isolation: when browser is already running, use `browser_open(background=true)` and pass `target_id` to every call
## Multiple Concurrent GCU Subagents
When a task can be parallelized across multiple sites or profiles, declare a distinct GCU
node for each and invoke them all in the same LLM turn. The framework batches all
`delegate_to_sub_agent` calls made in one turn and runs them with `asyncio.gather`, so
they execute concurrently — not sequentially.
**Each GCU subagent automatically gets its own isolated browser context** — no `profile=`
argument is needed in tool calls. The framework derives a unique profile from the subagent's
node ID and instance counter and injects it via an asyncio `ContextVar` before the subagent
runs.
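The isolation mechanism can be sketched with plain `contextvars`. The names below are illustrative stand-ins, not the framework's actual API; what matters is that each task spawned by `asyncio.gather` runs in its own copy of the context:

```python
import asyncio
from contextvars import ContextVar

# Stand-in for the framework's active-profile ContextVar.
active_profile: ContextVar[str] = ContextVar("active_profile", default="default")

async def run_subagent(node_id: str, instance: int) -> str:
    # Each task created by asyncio.gather runs in a copy of the current
    # context, so this set() is invisible to sibling subagents.
    active_profile.set(f"{node_id}-{instance}")
    await asyncio.sleep(0)  # yield so the coroutines interleave
    return active_profile.get()

async def main() -> list[str]:
    return await asyncio.gather(
        run_subagent("gcu-site-a", 1),
        run_subagent("gcu-site-b", 1),
    )

profiles = asyncio.run(main())
```

Despite interleaving, each coroutine sees only its own profile, which is why no `profile=` argument needs to be threaded through tool calls.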
### Example: three sites in parallel
```python
# Three distinct GCU nodes
gcu_site_a = NodeSpec(id="gcu-site-a", node_type="gcu", ...)
gcu_site_b = NodeSpec(id="gcu-site-b", node_type="gcu", ...)
gcu_site_c = NodeSpec(id="gcu-site-c", node_type="gcu", ...)
orchestrator = NodeSpec(
id="orchestrator",
node_type="event_loop",
sub_agents=["gcu-site-a", "gcu-site-b", "gcu-site-c"],
system_prompt="""\
Call all three subagents in a single response to run them in parallel:
delegate_to_sub_agent(agent_id="gcu-site-a", task="Scrape prices from site A")
delegate_to_sub_agent(agent_id="gcu-site-b", task="Scrape prices from site B")
delegate_to_sub_agent(agent_id="gcu-site-c", task="Scrape prices from site C")
""",
)
```
**Rules:**
- Use distinct node IDs for each concurrent task — sharing an ID shares the browser context.
- The GCU node prompts do not need to mention `profile=`; isolation is automatic.
- Cleanup is automatic at session end, but GCU nodes can call `browser_stop()` explicitly
if they want to release resources mid-run.
## GCU Anti-Patterns
- Using `browser_screenshot` to read text (use `browser_snapshot`)
+8
View File
@@ -121,6 +121,14 @@ def get_gcu_enabled() -> bool:
return get_hive_config().get("gcu_enabled", True)
def get_gcu_viewport_scale() -> float:
"""Return GCU viewport scale factor (0.1-1.0), default 0.8."""
scale = get_hive_config().get("gcu_viewport_scale", 0.8)
if isinstance(scale, (int, float)) and 0.1 <= scale <= 1.0:
return float(scale)
return 0.8
def get_api_base() -> str | None:
"""Return the api_base URL for OpenAI-compatible endpoints, if configured."""
llm = get_hive_config().get("llm", {})
+47 -4
View File
@@ -4679,7 +4679,7 @@ class EventLoopNode(NodeProtocol):
)
subagent_node = EventLoopNode(
event_bus=None, # Subagents don't emit events to parent's bus
event_bus=self._event_bus, # Subagent events visible to Queen via shared bus
judge=SubagentJudge(task=task, max_iterations=max_iter),
config=LoopConfig(
max_iterations=max_iter, # Tighter budget
@@ -4694,25 +4694,42 @@ class EventLoopNode(NodeProtocol):
conversation_store=subagent_conv_store,
)
# Inject a unique GCU browser profile for this subagent so that
# concurrent GCU subagents (run via asyncio.gather) each get their own
# isolated BrowserContext. asyncio.gather copies the current context
# for each coroutine, so the reset token is safe to call in finally.
_profile_token = None
try:
from gcu.browser.session import set_active_profile as _set_gcu_profile
_profile_token = _set_gcu_profile(f"{agent_id}-{subagent_instance}")
except ImportError:
pass # GCU tools not installed; no-op
try:
logger.info("🚀 Starting subagent '%s' execution...", agent_id)
start_time = time.time()
result = await subagent_node.execute(subagent_ctx)
latency_ms = int((time.time() - start_time) * 1000)
separator = "-" * 60
logger.info(
"\n" + "-" * 60 + "\n"
"\n%s\n"
"✅ SUBAGENT '%s' COMPLETED\n"
"-" * 60 + "\n"
"%s\n"
"Success: %s\n"
"Latency: %dms\n"
"Tokens used: %s\n"
"Output keys: %s\n" + "-" * 60,
"Output keys: %s\n"
"%s",
separator,
agent_id,
separator,
result.success,
latency_ms,
result.tokens_used,
list(result.output.keys()) if result.output else [],
separator,
)
result_json = {
@@ -4758,3 +4775,29 @@ class EventLoopNode(NodeProtocol):
content=json.dumps(result_json, indent=2),
is_error=True,
)
finally:
# Restore the GCU profile context that was set before this subagent ran.
if _profile_token is not None:
from gcu.browser.session import _active_profile as _gcu_profile_var
_gcu_profile_var.reset(_profile_token)
# Stop the browser session for this subagent's profile so tabs are
# closed immediately rather than accumulating until server shutdown.
if self._tool_executor is not None:
_subagent_profile = f"{agent_id}-{subagent_instance}"
try:
_stop_use = ToolUse(
id="gcu-cleanup",
name="browser_stop",
input={"profile": _subagent_profile},
)
_stop_result = self._tool_executor(_stop_use)
if asyncio.iscoroutine(_stop_result) or asyncio.isfuture(_stop_result):
await _stop_result
except Exception as _gcu_exc:
logger.warning(
"GCU browser_stop failed for profile %r: %s",
_subagent_profile,
_gcu_exc,
)
+51 -11
View File
@@ -37,24 +37,42 @@ Follow these rules for reliable, efficient browser interaction.
## Reading Pages
- ALWAYS prefer `browser_snapshot` over `browser_get_text("body")`:
it returns a compact ~1-5 KB accessibility tree vs 100+ KB of raw HTML.
- Use `browser_snapshot_aria` when you need full ARIA properties
for detailed element inspection.
- Interaction tools (`browser_click`, `browser_type`, `browser_fill`,
`browser_scroll`, etc.) return a page snapshot automatically in their
result. Use it to decide your next action; do NOT call
`browser_snapshot` separately after every action.
Only call `browser_snapshot` when you need a fresh view without
performing an action, or after setting `auto_snapshot=false`.
- Do NOT use `browser_screenshot` for reading text content:
it produces huge base64 images with no searchable text.
- Only fall back to `browser_get_text` for extracting specific
small elements by CSS selector.
## Navigation & Waiting
- Always call `browser_wait` after navigation actions
(`browser_open`, `browser_navigate`, `browser_click` on links)
to let the page load.
- `browser_navigate` and `browser_open` already wait for the page to
load (`domcontentloaded`). Do NOT call `browser_wait` with no
arguments after navigation; it wastes time.
Only use `browser_wait` when you need a *specific element* or *text*
to appear (pass `selector` or `text`).
- NEVER re-navigate to the same URL after scrolling:
this resets your scroll position and loses loaded content.
## Scrolling
- Use large scroll amounts (~2000) when loading more content;
sites like Twitter and LinkedIn rely on lazy loading for paging.
- After scrolling, take a new `browser_snapshot` to see updated content.
- The scroll result includes a snapshot automatically; no need to call
`browser_snapshot` separately.
## Batching Actions
- You can call multiple tools in a single turn; they execute in parallel.
ALWAYS batch independent actions together. Examples:
- Fill multiple form fields in one turn.
- Navigate + snapshot in one turn.
- Click + scroll if targeting different elements.
- When batching, set `auto_snapshot=false` on all but the last action
to avoid redundant snapshots.
- Aim for at least 3-5 tool calls per turn; one tool call per turn is
wasteful.
## Error Recovery
- If a tool fails, retry once with the same approach.
@@ -65,11 +83,33 @@ Follow these rules for reliable, efficient browser interaction.
then `browser_start`, then retry.
## Tab Management
- Use `browser_tabs` to list open tabs when managing multiple pages.
- Pass `target_id` to tools when operating on a specific tab.
- Open background tabs with `browser_open(url=..., background=true)`
to avoid losing your current context.
- Close tabs you no longer need with `browser_close` to free resources.
**Close tabs as soon as you are done with them**, not only at the end of the task.
After reading or extracting data from a tab, close it immediately.
**Decision rules:**
- Finished reading/extracting from a tab? → `browser_close(target_id=...)`
- Completed a multi-tab workflow? → `browser_close_finished()` to clean up all your tabs
- More than 3 tabs open? → stop and close finished ones before opening more
- Popup appeared that you didn't need? → close it immediately
**Origin awareness:** `browser_tabs` returns an `origin` field for each tab:
- `"agent"` → you opened it; you own it; close it when done
- `"popup"` → opened by a link or script; close after extracting what you need
- `"startup"` or `"user"` → leave these alone unless the task requires it
**Cleanup tools:**
- `browser_close(target_id=...)` → close one specific tab
- `browser_close_finished()` → close all your agent/popup tabs (safe: leaves startup/user tabs)
- `browser_close_all()` → close everything except the active tab (use only for full reset)
**Multi-tab workflow pattern:**
1. Open background tabs with `browser_open(url=..., background=true)` to stay on current tab
2. Process each tab and close it with `browser_close` when done
3. When the full workflow completes, call `browser_close_finished()` to confirm cleanup
4. Check `browser_tabs` at any point; it shows `origin` and `age_seconds` per tab
Never accumulate tabs. Treat every tab you open as a resource you must free.
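The open-process-close lifecycle can be made concrete with toy stand-ins. The function bodies below are simulations for illustration only; the real `browser_open`/`browser_close`/`browser_close_finished` are GCU MCP tools:

```python
# Toy stand-ins for the GCU tab tools, tracking tabs in a plain dict.
open_tabs: dict[str, dict] = {}
_counter = 0

def browser_open(url: str, background: bool = False) -> str:
    global _counter
    _counter += 1
    target_id = f"tab-{_counter}"
    open_tabs[target_id] = {"url": url, "origin": "agent"}
    return target_id

def browser_close(target_id: str) -> None:
    open_tabs.pop(target_id, None)

def browser_close_finished() -> None:
    # Close agent/popup tabs only; leave startup/user tabs alone.
    done = [t for t, meta in open_tabs.items() if meta["origin"] in ("agent", "popup")]
    for target_id in done:
        del open_tabs[target_id]

for url in ["https://a.example", "https://b.example", "https://c.example"]:
    tab = browser_open(url, background=True)
    # ...read/extract from the tab here...
    browser_close(tab)  # free the tab as soon as it is done

browser_close_finished()  # final confirmation that nothing is left over
```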
## Login & Auth Walls
- If you see a "Log in" or "Sign up" prompt instead of expected
@@ -132,6 +132,7 @@ async def create_queen(
session.worker_path,
stream_id="queen",
worker_graph_id=session.worker_runtime._graph_id,
default_session_id=session.id,
)
queen_tools = list(queen_registry.get_tools().values())
+20
View File
@@ -427,6 +427,26 @@ class SessionManager:
if agent_path.name != "queen" and session.worker_runtime:
await self._notify_queen_worker_loaded(session)
# Update meta.json so cold-restore can discover this session by agent_path
storage_session_id = session.queen_resume_from or session.id
meta_path = Path.home() / ".hive" / "queen" / "session" / storage_session_id / "meta.json"
try:
_agent_name = (
session.worker_info.name
if session.worker_info
else str(agent_path.name).replace("_", " ").title()
)
existing_meta = {}
if meta_path.exists():
existing_meta = json.loads(meta_path.read_text(encoding="utf-8"))
existing_meta["agent_name"] = _agent_name
existing_meta["agent_path"] = (
str(session.worker_path) if session.worker_path else str(agent_path)
)
meta_path.write_text(json.dumps(existing_meta), encoding="utf-8")
except OSError:
pass
# Restore previously active triggers from persisted session state
if session.available_triggers and session.worker_runtime:
try:
+34 -4
View File
@@ -2853,6 +2853,16 @@ def register_queen_lifecycle_tools(
else:
parts.append("No issues detected")
# Latest subagent progress (if any delegation is in flight)
bus = _get_event_bus()
if bus:
sa_reports = bus.get_history(event_type=EventType.SUBAGENT_REPORT, limit=1)
if sa_reports:
latest = sa_reports[0]
sa_msg = str(latest.data.get("message", ""))[:200]
ago = _format_time_ago(latest.timestamp)
parts.append(f"Latest subagent update ({ago}): {sa_msg}")
return ". ".join(parts) + "."
def _format_activity(bus: EventBus, preamble: dict[str, Any], last_n: int) -> str:
@@ -2980,6 +2990,10 @@ def register_queen_lifecycle_tools(
duration = evt.data.get("duration_s")
dur_str = f", {duration:.1f}s" if duration else ""
lines.append(f" {name} ({node}) — {status}{dur_str}")
result_text = evt.data.get("result", "")
if result_text:
preview = str(result_text)[:300].replace("\n", " ")
lines.append(f" Result: {preview}")
else:
lines.append("No recent tool calls.")
@@ -3146,15 +3160,19 @@ def register_queen_lifecycle_tools(
for evt in running
]
if tool_completed:
result["recent_tool_calls"] = [
{
recent_calls = []
for evt in tool_completed[:last_n]:
entry: dict[str, Any] = {
"tool": evt.data.get("tool_name"),
"error": bool(evt.data.get("is_error")),
"node": evt.node_id,
"time": evt.timestamp.isoformat(),
}
for evt in tool_completed[:last_n]
]
result_text = evt.data.get("result", "")
if result_text:
entry["result_preview"] = str(result_text)[:300]
recent_calls.append(entry)
result["recent_tool_calls"] = recent_calls
# Node transitions
edges = bus.get_history(event_type=EventType.EDGE_TRAVERSED, limit=last_n)
@@ -3207,6 +3225,18 @@ def register_queen_lifecycle_tools(
if issues:
result["issues"] = issues
# Subagent activity (in-flight progress from delegated subagents)
sa_reports = bus.get_history(event_type=EventType.SUBAGENT_REPORT, limit=last_n)
if sa_reports:
result["subagent_activity"] = [
{
"subagent": evt.data.get("subagent_id"),
"message": str(evt.data.get("message", ""))[:300],
"time": evt.timestamp.isoformat(),
}
for evt in sa_reports[:last_n]
]
# Constraint violations
violations = bus.get_history(event_type=EventType.CONSTRAINT_VIOLATION, limit=5)
if violations:
+28 -15
View File
@@ -44,6 +44,7 @@ def register_worker_monitoring_tools(
storage_path: Path,
stream_id: str = "monitoring",
worker_graph_id: str | None = None,
default_session_id: str | None = None,
) -> int:
"""Register worker monitoring tools bound to *event_bus* and *storage_path*.
@@ -55,6 +56,12 @@ def register_worker_monitoring_tools(
stream_id: Stream ID used when emitting events.
worker_graph_id: The primary worker graph's ID. Included in health summary
so the judge can populate ticket identity fields accurately.
default_session_id: When set, ``get_worker_health_summary`` uses this
session ID as the default instead of auto-discovering
the most-recent-by-mtime session. Callers should pass
the queen's own session ID so that after a cold-restore
the monitoring tool reads the correct worker session
rather than a stale orphaned one.
Returns:
Number of tools registered.
@@ -97,23 +104,29 @@ def register_worker_monitoring_tools(
if not sessions_dir.exists():
return json.dumps({"error": "No sessions found — worker has not started yet"})
candidates = [
d for d in sessions_dir.iterdir() if d.is_dir() and (d / "state.json").exists()
]
if not candidates:
return json.dumps({"error": "No sessions found — worker has not started yet"})
# Prefer the queen's own session ID (set at registration time) over
# mtime-based discovery, which can pick a stale orphaned session after
# a cold-restore when a newer-but-empty session directory exists.
if default_session_id and (sessions_dir / default_session_id).is_dir():
session_id = default_session_id
else:
candidates = [
d for d in sessions_dir.iterdir() if d.is_dir() and (d / "state.json").exists()
]
if not candidates:
return json.dumps({"error": "No sessions found — worker has not started yet"})
def _sort_key(d: Path):
try:
state = json.loads((d / "state.json").read_text(encoding="utf-8"))
# in_progress/running sorts before completed/failed
priority = 0 if state.get("status", "") in ("in_progress", "running") else 1
return (priority, -d.stat().st_mtime)
except Exception:
return (2, 0)
def _sort_key(d: Path):
try:
state = json.loads((d / "state.json").read_text(encoding="utf-8"))
# in_progress/running sorts before completed/failed
priority = 0 if state.get("status", "") in ("in_progress", "running") else 1
return (priority, -d.stat().st_mtime)
except Exception:
return (2, 0)
candidates.sort(key=_sort_key)
session_id = candidates[0].name
candidates.sort(key=_sort_key)
session_id = candidates[0].name
# Resolve log paths
session_dir = storage_path / "sessions" / session_id
@@ -126,8 +126,13 @@ export default function CredentialsModal({
// No real path — no credentials to show
setRows([]);
}
} catch {
// Backend unavailable — fall back to legacy props or empty
} catch (err) {
// Surface the error so the modal shows a meaningful message
const message =
err instanceof Error ? err.message : "Failed to check credentials";
setError(message);
// Fall back to legacy props or empty rows
if (legacyCredentials) {
setRows(legacyCredentials.map(c => ({
...c,
@@ -289,11 +294,18 @@ export default function CredentialsModal({
{/* Status banner */}
{!loading && (
<div className={`mx-5 mt-4 px-3 py-2.5 rounded-lg border text-xs font-medium flex items-center gap-2 ${
allRequiredMet
? "bg-emerald-500/10 border-emerald-500/20 text-emerald-600"
: "bg-destructive/5 border-destructive/20 text-destructive"
error && rows.length === 0
? "bg-destructive/5 border-destructive/20 text-destructive"
: allRequiredMet
? "bg-emerald-500/10 border-emerald-500/20 text-emerald-600"
: "bg-destructive/5 border-destructive/20 text-destructive"
}`}>
{allRequiredMet ? (
{error && rows.length === 0 ? (
<>
<AlertCircle className="w-3.5 h-3.5 flex-shrink-0" />
<span className="break-words">Failed to check credentials: {error}</span>
</>
) : allRequiredMet ? (
<>
<Shield className="w-3.5 h-3.5" />
{rows.length === 0
+9 -1
View File
@@ -3089,7 +3089,15 @@ export default function Workspace() {
agentLabel={activeWorkerLabel}
agentPath={credentialAgentPath || activeAgentState?.agentPath || (!activeWorker.startsWith("new-agent") ? activeWorker : undefined)}
open={credentialsOpen}
onClose={() => { setCredentialsOpen(false); setCredentialAgentPath(null); setDismissedBanner(null); }}
onClose={() => {
setCredentialsOpen(false);
setCredentialAgentPath(null);
// Keep credentials_required error set — clearing it here triggers
// the auto-load effect which retries session creation immediately,
// causing an infinite modal loop when credentials are still missing.
// The error is only cleared in onCredentialChange (below) when the
// user actually saves valid credentials.
}}
credentials={activeSession?.credentials || []}
onCredentialChange={() => {
// Clear credential error so the auto-load effect retries session creation
+57
View File
@@ -601,6 +601,63 @@ class TestReportToParentExecution:
# Metadata should include report_count
assert result_data["metadata"]["report_count"] == 1
@pytest.mark.asyncio
async def test_subagent_tool_events_visible_on_shared_bus(
self, runtime, parent_node_spec, subagent_node_spec
):
"""Subagent internal tool calls should emit TOOL_CALL events on the shared bus."""
bus = EventBus()
tool_events = []
async def handler(event):
tool_events.append(event)
bus.subscribe(
event_types=[EventType.TOOL_CALL_STARTED, EventType.TOOL_CALL_COMPLETED],
handler=handler,
)
subagent_llm = MockStreamingLLM(
[
set_output_scenario("findings", "Results"),
text_finish_scenario(),
]
)
node = EventLoopNode(
event_bus=bus,
config=LoopConfig(max_iterations=10),
)
memory = SharedMemory()
scoped = memory.with_permissions(read_keys=[], write_keys=["result"])
ctx = NodeContext(
runtime=runtime,
node_id="parent",
node_spec=parent_node_spec,
memory=scoped,
input_data={},
llm=subagent_llm,
available_tools=[],
goal_context="",
goal=None,
node_registry={"researcher": subagent_node_spec},
)
result = await node._execute_subagent(ctx, "researcher", "Do research")
assert result.is_error is False
# Subagent tool calls should appear on the shared bus
started = [e for e in tool_events if e.type == EventType.TOOL_CALL_STARTED]
completed = [e for e in tool_events if e.type == EventType.TOOL_CALL_COMPLETED]
assert len(started) >= 1, "Expected at least one TOOL_CALL_STARTED from subagent"
assert len(completed) >= 1, "Expected at least one TOOL_CALL_COMPLETED from subagent"
# Events should have the namespaced subagent node_id
for evt in started + completed:
assert "subagent" in evt.node_id, f"Expected namespaced node_id, got: {evt.node_id}"
@pytest.mark.asyncio
async def test_event_bus_receives_subagent_report(
self, runtime, parent_node_spec, subagent_node_spec
+8 -16
View File
@@ -6,7 +6,7 @@
.DESCRIPTION
An interactive setup wizard that:
1. Installs Python dependencies via uv
2. Installs Playwright browser for web scraping
2. Checks for Chrome/Edge browser for web automation
3. Helps configure LLM API keys
4. Verifies everything works
@@ -518,22 +518,14 @@ try {
exit 1
}
# Install Playwright browser
Write-Host " Installing Playwright browser... " -NoNewline
$null = & uv run python -c "import playwright" 2>&1
$importExitCode = $LASTEXITCODE
if ($importExitCode -eq 0) {
$null = & uv run python -m playwright install chromium 2>&1
$playwrightExitCode = $LASTEXITCODE
if ($playwrightExitCode -eq 0) {
Write-Ok "ok"
} else {
Write-Warn "skipped (install manually: uv run python -m playwright install chromium)"
}
# Check for Chrome/Edge (required for GCU browser tools)
Write-Host " Checking for Chrome/Edge browser... " -NoNewline
$null = & uv run python -c "from gcu.browser.chrome_finder import find_chrome; assert find_chrome()" 2>&1
$chromeCheckExit = $LASTEXITCODE
if ($chromeCheckExit -eq 0) {
Write-Ok "ok"
} else {
Write-Warn "skipped"
Write-Warn "not found - install Chrome or Edge for browser tools"
}
} finally {
Pop-Location
+6 -10
View File
@@ -4,7 +4,7 @@
#
# An interactive setup wizard that:
# 1. Installs Python dependencies
# 2. Installs Playwright browser for web scraping
# 2. Checks for Chrome/Edge browser for web automation
# 3. Helps configure LLM API keys
# 4. Verifies everything works
#
@@ -253,16 +253,12 @@ else
exit 1
fi
# Install Playwright browser
echo -n " Installing Playwright browser... "
if uv run python -c "import playwright" > /dev/null 2>&1; then
if uv run python -m playwright install chromium > /dev/null 2>&1; then
echo -e "${GREEN}ok${NC}"
else
echo -e "${YELLOW}skipped${NC}"
fi
# Check for Chrome/Edge (required for GCU browser tools)
echo -n " Checking for Chrome/Edge browser... "
if uv run python -c "from gcu.browser.chrome_finder import find_chrome; assert find_chrome()" > /dev/null 2>&1; then
echo -e "${GREEN}ok${NC}"
else
echo -e "${YELLOW}skipped${NC}"
echo -e "${YELLOW}not found — install Chrome or Edge for browser tools${NC}"
fi
cd "$SCRIPT_DIR"
+8 -2
View File
@@ -14,8 +14,14 @@ COPY mcp_server.py ./
# Install package with all dependencies
RUN pip install --no-cache-dir -e .
# Install Playwright Chromium browser and system dependencies
RUN playwright install chromium --with-deps
# Install Google Chrome (stable) — used by GCU browser tools via CDP
RUN apt-get update && apt-get install -y wget gnupg \
&& mkdir -p /etc/apt/keyrings \
&& wget -q -O /etc/apt/keyrings/google-chrome.asc https://dl.google.com/linux/linux_signing_key.pub \
&& echo "deb [arch=amd64 signed-by=/etc/apt/keyrings/google-chrome.asc] http://dl.google.com/linux/chrome/deb/ stable main" \
> /etc/apt/sources.list.d/google-chrome.list \
&& apt-get update && apt-get install -y google-chrome-stable \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
# Create non-root user for security
RUN useradd -m -u 1001 appuser
+2
View File
@@ -30,6 +30,7 @@ from .session import (
get_all_sessions,
get_session,
get_shared_browser,
shutdown_all_browsers,
)
from .tools import (
register_advanced_tools,
@@ -73,6 +74,7 @@ __all__ = [
# Shared browser for agent contexts
"get_shared_browser",
"close_shared_browser",
"shutdown_all_browsers",
# Constants
"DEFAULT_TIMEOUT_MS",
"DEFAULT_NAVIGATION_TIMEOUT_MS",
+106
View File
@@ -0,0 +1,106 @@
"""
Detect system-installed Chrome or Edge browsers.
Searches platform-specific well-known paths to find a Chromium-based browser
executable. Used by chrome_launcher to avoid bundling Playwright's Chromium.
"""
from __future__ import annotations
import os
import shutil
import sys
from pathlib import Path
# Search order per platform: Chrome stable first, then Edge, then Chromium.
_MACOS_CANDIDATES = [
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Google Chrome Canary.app/Contents/MacOS/Google Chrome Canary",
"/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
]
_LINUX_WHICH_NAMES = [
"google-chrome",
"google-chrome-stable",
"chromium-browser",
"chromium",
"microsoft-edge",
"microsoft-edge-stable",
]
_WINDOWS_CANDIDATES = [
r"Google\Chrome\Application\chrome.exe",
r"Microsoft\Edge\Application\msedge.exe",
]
def find_chrome() -> str | None:
"""Return the absolute path to a system Chrome/Edge executable, or None.
Check order:
1. ``CHROME_PATH`` environment variable (explicit override)
2. Platform-specific well-known install locations
"""
# 1. Explicit override
env_path = os.environ.get("CHROME_PATH")
if env_path and _is_executable(env_path):
return env_path
# 2. Platform search
if sys.platform == "darwin":
return _find_macos()
elif sys.platform == "win32":
return _find_windows()
else:
return _find_linux()
def require_chrome() -> str:
"""Return a Chrome/Edge path or raise with an actionable error message."""
path = find_chrome()
if path is None:
raise RuntimeError(
"No Chrome or Edge browser found. GCU browser tools require a "
"Chromium-based browser.\n\n"
"Options:\n"
" 1. Install Google Chrome: https://www.google.com/chrome/\n"
" 2. Set the CHROME_PATH environment variable to your browser executable\n"
)
return path
def _is_executable(path: str) -> bool:
"""Check that path exists and is executable."""
p = Path(path)
return p.exists() and os.access(p, os.X_OK)
def _find_macos() -> str | None:
for candidate in _MACOS_CANDIDATES:
if _is_executable(candidate):
return candidate
return None
def _find_linux() -> str | None:
for name in _LINUX_WHICH_NAMES:
result = shutil.which(name)
if result:
return result
return None
def _find_windows() -> str | None:
program_dirs = []
for env_var in ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA"):
val = os.environ.get(env_var)
if val:
program_dirs.append(val)
for base_dir in program_dirs:
for candidate in _WINDOWS_CANDIDATES:
full_path = os.path.join(base_dir, candidate)
if os.path.isfile(full_path):
return full_path
return None
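The search order (explicit `CHROME_PATH` override first, then platform candidates) reduces to a small pure function. A minimal sketch, with an injectable `is_executable` check so the resolution logic can be exercised without real browser binaries (the `resolve_browser` name is illustrative, not part of the module above):

```python
import os

def resolve_browser(candidates, env_var="CHROME_PATH", is_executable=os.path.exists):
    """Resolve a browser path: env override wins, then first usable candidate."""
    override = os.environ.get(env_var)
    if override and is_executable(override):
        return override
    for candidate in candidates:
        if is_executable(candidate):
            return candidate
    return None
```
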
+397
@@ -0,0 +1,397 @@
"""
Launch and manage a system Chrome/Edge process for CDP connections.
Starts the browser as a subprocess with ``--remote-debugging-port`` and waits
until the CDP endpoint is ready. Used by ``session.py`` to replace
Playwright's ``chromium.launch()`` with a system-installed browser.
On macOS, uses ``open -n -a`` to force a new Chrome instance even when the
user's personal Chrome is already running.
"""
from __future__ import annotations
import asyncio
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from .chrome_finder import require_chrome
logger = logging.getLogger(__name__)
# Chrome flags for all browser launches
_CHROME_ARGS = [
"--disable-dev-shm-usage",
"--no-first-run",
"--no-default-browser-check",
"--disable-session-crashed-bubble",
"--noerrdialogs",
"--no-startup-window",
]
# Sandbox flags are only needed on Linux (Docker, CI). On macOS they
# trigger a yellow warning bar and serve no purpose.
if sys.platform == "linux":
_CHROME_ARGS = ["--no-sandbox", "--disable-setuid-sandbox", *_CHROME_ARGS]
# CDP readiness polling
_CDP_POLL_INTERVAL_S = 0.1
_CDP_MAX_WAIT_S = 10.0
def _clear_session_restore(user_data_dir: Path) -> None:
"""Remove Chrome session restore files to prevent tab/window restoration.
Cookies and localStorage are stored separately and are unaffected.
"""
default_dir = user_data_dir / "Default"
for name in ("Current Session", "Current Tabs", "Last Session", "Last Tabs"):
target = default_dir / name
if target.exists():
try:
target.unlink()
logger.debug("Removed session restore file: %s", target)
except OSError:
pass
def _resolve_app_bundle(executable_path: str) -> str | None:
"""Extract .app bundle path from a macOS executable path.
e.g. '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'
-> '/Applications/Google Chrome.app'
"""
parts = Path(executable_path).parts
for i, part in enumerate(parts):
if part.endswith(".app"):
return str(Path(*parts[: i + 1]))
return None
def _find_pid_on_port(port: int) -> int | None:
"""Find the PID listening on a TCP port via lsof."""
try:
output = subprocess.check_output(
["lsof", "-ti", f"tcp:{port}", "-sTCP:LISTEN"],
text=True,
timeout=5,
).strip()
pids = [int(p) for p in output.split("\n") if p.strip()]
return pids[0] if pids else None
except Exception:
return None
def _kill_chrome_by_data_dir(user_data_dir: Path) -> None:
"""Find and kill a Chrome process by its --user-data-dir argument.
Fallback for when Chrome started but never bound the CDP port,
so _find_pid_on_port cannot locate it.
"""
try:
# pgrep -f matches against the full command line
output = subprocess.check_output(
["pgrep", "-f", f"--user-data-dir={user_data_dir}"],
text=True,
timeout=5,
).strip()
for pid_str in output.split("\n"):
pid_str = pid_str.strip()
if pid_str:
try:
pid = int(pid_str)
os.kill(pid, signal.SIGKILL)
logger.info(f"Killed orphaned Chrome pid={pid} (matched user-data-dir)")
except (ValueError, OSError):
pass
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
pass # No matching process found
@dataclass
class ChromeProcess:
"""Handle to a running Chrome process launched for CDP access."""
process: subprocess.Popen[bytes] | None # None when launched via open -n (macOS)
cdp_port: int
cdp_url: str
user_data_dir: Path
_temp_dir: tempfile.TemporaryDirectory[str] | None = field(default=None, repr=False)
_pid: int | None = field(default=None, repr=False)
def is_alive(self) -> bool:
if self.process is not None:
return self.process.poll() is None
if self._pid is not None:
try:
os.kill(self._pid, 0)
return True
except OSError:
return False
return False
async def kill(self) -> None:
"""Terminate the Chrome process and clean up resources."""
if self.process is not None and self.process.poll() is None:
self.process.terminate()
try:
await asyncio.wait_for(
asyncio.get_event_loop().run_in_executor(None, self.process.wait),
timeout=5.0,
)
except TimeoutError:
self.process.kill()
self.process.wait()
logger.info(f"Chrome process (port {self.cdp_port}) terminated")
elif self._pid is not None:
try:
os.kill(self._pid, signal.SIGTERM)
# Wait briefly for graceful shutdown
loop = asyncio.get_event_loop()
for _ in range(50): # 5 seconds max
alive = await loop.run_in_executor(None, self.is_alive)
if not alive:
break
await asyncio.sleep(0.1)
else:
os.kill(self._pid, signal.SIGKILL)
logger.info(f"Chrome process pid={self._pid} (port {self.cdp_port}) terminated")
except OSError:
pass
self._pid = None
# Clean up temp directory for ephemeral sessions
if self._temp_dir is not None:
try:
self._temp_dir.cleanup()
except Exception:
pass
self._temp_dir = None
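The macOS branch of `kill()` follows a common escalation pattern: SIGTERM, poll for exit during a grace period, SIGKILL if the process is still alive. A synchronous sketch with the signalling calls injected so the control flow is testable (function and parameter names are illustrative):

```python
import time

def terminate_with_escalation(send_term, is_alive, send_kill,
                              grace_s=5.0, poll_s=0.1, sleep=time.sleep):
    """Ask nicely first; force-kill only after the grace period expires."""
    send_term()
    deadline = time.monotonic() + grace_s
    while time.monotonic() < deadline:
        if not is_alive():
            return "terminated"  # graceful exit observed
        sleep(poll_s)
    send_kill()
    return "killed"
```
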
async def launch_chrome(
cdp_port: int,
user_data_dir: Path | None = None,
headless: bool = True,
extra_args: list[str] | None = None,
) -> ChromeProcess:
"""Launch system Chrome and wait for CDP to become ready.
Args:
cdp_port: Port for ``--remote-debugging-port``.
user_data_dir: Profile directory. If *None*, a temporary directory is
created and cleaned up when the process is killed (ephemeral mode).
headless: Use Chrome's headless mode (``--headless=new``).
extra_args: Additional Chrome CLI flags.
Returns:
A :class:`ChromeProcess` handle.
Raises:
RuntimeError: If Chrome is not found, fails to start, or CDP does not
become ready within the timeout.
"""
chrome_path = require_chrome()
temp_dir: tempfile.TemporaryDirectory[str] | None = None
if user_data_dir is None:
temp_dir = tempfile.TemporaryDirectory(prefix="hive-browser-")
user_data_dir = Path(temp_dir.name)
_clear_session_restore(user_data_dir)
from .session import _get_viewport
vp = _get_viewport()
chrome_flags = [
f"--remote-debugging-port={cdp_port}",
f"--user-data-dir={user_data_dir}",
f"--window-size={vp['width']},{vp['height']}",
"--lang=en-US",
*_CHROME_ARGS,
*(extra_args or []),
]
if headless:
chrome_flags.append("--headless=new")
# Don't pass a URL arg — let Chrome open its default page.
# session.py will close all initial pages and create a clean one.
# Passing "about:blank" caused macOS to show a visible blank tab
# that the CDP connection couldn't control, blocking the session.
cdp_url = f"http://127.0.0.1:{cdp_port}"
# On macOS, use `open -n -a` to force a new Chrome instance even when the
# user's personal Chrome is already running. Chrome's Mach-based IPC would
# otherwise delegate to the existing instance and exit with code 0.
if sys.platform == "darwin":
app_bundle = _resolve_app_bundle(chrome_path)
if app_bundle:
return await _launch_chrome_macos(
app_bundle, chrome_flags, cdp_port, cdp_url, user_data_dir, temp_dir
)
# Linux, Windows, or macOS fallback (no .app bundle found)
return await _launch_chrome_subprocess(
chrome_path, chrome_flags, cdp_port, cdp_url, user_data_dir, temp_dir
)
async def _launch_chrome_macos(
app_bundle: str,
chrome_flags: list[str],
cdp_port: int,
cdp_url: str,
user_data_dir: Path,
temp_dir: tempfile.TemporaryDirectory[str] | None,
) -> ChromeProcess:
"""Launch Chrome on macOS using ``open -n -a`` to bypass single-instance IPC."""
logger.info(
f"Launching Chrome (macOS open -n): app={app_bundle}, port={cdp_port}, "
f"user_data_dir={user_data_dir}"
)
# `open -n` forces a new instance; --args passes flags to Chrome
subprocess.Popen(
["open", "-n", "-a", app_bundle, "--args", *chrome_flags],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# `open` returns immediately — Chrome is now a child of launchd, not us.
try:
await _wait_for_cdp(cdp_port)
except Exception:
# Chrome may have started but not yet bound the CDP port.
# Poll briefly to find and kill the orphaned process so it
# doesn't hold the profile lock and block future launches.
killed = False
for _ in range(30): # up to 3 seconds
pid = _find_pid_on_port(cdp_port)
if pid:
try:
os.kill(pid, signal.SIGKILL)
killed = True
logger.info(f"Killed orphaned Chrome pid={pid} on port {cdp_port}")
except OSError:
pass
break
await asyncio.sleep(0.1)
if not killed:
# Last resort: find Chrome by user-data-dir in process list
_kill_chrome_by_data_dir(user_data_dir)
if temp_dir is not None:
temp_dir.cleanup()
raise
# Discover the Chrome PID listening on the CDP port
pid = _find_pid_on_port(cdp_port)
if pid is None:
logger.warning(f"CDP ready on port {cdp_port} but could not discover Chrome PID")
return ChromeProcess(
process=None,
cdp_port=cdp_port,
cdp_url=cdp_url,
user_data_dir=user_data_dir,
_temp_dir=temp_dir,
_pid=pid,
)
async def _launch_chrome_subprocess(
chrome_path: str,
chrome_flags: list[str],
cdp_port: int,
cdp_url: str,
user_data_dir: Path,
temp_dir: tempfile.TemporaryDirectory[str] | None,
) -> ChromeProcess:
"""Launch Chrome as a direct subprocess (Linux, Windows, macOS fallback)."""
args = [chrome_path, *chrome_flags]
logger.info(f"Launching Chrome: port={cdp_port}, user_data_dir={user_data_dir}")
process = subprocess.Popen(
args,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
try:
await _wait_for_cdp(cdp_port, process=process)
except Exception:
process.kill()
process.wait()
if temp_dir is not None:
temp_dir.cleanup()
raise
return ChromeProcess(
process=process,
cdp_port=cdp_port,
cdp_url=cdp_url,
user_data_dir=user_data_dir,
_temp_dir=temp_dir,
)
async def _wait_for_cdp(
port: int,
process: subprocess.Popen[bytes] | None = None,
timeout: float = _CDP_MAX_WAIT_S,
) -> None:
"""Poll ``/json/version`` until Chrome's CDP endpoint is ready.
When *process* is provided, also checks that the subprocess hasn't exited.
When *process* is None (macOS ``open -n`` path), only polls the endpoint.
"""
import urllib.error
import urllib.request
url = f"http://127.0.0.1:{port}/json/version"
deadline = time.monotonic() + timeout
def _probe() -> bool:
try:
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=1) as resp:
return resp.status == 200
except (urllib.error.URLError, OSError, ConnectionError):
return False
while time.monotonic() < deadline:
# Check the subprocess hasn't crashed (only when we have a handle)
if process is not None and process.poll() is not None:
stderr = ""
if process.stderr:
stderr = process.stderr.read().decode(errors="replace")
raise RuntimeError(
f"Chrome exited with code {process.returncode} before CDP "
f"was ready.\nstderr: {stderr[:500]}"
)
try:
loop = asyncio.get_running_loop()
ready = await asyncio.wait_for(
loop.run_in_executor(None, _probe),
timeout=2.0,
)
if ready:
elapsed = timeout - (deadline - time.monotonic())
logger.info(f"CDP ready on port {port} after {elapsed:.1f}s")
return
except TimeoutError:
pass
await asyncio.sleep(_CDP_POLL_INTERVAL_S)
raise RuntimeError(f"Chrome CDP endpoint did not become ready within {timeout}s on port {port}")
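Stripped of the subprocess-crash check, `_wait_for_cdp` is a deadline-bounded polling loop. A synchronous sketch with an injectable probe, so the loop can be verified deterministically (`wait_until` is a name chosen for this example):

```python
import time

def wait_until(probe, timeout_s=2.0, interval_s=0.01, sleep=time.sleep):
    """Poll `probe` until it returns True or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if probe():
            return True
        sleep(interval_s)
    return False  # deadline expired without a successful probe
```
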
+9 -4
@@ -7,7 +7,10 @@ Purely cosmetic — pointer-events: none, self-removing, fire-and-forget.
Configure via environment variables:
HIVE_BROWSER_HIGHLIGHTS=0 Disable entirely
HIVE_HIGHLIGHT_COLOR Override color (default: #FAC43B)
HIVE_HIGHLIGHT_DURATION_MS Override visible duration (default: 600)
HIVE_HIGHLIGHT_DURATION_MS Override visible duration (default: 1500)
HIVE_HIGHLIGHT_WAIT_S Seconds to block after injecting highlight
(default: 0 fire-and-forget; set 0.35 for
the old blocking behavior)
"""
from __future__ import annotations
@@ -23,7 +26,7 @@ logger = logging.getLogger(__name__)
_ENABLED = os.environ.get("HIVE_BROWSER_HIGHLIGHTS", "1") != "0"
_COLOR = os.environ.get("HIVE_HIGHLIGHT_COLOR", "#FAC43B")
_DURATION_MS = int(os.environ.get("HIVE_HIGHLIGHT_DURATION_MS", "1500"))
_ANIMATION_WAIT_S = 0.35
_ANIMATION_WAIT_S = float(os.environ.get("HIVE_HIGHLIGHT_WAIT_S", "0"))
# ---------------------------------------------------------------------------
# JS templates
@@ -179,7 +182,8 @@ async def highlight_element(page: Page, selector: str) -> None:
_ELEMENT_HIGHLIGHT_JS,
[box, _COLOR, _DURATION_MS],
)
await asyncio.sleep(_ANIMATION_WAIT_S)
if _ANIMATION_WAIT_S > 0:
await asyncio.sleep(_ANIMATION_WAIT_S)
except Exception:
logger.debug("highlight_element failed for %s", selector, exc_info=True)
@@ -193,6 +197,7 @@ async def highlight_coordinate(page: Page, x: float, y: float) -> None:
_COORDINATE_HIGHLIGHT_JS,
[x, y, _COLOR, _DURATION_MS],
)
await asyncio.sleep(_ANIMATION_WAIT_S)
if _ANIMATION_WAIT_S > 0:
await asyncio.sleep(_ANIMATION_WAIT_S)
except Exception:
logger.debug("highlight_coordinate failed at (%s, %s)", x, y, exc_info=True)
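The new `HIVE_HIGHLIGHT_WAIT_S` knob makes the post-highlight block opt-in: 0 means fire-and-forget, 0.35 restores the old blocking delay. A sketch of the resolution logic; the fallback on a malformed value is an extra safeguard added here, not part of the patch above:

```python
import os

def animation_wait_seconds(environ=None):
    """Resolve the post-highlight wait from HIVE_HIGHLIGHT_WAIT_S (default 0)."""
    env = environ if environ is not None else os.environ
    try:
        return float(env.get("HIVE_HIGHLIGHT_WAIT_S", "0"))
    except ValueError:
        return 0.0  # not in the original: treat garbage as fire-and-forget
```
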
+358 -108
@@ -1,8 +1,9 @@
"""
Browser session management.
Manages Playwright browser instances with support for multiple profiles,
each with independent browser context and multiple tabs.
Connects to system-installed Chrome/Edge via CDP for browser automation.
Each session launches a Chrome subprocess with ``--remote-debugging-port``
and connects Playwright as a CDP client.
Supports three session types:
- Standard: Single browser with ephemeral or persistent context
@@ -13,8 +14,11 @@ Supports three session types:
from __future__ import annotations
import asyncio
import contextvars
import logging
import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@@ -165,43 +169,133 @@ VALID_WAIT_UNTIL = {"commit", "domcontentloaded", "load", "networkidle"}
# ---------------------------------------------------------------------------
# Shared browser for agent contexts
# ---------------------------------------------------------------------------
# All agent sessions share this single browser process. Created via
# chromium.launch() (not persistent context) so we can call
# browser.new_context() multiple times with different storage states.
# All agent sessions share this single Chrome process + CDP connection.
# We can call browser.new_context() multiple times with different storage states.
_shared_browser: Browser | None = None
_shared_playwright: Any = None
_shared_chrome_process: Any = None # ChromeProcess | None (avoid circular import)
_shared_cdp_port: int | None = None
# Chrome flags shared between all browser launches
_CHROME_ARGS = [
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-blink-features=AutomationControlled",
"--no-first-run",
"--no-default-browser-check",
]
# ---------------------------------------------------------------------------
# Dynamic viewport sizing
# ---------------------------------------------------------------------------
DEFAULT_VIEWPORT_SCALE = 0.8
_FALLBACK_WIDTH = 1920
_FALLBACK_HEIGHT = 1080
def _detect_screen_resolution() -> tuple[int, int] | None:
"""Detect primary monitor resolution using platform-native tools.
Returns (width, height) or None if detection fails (headless, no display).
"""
if sys.platform == "darwin":
try:
import subprocess
out = subprocess.check_output(
["system_profiler", "SPDisplaysDataType"],
text=True,
timeout=5,
)
import re
match = re.search(r"Resolution:\s+(\d+)\s*x\s*(\d+)", out)
if match:
return int(match.group(1)), int(match.group(2))
except Exception:
pass
elif sys.platform == "win32":
try:
import ctypes
user32 = ctypes.windll.user32
return user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
except Exception:
pass
else:
# Linux — try xrandr
try:
import subprocess
out = subprocess.check_output(
["xrandr", "--current"],
text=True,
timeout=5,
)
import re
match = re.search(r"(\d+)x(\d+)\s+\d+\.\d+\*", out)
if match:
return int(match.group(1)), int(match.group(2))
except Exception:
pass
return None
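The Linux branch keys off the `*` that xrandr prints next to the active mode. That regex can be exercised against a synthetic `xrandr --current` line (the sample output below is illustrative, not captured from a real display):

```python
import re

# Same pattern as the Linux branch: "<W>x<H>  <rate>*" marks the active mode.
_XRANDR_RE = re.compile(r"(\d+)x(\d+)\s+\d+\.\d+\*")

def parse_xrandr(output: str):
    match = _XRANDR_RE.search(output)
    if match:
        return int(match.group(1)), int(match.group(2))
    return None
```
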
def _get_viewport(scale: float | None = None) -> dict[str, int]:
"""Compute viewport as a percentage of the primary monitor resolution.
Falls back to 1920x1080 if screen detection fails (e.g. headless server).
Scale priority: explicit arg > env var > config file > default (0.8).
"""
if scale is None:
env_scale = os.environ.get("HIVE_BROWSER_VIEWPORT_SCALE")
if env_scale:
try:
scale = float(env_scale)
except ValueError:
logger.warning("Invalid HIVE_BROWSER_VIEWPORT_SCALE=%r, using default", env_scale)
if scale is None:
try:
from framework.config import get_gcu_viewport_scale
scale = get_gcu_viewport_scale()
except ImportError:
scale = DEFAULT_VIEWPORT_SCALE
scale = max(0.1, min(1.0, scale))
resolution = _detect_screen_resolution()
if resolution:
w, h = resolution
logger.debug("Detected screen resolution: %dx%d", w, h)
else:
w, h = _FALLBACK_WIDTH, _FALLBACK_HEIGHT
logger.debug("Could not detect screen resolution, using default %dx%d", w, h)
return {"width": int(w * scale), "height": int(h * scale)}
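The clamping and scaling at the end of `_get_viewport` reduces to a small pure function. Sketched standalone (the `compute_viewport` name is chosen for this example):

```python
def compute_viewport(width: int, height: int, scale: float) -> dict[str, int]:
    # Clamp to the same [0.1, 1.0] range as _get_viewport, then floor
    # each scaled dimension to an integer pixel count.
    scale = max(0.1, min(1.0, scale))
    return {"width": int(width * scale), "height": int(height * scale)}
```
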
async def get_shared_browser(headless: bool = True) -> Browser:
"""Get or create the shared browser instance for agent contexts."""
global _shared_browser, _shared_playwright
global _shared_browser, _shared_playwright, _shared_chrome_process, _shared_cdp_port
if _shared_browser and _shared_browser.is_connected():
return _shared_browser
_shared_playwright = await async_playwright().start()
_shared_browser = await _shared_playwright.chromium.launch(
from .chrome_launcher import launch_chrome
from .port_manager import allocate_port
cdp_port = allocate_port("__shared__")
_shared_cdp_port = cdp_port
_shared_chrome_process = await launch_chrome(
cdp_port=cdp_port,
user_data_dir=None, # ephemeral
headless=headless,
extra_args=_CHROME_ARGS,
)
logger.info("Started shared browser for agent contexts")
_shared_playwright = await async_playwright().start()
_shared_browser = await _shared_playwright.chromium.connect_over_cdp(
_shared_chrome_process.cdp_url
)
logger.info("Started shared browser for agent contexts (system Chrome)")
return _shared_browser
async def close_shared_browser() -> None:
"""Close the shared browser and clean up all agent contexts."""
global _shared_browser, _shared_playwright
global _shared_browser, _shared_playwright, _shared_chrome_process, _shared_cdp_port
if _shared_browser:
await _shared_browser.close()
@@ -212,6 +306,30 @@ async def close_shared_browser() -> None:
await _shared_playwright.stop()
_shared_playwright = None
if _shared_chrome_process:
await _shared_chrome_process.kill()
_shared_chrome_process = None
if _shared_cdp_port is not None:
from .port_manager import release_port
release_port(_shared_cdp_port)
_shared_cdp_port = None
@dataclass
class TabMeta:
"""Metadata for a tracked browser tab."""
created_at: float
"""Unix timestamp when the tab was registered."""
origin: str
"""Who opened this tab: "agent", "popup", "user", or "startup"."""
opener_url: str | None = None
"""URL of the page that triggered the popup (popup origin only)."""
@dataclass
class BrowserSession:
@@ -234,6 +352,7 @@ class BrowserSession:
pages: dict[str, Page] = field(default_factory=dict)
active_page_id: str | None = None
console_messages: dict[str, list[dict]] = field(default_factory=dict)
page_meta: dict[str, TabMeta] = field(default_factory=dict)
_playwright: Any = None
_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
@@ -245,6 +364,9 @@ class BrowserSession:
# Session type: "standard" (default) or "agent" (ephemeral context from shared browser)
session_type: str = "standard"
# Chrome subprocess handle (standard sessions only)
_chrome_process: Any = None # ChromeProcess | None
def _is_running(self) -> bool:
"""Check if browser is currently running."""
if self.session_type == "agent":
@@ -254,9 +376,7 @@ class BrowserSession:
and self.browser is not None
and self.browser.is_connected()
)
if self.persistent:
# Persistent context doesn't have a separate browser object
return self.context is not None
# Both persistent and ephemeral now have a browser object via CDP
return self.browser is not None and self.browser.is_connected()
async def _health_check(self) -> None:
@@ -316,9 +436,17 @@ class BrowserSession:
pass
self._playwright = None
if self._chrome_process:
try:
await self._chrome_process.kill()
except Exception:
pass
self._chrome_process = None
self.pages.clear()
self.active_page_id = None
self.console_messages.clear()
self.page_meta.clear()
async def start(self, headless: bool = True, persistent: bool = True) -> dict:
"""
@@ -343,19 +471,12 @@ class BrowserSession:
"cdp_port": self.cdp_port,
}
from .chrome_launcher import launch_chrome
from .port_manager import allocate_port
self._playwright = await async_playwright().start()
self.persistent = persistent
# Common Chrome flags
chrome_args = [
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-blink-features=AutomationControlled",
"--no-first-run",
"--no-default-browser-check",
]
if persistent:
# Get storage path from environment (set by AgentRunner)
storage_path_str = os.environ.get("HIVE_STORAGE_PATH")
@@ -370,66 +491,82 @@ class BrowserSession:
)
self.user_data_dir.mkdir(parents=True, exist_ok=True)
# Allocate CDP port
from .port_manager import allocate_port
self.cdp_port = allocate_port(self.profile)
chrome_args.append(f"--remote-debugging-port={self.cdp_port}")
logger.info(
f"Starting persistent browser: profile={self.profile}, "
f"user_data_dir={self.user_data_dir}, cdp_port={self.cdp_port}"
)
# Use launch_persistent_context for true Chrome profile persistence
# Note: Returns BrowserContext directly, no separate Browser object
self.context = await self._playwright.chromium.launch_persistent_context(
user_data_dir=str(self.user_data_dir),
headless=headless,
viewport={"width": 1920, "height": 1080},
user_agent=BROWSER_USER_AGENT,
locale="en-US",
args=chrome_args,
)
self.browser = None # No separate browser object with persistent context
# Inject stealth script to hide automation detection
await self.context.add_init_script(STEALTH_SCRIPT)
# Register existing pages from restored session
for page in self.context.pages:
target_id = f"tab_{id(page)}"
self.pages[target_id] = page
self.console_messages[target_id] = []
page.on("console", lambda msg, tid=target_id: self._capture_console(tid, msg))
if self.active_page_id is None:
self.active_page_id = target_id
# Set branded Hive start page on the first blank page
if self.context.pages:
first_page = self.context.pages[0]
url = first_page.url
# Only set branded content if it's a blank/new tab page
if url in ("", "about:blank", "chrome://newtab/"):
await first_page.set_content(HIVE_START_PAGE)
else:
# Ephemeral mode - original behavior
logger.info(f"Starting ephemeral browser: profile={self.profile}")
self.browser = await self._playwright.chromium.launch(
headless=headless,
args=chrome_args,
)
self.context = await self.browser.new_context(
viewport={"width": 1920, "height": 1080},
user_agent=BROWSER_USER_AGENT,
locale="en-US",
)
self.user_data_dir = None # chrome_launcher creates a temp dir
# Inject stealth script to hide automation detection
await self.context.add_init_script(STEALTH_SCRIPT)
# Allocate CDP port for system Chrome
self.cdp_port = allocate_port(self.profile)
logger.info(
f"Starting {'persistent' if persistent else 'ephemeral'} browser: "
f"profile={self.profile}, user_data_dir={self.user_data_dir}, "
f"cdp_port={self.cdp_port}"
)
# Launch system Chrome and connect via CDP
logger.info("start(): launching Chrome...")
try:
self._chrome_process = await launch_chrome(
cdp_port=self.cdp_port,
user_data_dir=self.user_data_dir,
headless=headless,
extra_args=[f"--user-agent={BROWSER_USER_AGENT}"],
)
logger.info("start(): Chrome launched, connecting CDP...")
self.browser = await self._playwright.chromium.connect_over_cdp(
self._chrome_process.cdp_url
)
except Exception as exc:
logger.error(f"Browser launch failed: {exc}")
await self._cleanup_after_failed_start()
raise
self.context = self.browser.contexts[0]
logger.info(
f"start(): CDP connected: contexts={len(self.browser.contexts)}, "
f"pages={len(self.context.pages)}"
)
# Inject stealth script to hide automation detection
await self.context.add_init_script(STEALTH_SCRIPT)
# Close ALL pages/contexts Chrome opened on startup (session
# restore, about:blank, new-tab page, etc.) and create a single
# clean page we fully control.
viewport = _get_viewport()
for ctx in self.browser.contexts[1:]:
try:
await ctx.close()
except Exception:
pass
logger.info("start(): closing %d initial pages...", len(self.context.pages))
for page in list(self.context.pages):
try:
await page.close()
except Exception:
pass
logger.info("start(): creating new page...")
first_page = await self.context.new_page()
logger.info("start(): setting viewport...")
await first_page.set_viewport_size(viewport)
# Register the clean page
target_id = f"tab_{id(first_page)}"
self._register_page(first_page, target_id, origin="startup")
# Set branded Hive start page on the initial tab
logger.info("start(): setting Hive start page content...")
await first_page.set_content(HIVE_START_PAGE)
# Auto-track pages opened by popups / target="_blank" links
# (attached after setup so it doesn't fire during startup)
self.context.on("page", self._handle_popup_page)
# Health check: confirm the browser is actually responsive
logger.info("start(): running health check...")
try:
await self._health_check()
except Exception as exc:
@@ -474,12 +611,18 @@ class BrowserSession:
if self._playwright:
await self._playwright.stop()
self._playwright = None
# Kill the Chrome subprocess
if self._chrome_process:
await self._chrome_process.kill()
self._chrome_process = None
else:
self.browser = None # Drop reference to shared browser
self.pages.clear()
self.active_page_id = None
self.console_messages.clear()
self.page_meta.clear()
self.user_data_dir = None
self.persistent = False
@@ -518,7 +661,7 @@ class BrowserSession:
# Create an isolated context stamped with the snapshot
context = await browser.new_context(
storage_state=storage_state,
viewport={"width": 1920, "height": 1080},
viewport=_get_viewport(),
user_agent=BROWSER_USER_AGENT,
locale="en-US",
)
@@ -530,6 +673,10 @@ class BrowserSession:
context=context,
session_type="agent",
)
# Auto-track pages opened by popups / target="_blank" links
context.on("page", session._handle_popup_page)
logger.info(f"Created agent session '{agent_id}' from profile '{source_session.profile}'")
return session
@@ -578,12 +725,7 @@ class BrowserSession:
page = await self.context.new_page()
target_id = f"tab_{id(page)}"
self.pages[target_id] = page
self.active_page_id = target_id
self.console_messages[target_id] = []
# Set up console message capture
page.on("console", lambda msg: self._capture_console(target_id, msg))
self._register_page(page, target_id, origin="agent")
await page.goto(url, wait_until=wait_until, timeout=DEFAULT_NAVIGATION_TIMEOUT_MS)
@@ -609,10 +751,7 @@ class BrowserSession:
# Nothing to steal focus from — just open normally
page = await self.context.new_page()
target_id = f"tab_{id(page)}"
self.pages[target_id] = page
self.active_page_id = target_id
self.console_messages[target_id] = []
page.on("console", lambda msg: self._capture_console(target_id, msg))
self._register_page(page, target_id, origin="agent")
await page.goto(url, wait_until=wait_until, timeout=DEFAULT_NAVIGATION_TIMEOUT_MS)
return {
"ok": True,
@@ -646,10 +785,8 @@ class BrowserSession:
await cdp.detach()
target_id = f"tab_{id(page)}"
self.pages[target_id] = page
# Don't update active_page_id — the whole point is to stay on the current tab
self.console_messages[target_id] = []
page.on("console", lambda msg: self._capture_console(target_id, msg))
self._register_page(page, target_id, set_active=False, origin="agent")
return {
"ok": True,
@@ -659,6 +796,71 @@ class BrowserSession:
"background": True,
}
def _handle_page_close(self, target_id: str) -> None:
"""Clean up session state when a page is closed (by user or programmatically)."""
self.pages.pop(target_id, None)
self.console_messages.pop(target_id, None)
self.page_meta.pop(target_id, None)
if self.active_page_id == target_id:
self.active_page_id = next(iter(self.pages), None)
if self.active_page_id:
logger.info("Active tab %s closed, switched to %s", target_id, self.active_page_id)
else:
logger.warning("Active tab %s closed, no remaining tabs", target_id)
def _handle_popup_page(self, page: Page) -> None:
"""Auto-register pages opened by popups or target="_blank" links.
Attached as a persistent listener via ``context.on("page", ...)``.
Skips pages already tracked (e.g. created by ``open_tab``).
"""
# context.on("page") fires for ALL new pages, including ones
# created explicitly by open_tab / _open_tab_background.
# Check identity to avoid double-registration.
for existing in self.pages.values():
if existing is page:
return
# Capture the opener's URL as context for the popup origin
opener_url: str | None = None
active_page = self.get_active_page()
if active_page:
try:
opener_url = active_page.url
except Exception:
pass
target_id = f"tab_{id(page)}"
self._register_page(
page, target_id, set_active=False, origin="popup", opener_url=opener_url
)
logger.info("Auto-registered popup page: %s (url=%s)", target_id, page.url)
def _register_page(
self,
page: Page,
target_id: str,
*,
set_active: bool = True,
origin: str = "user",
opener_url: str | None = None,
) -> None:
"""Register a page in the session with all necessary event listeners."""
if target_id in self.pages:
if set_active:
self.active_page_id = target_id
return
self.pages[target_id] = page
self.console_messages[target_id] = []
self.page_meta[target_id] = TabMeta(
created_at=time.time(),
origin=origin,
opener_url=opener_url,
)
page.on("console", lambda msg, tid=target_id: self._capture_console(tid, msg))
page.on("close", lambda _page, tid=target_id: self._handle_page_close(tid))
if set_active:
self.active_page_id = target_id
def _capture_console(self, target_id: str, msg: Any) -> None:
"""Capture console messages for a tab."""
if target_id in self.console_messages:
@@ -678,6 +880,7 @@ class BrowserSession:
page = self.pages.pop(tid)
await page.close()
self.console_messages.pop(tid, None)
self.page_meta.pop(tid, None)
if self.active_page_id == tid:
self.active_page_id = next(iter(self.pages), None)
@@ -695,15 +898,19 @@ class BrowserSession:
async def list_tabs(self) -> list[dict]:
"""List all open tabs with their metadata."""
now = time.time()
tabs = []
for tid, page in self.pages.items():
try:
meta = self.page_meta.get(tid)
tabs.append(
{
"targetId": tid,
"url": page.url,
"title": await page.title(),
"active": tid == self.active_page_id,
"origin": meta.origin if meta else "unknown",
"age_seconds": int(now - meta.created_at) if meta else None,
}
)
except Exception:
@@ -729,14 +936,57 @@ class BrowserSession:
_sessions: dict[str, BrowserSession] = {}
# ContextVar that lets the framework inject a per-subagent profile without
# changing any tool signatures. Each asyncio Task (including those spawned
# by asyncio.gather) inherits a *copy* of the current context, so concurrent
# GCU subagents each see their own value here.
_active_profile: contextvars.ContextVar[str] = contextvars.ContextVar(
"hive_gcu_profile", default="default"
)
def set_active_profile(profile: str) -> contextvars.Token:
"""Set the active browser profile for the current async context.
Returns a token that can be passed to ``_active_profile.reset()`` to
restore the previous value when the subagent finishes.
"""
return _active_profile.set(profile)
def get_session(profile: str | None = None) -> BrowserSession:
"""Get or create a browser session for a profile.
If *profile* is not given, the value set by :func:`set_active_profile`
for the current async context is used (default: ``"default"``). This
allows the framework to automatically route concurrent GCU subagents to
separate browser contexts without any changes to tool call sites.
"""
resolved = profile if profile is not None else _active_profile.get()
if resolved not in _sessions:
_sessions[resolved] = BrowserSession(profile=resolved)
return _sessions[resolved]
def get_all_sessions() -> dict[str, BrowserSession]:
"""Get all registered sessions."""
return _sessions
async def shutdown_all_browsers() -> None:
"""Stop all browser sessions and the shared browser.
Called at server shutdown to kill orphaned Chrome processes.
"""
for name, session in list(_sessions.items()):
try:
await session.stop()
logger.info("Stopped browser session: %s", name)
except Exception as exc:
logger.warning("Error stopping session %s: %s", name, exc)
_sessions.clear()
try:
await close_shared_browser()
except Exception as exc:
logger.warning("Error closing shared browser: %s", exc)
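The ContextVar routing above works because each asyncio Task created by `asyncio.gather` runs in a *copy* of the parent's context. A minimal, self-contained sketch of that inheritance behavior (hypothetical names, no Hive/GCU imports):

```python
import asyncio
import contextvars

# Mirrors _active_profile: a module-level ContextVar with a default.
_profile: contextvars.ContextVar[str] = contextvars.ContextVar("profile", default="default")

async def subagent(name: str, seen: dict[str, str]) -> None:
    # Each Task gets a copy of the context at creation time, so this
    # set() is invisible to sibling tasks and to the parent.
    _profile.set(name)
    await asyncio.sleep(0)  # yield, interleaving with the other subagent
    seen[name] = _profile.get()

async def main() -> tuple[dict[str, str], str]:
    seen: dict[str, str] = {}
    await asyncio.gather(subagent("agent-a", seen), subagent("agent-b", seen))
    return seen, _profile.get()

seen, parent = asyncio.run(main())
print(seen)    # {'agent-a': 'agent-a', 'agent-b': 'agent-b'}
print(parent)  # 'default' (child-task sets never leak back to the parent)
```

This is why `get_session()` can resolve the right profile without any changes to tool call sites: the framework sets the value once per subagent Task.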
@@ -6,17 +6,58 @@ Tools for interacting with page elements.
from __future__ import annotations
import logging
from typing import Literal
from fastmcp import FastMCP
from playwright.async_api import (
Error as PlaywrightError,
Page,
TimeoutError as PlaywrightTimeout,
)
from ..highlight import highlight_coordinate, highlight_element
from ..session import DEFAULT_TIMEOUT_MS, get_session
logger = logging.getLogger(__name__)
_AUTO_SNAPSHOT_MAX_CHARS = 4000
async def _auto_snapshot(
page: Page,
*,
wait_for_nav: bool = False,
max_chars: int = _AUTO_SNAPSHOT_MAX_CHARS,
) -> str | None:
"""Capture a compact aria snapshot for auto-attach to action results.
Args:
page: Playwright Page instance.
wait_for_nav: If True, briefly wait for any in-flight navigation to
settle before snapshotting. Used after click actions that may
trigger page navigation.
max_chars: Truncate snapshot to this many characters. Keeps the
result small enough to survive conversation pruning (~10K char
protection budget). Set 0 to disable truncation.
"""
try:
if wait_for_nav:
try:
await page.wait_for_load_state("domcontentloaded", timeout=1000)
except Exception:
pass # No navigation happened — that's fine
snapshot = await page.locator(":root").aria_snapshot()
if snapshot and max_chars > 0 and len(snapshot) > max_chars:
snapshot = (
snapshot[:max_chars]
+ "\n... [truncated — call browser_snapshot for full page tree]"
)
return snapshot
except Exception:
logger.debug("_auto_snapshot failed", exc_info=True)
return None
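The truncation rule in `_auto_snapshot` can be exercised on its own. A sketch with a stand-alone helper mirroring the logic above (`truncate_snapshot` is a hypothetical name, not part of the module):

```python
# Stand-alone mirror of the truncation step in _auto_snapshot.
_AUTO_SNAPSHOT_MAX_CHARS = 4000

def truncate_snapshot(snapshot: str, max_chars: int = _AUTO_SNAPSHOT_MAX_CHARS) -> str:
    if snapshot and max_chars > 0 and len(snapshot) > max_chars:
        # Annotate the cut so the caller knows a full tree is available.
        return snapshot[:max_chars] + "\n... [truncated]"
    return snapshot

print(len(truncate_snapshot("x" * 10_000)))   # 4000 plus the marker length
print(truncate_snapshot('- button "OK"'))     # short snapshots pass through unchanged
```

Setting `max_chars=0` disables truncation entirely, matching the docstring above.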
def register_interaction_tools(mcp: FastMCP) -> None:
"""Register browser interaction tools."""
@@ -29,10 +70,14 @@ def register_interaction_tools(mcp: FastMCP) -> None:
button: Literal["left", "right", "middle"] = "left",
double_click: bool = False,
timeout_ms: int = DEFAULT_TIMEOUT_MS,
auto_snapshot: bool = True,
) -> dict:
"""
Click an element on the page.
Returns an accessibility snapshot of the page after the click
so you can decide your next action immediately.
Args:
selector: CSS selector or element ref (e.g., 'e12' from snapshot)
target_id: Tab ID (default: active tab)
@@ -40,9 +85,10 @@ def register_interaction_tools(mcp: FastMCP) -> None:
button: Mouse button to click (left, right, middle)
double_click: Perform double-click (default: False)
timeout_ms: Timeout in milliseconds (default: 30000)
auto_snapshot: Include page snapshot in result (default: True)
Returns:
Dict with click result and optional snapshot
"""
try:
session = get_session(profile)
@@ -57,7 +103,13 @@ def register_interaction_tools(mcp: FastMCP) -> None:
else:
await page.click(selector, button=button, timeout=timeout_ms)
result: dict = {"ok": True, "action": "click", "selector": selector}
if auto_snapshot:
snapshot = await _auto_snapshot(page, wait_for_nav=True)
if snapshot:
result["snapshot"] = snapshot
result["url"] = page.url
return result
except PlaywrightTimeout:
return {"ok": False, "error": f"Element not found: {selector}"}
except PlaywrightError as e:
@@ -70,19 +122,23 @@ def register_interaction_tools(mcp: FastMCP) -> None:
target_id: str | None = None,
profile: str = "default",
button: Literal["left", "right", "middle"] = "left",
auto_snapshot: bool = True,
) -> dict:
"""
Click at specific viewport coordinates.
Returns an accessibility snapshot of the page after the click.
Args:
x: X coordinate in the viewport
y: Y coordinate in the viewport
target_id: Tab ID (default: active tab)
profile: Browser profile name (default: "default")
button: Mouse button to click (left, right, middle)
auto_snapshot: Include page snapshot in result (default: True)
Returns:
Dict with click result and optional snapshot
"""
try:
session = get_session(profile)
@@ -93,7 +149,13 @@ def register_interaction_tools(mcp: FastMCP) -> None:
await highlight_coordinate(page, x, y)
await page.mouse.click(x, y, button=button)
result: dict = {"ok": True, "action": "click_coordinate", "x": x, "y": y}
if auto_snapshot:
snapshot = await _auto_snapshot(page, wait_for_nav=True)
if snapshot:
result["snapshot"] = snapshot
result["url"] = page.url
return result
except PlaywrightError as e:
return {"ok": False, "error": f"Click failed: {e!s}"}
@@ -106,10 +168,13 @@ def register_interaction_tools(mcp: FastMCP) -> None:
delay_ms: int = 0,
clear_first: bool = True,
timeout_ms: int = DEFAULT_TIMEOUT_MS,
auto_snapshot: bool = True,
) -> dict:
"""
Type text into an input element.
Returns an accessibility snapshot of the page after typing.
Args:
selector: CSS selector or element ref (e.g., 'e12' from snapshot)
text: Text to type
@@ -118,9 +183,10 @@ def register_interaction_tools(mcp: FastMCP) -> None:
delay_ms: Delay between keystrokes in ms (default: 0)
clear_first: Clear existing text before typing (default: True)
timeout_ms: Timeout in milliseconds (default: 30000)
auto_snapshot: Include page snapshot in result (default: True)
Returns:
Dict with type result and optional snapshot
"""
try:
session = get_session(profile)
@@ -134,7 +200,13 @@ def register_interaction_tools(mcp: FastMCP) -> None:
await page.fill(selector, "", timeout=timeout_ms)
await page.type(selector, text, delay=delay_ms, timeout=timeout_ms)
result: dict = {"ok": True, "action": "type", "selector": selector, "length": len(text)}
if auto_snapshot:
snapshot = await _auto_snapshot(page)
if snapshot:
result["snapshot"] = snapshot
result["url"] = page.url
return result
except PlaywrightTimeout:
return {"ok": False, "error": f"Element not found: {selector}"}
except PlaywrightError as e:
@@ -147,11 +219,13 @@ def register_interaction_tools(mcp: FastMCP) -> None:
target_id: str | None = None,
profile: str = "default",
timeout_ms: int = DEFAULT_TIMEOUT_MS,
auto_snapshot: bool = True,
) -> dict:
"""
Fill an input element with a value (clears existing content first).
Faster than browser_type for filling form fields.
Returns an accessibility snapshot of the page after filling.
Args:
selector: CSS selector or element ref
@@ -159,9 +233,10 @@ def register_interaction_tools(mcp: FastMCP) -> None:
target_id: Tab ID (default: active tab)
profile: Browser profile name (default: "default")
timeout_ms: Timeout in milliseconds (default: 30000)
auto_snapshot: Include page snapshot in result (default: True)
Returns:
Dict with fill result and optional snapshot
"""
try:
session = get_session(profile)
@@ -172,7 +247,13 @@ def register_interaction_tools(mcp: FastMCP) -> None:
await highlight_element(page, selector)
await page.fill(selector, value, timeout=timeout_ms)
result: dict = {"ok": True, "action": "fill", "selector": selector}
if auto_snapshot:
snapshot = await _auto_snapshot(page)
if snapshot:
result["snapshot"] = snapshot
result["url"] = page.url
return result
except PlaywrightTimeout:
return {"ok": False, "error": f"Element not found: {selector}"}
except PlaywrightError as e:
@@ -255,19 +336,23 @@ def register_interaction_tools(mcp: FastMCP) -> None:
target_id: str | None = None,
profile: str = "default",
timeout_ms: int = DEFAULT_TIMEOUT_MS,
auto_snapshot: bool = True,
) -> dict:
"""
Select option(s) in a dropdown/select element.
Returns an accessibility snapshot of the page after selection.
Args:
selector: CSS selector for the select element
values: List of values to select
target_id: Tab ID (default: active tab)
profile: Browser profile name (default: "default")
timeout_ms: Timeout in milliseconds (default: 30000)
auto_snapshot: Include page snapshot in result (default: True)
Returns:
Dict with select result and optional snapshot
"""
try:
session = get_session(profile)
@@ -276,7 +361,18 @@ def register_interaction_tools(mcp: FastMCP) -> None:
return {"ok": False, "error": "No active tab"}
selected = await page.select_option(selector, values, timeout=timeout_ms)
result: dict = {
"ok": True,
"action": "select",
"selector": selector,
"selected": selected,
}
if auto_snapshot:
snapshot = await _auto_snapshot(page)
if snapshot:
result["snapshot"] = snapshot
result["url"] = page.url
return result
except PlaywrightTimeout:
return {"ok": False, "error": f"Element not found: {selector}"}
except PlaywrightError as e:
@@ -289,19 +385,24 @@ def register_interaction_tools(mcp: FastMCP) -> None:
selector: str | None = None,
target_id: str | None = None,
profile: str = "default",
auto_snapshot: bool = True,
) -> dict:
"""
Scroll the page or an element.
Returns an accessibility snapshot of the page after scrolling
so you can see newly loaded content immediately.
Args:
direction: Scroll direction (up, down, left, right)
amount: Scroll amount in pixels (default: 500)
selector: Element to scroll (optional, scrolls page if not provided)
target_id: Tab ID (default: active tab)
profile: Browser profile name (default: "default")
auto_snapshot: Include page snapshot in result (default: True)
Returns:
Dict with scroll result and optional snapshot
"""
try:
session = get_session(profile)
@@ -327,7 +428,18 @@ def register_interaction_tools(mcp: FastMCP) -> None:
else:
await page.mouse.wheel(delta_x, delta_y)
result: dict = {
"ok": True,
"action": "scroll",
"direction": direction,
"amount": amount,
}
if auto_snapshot:
snapshot = await _auto_snapshot(page)
if snapshot:
result["snapshot"] = snapshot
result["url"] = page.url
return result
except PlaywrightError as e:
return {"ok": False, "error": f"Scroll failed: {e!s}"}
@@ -338,19 +450,23 @@ def register_interaction_tools(mcp: FastMCP) -> None:
target_id: str | None = None,
profile: str = "default",
timeout_ms: int = DEFAULT_TIMEOUT_MS,
auto_snapshot: bool = True,
) -> dict:
"""
Drag from one element to another.
Returns an accessibility snapshot of the page after the drag.
Args:
start_selector: CSS selector for drag start element
end_selector: CSS selector for drag end element
target_id: Tab ID (default: active tab)
profile: Browser profile name (default: "default")
timeout_ms: Timeout in milliseconds (default: 30000)
auto_snapshot: Include page snapshot in result (default: True)
Returns:
Dict with drag result and optional snapshot
"""
try:
session = get_session(profile)
@@ -363,12 +479,18 @@ def register_interaction_tools(mcp: FastMCP) -> None:
end_selector,
timeout=timeout_ms,
)
result: dict = {
"ok": True,
"action": "drag",
"from": start_selector,
"to": end_selector,
}
if auto_snapshot:
snapshot = await _auto_snapshot(page)
if snapshot:
result["snapshot"] = snapshot
result["url"] = page.url
return result
except PlaywrightTimeout:
return {"ok": False, "error": "Element not found for drag operation"}
except PlaywrightError as e:
@@ -24,6 +24,10 @@ def register_navigation_tools(mcp: FastMCP) -> None:
"""
Navigate the current tab to a URL.
This tool already waits for the page to reach the ``wait_until``
condition (default: ``domcontentloaded``) before returning.
You do NOT need to call ``browser_wait`` afterward.
Args:
url: URL to navigate to
target_id: Tab ID to navigate (default: active tab)
@@ -17,17 +17,38 @@ def register_tab_tools(mcp: FastMCP) -> None:
@mcp.tool()
async def browser_tabs(profile: str = "default") -> dict:
"""
List all open browser tabs with origin and age metadata.
Each tab includes:
- ``targetId``: Unique tab identifier
- ``url``: Current URL
- ``title``: Page title
- ``active``: Whether this is the active tab
- ``origin``: Who opened the tab: ``"agent"`` (you opened it),
``"popup"`` (opened by a link/script), ``"startup"`` (initial
browser tab), or ``"user"`` (opened externally)
- ``age_seconds``: How long the tab has been open
The response also includes summary counts: ``total``,
``agent_count``, and ``popup_count``.
Args:
profile: Browser profile name (default: "default")
Returns:
Dict with list of tabs and summary counts
"""
session = get_session(profile)
tabs = await session.list_tabs()
agent_count = sum(1 for t in tabs if t.get("origin") == "agent")
popup_count = sum(1 for t in tabs if t.get("origin") == "popup")
return {
"ok": True,
"tabs": tabs,
"total": len(tabs),
"agent_count": agent_count,
"popup_count": popup_count,
}
@mcp.tool()
async def browser_open(
@@ -39,6 +60,10 @@ def register_tab_tools(mcp: FastMCP) -> None:
"""
Open a new browser tab and navigate to the given URL.
This tool already waits for the page to reach the ``wait_until``
condition (default: ``load``) before returning.
You do NOT need to call ``browser_wait`` afterward.
Args:
url: URL to navigate to
background: Open in background without stealing focus
@@ -90,3 +115,72 @@ def register_tab_tools(mcp: FastMCP) -> None:
"""
session = get_session(profile)
return await session.focus_tab(target_id)
@mcp.tool()
async def browser_close_all(keep_active: bool = True, profile: str = "default") -> dict:
"""
Close all browser tabs, optionally keeping the active tab.
Args:
keep_active: If True (default), keep the active tab open.
If False, close ALL tabs (browser remains running).
profile: Browser profile name (default: "default")
Returns:
Dict with number of closed tabs and remaining count
"""
session = get_session(profile)
to_close = [
tid
for tid in list(session.pages.keys())
if not (keep_active and tid == session.active_page_id)
]
closed = 0
for tid in to_close:
result = await session.close_tab(tid)
if result.get("ok"):
closed += 1
return {"ok": True, "closed_count": closed, "remaining": len(session.pages)}
@mcp.tool()
async def browser_close_finished(keep_active: bool = True, profile: str = "default") -> dict:
"""
Close all agent-opened and popup tabs that you are done with.
This is the preferred cleanup tool during and after multi-tab tasks.
It only closes tabs with ``origin="agent"`` or ``origin="popup"``,
leaving ``"startup"`` and ``"user"`` tabs untouched.
Use this instead of ``browser_close_all`` when you want to clean up
your own tabs without disturbing tabs the user may have open.
Args:
keep_active: If True (default), skip closing the active tab even
if it is agent- or popup-owned. Set to False to close it too.
profile: Browser profile name (default: "default")
Returns:
Dict with closed_count, skipped_count, and remaining tab count
"""
session = get_session(profile)
closeable_origins = {"agent", "popup"}
to_close = [
tid
for tid, meta in session.page_meta.items()
if meta.origin in closeable_origins
and not (keep_active and tid == session.active_page_id)
]
closed = 0
skipped = 0
for tid in to_close:
result = await session.close_tab(tid)
if result.get("ok"):
closed += 1
else:
skipped += 1
return {
"ok": True,
"closed_count": closed,
"skipped_count": skipped,
"remaining": len(session.pages),
}
@@ -21,9 +21,13 @@ Environment Variables:
from __future__ import annotations
import argparse
import asyncio
import atexit
import logging
import os
import sys
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
logger = logging.getLogger(__name__)
@@ -57,7 +61,37 @@ from fastmcp import FastMCP # noqa: E402
from gcu import register_gcu_tools # noqa: E402
# ---------------------------------------------------------------------------
# Shutdown hooks — kill Chrome processes when the server exits
# ---------------------------------------------------------------------------
@asynccontextmanager
async def _lifespan(server: FastMCP) -> AsyncIterator[dict]:
"""FastMCP lifespan hook: clean up all browsers on shutdown."""
yield {}
from gcu.browser.session import shutdown_all_browsers
logger.info("Server shutting down, cleaning up browser sessions...")
await shutdown_all_browsers()
def _sync_shutdown() -> None:
"""atexit fallback: run async browser cleanup from sync context.
Covers SIGTERM and other exits where the lifespan teardown may not run.
"""
from gcu.browser.session import shutdown_all_browsers
try:
asyncio.run(shutdown_all_browsers())
except Exception:
pass
atexit.register(_sync_shutdown)
mcp = FastMCP("gcu-tools", lifespan=_lifespan)
def main() -> None: