import { WebSocket } from "ws";
import { verifyAccessToken } from "./jwt.js";

/**
 * WebSocket Manager
 *
 * Tracks every connected socket, maps authenticated users (by Steam ID) to
 * their socket, and offers helpers for targeted sends and broadcasts.
 *
 * Bookkeeping invariants maintained by this class:
 *  - `allSockets` holds every socket registered via `handleConnection` that
 *    has not yet been disconnected.
 *  - `userConnections` (steamId -> socket) and `socketToUser`
 *    (socket -> steamId) mirror each other; a user has at most one socket.
 *  - `socketMetadata` holds `{ steamId, connectedAt, lastActivity }` for
 *    authenticated sockets only.
 */
class WebSocketManager {
  constructor() {
    // Map of steamId -> WebSocket connection
    this.userConnections = new Map();

    // Map of WebSocket -> steamId
    this.socketToUser = new Map();

    // Map of WebSocket -> additional metadata
    this.socketMetadata = new Map();

    // Set to track all connected sockets
    this.allSockets = new Set();

    // Handle of the running heartbeat timer, or null when no heartbeat runs
    this.heartbeatInterval = null;
  }

  /**
   * Register a new WebSocket connection.
   *
   * Event listeners are attached BEFORE authentication is awaited so that a
   * close/error occurring while authentication is pending is still observed
   * and the socket is removed from the bookkeeping structures.
   *
   * @param {WebSocket} socket - The WebSocket connection
   * @param {Object} request - The HTTP request object
   * @returns {Promise<void>}
   */
  async handleConnection(socket, request) {
    console.log("🔌 New WebSocket connection attempt");

    // Validate socket
    if (!socket || typeof socket.on !== "function") {
      console.error("❌ Invalid WebSocket object received:", typeof socket);
      return;
    }

    // Add to all sockets set
    this.allSockets.add(socket);

    // Set up ping/pong for connection health
    socket.isAlive = true;
    socket.on("pong", () => {
      socket.isAlive = true;
    });

    // Handle incoming messages
    socket.on("message", (data) => {
      this.handleMessage(socket, data);
    });

    // Handle disconnection
    socket.on("close", () => {
      this.handleDisconnection(socket);
    });

    // Handle errors
    socket.on("error", (error) => {
      console.error("❌ WebSocket error:", error);
      this.handleDisconnection(socket);
    });

    // Handle authentication
    try {
      const user = await this.authenticateSocket(socket, request);

      // The socket may have been disconnected while authentication was
      // pending; mapping it now would leave a stale user entry behind.
      if (!this.allSockets.has(socket)) {
        console.log("🔌 WebSocket closed before authentication completed");
        return;
      }

      if (user) {
        this.mapUserToSocket(user.steamId, socket);
        console.log(
          `✅ WebSocket authenticated for user: ${user.steamId} (${user.username})`
        );

        // Send welcome message
        this.sendToSocket(socket, {
          type: "connected",
          data: {
            steamId: user.steamId,
            username: user.username,
            userId: user.userId,
            timestamp: Date.now(),
          },
        });
      } else {
        console.log("⚠️ WebSocket connection without authentication (public)");
      }
    } catch (error) {
      console.error("❌ WebSocket authentication error:", error.message);
    }
  }

  /**
   * Authenticate a WebSocket connection.
   *
   * The access token is taken from the `token` query parameter if present
   * and non-empty, otherwise from the `accessToken` cookie. Any failure
   * (missing token, verification error) results in `null`.
   *
   * @param {WebSocket} socket - The WebSocket connection (unused)
   * @param {Object} request - The HTTP request object
   * @returns {Promise<Object|null>} Result of `verifyAccessToken`, or null if
   *   no token was supplied or verification threw
   */
  async authenticateSocket(socket, request) {
    try {
      // The base is only needed so a relative request URL can be parsed; a
      // fixed base avoids failing on a malformed client-supplied Host header.
      const url = new URL(request.url, "http://localhost");
      const queryToken = url.searchParams.get("token");

      let token = queryToken;
      if (!token) {
        // Fall back to the cookie when the query string carries no token
        const cookieHeader = (request.headers && request.headers.cookie) || "";
        token = this.parseCookies(cookieHeader).accessToken;
      }

      if (!token) {
        return null;
      }

      // `await` so that a rejected promise (should the verifier be async) is
      // handled by the catch below; it is a no-op for a synchronous result.
      return await verifyAccessToken(token);
    } catch (error) {
      console.error("WebSocket auth error:", error.message);
      return null;
    }
  }

  /**
   * Parse a `Cookie` request header into a name -> value object.
   *
   * Each pair is split at the FIRST `=` only, so values that themselves
   * contain `=` are preserved. Values that are not valid percent-encoding
   * are kept verbatim instead of aborting the whole parse.
   *
   * @param {string} cookieHeader - The cookie header string
   * @returns {Object} Parsed cookies
   */
  parseCookies(cookieHeader) {
    const cookies = {};

    if (!cookieHeader) return cookies;

    for (const pair of cookieHeader.split(";")) {
      const separatorIndex = pair.indexOf("=");
      if (separatorIndex === -1) continue;

      const name = pair.slice(0, separatorIndex).trim();
      if (!name) continue;

      const rawValue = pair.slice(separatorIndex + 1).trim();
      try {
        cookies[name] = decodeURIComponent(rawValue);
      } catch (decodeError) {
        // Malformed escape sequence: keep the raw value rather than losing
        // every other cookie in the header.
        cookies[name] = rawValue;
      }
    }

    return cookies;
  }

