fix(openrouter): stabilize quickstart and tool execution

- add cross-platform OpenRouter quickstart setup, config fallbacks, and key validation\n- harden LiteLLM/OpenRouter tool execution, duplicate question handling, and worker loading UX\n- add backend and frontend regression coverage for OpenRouter flows
This commit is contained in:
Sundaram Kumar Jha
2026-03-14 20:48:58 +05:30
parent ff01c1fd99
commit 9202cbd4d4
20 changed files with 1685 additions and 151 deletions
+511 -9
View File
@@ -7,9 +7,12 @@ Groq, and local models.
See: https://docs.litellm.ai/docs/providers
"""
import ast
import asyncio
import json
import logging
import os
import re
import time
from collections.abc import AsyncIterator
from datetime import datetime
@@ -113,11 +116,17 @@ def _patch_litellm_metadata_nonetype() -> None:
if litellm is not None:
_patch_litellm_anthropic_oauth()
_patch_litellm_metadata_nonetype()
litellm.suppress_debug_info = True
if not os.environ.get("LITELLM_LOG"):
logging.getLogger("LiteLLM").setLevel(logging.WARNING)
logging.getLogger("LiteLLM Router").setLevel(logging.WARNING)
logging.getLogger("LiteLLM Proxy").setLevel(logging.WARNING)
RATE_LIMIT_MAX_RETRIES = 10
RATE_LIMIT_BACKOFF_BASE = 2 # seconds
RATE_LIMIT_MAX_DELAY = 120 # seconds - cap to prevent absurd waits
MINIMAX_API_BASE = "https://api.minimax.io/v1"
OPENROUTER_API_BASE = "https://openrouter.ai/api/v1"
# Providers that accept cache_control on message content blocks.
# Anthropic: native ephemeral caching. MiniMax & Z-AI/GLM: pass-through to their APIs.
@@ -146,6 +155,16 @@ KIMI_API_BASE = "https://api.kimi.com/coding"
# Conversation-structure issues are deterministic — long waits don't help.
EMPTY_STREAM_MAX_RETRIES = 3
EMPTY_STREAM_RETRY_DELAY = 1.0 # seconds
OPENROUTER_TOOL_COMPAT_ERROR_SNIPPETS = (
"no endpoints found that support tool use",
"no endpoints available that support tool use",
"provider routing",
)
OPENROUTER_TOOL_CALL_RE = re.compile(
r"<\|tool_call_start\|>\s*(.*?)\s*<\|tool_call_end\|>",
re.DOTALL,
)
OPENROUTER_TOOL_COMPAT_MODEL_CACHE: set[str] = set()
# Directory for dumping failed requests
FAILED_REQUESTS_DIR = Path.home() / ".hive" / "failed_requests"
@@ -385,6 +404,8 @@ class LiteLLMProvider(LLMProvider):
model_lower = model.lower()
if model_lower.startswith("minimax/") or model_lower.startswith("minimax-"):
return MINIMAX_API_BASE
if model_lower.startswith("openrouter/"):
return OPENROUTER_API_BASE
if model_lower.startswith("kimi/"):
return KIMI_API_BASE
return None
@@ -791,6 +812,464 @@ class LiteLLMProvider(LLMProvider):
model = (self.model or "").lower()
return model.startswith("minimax/") or model.startswith("minimax-")
def _is_openrouter_model(self) -> bool:
"""Return True when the configured model targets OpenRouter."""
model = (self.model or "").lower()
if model.startswith("openrouter/"):
return True
api_base = (self.api_base or "").lower()
return "openrouter.ai/api/v1" in api_base
def _should_use_openrouter_tool_compat(
self,
error: BaseException,
tools: list[Tool] | None,
) -> bool:
"""Return True when OpenRouter rejects native tool use for the model."""
if not tools or not self._is_openrouter_model():
return False
error_text = str(error).lower()
return "openrouter" in error_text and any(
snippet in error_text for snippet in OPENROUTER_TOOL_COMPAT_ERROR_SNIPPETS
)
@staticmethod
def _extract_json_object(text: str) -> dict[str, Any] | None:
"""Extract the first JSON object from a model response."""
candidates = [text.strip()]
stripped = text.strip()
if stripped.startswith("```"):
fence_lines = stripped.splitlines()
if len(fence_lines) >= 3:
candidates.append("\n".join(fence_lines[1:-1]).strip())
decoder = json.JSONDecoder()
for candidate in candidates:
if not candidate:
continue
try:
parsed = json.loads(candidate)
except json.JSONDecodeError:
parsed = None
if isinstance(parsed, dict):
return parsed
for start_idx, char in enumerate(candidate):
if char != "{":
continue
try:
parsed, _ = decoder.raw_decode(candidate[start_idx:])
except json.JSONDecodeError:
continue
if isinstance(parsed, dict):
return parsed
return None
def _parse_openrouter_tool_compat_response(
self,
content: str,
tools: list[Tool],
) -> tuple[str, list[dict[str, Any]]]:
"""Parse JSON tool-compat output into assistant text and tool calls."""
payload = self._extract_json_object(content)
if payload is None:
text_tool_content, text_tool_calls = self._parse_openrouter_text_tool_calls(
content,
tools,
)
if text_tool_calls:
logger.info(
"[openrouter-tool-compat] Parsed textual tool-call markers for %s",
self.model,
)
return text_tool_content, text_tool_calls
logger.info(
"[openrouter-tool-compat] %s returned non-JSON fallback content; "
"treating it as plain text.",
self.model,
)
return content.strip(), []
assistant_text = payload.get("assistant_response")
if not isinstance(assistant_text, str):
assistant_text = payload.get("content")
if not isinstance(assistant_text, str):
assistant_text = payload.get("response")
if not isinstance(assistant_text, str):
assistant_text = ""
tool_calls_raw = payload.get("tool_calls")
if not tool_calls_raw and {"name", "arguments"} <= payload.keys():
tool_calls_raw = [payload]
elif isinstance(payload.get("tool_call"), dict):
tool_calls_raw = [payload["tool_call"]]
if not isinstance(tool_calls_raw, list):
tool_calls_raw = []
allowed_tool_names = {tool.name for tool in tools}
tool_calls: list[dict[str, Any]] = []
compat_prefix = f"openrouter_compat_{time.time_ns()}"
for idx, raw_call in enumerate(tool_calls_raw):
if not isinstance(raw_call, dict):
continue
function_block = raw_call.get("function")
function_name = (
raw_call.get("name")
or raw_call.get("tool_name")
or (function_block.get("name") if isinstance(function_block, dict) else None)
)
if not isinstance(function_name, str) or function_name not in allowed_tool_names:
if function_name:
logger.warning(
"[openrouter-tool-compat] Ignoring unknown tool '%s' for model %s",
function_name,
self.model,
)
continue
arguments = raw_call.get("arguments")
if arguments is None:
arguments = raw_call.get("tool_input")
if arguments is None:
arguments = raw_call.get("input")
if arguments is None and isinstance(function_block, dict):
arguments = function_block.get("arguments")
if arguments is None:
arguments = {}
if isinstance(arguments, str):
try:
arguments = json.loads(arguments)
except json.JSONDecodeError:
arguments = {"_raw": arguments}
elif not isinstance(arguments, dict):
arguments = {"value": arguments}
tool_calls.append(
{
"id": f"{compat_prefix}_{idx}",
"name": function_name,
"input": arguments,
}
)
return assistant_text.strip(), tool_calls
@staticmethod
def _close_truncated_json_fragment(fragment: str) -> str:
"""Close a truncated JSON fragment by balancing quotes/brackets."""
stack: list[str] = []
in_string = False
escaped = False
normalized = fragment.rstrip()
while normalized and normalized[-1] in ",:{[":
normalized = normalized[:-1].rstrip()
for char in normalized:
if in_string:
if escaped:
escaped = False
elif char == "\\":
escaped = True
elif char == '"':
in_string = False
continue
if char == '"':
in_string = True
elif char in "{[":
stack.append(char)
elif char == "}" and stack and stack[-1] == "{":
stack.pop()
elif char == "]" and stack and stack[-1] == "[":
stack.pop()
if in_string:
if escaped:
normalized = normalized[:-1]
normalized += '"'
for opener in reversed(stack):
normalized += "}" if opener == "{" else "]"
return normalized
def _repair_truncated_tool_arguments(self, raw_arguments: str) -> dict[str, Any] | None:
"""Try to recover a truncated JSON object from tool-call arguments."""
stripped = raw_arguments.strip()
if not stripped or stripped[0] != "{":
return None
max_trim = min(len(stripped), 256)
for trim in range(max_trim + 1):
candidate = stripped[: len(stripped) - trim].rstrip()
if not candidate:
break
candidate = self._close_truncated_json_fragment(candidate)
try:
parsed = json.loads(candidate)
except json.JSONDecodeError:
continue
if isinstance(parsed, dict):
return parsed
return None
def _parse_tool_call_arguments(self, raw_arguments: str, tool_name: str) -> dict[str, Any]:
"""Parse streamed tool arguments, repairing truncation when possible."""
try:
parsed = json.loads(raw_arguments) if raw_arguments else {}
except json.JSONDecodeError:
parsed = None
if isinstance(parsed, dict):
return parsed
repaired = self._repair_truncated_tool_arguments(raw_arguments)
if repaired is not None:
logger.warning(
"[tool-args] Recovered truncated arguments for %s on %s",
tool_name,
self.model,
)
return repaired
raise ValueError(
f"Failed to parse tool call arguments for '{tool_name}' "
"(likely truncated JSON)."
)
def _parse_openrouter_text_tool_calls(
self,
content: str,
tools: list[Tool],
) -> tuple[str, list[dict[str, Any]]]:
"""Parse textual OpenRouter tool-call markers into synthetic tool calls."""
tools_by_name = {tool.name: tool for tool in tools}
compat_prefix = f"openrouter_compat_{time.time_ns()}"
tool_calls: list[dict[str, Any]] = []
for block_index, match in enumerate(OPENROUTER_TOOL_CALL_RE.finditer(content)):
tool_calls.extend(
self._parse_openrouter_text_tool_call_block(
block=match.group(1),
tools_by_name=tools_by_name,
compat_prefix=f"{compat_prefix}_{block_index}",
)
)
stripped_text = OPENROUTER_TOOL_CALL_RE.sub("", content).strip()
return stripped_text, tool_calls
def _parse_openrouter_text_tool_call_block(
self,
block: str,
tools_by_name: dict[str, Tool],
compat_prefix: str,
) -> list[dict[str, Any]]:
"""Parse a single textual tool-call block like [tool(arg='x')]."""
try:
parsed = ast.parse(block.strip(), mode="eval").body
except SyntaxError:
return []
call_nodes = parsed.elts if isinstance(parsed, ast.List) else [parsed]
tool_calls: list[dict[str, Any]] = []
for call_index, call_node in enumerate(call_nodes):
if not isinstance(call_node, ast.Call) or not isinstance(call_node.func, ast.Name):
continue
tool_name = call_node.func.id
tool = tools_by_name.get(tool_name)
if tool is None:
continue
try:
tool_input = self._parse_openrouter_text_tool_call_arguments(
call_node=call_node,
tool=tool,
)
except (ValueError, SyntaxError):
continue
tool_calls.append(
{
"id": f"{compat_prefix}_{call_index}",
"name": tool_name,
"input": tool_input,
}
)
return tool_calls
@staticmethod
def _parse_openrouter_text_tool_call_arguments(
call_node: ast.Call,
tool: Tool,
) -> dict[str, Any]:
"""Parse positional/keyword args from a textual tool call."""
properties = tool.parameters.get("properties", {})
positional_keys = list(properties.keys())
tool_input: dict[str, Any] = {}
if len(call_node.args) > len(positional_keys):
raise ValueError("Too many positional args for textual tool call")
for idx, arg_node in enumerate(call_node.args):
tool_input[positional_keys[idx]] = ast.literal_eval(arg_node)
for kwarg in call_node.keywords:
if kwarg.arg is None:
raise ValueError("Star args are not supported in textual tool calls")
tool_input[kwarg.arg] = ast.literal_eval(kwarg.value)
return tool_input
def _build_openrouter_tool_compat_messages(
self,
messages: list[dict[str, Any]],
system: str,
tools: list[Tool],
) -> list[dict[str, Any]]:
"""Build a JSON-only prompt for models without native tool support."""
tool_specs = [
{
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters,
}
for tool in tools
]
compat_instruction = (
"Tool compatibility mode is active because this OpenRouter model does not support "
"native function calling on the routed provider.\n"
"Return exactly one JSON object and nothing else.\n"
'Schema: {"assistant_response": string, '
'"tool_calls": [{"name": string, "arguments": object}]}\n'
"Rules:\n"
"- If a tool is required, put one or more entries in tool_calls "
"and do not invent tool results.\n"
"- If no tool is required, set tool_calls to [] and put the full "
"answer in assistant_response.\n"
"- Only use tool names from the allowed tool list.\n"
"- arguments must always be valid JSON objects.\n"
f"Allowed tools:\n{json.dumps(tool_specs, ensure_ascii=True)}"
)
compat_system = compat_instruction if not system else f"{system}\n\n{compat_instruction}"
full_messages: list[dict[str, Any]] = [{"role": "system", "content": compat_system}]
full_messages.extend(messages)
return [
message
for message in full_messages
if not (
message.get("role") == "assistant"
and not message.get("content")
and not message.get("tool_calls")
)
]
async def _acomplete_via_openrouter_tool_compat(
self,
messages: list[dict[str, Any]],
system: str,
tools: list[Tool],
max_tokens: int,
) -> LLMResponse:
"""Emulate tool calling via JSON when OpenRouter rejects native tools."""
full_messages = self._build_openrouter_tool_compat_messages(messages, system, tools)
kwargs: dict[str, Any] = {
"model": self.model,
"messages": full_messages,
"max_tokens": max_tokens,
**self.extra_kwargs,
}
if self.api_key:
kwargs["api_key"] = self.api_key
if self.api_base:
kwargs["api_base"] = self.api_base
response = await self._acompletion_with_rate_limit_retry(**kwargs)
raw_content = response.choices[0].message.content or ""
assistant_text, tool_calls = self._parse_openrouter_tool_compat_response(
raw_content,
tools,
)
usage = response.usage
input_tokens = usage.prompt_tokens if usage else 0
output_tokens = usage.completion_tokens if usage else 0
stop_reason = "tool_calls" if tool_calls else (response.choices[0].finish_reason or "stop")
return LLMResponse(
content=assistant_text,
model=response.model or self.model,
input_tokens=input_tokens,
output_tokens=output_tokens,
stop_reason=stop_reason,
raw_response={
"compat_mode": "openrouter_tool_emulation",
"tool_calls": tool_calls,
"response": response,
},
)
async def _stream_via_openrouter_tool_compat(
self,
messages: list[dict[str, Any]],
system: str,
tools: list[Tool],
max_tokens: int,
) -> AsyncIterator[StreamEvent]:
"""Fallback stream for OpenRouter models without native tool support."""
from framework.llm.stream_events import (
FinishEvent,
StreamErrorEvent,
TextDeltaEvent,
TextEndEvent,
ToolCallEvent,
)
logger.info(
"[openrouter-tool-compat] Using compatibility mode for %s",
self.model,
)
try:
response = await self._acomplete_via_openrouter_tool_compat(
messages=messages,
system=system,
tools=tools,
max_tokens=max_tokens,
)
except Exception as e:
yield StreamErrorEvent(error=str(e), recoverable=False)
return
raw_response = response.raw_response if isinstance(response.raw_response, dict) else {}
tool_calls = raw_response.get("tool_calls", [])
if response.content:
yield TextDeltaEvent(content=response.content, snapshot=response.content)
yield TextEndEvent(full_text=response.content)
for tool_call in tool_calls:
yield ToolCallEvent(
tool_use_id=tool_call["id"],
tool_name=tool_call["name"],
tool_input=tool_call["input"],
)
yield FinishEvent(
stop_reason=response.stop_reason,
input_tokens=response.input_tokens,
output_tokens=response.output_tokens,
model=response.model,
)
async def _stream_via_nonstream_completion(
self,
messages: list[dict[str, Any]],
@@ -834,12 +1313,11 @@ class LiteLLMProvider(LLMProvider):
tool_calls = msg.tool_calls or []
for tc in tool_calls:
parsed_args: Any
args = tc.function.arguments if tc.function else ""
try:
parsed_args = json.loads(args) if args else {}
except json.JSONDecodeError:
parsed_args = {"_raw": args}
parsed_args = self._parse_tool_call_arguments(
args,
tc.function.name if tc.function else "",
)
yield ToolCallEvent(
tool_use_id=getattr(tc, "id", ""),
tool_name=tc.function.name if tc.function else "",
@@ -898,6 +1376,20 @@ class LiteLLMProvider(LLMProvider):
yield event
return
if (
tools
and self._is_openrouter_model()
and self.model in OPENROUTER_TOOL_COMPAT_MODEL_CACHE
):
async for event in self._stream_via_openrouter_tool_compat(
messages=messages,
system=system,
tools=tools,
max_tokens=max_tokens,
):
yield event
return
full_messages: list[dict[str, Any]] = []
if system:
sys_msg: dict[str, Any] = {"role": "system", "content": system}
@@ -1044,10 +1536,10 @@ class LiteLLMProvider(LLMProvider):
if choice.finish_reason:
stream_finish_reason = choice.finish_reason
for _idx, tc_data in sorted(tool_calls_acc.items()):
try:
parsed_args = json.loads(tc_data["arguments"])
except (json.JSONDecodeError, KeyError):
parsed_args = {"_raw": tc_data.get("arguments", "")}
parsed_args = self._parse_tool_call_arguments(
tc_data.get("arguments", ""),
tc_data.get("name", ""),
)
tail_events.append(
ToolCallEvent(
tool_use_id=tc_data["id"],
@@ -1228,6 +1720,16 @@ class LiteLLMProvider(LLMProvider):
return
except Exception as e:
if self._should_use_openrouter_tool_compat(e, tools):
OPENROUTER_TOOL_COMPAT_MODEL_CACHE.add(self.model)
async for event in self._stream_via_openrouter_tool_compat(
messages=messages,
system=system,
tools=tools or [],
max_tokens=max_tokens,
):
yield event
return
if _is_stream_transient_error(e) and attempt < RATE_LIMIT_MAX_RETRIES:
wait = _compute_retry_delay(attempt, exception=e)
logger.warning(