00e0e9a49a
* feat(persistence): add SQLAlchemy 2.0 async ORM scaffold Introduce a unified database configuration (DatabaseConfig) that controls both the LangGraph checkpointer and the DeerFlow application persistence layer from a single `database:` config section. New modules: - deerflow.config.database_config — Pydantic config with memory/sqlite/postgres backends - deerflow.persistence — async engine lifecycle, DeclarativeBase with to_dict mixin, Alembic skeleton - deerflow.runtime.runs.store — RunStore ABC + MemoryRunStore implementation Gateway integration initializes/tears down the persistence engine in the existing langgraph_runtime() context manager. Legacy checkpointer config is preserved for backward compatibility. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(persistence): add RunEventStore ABC + MemoryRunEventStore Phase 2-A prerequisite for event storage: adds the unified run event stream interface (RunEventStore) with an in-memory implementation, RunEventsConfig, gateway integration, and comprehensive tests (27 cases). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(persistence): add ORM models, repositories, DB/JSONL event stores, RunJournal, and API endpoints Phase 2-B: run persistence + event storage + token tracking. - ORM models: RunRow (with token fields), ThreadMetaRow, RunEventRow - RunRepository implements RunStore ABC via SQLAlchemy ORM - ThreadMetaRepository with owner access control - DbRunEventStore with trace content truncation and cursor pagination - JsonlRunEventStore with per-run files and seq recovery from disk - RunJournal (BaseCallbackHandler) captures LLM/tool/lifecycle events, accumulates token usage by caller type, buffers and flushes to store - RunManager now accepts optional RunStore for persistent backing - Worker creates RunJournal, writes human_message, injects callbacks - Gateway deps use factory functions (RunRepository when DB available) - New endpoints: messages, run messages, run events, token-usage - ThreadCreateRequest gains assistant_id field - 92 tests pass (33 new), zero regressions Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(persistence): add user feedback + follow-up run association Phase 2-C: feedback and follow-up tracking. - FeedbackRow ORM model (rating +1/-1, optional message_id, comment) - FeedbackRepository with CRUD, list_by_run/thread, aggregate stats - Feedback API endpoints: create, list, stats, delete - follow_up_to_run_id in RunCreateRequest (explicit or auto-detected from latest successful run on the thread) - Worker writes follow_up_to_run_id into human_message event metadata - Gateway deps: feedback_repo factory + getter - 17 new tests (14 FeedbackRepository + 3 follow-up association) - 109 total tests pass, zero regressions Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test+config: comprehensive Phase 2 test coverage + deprecate checkpointer config - config.example.yaml: deprecate standalone checkpointer section, activate unified database:sqlite as default (drives both checkpointer + app data) - New: test_thread_meta_repo.py (14 tests) — full ThreadMetaRepository coverage including check_access owner logic, list_by_owner pagination - Extended test_run_repository.py (+4 tests) — completion preserves fields, list ordering desc, limit, owner_none returns all - Extended test_run_journal.py (+8 tests) — on_chain_error, track_tokens=false, middleware no ai_message, unknown caller tokens, convenience fields, tool_error, non-summarization custom event - Extended test_run_event_store.py (+7 tests) — DB batch seq continuity, make_run_event_store factory (memory/db/jsonl/fallback/unknown) - Extended test_phase2b_integration.py (+4 tests) — create_or_reject persists, follow-up metadata, summarization in history, full DB-backed lifecycle - Fixed DB integration test to use proper fake objects (not MagicMock) for JSON-serializable metadata - 157 total Phase 2 tests pass, zero regressions Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * config: move default sqlite_dir to .deer-flow/data Keep SQLite databases alongside other DeerFlow-managed data (threads, memory) under the .deer-flow/ directory instead of a top-level ./data folder. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(persistence): remove UTFJSON, use engine-level json_serializer + datetime.now() - Replace custom UTFJSON type with standard sqlalchemy.JSON in all ORM models. Add json_serializer=json.dumps(ensure_ascii=False) to all create_async_engine calls so non-ASCII text (Chinese etc.) is stored as-is in both SQLite and Postgres. - Change ORM datetime defaults from datetime.now(UTC) to datetime.now(), remove UTC imports. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(gateway): simplify deps.py with getter factory + inline repos - Replace 6 identical getter functions with _require() factory. - Inline 3 _make_*_repo() factories into langgraph_runtime(), call get_session_factory() once instead of 3 times. - Add thread_meta upsert in start_run (services.py). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(docker): add UV_EXTRAS build arg for optional dependencies Support installing optional dependency groups (e.g. postgres) at Docker build time via UV_EXTRAS build arg: UV_EXTRAS=postgres docker compose build Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(journal): fix flush, token tracking, and consolidate tests RunJournal fixes: - _flush_sync: retain events in buffer when no event loop instead of dropping them; worker's finally block flushes via async flush(). - on_llm_end: add tool_calls filter and caller=="lead_agent" guard for ai_message events; mark message IDs for dedup with record_llm_usage. - worker.py: persist completion data (tokens, message count) to RunStore in finally block. Model factory: - Auto-inject stream_usage=True for BaseChatOpenAI subclasses with custom api_base, so usage_metadata is populated in streaming responses. Test consolidation: - Delete test_phase2b_integration.py (redundant with existing tests). - Move DB-backed lifecycle test into test_run_journal.py. - Add tests for stream_usage injection in test_model_factory.py. - Clean up executor/task_tool dead journal references. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(events): widen content type to str|dict in all store backends Allow event content to be a dict (for structured OpenAI-format messages) in addition to plain strings. Dict values are JSON-serialized for the DB backend and deserialized on read; memory and JSONL backends handle dicts natively. Trace truncation now serializes dicts to JSON before measuring. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(events): use metadata flag instead of heuristic for dict content detection Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(converters): add LangChain-to-OpenAI message format converters Pure functions langchain_to_openai_message, langchain_to_openai_completion, langchain_messages_to_openai, and _infer_finish_reason for converting LangChain BaseMessage objects to OpenAI Chat Completions format, used by RunJournal for event storage. 15 unit tests added. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(converters): handle empty list content as null, clean up test Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(events): human_message content uses OpenAI user message format Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(events): ai_message uses OpenAI format, add ai_tool_call message event - ai_message content now uses {"role": "assistant", "content": "..."} format - New ai_tool_call message event emitted when lead_agent LLM responds with tool_calls - ai_tool_call uses langchain_to_openai_message converter for consistent format - Both events include finish_reason in metadata ("stop" or "tool_calls") Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(events): add tool_result message event with OpenAI tool message format Cache tool_call_id from on_tool_start keyed by run_id as fallback for on_tool_end, then emit a tool_result message event (role=tool, tool_call_id, content) after each successful tool completion. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(events): summary content uses OpenAI system message format Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(events): replace llm_start/llm_end with llm_request/llm_response in OpenAI format Add on_chat_model_start to capture structured prompt messages as llm_request events. Replace llm_end trace events with llm_response using OpenAI Chat Completions format. Track llm_call_index to pair request/response events. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(events): add record_middleware method for middleware trace events Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test(events): add full run sequence integration test for OpenAI content format Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(events): align message events with checkpoint format and add middleware tag injection - Message events (ai_message, ai_tool_call, tool_result, human_message) now use BaseMessage.model_dump() format, matching LangGraph checkpoint values.messages - on_tool_end extracts tool_call_id/name/status from ToolMessage objects - on_tool_error now emits tool_result message events with error status - record_middleware uses middleware:{tag} event_type and middleware category - Summarization custom events use middleware:summarize category - TitleMiddleware injects middleware:title tag via get_config() inheritance - SummarizationMiddleware model bound with middleware:summarize tag - Worker writes human_message using HumanMessage.model_dump() Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(threads): switch search endpoint to threads_meta table and sync title - POST /api/threads/search now queries threads_meta table directly, removing the two-phase Store + Checkpointer scan approach - Add ThreadMetaRepository.search() with metadata/status filters - Add ThreadMetaRepository.update_display_name() for title sync - Worker syncs checkpoint title to threads_meta.display_name on run completion - Map display_name to values.title in search response for API compatibility Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(threads): history endpoint reads messages from event store - POST /api/threads/{thread_id}/history now combines two data sources: checkpointer for checkpoint_id, metadata, title, thread_data; event store for messages (complete history, not truncated by summarization) - Strip internal LangGraph metadata keys from response - Remove full channel_values serialization in favor of selective fields Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: remove duplicate optional-dependencies header in pyproject.toml Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(middleware): pass tagged config to TitleMiddleware ainvoke call Without the config, the middleware:title tag was not injected, causing the LLM response to be recorded as a lead_agent ai_message in run_events. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: resolve merge conflict in .env.example Keep both DATABASE_URL (from persistence-scaffold) and WECOM credentials (from main) after the merge. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(persistence): address review feedback on PR #1851 - Fix naive datetime.now() → datetime.now(UTC) in all ORM models - Fix seq race condition in DbRunEventStore.put() with FOR UPDATE and UNIQUE(thread_id, seq) constraint - Encapsulate _store access in RunManager.update_run_completion() - Deduplicate _store.put() logic in RunManager via _persist_to_store() - Add update_run_completion to RunStore ABC + MemoryRunStore - Wire follow_up_to_run_id through the full create path - Add error recovery to RunJournal._flush_sync() lost-event scenario - Add migration note for search_threads breaking change - Fix test_checkpointer_none_fix mock to set database=None Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * chore: update uv.lock Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(persistence): address 22 review comments from CodeQL, Copilot, and Code Quality Bug fixes: - Sanitize log params to prevent log injection (CodeQL) - Reset threads_meta.status to idle/error when run completes - Attach messages only to latest checkpoint in /history response - Write threads_meta on POST /threads so new threads appear in search Lint fixes: - Remove unused imports (journal.py, migrations/env.py, test_converters.py) - Convert lambda to named function (engine.py, Ruff E731) - Remove unused logger definitions in repos (Ruff F841) - Add logging to JSONL decode errors and empty except blocks - Separate assert side-effects in tests (CodeQL) - Remove unused local variables in tests (Ruff F841) - Fix max_trace_content truncation to use byte length, not char length Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * style: apply ruff format to persistence and runtime files Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Potential fix for pull request finding 'Statement has no effect' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> * refactor(runtime): introduce RunContext to reduce run_agent parameter bloat Extract checkpointer, store, event_store, run_events_config, thread_meta_repo, and follow_up_to_run_id into a frozen RunContext dataclass. Add get_run_context() in deps.py to build the base context from app.state singletons. start_run() uses dataclasses.replace() to enrich per-run fields before passing ctx to run_agent. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(gateway): move sanitize_log_param to app/gateway/utils.py Extract the log-injection sanitizer from routers/threads.py into a shared utils module and rename to sanitize_log_param (public API). Eliminates the reverse service → router import in services.py. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * perf: use SQL aggregation for feedback stats and thread token usage Replace Python-side counting in FeedbackRepository.aggregate_by_run with a single SELECT COUNT/SUM query. Add RunStore.aggregate_tokens_by_thread abstract method with SQL GROUP BY implementation in RunRepository and Python fallback in MemoryRunStore. Simplify the thread_token_usage endpoint to delegate to the new method, eliminating the limit=10000 truncation risk. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * docs: annotate DbRunEventStore.put() as low-frequency path Add docstring clarifying that put() opens a per-call transaction with FOR UPDATE and should only be used for infrequent writes (currently just the initial human_message event). High-throughput callers should use put_batch() instead. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(threads): fall back to Store search when ThreadMetaRepository is unavailable When database.backend=memory (default) or no SQL session factory is configured, search_threads now queries the LangGraph Store instead of returning 503. Returns empty list if neither Store nor repo is available. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(persistence): introduce ThreadMetaStore ABC for backend-agnostic thread metadata Add ThreadMetaStore abstract base class with create/get/search/update/delete interface. ThreadMetaRepository (SQL) now inherits from it. New MemoryThreadMetaStore wraps LangGraph BaseStore for memory-mode deployments. deps.py now always provides a non-None thread_meta_repo, eliminating all `if thread_meta_repo is not None` guards in services.py, worker.py, and routers/threads.py. search_threads no longer needs a Store fallback branch. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(history): read messages from checkpointer instead of RunEventStore The /history endpoint now reads messages directly from the checkpointer's channel_values (the authoritative source) instead of querying RunEventStore.list_messages(). The RunEventStore API is preserved for other consumers. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(persistence): address new Copilot review comments - feedback.py: validate thread_id/run_id before deleting feedback - jsonl.py: add path traversal protection with ID validation - run_repo.py: parse `before` to datetime for PostgreSQL compat - thread_meta_repo.py: fix pagination when metadata filter is active - database_config.py: use resolve_path for sqlite_dir consistency Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Implement skill self-evolution and skill_manage flow (#1874) * chore: ignore .worktrees directory * Add skill_manage self-evolution flow * Fix CI regressions for skill_manage * Address PR review feedback for skill evolution * fix(skill-evolution): preserve history on delete * fix(skill-evolution): tighten scanner fallbacks * docs: add skill_manage e2e evidence screenshot * fix(skill-manage): avoid blocking fs ops in session runtime --------- Co-authored-by: Willem Jiang <willem.jiang@gmail.com> * fix(config): resolve sqlite_dir relative to CWD, not Paths.base_dir resolve_path() resolves relative to Paths.base_dir (.deer-flow), which double-nested the path to .deer-flow/.deer-flow/data/app.db. Use Path.resolve() (CWD-relative) instead. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Feature/feishu receive file (#1608) * feat(feishu): add channel file materialization hook for inbound messages - Introduce Channel.receive_file(msg, thread_id) as a base method for file materialization; default is no-op. - Implement FeishuChannel.receive_file to download files/images from Feishu messages, save to sandbox, and inject virtual paths into msg.text. - Update ChannelManager to call receive_file for any channel if msg.files is present, enabling downstream model access to user-uploaded files. - No impact on Slack/Telegram or other channels (they inherit the default no-op). * style(backend): format code with ruff for lint compliance - Auto-formatted packages/harness/deerflow/agents/factory.py and tests/test_create_deerflow_agent.py using `ruff format` - Ensured both files conform to project linting standards - Fixes CI lint check failures caused by code style issues * fix(feishu): handle file write operation asynchronously to prevent blocking * fix(feishu): rename GetMessageResourceRequest to _GetMessageResourceRequest and remove redundant code * test(feishu): add tests for receive_file method and placeholder replacement * fix(manager): remove unnecessary type casting for channel retrieval * fix(feishu): update logging messages to reflect resource handling instead of image * fix(feishu): sanitize filename by replacing invalid characters in file uploads * fix(feishu): improve filename sanitization and reorder image key handling in message processing * fix(feishu): add thread lock to prevent filename conflicts during file downloads * fix(test): correct bad merge in test_feishu_parser.py * chore: run ruff and apply formatting cleanup fix(feishu): preserve rich-text attachment order and improve fallback filename handling * fix(docker): restore gateway env vars and fix langgraph empty arg issue (#1915) Two production docker-compose.yaml bugs prevent `make up` from working: 1. Gateway missing DEER_FLOW_CONFIG_PATH and DEER_FLOW_EXTENSIONS_CONFIG_PATH environment overrides. Added infb2d99f(#1836) but accidentally reverted byca2fb95(#1847). Without them, gateway reads host paths from .env via env_file, causing FileNotFoundError inside the container. 2. Langgraph command fails when LANGGRAPH_ALLOW_BLOCKING is unset (default). Empty $${allow_blocking} inserts a bare space between flags, causing ' --no-reload' to be parsed as unexpected extra argument. Fix by building args string first and conditionally appending --allow-blocking. Co-authored-by: cooper <cooperfu@tencent.com> * fix(frontend): resolve invalid HTML nesting and tabnabbing vulnerabilities (#1904) * fix(frontend): resolve invalid HTML nesting and tabnabbing vulnerabilities Fix `<button>` inside `<a>` invalid HTML in artifact components and add missing `noopener,noreferrer` to `window.open` calls to prevent reverse tabnabbing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(frontend): address Copilot review on tabnabbing and double-tab-open Remove redundant parent onClick on web_fetch ChainOfThoughtStep to prevent opening two tabs on link click, and explicitly null out window.opener after window.open() for defensive tabnabbing hardening. --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> * refactor(persistence): organize entities into per-entity directories Restructure the persistence layer from horizontal "models/ + repositories/" split into vertical entity-aligned directories. Each entity (thread_meta, run, feedback) now owns its ORM model, abstract interface (where applicable), and concrete implementations under a single directory with an aggregating __init__.py for one-line imports. Layout: persistence/thread_meta/{base,model,sql,memory}.py persistence/run/{model,sql}.py persistence/feedback/{model,sql}.py models/__init__.py is kept as a facade so Alembic autogenerate continues to discover all ORM tables via Base.metadata. RunEventRow remains under models/run_event.py because its storage implementation lives in runtime/events/store/db.py and has no matching repository directory. The repositories/ directory is removed entirely. All call sites in gateway/deps.py and tests are updated to import from the new entity packages, e.g.: from deerflow.persistence.thread_meta import ThreadMetaRepository from deerflow.persistence.run import RunRepository from deerflow.persistence.feedback import FeedbackRepository Full test suite passes (1690 passed, 14 skipped). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(gateway): sync thread rename and delete through ThreadMetaStore The POST /threads/{id}/state endpoint previously synced title changes only to the LangGraph Store via _store_upsert. In sqlite mode the search endpoint reads from the ThreadMetaRepository SQL table, so renames never appeared in /threads/search until the next agent run completed (worker.py syncs title from checkpoint to thread_meta in its finally block). Likewise the DELETE /threads/{id} endpoint cleaned up the filesystem, Store, and checkpointer but left the threads_meta row orphaned in sqlite, so deleted threads kept appearing in /threads/search. Fix both endpoints by routing through the ThreadMetaStore abstraction which already has the correct sqlite/memory implementations wired up by deps.py. The rename path now calls update_display_name() and the delete path calls delete() — both work uniformly across backends. Verified end-to-end with curl in gateway mode against sqlite backend. Existing test suite (1690 passed) and focused router/repo tests pass. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(gateway): route all thread metadata access through ThreadMetaStore Following the rename/delete bug fix in PR1, migrate the remaining direct LangGraph Store reads/writes in the threads router and services to the ThreadMetaStore abstraction so that the sqlite and memory backends behave identically and the legacy dual-write paths can be removed. Migrated endpoints (threads.py): - create_thread: idempotency check + write now use thread_meta_repo.get/create instead of dual-writing the LangGraph Store and the SQL row. - get_thread: reads from thread_meta_repo.get; the checkpoint-only fallback for legacy threads is preserved. - patch_thread: replaced _store_get/_store_put with thread_meta_repo.update_metadata. - delete_thread_data: dropped the legacy store.adelete; thread_meta_repo.delete already covers it. Removed dead code (services.py): - _upsert_thread_in_store — redundant with the immediately following thread_meta_repo.create() call. - _sync_thread_title_after_run — worker.py's finally block already syncs the title via thread_meta_repo.update_display_name() after each run. Removed dead code (threads.py): - _store_get / _store_put / _store_upsert helpers (no remaining callers). - THREADS_NS constant. - get_store import (router no longer touches the LangGraph Store directly). New abstract method: - ThreadMetaStore.update_metadata(thread_id, metadata) merges metadata into the thread's metadata field. Implemented in both ThreadMetaRepository (SQL, read-modify-write inside one session) and MemoryThreadMetaStore. Three new unit tests cover merge / empty / nonexistent behaviour. Net change: -134 lines. Full test suite: 1693 passed, 14 skipped. Verified end-to-end with curl in gateway mode against sqlite backend (create / patch / get / rename / search / delete). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> Co-authored-by: DanielWalnut <45447813+hetaoBackend@users.noreply.github.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com> Co-authored-by: JilongSun <965640067@qq.com> Co-authored-by: jie <49781832+stan-fu@users.noreply.github.com> Co-authored-by: cooper <cooperfu@tencent.com> Co-authored-by: yangzheli <43645580+yangzheli@users.noreply.github.com>
941 lines
36 KiB
Python
941 lines
36 KiB
Python
"""ChannelManager — consumes inbound messages and dispatches them to the DeerFlow agent via LangGraph Server."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import mimetypes
|
|
import re
|
|
import time
|
|
from collections.abc import Awaitable, Callable, Mapping
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from langgraph_sdk.errors import ConflictError
|
|
|
|
from app.channels.commands import KNOWN_CHANNEL_COMMANDS
|
|
from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
|
|
from app.channels.store import ChannelStore
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
DEFAULT_LANGGRAPH_URL = "http://localhost:2024"
|
|
DEFAULT_GATEWAY_URL = "http://localhost:8001"
|
|
DEFAULT_ASSISTANT_ID = "lead_agent"
|
|
CUSTOM_AGENT_NAME_PATTERN = re.compile(r"^[A-Za-z0-9-]+$")
|
|
|
|
DEFAULT_RUN_CONFIG: dict[str, Any] = {"recursion_limit": 100}
|
|
DEFAULT_RUN_CONTEXT: dict[str, Any] = {
|
|
"thinking_enabled": True,
|
|
"is_plan_mode": False,
|
|
"subagent_enabled": False,
|
|
}
|
|
STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35
|
|
THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again."
|
|
|
|
CHANNEL_CAPABILITIES = {
|
|
"feishu": {"supports_streaming": True},
|
|
"slack": {"supports_streaming": False},
|
|
"telegram": {"supports_streaming": False},
|
|
"wecom": {"supports_streaming": True},
|
|
}
|
|
|
|
InboundFileReader = Callable[[dict[str, Any], httpx.AsyncClient], Awaitable[bytes | None]]
|
|
|
|
|
|
INBOUND_FILE_READERS: dict[str, InboundFileReader] = {}
|
|
|
|
|
|
def register_inbound_file_reader(channel_name: str, reader: InboundFileReader) -> None:
|
|
INBOUND_FILE_READERS[channel_name] = reader
|
|
|
|
|
|
async def _read_http_inbound_file(file_info: dict[str, Any], client: httpx.AsyncClient) -> bytes | None:
|
|
url = file_info.get("url")
|
|
if not isinstance(url, str) or not url:
|
|
return None
|
|
|
|
resp = await client.get(url)
|
|
resp.raise_for_status()
|
|
return resp.content
|
|
|
|
|
|
async def _read_wecom_inbound_file(file_info: dict[str, Any], client: httpx.AsyncClient) -> bytes | None:
|
|
data = await _read_http_inbound_file(file_info, client)
|
|
if data is None:
|
|
return None
|
|
|
|
aeskey = file_info.get("aeskey") if isinstance(file_info.get("aeskey"), str) else None
|
|
if not aeskey:
|
|
return data
|
|
|
|
try:
|
|
from aibot.crypto_utils import decrypt_file
|
|
except Exception:
|
|
logger.exception("[Manager] failed to import WeCom decrypt_file")
|
|
return None
|
|
|
|
return decrypt_file(data, aeskey)
|
|
|
|
|
|
register_inbound_file_reader("wecom", _read_wecom_inbound_file)
|
|
|
|
|
|
class InvalidChannelSessionConfigError(ValueError):
|
|
"""Raised when IM channel session overrides contain invalid agent config."""
|
|
|
|
|
|
def _is_thread_busy_error(exc: BaseException | None) -> bool:
|
|
if exc is None:
|
|
return False
|
|
if isinstance(exc, ConflictError):
|
|
return True
|
|
return "already running a task" in str(exc)
|
|
|
|
|
|
def _as_dict(value: Any) -> dict[str, Any]:
|
|
return dict(value) if isinstance(value, Mapping) else {}
|
|
|
|
|
|
def _merge_dicts(*layers: Any) -> dict[str, Any]:
|
|
merged: dict[str, Any] = {}
|
|
for layer in layers:
|
|
if isinstance(layer, Mapping):
|
|
merged.update(layer)
|
|
return merged
|
|
|
|
|
|
def _normalize_custom_agent_name(raw_value: str) -> str:
|
|
"""Normalize legacy channel assistant IDs into valid custom agent names."""
|
|
normalized = raw_value.strip().lower().replace("_", "-")
|
|
if not normalized:
|
|
raise InvalidChannelSessionConfigError("Channel session assistant_id is empty. Use 'lead_agent' or a valid custom agent name.")
|
|
if not CUSTOM_AGENT_NAME_PATTERN.fullmatch(normalized):
|
|
raise InvalidChannelSessionConfigError(f"Invalid channel session assistant_id {raw_value!r}. Use 'lead_agent' or a custom agent name containing only letters, digits, and hyphens.")
|
|
return normalized
|
|
|
|
|
|
def _extract_response_text(result: dict | list) -> str:
|
|
"""Extract the last AI message text from a LangGraph runs.wait result.
|
|
|
|
``runs.wait`` returns the final state dict which contains a ``messages``
|
|
list. Each message is a dict with at least ``type`` and ``content``.
|
|
|
|
Handles special cases:
|
|
- Regular AI text responses
|
|
- Clarification interrupts (``ask_clarification`` tool messages)
|
|
- AI messages with tool_calls but no text content
|
|
"""
|
|
if isinstance(result, list):
|
|
messages = result
|
|
elif isinstance(result, dict):
|
|
messages = result.get("messages", [])
|
|
else:
|
|
return ""
|
|
|
|
# Walk backwards to find usable response text, but stop at the last
|
|
# human message to avoid returning text from a previous turn.
|
|
for msg in reversed(messages):
|
|
if not isinstance(msg, dict):
|
|
continue
|
|
|
|
msg_type = msg.get("type")
|
|
|
|
# Stop at the last human message — anything before it is a previous turn
|
|
if msg_type == "human":
|
|
break
|
|
|
|
# Check for tool messages from ask_clarification (interrupt case)
|
|
if msg_type == "tool" and msg.get("name") == "ask_clarification":
|
|
content = msg.get("content", "")
|
|
if isinstance(content, str) and content:
|
|
return content
|
|
|
|
# Regular AI message with text content
|
|
if msg_type == "ai":
|
|
content = msg.get("content", "")
|
|
if isinstance(content, str) and content:
|
|
return content
|
|
# content can be a list of content blocks
|
|
if isinstance(content, list):
|
|
parts = []
|
|
for block in content:
|
|
if isinstance(block, dict) and block.get("type") == "text":
|
|
parts.append(block.get("text", ""))
|
|
elif isinstance(block, str):
|
|
parts.append(block)
|
|
text = "".join(parts)
|
|
if text:
|
|
return text
|
|
return ""
|
|
|
|
|
|
def _extract_text_content(content: Any) -> str:
|
|
"""Extract text from a streaming payload content field."""
|
|
if isinstance(content, str):
|
|
return content
|
|
if isinstance(content, list):
|
|
parts: list[str] = []
|
|
for block in content:
|
|
if isinstance(block, str):
|
|
parts.append(block)
|
|
elif isinstance(block, Mapping):
|
|
text = block.get("text")
|
|
if isinstance(text, str):
|
|
parts.append(text)
|
|
else:
|
|
nested = block.get("content")
|
|
if isinstance(nested, str):
|
|
parts.append(nested)
|
|
return "".join(parts)
|
|
if isinstance(content, Mapping):
|
|
for key in ("text", "content"):
|
|
value = content.get(key)
|
|
if isinstance(value, str):
|
|
return value
|
|
return ""
|
|
|
|
|
|
def _merge_stream_text(existing: str, chunk: str) -> str:
|
|
"""Merge either delta text or cumulative text into a single snapshot."""
|
|
if not chunk:
|
|
return existing
|
|
if not existing or chunk == existing:
|
|
return chunk or existing
|
|
if chunk.startswith(existing):
|
|
return chunk
|
|
if existing.endswith(chunk):
|
|
return existing
|
|
return existing + chunk
|
|
|
|
|
|
def _extract_stream_message_id(payload: Any, metadata: Any) -> str | None:
|
|
"""Best-effort extraction of the streamed AI message identifier."""
|
|
candidates = [payload, metadata]
|
|
if isinstance(payload, Mapping):
|
|
candidates.append(payload.get("kwargs"))
|
|
|
|
for candidate in candidates:
|
|
if not isinstance(candidate, Mapping):
|
|
continue
|
|
for key in ("id", "message_id"):
|
|
value = candidate.get(key)
|
|
if isinstance(value, str) and value:
|
|
return value
|
|
return None
|
|
|
|
|
|
def _accumulate_stream_text(
|
|
buffers: dict[str, str],
|
|
current_message_id: str | None,
|
|
event_data: Any,
|
|
) -> tuple[str | None, str | None]:
|
|
"""Convert a ``messages-tuple`` event into the latest displayable AI text."""
|
|
payload = event_data
|
|
metadata: Any = None
|
|
if isinstance(event_data, (list, tuple)):
|
|
if event_data:
|
|
payload = event_data[0]
|
|
if len(event_data) > 1:
|
|
metadata = event_data[1]
|
|
|
|
if isinstance(payload, str):
|
|
message_id = current_message_id or "__default__"
|
|
buffers[message_id] = _merge_stream_text(buffers.get(message_id, ""), payload)
|
|
return buffers[message_id], message_id
|
|
|
|
if not isinstance(payload, Mapping):
|
|
return None, current_message_id
|
|
|
|
payload_type = str(payload.get("type", "")).lower()
|
|
if "tool" in payload_type:
|
|
return None, current_message_id
|
|
|
|
text = _extract_text_content(payload.get("content"))
|
|
if not text and isinstance(payload.get("kwargs"), Mapping):
|
|
text = _extract_text_content(payload["kwargs"].get("content"))
|
|
if not text:
|
|
return None, current_message_id
|
|
|
|
message_id = _extract_stream_message_id(payload, metadata) or current_message_id or "__default__"
|
|
buffers[message_id] = _merge_stream_text(buffers.get(message_id, ""), text)
|
|
return buffers[message_id], message_id
|
|
|
|
|
|
def _extract_artifacts(result: dict | list) -> list[str]:
|
|
"""Extract artifact paths from the last AI response cycle only.
|
|
|
|
Instead of reading the full accumulated ``artifacts`` state (which contains
|
|
all artifacts ever produced in the thread), this inspects the messages after
|
|
the last human message and collects file paths from ``present_files`` tool
|
|
calls. This ensures only newly-produced artifacts are returned.
|
|
"""
|
|
if isinstance(result, list):
|
|
messages = result
|
|
elif isinstance(result, dict):
|
|
messages = result.get("messages", [])
|
|
else:
|
|
return []
|
|
|
|
artifacts: list[str] = []
|
|
for msg in reversed(messages):
|
|
if not isinstance(msg, dict):
|
|
continue
|
|
# Stop at the last human message — anything before it is a previous turn
|
|
if msg.get("type") == "human":
|
|
break
|
|
# Look for AI messages with present_files tool calls
|
|
if msg.get("type") == "ai":
|
|
for tc in msg.get("tool_calls", []):
|
|
if isinstance(tc, dict) and tc.get("name") == "present_files":
|
|
args = tc.get("args", {})
|
|
paths = args.get("filepaths", [])
|
|
if isinstance(paths, list):
|
|
artifacts.extend(p for p in paths if isinstance(p, str))
|
|
return artifacts
|
|
|
|
|
|
def _format_artifact_text(artifacts: list[str]) -> str:
|
|
"""Format artifact paths into a human-readable text block listing filenames."""
|
|
import posixpath
|
|
|
|
filenames = [posixpath.basename(p) for p in artifacts]
|
|
if len(filenames) == 1:
|
|
return f"Created File: 📎 {filenames[0]}"
|
|
return "Created Files: 📎 " + "、".join(filenames)
|
|
|
|
|
|
_OUTPUTS_VIRTUAL_PREFIX = "/mnt/user-data/outputs/"
|
|
|
|
|
|
def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedAttachment]:
|
|
"""Resolve virtual artifact paths to host filesystem paths with metadata.
|
|
|
|
Only paths under ``/mnt/user-data/outputs/`` are accepted; any other
|
|
virtual path is rejected with a warning to prevent exfiltrating uploads
|
|
or workspace files via IM channels.
|
|
|
|
Skips artifacts that cannot be resolved (missing files, invalid paths)
|
|
and logs warnings for them.
|
|
"""
|
|
from deerflow.config.paths import get_paths
|
|
|
|
attachments: list[ResolvedAttachment] = []
|
|
paths = get_paths()
|
|
outputs_dir = paths.sandbox_outputs_dir(thread_id).resolve()
|
|
for virtual_path in artifacts:
|
|
# Security: only allow files from the agent outputs directory
|
|
if not virtual_path.startswith(_OUTPUTS_VIRTUAL_PREFIX):
|
|
logger.warning("[Manager] rejected non-outputs artifact path: %s", virtual_path)
|
|
continue
|
|
try:
|
|
actual = paths.resolve_virtual_path(thread_id, virtual_path)
|
|
# Verify the resolved path is actually under the outputs directory
|
|
# (guards against path-traversal even after prefix check)
|
|
try:
|
|
actual.resolve().relative_to(outputs_dir)
|
|
except ValueError:
|
|
logger.warning("[Manager] artifact path escapes outputs dir: %s -> %s", virtual_path, actual)
|
|
continue
|
|
if not actual.is_file():
|
|
logger.warning("[Manager] artifact not found on disk: %s -> %s", virtual_path, actual)
|
|
continue
|
|
mime, _ = mimetypes.guess_type(str(actual))
|
|
mime = mime or "application/octet-stream"
|
|
attachments.append(
|
|
ResolvedAttachment(
|
|
virtual_path=virtual_path,
|
|
actual_path=actual,
|
|
filename=actual.name,
|
|
mime_type=mime,
|
|
size=actual.stat().st_size,
|
|
is_image=mime.startswith("image/"),
|
|
)
|
|
)
|
|
except (ValueError, OSError) as exc:
|
|
logger.warning("[Manager] failed to resolve artifact %s: %s", virtual_path, exc)
|
|
return attachments
|
|
|
|
|
|
def _prepare_artifact_delivery(
|
|
thread_id: str,
|
|
response_text: str,
|
|
artifacts: list[str],
|
|
) -> tuple[str, list[ResolvedAttachment]]:
|
|
"""Resolve attachments and append filename fallbacks to the text response."""
|
|
attachments: list[ResolvedAttachment] = []
|
|
if not artifacts:
|
|
return response_text, attachments
|
|
|
|
attachments = _resolve_attachments(thread_id, artifacts)
|
|
resolved_virtuals = {attachment.virtual_path for attachment in attachments}
|
|
unresolved = [path for path in artifacts if path not in resolved_virtuals]
|
|
|
|
if unresolved:
|
|
artifact_text = _format_artifact_text(unresolved)
|
|
response_text = (response_text + "\n\n" + artifact_text) if response_text else artifact_text
|
|
|
|
# Always include resolved attachment filenames as a text fallback so files
|
|
# remain discoverable even when the upload is skipped or fails.
|
|
if attachments:
|
|
resolved_text = _format_artifact_text([attachment.virtual_path for attachment in attachments])
|
|
response_text = (response_text + "\n\n" + resolved_text) if response_text else resolved_text
|
|
|
|
return response_text, attachments
|
|
|
|
|
|
async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dict[str, Any]]:
|
|
if not msg.files:
|
|
return []
|
|
|
|
from deerflow.uploads.manager import claim_unique_filename, ensure_uploads_dir, normalize_filename
|
|
|
|
uploads_dir = ensure_uploads_dir(thread_id)
|
|
seen_names = {entry.name for entry in uploads_dir.iterdir() if entry.is_file()}
|
|
|
|
created: list[dict[str, Any]] = []
|
|
file_reader = INBOUND_FILE_READERS.get(msg.channel_name, _read_http_inbound_file)
|
|
async with httpx.AsyncClient(timeout=httpx.Timeout(20.0)) as client:
|
|
for idx, f in enumerate(msg.files):
|
|
if not isinstance(f, dict):
|
|
continue
|
|
|
|
ftype = f.get("type") if isinstance(f.get("type"), str) else "file"
|
|
filename = f.get("filename") if isinstance(f.get("filename"), str) else ""
|
|
|
|
try:
|
|
data = await file_reader(f, client)
|
|
except Exception:
|
|
logger.exception(
|
|
"[Manager] failed to read inbound file: channel=%s, file=%s",
|
|
msg.channel_name,
|
|
f.get("url") or filename or idx,
|
|
)
|
|
continue
|
|
|
|
if data is None:
|
|
logger.warning(
|
|
"[Manager] inbound file reader returned no data: channel=%s, file=%s",
|
|
msg.channel_name,
|
|
f.get("url") or filename or idx,
|
|
)
|
|
continue
|
|
|
|
if not filename:
|
|
ext = ".bin"
|
|
if ftype == "image":
|
|
ext = ".png"
|
|
filename = f"{msg.thread_ts or 'msg'}_{idx}{ext}"
|
|
|
|
try:
|
|
safe_name = claim_unique_filename(normalize_filename(filename), seen_names)
|
|
except ValueError:
|
|
logger.warning(
|
|
"[Manager] skipping inbound file with unsafe filename: channel=%s, file=%r",
|
|
msg.channel_name,
|
|
filename,
|
|
)
|
|
continue
|
|
|
|
dest = uploads_dir / safe_name
|
|
try:
|
|
dest.write_bytes(data)
|
|
except Exception:
|
|
logger.exception("[Manager] failed to write inbound file: %s", dest)
|
|
continue
|
|
|
|
created.append(
|
|
{
|
|
"filename": safe_name,
|
|
"size": len(data),
|
|
"path": f"/mnt/user-data/uploads/{safe_name}",
|
|
"is_image": ftype == "image",
|
|
}
|
|
)
|
|
|
|
return created
|
|
|
|
|
|
def _format_uploaded_files_block(files: list[dict[str, Any]]) -> str:
|
|
lines = [
|
|
"<uploaded_files>",
|
|
"The following files were uploaded in this message:",
|
|
"",
|
|
]
|
|
if not files:
|
|
lines.append("(empty)")
|
|
else:
|
|
for f in files:
|
|
filename = f.get("filename", "")
|
|
size = int(f.get("size") or 0)
|
|
size_kb = size / 1024 if size else 0
|
|
size_str = f"{size_kb:.1f} KB" if size_kb < 1024 else f"{size_kb / 1024:.1f} MB"
|
|
path = f.get("path", "")
|
|
is_image = bool(f.get("is_image"))
|
|
file_kind = "image" if is_image else "file"
|
|
lines.append(f"- {filename} ({size_str})")
|
|
lines.append(f" Type: {file_kind}")
|
|
lines.append(f" Path: {path}")
|
|
lines.append("")
|
|
lines.append("Use `read_file` for text-based files and documents.")
|
|
lines.append("Use `view_image` for image files (jpg, jpeg, png, webp) so the model can inspect the image content.")
|
|
lines.append("</uploaded_files>")
|
|
return "\n".join(lines)
|
|
|
|
|
|
class ChannelManager:
|
|
"""Core dispatcher that bridges IM channels to the DeerFlow agent.
|
|
|
|
It reads from the MessageBus inbound queue, creates/reuses threads on
|
|
the LangGraph Server, sends messages via ``runs.wait``, and publishes
|
|
outbound responses back through the bus.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
bus: MessageBus,
|
|
store: ChannelStore,
|
|
*,
|
|
max_concurrency: int = 5,
|
|
langgraph_url: str = DEFAULT_LANGGRAPH_URL,
|
|
gateway_url: str = DEFAULT_GATEWAY_URL,
|
|
assistant_id: str = DEFAULT_ASSISTANT_ID,
|
|
default_session: dict[str, Any] | None = None,
|
|
channel_sessions: dict[str, Any] | None = None,
|
|
) -> None:
|
|
self.bus = bus
|
|
self.store = store
|
|
self._max_concurrency = max_concurrency
|
|
self._langgraph_url = langgraph_url
|
|
self._gateway_url = gateway_url
|
|
self._assistant_id = assistant_id
|
|
self._default_session = _as_dict(default_session)
|
|
self._channel_sessions = dict(channel_sessions or {})
|
|
self._client = None # lazy init — langgraph_sdk async client
|
|
self._semaphore: asyncio.Semaphore | None = None
|
|
self._running = False
|
|
self._task: asyncio.Task | None = None
|
|
|
|
@staticmethod
|
|
def _channel_supports_streaming(channel_name: str) -> bool:
|
|
return CHANNEL_CAPABILITIES.get(channel_name, {}).get("supports_streaming", False)
|
|
|
|
def _resolve_session_layer(self, msg: InboundMessage) -> tuple[dict[str, Any], dict[str, Any]]:
|
|
channel_layer = _as_dict(self._channel_sessions.get(msg.channel_name))
|
|
users_layer = _as_dict(channel_layer.get("users"))
|
|
user_layer = _as_dict(users_layer.get(msg.user_id))
|
|
return channel_layer, user_layer
|
|
|
|
def _resolve_run_params(self, msg: InboundMessage, thread_id: str) -> tuple[str, dict[str, Any], dict[str, Any]]:
|
|
channel_layer, user_layer = self._resolve_session_layer(msg)
|
|
|
|
assistant_id = user_layer.get("assistant_id") or channel_layer.get("assistant_id") or self._default_session.get("assistant_id") or self._assistant_id
|
|
if not isinstance(assistant_id, str) or not assistant_id.strip():
|
|
assistant_id = self._assistant_id
|
|
|
|
run_config = _merge_dicts(
|
|
DEFAULT_RUN_CONFIG,
|
|
self._default_session.get("config"),
|
|
channel_layer.get("config"),
|
|
user_layer.get("config"),
|
|
)
|
|
|
|
run_context = _merge_dicts(
|
|
DEFAULT_RUN_CONTEXT,
|
|
self._default_session.get("context"),
|
|
channel_layer.get("context"),
|
|
user_layer.get("context"),
|
|
{"thread_id": thread_id},
|
|
)
|
|
|
|
# Custom agents are implemented as lead_agent + agent_name context.
|
|
# Keep backward compatibility for channel configs that set
|
|
# assistant_id: <custom-agent-name> by routing through lead_agent.
|
|
if assistant_id != DEFAULT_ASSISTANT_ID:
|
|
run_context.setdefault("agent_name", _normalize_custom_agent_name(assistant_id))
|
|
assistant_id = DEFAULT_ASSISTANT_ID
|
|
|
|
return assistant_id, run_config, run_context
|
|
|
|
# -- LangGraph SDK client (lazy) ----------------------------------------
|
|
|
|
def _get_client(self):
|
|
"""Return the ``langgraph_sdk`` async client, creating it on first use."""
|
|
if self._client is None:
|
|
from langgraph_sdk import get_client
|
|
|
|
self._client = get_client(url=self._langgraph_url)
|
|
return self._client
|
|
|
|
# -- lifecycle ---------------------------------------------------------
|
|
|
|
async def start(self) -> None:
|
|
"""Start the dispatch loop."""
|
|
if self._running:
|
|
return
|
|
self._running = True
|
|
self._semaphore = asyncio.Semaphore(self._max_concurrency)
|
|
self._task = asyncio.create_task(self._dispatch_loop())
|
|
logger.info("ChannelManager started (max_concurrency=%d)", self._max_concurrency)
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop the dispatch loop."""
|
|
self._running = False
|
|
if self._task:
|
|
self._task.cancel()
|
|
try:
|
|
await self._task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
self._task = None
|
|
logger.info("ChannelManager stopped")
|
|
|
|
# -- dispatch loop -----------------------------------------------------
|
|
|
|
async def _dispatch_loop(self) -> None:
|
|
logger.info("[Manager] dispatch loop started, waiting for inbound messages")
|
|
while self._running:
|
|
try:
|
|
msg = await asyncio.wait_for(self.bus.get_inbound(), timeout=1.0)
|
|
except TimeoutError:
|
|
continue
|
|
except asyncio.CancelledError:
|
|
break
|
|
|
|
logger.info(
|
|
"[Manager] received inbound: channel=%s, chat_id=%s, type=%s, text=%r",
|
|
msg.channel_name,
|
|
msg.chat_id,
|
|
msg.msg_type.value,
|
|
msg.text[:100] if msg.text else "",
|
|
)
|
|
task = asyncio.create_task(self._handle_message(msg))
|
|
task.add_done_callback(self._log_task_error)
|
|
|
|
@staticmethod
|
|
def _log_task_error(task: asyncio.Task) -> None:
|
|
"""Surface unhandled exceptions from background tasks."""
|
|
if task.cancelled():
|
|
return
|
|
exc = task.exception()
|
|
if exc:
|
|
logger.error("[Manager] unhandled error in message task: %s", exc, exc_info=exc)
|
|
|
|
async def _handle_message(self, msg: InboundMessage) -> None:
|
|
async with self._semaphore:
|
|
try:
|
|
if msg.msg_type == InboundMessageType.COMMAND:
|
|
await self._handle_command(msg)
|
|
else:
|
|
await self._handle_chat(msg)
|
|
except InvalidChannelSessionConfigError as exc:
|
|
logger.warning(
|
|
"Invalid channel session config for %s (chat=%s): %s",
|
|
msg.channel_name,
|
|
msg.chat_id,
|
|
exc,
|
|
)
|
|
await self._send_error(msg, str(exc))
|
|
except Exception:
|
|
logger.exception(
|
|
"Error handling message from %s (chat=%s)",
|
|
msg.channel_name,
|
|
msg.chat_id,
|
|
)
|
|
await self._send_error(msg, "An internal error occurred. Please try again.")
|
|
|
|
# -- chat handling -----------------------------------------------------
|
|
|
|
async def _create_thread(self, client, msg: InboundMessage) -> str:
|
|
"""Create a new thread on the LangGraph Server and store the mapping."""
|
|
thread = await client.threads.create()
|
|
thread_id = thread["thread_id"]
|
|
self.store.set_thread_id(
|
|
msg.channel_name,
|
|
msg.chat_id,
|
|
thread_id,
|
|
topic_id=msg.topic_id,
|
|
user_id=msg.user_id,
|
|
)
|
|
logger.info("[Manager] new thread created on LangGraph Server: thread_id=%s for chat_id=%s topic_id=%s", thread_id, msg.chat_id, msg.topic_id)
|
|
return thread_id
|
|
|
|
async def _handle_chat(self, msg: InboundMessage, extra_context: dict[str, Any] | None = None) -> None:
|
|
client = self._get_client()
|
|
|
|
# Look up existing DeerFlow thread.
|
|
# topic_id may be None (e.g. Telegram private chats) — the store
|
|
# handles this by using the "channel:chat_id" key without a topic suffix.
|
|
thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id)
|
|
if thread_id:
|
|
logger.info("[Manager] reusing thread: thread_id=%s for topic_id=%s", thread_id, msg.topic_id)
|
|
|
|
# No existing thread found — create a new one
|
|
if thread_id is None:
|
|
thread_id = await self._create_thread(client, msg)
|
|
|
|
assistant_id, run_config, run_context = self._resolve_run_params(msg, thread_id)
|
|
|
|
# If the inbound message contains file attachments, let the channel
|
|
# materialize (download) them and update msg.text to include sandbox file paths.
|
|
# This enables downstream models to access user-uploaded files by path.
|
|
# Channels that do not support file download will simply return the original message.
|
|
if msg.files:
|
|
from .service import get_channel_service
|
|
|
|
service = get_channel_service()
|
|
channel = service.get_channel(msg.channel_name) if service else None
|
|
logger.info("[Manager] preparing receive file context for %d attachments", len(msg.files))
|
|
msg = await channel.receive_file(msg, thread_id) if channel else msg
|
|
if extra_context:
|
|
run_context.update(extra_context)
|
|
|
|
uploaded = await _ingest_inbound_files(thread_id, msg)
|
|
if uploaded:
|
|
msg.text = f"{_format_uploaded_files_block(uploaded)}\n\n{msg.text}".strip()
|
|
|
|
if self._channel_supports_streaming(msg.channel_name):
|
|
await self._handle_streaming_chat(
|
|
client,
|
|
msg,
|
|
thread_id,
|
|
assistant_id,
|
|
run_config,
|
|
run_context,
|
|
)
|
|
return
|
|
|
|
logger.info("[Manager] invoking runs.wait(thread_id=%s, text=%r)", thread_id, msg.text[:100])
|
|
result = await client.runs.wait(
|
|
thread_id,
|
|
assistant_id,
|
|
input={"messages": [{"role": "human", "content": msg.text}]},
|
|
config=run_config,
|
|
context=run_context,
|
|
)
|
|
|
|
response_text = _extract_response_text(result)
|
|
artifacts = _extract_artifacts(result)
|
|
|
|
logger.info(
|
|
"[Manager] agent response received: thread_id=%s, response_len=%d, artifacts=%d",
|
|
thread_id,
|
|
len(response_text) if response_text else 0,
|
|
len(artifacts),
|
|
)
|
|
|
|
response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts)
|
|
|
|
if not response_text:
|
|
if attachments:
|
|
response_text = _format_artifact_text([a.virtual_path for a in attachments])
|
|
else:
|
|
response_text = "(No response from agent)"
|
|
|
|
outbound = OutboundMessage(
|
|
channel_name=msg.channel_name,
|
|
chat_id=msg.chat_id,
|
|
thread_id=thread_id,
|
|
text=response_text,
|
|
artifacts=artifacts,
|
|
attachments=attachments,
|
|
thread_ts=msg.thread_ts,
|
|
)
|
|
logger.info("[Manager] publishing outbound message to bus: channel=%s, chat_id=%s", msg.channel_name, msg.chat_id)
|
|
await self.bus.publish_outbound(outbound)
|
|
|
|
async def _handle_streaming_chat(
|
|
self,
|
|
client,
|
|
msg: InboundMessage,
|
|
thread_id: str,
|
|
assistant_id: str,
|
|
run_config: dict[str, Any],
|
|
run_context: dict[str, Any],
|
|
) -> None:
|
|
logger.info("[Manager] invoking runs.stream(thread_id=%s, text=%r)", thread_id, msg.text[:100])
|
|
|
|
last_values: dict[str, Any] | list | None = None
|
|
streamed_buffers: dict[str, str] = {}
|
|
current_message_id: str | None = None
|
|
latest_text = ""
|
|
last_published_text = ""
|
|
last_publish_at = 0.0
|
|
stream_error: BaseException | None = None
|
|
|
|
try:
|
|
async for chunk in client.runs.stream(
|
|
thread_id,
|
|
assistant_id,
|
|
input={"messages": [{"role": "human", "content": msg.text}]},
|
|
config=run_config,
|
|
context=run_context,
|
|
stream_mode=["messages-tuple", "values"],
|
|
multitask_strategy="reject",
|
|
):
|
|
event = getattr(chunk, "event", "")
|
|
data = getattr(chunk, "data", None)
|
|
|
|
if event == "messages-tuple":
|
|
accumulated_text, current_message_id = _accumulate_stream_text(streamed_buffers, current_message_id, data)
|
|
if accumulated_text:
|
|
latest_text = accumulated_text
|
|
elif event == "values" and isinstance(data, (dict, list)):
|
|
last_values = data
|
|
snapshot_text = _extract_response_text(data)
|
|
if snapshot_text:
|
|
latest_text = snapshot_text
|
|
|
|
if not latest_text or latest_text == last_published_text:
|
|
continue
|
|
|
|
now = time.monotonic()
|
|
if last_published_text and now - last_publish_at < STREAM_UPDATE_MIN_INTERVAL_SECONDS:
|
|
continue
|
|
|
|
await self.bus.publish_outbound(
|
|
OutboundMessage(
|
|
channel_name=msg.channel_name,
|
|
chat_id=msg.chat_id,
|
|
thread_id=thread_id,
|
|
text=latest_text,
|
|
is_final=False,
|
|
thread_ts=msg.thread_ts,
|
|
)
|
|
)
|
|
last_published_text = latest_text
|
|
last_publish_at = now
|
|
except Exception as exc:
|
|
stream_error = exc
|
|
if _is_thread_busy_error(exc):
|
|
logger.warning("[Manager] thread busy (concurrent run rejected): thread_id=%s", thread_id)
|
|
else:
|
|
logger.exception("[Manager] streaming error: thread_id=%s", thread_id)
|
|
finally:
|
|
result = last_values if last_values is not None else {"messages": [{"type": "ai", "content": latest_text}]}
|
|
response_text = _extract_response_text(result)
|
|
artifacts = _extract_artifacts(result)
|
|
response_text, attachments = _prepare_artifact_delivery(thread_id, response_text, artifacts)
|
|
|
|
if not response_text:
|
|
if attachments:
|
|
response_text = _format_artifact_text([attachment.virtual_path for attachment in attachments])
|
|
elif stream_error:
|
|
if _is_thread_busy_error(stream_error):
|
|
response_text = THREAD_BUSY_MESSAGE
|
|
else:
|
|
response_text = "An error occurred while processing your request. Please try again."
|
|
else:
|
|
response_text = latest_text or "(No response from agent)"
|
|
|
|
logger.info(
|
|
"[Manager] streaming response completed: thread_id=%s, response_len=%d, artifacts=%d, error=%s",
|
|
thread_id,
|
|
len(response_text),
|
|
len(artifacts),
|
|
stream_error,
|
|
)
|
|
await self.bus.publish_outbound(
|
|
OutboundMessage(
|
|
channel_name=msg.channel_name,
|
|
chat_id=msg.chat_id,
|
|
thread_id=thread_id,
|
|
text=response_text,
|
|
artifacts=artifacts,
|
|
attachments=attachments,
|
|
is_final=True,
|
|
thread_ts=msg.thread_ts,
|
|
)
|
|
)
|
|
|
|
# -- command handling --------------------------------------------------
|
|
|
|
async def _handle_command(self, msg: InboundMessage) -> None:
|
|
text = msg.text.strip()
|
|
parts = text.split(maxsplit=1)
|
|
command = parts[0].lower().lstrip("/")
|
|
|
|
if command == "bootstrap":
|
|
from dataclasses import replace as _dc_replace
|
|
|
|
chat_text = parts[1] if len(parts) > 1 else "Initialize workspace"
|
|
chat_msg = _dc_replace(msg, text=chat_text, msg_type=InboundMessageType.CHAT)
|
|
await self._handle_chat(chat_msg, extra_context={"is_bootstrap": True})
|
|
return
|
|
|
|
if command == "new":
|
|
# Create a new thread on the LangGraph Server
|
|
client = self._get_client()
|
|
thread = await client.threads.create()
|
|
new_thread_id = thread["thread_id"]
|
|
self.store.set_thread_id(
|
|
msg.channel_name,
|
|
msg.chat_id,
|
|
new_thread_id,
|
|
topic_id=msg.topic_id,
|
|
user_id=msg.user_id,
|
|
)
|
|
reply = "New conversation started."
|
|
elif command == "status":
|
|
thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id)
|
|
reply = f"Active thread: {thread_id}" if thread_id else "No active conversation."
|
|
elif command == "models":
|
|
reply = await self._fetch_gateway("/api/models", "models")
|
|
elif command == "memory":
|
|
reply = await self._fetch_gateway("/api/memory", "memory")
|
|
elif command == "help":
|
|
reply = (
|
|
"Available commands:\n"
|
|
"/bootstrap — Start a bootstrap session (enables agent setup)\n"
|
|
"/new — Start a new conversation\n"
|
|
"/status — Show current thread info\n"
|
|
"/models — List available models\n"
|
|
"/memory — Show memory status\n"
|
|
"/help — Show this help"
|
|
)
|
|
else:
|
|
available = " | ".join(sorted(KNOWN_CHANNEL_COMMANDS))
|
|
reply = f"Unknown command: /{command}. Available commands: {available}"
|
|
|
|
outbound = OutboundMessage(
|
|
channel_name=msg.channel_name,
|
|
chat_id=msg.chat_id,
|
|
thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "",
|
|
text=reply,
|
|
thread_ts=msg.thread_ts,
|
|
)
|
|
await self.bus.publish_outbound(outbound)
|
|
|
|
async def _fetch_gateway(self, path: str, kind: str) -> str:
|
|
"""Fetch data from the Gateway API for command responses."""
|
|
import httpx
|
|
|
|
try:
|
|
async with httpx.AsyncClient() as http:
|
|
resp = await http.get(f"{self._gateway_url}{path}", timeout=10)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
except Exception:
|
|
logger.exception("Failed to fetch %s from gateway", kind)
|
|
return f"Failed to fetch {kind} information."
|
|
|
|
if kind == "models":
|
|
names = [m["name"] for m in data.get("models", [])]
|
|
return ("Available models:\n" + "\n".join(f"• {n}" for n in names)) if names else "No models configured."
|
|
elif kind == "memory":
|
|
facts = data.get("facts", [])
|
|
return f"Memory contains {len(facts)} fact(s)."
|
|
return str(data)
|
|
|
|
# -- error helper ------------------------------------------------------
|
|
|
|
async def _send_error(self, msg: InboundMessage, error_text: str) -> None:
|
|
outbound = OutboundMessage(
|
|
channel_name=msg.channel_name,
|
|
chat_id=msg.chat_id,
|
|
thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "",
|
|
text=error_text,
|
|
thread_ts=msg.thread_ts,
|
|
)
|
|
await self.bus.publish_outbound(outbound)
|