// WebSocket connection manager module.
import { WebSocket } from "ws";
|
|
import { verifyAccessToken } from "./jwt.js";
|
|
|
|
/**
 * WebSocket Manager
 *
 * Handles WebSocket connections, user mapping, and broadcasting.
 *
 * Every registered socket is tracked in `allSockets`. Sockets that present a
 * valid access token are additionally mapped to their Steam ID, with at most
 * one active socket per Steam ID (a newer connection replaces the older one).
 */
class WebSocketManager {
  constructor() {
    // Map of steamId -> WebSocket connection
    this.userConnections = new Map();

    // Map of WebSocket -> steamId
    this.socketToUser = new Map();

    // Map of WebSocket -> additional metadata ({ steamId, connectedAt, lastActivity })
    this.socketMetadata = new Map();

    // Set to track all connected sockets (authenticated or not)
    this.allSockets = new Set();

    // Timer handle of the running heartbeat, or null while stopped
    this.heartbeatInterval = null;
  }

  /**
   * Register a new WebSocket connection
   *
   * Tracks the socket, wires up its event handlers and then tries to
   * authenticate it. Unauthenticated sockets stay connected as "public"
   * clients.
   *
   * @param {WebSocket} socket - The WebSocket connection
   * @param {Object} request - The HTTP request object
   * @returns {Promise<void>}
   */
  async handleConnection(socket, request) {
    console.log("🔌 New WebSocket connection attempt");

    // Validate socket
    if (!socket || typeof socket.on !== "function") {
      console.error("❌ Invalid WebSocket object received:", typeof socket);
      return;
    }

    // Add to all sockets set
    this.allSockets.add(socket);

    // Set up ping/pong for connection health
    socket.isAlive = true;
    socket.on("pong", () => {
      socket.isAlive = true;
    });

    // Register the lifecycle handlers BEFORE awaiting authentication, so a
    // message, error or close that happens while authentication is pending
    // is not missed (a missed close would leave the socket tracked forever).
    socket.on("message", (data) => {
      this.handleMessage(socket, data);
    });

    socket.on("close", () => {
      this.handleDisconnection(socket);
    });

    socket.on("error", (error) => {
      console.error("❌ WebSocket error:", error);
      this.handleDisconnection(socket);
    });

    // Handle authentication
    try {
      const user = await this.authenticateSocket(socket, request);

      if (!user) {
        console.log("⚠️ WebSocket connection without authentication (public)");
        return;
      }

      // The socket may have been closed while authentication was pending;
      // mapping it now would leave a stale user entry behind.
      if (!this.allSockets.has(socket)) {
        return;
      }

      this.mapUserToSocket(user.steamId, socket);
      console.log(
        `✅ WebSocket authenticated for user: ${user.steamId} (${user.username})`
      );

      // Send welcome message
      this.sendToSocket(socket, {
        type: "connected",
        data: {
          steamId: user.steamId,
          username: user.username,
          userId: user.userId,
          timestamp: Date.now(),
        },
      });
    } catch (error) {
      console.error("❌ WebSocket authentication error:", error.message);
    }
  }

  /**
   * Authenticate a WebSocket connection
   *
   * Looks for an access token in the `token` query parameter first and in
   * the `accessToken` cookie second, then verifies it.
   *
   * @param {WebSocket} socket - The WebSocket connection (not used for the lookup)
   * @param {Object} request - The HTTP request object
   * @returns {Promise<Object|null>} Result of verifyAccessToken, or null when
   *   no token was supplied or verification threw
   */
  async authenticateSocket(socket, request) {
    try {
      // The base is only needed to parse a relative request URL; nothing but
      // the query string is read from the result.
      const url = new URL(
        request.url,
        `http://${request.headers.host || "localhost"}`
      );

      // Query string token wins; fall back to the cookie when it is absent
      // or empty.
      const token =
        url.searchParams.get("token") ||
        this.parseCookies(request.headers.cookie || "").accessToken;

      if (!token) {
        return null;
      }

      // `await` keeps a rejected promise (should verifyAccessToken be
      // asynchronous) inside this try/catch; for a plain return value it is
      // a no-op.
      return await verifyAccessToken(token);
    } catch (error) {
      console.error("WebSocket auth error:", error.message);
      return null;
    }
  }

