phase(6b): deploy controller — build, restart, log streaming, mode toggle

- protocol: Zod schemas for deploy REST DTOs and WS events
- back/deploy module: migrate (deploy + tenant_settings), repository,
  supervisor-client (UDS JSONL), single-flight BuildRunner with cancel,
  DeployService bridging runner events to WS broadcasts
- REST endpoints under /deploy (instance_admin gated): start, restart-only,
  cancel, mode get/patch, get/list
- WS handshake handlers for DeployWatch/DeployUnwatch
- bootstrap: build deploy module once with ws.pool, inject router into
  createApp via AppRouters; attach request listener after wiring
- config: monorepoRoot, supervisorSocket, deployLogDir, platformRoot
This commit is contained in:
Eliot M 2026-04-27 00:25:18 +00:00
parent ed0622cd38
commit b0490e713b
11 changed files with 1208 additions and 16 deletions

View file

@ -5,7 +5,7 @@
* dependencies are passed in by the bootstrap.
*/
import express, { type Express } from 'express';
import express, { type Express, type Router } from 'express';
import { pinoHttp } from 'pino-http';
import { toNodeHandler } from 'better-auth/node';
import type { AppDeps } from './infrastructure/types.js';
@ -24,7 +24,15 @@ import { createMembersRouter } from './modules/members/index.js';
import { createHmac } from 'node:crypto';
import { APIError } from 'better-auth/api';
export function createApp(deps: AppDeps): Express {
export interface AppRouters {
/** Pre-built deploy router (Phase 6b). When omitted, falls back to a
* 503 stub so callers that don't need deploy still work. The bootstrap
* builds the real one in `index.ts` because it owns the WS pool the
* deploy service needs to broadcast events. */
readonly deploy?: Router;
}
export function createApp(deps: AppDeps, routers: AppRouters = {}): Express {
const app = express();
// ── Global middleware (early) ────────────────────────────────────────
@ -71,7 +79,7 @@ export function createApp(deps: AppDeps): Express {
// ── Module routers ────────────────────────────────────────────────────
app.use('/healthz', createHealthRouter(deps));
app.use('/sessions', createSessionsRouter(deps));
app.use('/deploy', createDeployRouter(deps));
app.use('/deploy', routers.deploy ?? createDeployRouter(deps));
app.use('/vault', createVaultRouter(deps));
app.use('/db', createDbProxyRouter(deps));
app.use('/git', createGitRouter(deps));

View file

@ -43,6 +43,15 @@ const ConfigSchema = z.object({
* straight in the AppShell. Never enable in production.
*/
devAutoLogin: z.coerce.boolean().default(false),
// ── Phase 6 deploy controller ────────────────────────────────────────
/** Workspace root that pnpm runs builds against. */
monorepoRoot: z.string().default('/opt/cial-monorepo'),
/** Unix socket the supervisor (cial-core/edge) listens on. */
supervisorSocket: z.string().default('/run/cial-supervisor.sock'),
/** Where the deploy controller persists per-build log files. */
deployLogDir: z.string().default('/var/lib/cial/deploy-logs'),
/** Path to the platform working tree (Phase 6c git engine). */
platformRoot: z.string().default('/opt/cial-monorepo/cial-platform'),
});
export type Config = z.infer<typeof ConfigSchema>;
@ -63,6 +72,10 @@ export function loadConfig(): Config {
devAutoLogin:
process.env.CIAL_DEV_AUTOLOGIN === '1' ||
process.env.CIAL_DEV_AUTOLOGIN === 'true',
monorepoRoot: process.env.CIAL_MONOREPO_ROOT,
supervisorSocket: process.env.CIAL_SUPERVISOR_SOCK,
deployLogDir: process.env.CIAL_DEPLOY_LOG_DIR,
platformRoot: process.env.CIAL_PLATFORM_ROOT,
});
if (!parsed.success) {
throw new Error(`Invalid config: ${parsed.error.message}`);

View file

@ -17,6 +17,7 @@ import { createAuth, migrateAuth } from './lib/auth.js';
import { createMailer } from './lib/mailer.js';
import { migrateInvites } from './modules/invites/index.js';
import { createSessionsModule, migrateSessions } from './modules/sessions/index.js';
import { createDeployModule, migrateDeploy } from './modules/deploy/index.js';
import { createProcessManager } from './modules/sessions/process/index.js';
import {
attachProcessBridge,
@ -38,11 +39,30 @@ async function main(): Promise<void> {
await migrateAuth({ auth, logger });
migrateInvites({ db, logger });
migrateSessions({ db, logger });
migrateDeploy({ db, logger });
const app = createApp({ config, logger, db, auth, mailer });
const server = http.createServer(app);
// Build server without a request listener so we can attach WS first; the
// deploy module needs `ws.pool` to broadcast events, and the app needs the
// pre-built deploy router so REST + WS share one BuildRunner instance.
const server = http.createServer();
const ws = attachWsServer({ server, logger, auth });
const deploy = createDeployModule({
config,
logger,
db,
auth,
mailer,
pool: ws.pool,
monorepoRoot: config.monorepoRoot,
supervisorSocket: config.supervisorSocket,
logDir: config.deployLogDir,
});
deploy.registerWs(ws.registry);
const app = createApp({ config, logger, db, auth, mailer }, { deploy: deploy.router });
server.on('request', app);
// ── AI process engine (Phase 5c/5d) ──────────────────────────
const processes = createProcessManager({ config, logger });

View file

@ -1,13 +1,186 @@
/**
* Deploy controller fast (HMR) vs stable (build+restart) modes (phase 6).
* Deploy module Phase 6b.
*
* REST surface (mounted at `/deploy`, externally `/.cial/api/deploy/*`):
* POST / start a build (+ auto-restart on success)
* POST /restart restart-only (no build)
* POST /:id/cancel cancel an in-flight build
* GET /:id single deploy snapshot
* GET / recent deploys (?limit=N)
* GET /mode current `fast` | `stable`
* PATCH /mode { mode } update
*
* All routes require an instance_admin Better-Auth session same gate the
* SSO bridge sets for owners. The internal Unix-socket mount (added in
* Phase 6e) bypasses this guard because the socket itself is the auth.
*/
import { Router } from 'express';
import type { AppDeps } from '../../infrastructure/types.js';
import { Router } from 'express';
import {
DeployModeUpdateRequestSchema,
DeployStartRequestSchema,
} from '@cial/protocol';
import type { AppDeps } from '../../infrastructure/types.js';
import { ValidationError } from '../../infrastructure/errors.js';
import type { WsClientPool } from '../../infrastructure/ws/clients.js';
import type { WsHandlerRegistry } from '../../infrastructure/ws/handler-registry.js';
import { requireRole } from '../../lib/require-role.js';
import { migrateDeploy } from './migrate.js';
import { DeployRepository } from './repository.js';
import { BuildRunner } from './runner.js';
import { DeployService } from './service.js';
import { SupervisorClient } from './supervisor-client.js';
import { registerDeployWsHandlers } from './ws-handlers.js';
export { migrateDeploy } from './migrate.js';
export type { DeployService } from './service.js';
export interface DeployModuleOpts extends AppDeps {
pool: WsClientPool;
monorepoRoot: string;
supervisorSocket: string;
logDir: string;
}
export function createDeployModule(opts: DeployModuleOpts): {
router: Router;
service: DeployService;
registerWs(registry: WsHandlerRegistry): void;
} {
const repo = new DeployRepository(opts.db);
const runner = new BuildRunner({
monorepoRoot: opts.monorepoRoot,
logDir: opts.logDir,
logger: opts.logger,
});
const supervisor = new SupervisorClient(opts.supervisorSocket);
const service = new DeployService({
repo,
runner,
supervisor,
pool: opts.pool,
logger: opts.logger,
});
return {
router: createRouter(opts, service),
service,
registerWs: registerDeployWsHandlers,
};
}
/** Legacy entry point so `app.ts` keeps compiling until bootstrap migrates. */
export function createDeployRouter(_deps: AppDeps): Router {
const router = Router();
router.all('*', (_req, res) => {
res.status(501).json({ error: { code: 'NOT_IMPLEMENTED', message: 'deploy — phase 6' } });
res.status(503).json({
error: { code: 'NOT_WIRED', message: 'deploy module not wired — use createDeployModule' },
});
});
return router;
}
function createRouter(deps: DeployModuleOpts, service: DeployService): Router {
const router = Router();
async function caller(headers: import('node:http').IncomingHttpHeaders): Promise<string> {
const id = await requireRole({
auth: deps.auth,
db: deps.db,
headers,
role: 'instance_admin',
});
return id.id;
}
// ── POST /deploy ─────────────────────────────────────────────────────
router.post('/', async (req, res, next) => {
try {
const userId = await caller(req.headers);
const parsed = DeployStartRequestSchema.safeParse(req.body ?? {});
if (!parsed.success) {
throw new ValidationError('invalid deploy request', parsed.error.flatten());
}
const result = service.start({
requestedByUserId: userId,
modeOverride: parsed.data.mode,
sessionId: parsed.data.sessionId,
});
res.status(202).json(result);
} catch (err) {
next(err);
}
});
// ── POST /deploy/restart ─────────────────────────────────────────────
router.post('/restart', async (req, res, next) => {
try {
const userId = await caller(req.headers);
const result = await service.restartOnly({ requestedByUserId: userId });
res.json(result);
} catch (err) {
next(err);
}
});
// ── POST /deploy/:id/cancel ──────────────────────────────────────────
router.post('/:id/cancel', async (req, res, next) => {
try {
await caller(req.headers);
const ok = service.cancel(req.params.id);
res.json({ ok });
} catch (err) {
next(err);
}
});
// ── GET /deploy/mode ─────────────────────────────────────────────────
router.get('/mode', async (req, res, next) => {
try {
await caller(req.headers);
res.json({ mode: service.getMode() });
} catch (err) {
next(err);
}
});
// ── PATCH /deploy/mode ───────────────────────────────────────────────
router.patch('/mode', async (req, res, next) => {
try {
await caller(req.headers);
const parsed = DeployModeUpdateRequestSchema.safeParse(req.body);
if (!parsed.success) {
throw new ValidationError('invalid mode payload', parsed.error.flatten());
}
service.setMode(parsed.data.mode);
res.json({ mode: service.getMode() });
} catch (err) {
next(err);
}
});
// ── GET /deploy/:id ──────────────────────────────────────────────────
router.get('/:id', async (req, res, next) => {
try {
await caller(req.headers);
const row = service.get(req.params.id);
if (!row) return res.status(404).json({ error: { code: 'NOT_FOUND', message: 'deploy not found' } });
res.json({ deploy: row });
} catch (err) {
next(err);
}
});
// ── GET /deploy ──────────────────────────────────────────────────────
router.get('/', async (req, res, next) => {
try {
await caller(req.headers);
const limitRaw = req.query.limit;
const limit = typeof limitRaw === 'string' ? Math.min(100, Math.max(1, parseInt(limitRaw, 10) || 20)) : 20;
res.json({ deploys: service.list(limit) });
} catch (err) {
next(err);
}
});
return router;
}

View file

@ -0,0 +1,40 @@
/**
* Deploy schema Phase 6.
*
* Two tables:
* - `deploy` one row per build attempt (audit + UI history)
* - `tenant_settings` k/v store for per-tenant flags (just `deploy_mode`
* for now, but the table is there so future flags
* don't need migrations)
*
* Idempotent: every statement is `IF NOT EXISTS`; safe to run on every boot.
*/
import type Database from 'better-sqlite3';
import type { Logger } from 'pino';
export function migrateDeploy(opts: { db: Database.Database; logger: Logger }): void {
opts.db.exec(`
CREATE TABLE IF NOT EXISTS deploy (
id TEXT PRIMARY KEY,
mode TEXT NOT NULL,
status TEXT NOT NULL,
requested_by_user_id TEXT,
session_id TEXT,
started_at INTEGER NOT NULL,
ended_at INTEGER,
exit_code INTEGER,
error_summary TEXT,
log_path TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS deploy_started_at_idx ON deploy(started_at DESC);
CREATE INDEX IF NOT EXISTS deploy_session_idx ON deploy(session_id, started_at DESC);
CREATE TABLE IF NOT EXISTS tenant_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at INTEGER NOT NULL
);
`);
opts.logger.debug('deploy / tenant_settings schema ensured');
}

View file

@ -0,0 +1,114 @@
/**
* Deploy repository better-sqlite3 raw queries (matches sessions repo style).
*/
import type Database from 'better-sqlite3';
import type { DeployMode, DeployRow, DeployStatus } from '@cial/protocol';
interface DeployRowDb {
id: string;
mode: string;
status: string;
requested_by_user_id: string | null;
session_id: string | null;
started_at: number;
ended_at: number | null;
exit_code: number | null;
error_summary: string | null;
log_path: string;
}
function rowToDto(r: DeployRowDb): DeployRow {
return {
id: r.id,
mode: r.mode as DeployMode,
status: r.status as DeployStatus,
requestedByUserId: r.requested_by_user_id,
sessionId: r.session_id,
startedAt: r.started_at,
endedAt: r.ended_at,
exitCode: r.exit_code,
errorSummary: r.error_summary,
durationMs: r.ended_at ? r.ended_at - r.started_at : null,
};
}
export class DeployRepository {
constructor(private readonly db: Database.Database) {}
insert(opts: {
id: string;
mode: DeployMode;
status: DeployStatus;
requestedByUserId: string | null;
sessionId: string | null;
startedAt: number;
logPath: string;
}): void {
this.db
.prepare(
`INSERT INTO deploy (id, mode, status, requested_by_user_id, session_id, started_at, log_path)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
)
.run(
opts.id,
opts.mode,
opts.status,
opts.requestedByUserId,
opts.sessionId,
opts.startedAt,
opts.logPath,
);
}
setStatus(id: string, status: DeployStatus): void {
this.db.prepare('UPDATE deploy SET status = ? WHERE id = ?').run(status, id);
}
finalize(opts: {
id: string;
status: DeployStatus;
endedAt: number;
exitCode: number | null;
errorSummary: string | null;
}): void {
this.db
.prepare(
'UPDATE deploy SET status = ?, ended_at = ?, exit_code = ?, error_summary = ? WHERE id = ?',
)
.run(opts.status, opts.endedAt, opts.exitCode, opts.errorSummary, opts.id);
}
get(id: string): DeployRow | null {
const row = this.db.prepare('SELECT * FROM deploy WHERE id = ?').get(id) as
| DeployRowDb
| undefined;
return row ? rowToDto(row) : null;
}
list(limit = 20): DeployRow[] {
const rows = this.db
.prepare('SELECT * FROM deploy ORDER BY started_at DESC LIMIT ?')
.all(limit) as DeployRowDb[];
return rows.map(rowToDto);
}
// ── tenant_settings (deploy_mode lives here) ────────────────────────
getMode(): DeployMode {
const row = this.db
.prepare('SELECT value FROM tenant_settings WHERE key = ?')
.get('deploy_mode') as { value: string } | undefined;
if (row?.value === 'fast' || row?.value === 'stable') return row.value;
return 'stable';
}
setMode(mode: DeployMode): void {
this.db
.prepare(
`INSERT INTO tenant_settings (key, value, updated_at) VALUES ('deploy_mode', ?, ?)
ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at`,
)
.run(mode, Date.now());
}
}

View file

@ -0,0 +1,279 @@
/**
* BuildRunner single-flight pnpm build with line-buffered log streaming.
*
* One build at a time per process. New requests while a build is running
* either coalesce (same payload caller gets back the in-flight deployId)
* or queue exactly one slot (newer overrides older queued). Cancel sends
* SIGTERM to the process group, then SIGKILL after 5s.
*
* The runner emits typed events the WS bridge fans out to subscribers:
* - `start` { deployId, mode }
* - `log` { deployId, stream, line }
* - `done` { deployId, ok, exitCode, durationMs, errorSummary }
* - `cancelled` { deployId }
*
* Persistence + supervisor restart live in `service.ts`; this class is
* deliberately storage-agnostic.
*/
import { spawn, type ChildProcess } from 'node:child_process';
import { createWriteStream, mkdirSync, type WriteStream } from 'node:fs';
import { join } from 'node:path';
import { EventEmitter } from 'node:events';
import { randomUUID } from 'node:crypto';
import type { Logger } from 'pino';
import type { DeployMode } from '@cial/protocol';
export interface BuildRunnerOpts {
monorepoRoot: string;
logDir: string;
logger: Logger;
}
export interface DeployEnqueueRequest {
mode: DeployMode;
requestedByUserId: string | null;
sessionId: string | null;
}
export interface EnqueuedDeploy {
deployId: string;
status: 'queued' | 'building';
startedAt: number;
logPath: string;
}
export interface DeployStartEvt { deployId: string; mode: DeployMode; sessionId: string | null }
export interface DeployLogEvt { deployId: string; stream: 'stdout' | 'stderr'; line: string }
export interface DeployDoneEvt {
deployId: string;
ok: boolean;
exitCode: number | null;
durationMs: number;
errorSummary: string | null;
}
export interface DeployCancelledEvt { deployId: string }
interface QueuedDeploy extends EnqueuedDeploy {
mode: DeployMode;
requestedByUserId: string | null;
sessionId: string | null;
}
interface RunningDeploy extends QueuedDeploy {
child: ChildProcess;
killed: boolean;
logStream: WriteStream;
errorTail: string[]; // last N stderr lines for error summary
}
const ERROR_TAIL_LINES = 20;
export class BuildRunner extends EventEmitter {
private current: RunningDeploy | null = null;
private next: QueuedDeploy | null = null;
constructor(private readonly opts: BuildRunnerOpts) {
super();
mkdirSync(opts.logDir, { recursive: true });
}
enqueue(req: DeployEnqueueRequest): EnqueuedDeploy {
if (!this.current) {
const d = this.makeDeploy(req, 'building');
this.startBuild(d);
return { deployId: d.deployId, status: 'building', startedAt: d.startedAt, logPath: d.logPath };
}
// Coalesce same-mode pending request. Different-mode replaces queued slot.
if (this.next && this.next.mode === req.mode) {
return {
deployId: this.next.deployId,
status: 'queued',
startedAt: this.next.startedAt,
logPath: this.next.logPath,
};
}
const d = this.makeDeploy(req, 'queued');
this.next = d;
return { deployId: d.deployId, status: 'queued', startedAt: d.startedAt, logPath: d.logPath };
}
cancel(deployId: string): boolean {
if (this.current?.deployId === deployId) {
this.current.killed = true;
try {
// Negative pid → process group; we set detached: true at spawn.
if (this.current.child.pid != null) {
process.kill(-this.current.child.pid, 'SIGTERM');
}
} catch {
// already gone — exit handler will fire
}
const id = deployId;
setTimeout(() => {
if (this.current?.deployId === id && this.current.killed) {
try {
if (this.current.child.pid != null) {
process.kill(-this.current.child.pid, 'SIGKILL');
}
} catch {
/* ignore */
}
}
}, 5000).unref();
return true;
}
if (this.next?.deployId === deployId) {
const cancelled = this.next;
this.next = null;
this.emit('cancelled', { deployId: cancelled.deployId } satisfies DeployCancelledEvt);
return true;
}
return false;
}
isBusy(): boolean {
return this.current != null;
}
// ────────────── internals ──────────────
private makeDeploy(req: DeployEnqueueRequest, status: 'queued' | 'building'): QueuedDeploy {
const deployId = randomUUID();
const startedAt = Date.now();
const logPath = join(this.opts.logDir, `${deployId}.log`);
return {
deployId,
status,
startedAt,
logPath,
mode: req.mode,
requestedByUserId: req.requestedByUserId,
sessionId: req.sessionId,
};
}
private startBuild(d: QueuedDeploy): void {
const logStream = createWriteStream(d.logPath, { flags: 'w' });
// For v1 `fast` is treated identically to `stable` at the runner level —
// PHASE-6.md keeps real HMR keep-warm behind a follow-up phase.
const child = spawn(
'pnpm',
['--filter', '@cial/platform-front', '--filter', '@cial/platform-back', 'build'],
{
cwd: this.opts.monorepoRoot,
env: { ...process.env, NODE_OPTIONS: '--max-old-space-size=1024' },
stdio: ['ignore', 'pipe', 'pipe'],
detached: true,
},
);
const errorTail: string[] = [];
const running: RunningDeploy = {
...d,
child,
killed: false,
logStream,
errorTail,
};
this.current = running;
this.emit('start', {
deployId: d.deployId,
mode: d.mode,
sessionId: d.sessionId,
} satisfies DeployStartEvt);
pipeLines(child.stdout!, (line) => {
logStream.write(`OUT ${line}\n`);
this.emit('log', { deployId: d.deployId, stream: 'stdout', line } satisfies DeployLogEvt);
});
pipeLines(child.stderr!, (line) => {
logStream.write(`ERR ${line}\n`);
errorTail.push(line);
while (errorTail.length > ERROR_TAIL_LINES) errorTail.shift();
this.emit('log', { deployId: d.deployId, stream: 'stderr', line } satisfies DeployLogEvt);
});
child.on('error', (err) => {
// spawn-level failure (e.g. pnpm not found)
this.opts.logger.error({ err, deployId: d.deployId }, 'build spawn failed');
this.finishCurrent(null, `spawn failed: ${err.message}`);
});
child.on('exit', (code) => {
const cancelled = running.killed;
this.finishCurrent(code, cancelled ? 'cancelled' : null);
});
}
private finishCurrent(exitCode: number | null, forceErrorSummary: string | null): void {
const d = this.current;
if (!d) return;
this.current = null;
try {
d.logStream.end();
} catch {
/* ignore */
}
const durationMs = Date.now() - d.startedAt;
if (forceErrorSummary === 'cancelled') {
this.emit('cancelled', { deployId: d.deployId } satisfies DeployCancelledEvt);
this.emit('done', {
deployId: d.deployId,
ok: false,
exitCode,
durationMs,
errorSummary: 'cancelled',
} satisfies DeployDoneEvt);
} else if (forceErrorSummary !== null || exitCode !== 0) {
const summary =
forceErrorSummary ?? truncateSummary(d.errorTail.join('\n')) ?? `exit code ${exitCode}`;
this.emit('done', {
deployId: d.deployId,
ok: false,
exitCode,
durationMs,
errorSummary: summary,
} satisfies DeployDoneEvt);
} else {
this.emit('done', {
deployId: d.deployId,
ok: true,
exitCode,
durationMs,
errorSummary: null,
} satisfies DeployDoneEvt);
}
// Promote queued deploy if any.
const queued = this.next;
if (queued) {
this.next = null;
this.startBuild(queued);
}
}
}
function pipeLines(stream: NodeJS.ReadableStream, onLine: (line: string) => void): void {
let buf = '';
stream.setEncoding('utf8');
stream.on('data', (chunk: string) => {
buf += chunk;
let nl = buf.indexOf('\n');
while (nl !== -1) {
const line = buf.slice(0, nl);
buf = buf.slice(nl + 1);
if (line) onLine(line);
nl = buf.indexOf('\n');
}
});
stream.on('end', () => {
if (buf) onLine(buf);
});
}
function truncateSummary(s: string): string | null {
const trimmed = s.trim();
if (!trimmed) return null;
return trimmed.length > 4000 ? `${trimmed.slice(0, 4000)}` : trimmed;
}

View file

@ -0,0 +1,247 @@
/**
* Deploy service.
*
* Glue between the BuildRunner, the SQLite repository, the supervisor IPC
* client, and the WS broadcast bridge. The router and WS handlers are thin
* wrappers on top of this so the same call paths apply whether a deploy is
* triggered by REST, by WS, or by the agent over the internal mount.
*
* Lifecycle of a deploy:
* 1. `start()` enqueues a build via BuildRunner; persists a `deploy` row;
* caller gets `{ deployId, status }` immediately.
* 2. Runner emits `start` service flips status, broadcasts `deploy.start`.
* 3. Runner emits `log` service streams `deploy.log` events.
* 4. Runner emits `done`:
* - ok=true status='restarting', supervisor.restart('platform'),
* broadcast `deploy.restart.start` / `.done`, then
* status='ok' + broadcast `deploy.done` with ok=true.
* - ok=false status='error', broadcast `deploy.done` with ok=false.
* 5. Runner emits `cancelled` status='cancelled' + broadcast.
*
* Broadcasts are user-scoped (`pool.broadcastToUser`) every authenticated
* tab of the requester sees the events. Cross-user visibility is out of
* scope for v1.
*/
import { EventEmitter } from 'node:events';
import {
DeployWsType,
type DeployCancelledEvent,
type DeployDoneEvent,
type DeployLogEvent,
type DeployMode,
type DeployRestartDoneEvent,
type DeployRestartStartEvent,
type DeployRow,
type DeployStartEvent,
} from '@cial/protocol';
import type { Logger } from 'pino';
import type { WsClientPool } from '../../infrastructure/ws/clients.js';
import { ValidationError } from '../../infrastructure/errors.js';
import { BuildRunner, type DeployDoneEvt, type DeployLogEvt, type DeployStartEvt } from './runner.js';
import { DeployRepository } from './repository.js';
import { SupervisorClient } from './supervisor-client.js';
export interface DeployServiceOpts {
repo: DeployRepository;
runner: BuildRunner;
supervisor: SupervisorClient;
pool: WsClientPool;
logger: Logger;
}
/**
* Subscribers (e.g. tests, an internal mount, the agent path) can listen
* here without going through WS. Event names mirror the runner.
*/
export class DeployService extends EventEmitter {
/** userId of the most recent caller per deployId — drives broadcast scoping. */
private readonly callerByDeploy = new Map<string, string | null>();
constructor(private readonly opts: DeployServiceOpts) {
super();
this.wireRunner();
}
// ────────────────────── REST/WS surface ──────────────────────
start(opts: {
requestedByUserId: string;
sessionId?: string;
modeOverride?: DeployMode;
}): { deployId: string; status: 'queued' | 'building' } {
const mode = opts.modeOverride ?? this.opts.repo.getMode();
const enqueued = this.opts.runner.enqueue({
mode,
requestedByUserId: opts.requestedByUserId,
sessionId: opts.sessionId ?? null,
});
this.opts.repo.insert({
id: enqueued.deployId,
mode,
status: enqueued.status,
requestedByUserId: opts.requestedByUserId,
sessionId: opts.sessionId ?? null,
startedAt: enqueued.startedAt,
logPath: enqueued.logPath,
});
this.callerByDeploy.set(enqueued.deployId, opts.requestedByUserId);
return { deployId: enqueued.deployId, status: enqueued.status };
}
cancel(deployId: string): boolean {
return this.opts.runner.cancel(deployId);
}
get(id: string): DeployRow | null {
return this.opts.repo.get(id);
}
list(limit?: number): DeployRow[] {
return this.opts.repo.list(limit);
}
getMode(): DeployMode {
return this.opts.repo.getMode();
}
setMode(mode: DeployMode): void {
if (mode !== 'fast' && mode !== 'stable') {
throw new ValidationError('mode must be "fast" or "stable"');
}
this.opts.repo.setMode(mode);
}
/**
* Restart-only path (no build). Used by `POST /deploy/restart` and by the
* internal mount when the agent has already verified the build. Resolves
* once both platform-* children are back up (or rejects if the supervisor
* IPC errors).
*/
async restartOnly(opts: {
requestedByUserId: string;
sessionId?: string;
}): Promise<{ ok: boolean; durations: Record<string, number> }> {
this.broadcastTo(opts.requestedByUserId, DeployWsType.DeployRestartStart, {
service: 'platform',
} satisfies DeployRestartStartEvent);
try {
const results = await this.opts.supervisor.restart('platform');
const durations: Record<string, number> = {};
for (const r of results) {
durations[r.service] = r.durationMs;
this.broadcastTo(opts.requestedByUserId, DeployWsType.DeployRestartDone, {
service: r.service,
pid: r.pid,
durationMs: r.durationMs,
} satisfies DeployRestartDoneEvent);
}
return { ok: true, durations };
} catch (err) {
this.opts.logger.error({ err }, 'restart failed');
return { ok: false, durations: {} };
}
}
// ────────────────────── runner → service plumbing ──────────────────────
private wireRunner(): void {
this.opts.runner.on('start', (e: DeployStartEvt) => {
this.opts.repo.setStatus(e.deployId, 'building');
const payload: DeployStartEvent = {
deployId: e.deployId,
mode: e.mode,
targets: ['platform-front', 'platform-back'],
sessionId: e.sessionId,
};
this.broadcastForDeploy(e.deployId, DeployWsType.DeployStart, payload);
this.emit('start', e);
});
this.opts.runner.on('log', (e: DeployLogEvt) => {
const payload: DeployLogEvent = {
deployId: e.deployId,
stream: e.stream,
line: e.line,
};
this.broadcastForDeploy(e.deployId, DeployWsType.DeployLog, payload);
this.emit('log', e);
});
this.opts.runner.on('cancelled', (e: { deployId: string }) => {
const payload: DeployCancelledEvent = { deployId: e.deployId };
this.broadcastForDeploy(e.deployId, DeployWsType.DeployCancelled, payload);
this.emit('cancelled', e);
});
this.opts.runner.on('done', async (e: DeployDoneEvt) => {
// Persist runner result first so a restart-failure doesn't override the build outcome.
this.opts.repo.finalize({
id: e.deployId,
status: e.ok ? 'restarting' : (e.errorSummary === 'cancelled' ? 'cancelled' : 'error'),
endedAt: Date.now(),
exitCode: e.exitCode,
errorSummary: e.errorSummary,
});
if (!e.ok) {
const payload: DeployDoneEvent = {
deployId: e.deployId,
ok: false,
exitCode: e.exitCode,
durationMs: e.durationMs,
errorSummary: e.errorSummary,
};
this.broadcastForDeploy(e.deployId, DeployWsType.DeployDone, payload);
this.callerByDeploy.delete(e.deployId);
this.emit('done', e);
return;
}
// Build OK → restart platform children.
const userId = this.callerByDeploy.get(e.deployId) ?? null;
this.broadcastForDeploy(e.deployId, DeployWsType.DeployRestartStart, {
service: 'platform',
} satisfies DeployRestartStartEvent);
let restartOk = true;
try {
const results = await this.opts.supervisor.restart('platform');
for (const r of results) {
this.broadcastForDeploy(e.deployId, DeployWsType.DeployRestartDone, {
service: r.service,
pid: r.pid,
durationMs: r.durationMs,
} satisfies DeployRestartDoneEvent);
}
} catch (err) {
this.opts.logger.error({ err, deployId: e.deployId }, 'restart after build failed');
restartOk = false;
}
this.opts.repo.setStatus(e.deployId, restartOk ? 'ok' : 'error');
const payload: DeployDoneEvent = {
deployId: e.deployId,
ok: restartOk,
exitCode: e.exitCode,
durationMs: Date.now() - (this.opts.repo.get(e.deployId)?.startedAt ?? Date.now()),
errorSummary: restartOk ? null : 'restart failed — check supervisor logs',
};
this.broadcastForDeploy(e.deployId, DeployWsType.DeployDone, payload);
this.callerByDeploy.delete(e.deployId);
this.emit('done', e);
// Track that this user (if any) was the requester.
void userId;
});
}
private broadcastForDeploy(deployId: string, type: string, payload: unknown): void {
const userId = this.callerByDeploy.get(deployId) ?? null;
if (!userId) return;
this.opts.pool.broadcastToUser(userId, { type, payload });
}
private broadcastTo(userId: string, type: string, payload: unknown): void {
this.opts.pool.broadcastToUser(userId, { type, payload });
}
}

View file

@ -0,0 +1,157 @@
/**
* Supervisor IPC client.
*
* Talks JSONL over the Unix socket the supervisor listens on (Phase 6a).
* Each public method opens a fresh connection, sends one request, collects
* replies until either the expected terminal event arrives or the
* connection closes, then resolves. Connections are short-lived so the
* supervisor never has to track per-request state.
*/
import { createConnection, type Socket } from 'node:net';
interface RestartReplyAck { type: 'restart.ack'; service: string; pid: number }
interface RestartReplyDone { type: 'restart.done'; service: string; pid: number | null; uptimeMs: number }
interface RestartReplyError { type: 'error'; error: string; service?: string }
type RestartReply = RestartReplyAck | RestartReplyDone | RestartReplyError;
interface StatusReply {
type: 'status';
services: Array<{ name: string; pid: number | null; uptimeMs: number; restartCount: number }>;
}
export interface RestartResult {
service: 'platform-front' | 'platform-back';
pid: number | null;
durationMs: number;
}
/**
* Open a connection, write one line, read replies (one JSON object per line)
* until `done(reply)` returns true or the socket closes / times out.
* Returns the raw reply objects collected.
*/
function exchange(opts: {
socketPath: string;
request: object;
timeoutMs: number;
done: (reply: unknown) => boolean;
}): Promise<unknown[]> {
return new Promise((resolve, reject) => {
let sock: Socket;
try {
sock = createConnection(opts.socketPath);
} catch (err) {
reject(err);
return;
}
const replies: unknown[] = [];
let buf = '';
let settled = false;
const timer = setTimeout(() => {
if (settled) return;
settled = true;
sock.destroy();
reject(new Error(`supervisor IPC timeout after ${opts.timeoutMs}ms`));
}, opts.timeoutMs);
const finish = (err?: Error): void => {
if (settled) return;
settled = true;
clearTimeout(timer);
sock.end();
if (err) reject(err);
else resolve(replies);
};
sock.setEncoding('utf8');
sock.on('connect', () => {
sock.write(`${JSON.stringify(opts.request)}\n`);
});
sock.on('data', (chunk: string) => {
buf += chunk;
let nl = buf.indexOf('\n');
while (nl !== -1) {
const line = buf.slice(0, nl).trim();
buf = buf.slice(nl + 1);
if (line) {
try {
const reply = JSON.parse(line);
replies.push(reply);
if (opts.done(reply)) {
finish();
return;
}
} catch {
// ignore garbage lines from the supervisor — exchange continues
}
}
nl = buf.indexOf('\n');
}
});
sock.on('error', (err) => finish(err));
sock.on('close', () => finish());
});
}
export class SupervisorClient {
constructor(private readonly socketPath: string) {}
async status(timeoutMs = 2000): Promise<StatusReply> {
const replies = await exchange({
socketPath: this.socketPath,
request: { type: 'status' },
timeoutMs,
done: (r) => isStatusReply(r),
});
const status = replies.find(isStatusReply);
if (!status) throw new Error('supervisor returned no status reply');
return status;
}
/**
* Bounce one or more platform-* children. `'platform'` is shorthand for
* both. Returns one entry per child the supervisor restarted, in the
* order it acked them.
*/
async restart(
service: 'platform-front' | 'platform-back' | 'platform',
timeoutMs = 15000,
): Promise<RestartResult[]> {
const expected = service === 'platform' ? 2 : 1;
const dones: RestartReplyDone[] = [];
const errors: RestartReplyError[] = [];
const acks: RestartReplyAck[] = [];
const replies = await exchange({
socketPath: this.socketPath,
request: { type: 'restart', service },
timeoutMs,
done: (raw) => {
const r = raw as RestartReply;
if (r.type === 'error') errors.push(r);
else if (r.type === 'restart.ack') acks.push(r);
else if (r.type === 'restart.done') dones.push(r);
return dones.length + errors.length >= expected;
},
});
const firstErr = errors[0];
if (firstErr) {
throw new Error(`supervisor restart failed: ${firstErr.error}${firstErr.service ? ` (${firstErr.service})` : ''}`);
}
return dones.map((d) => ({
service: d.service as 'platform-front' | 'platform-back',
pid: d.pid,
durationMs: d.uptimeMs,
}));
// Note: we read out `replies` to keep the promise resolved with all
// observed envelopes for debug logging upstream if needed.
void replies;
}
}
function isStatusReply(r: unknown): r is StatusReply {
return typeof r === 'object' && r !== null && (r as { type?: string }).type === 'status';
}

View file

@ -0,0 +1,29 @@
/**
* Deploy WS handlers Phase 6b.
*
* Subscribers ask the back to relay deploy.* events to them via two
* inbound types:
* - `deploy.watch` start receiving
* - `deploy.unwatch` stop receiving
*
* The actual broadcast already happens in `DeployService` via
* `pool.broadcastToUser`, which targets every authenticated socket of the
* user. The watch/unwatch surface is here so the SDK can be explicit about
* intent (and so future filtering e.g. only deploys for session X
* has a hook to bind to).
*/
import type { WsHandlerRegistry } from '../../infrastructure/ws/handler-registry.js';
import { DeployWsType } from '@cial/protocol';
// Watch / unwatch are no-ops for v1 because the broadcast is already
// user-scoped — we accept them to give the SDK a hookable handshake and to
// reserve the message types for finer per-session subscription later.
export function registerDeployWsHandlers(registry: WsHandlerRegistry): void {
registry.register(DeployWsType.DeployWatch, () => {
return { ok: true };
});
registry.register(DeployWsType.DeployUnwatch, () => {
return { ok: true };
});
}

View file

@ -1,14 +1,126 @@
import { z } from 'zod';
/**
* Deploy controller protocol Phase 6.
*
* Schemas for the REST + WS surface that lets the agent (or a human) build,
* restart, and inspect the platform sub-tree without taking the container
* down. Type strings live in `DeployWsType` so the back router and the SDK
* stay in lock-step.
*/
/** Deploy controller protocol. Filled in phase 6. */
import { z } from 'zod';
export const DeployModeSchema = z.enum(['fast', 'stable']);
export type DeployMode = z.infer<typeof DeployModeSchema>;
export const DeployEventSchema = z.object({
type: z.enum(['queued', 'building', 'success', 'error']),
export const DeployStatusSchema = z.enum([
'queued',
'building',
'restarting',
'ok',
'error',
'cancelled',
]);
export type DeployStatus = z.infer<typeof DeployStatusSchema>;
export const DeployRowSchema = z.object({
id: z.string(),
mode: DeployModeSchema,
message: z.string().optional(),
at: z.string().datetime(),
status: DeployStatusSchema,
requestedByUserId: z.string().nullable(),
sessionId: z.string().nullable(),
startedAt: z.number().int(),
endedAt: z.number().int().nullable(),
exitCode: z.number().int().nullable(),
errorSummary: z.string().nullable(),
durationMs: z.number().int().nullable(),
});
export type DeployEvent = z.infer<typeof DeployEventSchema>;
export type DeployRow = z.infer<typeof DeployRowSchema>;
// ────────────────────── REST DTOs ──────────────────────
export const DeployStartRequestSchema = z.object({
/** Optional override; defaults to current `tenant_settings.deploy_mode`. */
mode: DeployModeSchema.optional(),
/** Optional — links the deploy back to a chat session (UI affordance). */
sessionId: z.string().optional(),
});
export type DeployStartRequest = z.infer<typeof DeployStartRequestSchema>;
export const DeployStartResponseSchema = z.object({
deployId: z.string(),
status: z.enum(['queued', 'building']),
});
export type DeployStartResponse = z.infer<typeof DeployStartResponseSchema>;
export const DeployRestartResponseSchema = z.object({
ok: z.boolean(),
durations: z.record(z.string(), z.number().int()).optional(),
});
export type DeployRestartResponse = z.infer<typeof DeployRestartResponseSchema>;
export const DeployModeResponseSchema = z.object({ mode: DeployModeSchema });
export type DeployModeResponse = z.infer<typeof DeployModeResponseSchema>;
export const DeployModeUpdateRequestSchema = z.object({ mode: DeployModeSchema });
export type DeployModeUpdateRequest = z.infer<typeof DeployModeUpdateRequestSchema>;
// ────────────────────── WS events (back → client) ──────────────────────
export const DeployStartEventSchema = z.object({
deployId: z.string(),
mode: DeployModeSchema,
targets: z.array(z.enum(['platform-front', 'platform-back'])),
sessionId: z.string().nullable(),
});
export type DeployStartEvent = z.infer<typeof DeployStartEventSchema>;
export const DeployLogEventSchema = z.object({
deployId: z.string(),
stream: z.enum(['stdout', 'stderr']),
line: z.string(),
});
export type DeployLogEvent = z.infer<typeof DeployLogEventSchema>;
export const DeployDoneEventSchema = z.object({
deployId: z.string(),
ok: z.boolean(),
exitCode: z.number().int().nullable(),
durationMs: z.number().int(),
errorSummary: z.string().nullable(),
});
export type DeployDoneEvent = z.infer<typeof DeployDoneEventSchema>;
export const DeployCancelledEventSchema = z.object({ deployId: z.string() });
export type DeployCancelledEvent = z.infer<typeof DeployCancelledEventSchema>;
export const DeployRestartStartEventSchema = z.object({
service: z.enum(['platform-front', 'platform-back', 'platform']),
});
export type DeployRestartStartEvent = z.infer<typeof DeployRestartStartEventSchema>;
export const DeployRestartDoneEventSchema = z.object({
service: z.enum(['platform-front', 'platform-back']),
pid: z.number().int().nullable(),
durationMs: z.number().int(),
});
export type DeployRestartDoneEvent = z.infer<typeof DeployRestartDoneEventSchema>;
// ────────────────────── WS subscription (client → back) ──────────────────────
export const DeployWatchRequestSchema = z.object({}).optional();
// ────────────────────── canonical type strings ──────────────────────
export const DeployWsType = {
// inbound
DeployWatch: 'deploy.watch',
DeployUnwatch: 'deploy.unwatch',
// outbound
DeployStart: 'deploy.start',
DeployLog: 'deploy.log',
DeployDone: 'deploy.done',
DeployCancelled: 'deploy.cancelled',
DeployRestartStart: 'deploy.restart.start',
DeployRestartDone: 'deploy.restart.done',
} as const;
export type DeployWsTypeName = (typeof DeployWsType)[keyof typeof DeployWsType];