fix: browser tools

This commit is contained in:
Timothy
2026-04-03 15:47:54 -07:00
parent d5ae7e6c4b
commit 102866780c
10 changed files with 1155 additions and 250 deletions
+40 -11
View File
@@ -14,9 +14,9 @@ Curated list of known browser automation edge cases with symptoms, causes, and f
| **Symptom** | `browser_scroll()` returns `{ok: true}` but page doesn't move |
| **Root Cause** | Content is in a nested scrollable div (`overflow: scroll`), not the main window |
| **Detection** | Scanning `document.querySelectorAll('*')` for elements with `overflow: scroll/auto` finds large scrollable candidates |
| **Fix** | Find largest scrollable container, dispatch mouse wheel at its center coordinates |
| **Code** | `bridge.py:808-981` - smart scroll with container detection |
| **Verified** | 2026-04-02 |
| **Fix** | JavaScript finds largest scrollable container, uses `container.scrollBy()` |
| **Code** | `bridge.py:808-891` - smart scroll with container detection |
| **Verified** | 2026-04-03 ✓ |
### #2: Twitter/X Lazy Loading
@@ -80,7 +80,7 @@ Curated list of known browser automation edge cases with symptoms, causes, and f
| **Detection** | `element.shadowRoot !== null` on parent elements |
| **Fix** | Use piercing selector (`host >>> target`) or traverse shadow roots |
| **Code** | See SKILL.md P6 pattern |
| **Verified** | - |
| **Verified** | 2026-04-03 ✓ |
---
@@ -96,7 +96,7 @@ Curated list of known browser automation edge cases with symptoms, causes, and f
| **Detection** | `element.contentEditable === 'true'` |
| **Fix** | Focus via JavaScript, use `execCommand('insertText')` or `Input.dispatchKeyEvent` |
| **Code** | `bridge.py:616-694` - contentEditable handling |
| **Verified** | - |
| **Verified** | 2026-04-03 ✓ |
### #8: Autocomplete Field Clearing
@@ -108,7 +108,7 @@ Curated list of known browser automation edge cases with symptoms, causes, and f
| **Detection** | Field has autocomplete listeners or dropdown appears |
| **Fix** | Add `delay_ms=50` between keystrokes |
| **Code** | `bridge.py:type()` - delay_ms parameter |
| **Verified** | - |
| **Verified** | 2026-04-03 ✓ |
### #9: Custom Date Pickers
@@ -134,9 +134,9 @@ Curated list of known browser automation edge cases with symptoms, causes, and f
| **Symptom** | `browser_snapshot()` hangs forever |
| **Root Cause** | 10k+ DOM nodes, accessibility tree has 50k+ nodes |
| **Detection** | `document.querySelectorAll('*').length > 5000` |
| **Fix** | Add timeout (10s default), truncate tree at 2000 nodes |
| **Code** | `bridge.py:1005-1050` - timeout_s param, max_nodes limit |
| **Verified** | 2026-04-02 |
| **Fix** | Add `timeout_s` param with `asyncio.timeout()`, proper error handling |
| **Code** | `bridge.py:1028-1041` - snapshot with timeout protection |
| **Verified** | 2026-04-03 ✓ (0.08s on LinkedIn) |
### #11: SPA Hydration Delay
@@ -192,6 +192,34 @@ Curated list of known browser automation edge cases with symptoms, causes, and f
---
## Screenshot Issues
### #15: Selector Screenshot Not Implemented
| Attribute | Value |
|-----------|-------|
| **Site** | Any site |
| **Symptom** | `browser_screenshot(selector="h1")` takes full viewport instead of element |
| **Root Cause** | `selector` param existed in signature but was silently ignored in both `bridge.py` and `inspection.py` |
| **Detection** | A screenshot taken with a selector has the same byte size as one taken without a selector |
| **Fix** | Use CDP `Runtime.evaluate` to call `getBoundingClientRect()` on the element, pass result as `clip` to `Page.captureScreenshot` |
| **Code** | `bridge.py:1315-1344` - selector clip logic; `inspection.py:94-96` - pass selector to bridge |
| **Verified** | 2026-04-03 ✓ (JS rect query returns correct viewport coords; requires server restart) |
### #16: Stale Browser Context (Group ID Mismatch)
| Attribute | Value |
|-----------|-------|
| **Site** | Any |
| **Symptom** | `browser_open()` returns `"No group with id: XXXXXXX"` even though `browser_status` shows `running: true` |
| **Root Cause** | In-memory `_contexts` dict has a stale `groupId` from a Chrome tab group that was closed outside the tool (e.g. user closed the tab group) |
| **Detection** | `browser_status` returns `running: true` but `browser_open` fails with "No group with id" |
| **Fix** | Call `browser_stop()` to clear stale context from `_contexts`, then `browser_start()` again |
| **Code** | `tools/lifecycle.py:144-160` - `already_running` check uses cached dict without validating against Chrome |
| **Verified** | 2026-04-03 ✓ |
---
## How to Add New Edge Cases
1. **Reproduce** the issue with minimal test case
@@ -227,6 +255,7 @@ Curated list of known browser automation edge cases with symptoms, causes, and f
| Input Issues | 3 |
| Snapshot Issues | 3 |
| Navigation Issues | 2 |
| **Total** | **14** |
| Screenshot Issues | 2 |
| **Total** | **16** |
Last updated: 2026-04-02
Last updated: 2026-04-03
@@ -71,8 +71,11 @@ async def test_shadow_dom():
</html>
"""
data_url = f"data:text/html;base64,{base64.b64encode(test_html.encode()).decode()}"
await bridge.navigate(tab_id, data_url, wait_until="load")
# Write to file and use file:// URL (data: URLs don't work well with extension)
test_file = Path("/tmp/shadow_dom_test.html")
test_file.write_text(test_html.strip())
file_url = f"file://{test_file}"
await bridge.navigate(tab_id, file_url, wait_until="load")
print("✓ Page loaded")
# Screenshot
@@ -132,10 +135,10 @@ async def test_shadow_dom():
tab_id,
"(function() { return window.shadowClickCount || 0; })()"
)
count = count_result.get("result", 0)
count = count_result.get("result") or 0
print(f"Shadow click count: {count}")
if count > 0:
if count and count > 0:
print("✓ PASS: Shadow DOM element clicked successfully")
else:
print("✗ FAIL: Could not click Shadow DOM element")
@@ -80,13 +80,19 @@ async def test_contenteditable():
</html>
"""
data_url = f"data:text/html;base64,{base64.b64encode(test_html.encode()).decode()}"
await bridge.navigate(tab_id, data_url, wait_until="load")
# Write to file and use file:// URL (data: URLs don't work well with extension)
test_file = Path("/tmp/contenteditable_test.html")
test_file.write_text(test_html.strip())
file_url = f"file://{test_file}"
await bridge.navigate(tab_id, file_url, wait_until="load")
print("✓ Page loaded")
# Screenshot
screenshot = await bridge.screenshot(tab_id)
print(f"Screenshot: {len(screenshot.get('data', ''))} bytes")
# Screenshot with timeout protection
try:
screenshot = await asyncio.wait_for(bridge.screenshot(tab_id), timeout=10.0)
print(f"Screenshot: {len(screenshot.get('data', ''))} bytes")
except asyncio.TimeoutError:
print("Screenshot timed out (skipping)")
# Detect contenteditable
print("\n--- Detecting contenteditable elements ---")
@@ -107,7 +113,7 @@ async def test_contenteditable():
# Test 1: Type into regular input (baseline)
print("\n--- Test 1: Regular input ---")
await bridge.click(tab_id, "#input1")
await bridge.type(tab_id, "#input1", "Hello input")
await bridge.type_text(tab_id, "#input1", "Hello input")
input_result = await bridge.evaluate(
tab_id,
"(function() { return document.getElementById('input1').value; })()"
@@ -117,7 +123,7 @@ async def test_contenteditable():
# Test 2: Type into contenteditable div
print("\n--- Test 2: Contenteditable div ---")
await bridge.click(tab_id, "#editor1")
await bridge.type(tab_id, "#editor1", "Hello contenteditable", clear_first=True)
await bridge.type_text(tab_id, "#editor1", "Hello contenteditable", clear_first=True)
editor_result = await bridge.evaluate(
tab_id,
"(function() { return document.getElementById('editor1').innerText; })()"
@@ -140,9 +146,12 @@ async def test_contenteditable():
)
print(f"Editor2 after execCommand: {insert_result.get('result', '')}")
# Screenshot after
screenshot_after = await bridge.screenshot(tab_id)
print(f"Screenshot after: {len(screenshot_after.get('data', ''))} bytes")
# Screenshot after with timeout protection
try:
screenshot_after = await asyncio.wait_for(bridge.screenshot(tab_id), timeout=10.0)
print(f"Screenshot after: {len(screenshot_after.get('data', ''))} bytes")
except asyncio.TimeoutError:
print("Screenshot after timed out (skipping)")
# Results
print("\n--- Results ---")
@@ -154,8 +154,11 @@ async def test_autocomplete():
</html>
"""
data_url = f"data:text/html;base64,{base64.b64encode(test_html.encode()).decode()}"
await bridge.navigate(tab_id, data_url, wait_until="load")
# Write to file and use file:// URL (data: URLs don't work well with extension)
test_file = Path("/tmp/autocomplete_test.html")
test_file.write_text(test_html.strip())
file_url = f"file://{test_file}"
await bridge.navigate(tab_id, file_url, wait_until="load")
print("✓ Page loaded")
# Screenshot
@@ -165,7 +168,7 @@ async def test_autocomplete():
# Test 1: Fast typing (no delay) - may fail
print("\n--- Test 1: Fast typing (delay_ms=0) ---")
await bridge.click(tab_id, "#search")
await bridge.type(tab_id, "#search", "Ger", clear_first=True, delay_ms=0)
await bridge.type_text(tab_id, "#search", "Ger", clear_first=True, delay_ms=0)
await asyncio.sleep(0.5)
fast_result = await bridge.evaluate(
@@ -185,7 +188,7 @@ async def test_autocomplete():
# Test 2: Slow typing (with delay) - should work
print("\n--- Test 2: Slow typing (delay_ms=100) ---")
await bridge.click(tab_id, "#search")
await bridge.type(tab_id, "#search", "United", clear_first=True, delay_ms=100)
await bridge.type_text(tab_id, "#search", "United", clear_first=True, delay_ms=100)
await asyncio.sleep(0.5)
slow_result = await bridge.evaluate(
@@ -94,9 +94,9 @@ async def test_huge_dom():
elem_count = count_result.get("result", 0)
print(f"DOM elements: {elem_count}")
# Screenshot to verify page loaded
screenshot = await bridge.screenshot(tab_id)
print(f"Screenshot: {len(screenshot.get('data', ''))} bytes")
# Skip screenshot on huge DOM - it can timeout
# Instead verify page loaded by checking DOM
print("✓ Page verified (skipping screenshot on huge DOM)")
# Test snapshot with timeout
print("\n--- Testing snapshot with 10s timeout ---")
@@ -106,12 +106,15 @@ async def test_spa_navigation():
</html>
"""
data_url = f"data:text/html;base64,{base64.b64encode(spa_html.encode()).decode()}"
# Write to file and use file:// URL (data: URLs don't work well with extension)
test_file = Path("/tmp/spa_test.html")
test_file.write_text(spa_html.strip())
file_url = f"file://{test_file}"
# Test 1: wait_until="load" - may fire before content ready
print("\n--- Test 1: wait_until='load' ---")
start = time.perf_counter()
await bridge.navigate(tab_id, data_url, wait_until="load")
await bridge.navigate(tab_id, file_url, wait_until="load")
elapsed = time.perf_counter() - start
print(f"Navigation completed in {elapsed:.3f}s")
@@ -159,7 +162,7 @@ async def test_spa_navigation():
# Test 3: wait_until="networkidle"
print("\n--- Test 3: wait_until='networkidle' ---")
await bridge.navigate(tab_id, data_url, wait_until="networkidle", timeout_ms=10000)
await bridge.navigate(tab_id, file_url, wait_until="networkidle", timeout_ms=10000)
# Check content immediately
content_networkidle = await bridge.evaluate(
@@ -0,0 +1,262 @@
#!/usr/bin/env python
"""
Test #15: Screenshot Functionality
Tests browser_screenshot across multiple scenarios:
- Basic viewport screenshot
- Full-page screenshot
- Selector-based screenshot
- Screenshot on complex DOM
- Timeout handling
Category: screenshot
"""
import asyncio
import base64
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent / "tools" / "src"))
from gcu.browser.bridge import BeelineBridge
CONTEXT_NAME = "screenshot-test"
SIMPLE_HTML = """<!DOCTYPE html>
<html>
<head><style>
body { margin: 0; background: #fff; font-family: sans-serif; }
h1 { color: #333; padding: 20px; }
.box { width: 200px; height: 100px; background: #4a90e2; margin: 20px; }
.long-content { height: 2000px; background: linear-gradient(blue, red); }
</style></head>
<body>
<h1 id="title">Screenshot Test Page</h1>
<div class="box" id="target-box">Target Box</div>
<div class="long-content"></div>
</body>
</html>"""
def check_png(data: str) -> bool:
    """Return True when *data* is base64 text that decodes to PNG bytes.

    Only the 8-byte PNG file signature is checked; the rest of the image is
    not parsed.

    Args:
        data: Base64-encoded payload, as returned in a screenshot result.

    Returns:
        True if the decoded bytes start with the PNG signature, False if they
        do not or if *data* cannot be base64-decoded at all.
    """
    png_signature = b'\x89PNG\r\n\x1a\n'
    try:
        raw = base64.b64decode(data)
    except (ValueError, TypeError):
        # binascii.Error (bad padding / bad alphabet) is a ValueError subclass;
        # TypeError covers non-str/bytes input such as None.
        return False
    return raw.startswith(png_signature)
