Files
deer-flow/backend/packages/harness/deerflow/config/app_config.py
T
Nan Gao c09c334544 fix(harness): resolve runtime paths from project root (#2642)
* fix(harness): resolve runtime paths from project root

* docs(config): update

* fix(config): address runtime path review feedback

* test(config): fix skills path e2e root

* test(config): cover legacy config fallback when project root lacks config files

Verifies that when DEER_FLOW_PROJECT_ROOT is unset and cwd has no
config.yaml/extensions_config.json, AppConfig and ExtensionsConfig fall back
to the legacy backend/repo-root candidates — the backward-compat path
requested in PR #2642 review.

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-05-01 22:19:50 +08:00

454 lines
19 KiB
Python

import logging
import os
from contextvars import ContextVar
from pathlib import Path
from typing import Any, Self
import yaml
from dotenv import load_dotenv
from pydantic import BaseModel, ConfigDict, Field
from deerflow.config.acp_config import load_acp_config_from_dict
from deerflow.config.agents_api_config import AgentsApiConfig, load_agents_api_config_from_dict
from deerflow.config.checkpointer_config import CheckpointerConfig, load_checkpointer_config_from_dict
from deerflow.config.database_config import DatabaseConfig
from deerflow.config.extensions_config import ExtensionsConfig
from deerflow.config.guardrails_config import GuardrailsConfig, load_guardrails_config_from_dict
from deerflow.config.memory_config import MemoryConfig, load_memory_config_from_dict
from deerflow.config.model_config import ModelConfig
from deerflow.config.run_events_config import RunEventsConfig
from deerflow.config.runtime_paths import existing_project_file
from deerflow.config.sandbox_config import SandboxConfig
from deerflow.config.skill_evolution_config import SkillEvolutionConfig
from deerflow.config.skills_config import SkillsConfig
from deerflow.config.stream_bridge_config import StreamBridgeConfig, load_stream_bridge_config_from_dict
from deerflow.config.subagents_config import SubagentsAppConfig, load_subagents_config_from_dict
from deerflow.config.summarization_config import SummarizationConfig, load_summarization_config_from_dict
from deerflow.config.title_config import TitleConfig, load_title_config_from_dict
from deerflow.config.token_usage_config import TokenUsageConfig
from deerflow.config.tool_config import ToolConfig, ToolGroupConfig
from deerflow.config.tool_search_config import ToolSearchConfig, load_tool_search_config_from_dict
load_dotenv()
logger = logging.getLogger(__name__)
CONFIG_FILE_DATABASE_DEFAULTS = {
"backend": "sqlite",
"sqlite_dir": ".deer-flow/data",
}
class CircuitBreakerConfig(BaseModel):
"""Configuration for the LLM Circuit Breaker."""
failure_threshold: int = Field(default=5, description="Number of consecutive failures before tripping the circuit")
recovery_timeout_sec: int = Field(default=60, description="Time in seconds before attempting to recover the circuit")
def _legacy_config_candidates() -> tuple[Path, ...]:
"""Return source-tree config.yaml locations for monorepo compatibility."""
backend_dir = Path(__file__).resolve().parents[4]
repo_root = backend_dir.parent
return (backend_dir / "config.yaml", repo_root / "config.yaml")
def logging_level_from_config(name: str | None) -> int:
"""Map ``config.yaml`` ``log_level`` string to a :mod:`logging` level constant."""
mapping = logging.getLevelNamesMapping()
return mapping.get((name or "info").strip().upper(), logging.INFO)
def apply_logging_level(name: str | None) -> None:
"""Resolve *name* to a logging level and apply it to the ``deerflow``/``app`` logger hierarchies.
Only the ``deerflow`` and ``app`` logger levels are changed so that
third-party library verbosity (e.g. uvicorn, sqlalchemy) is not
affected. Root handler levels are lowered (never raised) so that
messages from the configured loggers can propagate through without
being filtered, while preserving handler thresholds that may be
intentionally restrictive for third-party log output.
"""
level = logging_level_from_config(name)
for logger_name in ("deerflow", "app"):
logging.getLogger(logger_name).setLevel(level)
for handler in logging.root.handlers:
if level < handler.level:
handler.setLevel(level)
class AppConfig(BaseModel):
"""Config for the DeerFlow application"""
log_level: str = Field(default="info", description="Logging level for deerflow and app modules (debug/info/warning/error); third-party libraries are not affected")
token_usage: TokenUsageConfig = Field(default_factory=TokenUsageConfig, description="Token usage tracking configuration")
models: list[ModelConfig] = Field(default_factory=list, description="Available models")
sandbox: SandboxConfig = Field(description="Sandbox configuration")
tools: list[ToolConfig] = Field(default_factory=list, description="Available tools")
tool_groups: list[ToolGroupConfig] = Field(default_factory=list, description="Available tool groups")
skills: SkillsConfig = Field(default_factory=SkillsConfig, description="Skills configuration")
skill_evolution: SkillEvolutionConfig = Field(default_factory=SkillEvolutionConfig, description="Agent-managed skill evolution configuration")
extensions: ExtensionsConfig = Field(default_factory=ExtensionsConfig, description="Extensions configuration (MCP servers and skills state)")
tool_search: ToolSearchConfig = Field(default_factory=ToolSearchConfig, description="Tool search / deferred loading configuration")
title: TitleConfig = Field(default_factory=TitleConfig, description="Automatic title generation configuration")
summarization: SummarizationConfig = Field(default_factory=SummarizationConfig, description="Conversation summarization configuration")
memory: MemoryConfig = Field(default_factory=MemoryConfig, description="Memory subsystem configuration")
agents_api: AgentsApiConfig = Field(default_factory=AgentsApiConfig, description="Custom-agent management API configuration")
subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration")
guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration")
circuit_breaker: CircuitBreakerConfig = Field(default_factory=CircuitBreakerConfig, description="LLM circuit breaker configuration")
model_config = ConfigDict(extra="allow")
database: DatabaseConfig = Field(default_factory=DatabaseConfig, description="Unified database backend configuration")
run_events: RunEventsConfig = Field(default_factory=RunEventsConfig, description="Run event storage configuration")
checkpointer: CheckpointerConfig | None = Field(default=None, description="Checkpointer configuration")
stream_bridge: StreamBridgeConfig | None = Field(default=None, description="Stream bridge configuration")
@classmethod
def resolve_config_path(cls, config_path: str | None = None) -> Path:
"""Resolve the config file path.
Priority:
1. If provided `config_path` argument, use it.
2. If provided `DEER_FLOW_CONFIG_PATH` environment variable, use it.
3. Otherwise, search the caller project root.
4. Finally, search legacy backend/repository-root defaults for monorepo compatibility.
"""
if config_path:
path = Path(config_path)
if not Path.exists(path):
raise FileNotFoundError(f"Config file specified by param `config_path` not found at {path}")
return path
elif os.getenv("DEER_FLOW_CONFIG_PATH"):
path = Path(os.getenv("DEER_FLOW_CONFIG_PATH"))
if not Path.exists(path):
raise FileNotFoundError(f"Config file specified by environment variable `DEER_FLOW_CONFIG_PATH` not found at {path}")
return path
else:
project_config = existing_project_file(("config.yaml",))
if project_config is not None:
return project_config
for path in _legacy_config_candidates():
if path.exists():
return path
raise FileNotFoundError("`config.yaml` file not found in the project root or legacy backend/repository root locations")
@classmethod
def from_file(cls, config_path: str | None = None) -> Self:
"""Load config from YAML file.
See `resolve_config_path` for more details.
Args:
config_path: Path to the config file.
Returns:
AppConfig: The loaded config.
"""
resolved_path = cls.resolve_config_path(config_path)
with open(resolved_path, encoding="utf-8") as f:
config_data = yaml.safe_load(f) or {}
# Check config version before processing
cls._check_config_version(config_data, resolved_path)
config_data = cls.resolve_env_variables(config_data)
cls._apply_database_defaults(config_data)
# Load title config if present
if "title" in config_data:
load_title_config_from_dict(config_data["title"])
# Load summarization config if present
if "summarization" in config_data:
load_summarization_config_from_dict(config_data["summarization"])
# Load memory config if present
if "memory" in config_data:
load_memory_config_from_dict(config_data["memory"])
# Always refresh agents API config so removed config sections reset
# singleton-backed state to its default/disabled values on reload.
load_agents_api_config_from_dict(config_data.get("agents_api") or {})
# Load subagents config if present
if "subagents" in config_data:
load_subagents_config_from_dict(config_data["subagents"])
# Load tool_search config if present
if "tool_search" in config_data:
load_tool_search_config_from_dict(config_data["tool_search"])
# Load guardrails config if present
if "guardrails" in config_data:
load_guardrails_config_from_dict(config_data["guardrails"])
# Load circuit_breaker config if present
if "circuit_breaker" in config_data:
config_data["circuit_breaker"] = config_data["circuit_breaker"]
# Load checkpointer config if present
if "checkpointer" in config_data:
load_checkpointer_config_from_dict(config_data["checkpointer"])
# Load stream bridge config if present
if "stream_bridge" in config_data:
load_stream_bridge_config_from_dict(config_data["stream_bridge"])
# Always refresh ACP agent config so removed entries do not linger across reloads.
load_acp_config_from_dict(config_data.get("acp_agents", {}))
# Load extensions config separately (it's in a different file)
extensions_config = ExtensionsConfig.from_file()
config_data["extensions"] = extensions_config.model_dump()
result = cls.model_validate(config_data)
return result
@classmethod
def _apply_database_defaults(cls, config_data: dict[str, Any]) -> None:
"""Apply config.yaml defaults for persistence when the section is absent."""
database_config = config_data.get("database")
if database_config is None:
database_config = {}
config_data["database"] = database_config
if not isinstance(database_config, dict):
return
for key, value in CONFIG_FILE_DATABASE_DEFAULTS.items():
database_config.setdefault(key, value)
@classmethod
def _check_config_version(cls, config_data: dict, config_path: Path) -> None:
"""Check if the user's config.yaml is outdated compared to config.example.yaml.
Emits a warning if the user's config_version is lower than the example's.
Missing config_version is treated as version 0 (pre-versioning).
"""
try:
user_version = int(config_data.get("config_version", 0))
except (TypeError, ValueError):
user_version = 0
# Find config.example.yaml by searching config.yaml's directory and its parents
example_path = None
search_dir = config_path.parent
for _ in range(5): # search up to 5 levels
candidate = search_dir / "config.example.yaml"
if candidate.exists():
example_path = candidate
break
parent = search_dir.parent
if parent == search_dir:
break
search_dir = parent
if example_path is None:
return
try:
with open(example_path, encoding="utf-8") as f:
example_data = yaml.safe_load(f)
raw = example_data.get("config_version", 0) if example_data else 0
try:
example_version = int(raw)
except (TypeError, ValueError):
example_version = 0
except Exception:
return
if user_version < example_version:
logger.warning(
"Your config.yaml (version %d) is outdated — the latest version is %d. Run `make config-upgrade` to merge new fields into your config.",
user_version,
example_version,
)
@classmethod
def resolve_env_variables(cls, config: Any) -> Any:
"""Recursively resolve environment variables in the config.
Environment variables are resolved using the `os.getenv` function. Example: $OPENAI_API_KEY
Args:
config: The config to resolve environment variables in.
Returns:
The config with environment variables resolved.
"""
if isinstance(config, str):
if config.startswith("$"):
env_value = os.getenv(config[1:])
if env_value is None:
raise ValueError(f"Environment variable {config[1:]} not found for config value {config}")
return env_value
return config
elif isinstance(config, dict):
return {k: cls.resolve_env_variables(v) for k, v in config.items()}
elif isinstance(config, list):
return [cls.resolve_env_variables(item) for item in config]
return config
def get_model_config(self, name: str) -> ModelConfig | None:
"""Get the model config by name.
Args:
name: The name of the model to get the config for.
Returns:
The model config if found, otherwise None.
"""
return next((model for model in self.models if model.name == name), None)
def get_tool_config(self, name: str) -> ToolConfig | None:
"""Get the tool config by name.
Args:
name: The name of the tool to get the config for.
Returns:
The tool config if found, otherwise None.
"""
return next((tool for tool in self.tools if tool.name == name), None)
def get_tool_group_config(self, name: str) -> ToolGroupConfig | None:
"""Get the tool group config by name.
Args:
name: The name of the tool group to get the config for.
Returns:
The tool group config if found, otherwise None.
"""
return next((group for group in self.tool_groups if group.name == name), None)
# Compatibility singleton layer for code paths that have not yet been
# migrated to explicit ``AppConfig`` threading. New composition roots should
# prefer constructing ``AppConfig`` once and passing it down directly.
_app_config: AppConfig | None = None
_app_config_path: Path | None = None
_app_config_mtime: float | None = None
_app_config_is_custom = False
_current_app_config: ContextVar[AppConfig | None] = ContextVar("deerflow_current_app_config", default=None)
_current_app_config_stack: ContextVar[tuple[AppConfig | None, ...]] = ContextVar("deerflow_current_app_config_stack", default=())
def _get_config_mtime(config_path: Path) -> float | None:
"""Get the modification time of a config file if it exists."""
try:
return config_path.stat().st_mtime
except OSError:
return None
def _load_and_cache_app_config(config_path: str | None = None) -> AppConfig:
"""Load config from disk and refresh cache metadata."""
global _app_config, _app_config_path, _app_config_mtime, _app_config_is_custom
resolved_path = AppConfig.resolve_config_path(config_path)
_app_config = AppConfig.from_file(str(resolved_path))
_app_config_path = resolved_path
_app_config_mtime = _get_config_mtime(resolved_path)
_app_config_is_custom = False
return _app_config
def get_app_config() -> AppConfig:
"""Get the DeerFlow config instance.
Returns a cached singleton instance and automatically reloads it when the
underlying config file path or modification time changes. Use
`reload_app_config()` to force a reload, or `reset_app_config()` to clear
the cache.
"""
global _app_config, _app_config_path, _app_config_mtime
runtime_override = _current_app_config.get()
if runtime_override is not None:
return runtime_override
if _app_config is not None and _app_config_is_custom:
return _app_config
resolved_path = AppConfig.resolve_config_path()
current_mtime = _get_config_mtime(resolved_path)
should_reload = _app_config is None or _app_config_path != resolved_path or _app_config_mtime != current_mtime
if should_reload:
if _app_config_path == resolved_path and _app_config_mtime is not None and current_mtime is not None and _app_config_mtime != current_mtime:
logger.info(
"Config file has been modified (mtime: %s -> %s), reloading AppConfig",
_app_config_mtime,
current_mtime,
)
_load_and_cache_app_config(str(resolved_path))
return _app_config
def reload_app_config(config_path: str | None = None) -> AppConfig:
"""Reload the config from file and update the cached instance.
This is useful when the config file has been modified and you want
to pick up the changes without restarting the application.
Args:
config_path: Optional path to config file. If not provided,
uses the default resolution strategy.
Returns:
The newly loaded AppConfig instance.
"""
return _load_and_cache_app_config(config_path)
def reset_app_config() -> None:
"""Reset the cached config instance.
This clears the singleton cache, causing the next call to
`get_app_config()` to reload from file. Useful for testing
or when switching between different configurations.
"""
global _app_config, _app_config_path, _app_config_mtime, _app_config_is_custom
_app_config = None
_app_config_path = None
_app_config_mtime = None
_app_config_is_custom = False
def set_app_config(config: AppConfig) -> None:
"""Set a custom config instance.
This allows injecting a custom or mock config for testing purposes.
Args:
config: The AppConfig instance to use.
"""
global _app_config, _app_config_path, _app_config_mtime, _app_config_is_custom
_app_config = config
_app_config_path = None
_app_config_mtime = None
_app_config_is_custom = True
def peek_current_app_config() -> AppConfig | None:
"""Return the runtime-scoped AppConfig override, if one is active."""
return _current_app_config.get()
def push_current_app_config(config: AppConfig) -> None:
"""Push a runtime-scoped AppConfig override for the current execution context."""
stack = _current_app_config_stack.get()
_current_app_config_stack.set(stack + (_current_app_config.get(),))
_current_app_config.set(config)
def pop_current_app_config() -> None:
"""Pop the latest runtime-scoped AppConfig override for the current execution context."""
stack = _current_app_config_stack.get()
if not stack:
_current_app_config.set(None)
return
previous = stack[-1]
_current_app_config_stack.set(stack[:-1])
_current_app_config.set(previous)