fix: linter update

This commit is contained in:
bryan
2026-03-08 19:37:57 -07:00
parent 0256e0c944
commit cba0ec110f
4 changed files with 151 additions and 58 deletions
+9 -5
View File
@@ -437,10 +437,12 @@ async def handle_update_trigger_task(request: web.Request) -> web.Response:
session_id = request.match_info["session_id"]
await _persist_active_triggers(session, session_id)
return web.json_response({
"trigger_id": trigger_id,
"task": tdef.task,
})
return web.json_response(
{
"trigger_id": trigger_id,
"task": tdef.task,
}
)
async def handle_session_graphs(request: web.Request) -> web.Response:
@@ -854,7 +856,9 @@ def register_routes(app: web.Application) -> None:
# Session info
app.router.add_get("/api/sessions/{session_id}/stats", handle_session_stats)
app.router.add_get("/api/sessions/{session_id}/entry-points", handle_session_entry_points)
app.router.add_patch("/api/sessions/{session_id}/triggers/{trigger_id}", handle_update_trigger_task)
app.router.add_patch(
"/api/sessions/{session_id}/triggers/{trigger_id}", handle_update_trigger_task
)
app.router.add_get("/api/sessions/{session_id}/graphs", handle_session_graphs)
app.router.add_get("/api/sessions/{session_id}/queen-messages", handle_queen_messages)
+4 -1
View File
@@ -1039,7 +1039,10 @@ class SessionManager:
detail = cfg.get("cron") or f"every {cfg.get('interval_minutes', '?')} min"
task_info = f' -> task: "{t.task}"' if t.task else " (no task configured)"
parts.append(f" - {t.id} ({t.trigger_type}: {detail}){task_info}")
trigger_lines = "\n\nAvailable triggers (inactive — use set_trigger to activate):\n" + "\n".join(parts)
trigger_lines = (
"\n\nAvailable triggers (inactive — use set_trigger to activate):\n"
+ "\n".join(parts)
)
await node.inject_event(f"[SYSTEM] Worker loaded.{profile}{trigger_lines}")
+137 -52
View File
@@ -270,27 +270,39 @@ async def _start_trigger_timer(session: Any, trigger_id: str, trigger_def: Any)
while True:
# Fire trigger
if getattr(session, "worker_runtime", None) is None:
logger.warning("Trigger '%s': worker not loaded, discarding tick", trigger_id)
logger.warning(
"Trigger '%s': worker not loaded, discarding tick", trigger_id
)
else:
queen_node = _get_queen_node(session)
if queen_node is not None:
trigger = TriggerEvent(
trigger_type="timer",
source_id=trigger_id,
payload={"schedule": cron_expr, "time": datetime.now(UTC).isoformat(), "task": trigger_def.task},
payload={
"schedule": cron_expr,
"time": datetime.now(UTC).isoformat(),
"task": trigger_def.task,
},
timestamp=time.time(),
)
await session.event_bus.publish(
AgentEvent(
type=EventType.TRIGGER_FIRED,
stream_id="queen",
data={"trigger_id": trigger_id, "trigger_type": "timer", "payload": trigger.payload},
data={
"trigger_id": trigger_id,
"trigger_type": "timer",
"payload": trigger.payload,
},
)
)
await queen_node.inject_trigger(trigger)
logger.info("Trigger '%s' fired (cron: %s)", trigger_id, cron_expr)
else:
logger.warning("Trigger '%s': queen node not available, skipping tick", trigger_id)
logger.warning(
"Trigger '%s': queen node not available, skipping tick", trigger_id
)
# Compute next fire
cron = croniter(cron_expr, datetime.now())
@@ -306,27 +318,41 @@ async def _start_trigger_timer(session: Any, trigger_id: str, trigger_def: Any)
while True:
if getattr(session, "worker_runtime", None) is None:
logger.warning("Trigger '%s': worker not loaded, discarding tick", trigger_id)
logger.warning(
"Trigger '%s': worker not loaded, discarding tick", trigger_id
)
else:
queen_node = _get_queen_node(session)
if queen_node is not None:
trigger = TriggerEvent(
trigger_type="timer",
source_id=trigger_id,
payload={"interval_minutes": interval_minutes, "time": datetime.now(UTC).isoformat(), "task": trigger_def.task},
payload={
"interval_minutes": interval_minutes,
"time": datetime.now(UTC).isoformat(),
"task": trigger_def.task,
},
timestamp=time.time(),
)
await session.event_bus.publish(
AgentEvent(
type=EventType.TRIGGER_FIRED,
stream_id="queen",
data={"trigger_id": trigger_id, "trigger_type": "timer", "payload": trigger.payload},
data={
"trigger_id": trigger_id,
"trigger_type": "timer",
"payload": trigger.payload,
},
)
)
await queen_node.inject_trigger(trigger)
logger.info("Trigger '%s' fired (interval: %dm)", trigger_id, interval_minutes)
logger.info(
"Trigger '%s' fired (interval: %dm)", trigger_id, interval_minutes
)
else:
logger.warning("Trigger '%s': queen node not available, skipping tick", trigger_id)
logger.warning(
"Trigger '%s': queen node not available, skipping tick", trigger_id
)
session.trigger_next_fire[trigger_id] = time.monotonic() + sleep_secs
await asyncio.sleep(sleep_secs)
@@ -394,11 +420,17 @@ async def _start_trigger_webhook(session: Any, trigger_id: str, trigger_def: Any
AgentEvent(
type=EventType.TRIGGER_FIRED,
stream_id="queen",
data={"trigger_id": trigger_id, "trigger_type": "webhook", "payload": trigger.payload},
data={
"trigger_id": trigger_id,
"trigger_type": "webhook",
"payload": trigger.payload,
},
)
)
await queen_node.inject_trigger(trigger)
logger.info("Webhook trigger '%s' fired (%s %s)", trigger_id, event.data.get("method", ""), path)
logger.info(
"Webhook trigger '%s' fired (%s %s)", trigger_id, event.data.get("method", ""), path
)
sub_id = session.event_bus.subscribe(
event_types=[EventType.WEBHOOK_RECEIVED],
@@ -1844,9 +1876,14 @@ def register_queen_lifecycle_tools(
)
available[trigger_id] = tdef
else:
return json.dumps({
"error": f"Trigger '{trigger_id}' not found. Provide trigger_type and trigger_config to create a custom trigger."
})
return json.dumps(
{
"error": (
f"Trigger '{trigger_id}' not found. "
"Provide trigger_type and trigger_config to create a custom trigger."
)
}
)
# Apply task override if provided
if task:
@@ -1854,10 +1891,12 @@ def register_queen_lifecycle_tools(
# Task is mandatory before activation
if not tdef.task:
return json.dumps({
"error": f"Trigger '{trigger_id}' has no task configured. "
"Set a task describing what the worker should do when this trigger fires."
})
return json.dumps(
{
"error": f"Trigger '{trigger_id}' has no task configured. "
"Set a task describing what the worker should do when this trigger fires."
}
)
# Use provided overrides if given
t_type = trigger_type or tdef.trigger_type
@@ -1871,12 +1910,21 @@ def register_queen_lifecycle_tools(
if t_type == "webhook":
path = t_config.get("path", "").strip()
if not path or not path.startswith("/"):
return json.dumps({"error": "Webhook trigger requires 'path' starting with '/' in trigger_config (e.g. '/hooks/github')."})
return json.dumps(
{
"error": (
"Webhook trigger requires 'path' starting with '/'"
" in trigger_config (e.g. '/hooks/github')."
)
}
)
valid_methods = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
methods = t_config.get("methods", ["POST"])
invalid = [m.upper() for m in methods if m.upper() not in valid_methods]
if invalid:
return json.dumps({"error": f"Invalid HTTP methods: {invalid}. Valid: {sorted(valid_methods)}"})
return json.dumps(
{"error": f"Invalid HTTP methods: {invalid}. Valid: {sorted(valid_methods)}"}
)
try:
await _start_trigger_webhook(session, trigger_id, tdef)
@@ -1892,16 +1940,22 @@ def register_queen_lifecycle_tools(
AgentEvent(
type=EventType.TRIGGER_ACTIVATED,
stream_id="queen",
data={"trigger_id": trigger_id, "trigger_type": t_type, "trigger_config": t_config},
data={
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
},
)
)
port = int(t_config.get("port", 8090))
return json.dumps({
"status": "activated",
"trigger_id": trigger_id,
"trigger_type": t_type,
"webhook_url": f"http://127.0.0.1:{port}{path}",
})
return json.dumps(
{
"status": "activated",
"trigger_id": trigger_id,
"trigger_type": t_type,
"webhook_url": f"http://127.0.0.1:{port}{path}",
}
)
if t_type != "timer":
return json.dumps({"error": f"Unsupported trigger type: {t_type}"})
@@ -1915,12 +1969,16 @@ def register_queen_lifecycle_tools(
if not croniter.is_valid(cron_expr):
return json.dumps({"error": f"Invalid cron expression: {cron_expr}"})
except ImportError:
return json.dumps({"error": "croniter package not installed — cannot validate cron expression."})
return json.dumps(
{"error": "croniter package not installed — cannot validate cron expression."}
)
elif interval:
if not isinstance(interval, (int, float)) or interval <= 0:
return json.dumps({"error": f"interval_minutes must be > 0, got {interval}"})
else:
return json.dumps({"error": "Timer trigger needs 'cron' or 'interval_minutes' in trigger_config."})
return json.dumps(
{"error": "Timer trigger needs 'cron' or 'interval_minutes' in trigger_config."}
)
# Start timer
try:
@@ -1941,30 +1999,40 @@ def register_queen_lifecycle_tools(
AgentEvent(
type=EventType.TRIGGER_ACTIVATED,
stream_id="queen",
data={"trigger_id": trigger_id, "trigger_type": t_type, "trigger_config": t_config},
data={
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
},
)
)
return json.dumps({
"status": "activated",
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
})
return json.dumps(
{
"status": "activated",
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
}
)
_set_trigger_tool = Tool(
name="set_trigger",
description=(
"Activate a trigger (timer) so it fires periodically. "
"Use trigger_id of an available trigger, or provide trigger_type + trigger_config to create a custom one. "
"A task must be configured before activation — either pre-set on the trigger or provided here."
"Use trigger_id of an available trigger, or provide trigger_type + trigger_config"
" to create a custom one. "
"A task must be configured before activation —"
" either pre-set on the trigger or provided here."
),
parameters={
"type": "object",
"properties": {
"trigger_id": {
"type": "string",
"description": "ID of the trigger to activate (from list_triggers) or a new custom ID",
"description": (
"ID of the trigger to activate (from list_triggers) or a new custom ID"
),
},
"trigger_type": {
"type": "string",
@@ -1972,11 +2040,19 @@ def register_queen_lifecycle_tools(
},
"trigger_config": {
"type": "object",
"description": "Config for the trigger. Timer: {cron: '*/5 * * * *'} or {interval_minutes: 5}. Only needed for custom triggers.",
"description": (
"Config for the trigger."
" Timer: {cron: '*/5 * * * *'} or {interval_minutes: 5}."
" Only needed for custom triggers."
),
},
"task": {
"type": "string",
"description": "The task/instructions for the worker when this trigger fires (e.g. 'Process inbox emails using saved rules'). Required if not already configured on the trigger.",
"description": (
"The task/instructions for the worker when this trigger fires"
" (e.g. 'Process inbox emails using saved rules')."
" Required if not already configured on the trigger."
),
},
},
"required": ["trigger_id"],
@@ -2032,7 +2108,10 @@ def register_queen_lifecycle_tools(
_remove_trigger_tool = Tool(
name="remove_trigger",
description="Deactivate an active trigger. The trigger stops firing but remains available for re-activation.",
description=(
"Deactivate an active trigger."
" The trigger stops firing but remains available for re-activation."
),
parameters={
"type": "object",
"properties": {
@@ -2044,7 +2123,9 @@ def register_queen_lifecycle_tools(
"required": ["trigger_id"],
},
)
registry.register("remove_trigger", _remove_trigger_tool, lambda inputs: remove_trigger(**inputs))
registry.register(
"remove_trigger", _remove_trigger_tool, lambda inputs: remove_trigger(**inputs)
)
tools_registered += 1
# --- list_triggers ---------------------------------------------------------
@@ -2054,19 +2135,23 @@ def register_queen_lifecycle_tools(
available = getattr(session, "available_triggers", {})
triggers = []
for tdef in available.values():
triggers.append({
"id": tdef.id,
"trigger_type": tdef.trigger_type,
"trigger_config": tdef.trigger_config,
"description": tdef.description,
"task": tdef.task,
"active": tdef.active,
})
triggers.append(
{
"id": tdef.id,
"trigger_type": tdef.trigger_type,
"trigger_config": tdef.trigger_config,
"description": tdef.description,
"task": tdef.task,
"active": tdef.active,
}
)
return json.dumps({"triggers": triggers})
_list_triggers_tool = Tool(
name="list_triggers",
description="List all available triggers (from the loaded worker) and their active/inactive status.",
description=(
"List all available triggers (from the loaded worker) and their active/inactive status."
),
parameters={
"type": "object",
"properties": {},
@@ -9,6 +9,7 @@ Verifies that:
- run_agent_with_input is in _QUEEN_RUNNING_TOOLS
- System prompts reference run_agent_with_input, not start_worker()
"""
from __future__ import annotations
import asyncio