cc-libs/tools/mcp-bridge/src/link-server.ts

172 lines
5.2 KiB
TypeScript

import { randomUUID } from "node:crypto";
import { WebSocketServer, type WebSocket } from "ws";
import { formatComputer, parseJsonFrame, parseLinkMessage, type ResponseMessage } from "./protocol.js";
export type ComputerConnection = {
computerId: number;
label: string | null;
ws: WebSocket;
connectedAt: number;
lastSeenAt: number;
};
type PendingRequest = {
computerId: number;
resolve: (message: ResponseMessage) => void;
};
export class LinkRegistry {
private readonly computers = new Map<number, ComputerConnection>();
private readonly pending = new Map<string, PendingRequest>();
register(connection: ComputerConnection): void {
const existing = this.computers.get(connection.computerId);
if (existing && existing.ws !== connection.ws) {
existing.ws.close();
}
this.computers.set(connection.computerId, connection);
}
unregister(ws: WebSocket): void {
for (const [computerId, connection] of this.computers) {
if (connection.ws === ws) {
this.computers.delete(computerId);
}
}
}
count(): number {
return this.computers.size;
}
snapshot(): ComputerConnection[] {
return [...this.computers.values()];
}
handleFrame(ws: WebSocket, data: unknown): void {
const message = parseLinkMessage(parseJsonFrame(data));
if (!message) {
return;
}
if (message.type === "hello") {
const now = Date.now();
this.register({
computerId: message.computerId,
label: message.computerLabel,
ws,
connectedAt: now,
lastSeenAt: now,
});
ws.send(JSON.stringify({ type: "hello-ok" }));
return;
}
const pending = this.pending.get(message.id);
if (!pending) {
return;
}
const connection = this.computers.get(pending.computerId);
if (connection) {
connection.lastSeenAt = Date.now();
}
this.pending.delete(message.id);
pending.resolve(message);
}
async probeComputers(timeoutMs: number): Promise<string> {
const computers = this.snapshot();
if (computers.length === 0) {
return "No computers connected.";
}
const lines = await Promise.all(computers.map((computer) => this.probeComputer(computer, timeoutMs)));
return lines.join("\n");
}
async execLua(computerId: number, code: string, timeoutMs: number): Promise<string> {
const computer = this.computers.get(computerId);
if (!computer) {
return `No computer with id ${computerId} connected.`;
}
const result = await this.requestComputer(computer, "exec-lua", { code }, timeoutMs);
return formatExecLuaResult(computer, result);
}
private async probeComputer(computer: ComputerConnection, timeoutMs: number): Promise<string> {
const result = await this.requestComputer(computer, "ping", undefined, timeoutMs);
if (!result) {
return `timeout from ${formatComputer(computer.computerId, computer.label)}`;
}
if (result.ok && typeof result.result === "string") {
return result.result;
}
return `error from ${formatComputer(computer.computerId, computer.label)}: ${result.error ?? "unknown error"}`;
}
private async requestComputer(
computer: ComputerConnection,
method: string,
params: Record<string, unknown> | undefined,
timeoutMs: number,
): Promise<ResponseMessage | null> {
const id = randomUUID();
const response = new Promise<ResponseMessage>((resolve) => {
this.pending.set(id, { computerId: computer.computerId, resolve });
computer.ws.send(JSON.stringify(params ? { type: "request", id, method, params } : { type: "request", id, method }));
});
const timeout = new Promise<null>((resolve) => {
setTimeout(() => {
this.pending.delete(id);
resolve(null);
}, timeoutMs);
});
return Promise.race([response, timeout]);
}
}
function formatExecLuaResult(computer: ComputerConnection, response: ResponseMessage | null): string {
const computerText = formatComputer(computer.computerId, computer.label);
if (!response) {
return `computer: ${computerText}\nok: false\nerror: timeout`;
}
const payload = isRecord(response.result) ? response.result : {};
const output = typeof payload.output === "string" ? payload.output : "";
const lines = [`computer: ${computerText}`, `ok: ${response.ok ? "true" : "false"}`];
if (response.ok) {
lines.push(`returns: ${JSON.stringify(Array.isArray(payload.returns) ? payload.returns : [])}`);
} else {
lines.push(`error: ${response.error ?? "unknown error"}`);
}
lines.push("output:");
if (output !== "") {
lines.push(output.endsWith("\n") ? output.slice(0, -1) : output);
}
return lines.join("\n");
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
export function startLinkServer(options: { host: string; port: number; registry: LinkRegistry }): WebSocketServer {
const server = new WebSocketServer({ host: options.host, port: options.port });
server.on("connection", (ws) => {
ws.on("message", (data) => options.registry.handleFrame(ws, data));
ws.on("close", () => options.registry.unregister(ws));
ws.on("error", () => options.registry.unregister(ws));
});
return server;
}