phase(5d): WS chat handlers + process->socket bridge

- protocol: chat.send / chat.cancel + stream.start/text/thinking/
  tool_use/tool_result/usage/message/cancelled/error envelopes
- sessions repository: insertMessage, setHarnessSessionId,
  updateTurnConfig, getOwnerId
- process-bridge: subscribe a CliProcess to the WS pool, broadcast
  every event to focused/watching sockets, persist assistant message
  + flip status idle on result, persist partial on cancelled, free
  the slot on terminal events
- chat-handlers: chat.send mints harness session id on first turn,
  persists user message, kicks off the engine, attaches the bridge,
  detects --resume vs first-turn from message history; chat.cancel
  signals the live process
- bootstrap: build process manager before WS module so chat handlers
  register; bridge live restored processes to their owners' pool;
  persist completed-while-down processes via persistDeadSnapshot
This commit is contained in:
Eliot M 2026-04-26 17:35:31 +00:00
parent fb88002176
commit 5066280ed3
6 changed files with 590 additions and 10 deletions

View file

@ -18,6 +18,10 @@ import { createMailer } from './lib/mailer.js';
import { migrateInvites } from './modules/invites/index.js';
import { createSessionsModule, migrateSessions } from './modules/sessions/index.js';
import { createProcessManager } from './modules/sessions/process/index.js';
import {
attachProcessBridge,
persistDeadSnapshot,
} from './modules/sessions/process-bridge.js';
import { createApp } from './app.js';
import { attachWsServer } from './infrastructure/ws/server.js';
import { registerShutdown } from './infrastructure/shutdown.js';
@ -39,20 +43,49 @@ async function main(): Promise<void> {
const server = http.createServer(app);
const ws = attachWsServer({ server, logger, auth });
// ── AI process engine (Phase 5c/5d) ──────────────────────────
const processes = createProcessManager({ config, logger });
// Register feature-module WS handlers against the shared registry.
// The sessions module is built a second time here (REST builds its own
// inside `createApp`) — both share the DB so behaviour is identical;
// service+repo are stateless.
const sessionsForWs = createSessionsModule({ config, logger, db, auth, mailer });
// service+repo are stateless. Passing `processes` here is what
// activates the chat handlers on the WS surface.
const sessionsForWs = createSessionsModule({
config,
logger,
db,
auth,
mailer,
processes,
});
sessionsForWs.registerWs(ws.registry);
// ── AI process engine (Phase 5c) ──────────────────────────────
// Build the manager + immediately try to re-attach to any harness
// processes that survived the previous shutdown. The 5d WS layer will
// wire fresh sockets to the restored processes; for now this just
// proves restart-survival end to end.
const processes = createProcessManager({ config, logger });
// Restart-survival: re-attach to any harness processes that survived
// the previous shutdown, and persist any that finished while we
// were down so the next page load shows the assistant reply.
const restored = processes.restoreState();
for (const proc of restored.restored) {
const userId = sessionsForWs.repo.getOwnerId(proc.sessionId);
if (!userId) {
logger.warn({ sessionId: proc.sessionId }, 'restored process has no owner — skipping bridge');
continue;
}
attachProcessBridge(proc, {
pool: ws.pool,
repo: sessionsForWs.repo,
processes,
logger,
userId,
});
}
for (const snap of restored.dead) {
persistDeadSnapshot(snap, {
pool: ws.pool,
repo: sessionsForWs.repo,
logger,
});
}
if (restored.restored.length || restored.dead.length) {
logger.info(
{ alive: restored.restored.length, completed: restored.dead.length },

View file

@ -0,0 +1,142 @@
/**
 * Chat WS handlers — Phase 5d.
*
* Two handlers ride alongside the session.* registry:
*
* chat.send Persists the user turn, kicks off the harness process
* via the ProcessManager, bridges its events back into
* the WS pool. The first turn of a session generates
* and persists the harness session id; subsequent turns
* resume that conversation.
*
 * chat.cancel Owner-checks then signals the live process. The
 * resulting `cancelled` event is fanned out by the
 * bridge — no broadcast happens here.
 *
 * Both return a small ack envelope so callers can correlate the
 * request → response. Stream events flow as their own envelopes
 * (`stream.text`, `stream.message`, …) over the same socket.
*/
import { randomUUID } from 'node:crypto';
import {
ChatCancelRequestSchema,
ChatSendRequestSchema,
WsType,
type StreamMessageEvent,
} from '@cial/protocol';
import { ValidationError } from '../../infrastructure/errors.js';
import type { WsHandlerRegistry } from '../../infrastructure/ws/handler-registry.js';
import { attachProcessBridge } from './process-bridge.js';
import type { ProcessManager } from './process/process-manager.js';
import type { SessionsRepository } from './repository.js';
import type { SessionsService } from './service.js';
export interface ChatHandlerDeps {
/** Shared sessions service — `get(userId, sessionId)` doubles as the owner check. */
service: SessionsService;
/** Row-level access: insertMessage / setHarnessSessionId / updateTurnConfig / setStatus / listMessages. */
repo: SessionsRepository;
/** Harness engine: `getOrCreate` spawns or reuses a process, `cancel` signals a live turn. */
processes: ProcessManager;
}
/**
 * Wires `chat.send` and `chat.cancel` onto the shared WS handler
 * registry. Statement order inside `chat.send` is deliberate: the user
 * message is persisted + echoed BEFORE the harness process is spawned,
 * so a failed spawn still leaves a coherent transcript.
 */
export function registerChatWsHandlers(
registry: WsHandlerRegistry,
deps: ChatHandlerDeps,
): void {
// ── chat.send ────────────────────────────────────────────────────────
registry.register(WsType.ChatSend, async (payload, ctx) => {
// Validate the envelope before touching any state.
const parsed = ChatSendRequestSchema.safeParse(payload);
if (!parsed.success) {
throw new ValidationError('invalid chat.send payload', parsed.error.flatten());
}
const { sessionId, message, model: modelOverride, effort: effortOverride } = parsed.data;
// Owner-check (throws NotFoundError if the user doesn't own it).
let session = deps.service.get(ctx.userId, sessionId);
// First turn: mint a harness session id so subsequent turns can
// `--resume` the same conversational context.
let harnessSessionId = session.harnessSessionId;
if (!harnessSessionId) {
harnessSessionId = randomUUID();
deps.repo.setHarnessSessionId(sessionId, harnessSessionId);
}
// Apply per-turn overrides to the persisted row so the sidebar
// reflects the active config. Only the changed knob(s) are written;
// `null` means "leave this column alone" (see updateTurnConfig).
const nextModel = modelOverride ?? session.model;
const nextEffort = effortOverride ?? session.effort;
if (nextModel !== session.model || nextEffort !== session.effort) {
deps.repo.updateTurnConfig(
sessionId,
nextModel !== session.model ? nextModel : null,
nextEffort !== session.effort ? nextEffort : null,
);
}
// Persist the user turn first — every observer (this tab + any
// other tabs of the same user) sees the user message before
// streaming begins, even if the harness fails to spawn.
const userMessage = deps.repo.insertMessage({
sessionId,
role: 'user',
content: message,
});
const userPayload: StreamMessageEvent = { sessionId, message: userMessage };
ctx.pool.broadcastToSession(ctx.userId, sessionId, {
type: WsType.StreamMessage,
payload: userPayload,
});
// Re-fetch in case mutations bumped row state we care about.
session = deps.service.get(ctx.userId, sessionId);
// Boot / reuse the engine for this session.
const proc = deps.processes.getOrCreate({
sessionId,
harnessSessionId,
provider: session.provider,
model: nextModel,
cwd: session.cwd,
// NOTE(review): `||` maps '' (and 0/false) to undefined, not just
// null — confirm an empty-string effort is never a valid value.
effort: nextEffort || undefined,
});
// Bridge is a no-op if this process was already bridged (module-level
// WeakSet in process-bridge.ts), e.g. a reused process from a prior turn.
attachProcessBridge(proc, {
pool: ctx.pool,
repo: deps.repo,
processes: deps.processes,
logger: ctx.logger,
userId: ctx.userId,
});
deps.repo.setStatus(sessionId, 'busy');
// First turn vs resume: any session that already has messages
// beyond the one we just inserted has been spoken to before, so
// the harness must `--resume` its prior context. The provider's
// `hasLocalState` check is the final guard against a missing
// CLI state directory.
const prior = deps.repo.listMessages({ userId: ctx.userId, sessionId, limit: 2 });
const isResumeTurn = prior.messages.length > 1;
// Fire the turn. Errors here surface as `stream.error` via the
// bridge's `error`/`close` listeners; no need to await.
proc.start(message, isResumeTurn).catch((err) => {
ctx.logger.error({ err, sessionId }, 'chat.send: harness start failed');
proc.emit('error', err instanceof Error ? err : new Error(String(err)));
});
return { ok: true, harnessSessionId };
});
// ── chat.cancel ──────────────────────────────────────────────────────
registry.register(WsType.ChatCancel, (payload, ctx) => {
const parsed = ChatCancelRequestSchema.safeParse(payload);
if (!parsed.success) {
throw new ValidationError('invalid chat.cancel payload', parsed.error.flatten());
}
// Owner-check — throws if not theirs.
deps.service.get(ctx.userId, parsed.data.sessionId);
// Signal only; the bridge fans out the resulting `cancelled` event.
deps.processes.cancel(parsed.data.sessionId);
return { ok: true };
});
}

View file

@ -21,26 +21,44 @@ import { fromNodeHeaders } from 'better-auth/node';
import type { AppDeps } from '../../infrastructure/types.js';
import { UnauthorizedError } from '../../infrastructure/errors.js';
import type { WsHandlerRegistry } from '../../infrastructure/ws/handler-registry.js';
import { registerChatWsHandlers } from './chat-handlers.js';
import type { ProcessManager } from './process/process-manager.js';
import { SessionsRepository } from './repository.js';
import { SessionsService } from './service.js';
import { registerSessionWsHandlers } from './ws-handlers.js';
export { migrateSessions } from './migrate.js';
/** Dependencies for the sessions module — the app-wide deps plus an
 * optional ProcessManager that unlocks the chat WS surface. */
export interface SessionsModuleDeps extends AppDeps {
/** Optional. When provided, `registerWs` also wires the chat handlers
 * so the WS surface includes `chat.send` / `chat.cancel`. The REST
 * module instance built inside `createApp` doesn't need this. */
processes?: ProcessManager;
}
/**
* Sessions module factory returns the REST router and a WS-handler
* registrar so REST + WS share the same `SessionsService` instance
* (same ownership checks, same default values, single source of truth).
*/
export function createSessionsModule(deps: AppDeps): {
export function createSessionsModule(deps: SessionsModuleDeps): {
router: Router;
service: SessionsService;
repo: SessionsRepository;
registerWs(registry: WsHandlerRegistry): void;
} {
const repo = new SessionsRepository(deps.db);
const service = new SessionsService(repo);
return {
router: createRouter(deps, service),
registerWs: (registry) => registerSessionWsHandlers(registry, service),
service,
repo,
registerWs: (registry) => {
registerSessionWsHandlers(registry, service);
if (deps.processes) {
registerChatWsHandlers(registry, { service, repo, processes: deps.processes });
}
},
};
}

View file

@ -0,0 +1,228 @@
/**
 * Process WS bridge — Phase 5d.
*
* Subscribes to a `CliProcess` and fans its events out to every socket
* of the owning user that's focused on or watching this session
* (`pool.broadcastToSession`). On terminal events (`result`, `cancelled`,
* `close` with an error) it persists the assistant message + flips the
* session status, then drops the process from the manager so the slot
* is free for the next turn.
*
* Each `CliProcess` is bridged exactly once. We track that via a
* module-level `WeakSet`; chat.send and the bootstrap restore both call
* `attachProcessBridge`, the second call is a no-op.
*/
import type { Logger } from 'pino';
import {
WsType,
type Message,
type MessageMetadata,
type StreamCancelledEvent,
type StreamErrorEvent,
type StreamMessageEvent,
type StreamStartEvent,
type StreamTextEvent,
type StreamThinkingEvent,
type StreamToolResultEvent,
type StreamToolUseEvent,
type StreamUsageEvent,
} from '@cial/protocol';
import type { WsClientPool } from '../../infrastructure/ws/clients.js';
import type { ProcessManager } from './process/process-manager.js';
import type { CliProcess } from './process/cli-process.js';
import type { ProcessSnapshot } from './process/types.js';
import type { SessionsRepository } from './repository.js';
// One bridge per CliProcess, ever — WeakSet membership makes a second
// attach (chat.send after a bootstrap restore, or vice versa) a silent
// no-op, and entries vanish with the process so nothing leaks.
const bridged = new WeakSet<CliProcess>();
export interface BridgeDeps {
/** Fan-out target — `broadcastToSession` reaches the owner's focused/watching sockets. */
pool: WsClientPool;
/** Persistence: insertMessage / setStatus / saveStreamingState. */
repo: SessionsRepository;
/** Used on terminal events to drop the process and free its slot. */
processes: ProcessManager;
logger: Logger;
/** Owning user — used so broadcasts only reach that user's sockets. */
userId: string;
}
/**
 * Subscribes the given CliProcess to the WS pool: streaming events are
 * fanned out verbatim; terminal events (`result`, `cancelled`, `error`,
 * abnormal `close`) additionally persist + finalize. Idempotent per
 * process via the module-level `bridged` WeakSet.
 */
export function attachProcessBridge(proc: CliProcess, deps: BridgeDeps): void {
if (bridged.has(proc)) return;
bridged.add(proc);
const log = deps.logger.child({ component: 'process-bridge', sessionId: proc.sessionId });
const sessionId = proc.sessionId;
const userId = deps.userId;
// ── streaming events ────────────────────────────────────────────
proc.on('init', (init) => {
// Announce the turn: resolved model (init wins over the configured
// one, may be null) + the harness session id clients should keep.
const payload: StreamStartEvent = {
sessionId,
model: init.model ?? proc.model ?? null,
harnessSessionId: proc.harnessSessionId,
};
deps.pool.broadcastToSession(userId, sessionId, {
type: WsType.StreamStart,
payload,
});
});
proc.on('text', (e) => {
const payload: StreamTextEvent = { sessionId, text: e.text, partial: e.partial };
deps.pool.broadcastToSession(userId, sessionId, { type: WsType.StreamText, payload });
});
proc.on('thinking', (e) => {
const payload: StreamThinkingEvent = {
sessionId,
thinking: e.thinking,
partial: e.partial,
};
deps.pool.broadcastToSession(userId, sessionId, { type: WsType.StreamThinking, payload });
});
proc.on('tool_use', (tool) => {
const payload: StreamToolUseEvent = { sessionId, tool };
deps.pool.broadcastToSession(userId, sessionId, { type: WsType.StreamToolUse, payload });
});
proc.on('tool_result', (r) => {
const payload: StreamToolResultEvent = {
sessionId,
toolUseId: r.toolUseId,
output: r.output,
durationMs: r.durationMs,
};
deps.pool.broadcastToSession(userId, sessionId, {
type: WsType.StreamToolResult,
payload,
});
});
proc.on('usage', (usage) => {
const payload: StreamUsageEvent = { sessionId, usage };
deps.pool.broadcastToSession(userId, sessionId, { type: WsType.StreamUsage, payload });
});
// ── terminal events ─────────────────────────────────────────────
proc.on('result', (r) => {
// Full assistant turn: persist with its metadata, then idle + free slot.
const metadata: MessageMetadata = {
tools: r.tools,
stats: r.stats,
thinking: r.thinking,
turnHistory: r.turnHistory,
};
persistAssistantMessage(deps, sessionId, userId, r.text, metadata, log);
finalize(deps, proc, 'idle');
});
proc.on('cancelled', (c) => {
// Persist whatever the model managed to emit so users see partial
// output instead of a vanished turn. A cancel with no text persists
// nothing — only the `stream.cancelled` signal goes out.
if (c.text) {
const metadata: MessageMetadata = {
tools: c.tools,
stats: c.stats,
thinking: c.thinking,
turnHistory: c.turnHistory,
};
persistAssistantMessage(deps, sessionId, userId, c.text, metadata, log);
}
const payload: StreamCancelledEvent = { sessionId };
deps.pool.broadcastToSession(userId, sessionId, { type: WsType.StreamCancelled, payload });
finalize(deps, proc, 'idle');
});
proc.on('error', (err) => {
const payload: StreamErrorEvent = {
sessionId,
code: 'HARNESS_ERROR',
message: err.message,
};
deps.pool.broadcastToSession(userId, sessionId, { type: WsType.StreamError, payload });
finalize(deps, proc, 'error');
});
proc.on('close', (code, reason) => {
// `result` already fired and finalized — nothing to do.
// NOTE(review): relies on `resultEmitted` being set synchronously
// before `close` fires — confirm event ordering in CliProcess.
if (proc.resultEmitted) return;
if (code === 0) {
// Normal exit without a `result` line — treat as empty response.
// No envelope is broadcast in this path; clients see only silence.
finalize(deps, proc, 'idle');
return;
}
const payload: StreamErrorEvent = {
sessionId,
code: 'HARNESS_EXIT',
message: `harness exited with code ${code} (${reason})`,
};
deps.pool.broadcastToSession(userId, sessionId, { type: WsType.StreamError, payload });
finalize(deps, proc, 'error');
});
}
/** Replays a snapshot of a process that completed during downtime: the
 * CliProcess is gone, but `drainDead` has populated the snapshot with
 * whatever the harness wrote before the server died. We just need to
 * persist + broadcast as if `result` had fired live. */
export function persistDeadSnapshot(
snap: ProcessSnapshot,
deps: Omit<BridgeDeps, 'userId' | 'processes'>,
): void {
// Nothing flagged and nothing captured — no assistant turn to replay.
// NOTE(review): when `resultAlreadySaved` is true we still insert
// below; if that flag means "row already written to the DB" this
// duplicates the message — confirm ProcessSnapshot semantics (it may
// instead mean "a result line was observed").
if (!snap.resultAlreadySaved && !snap.accumulatedText) return;
const userId = deps.repo.getOwnerId(snap.sessionId);
if (!userId) return;
const log = deps.logger.child({
component: 'process-bridge',
sessionId: snap.sessionId,
mode: 'restored-dead',
});
const metadata: MessageMetadata = {
tools: snap.collectedTools,
stats: snap.turnStats,
thinking: snap.accumulatedThinking,
turnHistory: [],
};
persistAssistantMessage(deps, snap.sessionId, userId, snap.accumulatedText, metadata, log);
// Mirror finalize(): flip idle + clear streaming state (no process
// slot to free — the CliProcess no longer exists).
deps.repo.setStatus(snap.sessionId, 'idle');
deps.repo.saveStreamingState(snap.sessionId, null);
}
// ───────────────────────── helpers ─────────────────────────
/**
 * Inserts an assistant row and echoes the canonical record to every
 * socket of `userId` focused on / watching `sessionId`. A failed insert
 * is logged and swallowed (no broadcast) so the bridge keeps running.
 */
function persistAssistantMessage(
  deps: { pool: WsClientPool; repo: SessionsRepository },
  sessionId: string,
  userId: string,
  text: string,
  metadata: MessageMetadata,
  log: Logger,
): void {
  let saved: Message | undefined;
  try {
    saved = deps.repo.insertMessage({
      sessionId,
      role: 'assistant',
      content: text,
      metadata,
    });
  } catch (err) {
    log.error({ err }, 'failed to persist assistant message');
  }
  if (saved === undefined) return;
  const payload: StreamMessageEvent = { sessionId, message: saved };
  deps.pool.broadcastToSession(userId, sessionId, {
    type: WsType.StreamMessage,
    payload,
  });
}
/**
 * Terminal-state cleanup: flip the session row to `status`, clear any
 * saved streaming state, then release the process slot. The DB writes
 * are best-effort — a failure there must not keep the slot occupied.
 */
function finalize(
  deps: { repo: SessionsRepository; processes: ProcessManager },
  proc: CliProcess,
  status: 'idle' | 'error',
): void {
  const { sessionId } = proc;
  try {
    deps.repo.setStatus(sessionId, status);
    deps.repo.saveStreamingState(sessionId, null);
  } catch {
    // Deliberately swallowed — slot release below must still happen.
  }
  deps.processes.remove(sessionId);
}

View file

@ -236,6 +236,78 @@ export class SessionsRepository {
return { messages, hasMore };
}
/**
 * Append a single message row and bump the parent session's
 * `updated_at` so sidebar ordering reflects activity. Used by
 * chat.send to persist the user turn and by the process bridge to
 * persist the assistant reply on `result`.
 *
 * @returns the freshly inserted row (with its autoincrement id).
 */
insertMessage(opts: {
  sessionId: string;
  role: Message['role'];
  content: string;
  metadata?: MessageMetadata | null;
}): Message {
  const createdAt = Date.now();
  const serializedMetadata = opts.metadata ? JSON.stringify(opts.metadata) : null;
  const insertResult = this.db
    .prepare(
      `INSERT INTO chat_message (session_id, role, content, metadata, created_at)
VALUES (?, ?, ?, ?, ?)`,
    )
    .run(opts.sessionId, opts.role, opts.content, serializedMetadata, createdAt);
  // Touch the parent so list ordering reflects activity.
  this.db
    .prepare('UPDATE chat_session SET updated_at = ? WHERE id = ?')
    .run(createdAt, opts.sessionId);
  const inserted = this.db
    .prepare('SELECT * FROM chat_message WHERE id = ?')
    .get(insertResult.lastInsertRowid) as MessageRow;
  return rowToMessage(inserted);
}
/**
 * Record the harness-side session id minted on the first turn; later
 * turns pass it back so the CLI can `--resume` the same context.
 */
setHarnessSessionId(sessionId: string, harnessSessionId: string): void {
  const stmt = this.db.prepare(
    'UPDATE chat_session SET harness_session_id = ?, updated_at = ? WHERE id = ?',
  );
  stmt.run(harnessSessionId, Date.now(), sessionId);
}
/**
 * Mutate per-turn knobs (model + effort) when the chat handler receives
 * overrides. `null` means "leave that column alone"; a no-op call (both
 * null) returns without touching the row. Always bumps `updated_at`.
 */
updateTurnConfig(sessionId: string, model: string | null, effort: string | null): void {
  if (model === null && effort === null) return;
  const assignments: Array<{ clause: string; value: unknown }> = [];
  if (model !== null) assignments.push({ clause: 'model = ?', value: model });
  if (effort !== null) assignments.push({ clause: 'effort = ?', value: effort });
  assignments.push({ clause: 'updated_at = ?', value: Date.now() });
  const sql = `UPDATE chat_session SET ${assignments
    .map((a) => a.clause)
    .join(', ')} WHERE id = ?`;
  this.db.prepare(sql).run(...assignments.map((a) => a.value), sessionId);
}
/**
 * Owner-less lookup used by the bootstrap process bridge — given a
 * sessionId we need the user_id to broadcast through the WS pool.
 * Returns null when the session row does not exist.
 */
getOwnerId(sessionId: string): string | null {
  const found = this.db
    .prepare('SELECT user_id FROM chat_session WHERE id = ?')
    .get(sessionId) as { user_id: string } | undefined;
  if (found === undefined) return null;
  return found.user_id;
}
// ── streaming-state helpers (used by 5d, but already typed-out here) ────
saveStreamingState(sessionId: string, state: StreamingState | null): void {

View file

@ -21,6 +21,9 @@ import {
RenameSessionRequestSchema,
ListMessagesQuerySchema,
MessageSchema,
ToolSchema,
TurnUsageSchema,
SessionEffortSchema,
} from './sessions.js';
// ───────────────────────── envelope ─────────────────────────
@ -95,6 +98,79 @@ export const SessionHistoryEventSchema = z.object({
});
export type SessionHistoryEvent = z.infer<typeof SessionHistoryEventSchema>;
// ───────────────────────── chat (Phase 5d) ─────────────────────────
/** Send a user turn into a session. The back persists the user message
 * immediately, spawns / resumes the harness process, and streams
 * assistant tokens back as `stream.*` events. */
export const ChatSendRequestSchema = z.object({
sessionId: z.string(),
message: z.string().min(1),
/** Optional per-turn overrides. The session row is also mutated. */
model: z.string().optional(),
effort: SessionEffortSchema.optional(),
});
export type ChatSendRequest = z.infer<typeof ChatSendRequestSchema>;
/** Cancel the in-flight turn of a session (owner-checked server-side). */
export const ChatCancelRequestSchema = z.object({ sessionId: z.string() });
export type ChatCancelRequest = z.infer<typeof ChatCancelRequestSchema>;
/** Common envelope for every stream.* event. */
export const StreamEnvelopeSchema = z.object({ sessionId: z.string() });
/** First event of a turn — the model in use (may be null) + the
 * harness session id clients should retain for resuming. */
export const StreamStartEventSchema = StreamEnvelopeSchema.extend({
model: z.string().nullable(),
harnessSessionId: z.string(),
});
export type StreamStartEvent = z.infer<typeof StreamStartEventSchema>;
/** Assistant text chunk; `partial` flags streaming deltas. */
export const StreamTextEventSchema = StreamEnvelopeSchema.extend({
text: z.string(),
partial: z.boolean(),
});
export type StreamTextEvent = z.infer<typeof StreamTextEventSchema>;
/** Assistant reasoning chunk; mirrors StreamText's `partial` flag. */
export const StreamThinkingEventSchema = StreamEnvelopeSchema.extend({
thinking: z.string(),
partial: z.boolean(),
});
export type StreamThinkingEvent = z.infer<typeof StreamThinkingEventSchema>;
/** A tool invocation started by the harness. */
export const StreamToolUseEventSchema = StreamEnvelopeSchema.extend({
tool: ToolSchema,
});
export type StreamToolUseEvent = z.infer<typeof StreamToolUseEventSchema>;
/** Output of a finished tool call, keyed back by `toolUseId`. */
export const StreamToolResultEventSchema = StreamEnvelopeSchema.extend({
toolUseId: z.string(),
output: z.string(),
durationMs: z.number().nullable(),
});
export type StreamToolResultEvent = z.infer<typeof StreamToolResultEventSchema>;
/** Token/usage accounting for the turn. */
export const StreamUsageEventSchema = StreamEnvelopeSchema.extend({
usage: TurnUsageSchema,
});
export type StreamUsageEvent = z.infer<typeof StreamUsageEventSchema>;
/** Fired when an assistant message has been persisted. Carries the full
 * Message row so clients can replace any partial-streaming state with
 * the canonical record. Also used for the user-side echo so other tabs
 * see the turn appear. */
export const StreamMessageEventSchema = StreamEnvelopeSchema.extend({
message: MessageSchema,
});
export type StreamMessageEvent = z.infer<typeof StreamMessageEventSchema>;
/** Fan-out of the process's `cancelled` event — carries only the sessionId. */
export const StreamCancelledEventSchema = StreamEnvelopeSchema;
export type StreamCancelledEvent = z.infer<typeof StreamCancelledEventSchema>;
/** Terminal error for a turn; `code` is a machine-readable tag
 * (the bridge emits `HARNESS_ERROR` / `HARNESS_EXIT`). */
export const StreamErrorEventSchema = StreamEnvelopeSchema.extend({
code: z.string(),
message: z.string(),
});
export type StreamErrorEvent = z.infer<typeof StreamErrorEventSchema>;
// ───────────────────────── canonical type strings ─────────────────────────
// Single source of truth — keeps the back router and SDK in lock-step.
@ -108,10 +184,21 @@ export const WsType = {
SessionWatch: 'session.watch',
SessionUnwatch: 'session.unwatch',
SessionHistory: 'session.history',
ChatSend: 'chat.send',
ChatCancel: 'chat.cancel',
// outbound
SessionCreated: 'session.created',
SessionUpdated: 'session.updated',
SessionDeleted: 'session.deleted',
StreamStart: 'stream.start',
StreamText: 'stream.text',
StreamThinking: 'stream.thinking',
StreamToolUse: 'stream.tool_use',
StreamToolResult: 'stream.tool_result',
StreamUsage: 'stream.usage',
StreamMessage: 'stream.message',
StreamCancelled: 'stream.cancelled',
StreamError: 'stream.error',
// shared
Error: 'error',
} as const;