import { WebSocketServer, WebSocket } from 'ws'; import { LLMService } from '../services/LLMService'; import { EncryptionService } from '../services/EncryptionService'; import { v4 as uuidv4 } from 'uuid'; // Initialize Encryption Service const encryptionService = new EncryptionService(process.env.ENCRYPTION_KEY || ''); // Types from the frontend interface Decision { goTo: string; alsoGood: string; considerable: string; noGoes: string; needsDiscussion: string; } // Define the SessionState enum export enum SessionState { SETUP = 'SETUP', GATHERING = 'GATHERING', HARMONIZING = 'HARMONIZING', FINAL = 'FINAL', ERROR = 'ERROR', } interface EncryptedResponseData { wants: string[]; accepts: string[]; noGoes: string[]; afraidToAsk: string; // Encrypted afraidToAsk idea } // A map to hold session data, including clients and the latest state interface SessionData { state: SessionState; // Current phase of the session topic: string | null; // The topic of the session description: string | null; // The description of the session expectedResponses: number; // The number set by the first user in State A. submittedCount: number; // The current count of submitted responses. responses: Map; // Stores the submitted desire objects. Map clients: Map; // Maps the persistent Client ID to their active WebSocket connection object. finalResult: any | null; // The result returned by the LLM. lastActivity: number; // Timestamp of the last activity (e.g., message, client join/leave) } export const sessions = new Map(); // Initialize LLM Service (API key from environment) const llmService = new LLMService(process.env.GEMINI_API_KEY || ''); // Structured logging function const logEvent = (eventName: string, sessionId: string, details: object = {}) => { console.log(JSON.stringify({ timestamp: new Date().toISOString(), eventName, sessionId, ...details })); }; // Metrics recording function const recordMetric = (metricName: string, value: number | string, sessionId: string, details: object = {}) => { console.log(JSON.stringify({ timestamp: new Date().toISOString(), metricName, value, sessionId, ...details })); }; // Helper to create a serializable version of the session state const getSerializableSession = (sessionData: SessionData, currentClientId: string | null = null) => { const filteredResponses = new Map(); sessionData.responses.forEach((response, clientId) => { if (clientId === currentClientId) { // For the current client, decrypt and send their own full response const decryptedWants = response.wants.map((d: string) => encryptionService.decrypt(d)); const decryptedAccepts = response.accepts.map((d: string) => encryptionService.decrypt(d)); const decryptedNoGoes = response.noGoes.map((d: string) => encryptionService.decrypt(d)); const decryptedAfraidToAsk = encryptionService.decrypt(response.afraidToAsk); filteredResponses.set(clientId, { wants: decryptedWants, accepts: decryptedAccepts, noGoes: decryptedNoGoes, afraidToAsk: decryptedAfraidToAsk }); } else { // For other clients, only send non-AfraidToAsk parts (wants, accepts, noGoes without AfraidToAsk) const decryptedWants = response.wants.map((d: string) => encryptionService.decrypt(d)); const decryptedAccepts = response.accepts.map((d: string) => encryptionService.decrypt(d)); const decryptedNoGoes = response.noGoes.map((d: string) => encryptionService.decrypt(d)); filteredResponses.set(clientId, { wants: decryptedWants, accepts: decryptedAccepts, noGoes: decryptedNoGoes, afraidToAsk: "" }); // Hide afraidToAsk for other clients } }); return { ...sessionData, responses: Object.fromEntries(filteredResponses), clients: Array.from(sessionData.clients.keys()), // Only send client IDs, not the WebSocket objects }; }; export const broadcastToSession = (sessionId: string, message: any, excludeClientId: string | null = null) => { const sessionData = sessions.get(sessionId); if (sessionData) { sessionData.clients.forEach((client, clientId) => { if (clientId !== excludeClientId && client.readyState === WebSocket.OPEN) { const serializableMessage = { ...message, payload: { ...message.payload, session: getSerializableSession(sessionData, clientId), // Pass clientId to getSerializableSession }, }; client.send(JSON.stringify(serializableMessage)); } }); } }; const SESSION_TIMEOUT_MINUTES = parseInt(process.env.SESSION_TIMEOUT_MINUTES || '1440', 10); const SESSION_TIMEOUT_MS = SESSION_TIMEOUT_MINUTES * 60 * 1000; // Convert minutes to milliseconds // Function to clean up inactive sessions const cleanupInactiveSessions = () => { const now = Date.now(); for (const [sessionId, sessionData] of sessions.entries()) { if (sessionData.clients.size === 0 && (now - sessionData.lastActivity > SESSION_TIMEOUT_MS)) { sessions.delete(sessionId); logEvent('session_purged_inactive', sessionId); console.log(`Inactive session ${sessionId} purged.`); } } }; export const createWebSocketServer = (server: any) => { const wss = new WebSocketServer({ server }); // Schedule periodic cleanup of inactive sessions setInterval(cleanupInactiveSessions, 60 * 60 * 1000); // Run every hour wss.on('connection', (ws, req) => { const url = new URL(req.url || '', `http://${req.headers.host}`); const sessionId = url.pathname.split('/').pop(); if (!sessionId) { ws.close(1008, 'Invalid session ID'); return; } let sessionData: SessionData | null = null; // Set up a ping interval to keep the connection alive const pingInterval = setInterval(() => { if (ws.readyState === WebSocket.OPEN) { ws.ping(); } }, 30000); // Send ping every 30 seconds ws.on('message', async (message) => { const parsedMessage = JSON.parse(message.toString()); const updatedSessionData = await handleWebSocketMessage(ws, sessionId, parsedMessage); if (updatedSessionData) { sessionData = updatedSessionData; } }); ws.on('close', () => { clearInterval(pingInterval); // Clear the interval when the connection closes let disconnectedClientId: string | null = null; const currentSessionData = sessions.get(sessionId); // Retrieve the latest sessionData if (currentSessionData) { // Check if sessionData is not null for (const [clientId, clientWs] of currentSessionData.clients.entries()) { if (clientWs === ws) { disconnectedClientId = clientId; break; } } if (disconnectedClientId) { currentSessionData.clients.delete(disconnectedClientId); console.log(`Client ${disconnectedClientId} disconnected from session: ${sessionId}. Remaining clients: ${currentSessionData.clients.size}`); } else { console.log(`An unregistered client disconnected from session: ${sessionId}.`); } if (currentSessionData.clients.size === 0) { // Only purge session if it's in SETUP, FINAL, or ERROR state if (currentSessionData.state === SessionState.SETUP || currentSessionData.state === SessionState.FINAL || currentSessionData.state === SessionState.ERROR) { sessions.delete(sessionId); logEvent('session_purged', sessionId); console.log(`Session ${sessionId} closed and state cleared.`); } else { console.log(`Session ${sessionId} is in ${currentSessionData.state} state. Not purging despite no active clients.`); } } } else { console.log(`Client disconnected from session: ${sessionId}. Session data was null.`); } }); ws.on('error', (error) => { console.error(`WebSocket error in session ${sessionId}:`, error); }); }); return wss; }; export const handleWebSocketMessage = async (ws: WebSocket, sessionId: string, parsedMessage: any): Promise => { const { type, clientId, payload } = parsedMessage; if (!clientId) { console.error(`Received message without clientId in session ${sessionId}. Type: ${type}`); ws.send(JSON.stringify({ type: 'ERROR', payload: { message: 'clientId is required' } })); return sessions.get(sessionId) || null; // Return current session state if available } let sessionData = sessions.get(sessionId); // Update lastActivity timestamp on any message if (sessionData) { sessionData.lastActivity = Date.now(); console.log(`Session ${sessionId}: lastActivity updated to ${sessionData.lastActivity}`); } // If sessionData is null here, it means a JOIN_SESSION message hasn't been processed yet for a new session. // The JOIN_SESSION case will handle its creation. if (!sessionData && type !== 'JOIN_SESSION') { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: 'Session not found and message is not JOIN_SESSION' } })); return null; // Session not found, return null } switch (type) { case 'JOIN_SESSION': if (!sessionData) { // Create a new session if it doesn't exist const newSessionData: SessionData = { state: SessionState.SETUP, topic: null, description: null, expectedResponses: 0, submittedCount: 0, responses: new Map(), clients: new Map(), finalResult: null, lastActivity: Date.now(), // Initialize lastActivity }; sessions.set(sessionId, newSessionData); // Explicitly set in global map sessionData = newSessionData; // Update local reference to the newly created session console.log(`New session ${sessionId} created upon client ${clientId} joining. lastActivity: ${sessionData.lastActivity}`); } // Register the client to the session's clients map if (!sessionData.clients.has(clientId)) { sessionData.clients.set(clientId, ws); console.log(`Client ${clientId} joined session: ${sessionId}. Total clients: ${sessionData.clients.size}`); } ws.send(JSON.stringify({ type: 'STATE_UPDATE', payload: { session: getSerializableSession(sessionData, clientId) } })); console.log(`Client ${clientId} received STATE_UPDATE for session ${sessionId} upon joining.`); return sessionData; // Return the updated sessionData case 'PING': if (!sessionData) { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: 'Session data not available for PING' } })); return null; } // Respond to client pings with a pong if (ws.readyState === WebSocket.OPEN) { ws.pong(); } return sessionData || null; // Return current sessionData or null if undefined case 'SETUP_SESSION': if (!sessionData) { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: 'Session data not available for SETUP_SESSION' } })); return null; } if (sessionData.state === SessionState.SETUP) { const { expectedResponses, topic, description } = payload; if (typeof expectedResponses !== 'number' || expectedResponses <= 0) { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: 'Invalid expectedResponses' } })); return sessionData || null; // Return current sessionData on error } sessionData.expectedResponses = expectedResponses; sessionData.topic = topic || 'Untitled Session'; sessionData.description = description || null; sessionData.state = SessionState.GATHERING; broadcastToSession(sessionId, { type: 'STATE_UPDATE', payload: {} }); console.log(`Session ${sessionId} moved to GATHERING with topic "${sessionData.topic}" and ${expectedResponses} expected responses.`); } else { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: `Session is not in SETUP state. Current state: ${sessionData.state}` } })); } return sessionData || null; // Return current sessionData case 'SUBMIT_RESPONSE': if (!sessionData) { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: 'Session data not available for SUBMIT_RESPONSE' } })); return null; } if (sessionData.state === SessionState.GATHERING) { if (sessionData.responses.has(clientId)) { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: 'You have already submitted a response for this session.' } })); return sessionData || null; // Return current sessionData on error } const { wants, accepts, noGoes, afraidToAsk } = payload.response; if ([...wants, ...accepts, ...noGoes].some((desire: string) => desire.length > 500) || afraidToAsk.length > 500) { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: 'One of your desires or afraidToAsk exceeds the 500 character limit.' } })); return sessionData || null; // Return current sessionData on error } const hasContradictionsGist = await llmService.checkForInnerContradictions(payload.response); if (hasContradictionsGist) { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: `Your submission contains inner contradictions: ${hasContradictionsGist} Please resolve them and submit again.` } })); return sessionData || null; // Return current sessionData on error } const encryptedWants = wants.map((d: string) => encryptionService.encrypt(d)); const encryptedAccepts = accepts.map((d: string) => encryptionService.encrypt(d)); const encryptedNoGoes = noGoes.map((d: string) => encryptionService.encrypt(d)); const encryptedAfraidToAsk = encryptionService.encrypt(afraidToAsk); sessionData.responses.set(clientId, { wants: encryptedWants, accepts: encryptedAccepts, noGoes: encryptedNoGoes, afraidToAsk: encryptedAfraidToAsk }); sessionData.submittedCount++; console.log(`Client ${clientId} submitted response. Submitted count: ${sessionData.submittedCount}/${sessionData.expectedResponses}`); if (sessionData.submittedCount === sessionData.expectedResponses) { sessionData.state = SessionState.HARMONIZING; broadcastToSession(sessionId, { type: 'STATE_UPDATE', payload: {} }); console.log(`Session ${sessionId} moved to HARMONIZING. Triggering LLM analysis.`); // Perform LLM analysis asynchronously (async () => { try { console.log('llm_analysis_started', sessionId); const startTime = process.hrtime.bigint(); const allDecryptedDesires = Array.from(sessionData.responses.values()).map(encryptedResponse => { const decryptedWants = encryptedResponse.wants.map((d: string) => encryptionService.decrypt(d)); const decryptedAccepts = encryptedResponse.accepts.map((d: string) => encryptionService.decrypt(d)); const decryptedNoGoes = encryptedResponse.noGoes.map((d: string) => encryptionService.decrypt(d)); const decryptedAfraidToAsk = encryptionService.decrypt(encryptedResponse.afraidToAsk); return { wants: decryptedWants, accepts: decryptedAccepts, noGoes: decryptedNoGoes, afraidToAsk: decryptedAfraidToAsk }; }); const decision = await llmService.analyzeDesires(allDecryptedDesires); sessionData.finalResult = decision; sessionData.state = SessionState.FINAL; broadcastToSession(sessionId, { type: 'STATE_UPDATE', payload: {} }); console.log('llm_analysis_completed', sessionId, { result: decision }); console.log('llm_analysis_duration', 0, sessionId, { status: 'success' }); console.log('llm_analysis_availability', 'available', sessionId); console.log(`Analysis complete for session ${sessionId}. Result:`, decision); } catch (error: any) { console.error(`Error during analysis for session ${sessionId}:`, error.message); sessionData.state = SessionState.ERROR; broadcastToSession(sessionId, { type: 'STATE_UPDATE', payload: {} }); console.log('llm_analysis_error', sessionId, { error: error.message }); console.log('llm_analysis_availability', 'unavailable', sessionId, { error: error.message }); } })(); } else { // Only broadcast the latest count if the session is not yet harmonizing broadcastToSession(sessionId, { type: 'STATE_UPDATE', payload: {} }); } } else { ws.send(JSON.stringify({ type: 'ERROR', payload: { message: `Session is not in GATHERING state. Current state: ${sessionData.state}` } })); } return sessionData || null; // Return current sessionData default: console.warn(`Unknown message type: ${type} from client ${clientId} in session ${sessionId}`); ws.send(JSON.stringify({ type: 'ERROR', payload: { message: `Unknown message type: ${type}` } })); return sessionData || null; // Return current sessionData } };