From e581767cab890b847d0764fff1a7ac8bec77465f Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Fri, 20 Mar 2026 16:50:50 -0700 Subject: [PATCH] chore: ruff lint --- core/antigravity_auth.py | 44 +++++++++++++++++++---------- core/framework/config.py | 8 ++++-- scripts/check_llm_key.py | 13 +++++---- scripts/llm_debug_log_visualizer.py | 6 ++-- 4 files changed, 44 insertions(+), 27 deletions(-) diff --git a/core/antigravity_auth.py b/core/antigravity_auth.py index 64da7ed7..ddb53e7a 100644 --- a/core/antigravity_auth.py +++ b/core/antigravity_auth.py @@ -18,7 +18,6 @@ import logging import os import secrets import socket -import subprocess import sys import time import urllib.parse @@ -53,7 +52,9 @@ _DEFAULT_REDIRECT_PORT = 51121 # This project reverse-engineered and published the public OAuth credentials # for Google's Antigravity/Cloud Code Assist API. # Source: https://github.com/NoeFabris/opencode-antigravity-auth -_CREDENTIALS_URL = "https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts" +_CREDENTIALS_URL = ( + "https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts" +) # Cached credentials fetched from public source _cached_client_id: str | None = None @@ -67,10 +68,13 @@ def _fetch_credentials_from_public_source() -> tuple[str | None, str | None]: return _cached_client_id, _cached_client_secret try: - req = urllib.request.Request(_CREDENTIALS_URL, headers={"User-Agent": "Hive-Antigravity-Auth/1.0"}) + req = urllib.request.Request( + _CREDENTIALS_URL, headers={"User-Agent": "Hive-Antigravity-Auth/1.0"} + ) with urllib.request.urlopen(req, timeout=10) as resp: content = resp.read().decode("utf-8") import re + id_match = re.search(r'ANTIGRAVITY_CLIENT_ID\s*=\s*"([^"]+)"', content) secret_match = re.search(r'ANTIGRAVITY_CLIENT_SECRET\s*=\s*"([^"]+)"', content) if id_match: @@ -165,7 +169,8 @@ class OAuthCallbackHandler(BaseHTTPRequestHandler): OAuthCallbackHandler.auth_code = query["code"][0] OAuthCallbackHandler.state = query["state"][0] self._send_response( - "Authentication successful! You can close this window and return to the terminal." + "Authentication successful! You can close this window " + "and return to the terminal." ) return @@ -178,7 +183,9 @@ class OAuthCallbackHandler(BaseHTTPRequestHandler): html = f""" Antigravity Auth - +

{message}

