Merge remote-tracking branch 'origin/feature/session-digest' into fix/data-disclosure-gaps

This commit is contained in:
Timothy
2026-03-17 19:15:22 -07:00
2 changed files with 97 additions and 33 deletions
+44 -22
View File
@@ -75,7 +75,10 @@ def _collect_run_events(bus: "EventBus", run_id: str, limit: int = 2000) -> list
return [e for e in bus.get_history(limit=limit) if getattr(e, "run_id", None) == run_id]
def _build_run_context(events: list["AgentEvent"], outcome_event: "AgentEvent") -> str:
def _build_run_context(
events: list["AgentEvent"],
outcome_event: "AgentEvent | None",
) -> str:
"""Assemble a plain-text run context string for the digest LLM call."""
from framework.runtime.event_bus import EventType
@@ -91,14 +94,17 @@ def _build_run_context(events: list["AgentEvent"], outcome_event: "AgentEvent")
if inp:
lines.append(f"Task input: {str(inp)[:400]}")
# Duration
# Duration (elapsed so far if no outcome yet)
ref_ts = outcome_event.timestamp if outcome_event else datetime.utcnow()
if started:
elapsed = (outcome_event.timestamp - started[0].timestamp).total_seconds()
elapsed = (ref_ts - started[0].timestamp).total_seconds()
m, s = divmod(int(elapsed), 60)
lines.append(f"Duration: {m}m {s}s" if m else f"Duration: {s}s")
lines.append(f"Duration so far: {m}m {s}s" if m else f"Duration so far: {s}s")
# Outcome
if outcome_event.type == EventType.EXECUTION_COMPLETED:
if outcome_event is None:
lines.append("Status: still running (mid-run snapshot)")
elif outcome_event.type == EventType.EXECUTION_COMPLETED:
out = outcome_event.data.get("output", {})
lines.append(f"Outcome: completed. Output: {str(out)[:300]}" if out else "Outcome: completed.")
else:
@@ -161,22 +167,26 @@ def _build_run_context(events: list["AgentEvent"], outcome_event: "AgentEvent")
async def consolidate_worker_run(
agent_name: str,
run_id: str,
outcome_event: "AgentEvent",
outcome_event: "AgentEvent | None",
bus: "EventBus",
llm: Any,
) -> None:
"""Write a digest for a completed or failed worker run.
"""Write (or overwrite) the digest for a worker run.
Called fire-and-forget after EXECUTION_COMPLETED or EXECUTION_FAILED for
a worker stream. Failures are logged and silently swallowed so they never
block the caller.
Called fire-and-forget either:
- After EXECUTION_COMPLETED / EXECUTION_FAILED (outcome_event set, final write)
- Periodically during a run on a cooldown timer (outcome_event=None, mid-run snapshot)
The digest file is always overwritten so each call produces the freshest view.
The final completion/failure call supersedes any mid-run snapshot.
Args:
agent_name: Worker agent directory name (used to determine storage path).
run_id: The run ID from the triggering event.
outcome_event: The EXECUTION_COMPLETED or EXECUTION_FAILED event.
bus: The session EventBus (shared queen + worker).
llm: LLMProvider with an acomplete() method.
agent_name: Worker agent directory name (determines storage path).
run_id: The run ID.
outcome_event: EXECUTION_COMPLETED or EXECUTION_FAILED event, or None for
a mid-run snapshot.
bus: The session EventBus (shared queen + worker).
llm: LLMProvider with an acomplete() method.
"""
try:
events = _collect_run_events(bus, run_id)
@@ -185,7 +195,12 @@ async def consolidate_worker_run(
logger.debug("worker_memory: no events for run %s, skipping digest", run_id)
return
logger.info("worker_memory: generating digest for run %s ...", run_id)
is_final = outcome_event is not None
logger.info(
"worker_memory: generating %s digest for run %s ...",
"final" if is_final else "mid-run",
run_id,
)
from framework.agents.queen.config import default_config
@@ -204,18 +219,25 @@ async def consolidate_worker_run(
from framework.runtime.event_bus import EventType
ts = outcome_event.timestamp.strftime("%Y-%m-%d %H:%M")
status = (
"completed"
if outcome_event.type == EventType.EXECUTION_COMPLETED
else "failed"
ts = (outcome_event.timestamp if outcome_event else datetime.utcnow()).strftime(
"%Y-%m-%d %H:%M"
)
if outcome_event is None:
status = "running"
elif outcome_event.type == EventType.EXECUTION_COMPLETED:
status = "completed"
else:
status = "failed"
path.write_text(
f"# {run_id}\n\n**{ts}** | {status}\n\n{digest_text}\n",
encoding="utf-8",
)
logger.info(
"worker_memory: digest written for run %s (%d chars)", run_id, len(digest_text)
"worker_memory: %s digest written for run %s (%d chars)",
status,
run_id,
len(digest_text),
)
except Exception:
+53 -11
View File
@@ -667,9 +667,20 @@ class SessionManager:
logger.warning("Worker handoff received but queen node not ready")
def _subscribe_worker_digest(self, session: Session) -> None:
"""Subscribe to worker execution completion events to write run digests."""
"""Subscribe to worker events to write per-run digests.
Two triggers:
- NODE_LOOP_ITERATION: write a mid-run snapshot, throttled to at most
once every _DIGEST_COOLDOWN seconds per execution.
- EXECUTION_COMPLETED / EXECUTION_FAILED: always write the final digest,
bypassing the cooldown.
"""
import time as _time
from framework.runtime.event_bus import EventType as _ET
_DIGEST_COOLDOWN = 300.0 # seconds between mid-run snapshots
if session.worker_digest_sub is not None:
try:
session.event_bus.unsubscribe(session.worker_digest_sub)
@@ -684,23 +695,54 @@ class SessionManager:
_agent_name = agent_name
_llm = session.llm
_bus = session.event_bus
# per-execution_id monotonic timestamp of last mid-run digest
_last_digest: dict[str, float] = {}
async def _on_execution_done(event: Any) -> None:
run_id = getattr(event, "run_id", None)
if not run_id or event.stream_id == "queen":
def _resolve_run_id(exec_id: str) -> str | None:
"""Look up the run_id for a given execution_id via EXECUTION_STARTED history."""
for e in _bus.get_history(event_type=_ET.EXECUTION_STARTED, limit=200):
if e.execution_id == exec_id and getattr(e, "run_id", None):
return e.run_id
return None
async def _on_worker_event(event: Any) -> None:
if event.stream_id == "queen":
return
import asyncio as _asyncio
from framework.agents.worker_memory import consolidate_worker_run
_asyncio.create_task(
consolidate_worker_run(_agent_name, run_id, event, _bus, _llm),
name=f"worker-digest-{run_id}",
)
if event.type in (_ET.EXECUTION_COMPLETED, _ET.EXECUTION_FAILED):
# Final digest — always fire, ignore cooldown
run_id = getattr(event, "run_id", None)
if run_id:
asyncio.create_task(
consolidate_worker_run(_agent_name, run_id, event, _bus, _llm),
name=f"worker-digest-final-{run_id}",
)
elif event.type == _ET.NODE_LOOP_ITERATION:
# Mid-run snapshot — throttled to at most one per _DIGEST_COOLDOWN (300 s) per execution
exec_id = event.execution_id
if not exec_id:
return
now = _time.monotonic()
if now - _last_digest.get(exec_id, 0.0) < _DIGEST_COOLDOWN:
return
_last_digest[exec_id] = now
run_id = _resolve_run_id(exec_id)
if run_id:
asyncio.create_task(
consolidate_worker_run(_agent_name, run_id, None, _bus, _llm),
name=f"worker-digest-{run_id}",
)
session.worker_digest_sub = session.event_bus.subscribe(
event_types=[_ET.EXECUTION_COMPLETED, _ET.EXECUTION_FAILED],
handler=_on_execution_done,
event_types=[
_ET.NODE_LOOP_ITERATION,
_ET.EXECUTION_COMPLETED,
_ET.EXECUTION_FAILED,
],
handler=_on_worker_event,
)
def _subscribe_worker_handoffs(self, session: Session, executor: Any) -> None: