Merge pull request #6048 from aden-hive/fix/draft-email-tool

(micro-fix): draft email tool
This commit is contained in:
Bryan @ Aden
2026-03-09 02:26:58 +00:00
committed by GitHub
2 changed files with 295 additions and 14 deletions
@@ -488,29 +488,36 @@ def register_tools(
@mcp.tool()
def gmail_create_draft(
to: str,
subject: str,
html: str,
to: str = "",
subject: str = "",
account: str = "",
reply_to_message_id: str = "",
) -> dict:
"""
Create a draft email in the user's Gmail Drafts folder.
The draft can be reviewed and sent manually from Gmail.
To create a real threaded reply (not a new thread), provide
reply_to_message_id. The tool will fetch the original message,
derive recipient and subject automatically, and set the correct
In-Reply-To/References headers so the draft appears in the same thread.
Args:
to: Recipient email address.
subject: Email subject line.
html: Email body as HTML string.
to: Recipient email address. Required when reply_to_message_id is not set.
Ignored when reply_to_message_id is set (derived from original message).
subject: Email subject line. Required when reply_to_message_id is not set.
Ignored when reply_to_message_id is set (derived from original message).
account: Account alias for multi-account routing. Optional.
reply_to_message_id: Gmail message ID to reply to. When provided, creates
the draft as a threaded reply with proper headers.
Returns:
Dict with "success", "draft_id", and "message_id",
Dict with "success", "draft_id", "message_id", and optionally "thread_id",
or error dict with "error" and optional "help" keys.
"""
if not to or not to.strip():
return {"error": "Recipient email (to) is required"}
if not subject or not subject.strip():
return {"error": "Subject is required"}
if not html:
return {"error": "Email body (html) is required"}
@@ -518,20 +525,101 @@ def register_tools(
if isinstance(token, dict):
return token
import html as html_module
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
msg = MIMEText(html, "html")
msg["To"] = to
msg["Subject"] = subject
thread_id: str | None = None
in_reply_to: str | None = None
full_html = html
if reply_to_message_id:
# Fetch original message with full body for threading + quoted content
try:
orig_response = _gmail_request(
"GET",
f"messages/{_sanitize_path_param(reply_to_message_id, 'reply_to_message_id')}",
token,
params={"format": "full"},
)
except httpx.HTTPError as e:
return {"error": f"Failed to fetch original message: {e}"}
orig_error = _handle_error(orig_response)
if orig_error:
return orig_error
orig_data = orig_response.json()
thread_id = orig_data.get("threadId", "")
payload = orig_data.get("payload", {})
orig_headers = {h["name"]: h["value"] for h in payload.get("headers", [])}
in_reply_to = orig_headers.get("Message-ID") or orig_headers.get("Message-Id", "")
orig_subject = orig_headers.get("Subject", "")
orig_from = orig_headers.get("From", "")
orig_date = orig_headers.get("Date", "")
to = orig_from or to
subject = (
orig_subject if orig_subject.lower().startswith("re:") else f"Re: {orig_subject}"
)
# Extract body recursively (prefer HTML, fall back to plain text)
def _extract_body(part: dict, mime_type: str) -> str | None:
if part.get("mimeType") == mime_type:
body_data = part.get("body", {}).get("data", "")
if body_data:
return base64.urlsafe_b64decode(body_data).decode("utf-8", errors="replace")
for sub in part.get("parts", []):
result = _extract_body(sub, mime_type)
if result:
return result
return None
orig_body_html = _extract_body(payload, "text/html")
if not orig_body_html:
orig_body_text = _extract_body(payload, "text/plain") or ""
orig_body_html = f"<pre>{html_module.escape(orig_body_text)}</pre>"
quoted = (
f"<br><br>"
f'<div class="gmail_quote">'
f"<div>On {orig_date}, {orig_from} wrote:</div>"
"<blockquote"
' style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">'
f"{orig_body_html}"
f"</blockquote>"
f"</div>"
)
full_html = html + quoted
else:
if not to or not to.strip():
return {"error": "Recipient email (to) is required"}
if not subject or not subject.strip():
return {"error": "Subject is required"}
if in_reply_to:
msg: MIMEMultipart | MIMEText = MIMEMultipart("alternative")
msg["To"] = to
msg["Subject"] = subject
msg["In-Reply-To"] = in_reply_to
msg["References"] = in_reply_to
msg.attach(MIMEText(full_html, "html")) # type: ignore[attr-defined]
else:
msg = MIMEText(full_html, "html")
msg["To"] = to
msg["Subject"] = subject
raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("ascii")
message_body: dict = {"raw": raw}
if thread_id:
message_body["threadId"] = thread_id
try:
response = _gmail_request(
"POST",
"drafts",
token,
json={"message": {"raw": raw}},
json={"message": message_body},
)
except httpx.HTTPError as e:
return {"error": f"Request failed: {e}"}
@@ -541,11 +629,14 @@ def register_tools(
return error
data = response.json()
return {
result: dict = {
"success": True,
"draft_id": data.get("id", ""),
"message_id": data.get("message", {}).get("id", ""),
}
if thread_id:
result["thread_id"] = thread_id
return result
@mcp.tool()
def gmail_list_labels(account: str = "") -> dict:
+190
View File
@@ -542,3 +542,193 @@ class TestCreateLabel:
assert "error" in result
assert "Request failed" in result["error"]
# ---------------------------------------------------------------------------
# gmail_create_draft
# ---------------------------------------------------------------------------
@pytest.fixture
def create_draft_fn(gmail_tools):
return gmail_tools["gmail_create_draft"]
def _orig_message_response(
thread_id: str = "thread123",
message_id_header: str = "<orig-msg-id@mail.gmail.com>",
subject: str = "Hello there",
from_addr: str = "sender@example.com",
body_html: str = "<p>Original body</p>",
) -> MagicMock:
"""Mock response for fetching an original message (format=full)."""
import base64
encoded_body = base64.urlsafe_b64encode(body_html.encode()).decode()
return _mock_response(
200,
{
"threadId": thread_id,
"payload": {
"mimeType": "text/html",
"headers": [
{"name": "Message-ID", "value": message_id_header},
{"name": "Subject", "value": subject},
{"name": "From", "value": from_addr},
{"name": "Date", "value": "Mon, 1 Jan 2024 12:00:00 +0000"},
],
"body": {"data": encoded_body},
"parts": [],
},
},
)
class TestGmailCreateDraft:
"""Tests for gmail_create_draft tool."""
# -- new draft (no reply) -------------------------------------------------
def test_no_credentials(self, create_draft_fn, monkeypatch):
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
result = create_draft_fn(html="<p>Hi</p>", to="a@b.com", subject="Hey")
assert "error" in result
assert "Gmail credentials not configured" in result["error"]
def test_missing_to(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
result = create_draft_fn(html="<p>Hi</p>", subject="Hey")
assert "error" in result
assert "to" in result["error"].lower()
def test_missing_subject(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
result = create_draft_fn(html="<p>Hi</p>", to="a@b.com")
assert "error" in result
assert "subject" in result["error"].lower()
def test_missing_html(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
result = create_draft_fn(html="", to="a@b.com", subject="Hey")
assert "error" in result
assert "html" in result["error"].lower()
def test_new_draft_happy_path(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
mock_resp = _mock_response(200, {"id": "draft1", "message": {"id": "msg1"}})
with patch(HTTPX_MODULE, return_value=mock_resp) as mock_req:
result = create_draft_fn(html="<p>Hi</p>", to="a@b.com", subject="Hey")
assert result["success"] is True
assert result["draft_id"] == "draft1"
assert result["message_id"] == "msg1"
assert "thread_id" not in result
# threadId should NOT be in the API body
body = mock_req.call_args[1]["json"]
assert "threadId" not in body["message"]
# -- reply draft ----------------------------------------------------------
def test_reply_draft_happy_path(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
orig_resp = _orig_message_response()
draft_resp = _mock_response(200, {"id": "draft2", "message": {"id": "msg2"}})
calls = [orig_resp, draft_resp]
with patch(HTTPX_MODULE, side_effect=calls) as mock_req:
result = create_draft_fn(
html="<p>Reply</p>",
reply_to_message_id="origmsg123",
)
assert result["success"] is True
assert result["draft_id"] == "draft2"
assert result["thread_id"] == "thread123"
# Verify draft API call has threadId
draft_call = mock_req.call_args_list[1]
body = draft_call[1]["json"]
assert body["message"]["threadId"] == "thread123"
# Verify MIME headers and quoted body
import base64
import email
raw = base64.urlsafe_b64decode(body["message"]["raw"])
mime = email.message_from_bytes(raw)
assert mime["In-Reply-To"] == "<orig-msg-id@mail.gmail.com>"
assert mime["References"] == "<orig-msg-id@mail.gmail.com>"
assert mime["To"] == "sender@example.com"
assert mime["Subject"] == "Re: Hello there"
# Verify quoted original body is embedded
mime_body = mime.get_payload(decode=True)
if mime_body is None:
# multipart — find the html part
for part in mime.walk():
if part.get_content_type() == "text/html":
mime_body = part.get_payload(decode=True)
break
decoded_body = mime_body.decode("utf-8") if mime_body else ""
assert "<p>Reply</p>" in decoded_body
assert "gmail_quote" in decoded_body
assert "<p>Original body</p>" in decoded_body
assert "blockquote" in decoded_body
def test_reply_draft_subject_already_re(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
orig_resp = _orig_message_response(subject="Re: Hello there")
draft_resp = _mock_response(200, {"id": "d3", "message": {"id": "m3"}})
with patch(HTTPX_MODULE, side_effect=[orig_resp, draft_resp]):
result = create_draft_fn(html="<p>x</p>", reply_to_message_id="origmsg")
# Extract subject from result — it should not be "Re: Re: Hello there"
assert result["success"] is True
# Check via MIME is covered by test_reply_draft_subject_no_double_re below.
def test_reply_draft_subject_no_double_re(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
orig_resp = _orig_message_response(subject="Re: Hello there")
draft_resp = _mock_response(200, {"id": "d4", "message": {"id": "m4"}})
with patch(HTTPX_MODULE, side_effect=[orig_resp, draft_resp]) as mock_req:
create_draft_fn(html="<p>x</p>", reply_to_message_id="origmsg")
import base64
import email
body = mock_req.call_args_list[1][1]["json"]
raw = base64.urlsafe_b64decode(body["message"]["raw"])
mime = email.message_from_bytes(raw)
assert mime["Subject"] == "Re: Hello there"
def test_reply_draft_fetch_401(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
orig_resp = _mock_response(401)
with patch(HTTPX_MODULE, return_value=orig_resp):
result = create_draft_fn(html="<p>x</p>", reply_to_message_id="origmsg")
assert "error" in result
assert "token" in result["error"].lower()
def test_reply_draft_fetch_404(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
orig_resp = _mock_response(404)
with patch(HTTPX_MODULE, return_value=orig_resp):
result = create_draft_fn(html="<p>x</p>", reply_to_message_id="origmsg")
assert "error" in result
def test_reply_draft_network_error_on_fetch(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
with patch(HTTPX_MODULE, side_effect=httpx.HTTPError("timeout")):
result = create_draft_fn(html="<p>x</p>", reply_to_message_id="origmsg")
assert "error" in result
assert "fetch" in result["error"].lower()
def test_reply_draft_api_error_on_create(self, create_draft_fn, monkeypatch):
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "tok")
orig_resp = _orig_message_response()
draft_resp = _mock_response(500, text="internal error")
with patch(HTTPX_MODULE, side_effect=[orig_resp, draft_resp]):
result = create_draft_fn(html="<p>x</p>", reply_to_message_id="origmsg")
assert "error" in result