fix: common transient errors and loop detection
This commit is contained in:
@@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
RATE_LIMIT_MAX_RETRIES = 10
|
||||
RATE_LIMIT_BACKOFF_BASE = 2 # seconds
|
||||
RATE_LIMIT_MAX_DELAY = 120 # seconds - cap to prevent absurd waits
|
||||
|
||||
# Directory for dumping failed requests
|
||||
FAILED_REQUESTS_DIR = Path.home() / ".hive" / "failed_requests"
|
||||
@@ -84,6 +85,91 @@ def _dump_failed_request(
|
||||
return str(filepath)
|
||||
|
||||
|
||||
def _compute_retry_delay(
|
||||
attempt: int,
|
||||
exception: BaseException | None = None,
|
||||
backoff_base: int = RATE_LIMIT_BACKOFF_BASE,
|
||||
max_delay: int = RATE_LIMIT_MAX_DELAY,
|
||||
) -> float:
|
||||
"""Compute retry delay, preferring server-provided Retry-After headers.
|
||||
|
||||
Priority:
|
||||
1. retry-after-ms header (milliseconds, float)
|
||||
2. retry-after header as seconds (float)
|
||||
3. retry-after header as HTTP-date (RFC 7231)
|
||||
4. Exponential backoff: backoff_base * 2^attempt
|
||||
|
||||
All values are capped at max_delay seconds.
|
||||
"""
|
||||
if exception is not None:
|
||||
response = getattr(exception, "response", None)
|
||||
if response is not None:
|
||||
headers = getattr(response, "headers", None)
|
||||
if headers is not None:
|
||||
# Priority 1: retry-after-ms (milliseconds)
|
||||
retry_after_ms = headers.get("retry-after-ms")
|
||||
if retry_after_ms is not None:
|
||||
try:
|
||||
delay = float(retry_after_ms) / 1000.0
|
||||
return min(max(delay, 0), max_delay)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# Priority 2: retry-after (seconds or HTTP-date)
|
||||
retry_after = headers.get("retry-after")
|
||||
if retry_after is not None:
|
||||
# Try as seconds (float)
|
||||
try:
|
||||
delay = float(retry_after)
|
||||
return min(max(delay, 0), max_delay)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# Try as HTTP-date (e.g., "Fri, 31 Dec 2025 23:59:59 GMT")
|
||||
try:
|
||||
from email.utils import parsedate_to_datetime
|
||||
|
||||
retry_date = parsedate_to_datetime(retry_after)
|
||||
now = datetime.now(retry_date.tzinfo)
|
||||
delay = (retry_date - now).total_seconds()
|
||||
return min(max(delay, 0), max_delay)
|
||||
except (ValueError, TypeError, OverflowError):
|
||||
pass
|
||||
|
||||
# Fallback: exponential backoff
|
||||
delay = backoff_base * (2**attempt)
|
||||
return min(delay, max_delay)
|
||||
|
||||
|
||||
def _is_stream_transient_error(exc: BaseException) -> bool:
|
||||
"""Classify whether a streaming exception is transient (recoverable).
|
||||
|
||||
Transient errors (recoverable=True): network issues, server errors, timeouts.
|
||||
Permanent errors (recoverable=False): auth, bad request, context window, etc.
|
||||
"""
|
||||
try:
|
||||
from litellm.exceptions import (
|
||||
APIConnectionError,
|
||||
BadGatewayError,
|
||||
InternalServerError,
|
||||
ServiceUnavailableError,
|
||||
)
|
||||
|
||||
transient_types: tuple[type[BaseException], ...] = (
|
||||
APIConnectionError,
|
||||
InternalServerError,
|
||||
BadGatewayError,
|
||||
ServiceUnavailableError,
|
||||
TimeoutError,
|
||||
ConnectionError,
|
||||
OSError,
|
||||
)
|
||||
except ImportError:
|
||||
transient_types = (TimeoutError, ConnectionError, OSError)
|
||||
|
||||
return isinstance(exc, transient_types)
|
||||
|
||||
|
||||
class LiteLLMProvider(LLMProvider):
|
||||
"""
|
||||
LiteLLM-based LLM provider for multi-provider support.
|
||||
@@ -205,7 +291,7 @@ class LiteLLMProvider(LLMProvider):
|
||||
f"choices={len(response.choices) if response.choices else 0})"
|
||||
)
|
||||
return response
|
||||
wait = RATE_LIMIT_BACKOFF_BASE * (2**attempt)
|
||||
wait = _compute_retry_delay(attempt)
|
||||
logger.warning(
|
||||
f"[retry] {model} returned empty response "
|
||||
f"(finish_reason={finish_reason}, "
|
||||
@@ -236,7 +322,7 @@ class LiteLLMProvider(LLMProvider):
|
||||
f"Full request dumped to: {dump_path}"
|
||||
)
|
||||
raise
|
||||
wait = RATE_LIMIT_BACKOFF_BASE * (2**attempt)
|
||||
wait = _compute_retry_delay(attempt, exception=e)
|
||||
logger.warning(
|
||||
f"[retry] {model} rate limited (429): {e!s}. "
|
||||
f"~{token_count} tokens ({token_method}). "
|
||||
@@ -600,7 +686,7 @@ class LiteLLMProvider(LLMProvider):
|
||||
for event in tail_events:
|
||||
yield event
|
||||
return
|
||||
wait = RATE_LIMIT_BACKOFF_BASE * (2**attempt)
|
||||
wait = _compute_retry_delay(attempt)
|
||||
token_count, token_method = _estimate_tokens(
|
||||
self.model,
|
||||
full_messages,
|
||||
@@ -628,10 +714,10 @@ class LiteLLMProvider(LLMProvider):
|
||||
|
||||
except RateLimitError as e:
|
||||
if attempt < RATE_LIMIT_MAX_RETRIES:
|
||||
wait = RATE_LIMIT_BACKOFF_BASE * (2**attempt)
|
||||
wait = _compute_retry_delay(attempt, exception=e)
|
||||
logger.warning(
|
||||
f"[stream-retry] {self.model} rate limited (429): {e!s}. "
|
||||
f"Retrying in {wait}s "
|
||||
f"Retrying in {wait:.1f}s "
|
||||
f"(attempt {attempt + 1}/{RATE_LIMIT_MAX_RETRIES})"
|
||||
)
|
||||
await asyncio.sleep(wait)
|
||||
@@ -640,5 +726,6 @@ class LiteLLMProvider(LLMProvider):
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
yield StreamErrorEvent(error=str(e), recoverable=False)
|
||||
recoverable = _is_stream_transient_error(e)
|
||||
yield StreamErrorEvent(error=str(e), recoverable=recoverable)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user