  /**
   * Parse cookie header
   *
   * Each `name=value` pair is split on its FIRST `=` so values that contain
   * `=` themselves (e.g. padded base64) are kept intact. A value that is not
   * valid percent-encoding is stored undecoded instead of aborting the parse.
   * When a name occurs more than once, the last occurrence wins.
   *
   * @param {string} cookieHeader - The cookie header string
   * @returns {Object} Parsed cookies
   */
  parseCookies(cookieHeader) {
    const cookies = {};
    if (!cookieHeader) return cookies;

    for (const pair of cookieHeader.split(";")) {
      const separatorIndex = pair.indexOf("=");
      if (separatorIndex === -1) continue;

      const name = pair.slice(0, separatorIndex).trim();
      if (!name) continue;

      const rawValue = pair.slice(separatorIndex + 1).trim();
      try {
        cookies[name] = decodeURIComponent(rawValue);
      } catch {
        // decodeURIComponent throws URIError on malformed escapes
        cookies[name] = rawValue;
      }
    }

    return cookies;
  }

  /**
   * Map a user ID to a WebSocket connection
   *
   * A different socket already registered for the same Steam ID is
   * disconnected first. Re-mapping the very same socket only refreshes its
   * metadata.
   *
   * @param {string} steamId - The Steam ID
   * @param {WebSocket} socket - The WebSocket connection
   */
  mapUserToSocket(steamId, socket) {
    // Remove old connection if exists (but never close the socket we are
    // about to register)
    const oldSocket = this.userConnections.get(steamId);
    if (oldSocket && oldSocket !== socket) {
      this.handleDisconnection(oldSocket);
    }

    // If this socket was mapped to another user before, drop that entry so
    // the other Steam ID does not keep pointing at this socket.
    const previousSteamId = this.socketToUser.get(socket);
    if (
      previousSteamId !== undefined &&
      previousSteamId !== steamId &&
      this.userConnections.get(previousSteamId) === socket
    ) {
      this.userConnections.delete(previousSteamId);
    }

    const now = Date.now();
    this.userConnections.set(steamId, socket);
    this.socketToUser.set(socket, steamId);
    this.socketMetadata.set(socket, {
      steamId,
      connectedAt: now,
      lastActivity: now,
    });
  }

  /**
   * Handle incoming WebSocket messages
   *
   * Messages must be JSON objects. Only `{ type: "ping" }` is answered (with
   * a `pong`); every other type is just logged. Errors are logged and never
   * propagate to the caller.
   *
   * @param {WebSocket} socket - The WebSocket connection
   * @param {Buffer|string} data - The message data
   */
  handleMessage(socket, data) {
    try {
      const message = JSON.parse(data.toString());
      const steamId = this.socketToUser.get(socket);

      // Update last activity
      const metadata = this.socketMetadata.get(socket);
      if (metadata) {
        metadata.lastActivity = Date.now();
      }

      // Valid JSON is not necessarily an object ("null", "42", ...)
      if (message === null || typeof message !== "object") {
        console.error(
          "❌ Error handling WebSocket message:",
          new TypeError("Message must be a JSON object")
        );
        return;
      }

      console.log(`📨 Message from ${steamId || "anonymous"}:`, message.type);

      // Handle ping messages
      if (message.type === "ping") {
        this.sendToSocket(socket, { type: "pong", timestamp: Date.now() });
        return;
      }

      // You can add more message handlers here
      // For now, just log unknown message types
      console.log("Unknown message type:", message.type);
    } catch (error) {
      console.error("❌ Error handling WebSocket message:", error);
    }
  }

  /**
   * Handle WebSocket disconnection
   *
   * Removes all bookkeeping for the socket and closes it if it is still
   * open. Safe to call more than once for the same socket.
   *
   * @param {WebSocket} socket - The WebSocket connection
   */
  handleDisconnection(socket) {
    if (!socket) return;

    this.#forgetSocket(socket);

    try {
      if (socket.readyState === WebSocket.OPEN) {
        socket.close();
      }
    } catch (error) {
      console.error("Error closing socket:", error);
    }
  }

