feat: load table by colony

This commit is contained in:
Richard Tang
2026-04-18 20:55:20 -07:00
parent 7b52ed9fa7
commit 6f1f27b6e9
7 changed files with 160 additions and 81 deletions
+44 -43
View File
@@ -6,19 +6,24 @@ profile panel. Distinct from ``routes_workers.py``, which deals with
*graph nodes* inside a worker definition rather than live worker
instances.
Session-scoped (bound to a live session's runtime):
- GET /api/sessions/{session_id}/workers live + completed workers
- GET /api/sessions/{session_id}/colony/skills colony's shared skills catalog
- GET /api/sessions/{session_id}/colony/tools colony's default tools
- GET /api/sessions/{session_id}/colony/progress/snapshot progress.db tasks/steps snapshot
- GET /api/sessions/{session_id}/colony/progress/stream SSE feed of upserts (polled)
- GET /api/sessions/{session_id}/colony/data/tables list user tables in progress.db
- GET /api/sessions/{session_id}/colony/data/tables/{table}/rows paginated rows
- PATCH /api/sessions/{session_id}/colony/data/tables/{table}/rows edit a row
Colony-scoped (bound to the on-disk colony directory, independent of any
live session — one colony has exactly one progress.db):
- GET /api/colonies/{colony_name}/progress/snapshot progress.db tasks/steps snapshot
- GET /api/colonies/{colony_name}/progress/stream SSE feed of upserts (polled)
- GET /api/colonies/{colony_name}/data/tables list user tables in progress.db
- GET /api/colonies/{colony_name}/data/tables/{table}/rows paginated rows
- PATCH /api/colonies/{colony_name}/data/tables/{table}/rows edit a row
"""
import asyncio
import json
import logging
import re
import sqlite3
from pathlib import Path
@@ -26,6 +31,11 @@ from aiohttp import web
from framework.server.app import resolve_session
# Same validation used by create_colony — keep them in sync. Blocks path
# traversal (``..``) and shell-special chars; the endpoint would 400 on
# anything else anyway, but validating early avoids a disk hit.
_COLONY_NAME_RE = re.compile(r"^[a-z0-9_]+$")
logger = logging.getLogger(__name__)
# Poll interval for the progress SSE stream. Progress rows flip on the
@@ -274,14 +284,15 @@ async def handle_list_colony_tools(request: web.Request) -> web.Response:
# ── Progress DB (tasks/steps) ──────────────────────────────────────
def _resolve_progress_db(session) -> Path | None:
"""Resolve the colony's progress.db path for ``session``.
def _resolve_progress_db_by_name(colony_name: str) -> Path | None:
"""Resolve a colony's progress.db path by directory name.
Returns ``None`` if the session is not bound to a colony yet or if
the DB file doesn't exist.
Returns ``None`` when the name fails validation or the file does not
exist. Both conditions render as an empty Data tab in the UI rather
than a hard error so an operator can open the panel before any
workers have actually run.
"""
colony_name = getattr(session, "colony_name", None)
if not colony_name:
if not _COLONY_NAME_RE.match(colony_name):
return None
db_path = Path.home() / ".hive" / "colonies" / colony_name / "data" / "progress.db"
return db_path if db_path.exists() else None
@@ -321,15 +332,12 @@ def _read_progress_snapshot(db_path: Path, worker_id: str | None) -> dict:
async def handle_progress_snapshot(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/colony/progress/snapshot
"""GET /api/colonies/{colony_name}/progress/snapshot
Optional ?worker_id=... to filter to rows touched by a specific worker.
"""
session, err = resolve_session(request)
if err:
return err
db_path = _resolve_progress_db(session)
colony_name = request.match_info["colony_name"]
db_path = _resolve_progress_db_by_name(colony_name)
if db_path is None:
return web.json_response({"tasks": [], "steps": []})
@@ -394,7 +402,7 @@ def _read_progress_upserts(
async def handle_progress_stream(request: web.Request) -> web.StreamResponse:
"""GET /api/sessions/{session_id}/colony/progress/stream
"""GET /api/colonies/{colony_name}/progress/stream
SSE feed that emits ``snapshot`` once (current state) followed by
``upsert`` events whenever a task/step row changes. Polls the DB
@@ -402,10 +410,7 @@ async def handle_progress_stream(request: web.Request) -> web.StreamResponse:
workers use for writes doesn't fire SQLite's update hook on our
connection, so polling is the robust option.
"""
session, err = resolve_session(request)
if err:
return err
colony_name = request.match_info["colony_name"]
worker_id = request.query.get("worker_id") or None
resp = web.StreamResponse(
@@ -423,7 +428,7 @@ async def handle_progress_stream(request: web.Request) -> web.StreamResponse:
payload = f"event: {event}\ndata: {json.dumps(data)}\n\n"
await resp.write(payload.encode("utf-8"))
db_path = _resolve_progress_db(session)
db_path = _resolve_progress_db_by_name(colony_name)
if db_path is None:
await _send("snapshot", {"tasks": [], "steps": []})
await _send("end", {"reason": "no_progress_db"})
@@ -625,11 +630,9 @@ def _update_table_row(
async def handle_list_tables(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/colony/data/tables"""
session, err = resolve_session(request)
if err:
return err
db_path = _resolve_progress_db(session)
"""GET /api/colonies/{colony_name}/data/tables"""
colony_name = request.match_info["colony_name"]
db_path = _resolve_progress_db_by_name(colony_name)
if db_path is None:
return web.json_response({"tables": []})
tables = await asyncio.to_thread(_read_tables_overview, db_path)
@@ -637,11 +640,9 @@ async def handle_list_tables(request: web.Request) -> web.Response:
async def handle_table_rows(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/colony/data/tables/{table}/rows"""
session, err = resolve_session(request)
if err:
return err
db_path = _resolve_progress_db(session)
"""GET /api/colonies/{colony_name}/data/tables/{table}/rows"""
colony_name = request.match_info["colony_name"]
db_path = _resolve_progress_db_by_name(colony_name)
if db_path is None:
return web.json_response({"error": "no progress.db"}, status=404)
@@ -665,14 +666,12 @@ async def handle_table_rows(request: web.Request) -> web.Response:
async def handle_update_row(request: web.Request) -> web.Response:
"""PATCH /api/sessions/{session_id}/colony/data/tables/{table}/rows
"""PATCH /api/colonies/{colony_name}/data/tables/{table}/rows
Body: ``{"pk": {col: value, ...}, "updates": {col: value, ...}}``.
"""
session, err = resolve_session(request)
if err:
return err
db_path = _resolve_progress_db(session)
colony_name = request.match_info["colony_name"]
db_path = _resolve_progress_db_by_name(colony_name)
if db_path is None:
return web.json_response({"error": "no progress.db"}, status=404)
@@ -694,6 +693,7 @@ async def handle_update_row(request: web.Request) -> web.Response:
def register_routes(app: web.Application) -> None:
"""Register colony worker routes."""
# Session-scoped — these read live runtime state from a session.
app.router.add_get("/api/sessions/{session_id}/workers", handle_list_workers)
app.router.add_get(
"/api/sessions/{session_id}/colony/skills", handle_list_colony_skills
@@ -701,22 +701,23 @@ def register_routes(app: web.Application) -> None:
app.router.add_get(
"/api/sessions/{session_id}/colony/tools", handle_list_colony_tools
)
# Colony-scoped — one progress.db per colony, no session indirection.
app.router.add_get(
"/api/sessions/{session_id}/colony/progress/snapshot",
"/api/colonies/{colony_name}/progress/snapshot",
handle_progress_snapshot,
)
app.router.add_get(
"/api/sessions/{session_id}/colony/progress/stream",
"/api/colonies/{colony_name}/progress/stream",
handle_progress_stream,
)
app.router.add_get(
"/api/sessions/{session_id}/colony/data/tables", handle_list_tables
"/api/colonies/{colony_name}/data/tables", handle_list_tables
)
app.router.add_get(
"/api/sessions/{session_id}/colony/data/tables/{table}/rows",
"/api/colonies/{colony_name}/data/tables/{table}/rows",
handle_table_rows,
)
app.router.add_patch(
"/api/sessions/{session_id}/colony/data/tables/{table}/rows",
"/api/colonies/{colony_name}/data/tables/{table}/rows",
handle_update_row,
)
+11 -7
View File
@@ -39,15 +39,19 @@ export interface UpdateRowRequest {
}
export const colonyDataApi = {
/** List user tables in the colony's progress.db with row counts. */
listTables: (sessionId: string) =>
/** List user tables in the colony's progress.db with row counts.
*
* Routed by colony directory name (not session) because progress.db
* is per-colony — one DB serves every session for that colony, and
* the data is reachable even when no session is live. */
listTables: (colonyName: string) =>
api.get<{ tables: TableOverview[] }>(
`/sessions/${sessionId}/colony/data/tables`,
`/colonies/${encodeURIComponent(colonyName)}/data/tables`,
),
/** Paginated rows for a table. Server enforces limit ≤ 500. */
listRows: (
sessionId: string,
colonyName: string,
table: string,
opts: {
limit?: number;
@@ -63,14 +67,14 @@ export const colonyDataApi = {
if (opts.orderDir) params.set("order_dir", opts.orderDir);
const qs = params.toString();
return api.get<TableRowsResponse>(
`/sessions/${sessionId}/colony/data/tables/${encodeURIComponent(table)}/rows${qs ? `?${qs}` : ""}`,
`/colonies/${encodeURIComponent(colonyName)}/data/tables/${encodeURIComponent(table)}/rows${qs ? `?${qs}` : ""}`,
);
},
/** Update a single row by primary key. Returns {updated: 0|1}. */
updateRow: (sessionId: string, table: string, body: UpdateRowRequest) =>
updateRow: (colonyName: string, table: string, body: UpdateRowRequest) =>
api.patch<{ updated: number }>(
`/sessions/${sessionId}/colony/data/tables/${encodeURIComponent(table)}/rows`,
`/colonies/${encodeURIComponent(colonyName)}/data/tables/${encodeURIComponent(table)}/rows`,
body,
),
};
+7 -5
View File
@@ -87,17 +87,19 @@ export const colonyWorkersApi = {
listTools: (sessionId: string) =>
api.get<{ tools: ColonyTool[] }>(`/sessions/${sessionId}/colony/tools`),
/** Snapshot of progress.db tasks + steps, optionally filtered by worker_id. */
progressSnapshot: (sessionId: string, workerId?: string) => {
/** Snapshot of progress.db tasks + steps, optionally filtered by
* worker_id. Routed by colony directory name (not session) because
* progress.db is per-colony. */
progressSnapshot: (colonyName: string, workerId?: string) => {
const qs = workerId ? `?worker_id=${encodeURIComponent(workerId)}` : "";
return api.get<ProgressSnapshot>(
`/sessions/${sessionId}/colony/progress/snapshot${qs}`,
`/colonies/${encodeURIComponent(colonyName)}/progress/snapshot${qs}`,
);
},
/** Build the URL for the live progress SSE stream. */
progressStreamUrl: (sessionId: string, workerId?: string): string => {
progressStreamUrl: (colonyName: string, workerId?: string): string => {
const qs = workerId ? `?worker_id=${encodeURIComponent(workerId)}` : "";
return `/api/sessions/${sessionId}/colony/progress/stream${qs}`;
return `/api/colonies/${encodeURIComponent(colonyName)}/progress/stream${qs}`;
},
};
@@ -39,6 +39,11 @@ import { DataGrid, type SortDir } from "@/components/data-grid";
interface ColonyWorkersPanelProps {
sessionId: string;
/** Colony directory name (e.g. ``linkedin_honeycomb_messaging``) for
* the colony-scoped progress + data endpoints. ``null`` when the
* attached session isn't bound to a colony — those tabs render
* empty rather than fire requests with an invalid name. */
colonyName: string | null;
onClose: () => void;
}
@@ -81,6 +86,7 @@ function fmtIso(ts: string | null | undefined): string {
export default function ColonyWorkersPanel({
sessionId,
colonyName,
onClose,
}: ColonyWorkersPanelProps) {
const [tab, setTab] = useState<TabKey>("sessions");
@@ -153,11 +159,13 @@ export default function ColonyWorkersPanel({
</div>
<div className="flex-1 overflow-y-auto">
{tab === "sessions" && <SessionsTab sessionId={sessionId} />}
{tab === "sessions" && (
<SessionsTab sessionId={sessionId} colonyName={colonyName} />
)}
{tab === "triggers" && <TriggersTab sessionId={sessionId} />}
{tab === "skills" && <SkillsTab sessionId={sessionId} />}
{tab === "tools" && <ToolsTab sessionId={sessionId} />}
{tab === "data" && <DataTab sessionId={sessionId} />}
{tab === "data" && <DataTab colonyName={colonyName} />}
</div>
</aside>
);
@@ -468,7 +476,13 @@ function ToolGroup({ label, items }: { label: string; items: ColonyTool[] }) {
// ── Sessions tab ───────────────────────────────────────────────────────
function SessionsTab({ sessionId }: { sessionId: string }) {
function SessionsTab({
sessionId,
colonyName,
}: {
sessionId: string;
colonyName: string | null;
}) {
const [workers, setWorkers] = useState<WorkerSummary[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
@@ -564,7 +578,7 @@ function SessionsTab({ sessionId }: { sessionId: string }) {
if (selected) {
return (
<WorkerDetail
sessionId={sessionId}
colonyName={colonyName}
worker={selectedWorker}
workerId={selected}
onBack={() => setSelected(null)}
@@ -1000,7 +1014,7 @@ function Section({ label, children }: { label: string; children: React.ReactNode
* if the count lags the live data by a few seconds. */
const TABLES_POLL_MS = 5000;
function DataTab({ sessionId }: { sessionId: string }) {
function DataTab({ colonyName }: { colonyName: string | null }) {
const [tables, setTables] = useState<TableOverview[]>([]);
const [selected, setSelected] = useState<string | null>(null);
const [loadingTables, setLoadingTables] = useState(true);
@@ -1008,12 +1022,17 @@ function DataTab({ sessionId }: { sessionId: string }) {
const refreshTables = useCallback(
(opts: { silent?: boolean } = {}) => {
if (!colonyName) {
setTables([]);
setLoadingTables(false);
return Promise.resolve();
}
if (!opts.silent) {
setLoadingTables(true);
setTablesError(null);
}
return colonyDataApi
.listTables(sessionId)
.listTables(colonyName)
.then((r) => {
setTables(r.tables);
// Auto-select the first table when none chosen yet so the user
@@ -1030,7 +1049,7 @@ function DataTab({ sessionId }: { sessionId: string }) {
if (!opts.silent) setLoadingTables(false);
});
},
[sessionId],
[colonyName],
);
useEffect(() => {
@@ -1048,6 +1067,14 @@ function DataTab({ sessionId }: { sessionId: string }) {
return () => clearInterval(id);
}, [refreshTables]);
if (!colonyName) {
return (
<p className="text-xs text-muted-foreground text-center py-8 px-4">
This session isn't bound to a colony yet — no progress.db to view.
</p>
);
}
return (
<div className="px-4 py-3">
{tablesError && (
@@ -1097,7 +1124,7 @@ function DataTab({ sessionId }: { sessionId: string }) {
{selected && (
<TableView
key={selected}
sessionId={sessionId}
colonyName={colonyName}
table={selected}
onAnyEdit={() => {
// Row counts can change via cascading triggers or NULL→value
@@ -1161,11 +1188,11 @@ function mergeRowsByPk(
}
function TableView({
sessionId,
colonyName,
table,
onAnyEdit,
}: {
sessionId: string;
colonyName: string;
table: string;
onAnyEdit: () => void;
}) {
@@ -1191,7 +1218,7 @@ function TableView({
setError(null);
}
colonyDataApi
.listRows(sessionId, table, {
.listRows(colonyName, table, {
limit: DATA_PAGE_SIZE,
offset,
orderBy,
@@ -1214,7 +1241,7 @@ function TableView({
if (!opts.silent && myId === reqIdRef.current) setLoading(false);
});
},
[sessionId, table, offset, orderBy, orderDir],
[colonyName, table, offset, orderBy, orderDir],
);
// Initial + on-parameter-change load (user-initiated, shows spinner).
@@ -1248,7 +1275,7 @@ function TableView({
const handleEdit = useCallback(
async (pk: Record<string, CellValue>, column: string, newValue: CellValue) => {
await colonyDataApi.updateRow(sessionId, table, {
await colonyDataApi.updateRow(colonyName, table, {
pk,
updates: { [column]: newValue },
});
@@ -1268,7 +1295,7 @@ function TableView({
});
onAnyEdit();
},
[sessionId, table, onAnyEdit],
[colonyName, table, onAnyEdit],
);
if (error) {
@@ -1340,12 +1367,12 @@ function TableView({
// ── Worker detail view (inside Sessions tab) ───────────────────────────
function WorkerDetail({
sessionId,
colonyName,
worker,
workerId,
onBack,
}: {
sessionId: string;
colonyName: string | null;
worker: WorkerSummary | null | undefined;
workerId: string;
onBack: () => void;
@@ -1405,20 +1432,20 @@ function WorkerDetail({
{isHistorical ? (
<HistoricalWorkerPlaceholder workerId={workerId} />
) : (
<LiveWorkerProgress sessionId={sessionId} workerId={workerId} />
<LiveWorkerProgress colonyName={colonyName} workerId={workerId} />
)}
</div>
);
}
function LiveWorkerProgress({
sessionId,
colonyName,
workerId,
}: {
sessionId: string;
colonyName: string | null;
workerId: string;
}) {
const { snapshot, streamState, error } = useProgressStream(sessionId, workerId);
const { snapshot, streamState, error } = useProgressStream(colonyName, workerId);
return (
<>
<div className="flex items-center justify-between mb-1.5">
@@ -1554,7 +1581,7 @@ function ProgressView({ snapshot }: { snapshot: ProgressSnapshot }) {
// ── Hook: live progress via SSE ────────────────────────────────────────
function useProgressStream(sessionId: string, workerId: string) {
function useProgressStream(colonyName: string | null, workerId: string) {
const [snapshot, setSnapshot] = useState<ProgressSnapshot>({ tasks: [], steps: [] });
const [streamState, setStreamState] = useState<"connecting" | "open" | "closed" | "error">(
"connecting",
@@ -1566,7 +1593,14 @@ function useProgressStream(sessionId: string, workerId: string) {
setError(null);
setStreamState("connecting");
const url = colonyWorkersApi.progressStreamUrl(sessionId, workerId);
// Skip the SSE connection entirely if the session isn't bound to a
// colony — we'd just hit a 400 on every reconnect attempt.
if (!colonyName) {
setStreamState("closed");
return;
}
const url = colonyWorkersApi.progressStreamUrl(colonyName, workerId);
const es = new EventSource(url);
es.addEventListener("open", () => setStreamState("open"));
@@ -1609,7 +1643,7 @@ function useProgressStream(sessionId: string, workerId: string) {
es.close();
setStreamState("closed");
};
}, [sessionId, workerId]);
}, [colonyName, workerId]);
return { snapshot, streamState, error };
}
@@ -15,6 +15,16 @@ interface ColonyWorkersContextValue {
sessionId: string | null;
setSessionId: (sessionId: string | null) => void;
/** The colony directory name (e.g. ``linkedin_honeycomb_messaging``)
* the panel is attached to. Comes from ``LiveSession.colony_id`` —
* legacy naming, but it's the on-disk directory under
* ``~/.hive/colonies/`` and the URL segment for the colony-scoped
* endpoints (progress + data). Required separately from sessionId
* because the URL slug is mangled by ``slugToColonyId`` and can't
* be reverse-derived. */
colonyName: string | null;
setColonyName: (colonyName: string | null) => void;
/** User dismissal: flipped by the panel's close button. Reset when
* sessionId changes (so the panel re-opens on the next colony visit
* / tab-switch) or when the header toggle re-requests it. */
@@ -36,6 +46,7 @@ const ColonyWorkersContext = createContext<ColonyWorkersContextValue | null>(nul
export function ColonyWorkersProvider({ children }: { children: ReactNode }) {
const [sessionId, setSessionIdState] = useState<string | null>(null);
const [colonyName, setColonyName] = useState<string | null>(null);
const [dismissed, setDismissed] = useState(false);
const [triggers, setTriggers] = useState<GraphNode[]>([]);
@@ -58,6 +69,8 @@ export function ColonyWorkersProvider({ children }: { children: ReactNode }) {
value={{
sessionId,
setSessionId,
colonyName,
setColonyName,
dismissed,
toggleColonyWorkers,
triggers,
+3 -1
View File
@@ -62,7 +62,8 @@ function LayoutShell({
onOpenQueenProfile: (queenId: string) => void;
colonies: ReturnType<typeof useColony>["colonies"];
}) {
const { sessionId, dismissed, toggleColonyWorkers } = useColonyWorkers();
const { sessionId, colonyName, dismissed, toggleColonyWorkers } =
useColonyWorkers();
const showWorkersPanel = Boolean(sessionId && !dismissed);
return (
@@ -84,6 +85,7 @@ function LayoutShell({
{showWorkersPanel && sessionId && (
<ColonyWorkersPanel
sessionId={sessionId}
colonyName={colonyName}
onClose={toggleColonyWorkers}
/>
)}
+25 -2
View File
@@ -125,6 +125,13 @@ async function restoreSessionMessages(
interface AgentState {
sessionId: string | null;
/** Colony directory name (e.g. ``linkedin_honeycomb_messaging``) —
* the value used for the colony-scoped progress + data endpoints.
* Comes from ``LiveSession.colony_id`` (the legacy field name; it's
* the on-disk directory under ``~/.hive/colonies/``). Distinct from
* the URL's ``colonyId`` route param, which is a display-mangled
* slug. Null for queen-DM sessions not bound to a colony. */
colonyDirName: string | null;
loading: boolean;
ready: boolean;
queenReady: boolean;
@@ -163,6 +170,7 @@ interface AgentState {
function defaultAgentState(): AgentState {
return {
sessionId: null,
colonyDirName: null,
loading: true,
ready: false,
queenReady: false,
@@ -524,6 +532,7 @@ export default function ColonyChat() {
updateState({
sessionId: session.session_id,
colonyDirName: session.colony_id,
displayName,
queenPhase: initialPhase,
queenSupportsImages: session.queen_supports_images !== false,
@@ -1270,8 +1279,11 @@ export default function ColonyChat() {
// Mirror live triggers into the shared context so the tabbed
// ColonyWorkersPanel (rendered at the layout level) can render the
// Triggers tab without having to re-subscribe to the session SSE.
const { setTriggers: setCtxTriggers, setSessionId: setCtxSessionId } =
useColonyWorkers();
const {
setTriggers: setCtxTriggers,
setSessionId: setCtxSessionId,
setColonyName: setCtxColonyName,
} = useColonyWorkers();
useEffect(() => {
setCtxTriggers(triggers);
return () => setCtxTriggers([]);
@@ -1286,6 +1298,17 @@ export default function ColonyChat() {
return () => setCtxSessionId(null);
}, [agentState.sessionId, setCtxSessionId]);
// Publish the colony directory name (e.g. ``linkedin_honeycomb_messaging``)
// alongside the session id. The panel's progress + data tabs route by
// colony name, not session — one progress.db per colony, independent
// of which session is open. Comes from ``LiveSession.colony_id`` (the
// on-disk directory) rather than the URL slug, which is mangled by
// ``slugToColonyId``.
useEffect(() => {
setCtxColonyName(agentState.colonyDirName ?? null);
return () => setCtxColonyName(null);
}, [agentState.colonyDirName, setCtxColonyName]);
// ── Render ─────────────────────────────────────────────────────────────
if (!colony && !isNewChat && !agentState.loading) {