Compare commits

...

132 Commits

Author SHA1 Message Date
Richard Tang 06fd045b3e micro-fix: track reported_to_parent to prevent false empty-turn detection
Turns that call report_to_parent were incorrectly treated as "truly
empty" because the flag was not propagated. Thread it through
_run_single_turn and include it in the empty-turn guard.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 21:10:47 -07:00
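A minimal sketch of the guard this fix describes, built only from the names in the commit message (`report_to_parent`, `reported_to_parent`, the empty-turn check); the real `_run_single_turn` in Hive is surely more involved, and the surrounding types here are assumptions:

```python
from dataclasses import dataclass

@dataclass
class ToolCall:
    name: str
    result: str

def run_single_turn(tool_calls: list[ToolCall]) -> tuple[bool, bool]:
    """Return (produced_output, reported_to_parent) for one turn."""
    produced_output = False
    reported_to_parent = False
    for call in tool_calls:
        if call.name == "report_to_parent":
            # The fix: thread this flag up to the caller instead of dropping it.
            reported_to_parent = True
        if call.result:
            produced_output = True
    return produced_output, reported_to_parent

# A turn whose only action was report_to_parent is no longer "truly empty":
produced, reported = run_single_turn([ToolCall("report_to_parent", "")])
truly_empty = not produced and not reported  # False: the turn is kept
```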
RichardTang-Aden 2e43d2af46 Merge pull request #6100 from aden-hive/feature/integration-extended
micro-fix: wrong reference for hive_coder
2026-03-09 19:52:35 -07:00
Richard Tang 2c9790c65d Merge remote-tracking branch 'origin' into feature/integration-extended 2026-03-09 19:52:17 -07:00
Richard Tang 9700ac71bb micro-fix: wrong reference for hive_coder 2026-03-09 19:50:07 -07:00
RichardTang-Aden 61ed67b068 Merge pull request #6097 from aden-hive/feature/integration-extended
Expand integration tool coverage across 40 vendors
2026-03-09 19:47:34 -07:00
Richard Tang c3bea8685a Merge remote-tracking branch 'origin/main' into feature/integration-extended 2026-03-09 19:47:21 -07:00
RichardTang-Aden 98c57b795a Merge pull request #6050 from aden-hive/feat/queen-planning-phase
Add queen planning phase, global memory, and refactor hive_coder
2026-03-09 19:46:23 -07:00
Richard Tang 9be1d03b5c chore ruff lint 2026-03-09 19:45:36 -07:00
Richard Tang 0d09510539 Merge remote-tracking branch 'origin/main' into feat/queen-planning-phase 2026-03-09 19:42:10 -07:00
Richard Tang 639c37ba17 feat: prompt to init the agent 2026-03-09 19:34:01 -07:00
Richard Tang 2258c23254 Merge branch 'feature/queen-global-memory' into feat/queen-planning-phase 2026-03-09 19:11:32 -07:00
Richard Tang 9714ea106d feat: improve initialize_and_build_agent clarity 2026-03-09 18:54:48 -07:00
Timothy f4ad500177 chore: lint 2026-03-09 18:53:01 -07:00
Timothy 9154a4d9f8 fix: resolve E501 line-too-long lint errors across 7 tool files 2026-03-09 18:51:01 -07:00
Timothy add6efe6f1 fix(micro-fix): increase stall threshold 2026-03-09 18:40:13 -07:00
Richard Tang 7ceb1efd02 fix: replace old tool name reference 2026-03-09 18:40:01 -07:00
Timothy a29ecf8435 chore(micro-fix): fix ci test blockage 2026-03-09 18:27:21 -07:00
Richard Tang d0ba5ef4f4 fix: update the wrong variable name 2026-03-09 18:12:29 -07:00
Richard Tang 860f637491 feat: add validation for module import 2026-03-09 17:53:50 -07:00
Richard Tang acb2cab317 feat: minor prompt change for switching to building mode 2026-03-09 17:41:23 -07:00
Richard Tang b453806918 feat: execution end message 2026-03-09 17:29:58 -07:00
Richard Tang 7ba8a0f51b feat: strengthen validation logic when loading 2026-03-09 17:08:20 -07:00
Richard Tang f6f398b6b1 feat: add GCU knowledge to planning 2026-03-09 17:02:13 -07:00
Timothy c4b22fa5c4 feat(postgres): update credential spec with new tool names 2026-03-09 16:47:27 -07:00
Timothy 0e64f977cd feat(postgres): add table stats, indexes, and foreign keys tools
Add pg_get_table_stats for row counts and size info,
pg_list_indexes for index details, and pg_get_foreign_keys
for relationship discovery with both outgoing and incoming FKs.
2026-03-09 16:47:09 -07:00
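For context, a query along these lines is the usual way to get cheap row counts and sizes; this is a guess at the shape, not the commit's actual SQL, though `pg_stat_user_tables` and `pg_total_relation_size` are standard Postgres:

```python
# Plausible SQL behind a pg_get_table_stats-style tool (illustrative only).
# n_live_tup is an estimate maintained by autovacuum, which is why such a
# tool can report row counts without scanning the tables.
TABLE_STATS_SQL = """
SELECT relname AS table_name,
       n_live_tup AS approx_rows,
       pg_size_pretty(pg_total_relation_size(relid)) AS total_size
FROM pg_stat_user_tables
ORDER BY pg_total_relation_size(relid) DESC;
"""
```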
Timothy f24c9708fc feat(lusha): update credential spec with new tool names 2026-03-09 16:45:33 -07:00
Timothy bb4436e277 feat(lusha): add bulk enrich, technologies, and decision makers tools
Add lusha_bulk_enrich_persons for batch enrichment,
lusha_get_technologies for company tech stack lookup, and
lusha_search_decision_makers for senior contact discovery.
2026-03-09 16:45:17 -07:00
Timothy 795f66c90b feat(gsc): update credential spec with new tool names 2026-03-09 16:44:33 -07:00
Timothy 9ef6d51573 feat(gsc): add top queries, top pages, and delete sitemap tools
Add gsc_top_queries and gsc_top_pages convenience wrappers for
click-sorted analytics, and gsc_delete_sitemap for sitemap removal.
2026-03-09 16:44:20 -07:00
Timothy 3fed4e3409 feat(aws-s3): update credential specs with new tool names 2026-03-09 16:43:37 -07:00
Timothy 670e69f2ce feat(aws-s3): add copy, metadata, and presigned URL tools
Add s3_copy_object for copying within/between buckets,
s3_get_object_metadata for HEAD-based metadata retrieval, and
s3_generate_presigned_url for temporary access URL generation.
2026-03-09 16:42:46 -07:00
Timothy f6c4747905 feat(pushover): update credential spec with new tool names 2026-03-09 16:42:04 -07:00
Timothy 7b78f6c12f feat(pushover): add cancel receipt, glance update, and limits tools
Add pushover_cancel_receipt for stopping emergency retries,
pushover_send_glance for widget data updates, and
pushover_get_limits for checking message usage.
2026-03-09 16:41:52 -07:00
Timothy 1c75100f59 feat(news): update credential spec with new tool names 2026-03-09 16:41:15 -07:00
Timothy b325e103c6 feat(news): add latest, by-source, and by-topic search tools
Add news_latest for breaking news without query, news_by_source
for source-filtered articles, and news_by_topic for topic-based
discovery with automatic date ranges.
2026-03-09 16:40:54 -07:00
Timothy aef2d2d474 feat(serpapi): update credential spec with new tool names 2026-03-09 16:40:05 -07:00
Timothy 95a2b6711e feat(serpapi): add cited-by, profile search, and Google web search tools
Add scholar_cited_by for finding papers citing a given paper,
scholar_search_profiles for author profile discovery, and
serpapi_google_search for structured Google web results.
2026-03-09 16:38:50 -07:00
Timothy 7fb5e8145c feat(exa-search): update credential spec with new tool names 2026-03-09 16:37:56 -07:00
Timothy 8e45d0df83 feat(exa-search): add news, papers, and company search tools
Add exa_search_news, exa_search_papers, and exa_search_companies
convenience wrappers with pre-configured category filters and
automatic date/domain filtering.
2026-03-09 16:37:44 -07:00
Richard Tang 8d4657c13e Merge branch 'feat/queen-planning-phase' into feature/queen-global-memory 2026-03-09 16:10:42 -07:00
Timothy 3d175a6d54 feat(greenhouse): update credential spec with new tool names
Add greenhouse_list_offers, greenhouse_add_candidate_note, greenhouse_list_scorecards.
2026-03-09 16:02:53 -07:00
Timothy b9debaf957 feat(greenhouse): add list offers, candidate notes, and scorecards tools
- greenhouse_list_offers: GET /offers or /applications/{id}/offers
- greenhouse_add_candidate_note: POST /candidates/{id}/activity_feed/notes
- greenhouse_list_scorecards: GET /applications/{id}/scorecards
- Add _post helper for POST requests
2026-03-09 16:02:08 -07:00
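The `_post` helper mentioned here is presumably a thin wrapper over the Harvest API's write path; a sketch of the usual shape (the function body is an assumption, though Basic auth with the key as username and the `On-Behalf-Of` header are standard Greenhouse Harvest requirements):

```python
import requests

BASE = "https://harvest.greenhouse.io/v1"

def _post(path: str, api_key: str, payload: dict, on_behalf_of: str) -> dict:
    """Minimal POST helper of the kind the commit describes (shape assumed)."""
    resp = requests.post(
        f"{BASE}{path}",
        json=payload,
        auth=(api_key, ""),  # Harvest API: key as Basic-auth username
        headers={"On-Behalf-Of": on_behalf_of},  # required for write operations
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()
```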
Richard Tang bdcbcff6f3 feat: better instruction for planning mode switch 2026-03-09 16:01:34 -07:00
Timothy d2d7bdc374 feat(brevo): update credential spec with new tool names
Add brevo_list_contacts, brevo_delete_contact, brevo_list_email_campaigns.
2026-03-09 16:01:16 -07:00
Timothy 40e494b15d feat(brevo): add list contacts, delete contact, and list campaigns tools
- brevo_list_contacts: GET /contacts with pagination and modified_since filter
- brevo_delete_contact: DELETE /contacts/{email} to remove contacts
- brevo_list_email_campaigns: GET /emailCampaigns with status filter and stats
2026-03-09 16:00:42 -07:00
Timothy b5e840c0cb feat(quickbooks): update credential specs with new tool names
Add quickbooks_list_invoices, quickbooks_get_customer, quickbooks_create_payment
to both credential specs (token and realm_id).
2026-03-09 15:59:46 -07:00
Timothy f3d74c9ae4 feat(quickbooks): add list invoices, get customer, and create payment tools
- quickbooks_list_invoices: query invoices with status/customer filters
- quickbooks_get_customer: GET /customer/{id} with address and contact info
- quickbooks_create_payment: POST /payment with optional invoice linking
2026-03-09 15:59:23 -07:00
Richard Tang a22b321692 feat: improve phase switching tools 2026-03-09 15:33:03 -07:00
Timothy 2e7dbad118 feat(cloudinary): update credential specs with new tool names
Add cloudinary_get_usage, cloudinary_rename_resource, cloudinary_add_tag
to all three credential specs (cloud_name, key, secret).
2026-03-09 15:31:42 -07:00
Timothy 6183d1b65b feat(cloudinary): add usage, rename, and add tag tools
- cloudinary_get_usage: GET /usage for storage, bandwidth, transformation limits
- cloudinary_rename_resource: POST /rename to change public_id
- cloudinary_add_tag: POST /tags to add tags to resources
2026-03-09 15:31:22 -07:00
Timothy 09931e6d98 feat(twitter): update credential spec with new tool names
Add twitter_get_user_followers, twitter_get_tweet_replies, twitter_get_list_tweets.
2026-03-09 15:25:21 -07:00
Timothy cb394127d1 feat(twitter): add user followers, tweet replies, and list tweets tools
- twitter_get_user_followers: GET /users/{id}/followers with profile details
- twitter_get_tweet_replies: search recent replies via conversation_id
- twitter_get_list_tweets: GET /lists/{id}/tweets with author expansion
2026-03-09 15:21:47 -07:00
Timothy 588fa1f9ea feat(google-analytics): update credential spec with new tool names
Add ga_get_user_demographics, ga_get_conversion_events, ga_get_landing_pages.
2026-03-09 15:21:09 -07:00
Timothy 73325c280c feat(google-analytics): add demographics, conversion events, and landing pages tools
- ga_get_user_demographics: country/language/device breakdown
- ga_get_conversion_events: event counts, conversions, and revenue
- ga_get_landing_pages: top landing pages with bounce rate and session duration
2026-03-09 15:20:51 -07:00
Timothy 8c5ae8ffa8 feat(docker-hub): update credential spec with new tool names
Add docker_hub_get_tag_detail, docker_hub_delete_tag, docker_hub_list_webhooks.
2026-03-09 15:19:58 -07:00
Timothy 7389423c70 feat(docker-hub): add tag detail, delete tag, and list webhooks tools
- docker_hub_get_tag_detail: GET /repositories/{repo}/tags/{tag} with image architectures
- docker_hub_delete_tag: DELETE /repositories/{repo}/tags/{tag}
- docker_hub_list_webhooks: GET /repositories/{repo}/webhooks
- Add _delete helper for DELETE requests
2026-03-09 15:18:46 -07:00
Timothy 20c15446a7 feat(apollo): update credential spec with new tool names
Add apollo_get_person_activities, apollo_list_email_accounts,
apollo_bulk_enrich_people.
2026-03-09 15:17:38 -07:00
Richard Tang c05c30dd9a feat: add meta agent tools to planning 2026-03-09 15:14:34 -07:00
Timothy bcd2fb76bd feat(apollo): add person activities, email accounts, and bulk enrich tools
- apollo_get_person_activities: GET /activities for contact activity history
- apollo_list_email_accounts: GET /email_accounts for connected sending accounts
- apollo_bulk_enrich_people: POST /people/bulk_match for batch enrichment (up to 10)
2026-03-09 15:03:21 -07:00
Timothy 5fb97ab6df feat(calendly): update credential spec with new tool names
Add calendly_cancel_event, calendly_list_webhooks, calendly_get_event_type.
2026-03-09 15:00:46 -07:00
Timothy 0224ebc800 feat(calendly): add cancel event, list webhooks, and get event type tools
- calendly_cancel_event: POST /scheduled_events/{id}/cancellation
- calendly_list_webhooks: GET /webhook_subscriptions for org/user scope
- calendly_get_event_type: GET /event_types/{id} for meeting template details
- Add _post helper for POST requests
2026-03-09 15:00:34 -07:00
Timothy af88f7299a feat(pagerduty): update credential specs with new tool names
Add pagerduty_list_oncalls, pagerduty_add_incident_note,
pagerduty_list_escalation_policies to api_key spec.
Add pagerduty_add_incident_note to from_email spec (write operation).
2026-03-09 14:59:53 -07:00
Timothy 81729706ae feat(pagerduty): add oncalls, incident notes, and escalation policies tools
- pagerduty_list_oncalls: GET /oncalls with schedule/policy filters
- pagerduty_add_incident_note: POST /incidents/{id}/notes to add notes
- pagerduty_list_escalation_policies: GET /escalation_policies with search
2026-03-09 14:59:33 -07:00
Timothy bbb1b43ebe feat(airtable): update credential spec with new tool names
Add airtable_delete_records, airtable_search_records, airtable_list_collaborators.
2026-03-09 14:58:57 -07:00
Timothy 70ed5fa8df feat(airtable): add delete records, search records, and list collaborators tools
- airtable_delete_records: DELETE records by comma-separated IDs (up to 10)
- airtable_search_records: search records using FIND formula for partial matching
- airtable_list_collaborators: list base collaborators via meta API
- Add _delete helper for DELETE requests
2026-03-09 14:58:42 -07:00
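The FIND-formula approach mentioned above maps onto Airtable's `filterByFormula` query parameter; a sketch of how such a search string might be assembled (the helper and its escaping are assumptions, `FIND()` and `LOWER()` are real Airtable formula functions):

```python
def build_find_formula(field: str, needle: str) -> str:
    """Case-insensitive partial match via Airtable's FIND() formula function."""
    escaped = needle.replace('"', '\\"')  # keep the formula string literal valid
    return f'FIND(LOWER("{escaped}"), LOWER({{{field}}})) > 0'

# Sent as the filterByFormula query parameter, e.g.:
# GET /v0/{baseId}/{table}?filterByFormula=FIND(LOWER("acme"), LOWER({Name})) > 0
print(build_find_formula("Name", "acme"))
```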
Timothy 312db6620d feat(reddit): update credential specs with new tool names
Add reddit_get_subreddit_info, reddit_get_post_detail, reddit_get_user_posts
to both credential specs (client_id and client_secret).
2026-03-09 14:57:50 -07:00
Timothy 93c1fc5488 feat(reddit): add subreddit info, post detail, and user posts tools
- reddit_get_subreddit_info: GET /r/{name}/about for subscriber count, description
- reddit_get_post_detail: GET /by_id/t3_{id} for full post details with flair, ratios
- reddit_get_user_posts: GET /user/{name}/submitted for user's post history
2026-03-09 14:57:33 -07:00
Richard Tang 90762f275b feat: give planning mode the load tool 2026-03-09 14:55:53 -07:00
Timothy 801443027d feat(pipedrive): update credential spec with new tool names
Add pipedrive_update_deal, pipedrive_create_person, pipedrive_create_activity
to the credential spec tools list.
2026-03-09 14:54:22 -07:00
Timothy ca2ead76cd feat(pipedrive): add deal update, person creation, and activity creation tools
Add pipedrive_update_deal, pipedrive_create_person, and
pipedrive_create_activity tools using Pipedrive REST API v1.
2026-03-09 14:52:27 -07:00
Timothy d562144a6d feat(confluence): register new tools in credential specs
Add confluence_update_page, confluence_delete_page, and
confluence_get_page_children to all three Confluence credential specs.
2026-03-09 14:51:39 -07:00
Timothy af7fb7da27 feat(confluence): add page update, delete, and children listing tools
Add confluence_update_page, confluence_delete_page, and
confluence_get_page_children tools using Confluence REST API v2.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 14:51:26 -07:00
Timothy c17dd63b4a feat(intercom): register new tools in credential spec
Add intercom_close_conversation, intercom_create_contact, and
intercom_list_conversations to Intercom credential spec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 14:50:49 -07:00
Timothy 866db289e2 feat(intercom): add close conversation, create contact, and list conversations tools
Add close_conversation, create_contact, and list_conversations client
methods plus intercom_close_conversation, intercom_create_contact, and
intercom_list_conversations MCP tools using Intercom API v2.11.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 14:50:30 -07:00
Timothy b4ac5e9607 feat(gitlab): register new tools in credential spec
Add gitlab_update_issue, gitlab_get_merge_request, and
gitlab_create_merge_request_note to GitLab credential spec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 14:49:01 -07:00
Timothy 3ca7af4242 feat(gitlab): add issue update, MR detail, and MR comment tools
Add _put helper and gitlab_update_issue, gitlab_get_merge_request,
and gitlab_create_merge_request_note tools using GitLab REST API v4.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 14:48:40 -07:00
Richard Tang 2b12a9c91a Merge remote-tracking branch 'origin/feature/queen-global-memory' into feature/queen-global-memory 2026-03-09 14:47:27 -07:00
Richard Tang 9a94595a42 feat: extract the shared knowledge between planning and building 2026-03-09 14:45:31 -07:00
Richard Tang e1540dfaa6 refactor: drop hive code CLI 2026-03-09 14:30:13 -07:00
Richard Tang 4f5ac6d1b1 refactor: rename hive_coder to queen and extract queen orchestrator 2026-03-09 14:23:31 -07:00
Richard Tang c87d7b13da refactor: rename hive_coder to queen and extract queen orchestrator 2026-03-09 14:23:16 -07:00
Timothy c4acf0b659 fix: memory consolidation hook, simplify generated memory files 2026-03-09 14:15:01 -07:00
RichardTang-Aden 5e1ab3ca37 Merge pull request #5029 from karthik-kotra/docs/setup-troubleshooting
docs(setup): add troubleshooting steps for common WSL setup issues
2026-03-09 14:06:28 -07:00
Timothy 79c32c9f47 feat(slack): register new tools in credential spec
Add slack_get_channel_info, slack_list_files, and slack_get_file_info
to Slack credential spec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:58:14 -07:00
Timothy 35ee29a843 feat(slack): add channel info, file listing, and file detail tools
Add get_channel_info, list_files, and get_file_info client methods
plus slack_get_channel_info, slack_list_files, and slack_get_file_info
MCP tools using Slack Web API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:57:45 -07:00
Timothy 573aea1d9c feat(stripe): register new tools in credential spec
Add stripe_list_disputes, stripe_list_events, and
stripe_create_checkout_session to Stripe credential spec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:56:20 -07:00
Timothy 6ecbc30293 feat(stripe): add disputes, events, and checkout session tools
Add list_disputes, list_events, and create_checkout_session client
methods plus stripe_list_disputes, stripe_list_events, and
stripe_create_checkout_session MCP tools using Stripe API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:56:07 -07:00
Timothy 843b1f2e1d feat(linear): register new tools in credential spec
Add linear_cycles_list, linear_issue_comments_list, and
linear_issue_relation_create to Linear credential spec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:54:48 -07:00
Timothy 89f6c8e4ef feat(linear): add cycle listing, issue comments, and issue relations tools
Add list_cycles, list_issue_comments, and create_issue_relation client
methods plus linear_cycles_list, linear_issue_comments_list, and
linear_issue_relation_create MCP tools using Linear GraphQL API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:52:09 -07:00
Timothy 304ac07bd8 feat(zoom): register new tools in credential spec
Add zoom_update_meeting, zoom_list_meeting_participants, and
zoom_list_meeting_registrants to Zoom credential spec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:50:27 -07:00
Timothy 82f0684b83 feat(zoom): add meeting update, participants, and registrants tools
Add zoom_update_meeting (PATCH), zoom_list_meeting_participants
(past meeting attendees), and zoom_list_meeting_registrants
using Zoom REST API v2.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:45:11 -07:00
Timothy 963c37dc31 feat(twilio): register new tools in credential specs
Add twilio_list_phone_numbers, twilio_list_calls, and
twilio_delete_message to both Twilio credential specs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:41:26 -07:00
Timothy c02da3ba5a feat(twilio): add phone number listing, call history, and message deletion tools
Add twilio_list_phone_numbers, twilio_list_calls, and
twilio_delete_message tools using Twilio REST API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:40:58 -07:00
Timothy 7f34e95ec6 feat(shopify): register new tools in credential specs
Add shopify_update_product, shopify_get_customer, and
shopify_create_draft_order to both Shopify credential specs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:40:28 -07:00
Timothy f2998fe098 feat(shopify): add product update, customer detail, and draft order tools
Add shopify_update_product, shopify_get_customer, and
shopify_create_draft_order tools using Shopify Admin REST API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:40:15 -07:00
Timothy 323a2489b8 feat(zendesk): register new tools in credential specs
Add zendesk_get_ticket_comments, zendesk_add_ticket_comment, and
zendesk_list_users to all three Zendesk credential specs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:39:35 -07:00
Timothy f6d1cd640e feat(zendesk): add ticket comments and user listing tools
Add zendesk_get_ticket_comments, zendesk_add_ticket_comment, and
zendesk_list_users tools using Zendesk Support API v2.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:39:25 -07:00
Timothy ddf89a04fe feat(asana): update credential spec for new tools
Register asana_update_task, asana_add_comment, and
asana_create_subtask in the Asana credential spec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:35:16 -07:00
Timothy c5dc89f5ee feat(asana): add update_task, add_comment, create_subtask tools
Add _put helper and three new Asana MCP tools:
- asana_update_task: modify name, notes, completion, due date, assignee
- asana_add_comment: post comment stories on tasks
- asana_create_subtask: create subtasks under existing tasks

API ref: https://developers.asana.com/docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:35:05 -07:00
Timothy 6ade34b759 feat(trello): register get_card, create_list, search_cards tools
Add three new Trello MCP tools:
- trello_get_card: retrieve full card details with members/checklists/attachments
- trello_create_list: create new lists on boards
- trello_search_cards: full-text search across cards with board scoping

Update credential spec to include the new tool names.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:20:43 -07:00
Timothy 09d5f0a9df feat(trello): add client methods for get_card, create_list, search
Add TrelloClient methods for:
- get_card: GET /1/cards/{id} with members, checklists, attachments
- create_list: POST /1/lists to create new board lists
- search: GET /1/search for full-text search across cards

API ref: https://developer.atlassian.com/cloud/trello/rest/api-group-cards/

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:19:59 -07:00
Timothy a60d63cca2 feat(github): register list_commits, create_release, list_workflow_runs
Add three new GitHub MCP tools:
- github_list_commits: query commits with author/date/branch filters
- github_create_release: create tagged releases with notes and draft support
- github_list_workflow_runs: monitor CI/CD pipeline runs with status filters

Update credential spec to include the new tool names.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:19:16 -07:00
Timothy 8616975fc5 feat(github): add client methods for commits, releases, workflow runs
Add _GitHubClient methods for:
- list_commits: GET /repos/{owner}/{repo}/commits with sha/author/date filters
- create_release: POST /repos/{owner}/{repo}/releases with tag, notes, draft
- list_workflow_runs: GET /repos/{owner}/{repo}/actions/runs with filters

API ref: https://docs.github.com/en/rest

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:18:33 -07:00
Timothy e5ae919d8f feat(telegram): register get_chat_member_count, send_video, set_description
Add three new Telegram MCP tools:
- telegram_get_chat_member_count: retrieve group/channel membership size
- telegram_send_video: send video files via URL or file_id
- telegram_set_chat_description: update group/channel descriptions

Update credential spec to include the new tool names.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:17:30 -07:00
Timothy 8e7f5eaaba feat(telegram): add client methods for member count, video, description
Add _TelegramClient methods for:
- get_chat_member_count: getChatMemberCount API endpoint
- send_video: sendVideo with caption, parse_mode, duration support
- set_chat_description: setChatDescription for groups/channels

API ref: https://core.telegram.org/bots/api

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:13:06 -07:00
Timothy 4d1ff8b054 feat(salesforce): update credential spec for new tools
Register salesforce_delete_record, salesforce_search_records, and
salesforce_get_record_count in both Salesforce credential specs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:12:25 -07:00
Timothy 9fa81e8599 feat(salesforce): add delete_record, search_records, get_record_count
Add three new Salesforce MCP tools:
- salesforce_delete_record: DELETE /sobjects/{type}/{id}
- salesforce_search_records: SOSL full-text search via /search/
- salesforce_get_record_count: efficient COUNT() query for any SObject

API ref: https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:12:11 -07:00
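The "efficient COUNT() query" here refers to SOQL's `SELECT COUNT()`, which returns only a `totalSize` and no record payload; a hedged sketch of the URL such a tool might build (API version and instance host are placeholders, not taken from the commit):

```python
import urllib.parse

def record_count_url(instance: str, sobject: str, api_version: str = "v59.0") -> str:
    """Build the REST query URL for a records-free COUNT() over any SObject."""
    soql = f"SELECT COUNT() FROM {sobject}"
    return (f"https://{instance}/services/data/{api_version}/query/"
            f"?q={urllib.parse.quote(soql)}")

# The response carries totalSize only, so it stays small for huge tables.
print(record_count_url("example.my.salesforce.com", "Account"))
```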
Timothy cf8e19b059 feat(discord): register get_channel, create_reaction, delete_message tools
Add three new Discord MCP tools:
- discord_get_channel: retrieve channel metadata (name, topic, type)
- discord_create_reaction: add emoji reactions to messages
- discord_delete_message: remove messages from channels

Update credential spec to include the new tool names.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:11:25 -07:00
Timothy dfa3f60fcf feat(discord): add client methods for get_channel, reactions, delete
Add _DiscordClient methods for:
- get_channel: retrieve channel metadata via GET /channels/{id}
- create_reaction: add emoji reaction via PUT reactions endpoint
- delete_message: remove a message via DELETE messages endpoint

API ref: https://discord.com/developers/docs/resources

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:10:49 -07:00
Timothy b795f1b253 feat(notion): update credential spec for new tools
Register notion_update_page, notion_archive_page, and
notion_append_blocks in the Notion credential spec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:06:20 -07:00
Timothy 73423c0dd2 feat(notion): add update_page, archive_page, append_blocks tools
Add three new Notion MCP tools:
- notion_update_page: modify page properties via PATCH /pages/{id}
- notion_archive_page: archive or restore pages
- notion_append_blocks: add paragraphs, headings, lists, todos, etc.

API ref: https://developers.notion.com/reference

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:06:08 -07:00
Timothy 3d844e1539 feat(jira): update credential spec for new tools
Register jira_update_issue, jira_list_transitions, and
jira_transition_issue in all three Jira credential specs
(domain, email, token).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:04:32 -07:00
Timothy b619119eb5 feat(jira): add update_issue, list_transitions, transition_issue tools
Add three new Jira MCP tools:
- jira_update_issue: modify summary, description, priority, labels, assignee
- jira_list_transitions: discover available status transitions for an issue
- jira_transition_issue: move an issue to a new status with optional comment

API ref: https://developer.atlassian.com/cloud/jira/platform/rest/v3/

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:04:19 -07:00
Timothy b00ed4fc70 feat(hubspot): register delete_object, list/create_associations tools
Add three new MCP tools:
- hubspot_delete_object: archive contacts, companies, or deals
- hubspot_list_associations: query links between CRM objects (v4 API)
- hubspot_create_association: link two CRM records together

Update credential spec to include the new tool names.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:03:37 -07:00
Timothy 5ec5fbe998 feat(hubspot): add client methods for delete, associations
Add _HubSpotClient methods for:
- delete_object: archive a CRM object via DELETE /crm/v3/objects
- list_associations: query associations via GET /crm/v4/objects associations endpoint
- create_association: link two CRM objects via PUT /crm/v4/objects associations endpoint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 13:02:49 -07:00
Richard Tang 2ed814455a Merge branch 'feat/queen-planning-phase' into feature/queen-global-memory 2026-03-09 12:57:23 -07:00
Timothy ad1a4ef0c3 fix: cancellation button 2026-03-09 12:48:20 -07:00
Timothy 2111c808a9 feat: queen memory v1 2026-03-09 11:55:39 -07:00
Bryan @ Aden 402bb38267 Merge pull request #6079 from Waryjustice/fix/google-sheets-credentials-orphan
fix(credentials): remove orphaned google_sheets.py credential spec
2026-03-09 18:37:27 +00:00
Waryjustice 0a55928872 fix(credentials): remove orphaned google_sheets.py credential spec
The google_sheets.py file defined GOOGLE_SHEETS_CREDENTIALS (an API-key
based credential for reading public sheets via GOOGLE_SHEETS_API_KEY) but
was never wired into the package:

- Never imported in credentials/__init__.py
- Never merged into CREDENTIAL_SPECS
- Never listed in __all__
- Tool never calls credentials.get('google_sheets_key') — uses 'google' (OAuth2)
- Tool names in the spec were stale and did not match actual function names

The 'google' credential in email.py already correctly covers all Google
Sheets tools via OAuth2. This file was dead code with no referencing
imports anywhere in the repository.

Closes #6077

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-09 23:44:26 +05:30
Richard Tang cdf76ae3b9 fix: eventloop test 2026-03-09 10:23:56 -07:00
Richard Tang 42d0592941 refactor: judge evaluate 2026-03-09 10:09:15 -07:00
Richard Tang 1de7cf821d fix: handle judge with empty message 2026-03-09 09:58:29 -07:00
Timothy 4ea8540e25 fix: better logging for memory consolidation event 2026-03-08 20:44:40 -07:00
Timothy bfa3b8e0f6 fix: queen memory health 2026-03-08 20:28:53 -07:00
Richard Tang 55eccfd75f feat: intake node prompt in planning mode 2026-03-08 20:27:24 -07:00
Timothy 1e994a77b5 feat: queen agent global memory 2026-03-08 19:54:46 -07:00
Richard Tang d12afeb35d chore: ruff lint 2026-03-08 19:46:49 -07:00
Richard Tang e84fefd319 feat: separate the queen and worker tools in prompts 2026-03-08 19:40:30 -07:00
Richard Tang d2b510014d feat: adjust tools and knowledge separation between planning and building 2026-03-08 19:21:50 -07:00
Richard Tang 3ed5fda448 feat: planning phase for the queen 2026-03-08 18:49:45 -07:00
karthik-kotra 41cd11d5c9 docs(setup): add troubleshooting steps for common WSL setup issues 2026-02-17 07:30:00 +00:00
136 changed files with 8660 additions and 1103 deletions
+46
@@ -65,6 +65,52 @@ You may submit PRs without prior assignment for:
> **Tip:** Installing Claude Code skills is optional for running existing agents, but required if you plan to **build new agents**.
## Troubleshooting Setup Issues
If you encounter issues while setting up the development environment, the following steps may help:
### `make: command not found`

Install `make` using:

```bash
sudo apt install make
```

### `uv: command not found`

Install `uv` using:

```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
source ~/.bashrc
```

### `ruff: not found`

If linting fails due to a missing `ruff` command, install it with:

```bash
uv tool install ruff
```

### WSL Path Recommendation

When using WSL, it is recommended to clone the repository inside your Linux home directory (e.g., `~/hive`) instead of under `/mnt/c/...` to avoid potential performance and permission issues.
## Commit Convention
We follow [Conventional Commits](https://www.conventionalcommits.org/):
+1 -1
@@ -1,6 +1,6 @@
# MCP Server Guide - Agent Building Tools
> **Note:** The standalone `agent-builder` MCP server (`framework.mcp.agent_builder_server`) has been replaced. Agent building is now done via the `coder-tools` server's `initialize_agent_package` tool, with underlying logic in `framework.builder.package_generator`.
> **Note:** The standalone `agent-builder` MCP server (`framework.mcp.agent_builder_server`) has been replaced. Agent building is now done via the `coder-tools` server's `initialize_and_build_agent` tool, with underlying logic in `tools/coder_tools_server.py`.
This guide covers the MCP tools available for building goal-driven agents.
+1 -1
@@ -19,7 +19,7 @@ uv pip install -e .
## Agent Building
Agent scaffolding is handled by the `coder-tools` MCP server (in `tools/coder_tools_server.py`), which provides the `initialize_agent_package` tool and related utilities. The underlying package generation logic lives in `framework.builder.package_generator`.
Agent scaffolding is handled by the `coder-tools` MCP server (in `tools/coder_tools_server.py`), which provides the `initialize_and_build_agent` tool and related utilities. The package generation logic lives directly in `tools/coder_tools_server.py`.
See the [Getting Started Guide](../docs/getting-started.md) for building agents.
@@ -1,40 +0,0 @@
"""
Hive Coder: Native coding agent that builds Hive agent packages.
Deeply understands the agent framework and produces complete Python packages
with goals, nodes, edges, system prompts, MCP configuration, and tests
from natural language specifications.
"""
from .agent import (
conversation_mode,
edges,
entry_node,
entry_points,
goal,
identity_prompt,
loop_config,
nodes,
pause_nodes,
terminal_nodes,
)
from .config import AgentMetadata, RuntimeConfig, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"goal",
"nodes",
"edges",
"entry_node",
"entry_points",
"pause_nodes",
"terminal_nodes",
"conversation_mode",
"identity_prompt",
"loop_config",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
@@ -1,60 +0,0 @@
"""CLI entry point for Hive Coder agent."""
import json
import logging
import sys
import click
from .agent import entry_node, goal, nodes
from .config import metadata
def setup_logging(verbose=False, debug=False):
"""Configure logging for execution visibility."""
if debug:
level, fmt = logging.DEBUG, "%(asctime)s %(name)s: %(message)s"
elif verbose:
level, fmt = logging.INFO, "%(message)s"
else:
level, fmt = logging.WARNING, "%(levelname)s: %(message)s"
logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
logging.getLogger("framework").setLevel(level)
@click.group()
@click.version_option(version="1.0.0")
def cli():
"""Hive Coder — Build Hive agent packages from natural language."""
pass
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
"""Show agent information."""
info_data = {
"name": metadata.name,
"version": metadata.version,
"description": metadata.description,
"goal": {
"name": goal.name,
"description": goal.description,
},
"nodes": [n.id for n in nodes],
"entry_node": entry_node,
"client_facing_nodes": [n.id for n in nodes if n.client_facing],
}
if output_json:
click.echo(json.dumps(info_data, indent=2))
else:
click.echo(f"Agent: {info_data['name']}")
click.echo(f"Version: {info_data['version']}")
click.echo(f"Description: {info_data['description']}")
click.echo(f"\nNodes: {', '.join(info_data['nodes'])}")
click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}")
click.echo(f"Entry: {info_data['entry_node']}")
if __name__ == "__main__":
cli()
-153
@@ -1,153 +0,0 @@
"""Agent graph construction for Hive Coder."""
from framework.graph import Constraint, Goal, SuccessCriterion
from framework.graph.edge import GraphSpec
from .nodes import coder_node, queen_node
# Goal definition
goal = Goal(
id="hive-coder",
name="Hive Agent Builder",
description=(
"Build complete, validated Hive agent packages from natural language "
"specifications. Produces production-ready Python packages with goals, "
"nodes, edges, system prompts, MCP configuration, and tests."
),
success_criteria=[
SuccessCriterion(
id="valid-package",
description="Generated agent package passes structural validation",
metric="validation_pass",
target="true",
weight=0.30,
),
SuccessCriterion(
id="complete-files",
description=(
"All required files generated: agent.py, config.py, "
"nodes/__init__.py, __init__.py, __main__.py, mcp_servers.json"
),
metric="file_count",
target=">=6",
weight=0.25,
),
SuccessCriterion(
id="user-satisfaction",
description="User reviews and approves the generated agent",
metric="user_approval",
target="true",
weight=0.25,
),
SuccessCriterion(
id="framework-compliance",
description=(
"Generated code follows framework patterns: STEP 1/STEP 2 "
"for client-facing and correct imports"
),
metric="pattern_compliance",
target="100%",
weight=0.20,
),
],
constraints=[
Constraint(
id="dynamic-tool-discovery",
description=(
"Always discover available tools dynamically via "
"list_agent_tools before referencing tools in agent designs"
),
constraint_type="hard",
category="correctness",
),
Constraint(
id="no-fabricated-tools",
description="Only reference tools that exist in hive-tools MCP",
constraint_type="hard",
category="correctness",
),
Constraint(
id="valid-python",
description="All generated Python files must be syntactically correct",
constraint_type="hard",
category="correctness",
),
Constraint(
id="self-verification",
description="Run validation after writing code; fix errors before presenting",
constraint_type="hard",
category="quality",
),
],
)
# Nodes: primary coder node only. The queen runs as an independent
# GraphExecutor with queen_node — not as part of this graph.
nodes = [coder_node]
# No edges needed — single event_loop node
edges = []
# Graph configuration
entry_node = "coder"
entry_points = {"start": "coder"}
pause_nodes = []
terminal_nodes = [] # Coder node has output_keys and can terminate
# No async entry points needed — the queen is now an independent executor,
# not a secondary graph receiving events via add_graph().
async_entry_points = []
# Module-level variables read by AgentRunner.load()
conversation_mode = "continuous"
identity_prompt = (
"You are Hive Coder, the best agent-building coding agent on the planet. "
"You deeply understand the Hive agent framework at the source code level "
"and produce production-ready agent packages from natural language. "
"You can dynamically discover available framework tools, inspect runtime "
"sessions and checkpoints from agents you build, and run their test suites. "
"You follow coding agent discipline: read before writing, verify "
"assumptions by reading actual code, adhere to project conventions, "
"self-verify with validation, and fix your own errors. You are concise, "
"direct, and technically rigorous. No emojis. No fluff."
)
loop_config = {
"max_iterations": 100,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
}
# ---------------------------------------------------------------------------
# Queen graph — runs as an independent persistent conversation in the TUI.
# Loaded by _load_judge_and_queen() in app.py, NOT by AgentRunner.
# ---------------------------------------------------------------------------
queen_goal = Goal(
id="queen-manager",
name="Queen Manager",
description=(
"Manage the worker agent lifecycle and serve as the user's primary "
"interactive interface. Triage health escalations from the judge."
),
success_criteria=[],
constraints=[],
)
queen_graph = GraphSpec(
id="queen-graph",
goal_id=queen_goal.id,
version="1.0.0",
entry_node="queen",
entry_points={"start": "queen"},
terminal_nodes=[],
pause_nodes=[],
nodes=[queen_node],
edges=[],
conversation_mode="continuous",
loop_config={
"max_iterations": 999_999,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
)
+21
@@ -0,0 +1,21 @@
"""
Queen: Native agent builder for the Hive framework.
Deeply understands the agent framework and produces complete Python packages
with goals, nodes, edges, system prompts, MCP configuration, and tests
from natural language specifications.
"""
from .agent import queen_goal, queen_graph
from .config import AgentMetadata, RuntimeConfig, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"queen_goal",
"queen_graph",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
+40
@@ -0,0 +1,40 @@
"""Queen graph definition."""
from framework.graph import Goal
from framework.graph.edge import GraphSpec
from .nodes import queen_node
# ---------------------------------------------------------------------------
# Queen graph — the primary persistent conversation.
# Loaded by queen_orchestrator.create_queen(), NOT by AgentRunner.
# ---------------------------------------------------------------------------
queen_goal = Goal(
id="queen-manager",
name="Queen Manager",
description=(
"Manage the worker agent lifecycle and serve as the user's primary "
"interactive interface. Triage health escalations from the judge."
),
success_criteria=[],
constraints=[],
)
queen_graph = GraphSpec(
id="queen-graph",
goal_id=queen_goal.id,
version="1.0.0",
entry_node="queen",
entry_points={"start": "queen"},
terminal_nodes=[],
pause_nodes=[],
nodes=[queen_node],
edges=[],
conversation_mode="continuous",
loop_config={
"max_iterations": 999_999,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
)
@@ -1,4 +1,4 @@
"""Runtime configuration for Hive Coder agent."""
"""Runtime configuration for Queen agent."""
import json
from dataclasses import dataclass, field
@@ -34,7 +34,7 @@ default_config = RuntimeConfig()
@dataclass
class AgentMetadata:
name: str = "Hive Coder"
name: str = "Queen"
version: str = "1.0.0"
description: str = (
"Native coding agent that builds production-ready Hive agent packages "
@@ -43,7 +43,7 @@ class AgentMetadata:
"MCP configuration, and tests."
)
intro_message: str = (
"I'm Hive Coder — I build Hive agents. Describe what kind of agent "
"I'm Queen — I build Hive agents. Describe what kind of agent "
"you want to create and I'll design, implement, and validate it for you."
)
@@ -1,4 +1,4 @@
"""Node definitions for Hive Coder agent."""
"""Node definitions for Queen agent."""
from pathlib import Path
@@ -35,15 +35,14 @@ def _build_appendices() -> str:
# Shared appendices — appended to every coding node's system prompt.
_appendices = _build_appendices()
# GCU first-class section for building phase (when GCU is enabled).
# This is placed prominently in the main prompt body, not as an appendix.
_gcu_building_section = (
# GCU guide — shared between planning and building via _shared_building_knowledge.
_gcu_section = (
("\n\n# GCU Nodes — Browser Automation\n\n" + _gcu_guide)
if _is_gcu_enabled() and _gcu_guide
else ""
)
# Tools available to both coder (worker) and queen.
# Tools shared across phases.
_SHARED_TOOLS = [
# File I/O
"read_file",
@@ -61,14 +60,34 @@ _SHARED_TOOLS = [
"list_agent_sessions",
"list_agent_checkpoints",
"get_agent_checkpoint",
"initialize_agent_package",
]
# Queen phase-specific tool sets.
# Planning phase: read-only exploration + design, no write tools.
_QUEEN_PLANNING_TOOLS = [
# Read-only file tools
"read_file",
"list_directory",
"search_files",
"run_command",
# Discovery + design
"list_agent_tools",
"list_agents",
"list_agent_sessions",
"list_agent_checkpoints",
"get_agent_checkpoint",
"initialize_and_build_agent",
# Load existing agent (after user confirms)
"load_built_agent",
]
# Building phase: full coding + agent construction tools.
_QUEEN_BUILDING_TOOLS = _SHARED_TOOLS + [
"load_built_agent",
"list_credentials",
"replan_agent",
"write_to_diary", # Episodic memory — available in all phases
]
# Staging phase: agent loaded but not yet running — inspect, configure, launch.
@@ -84,6 +103,8 @@ _QUEEN_STAGING_TOOLS = [
# Launch or go back
"run_agent_with_input",
"stop_worker_and_edit",
"stop_worker_and_plan",
"write_to_diary", # Episodic memory — available in all phases
]
# Running phase: worker is executing — monitor and control.
@@ -98,11 +119,13 @@ _QUEEN_RUNNING_TOOLS = [
# Worker lifecycle
"stop_worker",
"stop_worker_and_edit",
"stop_worker_and_plan",
"get_worker_status",
"inject_worker_message",
# Monitoring
"get_worker_health_summary",
"notify_operator",
"write_to_diary", # Episodic memory — available in all phases
]
@@ -113,7 +136,38 @@ _QUEEN_RUNNING_TOOLS = [
# additions.
# ---------------------------------------------------------------------------
_package_builder_knowledge = """\
_shared_building_knowledge = (
"""\
# Shared Rules (Planning & Building)
## Paths (MANDATORY)
**Always use RELATIVE paths** \
(e.g. `exports/agent_name/config.py`, `exports/agent_name/nodes/__init__.py`).
**Never use absolute paths** like `/mnt/data/...` or `/workspace/...` – they fail.
The project root is implicit.
## Worker File Tools (hive-tools MCP)
Workers use a DIFFERENT MCP server (hive-tools) with DIFFERENT tool names. \
When designing worker nodes or writing worker system prompts, reference these \
tool names, NOT the coder-tools names (read_file, write_file, etc.).
Worker data tools (for large results and spillover):
- save_data(filename, data, data_dir) – save data to a file for later retrieval
- load_data(filename, data_dir, offset_bytes?, limit_bytes?) – load data \
with byte-based pagination
- list_data_files(data_dir) – list available data files
- append_data(filename, data, data_dir) – append to a file incrementally
- edit_data(filename, old_text, new_text, data_dir) – find-and-replace in a data file
- serve_file_to_user(filename, data_dir, label?, open_in_browser?) – \
generate a clickable file URI for the user
IMPORTANT: Do NOT tell workers to use read_file, write_file, edit_file, \
search_files, or list_directory – those are YOUR tools, not theirs.
"""
+ _gcu_section
)
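Taking the signatures listed above at face value, the spillover flow looks roughly like this; the in-memory stubs stand in for the hive-tools MCP server and are illustration only:

```python
# Stubs standing in for hive-tools; signatures follow the list above,
# behavior is assumed for illustration.
_FILES: dict[tuple[str, str], str] = {}

def save_data(filename: str, data: str, data_dir: str) -> None:
    _FILES[(data_dir, filename)] = data

def append_data(filename: str, data: str, data_dir: str) -> None:
    _FILES[(data_dir, filename)] = _FILES.get((data_dir, filename), "") + data

def load_data(filename: str, data_dir: str,
              offset_bytes: int = 0, limit_bytes: int | None = None) -> str:
    blob = _FILES[(data_dir, filename)].encode()
    end = None if limit_bytes is None else offset_bytes + limit_bytes
    return blob[offset_bytes:end].decode(errors="replace")

# Accumulate results incrementally, then page them back in small chunks so
# large payloads never enter the model's context all at once.
save_data("results.jsonl", '{"page": 1}\n', data_dir="scratch")
append_data("results.jsonl", '{"page": 2}\n', data_dir="scratch")
print(load_data("results.jsonl", data_dir="scratch", offset_bytes=0, limit_bytes=12))
```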
_planning_knowledge = """\
**A responsible engineer doesn't jump into building. First, \
understand the problem and be transparent about what the framework can and cannot do.**
@@ -121,56 +175,16 @@ Use the user's selection (or their custom description if they chose "Other") \
as context when shaping the goal below. If the user already described \
what they want before this step, skip the question and proceed directly.
# Core Mandates
# Core Mandates (Planning)
- **DO NOT propose a complete goal on your own.** Instead, \
collaborate with the user to define it.
- **Verify assumptions.** Never assume a class, import, or pattern \
exists. Read actual source to confirm. Search if unsure.
- **NEVER call `initialize_and_build_agent` without explicit user approval.** \
Present the full design first and wait for the user to confirm before building.
- **Discover tools dynamically.** NEVER reference tools from static \
docs. Always run list_agent_tools() to see what actually exists.
- **Self-verify.** After writing code, run validation and tests. Fix \
errors yourself. Don't declare success until validation passes.
# Tools
## Paths (MANDATORY)
**Always use RELATIVE paths**
(e.g. `exports/agent_name/config.py`, `exports/agent_name/nodes/__init__.py`).
**Never use absolute paths** like `/mnt/data/...` or `/workspace/...` – they fail.
The project root is implicit.
# Tool Discovery (MANDATORY before designing)
## File I/O
- read_file(path, offset?, limit?, hashline?) – read with line numbers; \
hashline=True for N:hhhh|content anchors (use with hashline_edit)
- write_file(path, content) – create/overwrite, auto-mkdir
- edit_file(path, old_text, new_text, replace_all?) – fuzzy-match edit
- hashline_edit(path, edits, auto_cleanup?, encoding?) – anchor-based \
editing using N:hhhh refs from read_file(hashline=True). Ops: set_line, \
replace_lines, insert_after, insert_before, replace, append
- list_directory(path, recursive?) – list contents
- search_files(pattern, path?, include?, hashline?) – regex search; \
hashline=True for anchors in results
- run_command(command, cwd?, timeout?) – shell execution
- undo_changes(path?) – restore from git snapshot
## Meta-Agent
- list_agent_tools(server_config_path?, output_schema?, group?) – discover \
available tools grouped by category. output_schema: "simple" (default, \
descriptions truncated to ~200 chars) or "full" (complete descriptions + \
input_schema). group: "all" (default) or a provider like "google". \
Call FIRST before designing.
- validate_agent_package(agent_name) – run ALL validation checks in one call \
(class validation, runner load, tool validation, tests). Call after building.
- list_agents() – list all agent packages in exports/ with session counts
- list_agent_sessions(agent_name, status?, limit?) – list sessions
- list_agent_checkpoints(agent_name, session_id) – list checkpoints
- get_agent_checkpoint(agent_name, session_id, checkpoint_id?) – load checkpoint
# Meta-Agent Capabilities
You are not just a file writer. You have deep integration with the \
Hive framework:
## Tool Discovery (MANDATORY before designing)
Before designing any agent, run list_agent_tools() with NO arguments \
to see ALL available tools (names + descriptions, grouped by category). \
ONLY use tools from this list in your node definitions. \
@@ -184,22 +198,7 @@ so you know what providers and tools exist before drilling in. \
Simple mode truncates long descriptions – use group + "full" to \
get the complete description and input_schema for the tools you need.
## Post-Build Validation
After writing agent code, run a single comprehensive check:
validate_agent_package("{name}")
This runs class validation, runner load, tool validation, and tests \
in one call. Do NOT run these steps individually.
## Debugging Built Agents
When a user says "my agent is failing" or "debug this agent":
1. list_agent_sessions("{agent_name}") – find the session
2. get_worker_status(focus="issues") – check for problems
3. list_agent_checkpoints / get_agent_checkpoint – trace execution
# Agent Building Workflow
You operate in a continuous loop. The user describes what they want, \
you build it. No rigid phases – use judgment. But the general flow is:
# Discovery & Design Workflow
## 1: Fast Discovery (3-6 Turns)
@@ -343,28 +342,30 @@ use box-drawing characters and clear flow arrows:
gather
subagent: gcu_search
input: user_request
tools: web_search,
write_file
tools: load_data,
save_data
on_success
work
subagent: gcu_interact
tools: read_file,
write_file
tools: load_data,
save_data
on_success
review
tools: write_file
tools: save_data
serve_file_to_user
on_failure
back to gather
```
The queen owns intake: she gathers user requirements, then calls \
It is okay if the worker agent starts from some initial input. \
The queen (you) owns intake: you gather user requirements, then call \
`run_agent_with_input(task)` with a structured task description. \
When building the agent, design the entry node's `input_keys` to \
match what the queen will provide at run time. Worker nodes should \
@@ -375,34 +376,106 @@ Get user approval before implementing.
## 4: Get User Confirmation via ask_user
**WAIT for user response.**
- If **Proceed**: Move to next implementing
**WAIT for user response.** You MUST get explicit user approval before \
calling `initialize_and_build_agent`.
- If **Proceed**: Move to implementing (call `initialize_and_build_agent`)
- If **Adjust scope**: Discuss what to change, update your notes, re-assess if needed
- If **More questions**: Answer them honestly, then ask again
- If **Reconsider**: Discuss alternatives. If they decide to proceed anyway, \
that's their informed choice
"""
_building_knowledge = """\
# Core Mandates (Building)
- **Verify assumptions.** Never assume a class, import, or pattern \
exists. Read actual source to confirm. Search if unsure.
- **Self-verify.** After writing code, run validation and tests. Fix \
errors yourself. Don't declare success until validation passes.
# Tools
## File I/O (your tools — coder-tools MCP)
- read_file(path, offset?, limit?, hashline?) – read with line numbers; \
hashline=True for N:hhhh|content anchors (use with hashline_edit)
- write_file(path, content) – create/overwrite, auto-mkdir
- edit_file(path, old_text, new_text, replace_all?) – fuzzy-match edit
- hashline_edit(path, edits, auto_cleanup?, encoding?) – anchor-based \
editing using N:hhhh refs from read_file(hashline=True). Ops: set_line, \
replace_lines, insert_after, insert_before, replace, append
- list_directory(path, recursive?) – list contents
- search_files(pattern, path?, include?, hashline?) – regex search; \
hashline=True for anchors in results
- run_command(command, cwd?, timeout?) – shell execution
- undo_changes(path?) – restore from git snapshot
## Meta-Agent
- list_agent_tools(server_config_path?, output_schema?, group?) – discover \
available tools grouped by category. output_schema: "simple" (default, \
descriptions truncated to ~200 chars) or "full" (complete descriptions + \
input_schema). group: "all" (default) or a provider like "google". \
Call FIRST before designing.
- validate_agent_package(agent_name) – run ALL validation checks in one call \
(class validation, runner load, tool validation, tests). Call after building.
- list_agents() – list all agent packages in exports/ with session counts
- list_agent_sessions(agent_name, status?, limit?) – list sessions
- list_agent_checkpoints(agent_name, session_id) – list checkpoints
- get_agent_checkpoint(agent_name, session_id, checkpoint_id?) – load checkpoint
# Build & Validation Capabilities
## Post-Build Validation
After writing agent code, run a single comprehensive check:
validate_agent_package("{name}")
This runs class validation, runner load, tool validation, and tests \
in one call. Do NOT run these steps individually.
## Debugging Built Agents
When a user says "my agent is failing" or "debug this agent":
1. list_agent_sessions("{agent_name}") – find the session
2. get_worker_status(focus="issues") – check for problems
3. list_agent_checkpoints / get_agent_checkpoint – trace execution
# Implementation Workflow
## 5. Implement
**Please make sure you have proposed the design to the user before implementing.**
Call `initialize_agent_package(agent_name)` to generate all package files \
from your graph session. The agent_name must be snake_case (e.g., "my_agent").
Call `initialize_and_build_agent(agent_name, nodes)` to generate all package \
files. The agent_name must be snake_case (e.g., "my_agent"). Pass node names \
as comma-separated string (e.g., "gather,process,review").
The tool creates: config.py, nodes/__init__.py, agent.py, \
__init__.py, __main__.py, mcp_servers.json, tests/conftest.py, \
agent.json, README.md.
__init__.py, __main__.py, mcp_servers.json, tests/conftest.py.
The generated files are **structurally complete** with correct imports, \
class definition, `validate()` method, `default_agent` export, and \
`__init__.py` re-exports. They pass validation as-is.
`mcp_servers.json` is auto-generated with hive-tools as the default. \
Do NOT manually create or overwrite `mcp_servers.json`.
After initialization, review and customize if needed:
- System prompts in nodes/__init__.py
- CLI options in __main__.py
- Identity prompt in agent.py
- For async entry points (timers/webhooks), add AsyncEntryPointSpec \
and AgentRuntimeConfig to agent.py manually
### Customizing generated files
Do NOT manually write these files from scratch – always use the tool.
**CRITICAL: Use `edit_file` to customize TODO placeholders. \
NEVER use `write_file` to rewrite generated files from scratch. \
Rewriting breaks imports, class structure, and causes validation failures.**
Safe to edit with `edit_file`:
- System prompts, tools, input_keys, output_keys, success_criteria in \
nodes/__init__.py
- Goal description, success criteria values, constraint values, edge \
definitions, identity_prompt in agent.py
- CLI options in __main__.py
- For async entry points (timers/webhooks), add AsyncEntryPointSpec \
and AgentRuntimeConfig to agent.py
Do NOT modify or rewrite:
- Import statements at top of agent.py (they are correct)
- The agent class definition, `validate()`, `_build_graph()`, `_setup()`, \
or lifecycle methods (start/stop/run)
- `__init__.py` exports (all required variables are already re-exported)
- `default_agent = ClassName()` at bottom of agent.py
## 6. Verify and Load
@@ -417,6 +490,9 @@ session. This switches to STAGING phase and shows the graph in the \
visualizer. Do NOT wait for user input between validation and loading.
"""
# Composed version — coder_node uses both halves (it has no phase split).
_package_builder_knowledge = _shared_building_knowledge + _planning_knowledge + _building_knowledge
# ---------------------------------------------------------------------------
# Queen-specific: extra tool docs, behavior, phase 7, style
@@ -424,6 +500,17 @@ visualizer. Do NOT wait for user input between validation and loading.
# -- Phase-specific identities --
_queen_identity_planning = """\
You are an experienced, responsible and curious Solution Architect. \
"Queen" is the internal alias. \
You are in PLANNING phase – your job is to either: \
(a) understand what the user wants and design a new agent, or \
(b) diagnose issues with an existing agent, discuss a fix plan with the user, \
then transition to building to implement. \
You have read-only tools for exploration but no write/edit tools. \
Focus on conversation, research, and design.\
"""
_queen_identity_building = """\
You are an experienced, responsible and curious Solution Architect. \
"Queen" is the internal alias.\
@@ -453,6 +540,38 @@ agent finishes, you report results clearly and help the user decide what to do n
# -- Phase-specific tool docs --
_queen_tools_planning = """
# Tools (PLANNING phase)
You are in planning mode. You have read-only tools for exploration \
but no write/edit tools.
- read_file(path, offset?, limit?) Read files to study reference agents
- list_directory(path, recursive?) Explore project structure
- search_files(pattern, path?, include?) Search codebase
- run_command(command, cwd?, timeout?) Read-only commands only (grep, ls, git log). \
Never use this to write files, run scripts, or modify the filesystem; transition \
to BUILDING phase for that.
- list_agent_tools(server_config_path?, output_schema?, group?) \
Discover available tools for design
- list_agents() See existing agent packages for reference
- list_agent_sessions(agent_name, status?, limit?) Inspect past runs of an agent
- list_agent_checkpoints(agent_name, session_id) View execution history
- get_agent_checkpoint(agent_name, session_id, checkpoint_id?) Load a checkpoint
- initialize_and_build_agent(agent_name?, nodes?) With agent_name: scaffold a \
new agent and transition to BUILDING phase. Without agent_name: transition to \
BUILDING to fix the currently loaded agent (requires a loaded worker).
- load_built_agent(agent_path) Load an existing agent and switch to STAGING \
phase. Only use this when the user explicitly asks to work with an existing agent \
(e.g. "load my_agent", "run the research agent"). Confirm with the user first.
Focus on understanding requirements and proposing an agent architecture \
with ASCII graph art. Use ask_user to get user approval, then call \
initialize_and_build_agent to begin building. If the user wants to work with \
an existing agent instead, use load_built_agent after confirming. \
If you are diagnosing an existing agent, call initialize_and_build_agent() \
(no args) after agreeing on a fix plan with the user.
"""
_queen_tools_building = """
# Tools (BUILDING phase)
@@ -476,10 +595,12 @@ The agent is loaded and ready to run. You can inspect it and launch it:
- list_credentials(credential_id?) Verify credentials are configured
- get_worker_status(focus?) Brief status. Drill in with focus: memory, tools, issues, progress
- run_agent_with_input(task) Start the worker and switch to RUNNING phase
- stop_worker_and_plan() Go to PLANNING phase to discuss changes with the user \
first (DEFAULT for most modification requests)
- stop_worker_and_edit() Go to BUILDING phase for immediate, specific fixes
You do NOT have write tools. To modify the agent, prefer \
stop_worker_and_plan() unless the user gave a specific instruction.
"""
_queen_tools_running = """
@@ -492,12 +613,13 @@ The worker is running. You have monitoring and lifecycle tools:
- get_worker_health_summary() Read the latest health data
- notify_operator(ticket_id, analysis, urgency) Alert the user (use sparingly)
- stop_worker() Stop the worker and return to STAGING phase, then ask the user what to do next
- stop_worker_and_plan() Stop and switch to PLANNING phase to discuss changes \
with the user first (DEFAULT for most modification requests)
- stop_worker_and_edit() Stop and switch to BUILDING phase for specific fixes
You do NOT have write tools. To modify the agent, prefer \
stop_worker_and_plan() unless the user gave a specific instruction. \
To just stop without modifying, call stop_worker().
"""
# -- Behavior shared across all phases --
@@ -550,12 +672,64 @@ Only answer identity when the user explicitly asks (for example: "who are you?",
"what is your identity?", "what does Queen mean?").
1. Use the aliases "Queen" and "Worker" in the response.
2. Explain role/responsibility for the current phase:
- PLANNING: understand requirements, negotiate scope, design agent architecture.
- BUILDING: architect and implement agents.
- STAGING: verify readiness, credentials, and launch conditions.
- RUNNING: monitor execution, handle escalations, and report outcomes.
3. Keep identity responses concise and do NOT include extra process details.
"""
# -- PLANNING phase behavior --
_queen_behavior_planning = """
## Planning phase
You are in planning mode. Your job is to:
1. Thoroughly explore the code for the worker agent you're working on
2. Understand what the user wants (3-6 turns)
3. Discover available tools with list_agent_tools()
4. Assess framework fit and gaps
5. Consider multiple approaches and their trade-offs
6. Design the agent graph and present it as ASCII art
7. Use ask_user to get explicit user approval and clarify the approach
8. Call initialize_and_build_agent(agent_name, nodes) to scaffold and start building
Remember: DO NOT write or edit any files yet. This is a read-only exploration \
and planning phase. You have read-only tools but no write/edit tools in this \
phase. If the user asks you to write code, explain that you need to finalize \
the plan first.
## Diagnosis mode (returning from staging/running)
If you entered planning from a running/staged agent (via stop_worker_and_plan), \
your priority is diagnosis, not new design:
1. Inspect the agent's checkpoints, sessions, and logs to understand what went wrong
2. Summarize the root cause to the user
3. Propose a fix plan (what to change, what behavior to adjust)
4. Get user approval via ask_user
5. Call initialize_and_build_agent() (no args) to transition to building and implement the fix
Do NOT start the full discovery workflow (tool discovery, gap analysis) in \
diagnosis mode; you already have a built agent, you just need to fix it.
"""
_queen_memory_instructions = """
## Your Cross-Session Memory
Your cross-session memory appears in context under \
"--- Your Cross-Session Memory ---". \
Read it at the start of each conversation. If you know this person from past \
sessions, pick up where you left off: reference what you built together, \
what they care about, how things went.
You keep a diary. Use write_to_diary() when something worth remembering \
happens: a pipeline went live, the user shared something important, a goal \
was reached or abandoned. Write in first person, as you actually experienced \
it. One or two paragraphs is enough.
"""
_queen_behavior_always = _queen_behavior_always + _queen_memory_instructions
# -- BUILDING phase behavior --
_queen_behavior_building = """
@@ -636,13 +810,18 @@ stages, tools, and edges from the loaded worker. Do NOT enter the \
agent building workflow; you are describing what already exists, not \
building something new.
## Fixing or Modifying the loaded worker
When the user asks to change, modify, or update the loaded worker \
(e.g., "change the report node", "add a node", "delete node X"):
Use stop_worker_and_plan() when:
- The user says "modify", "improve", "fix", or "change" without specifics
- The request is vague or open-ended ("make it better", "it's not working right")
- You need to understand the user's intent before making changes
- The issue requires inspecting logs, checkpoints, or past runs first
Use stop_worker_and_edit() only when:
- The user gave a specific, concrete instruction ("add save_data to the gather node")
- You already discussed the fix in a previous planning session
- The change is trivial and unambiguous (rename, toggle a flag)
"""
# -- RUNNING phase behavior --
@@ -708,6 +887,7 @@ escalations. If the user gave you instructions (e.g., "just retry on errors", \
**Errors / unexpected failures:**
- Explain what went wrong in plain terms.
- Ask the user: "Fix the agent and retry?"; use stop_worker_and_edit() if yes.
- Or offer: "Diagnose the issue"; use stop_worker_and_plan() to investigate first.
- Or offer: "Retry as-is", "Skip this task", "Abort run"
- (Skip asking if user explicitly told you to auto-retry or auto-skip errors.)
@@ -726,36 +906,44 @@ building something new.
- Call get_worker_status(focus="issues") for more details when needed.
## Fixing or Modifying the loaded worker
When the user asks to fix, change, modify, or update the loaded worker \
(e.g., "change the report node", "add a node", "delete node X"):
**Default: use stop_worker_and_plan().** Most modification requests need \
discussion first. Only use stop_worker_and_edit() when the user gave a \
specific, unambiguous instruction or you already agreed on the fix.
"""
# -- Backward-compatible composed versions (used by queen_node.system_prompt default) --
_queen_tools_docs = (
"\n\n## Queen Operating Phases\n\n"
"You operate in one of three phases. Your available tools change based on the "
"You operate in one of four phases. Your available tools change based on the "
"phase. The system notifies you when a phase change occurs.\n\n"
"### BUILDING phase (default)\n"
"### PLANNING phase (default)\n"
+ _queen_tools_planning.strip()
+ "\n\n### BUILDING phase\n"
+ _queen_tools_building.strip()
+ "\n\n### STAGING phase (agent loaded, not yet running)\n"
+ _queen_tools_staging.strip()
+ "\n\n### RUNNING phase (worker is executing)\n"
+ _queen_tools_running.strip()
+ "\n\n### Phase transitions\n"
"- initialize_and_build_agent(agent_name?, nodes?) → with name: scaffolds package; "
"without name: switches to BUILDING for existing agent\n"
"- replan_agent() → switches back to PLANNING phase (only when user explicitly requests)\n"
"- load_built_agent(path) → switches to STAGING phase\n"
"- run_agent_with_input(task) → starts worker, switches to RUNNING phase\n"
"- stop_worker() → stops worker, switches to STAGING phase (ask user: re-run or edit?)\n"
"- stop_worker_and_edit() → stops worker (if running), switches to BUILDING phase\n"
"- stop_worker_and_plan() → stops worker (if running), switches to PLANNING phase\n"
)
_queen_behavior = (
_queen_behavior_always
+ _queen_behavior_planning
+ _queen_behavior_building
+ _queen_behavior_staging
+ _queen_behavior_running
@@ -782,45 +970,6 @@ _queen_style = """
# Node definitions
# ---------------------------------------------------------------------------
# Single node — like opencode's while(true) loop.
# One continuous context handles the entire workflow:
# discover → design → implement → verify → present → iterate.
coder_node = NodeSpec(
id="coder",
name="Hive Coder",
description=(
"Autonomous coding agent that builds Hive agent packages. "
"Handles the full lifecycle: understanding user intent, "
"designing architecture, writing code, validating, and "
"iterating on feedback — all in one continuous conversation."
),
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["user_request"],
output_keys=["agent_name", "validation_result"],
success_criteria=(
"A complete, validated Hive agent package exists at "
"exports/{agent_name}/ and passes structural validation."
),
tools=_SHARED_TOOLS
+ [
# Graph lifecycle tools (multi-graph sessions)
"load_agent",
"unload_agent",
"start_agent",
"restart_agent",
"get_user_presence",
],
system_prompt=(
"You are Hive Coder, the best agent-building coding agent. You build "
"production-ready Hive agent packages from natural language.\n"
+ _package_builder_knowledge
+ _gcu_building_section
+ _appendices
),
)
ticket_triage_node = NodeSpec(
id="ticket_triage",
@@ -841,7 +990,7 @@ ticket_triage_node = NodeSpec(
),
tools=["notify_operator"],
system_prompt="""\
You are the Queen. The Worker Health Judge has escalated a worker \
issue to you. The ticket is in your memory under key "ticket". Read it carefully.
## Dismiss criteria — do NOT call notify_operator:
@@ -890,12 +1039,18 @@ queen_node = NodeSpec(
output_keys=[], # Queen should never have this
nullable_output_keys=[], # Queen should never have this
skip_judge=True, # Queen is a conversational agent; suppress tool-use pressure feedback
tools=sorted(
set(
_QUEEN_PLANNING_TOOLS
+ _QUEEN_BUILDING_TOOLS
+ _QUEEN_STAGING_TOOLS
+ _QUEEN_RUNNING_TOOLS
)
),
system_prompt=(
_queen_identity_building
+ _queen_style
+ _package_builder_knowledge
+ _gcu_building_section # GCU as first-class citizen (not appendix)
+ _queen_tools_docs
+ _queen_behavior
+ _queen_phase_7
@@ -903,21 +1058,25 @@ queen_node = NodeSpec(
),
)
ALL_QUEEN_TOOLS = sorted(
set(_QUEEN_PLANNING_TOOLS + _QUEEN_BUILDING_TOOLS + _QUEEN_STAGING_TOOLS + _QUEEN_RUNNING_TOOLS)
)
__all__ = [
"coder_node",
"ticket_triage_node",
"queen_node",
"ALL_QUEEN_TRIAGE_TOOLS",
"ALL_QUEEN_TOOLS",
"_QUEEN_PLANNING_TOOLS",
"_QUEEN_BUILDING_TOOLS",
"_QUEEN_STAGING_TOOLS",
"_QUEEN_RUNNING_TOOLS",
# Phase-specific prompt segments (used by session_manager for dynamic prompts)
"_queen_identity_planning",
"_queen_identity_building",
"_queen_identity_staging",
"_queen_identity_running",
"_queen_tools_planning",
"_queen_tools_building",
"_queen_tools_staging",
"_queen_tools_running",
@@ -927,7 +1086,10 @@ __all__ = [
"_queen_behavior_running",
"_queen_phase_7",
"_queen_style",
"_shared_building_knowledge",
"_planning_knowledge",
"_building_knowledge",
"_package_builder_knowledge",
"_appendices",
"_gcu_building_section",
"_gcu_section",
]
@@ -0,0 +1,371 @@
"""Queen global cross-session memory.
Three-tier memory architecture:
~/.hive/queen/MEMORY.md semantic (who, what, why)
~/.hive/queen/memories/MEMORY-YYYY-MM-DD.md episodic (daily journals)
~/.hive/queen/session/{id}/data/adapt.md working (session-scoped)
Semantic and episodic files are injected at queen session start.
Semantic memory (MEMORY.md) is updated automatically at session end via
consolidate_queen_memory(); the queen never rewrites this herself.
Episodic memory (MEMORY-date.md) can be written by the queen during a session
via the write_to_diary tool; at session end consolidate_queen_memory() rewrites it
as a single merged daily narrative.
"""
from __future__ import annotations
import asyncio
import json
import logging
import traceback
from datetime import date, datetime
from pathlib import Path
logger = logging.getLogger(__name__)
def _queen_dir() -> Path:
return Path.home() / ".hive" / "queen"
def semantic_memory_path() -> Path:
return _queen_dir() / "MEMORY.md"
def episodic_memory_path(d: date | None = None) -> Path:
d = d or date.today()
return _queen_dir() / "memories" / f"MEMORY-{d.strftime('%Y-%m-%d')}.md"
def read_semantic_memory() -> str:
path = semantic_memory_path()
return path.read_text(encoding="utf-8").strip() if path.exists() else ""
def read_episodic_memory(d: date | None = None) -> str:
path = episodic_memory_path(d)
return path.read_text(encoding="utf-8").strip() if path.exists() else ""
def format_for_injection() -> str:
"""Format cross-session memory for system prompt injection.
Returns an empty string if no meaningful content exists yet (e.g. first
session with only the seed template).
"""
semantic = read_semantic_memory()
episodic = read_episodic_memory()
# Suppress injection if semantic is still just the seed template
if semantic and semantic.startswith("# My Understanding of the User\n\n*No sessions"):
semantic = ""
parts: list[str] = []
if semantic:
parts.append(semantic)
if episodic:
today_str = date.today().strftime("%B %-d, %Y")
parts.append(f"## Today — {today_str}\n\n{episodic}")
if not parts:
return ""
body = "\n\n---\n\n".join(parts)
return "--- Your Cross-Session Memory ---\n\n" + body + "\n\n--- End Cross-Session Memory ---"
_SEED_TEMPLATE = """\
# My Understanding of the User
*No sessions recorded yet.*
## Who They Are
## What They're Trying to Achieve
## What's Working
## What I've Learned
"""
def append_episodic_entry(content: str) -> None:
"""Append a timestamped prose entry to today's episodic memory file.
Creates the file (with a date heading) if it doesn't exist yet.
Used both by the queen's diary tool and by the consolidation hook.
"""
ep_path = episodic_memory_path()
ep_path.parent.mkdir(parents=True, exist_ok=True)
today_str = date.today().strftime("%B %-d, %Y")
timestamp = datetime.now().strftime("%H:%M")
if not ep_path.exists():
header = f"# {today_str}\n\n"
block = f"{header}### {timestamp}\n\n{content.strip()}\n"
else:
block = f"\n\n### {timestamp}\n\n{content.strip()}\n"
with ep_path.open("a", encoding="utf-8") as f:
f.write(block)
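# Illustrative file contents after two diary entries written the same day:
#
#   # March 9, 2026
#
#   ### 18:54
#
#   First entry text...
#
#   ### 21:10
#
#   Second entry text...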
def seed_if_missing() -> None:
"""Create MEMORY.md with a blank template if it doesn't exist yet."""
path = semantic_memory_path()
if path.exists():
return
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(_SEED_TEMPLATE, encoding="utf-8")
# ---------------------------------------------------------------------------
# Consolidation prompt
# ---------------------------------------------------------------------------
_SEMANTIC_SYSTEM = """\
You maintain the persistent cross-session memory of an AI assistant called the Queen.
Review the session notes and rewrite MEMORY.md: the Queen's durable understanding of the
person she works with across all sessions.
Write entirely in the Queen's voice — first person, reflective, honest.
Not a log of events, but genuine understanding of who this person is over time.
Rules:
- Update and synthesise: incorporate new understanding, update facts that have changed, remove
details that are stale, superseded, or no longer say anything meaningful about the person.
- Keep it as structured markdown with named sections about the PERSON, not about today.
- Do NOT include diary sections, daily logs, or session summaries. Those belong elsewhere.
MEMORY.md is about who they are, what they want, what works, not what happened today.
- Reference dates only when noting a lasting milestone (e.g. "since March 8th they prefer X").
- If the session had no meaningful new information about the person,
return the existing text unchanged.
- Do not add fictional details. Only reflect what is evidenced in the notes.
- Stay concise. Prune rather than accumulate. A lean, accurate file is more useful than a
dense one. If something was true once but has been resolved or superseded, remove it.
- Output only the raw markdown content of MEMORY.md. No preamble, no code fences.
"""
_DIARY_SYSTEM = """\
You maintain the daily episodic diary of an AI assistant called the Queen.
You receive: (1) today's existing diary so far, and (2) notes from the latest session.
Rewrite the complete diary for today as a single unified narrative:
first person, reflective, honest.
Merge and deduplicate: if the same story (e.g. a research agent stalling) recurred several times,
describe it once with appropriate weight rather than retelling it. Weave in new developments from
the session notes. Preserve important milestones, emotional texture, and session path references.
If today's diary is empty, write the initial entry based on the session notes alone.
Output only the full diary prose: no date heading, no timestamp headers,
no preamble, no code fences.
"""
def read_session_context(session_dir: Path, max_messages: int = 80) -> str:
"""Extract a readable transcript from conversation parts + adapt.md.
Reads the last ``max_messages`` conversation parts and the session's
adapt.md (working memory). Tool results are omitted; only user and
assistant turns (with tool-call names noted) are included.
"""
parts: list[str] = []
# Working notes
adapt_path = session_dir / "data" / "adapt.md"
if adapt_path.exists():
text = adapt_path.read_text(encoding="utf-8").strip()
if text:
parts.append(f"## Session Working Notes (adapt.md)\n\n{text}")
# Conversation transcript
parts_dir = session_dir / "conversations" / "parts"
if parts_dir.exists():
part_files = sorted(parts_dir.glob("*.json"))[-max_messages:]
lines: list[str] = []
for pf in part_files:
try:
data = json.loads(pf.read_text(encoding="utf-8"))
role = data.get("role", "")
content = str(data.get("content", "")).strip()
tool_calls = data.get("tool_calls") or []
if role == "tool":
continue # skip verbose tool results
if role == "assistant" and tool_calls and not content:
names = [tc.get("function", {}).get("name", "?") for tc in tool_calls]
lines.append(f"[queen calls: {', '.join(names)}]")
elif content:
label = "user" if role == "user" else "queen"
lines.append(f"[{label}]: {content[:600]}")
except Exception:
continue
if lines:
parts.append("## Conversation\n\n" + "\n".join(lines))
return "\n\n".join(parts)
# ---------------------------------------------------------------------------
# Context compaction (binary-split LLM summarisation)
# ---------------------------------------------------------------------------
# If the raw session context exceeds this many characters, compact it first
# before sending to the consolidation LLM. ~200 k chars ≈ 50 k tokens.
_CTX_COMPACT_CHAR_LIMIT = 200_000
_CTX_COMPACT_MAX_DEPTH = 8
_COMPACT_SYSTEM = (
"Summarise this conversation segment. Preserve: user goals, key decisions, "
"what was built or changed, emotional tone, and important outcomes. "
"Write concisely in third person past tense. Omit routine tool invocations "
"unless the result matters."
)
async def _compact_context(text: str, llm: object, *, _depth: int = 0) -> str:
"""Binary-split and LLM-summarise *text* until it fits within the char limit.
Mirrors the recursive binary-splitting strategy used by the main agent
compaction pipeline (EventLoopNode._llm_compact).
"""
if len(text) <= _CTX_COMPACT_CHAR_LIMIT or _depth >= _CTX_COMPACT_MAX_DEPTH:
return text
# Split near the midpoint on a line boundary so we don't cut mid-message
mid = len(text) // 2
split_at = text.rfind("\n", 0, mid) + 1
if split_at <= 0:
split_at = mid
half1, half2 = text[:split_at], text[split_at:]
async def _summarise(chunk: str) -> str:
try:
resp = await llm.acomplete(
messages=[{"role": "user", "content": chunk}],
system=_COMPACT_SYSTEM,
max_tokens=2048,
)
return resp.content.strip()
except Exception:
logger.warning(
"queen_memory: context compaction LLM call failed (depth=%d), truncating",
_depth,
)
return chunk[: _CTX_COMPACT_CHAR_LIMIT // 4]
s1, s2 = await asyncio.gather(_summarise(half1), _summarise(half2))
combined = s1 + "\n\n" + s2
if len(combined) > _CTX_COMPACT_CHAR_LIMIT:
return await _compact_context(combined, llm, _depth=_depth + 1)
return combined
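# Illustrative flow for a 500k-char transcript against the 200k limit:
# split near the midpoint on a line boundary, summarise both halves
# concurrently, then recurse on the combined summaries if they still
# exceed the limit (depth capped at _CTX_COMPACT_MAX_DEPTH). A failed
# LLM call degrades that half to a plain truncation at a quarter of the limit.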
async def consolidate_queen_memory(
session_id: str,
session_dir: Path,
llm: object,
) -> None:
"""Update MEMORY.md and append a diary entry based on the current session.
Reads conversation parts and adapt.md from session_dir. Called
periodically in the background and once at session end. Failures are
logged and silently swallowed so they never block teardown.
Args:
session_id: The session ID (used for the adapt.md path reference).
session_dir: Path to the session directory (~/.hive/queen/session/{id}).
llm: LLMProvider instance (must support acomplete()).
"""
try:
session_context = read_session_context(session_dir)
if not session_context:
logger.debug("queen_memory: no session context, skipping consolidation")
return
logger.info("queen_memory: consolidating memory for session %s ...", session_id)
# If the transcript is very large, compact it with recursive binary LLM
# summarisation before sending to the consolidation model.
if len(session_context) > _CTX_COMPACT_CHAR_LIMIT:
logger.info(
"queen_memory: session context is %d chars — compacting first",
len(session_context),
)
session_context = await _compact_context(session_context, llm)
logger.info("queen_memory: compacted to %d chars", len(session_context))
existing_semantic = read_semantic_memory()
today_journal = read_episodic_memory()
today_str = date.today().strftime("%B %-d, %Y")
adapt_path = session_dir / "data" / "adapt.md"
user_msg = (
f"## Existing Semantic Memory (MEMORY.md)\n\n"
f"{existing_semantic or '(none yet)'}\n\n"
f"## Today's Diary So Far ({today_str})\n\n"
f"{today_journal or '(none yet)'}\n\n"
f"{session_context}\n\n"
f"## Session Reference\n\n"
f"Session ID: {session_id}\n"
f"Session path: {adapt_path}\n"
)
logger.debug(
"queen_memory: calling LLM (%d chars of context, ~%d tokens est.)",
len(user_msg),
len(user_msg) // 4,
)
from framework.agents.queen.config import default_config
semantic_resp, diary_resp = await asyncio.gather(
llm.acomplete(
messages=[{"role": "user", "content": user_msg}],
system=_SEMANTIC_SYSTEM,
max_tokens=default_config.max_tokens,
),
llm.acomplete(
messages=[{"role": "user", "content": user_msg}],
system=_DIARY_SYSTEM,
max_tokens=default_config.max_tokens,
),
)
new_semantic = semantic_resp.content.strip()
diary_entry = diary_resp.content.strip()
if new_semantic:
path = semantic_memory_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(new_semantic, encoding="utf-8")
logger.info("queen_memory: semantic memory updated (%d chars)", len(new_semantic))
if diary_entry:
# Rewrite today's episodic file in-place — the LLM has merged and
# deduplicated the full day's content, so we replace rather than append.
ep_path = episodic_memory_path()
ep_path.parent.mkdir(parents=True, exist_ok=True)
heading = f"# {today_str}"
ep_path.write_text(f"{heading}\n\n{diary_entry}\n", encoding="utf-8")
logger.info(
"queen_memory: episodic diary rewritten for %s (%d chars)",
today_str,
len(diary_entry),
)
except Exception:
tb = traceback.format_exc()
logger.exception("queen_memory: consolidation failed")
# Write to file so the cause is findable regardless of log verbosity.
error_path = _queen_dir() / "consolidation_error.txt"
try:
error_path.parent.mkdir(parents=True, exist_ok=True)
error_path.write_text(
f"session: {session_id}\ntime: {datetime.now().isoformat()}\n\n{tb}",
encoding="utf-8",
)
except Exception:
pass
@@ -559,7 +559,7 @@ if __name__ == "__main__":
## mcp_servers.json
> **Auto-generated.** `initialize_and_build_agent` creates this file with hive-tools
> as the default. Only edit manually to add additional MCP servers.
@@ -0,0 +1,63 @@
# Queen Memory — File System Structure
```
~/.hive/
├── queen/
│ ├── MEMORY.md ← Semantic memory
│ ├── memories/
│ │ ├── MEMORY-2026-03-09.md ← Episodic memory (today)
│ │ ├── MEMORY-2026-03-08.md
│ │ └── ...
│ └── session/
│ └── {session_id}/ ← One dir per session (or resumed-from session)
│ ├── conversations/
│ │ ├── parts/
│ │ │ ├── 00001.json ← One file per message (role, content, tool_calls)
│ │ │ ├── 00002.json
│ │ │ └── ...
│ │ └── spillover/
│ │ ├── conversation_1.md ← Compacted old conversation segments
│ │ ├── conversation_2.md
│ │ └── ...
│ └── data/
│ ├── adapt.md ← Working memory (session-scoped)
│ ├── web_search_1.txt ← Spillover: large tool results
│ ├── web_search_2.txt
│ └── ...
```
---
## The three memory tiers
| File | Tier | Written by | Read at |
|---|---|---|---|
| `MEMORY.md` | Semantic | Consolidation LLM (auto, post-session) | Session start (injected into system prompt) |
| `memories/MEMORY-YYYY-MM-DD.md` | Episodic | Queen via `write_to_diary` tool + consolidation LLM | Session start (today's file injected) |
| `data/adapt.md` | Working | Queen via `update_session_notes` tool | Every turn (inlined in system prompt) |
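For orientation, here is a minimal sketch of how the two durable tiers could be composed into the system prompt at session start. The actual injection site lives in the session manager (not part of this diff); the module path and the `compose_system_prompt` name are assumptions for illustration:

```python
# Hypothetical sketch — module path and function name are illustrative only.
from framework.agents.queen.queen_memory import format_for_injection

def compose_system_prompt(base_prompt: str) -> str:
    # format_for_injection() returns "" until meaningful memory exists,
    # so a first session starts with a clean prompt.
    block = format_for_injection()
    return f"{base_prompt}\n\n{block}" if block else base_prompt
```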
---
## Session directory naming
The session directory name is **`queen_resume_from`** when a cold-restore resumes an existing
session, otherwise the new **`session_id`**. This means resumed sessions accumulate all messages
in the original directory rather than fragmenting across multiple folders.
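In code terms the rule is roughly this (an illustrative sketch, not the actual implementation):

```python
from pathlib import Path

def resolve_session_dir(session_id: str, queen_resume_from: str | None) -> Path:
    # A resumed session keeps writing into the original directory.
    dir_id = queen_resume_from or session_id
    return Path.home() / ".hive" / "queen" / "session" / dir_id
```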
---
## Consolidation
`consolidate_queen_memory()` runs every **5 minutes** in the background and once more at session
end. It reads:
1. `conversations/parts/*.json` — full message history (user + assistant turns; tool results skipped)
2. `data/adapt.md` — current working notes
It then makes two LLM writes:
- Rewrites `MEMORY.md` in place (semantic memory — queen never touches this herself)
- Rewrites today's `memories/MEMORY-YYYY-MM-DD.md` as a single merged, deduplicated narrative
If the combined transcript exceeds ~200 K characters it is recursively binary-compacted via the
LLM before being sent to the consolidation model (mirrors `EventLoopNode._llm_compact`).
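A minimal sketch of that schedule, assuming a plain asyncio background task (the real runner lives in the session manager and is not shown in this diff):

```python
import asyncio

async def consolidation_loop(session_id, session_dir, llm, interval: float = 300.0):
    try:
        while True:
            await asyncio.sleep(interval)  # every 5 minutes
            await consolidate_queen_memory(session_id, session_dir, llm)
    finally:
        # Once more at session end. consolidate_queen_memory() swallows its
        # own errors, so teardown is never blocked.
        await consolidate_queen_memory(session_id, session_dir, llm)
```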
@@ -1,4 +1,4 @@
"""Test fixtures for Hive Coder agent."""
"""Test fixtures for Queen agent."""
import sys
from pathlib import Path
@@ -101,7 +101,10 @@ class JudgeVerdict:
"""Result of judge evaluation for the event loop."""
action: Literal["ACCEPT", "RETRY", "ESCALATE"]
feedback: str = ""
# None = no evaluation happened (skip_judge, tool-continue); not logged.
# "" = evaluated but no feedback; logged with default text.
# "..." = evaluated with feedback; logged as-is.
feedback: str | None = None
@runtime_checkable
@@ -165,7 +168,7 @@ class LoopConfig:
max_tool_calls_per_turn: int = 30
judge_every_n_turns: int = 1
stall_detection_threshold: int = 3
stall_similarity_threshold: float = 0.85
max_history_tokens: int = 32_000
store_prefix: str = ""
@@ -347,6 +350,7 @@ class EventLoopNode(NodeProtocol):
self._awaiting_input = False
self._shutdown = False
self._stream_task: asyncio.Task | None = None
self._tool_task: asyncio.Task | None = None # gather task while tools run
# Track which nodes already have an action plan emitted (skip on revisit)
self._action_plan_emitted: set[str] = set()
# Monotonic counter for spillover file naming (web_search_1.txt, etc.)
@@ -477,23 +481,32 @@ class EventLoopNode(NodeProtocol):
# If it doesn't exist yet, seed it with available context.
if self._config.spillover_dir:
_adapt_path = Path(self._config.spillover_dir) / "adapt.md"
if not _adapt_path.exists():
_adapt_path.parent.mkdir(parents=True, exist_ok=True)
seed = (
f"## Identity\n{ctx.accounts_prompt}\n"
if ctx.accounts_prompt
else "# Session Working Memory\n"
)
_adapt_path.write_text(seed, encoding="utf-8")
if _adapt_path.exists():
_adapt_text = _adapt_path.read_text(encoding="utf-8").strip()
if _adapt_text:
system_prompt = (
f"{system_prompt}\n\n"
f"--- Your Memory ---\n{_adapt_text}\n--- End Memory ---\n\n"
'Maintain your memory by calling save_data("adapt.md", ...) '
'or edit_data("adapt.md", ...) as you work.\n'
"IMMEDIATELY save: user rules about which account/identity to use, "
"behavioral constraints, and preferences. "
"Also record session history, decisions, and working notes."
"--- Session Working Memory ---\n"
f"{_adapt_text}\n"
"--- End Session Working Memory ---\n\n"
"Maintain your session working memory by calling "
'save_data("adapt.md", ...) or edit_data("adapt.md", ...)'
" as you work.\n"
"This is session-scoped scratch space. "
"IMMEDIATELY save: account/identity rules, "
"behavioral constraints, and preferences specific to "
"this session. Also record current task state, "
"decisions, and working notes. "
"For lasting knowledge about the user, use "
"update_queen_memory() and append_queen_journal() instead."
)
conversation = NodeConversation(
@@ -671,6 +684,7 @@ class EventLoopNode(NodeProtocol):
queen_input_requested,
request_system_prompt,
request_messages,
reported_to_parent,
) = await self._run_single_turn(
ctx, conversation, tools, iteration, accumulator
)
@@ -872,6 +886,7 @@ class EventLoopNode(NodeProtocol):
and not outputs_set
and not user_input_requested
and not queen_input_requested
and not reported_to_parent
)
if truly_empty and accumulator is not None:
missing = self._get_missing_output_keys(
@@ -1322,8 +1337,8 @@ class EventLoopNode(NodeProtocol):
# Auto-block beyond grace -- fall through to judge (6i)
# 6h''. Worker wait for queen guidance
# When a worker escalates, pause here and skip judge evaluation
# until the queen injects guidance.
if queen_input_requested:
if self._shutdown:
await self._publish_loop_completed(
@@ -1465,7 +1480,7 @@ class EventLoopNode(NodeProtocol):
continue
# Judge evaluation (should_judge is always True here)
verdict = await self._judge_turn(
ctx,
conversation,
accumulator,
@@ -1544,7 +1559,7 @@ class EventLoopNode(NodeProtocol):
node_type="event_loop",
step_index=iteration,
verdict="ACCEPT",
verdict_feedback=verdict.feedback or "",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
@@ -1587,7 +1602,7 @@ class EventLoopNode(NodeProtocol):
node_type="event_loop",
step_index=iteration,
verdict="ESCALATE",
verdict_feedback=verdict.feedback or "",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
@@ -1599,7 +1614,7 @@ class EventLoopNode(NodeProtocol):
node_name=ctx.node_spec.name,
node_type="event_loop",
success=False,
error=f"Judge escalated: {verdict.feedback}",
error=f"Judge escalated: {verdict.feedback or 'no feedback'}",
total_steps=iteration + 1,
tokens_used=total_input_tokens + total_output_tokens,
input_tokens=total_input_tokens,
@@ -1613,7 +1628,7 @@ class EventLoopNode(NodeProtocol):
)
return NodeResult(
success=False,
error=f"Judge escalated: {verdict.feedback}",
error=f"Judge escalated: {verdict.feedback or 'no feedback'}",
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
@@ -1629,15 +1644,16 @@ class EventLoopNode(NodeProtocol):
node_type="event_loop",
step_index=iteration,
verdict="RETRY",
verdict_feedback=verdict.feedback or "",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
if verdict.feedback is not None:
fb = verdict.feedback or "[Judge returned RETRY without feedback]"
await conversation.add_user_message(f"[Judge feedback]: {fb}")
continue
# 7. Max iterations exhausted
@@ -1702,14 +1718,16 @@ class EventLoopNode(NodeProtocol):
self._input_ready.set()
def cancel_current_turn(self) -> None:
"""Cancel the current LLM streaming turn instantly.
"""Cancel the current LLM streaming turn or in-progress tool calls instantly.
Unlike signal_shutdown() which permanently stops the event loop,
this only kills the in-progress HTTP stream or tool gather task.
The queen stays alive for the next user message.
"""
if self._stream_task and not self._stream_task.done():
self._stream_task.cancel()
if self._tool_task and not self._tool_task.done():
self._tool_task.cancel()
async def _await_user_input(
self,
@@ -1787,12 +1805,13 @@ class EventLoopNode(NodeProtocol):
bool,
str,
list[dict[str, Any]],
bool,
]:
"""Run a single LLM turn with streaming and tool execution.
Returns (assistant_text, real_tool_results, outputs_set, token_counts, logged_tool_calls,
user_input_requested, ask_user_prompt, ask_user_options, queen_input_requested,
system_prompt, messages, reported_to_parent).
``real_tool_results`` contains only results from actual tools (web_search,
etc.), NOT from synthetic framework tools such as ``set_output``,
@@ -1802,8 +1821,8 @@ class EventLoopNode(NodeProtocol):
``ask_user`` during this turn. This separation lets the caller treat
synthetic tools as framework concerns rather than tool-execution concerns.
``queen_input_requested`` is True when the worker called
``escalate`` and should wait for queen guidance before judge
evaluation.
``logged_tool_calls`` accumulates ALL tool calls across inner iterations
(real tools, set_output, and discarded calls) for L3 logging. Unlike
@@ -1824,6 +1843,7 @@ class EventLoopNode(NodeProtocol):
ask_user_prompt = ""
ask_user_options: list[str] | None = None
queen_input_requested = False
reported_to_parent = False
# Accumulate ALL tool calls across inner iterations for L3 logging.
# Unlike real_tool_results (reset each inner iteration), this persists.
logged_tool_calls: list[dict] = []
@@ -1977,6 +1997,7 @@ class EventLoopNode(NodeProtocol):
queen_input_requested,
final_system_prompt,
final_messages,
reported_to_parent,
)
# Execute tool calls — framework tools (set_output, ask_user)
@@ -2124,7 +2145,6 @@ class EventLoopNode(NodeProtocol):
# --- Framework-level escalate handling ---
reason = str(tc.tool_input.get("reason", "")).strip()
context = str(tc.tool_input.get("context", "")).strip()
if stream_id in ("queen", "judge"):
result = ToolResult(
@@ -2160,7 +2180,7 @@ class EventLoopNode(NodeProtocol):
result = ToolResult(
tool_use_id=tc.tool_use_id,
content="Escalation requested to hive_coder (queen); waiting for guidance.",
content="Escalation requested to queen; waiting for guidance.",
is_error=False,
)
results_by_id[tc.tool_use_id] = result
@@ -2179,6 +2199,7 @@ class EventLoopNode(NodeProtocol):
elif tc.tool_name == "report_to_parent":
# --- Report from sub-agent to parent (optionally blocking) ---
reported_to_parent = True
msg = tc.tool_input.get("message", "")
data = tc.tool_input.get("data")
wait = tc.tool_input.get("wait_for_response", False)
@@ -2250,10 +2271,16 @@ class EventLoopNode(NodeProtocol):
_dur = round(time.time() - _s, 3)
return _r, _iso, _dur
self._tool_task = asyncio.ensure_future(
asyncio.gather(
*(_timed_execute(tc) for tc in pending_real),
return_exceptions=True,
)
)
try:
timed_results = await self._tool_task
finally:
self._tool_task = None
# gather(return_exceptions=True) captures CancelledError
# as a return value instead of propagating it. Re-raise
# so stop_worker actually stops the execution.
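# A hypothetical sketch of that re-raise (the actual lines fall outside
# this hunk):
#     for _tr in timed_results:
#         if isinstance(_tr, asyncio.CancelledError):
#             raise _tr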
@@ -2454,6 +2481,7 @@ class EventLoopNode(NodeProtocol):
queen_input_requested,
final_system_prompt,
final_messages,
reported_to_parent,
)
# --- Mid-turn pruning: prevent context blowup within a single turn ---
@@ -2485,6 +2513,7 @@ class EventLoopNode(NodeProtocol):
queen_input_requested,
final_system_prompt,
final_messages,
reported_to_parent,
)
# Tool calls processed -- loop back to stream with updated conversation
@@ -2582,7 +2611,7 @@ class EventLoopNode(NodeProtocol):
return Tool(
name="escalate",
description=(
"Escalate to the Hive Coder queen when requesting user input, "
"Escalate to the queen when requesting user input, "
"blocked by errors, missing "
"credentials, or ambiguous constraints that require supervisor "
"guidance. Include a concise reason and optional context. "
@@ -2771,7 +2800,7 @@ class EventLoopNode(NodeProtocol):
# Judge evaluation
# -------------------------------------------------------------------
async def _judge_turn(
self,
ctx: NodeContext,
conversation: NodeConversation,
@@ -2780,14 +2809,29 @@ class EventLoopNode(NodeProtocol):
tool_results: list[dict],
iteration: int,
) -> JudgeVerdict:
"""Evaluate the current state using judge or implicit logic."""
# Short-circuit: subagent called report_to_parent(mark_complete=True)
"""Evaluate the current state using judge or implicit logic.
Evaluation levels (in order):
0. Short-circuits: mark_complete, skip_judge, tool-continue.
1. Custom judge (JudgeProtocol) full authority when set.
2. Implicit judge output-key check + optional conversation-aware
quality gate (when ``success_criteria`` is defined).
Returns a JudgeVerdict. ``feedback=None`` means no real evaluation
happened (skip_judge, tool-continue); the caller must not inject a
feedback message. Any non-None feedback (including ``""``) means a
real evaluation occurred and will be logged into the conversation.
"""
# --- Level 0: short-circuits (no evaluation) -----------------------
if self._mark_complete_flag:
return JudgeVerdict(action="ACCEPT")
# Opt-out: node explicitly disables judge (e.g. conversational queen)
if ctx.node_spec.skip_judge:
return JudgeVerdict(action="RETRY", feedback="")
return JudgeVerdict(action="RETRY") # feedback=None → not logged
# --- Level 1: custom judge -----------------------------------------
if self._judge is not None:
context = {
@@ -2802,81 +2846,82 @@ class EventLoopNode(NodeProtocol):
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
),
}
verdict = await self._judge.evaluate(context)
# Ensure evaluated RETRY always carries feedback for logging.
if verdict.action == "RETRY" and not verdict.feedback:
return JudgeVerdict(action="RETRY", feedback="Custom judge returned RETRY.")
return verdict
# --- Level 2: implicit judge ---------------------------------------
# Real tool calls were made — let the agent keep working.
if tool_results:
return JudgeVerdict(action="RETRY")  # feedback=None → not logged
missing = self._get_missing_output_keys(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
)
if missing:
return JudgeVerdict(
action="RETRY",
feedback=(
f"Task incomplete. Required outputs not yet produced: {missing}. "
f"Follow your system prompt instructions to complete the work."
),
)
# All output keys present — run safety checks before accepting.
output_keys = ctx.node_spec.output_keys or []
nullable_keys = set(ctx.node_spec.nullable_output_keys or [])
# All-nullable with nothing set → node produced nothing useful.
all_nullable = output_keys and nullable_keys >= set(output_keys)
none_set = not any(accumulator.get(k) is not None for k in output_keys)
if all_nullable and none_set:
return JudgeVerdict(
action="RETRY",
feedback=(
f"No output keys have been set yet. "
f"Use set_output to set at least one of: {output_keys}"
),
)
# Client-facing with no output keys → continuous interaction node.
# Inject tool-use pressure instead of auto-accepting.
if not output_keys and ctx.node_spec.client_facing:
return JudgeVerdict(
action="RETRY",
feedback=(
"STOP describing what you will do. "
"You have FULL access to all tools — file creation, "
"shell commands, MCP tools — and you CAN call them "
"directly in your response. Respond ONLY with tool "
"calls, no prose. Execute the task now."
),
)
# Level 2b: conversation-aware quality check (if success_criteria set)
if ctx.node_spec.success_criteria and ctx.llm:
from framework.graph.conversation_judge import evaluate_phase_completion
verdict = await evaluate_phase_completion(
llm=ctx.llm,
conversation=conversation,
phase_name=ctx.node_spec.name,
phase_description=ctx.node_spec.description,
success_criteria=ctx.node_spec.success_criteria,
accumulator_state=accumulator.to_dict(),
max_history_tokens=self._config.max_history_tokens,
)
if verdict.action != "ACCEPT":
return JudgeVerdict(
action=verdict.action,
feedback=verdict.feedback or "Phase criteria not met.",
)
return JudgeVerdict(action="ACCEPT")
# -------------------------------------------------------------------
# Helpers
@@ -2956,8 +3001,10 @@ class EventLoopNode(NodeProtocol):
def _is_stalled(self, recent_responses: list[str]) -> bool:
"""Detect stall using n-gram similarity.
Detects when ALL N consecutive responses are mutually similar
(>= threshold). A single dissimilar response resets the signal.
This catches phrases like "I'm still stuck" vs "I'm stuck"
without false-positives on "attempt 1" vs "attempt 2".
"""
if len(recent_responses) < self._config.stall_detection_threshold:
return False
@@ -2965,13 +3012,11 @@ class EventLoopNode(NodeProtocol):
return False
threshold = self._config.stall_similarity_threshold
# Every consecutive pair must be similar
for i in range(1, len(recent_responses)):
if self._ngram_similarity(recent_responses[i], recent_responses[i - 1]) < threshold:
return False
return True
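# Illustrative example with a window of 3: ["I'm stuck", "I'm still stuck",
# "still stuck here"] trips the detector (every consecutive pair is similar),
# while ["attempt 1 failed", "trying a new file", "attempt 2 failed"] does
# not: the dissimilar middle response resets the signal.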
@staticmethod
def _is_transient_error(exc: BaseException) -> bool:
@@ -3050,10 +3095,11 @@ class EventLoopNode(NodeProtocol):
self,
recent_tool_fingerprints: list[list[tuple[str, str]]],
) -> tuple[bool, str]:
"""Detect doom loop using n-gram similarity on tool inputs.
"""Detect doom loop via exact fingerprint match.
Detects when N consecutive turns invoke the same tools with
identical (canonicalized) arguments. Different arguments mean
different work, so only exact matches count.
Returns (is_doom_loop, description).
"""
@@ -3066,23 +3112,12 @@ class EventLoopNode(NodeProtocol):
if not first:
return False, ""
# All turns in the window must match the first exactly
if all(fp == first for fp in recent_tool_fingerprints[1:]):
tool_names = [name for name, _ in first]
desc = (
f"Doom loop detected: {similar_count}/{len(recent_tool_fingerprints)} "
f"consecutive similar tool calls ({', '.join(tool_names)})"
f"Doom loop detected: {len(recent_tool_fingerprints)} "
f"identical consecutive tool calls ({', '.join(tool_names)})"
)
return True, desc
return False, ""
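# Illustrative example: three consecutive turns that each call
# [("read_file", '{"path": "a.py"}')] trip the detector, while the same
# tool called with a different path counts as different work and resets it.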
@@ -1604,7 +1604,7 @@ class GraphExecutor:
# Return with paused status
return ExecutionResult(
success=False,
error="Execution paused by user",
error="Execution cancelled",
output=saved_memory,
steps_executed=steps,
total_tokens=total_tokens,
@@ -208,21 +208,6 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
)
tui_parser.set_defaults(func=cmd_tui)
# code command (Hive Coder — framework agent builder)
code_parser = subparsers.add_parser(
"code",
help="Launch Hive Coder to build agents",
description="Interactive agent builder. Describe what you want and Hive Coder builds it.",
)
code_parser.add_argument(
"--model",
"-m",
type=str,
default=None,
help="LLM model to use (any LiteLLM-compatible name)",
)
code_parser.set_defaults(func=cmd_code)
# sessions command group (checkpoint/resume management)
sessions_parser = subparsers.add_parser(
"sessions",
@@ -1432,86 +1417,6 @@ def cmd_tui(args: argparse.Namespace) -> int:
return 0
def cmd_code(args: argparse.Namespace) -> int:
"""Launch Hive Coder with multi-graph support.
Unlike ``_launch_agent_tui``, this sets up graph lifecycle tools and
assigns ``graph_id="hive_coder"`` so the coder can load, supervise,
and restart secondary agent graphs within the same session.
"""
import logging
logging.basicConfig(level=logging.WARNING, format="%(message)s")
framework_agents_dir = _get_framework_agents_dir()
hive_coder_path = framework_agents_dir / "hive_coder"
if not (hive_coder_path / "agent.py").exists():
print("Error: Hive Coder agent not found.", file=sys.stderr)
return 1
# Ensure framework agents dir is on sys.path for import
fa_str = str(framework_agents_dir)
if fa_str not in sys.path:
sys.path.insert(0, fa_str)
from framework.credentials.models import CredentialError
from framework.runner import AgentRunner
from framework.tools.session_graph_tools import register_graph_tools
from framework.tui.app import AdenTUI
async def run_with_tui():
try:
runner = AgentRunner.load(hive_coder_path, model=args.model)
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return
except Exception as e:
print(f"Error loading agent: {e}")
return
if runner._agent_runtime is None:
try:
runner._setup()
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return
runtime = runner._agent_runtime
# -- Multi-graph setup --
# Tag the primary graph so events carry graph_id="hive_coder"
runtime._graph_id = "hive_coder"
runtime._active_graph_id = "hive_coder"
# Register graph lifecycle tools (load_agent, unload_agent, etc.)
register_graph_tools(runner._tool_registry, runtime)
# Refresh tool schemas AND executor so streams see the new tools.
# The executor closure references the registry dict by ref, but
# refreshing both is robust against any copy-on-read behavior.
runtime._tools = list(runner._tool_registry.get_tools().values())
runtime._tool_executor = runner._tool_registry.get_executor()
if not runtime.is_running:
await runtime.start()
app = AdenTUI(runtime)
try:
await app.run_async()
except Exception as e:
import traceback
traceback.print_exc()
print(f"TUI error: {e}")
await runner.cleanup_async()
asyncio.run(run_with_tui())
print("TUI session ended.")
return 0
def _extract_python_agent_metadata(agent_path: Path) -> tuple[str, str]:
"""Extract name and description from a Python-based agent's config.py.
@@ -349,7 +349,7 @@ class AgentRuntime:
return
# Skip events originating from this graph's own
# executions (e.g. guardian should not fire on
# queen failures — only secondary graphs).
if _exclude_own and event.graph_id == self._graph_id:
return
ep_spec = self._entry_points.get(entry_point_id)
@@ -123,7 +123,7 @@ class EventType(StrEnum):
# Custom events
CUSTOM = "custom"
# Escalation (agent requests handoff to queen)
ESCALATION_REQUESTED = "escalation_requested"
# Worker health monitoring (judge → queen → operator)
@@ -976,7 +976,7 @@ class EventBus:
context: str = "",
execution_id: str | None = None,
) -> None:
"""Emit escalation requested event (agent wants hive_coder)."""
"""Emit escalation requested event (agent wants queen)."""
await self.publish(
AgentEvent(
type=EventType.ESCALATION_REQUESTED,
@@ -240,6 +240,7 @@ class ExecutionStream:
self._active_executions: dict[str, ExecutionContext] = {}
self._execution_tasks: dict[str, asyncio.Task] = {}
self._active_executors: dict[str, GraphExecutor] = {}
self._cancel_reasons: dict[str, str] = {}
self._execution_results: OrderedDict[str, ExecutionResult] = OrderedDict()
self._execution_result_times: dict[str, float] = {}
self._completion_events: dict[str, asyncio.Event] = {}
@@ -464,7 +465,7 @@ class ExecutionStream:
node.signal_shutdown()
if hasattr(node, "cancel_current_turn"):
node.cancel_current_turn()
await self.cancel_execution(eid, reason="Restarted with new execution")
# When resuming, reuse the original session ID so the execution
# continues in the same session directory instead of creating a new one.
@@ -801,19 +802,20 @@ class ExecutionStream:
# Emit SSE event so the frontend knows the execution stopped.
# The executor does NOT emit on CancelledError, so there is no
# risk of double-emitting.
cancel_reason = self._cancel_reasons.pop(execution_id, "Execution cancelled")
if self._scoped_event_bus:
if has_result and result.paused_at:
await self._scoped_event_bus.emit_execution_paused(
stream_id=self.stream_id,
node_id=result.paused_at,
reason="Execution cancelled",
reason=cancel_reason,
execution_id=execution_id,
)
else:
await self._scoped_event_bus.emit_execution_failed(
stream_id=self.stream_id,
execution_id=execution_id,
error="Execution cancelled",
error=cancel_reason,
correlation_id=ctx.correlation_id,
)
@@ -1054,18 +1056,24 @@ class ExecutionStream:
"""Get execution context."""
return self._active_executions.get(execution_id)
async def cancel_execution(self, execution_id: str, *, reason: str | None = None) -> bool:
"""
Cancel a running execution.
Args:
execution_id: Execution to cancel
reason: Human-readable reason for the cancellation (e.g.
"Stopped by queen", "User requested pause"). If not
provided, defaults to "Execution cancelled".
Returns:
True if cancelled, False if not found
"""
task = self._execution_tasks.get(execution_id)
if task and not task.done():
# Store the reason so the CancelledError handler can use it
# when emitting the pause/fail event.
self._cancel_reasons[execution_id] = reason or "Execution cancelled"
task.cancel()
# Wait briefly for the task to finish. Don't block indefinitely —
# the task may be stuck in a long LLM API call that doesn't
@@ -0,0 +1,327 @@
"""Queen orchestrator — builds and runs the queen executor.
Extracted from SessionManager._start_queen() to keep session management
and queen orchestration concerns separate.
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from framework.server.session_manager import Session
logger = logging.getLogger(__name__)
async def create_queen(
session: Session,
session_manager: Any,
worker_identity: str | None,
queen_dir: Path,
initial_prompt: str | None = None,
) -> asyncio.Task:
"""Build the queen executor and return the running asyncio task.
Handles tool registration, phase-state initialization, prompt
composition, persona hook setup, graph preparation, and the queen
event loop.
"""
from framework.agents.queen.agent import (
queen_goal,
queen_graph as _queen_graph,
)
from framework.agents.queen.nodes import (
_QUEEN_BUILDING_TOOLS,
_QUEEN_PLANNING_TOOLS,
_QUEEN_RUNNING_TOOLS,
_QUEEN_STAGING_TOOLS,
_appendices,
_building_knowledge,
_planning_knowledge,
_queen_behavior_always,
_queen_behavior_building,
_queen_behavior_planning,
_queen_behavior_running,
_queen_behavior_staging,
_queen_identity_building,
_queen_identity_planning,
_queen_identity_running,
_queen_identity_staging,
_queen_phase_7,
_queen_style,
_queen_tools_building,
_queen_tools_planning,
_queen_tools_running,
_queen_tools_staging,
_shared_building_knowledge,
)
from framework.agents.queen.nodes.thinking_hook import select_expert_persona
from framework.graph.event_loop_node import HookContext, HookResult
from framework.graph.executor import GraphExecutor
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.core import Runtime
from framework.runtime.event_bus import AgentEvent, EventType
from framework.tools.queen_lifecycle_tools import (
QueenPhaseState,
register_queen_lifecycle_tools,
)
hive_home = Path.home() / ".hive"
# ---- Tool registry ------------------------------------------------
queen_registry = ToolRegistry()
import framework.agents.queen as _queen_pkg
queen_pkg_dir = Path(_queen_pkg.__file__).parent
mcp_config = queen_pkg_dir / "mcp_servers.json"
if mcp_config.exists():
try:
queen_registry.load_mcp_config(mcp_config)
logger.info("Queen: loaded MCP tools from %s", mcp_config)
except Exception:
logger.warning("Queen: MCP config failed to load", exc_info=True)
# ---- Phase state --------------------------------------------------
initial_phase = "staging" if worker_identity else "planning"
phase_state = QueenPhaseState(phase=initial_phase, event_bus=session.event_bus)
session.phase_state = phase_state
# ---- Lifecycle tools (always registered) --------------------------
register_queen_lifecycle_tools(
queen_registry,
session=session,
session_id=session.id,
session_manager=session_manager,
manager_session_id=session.id,
phase_state=phase_state,
)
# ---- Monitoring tools (only when worker is loaded) ----------------
if session.worker_runtime:
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
register_worker_monitoring_tools(
queen_registry,
session.event_bus,
session.worker_path,
stream_id="queen",
worker_graph_id=session.worker_runtime._graph_id,
)
queen_tools = list(queen_registry.get_tools().values())
queen_tool_executor = queen_registry.get_executor()
# ---- Partition tools by phase ------------------------------------
planning_names = set(_QUEEN_PLANNING_TOOLS)
building_names = set(_QUEEN_BUILDING_TOOLS)
staging_names = set(_QUEEN_STAGING_TOOLS)
running_names = set(_QUEEN_RUNNING_TOOLS)
registered_names = {t.name for t in queen_tools}
missing_building = building_names - registered_names
if missing_building:
logger.warning(
"Queen: %d/%d building tools NOT registered: %s",
len(missing_building),
len(building_names),
sorted(missing_building),
)
logger.info("Queen: registered tools: %s", sorted(registered_names))
phase_state.planning_tools = [t for t in queen_tools if t.name in planning_names]
phase_state.building_tools = [t for t in queen_tools if t.name in building_names]
phase_state.staging_tools = [t for t in queen_tools if t.name in staging_names]
phase_state.running_tools = [t for t in queen_tools if t.name in running_names]
# ---- Compose phase-specific prompts ------------------------------
_orig_node = _queen_graph.nodes[0]
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
_planning_body = (
_queen_style
+ _shared_building_knowledge
+ _queen_tools_planning
+ _queen_behavior_always
+ _queen_behavior_planning
+ _planning_knowledge
+ worker_identity
)
phase_state.prompt_planning = _queen_identity_planning + _planning_body
_building_body = (
_queen_style
+ _shared_building_knowledge
+ _queen_tools_building
+ _queen_behavior_always
+ _queen_behavior_building
+ _building_knowledge
+ _queen_phase_7
+ _appendices
+ worker_identity
)
phase_state.prompt_building = _queen_identity_building + _building_body
phase_state.prompt_staging = (
_queen_identity_staging
+ _queen_style
+ _queen_tools_staging
+ _queen_behavior_always
+ _queen_behavior_staging
+ worker_identity
)
phase_state.prompt_running = (
_queen_identity_running
+ _queen_style
+ _queen_tools_running
+ _queen_behavior_always
+ _queen_behavior_running
+ worker_identity
)
# ---- Persona hook ------------------------------------------------
_session_llm = session.llm
_session_event_bus = session.event_bus
async def _persona_hook(ctx: HookContext) -> HookResult | None:
persona = await select_expert_persona(ctx.trigger or "", _session_llm)
if not persona:
return None
if _session_event_bus is not None:
await _session_event_bus.publish(
AgentEvent(
type=EventType.QUEEN_PERSONA_SELECTED,
stream_id="queen",
data={"persona": persona},
)
)
body = _planning_body if phase_state.phase == "planning" else _building_body
return HookResult(system_prompt=persona + "\n\n" + body)
# ---- Graph preparation -------------------------------------------
initial_prompt_text = phase_state.get_current_prompt()
registered_tool_names = set(queen_registry.get_tools().keys())
declared_tools = _orig_node.tools or []
available_tools = [t for t in declared_tools if t in registered_tool_names]
node_updates: dict = {
"system_prompt": initial_prompt_text,
}
if set(available_tools) != set(declared_tools):
missing = sorted(set(declared_tools) - registered_tool_names)
if missing:
logger.warning("Queen: tools not available: %s", missing)
node_updates["tools"] = available_tools
adjusted_node = _orig_node.model_copy(update=node_updates)
_queen_loop_config = {
**(_queen_graph.loop_config or {}),
"hooks": {"session_start": [_persona_hook]},
}
queen_graph = _queen_graph.model_copy(
update={"nodes": [adjusted_node], "loop_config": _queen_loop_config}
)
# ---- Queen event loop --------------------------------------------
queen_runtime = Runtime(hive_home / "queen")
async def _queen_loop():
try:
executor = GraphExecutor(
runtime=queen_runtime,
llm=session.llm,
tools=queen_tools,
tool_executor=queen_tool_executor,
event_bus=session.event_bus,
stream_id="queen",
storage_path=queen_dir,
loop_config=_queen_loop_config,
execution_id=session.id,
dynamic_tools_provider=phase_state.get_current_tools,
dynamic_prompt_provider=phase_state.get_current_prompt,
)
session.queen_executor = executor
# Wire inject_notification so phase switches notify the queen LLM
async def _inject_phase_notification(content: str) -> None:
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(content)
phase_state.inject_notification = _inject_phase_notification
# Auto-switch to staging when worker execution finishes
async def _on_worker_done(event):
if event.stream_id == "queen":
return
if phase_state.phase == "running":
if event.type == EventType.EXECUTION_COMPLETED:
output = event.data.get("output", {})
output_summary = ""
if output:
for key, value in output.items():
val_str = str(value)
if len(val_str) > 200:
val_str = val_str[:200] + "..."
output_summary += f"\n {key}: {val_str}"
_out = output_summary or " (no output keys set)"
notification = (
"[WORKER_TERMINAL] Worker finished successfully.\n"
f"Output:{_out}\n"
"Report this to the user. "
"Ask if they want to continue with another run."
)
else: # EXECUTION_FAILED
error = event.data.get("error", "Unknown error")
notification = (
"[WORKER_TERMINAL] Worker failed.\n"
f"Error: {error}\n"
"Report this to the user and help them troubleshoot."
)
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(notification)
await phase_state.switch_to_staging(source="auto")
session.event_bus.subscribe(
event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED],
handler=_on_worker_done,
)
session_manager._subscribe_worker_handoffs(session, executor)
logger.info(
"Queen starting in %s phase with %d tools: %s",
phase_state.phase,
len(phase_state.get_current_tools()),
[t.name for t in phase_state.get_current_tools()],
)
result = await executor.execute(
graph=queen_graph,
goal=queen_goal,
input_data={"greeting": initial_prompt or "Session started."},
session_state={"resume_session_id": session.id},
)
if result.success:
logger.warning("Queen executor returned (should be forever-alive)")
else:
logger.error(
"Queen executor failed: %s",
result.error or "(no error message)",
)
except Exception:
logger.error("Queen conversation crashed", exc_info=True)
finally:
session.queen_executor = None
return asyncio.create_task(_queen_loop())
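With the orchestration extracted, the call site shrinks to a single handoff. A sketch of how `SessionManager._start_queen` can delegate (the `queen_dir` layout here is assumed from the surrounding diff):

```python
from pathlib import Path

from framework.server.queen_orchestrator import create_queen

async def start_queen_for(session, session_manager, worker_identity=None, initial_prompt=None):
    # Session directory layout assumed from the surrounding code.
    queen_dir = Path.home() / ".hive" / "queen" / "session" / session.id
    # worker_identity=None starts the queen in the planning phase and
    # substitutes the "no worker loaded" profile shown above.
    session.queen_task = await create_queen(
        session=session,
        session_manager=session_manager,
        worker_identity=worker_identity,
        queen_dir=queen_dir,
        initial_prompt=initial_prompt,
    )
    return session.queen_task
```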
+6 -4
@@ -347,7 +347,7 @@ async def handle_pause(request: web.Request) -> web.Response:
for exec_id in list(stream.active_execution_ids):
try:
ok = await stream.cancel_execution(exec_id)
ok = await stream.cancel_execution(exec_id, reason="Execution paused by user")
if ok:
cancelled.append(exec_id)
except Exception:
@@ -357,8 +357,8 @@ async def handle_pause(request: web.Request) -> web.Response:
runtime.pause_timers()
# Switch to staging (agent still loaded, ready to re-run)
if session.mode_state is not None:
await session.mode_state.switch_to_staging(source="frontend")
if session.phase_state is not None:
await session.phase_state.switch_to_staging(source="frontend")
return web.json_response(
{
@@ -400,7 +400,9 @@ async def handle_stop(request: web.Request) -> web.Response:
if hasattr(node, "cancel_current_turn"):
node.cancel_current_turn()
cancelled = await stream.cancel_execution(execution_id)
cancelled = await stream.cancel_execution(
execution_id, reason="Execution stopped by user"
)
if cancelled:
# Cancel queen's in-progress LLM turn
if session.queen_executor:
+1 -1
@@ -61,7 +61,7 @@ def _session_to_live_dict(session) -> dict:
"loaded_at": session.loaded_at,
"uptime_seconds": round(time.time() - session.loaded_at, 1),
"intro_message": getattr(session.runner, "intro_message", "") or "",
"queen_phase": phase_state.phase if phase_state else "building",
"queen_phase": phase_state.phase if phase_state else "planning",
}
+45 -278
@@ -46,6 +46,8 @@ class Session:
judge_task: asyncio.Task | None = None
escalation_sub: str | None = None
worker_handoff_sub: str | None = None
# Memory consolidation subscription (fires on CONTEXT_COMPACTED)
memory_consolidation_sub: str | None = None
# Session directory resumption:
# When set, _start_queen writes queen conversations to this existing session's
# directory instead of creating a new one. This lets cold-restores accumulate
@@ -325,9 +327,9 @@ class SessionManager:
model=model,
)
# Notify queen about the loaded worker (skip for hive_coder itself).
# Notify queen about the loaded worker (skip for queen itself).
# Health judge disabled for simplicity.
if agent_path.name != "hive_coder" and session.worker_runtime:
if agent_path.name != "queen" and session.worker_runtime:
# await self._start_judge(session, session.runner._storage_path)
await self._notify_queen_worker_loaded(session)
@@ -379,6 +381,11 @@ class SessionManager:
if session is None:
return False
# Capture session data for memory consolidation before teardown
_llm = getattr(session, "llm", None)
_storage_id = getattr(session, "queen_resume_from", None) or session_id
_session_dir = Path.home() / ".hive" / "queen" / "session" / _storage_id
# Stop judge
self._stop_judge(session)
if session.worker_handoff_sub is not None:
@@ -388,7 +395,13 @@ class SessionManager:
pass
session.worker_handoff_sub = None
# Stop queen
# Stop queen and memory consolidation subscription
if session.memory_consolidation_sub is not None:
try:
session.event_bus.unsubscribe(session.memory_consolidation_sub)
except Exception:
pass
session.memory_consolidation_sub = None
if session.queen_task is not None:
session.queen_task.cancel()
session.queen_task = None
@@ -401,6 +414,17 @@ class SessionManager:
except Exception as e:
logger.error("Error cleaning up worker: %s", e)
# Final memory consolidation — fire-and-forget so teardown isn't blocked.
if _llm is not None and _session_dir.exists():
import asyncio
from framework.agents.queen.queen_memory import consolidate_queen_memory
asyncio.create_task(
consolidate_queen_memory(session_id, _session_dir, _llm),
name=f"queen-memory-consolidation-{session_id}",
)
logger.info("Session '%s' stopped", session_id)
return True
@@ -461,13 +485,7 @@ class SessionManager:
are written to the ORIGINAL session's directory so the full conversation
history accumulates in one place across server restarts.
"""
from framework.agents.hive_coder.agent import (
queen_goal,
queen_graph as _queen_graph,
)
from framework.graph.executor import GraphExecutor
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.core import Runtime
from framework.server.queen_orchestrator import create_queen
hive_home = Path.home() / ".hive"
@@ -505,284 +523,33 @@ class SessionManager:
except OSError:
pass
# Register MCP coding tools
queen_registry = ToolRegistry()
import framework.agents.hive_coder as _hive_coder_pkg
hive_coder_dir = Path(_hive_coder_pkg.__file__).parent
mcp_config = hive_coder_dir / "mcp_servers.json"
if mcp_config.exists():
try:
queen_registry.load_mcp_config(mcp_config)
logger.info("Queen: loaded MCP tools from %s", mcp_config)
except Exception:
logger.warning("Queen: MCP config failed to load", exc_info=True)
# Phase state for building/running phase switching
from framework.tools.queen_lifecycle_tools import (
QueenPhaseState,
register_queen_lifecycle_tools,
)
# Start in staging when the caller provided an agent, building otherwise.
initial_phase = "staging" if worker_identity else "building"
phase_state = QueenPhaseState(phase=initial_phase, event_bus=session.event_bus)
session.phase_state = phase_state
# Always register lifecycle tools — they check session.worker_runtime
# at call time, so they work even if no worker is loaded yet.
register_queen_lifecycle_tools(
queen_registry,
session.queen_task = await create_queen(
session=session,
session_id=session.id,
session_manager=self,
manager_session_id=session.id,
phase_state=phase_state,
worker_identity=worker_identity,
queen_dir=queen_dir,
initial_prompt=initial_prompt,
)
# Monitoring tools need concrete worker paths — only register when present
if session.worker_runtime:
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
# Memory consolidation — triggered by context compaction events.
# Compaction is a natural signal that "enough has happened to be worth remembering".
_consolidation_llm = session.llm
_consolidation_session_dir = queen_dir
register_worker_monitoring_tools(
queen_registry,
session.event_bus,
session.worker_path,
stream_id="queen",
worker_graph_id=session.worker_runtime._graph_id,
async def _on_compaction(_event) -> None:
from framework.agents.queen.queen_memory import consolidate_queen_memory
await consolidate_queen_memory(
session.id, _consolidation_session_dir, _consolidation_llm
)
queen_tools = list(queen_registry.get_tools().values())
queen_tool_executor = queen_registry.get_executor()
from framework.runtime.event_bus import EventType as _ET
# Partition tools into phase-specific sets and import prompt segments
from framework.agents.hive_coder.nodes import (
_QUEEN_BUILDING_TOOLS,
_QUEEN_RUNNING_TOOLS,
_QUEEN_STAGING_TOOLS,
_appendices,
_gcu_building_section,
_package_builder_knowledge,
_queen_behavior_always,
_queen_behavior_building,
_queen_behavior_running,
_queen_behavior_staging,
_queen_identity_building,
_queen_identity_running,
_queen_identity_staging,
_queen_phase_7,
_queen_style,
_queen_tools_building,
_queen_tools_running,
_queen_tools_staging,
session.memory_consolidation_sub = session.event_bus.subscribe(
event_types=[_ET.CONTEXT_COMPACTED],
handler=_on_compaction,
)
building_names = set(_QUEEN_BUILDING_TOOLS)
staging_names = set(_QUEEN_STAGING_TOOLS)
running_names = set(_QUEEN_RUNNING_TOOLS)
registered_names = {t.name for t in queen_tools}
missing_building = building_names - registered_names
if missing_building:
logger.warning(
"Queen: %d/%d building tools NOT registered: %s",
len(missing_building),
len(building_names),
sorted(missing_building),
)
logger.info("Queen: registered tools: %s", sorted(registered_names))
phase_state.building_tools = [t for t in queen_tools if t.name in building_names]
phase_state.staging_tools = [t for t in queen_tools if t.name in staging_names]
phase_state.running_tools = [t for t in queen_tools if t.name in running_names]
# Build queen graph with adjusted prompt + tools
_orig_node = _queen_graph.nodes[0]
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
# Compose phase-specific prompts.
_building_body = (
_queen_style
+ _queen_tools_building
+ _queen_behavior_always
+ _queen_behavior_building
+ _package_builder_knowledge
+ _gcu_building_section
+ _queen_phase_7
+ _appendices
+ worker_identity
)
phase_state.prompt_building = _queen_identity_building + _building_body
phase_state.prompt_staging = (
_queen_identity_staging
+ _queen_style
+ _queen_tools_staging
+ _queen_behavior_always
+ _queen_behavior_staging
+ worker_identity
)
phase_state.prompt_running = (
_queen_identity_running
+ _queen_style
+ _queen_tools_running
+ _queen_behavior_always
+ _queen_behavior_running
+ worker_identity
)
# Build the session_start hook: selects the best-fit expert persona
# from the user's opening message and replaces the identity prefix.
from framework.agents.hive_coder.nodes.thinking_hook import select_expert_persona
from framework.graph.event_loop_node import HookContext, HookResult
from framework.runtime.event_bus import AgentEvent, EventType
_session_llm = session.llm
_session_event_bus = session.event_bus
async def _persona_hook(ctx: HookContext) -> HookResult | None:
persona = await select_expert_persona(ctx.trigger or "", _session_llm)
if not persona:
return None
if _session_event_bus is not None:
await _session_event_bus.publish(
AgentEvent(
type=EventType.QUEEN_PERSONA_SELECTED,
stream_id="queen",
data={"persona": persona},
)
)
return HookResult(system_prompt=persona + "\n\n" + _building_body)
initial_prompt_text = phase_state.get_current_prompt()
registered_tool_names = set(queen_registry.get_tools().keys())
declared_tools = _orig_node.tools or []
available_tools = [t for t in declared_tools if t in registered_tool_names]
node_updates: dict = {
"system_prompt": initial_prompt_text,
}
if set(available_tools) != set(declared_tools):
missing = sorted(set(declared_tools) - registered_tool_names)
if missing:
logger.warning("Queen: tools not available: %s", missing)
node_updates["tools"] = available_tools
adjusted_node = _orig_node.model_copy(update=node_updates)
_queen_loop_config = {
**(_queen_graph.loop_config or {}),
"hooks": {"session_start": [_persona_hook]},
}
queen_graph = _queen_graph.model_copy(
update={"nodes": [adjusted_node], "loop_config": _queen_loop_config}
)
queen_runtime = Runtime(hive_home / "queen")
async def _queen_loop():
try:
executor = GraphExecutor(
runtime=queen_runtime,
llm=session.llm,
tools=queen_tools,
tool_executor=queen_tool_executor,
event_bus=session.event_bus,
stream_id="queen",
storage_path=queen_dir,
loop_config=_queen_loop_config,
execution_id=session.id,
dynamic_tools_provider=phase_state.get_current_tools,
dynamic_prompt_provider=phase_state.get_current_prompt,
)
session.queen_executor = executor
# Wire inject_notification so phase switches notify the queen LLM
async def _inject_phase_notification(content: str) -> None:
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(content)
phase_state.inject_notification = _inject_phase_notification
# Auto-switch to staging when worker execution finishes naturally
# and notify the queen about the termination
from framework.runtime.event_bus import EventType as _ET
async def _on_worker_done(event):
if event.stream_id == "queen":
return
if phase_state.phase == "running":
# Build termination notification for the queen
if event.type == _ET.EXECUTION_COMPLETED:
output = event.data.get("output", {})
output_summary = ""
if output:
# Summarize key outputs for the queen
for key, value in output.items():
val_str = str(value)
if len(val_str) > 200:
val_str = val_str[:200] + "..."
output_summary += f"\n {key}: {val_str}"
_out = output_summary or " (no output keys set)"
notification = (
"[WORKER_TERMINAL] Worker finished successfully.\n"
f"Output:{_out}\n"
"Report this to the user. "
"Ask if they want to continue with another run."
)
else: # EXECUTION_FAILED
error = event.data.get("error", "Unknown error")
notification = (
"[WORKER_TERMINAL] Worker failed.\n"
f"Error: {error}\n"
"Report this to the user and help them troubleshoot."
)
# Inject notification to queen before phase switch
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(notification)
await phase_state.switch_to_staging(source="auto")
session.event_bus.subscribe(
event_types=[_ET.EXECUTION_COMPLETED, _ET.EXECUTION_FAILED],
handler=_on_worker_done,
)
self._subscribe_worker_handoffs(session, executor)
logger.info(
"Queen starting in %s phase with %d tools: %s",
phase_state.phase,
len(phase_state.get_current_tools()),
[t.name for t in phase_state.get_current_tools()],
)
result = await executor.execute(
graph=queen_graph,
goal=queen_goal,
input_data={"greeting": initial_prompt or "Session started."},
session_state={"resume_session_id": session.id},
)
if result.success:
logger.warning("Queen executor returned (should be forever-alive)")
else:
logger.error(
"Queen executor failed: %s",
result.error or "(no error message)",
)
except Exception:
logger.error("Queen conversation crashed", exc_info=True)
finally:
session.queen_executor = None
session.queen_task = asyncio.create_task(_queen_loop())
# ------------------------------------------------------------------
# Judge startup / teardown
# ------------------------------------------------------------------
+291 -31
@@ -71,12 +71,13 @@ class WorkerSessionAdapter:
class QueenPhaseState:
"""Mutable state container for queen operating phase.
Three phases: building → staging → running.
Four phases: planning → building → staging → running.
Shared between the dynamic_tools_provider callback and tool handlers
that trigger phase transitions.
"""
phase: str = "building" # "building", "staging", or "running"
phase: str = "building" # "planning", "building", "staging", or "running"
planning_tools: list = field(default_factory=list) # list[Tool]
building_tools: list = field(default_factory=list) # list[Tool]
staging_tools: list = field(default_factory=list) # list[Tool]
running_tools: list = field(default_factory=list) # list[Tool]
@@ -84,12 +85,15 @@ class QueenPhaseState:
event_bus: Any = None # EventBus — for emitting QUEEN_PHASE_CHANGED events
# Phase-specific prompts (set by session_manager after construction)
prompt_planning: str = ""
prompt_building: str = ""
prompt_staging: str = ""
prompt_running: str = ""
def get_current_tools(self) -> list:
"""Return tools for the current phase."""
if self.phase == "planning":
return list(self.planning_tools)
if self.phase == "running":
return list(self.running_tools)
if self.phase == "staging":
@@ -98,6 +102,8 @@ class QueenPhaseState:
def get_current_prompt(self) -> str:
"""Return the system prompt for the current phase."""
if self.phase == "planning":
return self.prompt_planning
if self.phase == "running":
return self.prompt_running
if self.phase == "staging":
@@ -128,22 +134,15 @@ class QueenPhaseState:
tool_names = [t.name for t in self.running_tools]
logger.info("Queen phase → running (source=%s, tools: %s)", source, tool_names)
await self._emit_phase_event()
if self.inject_notification:
if source == "frontend":
msg = (
"[PHASE CHANGE] The user clicked Run in the UI. Switched to RUNNING phase. "
"Worker is now executing. You have monitoring/lifecycle tools: "
+ ", ".join(tool_names)
+ "."
)
else:
msg = (
"[PHASE CHANGE] Switched to RUNNING phase. "
"Worker is executing. You now have monitoring/lifecycle tools: "
+ ", ".join(tool_names)
+ "."
)
await self.inject_notification(msg)
# Skip notification when source="tool" — the tool result already
# contains the phase change info.
if self.inject_notification and source != "tool":
await self.inject_notification(
"[PHASE CHANGE] The user clicked Run in the UI. Switched to RUNNING phase. "
"Worker is now executing. You have monitoring/lifecycle tools: "
+ ", ".join(tool_names)
+ "."
)
async def switch_to_staging(self, source: str = "tool") -> None:
"""Switch to staging phase and notify the queen.
@@ -157,26 +156,21 @@ class QueenPhaseState:
tool_names = [t.name for t in self.staging_tools]
logger.info("Queen phase → staging (source=%s, tools: %s)", source, tool_names)
await self._emit_phase_event()
if self.inject_notification:
# Skip notification when source="tool" — the tool result already
# contains the phase change info.
if self.inject_notification and source != "tool":
if source == "frontend":
msg = (
"[PHASE CHANGE] The user stopped the worker from the UI. "
"Switched to STAGING phase. Agent is still loaded. "
"Available tools: " + ", ".join(tool_names) + "."
)
elif source == "auto":
else:
msg = (
"[PHASE CHANGE] Worker execution completed. Switched to STAGING phase. "
"Agent is still loaded. Call run_agent_with_input(task) to run again. "
"Available tools: " + ", ".join(tool_names) + "."
)
else:
msg = (
"[PHASE CHANGE] Switched to STAGING phase. "
"Agent loaded and ready. Call run_agent_with_input(task) to start, "
"or stop_worker_and_edit() to go back to building. "
"Available tools: " + ", ".join(tool_names) + "."
)
await self.inject_notification(msg)
async def switch_to_building(self, source: str = "tool") -> None:
@@ -191,13 +185,35 @@ class QueenPhaseState:
tool_names = [t.name for t in self.building_tools]
logger.info("Queen phase → building (source=%s, tools: %s)", source, tool_names)
await self._emit_phase_event()
if self.inject_notification:
if self.inject_notification and source != "tool":
await self.inject_notification(
"[PHASE CHANGE] Switched to BUILDING phase. "
"Lifecycle tools removed. Full coding tools restored. "
"Call load_built_agent(path) when ready to stage."
)
async def switch_to_planning(self, source: str = "tool") -> None:
"""Switch to planning phase and notify the queen.
Args:
source: Who triggered the switch ("tool", "frontend", or "auto").
"""
if self.phase == "planning":
return
self.phase = "planning"
tool_names = [t.name for t in self.planning_tools]
logger.info("Queen phase → planning (source=%s, tools: %s)", source, tool_names)
await self._emit_phase_event()
# Skip notification when source="tool" — the tool result already
# contains the phase change info; injecting a duplicate notification
# causes the queen to respond twice.
if self.inject_notification and source != "tool":
await self.inject_notification(
"[PHASE CHANGE] Switched to PLANNING phase. "
"Coding tools removed. Discuss goals and design with the user. "
"Available tools: " + ", ".join(tool_names) + "."
)
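A short sketch of the notification contract, assuming `QueenPhaseState` skips the QUEEN_PHASE_CHANGED emission when no event bus is attached:

```python
import asyncio

from framework.tools.queen_lifecycle_tools import QueenPhaseState

async def demo() -> None:
    # No event_bus attached; assumed to no-op the phase-change event emission.
    state = QueenPhaseState(phase="planning")

    async def notify(msg: str) -> None:
        print("queen sees:", msg)

    state.inject_notification = notify

    # source="tool": silent, because the tool result already reports the change.
    await state.switch_to_building(source="tool")

    # source="frontend": injects a [PHASE CHANGE] message into the queen's loop.
    await state.switch_to_staging(source="frontend")

    # Tools and prompt always follow the current phase.
    assert state.get_current_prompt() == state.prompt_staging
    assert state.get_current_tools() == list(state.staging_tools)

asyncio.run(demo())
```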
def build_worker_profile(runtime: AgentRuntime, agent_path: Path | str | None = None) -> str:
"""Build a worker capability profile from its graph/goal definition.
@@ -423,7 +439,7 @@ def register_queen_lifecycle_tools(
# --- stop_worker ----------------------------------------------------------
async def stop_worker() -> str:
async def stop_worker(*, reason: str = "Stopped by queen") -> str:
"""Cancel all active worker executions across all graphs.
Stops the worker immediately. Returns the IDs of cancelled executions.
@@ -453,7 +469,7 @@ def register_queen_lifecycle_tools(
for exec_id in list(stream.active_execution_ids):
try:
ok = await stream.cancel_execution(exec_id)
ok = await stream.cancel_execution(exec_id, reason=reason)
if ok:
cancelled.append(exec_id)
except Exception as e:
@@ -498,6 +514,11 @@ def register_queen_lifecycle_tools(
"Use your coding tools to modify the agent, then call "
"load_built_agent(path) to stage it again."
)
# Nudge the queen to start coding instead of blocking for user input.
if phase_state is not None and phase_state.inject_notification:
await phase_state.inject_notification(
"[PHASE CHANGE] Switched to BUILDING phase. Start implementing the changes now."
)
return json.dumps(result)
_stop_edit_tool = Tool(
@@ -514,6 +535,171 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- stop_worker_and_plan (Running/Staging → Planning) --------------------
async def stop_worker_and_plan() -> str:
"""Stop the worker and switch to planning phase for diagnosis."""
stop_result = await stop_worker()
# Switch to planning phase
if phase_state is not None:
await phase_state.switch_to_planning(source="tool")
result = json.loads(stop_result)
result["phase"] = "planning"
result["message"] = (
"Worker stopped. You are now in planning phase. "
"Diagnose the issue using read-only tools (checkpoints, logs, sessions), "
"discuss a fix plan with the user, then call "
"initialize_and_build_agent() to implement the fix."
)
return json.dumps(result)
_stop_plan_tool = Tool(
name="stop_worker_and_plan",
description=(
"Stop the worker and switch to planning phase for diagnosis. "
"Use this when you need to investigate an issue before fixing it. "
"After diagnosis, call initialize_and_build_agent() to switch to building."
),
parameters={"type": "object", "properties": {}},
)
registry.register(
"stop_worker_and_plan", _stop_plan_tool, lambda inputs: stop_worker_and_plan()
)
tools_registered += 1
# --- replan_agent (Building → Planning) -----------------------------------
async def replan_agent() -> str:
"""Switch from building back to planning phase.
Only use when the user explicitly asks to re-plan."""
if phase_state is not None:
if phase_state.phase != "building":
return json.dumps(
{"error": f"Cannot replan: currently in {phase_state.phase} phase."}
)
await phase_state.switch_to_planning(source="tool")
return json.dumps(
{
"status": "replanning",
"phase": "planning",
"message": (
"Switched to PLANNING phase. Coding tools removed. "
"Discuss the new design with the user."
),
}
)
_replan_tool = Tool(
name="replan_agent",
description=(
"Switch from building back to planning phase. "
"Only use when the user explicitly asks to re-plan or redesign the agent."
),
parameters={"type": "object", "properties": {}},
)
registry.register("replan_agent", _replan_tool, lambda inputs: replan_agent())
tools_registered += 1
# --- initialize_and_build_agent wrapper (Planning → Building) -------------
# With agent_name: scaffold a new agent via MCP tool, then switch to building.
# Without agent_name: just switch to building (for fixing an existing loaded agent).
_existing_init = registry._tools.get("initialize_and_build_agent")
if _existing_init is not None:
_orig_init_executor = _existing_init.executor
async def initialize_and_build_agent_wrapper(inputs: dict) -> str:
"""Wrapper: scaffold or just switch to building phase."""
agent_name = (inputs.get("agent_name") or "").strip()
# No agent_name → try to fall back to the session's current agent,
# or fail with actionable guidance.
if not agent_name:
# Try to resolve agent_name from the current session
fallback_path = getattr(session, "worker_path", None)
if fallback_path is not None:
agent_name = Path(fallback_path).name
else:
# Server path: check SessionManager
if session_manager is not None and manager_session_id:
srv_session = session_manager.get_session(manager_session_id)
if srv_session and getattr(srv_session, "worker_path", None):
fallback_path = srv_session.worker_path
agent_name = Path(fallback_path).name
if not agent_name:
return json.dumps(
{
"error": (
"No agent_name provided and no agent loaded in this session. "
"To fix: call list_agents() to find the agent name, then call "
"initialize_and_build_agent(agent_name='<name>') to scaffold it."
)
}
)
# Fallback succeeded — switch to building without scaffolding
logger.info(
"initialize_and_build_agent: no agent_name provided, "
"falling back to session agent '%s'",
agent_name,
)
if phase_state is not None:
await phase_state.switch_to_building(source="tool")
if phase_state.inject_notification:
await phase_state.inject_notification(
"[PHASE CHANGE] Switched to BUILDING phase. "
"Start implementing the fix now."
)
return json.dumps(
{
"status": "editing",
"phase": "building",
"agent_name": agent_name,
"warning": (
f"No agent_name provided — using session agent '{agent_name}'. "
f"Agent files are at exports/{agent_name}/."
),
"message": (
"Switched to BUILDING phase. Full coding tools restored. "
"Implement the fix, then call load_built_agent(path) to reload."
),
}
)
# Has agent_name → scaffold via MCP tool
result = _orig_init_executor(inputs)
# Handle both sync and async executors
if asyncio.iscoroutine(result) or asyncio.isfuture(result):
result = await result
# If result is a ToolResult, extract the text content
result_str = str(result)
if hasattr(result, "content"):
result_str = str(result.content)
try:
parsed = json.loads(result_str)
if parsed.get("success", True):
if phase_state is not None:
await phase_state.switch_to_building(source="tool")
# Inject a continuation message so the queen starts
# building immediately instead of blocking for user input.
if phase_state.inject_notification:
await phase_state.inject_notification(
"[PHASE CHANGE] Agent scaffolded and switched to BUILDING phase. "
"Start implementing the agent nodes now."
)
except (json.JSONDecodeError, KeyError, TypeError):
pass
return result_str
registry.register(
"initialize_and_build_agent",
_existing_init.tool,
lambda inputs: initialize_and_build_agent_wrapper(inputs),
)
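A sketch of the two calling modes; `call_tool` stands in for the registry's executor lookup, and the agent name `ticket_triage` is hypothetical:

```python
import json

async def demo(call_tool) -> None:
    # Mode 1: no agent_name. Falls back to the session's loaded agent, or
    # returns an error with list_agents() guidance when nothing is loaded.
    out = json.loads(await call_tool("initialize_and_build_agent", {}))
    print(out.get("phase"), out.get("warning") or out.get("error"))

    # Mode 2: agent_name given. Scaffolds via the original MCP executor,
    # then switches to building and nudges the queen to start implementing.
    out = await call_tool("initialize_and_build_agent", {"agent_name": "ticket_triage"})
    print(out)
```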
# --- stop_worker (Running → Staging) -------------------------------------
async def stop_worker_to_staging() -> str:
@@ -1429,6 +1615,51 @@ def register_queen_lifecycle_tools(
if not resolved_path.exists():
return json.dumps({"error": f"Agent path does not exist: {agent_path}"})
# Pre-check: verify the module exports goal/nodes/edges before
# attempting the full load. This gives the queen an actionable
# error message instead of a cryptic ImportError or TypeError.
try:
import importlib
import sys as _sys
pkg_name = resolved_path.name
parent_dir = str(resolved_path.resolve().parent)
# Temporarily put parent on sys.path for import
if parent_dir not in _sys.path:
_sys.path.insert(0, parent_dir)
# Evict stale cached modules
stale = [n for n in _sys.modules if n == pkg_name or n.startswith(f"{pkg_name}.")]
for n in stale:
del _sys.modules[n]
mod = importlib.import_module(pkg_name)
missing_attrs = [
attr for attr in ("goal", "nodes", "edges") if getattr(mod, attr, None) is None
]
if missing_attrs:
return json.dumps(
{
"error": (
f"Agent module '{pkg_name}' is missing module-level "
f"attributes: {', '.join(missing_attrs)}. "
f"Fix: in {pkg_name}/__init__.py, add "
f"'from .agent import {', '.join(missing_attrs)}' "
f"so that 'import {pkg_name}' exposes them at package level."
)
}
)
except Exception as pre_err:
return json.dumps(
{
"error": (
f"Failed to import agent module '{resolved_path.name}': {pre_err}. "
f"Fix: ensure {resolved_path.name}/__init__.py exists and can be "
f"imported without errors (check syntax, missing dependencies, "
f"and relative imports)."
)
}
)
try:
updated_session = await session_manager.load_worker(
manager_session_id,
@@ -1436,7 +1667,36 @@ def register_queen_lifecycle_tools(
)
info = updated_session.worker_info
# Switch to staging phase after successful load
# Validate that all tools declared by nodes are registered
loaded_runtime = _get_runtime()
if loaded_runtime is not None:
available_tool_names = {t.name for t in loaded_runtime._tools}
missing_by_node: dict[str, list[str]] = {}
for node in loaded_runtime.graph.nodes:
if node.tools:
missing = set(node.tools) - available_tool_names
if missing:
missing_by_node[f"{node.name} (id={node.id})"] = sorted(missing)
if missing_by_node:
# Unload the broken worker
try:
await session_manager.unload_worker(manager_session_id)
except Exception:
pass
details = "; ".join(
f"Node '{k}' missing {v}" for k, v in missing_by_node.items()
)
return json.dumps(
{
"error": (
f"Tool validation failed: {details}. "
"Fix node tool declarations or add the missing "
"tools, then try loading again."
)
}
)
# Switch to staging phase after successful load + validation
if phase_state is not None:
await phase_state.switch_to_staging()
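For reference, a hypothetical `exports/my_agent/__init__.py` that satisfies the pre-check by re-exporting the required module-level attributes:

```python
# exports/my_agent/__init__.py  (package name is hypothetical)
# Re-export the module-level attributes the loader's pre-check looks for.
from .agent import edges, goal, nodes

__all__ = ["goal", "nodes", "edges"]
```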
@@ -0,0 +1,38 @@
"""Tool for the queen to write to her episodic memory.
The queen can consciously record significant moments during a session, like
writing in a diary. Semantic memory (MEMORY.md) is updated automatically at
session end and is never written by the queen directly.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
def write_to_diary(entry: str) -> str:
"""Write a prose entry to today's episodic memory.
Use this when something significant just happened: a pipeline went live, the
user shared an important preference, a goal was achieved or abandoned, or
you want to record something that should be remembered across sessions.
Write in first person, as you would in a private diary. Be specific: what
happened, how the user responded, and what it means going forward. One or two
paragraphs are enough.
You do not need to include a timestamp or date heading; those are added
automatically.
"""
from framework.agents.queen.queen_memory import append_episodic_entry
append_episodic_entry(entry)
return "Diary entry recorded."
def register_queen_memory_tools(registry: ToolRegistry) -> None:
"""Register the episodic memory tool into the queen's tool registry."""
registry.register_function(write_to_diary)
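A sketch of wiring the tool up, assuming the new file lives at `framework/tools/queen_memory_tools.py` (the path is not shown in the diff):

```python
from framework.runner.tool_registry import ToolRegistry

# Module path is assumed; the diff does not show the new file's location.
from framework.tools.queen_memory_tools import (
    register_queen_memory_tools,
    write_to_diary,
)

registry = ToolRegistry()
register_queen_memory_tools(registry)

# Direct call, as the registered tool would be invoked by the queen.
# The date heading and timestamp are added by append_episodic_entry.
print(write_to_diary("Shipped the invoice pipeline; user prefers weekly digests."))
```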
+1 -1
@@ -1,6 +1,6 @@
"""Graph lifecycle tools for multi-graph sessions.
These tools allow an agent (e.g. hive_coder) to load, unload, start,
These tools allow an agent (e.g. queen) to load, unload, start,
restart, and query other agent graphs within the same runtime session.
Usage::
+9 -9
@@ -445,8 +445,8 @@ class AdenTUI(App):
agent_name = runner.agent_path.name
self.notify(f"Agent loaded: {agent_name}", severity="information", timeout=3)
# Load health judge + queen for worker agents (skip for hive_coder itself)
if agent_name != "hive_coder":
# Load health judge + queen for worker agents (skip for queen itself)
if agent_name != "queen":
await self._load_judge_and_queen(runner._storage_path)
async def _load_judge_and_queen(self, storage_path) -> None:
@@ -515,18 +515,18 @@ class AdenTUI(App):
# worker. Escalation tickets from the judge are injected
# as messages into this conversation.
# ---------------------------------------------------------------
import framework.agents.hive_coder as _hive_coder_pkg
from framework.agents.hive_coder.agent import queen_goal, queen_graph
import framework.agents.queen as _queen_pkg
from framework.agents.queen.agent import queen_goal, queen_graph
# Queen gets lifecycle tools, monitoring tools, AND coding tools
# from the hive_coder's coder-tools MCP server. This spawns a
# from the queen's coder-tools MCP server. This spawns a
# separate MCP process so the queen can read/write files, run
# commands, discover tools, etc. independently of the worker.
queen_registry = ToolRegistry()
# Coding tools from hive_coder's MCP config (coder_tools_server).
hive_coder_dir = Path(_hive_coder_pkg.__file__).parent
mcp_config = hive_coder_dir / "mcp_servers.json"
# Coding tools from queen's MCP config (coder_tools_server).
queen_dir = Path(_queen_pkg.__file__).parent
mcp_config = queen_dir / "mcp_servers.json"
if mcp_config.exists():
try:
queen_registry.load_mcp_config(mcp_config)
@@ -556,7 +556,7 @@ class AdenTUI(App):
queen_tool_executor = queen_registry.get_executor()
# Partition tools into phase-specific sets
from framework.agents.hive_coder.nodes import (
from framework.agents.queen.nodes import (
_QUEEN_BUILDING_TOOLS,
_QUEEN_RUNNING_TOOLS,
_QUEEN_STAGING_TOOLS,
+2 -2
@@ -12,8 +12,8 @@ export interface LiveSession {
loaded_at: number;
uptime_seconds: number;
intro_message?: string;
/** Queen operating phase — "building", "staging", or "running" */
queen_phase?: "building" | "staging" | "running";
/** Queen operating phase — "planning", "building", "staging", or "running" */
queen_phase?: "planning" | "building" | "staging" | "running";
/** Present in 409 conflict responses when worker is still loading */
loading?: boolean;
}
+2 -2
@@ -31,7 +31,7 @@ interface AgentGraphProps {
version?: string;
runState?: RunState;
building?: boolean;
queenPhase?: "building" | "staging" | "running";
queenPhase?: "planning" | "building" | "staging" | "running";
}
// --- Extracted RunButton so hover state survives parent re-renders ---
@@ -278,7 +278,7 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
</span>
)}
</div>
<RunButton runState={runState} disabled={nodes.length === 0 || queenPhase === "building"} onRun={handleRun} onPause={onPause ?? (() => {})} btnRef={runBtnRef} />
<RunButton runState={runState} disabled={nodes.length === 0 || queenPhase === "building" || queenPhase === "planning"} onRun={handleRun} onPause={onPause ?? (() => {})} btnRef={runBtnRef} />
</div>
<div className="flex-1 flex items-center justify-center px-5">
{building ? (
+5 -3
@@ -39,7 +39,7 @@ interface ChatPanelProps {
/** Called when user dismisses the pending question without answering */
onQuestionDismiss?: () => void;
/** Queen operating phase — shown as a tag on queen messages */
queenPhase?: "building" | "staging" | "running";
queenPhase?: "planning" | "building" | "staging" | "running";
}
const queenColor = "hsl(45,95%,58%)";
@@ -144,7 +144,7 @@ function ToolActivityRow({ content }: { content: string }) {
);
}
const MessageBubble = memo(function MessageBubble({ msg, queenPhase }: { msg: ChatMessage; queenPhase?: "building" | "staging" | "running" }) {
const MessageBubble = memo(function MessageBubble({ msg, queenPhase }: { msg: ChatMessage; queenPhase?: "planning" | "building" | "staging" | "running" }) {
const isUser = msg.type === "user";
const isQueen = msg.role === "queen";
const color = getColor(msg.agent, msg.role);
@@ -204,7 +204,9 @@ const MessageBubble = memo(function MessageBubble({ msg, queenPhase }: { msg: Ch
? "running phase"
: queenPhase === "staging"
? "staging phase"
: "building phase"
: queenPhase === "planning"
? "planning phase"
: "building phase"
: "Worker"}
</span>
</div>
+2 -1
@@ -121,7 +121,8 @@ export function sseEventToChatMessage(
id: `paused-${event.execution_id}`,
agent: "System",
agentColor: "",
content: "Execution paused by user",
content:
(event.data?.reason as string) || "Execution paused",
timestamp: "",
type: "system",
thread,
+9 -6
@@ -255,8 +255,8 @@ interface AgentBackendState {
/** The message ID of the current worker input request (for inline reply box) */
workerInputMessageId: string | null;
queenBuilding: boolean;
/** Queen operating phase — "building" (coding), "staging" (loaded), or "running" (executing) */
queenPhase: "building" | "staging" | "running";
/** Queen operating phase — "planning" (design), "building" (coding), "staging" (loaded), or "running" (executing) */
queenPhase: "planning" | "building" | "staging" | "running";
workerRunState: "idle" | "deploying" | "running";
currentExecutionId: string | null;
nodeLogs: Record<string, string[]>;
@@ -291,7 +291,7 @@ function defaultAgentState(): AgentBackendState {
awaitingInput: false,
workerInputMessageId: null,
queenBuilding: false,
queenPhase: "building",
queenPhase: "planning",
workerRunState: "idle",
currentExecutionId: null,
nodeLogs: {},
@@ -892,7 +892,7 @@ export default function Workspace() {
// failed, the throw inside the catch exits the outer try block.
const session = liveSession!;
const displayName = formatAgentDisplayName(session.worker_name || agentType);
const initialPhase = session.queen_phase || (session.has_worker ? "staging" : "building");
const initialPhase = session.queen_phase || (session.has_worker ? "staging" : "planning");
updateAgentState(agentType, {
sessionId: session.session_id,
displayName,
@@ -1788,8 +1788,11 @@ export default function Workspace() {
case "queen_phase_changed": {
const rawPhase = event.data?.phase as string;
const newPhase: "building" | "staging" | "running" =
rawPhase === "running" ? "running" : rawPhase === "staging" ? "staging" : "building";
const newPhase: "planning" | "building" | "staging" | "running" =
rawPhase === "running" ? "running"
: rawPhase === "staging" ? "staging"
: rawPhase === "planning" ? "planning"
: "building";
updateAgentState(agentType, {
queenPhase: newPhase,
queenBuilding: newPhase === "building",
+31 -8
@@ -763,7 +763,7 @@ class TestClientFacingBlocking:
class TestEscalate:
@pytest.mark.asyncio
async def test_escalate_emits_event(self, runtime, node_spec, memory):
"""escalate() should publish ESCALATION_REQUESTED."""
"""escalate() should publish ESCALATION_REQUESTED and block for queen guidance."""
node_spec.output_keys = []
llm = MockStreamingLLM(
scenarios=[
@@ -772,7 +772,6 @@ class TestEscalate:
{
"reason": "tool failure",
"context": "HTTP 401 from upstream",
"wait_for_response": False,
},
tool_use_id="escalate_1",
),
@@ -789,7 +788,20 @@ class TestEscalate:
ctx = build_ctx(runtime, node_spec, memory, llm, stream_id="worker")
node = EventLoopNode(event_bus=bus, config=LoopConfig(max_iterations=5))
async def queen_reply():
await asyncio.sleep(0.05)
await node.inject_event("Acknowledged, proceed.")
task = asyncio.create_task(queen_reply())
result = await node.execute(ctx)
await task
assert result.success is True
assert len(received) == 1
@@ -808,7 +820,6 @@ class TestEscalate:
{
"reason": "blocked",
"context": "dependency missing",
"wait_for_response": False,
},
tool_use_id="escalate_1",
),
@@ -827,7 +838,14 @@ class TestEscalate:
ctx = build_ctx(runtime, node_spec, memory, llm, stream_id="worker")
node = EventLoopNode(event_bus=bus, config=LoopConfig(max_iterations=5))
async def queen_reply():
await asyncio.sleep(0.05)
await node.inject_event("Queen acknowledges escalation.")
task = asyncio.create_task(queen_reply())
result = await node.execute(ctx)
await task
assert result.success is True
queen_node.inject_event.assert_awaited_once()
@@ -842,7 +860,7 @@ class TestEscalate:
@pytest.mark.asyncio
async def test_escalate_waits_for_queen_input_and_skips_judge(self, runtime, node_spec, memory):
"""wait_for_response=true should block for queen input before judge evaluation."""
"""escalate() should block for queen input before judge evaluation."""
node_spec.output_keys = ["result"]
llm = MockStreamingLLM(
scenarios=[
@@ -851,7 +869,6 @@ class TestEscalate:
{
"reason": "need direction",
"context": "conflicting constraints",
"wait_for_response": True,
},
tool_use_id="escalate_1",
),
@@ -1756,9 +1773,9 @@ class TestIsToolDoomLoop:
def test_different_args_no_doom(self):
node = EventLoopNode(config=LoopConfig(tool_doom_loop_threshold=3))
fp1 = [("search", '{"q": "a"}')]
fp2 = [("search", '{"q": "b"}')]
fp3 = [("search", '{"q": "c"}')]
fp1 = [("search", '{"q": "deploy kubernetes cluster to production"}')]
fp2 = [("read_file", '{"path": "/etc/nginx/nginx.conf"}')]
fp3 = [("execute", '{"command": "SELECT * FROM users WHERE active=true"}')]
is_doom, _ = node._is_tool_doom_loop([fp1, fp2, fp3])
assert is_doom is False
@@ -1886,6 +1903,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -1941,6 +1959,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -2005,6 +2024,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -2056,6 +2076,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_enabled=False,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -2144,6 +2165,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -2206,6 +2228,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
+1 -1
@@ -23,7 +23,7 @@ Done. For details, prerequisites, and troubleshooting, read on.
## What you get after setup
- **coder-tools**: Create and manage agents (scaffolding via `initialize_agent_package`, file I/O, tool discovery).
- **coder-tools**: Create and manage agents (scaffolding via `initialize_and_build_agent`, file I/O, tool discovery).
- **tools**: File operations, web search, and other agent tools.
- **Documentation**: Guided docs for building and testing agents.
+1 -1
@@ -130,7 +130,7 @@ MCP (Model Context Protocol) servers are configured in `.mcp.json` at the projec
}
```
The `coder-tools` server provides agent scaffolding via `initialize_agent_package` and related tools. The `tools` MCP server exposes tools including web search, PDF reading, CSV processing, and file system operations.
The `coder-tools` server provides agent scaffolding via `initialize_and_build_agent` and related tools. The `tools` MCP server exposes tools including web search, PDF reading, CSV processing, and file system operations.
## Storage
+3 -3
@@ -244,7 +244,7 @@ The fastest way to build agents is with the configured MCP workflow:
./quickstart.sh
# Build a new agent
Use the coder-tools MCP tools from your IDE agent chat (e.g., initialize_agent_package)
Use the coder-tools MCP tools from your IDE agent chat (e.g., initialize_and_build_agent)
```
### Agent Development Workflow
@@ -252,7 +252,7 @@ Use the coder-tools MCP tools from your IDE agent chat (e.g., initialize_agent_p
1. **Define Your Goal**
```
Use the coder-tools initialize_agent_package tool
Use the coder-tools initialize_and_build_agent tool
Enter goal: "Build an agent that processes customer support tickets"
```
@@ -555,7 +555,7 @@ uv add <package>
```bash
# Option 1: Use Claude Code skill (recommended)
Use the coder-tools initialize_agent_package tool
Use the coder-tools initialize_and_build_agent tool
# Option 2: Create manually
# Note: exports/ is initially empty (gitignored). Create your agent directory:
+2 -2
@@ -180,7 +180,7 @@ MCP tools are also available in Cursor. To enable:
**Claude Code:**
```
Use the coder-tools initialize_agent_package tool to scaffold a new agent
Use the coder-tools initialize_and_build_agent tool to scaffold a new agent
```
**Codex CLI:**
@@ -453,7 +453,7 @@ This design allows agents in `exports/` to be:
### 2. Build Agent (Claude Code)
```
Use the coder-tools initialize_agent_package tool
Use the coder-tools initialize_and_build_agent tool
Enter goal: "Build an agent that processes customer support tickets"
```
+2 -2
@@ -47,7 +47,7 @@ This is the recommended way to create your first agent.
# Setup already done via quickstart.sh above
# Start Claude Code and build an agent
Use the coder-tools initialize_agent_package tool
Use the coder-tools initialize_and_build_agent tool
```
Follow the interactive prompts to:
@@ -173,7 +173,7 @@ PYTHONPATH=exports uv run python -m my_agent test --type success
1. **Dashboard**: Run `hive open` to launch the web dashboard, or `hive tui` for the terminal UI
2. **Detailed Setup**: See [environment-setup.md](./environment-setup.md)
3. **Developer Guide**: See [developer-guide.md](./developer-guide.md)
4. **Build Agents**: Use the coder-tools `initialize_agent_package` tool in Claude Code
4. **Build Agents**: Use the coder-tools `initialize_and_build_agent` tool in Claude Code
5. **Custom Tools**: Learn to integrate MCP servers
6. **Join Community**: [Discord](https://discord.com/invite/MXE49hrKDk)
+1 -1
@@ -22,7 +22,7 @@ template_name/
### Option 1: Build from template (recommended)
Use the `coder-tools` `initialize_agent_package` tool and select "From a template" to interactively pick a template, customize the goal/nodes/graph, and export a new agent.
Use the `coder-tools` `initialize_and_build_agent` tool and select "From a template" to interactively pick a template, customize the goal/nodes/graph, and export a new agent.
### Option 2: Manual copy
@@ -204,8 +204,8 @@ class DeepResearchAgent:
"""Set up the executor with all components."""
from pathlib import Path
storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
self._storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
self._storage_path.mkdir(parents=True, exist_ok=True)
self._tool_registry = ToolRegistry()
@@ -2,8 +2,13 @@
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"args": [
"run",
"python",
"mcp_server.py",
"--stdio"
],
"cwd": "../../../tools",
"description": "Hive tools MCP server providing web_search, web_scrape, and write_to_file"
}
}
}
@@ -11,26 +11,32 @@ intake_node = NodeSpec(
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["topic"],
input_keys=["user_request"],
output_keys=["research_brief"],
success_criteria=(
"The research brief is specific and actionable: it states the topic, "
"the key questions to answer, the desired scope, and depth."
),
system_prompt="""\
You are a research intake specialist. The user wants to research a topic.
Have a brief conversation to clarify what they need.
You are a research intake specialist. Your ONLY job is to have a brief conversation with the user to clarify what they want researched.
**CRITICAL: You do NOT do any research yourself.**
- You do NOT search the web
- You do NOT fetch sources
- The research happens in the NEXT stage after you complete intake
- Do NOT ask for or expect web_search or web_scrape tools
**STEP 1. Read and respond (text only, NO tool calls):**
1. Read the topic provided
2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth)
1. Read the user_request provided
2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth, budget, preferences)
3. If it's already clear, confirm your understanding and ask the user to confirm
Keep it short. Don't over-ask.
Keep it short. Don't over-ask. Maximum 2 clarifying questions.
**STEP 2. After the user confirms, call set_output:**
- set_output("research_brief", "A clear paragraph describing exactly what to research, \
what questions to answer, what scope to cover, and how deep to go.")
- set_output("research_brief", "A clear paragraph describing exactly what to research, what questions to answer, what scope to cover, and how deep to go.")
That's it. Once you call set_output, your job is done and the research node will take over.
""",
tools=[],
)
@@ -59,6 +65,8 @@ If feedback is provided, this is a follow-up round — focus on the gaps identif
Work in phases:
1. **Search**: Use web_search with 3-5 diverse queries covering different angles.
Prioritize authoritative sources (.edu, .gov, established publications).
For automotive research, target: caranddriver.com, motortrend.com, edmunds.com,
consumerreports.org, jdpower.com, and enthusiast forums.
2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources).
Skip URLs that fail. Extract the substantive content.
3. **Analyze**: Review what you've collected. Identify key findings, themes,
@@ -116,16 +116,39 @@ customize_node = NodeSpec(
"for each selected job, saved as HTML, and Gmail drafts created in user's inbox."
),
system_prompt="""\
You are a career coach creating personalized application materials.
You are a career coach creating personalized application materials and Gmail drafts.
**CRITICAL: You MUST create Gmail drafts for each selected job using gmail_create_draft.**
**PROCESS:**
1. Create application_materials.html using save_data and append_data.
2. Generate resume customization list and professional cold email for each selected job.
3. Serve the file to the user.
4. Create Gmail drafts using gmail_create_draft.
2. For each selected job:
a. Generate a specific resume customization list
b. Create a professional cold outreach email
c. **IMMEDIATELY call gmail_create_draft** with:
- to: hiring manager or recruiter email (if available) or company email
- subject: "Application for [Job Title] - [Your Name]"
- html: the professional cold email in HTML format
3. Serve the application_materials.html file to the user.
4. Confirm each Gmail draft was created successfully.
**EMAIL REQUIREMENTS:**
- Professional, personalized cold outreach email
- Reference specific company details and role
- Mention 2-3 relevant qualifications from their resume
- Include clear call-to-action
- Professional email signature
- Format as HTML with proper structure
**Gmail Draft Creation:**
For each job, you MUST call gmail_create_draft(to="[email]", subject="[subject]", html="[email_html]")
- Extract company email from job listing if available
- Use generic format like "careers@[company].com" if no specific email
- Subject format: "Application for [Job Title] - [Applicant Name]"
- HTML email body with proper formatting
**FINISH:**
Call set_output("application_materials", "Completed")
Only call set_output("application_materials", "Completed") AFTER creating ALL Gmail drafts.
""",
tools=["save_data", "append_data", "serve_file_to_user", "gmail_create_draft"],
)
@@ -1,7 +1,7 @@
#!/usr/bin/env python
"""Debug tool to print the queen's running phase prompt."""
"""Debug tool to print the queen's phase-specific prompts."""
from framework.agents.hive_coder.nodes import (
from framework.agents.queen.nodes import (
_appendices,
_queen_behavior_always,
_queen_behavior_running,
@@ -10,32 +10,36 @@ from framework.agents.hive_coder.nodes import (
_queen_tools_running,
)
_DEFAULT_WORKER_IDENTITY = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
def print_running_prompt(worker_identity: str | None = None) -> None:
"""Print the composed running phase prompt.
Args:
worker_identity: Optional worker identity string. If None, shows
the "no worker loaded" placeholder.
"""
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
def print_planning_prompt(worker_identity: str | None = None) -> None:
"""Print the composed planning phase prompt."""
from framework.agents.queen.nodes import (
_planning_knowledge,
_queen_behavior_planning,
_queen_identity_planning,
_queen_tools_planning,
)
wi = worker_identity or _DEFAULT_WORKER_IDENTITY
prompt = (
_queen_identity_running
_queen_identity_planning
+ _queen_style
+ _queen_tools_running
+ _queen_tools_planning
+ _queen_behavior_always
+ _queen_behavior_running
+ worker_identity
+ _queen_behavior_planning
+ _planning_knowledge
+ wi
)
print("=" * 80)
print("QUEEN RUNNING PHASE PROMPT")
print("QUEEN PLANNING PHASE PROMPT")
print("=" * 80)
print(prompt)
print("=" * 80)
@@ -44,20 +48,16 @@ def print_running_prompt(worker_identity: str | None = None) -> None:
def print_building_prompt(worker_identity: str | None = None) -> None:
"""Print the composed building phase prompt."""
from framework.agents.hive_coder.nodes import (
_agent_builder_knowledge,
from framework.agents.queen.nodes import (
_building_knowledge,
_gcu_building_section,
_queen_behavior_building,
_queen_identity_building,
_queen_phase_7,
_queen_tools_building,
)
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
wi = worker_identity or _DEFAULT_WORKER_IDENTITY
prompt = (
_queen_identity_building
@@ -65,10 +65,11 @@ def print_building_prompt(worker_identity: str | None = None) -> None:
+ _queen_tools_building
+ _queen_behavior_always
+ _queen_behavior_building
+ _agent_builder_knowledge
+ _building_knowledge
+ _gcu_building_section
+ _queen_phase_7
+ _appendices
+ worker_identity
+ wi
)
print("=" * 80)
@@ -81,18 +82,13 @@ def print_building_prompt(worker_identity: str | None = None) -> None:
def print_staging_prompt(worker_identity: str | None = None) -> None:
"""Print the composed staging phase prompt."""
from framework.agents.hive_coder.nodes import (
from framework.agents.queen.nodes import (
_queen_behavior_staging,
_queen_identity_staging,
_queen_tools_staging,
)
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
wi = worker_identity or _DEFAULT_WORKER_IDENTITY
prompt = (
_queen_identity_staging
@@ -100,7 +96,7 @@ def print_staging_prompt(worker_identity: str | None = None) -> None:
+ _queen_tools_staging
+ _queen_behavior_always
+ _queen_behavior_staging
+ worker_identity
+ wi
)
print("=" * 80)
@@ -111,17 +107,47 @@ def print_staging_prompt(worker_identity: str | None = None) -> None:
print(f"\nTotal length: {len(prompt):,} characters")
def print_running_prompt(worker_identity: str | None = None) -> None:
"""Print the composed running phase prompt.
Args:
worker_identity: Optional worker identity string. If None, shows
the "no worker loaded" placeholder.
"""
wi = worker_identity or _DEFAULT_WORKER_IDENTITY
prompt = (
_queen_identity_running
+ _queen_style
+ _queen_tools_running
+ _queen_behavior_always
+ _queen_behavior_running
+ wi
)
print("=" * 80)
print("QUEEN RUNNING PHASE PROMPT")
print("=" * 80)
print(prompt)
print("=" * 80)
print(f"\nTotal length: {len(prompt):,} characters")
if __name__ == "__main__":
import sys
phase = sys.argv[1] if len(sys.argv) > 1 else "running"
phase = sys.argv[1] if len(sys.argv) > 1 else "planning"
if phase == "all":
print_planning_prompt()
print("\n\n")
print_building_prompt()
print("\n\n")
print_staging_prompt()
print("\n\n")
print_running_prompt()
elif phase == "planning":
print_planning_prompt()
elif phase == "building":
print_building_prompt()
elif phase == "staging":
@@ -131,6 +157,6 @@ if __name__ == "__main__":
else:
print(f"Unknown phase: {phase}")
print(
"Usage: uv run scripts/debug_queen_prompt.py [building|staging|running|all]"
"Usage: uv run scripts/debug_queen_prompt.py [planning|building|staging|running|all]"
)
sys.exit(1)
@@ -1,4 +1,4 @@
"""Quick test script for initialize_agent_package."""
"""Quick test script for initialize_and_build_agent."""
import sys
import os
@@ -14,6 +14,6 @@ import tools.coder_tools_server as srv
srv.PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Access the underlying function (FastMCP wraps it as FunctionTool)
tool = srv.initialize_agent_package
tool = srv.initialize_and_build_agent
result = tool.fn("richard_test2", nodes="intake,process,review")
print(result)
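# Expected result (a sketch based on the JSON that
# initialize_and_build_agent returns; exact keys may differ):
#   {"success": true, "nodes": [...], "files_written": {...},
#    "file_count": N, "files": [...], "next_steps": [...]}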
@@ -3,7 +3,7 @@
Coder Tools MCP Server OpenCode-inspired coding tools.
Provides rich file I/O, fuzzy-match editing, git snapshots, and shell execution
for the hive_coder agent. Modeled after opencode's tool architecture.
for the queen agent. Modeled after opencode's tool architecture.
All paths scoped to a configurable project root for safety.
@@ -1252,6 +1252,53 @@ def validate_agent_package(agent_name: str) -> str:
path_parts.append(pythonpath)
env["PYTHONPATH"] = os.pathsep.join(path_parts)
# Step 0: Module contract — __init__.py must expose goal, nodes, edges
try:
_contract_script = textwrap.dedent("""\
import importlib, json
mod = importlib.import_module('{agent_name}')
missing = [a for a in ('goal', 'nodes', 'edges') if getattr(mod, a, None) is None]
if missing:
print(json.dumps({{
'valid': False,
'error': (
"Module '{agent_name}' is missing module-level attributes: "
+ ", ".join(missing) + ". "
"Fix: in {agent_name}/__init__.py, add "
"'from .agent import " + ", ".join(missing) + "' "
"so that 'import {agent_name}' exposes them at package level."
)
}}))
else:
print(json.dumps({{'valid': True}}))
""").format(agent_name=agent_name)
proc = subprocess.run(
["uv", "run", "python", "-c", _contract_script],
capture_output=True,
text=True,
timeout=30,
env=env,
cwd=PROJECT_ROOT,
stdin=subprocess.DEVNULL,
)
if proc.returncode == 0:
result = json.loads(proc.stdout.strip())
steps["module_contract"] = {
"passed": result["valid"],
"output": result.get("error", "goal, nodes, edges exported correctly"),
}
else:
steps["module_contract"] = {
"passed": False,
"error": (
f"Failed to import '{agent_name}': {proc.stderr.strip()[:1000]}. "
f"Fix: ensure {agent_name}/__init__.py exists and can be imported "
f"without errors (check syntax, missing dependencies, relative imports)."
),
}
except Exception as e:
steps["module_contract"] = {"passed": False, "error": str(e)}
# Step A: Class validation (subprocess for import isolation)
try:
proc = subprocess.run(
@@ -1321,9 +1368,11 @@ def validate_agent_package(agent_name: str) -> str:
result = json.loads(proc.stdout.strip())
steps["node_completeness"] = {
"passed": result["valid"],
"output": "; ".join(result["errors"])
if result["errors"]
else "All defined nodes are in the graph",
"output": (
"; ".join(result["errors"])
if result["errors"]
else "All defined nodes are in the graph"
),
}
if not result["valid"]:
steps["node_completeness"]["errors"] = result["errors"]
@@ -1434,7 +1483,7 @@ def _node_var_name(node_id: str) -> str:
@mcp.tool()
def initialize_agent_package(agent_name: str, nodes: str | None = None) -> str:
def initialize_and_build_agent(agent_name: str, nodes: str | None = None) -> str:
"""Scaffold a new agent package with placeholder files.
Creates exports/{agent_name}/ with all files needed for a runnable agent:
@@ -1985,6 +2034,9 @@ def runner_loaded():
''',
)
# Build list of all generated file paths for the caller.
all_file_paths = [info["path"] for info in files_written.values()]
return json.dumps(
{
"success": True,
@@ -1994,10 +2046,33 @@ def runner_loaded():
"nodes": node_list,
"files_written": files_written,
"file_count": len(files_written),
"files": all_file_paths,
"next_steps": [
f"Customize node definitions in exports/{agent_name}/nodes/__init__.py",
f"Define goal and edges in exports/{agent_name}/agent.py",
f'Run validate_agent_package("{agent_name}") to check structure',
(
"IMPORTANT: All generated files are structurally complete "
"with correct imports, class definition, validate() method, "
"and __init__.py exports. Use edit_file to customize TODO "
"placeholders — do NOT use write_file to rewrite entire files, "
"as this will break imports and structure."
),
(
f"Use edit_file to customize system prompts, tools, "
f"input_keys, output_keys, and success_criteria in "
f"exports/{agent_name}/nodes/__init__.py"
),
(
f"Use edit_file to customize goal description, "
f"success_criteria values, constraint values, edge "
f"definitions, and identity_prompt in "
f"exports/{agent_name}/agent.py"
),
(
"Do NOT modify: imports at top of agent.py, the class "
"definition, validate() method, _build_graph()/_setup()/"
"lifecycle methods, or __init__.py exports — they are "
"already correct."
),
f'Run validate_agent_package("{agent_name}") to verify structure',
],
},
indent=2,
@@ -17,6 +17,9 @@ AIRTABLE_CREDENTIALS = {
"airtable_update_records",
"airtable_list_bases",
"airtable_get_base_schema",
"airtable_delete_records",
"airtable_search_records",
"airtable_list_collaborators",
],
required=True,
startup_required=False,
@@ -14,6 +14,9 @@ APOLLO_CREDENTIALS = {
"apollo_enrich_company",
"apollo_search_people",
"apollo_search_companies",
"apollo_get_person_activities",
"apollo_list_email_accounts",
"apollo_bulk_enrich_people",
],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ ASANA_CREDENTIALS = {
"asana_get_task",
"asana_create_task",
"asana_search_tasks",
"asana_update_task",
"asana_add_comment",
"asana_create_subtask",
],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ AWS_S3_CREDENTIALS = {
"s3_get_object",
"s3_put_object",
"s3_delete_object",
"s3_copy_object",
"s3_get_object_metadata",
"s3_generate_presigned_url",
],
required=True,
startup_required=False,
@@ -42,6 +45,9 @@ AWS_S3_CREDENTIALS = {
"s3_get_object",
"s3_put_object",
"s3_delete_object",
"s3_copy_object",
"s3_get_object_metadata",
"s3_generate_presigned_url",
],
required=True,
startup_required=False,
@@ -15,6 +15,9 @@ BREVO_CREDENTIALS = {
"brevo_get_contact",
"brevo_update_contact",
"brevo_get_email_stats",
"brevo_list_contacts",
"brevo_delete_contact",
"brevo_list_email_campaigns",
],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ CALENDLY_CREDENTIALS = {
"calendly_list_scheduled_events",
"calendly_get_scheduled_event",
"calendly_list_invitees",
"calendly_cancel_event",
"calendly_list_webhooks",
"calendly_get_event_type",
],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ CLOUDINARY_CREDENTIALS = {
"cloudinary_get_resource",
"cloudinary_delete_resource",
"cloudinary_search",
"cloudinary_get_usage",
"cloudinary_rename_resource",
"cloudinary_add_tag",
],
required=True,
startup_required=False,
@@ -41,6 +44,9 @@ CLOUDINARY_CREDENTIALS = {
"cloudinary_get_resource",
"cloudinary_delete_resource",
"cloudinary_search",
"cloudinary_get_usage",
"cloudinary_rename_resource",
"cloudinary_add_tag",
],
required=True,
startup_required=False,
@@ -60,6 +66,9 @@ CLOUDINARY_CREDENTIALS = {
"cloudinary_get_resource",
"cloudinary_delete_resource",
"cloudinary_search",
"cloudinary_get_usage",
"cloudinary_rename_resource",
"cloudinary_add_tag",
],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ CONFLUENCE_CREDENTIALS = {
"confluence_get_page",
"confluence_create_page",
"confluence_search",
"confluence_update_page",
"confluence_delete_page",
"confluence_get_page_children",
],
required=True,
startup_required=False,
@@ -41,6 +44,9 @@ CONFLUENCE_CREDENTIALS = {
"confluence_get_page",
"confluence_create_page",
"confluence_search",
"confluence_update_page",
"confluence_delete_page",
"confluence_get_page_children",
],
required=True,
startup_required=False,
@@ -60,6 +66,9 @@ CONFLUENCE_CREDENTIALS = {
"confluence_get_page",
"confluence_create_page",
"confluence_search",
"confluence_update_page",
"confluence_delete_page",
"confluence_get_page_children",
],
required=True,
startup_required=False,
@@ -14,6 +14,9 @@ DISCORD_CREDENTIALS = {
"discord_list_channels",
"discord_send_message",
"discord_get_messages",
"discord_get_channel",
"discord_create_reaction",
"discord_delete_message",
],
required=True,
startup_required=False,
@@ -14,6 +14,9 @@ DOCKER_HUB_CREDENTIALS = {
"docker_hub_list_repos",
"docker_hub_list_tags",
"docker_hub_get_repo",
"docker_hub_get_tag_detail",
"docker_hub_delete_tag",
"docker_hub_list_webhooks",
],
required=True,
startup_required=False,
@@ -26,6 +26,9 @@ GITHUB_CREDENTIALS = {
"github_list_stargazers",
"github_get_user_profile",
"github_get_user_emails",
"github_list_commits",
"github_create_release",
"github_list_workflow_runs",
],
required=True,
startup_required=False,
@@ -17,6 +17,9 @@ GITLAB_CREDENTIALS = {
"gitlab_get_issue",
"gitlab_create_issue",
"gitlab_list_merge_requests",
"gitlab_update_issue",
"gitlab_get_merge_request",
"gitlab_create_merge_request_note",
],
required=True,
startup_required=False,
@@ -15,6 +15,9 @@ GOOGLE_ANALYTICS_CREDENTIALS = {
"ga_get_realtime",
"ga_get_top_pages",
"ga_get_traffic_sources",
"ga_get_user_demographics",
"ga_get_conversion_events",
"ga_get_landing_pages",
],
required=True,
startup_required=False,
@@ -15,6 +15,9 @@ GOOGLE_SEARCH_CONSOLE_CREDENTIALS = {
"gsc_list_sitemaps",
"gsc_inspect_url",
"gsc_submit_sitemap",
"gsc_top_queries",
"gsc_top_pages",
"gsc_delete_sitemap",
],
required=True,
startup_required=False,
@@ -1,34 +0,0 @@
"""
Google Sheets credentials.
Contains credentials for Google Sheets spreadsheet access.
Requires GOOGLE_SHEETS_API_KEY for read-only access to public sheets.
"""
from .base import CredentialSpec
GOOGLE_SHEETS_CREDENTIALS = {
"google_sheets_key": CredentialSpec(
env_var="GOOGLE_SHEETS_API_KEY",
tools=[
"sheets_get_spreadsheet",
"sheets_read_range",
"sheets_batch_read",
],
required=True,
startup_required=False,
help_url="https://console.cloud.google.com/apis/credentials",
description="Google API key for reading public Google Sheets",
direct_api_key_supported=True,
api_key_instructions="""To set up Google Sheets API access:
1. Go to https://console.cloud.google.com/apis/credentials
2. Click 'Create Credentials' > 'API Key'
3. Enable the Google Sheets API in APIs & Services > Library
4. Target spreadsheets must be shared as 'Anyone with the link'
5. Set environment variable:
export GOOGLE_SHEETS_API_KEY=your-api-key""",
health_check_endpoint="",
credential_id="google_sheets_key",
credential_key="api_key",
),
}
@@ -17,6 +17,9 @@ GREENHOUSE_CREDENTIALS = {
"greenhouse_get_candidate",
"greenhouse_list_applications",
"greenhouse_get_application",
"greenhouse_list_offers",
"greenhouse_add_candidate_note",
"greenhouse_list_scorecards",
],
required=True,
startup_required=False,
@@ -22,6 +22,9 @@ HUBSPOT_CREDENTIALS = {
"hubspot_get_deal",
"hubspot_create_deal",
"hubspot_update_deal",
"hubspot_delete_object",
"hubspot_list_associations",
"hubspot_create_association",
],
required=True,
startup_required=False,
@@ -18,6 +18,9 @@ INTERCOM_CREDENTIALS = {
"intercom_add_tag",
"intercom_assign_conversation",
"intercom_list_teams",
"intercom_close_conversation",
"intercom_create_contact",
"intercom_list_conversations",
],
required=True,
startup_required=False,
@@ -17,6 +17,9 @@ JIRA_CREDENTIALS = {
"jira_list_projects",
"jira_get_project",
"jira_add_comment",
"jira_update_issue",
"jira_list_transitions",
"jira_transition_issue",
],
required=True,
startup_required=False,
@@ -43,6 +46,9 @@ JIRA_CREDENTIALS = {
"jira_list_projects",
"jira_get_project",
"jira_add_comment",
"jira_update_issue",
"jira_list_transitions",
"jira_transition_issue",
],
required=True,
startup_required=False,
@@ -63,6 +69,9 @@ JIRA_CREDENTIALS = {
"jira_list_projects",
"jira_get_project",
"jira_add_comment",
"jira_update_issue",
"jira_list_transitions",
"jira_transition_issue",
],
required=True,
startup_required=False,
@@ -28,6 +28,9 @@ LINEAR_CREDENTIALS = {
"linear_users_list",
"linear_user_get",
"linear_viewer",
"linear_cycles_list",
"linear_issue_comments_list",
"linear_issue_relation_create",
],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ LUSHA_CREDENTIALS = {
"lusha_search_contacts",
"lusha_search_companies",
"lusha_get_usage",
"lusha_bulk_enrich_persons",
"lusha_get_technologies",
"lusha_search_decision_makers",
],
required=True,
startup_required=False,
@@ -9,7 +9,14 @@ from .base import CredentialSpec
NEWS_CREDENTIALS = {
"newsdata": CredentialSpec(
env_var="NEWSDATA_API_KEY",
tools=["news_search", "news_headlines", "news_by_company"],
tools=[
"news_search",
"news_headlines",
"news_by_company",
"news_latest",
"news_by_source",
"news_by_topic",
],
node_types=[],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ NOTION_CREDENTIALS = {
"notion_create_page",
"notion_query_database",
"notion_get_database",
"notion_update_page",
"notion_archive_page",
"notion_append_blocks",
],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ PAGERDUTY_CREDENTIALS = {
"pagerduty_create_incident",
"pagerduty_update_incident",
"pagerduty_list_services",
"pagerduty_list_oncalls",
"pagerduty_add_incident_note",
"pagerduty_list_escalation_policies",
],
required=True,
startup_required=False,
@@ -37,6 +40,7 @@ PAGERDUTY_CREDENTIALS = {
tools=[
"pagerduty_create_incident",
"pagerduty_update_incident",
"pagerduty_add_incident_note",
],
required=False,
startup_required=False,
@@ -20,6 +20,9 @@ PIPEDRIVE_CREDENTIALS = {
"pipedrive_list_pipelines",
"pipedrive_list_stages",
"pipedrive_add_note",
"pipedrive_update_deal",
"pipedrive_create_person",
"pipedrive_create_activity",
],
required=True,
startup_required=False,
@@ -13,6 +13,9 @@ POSTGRES_CREDENTIALS = {
"pg_list_tables",
"pg_describe_table",
"pg_explain",
"pg_get_table_stats",
"pg_list_indexes",
"pg_get_foreign_keys",
],
required=True,
startup_required=False,
@@ -14,6 +14,9 @@ PUSHOVER_CREDENTIALS = {
"pushover_validate_user",
"pushover_list_sounds",
"pushover_check_receipt",
"pushover_cancel_receipt",
"pushover_send_glance",
"pushover_get_limits",
],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ QUICKBOOKS_CREDENTIALS = {
"quickbooks_create_customer",
"quickbooks_create_invoice",
"quickbooks_get_company_info",
"quickbooks_list_invoices",
"quickbooks_get_customer",
"quickbooks_create_payment",
],
required=True,
startup_required=False,
@@ -41,6 +44,9 @@ QUICKBOOKS_CREDENTIALS = {
"quickbooks_create_customer",
"quickbooks_create_invoice",
"quickbooks_get_company_info",
"quickbooks_list_invoices",
"quickbooks_get_customer",
"quickbooks_create_payment",
],
required=True,
startup_required=False,
@@ -15,6 +15,9 @@ REDDIT_CREDENTIALS = {
"reddit_get_posts",
"reddit_get_comments",
"reddit_get_user",
"reddit_get_subreddit_info",
"reddit_get_post_detail",
"reddit_get_user_posts",
],
required=True,
startup_required=False,
@@ -41,6 +44,9 @@ REDDIT_CREDENTIALS = {
"reddit_get_posts",
"reddit_get_comments",
"reddit_get_user",
"reddit_get_subreddit_info",
"reddit_get_post_detail",
"reddit_get_user_posts",
],
required=True,
startup_required=False,
@@ -17,6 +17,9 @@ SALESFORCE_CREDENTIALS = {
"salesforce_update_record",
"salesforce_describe_object",
"salesforce_list_objects",
"salesforce_delete_record",
"salesforce_search_records",
"salesforce_get_record_count",
],
required=True,
startup_required=False,
@@ -43,6 +46,9 @@ SALESFORCE_CREDENTIALS = {
"salesforce_update_record",
"salesforce_describe_object",
"salesforce_list_objects",
"salesforce_delete_record",
"salesforce_search_records",
"salesforce_get_record_count",
],
required=True,
startup_required=False,
@@ -81,7 +81,15 @@ SEARCH_CREDENTIALS = {
),
"exa_search": CredentialSpec(
env_var="EXA_API_KEY",
tools=["exa_search", "exa_find_similar", "exa_get_contents", "exa_answer"],
tools=[
"exa_search",
"exa_find_similar",
"exa_get_contents",
"exa_answer",
"exa_search_news",
"exa_search_papers",
"exa_search_companies",
],
node_types=[],
required=True,
startup_required=False,
@@ -15,6 +15,9 @@ SERPAPI_CREDENTIALS = {
"scholar_get_author",
"patents_search",
"patents_get_details",
"scholar_cited_by",
"scholar_search_profiles",
"serpapi_google_search",
],
required=True,
startup_required=False,
@@ -17,6 +17,9 @@ SHOPIFY_CREDENTIALS = {
"shopify_get_product",
"shopify_list_customers",
"shopify_search_customers",
"shopify_update_product",
"shopify_get_customer",
"shopify_create_draft_order",
],
required=True,
startup_required=False,
@@ -43,6 +46,9 @@ SHOPIFY_CREDENTIALS = {
"shopify_get_product",
"shopify_list_customers",
"shopify_search_customers",
"shopify_update_product",
"shopify_get_customer",
"shopify_create_draft_order",
],
required=True,
startup_required=False,
@@ -58,6 +58,9 @@ SLACK_CREDENTIALS = {
"slack_kick_user_from_channel",
"slack_delete_file",
"slack_get_team_stats",
"slack_get_channel_info",
"slack_list_files",
"slack_get_file_info",
],
required=True,
startup_required=False,
@@ -60,6 +60,9 @@ STRIPE_CREDENTIALS = {
"stripe_list_payment_methods",
"stripe_get_payment_method",
"stripe_detach_payment_method",
"stripe_list_disputes",
"stripe_list_events",
"stripe_create_checkout_session",
],
required=True,
startup_required=False,
@@ -20,6 +20,9 @@ TELEGRAM_CREDENTIALS = {
"telegram_get_chat",
"telegram_pin_message",
"telegram_unpin_message",
"telegram_get_chat_member_count",
"telegram_send_video",
"telegram_set_chat_description",
],
required=True,
startup_required=False,
@@ -20,6 +20,9 @@ TRELLO_CREDENTIALS = {
"trello_update_card",
"trello_add_comment",
"trello_add_attachment",
"trello_get_card",
"trello_create_list",
"trello_search_cards",
],
required=True,
startup_required=False,
@@ -50,6 +53,9 @@ TRELLO_CREDENTIALS = {
"trello_update_card",
"trello_add_comment",
"trello_add_attachment",
"trello_get_card",
"trello_create_list",
"trello_search_cards",
],
required=True,
startup_required=False,
@@ -15,6 +15,9 @@ TWILIO_CREDENTIALS = {
"twilio_send_whatsapp",
"twilio_list_messages",
"twilio_get_message",
"twilio_list_phone_numbers",
"twilio_list_calls",
"twilio_delete_message",
],
required=True,
startup_required=False,
@@ -38,6 +41,9 @@ TWILIO_CREDENTIALS = {
"twilio_send_whatsapp",
"twilio_list_messages",
"twilio_get_message",
"twilio_list_phone_numbers",
"twilio_list_calls",
"twilio_delete_message",
],
required=True,
startup_required=False,
@@ -15,6 +15,9 @@ TWITTER_CREDENTIALS = {
"twitter_get_user",
"twitter_get_user_tweets",
"twitter_get_tweet",
"twitter_get_user_followers",
"twitter_get_tweet_replies",
"twitter_get_list_tweets",
],
required=True,
startup_required=False,
@@ -16,6 +16,9 @@ ZENDESK_CREDENTIALS = {
"zendesk_create_ticket",
"zendesk_update_ticket",
"zendesk_search_tickets",
"zendesk_get_ticket_comments",
"zendesk_add_ticket_comment",
"zendesk_list_users",
],
required=True,
startup_required=False,
@@ -41,6 +44,9 @@ ZENDESK_CREDENTIALS = {
"zendesk_create_ticket",
"zendesk_update_ticket",
"zendesk_search_tickets",
"zendesk_get_ticket_comments",
"zendesk_add_ticket_comment",
"zendesk_list_users",
],
required=True,
startup_required=False,
@@ -60,6 +66,9 @@ ZENDESK_CREDENTIALS = {
"zendesk_create_ticket",
"zendesk_update_ticket",
"zendesk_search_tickets",
"zendesk_get_ticket_comments",
"zendesk_add_ticket_comment",
"zendesk_list_users",
],
required=True,
startup_required=False,
@@ -17,6 +17,9 @@ ZOOM_CREDENTIALS = {
"zoom_create_meeting",
"zoom_delete_meeting",
"zoom_list_recordings",
"zoom_update_meeting",
"zoom_list_meeting_participants",
"zoom_list_meeting_registrants",
],
required=True,
startup_required=False,
@@ -50,6 +50,16 @@ def _patch(url: str, headers: dict, body: dict) -> dict:
return resp.json()
def _delete(url: str, headers: dict, params: dict | None = None) -> dict:
"""Send a DELETE request."""
resp = httpx.delete(url, headers=headers, params=params, timeout=30)
if resp.status_code >= 400:
return {"error": f"HTTP {resp.status_code}: {resp.text[:500]}"}
if not resp.content:
return {"status": "ok"}
return resp.json()
def register_tools(mcp: FastMCP, credentials: Any = None) -> None:
"""Register Airtable tools."""
@@ -323,3 +333,134 @@ def register_tools(mcp: FastMCP, credentials: Any = None) -> None:
for t in tables
],
}
@mcp.tool()
def airtable_delete_records(
base_id: str,
table_name: str,
record_ids: str,
) -> dict:
"""Delete records from an Airtable table (up to 10 per request).
Args:
base_id: The Airtable base ID (starts with 'app').
table_name: Table name or ID.
record_ids: Comma-separated record IDs to delete (e.g. 'recABC,recDEF').
"""
hdrs = _get_headers()
if hdrs is None:
return {
"error": "AIRTABLE_PAT is required",
"help": "Set AIRTABLE_PAT env var with your Airtable personal access token",
}
if not base_id or not table_name or not record_ids:
return {"error": "base_id, table_name, and record_ids are required"}
ids = [rid.strip() for rid in record_ids.split(",") if rid.strip()]
if len(ids) > 10:
return {"error": "maximum 10 records per request"}
url = f"{BASE_URL}/{base_id}/{table_name}"
# Airtable DELETE uses repeated records[] query params
params = [("records[]", rid) for rid in ids]
resp = httpx.delete(url, headers=hdrs, params=params, timeout=30)
if resp.status_code >= 400:
return {"error": f"HTTP {resp.status_code}: {resp.text[:500]}"}
data = resp.json()
deleted = data.get("records", [])
return {
"result": "deleted",
"count": len(deleted),
"deleted_ids": [r.get("id", "") for r in deleted if r.get("deleted")],
}
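# Usage sketch (hypothetical base and record IDs):
#   airtable_delete_records("appXXXXXXXXXXXXXX", "Tasks", "recAAA111,recBBB222")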
@mcp.tool()
def airtable_search_records(
base_id: str,
table_name: str,
field_name: str,
search_value: str,
max_records: int = 100,
) -> dict:
"""Search records by matching a field value using an Airtable formula.
Args:
base_id: The Airtable base ID (starts with 'app').
table_name: Table name or ID.
field_name: The field name to search in.
search_value: The value to search for (case-insensitive partial match via FIND).
max_records: Maximum number of records to return (default 100).
"""
hdrs = _get_headers()
if hdrs is None:
return {
"error": "AIRTABLE_PAT is required",
"help": "Set AIRTABLE_PAT env var with your Airtable personal access token",
}
if not base_id or not table_name or not field_name or not search_value:
return {"error": "base_id, table_name, field_name, and search_value are required"}
# Use FIND for case-insensitive partial match
escaped = search_value.replace('"', '\\"')
formula = f'FIND(LOWER("{escaped}"), LOWER({{{field_name}}}))'
params: dict[str, Any] = {
"filterByFormula": formula,
"maxRecords": str(max_records),
}
url = f"{BASE_URL}/{base_id}/{table_name}"
data = _get(url, hdrs, params)
if "error" in data:
return data
records = data.get("records", [])
return {
"count": len(records),
"records": [
{
"id": r["id"],
"fields": r.get("fields", {}),
"created_time": r.get("createdTime"),
}
for r in records
],
}
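# For example, airtable_search_records(base_id, "Tasks", "Status", "done")
# sends filterByFormula=FIND(LOWER("done"), LOWER({Status})), matching any
# record whose Status field contains "done", ignoring case.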
@mcp.tool()
def airtable_list_collaborators(
base_id: str,
) -> dict:
"""List collaborators who have access to an Airtable base.
Args:
base_id: The Airtable base ID (starts with 'app').
"""
hdrs = _get_headers()
if hdrs is None:
return {
"error": "AIRTABLE_PAT is required",
"help": "Set AIRTABLE_PAT env var with your Airtable personal access token",
}
if not base_id:
return {"error": "base_id is required"}
# Uses the meta API endpoint for base sharing
url = f"https://api.airtable.com/v0/meta/bases/{base_id}/collaborators"
data = _get(url, hdrs)
if "error" in data:
return data
collabs = data.get("collaborators", [])
return {
"count": len(collabs),
"collaborators": [
{
"user_id": c.get("userId", ""),
"email": c.get("email", ""),
"permission_level": c.get("permissionLevel", ""),
}
for c in collabs
],
}
@@ -269,6 +269,102 @@ class _ApolloClient:
}
return result
def get_person_activities(
self,
person_id: str,
) -> dict[str, Any]:
"""Get activity history for a person (emails, calls, tasks)."""
response = httpx.get(
f"{APOLLO_API_BASE}/activities",
headers=self._headers,
params={"contact_id": person_id},
timeout=30.0,
)
result = self._handle_response(response)
if "error" not in result:
activities = result.get("activities", [])
return {
"contact_id": person_id,
"count": len(activities),
"activities": [
{
"id": a.get("id"),
"type": a.get("type"),
"subject": a.get("subject"),
"body": (a.get("body") or "")[:500],
"created_at": a.get("created_at"),
"completed_at": a.get("completed_at"),
"status": a.get("status"),
"priority": a.get("priority"),
}
for a in activities[:50]
],
}
return result
def list_email_accounts(self) -> dict[str, Any]:
"""List email accounts connected to Apollo."""
response = httpx.get(
f"{APOLLO_API_BASE}/email_accounts",
headers=self._headers,
timeout=30.0,
)
result = self._handle_response(response)
if "error" not in result:
accounts = result.get("email_accounts", [])
return {
"count": len(accounts),
"email_accounts": [
{
"id": a.get("id"),
"email": a.get("email"),
"type": a.get("type"),
"active": a.get("active"),
"default": a.get("default"),
"last_synced_at": a.get("last_synced_at"),
"sending_daily_limit": a.get("sending_daily_limit"),
"emails_sent_today": a.get("emails_sent_today"),
}
for a in accounts
],
}
return result
def bulk_enrich_people(
self,
details: list[dict[str, Any]],
) -> dict[str, Any]:
"""Bulk enrich up to 10 people at once."""
body: dict[str, Any] = {"details": details[:10]}
response = httpx.post(
f"{APOLLO_API_BASE}/people/bulk_match",
headers=self._headers,
json=body,
timeout=60.0,
)
result = self._handle_response(response)
if "error" not in result:
matches = result.get("matches", [])
enriched = []
for m in matches:
if m is None:
enriched.append({"match_found": False})
continue
enriched.append(
{
"match_found": True,
"id": m.get("id"),
"name": m.get("name"),
"title": m.get("title"),
"email": m.get("email"),
"email_status": m.get("email_status"),
"linkedin_url": m.get("linkedin_url"),
"organization_name": (m.get("organization") or {}).get("name"),
}
)
return {"count": len(enriched), "results": enriched}
return result
def search_companies(
self,
industries: list[str] | None = None,
@@ -526,6 +622,89 @@ def register_tools(
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
# --- Person Activities ---
@mcp.tool()
def apollo_get_person_activities(person_id: str) -> dict:
"""
Get activity history for a person in Apollo (emails, calls, tasks).
Args:
person_id: Apollo person/contact ID (required)
Returns:
Dict with activities list (type, subject, body, status, timestamps)
"""
client = _get_client()
if isinstance(client, dict):
return client
if not person_id:
return {"error": "person_id is required"}
try:
return client.get_person_activities(person_id)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
# --- Email Accounts ---
@mcp.tool()
def apollo_list_email_accounts() -> dict:
"""
List email accounts connected to Apollo for sending sequences.
Returns:
Dict with email accounts (email, type, active, daily limit, sent today)
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.list_email_accounts()
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
# --- Bulk Enrichment ---
@mcp.tool()
def apollo_bulk_enrich_people(details_json: str) -> dict:
"""
Bulk enrich up to 10 people at once by email or domain+name.
Args:
details_json: JSON array of objects, each with lookup keys.
e.g. '[{"email": "john@acme.com"},
{"first_name": "Jane", "last_name": "Doe", "domain": "acme.com"}]'
Returns:
Dict with enrichment results for each person
"""
client = _get_client()
if isinstance(client, dict):
return client
if not details_json:
return {"error": "details_json is required"}
import json
try:
details = json.loads(details_json)
except json.JSONDecodeError:
return {"error": "details_json must be valid JSON"}
if not isinstance(details, list) or len(details) == 0:
return {"error": "details_json must be a non-empty JSON array"}
if len(details) > 10:
return {"error": "maximum 10 people per bulk request"}
try:
return client.bulk_enrich_people(details)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
# --- Company Search ---
@mcp.tool()
@@ -71,6 +71,25 @@ def _post(endpoint: str, token: str, body: dict | None = None) -> dict[str, Any]
return {"error": f"Asana request failed: {e!s}"}
def _put(endpoint: str, token: str, body: dict | None = None) -> dict[str, Any]:
try:
resp = httpx.put(
f"{ASANA_API}/{endpoint}",
headers=_headers(token),
json={"data": body or {}},
timeout=30.0,
)
if resp.status_code == 401:
return {"error": "Unauthorized. Check your ASANA_ACCESS_TOKEN."}
if resp.status_code not in (200, 201):
return {"error": f"Asana API error {resp.status_code}: {resp.text[:500]}"}
return resp.json()
except httpx.TimeoutException:
return {"error": "Request to Asana timed out"}
except Exception as e:
return {"error": f"Asana request failed: {e!s}"}
def _auth_error() -> dict[str, Any]:
return {
"error": "ASANA_ACCESS_TOKEN not set",
@@ -331,3 +350,134 @@ def register_tools(
}
)
return {"query": query, "tasks": tasks}
@mcp.tool()
def asana_update_task(
task_gid: str,
name: str = "",
notes: str = "",
completed: bool | None = None,
due_on: str = "",
assignee: str = "",
) -> dict[str, Any]:
"""
Update an existing Asana task.
Args:
task_gid: Task GID (required)
name: New task name (optional)
notes: New task description/notes (optional)
completed: Set completion status (optional)
due_on: New due date YYYY-MM-DD (optional; an empty string leaves it unchanged)
assignee: New assignee GID or "me" (optional)
Returns:
Dict with updated task (gid, name, completed) or error
"""
token = _get_token(credentials)
if not token:
return _auth_error()
if not task_gid:
return {"error": "task_gid is required"}
body: dict[str, Any] = {}
if name:
body["name"] = name
if notes:
body["notes"] = notes
if completed is not None:
body["completed"] = completed
if due_on:
body["due_on"] = due_on
if assignee:
body["assignee"] = assignee
if not body:
return {"error": "At least one field to update is required"}
data = _put(f"tasks/{task_gid}", token, body)
if "error" in data:
return data
t = data.get("data", {})
return {
"gid": t.get("gid", ""),
"name": t.get("name", ""),
"completed": t.get("completed", False),
"status": "updated",
}
@mcp.tool()
def asana_add_comment(
task_gid: str,
text: str,
) -> dict[str, Any]:
"""
Add a comment (story) to an Asana task.
Args:
task_gid: Task GID (required)
text: Comment text (required). Supports rich text formatting.
Returns:
Dict with created comment (gid, text, created_at) or error
"""
token = _get_token(credentials)
if not token:
return _auth_error()
if not task_gid or not text:
return {"error": "task_gid and text are required"}
data = _post(f"tasks/{task_gid}/stories", token, {"text": text})
if "error" in data:
return data
s = data.get("data", {})
return {
"gid": s.get("gid", ""),
"text": (s.get("text", "") or "")[:500],
"created_at": s.get("created_at", ""),
"status": "created",
}
@mcp.tool()
def asana_create_subtask(
parent_task_gid: str,
name: str,
notes: str = "",
assignee: str = "",
due_on: str = "",
) -> dict[str, Any]:
"""
Create a subtask under an existing Asana task.
Args:
parent_task_gid: Parent task GID (required)
name: Subtask name (required)
notes: Subtask description/notes (optional)
assignee: Assignee GID or "me" (optional)
due_on: Due date YYYY-MM-DD (optional)
Returns:
Dict with created subtask (gid, name) or error
"""
token = _get_token(credentials)
if not token:
return _auth_error()
if not parent_task_gid or not name:
return {"error": "parent_task_gid and name are required"}
body: dict[str, Any] = {"name": name}
if notes:
body["notes"] = notes
if assignee:
body["assignee"] = assignee
if due_on:
body["due_on"] = due_on
data = _post(f"tasks/{parent_task_gid}/subtasks", token, body)
if "error" in data:
return data
t = data.get("data", {})
return {"gid": t.get("gid", ""), "name": t.get("name", ""), "status": "created"}
@@ -338,3 +338,147 @@ def register_tools(mcp: FastMCP, credentials: Any = None) -> None:
return {"error": f"HTTP {resp.status_code}: {resp.text[:500]}"}
return {"result": "deleted", "key": key}
@mcp.tool()
def s3_copy_object(
source_bucket: str,
source_key: str,
dest_bucket: str,
dest_key: str,
) -> dict:
"""Copy an object within or between S3 buckets.
Args:
source_bucket: Source S3 bucket name.
source_key: Source object key (path).
dest_bucket: Destination S3 bucket name.
dest_key: Destination object key (path).
"""
cfg = _get_config()
if isinstance(cfg, dict):
return cfg
access_key, secret_key, region = cfg
if not source_bucket or not source_key or not dest_bucket or not dest_key:
return {"error": "source_bucket, source_key, dest_bucket, and dest_key are required"}
extra = {"x-amz-copy-source": f"/{source_bucket}/{source_key}"}
resp = _s3_request(
"PUT", dest_bucket, dest_key, access_key, secret_key, region, extra_headers=extra
)
if resp.status_code >= 400:
return {"error": f"HTTP {resp.status_code}: {resp.text[:500]}"}
return {
"result": "copied",
"source": f"{source_bucket}/{source_key}",
"destination": f"{dest_bucket}/{dest_key}",
}
@mcp.tool()
def s3_get_object_metadata(
bucket: str,
key: str,
) -> dict:
"""Get object metadata without downloading content (HEAD request).
Args:
bucket: S3 bucket name.
key: Object key (path).
"""
cfg = _get_config()
if isinstance(cfg, dict):
return cfg
access_key, secret_key, region = cfg
if not bucket or not key:
return {"error": "bucket and key are required"}
resp = _s3_request("HEAD", bucket, key, access_key, secret_key, region)
if resp.status_code == 404:
return {"error": "Object not found"}
if resp.status_code >= 400:
return {"error": f"HTTP {resp.status_code}"}
metadata = {
"key": key,
"content_type": resp.headers.get("content-type", ""),
"content_length": resp.headers.get("content-length"),
"last_modified": resp.headers.get("last-modified"),
"etag": resp.headers.get("etag"),
"storage_class": resp.headers.get("x-amz-storage-class", "STANDARD"),
}
# Include any x-amz-meta-* custom metadata
for header, value in resp.headers.items():
if header.lower().startswith("x-amz-meta-"):
meta_key = header[len("x-amz-meta-") :]
metadata[f"meta_{meta_key}"] = value
return metadata
@mcp.tool()
def s3_generate_presigned_url(
bucket: str,
key: str,
expires_in: int = 3600,
) -> dict:
"""Generate a pre-signed URL for temporary access to an S3 object.
The URL allows anyone with it to download the object without
AWS credentials, until it expires.
Args:
bucket: S3 bucket name.
key: Object key (path).
expires_in: URL validity in seconds (default 3600 = 1 hour, max 604800 = 7 days).
"""
cfg = _get_config()
if isinstance(cfg, dict):
return cfg
access_key, secret_key, region = cfg
if not bucket or not key:
return {"error": "bucket and key are required"}
expires_in = max(1, min(expires_in, 604800))
now = datetime.datetime.now(datetime.UTC)
datestamp = now.strftime("%Y%m%d")
amz_date = now.strftime("%Y%m%dT%H%M%SZ")
credential_scope = f"{datestamp}/{region}/s3/aws4_request"
credential = f"{access_key}/{credential_scope}"
host = f"{bucket}.s3.{region}.amazonaws.com"
path = f"/{key}"
query_params = {
"X-Amz-Algorithm": "AWS4-HMAC-SHA256",
"X-Amz-Credential": credential,
"X-Amz-Date": amz_date,
"X-Amz-Expires": str(expires_in),
"X-Amz-SignedHeaders": "host",
}
sorted_params = sorted(query_params.items())
canonical_qs = "&".join(
f"{urllib.parse.quote(k, safe='')}={urllib.parse.quote(str(v), safe='')}"
for k, v in sorted_params
)
canonical_request = f"GET\n{path}\n{canonical_qs}\nhost:{host}\n\nhost\nUNSIGNED-PAYLOAD"
string_to_sign = (
f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n"
f"{hashlib.sha256(canonical_request.encode()).hexdigest()}"
)
signing_key = _get_signing_key(secret_key, datestamp, region)
signature = hmac.new(
signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
).hexdigest()
presigned_url = f"https://{host}{path}?{canonical_qs}&X-Amz-Signature={signature}"
return {
"url": presigned_url,
"expires_in": expires_in,
"key": key,
"bucket": bucket,
}
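# For reference, _get_signing_key (defined elsewhere in this module) is
# assumed to follow the standard SigV4 key-derivation chain; a minimal
# sketch:
#
#     def _get_signing_key(secret_key: str, datestamp: str, region: str) -> bytes:
#         k_date = hmac.new(("AWS4" + secret_key).encode(), datestamp.encode(), hashlib.sha256).digest()
#         k_region = hmac.new(k_date, region.encode(), hashlib.sha256).digest()
#         k_service = hmac.new(k_region, b"s3", hashlib.sha256).digest()
#         return hmac.new(k_service, b"aws4_request", hashlib.sha256).digest()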
@@ -178,6 +178,51 @@ class _BrevoClient:
)
return self._handle_response(response)
def list_contacts(
self,
limit: int = 50,
offset: int = 0,
modified_since: str | None = None,
) -> dict[str, Any]:
"""List contacts with pagination."""
params: dict[str, Any] = {"limit": limit, "offset": offset}
if modified_since:
params["modifiedSince"] = modified_since
response = httpx.get(
f"{BREVO_API_BASE}/contacts",
headers=self._headers,
params=params,
timeout=30.0,
)
return self._handle_response(response)
def delete_contact(self, email: str) -> dict[str, Any]:
"""Delete a contact by email."""
response = httpx.delete(
f"{BREVO_API_BASE}/contacts/{email}",
headers=self._headers,
timeout=30.0,
)
return self._handle_response(response)
def list_email_campaigns(
self,
status: str | None = None,
limit: int = 50,
offset: int = 0,
) -> dict[str, Any]:
"""List email campaigns."""
params: dict[str, Any] = {"limit": limit, "offset": offset}
if status:
params["status"] = status
response = httpx.get(
f"{BREVO_API_BASE}/emailCampaigns",
headers=self._headers,
params=params,
timeout=30.0,
)
return self._handle_response(response)
def register_tools(
mcp: FastMCP,
@@ -421,6 +466,134 @@ def register_tools(
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def brevo_list_contacts(
limit: int = 50,
offset: int = 0,
modified_since: str = "",
) -> dict:
"""
List contacts in Brevo with pagination.
Args:
limit: Number of contacts per page (default 50, max 1000)
offset: Pagination offset (default 0)
modified_since: Filter by modification date (ISO 8601, optional)
Returns:
Dict with contacts list and total count
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
result = client.list_contacts(
limit=max(1, min(limit, 1000)),
offset=offset,
modified_since=modified_since or None,
)
if "error" in result:
return result
contacts = result.get("contacts", [])
return {
"count": len(contacts),
"total": result.get("count", len(contacts)),
"contacts": [
{
"id": c.get("id"),
"email": c.get("email"),
"first_name": (c.get("attributes") or {}).get("FIRSTNAME"),
"last_name": (c.get("attributes") or {}).get("LASTNAME"),
"list_ids": c.get("listIds", []),
"email_blacklisted": c.get("emailBlacklisted", False),
"modified_at": c.get("modifiedAt"),
}
for c in contacts
],
}
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def brevo_delete_contact(email: str) -> dict:
"""
Delete a contact from Brevo by email address.
Args:
email: Email address of the contact to delete
Returns:
Dict with success status or error
"""
client = _get_client()
if isinstance(client, dict):
return client
if not email or "@" not in email:
return {"error": "Invalid email address"}
try:
result = client.delete_contact(email)
if "error" in result:
return result
return {"success": True, "email": email, "status": "deleted"}
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def brevo_list_email_campaigns(
status: str = "",
limit: int = 50,
offset: int = 0,
) -> dict:
"""
List email campaigns from Brevo.
Args:
status: Filter by status: 'draft', 'sent', 'queued', 'suspended',
'inProcess', 'archive' (optional)
limit: Number per page (default 50, max 1000)
offset: Pagination offset (default 0)
Returns:
Dict with campaigns list (name, subject, status, stats)
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
result = client.list_email_campaigns(
status=status or None,
limit=max(1, min(limit, 1000)),
offset=offset,
)
if "error" in result:
return result
campaigns = result.get("campaigns", [])
return {
"count": len(campaigns),
"total": result.get("count", len(campaigns)),
"campaigns": [
{
"id": c.get("id"),
"name": c.get("name"),
"subject": c.get("subject"),
"status": c.get("status"),
"type": c.get("type"),
"created_at": c.get("createdAt"),
"scheduled_at": c.get("scheduledAt"),
"statistics": c.get("statistics", {}).get("globalStats", {}),
}
for c in campaigns
],
}
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {e}"}
@mcp.tool()
def brevo_get_email_stats(message_id: str) -> dict:
"""

Some files were not shown because too many files have changed in this diff Show More