  /**
   * Send a message to a specific socket
   *
   * @param {WebSocket} socket - The WebSocket connection
   * @param {Object} data - The data to send (serialized with JSON.stringify)
   * @returns {boolean} True if the message was handed to the socket, false
   *   if the socket is missing, not open, or sending threw
   */
  sendToSocket(socket, data) {
    if (!socket || socket.readyState !== WebSocket.OPEN) {
      return false;
    }

    try {
      socket.send(JSON.stringify(data));
      return true;
    } catch (error) {
      console.error("❌ Error sending to socket:", error);
      return false;
    }
  }

  /**
   * Send a message to a specific user by Steam ID
   *
   * @param {string} steamId - The Steam ID
   * @param {Object} data - The data to send
   * @returns {boolean} True if message was sent, false otherwise
   */
  sendToUser(steamId, data) {
    const socket = this.userConnections.get(steamId);
    if (!socket) {
      return false;
    }
    return this.sendToSocket(socket, data);
  }

  /**
   * Broadcast a message to all connected clients
   *
   * @param {Object} data - The data to broadcast
   * @param {Array<string>} excludeSteamIds - Optional array of Steam IDs to exclude
   * @returns {number} Number of sockets the message was sent to
   */
  broadcastToAll(data, excludeSteamIds = []) {
    const targets = Array.from(this.allSockets, (socket) => [
      socket,
      this.socketToUser.get(socket),
    ]);
    const sentCount = this.#broadcast(
      targets,
      data,
      excludeSteamIds,
      "❌ Error broadcasting to socket:"
    );

