Files
hive/tools/tests/test_credential_registry.py
T
Hundao 589c5b06fe fix: resolve all ruff lint and format errors across codebase (#7058)
- Auto-fixed 70 lint errors (import sorting, aliased errors, datetime.UTC)
- Fixed 85 remaining errors manually:
  - E501: wrapped long lines in queen_profiles, catalog, routes_credentials
  - F821: added missing TYPE_CHECKING imports for AgentHost, ToolRegistry,
    HookContext, HookResult; added runtime imports where needed
  - F811: removed duplicate method definitions in queen_lifecycle_tools
  - F841/B007: removed unused variables in discovery.py
  - W291: removed trailing whitespace in queen nodes
  - E402: moved import to top of queen_memory_v2.py
  - Fixed AgentRuntime -> AgentHost in example template type annotations
- Reformatted 343 files with ruff format
2026-04-16 19:30:01 +08:00

98 lines
3.9 KiB
Python

"""Tests that enforce credential registry completeness and consistency.
These tests run in CI and catch common mistakes when adding new integrations:
- Missing health checker for a spec with health_check_endpoint
- Orphaned entries in HEALTH_CHECKERS (no corresponding spec)
- CredentialSpec fields that are incomplete
- Duplicate env var conflicts
"""
import pytest
from aden_tools.credentials import CREDENTIAL_SPECS
from aden_tools.credentials.health_check import HEALTH_CHECKERS
class TestRegistryCompleteness:
"""Every credential with a health_check_endpoint must have a registered checker."""
# Credentials that intentionally don't have their own dedicated checker:
# - google_cse: shares google_search checker (same credential_group)
# - razorpay/razorpay_secret: requires HTTP Basic auth with TWO credentials,
# which the single-value health check dispatcher can't support
# - plaid_client_id/plaid_secret: requires POST with both client_id and
# secret in JSON body, can't validate with a single credential value
KNOWN_EXCEPTIONS = {
"google_cse",
"razorpay",
"razorpay_secret",
"plaid_client_id",
"plaid_secret",
}
def test_specs_with_endpoint_have_checkers(self):
"""Every CredentialSpec with health_check_endpoint has a HEALTH_CHECKERS entry."""
missing = []
for name, spec in CREDENTIAL_SPECS.items():
if name in self.KNOWN_EXCEPTIONS:
continue
if spec.health_check_endpoint and name not in HEALTH_CHECKERS:
missing.append(f"{name}: has endpoint '{spec.health_check_endpoint}' but no dedicated health checker")
assert not missing, f"{len(missing)} credential(s) have health_check_endpoint but no checker:\n" + "\n".join(
f" - {m}" for m in missing
)
def test_checkers_have_corresponding_specs(self):
"""Every key in HEALTH_CHECKERS matches a CREDENTIAL_SPECS entry."""
orphaned = [name for name in HEALTH_CHECKERS if name not in CREDENTIAL_SPECS]
assert not orphaned, f"HEALTH_CHECKERS has entries with no CREDENTIAL_SPECS: {orphaned}"
class TestSpecRequiredFields:
"""Every CredentialSpec should have minimum required fields."""
@pytest.mark.parametrize(
"cred_name,spec",
list(CREDENTIAL_SPECS.items()),
ids=list(CREDENTIAL_SPECS.keys()),
)
def test_has_env_var(self, cred_name, spec):
assert spec.env_var, f"{cred_name}: missing env_var"
@pytest.mark.parametrize(
"cred_name,spec",
list(CREDENTIAL_SPECS.items()),
ids=list(CREDENTIAL_SPECS.keys()),
)
def test_has_description(self, cred_name, spec):
assert spec.description, f"{cred_name}: missing description"
@pytest.mark.parametrize(
"cred_name,spec",
list(CREDENTIAL_SPECS.items()),
ids=list(CREDENTIAL_SPECS.keys()),
)
def test_has_tools_or_node_types(self, cred_name, spec):
assert spec.tools or spec.node_types, f"{cred_name}: must have at least one tool or node_type"
class TestNoDuplicateEnvVars:
"""No two credential specs should use the same env_var (unless in same credential_group)."""
def test_no_accidental_env_var_collisions(self):
seen: dict[str, list[str]] = {}
for name, spec in CREDENTIAL_SPECS.items():
seen.setdefault(spec.env_var, []).append(name)
duplicates = {}
for env_var, names in seen.items():
if len(names) <= 1:
continue
# Filter out intentional duplicates (same credential_group)
groups = {CREDENTIAL_SPECS[n].credential_group for n in names}
if len(groups) == 1 and groups != {""}:
continue # All share the same non-empty group -- intentional
duplicates[env_var] = names
assert not duplicates, f"Duplicate env_vars across unrelated credentials: {duplicates}"