Compare commits

...

4 Commits

Author SHA1 Message Date
Richard Tang 8451b70d28 fix: google auth 2026-04-14 15:25:16 -07:00
Richard Tang 44c56f89ad fix: active account and handle events correctly 2026-04-14 12:42:14 -07:00
Richard Tang ba340f3dca Merge branch 'feature/new-colony' into feat/antigravity-v2 2026-04-14 11:53:12 -07:00
Richard Tang f48d632215 feat(wip): antigravity support v2 2026-04-10 08:31:07 -07:00
15 changed files with 2412 additions and 1362 deletions
+25 -94
View File
@@ -8,6 +8,7 @@ helper functions.
import json
import logging
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@@ -96,6 +97,7 @@ _PROVIDER_CRED_MAP: dict[str, str] = {
"deepseek": "deepseek",
"kimi": "kimi",
"hive": "hive",
"google-gemini-cli": "gemini",
}
@@ -192,13 +194,17 @@ def get_worker_api_key() -> str | None:
except ImportError:
pass
if worker_llm.get("use_antigravity_subscription"):
if worker_llm.get("use_google_gemini_cli_subscription"):
try:
from framework.loader.agent_loader import get_antigravity_token
from framework.llm.google import _load_accounts_from_json, _do_token_refresh
token = get_antigravity_token()
if token:
return token
access, refresh, expires_at = _load_accounts_from_json()
if access and expires_at and time.time() < expires_at - 60:
return access
if refresh:
result = _do_token_refresh(refresh)
if result:
return result[0]
except ImportError:
pass
@@ -220,8 +226,8 @@ def get_worker_api_base() -> str | None:
return "https://chatgpt.com/backend-api/codex"
if worker_llm.get("use_kimi_code_subscription"):
return "https://api.kimi.com/coding"
if worker_llm.get("use_antigravity_subscription"):
# Antigravity uses AntigravityProvider directly — no api_base needed.
if worker_llm.get("use_google_gemini_cli_subscription"):
# Google Gemini CLI uses GoogleGeminiCliProvider directly — no api_base needed.
return None
if worker_llm.get("api_base"):
return worker_llm["api_base"]
@@ -363,14 +369,18 @@ def get_api_key() -> str | None:
except ImportError:
pass
# Antigravity subscription: read OAuth token from accounts JSON
if llm.get("use_antigravity_subscription"):
# Google Gemini CLI subscription: read OAuth token from accounts JSON
if llm.get("use_google_gemini_cli_subscription"):
try:
from framework.loader.agent_loader import get_antigravity_token
from framework.llm.google import _load_accounts_from_json, _do_token_refresh
token = get_antigravity_token()
if token:
return token
access, refresh, expires_at = _load_accounts_from_json()
if access and expires_at and time.time() < expires_at - 60:
return access
if refresh:
result = _do_token_refresh(refresh)
if result:
return result[0]
except ImportError:
pass
@@ -385,85 +395,6 @@ def get_api_key() -> str | None:
return _get_api_key_from_credential_store(llm.get("provider", ""))
# OAuth credentials for Antigravity are fetched from the opencode-antigravity-auth project.
# This project reverse-engineered and published the public OAuth credentials
# for Google's Antigravity/Cloud Code Assist API.
# Source: https://github.com/NoeFabris/opencode-antigravity-auth
_ANTIGRAVITY_CREDENTIALS_URL = (
"https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts"
)
_antigravity_credentials_cache: tuple[str | None, str | None] = (None, None)
def _fetch_antigravity_credentials() -> tuple[str | None, str | None]:
"""Fetch OAuth client ID and secret from the public npm package source on GitHub."""
global _antigravity_credentials_cache
if _antigravity_credentials_cache[0] and _antigravity_credentials_cache[1]:
return _antigravity_credentials_cache
import re
import urllib.request
try:
req = urllib.request.Request(
_ANTIGRAVITY_CREDENTIALS_URL, headers={"User-Agent": "Hive/1.0"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
content = resp.read().decode("utf-8")
id_match = re.search(r'ANTIGRAVITY_CLIENT_ID\s*=\s*"([^"]+)"', content)
secret_match = re.search(r'ANTIGRAVITY_CLIENT_SECRET\s*=\s*"([^"]+)"', content)
client_id = id_match.group(1) if id_match else None
client_secret = secret_match.group(1) if secret_match else None
if client_id and client_secret:
_antigravity_credentials_cache = (client_id, client_secret)
return client_id, client_secret
except Exception as e:
logger.debug("Failed to fetch Antigravity credentials from public source: %s", e)
return None, None
def get_antigravity_client_id() -> str:
"""Return the Antigravity OAuth application client ID.
Checked in order:
1. ``ANTIGRAVITY_CLIENT_ID`` environment variable
2. ``llm.antigravity_client_id`` in ~/.hive/configuration.json
3. Fetch from public source (opencode-antigravity-auth project on GitHub)
"""
env = os.environ.get("ANTIGRAVITY_CLIENT_ID")
if env:
return env
cfg_val = get_hive_config().get("llm", {}).get("antigravity_client_id")
if cfg_val:
return cfg_val
# Fetch from public source
client_id, _ = _fetch_antigravity_credentials()
if client_id:
return client_id
raise RuntimeError("Could not obtain Antigravity OAuth client ID")
def get_antigravity_client_secret() -> str | None:
"""Return the Antigravity OAuth client secret.
Checked in order:
1. ``ANTIGRAVITY_CLIENT_SECRET`` environment variable
2. ``llm.antigravity_client_secret`` in ~/.hive/configuration.json
3. Fetch from public source (opencode-antigravity-auth project on GitHub)
Returns None when not found token refresh will be skipped and
the caller must use whatever access token is already available.
"""
env = os.environ.get("ANTIGRAVITY_CLIENT_SECRET")
if env:
return env
cfg_val = get_hive_config().get("llm", {}).get("antigravity_client_secret") or None
if cfg_val:
return cfg_val
# Fetch from public source
_, secret = _fetch_antigravity_credentials()
return secret
def get_gcu_enabled() -> bool:
"""Return whether GCU (browser automation) is enabled in user config."""
@@ -487,8 +418,8 @@ def get_api_base() -> str | None:
if llm.get("use_kimi_code_subscription"):
# Kimi Code uses an Anthropic-compatible endpoint (no /v1 suffix).
return "https://api.kimi.com/coding"
if llm.get("use_antigravity_subscription"):
# Antigravity uses AntigravityProvider directly — no api_base needed.
if llm.get("use_google_gemini_cli_subscription"):
# Google Gemini CLI uses GoogleGeminiCliProvider directly — no api_base needed.
return None
if llm.get("api_base"):
return llm["api_base"]
-706
View File
@@ -1,706 +0,0 @@
"""Antigravity (Google internal Cloud Code Assist) LLM provider.
Antigravity is Google's unified gateway API that routes requests to Gemini,
Claude, and GPT-OSS models through a single Gemini-style interface. It is
NOT the public ``generativelanguage.googleapis.com`` API.
Authentication uses Google OAuth2. Token refresh is done directly with the
OAuth client secret no local proxy required.
Credential sources (checked in order):
1. ``~/.hive/antigravity-accounts.json`` (native OAuth implementation)
2. Antigravity IDE SQLite state DB (macOS / Linux)
"""
from __future__ import annotations
import json
import logging
import re
import time
import uuid
from collections.abc import AsyncIterator, Callable, Iterator
from pathlib import Path
from typing import Any
from framework.llm.provider import LLMProvider, LLMResponse, Tool
from framework.llm.stream_events import (
FinishEvent,
StreamErrorEvent,
StreamEvent,
TextDeltaEvent,
TextEndEvent,
ToolCallEvent,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_TOKEN_URL = "https://oauth2.googleapis.com/token"
# Fallback order: daily sandbox → autopush sandbox → production
_ENDPOINTS = [
"https://daily-cloudcode-pa.sandbox.googleapis.com",
"https://autopush-cloudcode-pa.sandbox.googleapis.com",
"https://cloudcode-pa.googleapis.com",
]
_DEFAULT_PROJECT_ID = "rising-fact-p41fc"
_TOKEN_REFRESH_BUFFER_SECS = 60
# Credentials file in ~/.hive/ (native implementation)
_ACCOUNTS_FILE = Path.home() / ".hive" / "antigravity-accounts.json"
_IDE_STATE_DB_MAC = (
Path.home()
/ "Library"
/ "Application Support"
/ "Antigravity"
/ "User"
/ "globalStorage"
/ "state.vscdb"
)
_IDE_STATE_DB_LINUX = (
Path.home() / ".config" / "Antigravity" / "User" / "globalStorage" / "state.vscdb"
)
_IDE_STATE_DB_KEY = "antigravityUnifiedStateSync.oauthToken"
_BASE_HEADERS: dict[str, str] = {
# Mimic the Antigravity Electron app so the API accepts the request.
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Antigravity/1.18.3 Chrome/138.0.7204.235 "
"Electron/37.3.1 Safari/537.36"
),
"X-Goog-Api-Client": "google-cloud-sdk vscode_cloudshelleditor/0.1",
"Client-Metadata": '{"ideType":"ANTIGRAVITY","platform":"MACOS","pluginType":"GEMINI"}',
}
# ---------------------------------------------------------------------------
# Credential loading helpers
# ---------------------------------------------------------------------------
def _load_from_json_file() -> tuple[str | None, str | None, str, float]:
"""Read credentials from JSON accounts file.
Reads from ~/.hive/antigravity-accounts.json.
Returns ``(access_token | None, refresh_token | None, project_id, expires_at)``.
``expires_at`` is a Unix timestamp (seconds); 0.0 means unknown.
"""
if not _ACCOUNTS_FILE.exists():
return None, None, _DEFAULT_PROJECT_ID, 0.0
try:
with open(_ACCOUNTS_FILE, encoding="utf-8") as fh:
data = json.load(fh)
except (OSError, json.JSONDecodeError) as exc:
logger.debug("Failed to read Antigravity accounts file: %s", exc)
return None, None, _DEFAULT_PROJECT_ID, 0.0
accounts = data.get("accounts", [])
if not accounts:
return None, None, _DEFAULT_PROJECT_ID, 0.0
account = next((a for a in accounts if a.get("enabled", True) is not False), accounts[0])
schema_version = data.get("schemaVersion", 1)
if schema_version >= 4:
# V4 schema: refresh = "refreshToken|projectId[|managedProjectId]"
refresh_str = account.get("refresh", "")
parts = refresh_str.split("|") if refresh_str else []
refresh_token: str | None = parts[0] if parts else None
project_id = parts[1] if len(parts) >= 2 and parts[1] else _DEFAULT_PROJECT_ID
access_token: str | None = account.get("access")
expires_ms: int = account.get("expires", 0)
expires_at = float(expires_ms) / 1000.0 if expires_ms else 0.0
# Treat near-expiry tokens as absent so _ensure_token() triggers a refresh.
if access_token and expires_at and time.time() >= expires_at - _TOKEN_REFRESH_BUFFER_SECS:
access_token = None
expires_at = 0.0
return access_token, refresh_token, project_id, expires_at
else:
# V1V3 schema: plain accessToken / refreshToken fields
access_token = account.get("accessToken")
refresh_token = account.get("refreshToken")
# Estimate expiry from last_refresh + 1 h
last_refresh_str: str | None = data.get("last_refresh")
expires_at = 0.0
if last_refresh_str:
try:
from datetime import datetime # noqa: PLC0415
ts = datetime.fromisoformat(last_refresh_str.replace("Z", "+00:00")).timestamp()
expires_at = ts + 3600.0
if time.time() >= expires_at - _TOKEN_REFRESH_BUFFER_SECS:
access_token = None
except (ValueError, TypeError):
pass
return access_token, refresh_token, _DEFAULT_PROJECT_ID, expires_at
def _load_from_ide_db() -> tuple[str | None, str | None, float]:
"""Extract ``(access_token, refresh_token, expires_at)`` from the IDE SQLite DB."""
import base64 # noqa: PLC0415
import sqlite3 # noqa: PLC0415
for db_path in (_IDE_STATE_DB_MAC, _IDE_STATE_DB_LINUX):
if not db_path.exists():
continue
try:
con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
try:
row = con.execute(
"SELECT value FROM ItemTable WHERE key = ?",
(_IDE_STATE_DB_KEY,),
).fetchone()
finally:
con.close()
if not row:
continue
blob = base64.b64decode(row[0])
candidates = re.findall(rb"[A-Za-z0-9+/=_\-]{40,}", blob)
access_token: str | None = None
refresh_token: str | None = None
for candidate in candidates:
try:
padded = candidate + b"=" * (-len(candidate) % 4)
inner = base64.urlsafe_b64decode(padded)
except Exception:
continue
if not access_token:
m = re.search(rb"ya29\.[A-Za-z0-9_\-\.]+", inner)
if m:
access_token = m.group(0).decode("ascii")
if not refresh_token:
m = re.search(rb"1//[A-Za-z0-9_\-\.]+", inner)
if m:
refresh_token = m.group(0).decode("ascii")
if access_token and refresh_token:
break
if access_token:
# Estimate expiry from DB mtime (IDE refreshes while running)
mtime = db_path.stat().st_mtime
expires_at = mtime + 3600.0
return access_token, refresh_token, expires_at
except Exception as exc:
logger.debug("Failed to read Antigravity IDE state DB: %s", exc)
continue
return None, None, 0.0
def _do_token_refresh(refresh_token: str) -> tuple[str, float] | None:
"""POST to Google OAuth endpoint and return ``(new_access_token, expires_at)``.
The client secret is sourced via ``get_antigravity_client_secret()`` (env var,
config file, or npm package fallback). When unavailable the refresh is attempted
without it Google will reject it for web-app clients, but the npm fallback in
``get_antigravity_client_secret()`` should ensure the secret is found at runtime.
Returns None when the HTTP request fails.
"""
from framework.config import get_antigravity_client_secret # noqa: PLC0415
client_secret = get_antigravity_client_secret()
if not client_secret:
logger.debug(
"Antigravity client secret not configured — attempting refresh without it. "
"Set ANTIGRAVITY_CLIENT_SECRET or run quickstart to configure."
)
import urllib.error # noqa: PLC0415
import urllib.parse # noqa: PLC0415
import urllib.request # noqa: PLC0415
from framework.config import get_antigravity_client_id # noqa: PLC0415
params: dict[str, str] = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": get_antigravity_client_id(),
}
if client_secret:
params["client_secret"] = client_secret
body = urllib.parse.urlencode(params).encode("utf-8")
req = urllib.request.Request(
_TOKEN_URL,
data=body,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=15) as resp: # noqa: S310
payload = json.loads(resp.read())
access_token: str = payload["access_token"]
expires_in: int = payload.get("expires_in", 3600)
logger.debug("Antigravity token refreshed successfully")
return access_token, time.time() + expires_in
except Exception as exc:
logger.debug("Antigravity token refresh failed: %s", exc)
return None
# ---------------------------------------------------------------------------
# Message conversion helpers
# ---------------------------------------------------------------------------
def _clean_tool_name(name: str) -> str:
"""Sanitize a tool name for the Antigravity function-calling schema."""
name = re.sub(r"[/\s]", "_", name)
if name and not (name[0].isalpha() or name[0] == "_"):
name = "_" + name
return name[:64]
def _to_gemini_contents(
messages: list[dict[str, Any]],
thought_sigs: dict[str, str] | None = None,
) -> list[dict[str, Any]]:
"""Convert OpenAI-format messages to Gemini-style ``contents`` array."""
# Pre-build a map tool_call_id → function_name from assistant messages.
# Tool result messages (role="tool") only carry tool_call_id, not the name,
# but Gemini requires functionResponse.name to match the functionCall.name.
tc_id_to_name: dict[str, str] = {}
for msg in messages:
if msg.get("role") == "assistant":
for tc in msg.get("tool_calls") or []:
tc_id = tc.get("id")
fn_name = tc.get("function", {}).get("name", "")
if tc_id and fn_name:
tc_id_to_name[tc_id] = fn_name
contents: list[dict[str, Any]] = []
# Consecutive tool-result messages must be batched into one user turn.
pending_tool_parts: list[dict[str, Any]] = []
def _flush_tool_results() -> None:
if pending_tool_parts:
contents.append({"role": "user", "parts": list(pending_tool_parts)})
pending_tool_parts.clear()
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content")
if role == "system":
continue # Handled via systemInstruction, not in contents.
if role == "tool":
# OpenAI tool result → Gemini functionResponse part.
result_str = content if isinstance(content, str) else str(content or "")
tc_id = msg.get("tool_call_id", "")
# Look up function name from the pre-built map; fall back to msg.name.
fn_name = tc_id_to_name.get(tc_id) or msg.get("name", "")
pending_tool_parts.append(
{
"functionResponse": {
"name": fn_name,
"id": tc_id,
"response": {"content": result_str},
}
}
)
continue
_flush_tool_results()
gemini_role = "model" if role == "assistant" else "user"
parts: list[dict[str, Any]] = []
if isinstance(content, str) and content:
parts.append({"text": content})
elif isinstance(content, list):
for block in content:
if not isinstance(block, dict):
continue
if block.get("type") == "text":
text = block.get("text", "")
if text:
parts.append({"text": text})
# Other block types (image_url etc.) skipped.
# Assistant messages may carry OpenAI-style tool_calls.
for tc in msg.get("tool_calls") or []:
fn = tc.get("function", {})
try:
args = json.loads(fn.get("arguments", "{}") or "{}")
except (json.JSONDecodeError, TypeError):
args = {}
tc_id = tc.get("id", str(uuid.uuid4()))
fc_part: dict[str, Any] = {
"functionCall": {
"name": fn.get("name", ""),
"args": args,
"id": tc_id,
}
}
if thought_sigs:
sig = thought_sigs.get(tc_id, "")
if sig:
fc_part["thoughtSignature"] = sig # part-level, not inside functionCall
parts.append(fc_part)
if parts:
contents.append({"role": gemini_role, "parts": parts})
_flush_tool_results()
# Gemini requires the first turn to be a user turn. Drop any leading
# model messages so the API doesn't reject with a 400.
while contents and contents[0].get("role") == "model":
contents.pop(0)
return contents
# ---------------------------------------------------------------------------
# Response parsing helpers
# ---------------------------------------------------------------------------
def _map_finish_reason(reason: str) -> str:
return {"STOP": "stop", "MAX_TOKENS": "max_tokens", "OTHER": "tool_use"}.get(
(reason or "").upper(), "stop"
)
def _parse_complete_response(raw: dict[str, Any], model: str) -> LLMResponse:
"""Parse a non-streaming Antigravity response dict → LLMResponse."""
payload: dict[str, Any] = raw.get("response", raw)
candidates: list[dict[str, Any]] = payload.get("candidates", [])
usage: dict[str, Any] = payload.get("usageMetadata", {})
text_parts: list[str] = []
if candidates:
for part in candidates[0].get("content", {}).get("parts", []):
if "text" in part and not part.get("thought"):
text_parts.append(part["text"])
return LLMResponse(
content="".join(text_parts),
model=payload.get("modelVersion", model),
input_tokens=usage.get("promptTokenCount", 0),
output_tokens=usage.get("candidatesTokenCount", 0),
stop_reason=_map_finish_reason(candidates[0].get("finishReason", "") if candidates else ""),
raw_response=raw,
)
def _parse_sse_stream(
response: Any,
model: str,
on_thought_signature: Callable[[str, str], None] | None = None,
) -> Iterator[StreamEvent]:
"""Parse Antigravity SSE response line-by-line → StreamEvents.
Each SSE line looks like::
data: {"response": {"candidates": [...], "usageMetadata": {...}}, "traceId": "..."}
"""
accumulated = ""
input_tokens = 0
output_tokens = 0
finish_reason = ""
for raw_line in response:
line: str = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
if not line.startswith("data:"):
continue
data_str = line[5:].strip()
if not data_str or data_str == "[DONE]":
continue
try:
data: dict[str, Any] = json.loads(data_str)
except json.JSONDecodeError:
continue
# The outer envelope is {"response": {...}, "traceId": "..."}.
payload: dict[str, Any] = data.get("response", data)
usage = payload.get("usageMetadata", {})
if usage:
input_tokens = usage.get("promptTokenCount", input_tokens)
output_tokens = usage.get("candidatesTokenCount", output_tokens)
for candidate in payload.get("candidates", []):
fr = candidate.get("finishReason", "")
if fr:
finish_reason = fr
for part in candidate.get("content", {}).get("parts", []):
if "text" in part and not part.get("thought"):
delta: str = part["text"]
accumulated += delta
yield TextDeltaEvent(content=delta, snapshot=accumulated)
elif "functionCall" in part:
fc: dict[str, Any] = part["functionCall"]
tool_use_id = fc.get("id") or str(uuid.uuid4())
thought_sig = part.get("thoughtSignature", "") # sibling of functionCall
if thought_sig and on_thought_signature:
on_thought_signature(tool_use_id, thought_sig)
args = fc.get("args", {})
if isinstance(args, str):
try:
args = json.loads(args)
except json.JSONDecodeError:
args = {}
yield ToolCallEvent(
tool_use_id=tool_use_id,
tool_name=fc.get("name", ""),
tool_input=args,
)
if accumulated:
yield TextEndEvent(full_text=accumulated)
yield FinishEvent(
stop_reason=_map_finish_reason(finish_reason),
input_tokens=input_tokens,
output_tokens=output_tokens,
model=model,
)
# ---------------------------------------------------------------------------
# Provider
# ---------------------------------------------------------------------------
class AntigravityProvider(LLMProvider):
"""LLM provider for Google's internal Antigravity Code Assist gateway.
No local proxy required. Handles OAuth token refresh, Gemini-format
request/response conversion, and SSE streaming directly.
"""
def __init__(self, model: str = "gemini-3-flash") -> None:
# Strip any provider prefix ("openai/gemini-3-flash" → "gemini-3-flash").
if "/" in model:
model = model.split("/", 1)[1]
self.model = model
self._access_token: str | None = None
self._refresh_token: str | None = None
self._project_id: str = _DEFAULT_PROJECT_ID
self._token_expires_at: float = 0.0
self._thought_sigs: dict[str, str] = {} # tool_use_id → thoughtSignature
self._init_credentials()
# --- Credential management -------------------------------------------- #
def _init_credentials(self) -> None:
"""Load credentials from the best available source."""
access, refresh, project_id, expires_at = _load_from_json_file()
if refresh:
self._refresh_token = refresh
self._project_id = project_id
self._access_token = access
self._token_expires_at = expires_at
return
# Fall back to IDE state DB.
access, refresh, expires_at = _load_from_ide_db()
if access:
self._access_token = access
self._refresh_token = refresh
self._token_expires_at = expires_at
def has_credentials(self) -> bool:
"""Return True if any credential is available."""
return bool(self._access_token or self._refresh_token)
def _ensure_token(self) -> str:
"""Return a valid access token, refreshing via OAuth if needed."""
if (
self._access_token
and self._token_expires_at
and time.time() < self._token_expires_at - _TOKEN_REFRESH_BUFFER_SECS
):
return self._access_token
if self._refresh_token:
result = _do_token_refresh(self._refresh_token)
if result:
self._access_token, self._token_expires_at = result
return self._access_token
if self._access_token:
logger.warning("Using potentially stale Antigravity access token")
return self._access_token
raise RuntimeError(
"No valid Antigravity credentials. "
"Run: uv run python core/antigravity_auth.py auth account add"
)
# --- Request building -------------------------------------------------- #
def _build_body(
self,
messages: list[dict[str, Any]],
system: str,
tools: list[Tool] | None,
max_tokens: int,
) -> dict[str, Any]:
contents = _to_gemini_contents(messages, self._thought_sigs)
inner: dict[str, Any] = {
"contents": contents,
"generationConfig": {"maxOutputTokens": max_tokens},
}
if system:
inner["systemInstruction"] = {"parts": [{"text": system}]}
if tools:
inner["tools"] = [
{
"functionDeclarations": [
{
"name": _clean_tool_name(t.name),
"description": t.description,
"parameters": t.parameters
or {
"type": "object",
"properties": {},
},
}
for t in tools
]
}
]
return {
"project": self._project_id,
"model": self.model,
"request": inner,
"requestType": "agent",
"userAgent": "antigravity",
"requestId": f"agent-{uuid.uuid4()}",
}
# --- HTTP transport ---------------------------------------------------- #
def _post(self, body: dict[str, Any], *, streaming: bool) -> Any:
"""POST to the Antigravity endpoint, falling back through the endpoint list."""
import urllib.error # noqa: PLC0415
import urllib.request # noqa: PLC0415
token = self._ensure_token()
body_bytes = json.dumps(body).encode("utf-8")
path = (
"/v1internal:streamGenerateContent?alt=sse"
if streaming
else "/v1internal:generateContent"
)
headers = {
**_BASE_HEADERS,
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
if streaming:
headers["Accept"] = "text/event-stream"
last_exc: Exception | None = None
for base_url in _ENDPOINTS:
url = f"{base_url}{path}"
req = urllib.request.Request(url, data=body_bytes, headers=headers, method="POST")
try:
return urllib.request.urlopen(req, timeout=120) # noqa: S310
except urllib.error.HTTPError as exc:
if exc.code in (401, 403) and self._refresh_token:
# Token rejected — refresh once and retry this endpoint.
result = _do_token_refresh(self._refresh_token)
if result:
self._access_token, self._token_expires_at = result
headers["Authorization"] = f"Bearer {self._access_token}"
req2 = urllib.request.Request(
url, data=body_bytes, headers=headers, method="POST"
)
try:
return urllib.request.urlopen(req2, timeout=120) # noqa: S310
except urllib.error.HTTPError as exc2:
last_exc = exc2
continue
last_exc = exc
continue
elif exc.code >= 500:
last_exc = exc
continue
# Include the API response body in the exception for easier debugging.
try:
err_body = exc.read().decode("utf-8", errors="replace")
except Exception:
err_body = "(unreadable)"
raise RuntimeError(f"Antigravity HTTP {exc.code} from {url}: {err_body}") from exc
except (urllib.error.URLError, OSError) as exc:
last_exc = exc
continue
raise RuntimeError(
f"All Antigravity endpoints failed. Last error: {last_exc}"
) from last_exc
# --- LLMProvider interface --------------------------------------------- #
def complete(
self,
messages: list[dict[str, Any]],
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 1024,
response_format: dict[str, Any] | None = None,
json_mode: bool = False,
max_retries: int | None = None,
) -> LLMResponse:
if json_mode:
suffix = "\n\nPlease respond with a valid JSON object."
system = (system + suffix) if system else suffix.strip()
body = self._build_body(messages, system, tools, max_tokens)
resp = self._post(body, streaming=False)
return _parse_complete_response(json.loads(resp.read()), self.model)
async def stream(
self,
messages: list[dict[str, Any]],
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
) -> AsyncIterator[StreamEvent]:
import asyncio # noqa: PLC0415
import concurrent.futures # noqa: PLC0415
loop = asyncio.get_running_loop()
queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
def _blocking_work() -> None:
try:
body = self._build_body(messages, system, tools, max_tokens)
http_resp = self._post(body, streaming=True)
for event in _parse_sse_stream(
http_resp, self.model, self._thought_sigs.__setitem__
):
loop.call_soon_threadsafe(queue.put_nowait, event)
except Exception as exc:
logger.error("Antigravity stream error: %s", exc)
loop.call_soon_threadsafe(queue.put_nowait, StreamErrorEvent(error=str(exc)))
finally:
loop.call_soon_threadsafe(queue.put_nowait, None) # sentinel
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
fut = loop.run_in_executor(executor, _blocking_work)
try:
while True:
event = await queue.get()
if event is None:
break
yield event
finally:
await fut
executor.shutdown(wait=False)
File diff suppressed because it is too large Load Diff
+157 -4
View File
@@ -72,6 +72,93 @@
}
]
},
"google": {
"default_model": "gemini-3.1-pro-preview",
"models": [
{
"id": "gemini-3.1-pro-preview",
"label": "Gemini 3.1 Pro - Best quality",
"recommended": true,
"max_tokens": 32768,
"max_context_tokens": 1048576
},
{
"id": "gemini-3.1-pro-preview-customtools",
"label": "Gemini 3.1 Pro (Custom Tools) - API key only",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 1048576
},
{
"id": "gemini-3-flash-preview",
"label": "Gemini 3 Flash - Fast",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 1048576
},
{
"id": "gemini-3.1-flash-lite-preview",
"label": "Gemini 3.1 Flash Lite - Lightest",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 1048576
},
{
"id": "gemini-2.5-pro",
"label": "Gemini 2.5 Pro - Proven quality",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 1048576
},
{
"id": "gemini-2.5-flash",
"label": "Gemini 2.5 Flash - Fast + proven",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 1048576
}
]
},
"google-gemini-cli": {
"default_model": "gemini-3.1-pro-preview",
"models": [
{
"id": "gemini-3.1-pro-preview",
"label": "Gemini 3.1 Pro - Best quality",
"recommended": true,
"max_tokens": 32768,
"max_context_tokens": 1048576
},
{
"id": "gemini-3-flash-preview",
"label": "Gemini 3 Flash - Fast",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 1048576
},
{
"id": "gemini-3.1-flash-lite-preview",
"label": "Gemini 3.1 Flash Lite - Lightest",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 1048576
},
{
"id": "gemini-2.5-pro",
"label": "Gemini 2.5 Pro - Proven quality",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 1048576
},
{
"id": "gemini-2.5-flash",
"label": "Gemini 2.5 Flash - Fast + proven",
"recommended": false,
"max_tokens": 32768,
"max_context_tokens": 1048576
}
]
},
"groq": {
"default_model": "openai/gpt-oss-120b",
"models": [
@@ -384,11 +471,77 @@
}
]
},
"antigravity": {
"provider": "openai",
"model": "gemini-3-flash",
"google_api": {
"provider": "google",
"api_key_env_var": "GEMINI_API_KEY",
"model": "gemini-3-flash-preview",
"max_tokens": 32768,
"max_context_tokens": 1000000
"max_context_tokens": 1048576,
"model_choices": [
{
"id": "gemini-3.1-pro-preview",
"label": "Gemini 3.1 Pro - Best quality",
"recommended": true
},
{
"id": "gemini-3.1-pro-preview-customtools",
"label": "Gemini 3.1 Pro (Custom Tools) - API key only",
"recommended": false
},
{
"id": "gemini-3-flash-preview",
"label": "Gemini 3 Flash - Fast",
"recommended": false
},
{
"id": "gemini-3.1-flash-lite-preview",
"label": "Gemini 3.1 Flash Lite - Lightest",
"recommended": false
},
{
"id": "gemini-2.5-pro",
"label": "Gemini 2.5 Pro - Proven quality",
"recommended": false
},
{
"id": "gemini-2.5-flash",
"label": "Gemini 2.5 Flash - Fast + proven",
"recommended": false
}
]
},
"google_gemini_cli": {
"provider": "google-gemini-cli",
"model": "gemini-3-flash-preview",
"max_tokens": 32768,
"max_context_tokens": 1048576,
"model_choices": [
{
"id": "gemini-3.1-pro-preview",
"label": "Gemini 3.1 Pro - Best quality",
"recommended": true
},
{
"id": "gemini-3-flash-preview",
"label": "Gemini 3 Flash - Fast",
"recommended": false
},
{
"id": "gemini-3.1-flash-lite-preview",
"label": "Gemini 3.1 Flash Lite - Lightest",
"recommended": false
},
{
"id": "gemini-2.5-pro",
"label": "Gemini 2.5 Pro - Proven quality",
"recommended": false
},
{
"id": "gemini-2.5-flash",
"label": "Gemini 2.5 Flash - Fast + proven",
"recommended": false
}
]
},
"ollama_local": {
"provider": "ollama",
-301
View File
@@ -547,307 +547,6 @@ def get_kimi_code_token() -> str | None:
return None
# ---------------------------------------------------------------------------
# Antigravity subscription token helpers
# ---------------------------------------------------------------------------
# Antigravity IDE (native macOS/Linux app) stores OAuth tokens in its
# VSCode-style SQLite state database under the key
# "antigravityUnifiedStateSync.oauthToken" as a base64-encoded protobuf blob.
ANTIGRAVITY_IDE_STATE_DB = (
Path.home()
/ "Library"
/ "Application Support"
/ "Antigravity"
/ "User"
/ "globalStorage"
/ "state.vscdb"
)
# Linux fallback for the IDE state DB
ANTIGRAVITY_IDE_STATE_DB_LINUX = (
Path.home() / ".config" / "Antigravity" / "User" / "globalStorage" / "state.vscdb"
)
# Antigravity credentials stored by native OAuth implementation
ANTIGRAVITY_AUTH_FILE = Path.home() / ".hive" / "antigravity-accounts.json"
ANTIGRAVITY_OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token"
_ANTIGRAVITY_TOKEN_LIFETIME_SECS = 3600 # Google access tokens expire in 1 hour
_ANTIGRAVITY_IDE_STATE_DB_KEY = "antigravityUnifiedStateSync.oauthToken"
def _read_antigravity_ide_credentials() -> dict | None:
"""Read credentials from the Antigravity IDE's SQLite state database.
The Antigravity desktop IDE (VSCode-based) stores its OAuth token as a
base64-encoded protobuf blob in a SQLite database. The access token is
a standard Google OAuth ``ya29.*`` bearer token.
Returns:
Dict with ``accessToken`` and optionally ``refreshToken`` keys,
plus ``_source: "ide"`` to skip file-based save on refresh.
Returns None if the database is absent or the key is not found.
"""
import re
import sqlite3
for db_path in (ANTIGRAVITY_IDE_STATE_DB, ANTIGRAVITY_IDE_STATE_DB_LINUX):
if not db_path.exists():
continue
try:
con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
try:
row = con.execute(
"SELECT value FROM ItemTable WHERE key = ?",
(_ANTIGRAVITY_IDE_STATE_DB_KEY,),
).fetchone()
finally:
con.close()
if not row:
continue
import base64
blob = base64.b64decode(row[0])
# The protobuf blob contains the access token (ya29.*) and
# refresh token (1//*) as length-prefixed UTF-8 strings.
# Decode the inner base64 layer and extract with regex.
inner_b64_candidates = re.findall(rb"[A-Za-z0-9+/=_\-]{40,}", blob)
access_token: str | None = None
refresh_token: str | None = None
for candidate in inner_b64_candidates:
try:
padded = candidate + b"=" * (-len(candidate) % 4)
inner = base64.urlsafe_b64decode(padded)
except Exception:
continue
if not access_token:
m = re.search(rb"ya29\.[A-Za-z0-9_\-\.]+", inner)
if m:
access_token = m.group(0).decode("ascii")
if not refresh_token:
m = re.search(rb"1//[A-Za-z0-9_\-\.]+", inner)
if m:
refresh_token = m.group(0).decode("ascii")
if access_token and refresh_token:
break
if access_token:
return {
"accounts": [
{
"accessToken": access_token,
"refreshToken": refresh_token or "",
}
],
"_source": "ide",
"_db_path": str(db_path),
}
except Exception as exc:
logger.debug("Failed to read Antigravity IDE state DB: %s", exc)
continue
return None
def _read_antigravity_credentials() -> dict | None:
"""Read Antigravity auth data from all supported credential sources.
Checks in order:
1. Antigravity IDE SQLite state database (native macOS/Linux app)
2. Native OAuth credentials file (~/.hive/antigravity-accounts.json)
Returns:
Auth data dict with an ``accounts`` list on success, None otherwise.
"""
# 1. Native Antigravity IDE (primary on macOS)
ide_creds = _read_antigravity_ide_credentials()
if ide_creds:
return ide_creds
# 2. Native OAuth credentials file
if ANTIGRAVITY_AUTH_FILE.exists():
try:
with open(ANTIGRAVITY_AUTH_FILE, encoding="utf-8") as f:
data = json.load(f)
accounts = data.get("accounts", [])
if accounts and isinstance(accounts[0], dict):
return data
except (json.JSONDecodeError, OSError):
pass
return None
def _is_antigravity_token_expired(auth_data: dict) -> bool:
"""Check whether the Antigravity access token is expired or near expiry.
For IDE-sourced credentials: uses the state DB's mtime as last_refresh
since the IDE keeps the DB fresh while it's running.
For JSON-sourced credentials: uses the ``last_refresh`` field or file mtime.
"""
import time
from datetime import datetime
now = time.time()
if auth_data.get("_source") == "ide":
# The IDE refreshes tokens automatically while running.
# Use the DB file's mtime as a proxy for when the token was last updated.
try:
db_path = Path(auth_data.get("_db_path", str(ANTIGRAVITY_IDE_STATE_DB)))
last_refresh: float = db_path.stat().st_mtime
except OSError:
return True
expires_at = last_refresh + _ANTIGRAVITY_TOKEN_LIFETIME_SECS
return now >= (expires_at - _TOKEN_REFRESH_BUFFER_SECS)
last_refresh_val: float | str | None = auth_data.get("last_refresh")
if last_refresh_val is None:
try:
last_refresh_val = ANTIGRAVITY_AUTH_FILE.stat().st_mtime
except OSError:
return True
elif isinstance(last_refresh_val, str):
try:
last_refresh_val = datetime.fromisoformat(
last_refresh_val.replace("Z", "+00:00")
).timestamp()
except (ValueError, TypeError):
return True
expires_at = float(last_refresh_val) + _ANTIGRAVITY_TOKEN_LIFETIME_SECS
return now >= (expires_at - _TOKEN_REFRESH_BUFFER_SECS)
def _refresh_antigravity_token(refresh_token: str) -> dict | None:
"""Refresh the Antigravity access token via Google OAuth.
POSTs form-encoded ``grant_type=refresh_token`` to the Google token
endpoint using Antigravity's public OAuth client ID.
Returns:
Parsed response dict (containing ``access_token``) on success,
None on any error.
"""
import urllib.error
import urllib.parse
import urllib.request
from framework.config import get_antigravity_client_id, get_antigravity_client_secret
client_id = get_antigravity_client_id()
client_secret = get_antigravity_client_secret()
params: dict = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": client_id,
}
if client_secret:
params["client_secret"] = client_secret
data = urllib.parse.urlencode(params).encode("utf-8")
req = urllib.request.Request(
ANTIGRAVITY_OAUTH_TOKEN_URL,
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=15) as resp: # noqa: S310
return json.loads(resp.read())
except (urllib.error.URLError, json.JSONDecodeError, TimeoutError, OSError) as exc:
logger.debug("Antigravity token refresh failed: %s", exc)
return None
def _save_refreshed_antigravity_credentials(auth_data: dict, token_data: dict) -> None:
"""Write refreshed tokens back to the Antigravity JSON credentials file.
Skipped for IDE-sourced credentials (the IDE manages its own DB).
Updates ``accounts[0].accessToken`` (and ``refreshToken`` if present),
then persists ``last_refresh`` as an ISO-8601 UTC string.
"""
from datetime import datetime
# IDE manages its own state — we do not write back to its SQLite DB
if auth_data.get("_source") == "ide":
return
try:
accounts = auth_data.get("accounts", [])
if not accounts:
return
account = accounts[0]
account["accessToken"] = token_data["access_token"]
if "refresh_token" in token_data:
account["refreshToken"] = token_data["refresh_token"]
auth_data["accounts"] = accounts
auth_data["last_refresh"] = datetime.now(UTC).isoformat()
ANTIGRAVITY_AUTH_FILE.parent.mkdir(parents=True, exist_ok=True)
fd = os.open(ANTIGRAVITY_AUTH_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(auth_data, f, indent=2)
logger.debug("Antigravity credentials refreshed and saved")
except (OSError, KeyError) as exc:
logger.debug("Failed to save refreshed Antigravity credentials: %s", exc)
def get_antigravity_token() -> str | None:
"""Get the OAuth access token from an Antigravity subscription.
Credential sources checked in order:
1. Antigravity IDE SQLite state DB (native app, macOS/Linux)
2. antigravity-auth CLI JSON file
For IDE credentials the token is read directly (the IDE refreshes it
automatically while running). For JSON credentials an automatic OAuth
refresh is attempted when the token is near expiry.
Returns:
The ``ya29.*`` Google OAuth access token, or None if unavailable.
"""
auth_data = _read_antigravity_credentials()
if not auth_data:
return None
accounts = auth_data.get("accounts", [])
if not accounts:
return None
account = accounts[0]
access_token = account.get("accessToken")
if not access_token:
return None
if not _is_antigravity_token_expired(auth_data):
return access_token
# Token is expired or near expiry — attempt a refresh
refresh_token = account.get("refreshToken")
if not refresh_token:
logger.warning(
"Antigravity token expired and no refresh token available. "
"Re-open the Antigravity IDE to refresh, or run 'antigravity-auth accounts add'."
)
return access_token # return stale token; proxy may still accept it briefly
logger.info("Antigravity token expired or near expiry, refreshing...")
token_data = _refresh_antigravity_token(refresh_token)
if token_data and "access_token" in token_data:
_save_refreshed_antigravity_credentials(auth_data, token_data)
return token_data["access_token"]
logger.warning(
"Antigravity token refresh failed. "
"Re-open the Antigravity IDE or run 'antigravity-auth accounts add'."
)
return access_token
@dataclass
class AgentInfo:
+17 -5
View File
@@ -55,19 +55,31 @@ class LlmProviderStage(PipelineStage):
llm_config = config.get("llm", {})
api_base = llm_config.get("api_base")
# Check for Antigravity (special provider)
if llm_config.get("use_antigravity_subscription"):
# Check for Google Gemini CLI (OAuth subscription)
if llm_config.get("use_google_gemini_cli_subscription"):
try:
from framework.llm.antigravity import AntigravityProvider
from framework.llm.google import GoogleGeminiCliProvider
provider = AntigravityProvider(model=model)
provider = GoogleGeminiCliProvider(model=model)
if provider.has_credentials():
self.llm = provider
logger.info("[pipeline] LlmProviderStage: Antigravity")
logger.info("[pipeline] LlmProviderStage: Google Gemini CLI (OAuth)")
return
except Exception:
pass
# Check for Google direct API key provider
if str(llm_config.get("provider", "")).lower() == "google":
try:
from framework.llm.google import GoogleApiKeyProvider
api_key = get_api_key()
self.llm = GoogleApiKeyProvider(model=model, api_key=api_key)
logger.info("[pipeline] LlmProviderStage: Google (API key)")
return
except Exception:
pass
from framework.llm.litellm import LiteLLMProvider
api_key = get_api_key()
+24 -11
View File
@@ -90,10 +90,16 @@ _SUBSCRIPTION_DEFINITIONS: list[dict[str, str]] = [
"flag": "use_hive_llm_subscription",
},
{
"id": "antigravity",
"name": "Antigravity Subscription",
"description": "Use your Google/Gemini plan",
"flag": "use_antigravity_subscription",
"id": "google_api",
"name": "Google Gemini (API Key)",
"description": "Use your Gemini API key",
"flag": "use_google_api_subscription",
},
{
"id": "google_gemini_cli",
"name": "Google Gemini CLI",
"description": "Use your Google account (OAuth login)",
"flag": "use_google_gemini_cli_subscription",
},
]
@@ -231,12 +237,13 @@ def _detect_subscriptions() -> list[str]:
if os.environ.get("HIVE_API_KEY"):
detected.append("hive_llm")
# Antigravity subscription
# Google Gemini CLI subscription (OAuth)
try:
from framework.loader.agent_loader import get_antigravity_token
from framework.llm.google import GoogleGeminiCliProvider
if get_antigravity_token():
detected.append("antigravity")
provider = GoogleGeminiCliProvider()
if provider.has_credentials():
detected.append("google_gemini_cli")
except Exception:
pass
@@ -274,10 +281,16 @@ def _get_subscription_token(sub_id: str) -> str | None:
return token
elif sub_id == "hive_llm":
return os.environ.get("HIVE_API_KEY")
elif sub_id == "antigravity":
from framework.loader.agent_loader import get_antigravity_token
elif sub_id == "google_gemini_cli":
from framework.llm.google import GoogleGeminiCliProvider
return get_antigravity_token()
try:
provider = GoogleGeminiCliProvider()
if provider.has_credentials():
return provider._ensure_token()
except Exception:
pass
return None
return None
+14 -3
View File
@@ -147,10 +147,21 @@ class SessionManager:
rc = RuntimeConfig(model=model or self._model or RuntimeConfig().model)
llm_config = get_hive_config().get("llm", {})
if llm_config.get("use_antigravity_subscription"):
from framework.llm.antigravity import AntigravityProvider
return AntigravityProvider(model=rc.model)
# Google Gemini CLI (OAuth) subscription
if llm_config.get("use_google_gemini_cli_subscription"):
from framework.llm.google import GoogleGeminiCliProvider
return GoogleGeminiCliProvider(model=rc.model)
# Google direct API key provider
if str(llm_config.get("provider", "")).lower() == "google":
try:
from framework.llm.google import GoogleApiKeyProvider
return GoogleApiKeyProvider(model=rc.model, api_key=rc.api_key)
except RuntimeError:
pass
from framework.llm.litellm import LiteLLMProvider
+88 -101
View File
@@ -1,13 +1,13 @@
#!/usr/bin/env python3
"""Antigravity authentication CLI.
"""Google Gemini CLI authentication CLI.
Implements OAuth2 flow for Google's Antigravity Code Assist gateway.
Credentials are stored in ~/.hive/antigravity-accounts.json.
Implements OAuth2 flow for Google's Gemini API.
Credentials are stored in ~/.hive/google-gemini-cli-accounts.json.
Usage:
python -m antigravity_auth auth account add
python -m antigravity_auth auth account list
python -m antigravity_auth auth account remove <email>
python -m google_auth auth account add
python -m google_auth auth account list
python -m google_auth auth account remove <email>
"""
from __future__ import annotations
@@ -34,7 +34,7 @@ logger = logging.getLogger(__name__)
_OAUTH_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
_OAUTH_TOKEN_URL = "https://oauth2.googleapis.com/token"
# Scopes for Antigravity/Cloud Code Assist
# Scopes for the public Gemini API (not Cloud Code Assist)
_OAUTH_SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/userinfo.email",
@@ -42,97 +42,90 @@ _OAUTH_SCOPES = [
]
# Credentials file path in ~/.hive/
_ACCOUNTS_FILE = Path.home() / ".hive" / "antigravity-accounts.json"
_ACCOUNTS_FILE = Path.home() / ".hive" / "google-gemini-cli-accounts.json"
# Default project ID
_DEFAULT_PROJECT_ID = "rising-fact-p41fc"
_DEFAULT_REDIRECT_PORT = 51121
_DEFAULT_REDIRECT_PORT = 0 # 0 = use dynamic port (like official gemini-cli)
# OAuth credentials fetched from the opencode-antigravity-auth project.
# This project reverse-engineered and published the public OAuth credentials
# for Google's Antigravity/Cloud Code Assist API.
# Source: https://github.com/NoeFabris/opencode-antigravity-auth
# OAuth credentials fetched from the official gemini-cli source.
_CREDENTIALS_URL = (
"https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts"
"https://raw.githubusercontent.com/google-gemini/gemini-cli/main/packages/core/src/code_assist/oauth2.ts"
)
# Cached credentials fetched from public source
# Cached credentials
_cached_client_id: str | None = None
_cached_client_secret: str | None = None
def _fetch_credentials_from_public_source() -> tuple[str | None, str | None]:
"""Fetch OAuth client ID and secret from the public npm package source on GitHub."""
def _fetch_credentials() -> tuple[str | None, str | None]:
"""Fetch OAuth client ID and secret from the official gemini-cli repo."""
global _cached_client_id, _cached_client_secret
if _cached_client_id and _cached_client_secret:
return _cached_client_id, _cached_client_secret
try:
req = urllib.request.Request(
_CREDENTIALS_URL, headers={"User-Agent": "Hive-Antigravity-Auth/1.0"}
_CREDENTIALS_URL, headers={"User-Agent": "Hive-Google-Auth/1.0"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
content = resp.read().decode("utf-8")
import re
import re
id_match = re.search(r'ANTIGRAVITY_CLIENT_ID\s*=\s*"([^"]+)"', content)
secret_match = re.search(r'ANTIGRAVITY_CLIENT_SECRET\s*=\s*"([^"]+)"', content)
if id_match:
_cached_client_id = id_match.group(1)
if secret_match:
_cached_client_secret = secret_match.group(1)
return _cached_client_id, _cached_client_secret
# Match official gemini-cli variable names.
# OAUTH_CLIENT_ID may span two lines in the TS source.
id_match = re.search(r"OAUTH_CLIENT_ID\s*=\s*\n?\s*'([a-z0-9\-\.]+\.apps\.googleusercontent\.com)'", content)
secret_match = re.search(r"OAUTH_CLIENT_SECRET\s*=\s*'([^']+)'", content)
if id_match:
_cached_client_id = id_match.group(1)
if secret_match:
_cached_client_secret = secret_match.group(1)
return _cached_client_id, _cached_client_secret
except Exception as e:
logger.debug(f"Failed to fetch credentials from public source: {e}")
logger.debug("Failed to fetch credentials from gemini-cli source: %s", e)
return None, None
def get_client_id() -> str:
"""Get OAuth client ID from env, config, or public source."""
env_id = os.environ.get("ANTIGRAVITY_CLIENT_ID")
"""Get OAuth client ID from env, config, or official source."""
env_id = os.environ.get("GOOGLE_GEMINI_CLI_CLIENT_ID")
if env_id:
return env_id
# Try hive config
hive_cfg = Path.home() / ".hive" / "configuration.json"
if hive_cfg.exists():
try:
with open(hive_cfg) as f:
cfg = json.load(f)
cfg_id = cfg.get("llm", {}).get("antigravity_client_id")
cfg_id = cfg.get("llm", {}).get("google_gemini_cli_client_id")
if cfg_id:
return cfg_id
except Exception:
pass
# Fetch from public source
client_id, _ = _fetch_credentials_from_public_source()
client_id, _ = _fetch_credentials()
if client_id:
return client_id
raise RuntimeError("Could not obtain Antigravity OAuth client ID")
raise RuntimeError("Could not obtain Google OAuth client ID")
def get_client_secret() -> str | None:
"""Get OAuth client secret from env, config, or public source."""
secret = os.environ.get("ANTIGRAVITY_CLIENT_SECRET")
"""Get OAuth client secret from env, config, or official source."""
secret = os.environ.get("GOOGLE_GEMINI_CLI_CLIENT_SECRET")
if secret:
return secret
# Try to read from hive config
hive_cfg = Path.home() / ".hive" / "configuration.json"
if hive_cfg.exists():
try:
with open(hive_cfg) as f:
cfg = json.load(f)
secret = cfg.get("llm", {}).get("antigravity_client_secret")
secret = cfg.get("llm", {}).get("google_gemini_cli_client_secret")
if secret:
return secret
except Exception:
pass
# Fetch from public source (npm package on GitHub)
_, secret = _fetch_credentials_from_public_source()
_, secret = _fetch_credentials()
return secret
@@ -152,12 +145,12 @@ class OAuthCallbackHandler(BaseHTTPRequestHandler):
error: str | None = None
def log_message(self, format: str, *args: Any) -> None:
pass # Suppress default logging
pass
def do_GET(self) -> None:
parsed = urllib.parse.urlparse(self.path)
if parsed.path == "/oauth-callback":
if parsed.path == "/oauth2callback":
query = urllib.parse.parse_qs(parsed.query)
if "error" in query:
@@ -182,7 +175,7 @@ class OAuthCallbackHandler(BaseHTTPRequestHandler):
self.end_headers()
html = f"""<!DOCTYPE html>
<html>
<head><title>Antigravity Auth</title></head>
<head><title>Google Gemini Auth</title></head>
<body style="font-family: system-ui; display: flex; align-items: center;
justify-content: center; height: 100vh; margin: 0; background: #1a1a2e;
color: #eee;">
@@ -196,7 +189,12 @@ class OAuthCallbackHandler(BaseHTTPRequestHandler):
def wait_for_callback(port: int, timeout: int = 300) -> tuple[str | None, str | None, str | None]:
"""Start local server and wait for OAuth callback."""
server = HTTPServer(("localhost", port), OAuthCallbackHandler)
# Reset class-level state from any previous auth attempt.
OAuthCallbackHandler.auth_code = None
OAuthCallbackHandler.state = None
OAuthCallbackHandler.error = None
server = HTTPServer(("127.0.0.1", port), OAuthCallbackHandler)
server.timeout = 1
start = time.time()
@@ -238,7 +236,7 @@ def exchange_code_for_tokens(
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read())
except Exception as e:
logger.error(f"Token exchange failed: {e}")
logger.error("Token exchange failed: %s", e)
return None
@@ -272,40 +270,35 @@ def save_accounts(data: dict[str, Any]) -> None:
_ACCOUNTS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(_ACCOUNTS_FILE, "w") as f:
json.dump(data, f, indent=2)
logger.info(f"Saved credentials to {_ACCOUNTS_FILE}")
logger.info("Saved credentials to %s", _ACCOUNTS_FILE)
def validate_credentials(access_token: str, project_id: str = _DEFAULT_PROJECT_ID) -> bool:
"""Test if credentials work by making a simple API call to Antigravity.
def validate_credentials(access_token: str) -> bool:
"""Test if credentials work by calling :loadCodeAssist on Cloud Code Assist.
Returns True if credentials are valid, False otherwise.
Uses :loadCodeAssist rather than :generateContent because the latter
requires a fully onboarded ``project`` + ``user_prompt_id`` in the body
(see GoogleGeminiCliProvider) and costs model quota. :loadCodeAssist
only inspects the account and returns 200 for any valid OAuth token
which is exactly the signal we want here.
"""
endpoint = "https://daily-cloudcode-pa.sandbox.googleapis.com"
body = {
"project": project_id,
"model": "gemini-3-flash",
"request": {
"contents": [{"role": "user", "parts": [{"text": "hi"}]}],
"generationConfig": {"maxOutputTokens": 10},
body = json.dumps({
"metadata": {
"ideType": "IDE_UNSPECIFIED",
"platform": "PLATFORM_UNSPECIFIED",
"pluginType": "GEMINI",
},
"requestType": "agent",
"userAgent": "antigravity",
"requestId": "validation-test",
}
}).encode("utf-8")
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Antigravity/1.18.3"
),
"X-Goog-Api-Client": "google-cloud-sdk vscode_cloudshelleditor/0.1",
}
try:
req = urllib.request.Request(
f"{endpoint}/v1internal:generateContent",
data=json.dumps(body).encode("utf-8"),
"https://cloudcode-pa.googleapis.com/v1internal:loadCodeAssist",
data=body,
headers=headers,
method="POST",
)
@@ -339,12 +332,12 @@ def refresh_access_token(
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read())
except Exception as e:
logger.debug(f"Token refresh failed: {e}")
logger.debug("Token refresh failed: %s", e)
return None
def cmd_account_add(args: argparse.Namespace) -> int:
"""Add a new Antigravity account via OAuth2.
"""Add a new Google account via OAuth2.
First checks if valid credentials already exist. If so, validates them
and skips OAuth if they work. Otherwise, proceeds with OAuth flow.
@@ -361,25 +354,21 @@ def cmd_account_add(args: argparse.Namespace) -> int:
access_token = account.get("access")
refresh_token_str = account.get("refresh", "")
refresh_token = refresh_token_str.split("|")[0] if refresh_token_str else None
project_id = (
refresh_token_str.split("|")[1] if "|" in refresh_token_str else _DEFAULT_PROJECT_ID
)
email = account.get("email", "unknown")
expires_ms = account.get("expires", 0)
expires_at = expires_ms / 1000.0 if expires_ms else 0.0
# Check if token is expired or near expiry
if access_token and expires_at and time.time() < expires_at - 60:
# Token still valid, test it
logger.info(f"Found existing credentials for: {email}")
logger.info("Found existing credentials for: %s", email)
logger.info("Validating existing credentials...")
if validate_credentials(access_token, project_id):
logger.info("Credentials valid! Skipping OAuth.")
if validate_credentials(access_token):
logger.info("Credentials valid! Skipping OAuth.")
return 0
else:
logger.info("Credentials failed validation, refreshing...")
elif refresh_token:
logger.info(f"Found expired credentials for: {email}")
logger.info("Found expired credentials for: %s", email)
logger.info("Attempting token refresh...")
tokens = refresh_access_token(refresh_token, client_id, client_secret)
@@ -387,7 +376,6 @@ def cmd_account_add(args: argparse.Namespace) -> int:
new_access = tokens.get("access_token")
expires_in = tokens.get("expires_in", 3600)
if new_access:
# Update the account
account["access"] = new_access
account["expires"] = int((time.time() + expires_in) * 1000)
accounts_data["last_refresh"] = time.strftime(
@@ -395,10 +383,9 @@ def cmd_account_add(args: argparse.Namespace) -> int:
)
save_accounts(accounts_data)
# Validate the refreshed token
logger.info("Validating refreshed credentials...")
if validate_credentials(new_access, project_id):
logger.info("Credentials refreshed and validated!")
if validate_credentials(new_access):
logger.info("Credentials refreshed and validated!")
return 0
else:
logger.info("Refreshed token failed validation, proceeding with OAuth...")
@@ -409,13 +396,13 @@ def cmd_account_add(args: argparse.Namespace) -> int:
if not client_secret:
logger.warning(
"No client secret configured. Token refresh may fail.\n"
"Set ANTIGRAVITY_CLIENT_SECRET env var or add "
"'antigravity_client_secret' to ~/.hive/configuration.json"
"Set GOOGLE_GEMINI_CLI_CLIENT_SECRET env var or add "
"'google_gemini_cli_client_secret' to ~/.hive/configuration.json"
)
# Use fixed port and path matching Google's expected OAuth redirect URI
port = _DEFAULT_REDIRECT_PORT
redirect_uri = f"http://localhost:{port}/oauth-callback"
# Use dynamic port (matching official gemini-cli behavior)
port = _DEFAULT_REDIRECT_PORT or find_free_port()
redirect_uri = f"http://127.0.0.1:{port}/oauth2callback"
# Generate state for CSRF protection
state = secrets.token_urlsafe(16)
@@ -433,17 +420,17 @@ def cmd_account_add(args: argparse.Namespace) -> int:
auth_url = f"{_OAUTH_AUTH_URL}?{urllib.parse.urlencode(params)}"
logger.info("Opening browser for authentication...")
logger.info(f"If the browser doesn't open, visit: {auth_url}\n")
logger.info("If the browser doesn't open, visit:\n%s\n", auth_url)
# Open browser
webbrowser.open(auth_url)
# Wait for callback
logger.info(f"Listening for callback on port {port}...")
logger.info("Listening for callback on port %s...", port)
code, received_state, error = wait_for_callback(port)
if error:
logger.error(f"Authentication failed: {error}")
logger.error("Authentication failed: %s", error)
return 1
if not code:
@@ -472,7 +459,7 @@ def cmd_account_add(args: argparse.Namespace) -> int:
# Get user email
email = get_user_email(access_token)
if email:
logger.info(f"Authenticated as: {email}")
logger.info("Authenticated as: %s", email)
# Load existing accounts and add/update
accounts_data = load_accounts()
@@ -480,7 +467,7 @@ def cmd_account_add(args: argparse.Namespace) -> int:
# Build new account entry (V4 schema)
expires_ms = int((time.time() + expires_in) * 1000)
refresh_entry = f"{refresh_token}|{_DEFAULT_PROJECT_ID}"
refresh_entry = refresh_token or ""
new_account = {
"access": access_token,
@@ -494,17 +481,17 @@ def cmd_account_add(args: argparse.Namespace) -> int:
existing_idx = next((i for i, a in enumerate(accounts) if a.get("email") == email), None)
if existing_idx is not None:
accounts[existing_idx] = new_account
logger.info(f"Updated existing account: {email}")
logger.info("Updated existing account: %s", email)
else:
accounts.append(new_account)
logger.info(f"Added new account: {email}")
logger.info("Added new account: %s", email)
accounts_data["accounts"] = accounts
accounts_data["schemaVersion"] = 4
accounts_data["last_refresh"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
save_accounts(accounts_data)
logger.info("\nAuthentication complete!")
logger.info("\nAuthentication complete!")
return 0
@@ -515,14 +502,14 @@ def cmd_account_list(args: argparse.Namespace) -> int:
if not accounts:
logger.info("No accounts configured.")
logger.info("Run 'antigravity auth account add' to add one.")
logger.info("Run 'google_auth auth account add' to add one.")
return 0
logger.info("Configured accounts:\n")
for i, account in enumerate(accounts, 1):
email = account.get("email", "unknown")
enabled = "enabled" if account.get("enabled", True) else "disabled"
logger.info(f" {i}. {email} ({enabled})")
logger.info(" %s. %s (%s)", i, email, enabled)
return 0
@@ -537,18 +524,18 @@ def cmd_account_remove(args: argparse.Namespace) -> int:
accounts = [a for a in accounts if a.get("email") != email]
if len(accounts) == original_len:
logger.error(f"No account found with email: {email}")
logger.error("No account found with email: %s", email)
return 1
data["accounts"] = accounts
save_accounts(data)
logger.info(f"Removed account: {email}")
logger.info("Removed account: %s", email)
return 0
def main() -> int:
parser = argparse.ArgumentParser(
description="Antigravity authentication CLI",
description="Google Gemini CLI authentication CLI",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
subparsers = parser.add_subparsers(dest="command", help="Commands")
@@ -1,10 +1,12 @@
"""Integration test: Run a real EventLoopNode against the Antigravity backend.
"""Integration test: Run a real EventLoopNode against the Google Gemini backend.
Run: .venv/bin/python core/tests/test_antigravity_eventloop.py
Run: uv run python core/tests/test_google_eventloop.py
Requires:
- ~/.hive/antigravity-accounts.json with valid credentials
(run 'uv run python core/antigravity_auth.py auth account add' to authenticate)
- ~/.hive/google-gemini-cli-accounts.json with valid credentials
(run 'uv run python core/google_auth.py auth account add' to authenticate)
OR
- GEMINI_API_KEY or GOOGLE_API_KEY environment variable
"""
import asyncio
@@ -15,37 +17,53 @@ from unittest.mock import MagicMock
sys.path.insert(0, "core")
logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(name)s: %(message)s")
# Show our provider's retry/stream logs
logging.getLogger("framework.llm.litellm").setLevel(logging.DEBUG)
logging.getLogger("framework.llm.google").setLevel(logging.DEBUG)
from framework.config import RuntimeConfig # noqa: E402
from framework.graph.event_loop_node import EventLoopNode, LoopConfig # noqa: E402
from framework.graph.node import DataBuffer, NodeContext, NodeResult, NodeSpec # noqa: E402
from framework.agent_loop.agent_loop import AgentLoop as EventLoopNode # noqa: E402
from framework.agent_loop.internals.types import LoopConfig # noqa: E402
from framework.orchestrator.node import DataBuffer, NodeContext, NodeResult, NodeSpec # noqa: E402
from framework.llm.litellm import LiteLLMProvider # noqa: E402
def make_provider() -> LiteLLMProvider:
def make_provider():
cfg = RuntimeConfig()
if not cfg.api_key:
print("ERROR: No Antigravity token found.")
print(" 1. Run 'antigravity-auth accounts add' to authenticate.")
print(" 2. Run 'antigravity-auth serve' to start the local proxy.")
print(" 3. Configure Hive: run quickstart.sh and select option 7 (Antigravity).")
sys.exit(1)
print(f"Model : {cfg.model}")
print(f"Base : {cfg.api_base}")
print(f"Antigravity : {'localhost:8069' in (cfg.api_base or '')}")
return LiteLLMProvider(
model=cfg.model,
api_key=cfg.api_key,
api_base=cfg.api_base,
**cfg.extra_kwargs,
)
# Try Google Gemini CLI (OAuth) first
try:
from framework.llm.google import GoogleGeminiCliProvider
provider = GoogleGeminiCliProvider(model=cfg.model)
if provider.has_credentials():
print(f"Provider : Google Gemini CLI (OAuth)")
return provider
except Exception:
pass
# Try Google API key
try:
from framework.llm.google import GoogleApiKeyProvider
api_key = cfg.api_key
if api_key:
provider = GoogleApiKeyProvider(model=cfg.model, api_key=api_key)
print(f"Provider : Google Gemini (API key)")
return provider
except Exception:
pass
print("ERROR: No Google credentials found.")
print(" 1. Run 'uv run python core/google_auth.py auth account add' for OAuth.")
print(" 2. Or set GEMINI_API_KEY / GOOGLE_API_KEY for API key auth.")
print(" 3. Run quickstart.sh and select option 7 or 8.")
sys.exit(1)
def make_context(
llm: LiteLLMProvider,
llm,
*,
node_id: str = "test",
system_prompt: str = "You are a helpful assistant.",
@@ -83,9 +101,7 @@ def make_context(
)
async def run_test(
name: str, llm: LiteLLMProvider, system: str, output_keys: list[str]
) -> NodeResult:
async def run_test(name: str, llm, system: str, output_keys: list[str]) -> NodeResult:
print(f"\n{'=' * 60}")
print(f"TEST: {name}")
print(f"{'=' * 60}")
@@ -112,7 +128,6 @@ async def main():
llm = make_provider()
print()
# Test 1: Simple text output — the node should call set_output to fill "answer"
r1 = await run_test(
name="Simple text generation",
llm=llm,
@@ -124,7 +139,6 @@ async def main():
output_keys=["answer"],
)
# Test 2: If test 1 failed, try bare stream() to isolate the issue
if not r1.success:
print(f"\n{'=' * 60}")
print("FALLBACK: Testing bare provider.stream() directly")
+106 -43
View File
@@ -1089,9 +1089,16 @@ $hiveKey = [System.Environment]::GetEnvironmentVariable("HIVE_API_KEY", "User")
if (-not $hiveKey) { $hiveKey = $env:HIVE_API_KEY }
if ($hiveKey) { $HiveCredDetected = $true }
$AntigravityCredDetected = $false
$antigravityAuthPath = Join-Path $env:USERPROFILE ".hive\antigravity-accounts.json"
if (Test-Path $antigravityAuthPath) { $AntigravityCredDetected = $true }
$GoogleGeminiCliCredDetected = $false
$googleGeminiCliAuthPath = Join-Path $env:USERPROFILE ".hive\google-gemini-cli-accounts.json"
if (Test-Path $googleGeminiCliAuthPath) { $GoogleGeminiCliCredDetected = $true }
$GoogleApiCredDetected = $false
$googleApiKey = [System.Environment]::GetEnvironmentVariable("GEMINI_API_KEY", "User")
if (-not $googleApiKey) { $googleApiKey = [System.Environment]::GetEnvironmentVariable("GEMINI_API_KEY", "Process") }
if (-not $googleApiKey) { $googleApiKey = [System.Environment]::GetEnvironmentVariable("GOOGLE_API_KEY", "User") }
if (-not $googleApiKey) { $googleApiKey = [System.Environment]::GetEnvironmentVariable("GOOGLE_API_KEY", "Process") }
if ($googleApiKey) { $GoogleApiCredDetected = $true }
# Detect API key providers
$ProviderMenuEnvVars = @("ANTHROPIC_API_KEY", "OPENAI_API_KEY", "GEMINI_API_KEY", "GROQ_API_KEY", "CEREBRAS_API_KEY", "OPENROUTER_API_KEY")
@@ -1135,7 +1142,7 @@ if (Test-Path $HiveConfigFile) {
if ($prevLlm.use_claude_code_subscription) { $PrevSubMode = "claude_code" }
elseif ($prevLlm.use_codex_subscription) { $PrevSubMode = "codex" }
elseif ($prevLlm.use_kimi_code_subscription) { $PrevSubMode = "kimi_code" }
elseif ($prevLlm.use_antigravity_subscription) { $PrevSubMode = "antigravity" }
elseif ($prevLlm.use_google_gemini_cli_subscription) { $PrevSubMode = "google_gemini_cli" }
elseif ($prevLlm.api_base -and $prevLlm.api_base -like "*api.z.ai*") { $PrevSubMode = "zai_code" }
elseif ($prevLlm.provider -eq "minimax" -or ($prevLlm.api_base -and $prevLlm.api_base -like "*api.minimax.io*")) { $PrevSubMode = "minimax_code" }
elseif ($prevLlm.api_base -and $prevLlm.api_base -like "*api.kimi.com*") { $PrevSubMode = "kimi_code" }
@@ -1155,7 +1162,8 @@ if ($PrevSubMode -or $PrevProvider) {
"minimax_code" { if ($MinimaxCredDetected) { $prevCredValid = $true } }
"kimi_code" { if ($KimiCredDetected) { $prevCredValid = $true } }
"hive_llm" { if ($HiveCredDetected) { $prevCredValid = $true } }
"antigravity" { if ($AntigravityCredDetected) { $prevCredValid = $true } }
"google_api" { if ($GoogleApiCredDetected) { $prevCredValid = $true } }
"google_gemini_cli" { if ($GoogleGeminiCliCredDetected) { $prevCredValid = $true } }
default {
if ($PrevProvider -eq "ollama") {
$prevCredValid = $true
@@ -1174,17 +1182,18 @@ if ($PrevSubMode -or $PrevProvider) {
"minimax_code" { $DefaultChoice = "4" }
"kimi_code" { $DefaultChoice = "5" }
"hive_llm" { $DefaultChoice = "6" }
"antigravity" { $DefaultChoice = "7" }
"google_api" { $DefaultChoice = "7" }
"google_gemini_cli" { $DefaultChoice = "8" }
}
if (-not $DefaultChoice) {
switch ($PrevProvider) {
"anthropic" { $DefaultChoice = "8" }
"openai" { $DefaultChoice = "9" }
"gemini" { $DefaultChoice = "10" }
"groq" { $DefaultChoice = "11" }
"cerebras" { $DefaultChoice = "12" }
"openrouter" { $DefaultChoice = "13" }
"ollama" { $DefaultChoice = "14" }
"anthropic" { $DefaultChoice = "9" }
"openai" { $DefaultChoice = "10" }
"gemini" { $DefaultChoice = "11" }
"groq" { $DefaultChoice = "12" }
"cerebras" { $DefaultChoice = "13" }
"openrouter" { $DefaultChoice = "14" }
"ollama" { $DefaultChoice = "15" }
"minimax" { $DefaultChoice = "4" }
"kimi" { $DefaultChoice = "5" }
"hive" { $DefaultChoice = "6" }
@@ -1253,19 +1262,26 @@ switch ($HiveGatewayAvailability) {
}
if ($HiveCredDetected) { Write-Color -Text " (credential detected)" -Color Green } else { Write-Host "" }
# 7) Antigravity
# 7) Google Gemini (API Key)
Write-Host " " -NoNewline
Write-Color -Text "7" -Color Cyan -NoNewline
Write-Host ") Antigravity Subscription " -NoNewline
Write-Host ") Google Gemini (API Key) " -NoNewline
Write-Color -Text "(use your Google AI Studio key)" -Color DarkGray -NoNewline
if ($GoogleApiCredDetected) { Write-Color -Text " (credential detected)" -Color Green } else { Write-Host "" }
# 8) Google Gemini CLI (Google Account)
Write-Host " " -NoNewline
Write-Color -Text "8" -Color Cyan -NoNewline
Write-Host ") Google Gemini CLI (Google Account) " -NoNewline
Write-Color -Text "(use your Google/Gemini plan)" -Color DarkGray -NoNewline
if ($AntigravityCredDetected) { Write-Color -Text " (credential detected)" -Color Green } else { Write-Host "" }
if ($GoogleGeminiCliCredDetected) { Write-Color -Text " (credential detected)" -Color Green } else { Write-Host "" }
Write-Host ""
Write-Color -Text " API key providers:" -Color Cyan
# 8-13) API key providers
# 9-14) API key providers
for ($idx = 0; $idx -lt $ProviderMenuEnvVars.Count; $idx++) {
$num = $idx + 8
$num = $idx + 9
$envVal = [System.Environment]::GetEnvironmentVariable($ProviderMenuEnvVars[$idx], "Process")
if (-not $envVal) { $envVal = [System.Environment]::GetEnvironmentVariable($ProviderMenuEnvVars[$idx], "User") }
Write-Host " " -NoNewline
@@ -1274,9 +1290,9 @@ for ($idx = 0; $idx -lt $ProviderMenuEnvVars.Count; $idx++) {
if ($envVal) { Write-Color -Text " (credential detected)" -Color Green } else { Write-Host "" }
}
# 14) Local (Ollama) - no API key needed
# 15) Local (Ollama) - no API key needed
Write-Host " " -NoNewline
Write-Color -Text "14" -Color Cyan -NoNewline
Write-Color -Text "15" -Color Cyan -NoNewline
if ($OllamaDetected) {
Write-Host ") Local (Ollama) - No API key needed " -NoNewline
Write-Color -Text "(ollama detected)" -Color Green
@@ -1284,7 +1300,7 @@ if ($OllamaDetected) {
Write-Host ") Local (Ollama) - No API key needed"
}
$SkipChoice = 8 + $ProviderMenuEnvVars.Count + 1
$SkipChoice = 9 + $ProviderMenuEnvVars.Count + 1
Write-Host " " -NoNewline
Write-Color -Text "$SkipChoice" -Color Cyan -NoNewline
Write-Host ") Skip for now"
@@ -1417,24 +1433,38 @@ switch ($num) {
Write-Color -Text " Model: $SelectedModel | API: $HiveLlmEndpoint" -Color DarkGray
}
7 {
# Antigravity Subscription
if (-not $AntigravityCredDetected) {
# Google Gemini (API Key)
$SubscriptionMode = "google_api"
Apply-Preset "google_api"
$SelectedEnvVar = "GEMINI_API_KEY"
$SelectedProviderId = "google"
$providerName = "Google Gemini"
$signupUrl = "https://aistudio.google.com/apikey"
if ($env:GEMINI_API_KEY -or $env:GOOGLE_API_KEY) {
Write-Host ""
Write-Color -Text " Setting up Antigravity authentication..." -Color Cyan
Write-Ok "Using Google Gemini (API Key)"
Write-Color -Text " Found existing API key" -Color DarkGray
}
}
8 {
# Google Gemini CLI (Google Account) - OAuth
if (-not $GoogleGeminiCliCredDetected) {
Write-Host ""
Write-Color -Text " Setting up Google Gemini CLI authentication..." -Color Cyan
Write-Host ""
Write-Warn "A browser window will open for Google OAuth."
Write-Host " Sign in with your Google account that has Antigravity access."
Write-Host " Sign in with your Google account that has Google Gemini access."
Write-Host ""
try {
$null = & $UvCmd run python (Join-Path $ScriptDir "core\antigravity_auth.py") auth account add 2>&1
if ($LASTEXITCODE -eq 0 -and (Test-Path $antigravityAuthPath)) {
$AntigravityCredDetected = $true
$null = & $UvCmd run python (Join-Path $ScriptDir "core\google_auth.py") auth account add 2>&1
if ($LASTEXITCODE -eq 0 -and (Test-Path $googleGeminiCliAuthPath)) {
$GoogleGeminiCliCredDetected = $true
}
} catch {
$AntigravityCredDetected = $false
$GoogleGeminiCliCredDetected = $false
}
if (-not $AntigravityCredDetected) {
if (-not $GoogleGeminiCliCredDetected) {
Write-Host ""
Write-Fail "Authentication failed or was cancelled."
Write-Host ""
@@ -1442,19 +1472,48 @@ switch ($num) {
}
}
if ($AntigravityCredDetected) {
$SubscriptionMode = "antigravity"
Apply-Preset "antigravity"
if ($GoogleGeminiCliCredDetected) {
$SubscriptionMode = "google_gemini_cli"
Apply-Preset "google_gemini_cli"
Write-Host ""
Write-Warn "Using Antigravity can technically cause your account suspension. Please use at your own risk."
Write-Warn "Using Google Gemini CLI can technically cause your account suspension. Please use at your own risk."
Write-Host ""
Write-Ok "Using Antigravity subscription"
Write-Color -Text " Model: gemini-3-flash | Direct OAuth (no proxy required)" -Color DarkGray
Write-Ok "Using Google Gemini CLI subscription"
Write-Host ""
Write-Host " Select a model:"
$agChoices = Get-PresetModelChoices "google_gemini_cli"
$agDefaultChoice = "1"
for ($i = 0; $i -lt $agChoices.Count; $i++) {
Write-Host " " -NoNewline
Write-Color -Text "$($i + 1))" -Color Cyan -NoNewline
Write-Host " $($agChoices[$i].label)" -NoNewline
if ($agChoices[$i].recommended -eq $true) {
$agDefaultChoice = [string]($i + 1)
Write-Host " " -NoNewline
Write-Color -Text "(default)" -Color DarkGray
} else {
Write-Host ""
}
}
Write-Host ""
while ($true) {
$agModelChoice = Read-Host " Enter model choice (1-$($agChoices.Count)) [$agDefaultChoice]"
if (-not $agModelChoice) { $agModelChoice = $agDefaultChoice }
if ($agModelChoice -match '^\d+$') {
$choiceNum = [int]$agModelChoice
if ($choiceNum -ge 1 -and $choiceNum -le $agChoices.Count) {
$SelectedModel = [string]$agChoices[$choiceNum - 1].id
break
}
}
Write-Color -Text "Invalid choice. Please enter 1-$($agChoices.Count)" -Color Red
}
Write-Color -Text " Model: $SelectedModel | Direct OAuth (no proxy required)" -Color DarkGray
}
}
{ $_ -ge 8 -and $_ -le 13 } {
{ $_ -ge 9 -and $_ -le 14 } {
# API key providers
$provIdx = $num - 8
$provIdx = $num - 9
$SelectedEnvVar = $ProviderMenuEnvVars[$provIdx]
$SelectedProviderId = $ProviderMenuIds[$provIdx]
$providerName = $ProviderMenuNames[$provIdx] -replace ' - .*', '' # strip description
@@ -1534,7 +1593,7 @@ switch ($num) {
}
}
}
14 {
15 {
# Local (Ollama)
if (-not $OllamaDetected) {
Write-Host ""
@@ -1908,8 +1967,10 @@ if ($SelectedProviderId) {
} elseif ($SubscriptionMode -eq "codex") {
$config.llm["use_codex_subscription"] = $true
if ($SelectedApiBase) { $config.llm["api_base"] = $SelectedApiBase }
} elseif ($SubscriptionMode -eq "antigravity") {
$config.llm["use_antigravity_subscription"] = $true
} elseif ($SubscriptionMode -eq "google_api") {
$config.llm["api_key_env_var"] = $SelectedEnvVar
} elseif ($SubscriptionMode -eq "google_gemini_cli") {
$config.llm["use_google_gemini_cli_subscription"] = $true
} elseif ($SubscriptionMode -eq "zai_code") {
$config.llm["api_base"] = $SelectedApiBase
$config.llm["api_key_env_var"] = $SelectedEnvVar
@@ -2236,8 +2297,10 @@ if ($SelectedProviderId) {
Write-Color -Text " API: api.minimax.io/v1 (OpenAI-compatible)" -Color DarkGray
} elseif ($SubscriptionMode -eq "codex") {
Write-Ok "OpenAI Codex Subscription -> $SelectedModel"
} elseif ($SubscriptionMode -eq "antigravity") {
Write-Ok "Antigravity Subscription -> $SelectedModel"
} elseif ($SubscriptionMode -eq "google_api") {
Write-Ok "Google Gemini (API Key) -> $SelectedModel"
} elseif ($SubscriptionMode -eq "google_gemini_cli") {
Write-Ok "Google Gemini CLI -> $SelectedModel"
Write-Color -Text " Direct OAuth (no proxy required)" -Color DarkGray
} elseif ($SelectedProviderId -eq "openrouter") {
Write-Ok "OpenRouter API Key -> $SelectedModel"
+126 -67
View File
@@ -888,7 +888,7 @@ prompt_model_selection() {
}
# Function to save configuration
# Args: provider_id env_var model max_tokens max_context_tokens [use_claude_code_sub] [api_base] [use_codex_sub] [use_antigravity_sub]
# Args: provider_id env_var model max_tokens max_context_tokens [use_claude_code_sub] [api_base] [use_codex_sub] [use_google_gemini_cli_sub]
save_configuration() {
local provider_id="$1"
local env_var="$2"
@@ -898,7 +898,7 @@ save_configuration() {
local use_claude_code_sub="${6:-}"
local api_base="${7:-}"
local use_codex_sub="${8:-}"
local use_antigravity_sub="${9:-}"
local use_google_gemini_cli_sub="${9:-}"
# Fallbacks if not provided
if [ -z "$model" ]; then
@@ -920,7 +920,7 @@ save_configuration() {
"$use_claude_code_sub" \
"$api_base" \
"$use_codex_sub" \
"$use_antigravity_sub" \
"$use_google_gemini_cli_sub" \
"$(date -u +"%Y-%m-%dT%H:%M:%S+00:00")" 2>/dev/null <<'PY'
import json
import sys
@@ -935,7 +935,7 @@ from pathlib import Path
use_claude_code_sub,
api_base,
use_codex_sub,
use_antigravity_sub,
use_google_gemini_cli_sub,
created_at,
) = sys.argv[1:11]
@@ -970,22 +970,22 @@ if use_codex_sub == "true":
else:
config["llm"].pop("use_codex_subscription", None)
if use_antigravity_sub == "true":
config["llm"]["use_antigravity_subscription"] = True
if use_google_gemini_cli_sub == "true":
config["llm"]["use_google_gemini_cli_subscription"] = True
config["llm"].pop("api_key_env_var", None)
# Store the Antigravity OAuth client secret so token refresh works
# Store the Google Gemini CLI OAuth client secret so token refresh works
# without hardcoding it in source code (read at runtime via config.py).
import os as _os
_secret = _os.environ.get("ANTIGRAVITY_CLIENT_SECRET") or ""
_secret = _os.environ.get("GOOGLE_GEMINI_CLI_CLIENT_SECRET") or ""
if _secret:
config["llm"]["antigravity_client_secret"] = _secret
_client_id = _os.environ.get("ANTIGRAVITY_CLIENT_ID") or ""
config["llm"]["google_gemini_cli_client_secret"] = _secret
_client_id = _os.environ.get("GOOGLE_GEMINI_CLI_CLIENT_ID") or ""
if _client_id:
config["llm"]["antigravity_client_id"] = _client_id
config["llm"]["google_gemini_cli_client_id"] = _client_id
else:
config["llm"].pop("use_antigravity_subscription", None)
config["llm"].pop("antigravity_client_secret", None)
config["llm"].pop("antigravity_client_id", None)
config["llm"].pop("use_google_gemini_cli_subscription", None)
config["llm"].pop("google_gemini_cli_client_secret", None)
config["llm"].pop("google_gemini_cli_client_id", None)
if api_base:
config["llm"]["api_base"] = api_base
@@ -1016,7 +1016,7 @@ SELECTED_ENV_VAR="" # Will hold the chosen env var
SELECTED_MODEL="" # Will hold the chosen model ID
SELECTED_MAX_TOKENS=8192 # Will hold the chosen max_tokens (output limit)
SELECTED_MAX_CONTEXT_TOKENS=120000 # Will hold the chosen max_context_tokens (input history budget)
SUBSCRIPTION_MODE="" # "claude_code" | "codex" | "zai_code" | ""
SUBSCRIPTION_MODE="" # "claude_code" | "codex" | "zai_code" | "google_api" | "google_gemini_cli" | ""
# ── Credential detection (silent — just set flags) ───────────
CLAUDE_CRED_DETECTED=false
@@ -1055,15 +1055,15 @@ if [ -n "${HIVE_API_KEY:-}" ]; then
HIVE_CRED_DETECTED=true
fi
ANTIGRAVITY_CRED_DETECTED=false
# Check native Antigravity IDE (macOS/Linux) SQLite state DB first
if [ -f "$HOME/Library/Application Support/Antigravity/User/globalStorage/state.vscdb" ]; then
ANTIGRAVITY_CRED_DETECTED=true
elif [ -f "$HOME/.config/Antigravity/User/globalStorage/state.vscdb" ]; then
ANTIGRAVITY_CRED_DETECTED=true
GOOGLE_API_CRED_DETECTED=false
if [ -n "${GEMINI_API_KEY:-}" ] || [ -n "${GOOGLE_API_KEY:-}" ]; then
GOOGLE_API_CRED_DETECTED=true
fi
GOOGLE_GEMINI_CLI_CRED_DETECTED=false
# Native OAuth credentials
elif [ -f "$HOME/.hive/antigravity-accounts.json" ]; then
ANTIGRAVITY_CRED_DETECTED=true
if [ -f "$HOME/.hive/google-gemini-cli-accounts.json" ]; then
GOOGLE_GEMINI_CLI_CRED_DETECTED=true
fi
OLLAMA_DETECTED=false
@@ -1122,8 +1122,8 @@ try:
sub = "codex"
elif llm.get("use_kimi_code_subscription"):
sub = "kimi_code"
elif llm.get("use_antigravity_subscription"):
sub = "antigravity"
elif llm.get("use_google_gemini_cli_subscription"):
sub = "google_gemini_cli"
elif llm.get("provider", "") == "minimax" or "api.minimax.io" in llm.get("api_base", ""):
sub = "minimax_code"
elif llm.get("provider", "") == "hive" or "adenhq.com" in llm.get("api_base", ""):
@@ -1147,7 +1147,8 @@ if [ -n "$PREV_SUB_MODE" ] || [ -n "$PREV_PROVIDER" ]; then
codex) [ "$CODEX_CRED_DETECTED" = true ] && PREV_CRED_VALID=true ;;
kimi_code) [ "$KIMI_CRED_DETECTED" = true ] && PREV_CRED_VALID=true ;;
hive_llm) [ "$HIVE_CRED_DETECTED" = true ] && PREV_CRED_VALID=true ;;
antigravity) [ "$ANTIGRAVITY_CRED_DETECTED" = true ] && PREV_CRED_VALID=true ;;
google_api) [ "$GOOGLE_API_CRED_DETECTED" = true ] && PREV_CRED_VALID=true ;;
google_gemini_cli) [ "$GOOGLE_GEMINI_CLI_CRED_DETECTED" = true ] && PREV_CRED_VALID=true ;;
*)
# API key provider — check if the env var is set; ollama uses local runtime detection
if [ "$PREV_PROVIDER" = "ollama" ]; then
@@ -1168,17 +1169,18 @@ if [ -n "$PREV_SUB_MODE" ] || [ -n "$PREV_PROVIDER" ]; then
minimax_code) DEFAULT_CHOICE=4 ;;
kimi_code) DEFAULT_CHOICE=5 ;;
hive_llm) DEFAULT_CHOICE=6 ;;
antigravity) DEFAULT_CHOICE=7 ;;
google_api) DEFAULT_CHOICE=7 ;;
google_gemini_cli) DEFAULT_CHOICE=8 ;;
esac
if [ -z "$DEFAULT_CHOICE" ]; then
case "$PREV_PROVIDER" in
anthropic) DEFAULT_CHOICE=8 ;;
openai) DEFAULT_CHOICE=9 ;;
gemini) DEFAULT_CHOICE=10 ;;
groq) DEFAULT_CHOICE=11 ;;
cerebras) DEFAULT_CHOICE=12 ;;
openrouter) DEFAULT_CHOICE=13 ;;
ollama) DEFAULT_CHOICE=14 ;;
anthropic) DEFAULT_CHOICE=9 ;;
openai) DEFAULT_CHOICE=10 ;;
gemini) DEFAULT_CHOICE=11 ;;
groq) DEFAULT_CHOICE=12 ;;
cerebras) DEFAULT_CHOICE=13 ;;
openrouter) DEFAULT_CHOICE=14 ;;
ollama) DEFAULT_CHOICE=15 ;;
minimax) DEFAULT_CHOICE=4 ;;
kimi) DEFAULT_CHOICE=5 ;;
hive) DEFAULT_CHOICE=6 ;;
@@ -1257,21 +1259,28 @@ else
echo -e " ${CYAN}6)${NC} Hive LLM ${DIM}(use your Hive API key)${NC} $HIVE_LLM_STATUS"
fi
# 7) Antigravity
if [ "$ANTIGRAVITY_CRED_DETECTED" = true ]; then
echo -e " ${CYAN}7)${NC} Antigravity Subscription ${DIM}(use your Google/Gemini plan)${NC} ${GREEN}(credential detected)${NC}"
# 7) Google Gemini (API Key)
if [ "$GOOGLE_API_CRED_DETECTED" = true ]; then
echo -e " ${CYAN}7)${NC} Google Gemini (API Key) ${DIM}(use your API key)${NC} ${GREEN}(credential detected)${NC}"
else
echo -e " ${CYAN}7)${NC} Antigravity Subscription ${DIM}(use your Google/Gemini plan)${NC}"
echo -e " ${CYAN}7)${NC} Google Gemini (API Key) ${DIM}(use your API key)${NC}"
fi
# 8) Google Gemini CLI (Google Account)
if [ "$GOOGLE_GEMINI_CLI_CRED_DETECTED" = true ]; then
echo -e " ${CYAN}8)${NC} Google Gemini CLI (Google Account) ${DIM}(use your Google/Gemini plan)${NC} ${GREEN}(credential detected)${NC}"
else
echo -e " ${CYAN}8)${NC} Google Gemini CLI (Google Account) ${DIM}(use your Google/Gemini plan)${NC}"
fi
echo ""
echo -e " ${CYAN}${BOLD}API key providers:${NC}"
# 8-13) API key providers — show (credential detected) if key already set
# 9-14) API key providers — show (credential detected) if key already set
PROVIDER_MENU_ENVS=(ANTHROPIC_API_KEY OPENAI_API_KEY GEMINI_API_KEY GROQ_API_KEY CEREBRAS_API_KEY OPENROUTER_API_KEY)
PROVIDER_MENU_NAMES=("Anthropic (Claude) - Recommended" "OpenAI (GPT)" "Google Gemini - Free tier available" "Groq - Fast, free tier" "Cerebras - Fast, free tier" "OpenRouter - Bring any OpenRouter model")
for idx in "${!PROVIDER_MENU_ENVS[@]}"; do
num=$((idx + 8))
num=$((idx + 9))
env_var="${PROVIDER_MENU_ENVS[$idx]}"
if [ -n "${!env_var}" ]; then
echo -e " ${CYAN}$num)${NC} ${PROVIDER_MENU_NAMES[$idx]} ${GREEN}(credential detected)${NC}"
@@ -1280,14 +1289,14 @@ for idx in "${!PROVIDER_MENU_ENVS[@]}"; do
fi
done
# 14) Local (Ollama) — no API key needed
# 15) Local (Ollama) — no API key needed
if [ "$OLLAMA_DETECTED" = true ]; then
echo -e " ${CYAN}14)${NC} Local (Ollama) - No API key needed ${GREEN}(ollama detected)${NC}"
echo -e " ${CYAN}15)${NC} Local (Ollama) - No API key needed ${GREEN}(ollama detected)${NC}"
else
echo -e " ${CYAN}14)${NC} Local (Ollama) - No API key needed"
echo -e " ${CYAN}15)${NC} Local (Ollama) - No API key needed"
fi
SKIP_CHOICE=$((8 + ${#PROVIDER_MENU_ENVS[@]} + 1))
SKIP_CHOICE=$((9 + ${#PROVIDER_MENU_ENVS[@]} + 1))
echo -e " ${CYAN}$SKIP_CHOICE)${NC} Skip for now"
echo ""
@@ -1420,24 +1429,39 @@ case $choice in
echo -e " ${DIM}Model: $SELECTED_MODEL | API: ${HIVE_LLM_ENDPOINT}${NC}"
;;
7)
# Antigravity Subscription
if [ "$ANTIGRAVITY_CRED_DETECTED" = false ]; then
# Google Gemini (API Key)
SUBSCRIPTION_MODE="google_api"
apply_preset "google_api"
SELECTED_ENV_VAR="GEMINI_API_KEY"
SELECTED_PROVIDER_ID="google"
PROVIDER_NAME="Google Gemini"
SIGNUP_URL="https://aistudio.google.com/apikey"
# Check for existing key
if [ -n "$GEMINI_API_KEY" ] || [ -n "$GOOGLE_API_KEY" ]; then
echo ""
echo -e "${CYAN} Setting up Antigravity authentication...${NC}"
echo -e "${GREEN}${NC} Using Google Gemini (API Key)"
echo -e " ${DIM}Found existing API key${NC}"
fi
;;
8)
# Google Gemini CLI (Google Account)
if [ "$GOOGLE_GEMINI_CLI_CRED_DETECTED" = false ]; then
echo ""
echo -e "${CYAN} Setting up Google Gemini CLI authentication...${NC}"
echo ""
echo -e " ${YELLOW}A browser window will open for Google OAuth.${NC}"
echo -e " Sign in with your Google account that has Antigravity access."
echo -e " Sign in with your Google account that has Google Gemini CLI access."
echo ""
# Run native OAuth flow
if uv run python "$SCRIPT_DIR/core/antigravity_auth.py" auth account add; then
if uv run python "$SCRIPT_DIR/core/google_auth.py" auth account add; then
# Re-detect credentials
if [ -f "$HOME/.hive/antigravity-accounts.json" ]; then
ANTIGRAVITY_CRED_DETECTED=true
if [ -f "$HOME/.hive/google-gemini-cli-accounts.json" ]; then
GOOGLE_GEMINI_CLI_CRED_DETECTED=true
fi
fi
if [ "$ANTIGRAVITY_CRED_DETECTED" = false ]; then
if [ "$GOOGLE_GEMINI_CLI_CRED_DETECTED" = false ]; then
echo ""
echo -e "${RED} Authentication failed or was cancelled.${NC}"
echo ""
@@ -1445,54 +1469,82 @@ case $choice in
fi
fi
if [ "$ANTIGRAVITY_CRED_DETECTED" = true ]; then
SUBSCRIPTION_MODE="antigravity"
apply_preset "antigravity"
if [ "$GOOGLE_GEMINI_CLI_CRED_DETECTED" = true ]; then
SUBSCRIPTION_MODE="google_gemini_cli"
apply_preset "google_gemini_cli"
echo ""
echo -e "${YELLOW} ⚠ Using Antigravity can technically cause your account suspension. Please use at your own risk.${NC}"
echo -e "${YELLOW} ⚠ Using Google Gemini CLI can technically cause your account suspension. Please use at your own risk.${NC}"
echo ""
echo -e "${GREEN}${NC} Using Antigravity subscription"
echo -e " ${DIM}Model: gemini-3-flash | Direct OAuth (no proxy required)${NC}"
echo -e "${GREEN}${NC} Using Google Gemini CLI"
echo ""
echo -e " Select a model:"
ag_choice_count="$(get_preset_model_choice_count "google_gemini_cli")"
ag_default_choice=1
ag_idx=0
while [ "$ag_idx" -lt "$ag_choice_count" ]; do
ag_num=$((ag_idx + 1))
ag_model_id="$(get_preset_model_choice_field "google_gemini_cli" "$ag_idx" "id")"
ag_label="$(get_preset_model_choice_field "google_gemini_cli" "$ag_idx" "label")"
ag_recommended="$(get_preset_model_choice_field "google_gemini_cli" "$ag_idx" "recommended")"
if [ "$ag_recommended" = "true" ]; then
echo -e " ${CYAN}${ag_num})${NC} ${ag_label} ${DIM}(default)${NC}"
ag_default_choice="$ag_num"
else
echo -e " ${CYAN}${ag_num})${NC} ${ag_label}"
fi
ag_idx=$((ag_idx + 1))
done
echo ""
while true; do
read -r -p " Enter model choice (1-$ag_choice_count) [$ag_default_choice]: " ag_model_choice || true
ag_model_choice="${ag_model_choice:-$ag_default_choice}"
if [[ "$ag_model_choice" =~ ^[0-9]+$ ]] && [ "$ag_model_choice" -ge 1 ] && [ "$ag_model_choice" -le "$ag_choice_count" ]; then
SELECTED_MODEL="$(get_preset_model_choice_field "google_gemini_cli" "$((ag_model_choice - 1))" "id")"
break
fi
echo -e "${RED}Invalid choice. Please enter 1-$ag_choice_count${NC}"
done
echo -e " ${DIM}Model: $SELECTED_MODEL | Direct OAuth (no proxy required)${NC}"
fi
;;
8)
9)
SELECTED_ENV_VAR="ANTHROPIC_API_KEY"
SELECTED_PROVIDER_ID="anthropic"
PROVIDER_NAME="Anthropic"
SIGNUP_URL="https://console.anthropic.com/settings/keys"
;;
9)
10)
SELECTED_ENV_VAR="OPENAI_API_KEY"
SELECTED_PROVIDER_ID="openai"
PROVIDER_NAME="OpenAI"
SIGNUP_URL="https://platform.openai.com/api-keys"
;;
10)
11)
SELECTED_ENV_VAR="GEMINI_API_KEY"
SELECTED_PROVIDER_ID="gemini"
PROVIDER_NAME="Google Gemini"
SIGNUP_URL="https://aistudio.google.com/apikey"
;;
11)
12)
SELECTED_ENV_VAR="GROQ_API_KEY"
SELECTED_PROVIDER_ID="groq"
PROVIDER_NAME="Groq"
SIGNUP_URL="https://console.groq.com/keys"
;;
12)
13)
SELECTED_ENV_VAR="CEREBRAS_API_KEY"
SELECTED_PROVIDER_ID="cerebras"
PROVIDER_NAME="Cerebras"
SIGNUP_URL="https://cloud.cerebras.ai/"
;;
13)
14)
SELECTED_ENV_VAR="OPENROUTER_API_KEY"
SELECTED_PROVIDER_ID="openrouter"
SELECTED_API_BASE="https://openrouter.ai/api/v1"
PROVIDER_NAME="OpenRouter"
SIGNUP_URL="https://openrouter.ai/keys"
;;
14)
15)
# Local (Ollama) — no API key; pick model from ollama list
if [ "$OLLAMA_DETECTED" != true ]; then
echo ""
@@ -1555,7 +1607,7 @@ case $choice in
esac
# For API-key providers: prompt for key (allow replacement if already set)
if { [ -z "$SUBSCRIPTION_MODE" ] || [ "$SUBSCRIPTION_MODE" = "minimax_code" ] || [ "$SUBSCRIPTION_MODE" = "kimi_code" ] || [ "$SUBSCRIPTION_MODE" = "hive_llm" ]; } && [ -n "$SELECTED_ENV_VAR" ]; then
if { [ -z "$SUBSCRIPTION_MODE" ] || [ "$SUBSCRIPTION_MODE" = "minimax_code" ] || [ "$SUBSCRIPTION_MODE" = "kimi_code" ] || [ "$SUBSCRIPTION_MODE" = "hive_llm" ] || [ "$SUBSCRIPTION_MODE" = "google_api" ]; } && [ -n "$SELECTED_ENV_VAR" ]; then
while true; do
CURRENT_KEY="${!SELECTED_ENV_VAR}"
if [ -n "$CURRENT_KEY" ]; then
@@ -1700,8 +1752,10 @@ if [ -n "$SELECTED_PROVIDER_ID" ]; then
save_configuration "$SELECTED_PROVIDER_ID" "" "$SELECTED_MODEL" "$SELECTED_MAX_TOKENS" "$SELECTED_MAX_CONTEXT_TOKENS" "true" "" > /dev/null || SAVE_OK=false
elif [ "$SUBSCRIPTION_MODE" = "codex" ]; then
save_configuration "$SELECTED_PROVIDER_ID" "" "$SELECTED_MODEL" "$SELECTED_MAX_TOKENS" "$SELECTED_MAX_CONTEXT_TOKENS" "" "$SELECTED_API_BASE" "true" > /dev/null || SAVE_OK=false
elif [ "$SUBSCRIPTION_MODE" = "antigravity" ]; then
elif [ "$SUBSCRIPTION_MODE" = "google_gemini_cli" ]; then
save_configuration "$SELECTED_PROVIDER_ID" "" "$SELECTED_MODEL" "$SELECTED_MAX_TOKENS" "$SELECTED_MAX_CONTEXT_TOKENS" "" "" "" "true" > /dev/null || SAVE_OK=false
elif [ "$SUBSCRIPTION_MODE" = "google_api" ]; then
save_configuration "$SELECTED_PROVIDER_ID" "$SELECTED_ENV_VAR" "$SELECTED_MODEL" "$SELECTED_MAX_TOKENS" "$SELECTED_MAX_CONTEXT_TOKENS" > /dev/null || SAVE_OK=false
elif [ "$SUBSCRIPTION_MODE" = "zai_code" ]; then
save_configuration "$SELECTED_PROVIDER_ID" "$SELECTED_ENV_VAR" "$SELECTED_MODEL" "$SELECTED_MAX_TOKENS" "$SELECTED_MAX_CONTEXT_TOKENS" "" "$SELECTED_API_BASE" > /dev/null || SAVE_OK=false
elif [ "$SUBSCRIPTION_MODE" = "minimax_code" ]; then
@@ -2090,6 +2144,11 @@ if [ -n "$SELECTED_PROVIDER_ID" ]; then
elif [ "$SUBSCRIPTION_MODE" = "minimax_code" ]; then
echo -e " ${GREEN}${NC} MiniMax Coding Key → ${DIM}$SELECTED_MODEL${NC}"
echo -e " ${DIM}API: api.minimax.io/v1 (OpenAI-compatible)${NC}"
elif [ "$SUBSCRIPTION_MODE" = "google_api" ]; then
echo -e " ${GREEN}${NC} Google Gemini (API Key) → ${DIM}$SELECTED_MODEL${NC}"
elif [ "$SUBSCRIPTION_MODE" = "google_gemini_cli" ]; then
echo -e " ${GREEN}${NC} Google Gemini CLI → ${DIM}$SELECTED_MODEL${NC}"
echo -e " ${DIM}Direct OAuth (no proxy required)${NC}"
elif [ "$SELECTED_PROVIDER_ID" = "openrouter" ]; then
echo -e " ${GREEN}${NC} OpenRouter API Key → ${DIM}$SELECTED_MODEL${NC}"
echo -e " ${DIM}API: openrouter.ai/api/v1 (OpenAI-compatible)${NC}"
+313
View File
@@ -0,0 +1,313 @@
#!/usr/bin/env node
/**
* Account sanity test for the Gemini CLI Code Assist OAuth path.
*
* This replicates, byte-for-byte, the HTTP flow that
* google-gemini/gemini-cli uses when talking to cloudcode-pa.googleapis.com
* — loadCodeAssist → (onboardUser + LRO poll) → generateContent. It exists
* so you can verify "does my Google account actually work with the Gemini
* CLI subscription?" independently of the Python port in hive.
*
* It reads OAuth tokens from ~/.hive/google-gemini-cli-accounts.json (the
* file quickstart.sh populates) and refreshes them against oauth2.googleapis.com,
* the same way core/google_auth.py does. It then issues one non-streaming
* :generateContent against gemini-3-flash-preview by default.
*
* Request shapes are taken from the local gemini-cli checkout at
* /Users/aden/aden/gemini-cli:
* packages/core/src/code_assist/setup.ts (loadCodeAssist / onboardUser)
* packages/core/src/code_assist/server.ts (endpoint + URL shape)
* packages/core/src/code_assist/converter.ts (outer + inner body shape)
* packages/core/src/code_assist/types.ts (UserTierId)
*
* Usage:
* node scripts/test_gemini_cli_account.mjs
* node scripts/test_gemini_cli_account.mjs --model gemini-3.1-pro-preview
* node scripts/test_gemini_cli_account.mjs --prompt "Say hi in one word."
*/
import { readFileSync, writeFileSync, existsSync } from 'node:fs';
import { homedir } from 'node:os';
import { join } from 'node:path';
import { randomUUID } from 'node:crypto';
// ---------------------------------------------------------------------------
// Constants — mirror gemini-cli code_assist/{server,setup,types}.ts
// ---------------------------------------------------------------------------
const CODE_ASSIST_ENDPOINT = 'https://cloudcode-pa.googleapis.com';
const CODE_ASSIST_API_VERSION = 'v1internal';
const TOKEN_URL = 'https://oauth2.googleapis.com/token';
const CLIENT_METADATA = {
ideType: 'IDE_UNSPECIFIED',
platform: 'PLATFORM_UNSPECIFIED',
pluginType: 'GEMINI',
};
const UserTierId = {
FREE: 'free-tier',
LEGACY: 'legacy-tier',
STANDARD: 'standard-tier',
};
const ACCOUNTS_FILE = join(homedir(), '.hive', 'google-gemini-cli-accounts.json');
// OAuth credentials source — identical URL to core/google_auth.py so both
// tools agree on the client_id / client_secret they fetch.
const OAUTH_CREDENTIALS_URL =
'https://raw.githubusercontent.com/google-gemini/gemini-cli/main/packages/core/src/code_assist/oauth2.ts';
// ---------------------------------------------------------------------------
// Tiny arg parser
// ---------------------------------------------------------------------------
function parseArgs(argv) {
const args = { model: 'gemini-3-flash-preview', prompt: 'Say hi in one word.' };
for (let i = 2; i < argv.length; i++) {
const a = argv[i];
if (a === '--model') args.model = argv[++i];
else if (a === '--prompt') args.prompt = argv[++i];
else if (a === '-h' || a === '--help') {
console.log(
'Usage: node scripts/test_gemini_cli_account.mjs [--model ID] [--prompt TEXT]',
);
process.exit(0);
}
}
return args;
}
// ---------------------------------------------------------------------------
// Credentials: load from hive accounts file + refresh
// ---------------------------------------------------------------------------
let _oauthCreds = null;
async function fetchOAuthCreds() {
if (_oauthCreds) return _oauthCreds;
const envId = process.env.GOOGLE_GEMINI_CLI_CLIENT_ID;
const envSecret = process.env.GOOGLE_GEMINI_CLI_CLIENT_SECRET;
if (envId && envSecret) {
_oauthCreds = { clientId: envId, clientSecret: envSecret };
return _oauthCreds;
}
const res = await fetch(OAUTH_CREDENTIALS_URL, {
headers: { 'User-Agent': 'Hive-Gemini-Test/1.0' },
});
const text = await res.text();
const idMatch = text.match(
/OAUTH_CLIENT_ID\s*=\s*\n?\s*'([a-z0-9\-\.]+\.apps\.googleusercontent\.com)'/,
);
const secretMatch = text.match(/OAUTH_CLIENT_SECRET\s*=\s*'([^']+)'/);
if (!idMatch || !secretMatch) {
throw new Error('Failed to parse OAUTH_CLIENT_ID/SECRET from gemini-cli source');
}
_oauthCreds = { clientId: idMatch[1], clientSecret: secretMatch[1] };
return _oauthCreds;
}
function loadAccounts() {
if (!existsSync(ACCOUNTS_FILE)) {
throw new Error(
`No credentials at ${ACCOUNTS_FILE}. Run: uv run python core/google_auth.py auth account add`,
);
}
const data = JSON.parse(readFileSync(ACCOUNTS_FILE, 'utf-8'));
const accounts = data.accounts || [];
if (!accounts.length) throw new Error(`No accounts in ${ACCOUNTS_FILE}`);
const account = accounts.find((a) => a.enabled !== false) || accounts[0];
return { data, account };
}
function saveAccounts(data) {
writeFileSync(ACCOUNTS_FILE, JSON.stringify(data, null, 2));
}
async function refreshIfNeeded(data, account) {
const expiresAt = (account.expires || 0) / 1000;
const now = Date.now() / 1000;
if (account.access && expiresAt && now < expiresAt - 60) {
return account.access;
}
const refreshRaw = account.refresh || '';
const refreshToken = refreshRaw.split('|')[0];
if (!refreshToken) {
throw new Error('Account has no refresh_token; re-run the quickstart OAuth flow.');
}
const { clientId, clientSecret } = await fetchOAuthCreds();
const body = new URLSearchParams({
grant_type: 'refresh_token',
refresh_token: refreshToken,
client_id: clientId,
});
if (clientSecret) body.set('client_secret', clientSecret);
const res = await fetch(TOKEN_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body,
});
if (!res.ok) {
throw new Error(`Token refresh failed: HTTP ${res.status} ${await res.text()}`);
}
const tok = await res.json();
account.access = tok.access_token;
account.expires = Math.floor((Date.now() / 1000 + (tok.expires_in || 3600)) * 1000);
saveAccounts(data);
return account.access;
}
// ---------------------------------------------------------------------------
// Code Assist API wrappers — request shapes from setup.ts / server.ts
// ---------------------------------------------------------------------------
async function caPost(method, token, body) {
const url = `${CODE_ASSIST_ENDPOINT}/${CODE_ASSIST_API_VERSION}:${method}`;
const res = await fetch(url, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
const text = await res.text();
if (!res.ok) {
throw new Error(`Code Assist ${method} HTTP ${res.status}: ${text}`);
}
return text ? JSON.parse(text) : {};
}
async function caGetOperation(name, token) {
const url = `${CODE_ASSIST_ENDPOINT}/${CODE_ASSIST_API_VERSION}/${name}`;
const res = await fetch(url, {
headers: { Authorization: `Bearer ${token}` },
});
if (!res.ok) {
throw new Error(`Code Assist getOperation HTTP ${res.status}: ${await res.text()}`);
}
return await res.json();
}
function pickOnboardTier(loadRes) {
// setup.ts::getOnboardTier — pick the isDefault tier, fall back to LEGACY.
for (const tier of loadRes.allowedTiers || []) {
if (tier.isDefault) return tier.id || UserTierId.LEGACY;
}
return UserTierId.LEGACY;
}
async function setupUser(token) {
const seedProject =
process.env.GOOGLE_CLOUD_PROJECT || process.env.GOOGLE_CLOUD_PROJECT_ID || undefined;
// setup.ts: loadCodeAssist body
const loadBody = { metadata: { ...CLIENT_METADATA } };
if (seedProject) {
loadBody.cloudaicompanionProject = seedProject;
loadBody.metadata.duetProject = seedProject;
}
console.log(' → :loadCodeAssist');
const loadRes = await caPost('loadCodeAssist', token, loadBody);
console.log(
' ← currentTier=',
loadRes.currentTier?.id,
' project=',
loadRes.cloudaicompanionProject?.id,
);
if (loadRes.currentTier && loadRes.cloudaicompanionProject?.id) {
return loadRes.cloudaicompanionProject.id;
}
if (loadRes.currentTier && seedProject) {
return seedProject;
}
const tierId = pickOnboardTier(loadRes);
console.log(' → :onboardUser tier=', tierId);
const onboardBody = { tierId, metadata: { ...CLIENT_METADATA } };
if (tierId !== UserTierId.FREE && seedProject) {
onboardBody.cloudaicompanionProject = seedProject;
onboardBody.metadata.duetProject = seedProject;
}
let lro = await caPost('onboardUser', token, onboardBody);
while (!lro.done) {
if (!lro.name) break;
console.log(' · polling LRO', lro.name);
await new Promise((f) => setTimeout(f, 5000));
lro = await caGetOperation(lro.name, token);
}
const projectId = lro.response?.cloudaicompanionProject?.id || seedProject;
console.log(' ← onboarded, project=', projectId);
return projectId;
}
// ---------------------------------------------------------------------------
// generateContent (non-streaming) — shape from converter.ts::toGenerateContentRequest
// ---------------------------------------------------------------------------
async function generateContent(token, model, projectId, prompt) {
const body = {
model, // bare model id, NOT "models/<id>" — matches req.model in converter.ts
project: projectId,
user_prompt_id: randomUUID(),
request: {
contents: [{ role: 'user', parts: [{ text: prompt }] }],
generationConfig: { maxOutputTokens: 32, temperature: 1.0 },
session_id: randomUUID(),
},
};
console.log(' → :generateContent model=', model);
return await caPost('generateContent', token, body);
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------
async function main() {
const args = parseArgs(process.argv);
console.log('=== Gemini CLI account sanity test ===');
console.log(' accounts file:', ACCOUNTS_FILE);
const { data, account } = loadAccounts();
console.log(' account:', account.email || '(unknown)');
const token = await refreshIfNeeded(data, account);
console.log(' access token OK (len', token.length + ')');
const projectId = account.project || (await setupUser(token));
if (projectId && account.project !== projectId) {
account.project = projectId;
saveAccounts(data);
console.log(' cached project ->', projectId);
}
if (!projectId) {
console.error(' FAIL: no project id resolved');
process.exit(1);
}
try {
const res = await generateContent(token, args.model, projectId, args.prompt);
const payload = res.response || res;
const text = (payload.candidates?.[0]?.content?.parts || [])
.filter((p) => p.text && !p.thought)
.map((p) => p.text)
.join('');
console.log(' ← response text:', JSON.stringify(text));
console.log(' ← finishReason:', payload.candidates?.[0]?.finishReason);
console.log(' ← usage:', JSON.stringify(payload.usageMetadata));
console.log('\nRESULT: PASS');
} catch (err) {
console.error('\nRESULT: FAIL');
console.error(err.message);
process.exit(1);
}
}
main().catch((err) => {
console.error('Unexpected error:', err);
process.exit(1);
});
+344
View File
@@ -0,0 +1,344 @@
#!/usr/bin/env python3
"""Test script for Gemini CLI OAuth + a custom-tool agent loop.
Tests two things:
1. OAuth flow (gemini-cli subscription path):
verify that the credentials produced by ``quickstart``
(stored in ``~/.hive/google-gemini-cli-accounts.json``) are present,
can be refreshed into a valid access token, and can actually serve a
request via ``GoogleGeminiCliProvider`` ``cloudcode-pa.googleapis.com``.
2. Agent loop with a custom tool:
run a minimal streaming agent loop that exposes ``get_weather`` and
checks that the model both calls the tool and, after the tool result
is fed back, produces a final text answer.
Model note:
``gemini-3.1-pro-preview-customtools`` is **not** served on the
Code Assist OAuth backend (``cloudcode-pa.googleapis.com``) it
only exists on the Gemini API-key path. See gemini-cli issue #22062.
This script therefore defaults to ``gemini-3.1-pro-preview``, which
is the strongest model the gemini-cli OAuth subscription actually
serves.
Usage:
uv run python scripts/test_gemini_cli_customtools.py
uv run python scripts/test_gemini_cli_customtools.py --reauth
uv run python scripts/test_gemini_cli_customtools.py --model gemini-3-flash-preview
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Any
# Make ``core`` importable like scripts/test_google_providers.py does.
_REPO = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_REPO / "core"))
from framework.llm.google import ( # noqa: E402
GoogleGeminiCliProvider,
_ACCOUNTS_FILE,
_do_token_refresh,
_load_accounts_from_json,
)
from framework.llm.provider import Tool # noqa: E402
from framework.llm.stream_events import ( # noqa: E402
FinishEvent,
StreamErrorEvent,
TextDeltaEvent,
ToolCallEvent,
)
# Strongest model the gemini-cli OAuth subscription actually serves.
DEFAULT_MODEL = "gemini-3.1-pro-preview"
# ---------------------------------------------------------------------------
# Step 1: OAuth flow check
# ---------------------------------------------------------------------------
def run_quickstart_oauth() -> bool:
"""Invoke the same OAuth entrypoint that quickstart.sh uses."""
auth_script = _REPO / "core" / "google_auth.py"
print(f" Running: uv run python {auth_script} auth account add")
try:
result = subprocess.run(
["uv", "run", "python", str(auth_script), "auth", "account", "add"],
cwd=str(_REPO),
check=False,
)
except FileNotFoundError:
print(" FAIL: `uv` not found on PATH")
return False
if result.returncode != 0:
print(f" FAIL: google_auth.py exited with code {result.returncode}")
return False
return _ACCOUNTS_FILE.exists()
def test_oauth_flow(*, reauth: bool) -> bool:
print("=" * 60)
print("STEP 1: OAuth flow (quickstart path)")
print("=" * 60)
print(f" Credentials file: {_ACCOUNTS_FILE}")
if reauth or not _ACCOUNTS_FILE.exists():
if not _ACCOUNTS_FILE.exists():
print(" No credentials on disk — running OAuth login.")
else:
print(" --reauth requested — running OAuth login.")
if not run_quickstart_oauth():
return False
if not _ACCOUNTS_FILE.exists():
print(" FAIL: credentials file still missing after OAuth")
return False
access, refresh, expires_at = _load_accounts_from_json()
print(f" Loaded credentials: access={'yes' if access else 'no'}, "
f"refresh={'yes' if refresh else 'no'}, expires_at={expires_at}")
if not refresh and not access:
print(" FAIL: credentials file has no tokens")
return False
# Force a refresh round-trip so we know the refresh token + OAuth
# client credentials from quickstart are actually working end-to-end.
if refresh:
print(" Refreshing access token...")
refreshed = _do_token_refresh(refresh)
if not refreshed:
print(" FAIL: refresh_token exchange failed")
return False
new_token, new_expiry = refreshed
print(f" OK: refreshed token (len={len(new_token)}, expires_at={new_expiry:.0f})")
else:
print(" WARN: no refresh token; only validating existing access token")
# Sanity check: the provider will also re-run _ensure_token() later.
provider = GoogleGeminiCliProvider(model=DEFAULT_MODEL)
if not provider.has_credentials():
print(" FAIL: provider reports no credentials after load")
return False
print(" OK: provider has credentials")
return True
# ---------------------------------------------------------------------------
# Step 2: Agent loop with a custom tool
# ---------------------------------------------------------------------------
WEATHER_TOOL = Tool(
name="get_weather",
description="Get the current weather for a given city.",
parameters={
"type": "object",
"properties": {
"city": {"type": "string", "description": "City name, e.g. 'Tokyo'"},
},
"required": ["city"],
},
)
def fake_get_weather(city: str) -> str:
# Deterministic fake result; the test cares about the loop, not the value.
return json.dumps({"city": city, "temp_c": 21, "conditions": "sunny"})
def _parse_rate_limit_reset(err_msg: str) -> int | None:
"""Extract the 'reset after Ns' hint from a Cloud Code Assist 429 body."""
if "RATE_LIMIT_EXCEEDED" not in err_msg and "429" not in err_msg:
return None
m = re.search(r"reset after (\d+)\s*s", err_msg)
if m:
return int(m.group(1))
# No hint — fall back to a short default so we still retry once.
return 5
async def run_agent_turn(
provider: Any,
messages: list[dict[str, Any]],
tools: list[Tool],
) -> tuple[str, list[dict[str, Any]], str]:
"""Run one streaming turn. Returns (text, tool_calls, finish_reason)."""
text_parts: list[str] = []
tool_calls: list[dict[str, Any]] = []
finish_reason = ""
async for event in provider.stream(
messages=messages,
system="You are a helpful assistant. Use tools when appropriate.",
tools=tools,
max_tokens=1024,
):
if isinstance(event, TextDeltaEvent):
text_parts.append(event.content)
sys.stdout.write(event.content)
sys.stdout.flush()
elif isinstance(event, ToolCallEvent):
print(f"\n [tool_call] {event.tool_name}({event.tool_input}) id={event.tool_use_id}")
tool_calls.append(
{
"id": event.tool_use_id,
"type": "function",
"function": {
"name": event.tool_name,
"arguments": json.dumps(event.tool_input or {}),
},
}
)
elif isinstance(event, FinishEvent):
finish_reason = event.stop_reason
print(f"\n [finish] reason={finish_reason} "
f"in={event.input_tokens} out={event.output_tokens}")
elif isinstance(event, StreamErrorEvent):
raise RuntimeError(f"stream error: {event.error}")
return "".join(text_parts), tool_calls, finish_reason
async def test_agent_loop(model: str, max_turns: int = 4) -> bool:
print()
print("=" * 60)
print(f"STEP 2: Agent loop with custom tool (model={model})")
print("=" * 60)
try:
provider = GoogleGeminiCliProvider(model=model)
except Exception as exc:
print(f" FAIL: could not construct provider: {exc}")
return False
messages: list[dict[str, Any]] = [
{
"role": "user",
"content": (
"What's the weather in Tokyo right now? "
"Use the get_weather tool, then tell me in one short sentence."
),
}
]
saw_tool_call = False
final_text = ""
for turn in range(1, max_turns + 1):
print(f"\n--- turn {turn} ---")
# Back-to-back calls on the Gemini CLI free tier trip per-model
# rate limits with very short reset windows. Retry once or twice
# after the reset instead of failing the whole run on a 2s pause.
retries_left = 3
while True:
try:
text, tool_calls, finish = await run_agent_turn(
provider, messages, [WEATHER_TOOL]
)
break
except Exception as exc:
msg = str(exc)
reset = _parse_rate_limit_reset(msg)
if reset is not None and retries_left > 0:
wait = max(reset, 1) + 2
print(f"\n rate-limited; sleeping {wait}s then retrying...")
await asyncio.sleep(wait)
retries_left -= 1
continue
print(f"\n FAIL: turn {turn} raised: {exc}")
return False
if tool_calls:
saw_tool_call = True
messages.append(
{"role": "assistant", "content": text or None, "tool_calls": tool_calls}
)
for tc in tool_calls:
fn_name = tc["function"]["name"]
try:
args = json.loads(tc["function"]["arguments"] or "{}")
except json.JSONDecodeError:
args = {}
if fn_name == "get_weather":
result = fake_get_weather(args.get("city", ""))
else:
result = json.dumps({"error": f"unknown tool {fn_name}"})
print(f" [tool_result] {fn_name} -> {result}")
messages.append(
{
"role": "tool",
"tool_call_id": tc["id"],
"name": fn_name,
"content": result,
}
)
continue
final_text = text.strip()
break
else:
print("\n FAIL: agent loop did not terminate within max_turns")
return False
if not saw_tool_call:
print("\n FAIL: model never called get_weather")
return False
if not final_text:
print("\n FAIL: model returned no final text after tool result")
return False
print(f"\n OK: final answer -> {final_text!r}")
return True
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--model", default=DEFAULT_MODEL, help="Model ID to test")
parser.add_argument(
"--reauth",
action="store_true",
help="Force running the quickstart OAuth flow before testing",
)
parser.add_argument(
"--skip-oauth",
action="store_true",
help="Skip step 1 and assume credentials already work",
)
args = parser.parse_args()
if not args.skip_oauth:
if not test_oauth_flow(reauth=args.reauth):
print("\nRESULT: OAUTH FAILED")
return 1
try:
ok = asyncio.run(test_agent_loop(args.model))
except KeyboardInterrupt:
print("\nInterrupted.")
return 130
print()
print("=" * 60)
print("RESULT:", "PASS" if ok else "FAIL")
print("=" * 60)
return 0 if ok else 1
if __name__ == "__main__":
sys.exit(main())
+70
View File
@@ -0,0 +1,70 @@
#!/usr/bin/env python3
"""Quick smoke test for Google Gemini providers."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "core"))
from framework.llm.google import GoogleApiKeyProvider, GoogleGeminiCliProvider
MODELS = ["gemini-3-flash-preview", "gemini-3.1-pro-preview", "gemini-2.5-flash"]
def test_api_key(model: str) -> bool:
print(f"\n--- GoogleApiKeyProvider: {model} ---")
try:
api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
if not api_key:
print(" SKIP: no API key found (set GEMINI_API_KEY or GOOGLE_API_KEY)")
return False
provider = GoogleApiKeyProvider(model=model, api_key=api_key)
resp = provider.complete(
messages=[{"role": "user", "content": "Say hello in one word."}],
max_tokens=32,
)
text = resp.content.strip()
print(f" OK: {text!r}")
return True
except Exception as e:
print(f" FAIL: {e}")
return False
def test_oauth(model: str) -> bool:
print(f"\n--- GoogleGeminiCliProvider: {model} ---")
try:
provider = GoogleGeminiCliProvider(model=model)
if not provider.has_credentials():
print(" SKIP: no OAuth credentials found")
return False
resp = provider.complete(
messages=[{"role": "user", "content": "Say hello in one word."}],
max_tokens=32,
)
text = resp.content.strip()
print(f" OK: {text!r}")
return True
except Exception as e:
print(f" FAIL: {e}")
return False
def main() -> int:
results = {}
for m in MODELS:
results[f"apikey:{m}"] = test_api_key(m)
results[f"oauth:{m}"] = test_oauth(m)
print("\n=== Summary ===")
for m, ok in results.items():
status = "PASS" if ok else "FAIL"
print(f" {m}: {status}")
return 0 if any(results.values()) else 1
if __name__ == "__main__":
sys.exit(main())