feat: browser automated test

This commit is contained in:
Timothy
2026-04-03 07:31:10 -07:00
parent e0cd16b92b
commit 95f1d1abcd
25 changed files with 5095 additions and 458 deletions
+614 -116
View File
@@ -78,8 +78,20 @@ class BeelineBridge:
return
try:
self._server = await websockets.serve(self._handle_connection, "127.0.0.1", port)
logger.info("Beeline bridge listening on ws://127.0.0.1:%d/bridge", port)
# Suppress noisy websockets logging for invalid upgrade attempts
# by providing a null logger
import logging
null_logger = logging.getLogger("websockets.null")
null_logger.setLevel(logging.CRITICAL)
null_logger.addHandler(logging.NullHandler())
self._server = await websockets.serve(
self._handle_connection,
"127.0.0.1",
port,
logger=null_logger,
)
logger.info("Beeline bridge listening on ws://127.0.0.1:%d", port)
except OSError as e:
logger.warning("Beeline bridge could not start on port %d: %s", port, e)
@@ -170,6 +182,21 @@ class BeelineBridge:
log_cdp_command(tab_id, method, params, error=str(e), duration_ms=duration_ms)
raise
async def _try_enable_domain(self, tab_id: int, domain: str) -> None:
"""Try to enable a CDP domain, ignoring errors if not available.
Some domains (like Input) may not be available on certain page types
(e.g., chrome:// URLs, extension pages, or restricted sites).
"""
try:
await self._cdp(tab_id, f"{domain}.enable")
except RuntimeError as e:
# Log but don't fail - domain may not be available on all pages
if "wasn't found" in str(e) or "not found" in str(e).lower():
logger.debug("CDP domain %s.enable not available for tab %s", domain, tab_id)
else:
raise
# ── Context (Tab Group) Management ─────────────────────────────────────────
async def create_context(self, agent_id: str) -> dict:
@@ -374,12 +401,15 @@ class BeelineBridge:
) -> dict:
"""Click an element by selector.
Uses DOM.getDocument + DOM.querySelector to find the element,
then DOM.getBoxModel to get coordinates, then Input.dispatchMouseEvent.
Uses multiple fallback methods for robustness:
1. CDP mouse events with JavaScript bounds
2. JavaScript click() as fallback
Inspired by browser-use's robust click implementation.
"""
await self.cdp_attach(tab_id)
await self._cdp(tab_id, "DOM.enable")
await self._cdp(tab_id, "Input.enable")
await self._try_enable_domain(tab_id, "DOM")
await self._try_enable_domain(tab_id, "Input")
# Get document and find element
doc = await self._cdp(tab_id, "DOM.getDocument")
@@ -400,56 +430,172 @@ class BeelineBridge:
if not node_id:
return {"ok": False, "error": f"Element not found: {selector}"}
# Get box model for coordinates
box = await self._cdp(tab_id, "DOM.getBoxModel", {"nodeId": node_id})
content = box.get("content", [])
if len(content) < 4:
# Scroll into view FIRST to ensure element is rendered
try:
await self._cdp(
tab_id,
"DOM.scrollIntoViewIfNeeded",
{"nodeId": node_id},
)
await asyncio.sleep(0.05) # Wait for scroll to complete
except Exception:
pass # Best effort - continue even if scroll fails
# Get viewport dimensions for bounds checking
viewport_script = """
(function() {
return {
width: window.innerWidth,
height: window.innerHeight
};
})();
"""
viewport_result = await self.evaluate(tab_id, viewport_script)
viewport = viewport_result.get("result", {}).get("value", {})
viewport_width = viewport.get("width", 1920)
viewport_height = viewport.get("height", 1080)
# Method 1: Use JavaScript to get element bounds and click
# This is more reliable than CDP for complex layouts
click_script = f"""
(function() {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return {{ error: 'Element not found' }};
// Check if element is visible
const rect = el.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) {{
return {{ error: 'Element has zero dimensions' }};
}}
// Check if element is within viewport
if (rect.bottom < 0 || rect.top > {viewport_height} ||
rect.right < 0 || rect.left > {viewport_width}) {{
return {{ error: 'Element not in viewport' }};
}}
// Get center for metadata
const x = rect.x + rect.width / 2;
const y = rect.y + rect.height / 2;
// Perform the click
el.click();
return {{ x: x, y: y, width: rect.width, height: rect.height }};
}})();
"""
try:
result = await self.evaluate(tab_id, click_script)
value = result.get("result", {}).get("value")
if isinstance(value, dict) and "error" not in value:
# JavaScript click succeeded
return {
"ok": True,
"action": "click",
"selector": selector,
"x": value.get("x", 0),
"y": value.get("y", 0),
"method": "javascript"
}
# If JavaScript click failed, try CDP approach
if isinstance(value, dict) and value.get("error"):
logger.debug("JS click failed: %s, trying CDP", value["error"])
except Exception as e:
logger.debug("JS click exception: %s, trying CDP", e)
# Method 2: CDP mouse events (fallback)
# Get element bounds via JavaScript (more reliable than CDP getBoxModel)
bounds_script = f"""
(function() {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return null;
const rect = el.getBoundingClientRect();
return {{
x: rect.x + rect.width / 2,
y: rect.y + rect.height / 2,
width: rect.width,
height: rect.height
}};
}})();
"""
bounds_result = await self.evaluate(tab_id, bounds_script)
bounds_value = bounds_result.get("result", {}).get("value")
if not bounds_value:
return {"ok": False, "error": f"Could not get element bounds: {selector}"}
# Calculate center of element (content quad is [x1,y1, x2,y2, x3,y3, x4,y4])
x = (content[0] + content[2] + content[4] + content[6]) / 4
y = (content[1] + content[3] + content[5] + content[7]) / 4
x = bounds_value.get("x", 0)
y = bounds_value.get("y", 0)
# Scroll into view first
await self._cdp(
tab_id,
"DOM.scrollIntoViewIfNeeded",
{"nodeId": node_id},
)
# Clamp coordinates to viewport bounds
x = max(0, min(viewport_width - 1, x))
y = max(0, min(viewport_height - 1, y))
# Dispatch mouse events
# Dispatch mouse events with proper timing
button_map = {"left": "left", "right": "right", "middle": "middle"}
cdp_button = button_map.get(button, "left")
await self._cdp(
tab_id,
"Input.dispatchMouseEvent",
{
"type": "mousePressed",
"x": x,
"y": y,
"button": cdp_button,
"clickCount": click_count,
},
)
await self._cdp(
tab_id,
"Input.dispatchMouseEvent",
{
"type": "mouseReleased",
"x": x,
"y": y,
"button": cdp_button,
"clickCount": click_count,
},
)
try:
# Move mouse to element first
await self._cdp(
tab_id,
"Input.dispatchMouseEvent",
{"type": "mouseMoved", "x": x, "y": y},
)
await asyncio.sleep(0.05)
return {"ok": True, "action": "click", "selector": selector, "x": x, "y": y}
# Mouse down
try:
await asyncio.wait_for(
self._cdp(
tab_id,
"Input.dispatchMouseEvent",
{
"type": "mousePressed",
"x": x,
"y": y,
"button": cdp_button,
"clickCount": click_count,
},
),
timeout=1.0,
)
except asyncio.TimeoutError:
pass # Continue even if timeout
await asyncio.sleep(0.08)
# Mouse up
try:
await asyncio.wait_for(
self._cdp(
tab_id,
"Input.dispatchMouseEvent",
{
"type": "mouseReleased",
"x": x,
"y": y,
"button": cdp_button,
"clickCount": click_count,
},
),
timeout=3.0,
)
except asyncio.TimeoutError:
pass # Continue even if timeout
return {"ok": True, "action": "click", "selector": selector, "x": x, "y": y, "method": "cdp"}
except Exception as e:
return {"ok": False, "error": f"Click failed: {e}"}
async def click_coordinate(self, tab_id: int, x: float, y: float, button: str = "left") -> dict:
"""Click at specific coordinates."""
await self.cdp_attach(tab_id)
await self._cdp(tab_id, "Input.enable")
await self._try_enable_domain(tab_id, "Input")
button_map = {"left": "left", "right": "right", "middle": "middle"}
cdp_button = button_map.get(button, "left")
@@ -476,44 +622,59 @@ class BeelineBridge:
delay_ms: int = 0,
timeout_ms: int = 30000,
) -> dict:
"""Type text into an element."""
"""Type text into an element.
Uses JavaScript focus for reliability, then CDP key events.
"""
await self.cdp_attach(tab_id)
await self._cdp(tab_id, "DOM.enable")
await self._cdp(tab_id, "Input.enable")
await self._try_enable_domain(tab_id, "DOM")
await self._try_enable_domain(tab_id, "Input")
await self._try_enable_domain(tab_id, "Runtime")
# Get document and find element
doc = await self._cdp(tab_id, "DOM.getDocument")
root_id = doc.get("root", {}).get("nodeId")
# First, scroll into view and focus via JavaScript (more reliable than CDP)
focus_script = f"""
(function() {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return false;
deadline = asyncio.get_event_loop().time() + timeout_ms / 1000
node_id = None
while asyncio.get_event_loop().time() < deadline:
result = await self._cdp(
tab_id, "DOM.querySelector", {"nodeId": root_id, "selector": selector}
)
node_id = result.get("nodeId")
if node_id:
break
await asyncio.sleep(0.1)
// Scroll into view
el.scrollIntoView({{ block: 'center' }});
if not node_id:
return {"ok": False, "error": f"Element not found: {selector}"}
// Focus the element
el.focus();
# Focus the element
await self._cdp(tab_id, "DOM.focus", {"nodeId": node_id})
// Clear if requested
if ({str(clear_first).lower()}) {{
if (el.value !== undefined) {{
el.value = '';
}} else if (el.isContentEditable) {{
el.textContent = '';
}}
}}
# Clear if requested
if clear_first:
await self._cdp(
tab_id,
"Runtime.evaluate",
{
"expression": f"document.querySelector({json.dumps(selector)}).value = ''",
"returnByValue": True,
},
)
return true;
}})();
"""
# Type each character
focus_result = await self.evaluate(tab_id, focus_script)
success = focus_result.get("result", {}).get("value", False)
if not success:
# Element not found - wait and retry
deadline = asyncio.get_event_loop().time() + timeout_ms / 1000
while asyncio.get_event_loop().time() < deadline:
result = await self.evaluate(tab_id, focus_script)
if result.get("result", {}).get("value", False):
success = True
break
await asyncio.sleep(0.1)
if not success:
return {"ok": False, "error": f"Element not found: {selector}"}
await asyncio.sleep(0.05) # Wait for focus to take effect
# Type each character using CDP key events
for char in text:
# Dispatch key down
await self._cdp(
@@ -540,7 +701,7 @@ class BeelineBridge:
selector: Optional selector to focus first
"""
await self.cdp_attach(tab_id)
await self._cdp(tab_id, "Input.enable")
await self._try_enable_domain(tab_id, "Input")
if selector:
doc = await self._cdp(tab_id, "DOM.getDocument")
@@ -585,43 +746,73 @@ class BeelineBridge:
return {"ok": True, "action": "press", "key": key}
async def hover(self, tab_id: int, selector: str, timeout_ms: int = 30000) -> dict:
"""Hover over an element."""
"""Hover over an element.
Uses JavaScript for bounds (more reliable than CDP getBoxModel).
"""
await self.cdp_attach(tab_id)
await self._cdp(tab_id, "DOM.enable")
await self._cdp(tab_id, "Input.enable")
await self._try_enable_domain(tab_id, "DOM")
await self._try_enable_domain(tab_id, "Input")
await self._try_enable_domain(tab_id, "Runtime")
doc = await self._cdp(tab_id, "DOM.getDocument")
root_id = doc.get("root", {}).get("nodeId")
# Use JavaScript to scroll into view and get bounds
hover_script = f"""
(function() {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return null;
// Scroll into view
el.scrollIntoView({{ block: 'center' }});
const rect = el.getBoundingClientRect();
return {{
x: rect.x + rect.width / 2,
y: rect.y + rect.height / 2,
width: rect.width,
height: rect.height
}};
}})();
"""
# Wait for element and get bounds
deadline = asyncio.get_event_loop().time() + timeout_ms / 1000
node_id = None
bounds_value = None
while asyncio.get_event_loop().time() < deadline:
result = await self._cdp(
tab_id, "DOM.querySelector", {"nodeId": root_id, "selector": selector}
)
node_id = result.get("nodeId")
if node_id:
result = await self.evaluate(tab_id, hover_script)
bounds_value = result.get("result", {}).get("value")
if bounds_value:
break
await asyncio.sleep(0.1)
if not node_id:
if not bounds_value:
return {"ok": False, "error": f"Element not found: {selector}"}
box = await self._cdp(tab_id, "DOM.getBoxModel", {"nodeId": node_id})
content = box.get("content", [])
x = (content[0] + content[2] + content[4] + content[6]) / 4
y = (content[1] + content[3] + content[5] + content[7]) / 4
x = bounds_value.get("x", 0)
y = bounds_value.get("y", 0)
if x == 0 and y == 0:
return {"ok": False, "error": f"Element has zero dimensions: {selector}"}
await asyncio.sleep(0.05) # Wait for scroll
# Dispatch mouse moved event
await self._cdp(
tab_id,
"Input.dispatchMouseEvent",
{"type": "mouseMoved", "x": x, "y": y},
)
return {"ok": True, "action": "hover", "selector": selector}
return {"ok": True, "action": "hover", "selector": selector, "x": x, "y": y}
async def scroll(self, tab_id: int, direction: str = "down", amount: int = 500) -> dict:
"""Scroll the page."""
"""Scroll the page.
Uses multiple methods for robustness:
1. Find and scroll the largest scrollable container (handles SPAs like LinkedIn)
2. Fallback to window scroll
3. Fallback to mouse wheel events via CDP
"""
await self.cdp_attach(tab_id)
delta_x = 0
@@ -635,16 +826,159 @@ class BeelineBridge:
elif direction == "left":
delta_x = -amount
await self._cdp(
tab_id,
"Runtime.evaluate",
{
"expression": f"window.scrollBy({delta_x}, {delta_y})",
"returnByValue": True,
},
)
# Method 1: Find and scroll the largest scrollable container
# This handles SPAs like LinkedIn where content is in a nested scrollable div
smart_scroll_script = f"""
(function() {{
// Find the largest scrollable container
function findScrollableContainer() {{
const candidates = [];
return {"ok": True, "action": "scroll", "direction": direction, "amount": amount}
// Check all elements with overflow scroll/auto
const allElements = document.querySelectorAll('*');
for (const el of allElements) {{
const style = getComputedStyle(el);
const overflow = style.overflow + style.overflowY;
if (overflow.includes('scroll') || overflow.includes('auto')) {{
const rect = el.getBoundingClientRect();
// Must be visible and reasonably large
if (rect.width > 100 && rect.height > 100 &&
el.scrollHeight > el.clientHeight + 100) {{
candidates.push({{
el: el,
area: rect.width * rect.height,
scrollable: el.scrollHeight - el.clientHeight
}});
}}
}}
}}
// Sort by area (largest first) and return best candidate
candidates.sort((a, b) => b.area - a.area);
return candidates.length > 0 ? candidates[0].el : null;
}}
const container = findScrollableContainer();
if (container) {{
container.scrollBy({{
top: {delta_y},
left: {delta_x},
behavior: 'smooth'
}});
return {{
method: 'container-smooth',
success: true,
containerTag: container.tagName,
containerClass: container.className.substring(0, 50)
}};
}}
// Fallback to window scroll
if ('scrollBehavior' in document.documentElement.style) {{
window.scrollBy({{
top: {delta_y},
left: {delta_x},
behavior: 'smooth'
}});
return {{ method: 'window-smooth', success: true }};
}}
window.scrollBy({delta_x}, {delta_y});
return {{ method: 'window-instant', success: true }};
}})();
"""
try:
result = await self.evaluate(tab_id, smart_scroll_script)
value = result.get("result", {})
if value and value.get("success"):
return {
"ok": True,
"action": "scroll",
"direction": direction,
"amount": amount,
"method": value.get("method", "js"),
"container": value.get("containerTag", "window")
}
except Exception as e:
logger.debug("Smart scroll script failed: %s", e)
# Method 2: Find scrollable container and use mouse wheel at its center
try:
# Find the largest scrollable container and its position
find_container_script = """
(function() {
const candidates = [];
const allElements = document.querySelectorAll('*');
for (const el of allElements) {
const style = getComputedStyle(el);
const overflow = style.overflow + style.overflowY;
if (overflow.includes('scroll') || overflow.includes('auto')) {
const rect = el.getBoundingClientRect();
if (rect.width > 100 && rect.height > 100 &&
el.scrollHeight > el.clientHeight + 100) {
candidates.push({
x: Math.round(rect.left + rect.width / 2),
y: Math.round(rect.top + rect.height / 2),
area: rect.width * rect.height,
tag: el.tagName
});
}
}
}
candidates.sort((a, b) => b.area - a.area);
return candidates.length > 0 ? candidates[0] : null;
})();
"""
container_result = await self._cdp(
tab_id,
"Runtime.evaluate",
{"expression": find_container_script, "returnByValue": True},
)
container_info = container_result.get("result", {}).get("value", {})
if container_info and isinstance(container_info, dict):
x = container_info.get("x", 400)
y = container_info.get("y", 300)
else:
# Fallback to viewport center
viewport_result = await self._cdp(
tab_id,
"Runtime.evaluate",
{
"expression": "({w: window.innerWidth, h: window.innerHeight})",
"returnByValue": True,
},
)
vp = viewport_result.get("result", {}).get("value", {})
x = vp.get("w", 800) // 2
y = vp.get("h", 600) // 2
# Dispatch mouse wheel event at container center
await self._cdp(
tab_id,
"Input.dispatchMouseEvent",
{
"type": "mouseWheel",
"x": x,
"y": y,
"deltaX": -delta_x,
"deltaY": -delta_y,
},
)
return {
"ok": True,
"action": "scroll",
"direction": direction,
"amount": amount,
"method": "mouseWheel",
"target": f"({x}, {y})"
}
except Exception as e:
logger.warning("Scroll failed: %s", e)
return {"ok": False, "error": str(e)}
async def select_option(self, tab_id: int, selector: str, values: list[str]) -> dict:
"""Select options in a select element."""
@@ -675,6 +1009,8 @@ class BeelineBridge:
async def evaluate(self, tab_id: int, script: str) -> dict:
"""Execute JavaScript in the page."""
await self.cdp_attach(tab_id)
await self._try_enable_domain(tab_id, "Runtime")
# Wrap in IIFE to allow return statements at top level
wrapped_script = f"(function() {{ {script} }})()"
result = await self._cdp(
@@ -683,31 +1019,176 @@ class BeelineBridge:
{"expression": wrapped_script, "returnByValue": True, "awaitPromise": True},
)
if result is None:
return {"ok": False, "error": "CDP returned no result"}
if "exceptionDetails" in result:
return {
"ok": False,
"error": result["exceptionDetails"].get("text", "Script error"),
}
# The CDP response structure is {result: {type: ..., value: ...}}
# But our bridge returns just the inner result object
inner_result = result.get("result", {})
value = inner_result.get("value") if isinstance(inner_result, dict) else None
return {
"ok": True,
"action": "evaluate",
"result": result.get("result", {}).get("value"),
"result": value,
}
async def snapshot(self, tab_id: int) -> dict:
async def snapshot(self, tab_id: int, timeout_s: float = 10.0) -> dict:
"""Get an accessibility snapshot of the page.
Uses CDP Accessibility.getFullAXTree and formats it as a readable tree.
Uses a hybrid approach:
1. CDP Accessibility.getFullAXTree for semantic structure
2. DOM queries for visibility and computed styles
3. Falls back to DOM tree if accessibility returns mostly ignored
Args:
tab_id: The tab ID to snapshot
timeout_s: Maximum time to spend building snapshot (default 10s)
"""
await self.cdp_attach(tab_id)
await self._cdp(tab_id, "Accessibility.enable")
async with asyncio.timeout(timeout_s):
await self.cdp_attach(tab_id)
await self._try_enable_domain(tab_id, "Accessibility")
await self._try_enable_domain(tab_id, "DOM")
await self._try_enable_domain(tab_id, "Runtime")
result = await self._cdp(tab_id, "Accessibility.getFullAXTree")
nodes = result.get("nodes", [])
# Try accessibility tree first
result = await self._cdp(tab_id, "Accessibility.getFullAXTree")
nodes = result.get("nodes", [])
# Format the tree
snapshot = self._format_ax_tree(nodes)
# Count non-ignored nodes
visible_count = sum(1 for n in nodes if not n.get("ignored", False))
# If tree is too large or mostly ignored, use DOM-based snapshot
if len(nodes) > 5000:
logger.debug(
"Accessibility tree too large (%d nodes), using DOM snapshot",
len(nodes),
)
return await self._dom_snapshot(tab_id)
if visible_count < 10 and len(nodes) > 50:
logger.debug(
"Accessibility tree has only %d/%d visible nodes, falling back to DOM snapshot",
visible_count,
len(nodes),
)
return await self._dom_snapshot(tab_id)
# Format the accessibility tree (with node limit)
snapshot = self._format_ax_tree(nodes, max_nodes=2000)
# Get URL
url_result = await self._cdp(
tab_id,
"Runtime.evaluate",
{"expression": "window.location.href", "returnByValue": True},
)
url = url_result.get("result", {}).get("value", "")
return {
"ok": True,
"tabId": tab_id,
"url": url,
"tree": snapshot,
}
async def _dom_snapshot(self, tab_id: int) -> dict:
"""Fallback: build snapshot from DOM tree with visibility info."""
# Get all interactive elements using DOM queries
script = """
(function() {
const interactiveSelectors = [
'a', 'button', 'input', 'textarea', 'select', 'option',
'[onclick]', '[role="button"]', '[role="link"]',
'[contenteditable="true"]', 'summary', 'details',
'a[href]', 'button[type]', 'input[type]',
'label', 'form', 'nav', 'nav a', 'nav button',
'[aria-label]', '[aria-labelledby]', '[tabindex]',
'h1', 'h2', 'h3', 'h4', 'h5', 'h6'
].join(', ');
const elements = document.querySelectorAll(interactiveSelectors);
const results = [];
for (const el of elements) {
const rect = el.getBoundingClientRect();
const styles = window.getComputedStyle(el);
// Skip invisible elements
if (rect.width === 0 || rect.height === 1 ||
styles.display === 'none' ||
styles.visibility === 'hidden' ||
styles.opacity === '0') {
continue;
}
// Skip elements outside viewport
if (rect.bottom < 0 || rect.top > window.innerHeight ||
rect.right < 0 || rect.left > window.innerWidth) {
continue;
}
const tag = el.tagName.toLowerCase();
const text = (el.innerText || el.value || el.placeholder || el.getAttribute('aria-label') || '').substring(0, 80);
const type = el.type || tag;
const role = el.getAttribute('role') || tag;
const name = el.name || el.id || '';
const href = el.href || '';
const className = el.className || '';
results.push({
tag,
type,
role,
text: text.trim(),
name,
href,
className: className.split(' ').slice(0, 3).join(' '),
rect: {
x: Math.round(rect.x),
y: Math.round(rect.y),
width: Math.round(rect.width),
height: Math.round(rect.height)
}
});
}
return results;
})();
"""
result = await self.evaluate(tab_id, script)
elements = result.get("result", [])
if not elements:
return {
"ok": True,
"tabId": tab_id,
"tree": "(no visible interactive elements found)",
}
# Format as tree
lines = []
for i in range(0, min(100, len(elements))):
el = elements[i]
ref = f"e{i}"
tag = el.get("tag", "unknown")
text = el.get("text", "")
role = el.get("role", tag)
desc = f"{role}"
if text:
desc += f' "{text[:40]}"'
if el.get("href"):
desc += f' [href]'
desc += f" [ref={ref}]"
lines.append(f" - {desc}")
# Get URL
url_result = await self._cdp(
@@ -715,17 +1196,22 @@ class BeelineBridge:
"Runtime.evaluate",
{"expression": "window.location.href", "returnByValue": True},
)
url = url_result.get("result", {}).get("result", {}).get("value", "")
url = url_result.get("result", {}).get("value", "")
return {
"ok": True,
"tabId": tab_id,
"url": url,
"snapshot": snapshot,
"tree": "\n".join(lines),
}
def _format_ax_tree(self, nodes: list[dict]) -> str:
"""Format a CDP Accessibility.getFullAXTree result."""
def _format_ax_tree(self, nodes: list[dict], max_nodes: int = 2000) -> str:
"""Format a CDP Accessibility.getFullAXTree result.
Args:
nodes: List of accessibility tree nodes
max_nodes: Maximum number of nodes to process (prevents hangs on huge trees)
"""
if not nodes:
return "(empty tree)"
@@ -737,9 +1223,14 @@ class BeelineBridge:
lines: list[str] = []
ref_counter = [0] # Use list to allow mutation in nested function
node_counter = [0] # Track total nodes processed
ref_map: dict[str, str] = {}
def _walk(node_id: str, depth: int) -> None:
# Stop if we've processed enough nodes
if node_counter[0] >= max_nodes:
return
node = by_id.get(node_id)
if not node:
return
@@ -760,6 +1251,8 @@ class BeelineBridge:
_walk(cid, depth)
return
node_counter[0] += 1
name_info = node.get("name", {})
name = name_info.get("value", "") if isinstance(name_info, dict) else str(name_info)
@@ -807,6 +1300,11 @@ class BeelineBridge:
_walk(cid, depth + 1)
_walk(nodes[0]["nodeId"], 0)
# Add truncation notice if we hit the limit
if node_counter[0] >= max_nodes:
lines.append("... (tree truncated, too many nodes)")
return "\n".join(lines) if lines else "(empty tree)"
async def get_text(self, tab_id: int, selector: str, timeout_ms: int = 30000) -> dict: