08afdcb907
* feat(channels): add DingTalk channel integration Add a new DingTalk messaging channel using the dingtalk-stream SDK with Stream Push (WebSocket), requiring no public IP. Supports both plain sampleMarkdown replies and optional AI Card streaming for a typewriter effect when card_template_id is configured. - Add DingTalkChannel implementation with token management, message routing, allowed_users filtering, and markdown adaptation - Register dingtalk in channel service registry and capability map - Propagate inbound metadata to outbound messages in ChannelManager for DingTalk sender context (sender_staff_id, conversation_type) - Add dingtalk-stream dependency to pyproject.toml - Add configuration examples in config.example.yaml and .env.example - Update all README translations with setup instructions - Add comprehensive test suite (test_dingtalk_channel.py) and metadata propagation test in test_channels.py - Update backend CLAUDE.md to document DingTalk channel Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(channels): address PR review feedback for DingTalk integration - Replace runtime mutation of CHANNEL_CAPABILITIES with a `supports_streaming` property on the Channel base class, overridden by DingTalkChannel, FeishuChannel, and WeComChannel - Store stream client reference and attempt graceful disconnect in stop(); guard _on_chatbot_message with _running check to prevent post-stop message processing - Use msg.chat_id as the primary routing key in send/send_file via a shared _resolve_routing helper, with metadata as fallback - Fix process() return type annotation from tuple[str, str] to tuple[int, str] to match AckMessage.STATUS_OK - Protect _incoming_messages with threading.Lock for cross-thread safety between the Stream Push thread and the asyncio loop - Re-add Docker Compose URL guidance removed during DingTalk setup docs addition in README.md - Fix incomplete sentence in README_zh.md (missing verb "启用") Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(docs): restore plain paragraph format for Docker Compose note Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(channels): fix isinstance TypeError and add file size guard in DingTalk channel Use tuple syntax for isinstance() type check to avoid runtime TypeError with PEP 604 union types. Add upload size limit (20MB) before reading files into memory. Narrow exception handlers to specific types. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(channels): propagate markdown fallback errors and validate access token response - Re-raise exceptions in _send_markdown_fallback to prevent partial deliveries (files sent without accompanying text) - Validate _get_access_token response: reject non-dict bodies, empty tokens, and coerce invalid expireIn to a safe default - Add tests for both fixes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(channels): validate upload response and broaden send_file exception handling - Validate _upload_media JSON response: handle JSONDecodeError and non-dict payloads gracefully by returning None - Broaden send_file exception tuple to include TypeError and AttributeError for unexpected JSON shapes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(channels): fix streaming race on channel registration and slim outbound metadata - Register channel in service before calling start() to avoid race where background receiver publishes inbound before registration, causing manager to fall back to static CHANNEL_CAPABILITIES - Strip known-large metadata keys (raw_message, ref_msg) from outbound messages to prevent memory bloat from propagated inbound payloads Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Update service.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update CLAUDE.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
231 lines
8.6 KiB
Python
231 lines
8.6 KiB
Python
"""ChannelService — manages the lifecycle of all IM channels."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from app.channels.base import Channel
|
|
from app.channels.manager import DEFAULT_GATEWAY_URL, DEFAULT_LANGGRAPH_URL, ChannelManager
|
|
from app.channels.message_bus import MessageBus
|
|
from app.channels.store import ChannelStore
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
if TYPE_CHECKING:
|
|
from deerflow.config.app_config import AppConfig
|
|
|
|
# Channel name → import path for lazy loading
|
|
_CHANNEL_REGISTRY: dict[str, str] = {
|
|
"dingtalk": "app.channels.dingtalk:DingTalkChannel",
|
|
"discord": "app.channels.discord:DiscordChannel",
|
|
"feishu": "app.channels.feishu:FeishuChannel",
|
|
"slack": "app.channels.slack:SlackChannel",
|
|
"telegram": "app.channels.telegram:TelegramChannel",
|
|
"wechat": "app.channels.wechat:WechatChannel",
|
|
"wecom": "app.channels.wecom:WeComChannel",
|
|
}
|
|
|
|
# Keys that indicate a user has configured credentials for a channel.
|
|
_CHANNEL_CREDENTIAL_KEYS: dict[str, list[str]] = {
|
|
"dingtalk": ["client_id", "client_secret"],
|
|
"discord": ["bot_token"],
|
|
"feishu": ["app_id", "app_secret"],
|
|
"slack": ["bot_token", "app_token"],
|
|
"telegram": ["bot_token"],
|
|
"wecom": ["bot_id", "bot_secret"],
|
|
"wechat": ["bot_token"],
|
|
}
|
|
|
|
_CHANNELS_LANGGRAPH_URL_ENV = "DEER_FLOW_CHANNELS_LANGGRAPH_URL"
|
|
_CHANNELS_GATEWAY_URL_ENV = "DEER_FLOW_CHANNELS_GATEWAY_URL"
|
|
|
|
|
|
def _resolve_service_url(config: dict[str, Any], config_key: str, env_key: str, default: str) -> str:
|
|
value = config.pop(config_key, None)
|
|
if isinstance(value, str) and value.strip():
|
|
return value
|
|
env_value = os.getenv(env_key, "").strip()
|
|
if env_value:
|
|
return env_value
|
|
return default
|
|
|
|
|
|
class ChannelService:
|
|
"""Manages the lifecycle of all configured IM channels.
|
|
|
|
Reads configuration from ``config.yaml`` under the ``channels`` key,
|
|
instantiates enabled channels, and starts the ChannelManager dispatcher.
|
|
"""
|
|
|
|
def __init__(self, channels_config: dict[str, Any] | None = None) -> None:
|
|
self.bus = MessageBus()
|
|
self.store = ChannelStore()
|
|
config = dict(channels_config or {})
|
|
langgraph_url = _resolve_service_url(config, "langgraph_url", _CHANNELS_LANGGRAPH_URL_ENV, DEFAULT_LANGGRAPH_URL)
|
|
gateway_url = _resolve_service_url(config, "gateway_url", _CHANNELS_GATEWAY_URL_ENV, DEFAULT_GATEWAY_URL)
|
|
default_session = config.pop("session", None)
|
|
channel_sessions = {name: channel_config.get("session") for name, channel_config in config.items() if isinstance(channel_config, dict)}
|
|
self.manager = ChannelManager(
|
|
bus=self.bus,
|
|
store=self.store,
|
|
langgraph_url=langgraph_url,
|
|
gateway_url=gateway_url,
|
|
default_session=default_session if isinstance(default_session, dict) else None,
|
|
channel_sessions=channel_sessions,
|
|
)
|
|
self._channels: dict[str, Any] = {} # name -> Channel instance
|
|
self._config = config
|
|
self._running = False
|
|
|
|
@classmethod
|
|
def from_app_config(cls, app_config: AppConfig | None = None) -> ChannelService:
|
|
"""Create a ChannelService from the application config."""
|
|
if app_config is None:
|
|
from deerflow.config.app_config import get_app_config
|
|
|
|
app_config = get_app_config()
|
|
channels_config = {}
|
|
# extra fields are allowed by AppConfig (extra="allow")
|
|
extra = app_config.model_extra or {}
|
|
if "channels" in extra:
|
|
channels_config = extra["channels"]
|
|
return cls(channels_config=channels_config)
|
|
|
|
async def start(self) -> None:
|
|
"""Start the manager and all enabled channels."""
|
|
if self._running:
|
|
return
|
|
|
|
await self.manager.start()
|
|
|
|
for name, channel_config in self._config.items():
|
|
if not isinstance(channel_config, dict):
|
|
continue
|
|
if not channel_config.get("enabled", False):
|
|
cred_keys = _CHANNEL_CREDENTIAL_KEYS.get(name, [])
|
|
has_creds = any(not isinstance(channel_config.get(k), bool) and channel_config.get(k) is not None and str(channel_config[k]).strip() for k in cred_keys)
|
|
if has_creds:
|
|
logger.warning(
|
|
"Channel '%s' has credentials configured but is disabled. Set enabled: true under channels.%s in config.yaml to activate it.",
|
|
name,
|
|
name,
|
|
)
|
|
else:
|
|
logger.info("Channel %s is disabled, skipping", name)
|
|
continue
|
|
|
|
await self._start_channel(name, channel_config)
|
|
|
|
self._running = True
|
|
logger.info("ChannelService started with channels: %s", list(self._channels.keys()))
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop all channels and the manager."""
|
|
for name, channel in list(self._channels.items()):
|
|
try:
|
|
await channel.stop()
|
|
logger.info("Channel %s stopped", name)
|
|
except Exception:
|
|
logger.exception("Error stopping channel %s", name)
|
|
self._channels.clear()
|
|
|
|
await self.manager.stop()
|
|
self._running = False
|
|
logger.info("ChannelService stopped")
|
|
|
|
async def restart_channel(self, name: str) -> bool:
|
|
"""Restart a specific channel. Returns True if successful."""
|
|
if name in self._channels:
|
|
try:
|
|
await self._channels[name].stop()
|
|
except Exception:
|
|
logger.exception("Error stopping channel %s for restart", name)
|
|
del self._channels[name]
|
|
|
|
config = self._config.get(name)
|
|
if not config or not isinstance(config, dict):
|
|
logger.warning("No config for channel %s", name)
|
|
return False
|
|
|
|
return await self._start_channel(name, config)
|
|
|
|
async def _start_channel(self, name: str, config: dict[str, Any]) -> bool:
|
|
"""Instantiate and start a single channel."""
|
|
import_path = _CHANNEL_REGISTRY.get(name)
|
|
if not import_path:
|
|
logger.warning("Unknown channel type: %s", name)
|
|
return False
|
|
|
|
try:
|
|
from deerflow.reflection import resolve_class
|
|
|
|
channel_cls = resolve_class(import_path, base_class=None)
|
|
except Exception:
|
|
logger.exception("Failed to import channel class for %s", name)
|
|
return False
|
|
|
|
try:
|
|
channel = channel_cls(bus=self.bus, config=config)
|
|
self._channels[name] = channel
|
|
await channel.start()
|
|
if not channel.is_running:
|
|
self._channels.pop(name, None)
|
|
logger.error("Channel %s did not enter a running state after start()", name)
|
|
return False
|
|
logger.info("Channel %s started", name)
|
|
return True
|
|
except Exception:
|
|
self._channels.pop(name, None)
|
|
logger.exception("Failed to start channel %s", name)
|
|
return False
|
|
|
|
def get_status(self) -> dict[str, Any]:
|
|
"""Return status information for all channels."""
|
|
channels_status = {}
|
|
for name in _CHANNEL_REGISTRY:
|
|
config = self._config.get(name, {})
|
|
enabled = isinstance(config, dict) and config.get("enabled", False)
|
|
running = name in self._channels and self._channels[name].is_running
|
|
channels_status[name] = {
|
|
"enabled": enabled,
|
|
"running": running,
|
|
}
|
|
return {
|
|
"service_running": self._running,
|
|
"channels": channels_status,
|
|
}
|
|
|
|
def get_channel(self, name: str) -> Channel | None:
|
|
"""Return a running channel instance by name when available."""
|
|
return self._channels.get(name)
|
|
|
|
|
|
# -- singleton access -------------------------------------------------------
|
|
|
|
_channel_service: ChannelService | None = None
|
|
|
|
|
|
def get_channel_service() -> ChannelService | None:
|
|
"""Get the singleton ChannelService instance (if started)."""
|
|
return _channel_service
|
|
|
|
|
|
async def start_channel_service(app_config: AppConfig | None = None) -> ChannelService:
|
|
"""Create and start the global ChannelService from app config."""
|
|
global _channel_service
|
|
if _channel_service is not None:
|
|
return _channel_service
|
|
_channel_service = ChannelService.from_app_config(app_config)
|
|
await _channel_service.start()
|
|
return _channel_service
|
|
|
|
|
|
async def stop_channel_service() -> None:
|
|
"""Stop the global ChannelService."""
|
|
global _channel_service
|
|
if _channel_service is not None:
|
|
await _channel_service.stop()
|
|
_channel_service = None
|