first commit
This commit is contained in:
423
utils/websocket.js
Normal file
423
utils/websocket.js
Normal file
@@ -0,0 +1,423 @@
|
||||
import { WebSocket } from "ws";
|
||||
import { verifyAccessToken } from "./jwt.js";
|
||||
|
||||
/**
|
||||
* WebSocket Manager
|
||||
* Handles WebSocket connections, user mapping, and broadcasting
|
||||
*/
|
||||
class WebSocketManager {
|
||||
constructor() {
|
||||
// Map of steamId -> WebSocket connection
|
||||
this.userConnections = new Map();
|
||||
|
||||
// Map of WebSocket -> steamId
|
||||
this.socketToUser = new Map();
|
||||
|
||||
// Map of WebSocket -> additional metadata
|
||||
this.socketMetadata = new Map();
|
||||
|
||||
// Set to track all connected sockets
|
||||
this.allSockets = new Set();
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a new WebSocket connection
|
||||
* @param {WebSocket} socket - The WebSocket connection
|
||||
* @param {Object} request - The HTTP request object
|
||||
*/
|
||||
async handleConnection(socket, request) {
|
||||
console.log("🔌 New WebSocket connection attempt");
|
||||
|
||||
// Validate socket
|
||||
if (!socket || typeof socket.on !== "function") {
|
||||
console.error("❌ Invalid WebSocket object received:", typeof socket);
|
||||
return;
|
||||
}
|
||||
|
||||
// Add to all sockets set
|
||||
this.allSockets.add(socket);
|
||||
|
||||
// Set up ping/pong for connection health
|
||||
socket.isAlive = true;
|
||||
socket.on("pong", () => {
|
||||
socket.isAlive = true;
|
||||
});
|
||||
|
||||
// Handle authentication
|
||||
try {
|
||||
const user = await this.authenticateSocket(socket, request);
|
||||
|
||||
if (user) {
|
||||
this.mapUserToSocket(user.steamId, socket);
|
||||
console.log(
|
||||
`✅ WebSocket authenticated for user: ${user.steamId} (${user.username})`
|
||||
);
|
||||
|
||||
// Send welcome message
|
||||
this.sendToSocket(socket, {
|
||||
type: "connected",
|
||||
data: {
|
||||
steamId: user.steamId,
|
||||
username: user.username,
|
||||
userId: user.userId,
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
});
|
||||
} else {
|
||||
console.log("⚠️ WebSocket connection without authentication (public)");
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("❌ WebSocket authentication error:", error.message);
|
||||
}
|
||||
|
||||
// Handle incoming messages
|
||||
socket.on("message", (data) => {
|
||||
this.handleMessage(socket, data);
|
||||
});
|
||||
|
||||
// Handle disconnection
|
||||
socket.on("close", () => {
|
||||
this.handleDisconnection(socket);
|
||||
});
|
||||
|
||||
// Handle errors
|
||||
socket.on("error", (error) => {
|
||||
console.error("❌ WebSocket error:", error);
|
||||
this.handleDisconnection(socket);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticate a WebSocket connection
|
||||
* @param {WebSocket} socket - The WebSocket connection
|
||||
* @param {Object} request - The HTTP request object
|
||||
* @returns {Object|null} User object or null if not authenticated
|
||||
*/
|
||||
async authenticateSocket(socket, request) {
|
||||
try {
|
||||
// Try to get token from query string
|
||||
const url = new URL(request.url, `http://${request.headers.host}`);
|
||||
const token = url.searchParams.get("token");
|
||||
|
||||
if (!token) {
|
||||
// Try to get token from cookie
|
||||
const cookies = this.parseCookies(request.headers.cookie || "");
|
||||
const cookieToken = cookies.accessToken;
|
||||
|
||||
if (!cookieToken) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const decoded = verifyAccessToken(cookieToken);
|
||||
return decoded;
|
||||
}
|
||||
|
||||
const decoded = verifyAccessToken(token);
|
||||
return decoded;
|
||||
} catch (error) {
|
||||
console.error("WebSocket auth error:", error.message);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse cookie header
|
||||
* @param {string} cookieHeader - The cookie header string
|
||||
* @returns {Object} Parsed cookies
|
||||
*/
|
||||
parseCookies(cookieHeader) {
|
||||
const cookies = {};
|
||||
if (!cookieHeader) return cookies;
|
||||
|
||||
cookieHeader.split(";").forEach((cookie) => {
|
||||
const parts = cookie.trim().split("=");
|
||||
if (parts.length === 2) {
|
||||
cookies[parts[0]] = decodeURIComponent(parts[1]);
|
||||
}
|
||||
});
|
||||
|
||||
return cookies;
|
||||
}
|
||||
|
||||
/**
|
||||
* Map a user ID to a WebSocket connection
|
||||
* @param {string} steamId - The Steam ID
|
||||
* @param {WebSocket} socket - The WebSocket connection
|
||||
*/
|
||||
mapUserToSocket(steamId, socket) {
|
||||
// Remove old connection if exists
|
||||
if (this.userConnections.has(steamId)) {
|
||||
const oldSocket = this.userConnections.get(steamId);
|
||||
this.handleDisconnection(oldSocket);
|
||||
}
|
||||
|
||||
this.userConnections.set(steamId, socket);
|
||||
this.socketToUser.set(socket, steamId);
|
||||
this.socketMetadata.set(socket, {
|
||||
steamId,
|
||||
connectedAt: Date.now(),
|
||||
lastActivity: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming WebSocket messages
|
||||
* @param {WebSocket} socket - The WebSocket connection
|
||||
* @param {Buffer|string} data - The message data
|
||||
*/
|
||||
handleMessage(socket, data) {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
const steamId = this.socketToUser.get(socket);
|
||||
|
||||
// Update last activity
|
||||
const metadata = this.socketMetadata.get(socket);
|
||||
if (metadata) {
|
||||
metadata.lastActivity = Date.now();
|
||||
}
|
||||
|
||||
console.log(`📨 Message from ${steamId || "anonymous"}:`, message.type);
|
||||
|
||||
// Handle ping messages
|
||||
if (message.type === "ping") {
|
||||
this.sendToSocket(socket, { type: "pong", timestamp: Date.now() });
|
||||
return;
|
||||
}
|
||||
|
||||
// You can add more message handlers here
|
||||
// For now, just log unknown message types
|
||||
console.log("Unknown message type:", message.type);
|
||||
} catch (error) {
|
||||
console.error("❌ Error handling WebSocket message:", error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle WebSocket disconnection
|
||||
* @param {WebSocket} socket - The WebSocket connection
|
||||
*/
|
||||
handleDisconnection(socket) {
|
||||
const steamId = this.socketToUser.get(socket);
|
||||
|
||||
if (steamId) {
|
||||
console.log(`🔌 User ${steamId} disconnected`);
|
||||
this.userConnections.delete(steamId);
|
||||
}
|
||||
|
||||
this.socketToUser.delete(socket);
|
||||
this.socketMetadata.delete(socket);
|
||||
this.allSockets.delete(socket);
|
||||
|
||||
try {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
socket.close();
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Error closing socket:", error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a specific socket
|
||||
* @param {WebSocket} socket - The WebSocket connection
|
||||
* @param {Object} data - The data to send
|
||||
*/
|
||||
sendToSocket(socket, data) {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
try {
|
||||
socket.send(JSON.stringify(data));
|
||||
} catch (error) {
|
||||
console.error("❌ Error sending to socket:", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a specific user by Steam ID
|
||||
* @param {string} steamId - The Steam ID
|
||||
* @param {Object} data - The data to send
|
||||
* @returns {boolean} True if message was sent, false otherwise
|
||||
*/
|
||||
sendToUser(steamId, data) {
|
||||
const socket = this.userConnections.get(steamId);
|
||||
if (socket) {
|
||||
this.sendToSocket(socket, data);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast a message to all connected clients
|
||||
* @param {Object} data - The data to broadcast
|
||||
* @param {Array<string>} excludeSteamIds - Optional array of Steam IDs to exclude
|
||||
*/
|
||||
broadcastToAll(data, excludeSteamIds = []) {
|
||||
const message = JSON.stringify(data);
|
||||
let sentCount = 0;
|
||||
|
||||
this.allSockets.forEach((socket) => {
|
||||
const steamId = this.socketToUser.get(socket);
|
||||
|
||||
// Skip if user is in exclude list
|
||||
if (steamId && excludeSteamIds.includes(steamId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
try {
|
||||
socket.send(message);
|
||||
sentCount++;
|
||||
} catch (error) {
|
||||
console.error("❌ Error broadcasting to socket:", error);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
console.log(`📡 Broadcast sent to ${sentCount} clients`);
|
||||
return sentCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast to authenticated users only
|
||||
* @param {Object} data - The data to broadcast
|
||||
* @param {Array<string>} excludeSteamIds - Optional array of Steam IDs to exclude
|
||||
*/
|
||||
broadcastToAuthenticated(data, excludeSteamIds = []) {
|
||||
const message = JSON.stringify(data);
|
||||
let sentCount = 0;
|
||||
|
||||
this.userConnections.forEach((socket, steamId) => {
|
||||
// Skip if user is in exclude list
|
||||
if (excludeSteamIds.includes(steamId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
try {
|
||||
socket.send(message);
|
||||
sentCount++;
|
||||
} catch (error) {
|
||||
console.error("❌ Error broadcasting to user:", error);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
console.log(`📡 Broadcast sent to ${sentCount} authenticated users`);
|
||||
return sentCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast a public message (e.g., price updates, new listings)
|
||||
* This sends to all clients regardless of authentication
|
||||
* @param {string} type - The message type
|
||||
* @param {Object} payload - The message payload
|
||||
*/
|
||||
broadcastPublic(type, payload) {
|
||||
return this.broadcastToAll({
|
||||
type,
|
||||
data: payload,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a user is currently connected
|
||||
* @param {string} steamId - The Steam ID
|
||||
* @returns {boolean} True if user is connected
|
||||
*/
|
||||
isUserConnected(steamId) {
|
||||
return this.userConnections.has(steamId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of connected users
|
||||
* @returns {number} Number of authenticated users
|
||||
*/
|
||||
getAuthenticatedUserCount() {
|
||||
return this.userConnections.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the total number of connected sockets
|
||||
* @returns {number} Total number of sockets
|
||||
*/
|
||||
getTotalSocketCount() {
|
||||
return this.allSockets.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection metadata for a user
|
||||
* @param {string} steamId - The Steam ID
|
||||
* @returns {Object|null} Metadata or null if not found
|
||||
*/
|
||||
getUserMetadata(steamId) {
|
||||
const socket = this.userConnections.get(steamId);
|
||||
if (socket) {
|
||||
return this.socketMetadata.get(socket);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start heartbeat interval to check for dead connections
|
||||
* @param {number} interval - Interval in milliseconds
|
||||
*/
|
||||
startHeartbeat(interval = 30000) {
|
||||
this.heartbeatInterval = setInterval(() => {
|
||||
this.allSockets.forEach((socket) => {
|
||||
if (socket.isAlive === false) {
|
||||
console.log("💀 Terminating dead connection");
|
||||
return this.handleDisconnection(socket);
|
||||
}
|
||||
|
||||
socket.isAlive = false;
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
socket.ping();
|
||||
}
|
||||
});
|
||||
}, interval);
|
||||
|
||||
console.log(`💓 WebSocket heartbeat started (${interval}ms)`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop heartbeat interval
|
||||
*/
|
||||
stopHeartbeat() {
|
||||
if (this.heartbeatInterval) {
|
||||
clearInterval(this.heartbeatInterval);
|
||||
console.log("💓 WebSocket heartbeat stopped");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gracefully close all connections
|
||||
*/
|
||||
closeAll() {
|
||||
console.log("🔌 Closing all WebSocket connections...");
|
||||
|
||||
this.allSockets.forEach((socket) => {
|
||||
try {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
socket.close(1000, "Server shutting down");
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Error closing socket:", error);
|
||||
}
|
||||
});
|
||||
|
||||
this.userConnections.clear();
|
||||
this.socketToUser.clear();
|
||||
this.socketMetadata.clear();
|
||||
this.allSockets.clear();
|
||||
this.stopHeartbeat();
|
||||
|
||||
console.log("✅ All WebSocket connections closed");
|
||||
}
|
||||
}
|
||||
|
||||
// Create and export a singleton instance
|
||||
export const wsManager = new WebSocketManager();
|
||||
|
||||
export default wsManager;
|
||||
Reference in New Issue
Block a user