  /**
   * Map a user ID to a WebSocket connection.
   *
   * A user may only have one active socket: an existing, different socket
   * for the same Steam ID is disconnected first.
   *
   * @param {string} steamId - The Steam ID
   * @param {WebSocket} socket - The WebSocket connection
   */
  mapUserToSocket(steamId, socket) {
    // Remove old connection if exists (but never close the socket we are
    // about to register)
    const previousSocket = this.userConnections.get(steamId);
    if (previousSocket && previousSocket !== socket) {
      this.handleDisconnection(previousSocket);
    }

    // If this socket was mapped to another user before, drop that mapping so
    // the two maps keep mirroring each other.
    const previousSteamId = this.socketToUser.get(socket);
    if (
      previousSteamId !== undefined &&
      previousSteamId !== steamId &&
      this.userConnections.get(previousSteamId) === socket
    ) {
      this.userConnections.delete(previousSteamId);
    }

    const now = Date.now();
    this.userConnections.set(steamId, socket);
    this.socketToUser.set(socket, steamId);
    this.socketMetadata.set(socket, {
      steamId,
      connectedAt: now,
      lastActivity: now,
    });
  }

  /**
   * Handle incoming WebSocket messages.
   *
   * Messages are expected to be JSON objects with a `type` field. Only
   * `ping` is handled (answered with `pong`); everything else is logged.
   *
   * @param {WebSocket} socket - The WebSocket connection
   * @param {Buffer|string} data - The message data
   */
  handleMessage(socket, data) {
    try {
      const message = JSON.parse(data.toString());
      const steamId = this.socketToUser.get(socket);

      // Update last activity
      const metadata = this.socketMetadata.get(socket);
      if (metadata) {
        metadata.lastActivity = Date.now();
      }

      // Valid JSON is not necessarily an object (e.g. `null`, `5`); reading
      // `.type` from `null` would throw.
      if (message === null || typeof message !== "object") {
        console.log("Unknown message type:", undefined);
        return;
      }

      console.log(`📨 Message from ${steamId || "anonymous"}:`, message.type);

      // Handle ping messages
      if (message.type === "ping") {
        this.sendToSocket(socket, { type: "pong", timestamp: Date.now() });
        return;
      }

      // You can add more message handlers here
      // For now, just log unknown message types
      console.log("Unknown message type:", message.type);
    } catch (error) {
      console.error("❌ Error handling WebSocket message:", error);
    }
  }

  /**
   * Handle WebSocket disconnection.
   *
   * Removes the socket from all bookkeeping structures and closes it if it
   * is still open. Safe to call more than once for the same socket.
   *
   * @param {WebSocket} socket - The WebSocket connection
   */
  handleDisconnection(socket) {
    const steamId = this.socketToUser.get(socket);

    if (steamId) {
      console.log(`🔌 User ${steamId} disconnected`);
      // Only drop the user entry if it still points at THIS socket, so a
      // newer connection of the same user is never unregistered by mistake.
      if (this.userConnections.get(steamId) === socket) {
        this.userConnections.delete(steamId);
      }
    }

    this.socketToUser.delete(socket);
    this.socketMetadata.delete(socket);
    this.allSockets.delete(socket);

    try {
      if (socket.readyState === WebSocket.OPEN) {
        socket.close();
      }
    } catch (error) {
      console.error("Error closing socket:", error);
    }
  }

  /**
   * Send a message to a specific socket.
   *
   * @param {WebSocket} socket - The WebSocket connection
   * @param {Object} data - The data to send (serialized with JSON.stringify)
   * @returns {boolean} True if the message was handed to the socket, false
   *   if the socket was not open or sending threw
   */
  sendToSocket(socket, data) {
    if (socket.readyState !== WebSocket.OPEN) {
      return false;
    }

    try {
      socket.send(JSON.stringify(data));
      return true;
    } catch (error) {
      console.error("❌ Error sending to socket:", error);
      return false;
    }
  }

  /**
   * Send a message to a specific user by Steam ID.
   *
   * @param {string} steamId - The Steam ID
   * @param {Object} data - The data to send
   * @returns {boolean} True if message was sent, false otherwise
   */
  sendToUser(steamId, data) {
    const socket = this.userConnections.get(steamId);
    if (!socket) {
      return false;
    }
    return this.sendToSocket(socket, data);
  }

