feat: event source in runtime

This commit is contained in:
Timothy
2026-02-12 19:52:15 -08:00
parent 9a3c6703e1
commit 83140a1398
8 changed files with 1042 additions and 3 deletions
+83 -1
View File
@@ -8,7 +8,7 @@ while preserving the goal-driven approach.
import asyncio
import logging
from collections.abc import Callable
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any
@@ -39,6 +39,11 @@ class AgentRuntimeConfig:
max_history: int = 1000
execution_result_max: int = 1000
execution_result_ttl_seconds: float | None = None
# Webhook server config (only starts if webhook_routes is non-empty)
webhook_host: str = "127.0.0.1"
webhook_port: int = 8080
webhook_routes: list[dict] = field(default_factory=list)
# Each dict: {"source_id": str, "path": str, "methods": ["POST"], "secret": str|None}
class AgentRuntime:
@@ -150,6 +155,11 @@ class AgentRuntime:
self._entry_points: dict[str, EntryPointSpec] = {}
self._streams: dict[str, ExecutionStream] = {}
# Webhook server (created on start if webhook_routes configured)
self._webhook_server: Any = None
# Event-driven entry point subscriptions
self._event_subscriptions: list[str] = []
# State
self._running = False
self._lock = asyncio.Lock()
@@ -234,6 +244,63 @@ class AgentRuntime:
await stream.start()
self._streams[ep_id] = stream
# Start webhook server if routes are configured
if self._config.webhook_routes:
from framework.runtime.webhook_server import (
WebhookRoute,
WebhookServer,
WebhookServerConfig,
)
wh_config = WebhookServerConfig(
host=self._config.webhook_host,
port=self._config.webhook_port,
)
self._webhook_server = WebhookServer(self._event_bus, wh_config)
for rc in self._config.webhook_routes:
route = WebhookRoute(
source_id=rc["source_id"],
path=rc["path"],
methods=rc.get("methods", ["POST"]),
secret=rc.get("secret"),
)
self._webhook_server.add_route(route)
await self._webhook_server.start()
# Subscribe event-driven entry points to EventBus
from framework.runtime.event_bus import EventType as _ET
for ep_id, spec in self._entry_points.items():
if spec.trigger_type != "event":
continue
tc = spec.trigger_config
event_types = [_ET(et) for et in tc.get("event_types", [])]
if not event_types:
logger.warning(
f"Entry point '{ep_id}' has trigger_type='event' "
"but no event_types in trigger_config"
)
continue
# Capture ep_id in closure
def _make_handler(entry_point_id: str):
async def _on_event(event):
if self._running and entry_point_id in self._streams:
await self.trigger(entry_point_id, {"event": event.to_dict()})
return _on_event
sub_id = self._event_bus.subscribe(
event_types=event_types,
handler=_make_handler(ep_id),
filter_stream=tc.get("filter_stream"),
filter_node=tc.get("filter_node"),
)
self._event_subscriptions.append(sub_id)
self._running = True
logger.info(f"AgentRuntime started with {len(self._streams)} streams")
@@ -243,6 +310,16 @@ class AgentRuntime:
return
async with self._lock:
# Unsubscribe event-driven entry points
for sub_id in self._event_subscriptions:
self._event_bus.unsubscribe(sub_id)
self._event_subscriptions.clear()
# Stop webhook server
if self._webhook_server:
await self._webhook_server.stop()
self._webhook_server = None
# Stop all streams
for stream in self._streams.values():
await stream.stop()
@@ -448,6 +525,11 @@ class AgentRuntime:
"""Access the outcome aggregator."""
return self._outcome_aggregator
@property
def webhook_server(self) -> Any:
"""Access the webhook server (None if no webhook entry points)."""
return self._webhook_server
@property
def is_running(self) -> bool:
"""Check if runtime is running."""
+27
View File
@@ -66,6 +66,9 @@ class EventType(StrEnum):
# Context management
CONTEXT_COMPACTED = "context_compacted"
# External triggers
WEBHOOK_RECEIVED = "webhook_received"
# Custom events
CUSTOM = "custom"
@@ -636,6 +639,30 @@ class EventBus:
)
)
async def emit_webhook_received(
self,
source_id: str,
path: str,
method: str,
headers: dict[str, str],
payload: dict[str, Any],
query_params: dict[str, str] | None = None,
) -> None:
"""Emit webhook received event."""
await self.publish(
AgentEvent(
type=EventType.WEBHOOK_RECEIVED,
stream_id=source_id,
data={
"path": path,
"method": method,
"headers": headers,
"payload": payload,
"query_params": query_params or {},
},
)
)
# === QUERY OPERATIONS ===
def get_history(
@@ -0,0 +1,717 @@
"""
Tests for WebhookServer and event-driven entry points.
"""
import asyncio
import hashlib
import hmac as hmac_mod
import json
import tempfile
from pathlib import Path
from unittest.mock import patch
import aiohttp
import pytest
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig
from framework.runtime.event_bus import AgentEvent, EventBus, EventType
from framework.runtime.execution_stream import EntryPointSpec
from framework.runtime.webhook_server import (
WebhookRoute,
WebhookServer,
WebhookServerConfig,
)
def _make_server(event_bus: EventBus, routes: list[WebhookRoute] | None = None):
"""Helper to create a WebhookServer with port=0 for OS-assigned port."""
config = WebhookServerConfig(host="127.0.0.1", port=0)
server = WebhookServer(event_bus, config)
for route in routes or []:
server.add_route(route)
return server
def _base_url(server: WebhookServer) -> str:
"""Get the base URL for a running server."""
return f"http://127.0.0.1:{server.port}"
class TestWebhookServerLifecycle:
"""Tests for server start/stop."""
@pytest.mark.asyncio
async def test_start_stop(self):
bus = EventBus()
server = _make_server(
bus,
[
WebhookRoute(source_id="test", path="/webhooks/test", methods=["POST"]),
],
)
await server.start()
assert server.is_running
assert server.port is not None
await server.stop()
assert not server.is_running
assert server.port is None
@pytest.mark.asyncio
async def test_no_routes_skips_start(self):
bus = EventBus()
server = _make_server(bus) # no routes
await server.start()
assert not server.is_running
@pytest.mark.asyncio
async def test_stop_when_not_started(self):
bus = EventBus()
server = _make_server(bus)
# Should be a no-op, not raise
await server.stop()
assert not server.is_running
class TestWebhookEventPublishing:
"""Tests for HTTP request -> EventBus event publishing."""
@pytest.mark.asyncio
async def test_post_publishes_webhook_received(self):
bus = EventBus()
received = []
async def handler(event):
received.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handler)
server = _make_server(
bus,
[
WebhookRoute(source_id="gh", path="/webhooks/github", methods=["POST"]),
],
)
await server.start()
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{_base_url(server)}/webhooks/github",
json={"action": "opened", "number": 42},
) as resp:
assert resp.status == 202
body = await resp.json()
assert body["status"] == "accepted"
# Give event bus time to dispatch
await asyncio.sleep(0.05)
assert len(received) == 1
event = received[0]
assert event.type == EventType.WEBHOOK_RECEIVED
assert event.stream_id == "gh"
assert event.data["path"] == "/webhooks/github"
assert event.data["method"] == "POST"
assert event.data["payload"] == {"action": "opened", "number": 42}
assert isinstance(event.data["headers"], dict)
assert event.data["query_params"] == {}
finally:
await server.stop()
@pytest.mark.asyncio
async def test_query_params_included(self):
bus = EventBus()
received = []
async def handler(event):
received.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handler)
server = _make_server(
bus,
[
WebhookRoute(source_id="hook", path="/webhooks/hook", methods=["POST"]),
],
)
await server.start()
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{_base_url(server)}/webhooks/hook?source=test&v=2",
json={"data": "hello"},
) as resp:
assert resp.status == 202
await asyncio.sleep(0.05)
assert len(received) == 1
assert received[0].data["query_params"] == {"source": "test", "v": "2"}
finally:
await server.stop()
@pytest.mark.asyncio
async def test_non_json_body(self):
bus = EventBus()
received = []
async def handler(event):
received.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handler)
server = _make_server(
bus,
[
WebhookRoute(source_id="raw", path="/webhooks/raw", methods=["POST"]),
],
)
await server.start()
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{_base_url(server)}/webhooks/raw",
data=b"plain text body",
headers={"Content-Type": "text/plain"},
) as resp:
assert resp.status == 202
await asyncio.sleep(0.05)
assert len(received) == 1
assert received[0].data["payload"] == {"raw_body": "plain text body"}
finally:
await server.stop()
@pytest.mark.asyncio
async def test_empty_body(self):
bus = EventBus()
received = []
async def handler(event):
received.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handler)
server = _make_server(
bus,
[
WebhookRoute(source_id="empty", path="/webhooks/empty", methods=["POST"]),
],
)
await server.start()
try:
async with aiohttp.ClientSession() as session:
async with session.post(f"{_base_url(server)}/webhooks/empty") as resp:
assert resp.status == 202
await asyncio.sleep(0.05)
assert len(received) == 1
assert received[0].data["payload"] == {}
finally:
await server.stop()
@pytest.mark.asyncio
async def test_multiple_routes(self):
bus = EventBus()
received = []
async def handler(event):
received.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handler)
server = _make_server(
bus,
[
WebhookRoute(source_id="a", path="/webhooks/a", methods=["POST"]),
WebhookRoute(source_id="b", path="/webhooks/b", methods=["POST"]),
],
)
await server.start()
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{_base_url(server)}/webhooks/a", json={"from": "a"}
) as resp:
assert resp.status == 202
async with session.post(
f"{_base_url(server)}/webhooks/b", json={"from": "b"}
) as resp:
assert resp.status == 202
await asyncio.sleep(0.05)
assert len(received) == 2
stream_ids = {e.stream_id for e in received}
assert stream_ids == {"a", "b"}
finally:
await server.stop()
@pytest.mark.asyncio
async def test_filter_stream_subscription(self):
"""Subscribers can filter by stream_id (source_id)."""
bus = EventBus()
a_events = []
b_events = []
async def handle_a(event):
a_events.append(event)
async def handle_b(event):
b_events.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handle_a, filter_stream="a")
bus.subscribe([EventType.WEBHOOK_RECEIVED], handle_b, filter_stream="b")
server = _make_server(
bus,
[
WebhookRoute(source_id="a", path="/webhooks/a", methods=["POST"]),
WebhookRoute(source_id="b", path="/webhooks/b", methods=["POST"]),
],
)
await server.start()
try:
async with aiohttp.ClientSession() as session:
await session.post(f"{_base_url(server)}/webhooks/a", json={"x": 1})
await session.post(f"{_base_url(server)}/webhooks/b", json={"x": 2})
await asyncio.sleep(0.05)
assert len(a_events) == 1
assert a_events[0].data["payload"] == {"x": 1}
assert len(b_events) == 1
assert b_events[0].data["payload"] == {"x": 2}
finally:
await server.stop()
class TestHMACVerification:
"""Tests for HMAC-SHA256 signature verification."""
@pytest.mark.asyncio
async def test_valid_signature_accepted(self):
bus = EventBus()
received = []
async def handler(event):
received.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handler)
secret = "test-secret-key"
server = _make_server(
bus,
[
WebhookRoute(
source_id="secure",
path="/webhooks/secure",
methods=["POST"],
secret=secret,
),
],
)
await server.start()
try:
body = json.dumps({"event": "push"}).encode()
sig = hmac_mod.new(secret.encode(), body, hashlib.sha256).hexdigest()
async with aiohttp.ClientSession() as session:
async with session.post(
f"{_base_url(server)}/webhooks/secure",
data=body,
headers={
"Content-Type": "application/json",
"X-Hub-Signature-256": f"sha256={sig}",
},
) as resp:
assert resp.status == 202
await asyncio.sleep(0.05)
assert len(received) == 1
finally:
await server.stop()
@pytest.mark.asyncio
async def test_invalid_signature_rejected(self):
bus = EventBus()
received = []
async def handler(event):
received.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handler)
server = _make_server(
bus,
[
WebhookRoute(
source_id="secure",
path="/webhooks/secure",
methods=["POST"],
secret="real-secret",
),
],
)
await server.start()
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{_base_url(server)}/webhooks/secure",
json={"event": "push"},
headers={"X-Hub-Signature-256": "sha256=invalidsignature"},
) as resp:
assert resp.status == 401
await asyncio.sleep(0.05)
assert len(received) == 0 # No event published
finally:
await server.stop()
@pytest.mark.asyncio
async def test_missing_signature_rejected(self):
bus = EventBus()
received = []
async def handler(event):
received.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handler)
server = _make_server(
bus,
[
WebhookRoute(
source_id="secure",
path="/webhooks/secure",
methods=["POST"],
secret="my-secret",
),
],
)
await server.start()
try:
async with aiohttp.ClientSession() as session:
# No X-Hub-Signature-256 header
async with session.post(
f"{_base_url(server)}/webhooks/secure",
json={"event": "push"},
) as resp:
assert resp.status == 401
await asyncio.sleep(0.05)
assert len(received) == 0
finally:
await server.stop()
@pytest.mark.asyncio
async def test_no_secret_skips_verification(self):
"""Routes without a secret accept any request."""
bus = EventBus()
received = []
async def handler(event):
received.append(event)
bus.subscribe([EventType.WEBHOOK_RECEIVED], handler)
server = _make_server(
bus,
[
WebhookRoute(
source_id="open",
path="/webhooks/open",
methods=["POST"],
secret=None,
),
],
)
await server.start()
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{_base_url(server)}/webhooks/open",
json={"data": "test"},
) as resp:
assert resp.status == 202
await asyncio.sleep(0.05)
assert len(received) == 1
finally:
await server.stop()
class TestEventDrivenEntryPoints:
"""Tests for event-driven entry points wired through AgentRuntime."""
def _make_graph_and_goal(self):
"""Minimal graph + goal for testing entry point triggering."""
from framework.graph import Goal
from framework.graph.edge import GraphSpec
from framework.graph.goal import SuccessCriterion
from framework.graph.node import NodeSpec
nodes = [
NodeSpec(
id="process-event",
name="Process Event",
description="Process incoming event",
node_type="llm_generate",
input_keys=["event"],
output_keys=["result"],
),
]
graph = GraphSpec(
id="test-graph",
goal_id="test-goal",
version="1.0.0",
entry_node="process-event",
entry_points={"start": "process-event"},
async_entry_points=[],
terminal_nodes=[],
pause_nodes=[],
nodes=nodes,
edges=[],
)
goal = Goal(
id="test-goal",
name="Test Goal",
description="Test",
success_criteria=[
SuccessCriterion(
id="sc-1",
description="Done",
metric="done",
target="yes",
weight=1.0,
),
],
)
return graph, goal
@pytest.mark.asyncio
async def test_event_entry_point_subscribes_to_bus(self):
"""Entry point with trigger_type='event' subscribes and triggers on matching events."""
graph, goal = self._make_graph_and_goal()
config = AgentRuntimeConfig(
webhook_host="127.0.0.1",
webhook_port=0,
webhook_routes=[
{"source_id": "gh", "path": "/webhooks/github"},
],
)
with tempfile.TemporaryDirectory() as tmpdir:
runtime = AgentRuntime(
graph=graph,
goal=goal,
storage_path=Path(tmpdir),
config=config,
)
runtime.register_entry_point(
EntryPointSpec(
id="gh-handler",
name="GitHub Handler",
entry_node="process-event",
trigger_type="event",
trigger_config={
"event_types": ["webhook_received"],
"filter_stream": "gh",
},
)
)
trigger_calls = []
async def mock_trigger(ep_id, data, **kwargs):
trigger_calls.append((ep_id, data))
with patch.object(runtime, "trigger", side_effect=mock_trigger):
await runtime.start()
try:
assert runtime.webhook_server is not None
assert runtime.webhook_server.is_running
port = runtime.webhook_server.port
async with aiohttp.ClientSession() as session:
async with session.post(
f"http://127.0.0.1:{port}/webhooks/github",
json={"action": "push", "ref": "main"},
) as resp:
assert resp.status == 202
await asyncio.sleep(0.1)
assert len(trigger_calls) == 1
ep_id, data = trigger_calls[0]
assert ep_id == "gh-handler"
assert "event" in data
assert data["event"]["type"] == "webhook_received"
assert data["event"]["stream_id"] == "gh"
assert data["event"]["data"]["payload"] == {
"action": "push",
"ref": "main",
}
finally:
await runtime.stop()
assert runtime.webhook_server is None
@pytest.mark.asyncio
async def test_event_entry_point_filter_stream(self):
"""Entry point only triggers for matching stream_id (source_id)."""
graph, goal = self._make_graph_and_goal()
config = AgentRuntimeConfig(
webhook_routes=[
{"source_id": "github", "path": "/webhooks/github"},
{"source_id": "stripe", "path": "/webhooks/stripe"},
],
webhook_port=0,
)
with tempfile.TemporaryDirectory() as tmpdir:
runtime = AgentRuntime(
graph=graph,
goal=goal,
storage_path=Path(tmpdir),
config=config,
)
runtime.register_entry_point(
EntryPointSpec(
id="gh-only",
name="GitHub Only",
entry_node="process-event",
trigger_type="event",
trigger_config={
"event_types": ["webhook_received"],
"filter_stream": "github",
},
)
)
trigger_calls = []
async def mock_trigger(ep_id, data, **kwargs):
trigger_calls.append((ep_id, data))
with patch.object(runtime, "trigger", side_effect=mock_trigger):
await runtime.start()
try:
port = runtime.webhook_server.port
async with aiohttp.ClientSession() as session:
# POST to stripe — should NOT trigger
await session.post(
f"http://127.0.0.1:{port}/webhooks/stripe",
json={"type": "payment"},
)
# POST to github — should trigger
await session.post(
f"http://127.0.0.1:{port}/webhooks/github",
json={"action": "opened"},
)
await asyncio.sleep(0.1)
assert len(trigger_calls) == 1
assert trigger_calls[0][0] == "gh-only"
finally:
await runtime.stop()
@pytest.mark.asyncio
async def test_no_webhook_routes_skips_server(self):
"""Runtime without webhook_routes does not start a webhook server."""
graph, goal = self._make_graph_and_goal()
with tempfile.TemporaryDirectory() as tmpdir:
runtime = AgentRuntime(
graph=graph,
goal=goal,
storage_path=Path(tmpdir),
)
runtime.register_entry_point(
EntryPointSpec(
id="manual",
name="Manual",
entry_node="process-event",
trigger_type="manual",
)
)
await runtime.start()
try:
assert runtime.webhook_server is None
finally:
await runtime.stop()
@pytest.mark.asyncio
async def test_event_entry_point_custom_event(self):
"""Entry point can subscribe to CUSTOM events, not just webhooks."""
graph, goal = self._make_graph_and_goal()
with tempfile.TemporaryDirectory() as tmpdir:
runtime = AgentRuntime(
graph=graph,
goal=goal,
storage_path=Path(tmpdir),
)
runtime.register_entry_point(
EntryPointSpec(
id="custom-handler",
name="Custom Handler",
entry_node="process-event",
trigger_type="event",
trigger_config={
"event_types": ["custom"],
},
)
)
trigger_calls = []
async def mock_trigger(ep_id, data, **kwargs):
trigger_calls.append((ep_id, data))
with patch.object(runtime, "trigger", side_effect=mock_trigger):
await runtime.start()
try:
await runtime.event_bus.publish(
AgentEvent(
type=EventType.CUSTOM,
stream_id="some-source",
data={"key": "value"},
)
)
await asyncio.sleep(0.1)
assert len(trigger_calls) == 1
assert trigger_calls[0][0] == "custom-handler"
assert trigger_calls[0][1]["event"]["type"] == "custom"
assert trigger_calls[0][1]["event"]["data"]["key"] == "value"
finally:
await runtime.stop()
+175
View File
@@ -0,0 +1,175 @@
"""
Webhook HTTP Server - Receives HTTP requests and publishes them as EventBus events.
Only starts if webhook-type entry points are registered. Uses aiohttp for
a lightweight embedded HTTP server that runs within the existing asyncio loop.
"""
import hashlib
import hmac
import json
import logging
from dataclasses import dataclass
from aiohttp import web
from framework.runtime.event_bus import EventBus
logger = logging.getLogger(__name__)
@dataclass
class WebhookRoute:
"""A registered webhook route derived from an EntryPointSpec."""
source_id: str
path: str
methods: list[str]
secret: str | None = None # For HMAC-SHA256 signature verification
@dataclass
class WebhookServerConfig:
"""Configuration for the webhook HTTP server."""
host: str = "127.0.0.1"
port: int = 8080
class WebhookServer:
"""
Embedded HTTP server that receives webhook requests and publishes
them as WEBHOOK_RECEIVED events on the EventBus.
The server's only job is: receive HTTP -> publish AgentEvent.
Subscribers decide what to do with the event.
Lifecycle:
server = WebhookServer(event_bus, config)
server.add_route(WebhookRoute(...))
await server.start()
# ... server running ...
await server.stop()
"""
def __init__(
self,
event_bus: EventBus,
config: WebhookServerConfig | None = None,
):
self._event_bus = event_bus
self._config = config or WebhookServerConfig()
self._routes: dict[str, WebhookRoute] = {} # path -> route
self._app: web.Application | None = None
self._runner: web.AppRunner | None = None
self._site: web.TCPSite | None = None
def add_route(self, route: WebhookRoute) -> None:
"""Register a webhook route."""
self._routes[route.path] = route
async def start(self) -> None:
"""Start the HTTP server. No-op if no routes registered."""
if not self._routes:
logger.debug("No webhook routes registered, skipping server start")
return
self._app = web.Application()
for path, route in self._routes.items():
for method in route.methods:
self._app.router.add_route(method, path, self._handle_request)
self._runner = web.AppRunner(self._app)
await self._runner.setup()
self._site = web.TCPSite(
self._runner,
self._config.host,
self._config.port,
)
await self._site.start()
logger.info(
f"Webhook server started on {self._config.host}:{self._config.port} "
f"with {len(self._routes)} route(s)"
)
async def stop(self) -> None:
"""Stop the HTTP server gracefully."""
if self._runner:
await self._runner.cleanup()
self._runner = None
self._app = None
self._site = None
logger.info("Webhook server stopped")
async def _handle_request(self, request: web.Request) -> web.Response:
"""Handle an incoming webhook request."""
path = request.path
route = self._routes.get(path)
if route is None:
return web.json_response({"error": "Not found"}, status=404)
# Read body
try:
body = await request.read()
except Exception:
return web.json_response(
{"error": "Failed to read request body"},
status=400,
)
# Verify HMAC signature if secret is configured
if route.secret:
if not self._verify_signature(request, body, route.secret):
return web.json_response({"error": "Invalid signature"}, status=401)
# Parse body as JSON (fall back to raw text for non-JSON)
try:
payload = json.loads(body) if body else {}
except (json.JSONDecodeError, ValueError):
payload = {"raw_body": body.decode("utf-8", errors="replace")}
# Publish event to bus
await self._event_bus.emit_webhook_received(
source_id=route.source_id,
path=path,
method=request.method,
headers=dict(request.headers),
payload=payload,
query_params=dict(request.query),
)
return web.json_response({"status": "accepted"}, status=202)
def _verify_signature(
self,
request: web.Request,
body: bytes,
secret: str,
) -> bool:
"""Verify HMAC-SHA256 signature from X-Hub-Signature-256 header."""
signature_header = request.headers.get("X-Hub-Signature-256", "")
if not signature_header.startswith("sha256="):
return False
expected_sig = signature_header[7:] # strip "sha256="
computed_sig = hmac.new(
secret.encode("utf-8"),
body,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected_sig, computed_sig)
@property
def is_running(self) -> bool:
"""Check if the server is running."""
return self._site is not None
@property
def port(self) -> int | None:
"""Return the actual listening port (useful when configured with port=0)."""
if self._site and self._site._server and self._site._server.sockets:
return self._site._server.sockets[0].getsockname()[1]
return None
+6
View File
@@ -409,6 +409,12 @@ class AdenTUI(App):
event.node_id or event.data.get("node_id", ""),
)
# Track active node in chat_repl for mid-execution input
if et == EventType.NODE_LOOP_STARTED:
self.chat_repl.handle_node_started(event.node_id or "")
elif et == EventType.NODE_LOOP_COMPLETED:
self.chat_repl.handle_node_completed(event.node_id or "")
# --- Graph view events ---
if et in (
EventType.EXECUTION_STARTED,
+28 -1
View File
@@ -83,6 +83,7 @@ class ChatRepl(Vertical):
self._waiting_for_input: bool = False
self._input_node_id: str | None = None
self._pending_ask_question: str = ""
self._active_node_id: str | None = None # Currently executing node
self._resume_session = resume_session
self._resume_checkpoint = resume_checkpoint
self._session_index: list[str] = [] # IDs from last listing
@@ -785,7 +786,22 @@ class ChatRepl(Vertical):
self._write_history(f"[bold red]Error delivering input:[/bold red] {e}")
return
# Double-submit guard: reject input while an execution is in-flight
# Mid-execution input: inject into the active node's conversation
if self._current_exec_id is not None and self._active_node_id:
self._write_history(f"[bold green]You:[/bold green] {user_input}")
message.input.value = ""
node_id = self._active_node_id
try:
future = asyncio.run_coroutine_threadsafe(
self.runtime.inject_input(node_id, user_input),
self._agent_loop,
)
await asyncio.wrap_future(future)
except Exception as e:
self._write_history(f"[bold red]Error delivering input:[/bold red] {e}")
return
# Double-submit guard: no active node to inject into
if self._current_exec_id is not None:
self._write_history("[dim]Agent is still running — please wait.[/dim]")
return
@@ -914,6 +930,7 @@ class ChatRepl(Vertical):
self._streaming_snapshot = ""
self._waiting_for_input = False
self._input_node_id = None
self._active_node_id = None
self._pending_ask_question = ""
# Re-enable input
@@ -935,6 +952,7 @@ class ChatRepl(Vertical):
self._waiting_for_input = False
self._pending_ask_question = ""
self._input_node_id = None
self._active_node_id = None
# Re-enable input
chat_input = self.query_one("#chat-input", Input)
@@ -972,3 +990,12 @@ class ChatRepl(Vertical):
chat_input.disabled = False
chat_input.placeholder = "Type your response..."
chat_input.focus()
def handle_node_started(self, node_id: str) -> None:
"""Track which node is currently executing."""
self._active_node_id = node_id
def handle_node_completed(self, node_id: str) -> None:
"""Clear active node when it finishes."""
if self._active_node_id == node_id:
self._active_node_id = None
+1
View File
@@ -20,6 +20,7 @@ dependencies = [
[project.optional-dependencies]
tui = ["textual>=0.75.0"]
webhook = ["aiohttp>=3.9.0"]
[project.scripts]
hive = "framework.cli:main"
Generated
+5 -1
View File
@@ -774,6 +774,9 @@ dependencies = [
tui = [
{ name = "textual" },
]
webhook = [
{ name = "aiohttp" },
]
[package.dev-dependencies]
dev = [
@@ -783,6 +786,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "aiohttp", marker = "extra == 'webhook'", specifier = ">=3.9.0" },
{ name = "anthropic", specifier = ">=0.40.0" },
{ name = "fastmcp", specifier = ">=2.0.0" },
{ name = "httpx", specifier = ">=0.27.0" },
@@ -796,7 +800,7 @@ requires-dist = [
{ name = "textual", marker = "extra == 'tui'", specifier = ">=0.75.0" },
{ name = "tools", editable = "tools" },
]
provides-extras = ["tui"]
provides-extras = ["tui", "webhook"]
[package.metadata.requires-dev]
dev = [