94eee95fe0
* feat(auth): introduce backend auth module
Port RFC-001 authentication core from PR #1728:
- JWT token handling (create_access_token, decode_token, TokenPayload)
- Password hashing (bcrypt) with verify_password
- SQLite UserRepository with base interface
- Provider Factory pattern (LocalAuthProvider)
- CLI reset_admin tool
- Auth-specific errors (AuthErrorCode, TokenError, AuthErrorResponse)
Deps:
- bcrypt>=4.0.0
- pyjwt>=2.9.0
- email-validator>=2.0.0
- backend/uv.toml pins public PyPI index
Tests: 12 pure unit tests (test_auth_config.py, test_auth_errors.py).
Scope note: authz.py, test_auth.py, and test_auth_type_system.py are
deferred to commit 2 because they depend on middleware and deps wiring
that is not yet in place. Commit 1 stays "pure new files only" as the
spec mandates.
* feat(auth): wire auth end-to-end (middleware + frontend replacement)
Backend:
- Port auth_middleware, csrf_middleware, langgraph_auth, routers/auth
- Port authz decorator (owner_filter_key defaults to 'owner_id')
- Merge app.py: register AuthMiddleware + CSRFMiddleware + CORS, add
_ensure_admin_user lifespan hook, _migrate_orphaned_threads helper,
register auth router
- Merge deps.py: add get_local_provider, get_current_user_from_request,
get_optional_user_from_request; keep get_current_user as thin str|None
adapter for feedback router
- langgraph.json: add auth path pointing to langgraph_auth.py:auth
- Rename metadata['user_id'] -> metadata['owner_id'] in langgraph_auth
(both metadata write and LangGraph filter dict) + test fixtures
Frontend:
- Delete better-auth library and api catch-all route
- Remove better-auth npm dependency and env vars (BETTER_AUTH_SECRET,
BETTER_AUTH_GITHUB_*) from env.js
- Port frontend/src/core/auth/* (AuthProvider, gateway-config,
proxy-policy, server-side getServerSideUser, types)
- Port frontend/src/core/api/fetcher.ts
- Port (auth)/layout, (auth)/login, (auth)/setup pages
- Rewrite workspace/layout.tsx as server component that calls
getServerSideUser and wraps in AuthProvider
- Port workspace/workspace-content.tsx for the client-side sidebar logic
Tests:
- Port 5 auth test files (test_auth, test_auth_middleware,
test_auth_type_system, test_ensure_admin, test_langgraph_auth)
- 176 auth tests PASS
After this commit: login/logout/registration flow works, but persistence
layer does not yet filter by owner_id. Commit 4 closes that gap.
* feat(auth): account settings page + i18n
- Port account-settings-page.tsx (change password, change email, logout)
- Wire into settings-dialog.tsx as new "account" section with UserIcon,
rendered first in the section list
- Add i18n keys:
- en-US/zh-CN: settings.sections.account ("Account" / "账号")
- en-US/zh-CN: button.logout ("Log out" / "退出登录")
- types.ts: matching type declarations
* feat(auth): enforce owner_id across 2.0-rc persistence layer
Add request-scoped contextvar-based owner filtering to threads_meta,
runs, run_events, and feedback repositories. Router code is unchanged
— isolation is enforced at the storage layer so that any caller that
forgets to pass owner_id still gets filtered results, and new routes
cannot accidentally leak data.
Core infrastructure
-------------------
- deerflow/runtime/user_context.py (new):
- ContextVar[CurrentUser | None] with default None
- runtime_checkable CurrentUser Protocol (structural subtype with .id)
- set/reset/get/require helpers
- AUTO sentinel + resolve_owner_id(value, method_name) for sentinel
three-state resolution: AUTO reads contextvar, explicit str
overrides, explicit None bypasses the filter (for migration/CLI)
Repository changes
------------------
- ThreadMetaRepository: create/get/search/update_*/delete gain
owner_id=AUTO kwarg; read paths filter by owner, writes stamp it,
mutations check ownership before applying
- RunRepository: put/get/list_by_thread/delete gain owner_id=AUTO kwarg
- FeedbackRepository: create/get/list_by_run/list_by_thread/delete
gain owner_id=AUTO kwarg
- DbRunEventStore: list_messages/list_events/list_messages_by_run/
count_messages/delete_by_thread/delete_by_run gain owner_id=AUTO
kwarg. Write paths (put/put_batch) read contextvar softly: when a
request-scoped user is available, owner_id is stamped; background
worker writes without a user context pass None which is valid
(orphan row to be bound by migration)
Schema
------
- persistence/models/run_event.py: RunEventRow.owner_id = Mapped[
str | None] = mapped_column(String(64), nullable=True, index=True)
- No alembic migration needed: 2.0 ships fresh, Base.metadata.create_all
picks up the new column automatically
Middleware
----------
- auth_middleware.py: after cookie check, call get_optional_user_from_
request to load the real User, stamp it into request.state.user AND
the contextvar via set_current_user, reset in a try/finally. Public
paths and unauthenticated requests continue without contextvar, and
@require_auth handles the strict 401 path
Test infrastructure
-------------------
- tests/conftest.py: @pytest.fixture(autouse=True) _auto_user_context
sets a default SimpleNamespace(id="test-user-autouse") on every test
unless marked @pytest.mark.no_auto_user. Keeps existing 20+
persistence tests passing without modification
- pyproject.toml [tool.pytest.ini_options]: register no_auto_user
marker so pytest does not emit warnings for opt-out tests
- tests/test_user_context.py: 6 tests covering three-state semantics,
Protocol duck typing, and require/optional APIs
- tests/test_thread_meta_repo.py: one test updated to pass owner_id=
None explicitly where it was previously relying on the old default
Test results
------------
- test_user_context.py: 6 passed
- test_auth*.py + test_langgraph_auth.py + test_ensure_admin.py: 127
- test_run_event_store / test_run_repository / test_thread_meta_repo
/ test_feedback: 92 passed
- Full backend suite: 1905 passed, 2 failed (both @requires_llm flaky
integration tests unrelated to auth), 1 skipped
* feat(auth): extend orphan migration to 2.0-rc persistence tables
_ensure_admin_user now runs a three-step pipeline on every boot:
Step 1 (fatal): admin user exists / is created / password is reset
Step 2 (non-fatal): LangGraph store orphan threads → admin
Step 3 (non-fatal): SQL persistence tables → admin
- threads_meta
- runs
- run_events
- feedback
Each step is idempotent. The fatal/non-fatal split mirrors PR #1728's
original philosophy: admin creation failure blocks startup (the system
is unusable without an admin), whereas migration failures log a warning
and let the service proceed (a partial migration is recoverable; a
missing admin is not).
Key helpers
-----------
- _iter_store_items(store, namespace, *, page_size=500):
async generator that cursor-paginates across LangGraph store pages.
Fixes PR #1728's hardcoded limit=1000 bug that would silently lose
orphans beyond the first page.
- _migrate_orphaned_threads(store, admin_user_id):
Rewritten to use _iter_store_items. Returns the migrated count so the
caller can log it; raises only on unhandled exceptions.
- _migrate_orphan_sql_tables(admin_user_id):
Imports the 4 ORM models lazily, grabs the shared session factory,
runs one UPDATE per table in a single transaction, commits once.
No-op when no persistence backend is configured (in-memory dev).
Tests: test_ensure_admin.py (8 passed)
* test(auth): port AUTH test plan docs + lint/format pass
- Port backend/docs/AUTH_TEST_PLAN.md and AUTH_UPGRADE.md from PR #1728
- Rename metadata.user_id → metadata.owner_id in AUTH_TEST_PLAN.md
(4 occurrences from the original PR doc)
- ruff auto-fix UP037 in sentinel type annotations: drop quotes around
"str | None | _AutoSentinel" now that from __future__ import
annotations makes them implicit string forms
- ruff format: 2 files (app/gateway/app.py, runtime/user_context.py)
Note on test coverage additions:
- conftest.py autouse fixture was already added in commit 4 (had to
be co-located with the repository changes to keep pre-existing
persistence tests passing)
- cross-user isolation E2E tests (test_owner_isolation.py) deferred
— enforcement is already proven by the 98-test repository suite
via the autouse fixture + explicit _AUTO sentinel exercises
- New test cases (TC-API-17..20, TC-ATK-13, TC-MIG-01..07) listed
in AUTH_TEST_PLAN.md are deferred to a follow-up PR — they are
manual-QA test cases rather than pytest code, and the spec-level
coverage is already met by test_user_context.py + the 98-test
repository suite.
Final test results:
- Auth suite (test_auth*, test_langgraph_auth, test_ensure_admin,
test_user_context): 186 passed
- Persistence suite (test_run_event_store, test_run_repository,
test_thread_meta_repo, test_feedback): 98 passed
- Lint: ruff check + ruff format both clean
* test(auth): add cross-user isolation test suite
10 tests exercising the storage-layer owner filter by manually
switching the user_context contextvar between two users. Verifies
the safety invariant:
After a repository write with owner_id=A, a subsequent read with
owner_id=B must not return the row, and vice versa.
Covers all 4 tables that own user-scoped data:
TC-API-17 threads_meta — read, search, update, delete cross-user
TC-API-18 runs — get, list_by_thread, delete cross-user
TC-API-19 run_events — list_messages, list_events, count_messages,
delete_by_thread (CRITICAL: raw conversation
content leak vector)
TC-API-20 feedback — get, list_by_run, delete cross-user
Plus two meta-tests verifying the sentinel pattern itself:
- AUTO + unset contextvar raises RuntimeError
- explicit owner_id=None bypasses the filter (migration escape hatch)
Architecture note
-----------------
These tests bypass the HTTP layer by design. The full chain
(cookie → middleware → contextvar → repository) is covered piecewise:
- test_auth_middleware.py: middleware sets contextvar from cookies
- test_owner_isolation.py: repositories enforce isolation when
contextvar is set to different users
Together they prove the end-to-end safety property without the
ceremony of spinning up a full TestClient + in-memory DB for every
router endpoint.
Tests pass: 231 (full auth + persistence + isolation suite)
Lint: clean
* refactor(auth): migrate user repository to SQLAlchemy ORM
Move the users table into the shared persistence engine so auth
matches the pattern of threads_meta, runs, run_events, and feedback —
one engine, one session factory, one schema init codepath.
New files
---------
- persistence/user/__init__.py, persistence/user/model.py: UserRow
ORM class with partial unique index on (oauth_provider, oauth_id)
- Registered in persistence/models/__init__.py so
Base.metadata.create_all() picks it up
Modified
--------
- auth/repositories/sqlite.py: rewritten as async SQLAlchemy,
identical constructor pattern to the other four repositories
(def __init__(self, session_factory) + self._sf = session_factory)
- auth/config.py: drop users_db_path field — storage is configured
through config.database like every other table
- deps.py/get_local_provider: construct SQLiteUserRepository with
the shared session factory, fail fast if engine is not initialised
- tests/test_auth.py: rewrite test_sqlite_round_trip_new_fields to
use the shared engine (init_engine + close_engine in a tempdir)
- tests/test_auth_type_system.py: add per-test autouse fixture that
spins up a scratch engine and resets deps._cached_* singletons
* refactor(auth): remove SQL orphan migration (unused in supported scenarios)
The _migrate_orphan_sql_tables helper existed to bind NULL owner_id
rows in threads_meta, runs, run_events, and feedback to the admin on
first boot. But in every supported upgrade path, it's a no-op:
1. Fresh install: create_all builds fresh tables, no legacy rows
2. No-auth → with-auth (no existing persistence DB): persistence
tables are created fresh by create_all, no legacy rows
3. No-auth → with-auth (has existing persistence DB from #1930):
NOT a supported upgrade path — "有 DB 到有 DB" schema evolution
is out of scope; users wipe DB or run manual ALTER
So the SQL orphan migration never has anything to do in the
supported matrix. Delete the function, simplify _ensure_admin_user
from a 3-step pipeline to a 2-step one (admin creation + LangGraph
store orphan migration only).
LangGraph store orphan migration stays: it serves the real
"no-auth → with-auth" upgrade path where a user's existing LangGraph
thread metadata has no owner_id field and needs to be stamped with
the newly-created admin's id.
Tests: 284 passed (auth + persistence + isolation)
Lint: clean
* security(auth): write initial admin password to 0600 file instead of logs
CodeQL py/clear-text-logging-sensitive-data flagged 3 call sites that
logged the auto-generated admin password to stdout via logger.info().
Production log aggregators (ELK/Splunk/etc) would have captured those
cleartext secrets. Replace with a shared helper that writes to
.deer-flow/admin_initial_credentials.txt with mode 0600, and log only
the path.
New file
--------
- app/gateway/auth/credential_file.py: write_initial_credentials()
helper. Takes email, password, and a "initial"/"reset" label.
Creates .deer-flow/ if missing, writes a header comment plus the
email+password, chmods 0o600, returns the absolute Path.
Modified
--------
- app/gateway/app.py: both _ensure_admin_user paths (fresh creation
+ needs_setup password reset) now write to file and log the path
- app/gateway/auth/reset_admin.py: rewritten to use the shared ORM
repo (SQLiteUserRepository with session_factory) and the
credential_file helper. The previous implementation was broken
after the earlier ORM refactor — it still imported _get_users_conn
and constructed SQLiteUserRepository() without a session factory.
No tests changed — the three password-log sites are all exercised
via existing test_ensure_admin.py which checks that startup
succeeds, not that a specific string appears in logs.
CodeQL alerts 272, 283, 284: all resolved.
* security(auth): strict JWT validation in middleware (fix junk cookie bypass)
AUTH_TEST_PLAN test 7.5.8 expects junk cookies to be rejected with
401. The previous middleware behaviour was "presence-only": check
that some access_token cookie exists, then pass through. In
combination with my Task-12 decision to skip @require_auth
decorators on routes, this created a gap where a request with any
cookie-shaped string (e.g. access_token=not-a-jwt) would bypass
authentication on routes that do not touch the repository
(/api/models, /api/mcp/config, /api/memory, /api/skills, …).
Fix: middleware now calls get_current_user_from_request() strictly
and catches the resulting HTTPException to render a 401 with the
proper fine-grained error code (token_invalid, token_expired,
user_not_found, …). On success it stamps request.state.user and
the contextvar so repository-layer owner filters work downstream.
The 4 old "_with_cookie_passes" tests in test_auth_middleware.py
were written for the presence-only behaviour; they asserted that
a junk cookie would make the handler return 200. They are renamed
to "_with_junk_cookie_rejected" and their assertions flipped to
401. The negative path (no cookie → 401 not_authenticated)
is unchanged.
Verified:
no cookie → 401 not_authenticated
junk cookie → 401 token_invalid (the fixed bug)
expired cookie → 401 token_expired
Tests: 284 passed (auth + persistence + isolation)
Lint: clean
* security(auth): wire @require_permission(owner_check=True) on isolation routes
Apply the require_permission decorator to all 28 routes that take a
{thread_id} path parameter. Combined with the strict middleware
(previous commit), this gives the double-layer protection that
AUTH_TEST_PLAN test 7.5.9 documents:
Layer 1 (AuthMiddleware): cookie + JWT validation, rejects junk
cookies and stamps request.state.user
Layer 2 (@require_permission with owner_check=True): per-resource
ownership verification via
ThreadMetaStore.check_access — returns
404 if a different user owns the thread
The decorator's owner_check branch is rewritten to use the SQL
thread_meta_repo (the 2.0-rc persistence layer) instead of the
LangGraph store path that PR #1728 used (_store_get / get_store
in routers/threads.py). The inject_record convenience is dropped
— no caller in 2.0 needs the LangGraph blob, and the SQL repo has
a different shape.
Routes decorated (28 total):
- threads.py: delete, patch, get, get-state, post-state, post-history
- thread_runs.py: post-runs, post-runs-stream, post-runs-wait,
list_runs, get_run, cancel_run, join_run, stream_existing_run,
list_thread_messages, list_run_messages, list_run_events,
thread_token_usage
- feedback.py: create, list, stats, delete
- uploads.py: upload (added Request param), list, delete
- artifacts.py: get_artifact
- suggestions.py: generate (renamed body parameter to avoid
conflict with FastAPI Request)
Test fixes:
- test_suggestions_router.py: bypass the decorator via __wrapped__
(the unit tests cover parsing logic, not auth — no point spinning
up a thread_meta_repo just to test JSON unwrapping)
- test_auth_middleware.py 4 fake-cookie tests: already updated in
the previous commit (745bf432)
Tests: 293 passed (auth + persistence + isolation + suggestions)
Lint: clean
* security(auth): defense-in-depth fixes from release validation pass
Eight findings caught while running the AUTH_TEST_PLAN end-to-end against
the deployed sg_dev stack. Each is a pre-condition for shipping
release/2.0-rc that the previous PRs missed.
Backend hardening
- routers/auth.py: rate limiter X-Real-IP now requires AUTH_TRUSTED_PROXIES
whitelist (CIDR/IP allowlist). Without nginx in front, the previous code
honored arbitrary X-Real-IP, letting an attacker rotate the header to
fully bypass the per-IP login lockout.
- routers/auth.py: 36-entry common-password blocklist via Pydantic
field_validator on RegisterRequest + ChangePasswordRequest. The shared
_validate_strong_password helper keeps the constraint in one place.
- routers/threads.py: ThreadCreateRequest + ThreadPatchRequest strip
server-reserved metadata keys (owner_id, user_id) via Pydantic
field_validator so a forged value can never round-trip back to other
clients reading the same thread. The actual ownership invariant stays
on the threads_meta row; this closes the metadata-blob echo gap.
- authz.py + thread_meta/sql.py: require_permission gains a require_existing
flag plumbed through check_access(require_existing=True). Destructive
routes (DELETE/PATCH/state-update/runs/feedback) now treat a missing
thread_meta row as 404 instead of "untracked legacy thread, allow",
closing the cross-user delete-idempotence gap where any user could
successfully DELETE another user's deleted thread.
- repositories/sqlite.py + base.py: update_user raises UserNotFoundError
on a vanished row instead of silently returning the input. Concurrent
delete during password reset can no longer look like a successful update.
- runtime/user_context.py: resolve_owner_id() coerces User.id (UUID) to
str at the contextvar boundary so SQLAlchemy String(64) columns can
bind it. The whole 2.0-rc isolation pipeline was previously broken
end-to-end (POST /api/threads → 500 "type 'UUID' is not supported").
- persistence/engine.py: SQLAlchemy listener enables PRAGMA journal_mode=WAL,
synchronous=NORMAL, foreign_keys=ON on every new SQLite connection.
TC-UPG-06 in the test plan expects WAL; previous code shipped with the
default 'delete' journal.
- auth_middleware.py: stamp request.state.auth = AuthContext(...) so
@require_permission's short-circuit fires; previously every isolation
request did a duplicate JWT decode + users SELECT. Also unifies the
401 payload through AuthErrorResponse(...).model_dump().
- app.py: _ensure_admin_user restructure removes the noqa F821 scoping
bug where 'password' was referenced outside the branch that defined it.
New _announce_credentials helper absorbs the duplicate log block in
the fresh-admin and reset-admin branches.
* fix(frontend+nginx): rollout CSRF on every state-changing client path
The frontend was 100% broken in gateway-pro mode for any user trying to
open a specific chat thread. Three cumulative bugs each silently
masked the next.
LangGraph SDK CSRF gap (api-client.ts)
- The Client constructor took only apiUrl, no defaultHeaders, no fetch
interceptor. The SDK's internal fetch never sent X-CSRF-Token, so
every state-changing /api/langgraph-compat/* call (runs/stream,
threads/search, threads/{tid}/history, ...) hit CSRFMiddleware and
got 403 before reaching the auth check. UI symptom: empty thread page
with no error message; the SPA's hooks swallowed the rejection.
- Fix: pass an onRequest hook that injects X-CSRF-Token from the
csrf_token cookie per request. Reading the cookie per call (not at
construction time) handles login / logout / password-change cookie
rotation transparently. The SDK's prepareFetchOptions calls
onRequest for both regular requests AND streaming/SSE/reconnect, so
the same hook covers runs.stream and runs.joinStream.
Raw fetch CSRF gap (7 files)
- Audit: 11 frontend fetch sites, only 2 included CSRF (login/setup +
account-settings change-password). The other 7 routed through raw
fetch() with no header — suggestions, memory, agents, mcp, skills,
uploads, and the local thread cleanup hook all 403'd silently.
- Fix: enhance fetcher.ts:fetchWithAuth to auto-inject X-CSRF-Token on
POST/PUT/DELETE/PATCH from a single shared readCsrfCookie() helper.
Convert all 7 raw fetch() callers to fetchWithAuth so the contract
is centrally enforced. api-client.ts and fetcher.ts share
readCsrfCookie + STATE_CHANGING_METHODS to avoid drift.
nginx routing + buffering (nginx.local.conf)
- The auth feature shipped without updating the nginx config: per-API
explicit location blocks but no /api/v1/auth/, /api/feedback, /api/runs.
The frontend's client-side fetches to /api/v1/auth/login/local 404'd
from the Next.js side because nginx routed /api/* to the frontend.
- Fix: add catch-all `location /api/` that proxies to the gateway.
nginx longest-prefix matching keeps the explicit blocks (/api/models,
/api/threads regex, /api/langgraph/, ...) winning for their paths.
- Fix: disable proxy_buffering + proxy_request_buffering for the
frontend `location /` block. Without it, nginx tries to spool large
Next.js chunks into /var/lib/nginx/proxy (root-owned) and fails with
Permission denied → ERR_INCOMPLETE_CHUNKED_ENCODING → ChunkLoadError.
* test(auth): release-validation test infra and new coverage
Test fixtures and unit tests added during the validation pass.
Router test helpers (NEW: tests/_router_auth_helpers.py)
- make_authed_test_app(): builds a FastAPI test app with a stub
middleware that stamps request.state.user + request.state.auth and a
permissive thread_meta_repo mock. TestClient-based router tests
(test_artifacts_router, test_threads_router) use it instead of bare
FastAPI() so the new @require_permission(owner_check=True) decorators
short-circuit cleanly.
- call_unwrapped(): walks the __wrapped__ chain to invoke the underlying
handler without going through the authz wrappers. Direct-call tests
(test_uploads_router) use it. Typed with ParamSpec so the wrapped
signature flows through.
Backend test additions
- test_auth.py: 7 tests for the new _get_client_ip trust model (no
proxy / trusted proxy / untrusted peer / XFF rejection / invalid
CIDR / no client). 5 tests for the password blocklist (literal,
case-insensitive, strong password accepted, change-password binding,
short-password length-check still fires before blocklist).
test_update_user_raises_when_row_concurrently_deleted: closes a
shipped-without-coverage gap on the new UserNotFoundError contract.
- test_thread_meta_repo.py: 4 tests for check_access(require_existing=True)
— strict missing-row denial, strict owner match, strict owner mismatch,
strict null-owner still allowed (shared rows survive the tightening).
- test_ensure_admin.py: 3 tests for _migrate_orphaned_threads /
_iter_store_items pagination, covering the TC-UPG-02 upgrade story
end-to-end via mock store. Closes the gap where the cursor pagination
was untested even though the previous PR rewrote it.
- test_threads_router.py: 5 tests for _strip_reserved_metadata
(owner_id removal, user_id removal, safe-keys passthrough, empty
input, both-stripped).
- test_auth_type_system.py: replace "password123" fixtures with
Tr0ub4dor3a / AnotherStr0ngPwd! so the new password blocklist
doesn't reject the test data.
* docs(auth): refresh TC-DOCKER-05 + document Docker validation gap
- AUTH_TEST_PLAN.md TC-DOCKER-05: the previous expectation
("admin password visible in docker logs") was stale after the simplify
pass that moved credentials to a 0600 file. The grep "Password:" check
would have silently failed and given a false sense of coverage. New
expectation matches the actual file-based path: 0600 file in
DEER_FLOW_HOME, log shows the path (not the secret), reverse-grep
asserts no leaked password in container logs.
- NEW: docs/AUTH_TEST_DOCKER_GAP.md documents the only un-executed
block in the test plan (TC-DOCKER-01..06). Reason: sg_dev validation
host has no Docker daemon installed. The doc maps each Docker case
to an already-validated bare-metal equivalent (TC-1.1, TC-REENT-01,
TC-API-02 etc.) so the gap is auditable, and includes pre-flight
reproduction steps for whoever has Docker available.
---------
Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com>
655 lines
24 KiB
Python
655 lines
24 KiB
Python
"""Tests for authentication module: JWT, password hashing, AuthContext, and authz decorators."""
|
|
|
|
from datetime import timedelta
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.testclient import TestClient
|
|
|
|
from app.gateway.auth import create_access_token, decode_token, hash_password, verify_password
|
|
from app.gateway.auth.models import User
|
|
from app.gateway.authz import (
|
|
AuthContext,
|
|
Permissions,
|
|
get_auth_context,
|
|
require_auth,
|
|
require_permission,
|
|
)
|
|
|
|
# ── Password Hashing ────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_hash_password_and_verify():
|
|
"""Hashing and verification round-trip."""
|
|
password = "s3cr3tP@ssw0rd!"
|
|
hashed = hash_password(password)
|
|
assert hashed != password
|
|
assert verify_password(password, hashed) is True
|
|
assert verify_password("wrongpassword", hashed) is False
|
|
|
|
|
|
def test_hash_password_different_each_time():
|
|
"""bcrypt generates unique salts, so same password has different hashes."""
|
|
password = "testpassword"
|
|
h1 = hash_password(password)
|
|
h2 = hash_password(password)
|
|
assert h1 != h2 # Different salts
|
|
# But both verify correctly
|
|
assert verify_password(password, h1) is True
|
|
assert verify_password(password, h2) is True
|
|
|
|
|
|
def test_verify_password_rejects_empty():
|
|
"""Empty password should not verify."""
|
|
hashed = hash_password("nonempty")
|
|
assert verify_password("", hashed) is False
|
|
|
|
|
|
# ── JWT ─────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_create_and_decode_token():
|
|
"""JWT creation and decoding round-trip."""
|
|
user_id = str(uuid4())
|
|
# Set a valid JWT secret for this test
|
|
import os
|
|
|
|
os.environ["AUTH_JWT_SECRET"] = "test-secret-key-for-jwt-testing-minimum-32-chars"
|
|
token = create_access_token(user_id)
|
|
assert isinstance(token, str)
|
|
|
|
payload = decode_token(token)
|
|
assert payload is not None
|
|
assert payload.sub == user_id
|
|
|
|
|
|
def test_decode_token_expired():
|
|
"""Expired token returns TokenError.EXPIRED."""
|
|
from app.gateway.auth.errors import TokenError
|
|
|
|
user_id = str(uuid4())
|
|
# Create token that expires immediately
|
|
token = create_access_token(user_id, expires_delta=timedelta(seconds=-1))
|
|
payload = decode_token(token)
|
|
assert payload == TokenError.EXPIRED
|
|
|
|
|
|
def test_decode_token_invalid():
|
|
"""Invalid token returns TokenError."""
|
|
from app.gateway.auth.errors import TokenError
|
|
|
|
assert isinstance(decode_token("not.a.valid.token"), TokenError)
|
|
assert isinstance(decode_token(""), TokenError)
|
|
assert isinstance(decode_token("completely-wrong"), TokenError)
|
|
|
|
|
|
def test_create_token_custom_expiry():
|
|
"""Custom expiry is respected."""
|
|
user_id = str(uuid4())
|
|
token = create_access_token(user_id, expires_delta=timedelta(hours=1))
|
|
payload = decode_token(token)
|
|
assert payload is not None
|
|
assert payload.sub == user_id
|
|
|
|
|
|
# ── AuthContext ────────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_auth_context_unauthenticated():
|
|
"""AuthContext with no user."""
|
|
ctx = AuthContext(user=None, permissions=[])
|
|
assert ctx.is_authenticated is False
|
|
assert ctx.has_permission("threads", "read") is False
|
|
|
|
|
|
def test_auth_context_authenticated_no_perms():
|
|
"""AuthContext with user but no permissions."""
|
|
user = User(id=uuid4(), email="test@example.com", password_hash="hash")
|
|
ctx = AuthContext(user=user, permissions=[])
|
|
assert ctx.is_authenticated is True
|
|
assert ctx.has_permission("threads", "read") is False
|
|
|
|
|
|
def test_auth_context_has_permission():
|
|
"""AuthContext permission checking."""
|
|
user = User(id=uuid4(), email="test@example.com", password_hash="hash")
|
|
perms = [Permissions.THREADS_READ, Permissions.THREADS_WRITE]
|
|
ctx = AuthContext(user=user, permissions=perms)
|
|
assert ctx.has_permission("threads", "read") is True
|
|
assert ctx.has_permission("threads", "write") is True
|
|
assert ctx.has_permission("threads", "delete") is False
|
|
assert ctx.has_permission("runs", "read") is False
|
|
|
|
|
|
def test_auth_context_require_user_raises():
|
|
"""require_user raises 401 when not authenticated."""
|
|
ctx = AuthContext(user=None, permissions=[])
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
ctx.require_user()
|
|
assert exc_info.value.status_code == 401
|
|
|
|
|
|
def test_auth_context_require_user_returns_user():
|
|
"""require_user returns user when authenticated."""
|
|
user = User(id=uuid4(), email="test@example.com", password_hash="hash")
|
|
ctx = AuthContext(user=user, permissions=[])
|
|
returned = ctx.require_user()
|
|
assert returned == user
|
|
|
|
|
|
# ── get_auth_context helper ─────────────────────────────────────────────────
|
|
|
|
|
|
def test_get_auth_context_not_set():
|
|
"""get_auth_context returns None when auth not set on request."""
|
|
mock_request = MagicMock()
|
|
# Make getattr return None (simulating attribute not set)
|
|
mock_request.state = MagicMock()
|
|
del mock_request.state.auth
|
|
assert get_auth_context(mock_request) is None
|
|
|
|
|
|
def test_get_auth_context_set():
|
|
"""get_auth_context returns the AuthContext from request."""
|
|
user = User(id=uuid4(), email="test@example.com", password_hash="hash")
|
|
ctx = AuthContext(user=user, permissions=[Permissions.THREADS_READ])
|
|
|
|
mock_request = MagicMock()
|
|
mock_request.state.auth = ctx
|
|
|
|
assert get_auth_context(mock_request) == ctx
|
|
|
|
|
|
# ── require_auth decorator ──────────────────────────────────────────────────
|
|
|
|
|
|
def test_require_auth_sets_auth_context():
|
|
"""require_auth sets auth context on request from cookie."""
|
|
from fastapi import Request
|
|
|
|
app = FastAPI()
|
|
|
|
@app.get("/test")
|
|
@require_auth
|
|
async def endpoint(request: Request):
|
|
ctx = get_auth_context(request)
|
|
return {"authenticated": ctx.is_authenticated}
|
|
|
|
with TestClient(app) as client:
|
|
# No cookie → anonymous
|
|
response = client.get("/test")
|
|
assert response.status_code == 200
|
|
assert response.json()["authenticated"] is False
|
|
|
|
|
|
def test_require_auth_requires_request_param():
|
|
"""require_auth raises ValueError if request parameter is missing."""
|
|
import asyncio
|
|
|
|
@require_auth
|
|
async def bad_endpoint(): # Missing `request` parameter
|
|
pass
|
|
|
|
with pytest.raises(ValueError, match="require_auth decorator requires 'request' parameter"):
|
|
asyncio.run(bad_endpoint())
|
|
|
|
|
|
# ── require_permission decorator ─────────────────────────────────────────────
|
|
|
|
|
|
def test_require_permission_requires_auth():
|
|
"""require_permission raises 401 when not authenticated."""
|
|
from fastapi import Request
|
|
|
|
app = FastAPI()
|
|
|
|
@app.get("/test")
|
|
@require_permission("threads", "read")
|
|
async def endpoint(request: Request):
|
|
return {"ok": True}
|
|
|
|
with TestClient(app) as client:
|
|
response = client.get("/test")
|
|
assert response.status_code == 401
|
|
assert "Authentication required" in response.json()["detail"]
|
|
|
|
|
|
def test_require_permission_denies_wrong_permission():
|
|
"""User without required permission gets 403."""
|
|
from fastapi import Request
|
|
|
|
app = FastAPI()
|
|
user = User(id=uuid4(), email="test@example.com", password_hash="hash")
|
|
|
|
@app.get("/test")
|
|
@require_permission("threads", "delete")
|
|
async def endpoint(request: Request):
|
|
return {"ok": True}
|
|
|
|
mock_auth = AuthContext(user=user, permissions=[Permissions.THREADS_READ])
|
|
|
|
with patch("app.gateway.authz._authenticate", return_value=mock_auth):
|
|
with TestClient(app) as client:
|
|
response = client.get("/test")
|
|
assert response.status_code == 403
|
|
assert "Permission denied" in response.json()["detail"]
|
|
|
|
|
|
# ── Weak JWT secret warning ──────────────────────────────────────────────────
|
|
|
|
|
|
# ── User Model Fields ──────────────────────────────────────────────────────
|
|
|
|
|
|
def test_user_model_has_needs_setup_default_false():
|
|
"""New users default to needs_setup=False."""
|
|
user = User(email="test@example.com", password_hash="hash")
|
|
assert user.needs_setup is False
|
|
|
|
|
|
def test_user_model_has_token_version_default_zero():
|
|
"""New users default to token_version=0."""
|
|
user = User(email="test@example.com", password_hash="hash")
|
|
assert user.token_version == 0
|
|
|
|
|
|
def test_user_model_needs_setup_true():
|
|
"""Auto-created admin has needs_setup=True."""
|
|
user = User(email="admin@example.com", password_hash="hash", needs_setup=True)
|
|
assert user.needs_setup is True
|
|
|
|
|
|
def test_sqlite_round_trip_new_fields():
|
|
"""needs_setup and token_version survive create → read round-trip.
|
|
|
|
Uses the shared persistence engine (same one threads_meta, runs,
|
|
run_events, and feedback use). The old separate .deer-flow/users.db
|
|
file is gone.
|
|
"""
|
|
import asyncio
|
|
import tempfile
|
|
|
|
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
|
|
|
|
async def _run() -> None:
|
|
from deerflow.persistence.engine import (
|
|
close_engine,
|
|
get_session_factory,
|
|
init_engine,
|
|
)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
url = f"sqlite+aiosqlite:///{tmpdir}/scratch.db"
|
|
await init_engine("sqlite", url=url, sqlite_dir=tmpdir)
|
|
try:
|
|
repo = SQLiteUserRepository(get_session_factory())
|
|
user = User(
|
|
email="setup@test.com",
|
|
password_hash="fakehash",
|
|
system_role="admin",
|
|
needs_setup=True,
|
|
token_version=3,
|
|
)
|
|
created = await repo.create_user(user)
|
|
assert created.needs_setup is True
|
|
assert created.token_version == 3
|
|
|
|
fetched = await repo.get_user_by_email("setup@test.com")
|
|
assert fetched is not None
|
|
assert fetched.needs_setup is True
|
|
assert fetched.token_version == 3
|
|
|
|
fetched.needs_setup = False
|
|
fetched.token_version = 4
|
|
await repo.update_user(fetched)
|
|
refetched = await repo.get_user_by_id(str(fetched.id))
|
|
assert refetched is not None
|
|
assert refetched.needs_setup is False
|
|
assert refetched.token_version == 4
|
|
finally:
|
|
await close_engine()
|
|
|
|
asyncio.run(_run())
|
|
|
|
|
|
def test_update_user_raises_when_row_concurrently_deleted(tmp_path):
|
|
"""Concurrent-delete during update_user must hard-fail, not silently no-op.
|
|
|
|
Earlier the SQLite repo returned the input unchanged when the row was
|
|
missing, making a phantom success path that admin password reset
|
|
callers (`reset_admin`, `_ensure_admin_user`) would happily log as
|
|
'password reset'. The new contract: raise ``UserNotFoundError`` so
|
|
a vanished row never looks like a successful update.
|
|
"""
|
|
import asyncio
|
|
import tempfile
|
|
|
|
from app.gateway.auth.repositories.base import UserNotFoundError
|
|
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
|
|
|
|
async def _run() -> None:
|
|
from deerflow.persistence.engine import (
|
|
close_engine,
|
|
get_session_factory,
|
|
init_engine,
|
|
)
|
|
from deerflow.persistence.user.model import UserRow
|
|
|
|
with tempfile.TemporaryDirectory() as d:
|
|
url = f"sqlite+aiosqlite:///{d}/scratch.db"
|
|
await init_engine("sqlite", url=url, sqlite_dir=d)
|
|
try:
|
|
sf = get_session_factory()
|
|
repo = SQLiteUserRepository(sf)
|
|
user = User(
|
|
email="ghost@test.com",
|
|
password_hash="fakehash",
|
|
system_role="user",
|
|
)
|
|
created = await repo.create_user(user)
|
|
|
|
# Simulate "row vanished underneath us" by deleting the row
|
|
# via the raw ORM session, then attempt to update.
|
|
async with sf() as session:
|
|
row = await session.get(UserRow, str(created.id))
|
|
assert row is not None
|
|
await session.delete(row)
|
|
await session.commit()
|
|
|
|
created.needs_setup = True
|
|
with pytest.raises(UserNotFoundError):
|
|
await repo.update_user(created)
|
|
finally:
|
|
await close_engine()
|
|
|
|
asyncio.run(_run())
|
|
|
|
|
|
# ── Token Versioning ───────────────────────────────────────────────────────
|
|
|
|
|
|
def test_jwt_encodes_ver():
|
|
"""JWT payload includes ver field."""
|
|
import os
|
|
|
|
from app.gateway.auth.errors import TokenError
|
|
|
|
os.environ["AUTH_JWT_SECRET"] = "test-secret-key-for-jwt-testing-minimum-32-chars"
|
|
token = create_access_token(str(uuid4()), token_version=3)
|
|
payload = decode_token(token)
|
|
assert not isinstance(payload, TokenError)
|
|
assert payload.ver == 3
|
|
|
|
|
|
def test_jwt_default_ver_zero():
|
|
"""JWT ver defaults to 0."""
|
|
import os
|
|
|
|
from app.gateway.auth.errors import TokenError
|
|
|
|
os.environ["AUTH_JWT_SECRET"] = "test-secret-key-for-jwt-testing-minimum-32-chars"
|
|
token = create_access_token(str(uuid4()))
|
|
payload = decode_token(token)
|
|
assert not isinstance(payload, TokenError)
|
|
assert payload.ver == 0
|
|
|
|
|
|
def test_token_version_mismatch_rejects():
|
|
"""Token with stale ver is rejected by get_current_user_from_request."""
|
|
import asyncio
|
|
import os
|
|
|
|
os.environ["AUTH_JWT_SECRET"] = "test-secret-key-for-jwt-testing-minimum-32-chars"
|
|
|
|
user_id = str(uuid4())
|
|
token = create_access_token(user_id, token_version=0)
|
|
|
|
mock_user = User(id=user_id, email="test@example.com", password_hash="hash", token_version=1)
|
|
|
|
mock_request = MagicMock()
|
|
mock_request.cookies = {"access_token": token}
|
|
|
|
with patch("app.gateway.deps.get_local_provider") as mock_provider_fn:
|
|
mock_provider = MagicMock()
|
|
mock_provider.get_user = AsyncMock(return_value=mock_user)
|
|
mock_provider_fn.return_value = mock_provider
|
|
|
|
from app.gateway.deps import get_current_user_from_request
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
asyncio.run(get_current_user_from_request(mock_request))
|
|
assert exc_info.value.status_code == 401
|
|
assert "revoked" in str(exc_info.value.detail).lower()
|
|
|
|
|
|
# ── change-password extension ──────────────────────────────────────────────
|
|
|
|
|
|
def test_change_password_request_accepts_new_email():
|
|
"""ChangePasswordRequest model accepts optional new_email."""
|
|
from app.gateway.routers.auth import ChangePasswordRequest
|
|
|
|
req = ChangePasswordRequest(
|
|
current_password="old",
|
|
new_password="newpassword",
|
|
new_email="new@example.com",
|
|
)
|
|
assert req.new_email == "new@example.com"
|
|
|
|
|
|
def test_change_password_request_new_email_optional():
|
|
"""ChangePasswordRequest model works without new_email."""
|
|
from app.gateway.routers.auth import ChangePasswordRequest
|
|
|
|
req = ChangePasswordRequest(current_password="old", new_password="newpassword")
|
|
assert req.new_email is None
|
|
|
|
|
|
def test_login_response_includes_needs_setup():
|
|
"""LoginResponse includes needs_setup field."""
|
|
from app.gateway.routers.auth import LoginResponse
|
|
|
|
resp = LoginResponse(expires_in=3600, needs_setup=True)
|
|
assert resp.needs_setup is True
|
|
resp2 = LoginResponse(expires_in=3600)
|
|
assert resp2.needs_setup is False
|
|
|
|
|
|
# ── Rate Limiting ──────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_rate_limiter_allows_under_limit():
|
|
"""Requests under the limit are allowed."""
|
|
from app.gateway.routers.auth import _check_rate_limit, _login_attempts
|
|
|
|
_login_attempts.clear()
|
|
_check_rate_limit("192.168.1.1") # Should not raise
|
|
|
|
|
|
def test_rate_limiter_blocks_after_max_failures():
|
|
"""IP is blocked after 5 consecutive failures."""
|
|
from app.gateway.routers.auth import _check_rate_limit, _login_attempts, _record_login_failure
|
|
|
|
_login_attempts.clear()
|
|
ip = "10.0.0.1"
|
|
for _ in range(5):
|
|
_record_login_failure(ip)
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
_check_rate_limit(ip)
|
|
assert exc_info.value.status_code == 429
|
|
|
|
|
|
def test_rate_limiter_resets_on_success():
|
|
"""Successful login clears the failure counter."""
|
|
from app.gateway.routers.auth import _check_rate_limit, _login_attempts, _record_login_failure, _record_login_success
|
|
|
|
_login_attempts.clear()
|
|
ip = "10.0.0.2"
|
|
for _ in range(4):
|
|
_record_login_failure(ip)
|
|
_record_login_success(ip)
|
|
_check_rate_limit(ip) # Should not raise
|
|
|
|
|
|
# ── Client IP extraction ─────────────────────────────────────────────────
|
|
|
|
|
|
def test_get_client_ip_direct_connection_no_proxy(monkeypatch):
|
|
"""Direct mode (no AUTH_TRUSTED_PROXIES): use TCP peer regardless of X-Real-IP."""
|
|
monkeypatch.delenv("AUTH_TRUSTED_PROXIES", raising=False)
|
|
from app.gateway.routers.auth import _get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.client.host = "203.0.113.42"
|
|
req.headers = {}
|
|
assert _get_client_ip(req) == "203.0.113.42"
|
|
|
|
|
|
def test_get_client_ip_x_real_ip_ignored_when_no_trusted_proxy(monkeypatch):
|
|
"""X-Real-IP is silently ignored if AUTH_TRUSTED_PROXIES is unset.
|
|
|
|
This closes the bypass where any client could rotate X-Real-IP per
|
|
request to dodge per-IP rate limits in dev / direct mode.
|
|
"""
|
|
monkeypatch.delenv("AUTH_TRUSTED_PROXIES", raising=False)
|
|
from app.gateway.routers.auth import _get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.client.host = "127.0.0.1"
|
|
req.headers = {"x-real-ip": "203.0.113.42"}
|
|
assert _get_client_ip(req) == "127.0.0.1"
|
|
|
|
|
|
def test_get_client_ip_x_real_ip_honored_from_trusted_proxy(monkeypatch):
|
|
"""X-Real-IP is honored when the TCP peer matches AUTH_TRUSTED_PROXIES."""
|
|
monkeypatch.setenv("AUTH_TRUSTED_PROXIES", "10.0.0.0/8")
|
|
from app.gateway.routers.auth import _get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.client.host = "10.5.6.7" # in trusted CIDR
|
|
req.headers = {"x-real-ip": "203.0.113.42"}
|
|
assert _get_client_ip(req) == "203.0.113.42"
|
|
|
|
|
|
def test_get_client_ip_x_real_ip_rejected_from_untrusted_peer(monkeypatch):
|
|
"""X-Real-IP is rejected when the TCP peer is NOT in the trusted list."""
|
|
monkeypatch.setenv("AUTH_TRUSTED_PROXIES", "10.0.0.0/8")
|
|
from app.gateway.routers.auth import _get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.client.host = "8.8.8.8" # NOT in trusted CIDR
|
|
req.headers = {"x-real-ip": "203.0.113.42"} # client trying to spoof
|
|
assert _get_client_ip(req) == "8.8.8.8"
|
|
|
|
|
|
def test_get_client_ip_xff_never_honored(monkeypatch):
|
|
"""X-Forwarded-For is never used; only X-Real-IP from a trusted peer."""
|
|
monkeypatch.setenv("AUTH_TRUSTED_PROXIES", "10.0.0.0/8")
|
|
from app.gateway.routers.auth import _get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.client.host = "10.0.0.1"
|
|
req.headers = {"x-forwarded-for": "198.51.100.5"} # no x-real-ip
|
|
assert _get_client_ip(req) == "10.0.0.1"
|
|
|
|
|
|
def test_get_client_ip_invalid_trusted_proxy_entry_skipped(monkeypatch, caplog):
|
|
"""Garbage entries in AUTH_TRUSTED_PROXIES are warned and skipped."""
|
|
monkeypatch.setenv("AUTH_TRUSTED_PROXIES", "not-an-ip,10.0.0.0/8")
|
|
from app.gateway.routers.auth import _get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.client.host = "10.5.6.7"
|
|
req.headers = {"x-real-ip": "203.0.113.42"}
|
|
assert _get_client_ip(req) == "203.0.113.42" # valid entry still works
|
|
|
|
|
|
def test_get_client_ip_no_client_returns_unknown(monkeypatch):
|
|
"""No request.client → 'unknown' marker (no crash)."""
|
|
monkeypatch.delenv("AUTH_TRUSTED_PROXIES", raising=False)
|
|
from app.gateway.routers.auth import _get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.client = None
|
|
req.headers = {}
|
|
assert _get_client_ip(req) == "unknown"
|
|
|
|
|
|
# ── Common-password blocklist ────────────────────────────────────────────────
|
|
|
|
|
|
def test_register_rejects_literal_password():
|
|
"""Pydantic validator rejects 'password' as a registration password."""
|
|
from pydantic import ValidationError
|
|
|
|
from app.gateway.routers.auth import RegisterRequest
|
|
|
|
with pytest.raises(ValidationError) as exc:
|
|
RegisterRequest(email="x@example.com", password="password")
|
|
assert "too common" in str(exc.value)
|
|
|
|
|
|
def test_register_rejects_common_password_case_insensitive():
|
|
"""Case variants of common passwords are also rejected."""
|
|
from pydantic import ValidationError
|
|
|
|
from app.gateway.routers.auth import RegisterRequest
|
|
|
|
for variant in ["PASSWORD", "Password1", "qwerty123", "letmein1"]:
|
|
with pytest.raises(ValidationError):
|
|
RegisterRequest(email="x@example.com", password=variant)
|
|
|
|
|
|
def test_register_accepts_strong_password():
|
|
"""A non-blocklisted password of length >=8 is accepted."""
|
|
from app.gateway.routers.auth import RegisterRequest
|
|
|
|
req = RegisterRequest(email="x@example.com", password="Tr0ub4dor&3-Horse")
|
|
assert req.password == "Tr0ub4dor&3-Horse"
|
|
|
|
|
|
def test_change_password_rejects_common_password():
|
|
"""The same blocklist applies to change-password."""
|
|
from pydantic import ValidationError
|
|
|
|
from app.gateway.routers.auth import ChangePasswordRequest
|
|
|
|
with pytest.raises(ValidationError):
|
|
ChangePasswordRequest(current_password="anything", new_password="iloveyou")
|
|
|
|
|
|
def test_password_blocklist_keeps_short_passwords_for_length_check():
|
|
"""Short passwords still fail the min_length check (not the blocklist)."""
|
|
from pydantic import ValidationError
|
|
|
|
from app.gateway.routers.auth import RegisterRequest
|
|
|
|
with pytest.raises(ValidationError) as exc:
|
|
RegisterRequest(email="x@example.com", password="abc")
|
|
# the length check should fire, not the blocklist
|
|
assert "at least 8 characters" in str(exc.value)
|
|
|
|
|
|
# ── Weak JWT secret warning ──────────────────────────────────────────────────
|
|
|
|
|
|
def test_missing_jwt_secret_generates_ephemeral(monkeypatch, caplog):
|
|
"""get_auth_config() auto-generates an ephemeral secret when AUTH_JWT_SECRET is unset."""
|
|
import logging
|
|
|
|
import app.gateway.auth.config as config_module
|
|
|
|
config_module._auth_config = None
|
|
monkeypatch.delenv("AUTH_JWT_SECRET", raising=False)
|
|
|
|
with caplog.at_level(logging.WARNING):
|
|
config = config_module.get_auth_config()
|
|
|
|
assert config.jwt_secret # non-empty ephemeral secret
|
|
assert any("AUTH_JWT_SECRET" in msg for msg in caplog.messages)
|
|
|
|
# Cleanup
|
|
config_module._auth_config = None
|