An optimized WebSocket server designed for industrial automation and digital twin applications. Features subscription-based messaging, real-time telemetry, and scalable client management.
// ws-server-optimized.js
import { WebSocketServer } from "ws";
import { performance } from "perf_hooks";
/**
 * WebSocket server that accepts client connections, dispatches the JSON
 * commands they send (`subscribe`, `unsubscribe`, `ping`, `command`) and
 * pushes periodic robot telemetry to each connected socket.
 *
 * All per-client bookkeeping (subscriptions, timers, queues) is keyed by the
 * socket object itself.
 */
class DigitalTwinServer {
  /**
   * @param {number} [port=8765] Port passed to the WebSocketServer in start().
   */
  constructor(port = 8765) {
    this.port = port;
    this.wss = null; // created by start()
    this.clients = new Set();
    this.subscriptions = new Map(); // client -> topics
    this.telemetryIntervals = new Map(); // client -> intervals
    this.messageQueue = new Map(); // client -> queue
    // Monotonic counter used to keep client ids unique; never decremented.
    this.connectionSeq = 0;
    this.stats = {
      messagesSent: 0,
      bytesTransmitted: 0,
      connections: 0,
      startTime: Date.now(),
    };
  }

  /**
   * Creates the WebSocket server, wires the connection handler and starts
   * performance monitoring.
   */
  start() {
    this.wss = new WebSocketServer({ port: this.port });

    // Binding is asynchronous: report "listening" only once the server
    // actually emits the event, not right after construction.
    this.wss.on("listening", () => {
      console.log(`[Server] Digital Twin WS Server listening on :${this.port}`);
    });

    this.wss.on("connection", (ws, req) => {
      this.handleConnection(ws, req);
    });

    // Start performance monitoring
    this.startMonitoring();
  }

  /**
   * Registers a newly connected socket, attaches its event handlers, sends the
   * `system/connected` greeting and starts its telemetry timers.
   *
   * @param {object} ws  Client socket (must provide `on`).
   * @param {object} req Upgrade request; `req.socket.remoteAddress` is read.
   */
  handleConnection(ws, req) {
    // Address + timestamp alone collides when two clients from the same host
    // connect within one millisecond, so a sequence number is appended.
    this.connectionSeq += 1;
    const clientId = `${req.socket.remoteAddress}-${Date.now()}-${this.connectionSeq}`;
    console.log(`[Server] Client connected: ${clientId}`);

    // Initialize client
    this.clients.add(ws);
    this.subscriptions.set(ws, new Set());
    this.messageQueue.set(ws, []);
    this.stats.connections++;

    // Setup client handlers
    ws.on("message", (message) => {
      this.handleMessage(ws, message);
    });
    ws.on("close", () => {
      this.handleDisconnection(ws, clientId);
    });
    ws.on("error", (error) => {
      console.error(`[Server] Client error ${clientId}:`, error);
    });

    // Send initial state
    this.sendMessage(ws, {
      topic: "system/connected",
      data: {
        timestamp: Date.now(),
        clientId,
        serverVersion: "1.0.0",
      },
    });

    // Start telemetry for this client
    this.startTelemetry(ws);
  }

  /**
   * Parses one raw client message as JSON and dispatches on its `cmd` field.
   * Never throws: parse failures, non-object payloads and handler exceptions
   * are each logged with a distinct message.
   *
   * @param {object} ws      Socket the message arrived on.
   * @param {*}      message Raw payload; converted with `toString()` before parsing.
   */
  handleMessage(ws, message) {
    let msg;
    try {
      msg = JSON.parse(message.toString());
    } catch (error) {
      console.error("[Server] Failed to parse message:", error);
      return;
    }

    // Valid JSON is not necessarily an object (`null`, `42`, `[...]`).
    if (msg === null || typeof msg !== "object" || Array.isArray(msg)) {
      console.error("[Server] Ignoring message: payload is not a JSON object");
      return;
    }

    // Handlers receive an array: arrays pass through untouched, a single
    // non-empty topic string is wrapped, anything else becomes an empty list.
    const toTopicList = (value) => {
      if (Array.isArray(value)) {
        return value;
      }
      return typeof value === "string" && value !== "" ? [value] : [];
    };

    // Kept separate from the parse step so that a failing handler is not
    // misreported as a malformed message.
    try {
      switch (msg.cmd) {
        case "subscribe":
          this.handleSubscribe(ws, toTopicList(msg.topics));
          break;
        case "unsubscribe":
          this.handleUnsubscribe(ws, toTopicList(msg.topics));
          break;
        case "ping":
          this.sendMessage(ws, { topic: "system/pong", data: { timestamp: Date.now() } });
          break;
        case "command":
          this.handleCommand(ws, msg);
          break;
        default:
          console.log(`[Server] Unknown command: ${msg.cmd}`);
      }
    } catch (error) {
      console.error(`[Server] Failed to handle "${msg.cmd}" message:`, error);
    }
  }

  /**
   * Starts the periodic telemetry timers for one socket and records them in
   * `telemetryIntervals`. Calling it again for the same socket first clears
   * the timers from the previous call so they are not leaked.
   *
   * @param {object} ws Client socket the telemetry is queued for.
   */
  startTelemetry(ws) {
    const previous = this.telemetryIntervals.get(ws);
    if (previous) {
      for (const timer of previous) {
        clearInterval(timer);
      }
    }

    const jointTopic = "robots/arm1/joint_states@v1";
    const stateTopic = "robots/arm1/state@v1";
    const intervals = [];

    // Joint states at 50Hz (20ms)
    intervals.push(setInterval(() => {
      if (this.shouldSend(ws, jointTopic)) {
        this.queueMessage(ws, {
          topic: jointTopic,
          data: {
            joints: this.generateJointStates(),
            timestamp: Date.now(),
            velocity: this.generateJointVelocities(),
            effort: this.generateJointEfforts(),
          },
        });
      }
    }, 20));

    // State updates at 1Hz
    intervals.push(setInterval(() => {
      if (this.shouldSend(ws, stateTopic)) {
        this.queueMessage(ws, {
          topic: stateTopic,
          data: {
            state: this.getCurrentState(),
            // Task id changes every 10 seconds of wall-clock time.
            taskId: `task-${Math.floor(Date.now() / 10000)}`,
            progress: Math.random(),
            timestamp: Date.now(),
          },
        });
      }
    }, 1000));

    this.telemetryIntervals.set(ws, intervals);
  }
  // Data generators and other methods...
}
// Usage
// Builds a server for port 8765 and starts it when this file is evaluated.
// NOTE(review): these statements run at module top level, so importing this
// file for its exported class also starts a server — confirm that is intended.
const server = new DigitalTwinServer(8765);
server.start();
// Export for use as module
export default DigitalTwinServer;

node ws-server-optimized.js

const ws = new WebSocket('ws://localhost:8765');

ws.send(JSON.stringify({
cmd: "subscribe",
topics: ["robots/arm1/#"]
}));

ws.send(JSON.stringify({
cmd: "command",
data: { action: "emergency_stop" }
}));