async def test_basic_screenshot(bridge: BeelineBridge, tab_id: int, data_url: str):
    """Test 1: capture the viewport and check the payload is a valid PNG.

    Navigates the tab to *data_url*, takes a default screenshot, prints timing
    and size diagnostics, and returns True only when the result is flagged ok
    and its data decodes to PNG bytes.
    """
    print("\n--- Test 1: Basic Viewport Screenshot ---")
    await bridge.navigate(tab_id, data_url, wait_until="load")
    await asyncio.sleep(0.5)

    started = time.perf_counter()
    result = await bridge.screenshot(tab_id)
    elapsed = time.perf_counter() - started

    ok = result.get("ok")
    data = result.get("data", "")
    mime = result.get("mimeType", "")
    print(f" ok={ok}, mimeType={mime}, elapsed={elapsed:.3f}s")
    print(f" data length: {len(data)} chars")

    # Guard clauses: bail out on the first failed expectation.
    if not (ok and data):
        print(f" ✗ FAIL: {result.get('error', 'no data')}")
        return False

    valid_png = check_png(data)
    print(f" valid PNG: {valid_png}")
    if not valid_png:
        print(" ✗ FAIL: Data is not a valid PNG")
        return False

    png_bytes = base64.b64decode(data)
    print(f" PNG size: {len(png_bytes)} bytes")
    print(" ✓ PASS: Basic screenshot works")
    return True
async def test_full_page_screenshot(bridge: BeelineBridge, tab_id: int, data_url: str):
    """Test 2: a full-page capture should be larger than a viewport capture.

    Takes one screenshot with full_page=False and one with full_page=True and
    compares the decoded PNG byte counts. Returns True when the full-page
    image is strictly larger, False otherwise or when either capture has no
    data.
    """
    print("\n--- Test 2: Full Page Screenshot ---")
    await bridge.navigate(tab_id, data_url, wait_until="load")
    await asyncio.sleep(0.5)

    viewport_result = await bridge.screenshot(tab_id, full_page=False)
    full_result = await bridge.screenshot(tab_id, full_page=True)
    viewport_b64 = viewport_result.get("data", "")
    full_b64 = full_result.get("data", "")

    if not (viewport_b64 and full_b64):
        print(f" ✗ FAIL: viewport ok={viewport_result.get('ok')}, full ok={full_result.get('ok')}")
        return False

    viewport_size = len(base64.b64decode(viewport_b64))
    full_size = len(base64.b64decode(full_b64))
    print(f" Viewport PNG: {viewport_size} bytes")
    print(f" Full page PNG: {full_size} bytes")

    # Byte count is used as a proxy for captured area.
    is_larger = full_size > viewport_size
    if is_larger:
        print(" ✓ PASS: Full page larger than viewport")
    else:
        print(" ✗ FAIL: Full page not larger than viewport (may not capture long pages)")
    return is_larger
async def test_selector_screenshot(bridge: BeelineBridge, tab_id: int, data_url: str):
    """Test 3: a selector screenshot should be smaller than the viewport one.

    Captures "#target-box" via the ``selector`` parameter, then a plain
    viewport screenshot as the baseline, and compares decoded PNG byte counts.

    Returns:
        True when the selector capture is strictly smaller than the baseline.
        False when either capture fails or when the selector capture is not
        smaller (which suggests the selector was ignored).
    """
    print("\n--- Test 3: Selector Screenshot ---")
    await bridge.navigate(tab_id, data_url, wait_until="load")
    await asyncio.sleep(0.5)

    result = await bridge.screenshot(tab_id, selector="#target-box")
    data = result.get("data", "")
    if not (result.get("ok") and data):
        # A failed capture is an error, not evidence that the selector was
        # ignored (an ignored selector would still return ok with data).
        print(f" ✗ FAIL: selector screenshot failed - error={result.get('error')}")
        return False

    # Baseline: same page without a selector, i.e. the whole viewport.
    viewport_result = await bridge.screenshot(tab_id)
    viewport_data = viewport_result.get("data", "")
    if not viewport_data:
        print(f" ✗ FAIL: baseline viewport screenshot failed - error={viewport_result.get('error')}")
        return False

    sel_size = len(base64.b64decode(data))
    viewport_size = len(base64.b64decode(viewport_data))
    print(f" Selector PNG: {sel_size} bytes")
    print(f" Viewport PNG: {viewport_size} bytes")

    if sel_size < viewport_size:
        print(" ✓ PASS: Selector screenshot smaller than viewport")
        return True

    print(" ⚠ WARNING: Selector screenshot not smaller than viewport (selector may be ignored)")
    return False
async def test_screenshot_url_metadata(bridge: BeelineBridge, tab_id: int):
    """Test 4: the screenshot result should carry the page URL.

    Navigates to https://example.com, takes a screenshot, and returns True
    when the result's "url" field contains "example.com".
    """
    print("\n--- Test 4: Screenshot URL Metadata ---")
    await bridge.navigate(tab_id, "https://example.com", wait_until="load")
    await asyncio.sleep(1)

    result = await bridge.screenshot(tab_id)
    url = result.get("url", "")
    tab = result.get("tabId")
    print(f" url={url!r}, tabId={tab}")

    url_matches = "example.com" in url
    if url_matches:
        print(" ✓ PASS: URL metadata captured correctly")
    else:
        print(f" ✗ FAIL: Expected example.com in URL, got {url!r}")
    return url_matches