  /**
   * Broadcast a message to all connected clients.
   *
   * @param {Object} data - The data to broadcast
   * @param {Array} excludeSteamIds - Optional array of Steam IDs to exclude
   * @returns {number} Number of sockets the message was sent to
   */
  broadcastToAll(data, excludeSteamIds = []) {
    const message = JSON.stringify(data);
    // Build the lookup once instead of scanning the array per socket
    const excluded = new Set(excludeSteamIds);
    let sentCount = 0;

    this.allSockets.forEach((socket) => {
      const steamId = this.socketToUser.get(socket);

      // Skip if user is in exclude list
      if (steamId && excluded.has(steamId)) {
        return;
      }

      if (socket.readyState === WebSocket.OPEN) {
        try {
          socket.send(message);
          sentCount++;
        } catch (error) {
          console.error("❌ Error broadcasting to socket:", error);
        }
      }
    });

    console.log(`📡 Broadcast sent to ${sentCount} clients`);
    return sentCount;
  }

  /**
   * Broadcast to authenticated users only.
   *
   * @param {Object} data - The data to broadcast
   * @param {Array} excludeSteamIds - Optional array of Steam IDs to exclude
   * @returns {number} Number of users the message was sent to
   */
  broadcastToAuthenticated(data, excludeSteamIds = []) {
    const message = JSON.stringify(data);
    // Build the lookup once instead of scanning the array per user
    const excluded = new Set(excludeSteamIds);
    let sentCount = 0;

    this.userConnections.forEach((socket, steamId) => {
      // Skip if user is in exclude list
      if (excluded.has(steamId)) {
        return;
      }

      if (socket.readyState === WebSocket.OPEN) {
        try {
          socket.send(message);
          sentCount++;
        } catch (error) {
          console.error("❌ Error broadcasting to user:", error);
        }
      }
    });

    console.log(`📡 Broadcast sent to ${sentCount} authenticated users`);
    return sentCount;
  }

  /**
   * Broadcast a public message (e.g., price updates, new listings).
   * This sends to all clients regardless of authentication.
   *
   * @param {string} type - The message type
   * @param {Object} payload - The message payload
   * @returns {number} Number of sockets the message was sent to
   */
  broadcastPublic(type, payload) {
    return this.broadcastToAll({
      type,
      data: payload,
      timestamp: Date.now(),
    });
  }

  /**
   * Check if a user is currently connected.
   *
   * @param {string} steamId - The Steam ID
   * @returns {boolean} True if user is connected
   */
  isUserConnected(steamId) {
    return this.userConnections.has(steamId);
  }

  /**
   * Get the number of connected users.
   *
   * @returns {number} Number of authenticated users
   */
  getAuthenticatedUserCount() {
    return this.userConnections.size;
  }

  /**
   * Get the total number of connected sockets.
   *
   * @returns {number} Total number of sockets
   */
  getTotalSocketCount() {
    return this.allSockets.size;
  }

  /**
   * Get connection metadata for a user.
   *
   * @param {string} steamId - The Steam ID
   * @returns {Object|null} Metadata or null if not found
   */
  getUserMetadata(steamId) {
    const socket = this.userConnections.get(steamId);
    if (!socket) {
      return null;
    }
    // Normalize a missing entry to null, as documented
    return this.socketMetadata.get(socket) || null;
  }

  /**
   * Start heartbeat interval to check for dead connections.
   *
   * On every tick each socket that has not answered the previous ping
   * (its `isAlive` flag is still false) is terminated and unregistered;
   * all others are flagged and pinged again. Calling this while a heartbeat
   * is already running replaces the existing timer.
   *
   * @param {number} interval - Interval in milliseconds
   */
  startHeartbeat(interval = 30000) {
    // Never leave an earlier timer running unreferenced
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
    }

    this.heartbeatInterval = setInterval(() => {
      this.allSockets.forEach((socket) => {
        if (socket.isAlive === false) {
          console.log("💀 Terminating dead connection");
          // An unresponsive peer is torn down with terminate() rather than
          // the close() handshake, per the ws guidance for broken connections.
          try {
            if (typeof socket.terminate === "function") {
              socket.terminate();
            }
          } catch (error) {
            console.error("Error terminating socket:", error);
          }
          this.handleDisconnection(socket);
          return;
        }

        socket.isAlive = false;
        if (socket.readyState === WebSocket.OPEN) {
          socket.ping();
        }
      });
    }, interval);

    console.log(`💓 WebSocket heartbeat started (${interval}ms)`);
  }

  /**
   * Stop heartbeat interval. Does nothing if no heartbeat is running.
   */
  stopHeartbeat() {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
      console.log("💓 WebSocket heartbeat stopped");
    }
  }

  /**
   * Gracefully close all connections, clear all bookkeeping and stop the
   * heartbeat.
   */
  closeAll() {
    console.log("🔌 Closing all WebSocket connections...");

    this.allSockets.forEach((socket) => {
      try {
        if (socket.readyState === WebSocket.OPEN) {
          socket.close(1000, "Server shutting down");
        }
      } catch (error) {
        console.error("Error closing socket:", error);
      }
    });

    this.userConnections.clear();
    this.socketToUser.clear();
    this.socketMetadata.clear();
    this.allSockets.clear();

    this.stopHeartbeat();

    console.log("✅ All WebSocket connections closed");
  }
}

// Create and export a singleton instance
export const wsManager = new WebSocketManager();

export default wsManager;