"""ColonyRuntime — Orchestrates a colony of parallel worker clones.

Each worker is an exact copy of the queen's AgentLoop — same tools,
same prompt, same LLM. Workers run independently and report results
back to the queen via the event bus.

The ColonyRuntime replaces both AgentHost and ExecutionManager.
There are no graphs, no edges, no nodes, no data buffers.
Just: spawn N independent clones, let them run, collect results.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from collections import OrderedDict
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from framework.agent_loop.types import AgentContext, AgentSpec
|
|
from framework.host.event_bus import AgentEvent, EventBus, EventType
|
|
from framework.host.triggers import TriggerDefinition
|
|
from framework.host.worker import Worker, WorkerInfo, WorkerResult, WorkerStatus
|
|
from framework.observability import set_trace_context
|
|
from framework.schemas.goal import Goal
|
|
from framework.storage.concurrent import ConcurrentStorage
|
|
from framework.storage.session_store import SessionStore
|
|
|
|
if TYPE_CHECKING:
|
|
from framework.agent_loop.agent_loop import AgentLoop
|
|
from framework.llm.provider import LLMProvider, Tool
|
|
from framework.pipeline.runner import PipelineRunner
|
|
from framework.skills.manager import SkillsManagerConfig
|
|
from framework.tracker.runtime_log_store import RuntimeLogStore
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ColonyConfig:
    """Tunable limits and infrastructure settings for a ColonyRuntime."""

    # Upper bound on simultaneously running workers.
    # NOTE(review): not enforced anywhere in this module — confirm callers check it.
    max_concurrent_workers: int = 100
    # Forwarded to ConcurrentStorage: seconds a cached read stays fresh.
    cache_ttl: float = 60.0
    # Forwarded to ConcurrentStorage: seconds between batched writes.
    batch_interval: float = 0.1
    # Forwarded to EventBus as its max_history length.
    max_history: int = 1000
    # Bounds on the retained worker-result cache (count cap / optional TTL).
    # NOTE(review): pruning of _execution_results is not visible in this module — confirm.
    result_retention_max: int = 1000
    result_retention_ttl_seconds: float | None = None
    # Idempotency-key cache: entries older than the TTL, or beyond the key cap,
    # are evicted by _prune_idempotency_keys().
    idempotency_ttl_seconds: float = 300.0
    idempotency_max_keys: int = 10000
    # Embedded webhook server bind address; routes are dicts with
    # "source_id"/"path" and optional "methods"/"secret" keys.
    webhook_host: str = "127.0.0.1"
    webhook_port: int = 8080
    webhook_routes: list[dict] = field(default_factory=list)
    # NOTE(review): not referenced in this module — presumably a worker-restart cap; confirm.
    max_resurrections: int = 3
|
|
|
|
|
|
@dataclass
class TriggerSpec:
    """Specification for a trigger that auto-spawns workers."""

    # Unique trigger id; used as the key in the runtime's trigger registry.
    id: str
    name: str
    trigger_type: str  # "webhook", "api", "timer", "event", "manual"
    # Type-specific settings; for "timer" triggers this module reads
    # "interval_minutes" (float-able) and "run_immediately" (bool).
    trigger_config: dict[str, Any] = field(default_factory=dict)
    # NOTE(review): the fields below are not read anywhere in this module —
    # confirm which consumers rely on them.
    isolation_level: str = "shared"
    priority: int = 0
    max_concurrent: int = 10
    max_resurrections: int = 3
|
|
|
|
|
|
class StreamEventBus(EventBus):
    """Proxy bus that stamps ``colony_id`` onto every published event.

    The monotonic timestamp of the most recent publish is recorded in
    ``last_activity_time`` so the runtime can measure bus idleness.  Every
    other operation is forwarded verbatim to the wrapped bus.
    """

    def __init__(self, bus: EventBus, colony_id: str) -> None:
        self._delegate = bus
        self._stamp = colony_id
        self.last_activity_time: float = time.monotonic()

    async def publish(self, event: AgentEvent) -> None:
        # Tag the event with this colony before it reaches subscribers.
        event.colony_id = self._stamp
        self.last_activity_time = time.monotonic()
        await self._delegate.publish(event)

    def subscribe(self, *args: Any, **kwargs: Any) -> str:
        return self._delegate.subscribe(*args, **kwargs)

    def unsubscribe(self, subscription_id: str) -> bool:
        return self._delegate.unsubscribe(subscription_id)

    def get_history(self, *args: Any, **kwargs: Any) -> list:
        return self._delegate.get_history(*args, **kwargs)

    def get_stats(self) -> dict:
        return self._delegate.get_stats()

    async def wait_for(self, *args: Any, **kwargs: Any) -> Any:
        return await self._delegate.wait_for(*args, **kwargs)
|
|
|
|
|
|
class ColonyRuntime:
    """Orchestrates a colony of parallel worker clones.

    Each worker is an exact copy of the queen's AgentLoop. Workers run
    independently, report results via the event bus, and terminate.

    Supports:
    - Spawning/stopping workers
    - Timer and webhook triggers that auto-spawn workers
    - Pipeline middleware (credentials, tools, skills)
    - Event pub/sub for queen-worker communication

    Lifecycle: construct, register triggers, ``await start()``, spawn or
    trigger workers, ``await stop()``.  Trigger registration is only
    permitted while the runtime is stopped.
    """
|
|
|
|
    def __init__(
        self,
        agent_spec: AgentSpec,
        goal: Goal,
        storage_path: str | Path,
        llm: LLMProvider | None = None,
        tools: list[Tool] | None = None,
        tool_executor: Callable | None = None,
        config: ColonyConfig | None = None,
        runtime_log_store: RuntimeLogStore | None = None,
        colony_id: str | None = None,
        accounts_prompt: str = "",
        accounts_data: list[dict] | None = None,
        tool_provider_map: dict[str, str] | None = None,
        event_bus: EventBus | None = None,
        skills_manager_config: SkillsManagerConfig | None = None,
        skills_catalog_prompt: str = "",
        protocols_prompt: str = "",
        skill_dirs: list[str] | None = None,
        pipeline_stages: list | None = None,
    ):
        """Wire up storage, event bus, pipeline and skills for the colony.

        Args:
            agent_spec: Spec cloned into every spawned worker's context.
            goal: Goal object shared by all workers (prompt context source).
            storage_path: Base directory for ConcurrentStorage/SessionStore.
            llm / tools / tool_executor: Queen capabilities cloned into workers;
                may also be supplied later by pipeline stages (see
                ``_apply_pipeline_results``).
            config: ColonyConfig; defaults are used when omitted.
            colony_id: Colony identifier; defaults to "primary".
            event_bus: Shared bus; a fresh EventBus is created when omitted.
            skills_manager_config: Preferred way to configure skills; the
                pre-rendered prompt parameters are deprecated.
            pipeline_stages: Explicit middleware stages; otherwise loaded
                from hive config.

        NOTE(review): the ``skill_dirs`` parameter is never read — this
        attribute is always taken from the SkillsManager's allowlisted
        dirs below.  Confirm whether the parameter should be honored.
        """
        # Local imports avoid a module-level import cycle with the pipeline
        # and skills packages.
        from framework.pipeline.runner import PipelineRunner
        from framework.skills.manager import SkillsManager

        self._agent_spec = agent_spec
        self._goal = goal
        self._config = config or ColonyConfig()
        self._runtime_log_store = runtime_log_store

        # Explicit stages win; otherwise fall back to hive config.
        if pipeline_stages:
            self._pipeline = PipelineRunner(pipeline_stages)
        else:
            self._pipeline = self._load_pipeline_from_config()

        # Skills: config object (preferred) > pre-rendered prompts (deprecated)
        # > defaults loaded from disk.
        if skills_manager_config is not None:
            self._skills_manager = SkillsManager(skills_manager_config)
            self._skills_manager.load()
        elif skills_catalog_prompt or protocols_prompt:
            import warnings

            warnings.warn(
                "Passing pre-rendered skills_catalog_prompt/protocols_prompt "
                "is deprecated. Pass skills_manager_config instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            self._skills_manager = SkillsManager.from_precomputed(
                skills_catalog_prompt, protocols_prompt
            )
        else:
            self._skills_manager = SkillsManager()
            self._skills_manager.load()

        # Values surfaced from the skills manager for worker contexts.
        self.skill_dirs: list[str] = self._skills_manager.allowlisted_dirs
        self.context_warn_ratio: float | None = self._skills_manager.context_warn_ratio
        self.batch_init_nudge: str | None = self._skills_manager.batch_init_nudge

        self._colony_id: str = colony_id or "primary"
        self._accounts_prompt = accounts_prompt
        self._accounts_data = accounts_data
        self._tool_provider_map = tool_provider_map
        self._dynamic_memory_provider_factory: Callable[[str], Callable[[], str] | None] | None = (
            None
        )

        storage_path_obj = Path(storage_path) if isinstance(storage_path, str) else storage_path
        self._storage = ConcurrentStorage(
            base_path=storage_path_obj,
            cache_ttl=self._config.cache_ttl,
            batch_interval=self._config.batch_interval,
        )
        self._session_store = SessionStore(storage_path_obj)

        # All worker events flow through the scoped bus, which stamps them
        # with this colony's id.
        self._event_bus = event_bus or EventBus(max_history=self._config.max_history)
        self._scoped_event_bus = StreamEventBus(self._event_bus, self._colony_id)

        self._llm = llm
        self._tools = tools or []
        self._tool_executor = tool_executor

        # Worker management
        self._workers: dict[str, Worker] = {}
        self._triggers: dict[str, TriggerSpec] = {}
        self._trigger_definitions: dict[str, TriggerDefinition] = {}

        # Timer/webhook infrastructure
        self._event_subscriptions: list[str] = []
        self._timer_tasks: list[asyncio.Task] = []
        # trigger_id -> monotonic timestamp of the next scheduled fire.
        self._timer_next_fire: dict[str, float] = {}
        self._webhook_server: Any = None

        # Idempotency: key -> worker_id (insertion-ordered for LRU-style
        # eviction) plus key -> wall-clock record time for TTL pruning.
        self._idempotency_keys: OrderedDict[str, str] = OrderedDict()
        self._idempotency_times: dict[str, float] = {}

        # User presence (monotonic time of last inject_input; 0.0 = never).
        self._last_user_input_time: float = 0.0

        # Result retention (finished-worker results kept for later lookup).
        self._execution_results: OrderedDict[str, WorkerResult] = OrderedDict()
        self._execution_result_times: dict[str, float] = {}

        self._running = False
        self._timers_paused = False
        self._lock = asyncio.Lock()

        self.intro_message: str = ""
|
|
|
|
    @property
    def skills_catalog_prompt(self) -> str:
        """Current skills catalog prompt, delegated to the SkillsManager."""
        return self._skills_manager.skills_catalog_prompt

    @property
    def protocols_prompt(self) -> str:
        """Current protocols prompt, delegated to the SkillsManager."""
        return self._skills_manager.protocols_prompt

    @property
    def colony_id(self) -> str:
        """Identifier of this colony (defaults to "primary")."""
        return self._colony_id

    @property
    def agent_id(self) -> str:
        """Alias of :attr:`colony_id`."""
        return self._colony_id

    @property
    def is_running(self) -> bool:
        """True between a successful :meth:`start` and :meth:`stop`."""
        return self._running

    @property
    def event_bus(self) -> EventBus:
        """The underlying (un-scoped) event bus shared by queen and workers."""
        return self._event_bus

    @property
    def timers_paused(self) -> bool:
        """True while timer triggers are suspended via :meth:`pause_timers`."""
        return self._timers_paused
|
|
|
|
@property
|
|
def user_idle_seconds(self) -> float:
|
|
if self._last_user_input_time == 0.0:
|
|
return float("inf")
|
|
return time.monotonic() - self._last_user_input_time
|
|
|
|
    @property
    def agent_idle_seconds(self) -> float:
        """Best-effort colony idleness in seconds.

        Returns ``inf`` when no workers exist.  Otherwise returns the
        minimum of (a) the smallest elapsed time since any active worker
        started and (b) the time since the scoped event bus last published.

        NOTE(review): ``now - w._started_at`` measures time since the worker
        *started*, not since it was last busy — confirm this is the intended
        notion of per-worker "idle".
        """
        if not self._workers:
            return float("inf")
        min_idle = float("inf")
        now = time.monotonic()
        for w in self._workers.values():
            # Only workers that are running and have a start timestamp count.
            if w.is_active and w._started_at > 0:
                idle = now - w._started_at
                if idle < min_idle:
                    min_idle = idle
        bus_idle = now - self._scoped_event_bus.last_activity_time
        return min(min_idle, bus_idle)
|
|
|
|
@property
|
|
def active_worker_count(self) -> int:
|
|
return sum(1 for w in self._workers.values() if w.is_active)
|
|
|
|
    def _apply_pipeline_results(self) -> None:
        """Adopt capabilities produced by initialized pipeline stages.

        Later stages override earlier ones for tools/accounts; the LLM is
        only taken from a stage when none was supplied at construction.
        Called from :meth:`start` after ``initialize_all``.
        """
        for stage in self._pipeline.stages:
            if stage.tool_registry is not None:
                tools = list(stage.tool_registry.get_tools().values())
                # Only adopt a registry that actually produced tools.
                if tools:
                    self._tools = tools
                    self._tool_executor = stage.tool_registry.get_executor()
            if stage.llm is not None and self._llm is None:
                # Constructor-supplied LLM wins over stage-provided ones.
                self._llm = stage.llm
            if stage.accounts_prompt:
                self._accounts_prompt = stage.accounts_prompt
                self._accounts_data = stage.accounts_data
                self._tool_provider_map = stage.tool_provider_map
            if stage.skills_manager is not None:
                self._skills_manager = stage.skills_manager
|
|
|
|
    @staticmethod
    def _load_pipeline_from_config():
        """Build the middleware pipeline from hive config.

        Returns an empty PipelineRunner when no ``pipeline.stages`` are
        configured, so callers never have to special-case a missing pipeline.
        """
        # Local imports avoid a module-level import cycle with the pipeline package.
        from framework.config import get_hive_config
        from framework.pipeline.registry import build_pipeline_from_config
        from framework.pipeline.runner import PipelineRunner

        config = get_hive_config()
        stages_config = config.get("pipeline", {}).get("stages", [])
        if not stages_config:
            return PipelineRunner([])
        return build_pipeline_from_config(stages_config)
|
|
|
|
# ── Lifecycle ───────────────────────────────────────────────
|
|
|
|
async def start(self) -> None:
|
|
if self._running:
|
|
return
|
|
|
|
async with self._lock:
|
|
await self._storage.start()
|
|
await self._pipeline.initialize_all()
|
|
self._apply_pipeline_results()
|
|
|
|
if self._config.webhook_routes:
|
|
from framework.host.webhook_server import (
|
|
WebhookRoute,
|
|
WebhookServer,
|
|
WebhookServerConfig,
|
|
)
|
|
|
|
wh_config = WebhookServerConfig(
|
|
host=self._config.webhook_host,
|
|
port=self._config.webhook_port,
|
|
)
|
|
self._webhook_server = WebhookServer(self._event_bus, wh_config)
|
|
for rc in self._config.webhook_routes:
|
|
route = WebhookRoute(
|
|
source_id=rc["source_id"],
|
|
path=rc["path"],
|
|
methods=rc.get("methods", ["POST"]),
|
|
secret=rc.get("secret"),
|
|
)
|
|
self._webhook_server.add_route(route)
|
|
await self._webhook_server.start()
|
|
|
|
await self._start_timers()
|
|
await self._skills_manager.start_watching()
|
|
|
|
self._running = True
|
|
self._timers_paused = False
|
|
logger.info(
|
|
"ColonyRuntime started: colony_id=%s, triggers=%d",
|
|
self._colony_id,
|
|
len(self._triggers),
|
|
)
|
|
|
|
    async def stop(self) -> None:
        """Stop the runtime and release all resources.

        Stops workers, cancels timer tasks, drops event subscriptions,
        shuts down the webhook server, skills watcher and storage.
        Idempotent — returns immediately when not running.
        """
        if not self._running:
            return

        async with self._lock:
            await self.stop_all_workers()

            # Timer loops are plain tasks; cancellation is enough to end them.
            for task in self._timer_tasks:
                task.cancel()
            self._timer_tasks.clear()

            for sub_id in self._event_subscriptions:
                self._event_bus.unsubscribe(sub_id)
            self._event_subscriptions.clear()

            if self._webhook_server:
                await self._webhook_server.stop()
                self._webhook_server = None

            await self._skills_manager.stop_watching()
            await self._storage.stop()

            self._running = False
            logger.info("ColonyRuntime stopped: colony_id=%s", self._colony_id)
|
|
|
|
    def pause_timers(self) -> None:
        """Suspend timer triggers; running timer loops idle until resumed."""
        self._timers_paused = True

    def resume_timers(self) -> None:
        """Re-enable timer triggers after :meth:`pause_timers`."""
        self._timers_paused = False
|
|
|
|
# ── Worker Spawning ─────────────────────────────────────────
|
|
|
|
    async def spawn(
        self,
        task: str,
        count: int = 1,
        input_data: dict[str, Any] | None = None,
        session_state: dict[str, Any] | None = None,
    ) -> list[str]:
        """Spawn *count* worker clones and start them in the background.

        Each worker gets a fresh AgentLoop wired to the queen's LLM, tools
        and the colony-scoped event bus, plus an AgentContext carrying the
        goal and prompt material.  Returns the list of new worker ids.

        NOTE(review): ``session_state`` is accepted but never used in this
        method — confirm whether workers should receive it.

        Raises:
            RuntimeError: if the runtime is not running.
        """
        if not self._running:
            raise RuntimeError("ColonyRuntime is not running")

        # Imported lazily to avoid a module-level import cycle.
        from framework.agent_loop.agent_loop import AgentLoop

        worker_ids = []
        for i in range(count):
            worker_id = self._session_store.generate_session_id()

            # Exact clone of the queen's loop, scoped to this worker's stream.
            agent_loop = AgentLoop(
                llm=self._llm,
                tools=list(self._tools),
                tool_executor=self._tool_executor,
                event_bus=self._scoped_event_bus,
                stream_id=f"worker:{worker_id}",
                execution_id=worker_id,
            )

            agent_context = AgentContext(
                runtime=self._make_runtime_adapter(worker_id),
                agent_id=worker_id,
                agent_spec=self._agent_spec,
                input_data=input_data or {"task": task},
                goal_context=self._goal.to_prompt_context(),
                goal=self._goal,
                accounts_prompt=self._accounts_prompt,
                skills_catalog_prompt=self.skills_catalog_prompt,
                protocols_prompt=self.protocols_prompt,
                skill_dirs=self.skill_dirs,
                execution_id=worker_id,
                stream_id=f"worker:{worker_id}",
            )

            worker = Worker(
                worker_id=worker_id,
                task=task,
                agent_loop=agent_loop,
                context=agent_context,
                event_bus=self._scoped_event_bus,
                colony_id=self._colony_id,
            )

            # Register before starting so event handlers can look it up.
            self._workers[worker_id] = worker
            await worker.start_background()
            worker_ids.append(worker_id)

            logger.info(
                "Spawned worker %s (%d/%d) for task: %s",
                worker_id,
                i + 1,
                count,
                task[:80],
            )

        return worker_ids
|
|
|
|
    async def trigger(
        self,
        trigger_id: str,
        input_data: dict[str, Any],
        correlation_id: str | None = None,
        session_state: dict[str, Any] | None = None,
        idempotency_key: str | None = None,
    ) -> str:
        """Spawn one worker in response to a trigger firing.

        Non-blocking — returns the new worker id immediately (empty string
        when no worker was spawned).  When a pipeline is configured it may
        rewrite ``input_data`` before the spawn.  With an
        ``idempotency_key``, a repeat call whose key is still cached returns
        the originally spawned worker id instead of spawning again.

        NOTE(review): the idempotency cache is read and written without
        holding ``self._lock`` — two concurrent calls with the same key can
        both spawn; confirm acceptable.  Also, ``trigger_id`` is only
        forwarded to the pipeline as the entry-point id; the TriggerSpec
        registry is not consulted here (e.g. ``max_concurrent`` is not
        enforced) — confirm intended.

        Raises:
            RuntimeError: if the runtime is not running.
        """
        if not self._running:
            raise RuntimeError("ColonyRuntime is not running")

        if idempotency_key is not None:
            # Evict stale keys first so an expired key does not short-circuit.
            self._prune_idempotency_keys()
            cached = self._idempotency_keys.get(idempotency_key)
            if cached is not None:
                return cached

        if self._pipeline.stages:
            from framework.pipeline.stage import PipelineContext

            pipeline_ctx = PipelineContext(
                entry_point_id=trigger_id,
                input_data=input_data,
                correlation_id=correlation_id,
                session_state=session_state,
            )
            pipeline_ctx = await self._pipeline.run(pipeline_ctx)
            # Stages may enrich or replace the payload.
            input_data = pipeline_ctx.input_data

        # Fall back to the serialized payload when no explicit task is given.
        task = input_data.get("task", json.dumps(input_data))
        worker_ids = await self.spawn(
            task=task,
            count=1,
            input_data=input_data,
            session_state=session_state,
        )

        worker_id = worker_ids[0] if worker_ids else ""

        if idempotency_key is not None and worker_id:
            self._idempotency_keys[idempotency_key] = worker_id
            self._idempotency_times[idempotency_key] = time.time()

        return worker_id
|
|
|
|
async def trigger_and_wait(
|
|
self,
|
|
trigger_id: str,
|
|
input_data: dict[str, Any],
|
|
timeout: float | None = None,
|
|
session_state: dict[str, Any] | None = None,
|
|
) -> WorkerResult | None:
|
|
worker_id = await self.trigger(trigger_id, input_data, session_state=session_state)
|
|
if not worker_id:
|
|
return None
|
|
return await self.wait_for_worker(worker_id, timeout)
|
|
|
|
# ── Worker Control ──────────────────────────────────────────
|
|
|
|
async def stop_worker(self, worker_id: str) -> None:
|
|
worker = self._workers.get(worker_id)
|
|
if worker:
|
|
await worker.stop()
|
|
logger.info("Stopped worker %s", worker_id)
|
|
|
|
async def stop_all_workers(self) -> None:
|
|
tasks = []
|
|
for worker in self._workers.values():
|
|
if worker.is_active:
|
|
tasks.append(worker.stop())
|
|
if tasks:
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
self._workers.clear()
|
|
|
|
async def send_to_worker(self, worker_id: str, message: str) -> bool:
|
|
worker = self._workers.get(worker_id)
|
|
if worker and worker.is_active:
|
|
await worker.inject(message)
|
|
return True
|
|
return False
|
|
|
|
# ── Status & Query ──────────────────────────────────────────
|
|
|
|
    def list_workers(self) -> list[WorkerInfo]:
        """Info snapshot for every known worker (active or finished)."""
        return [w.info for w in self._workers.values()]

    def get_worker(self, worker_id: str) -> Worker | None:
        """Look up a worker by id; ``None`` when unknown."""
        return self._workers.get(worker_id)

    def list_triggers(self) -> list[TriggerSpec]:
        """All registered trigger specs."""
        return list(self._triggers.values())

    def get_entry_points(self) -> list[TriggerSpec]:
        """Alias of :meth:`list_triggers`."""
        return list(self._triggers.values())

    def get_timer_next_fire_in(self, trigger_id: str) -> float | None:
        """Seconds until the given timer trigger next fires.

        Returns ``None`` when no fire is scheduled — unknown trigger, or the
        timer is currently executing its trigger (the entry is removed for
        the duration of the call).
        """
        mono = self._timer_next_fire.get(trigger_id)
        if mono is not None:
            # Clamp at zero so a slightly-overdue timer never reports negative.
            return max(0.0, mono - time.monotonic())
        return None

    def get_worker_result(self, worker_id: str) -> WorkerResult | None:
        """Retained result of a finished worker, if still cached."""
        return self._execution_results.get(worker_id)
|
|
|
|
    async def wait_for_worker(
        self, worker_id: str, timeout: float | None = None
    ) -> WorkerResult | None:
        """Wait for a worker's background task and return its result.

        Falls back to the retained-result cache when the worker object is
        already gone.  Returns ``None`` on timeout; ``asyncio.shield``
        ensures the timeout does not cancel the worker's task, so the worker
        keeps running and can be awaited again.
        """
        worker = self._workers.get(worker_id)
        if worker is None:
            # Worker already removed — its result may still be cached.
            return self._execution_results.get(worker_id)
        if worker._task_handle is None:
            # No background task was started; report whatever info holds.
            return worker.info.result
        try:
            await asyncio.wait_for(asyncio.shield(worker._task_handle), timeout=timeout)
        except asyncio.TimeoutError:
            return None
        return worker.info.result
|
|
|
|
def get_stats(self) -> dict:
|
|
return {
|
|
"running": self._running,
|
|
"colony_id": self._colony_id,
|
|
"active_workers": self.active_worker_count,
|
|
"total_workers": len(self._workers),
|
|
"triggers": len(self._triggers),
|
|
"event_bus": self._event_bus.get_stats(),
|
|
}
|
|
|
|
def get_active_streams(self) -> list[dict[str, Any]]:
|
|
result = []
|
|
for wid, worker in self._workers.items():
|
|
if worker.is_active:
|
|
result.append(
|
|
{
|
|
"colony_id": self._colony_id,
|
|
"worker_id": wid,
|
|
"status": worker.status.value,
|
|
"task": worker.task[:100],
|
|
}
|
|
)
|
|
return result
|
|
|
|
    async def inject_input(
        self,
        worker_id: str,
        content: str,
        *,
        is_client_input: bool = False,
        image_content: list[dict[str, Any]] | None = None,
    ) -> bool:
        """Forward user input into a running worker's agent loop.

        Returns True when the message was delivered.

        NOTE(review): the user-presence timestamp is refreshed even when
        delivery fails (unknown/inactive worker, or a loop without
        ``inject_event``) — confirm that is intended.
        """
        self._last_user_input_time = time.monotonic()
        worker = self._workers.get(worker_id)
        if worker and worker.is_active:
            loop = worker._agent_loop
            # Duck-typed: only loops exposing inject_event can accept input.
            if hasattr(loop, "inject_event"):
                await loop.inject_event(
                    content, is_client_input=is_client_input, image_content=image_content
                )
                return True
        return False
|
|
|
|
# ── Event Subscriptions ─────────────────────────────────────
|
|
|
|
    def subscribe_to_events(
        self,
        event_types: list,
        handler: Callable,
        filter_stream: str | None = None,
        filter_colony: str | None = None,
    ) -> str:
        """Subscribe *handler* to event types on the shared bus.

        Returns the subscription id to pass to
        :meth:`unsubscribe_from_events`.
        """
        return self._event_bus.subscribe(
            event_types=event_types,
            handler=handler,
            filter_stream=filter_stream,
            filter_colony=filter_colony,
        )

    def unsubscribe_from_events(self, subscription_id: str) -> bool:
        """Remove a subscription created by :meth:`subscribe_to_events`."""
        return self._event_bus.unsubscribe(subscription_id)
|
|
|
|
# ── Trigger Registration ────────────────────────────────────
|
|
|
|
def register_trigger(self, spec: TriggerSpec) -> None:
|
|
if self._running:
|
|
raise RuntimeError("Cannot register triggers while runtime is running")
|
|
if spec.id in self._triggers:
|
|
raise ValueError(f"Trigger '{spec.id}' already registered")
|
|
self._triggers[spec.id] = spec
|
|
logger.info("Registered trigger: %s (%s)", spec.id, spec.trigger_type)
|
|
|
|
def unregister_trigger(self, trigger_id: str) -> bool:
|
|
if self._running:
|
|
raise RuntimeError("Cannot unregister triggers while runtime is running")
|
|
return self._triggers.pop(trigger_id, None) is not None
|
|
|
|
# ── Internal Helpers ────────────────────────────────────────
|
|
|
|
    def _make_runtime_adapter(self, worker_id: str):
        """Build the per-worker runtime object handed to its AgentContext.

        Wraps the shared ConcurrentStorage in a StreamDecisionTracker scoped
        to the worker's stream id.
        """
        # Imported lazily to avoid a module-level import cycle.
        from framework.host.stream_runtime import StreamDecisionTracker

        return StreamDecisionTracker(
            stream_id=f"worker:{worker_id}",
            storage=self._storage,
            # No outcome aggregation is wired up for workers here.
            outcome_aggregator=None,
        )
|
|
|
|
def _prune_idempotency_keys(self) -> None:
|
|
ttl = self._config.idempotency_ttl_seconds
|
|
if ttl > 0:
|
|
cutoff = time.time() - ttl
|
|
for key, recorded_at in list(self._idempotency_times.items()):
|
|
if recorded_at < cutoff:
|
|
self._idempotency_times.pop(key, None)
|
|
self._idempotency_keys.pop(key, None)
|
|
max_keys = self._config.idempotency_max_keys
|
|
if max_keys > 0:
|
|
while len(self._idempotency_keys) > max_keys:
|
|
old_key, _ = self._idempotency_keys.popitem(last=False)
|
|
self._idempotency_times.pop(old_key, None)
|
|
|
|
    async def _start_timers(self) -> None:
        """Create one background ``_timer_loop`` task per registered timer trigger.

        Triggers whose ``interval_minutes`` is missing or non-positive are
        skipped.  NOTE(review): tasks are only created while
        ``self._running`` is True, so the caller must set the running flag
        before invoking this — confirm the call order in :meth:`start`.
        """
        for trig_id, spec in self._triggers.items():
            if spec.trigger_type != "timer":
                continue
            tc = spec.trigger_config
            _raw_interval = tc.get("interval_minutes")
            # Accept any float-able value; None means "no timer configured".
            interval = float(_raw_interval) if _raw_interval is not None else None
            run_immediately = tc.get("run_immediately", False)

            if interval and interval > 0 and self._running:
                task = asyncio.create_task(self._timer_loop(trig_id, interval, run_immediately))
                self._timer_tasks.append(task)
|
|
|
|
    async def _timer_loop(
        self,
        trigger_id: str,
        interval_minutes: float,
        immediate: bool,
        idle_timeout: float = 300,
    ) -> None:
        """Background loop firing a timer trigger every *interval_minutes*.

        A cycle is skipped (and simply rescheduled) while timers are paused
        or while the colony has been active within the last *idle_timeout*
        seconds.  ``_timer_next_fire`` is kept up to date so observers can
        report the remaining time; the entry is removed while the trigger
        itself is executing.
        """
        interval_secs = interval_minutes * 60
        if not immediate:
            # First fire happens one full interval after startup.
            self._timer_next_fire[trigger_id] = time.monotonic() + interval_secs
            await asyncio.sleep(interval_secs)

        while self._running:
            if self._timers_paused:
                # Paused: push the schedule forward one interval and re-check.
                self._timer_next_fire[trigger_id] = time.monotonic() + interval_secs
                await asyncio.sleep(interval_secs)
                continue

            idle = self.agent_idle_seconds
            if idle < idle_timeout:
                # Colony is busy — avoid piling scheduled work on top.
                logger.debug("Timer '%s': agent active, skipping", trigger_id)
                self._timer_next_fire[trigger_id] = time.monotonic() + interval_secs
                await asyncio.sleep(interval_secs)
                continue

            # No scheduled fire is advertised while the trigger runs.
            self._timer_next_fire.pop(trigger_id, None)
            try:
                await self.trigger(
                    trigger_id,
                    {"event": {"source": "timer", "reason": "scheduled"}},
                )
            except Exception:
                # A failing trigger must not kill the loop; log and continue.
                logger.error("Timer trigger failed for '%s'", trigger_id, exc_info=True)

            self._timer_next_fire[trigger_id] = time.monotonic() + interval_secs
            await asyncio.sleep(interval_secs)
|
|
|
|
async def cancel_all_tasks_async(self) -> bool:
|
|
cancelled = False
|
|
for worker in self._workers.values():
|
|
if worker._task_handle and not worker._task_handle.done():
|
|
worker._task_handle.cancel()
|
|
cancelled = True
|
|
return cancelled
|
|
|
|
    def cancel_all_tasks(self, loop: asyncio.AbstractEventLoop) -> bool:
        """Thread-safe wrapper around :meth:`cancel_all_tasks_async`.

        Intended to be called from a thread other than the one running
        *loop*.  Returns False when the cancellation did not finish within
        5 seconds or raised.
        """
        future = asyncio.run_coroutine_threadsafe(self.cancel_all_tasks_async(), loop)
        try:
            return future.result(timeout=5)
        except Exception:
            # Best-effort: report failure rather than propagate an exception
            # across the thread boundary.
            logger.warning("cancel_all_tasks: timed out or failed")
            return False
|
|
|
|
async def cancel_execution(self, trigger_id: str, worker_id: str) -> bool:
|
|
worker = self._workers.get(worker_id)
|
|
if worker and worker.is_active:
|
|
await worker.stop()
|
|
return True
|
|
return False
|