chore: ruff lint

This commit is contained in:
Richard Tang
2026-03-20 16:50:50 -07:00
parent 0663ee5950
commit e581767cab
4 changed files with 44 additions and 27 deletions
+29 -15
View File
@@ -18,7 +18,6 @@ import logging
import os
import secrets
import socket
import subprocess
import sys
import time
import urllib.parse
@@ -53,7 +52,9 @@ _DEFAULT_REDIRECT_PORT = 51121
# This project reverse-engineered and published the public OAuth credentials
# for Google's Antigravity/Cloud Code Assist API.
# Source: https://github.com/NoeFabris/opencode-antigravity-auth
_CREDENTIALS_URL = "https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts"
_CREDENTIALS_URL = (
"https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts"
)
# Cached credentials fetched from public source
_cached_client_id: str | None = None
@@ -67,10 +68,13 @@ def _fetch_credentials_from_public_source() -> tuple[str | None, str | None]:
return _cached_client_id, _cached_client_secret
try:
req = urllib.request.Request(_CREDENTIALS_URL, headers={"User-Agent": "Hive-Antigravity-Auth/1.0"})
req = urllib.request.Request(
_CREDENTIALS_URL, headers={"User-Agent": "Hive-Antigravity-Auth/1.0"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
content = resp.read().decode("utf-8")
import re
id_match = re.search(r'ANTIGRAVITY_CLIENT_ID\s*=\s*"([^"]+)"', content)
secret_match = re.search(r'ANTIGRAVITY_CLIENT_SECRET\s*=\s*"([^"]+)"', content)
if id_match:
@@ -165,7 +169,8 @@ class OAuthCallbackHandler(BaseHTTPRequestHandler):
OAuthCallbackHandler.auth_code = query["code"][0]
OAuthCallbackHandler.state = query["state"][0]
self._send_response(
"Authentication successful! You can close this window and return to the terminal."
"Authentication successful! You can close this window "
"and return to the terminal."
)
return
@@ -178,7 +183,9 @@ class OAuthCallbackHandler(BaseHTTPRequestHandler):
html = f"""<!DOCTYPE html>
<html>
<head><title>Antigravity Auth</title></head>
<body style="font-family: system-ui; display: flex; align-items: center; justify-content: center; height: 100vh; margin: 0; background: #1a1a2e; color: #eee;">
<body style="font-family: system-ui; display: flex; align-items: center;
justify-content: center; height: 100vh; margin: 0; background: #1a1a2e;
color: #eee;">
<div style="text-align: center;">
<h2>{message}</h2>
</div>
@@ -288,7 +295,10 @@ def validate_credentials(access_token: str, project_id: str = _DEFAULT_PROJECT_I
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Antigravity/1.18.3",
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Antigravity/1.18.3"
),
"X-Goog-Api-Client": "google-cloud-sdk vscode_cloudshelleditor/0.1",
}
@@ -306,7 +316,9 @@ def validate_credentials(access_token: str, project_id: str = _DEFAULT_PROJECT_I
return False
def refresh_access_token(refresh_token: str, client_id: str, client_secret: str | None) -> dict | None:
def refresh_access_token(
refresh_token: str, client_id: str, client_secret: str | None
) -> dict | None:
"""Refresh the access token using the refresh token."""
data = {
"grant_type": "refresh_token",
@@ -349,7 +361,9 @@ def cmd_account_add(args: argparse.Namespace) -> int:
access_token = account.get("access")
refresh_token_str = account.get("refresh", "")
refresh_token = refresh_token_str.split("|")[0] if refresh_token_str else None
project_id = refresh_token_str.split("|")[1] if "|" in refresh_token_str else _DEFAULT_PROJECT_ID
project_id = (
refresh_token_str.split("|")[1] if "|" in refresh_token_str else _DEFAULT_PROJECT_ID
)
email = account.get("email", "unknown")
expires_ms = account.get("expires", 0)
expires_at = expires_ms / 1000.0 if expires_ms else 0.0
@@ -360,7 +374,7 @@ def cmd_account_add(args: argparse.Namespace) -> int:
logger.info(f"Found existing credentials for: {email}")
logger.info("Validating existing credentials...")
if validate_credentials(access_token, project_id):
logger.info(f"✓ Credentials valid! Skipping OAuth.")
logger.info("✓ Credentials valid! Skipping OAuth.")
return 0
else:
logger.info("Credentials failed validation, refreshing...")
@@ -376,13 +390,15 @@ def cmd_account_add(args: argparse.Namespace) -> int:
# Update the account
account["access"] = new_access
account["expires"] = int((time.time() + expires_in) * 1000)
accounts_data["last_refresh"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
accounts_data["last_refresh"] = time.strftime(
"%Y-%m-%dT%H:%M:%SZ", time.gmtime()
)
save_accounts(accounts_data)
# Validate the refreshed token
logger.info("Validating refreshed credentials...")
if validate_credentials(new_access, project_id):
logger.info(f"✓ Credentials refreshed and validated!")
logger.info("✓ Credentials refreshed and validated!")
return 0
else:
logger.info("Refreshed token failed validation, proceeding with OAuth...")
@@ -475,9 +491,7 @@ def cmd_account_add(args: argparse.Namespace) -> int:
}
# Update existing account or add new one
existing_idx = next(
(i for i, a in enumerate(accounts) if a.get("email") == email), None
)
existing_idx = next((i for i, a in enumerate(accounts) if a.get("email") == email), None)
if existing_idx is not None:
accounts[existing_idx] = new_account
logger.info(f"Updated existing account: {email}")
@@ -501,7 +515,7 @@ def cmd_account_list(args: argparse.Namespace) -> int:
if not accounts:
logger.info("No accounts configured.")
logger.info(f"Run 'antigravity auth account add' to add one.")
logger.info("Run 'antigravity auth account add' to add one.")
return 0
logger.info("Configured accounts:\n")
+6 -2
View File
@@ -286,7 +286,9 @@ def get_api_key() -> str | None:
# This project reverse-engineered and published the public OAuth credentials
# for Google's Antigravity/Cloud Code Assist API.
# Source: https://github.com/NoeFabris/opencode-antigravity-auth
_ANTIGRAVITY_CREDENTIALS_URL = "https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts"
_ANTIGRAVITY_CREDENTIALS_URL = (
"https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts"
)
_antigravity_credentials_cache: tuple[str | None, str | None] = (None, None)
@@ -300,7 +302,9 @@ def _fetch_antigravity_credentials() -> tuple[str | None, str | None]:
import urllib.request
try:
req = urllib.request.Request(_ANTIGRAVITY_CREDENTIALS_URL, headers={"User-Agent": "Hive/1.0"})
req = urllib.request.Request(
_ANTIGRAVITY_CREDENTIALS_URL, headers={"User-Agent": "Hive/1.0"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
content = resp.read().decode("utf-8")
id_match = re.search(r'ANTIGRAVITY_CLIENT_ID\s*=\s*"([^"]+)"', content)
+7 -6
View File
@@ -33,8 +33,8 @@ OPENROUTER_SEPARATOR_TRANSLATION = str.maketrans(
"\u2212": "-",
"\u2044": "/",
"\u2215": "/",
"\u29F8": "/",
"\uFF0F": "/",
"\u29f8": "/",
"\uff0f": "/",
}
)
@@ -66,9 +66,7 @@ def _sanitize_openrouter_model_id(value: str) -> str:
"""Sanitize pasted OpenRouter model IDs into a comparable slug."""
normalized = unicodedata.normalize("NFKC", value or "")
normalized = "".join(
ch
for ch in normalized
if unicodedata.category(ch) not in {"Cc", "Cf"}
ch for ch in normalized if unicodedata.category(ch) not in {"Cc", "Cf"}
)
normalized = normalized.translate(OPENROUTER_SEPARATOR_TRANSLATION)
normalized = re.sub(r"\s+", "", normalized)
@@ -183,7 +181,10 @@ def check_openrouter(
return {"valid": False, "message": "Invalid OpenRouter API key"}
if r.status_code == 403:
return {"valid": False, "message": "OpenRouter API key lacks permissions"}
return {"valid": False, "message": f"OpenRouter API returned status {r.status_code}"}
return {
"valid": False,
"message": f"OpenRouter API returned status {r.status_code}",
}
def check_openrouter_model(
+2 -4
View File
@@ -876,7 +876,7 @@ def _run_server(
if self.path == "/":
self._respond(200, "text/html; charset=utf-8", html_bytes)
elif self.path.startswith("/api/session/"):
sid = urllib.parse.unquote(self.path[len("/api/session/"):])
sid = urllib.parse.unquote(self.path[len("/api/session/") :])
records = sessions.get(sid)
if records is None:
self._respond(404, "application/json", b"[]")
@@ -917,9 +917,7 @@ def _run_server(
def main() -> int:
args = _parse_args()
records = _discover_records(args.logs_dir.expanduser(), args.limit_files)
summaries, sessions = _group_sessions(
records, include_tests=args.include_tests
)
summaries, sessions = _group_sessions(records, include_tests=args.include_tests)
initial_session_id = args.session or (
summaries[0].execution_id if summaries else ""