feat(reddit): add subreddit info, post detail, and user posts tools
- reddit_get_subreddit_info: GET /r/{name}/about for subscriber count, description
- reddit_get_post_detail: GET /by_id/t3_{id} for full post details with flair, ratios
- reddit_get_user_posts: GET /user/{name}/submitted for user's post history
This commit is contained in:
@@ -308,3 +308,133 @@ def register_tools(
|
||||
"created_utc": d.get("created_utc", 0),
|
||||
"is_gold": d.get("is_gold", False),
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def reddit_get_subreddit_info(subreddit: str) -> dict[str, Any]:
|
||||
"""
|
||||
Get information about a subreddit.
|
||||
|
||||
Args:
|
||||
subreddit: Subreddit name without r/ prefix (required)
|
||||
|
||||
Returns:
|
||||
Dict with subreddit details (subscribers, description, rules, etc.)
|
||||
"""
|
||||
client_id, client_secret = _get_credentials(credentials)
|
||||
if not client_id or not client_secret:
|
||||
return _auth_error()
|
||||
if not subreddit:
|
||||
return {"error": "subreddit is required"}
|
||||
|
||||
token = _get_token(client_id, client_secret)
|
||||
if not token:
|
||||
return {"error": "Failed to acquire Reddit access token"}
|
||||
|
||||
data = _get(f"/r/{subreddit}/about", token)
|
||||
if isinstance(data, dict) and "error" in data:
|
||||
return data
|
||||
|
||||
d = (data if isinstance(data, dict) else {}).get("data", {})
|
||||
return {
|
||||
"name": d.get("display_name", ""),
|
||||
"title": d.get("title", ""),
|
||||
"description": (d.get("public_description", "") or "")[:500],
|
||||
"subscribers": d.get("subscribers", 0),
|
||||
"active_users": d.get("accounts_active", 0),
|
||||
"created_utc": d.get("created_utc", 0),
|
||||
"over18": d.get("over18", False),
|
||||
"subreddit_type": d.get("subreddit_type", ""),
|
||||
"submission_type": d.get("submission_type", ""),
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def reddit_get_post_detail(post_id: str) -> dict[str, Any]:
|
||||
"""
|
||||
Get full details for a single Reddit post by ID.
|
||||
|
||||
Args:
|
||||
post_id: Post ID (e.g. "abc123", without t3_ prefix) (required)
|
||||
|
||||
Returns:
|
||||
Dict with full post details including selftext, flair, awards
|
||||
"""
|
||||
client_id, client_secret = _get_credentials(credentials)
|
||||
if not client_id or not client_secret:
|
||||
return _auth_error()
|
||||
if not post_id:
|
||||
return {"error": "post_id is required"}
|
||||
|
||||
token = _get_token(client_id, client_secret)
|
||||
if not token:
|
||||
return {"error": "Failed to acquire Reddit access token"}
|
||||
|
||||
data = _get(f"/by_id/t3_{post_id}", token)
|
||||
if isinstance(data, dict) and "error" in data:
|
||||
return data
|
||||
|
||||
listing = data if isinstance(data, dict) else {}
|
||||
children = (listing.get("data") or {}).get("children", [])
|
||||
if not children or children[0].get("kind") != "t3":
|
||||
return {"error": "Post not found"}
|
||||
|
||||
d = children[0].get("data", {})
|
||||
return {
|
||||
"id": d.get("id", ""),
|
||||
"title": d.get("title", ""),
|
||||
"author": d.get("author", ""),
|
||||
"subreddit": d.get("subreddit", ""),
|
||||
"score": d.get("score", 0),
|
||||
"upvote_ratio": d.get("upvote_ratio", 0),
|
||||
"num_comments": d.get("num_comments", 0),
|
||||
"url": d.get("url", ""),
|
||||
"permalink": d.get("permalink", ""),
|
||||
"selftext": (d.get("selftext", "") or "")[:2000],
|
||||
"link_flair_text": d.get("link_flair_text", ""),
|
||||
"created_utc": d.get("created_utc", 0),
|
||||
"is_self": d.get("is_self", False),
|
||||
"over_18": d.get("over_18", False),
|
||||
"locked": d.get("locked", False),
|
||||
"archived": d.get("archived", False),
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def reddit_get_user_posts(
|
||||
username: str,
|
||||
sort: str = "new",
|
||||
time: str = "all",
|
||||
limit: int = 25,
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Get recent posts submitted by a Reddit user.
|
||||
|
||||
Args:
|
||||
username: Reddit username (required)
|
||||
sort: Sort: hot, new, top, controversial (default new)
|
||||
time: Time filter for top/controversial: hour, day, week, month, year, all
|
||||
limit: Max results (1-100, default 25)
|
||||
|
||||
Returns:
|
||||
Dict with user's submitted posts
|
||||
"""
|
||||
client_id, client_secret = _get_credentials(credentials)
|
||||
if not client_id or not client_secret:
|
||||
return _auth_error()
|
||||
if not username:
|
||||
return {"error": "username is required"}
|
||||
|
||||
token = _get_token(client_id, client_secret)
|
||||
if not token:
|
||||
return {"error": "Failed to acquire Reddit access token"}
|
||||
|
||||
params: dict[str, Any] = {
|
||||
"sort": sort,
|
||||
"t": time,
|
||||
"limit": max(1, min(limit, 100)),
|
||||
}
|
||||
data = _get(f"/user/{username}/submitted", token, params)
|
||||
if isinstance(data, dict) and "error" in data:
|
||||
return data
|
||||
|
||||
listing = data if isinstance(data, dict) else {}
|
||||
posts = _extract_posts(listing)
|
||||
return {"username": username, "posts": posts, "count": len(posts)}
|
||||
|
||||
Reference in New Issue
Block a user