async def test_screenshot_timeout(bridge: BeelineBridge, tab_id: int, data_url: str):
    """Test 5: a very short ``timeout_s`` must be handled gracefully.

    Requests a screenshot with a 1 ms timeout. Two outcomes count as a pass:
    the capture finished anyway (too fast to exercise the timeout), or it
    failed with an error mentioning "timed out" or "cancelled".

    Returns:
        True for either passing outcome; False when the capture failed with
        any other error, since that is not timeout handling.
    """
    print("\n--- Test 5: Timeout Handling ---")
    await bridge.navigate(tab_id, data_url, wait_until="load")

    # Very short timeout - likely still completes since simple page
    start = time.perf_counter()
    result = await bridge.screenshot(tab_id, timeout_s=0.001)
    elapsed = time.perf_counter() - start

    if result.get("ok"):
        print(f" ⚠ Screenshot completed before timeout ({elapsed:.3f}s) - too fast to test timeout")
        return True  # Still ok, just very fast

    # Normalise a missing/None error to "" so the substring checks cannot raise.
    err = str(result.get("error") or "")
    if "timed out" in err or "cancelled" in err:
        print(f" ✓ PASS: Timeout handled gracefully: {err!r}")
        return True

    # The capture failed for a reason unrelated to the timeout.
    print(f" ✗ FAIL: Unexpected non-timeout error after {elapsed:.3f}s: {err!r}")
    return False
async def test_screenshot_complex_site(bridge: BeelineBridge, tab_id: int):
    """Test 6: screenshot a real site (https://example.com).

    Returns True when the result is flagged ok and its data is a valid PNG.
    """
    print("\n--- Test 6: Complex Site (example.com) ---")
    await bridge.navigate(tab_id, "https://example.com", wait_until="load")
    await asyncio.sleep(1)

    started = time.perf_counter()
    result = await bridge.screenshot(tab_id)
    elapsed = time.perf_counter() - started

    ok = result.get("ok")
    data = result.get("data", "")
    print(f" ok={ok}, elapsed={elapsed:.3f}s, data_len={len(data)}")

    # check_png is only consulted when the result is flagged ok.
    if not (ok and check_png(data)):
        print(f" ✗ FAIL: {result.get('error', 'bad data')}")
        return False

    print(" ✓ PASS: Screenshot on real site works")
    return True
async def main():
    """Run every screenshot test against a live bridge and print a summary.

    Starts the bridge, waits up to ten seconds for the browser extension to
    connect, creates a dedicated context, runs the six tests in order, and
    prints a pass/fail line per test. The context is destroyed and the bridge
    stopped even when a test raises.
    """
    print("=" * 70)
    print("TEST #15: Screenshot Functionality")
    print("=" * 70)

    bridge = BeelineBridge()
    try:
        await bridge.start()

        # Poll once per second; the for/else branch runs only if we never break.
        for i in range(10):
            await asyncio.sleep(1)
            if bridge.is_connected:
                print("✓ Extension connected!")
                break
            print(f"Waiting for extension... ({i+1}/10)")
        else:
            print("✗ Extension not connected. Ensure Chrome with Beeline extension is running.")
            return

        context = await bridge.create_context(CONTEXT_NAME)
        tab_id = context.get("tabId")
        group_id = context.get("groupId")
        print(f"✓ Created tab: {tab_id}")

        try:
            # NOTE(review): sibling test scripts in this commit switched from
            # data: URLs to file:// URLs ("data: URLs don't work well with
            # extension"); confirm data: URLs are reliable enough here.
            data_url = f"data:text/html;base64,{base64.b64encode(SIMPLE_HTML.encode()).decode()}"

            # Dict values are evaluated in order, so the tests run sequentially.
            results = {
                "basic": await test_basic_screenshot(bridge, tab_id, data_url),
                "full_page": await test_full_page_screenshot(bridge, tab_id, data_url),
                "selector": await test_selector_screenshot(bridge, tab_id, data_url),
                "metadata": await test_screenshot_url_metadata(bridge, tab_id),
                "timeout": await test_screenshot_timeout(bridge, tab_id, data_url),
                "complex_site": await test_screenshot_complex_site(bridge, tab_id),
            }

            print("\n" + "=" * 70)
            print("SUMMARY")
            print("=" * 70)
            for name, passed in results.items():
                status = "✓ PASS" if passed else "✗ FAIL"
                print(f" {status}: {name}")

            passed_count = sum(1 for v in results.values() if v)
            total = len(results)
            print(f"\n {passed_count}/{total} tests passed")
        finally:
            # Always release the tab group, even if a test raised, so a failed
            # run does not leave a stray context behind.
            await bridge.destroy_context(group_id)
            print("\n✓ Context destroyed")
    finally:
        await bridge.stop()
        print("✓ Bridge stopped")
