Compare commits

...

7 Commits

Author SHA1 Message Date
Richard Tang dafc90b5a7 fix: codex api wiring mode 2026-04-01 18:03:38 -07:00
Richard Tang 04ab4645e6 feat: models and parameters 2026-04-01 18:03:18 -07:00
Vasu Bansal ba0b0130f4 refactor: remove duplicate kimi provider branch 2026-04-01 12:52:53 +05:30
Vasu Bansal 92f07ce194 fix: harden codex responses transport 2026-04-01 12:34:27 +05:30
Vasu Bansal 34b9c33581 test: skip POSIX execute-bit check on Windows 2026-04-01 11:17:53 +05:30
Vasu Bansal 1bc1bce94b test: cover codex responses transport for newer models 2026-04-01 09:54:34 +05:30
Vasu Bansal 41a7d6d110 fix: add adapter-first codex provider integration 2026-04-01 08:59:22 +05:30
15 changed files with 1278 additions and 265 deletions
+7 -26
@@ -13,6 +13,7 @@ from pathlib import Path
from typing import Any
from framework.graph.edge import DEFAULT_MAX_TOKENS
from framework.llm.codex_backend import CODEX_API_BASE, build_codex_litellm_kwargs
# ---------------------------------------------------------------------------
# Low-level config file access
@@ -125,7 +126,6 @@ def get_worker_api_key() -> str | None:
return token
except ImportError:
pass
api_key_env_var = worker_llm.get("api_key_env_var")
if api_key_env_var:
return os.environ.get(api_key_env_var)
@@ -141,7 +141,7 @@ def get_worker_api_base() -> str | None:
return get_api_base()
if worker_llm.get("use_codex_subscription"):
return "https://chatgpt.com/backend-api/codex"
return CODEX_API_BASE
if worker_llm.get("use_kimi_code_subscription"):
return "https://api.kimi.com/coding"
if worker_llm.get("use_antigravity_subscription"):
@@ -169,23 +169,14 @@ def get_worker_llm_extra_kwargs() -> dict[str, Any]:
if worker_llm.get("use_codex_subscription"):
api_key = get_worker_api_key()
if api_key:
headers: dict[str, str] = {
"Authorization": f"Bearer {api_key}",
"User-Agent": "CodexBar",
}
account_id = None
try:
from framework.runner.runner import get_codex_account_id
account_id = get_codex_account_id()
if account_id:
headers["ChatGPT-Account-Id"] = account_id
except ImportError:
pass
return {
"extra_headers": headers,
"store": False,
"allowed_openai_params": ["store"],
}
return build_codex_litellm_kwargs(api_key, account_id=account_id)
if worker_llm.get("provider") == "ollama":
return {"num_ctx": worker_llm.get("num_ctx", 16384)}
return {}
@@ -276,7 +267,6 @@ def get_api_key() -> str | None:
return token
except ImportError:
pass
# Standard env-var path (covers ZAI Code and all API-key providers)
api_key_env_var = llm.get("api_key_env_var")
if api_key_env_var:
@@ -382,7 +372,7 @@ def get_api_base() -> str | None:
llm = get_hive_config().get("llm", {})
if llm.get("use_codex_subscription"):
# Codex subscription routes through the ChatGPT backend, not api.openai.com.
return "https://chatgpt.com/backend-api/codex"
return CODEX_API_BASE
if llm.get("use_kimi_code_subscription"):
# Kimi Code uses an Anthropic-compatible endpoint (no /v1 suffix).
return "https://api.kimi.com/coding"
@@ -417,23 +407,14 @@ def get_llm_extra_kwargs() -> dict[str, Any]:
if llm.get("use_codex_subscription"):
api_key = get_api_key()
if api_key:
headers: dict[str, str] = {
"Authorization": f"Bearer {api_key}",
"User-Agent": "CodexBar",
}
account_id = None
try:
from framework.runner.runner import get_codex_account_id
account_id = get_codex_account_id()
if account_id:
headers["ChatGPT-Account-Id"] = account_id
except ImportError:
pass
return {
"extra_headers": headers,
"store": False,
"allowed_openai_params": ["store"],
}
return build_codex_litellm_kwargs(api_key, account_id=account_id)
if llm.get("provider") == "ollama":
# Pass num_ctx to Ollama so it doesn't silently truncate the ~9.5k Queen prompt.
# Ollama's default num_ctx is only 2048. We set it to 16384 here so LiteLLM
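Both the worker and Queen config paths above now delegate the Codex transport kwargs to the shared build_codex_litellm_kwargs helper imported at the top of this file. A minimal sketch of what that helper returns, using illustrative token and account values (it mirrors the TestCodexConfig expectations later in this diff):

from framework.llm.codex_backend import CODEX_API_BASE, build_codex_litellm_kwargs

# Illustrative values only; real ones come from the Codex auth flow.
kwargs = build_codex_litellm_kwargs("tok_example", account_id="acct_example")
assert kwargs == {
    "extra_headers": {
        "Authorization": "Bearer tok_example",
        "User-Agent": "CodexBar",
        "ChatGPT-Account-Id": "acct_example",
    },
    "store": False,
    "allowed_openai_params": ["store"],
}
assert CODEX_API_BASE == "https://chatgpt.com/backend-api/codex"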
+260
@@ -0,0 +1,260 @@
"""Codex adapter for Hive's LiteLLM provider.
Codex CLI is tool-first and event-structured: tool invocations and tool results
are emitted as explicit response items, not as plain-text workflow narration.
This adapter keeps the ChatGPT Codex backend aligned with Hive's normal
provider contract by normalizing Codex request shaping and response recovery at
the provider boundary.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
from framework.llm.codex_backend import (
build_codex_extra_headers,
is_codex_api_base,
merge_codex_allowed_openai_params,
normalize_codex_api_base,
)
from framework.llm.provider import Tool
if TYPE_CHECKING:
from collections.abc import Callable
from framework.llm.litellm import LiteLLMProvider
from framework.llm.stream_events import StreamEvent
logger = logging.getLogger(__name__)
_CODEX_CRITICAL_TOOL_NAMES = frozenset(
{
"ask_user",
"ask_user_multiple",
"set_output",
"escalate",
"save_agent_draft",
"confirm_and_build",
"initialize_and_build_agent",
}
)
_CODEX_SYSTEM_CHUNK_CHARS = 3500
_CODEX_SYSTEM_PREAMBLE = """# Codex Execution Contract
Follow the system sections below in order.
- Obey every CRITICAL, MUST, NEVER, and ONLY instruction exactly.
- When tools are available, emit structured tool calls instead of replying with plain-text promises.
- Do not skip required workflow boundaries or approval gates.
"""
class CodexResponsesAdapter:
"""Normalize the ChatGPT Codex backend to Hive's standard provider semantics."""
def __init__(self, provider: LiteLLMProvider):
self._provider = provider
@property
def enabled(self) -> bool:
"""Return True when the provider targets the ChatGPT Codex backend."""
return is_codex_api_base(self._provider.api_base)
def chunk_system_prompt(self, system: str) -> list[str]:
"""Break large system prompts into smaller Codex-friendly chunks."""
normalized = system.replace("\r\n", "\n").strip()
if not normalized:
return []
sections: list[str] = []
current: list[str] = []
for line in normalized.splitlines():
if line.startswith("#") and current:
sections.append("\n".join(current).strip())
current = [line]
else:
current.append(line)
if current:
sections.append("\n".join(current).strip())
chunks: list[str] = []
for section in sections:
if len(section) <= _CODEX_SYSTEM_CHUNK_CHARS:
chunks.append(section)
continue
paragraphs = [
paragraph.strip() for paragraph in section.split("\n\n") if paragraph.strip()
]
current_chunk = ""
for paragraph in paragraphs:
candidate = paragraph if not current_chunk else f"{current_chunk}\n\n{paragraph}"
if current_chunk and len(candidate) > _CODEX_SYSTEM_CHUNK_CHARS:
chunks.append(current_chunk)
current_chunk = paragraph
else:
current_chunk = candidate
if current_chunk:
chunks.append(current_chunk)
return chunks or [normalized]
def build_system_messages(
self,
system: str,
*,
json_mode: bool,
) -> list[dict[str, Any]]:
"""Build Codex system messages in the tool-first format Codex CLI expects."""
system_messages: list[dict[str, Any]] = []
if system:
chunks = self.chunk_system_prompt(system)
if chunks:
if len(chunks) > 1 or len(chunks[0]) > _CODEX_SYSTEM_CHUNK_CHARS:
system_messages.append({"role": "system", "content": _CODEX_SYSTEM_PREAMBLE})
for chunk in chunks:
system_messages.append({"role": "system", "content": chunk})
else:
system_messages.append(
{"role": "system", "content": "You are a helpful assistant."}
)
else:
system_messages.append({"role": "system", "content": "You are a helpful assistant."})
if json_mode:
system_messages.append(
{"role": "system", "content": "Please respond with a valid JSON object."}
)
return system_messages
def derive_tool_choice(
self,
messages: list[dict[str, Any]],
tools: list[Tool] | None,
) -> str | dict[str, Any] | None:
"""Force structured tool use when Codex sees critical framework tools."""
if not tools:
return None
tool_names = {tool.name for tool in tools}
if not (tool_names & _CODEX_CRITICAL_TOOL_NAMES):
return None
last_role = next(
(m.get("role") for m in reversed(messages) if m.get("role") != "system"),
None,
)
if last_role == "assistant":
return None
return "required"
def harden_request_kwargs(self, kwargs: dict[str, Any]) -> dict[str, Any]:
"""Strip unsupported params and inject the Codex backend headers."""
cleaned = dict(kwargs)
cleaned["api_base"] = normalize_codex_api_base(
cleaned.get("api_base") or self._provider.api_base
)
cleaned["store"] = False
cleaned["allowed_openai_params"] = merge_codex_allowed_openai_params(
cleaned.get("allowed_openai_params")
)
cleaned.pop("max_tokens", None)
cleaned.pop("stream_options", None)
extra_headers = dict(cleaned.get("extra_headers") or {})
if "ChatGPT-Account-Id" not in extra_headers:
try:
from framework.runner.runner import get_codex_account_id
account_id = get_codex_account_id()
if account_id:
extra_headers["ChatGPT-Account-Id"] = account_id
except Exception:
logger.debug("Could not populate ChatGPT-Account-Id", exc_info=True)
cleaned["extra_headers"] = build_codex_extra_headers(
self._provider.api_key,
account_id=extra_headers.get("ChatGPT-Account-Id"),
extra_headers=extra_headers,
)
return cleaned
async def recover_empty_stream(
self,
kwargs: dict[str, Any],
*,
last_role: str | None,
acompletion: Callable[..., Any],
) -> list[StreamEvent] | None:
"""Try a non-stream completion when Codex returns an empty stream."""
fallback_kwargs = dict(kwargs)
fallback_kwargs.pop("stream", None)
fallback_kwargs.pop("stream_options", None)
fallback_kwargs = self._provider._sanitize_request_kwargs(fallback_kwargs, stream=False)
try:
response = await acompletion(**fallback_kwargs)
except Exception as exc:
logger.debug(
"[stream-recover] %s non-stream fallback after empty %s stream failed: %s",
self._provider.model,
last_role,
exc,
)
return None
events = self._provider._build_stream_events_from_nonstream_response(response)
if events:
logger.info(
"[stream-recover] %s recovered empty %s stream via non-stream completion",
self._provider.model,
last_role,
)
return events
return None
def merge_tool_call_chunk(
self,
tool_calls_acc: dict[int, dict[str, str]],
tc: Any,
last_tool_idx: int,
) -> int:
"""Merge a streamed tool-call chunk, compensating for broken bridge indexes."""
idx = tc.index if hasattr(tc, "index") and tc.index is not None else 0
tc_id = getattr(tc, "id", None) or ""
func = getattr(tc, "function", None)
func_name = getattr(func, "name", "") if func is not None else ""
func_args = getattr(func, "arguments", "") if func is not None else ""
if tc_id:
existing_idx = next(
(key for key, value in tool_calls_acc.items() if value["id"] == tc_id),
None,
)
if existing_idx is not None:
idx = existing_idx
elif idx in tool_calls_acc and tool_calls_acc[idx]["id"] not in ("", tc_id):
idx = max(tool_calls_acc.keys(), default=-1) + 1
last_tool_idx = idx
elif func_name:
if (
last_tool_idx in tool_calls_acc
and tool_calls_acc[last_tool_idx]["name"]
and tool_calls_acc[last_tool_idx]["name"] != func_name
and tool_calls_acc[last_tool_idx]["arguments"]
):
idx = max(tool_calls_acc.keys(), default=-1) + 1
last_tool_idx = idx
else:
idx = last_tool_idx if tool_calls_acc else idx
else:
idx = last_tool_idx if tool_calls_acc else idx
if idx not in tool_calls_acc:
tool_calls_acc[idx] = {"id": "", "name": "", "arguments": ""}
if tc_id:
tool_calls_acc[idx]["id"] = tc_id
if func_name:
tool_calls_acc[idx]["name"] = func_name
if func_args:
tool_calls_acc[idx]["arguments"] += func_args
return idx
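A short sketch of how the adapter's prompt chunking and tool-choice derivation behave, assuming litellm is installed; the prompt text, token, and tool definition are illustrative only:

from framework.llm.codex_adapter import CodexResponsesAdapter
from framework.llm.litellm import LiteLLMProvider
from framework.llm.provider import Tool

provider = LiteLLMProvider(
    model="openai/gpt-5.4",
    api_key="tok_example",
    api_base="https://chatgpt.com/backend-api/codex",
)
adapter = CodexResponsesAdapter(provider)
assert adapter.enabled

# Sections split on leading "#" headings; oversized sections are re-packed
# paragraph by paragraph under _CODEX_SYSTEM_CHUNK_CHARS.
chunks = adapter.chunk_system_prompt("# Identity\nrule one\n\n# Workflow\nrule two")
assert chunks == ["# Identity\nrule one", "# Workflow\nrule two"]

# A critical framework tool (ask_user here) forces structured tool use,
# unless the last non-system message already came from the assistant.
tools = [Tool(name="ask_user", description="Ask the user",
              parameters={"properties": {"question": {"type": "string"}}})]
assert adapter.derive_tool_choice([{"role": "user", "content": "hi"}], tools) == "required"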
+85
@@ -0,0 +1,85 @@
"""Shared helpers for Codex's ChatGPT-backed transport.
Codex CLI talks to the ChatGPT Codex backend, which is not the standard
platform OpenAI API. Hive keeps its normal provider contract by centralizing
the transport-specific headers and request kwargs here.
"""
from __future__ import annotations
from typing import Any
from urllib.parse import urlparse, urlunparse
CODEX_API_BASE = "https://chatgpt.com/backend-api/codex"
CODEX_USER_AGENT = "CodexBar"
CODEX_ALLOWED_OPENAI_PARAMS = ("store",)
_CODEX_HOST = "chatgpt.com"
_CODEX_PATH = "/backend-api/codex"
def is_codex_api_base(api_base: str | None) -> bool:
"""Return True when *api_base* targets the ChatGPT Codex backend."""
if not api_base:
return False
parsed = urlparse(api_base)
path = parsed.path.rstrip("/")
return (
parsed.scheme == "https"
and parsed.hostname == _CODEX_HOST
and (path == _CODEX_PATH or path == f"{_CODEX_PATH}/responses")
)
def normalize_codex_api_base(api_base: str | None) -> str | None:
"""Normalize ChatGPT Codex backend URLs to the stable base endpoint."""
if not api_base:
return api_base
parsed = urlparse(api_base)
path = parsed.path.rstrip("/")
if not is_codex_api_base(api_base):
return api_base.rstrip("/")
if path.endswith("/responses"):
path = path[: -len("/responses")]
normalized = parsed._replace(path=path, params="", query="", fragment="")
return urlunparse(normalized).rstrip("/")
def merge_codex_allowed_openai_params(params: list[str] | tuple[str, ...] | None) -> list[str]:
"""Ensure Codex-required pass-through params are always present."""
allowed = set(params or [])
allowed.update(CODEX_ALLOWED_OPENAI_PARAMS)
return sorted(allowed)
def build_codex_extra_headers(
api_key: str | None,
*,
account_id: str | None = None,
extra_headers: dict[str, str] | None = None,
) -> dict[str, str]:
"""Build headers for the ChatGPT Codex backend."""
headers = dict(extra_headers or {})
if api_key:
headers.setdefault("Authorization", f"Bearer {api_key}")
headers.setdefault("User-Agent", CODEX_USER_AGENT)
if account_id:
headers.setdefault("ChatGPT-Account-Id", account_id)
return headers
def build_codex_litellm_kwargs(
api_key: str | None,
*,
account_id: str | None = None,
extra_headers: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Return the LiteLLM kwargs required by the ChatGPT Codex backend."""
return {
"extra_headers": build_codex_extra_headers(
api_key,
account_id=account_id,
extra_headers=extra_headers,
),
"store": False,
"allowed_openai_params": list(CODEX_ALLOWED_OPENAI_PARAMS),
}
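The URL helpers above only treat a genuine https chatgpt.com origin as the Codex backend, and only strip a real /responses suffix. A small sketch (mirroring the detection tests added later in this diff; the proxy URL is illustrative):

from framework.llm.codex_backend import (
    CODEX_API_BASE,
    is_codex_api_base,
    normalize_codex_api_base,
)

assert is_codex_api_base("https://chatgpt.com/backend-api/codex/responses")
assert not is_codex_api_base("http://chatgpt.com/backend-api/codex")  # https only
assert not is_codex_api_base(
    "https://proxy.example/v1?target=https://chatgpt.com/backend-api/codex"
)

assert normalize_codex_api_base("https://chatgpt.com/backend-api/codex/responses") == CODEX_API_BASE
# Non-Codex bases keep their path; only a trailing slash is dropped.
assert normalize_codex_api_base("https://proxy.example/v1/responses") == "https://proxy.example/v1/responses"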
+443 -189
@@ -16,6 +16,7 @@ import os
import re
import time
from collections.abc import AsyncIterator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any
@@ -28,6 +29,8 @@ except ImportError:
RateLimitError = Exception # type: ignore[assignment, misc]
from framework.config import HIVE_LLM_ENDPOINT as HIVE_API_BASE
from framework.llm.codex_adapter import CodexResponsesAdapter
from framework.llm.codex_backend import normalize_codex_api_base
from framework.llm.provider import LLMProvider, LLMResponse, Tool
from framework.llm.stream_events import StreamEvent
@@ -534,7 +537,9 @@ class LiteLLMProvider(LLMProvider):
api_base = api_base.rstrip("/")[:-3]
self.model = model
self.api_key = api_key
self.api_base = api_base or self._default_api_base_for_model(_original_model)
self.api_base = normalize_codex_api_base(
api_base or self._default_api_base_for_model(_original_model)
)
self.extra_kwargs = kwargs
# Detect Claude Code OAuth subscription by checking the api_key prefix.
self._claude_code_oauth = bool(api_key and api_key.startswith("sk-ant-oat"))
@@ -542,25 +547,28 @@ class LiteLLMProvider(LLMProvider):
# Anthropic requires a specific User-Agent for OAuth requests.
eh = self.extra_kwargs.setdefault("extra_headers", {})
eh.setdefault("user-agent", CLAUDE_CODE_USER_AGENT)
# The Codex ChatGPT backend (chatgpt.com/backend-api/codex) rejects
# several standard OpenAI params: max_output_tokens, stream_options.
self._codex_backend = bool(
self.api_base and "chatgpt.com/backend-api/codex" in self.api_base
)
# Antigravity routes through a local OpenAI-compatible proxy — no patches needed.
self._antigravity = bool(self.api_base and "localhost:8069" in self.api_base)
self._codex_adapter = CodexResponsesAdapter(self)
# Backward-compatible alias for existing tests/callers.
self._codex_backend = self._codex_adapter.enabled
if litellm is None:
raise ImportError(
"LiteLLM is not installed. Please install it with: uv pip install litellm"
)
# Note: The Codex ChatGPT backend is a Responses API endpoint at
# The Codex ChatGPT backend is a Responses API endpoint at
# chatgpt.com/backend-api/codex/responses. LiteLLM's model registry
# correctly marks codex models with mode="responses", so we do NOT
# override the mode. The responses_api_bridge in litellm handles
# converting Chat Completions requests to Responses API format.
# marks legacy codex models (gpt-5.3-codex) with mode="responses",
# but newer models like gpt-5.4 default to mode="chat". Force
# mode="responses" so litellm routes through the responses_api_bridge.
if self._codex_backend and litellm is not None:
_strip = self.model.removeprefix("openai/")
_entry = litellm.model_cost.get(_strip, {})
if _entry.get("mode") != "responses":
litellm.model_cost.setdefault(_strip, {})
litellm.model_cost[_strip]["mode"] = "responses"
@staticmethod
def _default_api_base_for_model(model: str) -> str | None:
"""Return provider-specific default API base when required."""
@@ -575,6 +583,161 @@ class LiteLLMProvider(LLMProvider):
return HIVE_API_BASE
return None
@staticmethod
def _normalize_codex_api_base(api_base: str | None) -> str | None:
"""Normalize ChatGPT Codex backend URLs to the stable base endpoint."""
return normalize_codex_api_base(api_base)
def _chunk_codex_system_prompt(self, system: str) -> list[str]:
"""Break large system prompts into smaller Codex-friendly chunks."""
return self._codex_adapter.chunk_system_prompt(system)
@contextmanager
def _codex_responses_mode_override(self, model: str | None = None):
"""Temporarily route Codex requests through LiteLLM's Responses bridge."""
if not self._codex_backend or litellm is None:
yield
return
stripped_model = (model or self.model).removeprefix("openai/")
previous_entry = litellm.model_cost.get(stripped_model)
previous_mode = previous_entry.get("mode") if isinstance(previous_entry, dict) else None
if previous_mode == "responses":
yield
return
if previous_entry is None:
litellm.model_cost[stripped_model] = {"mode": "responses"}
else:
litellm.model_cost[stripped_model] = {**previous_entry, "mode": "responses"}
try:
yield
finally:
if previous_entry is None:
litellm.model_cost.pop(stripped_model, None)
else:
litellm.model_cost[stripped_model] = previous_entry
def _build_request_messages(
self,
messages: list[dict[str, Any]],
system: str,
*,
json_mode: bool,
) -> list[dict[str, Any]]:
"""Build request messages, including Codex-specific prompt chunking."""
full_messages: list[dict[str, Any]] = []
if self._claude_code_oauth:
billing = _claude_code_billing_header(messages)
full_messages.append({"role": "system", "content": billing})
system_messages: list[dict[str, Any]] = []
if system:
if self._codex_backend:
system_messages.extend(
self._codex_adapter.build_system_messages(system, json_mode=json_mode)
)
else:
sys_msg: dict[str, Any] = {"role": "system", "content": system}
if _model_supports_cache_control(self.model):
sys_msg["cache_control"] = {"type": "ephemeral"}
system_messages.append(sys_msg)
elif self._codex_backend:
system_messages.extend(
self._codex_adapter.build_system_messages("", json_mode=json_mode)
)
if json_mode and not self._codex_backend:
json_instruction = "Please respond with a valid JSON object."
if system_messages:
system_messages[0] = {
**system_messages[0],
"content": f"{system_messages[0]['content']}\n\n{json_instruction}",
}
else:
system_messages.append({"role": "system", "content": json_instruction})
full_messages.extend(system_messages)
full_messages.extend(messages)
return [
m
for m in full_messages
if not (
m.get("role") == "assistant" and not m.get("content") and not m.get("tool_calls")
)
]
def _derive_codex_tool_choice(
self,
messages: list[dict[str, Any]],
tools: list[Tool] | None,
) -> str | dict[str, Any] | None:
"""Force tool use for Codex when critical framework tools are available."""
if not self._codex_backend:
return None
return self._codex_adapter.derive_tool_choice(messages, tools)
def _sanitize_request_kwargs(
self,
kwargs: dict[str, Any],
*,
stream: bool,
) -> dict[str, Any]:
"""Normalize provider kwargs, with extra hardening for Codex."""
cleaned = dict(kwargs)
if cleaned.get("metadata") is None:
cleaned.pop("metadata", None)
if self._codex_backend:
cleaned = self._codex_adapter.harden_request_kwargs(cleaned)
if stream:
cleaned["stream"] = True
return cleaned
def _build_completion_kwargs(
self,
messages: list[dict[str, Any]],
system: str,
*,
tools: list[Tool] | None,
max_tokens: int,
response_format: dict[str, Any] | None,
json_mode: bool,
stream: bool,
) -> dict[str, Any]:
"""Build request kwargs for completion/stream calls."""
full_messages = self._build_request_messages(messages, system, json_mode=json_mode)
kwargs: dict[str, Any] = {
"model": self.model,
"messages": full_messages,
**self.extra_kwargs,
}
if not stream:
kwargs["max_tokens"] = max_tokens
else:
kwargs["max_tokens"] = max_tokens
if not self._is_anthropic_model():
kwargs["stream_options"] = {"include_usage": True}
if self.api_key:
kwargs["api_key"] = self.api_key
if self.api_base:
kwargs["api_base"] = self.api_base
if tools:
kwargs["tools"] = [self._tool_to_openai_format(t) for t in tools]
tool_choice = self._derive_codex_tool_choice(full_messages, tools)
if tool_choice is not None:
kwargs["tool_choice"] = tool_choice
elif _is_ollama_model(self.model):
kwargs.setdefault("tool_choice", "auto")
if response_format:
kwargs["response_format"] = response_format
return self._sanitize_request_kwargs(kwargs, stream=stream)
def _completion_with_rate_limit_retry(
self, max_retries: int | None = None, **kwargs: Any
) -> Any:
@@ -583,7 +746,8 @@ class LiteLLMProvider(LLMProvider):
retries = max_retries if max_retries is not None else RATE_LIMIT_MAX_RETRIES
for attempt in range(retries + 1):
try:
response = litellm.completion(**kwargs) # type: ignore[union-attr]
with self._codex_responses_mode_override(model):
response = litellm.completion(**kwargs) # type: ignore[union-attr]
# Some providers (e.g. Gemini) return 200 with empty content on
# rate limit / quota exhaustion instead of a proper 429. Treat
@@ -713,46 +877,15 @@ class LiteLLMProvider(LLMProvider):
)
)
# Prepare messages with system prompt
full_messages = []
if system:
full_messages.append({"role": "system", "content": system})
full_messages.extend(messages)
# Add JSON mode via prompt engineering (works across all providers)
if json_mode:
json_instruction = "\n\nPlease respond with a valid JSON object."
# Append to system message if present, otherwise add as system message
if full_messages and full_messages[0]["role"] == "system":
full_messages[0]["content"] += json_instruction
else:
full_messages.insert(0, {"role": "system", "content": json_instruction.strip()})
# Build kwargs
kwargs: dict[str, Any] = {
"model": self.model,
"messages": full_messages,
"max_tokens": max_tokens,
**self.extra_kwargs,
}
if self.api_key:
kwargs["api_key"] = self.api_key
if self.api_base:
kwargs["api_base"] = self.api_base
# Add tools if provided
if tools:
kwargs["tools"] = [self._tool_to_openai_format(t) for t in tools]
if _is_ollama_model(self.model):
# Ollama requires explicit tool_choice=auto for function calling
# so future readers don't have to guess.
kwargs.setdefault("tool_choice", "auto")
# Add response_format for structured output
# LiteLLM passes this through to the underlying provider
if response_format:
kwargs["response_format"] = response_format
kwargs = self._build_completion_kwargs(
messages,
system,
tools=tools,
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
stream=False,
)
# Make the call
response = self._completion_with_rate_limit_retry(max_retries=max_retries, **kwargs)
@@ -794,7 +927,8 @@ class LiteLLMProvider(LLMProvider):
retries = max_retries if max_retries is not None else RATE_LIMIT_MAX_RETRIES
for attempt in range(retries + 1):
try:
response = await litellm.acompletion(**kwargs) # type: ignore[union-attr]
with self._codex_responses_mode_override(model):
response = await litellm.acompletion(**kwargs) # type: ignore[union-attr]
content = response.choices[0].message.content if response.choices else None
has_tool_calls = bool(response.choices and response.choices[0].message.tool_calls)
@@ -913,44 +1047,15 @@ class LiteLLMProvider(LLMProvider):
json_mode=json_mode,
)
return await self._collect_stream_to_response(stream_iter)
full_messages: list[dict[str, Any]] = []
if self._claude_code_oauth:
billing = _claude_code_billing_header(messages)
full_messages.append({"role": "system", "content": billing})
if system:
sys_msg: dict[str, Any] = {"role": "system", "content": system}
if _model_supports_cache_control(self.model):
sys_msg["cache_control"] = {"type": "ephemeral"}
full_messages.append(sys_msg)
full_messages.extend(messages)
if json_mode:
json_instruction = "\n\nPlease respond with a valid JSON object."
if full_messages and full_messages[0]["role"] == "system":
full_messages[0]["content"] += json_instruction
else:
full_messages.insert(0, {"role": "system", "content": json_instruction.strip()})
kwargs: dict[str, Any] = {
"model": self.model,
"messages": full_messages,
"max_tokens": max_tokens,
**self.extra_kwargs,
}
if self.api_key:
kwargs["api_key"] = self.api_key
if self.api_base:
kwargs["api_base"] = self.api_base
if tools:
kwargs["tools"] = [self._tool_to_openai_format(t) for t in tools]
if _is_ollama_model(self.model):
# Ollama requires explicit tool_choice=auto for function calling
# so future readers don't have to guess.
kwargs.setdefault("tool_choice", "auto")
if response_format:
kwargs["response_format"] = response_format
kwargs = self._build_completion_kwargs(
messages,
system,
tools=tools,
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
stream=False,
)
response = await self._acompletion_with_rate_limit_retry(max_retries=max_retries, **kwargs)
@@ -1200,17 +1305,92 @@ class LiteLLMProvider(LLMProvider):
return parsed
return None
@staticmethod
def _normalize_pythonish_tool_arguments(raw_arguments: str) -> str:
"""Convert common JSON-like literals into a form ast.literal_eval can parse."""
replacements = {
"true": "True",
"false": "False",
"null": "None",
}
out: list[str] = []
token: list[str] = []
in_string = False
string_quote = ""
escaped = False
def flush_token() -> None:
if not token:
return
word = "".join(token)
out.append(replacements.get(word, word))
token.clear()
for char in raw_arguments:
if in_string:
out.append(char)
if escaped:
escaped = False
elif char == "\\":
escaped = True
elif char == string_quote:
in_string = False
continue
if char in {'"', "'"}:
flush_token()
in_string = True
string_quote = char
out.append(char)
continue
if char.isalpha():
token.append(char)
continue
flush_token()
out.append(char)
flush_token()
return "".join(out)
@staticmethod
def _strip_tool_argument_fence(raw_arguments: str) -> str:
"""Remove surrounding fenced-code markers from streamed tool arguments."""
stripped = raw_arguments.strip()
if not stripped.startswith("```") or not stripped.endswith("```"):
return stripped
lines = stripped.splitlines()
if len(lines) >= 2:
return "\n".join(lines[1:-1]).strip()
return stripped.strip("`").strip()
def _parse_pythonish_tool_arguments(self, raw_arguments: str) -> dict[str, Any] | None:
"""Parse single-quoted / trailing-comma argument payloads safely."""
stripped = self._strip_tool_argument_fence(raw_arguments)
if not stripped or stripped[0] != "{":
return None
candidate = self._close_truncated_json_fragment(stripped)
candidate = self._normalize_pythonish_tool_arguments(candidate)
try:
parsed = ast.literal_eval(candidate)
except (SyntaxError, ValueError):
return None
return parsed if isinstance(parsed, dict) else None
def _parse_tool_call_arguments(self, raw_arguments: str, tool_name: str) -> dict[str, Any]:
"""Parse streamed tool arguments, repairing truncation when possible."""
stripped = self._strip_tool_argument_fence(raw_arguments)
try:
parsed = json.loads(raw_arguments) if raw_arguments else {}
parsed = json.loads(stripped) if stripped else {}
except json.JSONDecodeError:
parsed = None
if isinstance(parsed, dict):
return parsed
repaired = self._repair_truncated_tool_arguments(raw_arguments)
repaired = self._repair_truncated_tool_arguments(stripped)
if repaired is not None:
logger.warning(
"[tool-args] Recovered truncated arguments for %s on %s",
@@ -1219,6 +1399,15 @@ class LiteLLMProvider(LLMProvider):
)
return repaired
pythonish = self._parse_pythonish_tool_arguments(stripped)
if pythonish is not None:
logger.warning(
"[tool-args] Recovered malformed arguments for %s on %s",
tool_name,
self.model,
)
return pythonish
raise ValueError(
f"Failed to parse tool call arguments for '{tool_name}' (likely truncated JSON)."
)
@@ -1546,6 +1735,144 @@ class LiteLLMProvider(LLMProvider):
model=response.model,
)
def _build_stream_events_from_nonstream_response(
self,
response: Any,
) -> list[StreamEvent]:
"""Convert a non-stream completion response into stream events."""
from framework.llm.stream_events import (
FinishEvent,
TextDeltaEvent,
TextEndEvent,
ToolCallEvent,
)
choices = getattr(response, "choices", None) or []
if not choices:
output_text = getattr(response, "output_text", "") or ""
if not output_text:
return []
from framework.llm.stream_events import FinishEvent, TextDeltaEvent, TextEndEvent
usage = getattr(response, "usage", None)
return [
TextDeltaEvent(content=output_text, snapshot=output_text),
TextEndEvent(full_text=output_text),
FinishEvent(
stop_reason="stop",
input_tokens=getattr(usage, "prompt_tokens", 0) or 0 if usage else 0,
output_tokens=getattr(usage, "completion_tokens", 0) or 0 if usage else 0,
model=getattr(response, "model", None) or self.model,
),
]
choice = choices[0]
message = getattr(choice, "message", None)
content = self._extract_message_text(message)
tool_calls = getattr(message, "tool_calls", None) or []
events: list[StreamEvent] = []
for tc in tool_calls:
parsed_args = self._coerce_tool_input(
tc.function.arguments if tc.function else {},
tc.function.name if tc.function else "",
)
events.append(
ToolCallEvent(
tool_use_id=getattr(tc, "id", ""),
tool_name=tc.function.name if tc.function else "",
tool_input=parsed_args,
)
)
if content:
events.append(TextDeltaEvent(content=content, snapshot=content))
events.append(TextEndEvent(full_text=content))
usage = getattr(response, "usage", None)
input_tokens = getattr(usage, "prompt_tokens", 0) or 0 if usage else 0
output_tokens = getattr(usage, "completion_tokens", 0) or 0 if usage else 0
cached_tokens = 0
if usage:
details = getattr(usage, "prompt_tokens_details", None)
cached_tokens = (
getattr(details, "cached_tokens", 0) or 0
if details is not None
else getattr(usage, "cache_read_input_tokens", 0) or 0
)
events.append(
FinishEvent(
stop_reason=getattr(choice, "finish_reason", None)
or ("tool_calls" if tool_calls else "stop"),
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
model=getattr(response, "model", None) or self.model,
)
)
return events
@staticmethod
def _extract_message_text(message: Any) -> str:
"""Extract text from a provider message object across response shapes."""
if message is None:
return ""
content = getattr(message, "content", "")
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for block in content:
if isinstance(block, str):
parts.append(block)
elif isinstance(block, dict):
text = block.get("text") or block.get("content") or ""
if isinstance(text, str):
parts.append(text)
else:
text = getattr(block, "text", "") or getattr(block, "content", "")
if isinstance(text, str):
parts.append(text)
return "".join(parts)
return str(content or "")
def _coerce_tool_input(self, raw_arguments: Any, tool_name: str) -> dict[str, Any]:
"""Normalize raw tool-call arguments from either string or object forms."""
if isinstance(raw_arguments, dict):
return raw_arguments
if raw_arguments in (None, ""):
return {}
return self._parse_tool_call_arguments(str(raw_arguments), tool_name)
async def _recover_empty_codex_stream(
self,
kwargs: dict[str, Any],
last_role: str | None,
) -> list[StreamEvent] | None:
"""Try a non-stream completion when Codex returns an empty stream."""
if not self._codex_backend:
return None
async def _codex_acompletion(**fallback_kwargs: Any) -> Any:
with self._codex_responses_mode_override(fallback_kwargs.get("model")):
return await litellm.acompletion(**fallback_kwargs) # type: ignore[union-attr]
return await self._codex_adapter.recover_empty_stream(
kwargs,
last_role=last_role,
acompletion=_codex_acompletion,
)
def _merge_tool_call_chunk(
self,
tool_calls_acc: dict[int, dict[str, str]],
tc: Any,
last_tool_idx: int,
) -> int:
"""Merge a streamed tool-call chunk, compensating for broken Codex indexes."""
return self._codex_adapter.merge_tool_call_chunk(tool_calls_acc, tc, last_tool_idx)
async def stream(
self,
messages: list[dict[str, Any]],
@@ -1597,69 +1924,16 @@ class LiteLLMProvider(LLMProvider):
yield event
return
full_messages: list[dict[str, Any]] = []
if self._claude_code_oauth:
billing = _claude_code_billing_header(messages)
full_messages.append({"role": "system", "content": billing})
if system:
sys_msg: dict[str, Any] = {"role": "system", "content": system}
if _model_supports_cache_control(self.model):
sys_msg["cache_control"] = {"type": "ephemeral"}
full_messages.append(sys_msg)
full_messages.extend(messages)
# Codex Responses API requires an `instructions` field (system prompt).
# Inject a minimal one when callers don't provide a system message.
if self._codex_backend and not any(m["role"] == "system" for m in full_messages):
full_messages.insert(0, {"role": "system", "content": "You are a helpful assistant."})
# Add JSON mode via prompt engineering (works across all providers)
if json_mode:
json_instruction = "\n\nPlease respond with a valid JSON object."
if full_messages and full_messages[0]["role"] == "system":
full_messages[0]["content"] += json_instruction
else:
full_messages.insert(0, {"role": "system", "content": json_instruction.strip()})
# Remove ghost empty assistant messages (content="" and no tool_calls).
# These arise when a model returns an empty stream after a tool result
# (an "expected" no-op turn). Keeping them in history confuses some
# models (notably Codex/gpt-5.3) and causes cascading empty streams.
full_messages = [
m
for m in full_messages
if not (
m.get("role") == "assistant" and not m.get("content") and not m.get("tool_calls")
)
]
kwargs: dict[str, Any] = {
"model": self.model,
"messages": full_messages,
"max_tokens": max_tokens,
"stream": True,
**self.extra_kwargs,
}
# stream_options is OpenAI-specific; Anthropic rejects it with 400.
# Only include it for providers that support it.
if not self._is_anthropic_model():
kwargs["stream_options"] = {"include_usage": True}
if self.api_key:
kwargs["api_key"] = self.api_key
if self.api_base:
kwargs["api_base"] = self.api_base
if tools:
kwargs["tools"] = [self._tool_to_openai_format(t) for t in tools]
if _is_ollama_model(self.model):
# Ollama requires explicit tool_choice=auto for function calling
# so future readers don't have to guess.
kwargs.setdefault("tool_choice", "auto")
if response_format:
kwargs["response_format"] = response_format
# The Codex ChatGPT backend (Responses API) rejects several params.
if self._codex_backend:
kwargs.pop("max_tokens", None)
kwargs.pop("stream_options", None)
kwargs = self._build_completion_kwargs(
messages,
system,
tools=tools,
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
stream=True,
)
full_messages = kwargs["messages"]
for attempt in range(RATE_LIMIT_MAX_RETRIES + 1):
# Post-stream events (ToolCall, TextEnd, Finish) are buffered
@@ -1674,7 +1948,8 @@ class LiteLLMProvider(LLMProvider):
stream_finish_reason: str | None = None
try:
response = await litellm.acompletion(**kwargs) # type: ignore[union-attr]
with self._codex_responses_mode_override(kwargs.get("model")):
response = await litellm.acompletion(**kwargs) # type: ignore[union-attr]
async for chunk in response:
# Capture usage from the trailing usage-only chunk that
@@ -1717,43 +1992,17 @@ class LiteLLMProvider(LLMProvider):
# argument deltas that arrive with id=None.
if delta and delta.tool_calls:
for tc in delta.tool_calls:
idx = tc.index if hasattr(tc, "index") and tc.index is not None else 0
if tc.id:
# New tool call announced (or done event re-sent).
# Check if this id already has a slot.
existing_idx = next(
(k for k, v in tool_calls_acc.items() if v["id"] == tc.id),
None,
)
if existing_idx is not None:
idx = existing_idx
elif idx in tool_calls_acc and tool_calls_acc[idx]["id"] not in (
"",
tc.id,
):
# Slot taken by a different call — assign new index
idx = max(tool_calls_acc.keys()) + 1
_last_tool_idx = idx
else:
# Argument delta with no id — route to last opened slot
idx = _last_tool_idx
if idx not in tool_calls_acc:
tool_calls_acc[idx] = {"id": "", "name": "", "arguments": ""}
if tc.id:
tool_calls_acc[idx]["id"] = tc.id
if tc.function:
if tc.function.name:
tool_calls_acc[idx]["name"] = tc.function.name
if tc.function.arguments:
tool_calls_acc[idx]["arguments"] += tc.function.arguments
_last_tool_idx = self._merge_tool_call_chunk(
tool_calls_acc,
tc,
_last_tool_idx,
)
# --- Finish ---
if choice.finish_reason:
stream_finish_reason = choice.finish_reason
for _idx, tc_data in sorted(tool_calls_acc.items()):
parsed_args = self._parse_tool_call_arguments(
parsed_args = self._coerce_tool_input(
tc_data.get("arguments", ""),
tc_data.get("name", ""),
)
@@ -1886,6 +2135,11 @@ class LiteLLMProvider(LLMProvider):
(m["role"] for m in reversed(full_messages) if m.get("role") != "system"),
None,
)
recovered_events = await self._recover_empty_codex_stream(kwargs, last_role)
if recovered_events:
for event in recovered_events:
yield event
return
if attempt < EMPTY_STREAM_MAX_RETRIES:
token_count, token_method = _estimate_tokens(
self.model,
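The litellm.py changes above funnel every request through _build_completion_kwargs and, for Codex, a scoped Responses-mode override. A minimal sketch of that override, assuming litellm is installed (it mirrors the scoped-override test later in this diff; _codex_responses_mode_override is a private helper, so this is illustrative rather than public API):

import litellm

from framework.llm.litellm import LiteLLMProvider

provider = LiteLLMProvider(
    model="openai/gpt-5.4",
    api_key="tok_example",
    api_base="https://chatgpt.com/backend-api/codex",
)
before = litellm.model_cost.get("gpt-5.4")
with provider._codex_responses_mode_override():
    # While a Codex request is in flight, the model routes through
    # LiteLLM's responses_api_bridge.
    assert litellm.model_cost["gpt-5.4"]["mode"] == "responses"
assert litellm.model_cost.get("gpt-5.4") == before  # previous registry entry restored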
+72 -33
@@ -22,6 +22,7 @@ from framework.graph.edge import (
)
from framework.graph.executor import ExecutionResult
from framework.graph.node import NodeSpec
from framework.llm.codex_backend import CODEX_API_BASE, build_codex_litellm_kwargs
from framework.llm.provider import LLMProvider, Tool
from framework.runner.preload_validation import run_preload_validation
from framework.runner.tool_registry import ToolRegistry
@@ -327,17 +328,68 @@ def _read_codex_auth_file() -> dict | None:
return None
def _get_jwt_claims(token: str) -> dict | None:
"""Decode JWT claims without verification for local expiry/account inspection."""
import base64
try:
parts = token.split(".")
if len(parts) != 3:
return None
payload = parts[1]
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
decoded = base64.urlsafe_b64decode(payload)
claims = json.loads(decoded)
return claims if isinstance(claims, dict) else None
except Exception:
return None
def _get_codex_token_expiry(auth_data: dict) -> float | None:
"""Return the best-known expiry timestamp for a Codex access token."""
from datetime import datetime
tokens = auth_data.get("tokens", {})
access_token = tokens.get("access_token")
explicit = (
auth_data.get("expires_at")
or auth_data.get("expiresAt")
or tokens.get("expires_at")
or tokens.get("expiresAt")
)
if isinstance(explicit, (int, float)):
return float(explicit)
if isinstance(explicit, str):
try:
return datetime.fromisoformat(explicit.replace("Z", "+00:00")).timestamp()
except (ValueError, TypeError):
pass
if isinstance(access_token, str):
claims = _get_jwt_claims(access_token) or {}
exp = claims.get("exp")
if isinstance(exp, (int, float)):
return float(exp)
return None
def _is_codex_token_expired(auth_data: dict) -> bool:
"""Check whether the Codex token is expired or close to expiry.
The Codex auth.json has no explicit ``expiresAt`` field, so we infer
expiry as ``last_refresh + _CODEX_TOKEN_LIFETIME_SECS``. Falls back
to the file mtime when ``last_refresh`` is absent.
to JWT ``exp`` or file age heuristics when no explicit timestamp exists.
"""
import time
from datetime import datetime
now = time.time()
explicit_expiry = _get_codex_token_expiry(auth_data)
if explicit_expiry is not None:
return now >= (explicit_expiry - _TOKEN_REFRESH_BUFFER_SECS)
last_refresh = auth_data.get("last_refresh")
if last_refresh is None:
@@ -431,6 +483,8 @@ def get_codex_token() -> str | None:
Returns:
The access token if available, None otherwise.
"""
import time
# Try Keychain first, then file
auth_data = _read_codex_keychain() or _read_codex_auth_file()
if not auth_data:
@@ -441,15 +495,20 @@ def get_codex_token() -> str | None:
if not access_token:
return None
explicit_expiry = _get_codex_token_expiry(auth_data)
is_expired = _is_codex_token_expired(auth_data)
# Check if token is still valid
if not _is_codex_token_expired(auth_data):
if not is_expired:
return access_token
# Token is expired or near expiry — attempt refresh
refresh_token = tokens.get("refresh_token")
if not refresh_token:
logger.warning("Codex token expired and no refresh token available")
return access_token # Return expired token; it may still work briefly
if explicit_expiry is not None and time.time() >= explicit_expiry:
return None
return access_token
logger.info("Codex token expired or near expiry, refreshing...")
token_data = _refresh_codex_token(refresh_token)
@@ -460,6 +519,8 @@ def get_codex_token() -> str | None:
# Refresh failed — return the existing token and warn
logger.warning("Codex token refresh failed. Run 'codex' to re-authenticate.")
if explicit_expiry is not None and time.time() >= explicit_expiry:
return None
return access_token
@@ -471,26 +532,12 @@ def _get_account_id_from_jwt(access_token: str) -> str | None:
This is used as a fallback when the auth.json doesn't store the
account_id explicitly.
"""
import base64
try:
parts = access_token.split(".")
if len(parts) != 3:
return None
payload = parts[1]
# Add base64 padding
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
decoded = base64.urlsafe_b64decode(payload)
claims = json.loads(decoded)
auth = claims.get("https://api.openai.com/auth")
if isinstance(auth, dict):
account_id = auth.get("chatgpt_account_id")
if isinstance(account_id, str) and account_id:
return account_id
except Exception:
pass
claims = _get_jwt_claims(access_token) or {}
auth = claims.get("https://api.openai.com/auth")
if isinstance(auth, dict):
account_id = auth.get("chatgpt_account_id")
if isinstance(account_id, str) and account_id:
return account_id
return None
@@ -1569,20 +1616,12 @@ class AgentRunner:
# OpenAI Codex subscription routes through the ChatGPT backend
# (chatgpt.com/backend-api/codex/responses), NOT the standard
# OpenAI API. The consumer OAuth token lacks platform API scopes.
extra_headers: dict[str, str] = {
"Authorization": f"Bearer {api_key}",
"User-Agent": "CodexBar",
}
account_id = get_codex_account_id()
if account_id:
extra_headers["ChatGPT-Account-Id"] = account_id
self._llm = LiteLLMProvider(
model=self.model,
api_key=api_key,
api_base="https://chatgpt.com/backend-api/codex",
extra_headers=extra_headers,
store=False,
allowed_openai_params=["store"],
api_base=CODEX_API_BASE,
**build_codex_litellm_kwargs(api_key, account_id=account_id),
)
elif api_key and use_kimi_code:
# Kimi Code subscription uses the Kimi coding API (OpenAI-compatible).
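The new _get_jwt_claims helper decodes the payload segment without verifying the signature, which is what the expiry and account-id fallbacks above rely on. A sketch with a throwaway, clearly fake token (illustrative only, not a real credential):

import base64
import json
import time

from framework.runner.runner import _get_jwt_claims

# Build an unsigned, illustrative token; only the middle (payload) segment matters here.
payload = base64.urlsafe_b64encode(
    json.dumps({"exp": int(time.time()) + 3600}).encode()
).decode().rstrip("=")
token = f"header.{payload}.signature"

claims = _get_jwt_claims(token)
assert claims is not None and claims["exp"] > time.time()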
+1 -1
@@ -33,7 +33,7 @@ async def test_codex_stream():
print(f"extra_kwargs keys: {list(extra_kwargs.keys())}")
print(f"extra_headers: {list(extra_kwargs.get('extra_headers', {}).keys())}")
model = "openai/gpt-5.3-codex"
model = "openai/gpt-5.4"
# Create the provider
provider = LiteLLMProvider(
+1 -1
@@ -33,7 +33,7 @@ async def main():
return
provider = LiteLLMProvider(
model="openai/gpt-5.3-codex",
model="openai/gpt-5.4",
api_key=api_key,
api_base=api_base,
**extra_kwargs,
+66 -2
@@ -1,8 +1,15 @@
"""Tests for framework/config.py - Hive configuration loading."""
import logging
from unittest.mock import patch
from framework.config import get_api_base, get_hive_config, get_preferred_model
from framework.config import (
get_api_base,
get_hive_config,
get_llm_extra_kwargs,
get_preferred_model,
)
from framework.llm.codex_backend import CODEX_API_BASE, is_codex_api_base, normalize_codex_api_base
class TestGetHiveConfig:
@@ -59,9 +66,66 @@ class TestOpenRouterConfig:
def test_get_api_base_keeps_explicit_openrouter_api_base(self, tmp_path, monkeypatch):
config_file = tmp_path / "configuration.json"
config_file.write_text(
'{"llm":{"provider":"openrouter","model":"x-ai/grok-4.20-beta","api_base":"https://proxy.example/v1"}}',
(
'{"llm":{"provider":"openrouter","model":"x-ai/grok-4.20-beta",'
'"api_base":"https://proxy.example/v1"}}'
),
encoding="utf-8",
)
monkeypatch.setattr("framework.config.HIVE_CONFIG_FILE", config_file)
assert get_api_base() == "https://proxy.example/v1"
class TestCodexConfig:
"""Codex config helpers should share the same transport defaults."""
def test_get_api_base_uses_shared_codex_backend(self, tmp_path, monkeypatch):
config_file = tmp_path / "configuration.json"
config_file.write_text(
'{"llm":{"provider":"openai","model":"gpt-5.4","use_codex_subscription":true}}',
encoding="utf-8",
)
monkeypatch.setattr("framework.config.HIVE_CONFIG_FILE", config_file)
assert get_api_base() == CODEX_API_BASE
def test_get_llm_extra_kwargs_uses_shared_codex_transport(self, tmp_path, monkeypatch):
config_file = tmp_path / "configuration.json"
config_file.write_text(
'{"llm":{"provider":"openai","model":"gpt-5.4","use_codex_subscription":true}}',
encoding="utf-8",
)
monkeypatch.setattr("framework.config.HIVE_CONFIG_FILE", config_file)
with (
patch("framework.runner.runner.get_codex_token", return_value="tok_test"),
patch("framework.runner.runner.get_codex_account_id", return_value="acct_123"),
):
kwargs = get_llm_extra_kwargs()
assert kwargs["store"] is False
assert kwargs["allowed_openai_params"] == ["store"]
assert kwargs["extra_headers"] == {
"Authorization": "Bearer tok_test",
"User-Agent": "CodexBar",
"ChatGPT-Account-Id": "acct_123",
}
def test_codex_api_base_detection_requires_real_chatgpt_origin(self):
assert is_codex_api_base("https://chatgpt.com/backend-api/codex")
assert is_codex_api_base("https://chatgpt.com/backend-api/codex/responses")
assert not is_codex_api_base("http://chatgpt.com/backend-api/codex")
assert not is_codex_api_base(
"https://proxy.example/v1?target=https://chatgpt.com/backend-api/codex"
)
def test_normalize_codex_api_base_strips_only_real_responses_suffix(self):
assert (
normalize_codex_api_base("https://chatgpt.com/backend-api/codex/responses")
== CODEX_API_BASE
)
assert (
normalize_codex_api_base("https://proxy.example/v1/responses")
== "https://proxy.example/v1/responses"
)
+323
@@ -241,6 +241,60 @@ class TestToolConversion:
with pytest.raises(ValueError, match="Failed to parse tool call arguments"):
provider._parse_tool_call_arguments('{"question": foo', "ask_user")
def test_parse_tool_call_arguments_recovers_pythonish_payloads(self):
"""Single-quoted and trailing-comma argument payloads should be recovered."""
provider = LiteLLMProvider(
model="openai/gpt-5.4",
api_key="test-key",
api_base="https://chatgpt.com/backend-api/codex",
)
parsed = provider._parse_tool_call_arguments(
"{'question': 'Continue?', 'options': ['Yes', 'No'],}",
"ask_user",
)
assert parsed == {
"question": "Continue?",
"options": ["Yes", "No"],
}
def test_parse_tool_call_arguments_keeps_null_inside_strings(self):
"""Literal normalization should not mutate quoted text values."""
provider = LiteLLMProvider(
model="openai/gpt-5.4",
api_key="test-key",
api_base="https://chatgpt.com/backend-api/codex",
)
parsed = provider._parse_tool_call_arguments(
"{'hypothesis': 'null hypothesis', 'approved': false}",
"summarize",
)
assert parsed == {
"hypothesis": "null hypothesis",
"approved": False,
}
def test_parse_tool_call_arguments_strips_json_code_fences(self):
"""Fence stripping should remove the language tag before JSON parsing."""
provider = LiteLLMProvider(
model="openai/gpt-5.4",
api_key="test-key",
api_base="https://chatgpt.com/backend-api/codex",
)
parsed = provider._parse_tool_call_arguments(
'```json\n{"question":"Continue?","options":["Yes","No"]}\n```',
"ask_user",
)
assert parsed == {
"question": "Continue?",
"options": ["Yes", "No"],
}
class TestAnthropicProviderBackwardCompatibility:
"""Test AnthropicProvider backward compatibility with LiteLLM backend."""
@@ -731,6 +785,275 @@ class TestMiniMaxStreamFallback:
assert not LiteLLMProvider(model="gpt-4o-mini", api_key="x")._is_minimax_model()
class TestCodexEmptyStreamRecovery:
"""Codex empty streams should fall back before surfacing ghost-stream retries."""
@pytest.mark.asyncio
@patch("litellm.acompletion")
async def test_stream_recovers_empty_codex_stream_via_nonstream_completion(
self,
mock_acompletion,
):
"""An empty Codex stream should be salvaged with a non-stream completion."""
from framework.llm.stream_events import FinishEvent, TextDeltaEvent
provider = LiteLLMProvider(
model="openai/gpt-5.4",
api_key="test-key",
api_base="https://chatgpt.com/backend-api/codex",
)
class EmptyStreamResponse:
chunks: list = []
def __aiter__(self):
return self
async def __anext__(self):
raise StopAsyncIteration
recovered = MagicMock()
recovered.choices = [MagicMock()]
recovered.choices[0].message.content = "Recovered via fallback"
recovered.choices[0].message.tool_calls = []
recovered.choices[0].finish_reason = "stop"
recovered.model = provider.model
recovered.usage.prompt_tokens = 12
recovered.usage.completion_tokens = 4
async def side_effect(*args, **kwargs):
if kwargs.get("stream"):
return EmptyStreamResponse()
return recovered
mock_acompletion.side_effect = side_effect
events = []
async for event in provider.stream(messages=[{"role": "user", "content": "hi"}]):
events.append(event)
text_events = [event for event in events if isinstance(event, TextDeltaEvent)]
assert len(text_events) == 1
assert text_events[0].snapshot == "Recovered via fallback"
finish_events = [event for event in events if isinstance(event, FinishEvent)]
assert len(finish_events) == 1
assert finish_events[0].stop_reason == "stop"
assert finish_events[0].input_tokens == 12
assert finish_events[0].output_tokens == 4
assert mock_acompletion.call_count == 2
assert mock_acompletion.call_args_list[0].kwargs["stream"] is True
assert "stream" not in mock_acompletion.call_args_list[1].kwargs
@pytest.mark.asyncio
@patch("litellm.acompletion")
async def test_stream_recovers_empty_codex_stream_with_tool_calls(
self,
mock_acompletion,
):
"""Non-stream fallback should preserve tool calls, not just text."""
from framework.llm.stream_events import FinishEvent, ToolCallEvent
provider = LiteLLMProvider(
model="openai/gpt-5.4",
api_key="test-key",
api_base="https://chatgpt.com/backend-api/codex/responses",
)
class EmptyStreamResponse:
chunks: list = []
def __aiter__(self):
return self
async def __anext__(self):
raise StopAsyncIteration
tc = MagicMock()
tc.id = "tool_1"
tc.function.name = "ask_user"
tc.function.arguments = '{"question":"Continue?","options":["Yes","No"]}'
recovered = MagicMock()
recovered.choices = [MagicMock()]
recovered.choices[0].message.content = ""
recovered.choices[0].message.tool_calls = [tc]
recovered.choices[0].finish_reason = "tool_calls"
recovered.model = provider.model
recovered.usage.prompt_tokens = 14
recovered.usage.completion_tokens = 5
async def side_effect(*args, **kwargs):
if kwargs.get("stream"):
return EmptyStreamResponse()
return recovered
mock_acompletion.side_effect = side_effect
events = []
async for event in provider.stream(
messages=[{"role": "user", "content": "Should we continue?"}],
tools=[
Tool(
name="ask_user",
description="Ask the user",
parameters={"properties": {"question": {"type": "string"}}},
)
],
):
events.append(event)
tool_events = [event for event in events if isinstance(event, ToolCallEvent)]
assert len(tool_events) == 1
assert tool_events[0].tool_name == "ask_user"
assert tool_events[0].tool_input == {
"question": "Continue?",
"options": ["Yes", "No"],
}
finish_events = [event for event in events if isinstance(event, FinishEvent)]
assert len(finish_events) == 1
assert finish_events[0].stop_reason == "tool_calls"
class TestCodexRequestHardening:
def test_codex_backend_forces_responses_mode_per_request(self):
"""Codex backend should scope the Responses override to the active request."""
import litellm
original = litellm.model_cost.get("gpt-5.4")
litellm.model_cost["gpt-5.4"] = {"mode": "chat"}
try:
provider = LiteLLMProvider(
model="openai/gpt-5.4",
api_key="test-key",
api_base="https://chatgpt.com/backend-api/codex",
)
assert provider._codex_backend is True
async def _fake_acompletion(**kwargs):
assert kwargs["model"] == "openai/gpt-5.4"
assert litellm.model_cost["gpt-5.4"]["mode"] == "responses"
response = MagicMock()
response.choices = [MagicMock()]
response.choices[0].message.content = "ok"
response.choices[0].message.tool_calls = []
response.choices[0].finish_reason = "stop"
return response
with patch("litellm.acompletion", new=_fake_acompletion):
response = asyncio.run(
provider._acompletion_with_rate_limit_retry(
model=provider.model,
messages=[{"role": "user", "content": "hi"}],
)
)
assert response.choices[0].message.content == "ok"
assert litellm.model_cost["gpt-5.4"]["mode"] == "chat"
finally:
if original is None:
litellm.model_cost.pop("gpt-5.4", None)
else:
litellm.model_cost["gpt-5.4"] = original
def test_codex_build_completion_kwargs_splits_prompt_and_forces_tool_choice(self):
"""Codex requests should chunk large system prompts and require tools when needed."""
provider = LiteLLMProvider(
model="openai/gpt-5.4",
api_key="test-key",
api_base="https://chatgpt.com/backend-api/codex/responses",
store=True,
)
kwargs = provider._build_completion_kwargs(
messages=[{"role": "user", "content": "hi"}],
system="# Identity\n" + ("rule\n" * 2000),
tools=[
Tool(
name="ask_user",
description="Ask the user",
parameters={"properties": {"question": {"type": "string"}}},
)
],
max_tokens=256,
response_format=None,
json_mode=False,
stream=True,
)
system_messages = [m for m in kwargs["messages"] if m["role"] == "system"]
assert len(system_messages) >= 2
assert system_messages[0]["content"].startswith("# Codex Execution Contract")
assert kwargs["tool_choice"] == "required"
assert kwargs["store"] is False
assert "max_tokens" not in kwargs
assert "stream_options" not in kwargs
assert kwargs["api_base"] == "https://chatgpt.com/backend-api/codex"
assert "store" in kwargs["allowed_openai_params"]
def test_codex_build_system_messages_handles_whitespace_only_prompt(self):
"""Whitespace-only system prompts should fall back to the default prompt."""
provider = LiteLLMProvider(
model="openai/gpt-5.4",
api_key="test-key",
api_base="https://chatgpt.com/backend-api/codex",
)
messages = provider._codex_adapter.build_system_messages("\n \n", json_mode=False)
assert messages == [{"role": "system", "content": "You are a helpful assistant."}]
def test_codex_merge_tool_call_chunk_handles_parallel_calls_with_broken_indexes(self):
"""Codex chunk merging should survive index=0 for multiple parallel tool calls."""
from types import SimpleNamespace
provider = LiteLLMProvider(
model="openai/gpt-5.4",
api_key="test-key",
api_base="https://chatgpt.com/backend-api/codex",
)
acc: dict[int, dict[str, str]] = {}
last_idx = 0
chunks = [
SimpleNamespace(
id="tool_1",
index=0,
function=SimpleNamespace(name="web_search", arguments='{"query":"alpha'),
),
SimpleNamespace(
id="tool_2",
index=0,
function=SimpleNamespace(name="read_file", arguments='{"path":"beta'),
),
SimpleNamespace(
id=None,
index=0,
function=SimpleNamespace(name=None, arguments='"}'),
),
SimpleNamespace(
id=None,
index=0,
function=SimpleNamespace(name=None, arguments='"}'),
),
]
for chunk in chunks:
last_idx = provider._merge_tool_call_chunk(acc, chunk, last_idx)
assert len(acc) == 2
parsed = [
provider._parse_tool_call_arguments(slot["arguments"], slot["name"])
for _, slot in sorted(acc.items())
]
assert parsed == [
{"query": "alpha"},
{"path": "beta"},
]
class TestOpenRouterToolCompatFallback:
"""OpenRouter models should fall back when native tool use is unavailable."""
+7
@@ -5,8 +5,11 @@ One test per strict check — happy path plus each individual failure mode.
from __future__ import annotations
import sys
from pathlib import Path
import pytest
from framework.skills.validator import validate_strict
@@ -285,6 +288,10 @@ class TestCheck11Scripts:
assert result.passed is False
assert any("executable" in e.lower() for e in result.errors)
@pytest.mark.skipif(
sys.platform == "win32",
reason="POSIX execute-bit semantics do not hold on Windows",
)
def test_passes_with_executable_script(self, tmp_path):
path = _write_skill(tmp_path, _VALID_CONTENT)
scripts_dir = path.parent / "scripts"
+1 -1
@@ -331,7 +331,7 @@ def _make_codex_provider():
if not api_key or not api_base:
return None
return LiteLLMProvider(
model="openai/gpt-5.3-codex",
model="openai/gpt-5.4",
api_key=api_key,
api_base=api_base,
**extra_kwargs,
+3 -3
@@ -1279,9 +1279,9 @@ switch ($num) {
if ($CodexCredDetected) {
$SubscriptionMode = "codex"
$SelectedProviderId = "openai"
$SelectedModel = "gpt-5.3-codex"
$SelectedMaxTokens = 16384
$SelectedMaxContextTokens = 120000
$SelectedModel = "gpt-5.4"
$SelectedMaxTokens = 128000
$SelectedMaxContextTokens = 900000
Write-Host ""
Write-Ok "Using OpenAI Codex subscription"
}
+3 -3
@@ -1306,9 +1306,9 @@ case $choice in
if [ "$CODEX_CRED_DETECTED" = true ]; then
SUBSCRIPTION_MODE="codex"
SELECTED_PROVIDER_ID="openai"
SELECTED_MODEL="gpt-5.3-codex"
SELECTED_MAX_TOKENS=16384
SELECTED_MAX_CONTEXT_TOKENS=120000 # GPT Codex — 128k context window
SELECTED_MODEL="gpt-5.4"
SELECTED_MAX_TOKENS=128000
SELECTED_MAX_CONTEXT_TOKENS=900000 # GPT-5.4 — 1.05M context window
echo ""
echo -e "${GREEN}${NC} Using OpenAI Codex subscription"
fi
+3 -3
@@ -561,9 +561,9 @@ switch ($num) {
if ($CodexCredDetected) {
$SubscriptionMode = "codex"
$SelectedProviderId = "openai"
$SelectedModel = "gpt-5.3-codex"
$SelectedMaxTokens = 16384
$SelectedMaxContextTokens = 120000
$SelectedModel = "gpt-5.4"
$SelectedMaxTokens = 128000
$SelectedMaxContextTokens = 900000
Write-Host ""
Write-Ok "Using OpenAI Codex subscription"
}
+3 -3
@@ -870,9 +870,9 @@ case $choice in
if [ "$CODEX_CRED_DETECTED" = true ]; then
SUBSCRIPTION_MODE="codex"
SELECTED_PROVIDER_ID="openai"
SELECTED_MODEL="gpt-5.3-codex"
SELECTED_MAX_TOKENS=16384
SELECTED_MAX_CONTEXT_TOKENS=120000 # GPT Codex — 128k context window
SELECTED_MODEL="gpt-5.4"
SELECTED_MAX_TOKENS=128000
SELECTED_MAX_CONTEXT_TOKENS=900000 # GPT-5.4 — 1.05M context window
echo ""
echo -e "${GREEN}${NC} Using OpenAI Codex subscription"
fi