feat(observability): Structured logging with automatic trace context propagation
This commit is contained in:
@@ -30,6 +30,7 @@ from framework.graph.node import (
|
||||
from framework.graph.output_cleaner import CleansingConfig, OutputCleaner
|
||||
from framework.graph.validator import OutputValidator
|
||||
from framework.llm.provider import LLMProvider, Tool
|
||||
from framework.observability import set_trace_context
|
||||
from framework.runtime.core import Runtime
|
||||
|
||||
|
||||
@@ -184,6 +185,9 @@ class GraphExecutor:
|
||||
Returns:
|
||||
ExecutionResult with output and metrics
|
||||
"""
|
||||
# Add agent_id to trace context for correlation
|
||||
set_trace_context(agent_id=graph.id)
|
||||
|
||||
# Validate graph
|
||||
errors = graph.validate()
|
||||
if errors:
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
"""
|
||||
Observability module for automatic trace correlation and structured logging.
|
||||
|
||||
This module provides zero-friction observability:
|
||||
- Automatic trace context propagation via ContextVar
|
||||
- Structured JSON logging for production
|
||||
- Human-readable logging for development
|
||||
- No manual ID passing required
|
||||
"""
|
||||
|
||||
from framework.observability.logging import (
|
||||
clear_trace_context,
|
||||
configure_logging,
|
||||
get_trace_context,
|
||||
set_trace_context,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"configure_logging",
|
||||
"get_trace_context",
|
||||
"set_trace_context",
|
||||
"clear_trace_context",
|
||||
]
|
||||
@@ -0,0 +1,253 @@
|
||||
"""
|
||||
Structured logging with automatic trace context propagation.
|
||||
|
||||
Key Features:
|
||||
- Zero developer friction: Standard logger.info() calls get automatic context
|
||||
- ContextVar-based propagation: Thread-safe and async-safe
|
||||
- Dual output modes: JSON for production, human-readable for development
|
||||
- Correlation IDs: trace_id follows entire request flow automatically
|
||||
|
||||
Architecture:
|
||||
Runtime.start_run() → Generates trace_id, sets context once
|
||||
↓ (automatic propagation via ContextVar)
|
||||
GraphExecutor.execute() → Adds agent_id to context
|
||||
↓ (automatic propagation)
|
||||
Node.execute() → Adds node_id to context
|
||||
↓ (automatic propagation)
|
||||
User code → logger.info("message") → Gets ALL context automatically!
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from contextvars import ContextVar
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
# Context variable for trace propagation.
# ContextVar is thread-safe and async-safe - perfect for concurrent agent
# execution. Default of None means "no context set yet"; all readers in this
# module normalize it to {} with `trace_context.get() or {}`.
trace_context: ContextVar[dict[str, Any] | None] = ContextVar(
    "trace_context", default=None
)
|
||||
|
||||
|
||||
class StructuredFormatter(logging.Formatter):
    """
    JSON formatter for structured logging.

    Produces machine-parseable log entries with:
    - Standard fields (timestamp, level, logger, message)
    - Trace context (trace_id, execution_id, agent_id, etc.) - AUTOMATIC
    - Custom fields from the logger `extra` dict
    """

    # Optional per-record fields copied from `extra={...}` when present.
    # Tuple order fixes the key order in the emitted JSON object.
    _EXTRA_FIELDS = ("event", "latency_ms", "tokens_used", "node_id", "model")

    def format(self, record: logging.LogRecord) -> str:
        """Format the log record as a single-line JSON string.

        Merges the ambient trace context (module-level ContextVar) into the
        entry, copies the known optional `extra` fields, and appends
        formatted exception info when present.
        """
        # Get trace context for correlation - AUTOMATIC, no manual ID passing.
        context = trace_context.get() or {}

        # Build base log entry.
        log_entry: dict[str, Any] = {
            "timestamp": datetime.now(UTC).isoformat(),
            "level": record.levelname.lower(),
            "logger": record.name,
            "message": record.getMessage(),
        }

        # Add trace context (trace_id, execution_id, agent_id, etc.) - AUTOMATIC!
        log_entry.update(context)

        # Add custom fields from `extra` (optional). One loop replaces the
        # previous five identical getattr/if blocks; key order is unchanged.
        for field in self._EXTRA_FIELDS:
            value = getattr(record, field, None)
            if value is not None:
                log_entry[field] = value

        # Add exception info if present.
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # default=str is deliberate: a logging call must never raise just
        # because a context/extra value is not natively JSON-serializable.
        return json.dumps(log_entry, default=str)
|
||||
|
||||
|
||||
class HumanReadableFormatter(logging.Formatter):
    """
    Human-readable formatter for development.

    Emits colorized log lines with a compact trace-context prefix
    (shortened trace/execution ids) for local debugging - AUTOMATIC,
    no manual correlation-ID passing required.
    """

    COLORS = {
        "DEBUG": "\033[36m",  # Cyan
        "INFO": "\033[32m",  # Green
        "WARNING": "\033[33m",  # Yellow
        "ERROR": "\033[31m",  # Red
        "CRITICAL": "\033[35m",  # Magenta
    }
    RESET = "\033[0m"

    def format(self, record: logging.LogRecord) -> str:
        """Render the record as `[LEVEL] [trace ctx] message [event]`."""
        # Ambient trace context - AUTOMATIC!
        ctx = trace_context.get() or {}

        # Build the correlation prefix from whichever ids are set,
        # shortening trace (first 8 chars) and execution (last 8 chars).
        pieces = []
        tid = ctx.get("trace_id", "")
        if tid:
            pieces.append(f"trace:{tid[:8]}")
        eid = ctx.get("execution_id", "")
        if eid:
            pieces.append(f"exec:{eid[-8:]}")
        aid = ctx.get("agent_id", "")
        if aid:
            pieces.append(f"agent:{aid}")
        prefix = f"[{' | '.join(pieces)}] " if pieces else ""

        # Level color (empty string for unknown level names).
        color = self.COLORS.get(record.levelname, "")

        # Level name padded to 8 chars so messages line up.
        padded_level = f"{record.levelname:<8}"

        # Optional [event] suffix from the logger `extra` dict.
        suffix = ""
        extra_event = getattr(record, "event", None)
        if extra_event is not None:
            suffix = f" [{extra_event}]"

        return f"{color}[{padded_level}]{self.RESET} {prefix}{record.getMessage()}{suffix}"
|
||||
|
||||
|
||||
def configure_logging(
    level: str = "INFO",
    format: str = "auto",  # "json", "human", or "auto"
) -> None:
    """
    Configure structured logging for the application.

    Call this ONCE at application startup, typically in:
    - AgentRunner._setup()
    - The main entry point
    - Test fixtures

    Args:
        level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        format: Output format:
            - "json": Machine-parseable JSON (for production)
            - "human": Human-readable with colors (for development)
            - "auto": JSON if LOG_FORMAT=json or ENV=production, else human

    Examples:
        # Development mode (human-readable)
        configure_logging(level="DEBUG", format="human")

        # Production mode (JSON)
        configure_logging(level="INFO", format="json")

        # Auto-detect from environment
        configure_logging(level="INFO", format="auto")
    """
    # Resolve "auto" from the environment: production defaults to JSON.
    resolved = format
    if resolved == "auto":
        wants_json = (
            os.getenv("LOG_FORMAT", "").lower() == "json"
            or os.getenv("ENV", "development").lower() == "production"
        )
        resolved = "json" if wants_json else "human"

    # Pick the formatter for the resolved mode.
    formatter: logging.Formatter = (
        StructuredFormatter() if resolved == "json" else HumanReadableFormatter()
    )

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    # Replace any existing handlers on the root logger so repeated
    # configuration calls do not duplicate output.
    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(stream_handler)
    root.setLevel(level.upper())
|
||||
|
||||
|
||||
def set_trace_context(**kwargs: Any) -> None:
    """
    Merge fields into the trace context for the current execution.

    The context lives in a ContextVar and AUTOMATICALLY propagates
    through async calls within the same execution context.

    The framework calls this at key points:
    - Runtime.start_run(): Sets trace_id, execution_id, goal_id
    - GraphExecutor.execute(): Adds agent_id
    - Node execution: Adds node_id

    Developers/agents NEVER call this directly - it's framework-managed.

    Args:
        **kwargs: Context fields (trace_id, execution_id, agent_id, etc.)

    Example (framework code):
        # In Runtime.start_run()
        trace_id = f"tr_{uuid.uuid4().hex}"  # Full UUID for uniqueness
        set_trace_context(
            trace_id=trace_id,
            execution_id=run_id,
            goal_id=goal_id
        )
        # All subsequent logs in this execution get these fields automatically!
    """
    # Copy-then-update keeps existing fields while new kwargs win on clashes,
    # and never mutates the dict other context readers may already hold.
    merged = dict(trace_context.get() or {})
    merged.update(kwargs)
    trace_context.set(merged)
|
||||
|
||||
|
||||
def get_trace_context() -> dict:
    """
    Return a copy of the current trace context.

    Returns:
        Dict with trace_id, execution_id, agent_id, etc.
        Empty dict if no context set.
    """
    # Returning a fresh dict prevents callers from mutating the live context.
    return dict(trace_context.get() or {})
|
||||
|
||||
|
||||
def clear_trace_context() -> None:
    """
    Clear trace context.

    Useful for:
    - Cleanup between test runs
    - Starting a completely new execution context
    - Manual context management (rare)

    Note: Framework typically doesn't need to call this - ContextVar
    is execution-scoped and cleans itself up automatically.
    """
    # Reset to the ContextVar default (None); readers normalize it to {}.
    trace_context.set(None)
|
||||
@@ -405,6 +405,11 @@ class AgentRunner:
|
||||
|
||||
def _setup(self) -> None:
|
||||
"""Set up runtime, LLM, and executor."""
|
||||
# Configure structured logging (auto-detects JSON vs human-readable)
|
||||
from framework.observability import configure_logging
|
||||
|
||||
configure_logging(level="INFO", format="auto")
|
||||
|
||||
# Set up session context for tools (workspace_id, agent_id, session_id)
|
||||
workspace_id = "default" # Could be derived from storage path
|
||||
agent_id = self.graph.id or "unknown"
|
||||
|
||||
@@ -13,6 +13,7 @@ from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from framework.observability import set_trace_context
|
||||
from framework.schemas.decision import Decision, DecisionType, Option, Outcome
|
||||
from framework.schemas.run import Run, RunStatus
|
||||
from framework.storage.backend import FileStorage
|
||||
@@ -80,6 +81,17 @@ class Runtime:
|
||||
"""
|
||||
run_id = f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Generate trace_id for correlation across entire execution
|
||||
# Full UUID (128 bits) for uniqueness, displayed shortened in human mode
|
||||
trace_id = f"tr_{uuid.uuid4().hex}"
|
||||
|
||||
# Set trace context - automatically propagates to all logs in this execution
|
||||
set_trace_context(
|
||||
trace_id=trace_id,
|
||||
execution_id=run_id,
|
||||
goal_id=goal_id,
|
||||
)
|
||||
|
||||
self._current_run = Run(
|
||||
id=run_id,
|
||||
goal_id=goal_id,
|
||||
|
||||
@@ -314,6 +314,13 @@ class ExecutionStream:
|
||||
# Create runtime adapter for this execution
|
||||
runtime_adapter = StreamRuntimeAdapter(self._runtime, execution_id)
|
||||
|
||||
# Start run to set trace context (CRITICAL for observability)
|
||||
runtime_adapter.start_run(
|
||||
goal_id=self.goal.id,
|
||||
goal_description=self.goal.description,
|
||||
input_data=ctx.input_data,
|
||||
)
|
||||
|
||||
# Create executor for this execution
|
||||
executor = GraphExecutor(
|
||||
runtime=runtime_adapter,
|
||||
@@ -337,6 +344,13 @@ class ExecutionStream:
|
||||
# Store result with retention
|
||||
self._record_execution_result(execution_id, result)
|
||||
|
||||
# End run to complete trace (for observability)
|
||||
runtime_adapter.end_run(
|
||||
success=result.success,
|
||||
narrative=f"Execution {'succeeded' if result.success else 'failed'}",
|
||||
output_data=result.output,
|
||||
)
|
||||
|
||||
# Update context
|
||||
ctx.completed_at = datetime.now()
|
||||
ctx.status = "completed" if result.success else "failed"
|
||||
@@ -379,6 +393,16 @@ class ExecutionStream:
|
||||
),
|
||||
)
|
||||
|
||||
# End run with failure (for observability)
|
||||
try:
|
||||
runtime_adapter.end_run(
|
||||
success=False,
|
||||
narrative=f"Execution failed: {str(e)}",
|
||||
output_data={},
|
||||
)
|
||||
except Exception:
|
||||
pass # Don't let end_run errors mask the original error
|
||||
|
||||
# Emit failure event
|
||||
if self._event_bus:
|
||||
await self._event_bus.emit_execution_failed(
|
||||
|
||||
@@ -12,6 +12,7 @@ import uuid
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from framework.observability import set_trace_context
|
||||
from framework.schemas.decision import Decision, DecisionType, Option, Outcome
|
||||
from framework.schemas.run import Run, RunStatus
|
||||
from framework.storage.concurrent import ConcurrentStorage
|
||||
@@ -120,6 +121,19 @@ class StreamRuntime:
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
run_id = f"run_{self.stream_id}_{timestamp}_{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Generate trace_id for correlation across entire execution
|
||||
# Full UUID (128 bits) for uniqueness, displayed shortened in human mode
|
||||
trace_id = f"tr_{uuid.uuid4().hex}"
|
||||
|
||||
# Set trace context - automatically propagates to all logs in this execution
|
||||
set_trace_context(
|
||||
trace_id=trace_id,
|
||||
execution_id=execution_id,
|
||||
run_id=run_id,
|
||||
goal_id=goal_id,
|
||||
stream_id=self.stream_id,
|
||||
)
|
||||
|
||||
run = Run(
|
||||
id=run_id,
|
||||
goal_id=goal_id,
|
||||
|
||||
Reference in New Issue
Block a user