    console.log(`📡 Broadcast sent to ${sentCount} clients`);
    return sentCount;
  }

  /**
   * Broadcast to authenticated users only
   *
   * @param {Object} data - The data to broadcast
   * @param {Array<string>} excludeSteamIds - Optional array of Steam IDs to exclude
   * @returns {number} Number of users the message was sent to
   */
  broadcastToAuthenticated(data, excludeSteamIds = []) {
    const targets = Array.from(this.userConnections, ([steamId, socket]) => [
      socket,
      steamId,
    ]);
    const sentCount = this.#broadcast(
      targets,
      data,
      excludeSteamIds,
      "❌ Error broadcasting to user:"
    );

    console.log(`📡 Broadcast sent to ${sentCount} authenticated users`);
    return sentCount;
  }

  /**
   * Broadcast a public message (e.g., price updates, new listings)
   * This sends to all clients regardless of authentication
   *
   * @param {string} type - The message type
   * @param {Object} payload - The message payload
   * @returns {number} Number of sockets the message was sent to
   */
  broadcastPublic(type, payload) {
    return this.broadcastToAll({
      type,
      data: payload,
      timestamp: Date.now(),
    });
  }

  /**
   * Check if a user is currently connected
   *
   * @param {string} steamId - The Steam ID
   * @returns {boolean} True if user is connected
   */
  isUserConnected(steamId) {
    return this.userConnections.has(steamId);
  }

  /**
   * Get the number of connected users
   *
   * @returns {number} Number of authenticated users
   */
  getAuthenticatedUserCount() {
    return this.userConnections.size;
  }

  /**
   * Get the total number of connected sockets
   *
   * @returns {number} Total number of sockets
   */
  getTotalSocketCount() {
    return this.allSockets.size;
  }

  /**
   * Get connection metadata for a user
   *
   * @param {string} steamId - The Steam ID
   * @returns {Object|null} Metadata or null if not found
   */
  getUserMetadata(steamId) {
    const socket = this.userConnections.get(steamId);
    if (!socket) {
      return null;
    }
    return this.socketMetadata.get(socket) ?? null;
  }

  /**
   * Run one heartbeat sweep over all tracked sockets
   *
   * A socket whose `isAlive` flag is still false has not answered the ping
   * of the previous sweep: it is dropped and terminated. Every other socket
   * is flagged as not alive and pinged; its "pong" handler sets the flag
   * back to true.
   */
  checkConnections() {
    // Iterate over a snapshot because dead sockets are removed on the way
    for (const socket of [...this.allSockets]) {
      if (socket.isAlive === false) {
        console.log("💀 Terminating dead connection");
        this.#forgetSocket(socket);

        try {
          // An unresponsive peer will not complete a closing handshake, so
          // prefer terminate() where the socket provides it.
          if (typeof socket.terminate === "function") {
            socket.terminate();
          } else if (socket.readyState === WebSocket.OPEN) {
            socket.close();
          }
        } catch (error) {
          console.error("Error closing socket:", error);
        }
        continue;
      }

      socket.isAlive = false;
      if (socket.readyState === WebSocket.OPEN) {
        try {
          socket.ping();
        } catch (error) {
          console.error("❌ Error pinging socket:", error);
        }
      }
    }
  }

  /**
   * Start heartbeat interval to check for dead connections
   *
   * Calling this while a heartbeat is already running replaces the running
   * timer instead of leaking it.
   *
   * @param {number} interval - Interval in milliseconds
   */
  startHeartbeat(interval = 30000) {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
    }

    this.heartbeatInterval = setInterval(() => {
      this.checkConnections();
    }, interval);

    console.log(`💓 WebSocket heartbeat started (${interval}ms)`);
  }

  /**
   * Stop heartbeat interval
   */
  stopHeartbeat() {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
      console.log("💓 WebSocket heartbeat stopped");
    }
  }

  /**
   * Gracefully close all connections
   *
   * Stops the heartbeat, closes every open socket with code 1000 and clears
   * all bookkeeping.
   */
  closeAll() {
    console.log("🔌 Closing all WebSocket connections...");

    this.stopHeartbeat();

    for (const socket of [...this.allSockets]) {
      try {
        if (socket.readyState === WebSocket.OPEN) {
          socket.close(1000, "Server shutting down");
        }
      } catch (error) {
        console.error("Error closing socket:", error);
      }
    }

    this.userConnections.clear();
    this.socketToUser.clear();
    this.socketMetadata.clear();
    this.allSockets.clear();

    console.log("✅ All WebSocket connections closed");
  }

  /**
   * Remove every bookkeeping entry for a socket without touching the
   * socket itself.
   *
   * @param {WebSocket} socket - The WebSocket connection
   */
  #forgetSocket(socket) {
    const steamId = this.socketToUser.get(socket);

    if (steamId) {
      console.log(`🔌 User ${steamId} disconnected`);
      // Only drop the user mapping if it still points at THIS socket; a
      // newer connection of the same user must survive.
      if (this.userConnections.get(steamId) === socket) {
        this.userConnections.delete(steamId);
      }
    }

    this.socketToUser.delete(socket);
    this.socketMetadata.delete(socket);
    this.allSockets.delete(socket);
  }

  /**
   * Send one serialized message to every open target socket.
   *
   * @param {Array<Array>} targets - [socket, steamId] pairs (steamId may be undefined)
   * @param {Object} data - The data to broadcast
   * @param {Array<string>|null|undefined} excludeSteamIds - Steam IDs to skip
   * @param {string} errorLabel - Prefix logged when a send throws
   * @returns {number} Number of sockets the message was sent to
   */
  #broadcast(targets, data, excludeSteamIds, errorLabel) {
    // Serialize once and use a Set for O(1) exclusion checks
    const message = JSON.stringify(data);
    const excluded = new Set(excludeSteamIds ?? []);
    let sentCount = 0;

    for (const [socket, steamId] of targets) {
      if (steamId && excluded.has(steamId)) continue;
      if (socket.readyState !== WebSocket.OPEN) continue;

      try {
        socket.send(message);
        sentCount++;
      } catch (error) {
        console.error(errorLabel, error);
      }
    }

    return sentCount;
  }
}
|
|
|
|
// Module-wide singleton: every importer shares this one manager instance.
// It is exposed both as the named export `wsManager` and as the default export.
const wsManager = new WebSocketManager();

export { wsManager };
export default wsManager;
|