@@ -288,7 +295,10 @@ def validate_credentials(access_token: str, project_id: str = _DEFAULT_PROJECT_I headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Antigravity/1.18.3", + "User-Agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) Antigravity/1.18.3" + ), "X-Goog-Api-Client": "google-cloud-sdk vscode_cloudshelleditor/0.1", } @@ -306,7 +316,9 @@ def validate_credentials(access_token: str, project_id: str = _DEFAULT_PROJECT_I return False -def refresh_access_token(refresh_token: str, client_id: str, client_secret: str | None) -> dict | None: +def refresh_access_token( + refresh_token: str, client_id: str, client_secret: str | None +) -> dict | None: """Refresh the access token using the refresh token.""" data = { "grant_type": "refresh_token", @@ -349,7 +361,9 @@ def cmd_account_add(args: argparse.Namespace) -> int: access_token = account.get("access") refresh_token_str = account.get("refresh", "") refresh_token = refresh_token_str.split("|")[0] if refresh_token_str else None - project_id = refresh_token_str.split("|")[1] if "|" in refresh_token_str else _DEFAULT_PROJECT_ID + project_id = ( + refresh_token_str.split("|")[1] if "|" in refresh_token_str else _DEFAULT_PROJECT_ID + ) email = account.get("email", "unknown") expires_ms = account.get("expires", 0) expires_at = expires_ms / 1000.0 if expires_ms else 0.0 @@ -360,7 +374,7 @@ def cmd_account_add(args: argparse.Namespace) -> int: logger.info(f"Found existing credentials for: {email}") logger.info("Validating existing credentials...") if validate_credentials(access_token, project_id): - logger.info(f"✓ Credentials valid! Skipping OAuth.") + logger.info("✓ Credentials valid! Skipping OAuth.") return 0 else: logger.info("Credentials failed validation, refreshing...") @@ -376,13 +390,15 @@ def cmd_account_add(args: argparse.Namespace) -> int: # Update the account account["access"] = new_access account["expires"] = int((time.time() + expires_in) * 1000) - accounts_data["last_refresh"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + accounts_data["last_refresh"] = time.strftime( + "%Y-%m-%dT%H:%M:%SZ", time.gmtime() + ) save_accounts(accounts_data) # Validate the refreshed token logger.info("Validating refreshed credentials...") if validate_credentials(new_access, project_id): - logger.info(f"✓ Credentials refreshed and validated!") + logger.info("✓ Credentials refreshed and validated!") return 0 else: logger.info("Refreshed token failed validation, proceeding with OAuth...") @@ -475,9 +491,7 @@ def cmd_account_add(args: argparse.Namespace) -> int: } # Update existing account or add new one - existing_idx = next( - (i for i, a in enumerate(accounts) if a.get("email") == email), None - ) + existing_idx = next((i for i, a in enumerate(accounts) if a.get("email") == email), None) if existing_idx is not None: accounts[existing_idx] = new_account logger.info(f"Updated existing account: {email}") @@ -501,7 +515,7 @@ def cmd_account_list(args: argparse.Namespace) -> int: if not accounts: logger.info("No accounts configured.") - logger.info(f"Run 'antigravity auth account add' to add one.") + logger.info("Run 'antigravity auth account add' to add one.") return 0 logger.info("Configured accounts:\n") diff --git a/core/framework/config.py b/core/framework/config.py index f3122dfb..236f9be6 100644 --- a/core/framework/config.py +++ b/core/framework/config.py @@ -286,7 +286,9 @@ def get_api_key() -> str | None: # This project reverse-engineered and published the public OAuth credentials # for Google's Antigravity/Cloud Code Assist API. # Source: https://github.com/NoeFabris/opencode-antigravity-auth -_ANTIGRAVITY_CREDENTIALS_URL = "https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts" +_ANTIGRAVITY_CREDENTIALS_URL = ( + "https://raw.githubusercontent.com/NoeFabris/opencode-antigravity-auth/dev/src/constants.ts" +) _antigravity_credentials_cache: tuple[str | None, str | None] = (None, None) @@ -300,7 +302,9 @@ def _fetch_antigravity_credentials() -> tuple[str | None, str | None]: import urllib.request try: - req = urllib.request.Request(_ANTIGRAVITY_CREDENTIALS_URL, headers={"User-Agent": "Hive/1.0"}) + req = urllib.request.Request( + _ANTIGRAVITY_CREDENTIALS_URL, headers={"User-Agent": "Hive/1.0"} + ) with urllib.request.urlopen(req, timeout=10) as resp: content = resp.read().decode("utf-8") id_match = re.search(r'ANTIGRAVITY_CLIENT_ID\s*=\s*"([^"]+)"', content) diff --git a/scripts/check_llm_key.py b/scripts/check_llm_key.py index cfdc66bf..44c24c02 100644 --- a/scripts/check_llm_key.py +++ b/scripts/check_llm_key.py @@ -33,8 +33,8 @@ OPENROUTER_SEPARATOR_TRANSLATION = str.maketrans( "\u2212": "-", "\u2044": "/", "\u2215": "/", - "\u29F8": "/", - "\uFF0F": "/", + "\u29f8": "/", + "\uff0f": "/", } ) @@ -66,9 +66,7 @@ def _sanitize_openrouter_model_id(value: str) -> str: """Sanitize pasted OpenRouter model IDs into a comparable slug.""" normalized = unicodedata.normalize("NFKC", value or "") normalized = "".join( - ch - for ch in normalized - if unicodedata.category(ch) not in {"Cc", "Cf"} + ch for ch in normalized if unicodedata.category(ch) not in {"Cc", "Cf"} ) normalized = normalized.translate(OPENROUTER_SEPARATOR_TRANSLATION) normalized = re.sub(r"\s+", "", normalized) @@ -183,7 +181,10 @@ def check_openrouter( return {"valid": False, "message": "Invalid OpenRouter API key"} if r.status_code == 403: return {"valid": False, "message": "OpenRouter API key lacks permissions"} - return {"valid": False, "message": f"OpenRouter API returned status {r.status_code}"} + return { + "valid": False, + "message": f"OpenRouter API returned status {r.status_code}", + } def check_openrouter_model( diff --git a/scripts/llm_debug_log_visualizer.py b/scripts/llm_debug_log_visualizer.py index 9bbcdefc..554dc0a2 100644 --- a/scripts/llm_debug_log_visualizer.py +++ b/scripts/llm_debug_log_visualizer.py @@ -876,7 +876,7 @@ def _run_server( if self.path == "/": self._respond(200, "text/html; charset=utf-8", html_bytes) elif self.path.startswith("/api/session/"): - sid = urllib.parse.unquote(self.path[len("/api/session/"):]) + sid = urllib.parse.unquote(self.path[len("/api/session/") :]) records = sessions.get(sid) if records is None: self._respond(404, "application/json", b"[]") @@ -917,9 +917,7 @@ def _run_server( def main() -> int: args = _parse_args() records = _discover_records(args.logs_dir.expanduser(), args.limit_files) - summaries, sessions = _group_sessions( - records, include_tests=args.include_tests - ) + summaries, sessions = _group_sessions(records, include_tests=args.include_tests) initial_session_id = args.session or ( summaries[0].execution_id if summaries else ""