if __name__ == "__main__":
asyncio.run(main())
+395 -208
View File
@@ -41,6 +41,10 @@ BRIDGE_PORT = 9229
# CDP wait_until values
VALID_WAIT_UNTIL = {"commit", "domcontentloaded", "load", "networkidle"}
# Last interaction highlight per tab_id: {x, y, w, h, label, kind}
# kind: "rect" (element) or "point" (coordinate)
_interaction_highlights: dict[int, dict] = {}
def _get_active_profile() -> str:
"""Get the current active profile from context variable."""
@@ -301,7 +305,9 @@ class BeelineBridge:
"Runtime.evaluate",
{"expression": "document.readyState", "returnByValue": True},
)
ready_state = eval_result.get("result", {}).get("result", {}).get("value", "")
ready_state = (
(eval_result or {}).get("result", {}).get("result", {}).get("value", "")
)
if wait_until == "domcontentloaded" and ready_state in ("interactive", "complete"):
break
@@ -333,8 +339,8 @@ class BeelineBridge:
return {
"ok": True,
"tabId": tab_id,
"url": url_result.get("result", {}).get("result", {}).get("value", ""),
"title": title_result.get("result", {}).get("result", {}).get("value", ""),
"url": (url_result or {}).get("result", {}).get("result", {}).get("value", ""),
"title": (title_result or {}).get("result", {}).get("result", {}).get("value", ""),
}
async def go_back(self, tab_id: int) -> dict:
@@ -352,7 +358,7 @@ class BeelineBridge:
return {
"ok": True,
"action": "back",
"url": result.get("result", {}).get("result", {}).get("value", ""),
"url": (result or {}).get("result", {}).get("result", {}).get("value", ""),
}
async def go_forward(self, tab_id: int) -> dict:
@@ -369,7 +375,7 @@ class BeelineBridge:
return {
"ok": True,
"action": "forward",
"url": result.get("result", {}).get("result", {}).get("value", ""),
"url": (result or {}).get("result", {}).get("result", {}).get("value", ""),
}
async def reload(self, tab_id: int) -> dict:
@@ -386,7 +392,7 @@ class BeelineBridge:
return {
"ok": True,
"action": "reload",
"url": result.get("result", {}).get("result", {}).get("value", ""),
"url": (result or {}).get("result", {}).get("result", {}).get("value", ""),
}
# ── Interaction ────────────────────────────────────────────────────────────
@@ -451,7 +457,7 @@ class BeelineBridge:
})();
"""
viewport_result = await self.evaluate(tab_id, viewport_script)
viewport = viewport_result.get("result", {}).get("value", {})
viewport = (viewport_result or {}).get("result") or {}
viewport_width = viewport.get("width", 1920)
viewport_height = viewport.get("height", 1080)
@@ -487,10 +493,13 @@ class BeelineBridge:
try:
result = await self.evaluate(tab_id, click_script)
value = result.get("result", {}).get("value")
value = (result or {}).get("result")
if isinstance(value, dict) and "error" not in value:
# JavaScript click succeeded
# JavaScript click succeeded — highlight element
rx = value.get("x", 0) - value.get("width", 0) / 2
ry = value.get("y", 0) - value.get("height", 0) / 2
await self.highlight_rect(tab_id, rx, ry, value.get("width", 0), value.get("height", 0), label=selector)
return {
"ok": True,
"action": "click",
@@ -522,7 +531,7 @@ class BeelineBridge:
}})();
"""
bounds_result = await self.evaluate(tab_id, bounds_script)
bounds_value = bounds_result.get("result", {}).get("value")
bounds_value = (bounds_result or {}).get("result")
if not bounds_value:
return {"ok": False, "error": f"Could not get element bounds: {selector}"}
@@ -587,6 +596,9 @@ class BeelineBridge:
except asyncio.TimeoutError:
pass # Continue even if timeout
w = bounds_value.get("width", 0)
h = bounds_value.get("height", 0)
await self.highlight_rect(tab_id, x - w / 2, y - h / 2, w, h, label=selector)
return {"ok": True, "action": "click", "selector": selector, "x": x, "y": y, "method": "cdp"}
except Exception as e:
@@ -595,6 +607,7 @@ class BeelineBridge:
async def click_coordinate(self, tab_id: int, x: float, y: float, button: str = "left") -> dict:
"""Click at specific coordinates."""
await self.cdp_attach(tab_id)
await self._try_enable_domain(tab_id, "DOM")
await self._try_enable_domain(tab_id, "Input")
button_map = {"left": "left", "right": "right", "middle": "middle"}
@@ -611,6 +624,7 @@ class BeelineBridge:
{"type": "mouseReleased", "x": x, "y": y, "button": cdp_button, "clickCount": 1},
)
await self.highlight_point(tab_id, x, y, label=f"click ({x},{y})")
return {"ok": True, "action": "click_coordinate", "x": x, "y": y}
async def type_text(
@@ -657,14 +671,14 @@ class BeelineBridge:
"""
focus_result = await self.evaluate(tab_id, focus_script)
success = focus_result.get("result", {}).get("value", False)
success = (focus_result or {}).get("result", False)
if not success:
# Element not found - wait and retry
deadline = asyncio.get_event_loop().time() + timeout_ms / 1000
while asyncio.get_event_loop().time() < deadline:
result = await self.evaluate(tab_id, focus_script)
if result.get("result", {}).get("value", False):
if result and (result or {}).get("result", False):
success = True
break
await asyncio.sleep(0.1)
@@ -691,6 +705,15 @@ class BeelineBridge:
if delay_ms > 0:
await asyncio.sleep(delay_ms / 1000)
# Highlight the element that was typed into
rect_result = await self.evaluate(
tab_id,
f"(function(){{const el=document.querySelector({json.dumps(selector)});if(!el)return null;"
f"const r=el.getBoundingClientRect();return{{x:r.left,y:r.top,w:r.width,h:r.height}};}})()",
)
rect = (rect_result or {}).get("result")
if rect:
await self.highlight_rect(tab_id, rect["x"], rect["y"], rect["w"], rect["h"], label=selector)
return {"ok": True, "action": "type", "selector": selector, "length": len(text)}
async def press_key(self, tab_id: int, key: str, selector: str | None = None) -> dict:
@@ -745,8 +768,44 @@ class BeelineBridge:
return {"ok": True, "action": "press", "key": key}
# Shared JS snippet: shadow-piercing querySelector via ">>>" separator.
# Defines _shadowQuery(sel): splits sel on ">>>", trims each part, and resolves
# the parts left to right with querySelector, searching inside the previous
# match's shadowRoot when it has one. Yields null when any part fails to match.
# The snippet is a JavaScript function *declaration*, so it must be embedded
# where a statement is allowed (e.g. at the top of a script or function body),
# not as the right-hand side of an assignment.
_SHADOW_QUERY_JS = """
function _shadowQuery(sel) {
const parts = sel.split('>>>').map(s => s.trim());
let node = document;
for (const part of parts) {
if (!node) return null;
node = (node.shadowRoot || node).querySelector(part);
}
return node;
}
"""
async def shadow_query(self, tab_id: int, selector: str) -> dict:
    """querySelector that pierces shadow roots using '>>>' separator.

    Returns CSS-pixel getBoundingClientRect of the matched element.
    Example: '#interop-outlet >>> #ember37 >>> p'

    Args:
        tab_id: Tab to run the query in.
        selector: CSS selector; '>>>' separates parts that should be resolved
            inside the previous match's shadow root.

    Returns:
        ``{"ok": True, "selector": ..., "rect": {...}}`` where rect holds
        found, tag, x, y, w, h, cx, cy; or ``{"ok": False, "error": ...}``
        when nothing matched or the evaluation produced no result.
    """
    await self.cdp_attach(tab_id)

    # The whole script is ONE immediately-invoked function expression, with the
    # _shadowQuery helper declared inside it. That matches the shape of every
    # other script passed to evaluate() in this class, so the returned value is
    # the IIFE's result regardless of how evaluate() treats non-IIFE scripts.
    script = (
        "(function(){"
        f"{self._SHADOW_QUERY_JS}"
        f"const el=_shadowQuery({json.dumps(selector)});"
        "if(!el)return null;"
        "const r=el.getBoundingClientRect();"
        "return{found:true,tag:el.tagName,x:r.left,y:r.top,w:r.width,h:r.height,"
        "cx:r.left+r.width/2,cy:r.top+r.height/2};"
        "})()"
    )

    result = await self.evaluate(tab_id, script)
    # evaluate() may return None; a null/absent "result" means no match.
    rect = (result or {}).get("result")
    if not rect:
        return {"ok": False, "error": f"Element not found: {selector}"}
    return {"ok": True, "selector": selector, "rect": rect}
async def hover(self, tab_id: int, selector: str, timeout_ms: int = 30000) -> dict:
"""Hover over an element.
"""Hover over an element. Supports '>>>' shadow-piercing selectors.
Uses JavaScript for bounds (more reliable than CDP getBoxModel).
"""
@@ -756,14 +815,17 @@ class BeelineBridge:
await self._try_enable_domain(tab_id, "Runtime")
# Use JavaScript to scroll into view and get bounds
# Supports ">>>" shadow-piercing selectors.
# _SHADOW_QUERY_JS is a function *declaration*. Pasting it directly after
# "const el = " would turn it into a named function expression: el would be
# the function itself and the following _shadowQuery(...) call would be out of
# scope. Wrapping helper + call in their own IIFE keeps the lookup a single
# expression that evaluates to the matched element (or null).
if ">>>" in selector:
    query_fn = (
        f"(function(){{{self._SHADOW_QUERY_JS}"
        f"return _shadowQuery({json.dumps(selector)});}})()"
    )
else:
    query_fn = f"document.querySelector({json.dumps(selector)})"
hover_script = f"""
(function() {{
const el = document.querySelector({json.dumps(selector)});
const el = {query_fn};
if (!el) return null;
// Scroll into view
el.scrollIntoView({{ block: 'center' }});
const rect = el.getBoundingClientRect();
return {{
x: rect.x + rect.width / 2,
@@ -780,7 +842,7 @@ class BeelineBridge:
while asyncio.get_event_loop().time() < deadline:
result = await self.evaluate(tab_id, hover_script)
bounds_value = result.get("result", {}).get("value")
bounds_value = (result or {}).get("result")
if bounds_value:
break
await asyncio.sleep(0.1)
@@ -803,18 +865,138 @@ class BeelineBridge:
{"type": "mouseMoved", "x": x, "y": y},
)
w = bounds_value.get("width", 0)
h = bounds_value.get("height", 0)
await self.highlight_rect(tab_id, x - w / 2, y - h / 2, w, h, label=selector)
return {"ok": True, "action": "hover", "selector": selector, "x": x, "y": y}
async def hover_coordinate(self, tab_id: int, x: float, y: float) -> dict:
    """Hover at CSS pixel coordinates.

    Dispatches a single mouseMoved event at (x, y) without looking up a DOM
    element, so it can be used where CSS selectors fail (e.g. overlay or
    virtually rendered content), then marks the point with a highlight.

    Returns:
        ``{"ok": True, "action": "hover_coordinate", "x": x, "y": y}``
    """
    await self.cdp_attach(tab_id)
    for domain in ("DOM", "Input"):
        await self._try_enable_domain(tab_id, domain)

    # buttons=0: plain pointer move with no mouse button held down.
    move_event = {"type": "mouseMoved", "x": x, "y": y, "buttons": 0}
    await self._cdp(tab_id, "Input.dispatchMouseEvent", move_event)

    await self.highlight_point(tab_id, x, y, label=f"hover ({x},{y})")
    return {"ok": True, "action": "hover_coordinate", "x": x, "y": y}
async def press_key_at(self, tab_id: int, x: float, y: float, key: str) -> dict:
    """Move mouse to (x, y) then dispatch a key event.

    Intended for overlays where browser_press misses because
    document.activeElement is in the regular DOM while the focused element is
    in virtual/overlay rendering. The mouse is moved to (x, y) first, then a
    keyDown/keyUp pair is dispatched and the point is highlighted.

    Args:
        tab_id: Tab to act on.
        x: CSS-pixel x coordinate to move the mouse to.
        y: CSS-pixel y coordinate to move the mouse to.
        key: Key name. Named keys (Enter, Tab, Escape, Backspace, Delete,
            arrows, Home, End, Space) are mapped to their CDP key/text pair;
            any other value is sent as both the key and its text.

    Returns:
        ``{"ok": True, "action": "press_at", "x": x, "y": y, "key": key}``
    """
    await self.cdp_attach(tab_id)
    await self._try_enable_domain(tab_id, "DOM")
    await self._try_enable_domain(tab_id, "Input")

    # Move mouse into position before sending the key
    await self._cdp(
        tab_id,
        "Input.dispatchMouseEvent",
        {"type": "mouseMoved", "x": x, "y": y, "buttons": 0},
    )

    # key -> (text produced, CDP key name). Navigation keys produce no text.
    key_map = {
        "Enter": ("\r", "Enter"),
        "Tab": ("\t", "Tab"),
        "Escape": ("\x1b", "Escape"),
        "Backspace": ("\b", "Backspace"),
        "Delete": ("\x7f", "Delete"),
        "ArrowUp": ("", "ArrowUp"),
        "ArrowDown": ("", "ArrowDown"),
        "ArrowLeft": ("", "ArrowLeft"),
        "ArrowRight": ("", "ArrowRight"),
        "Home": ("", "Home"),
        "End": ("", "End"),
        "Space": (" ", " "),
        " ": (" ", " "),
    }
    text, key_name = key_map.get(key, (key, key))

    for event_type in ("keyDown", "keyUp"):
        params = {"type": event_type, "key": key_name}
        # "text" is an optional string in CDP: leave it out for text-less keys
        # instead of sending None, which would serialise as JSON null.
        if text:
            params["text"] = text
        await self._cdp(tab_id, "Input.dispatchKeyEvent", params)

    await self.highlight_point(tab_id, x, y, label=f"{key} ({x},{y})")
    return {"ok": True, "action": "press_at", "x": x, "y": y, "key": key}
async def highlight_rect(
    self,
    tab_id: int,
    x: float,
    y: float,
    w: float,
    h: float,
    label: str = "",
    color: dict | None = None,
) -> None:
    """Draw a CDP Overlay highlight box in the live browser window.

    Sends ``Overlay.highlightRect`` for the given rectangle and records it as
    the tab's last interaction highlight in ``_interaction_highlights``.

    Args:
        tab_id: Tab to draw in.
        x: Left edge of the box (truncated to int for CDP).
        y: Top edge of the box (truncated to int for CDP).
        w: Box width; at least 1 is sent to CDP.
        h: Box height; at least 1 is sent to CDP.
        label: Free-form label stored with the recorded highlight.
        color: Fill colour as ``{"r", "g", "b", "a"}``; defaults to a
            semi-transparent blue. The outline reuses r/g/b at full opacity.
    """
    await self.cdp_attach(tab_id)
    await self._try_enable_domain(tab_id, "Overlay")

    if color:
        fill = color
    else:
        fill = {"r": 59, "g": 130, "b": 246, "a": 0.35}  # blue-500 @ 35%

    # Same hue as the fill, fully opaque.
    outline = {channel: fill[channel] for channel in ("r", "g", "b")}
    outline["a"] = 1.0

    overlay_params = {
        "x": int(x),
        "y": int(y),
        "width": max(1, int(w)),
        "height": max(1, int(h)),
        "color": fill,
        "outlineColor": outline,
    }
    await self._cdp(tab_id, "Overlay.highlightRect", overlay_params)

    # The un-rounded geometry is what gets recorded, not the int values sent.
    _interaction_highlights[tab_id] = dict(
        x=x, y=y, w=w, h=h, label=label, kind="rect",
    )
async def highlight_point(self, tab_id: int, x: float, y: float, label: str = "") -> None:
    """Highlight a coordinate as a small box centred on (x, y).

    Draws a 24x24 CSS-px red box via highlight_rect, then replaces the
    recorded highlight for the tab with a "point" entry at the exact
    coordinate (w and h stored as 0).
    """
    half_size = 12  # half-size of the crosshair box in CSS px
    side = half_size * 2
    marker_color = {"r": 239, "g": 68, "b": 68, "a": 0.45}  # red-500 @ 45%

    await self.highlight_rect(
        tab_id,
        x - half_size,
        y - half_size,
        side,
        side,
        label=label,
        color=marker_color,
    )

    # highlight_rect recorded a "rect" entry; overwrite it with the point itself.
    _interaction_highlights[tab_id] = dict(
        x=x, y=y, w=0, h=0, label=label, kind="point",
    )
async def clear_highlight(self, tab_id: int) -> None:
    """Remove the CDP Overlay highlight from the browser.

    Hiding is best-effort: any exception from the CDP call is ignored. The
    tab's recorded highlight is then dropped if one exists.
    """
    try:
        await self._cdp(tab_id, "Overlay.hideHighlight")
    except Exception:
        # Deliberately ignored: a failed hide must not break the caller.
        pass

    if tab_id in _interaction_highlights:
        del _interaction_highlights[tab_id]
async def scroll(self, tab_id: int, direction: str = "down", amount: int = 500) -> dict:
"""Scroll the page.
Uses multiple methods for robustness:
1. Find and scroll the largest scrollable container (handles SPAs like LinkedIn)
2. Fallback to window scroll
3. Fallback to mouse wheel events via CDP
Uses JavaScript to find and scroll the appropriate container.
Handles SPAs like LinkedIn where content is in a nested scrollable div.
"""
await self.cdp_attach(tab_id)
delta_x = 0
delta_y = 0
if direction == "down":
@@ -826,156 +1008,70 @@ class BeelineBridge:
elif direction == "left":
delta_x = -amount
# Method 1: Find and scroll the largest scrollable container
# This handles SPAs like LinkedIn where content is in a nested scrollable div
smart_scroll_script = f"""
(function() {{
// Find the largest scrollable container
function findScrollableContainer() {{
const candidates = [];
# JavaScript scroll that finds the largest scrollable container
# NOTE: Do NOT wrap in IIFE - evaluate() already wraps scripts
scroll_script = f"""
// Find the largest scrollable container
const candidates = [];
const allElements = document.querySelectorAll('*');
// Check all elements with overflow scroll/auto
const allElements = document.querySelectorAll('*');
for (const el of allElements) {{
const style = getComputedStyle(el);
const overflow = style.overflow + style.overflowY;
for (const el of allElements) {{
const style = getComputedStyle(el);
const overflow = style.overflow + style.overflowY;
if (overflow.includes('scroll') || overflow.includes('auto')) {{
const rect = el.getBoundingClientRect();
// Must be visible and reasonably large
if (rect.width > 100 && rect.height > 100 &&
el.scrollHeight > el.clientHeight + 100) {{
candidates.push({{
el: el,
area: rect.width * rect.height,
scrollable: el.scrollHeight - el.clientHeight
}});
}}
}}
if (overflow.includes('scroll') || overflow.includes('auto')) {{
const rect = el.getBoundingClientRect();
if (rect.width > 100 && rect.height > 100 &&
el.scrollHeight > el.clientHeight + 100) {{
candidates.push({{el: el, area: rect.width * rect.height}});
}}
// Sort by area (largest first) and return best candidate
candidates.sort((a, b) => b.area - a.area);
return candidates.length > 0 ? candidates[0].el : null;
}}
}}
const container = findScrollableContainer();
candidates.sort((a, b) => b.area - a.area);
const container = candidates.length > 0 ? candidates[0].el : null;
if (container) {{
container.scrollBy({{
top: {delta_y},
left: {delta_x},
behavior: 'smooth'
}});
return {{
method: 'container-smooth',
success: true,
containerTag: container.tagName,
containerClass: container.className.substring(0, 50)
}};
}}
if (container) {{
container.scrollBy({{ top: {delta_y}, left: {delta_x}, behavior: 'smooth' }});
return {{
success: true,
method: 'container',
tag: container.tagName,
scrolled: true
}};
}}
// Fallback to window scroll
if ('scrollBehavior' in document.documentElement.style) {{
window.scrollBy({{
top: {delta_y},
left: {delta_x},
behavior: 'smooth'
}});
return {{ method: 'window-smooth', success: true }};
}}
window.scrollBy({delta_x}, {delta_y});
return {{ method: 'window-instant', success: true }};
}})();
// Fallback to window scroll
window.scrollBy({{ top: {delta_y}, left: {delta_x}, behavior: 'smooth' }});
return {{
success: true,
method: 'window',
tag: 'WINDOW',
scrolled: true
}};
"""
try:
result = await self.evaluate(tab_id, smart_scroll_script)
value = result.get("result", {})
if value and value.get("success"):
result = await asyncio.wait_for(
self.evaluate(tab_id, scroll_script),
timeout=5.0
)
value = (result or {}).get("result") or {}
if value.get("success"):
return {
"ok": True,
"action": "scroll",
"direction": direction,
"amount": amount,
"method": value.get("method", "js"),
"container": value.get("containerTag", "window")
"container": value.get("tag", "unknown")
}
except Exception as e:
logger.debug("Smart scroll script failed: %s", e)
# Method 2: Find scrollable container and use mouse wheel at its center
try:
# Find the largest scrollable container and its position
find_container_script = """
(function() {
const candidates = [];
const allElements = document.querySelectorAll('*');
for (const el of allElements) {
const style = getComputedStyle(el);
const overflow = style.overflow + style.overflowY;
if (overflow.includes('scroll') || overflow.includes('auto')) {
const rect = el.getBoundingClientRect();
if (rect.width > 100 && rect.height > 100 &&
el.scrollHeight > el.clientHeight + 100) {
candidates.push({
x: Math.round(rect.left + rect.width / 2),
y: Math.round(rect.top + rect.height / 2),
area: rect.width * rect.height,
tag: el.tagName
});
}
}
}
candidates.sort((a, b) => b.area - a.area);
return candidates.length > 0 ? candidates[0] : null;
})();
"""
container_result = await self._cdp(
tab_id,
"Runtime.evaluate",
{"expression": find_container_script, "returnByValue": True},
)
container_info = container_result.get("result", {}).get("value", {})
if container_info and isinstance(container_info, dict):
x = container_info.get("x", 400)
y = container_info.get("y", 300)
else:
# Fallback to viewport center
viewport_result = await self._cdp(
tab_id,
"Runtime.evaluate",
{
"expression": "({w: window.innerWidth, h: window.innerHeight})",
"returnByValue": True,
},
)
vp = viewport_result.get("result", {}).get("value", {})
x = vp.get("w", 800) // 2
y = vp.get("h", 600) // 2
return {"ok": False, "error": "scroll script returned failure"}
# Dispatch mouse wheel event at container center
await self._cdp(
tab_id,
"Input.dispatchMouseEvent",
{
"type": "mouseWheel",
"x": x,
"y": y,
"deltaX": -delta_x,
"deltaY": -delta_y,
},
)
return {
"ok": True,
"action": "scroll",
"direction": direction,
"amount": amount,
"method": "mouseWheel",
"target": f"({x}, {y})"
}
except asyncio.TimeoutError:
return {"ok": False, "error": "scroll timed out"}
except Exception as e:
logger.warning("Scroll failed: %s", e)
return {"ok": False, "error": str(e)}
@@ -1011,8 +1107,32 @@ class BeelineBridge:
await self.cdp_attach(tab_id)
await self._try_enable_domain(tab_id, "Runtime")
# Wrap in IIFE to allow return statements at top level
wrapped_script = f"(function() {{ {script} }})()"
stripped = script.strip()
# Already a complete IIFE — run as-is, no re-wrapping
is_iife = stripped.startswith("(function") and (
stripped.endswith("})()") or stripped.endswith("})();")
)
# Arrow-function IIFE: (() => { ... })()
is_arrow_iife = stripped.startswith("(()") and (
stripped.endswith("})()") or stripped.endswith("})();")
or stripped.endswith(")()") or stripped.endswith(")()")
)
if is_iife or is_arrow_iife:
# Already self-contained — just run it
wrapped_script = stripped
elif stripped.startswith("return "):
# Single return statement — wrap in IIFE
wrapped_script = f"(function() {{ {stripped} }})()"
elif "\n" in stripped or ";" in stripped:
# Multi-statement block — wrap without prepending return
# (caller should use explicit return if they want a value)
wrapped_script = f"(function() {{ {stripped} }})()"
else:
# Single expression — wrap with return to capture value
wrapped_script = f"(function() {{ return {stripped} }})()"
result = await self._cdp(
tab_id,
"Runtime.evaluate",
@@ -1023,10 +1143,10 @@ class BeelineBridge:
return {"ok": False, "error": "CDP returned no result"}
if "exceptionDetails" in result:
return {
"ok": False,
"error": result["exceptionDetails"].get("text", "Script error"),
}
ex = result["exceptionDetails"]
# Extract the actual exception message from the nested structure
ex_value = (ex.get("exception") or {}).get("description") or ex.get("text", "Script error")
return {"ok": False, "error": ex_value}
# The CDP response structure is {result: {type: ..., value: ...}}
# But our bridge returns just the inner result object
@@ -1051,15 +1171,16 @@ class BeelineBridge:
tab_id: The tab ID to snapshot
timeout_s: Maximum time to spend building snapshot (default 10s)
"""
async with asyncio.timeout(timeout_s):
await self.cdp_attach(tab_id)
await self._try_enable_domain(tab_id, "Accessibility")
await self._try_enable_domain(tab_id, "DOM")
await self._try_enable_domain(tab_id, "Runtime")
try:
async with asyncio.timeout(timeout_s):
await self.cdp_attach(tab_id)
await self._try_enable_domain(tab_id, "Accessibility")
await self._try_enable_domain(tab_id, "DOM")
await self._try_enable_domain(tab_id, "Runtime")
# Try accessibility tree first
result = await self._cdp(tab_id, "Accessibility.getFullAXTree")
nodes = result.get("nodes", [])
# Try accessibility tree first
result = await self._cdp(tab_id, "Accessibility.getFullAXTree")
nodes = result.get("nodes", [])
# Count non-ignored nodes
visible_count = sum(1 for n in nodes if not n.get("ignored", False))
@@ -1089,7 +1210,7 @@ class BeelineBridge:
"Runtime.evaluate",
{"expression": "window.location.href", "returnByValue": True},
)
url = url_result.get("result", {}).get("value", "")
url = (url_result or {}).get("result", {}).get("value", "")
return {
"ok": True,
@@ -1097,6 +1218,15 @@ class BeelineBridge:
"url": url,
"tree": snapshot,
}
except asyncio.TimeoutError:
logger.warning("Snapshot timed out after %ss", timeout_s)
return {"ok": False, "error": f"snapshot timed out after {timeout_s}s"}
except asyncio.CancelledError:
logger.warning("Snapshot cancelled (extension may have disconnected)")
return {"ok": False, "error": "snapshot cancelled - extension disconnected"}
except Exception as e:
logger.error("Snapshot failed: %s", e)
return {"ok": False, "error": str(e)}
async def _dom_snapshot(self, tab_id: int) -> dict:
"""Fallback: build snapshot from DOM tree with visibility info."""
@@ -1196,7 +1326,7 @@ class BeelineBridge:
"Runtime.evaluate",
{"expression": "window.location.href", "returnByValue": True},
)
url = url_result.get("result", {}).get("value", "")
url = (url_result or {}).get("result", {}).get("value", "")
return {
"ok": True,
@@ -1325,7 +1455,7 @@ class BeelineBridge:
"Runtime.evaluate",
{"expression": script, "returnByValue": True},
)
text = result.get("result", {}).get("result", {}).get("value")
text = (result or {}).get("result", {}).get("result", {}).get("value")
if text is not None:
return {"ok": True, "selector": selector, "text": text}
await asyncio.sleep(0.1)
@@ -1352,7 +1482,7 @@ class BeelineBridge:
"Runtime.evaluate",
{"expression": script, "returnByValue": True},
)
value = result.get("result", {}).get("result", {}).get("value")
value = (result or {}).get("result", {}).get("result", {}).get("value")
if value is not None:
return {"ok": True, "selector": selector, "attribute": attribute, "value": value}
await asyncio.sleep(0.1)
@@ -1360,49 +1490,106 @@ class BeelineBridge:
return {"ok": False, "error": f"Element not found: {selector}"}
async def screenshot(
self, tab_id: int, full_page: bool = False, selector: str | None = None
self, tab_id: int, full_page: bool = False, selector: str | None = None,
timeout_s: float = 30.0,
) -> dict:
"""Take a screenshot of the page or element.
Returns {"ok": True, "data": base64_string, "mimeType": "image/png"}.
"""
await self.cdp_attach(tab_id)
await self._cdp(tab_id, "Page.enable")
try:
async with asyncio.timeout(timeout_s):
await self.cdp_attach(tab_id)
await self._cdp(tab_id, "Page.enable")
params: dict[str, Any] = {"format": "png"}
if full_page:
# Get layout metrics for full page
metrics = await self._cdp(tab_id, "Page.getLayoutMetrics")
content_size = metrics.get("contentSize", {})
params["clip"] = {
"x": 0,
"y": 0,
"width": content_size.get("width", 1280),
"height": content_size.get("height", 720),
"scale": 1,
}
params: dict[str, Any] = {"format": "png"}
if selector:
# Clip to the element's bounding rect (viewport-relative)
rect_result = await self._cdp(
tab_id,
"Runtime.evaluate",
{
"expression": (
f"(function(){{"
f"const el=document.querySelector({json.dumps(selector)});"
f"if(!el)return null;"
f"const r=el.getBoundingClientRect();"
f"return{{x:r.left,y:r.top,width:r.width,height:r.height}};"
f"}})()"
),
"returnByValue": True,
},
)
rect = (
(rect_result or {}).get("result", {}).get("result", {}).get("value")
)
if rect and rect.get("width") and rect.get("height"):
params["clip"] = {
"x": rect["x"],
"y": rect["y"],
"width": rect["width"],
"height": rect["height"],
"scale": 1,
}
else:
return {"ok": False, "error": f"Selector not found: {selector}"}
elif full_page:
# Get layout metrics for full page
metrics = await self._cdp(tab_id, "Page.getLayoutMetrics")
content_size = metrics.get("contentSize", {})
params["clip"] = {
"x": 0,
"y": 0,
"width": content_size.get("width", 1280),
"height": content_size.get("height", 720),
"scale": 1,
}
result = await self._cdp(tab_id, "Page.captureScreenshot", params)
data = result.get("data")
result = await self._cdp(tab_id, "Page.captureScreenshot", params)
data = result.get("data")
if not data:
return {"ok": False, "error": "Screenshot failed"}
if not data:
return {"ok": False, "error": "Screenshot failed"}
# Get URL for metadata
url_result = await self._cdp(
tab_id,
"Runtime.evaluate",
{"expression": "window.location.href", "returnByValue": True},
)
url = url_result.get("result", {}).get("result", {}).get("value", "")
# Get URL and viewport metadata in one evaluate call
meta_result = await self._cdp(
tab_id,
"Runtime.evaluate",
{
"expression": (
"(function(){"
"return{"
"url:window.location.href,"
"dpr:window.devicePixelRatio,"
"cssWidth:window.innerWidth,"
"cssHeight:window.innerHeight"
"};"
"})()"
),
"returnByValue": True,
},
)
meta = (meta_result or {}).get("result", {}).get("result", {}).get("value") or {}
return {
"ok": True,
"tabId": tab_id,
"url": url,
"data": data,
"mimeType": "image/png",
}
return {
"ok": True,
"tabId": tab_id,
"url": meta.get("url", ""),
"devicePixelRatio": meta.get("dpr", 1.0),
"cssWidth": meta.get("cssWidth", 0),
"cssHeight": meta.get("cssHeight", 0),
"data": data,
"mimeType": "image/png",
}
except asyncio.TimeoutError:
logger.warning("Screenshot timed out after %ss", timeout_s)
return {"ok": False, "error": f"screenshot timed out after {timeout_s}s"}
except asyncio.CancelledError:
logger.warning("Screenshot cancelled (extension may have disconnected)")
return {"ok": False, "error": "screenshot cancelled - extension disconnected"}
except Exception as e:
logger.error("Screenshot failed: %s", e)
return {"ok": False, "error": str(e)}
async def wait_for_selector(self, tab_id: int, selector: str, timeout_ms: int = 30000) -> dict:
"""Wait for an element to appear."""
@@ -1421,7 +1608,7 @@ class BeelineBridge:
"Runtime.evaluate",
{"expression": script, "returnByValue": True},
)
found = result.get("result", {}).get("result", {}).get("value", False)
found = (result or {}).get("result", {}).get("result", {}).get("value", False)
if found:
return {"ok": True, "selector": selector}
await asyncio.sleep(0.1)
@@ -1445,7 +1632,7 @@ class BeelineBridge:
"Runtime.evaluate",
{"expression": script, "returnByValue": True},
)
found = result.get("result", {}).get("result", {}).get("value", False)
found = (result or {}).get("result", {}).get("result", {}).get("value", False)
if found:
return {"ok": True, "text": text}
await asyncio.sleep(0.1)
+287 -7
View File
@@ -7,6 +7,7 @@ All operations go through the Beeline extension via CDP - no Playwright required
from __future__ import annotations
import base64
import io
import json
import logging
import time
@@ -21,6 +22,98 @@ from .tabs import _get_context
logger = logging.getLogger(__name__)
# Default width, in pixels, of the image delivered by browser_screenshot.
# Used as the default of the `width` parameter, so callers may override it.
_SCREENSHOT_WIDTH = 600
# Maps tab_id -> physical scale: image_coord × scale = physical pixels.
# Written after each screenshot; read back when converting coordinates.
# NOTE(review): the original comment said "for CDP Input events", but the
# coordinate tools' docstrings describe their inputs as CSS pixels - confirm
# which space the bridge expects.
_screenshot_scales: dict[int, float] = {}
# Maps tab_id -> CSS scale: image_coord × scale = CSS pixels
# (for DOM APIs / getBoundingClientRect). Written alongside the entry above.
_screenshot_css_scales: dict[int, float] = {}
def _resize_and_annotate(
    data: str,
    css_width: int,
    dpr: float = 1.0,
    highlights: list[dict] | None = None,
    width: int = _SCREENSHOT_WIDTH,
) -> tuple[str, float, float]:
    """Resize a base64 PNG to ``width`` pixels wide and draw highlights on it.

    Args:
        data: Base64-encoded image as captured.
        css_width: Width of the captured area in CSS pixels; when 0/falsy the
            CSS scale is derived from ``dpr`` instead.
        dpr: Device pixel ratio reported for the capture.
        highlights: Optional dicts with ``x``, ``y`` (and optionally ``w``,
            ``h``, ``label``, ``kind``) in CSS pixels. ``kind == "point"``
            draws a red crosshair marker; anything else draws a blue box.
        width: Width of the returned image in pixels.

    Returns:
        ``(new_b64, physical_scale, css_scale)`` where

        * ``physical_scale`` = physical px per image px
          (image coord × physical_scale → physical px)
        * ``css_scale`` = CSS px per image px
          (image coord × css_scale → CSS px, for DOM APIs)

        If Pillow is unavailable or processing fails, the original ``data``
        is returned unchanged with ``physical_scale == 1.0`` and
        ``css_scale == 1 / max(dpr, 1.0)``.
    """
    try:
        from PIL import Image, ImageDraw, ImageFont

        raw = base64.b64decode(data)
        img = Image.open(io.BytesIO(raw)).convert("RGBA")
        orig_w, orig_h = img.size
        new_w = width
        new_h = round(orig_h * new_w / orig_w)
        img = img.resize((new_w, new_h), Image.LANCZOS)

        # Physical scale: how many native/physical pixels per image pixel
        physical_scale = orig_w / width
        # CSS scale: physical_scale / DPR
        css_scale = (css_width / width) if css_width else (physical_scale / max(dpr, 1.0))

        if highlights:
            # Draw on a transparent layer so the fills blend with the page.
            overlay = Image.new("RGBA", img.size, (0, 0, 0, 0))
            draw = ImageDraw.Draw(overlay)
            try:
                font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 11)
            except Exception:
                # Font file is platform-specific; fall back to Pillow's built-in.
                font = ImageFont.load_default()

            for h in highlights:
                kind = h.get("kind", "rect")
                label = h.get("label", "")
                # Highlights are in CSS px → convert to image px
                ix = h["x"] / css_scale
                iy = h["y"] / css_scale
                iw = h.get("w", 0) / css_scale
                ih = h.get("h", 0) / css_scale

                if kind == "point":
                    cx, cy, r = ix, iy, 10
                    draw.ellipse([(cx - r, cy - r), (cx + r, cy + r)],
                                 fill=(239, 68, 68, 100), outline=(239, 68, 68, 220), width=2)
                    draw.line([(cx - r - 4, cy), (cx + r + 4, cy)], fill=(239, 68, 68, 220), width=2)
                    draw.line([(cx, cy - r - 4), (cx, cy + r + 4)], fill=(239, 68, 68, 220), width=2)
                else:
                    draw.rectangle([(ix, iy), (ix + iw, iy + ih)],
                                   fill=(59, 130, 246, 70), outline=(59, 130, 246, 220), width=2)

                # Label: show image pixel position so user knows where to look
                img_coords = f"img:({round(ix)},{round(iy)})"
                display_label = f"{img_coords} {label}" if label else img_coords
                # Keep the label on-canvas: above the target, clamped to the edges.
                lx, ly = ix, max(2, iy - 16)
                lx = max(2, min(lx, width - 120))
                bbox = draw.textbbox((lx, ly), display_label, font=font)
                pad = 3
                draw.rectangle(
                    [(bbox[0] - pad, bbox[1] - pad), (bbox[2] + pad, bbox[3] + pad)],
                    fill=(59, 130, 246, 200),
                )
                draw.text((lx, ly), display_label, fill=(255, 255, 255, 255), font=font)

            img = Image.alpha_composite(img, overlay).convert("RGB")
        else:
            img = img.convert("RGB")

        buf = io.BytesIO()
        img.save(buf, format="PNG", optimize=True)
        return base64.b64encode(buf.getvalue()).decode(), round(physical_scale, 4), round(css_scale, 4)
    except Exception:
        logger.debug("Screenshot resize/annotate failed, using original", exc_info=True)
        # The image handed back is the un-resized capture, so one image pixel
        # is one physical pixel (physical_scale 1.0). Its CSS scale is still
        # physical / DPR - reporting 1.0 would misplace coordinates on HiDPI.
        if isinstance(dpr, (int, float)):
            fallback_css_scale = round(1.0 / max(dpr, 1.0), 4)
        else:
            fallback_css_scale = 1.0
        return data, 1.0, fallback_css_scale
def register_inspection_tools(mcp: FastMCP) -> None:
"""Register browser inspection tools."""
@@ -32,19 +125,25 @@ def register_inspection_tools(mcp: FastMCP) -> None:
full_page: bool = False,
selector: str | None = None,
image_type: Literal["png", "jpeg"] = "png",
annotate: bool = True,
width: int = _SCREENSHOT_WIDTH,
) -> list:
"""
Take a screenshot of the current page.
Returns the screenshot as an image the LLM can see, alongside
text metadata (URL, size, etc.).
Returns a normalized image alongside text metadata (URL, size, scale
factors, etc.). Automatically annotates the last interaction (click,
hover, type) with a bounding box overlay.
Args:
tab_id: Chrome tab ID (default: active tab)
profile: Browser profile name (default: "default")
full_page: Capture full scrollable page (default: False)
selector: CSS selector to screenshot element (optional - not supported)
selector: CSS selector to screenshot a specific element (optional)
image_type: Image format - png or jpeg (default: png)
annotate: Draw bounding box of last interaction on image (default: True)
width: Output image width in pixels (default: 600). Use 800+ for fine
text, 400 for quick layout checks.
Returns:
List of content blocks: text metadata + image
@@ -91,10 +190,9 @@ def register_inspection_tools(mcp: FastMCP) -> None:
return result
try:
if selector:
logger.warning("Element screenshots not supported, capturing full page")
screenshot_result = await bridge.screenshot(target_tab, full_page=full_page)
screenshot_result = await bridge.screenshot(
target_tab, full_page=full_page, selector=selector
)
if not screenshot_result.get("ok"):
log_tool_call(
@@ -107,6 +205,21 @@ def register_inspection_tools(mcp: FastMCP) -> None:
data = screenshot_result.get("data")
mime_type = screenshot_result.get("mimeType", "image/png")
css_width = screenshot_result.get("cssWidth", 0)
dpr = screenshot_result.get("devicePixelRatio", 1.0)
# Collect highlights: last interaction from bridge + CDP already drew in browser
from ..bridge import _interaction_highlights
highlights: list[dict] | None = None
if annotate and target_tab in _interaction_highlights:
highlights = [_interaction_highlights[target_tab]]
# Normalize to 800px wide and annotate
data, physical_scale, css_scale = _resize_and_annotate(
data, css_width, dpr=dpr, highlights=highlights, width=width
)
_screenshot_scales[target_tab] = physical_scale
_screenshot_css_scales[target_tab] = css_scale
meta = json.dumps(
{
@@ -115,7 +228,16 @@ def register_inspection_tools(mcp: FastMCP) -> None:
"url": screenshot_result.get("url", ""),
"imageType": mime_type.split("/")[-1],
"size": len(base64.b64decode(data)) if data else 0,
"imageWidth": width,
"fullPage": full_page,
"devicePixelRatio": dpr,
"physicalScale": physical_scale,
"cssScale": css_scale,
"annotated": bool(highlights),
"scaleHint": (
f"image_coord × {physical_scale} = physical px (for browser_click_coordinate/hover_coordinate); "
f"image_coord × {css_scale} = CSS px (for getBoundingClientRect)"
),
}
)
@@ -126,6 +248,8 @@ def register_inspection_tools(mcp: FastMCP) -> None:
"ok": True,
"size": len(base64.b64decode(data)) if data else 0,
"url": screenshot_result.get("url", ""),
"physicalScale": physical_scale,
"cssScale": css_scale,
},
duration_ms=(time.perf_counter() - start) * 1000,
)
@@ -143,6 +267,162 @@ def register_inspection_tools(mcp: FastMCP) -> None:
)
return [TextContent(type="text", text=json.dumps({"ok": False, "error": str(e)}))]
@mcp.tool()
def browser_coords(
    x: float,
    y: float,
    tab_id: int | None = None,
    profile: str | None = None,
) -> dict:
    """
    Convert screenshot image coordinates to browser coordinates.

    Scales a pixel position seen in the most recent browser_screenshot image
    of the tab by the factors recorded for that screenshot, yielding:
    - physical_x/y  image position × physical scale
    - css_x/y       image position × CSS scale (for getBoundingClientRect
                    comparisons and DOM APIs)

    If no screenshot has been recorded for the tab, both scales fall back to
    1.0 and the input is returned unscaled.

    Args:
        x: X pixel position in the screenshot image
        y: Y pixel position in the screenshot image
        tab_id: Chrome tab ID (default: active tab for profile)
        profile: Browser profile name (default: "default")

    Returns:
        Dict with physical_x, physical_y, css_x, css_y, and scale factors
    """
    ctx = _get_context(profile)
    target_tab = tab_id or (ctx.get("activeTabId") if ctx else None)

    if target_tab:
        physical_scale = _screenshot_scales.get(target_tab, 1.0)
        # Without a recorded CSS scale, reuse the physical one.
        css_scale = _screenshot_css_scales.get(target_tab, physical_scale)
    else:
        physical_scale = 1.0
        css_scale = physical_scale

    converted = {"ok": True}
    converted["physical_x"] = round(x * physical_scale, 1)
    converted["physical_y"] = round(y * physical_scale, 1)
    converted["css_x"] = round(x * css_scale, 1)
    converted["css_y"] = round(y * css_scale, 1)
    converted["physicalScale"] = physical_scale
    converted["cssScale"] = css_scale
    converted["tabId"] = target_tab
    converted["note"] = "Use physical_x/y with browser_click_coordinate, browser_hover_coordinate, browser_press_at. Use css_x/y with getBoundingClientRect and DOM APIs."
    return converted
@mcp.tool()
async def browser_shadow_query(
    selector: str,
    tab_id: int | None = None,
    profile: str | None = None,
) -> dict:
    """
    Shadow-piercing querySelector using '>>>' syntax.

    Delegates to the bridge's shadow query to locate an element behind one or
    more shadow roots (e.g. LinkedIn's #interop-outlet) and returns its
    bounding rect in both CSS and physical pixels. The physical rect is the
    CSS rect multiplied by the device pixel ratio implied by the tab's last
    screenshot scales (1.0 if no screenshot has been taken).

    Args:
        selector: CSS selectors joined by ' >>> ' to pierce shadow roots.
                  Example: '#interop-outlet >>> #ember37 >>> p'
        tab_id: Chrome tab ID (default: active tab)
        profile: Browser profile name (default: "default")

    Returns:
        Dict with rect (CSS px) and physical rect (CSS px × DPR) of the
        element, or {"ok": False, "error": ...} on any failure
    """
    bridge = get_bridge()
    if not bridge or not bridge.is_connected:
        return {"ok": False, "error": "Browser extension not connected"}

    ctx = _get_context(profile)
    if not ctx:
        return {"ok": False, "error": "Browser not started"}

    target_tab = tab_id or ctx.get("activeTabId")
    if target_tab is None:
        return {"ok": False, "error": "No active tab"}

    # Report bridge failures as an error dict, like the other browser tools,
    # instead of letting the exception escape the tool call.
    try:
        result = await bridge.shadow_query(target_tab, selector)
    except Exception as e:
        logger.warning("browser_shadow_query failed: %s", e)
        return {"ok": False, "error": str(e)}

    if not result.get("ok"):
        return result

    rect = result.get("rect")
    required = ("x", "y", "w", "h", "cx", "cy")
    if not isinstance(rect, dict) or any(key not in rect for key in required):
        return {"ok": False, "error": f"No bounding rect returned for selector: {selector}"}

    physical_scale = _screenshot_scales.get(target_tab, 1.0)
    css_scale = _screenshot_css_scales.get(target_tab, 1.0)
    # physical/css scale of the same image equals the device pixel ratio.
    dpr = physical_scale / css_scale if css_scale else 1.0

    return {
        "ok": True,
        "selector": selector,
        "tag": rect.get("tag"),
        "css": {"x": rect["x"], "y": rect["y"], "w": rect["w"], "h": rect["h"],
                "cx": rect["cx"], "cy": rect["cy"]},
        "physical": {
            "x": round(rect["x"] * dpr, 1), "y": round(rect["y"] * dpr, 1),
            "w": round(rect["w"] * dpr, 1), "h": round(rect["h"] * dpr, 1),
            "cx": round(rect["cx"] * dpr, 1), "cy": round(rect["cy"] * dpr, 1),
        },
        "note": "Use physical.cx/cy with browser_click_coordinate or browser_hover_coordinate. Use css.cx/cy with getBoundingClientRect comparisons.",
    }
@mcp.tool()
async def browser_get_rect(
    selector: str,
    tab_id: int | None = None,
    profile: str | None = None,
) -> dict:
    """
    Get the bounding rect of an element by CSS selector.

    Supports '>>>' shadow-piercing selectors for overlay/shadow DOM content.
    Returns coordinates in both CSS pixels (for DOM APIs) and physical pixels.
    The physical rect is the CSS rect multiplied by the device pixel ratio
    implied by the tab's last screenshot scales (1.0 if no screenshot has
    been taken).

    Args:
        selector: CSS selector, optionally with ' >>> ' to pierce shadow roots.
                  Example: 'button.submit' or '#shadow-host >>> button'
        tab_id: Chrome tab ID (default: active tab)
        profile: Browser profile name (default: "default")

    Returns:
        Dict with css and physical bounding rects, or
        {"ok": False, "error": ...} on any failure
    """
    bridge = get_bridge()
    if not bridge or not bridge.is_connected:
        return {"ok": False, "error": "Browser extension not connected"}

    ctx = _get_context(profile)
    if not ctx:
        return {"ok": False, "error": "Browser not started"}

    target_tab = tab_id or ctx.get("activeTabId")
    if target_tab is None:
        return {"ok": False, "error": "No active tab"}

    # Report bridge failures as an error dict, like the other browser tools,
    # instead of letting the exception escape the tool call.
    try:
        result = await bridge.shadow_query(target_tab, selector)
    except Exception as e:
        logger.warning("browser_get_rect failed: %s", e)
        return {"ok": False, "error": str(e)}

    if not result.get("ok"):
        return result

    rect = result.get("rect")
    required = ("x", "y", "w", "h", "cx", "cy")
    if not isinstance(rect, dict) or any(key not in rect for key in required):
        return {"ok": False, "error": f"No bounding rect returned for selector: {selector}"}

    physical_scale = _screenshot_scales.get(target_tab, 1.0)
    css_scale = _screenshot_css_scales.get(target_tab, 1.0)
    # physical/css scale of the same image equals the device pixel ratio.
    dpr = physical_scale / css_scale if css_scale else 1.0

    return {
        "ok": True,
        "selector": selector,
        "tag": rect.get("tag"),
        "css": {"x": rect["x"], "y": rect["y"], "w": rect["w"], "h": rect["h"],
                "cx": rect["cx"], "cy": rect["cy"]},
        "physical": {
            "x": round(rect["x"] * dpr, 1), "y": round(rect["y"] * dpr, 1),
            "w": round(rect["w"] * dpr, 1), "h": round(rect["h"] * dpr, 1),
            "cx": round(rect["cx"] * dpr, 1), "cy": round(rect["cy"] * dpr, 1),
        },
        "note": "Use physical.cx/cy with browser_click_coordinate or browser_hover_coordinate.",
    }
@mcp.tool()
async def browser_snapshot(
tab_id: int | None = None,
+129
View File
@@ -370,6 +370,135 @@ def register_interaction_tools(mcp: FastMCP) -> None:
)
return result
@mcp.tool()
async def browser_hover_coordinate(
    x: float,
    y: float,
    tab_id: int | None = None,
    profile: str | None = None,
) -> dict:
    """
    Hover the mouse at a coordinate instead of over a CSS-selected element.

    Intended for targets that a selector cannot reach: overlays, shadow DOM,
    or virtual-rendered components that are not in the regular DOM.
    Pair with browser_coords to convert screenshot image positions to CSS pixels.

    Args:
        x: CSS pixel X coordinate
        y: CSS pixel Y coordinate
        tab_id: Chrome tab ID (default: active tab)
        profile: Browser profile name (default: "default")

    Returns:
        The bridge's hover result, or {"ok": False, "error": ...} on failure
    """
    tool_name = "browser_hover_coordinate"
    started = time.perf_counter()
    call_args = {"x": x, "y": y, "tab_id": tab_id, "profile": profile}

    def _reject(message: str) -> dict:
        # Precondition failures are logged without a duration.
        failure = {"ok": False, "error": message}
        log_tool_call(tool_name, call_args, result=failure)
        return failure

    def _elapsed_ms() -> float:
        return (time.perf_counter() - started) * 1000

    bridge = get_bridge()
    if not bridge or not bridge.is_connected:
        return _reject("Browser extension not connected")

    ctx = _get_context(profile)
    if not ctx:
        return _reject("Browser not started. Call browser_start first.")

    target_tab = tab_id or ctx.get("activeTabId")
    if target_tab is None:
        return _reject("No active tab")

    try:
        outcome = await bridge.hover_coordinate(target_tab, x, y)
        log_tool_call(tool_name, call_args, result=outcome, duration_ms=_elapsed_ms())
        return outcome
    except Exception as e:
        failure = {"ok": False, "error": str(e)}
        log_tool_call(tool_name, call_args, error=e, duration_ms=_elapsed_ms())
        return failure
@mcp.tool()
async def browser_press_at(
    x: float,
    y: float,
    key: str,
    tab_id: int | None = None,
    profile: str | None = None,
) -> dict:
    """
    Position the mouse at a coordinate, then press a key.

    Intended for cases where the element that should receive the key lives in
    an overlay or virtual-rendered component, so browser_press cannot rely on
    DOM focus. The mouse move and key press are performed by the bridge.
    Pair with browser_coords to convert screenshot image positions to CSS pixels.

    Args:
        x: CSS pixel X coordinate to position mouse
        y: CSS pixel Y coordinate to position mouse
        key: Key to press (e.g. 'Enter', 'Space', 'Escape', 'ArrowDown')
        tab_id: Chrome tab ID (default: active tab)
        profile: Browser profile name (default: "default")

    Returns:
        The bridge's press result, or {"ok": False, "error": ...} on failure
    """
    tool_name = "browser_press_at"
    started = time.perf_counter()
    call_args = {"x": x, "y": y, "key": key, "tab_id": tab_id, "profile": profile}

    def _reject(message: str) -> dict:
        # Precondition failures are logged without a duration.
        failure = {"ok": False, "error": message}
        log_tool_call(tool_name, call_args, result=failure)
        return failure

    def _elapsed_ms() -> float:
        return (time.perf_counter() - started) * 1000

    bridge = get_bridge()
    if not bridge or not bridge.is_connected:
        return _reject("Browser extension not connected")

    ctx = _get_context(profile)
    if not ctx:
        return _reject("Browser not started. Call browser_start first.")

    target_tab = tab_id or ctx.get("activeTabId")
    if target_tab is None:
        return _reject("No active tab")

    try:
        outcome = await bridge.press_key_at(target_tab, x, y, key)
        log_tool_call(tool_name, call_args, result=outcome, duration_ms=_elapsed_ms())
        return outcome
    except Exception as e:
        failure = {"ok": False, "error": str(e)}
        log_tool_call(tool_name, call_args, error=e, duration_ms=_elapsed_ms())
        return failure
@mcp.tool()
async def browser_select(
selector: str,