Compare commits

...

6 Commits

Author SHA1 Message Date
Willem Jiang 001f6efcff fix(chat): prevent first user message from being swallowed in new conversations
The optimistic message clearing effect cleared too eagerly — any stream
  message (including AI messages from messages-tuple events) triggered the
  clear before the server's human message had arrived via values events.
  For new threads this caused the user's first prompt to disappear permanently.

  Only clear optimistic messages once the server's human message has been
  confirmed to arrive in thread.messages, not just when any message arrives.

  Fixes #2730
2026-05-05 20:24:03 +08:00
Nan Gao e8675f266d fix(loop-detection): keep tool-call pairing on warn injection (#2724) (#2725)
* fix(loop-detection): keep tool-call pairing on warn injection (#2724)

* make format

* fix(loop-detection): avoid IMMessage leak to downstream consumer

* fix(channels): filter loop warning text from IM replies
2026-05-05 18:53:49 +08:00
Xun 680187ddc2 fix: Supplement list_running in RemoteSandboxBackend (#2716)
* fix: Supplement list_running in RemoteSandboxBackend

* fix

* except requests.RequestException as exc:

* fix
2026-05-05 18:53:10 +08:00
Xinmin Zeng aded753de3 fix(frontend): restore localhost fallback for getGatewayConfig in prod mode (#2705) (#2718)
* fix(frontend): unify gateway-config localhost fallback for prod (#2705)

`getGatewayConfig()` only fell back to localhost defaults when
`NODE_ENV === "development"`, while `next.config.js` always falls back
to `127.0.0.1:8001`. Running `make start` (which sets NODE_ENV=production
via `next start`) without `DEER_FLOW_INTERNAL_GATEWAY_BASE_URL` /
`DEER_FLOW_TRUSTED_ORIGINS` therefore caused zod to throw inside SSR
layouts and surfaced as a 500.

Drop the NODE_ENV gating and use localhost defaults everywhere — the
"force explicit config in prod" intent should be enforced by deployment
templates (docker-compose already sets both vars), not by request-time
crashes. Document the two vars in both .env.example files and add unit
coverage for the dev/prod env-unset paths.

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Update internalGatewayUrl in gateway config tests

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-05 16:27:29 +08:00
Willem Jiang 028493bfd8 fix(docker):force ngix to resolve upstream names at request time (#2717)
* fix(docker):force ngix to resolve upstream names at request time

* fix(docker): set resolver valid=0s to eliminate DNS cache window for request-time re-resolution

Agent-Logs-Url: https://github.com/bytedance/deer-flow/sessions/07bdb872-022f-4fd2-9fa8-d800a4ce34a7

Co-authored-by: WillemJiang <219644+WillemJiang@users.noreply.github.com>

* Update DNS resolver valid time and add upstreams

* fix the unit test error

* Remove upstream server configurations from nginx.conf

Removed upstream server configurations for gateway and frontend.

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
2026-05-05 14:35:55 +08:00
Willem Jiang 8e48b7e85c fix(channels): preserve clarification conversation history across follow-up turns (#2444)
* fix(channels): preserve clarification conversation history across follow-up turns

Pin channel-triggered runs to the root checkpoint namespace and ensure thread_id is always present in configurable run config so follow-up replies resume the same conversation state.

Add regression coverage to channel tests:

assert checkpoint_ns/thread_id are passed in wait and stream paths
add an integration-style clarification flow test that verifies the second user reply continues prior context instead of starting a new session
This addresses history loss after ask_clarification interruptions (issue #2425).

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix(channels): copy configurable dict before injecting run-scoped fields

  When configurable was already a plain dict, _resolve_run_params mutated
  it in place, leaking checkpoint_ns and thread_id back into the shared
  session config. Always copy via dict() before mutating to prevent
  cross-user or cross-channel config pollution.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-05-04 16:14:07 +08:00
13 changed files with 757 additions and 53 deletions
+11
View File
@@ -48,3 +48,14 @@ INFOQUEST_API_KEY=your-infoquest-api-key
# Set to "false" to disable Swagger UI, ReDoc, and OpenAPI schema in production
# GATEWAY_ENABLE_DOCS=false
# ── Frontend SSR → Gateway wiring ─────────────────────────────────────────────
# The Next.js server uses these to reach the Gateway during SSR (auth checks,
# /api/* rewrites). They default to localhost values that match `make dev` and
# `make start`, so most local users do not need to set them.
#
# Override only when the Gateway is not on localhost:8001 (e.g. when the
# frontend and gateway run on different hosts, in containers with a service
# alias, or behind a different port). docker-compose already sets these.
# DEER_FLOW_INTERNAL_GATEWAY_BASE_URL=http://localhost:8001
# DEER_FLOW_TRUSTED_ORIGINS=http://localhost:3000,http://localhost:2026
+26 -1
View File
@@ -146,6 +146,13 @@ def _normalize_custom_agent_name(raw_value: str) -> str:
return normalized
def _strip_loop_warning_text(text: str) -> str:
"""Remove middleware-authored loop warning lines from display text."""
if "[LOOP DETECTED]" not in text:
return text
return "\n".join(line for line in text.splitlines() if "[LOOP DETECTED]" not in line).strip()
def _extract_response_text(result: dict | list) -> str:
"""Extract the last AI message text from a LangGraph runs.wait result.
@@ -155,7 +162,7 @@ def _extract_response_text(result: dict | list) -> str:
Handles special cases:
- Regular AI text responses
- Clarification interrupts (``ask_clarification`` tool messages)
- AI messages with tool_calls but no text content
- Strips loop-detection warnings attached to tool-call AI messages
"""
if isinstance(result, list):
messages = result
@@ -185,7 +192,12 @@ def _extract_response_text(result: dict | list) -> str:
# Regular AI message with text content
if msg_type == "ai":
content = msg.get("content", "")
has_tool_calls = bool(msg.get("tool_calls"))
if isinstance(content, str) and content:
if has_tool_calls:
content = _strip_loop_warning_text(content)
if not content:
continue
return content
# content can be a list of content blocks
if isinstance(content, list):
@@ -196,6 +208,8 @@ def _extract_response_text(result: dict | list) -> str:
elif isinstance(block, str):
parts.append(block)
text = "".join(parts)
if has_tool_calls:
text = _strip_loop_warning_text(text)
if text:
return text
return ""
@@ -589,6 +603,17 @@ class ChannelManager:
user_layer.get("config"),
)
configurable = run_config.get("configurable")
if isinstance(configurable, Mapping):
configurable = dict(configurable)
else:
configurable = {}
run_config["configurable"] = configurable
# Pin channel-triggered runs to the root graph namespace so follow-up
# turns continue from the same conversation checkpoint.
configurable["checkpoint_ns"] = ""
configurable["thread_id"] = thread_id
run_context = _merge_dicts(
DEFAULT_RUN_CONTEXT,
self._default_session.get("context"),
@@ -22,7 +22,6 @@ from typing import override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langgraph.runtime import Runtime
logger = logging.getLogger(__name__)
@@ -356,13 +355,30 @@ class LoopDetectionMiddleware(AgentMiddleware[AgentState]):
return {"messages": [stripped_msg]}
if warning:
# Inject as HumanMessage instead of SystemMessage to avoid
# Anthropic's "multiple non-consecutive system messages" error.
# Anthropic models require system messages only at the start of
# the conversation; injecting one mid-conversation crashes
# langchain_anthropic's _format_messages(). HumanMessage works
# with all providers. See #1299.
return {"messages": [HumanMessage(content=warning, name="loop_warning")]}
# WORKAROUND for v2.0-m1 — see #2724.
#
# Append the warning to the AIMessage content instead of
# injecting a separate HumanMessage. Inserting any non-tool
# message between an AIMessage(tool_calls=...) and its
# ToolMessage responses breaks OpenAI/Moonshot strict pairing
# validation ("tool_call_ids did not have response messages")
# because the tools node has not run yet at after_model time.
# tool_calls are preserved so the tools node still executes.
#
# This is a temporary mitigation: mutating an existing
# AIMessage to carry framework-authored text leaks loop-warning
# text into downstream consumers (MemoryMiddleware fact
# extraction, TitleMiddleware, telemetry, model replay) as if
# the model said it. The proper fix is to defer warning
# injection from after_model to wrap_model_call so every prior
# ToolMessage is already in the request — see RFC #2517 (which
# lists "loop intervention does not leave invalid
# tool-call/tool-message state" as acceptance criteria) and
# the prototype on `fix/loop-detection-tool-call-pairing`.
messages = state.get("messages", [])
last_msg = messages[-1]
patched_msg = last_msg.model_copy(update={"content": self._append_text(last_msg.content, warning)})
return {"messages": [patched_msg]}
return None
@@ -84,8 +84,52 @@ class RemoteSandboxBackend(SandboxBackend):
"""
return self._provisioner_discover(sandbox_id)
def list_running(self) -> list[SandboxInfo]:
"""Return all sandboxes currently managed by the provisioner.
Calls ``GET /api/sandboxes`` so that ``AioSandboxProvider._reconcile_orphans()``
can adopt pods that were created by a previous process and were never
explicitly destroyed.
Without this, a process restart silently orphans all existing k8s Pods —
they stay running forever because the idle checker only
tracks in-process state.
"""
return self._provisioner_list()
# ── Provisioner API calls ─────────────────────────────────────────────
def _provisioner_list(self) -> list[SandboxInfo]:
"""GET /api/sandboxes → list all running sandboxes."""
try:
resp = requests.get(f"{self._provisioner_url}/api/sandboxes", timeout=10)
resp.raise_for_status()
data = resp.json()
if not isinstance(data, dict):
logger.warning("Provisioner list_running returned non-dict payload: %r", type(data))
return []
sandboxes = data.get("sandboxes", [])
if not isinstance(sandboxes, list):
logger.warning("Provisioner list_running returned non-list sandboxes: %r", type(sandboxes))
return []
infos: list[SandboxInfo] = []
for sandbox in sandboxes:
if not isinstance(sandbox, dict):
logger.warning("Provisioner list_running entry is not a dict: %r", type(sandbox))
continue
sandbox_id = sandbox.get("sandbox_id")
sandbox_url = sandbox.get("sandbox_url")
if isinstance(sandbox_id, str) and sandbox_id and isinstance(sandbox_url, str) and sandbox_url:
infos.append(SandboxInfo(sandbox_id=sandbox_id, sandbox_url=sandbox_url))
logger.info("Provisioner list_running: %d sandbox(es) found", len(infos))
return infos
except requests.RequestException as exc:
logger.warning("Provisioner list_running failed: %s", exc)
return []
def _provisioner_create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
"""POST /api/sandboxes → create Pod + Service."""
try:
+158
View File
@@ -372,6 +372,37 @@ class TestExtractResponseText:
# Should return "" (no text in current turn), NOT "Hi there!" from previous turn
assert _extract_response_text(result) == ""
def test_does_not_publish_loop_warning_on_tool_calling_ai_message(self):
"""Loop-detection warning text on a tool-calling AI message is middleware-authored."""
from app.channels.manager import _extract_response_text
result = {
"messages": [
{"type": "human", "content": "search the repo"},
{
"type": "ai",
"content": "[LOOP DETECTED] You are repeating the same tool calls.",
"tool_calls": [{"name": "grep", "args": {"pattern": "TODO"}, "id": "call_1"}],
},
]
}
assert _extract_response_text(result) == ""
def test_preserves_visible_text_when_stripping_loop_warning(self):
from app.channels.manager import _extract_response_text
result = {
"messages": [
{"type": "human", "content": "prepare the report"},
{
"type": "ai",
"content": "Here is the report.\n\n[LOOP DETECTED] You are repeating the same tool calls.",
"tool_calls": [{"name": "present_files", "args": {"filepaths": ["/mnt/user-data/outputs/report.md"]}, "id": "call_1"}],
},
]
}
assert _extract_response_text(result) == "Here is the report."
# ---------------------------------------------------------------------------
# ChannelManager tests
@@ -530,6 +561,8 @@ class TestChannelManager:
assert call_args[0][0] == "test-thread-123" # thread_id
assert call_args[0][1] == "lead_agent" # assistant_id
assert call_args[1]["input"]["messages"][0]["content"] == "hi"
assert call_args[1]["config"]["configurable"]["checkpoint_ns"] == ""
assert call_args[1]["config"]["configurable"]["thread_id"] == "test-thread-123"
assert len(outbound_received) == 1
assert outbound_received[0].text == "Hello from agent!"
@@ -661,12 +694,135 @@ class TestChannelManager:
call_args = mock_client.runs.wait.call_args
assert call_args[0][1] == "lead_agent"
assert call_args[1]["config"]["recursion_limit"] == 55
assert call_args[1]["config"]["configurable"]["checkpoint_ns"] == ""
assert call_args[1]["config"]["configurable"]["thread_id"] == "test-thread-123"
assert call_args[1]["context"]["thinking_enabled"] is False
assert call_args[1]["context"]["subagent_enabled"] is True
assert call_args[1]["context"]["agent_name"] == "mobile-agent"
_run(go())
def test_clarification_follow_up_preserves_history(self):
"""Conversation should continue after ask_clarification instead of resetting history."""
from app.channels.manager import ChannelManager
async def go():
bus = MessageBus()
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
manager = ChannelManager(bus=bus, store=store)
outbound_received = []
async def capture_outbound(msg):
outbound_received.append(msg)
bus.subscribe_outbound(capture_outbound)
history_by_checkpoint: dict[tuple[str, str], list[str]] = {}
async def _runs_wait(thread_id, assistant_id, *, input, config, context):
del assistant_id, context # unused in this test, kept for signature parity
checkpoint_ns = config.get("configurable", {}).get("checkpoint_ns")
key = (thread_id, str(checkpoint_ns))
history = history_by_checkpoint.setdefault(key, [])
human_text = input["messages"][0]["content"]
history.append(human_text)
if len(history) == 1:
return {
"messages": [
{"type": "human", "content": history[0]},
{
"type": "ai",
"content": "",
"tool_calls": [
{
"name": "ask_clarification",
"args": {"question": "Which environment should I use?"},
}
],
},
{
"type": "tool",
"name": "ask_clarification",
"content": "Which environment should I use?",
},
]
}
if len(history) == 2 and history[0] == "Deploy my app" and history[1] == "prod":
return {
"messages": [
{"type": "human", "content": history[0]},
{
"type": "ai",
"content": "",
"tool_calls": [
{
"name": "ask_clarification",
"args": {"question": "Which environment should I use?"},
}
],
},
{
"type": "tool",
"name": "ask_clarification",
"content": "Which environment should I use?",
},
{"type": "human", "content": history[1]},
{"type": "ai", "content": "Got it. I will deploy to prod."},
]
}
return {
"messages": [
{"type": "human", "content": history[-1]},
{"type": "ai", "content": "History missing; clarification repeated."},
]
}
mock_client = MagicMock()
mock_client.threads.create = AsyncMock(return_value={"thread_id": "clarify-thread-1"})
mock_client.threads.get = AsyncMock(return_value={"thread_id": "clarify-thread-1"})
mock_client.runs.wait = AsyncMock(side_effect=_runs_wait)
manager._client = mock_client
await manager.start()
await bus.publish_inbound(
InboundMessage(
channel_name="test",
chat_id="chat1",
user_id="user1",
text="Deploy my app",
)
)
await _wait_for(lambda: len(outbound_received) >= 1)
await bus.publish_inbound(
InboundMessage(
channel_name="test",
chat_id="chat1",
user_id="user1",
text="prod",
)
)
await _wait_for(lambda: len(outbound_received) >= 2)
await manager.stop()
assert outbound_received[0].text == "Which environment should I use?"
assert outbound_received[1].text == "Got it. I will deploy to prod."
assert mock_client.runs.wait.call_count == 2
first_call = mock_client.runs.wait.call_args_list[0]
second_call = mock_client.runs.wait.call_args_list[1]
assert first_call.kwargs["config"]["configurable"]["checkpoint_ns"] == ""
assert second_call.kwargs["config"]["configurable"]["checkpoint_ns"] == ""
_run(go())
def test_handle_chat_uses_user_session_overrides(self):
from app.channels.manager import ChannelManager
@@ -1343,6 +1499,8 @@ class TestChannelManager:
call_args = mock_client.runs.stream.call_args
assert call_args[1]["input"]["messages"][0]["content"] == "hello"
assert call_args[1]["config"]["configurable"]["checkpoint_ns"] == ""
assert call_args[1]["config"]["configurable"]["thread_id"] == "test-thread-123"
assert call_args[1]["context"]["is_bootstrap"] is True
# Final message should be published
@@ -50,7 +50,7 @@ def test_nginx_routes_official_langgraph_prefix_to_gateway_api():
assert "/api/langgraph-compat" not in content
assert "proxy_pass http://langgraph" not in content
assert "rewrite ^/api/langgraph/(.*) /api/$1 break;" in content
assert "proxy_pass http://gateway" in content
assert "proxy_pass http://gateway" in content or "proxy_pass http://$gateway_upstream" in content
def test_frontend_rewrites_langgraph_prefix_to_gateway():
@@ -3,7 +3,7 @@
import copy
from unittest.mock import MagicMock
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.messages import AIMessage, SystemMessage
from deerflow.agents.middlewares.loop_detection_middleware import (
_HARD_STOP_MSG,
@@ -146,14 +146,42 @@ class TestLoopDetection:
for _ in range(2):
mw._apply(_make_state(tool_calls=call), runtime)
# Third identical call triggers warning
# Third identical call triggers warning. The warning is appended to
# the AIMessage content (tool_calls preserved) — never inserted as a
# separate HumanMessage between the AIMessage(tool_calls) and its
# ToolMessage responses, which would break OpenAI/Moonshot strict
# tool-call pairing validation.
result = mw._apply(_make_state(tool_calls=call), runtime)
assert result is not None
msgs = result["messages"]
assert len(msgs) == 1
assert isinstance(msgs[0], HumanMessage)
assert isinstance(msgs[0], AIMessage)
assert len(msgs[0].tool_calls) == len(call)
assert msgs[0].tool_calls[0]["id"] == call[0]["id"]
assert "LOOP DETECTED" in msgs[0].content
def test_warn_does_not_break_tool_call_pairing(self):
"""Regression: the warn branch must NOT inject a non-tool message
after an AIMessage(tool_calls=...). Moonshot/OpenAI reject the next
request with 'tool_call_ids did not have response messages' if any
non-tool message is wedged between the AIMessage and its ToolMessage
responses. See #2029.
"""
mw = LoopDetectionMiddleware(warn_threshold=3, hard_limit=10)
runtime = _make_runtime()
call = [_bash_call("ls")]
for _ in range(2):
mw._apply(_make_state(tool_calls=call), runtime)
result = mw._apply(_make_state(tool_calls=call), runtime)
assert result is not None
msgs = result["messages"]
assert len(msgs) == 1
assert isinstance(msgs[0], AIMessage)
assert len(msgs[0].tool_calls) == len(call)
assert msgs[0].tool_calls[0]["id"] == call[0]["id"]
def test_warn_only_injected_once(self):
"""Warning for the same hash should only be injected once per thread."""
mw = LoopDetectionMiddleware(warn_threshold=3, hard_limit=10)
@@ -483,7 +511,11 @@ class TestToolFrequencyDetection:
result = mw._apply(_make_state(tool_calls=[self._read_call("/file_4.py")]), runtime)
assert result is not None
msg = result["messages"][0]
assert isinstance(msg, HumanMessage)
# Warning is appended to the AIMessage content; tool_calls preserved
# so the tools node still runs and Moonshot/OpenAI tool-call pairing
# validation does not break.
assert isinstance(msg, AIMessage)
assert msg.tool_calls
assert "read_file" in msg.content
assert "LOOP DETECTED" in msg.content
@@ -0,0 +1,293 @@
from __future__ import annotations
import pytest
import requests
from deerflow.community.aio_sandbox.remote_backend import RemoteSandboxBackend
from deerflow.community.aio_sandbox.sandbox_info import SandboxInfo
class _StubResponse:
def __init__(
self,
*,
status_code: int = 200,
payload: object | None = None,
json_exc: Exception | None = None,
):
self.status_code = status_code
self._payload = {} if payload is None else payload
self._json_exc = json_exc
self.ok = 200 <= status_code < 400
self.text = ""
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise requests.HTTPError(f"HTTP {self.status_code}")
def json(self) -> object:
if self._json_exc is not None:
raise self._json_exc
return self._payload
def test_list_running_delegates_to_provisioner_list(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
sandbox_info = SandboxInfo(sandbox_id="test-id", sandbox_url="http://localhost:8080")
def mock_list():
return [sandbox_info]
monkeypatch.setattr(backend, "_provisioner_list", mock_list)
assert backend.list_running() == [sandbox_info]
def test_provisioner_list_returns_sandbox_infos_and_filters_invalid_entries(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get(url: str, timeout: int):
assert url == "http://provisioner:8002/api/sandboxes"
assert timeout == 10
return _StubResponse(
payload={
"sandboxes": [
{"sandbox_id": "abc123", "sandbox_url": "http://k3s:31001"},
{"sandbox_id": "missing-url"},
{"sandbox_url": "http://k3s:31002"},
]
}
)
monkeypatch.setattr(requests, "get", mock_get)
infos = backend._provisioner_list()
assert len(infos) == 1
assert infos[0].sandbox_id == "abc123"
assert infos[0].sandbox_url == "http://k3s:31001"
def test_provisioner_list_returns_empty_on_request_exception(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get(url: str, timeout: int):
raise requests.RequestException("network down")
monkeypatch.setattr(requests, "get", mock_get)
assert backend._provisioner_list() == []
def test_provisioner_list_returns_empty_when_payload_is_not_dict(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get(url: str, timeout: int):
return _StubResponse(payload=[{"sandbox_id": "abc", "sandbox_url": "http://k3s:31001"}])
monkeypatch.setattr(requests, "get", mock_get)
assert backend._provisioner_list() == []
def test_provisioner_list_returns_empty_when_sandboxes_is_not_list(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get(url: str, timeout: int):
return _StubResponse(payload={"sandboxes": {"sandbox_id": "abc"}})
monkeypatch.setattr(requests, "get", mock_get)
assert backend._provisioner_list() == []
def test_provisioner_list_skips_non_dict_sandbox_entries(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get(url: str, timeout: int):
return _StubResponse(
payload={
"sandboxes": [
{"sandbox_id": "abc123", "sandbox_url": "http://k3s:31001"},
"bad-entry",
123,
None,
]
}
)
monkeypatch.setattr(requests, "get", mock_get)
infos = backend._provisioner_list()
assert len(infos) == 1
assert infos[0].sandbox_id == "abc123"
assert infos[0].sandbox_url == "http://k3s:31001"
def test_create_delegates_to_provisioner_create(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
expected = SandboxInfo(sandbox_id="abc123", sandbox_url="http://k3s:31001")
def mock_create(thread_id: str, sandbox_id: str, extra_mounts=None):
assert thread_id == "thread-1"
assert sandbox_id == "abc123"
assert extra_mounts == [("/host", "/container", False)]
return expected
monkeypatch.setattr(backend, "_provisioner_create", mock_create)
result = backend.create("thread-1", "abc123", extra_mounts=[("/host", "/container", False)])
assert result == expected
def test_provisioner_create_returns_sandbox_info(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_post(url: str, json: dict, timeout: int):
assert url == "http://provisioner:8002/api/sandboxes"
assert json == {"sandbox_id": "abc123", "thread_id": "thread-1"}
assert timeout == 30
return _StubResponse(payload={"sandbox_id": "abc123", "sandbox_url": "http://k3s:31001"})
monkeypatch.setattr(requests, "post", mock_post)
info = backend._provisioner_create("thread-1", "abc123")
assert info.sandbox_id == "abc123"
assert info.sandbox_url == "http://k3s:31001"
def test_provisioner_create_raises_runtime_error_on_request_exception(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_post(url: str, json: dict, timeout: int):
raise requests.RequestException("boom")
monkeypatch.setattr(requests, "post", mock_post)
with pytest.raises(RuntimeError, match="Provisioner create failed"):
backend._provisioner_create("thread-1", "abc123")
def test_destroy_delegates_to_provisioner_destroy(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
called: list[str] = []
def mock_destroy(sandbox_id: str):
called.append(sandbox_id)
monkeypatch.setattr(backend, "_provisioner_destroy", mock_destroy)
backend.destroy(SandboxInfo(sandbox_id="abc123", sandbox_url="http://k3s:31001"))
assert called == ["abc123"]
def test_provisioner_destroy_calls_delete(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_delete(url: str, timeout: int):
assert url == "http://provisioner:8002/api/sandboxes/abc123"
assert timeout == 15
return _StubResponse(status_code=200)
monkeypatch.setattr(requests, "delete", mock_delete)
backend._provisioner_destroy("abc123")
def test_provisioner_destroy_swallows_request_exception(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_delete(url: str, timeout: int):
raise requests.RequestException("network down")
monkeypatch.setattr(requests, "delete", mock_delete)
backend._provisioner_destroy("abc123")
def test_is_alive_delegates_to_provisioner_is_alive(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_is_alive(sandbox_id: str):
assert sandbox_id == "abc123"
return True
monkeypatch.setattr(backend, "_provisioner_is_alive", mock_is_alive)
alive = backend.is_alive(SandboxInfo(sandbox_id="abc123", sandbox_url="http://k3s:31001"))
assert alive is True
def test_provisioner_is_alive_true_only_when_status_running(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get_running(url: str, timeout: int):
return _StubResponse(payload={"status": "Running"})
monkeypatch.setattr(requests, "get", mock_get_running)
assert backend._provisioner_is_alive("abc123") is True
def mock_get_pending(url: str, timeout: int):
return _StubResponse(payload={"status": "Pending"})
monkeypatch.setattr(requests, "get", mock_get_pending)
assert backend._provisioner_is_alive("abc123") is False
def test_provisioner_is_alive_returns_false_on_request_exception(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get(url: str, timeout: int):
raise requests.RequestException("boom")
monkeypatch.setattr(requests, "get", mock_get)
assert backend._provisioner_is_alive("abc123") is False
def test_discover_delegates_to_provisioner_discover(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
expected = SandboxInfo(sandbox_id="abc123", sandbox_url="http://k3s:31001")
def mock_discover(sandbox_id: str):
assert sandbox_id == "abc123"
return expected
monkeypatch.setattr(backend, "_provisioner_discover", mock_discover)
result = backend.discover("abc123")
assert result == expected
def test_provisioner_discover_returns_none_on_404(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get(url: str, timeout: int):
return _StubResponse(status_code=404)
monkeypatch.setattr(requests, "get", mock_get)
assert backend._provisioner_discover("abc123") is None
def test_provisioner_discover_returns_info_on_success(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get(url: str, timeout: int):
return _StubResponse(payload={"sandbox_id": "abc123", "sandbox_url": "http://k3s:31001"})
monkeypatch.setattr(requests, "get", mock_get)
info = backend._provisioner_discover("abc123")
assert info is not None
assert info.sandbox_id == "abc123"
assert info.sandbox_url == "http://k3s:31001"
def test_provisioner_discover_returns_none_on_request_exception(monkeypatch):
backend = RemoteSandboxBackend("http://provisioner:8002")
def mock_get(url: str, timeout: int):
raise requests.RequestException("boom")
monkeypatch.setattr(requests, "get", mock_get)
assert backend._provisioner_discover("abc123") is None
+19 -27
View File
@@ -17,25 +17,17 @@ http {
# Docker internal DNS (for resolving k3s hostname)
resolver 127.0.0.11 valid=10s ipv6=off;
# Upstream servers (using Docker service names)
# NOTE: `zone` and `resolve` are nginx Plus-only features and are not
# available in the standard nginx:alpine image. Docker's internal DNS
# (127.0.0.11) handles service discovery; upstreams are resolved at
# nginx startup and remain valid for the lifetime of the deployment.
upstream gateway {
server gateway:8001;
}
upstream frontend {
server frontend:3000;
}
# ── Main server (path-based routing) ─────────────────────────────────
server {
listen 2026 default_server;
listen [::]:2026 default_server;
server_name _;
# Resolve Docker service names at request time to avoid stale upstream
# IPs when containers restart and receive new addresses.
set $gateway_upstream gateway:8001;
set $frontend_upstream frontend:3000;
# Hide CORS headers from upstream to prevent duplicates
proxy_hide_header 'Access-Control-Allow-Origin';
proxy_hide_header 'Access-Control-Allow-Methods';
@@ -56,7 +48,7 @@ http {
# Rewrites /api/langgraph/* to /api/* before proxying to Gateway.
location /api/langgraph/ {
rewrite ^/api/langgraph/(.*) /api/$1 break;
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
# Headers
@@ -82,7 +74,7 @@ http {
# Custom API: Models endpoint
location /api/models {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -92,7 +84,7 @@ http {
# Custom API: Memory endpoint
location /api/memory {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -102,7 +94,7 @@ http {
# Custom API: MCP configuration endpoint
location /api/mcp {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -112,7 +104,7 @@ http {
# Custom API: Skills configuration endpoint
location /api/skills {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -122,7 +114,7 @@ http {
# Custom API: Agents endpoint
location /api/agents {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -132,7 +124,7 @@ http {
# Custom API: Uploads endpoint
location ~ ^/api/threads/[^/]+/uploads {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -146,7 +138,7 @@ http {
# Custom API: Other endpoints under /api/threads
location ~ ^/api/threads {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -156,7 +148,7 @@ http {
# API Documentation: Swagger UI
location /docs {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -166,7 +158,7 @@ http {
# API Documentation: ReDoc
location /redoc {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -176,7 +168,7 @@ http {
# API Documentation: OpenAPI Schema
location /openapi.json {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -186,7 +178,7 @@ http {
# Health check endpoint (gateway)
location /health {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -210,7 +202,7 @@ http {
# Catch-all for /api/ routes not covered above (e.g. /api/v1/auth/*).
# More specific prefix and regex locations above still take precedence.
location /api/ {
proxy_pass http://gateway;
proxy_pass http://$gateway_upstream;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
@@ -220,7 +212,7 @@ http {
# All other requests go to frontend
location / {
proxy_pass http://frontend;
proxy_pass http://$frontend_upstream;
proxy_http_version 1.1;
# Headers
+5
View File
@@ -14,3 +14,8 @@
# Only set these if you need to connect to backend services directly
# NEXT_PUBLIC_BACKEND_BASE_URL="http://localhost:8001"
# NEXT_PUBLIC_LANGGRAPH_BASE_URL="http://localhost:2024"
# Server-only Gateway wiring used by SSR (auth checks, /api/* rewrites).
# Defaults to localhost — only override for non-local deployments.
# DEER_FLOW_INTERNAL_GATEWAY_BASE_URL="http://localhost:8001"
# DEER_FLOW_TRUSTED_ORIGINS="http://localhost:3000,http://localhost:2026"
+4 -7
View File
@@ -12,12 +12,11 @@ let _cached: GatewayConfig | null = null;
export function getGatewayConfig(): GatewayConfig {
if (_cached) return _cached;
const isDev = process.env.NODE_ENV === "development";
const rawUrl = process.env.DEER_FLOW_INTERNAL_GATEWAY_BASE_URL?.trim();
const internalGatewayUrl =
rawUrl?.replace(/\/+$/, "") ??
(isDev ? "http://localhost:8001" : undefined);
rawUrl && rawUrl.length > 0
? rawUrl.replace(/\/+$/, "")
: "http://127.0.0.1:8001";
const rawOrigins = process.env.DEER_FLOW_TRUSTED_ORIGINS?.trim();
const trustedOrigins = rawOrigins
@@ -25,9 +24,7 @@ export function getGatewayConfig(): GatewayConfig {
.split(",")
.map((s) => s.trim())
.filter(Boolean)
: isDev
? ["http://localhost:3000"]
: undefined;
: ["http://localhost:3000"];
_cached = gatewayConfigSchema.parse({ internalGatewayUrl, trustedOrigins });
return _cached;
+25 -5
View File
@@ -286,6 +286,13 @@ export function useThreadStream({
const summarizedRef = useRef<Set<string>>(null);
// Track message count before sending so we know when server has responded
const prevMsgCountRef = useRef(thread.messages.length);
// Track human message count before sending to prevent clearing optimistic
// messages before the server's human message arrives (e.g. when AI messages
// from "messages-tuple" events arrive before the input human message from
// "values" events).
const prevHumanMsgCountRef = useRef(
thread.messages.filter((m) => m.type === "human").length,
);
summarizedRef.current ??= new Set<string>();
@@ -296,12 +303,22 @@ export function useThreadStream({
sendInFlightRef.current = false;
}, [threadId]);
// Clear optimistic when server messages arrive (count increases)
// Clear optimistic when server messages arrive.
// For messages with a human optimistic message, wait until the server's
// human message has arrived to avoid clearing before the input message
// appears in the stream (the input message may arrive via "values" events
// after individual "messages-tuple" events for AI messages).
useEffect(() => {
if (
optimisticMessages.length > 0 &&
thread.messages.length > prevMsgCountRef.current
) {
if (optimisticMessages.length === 0) return;
const hasHumanOptimistic = optimisticMessages.some(
(m) => m.type === "human",
);
const newHumanMsgArrived =
thread.messages.filter((m) => m.type === "human").length >
prevHumanMsgCountRef.current;
if (!hasHumanOptimistic || newHumanMsgArrived) {
setOptimisticMessages([]);
}
}, [thread.messages.length, optimisticMessages.length]);
@@ -322,6 +339,9 @@ export function useThreadStream({
// Capture current count before showing optimistic messages
prevMsgCountRef.current = thread.messages.length;
prevHumanMsgCountRef.current = thread.messages.filter(
(m) => m.type === "human",
).length;
// Build optimistic files list with uploading status
const optimisticFiles: FileInMessage[] = (message.files ?? []).map(
@@ -0,0 +1,111 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
const ENV_KEYS = [
"NODE_ENV",
"DEER_FLOW_INTERNAL_GATEWAY_BASE_URL",
"DEER_FLOW_TRUSTED_ORIGINS",
] as const;
type EnvSnapshot = Partial<
Record<(typeof ENV_KEYS)[number], string | undefined>
>;
function snapshotEnv(): EnvSnapshot {
const snapshot: EnvSnapshot = {};
for (const key of ENV_KEYS) {
snapshot[key] = process.env[key];
}
return snapshot;
}
function setEnv(key: (typeof ENV_KEYS)[number], value: string | undefined) {
// NODE_ENV is typed as a readonly literal union, so we go through the
// index signature to keep the test compiler-friendly across cases.
const env = process.env as Record<string, string | undefined>;
if (value === undefined) {
delete env[key];
} else {
env[key] = value;
}
}
function restoreEnv(snapshot: EnvSnapshot) {
for (const key of ENV_KEYS) {
setEnv(key, snapshot[key]);
}
}
async function loadFreshConfig() {
vi.resetModules();
return await import("@/core/auth/gateway-config");
}
describe("getGatewayConfig", () => {
let saved: EnvSnapshot;
beforeEach(() => {
saved = snapshotEnv();
setEnv("DEER_FLOW_INTERNAL_GATEWAY_BASE_URL", undefined);
setEnv("DEER_FLOW_TRUSTED_ORIGINS", undefined);
});
afterEach(() => {
restoreEnv(saved);
});
test("returns localhost defaults when env is unset in development", async () => {
setEnv("NODE_ENV", "development");
const { getGatewayConfig } = await loadFreshConfig();
const cfg = getGatewayConfig();
expect(cfg.internalGatewayUrl).toBe("http://127.0.0.1:8001");
expect(cfg.trustedOrigins).toEqual(["http://localhost:3000"]);
});
test("returns localhost defaults when env is unset in production (regression: issue #2705)", async () => {
setEnv("NODE_ENV", "production");
const { getGatewayConfig } = await loadFreshConfig();
expect(() => getGatewayConfig()).not.toThrow();
const cfg = getGatewayConfig();
expect(cfg.internalGatewayUrl).toBe("http://127.0.0.1:8001");
expect(cfg.trustedOrigins).toEqual(["http://localhost:3000"]);
});
test("uses env values verbatim when set, regardless of NODE_ENV", async () => {
setEnv("NODE_ENV", "production");
setEnv("DEER_FLOW_INTERNAL_GATEWAY_BASE_URL", "https://gw.example.com/");
setEnv(
"DEER_FLOW_TRUSTED_ORIGINS",
"https://app.example.com, https://admin.example.com",
);
const { getGatewayConfig } = await loadFreshConfig();
const cfg = getGatewayConfig();
expect(cfg.internalGatewayUrl).toBe("https://gw.example.com");
expect(cfg.trustedOrigins).toEqual([
"https://app.example.com",
"https://admin.example.com",
]);
});
test("trims and filters empty entries in trustedOrigins", async () => {
setEnv("NODE_ENV", "production");
setEnv("DEER_FLOW_INTERNAL_GATEWAY_BASE_URL", "https://gw.example.com");
setEnv(
"DEER_FLOW_TRUSTED_ORIGINS",
" https://a.example , ,https://b.example ",
);
const { getGatewayConfig } = await loadFreshConfig();
const cfg = getGatewayConfig();
expect(cfg.trustedOrigins).toEqual([
"https://a.example",
"https://b.example",
]);
});
});