'use strict';

const EventEmitter = require('node:events');
const { setTimeout, setInterval } = require('node:timers');
const WebSocket = require('../../WebSocket');
const { Status, Events, ShardEvents, Opcodes, WSEvents } = require('../../util/Constants');
const Intents = require('../../util/Intents');

const STATUS_KEYS = Object.keys(Status);
const CONNECTION_STATE = Object.keys(WebSocket.WebSocket);

let zlib;

// zlib-sync is an optional dependency: when it cannot be loaded, `zlib` stays undefined
// and the shard connects without transport compression.
try {
  zlib = require('zlib-sync');
} catch {} // eslint-disable-line no-empty

/**
 * Represents a Shard's WebSocket connection
 */
class WebSocketShard extends EventEmitter {
  /**
   * @param {WebSocketManager} manager The manager that owns this shard
   * @param {number} id The id of this shard
   */
  constructor(manager, id) {
    super();

    /**
     * The WebSocketManager of the shard
     * @type {WebSocketManager}
     */
    this.manager = manager;

    /**
     * The shard's id
     * @type {number}
     */
    this.id = id;

    /**
     * The current status of the shard
     * @type {Status}
     */
    this.status = Status.IDLE;

    /**
     * The current sequence of the shard
     * @type {number}
     * @private
     */
    this.sequence = -1;

    /**
     * The sequence of the shard after close
     * @type {number}
     * @private
     */
    this.closeSequence = 0;

    /**
     * The current session id of the shard
     * @type {?string}
     * @private
     */
    this.sessionId = null;

    /**
     * The previous heartbeat ping of the shard
     * @type {number}
     */
    this.ping = -1;

    /**
     * The last time a ping was sent (a timestamp)
     * @type {number}
     * @private
     */
    this.lastPingTimestamp = -1;

    /**
     * If we received a heartbeat ack back. Used to identify zombie connections
     * @type {boolean}
     * @private
     */
    this.lastHeartbeatAcked = true;

    /**
     * Contains the rate limit queue and metadata
     * @name WebSocketShard#ratelimit
     * @type {Object}
     * @private
     */
    Object.defineProperty(this, 'ratelimit', {
      value: {
        queue: [],
        total: 120,
        remaining: 120,
        time: 60e3,
        timer: null,
      },
    });

    /**
     * The WebSocket connection for the current shard
     * @name WebSocketShard#connection
     * @type {?WebSocket}
     * @private
     */
    Object.defineProperty(this, 'connection', { value: null, writable: true });

    /**
     * @external Inflate
     * @see {@link https://www.npmjs.com/package/zlib-sync}
     */

    /**
     * The compression to use
     * @name WebSocketShard#inflate
     * @type {?Inflate}
     * @private
     */
    Object.defineProperty(this, 'inflate', { value: null, writable: true });

    /**
     * The HELLO timeout
     * @name WebSocketShard#helloTimeout
     * @type {?NodeJS.Timeout}
     * @private
     */
    Object.defineProperty(this, 'helloTimeout', { value: null, writable: true });

    /**
     * The heartbeat interval timer, managed by {@link WebSocketShard#setHeartbeatTimer}
     * @name WebSocketShard#heartbeatInterval
     * @type {?NodeJS.Timeout}
     * @private
     */
    Object.defineProperty(this, 'heartbeatInterval', { value: null, writable: true });

    /**
     * If the manager attached its event handlers on the shard
     * @name WebSocketShard#eventsAttached
     * @type {boolean}
     * @private
     */
    Object.defineProperty(this, 'eventsAttached', { value: false, writable: true });

    /**
     * A set of guild ids this shard expects to receive
     * @name WebSocketShard#expectedGuilds
     * @type {?Set}
     * @private
     */
    Object.defineProperty(this, 'expectedGuilds', { value: null, writable: true });

    /**
     * The ready timeout
     * @name WebSocketShard#readyTimeout
     * @type {?NodeJS.Timeout}
     * @private
     */
    Object.defineProperty(this, 'readyTimeout', { value: null, writable: true });

    /**
     * Time when the WebSocket connection was opened
     * @name WebSocketShard#connectedAt
     * @type {number}
     * @private
     */
    Object.defineProperty(this, 'connectedAt', { value: 0, writable: true });
  }

  /**
   * Emits a debug event.
   * @param {string} message The debug message
   * @private
   */
  debug(message) {
    this.manager.debug(message, this);
  }

  /**
   * Connects the shard to the gateway.
   * @private
   * @returns {Promise} A promise that will resolve if the shard turns ready successfully,
   * or reject if we couldn't connect
   */
  connect() {
    const { gateway, client } = this.manager;

    // Already connected and ready: nothing to do
    if (this.connection?.readyState === WebSocket.OPEN && this.status === Status.READY) {
      return Promise.resolve();
    }

    return new Promise((resolve, reject) => {
      // Whichever of the five events fires first settles the promise and detaches the others
      const cleanup = () => {
        this.removeListener(ShardEvents.CLOSE, onClose);
        this.removeListener(ShardEvents.READY, onReady);
        this.removeListener(ShardEvents.RESUMED, onResumed);
        this.removeListener(ShardEvents.INVALID_SESSION, onInvalidOrDestroyed);
        this.removeListener(ShardEvents.DESTROYED, onInvalidOrDestroyed);
      };

      const onReady = () => {
        cleanup();
        resolve();
      };

      const onResumed = () => {
        cleanup();
        resolve();
      };

      const onClose = event => {
        cleanup();
        reject(event);
      };

      const onInvalidOrDestroyed = () => {
        cleanup();
        // Rejects without a reason on purpose
        // eslint-disable-next-line prefer-promise-reject-errors
        reject();
      };

      this.once(ShardEvents.READY, onReady);
      this.once(ShardEvents.RESUMED, onResumed);
      this.once(ShardEvents.CLOSE, onClose);
      this.once(ShardEvents.INVALID_SESSION, onInvalidOrDestroyed);
      this.once(ShardEvents.DESTROYED, onInvalidOrDestroyed);

      if (this.connection?.readyState === WebSocket.OPEN) {
        this.debug('An open connection was found, attempting an immediate identify.');
        this.identify();
        return;
      }

      if (this.connection) {
        this.debug(`A connection object was found. Cleaning up before continuing. State: ${CONNECTION_STATE[this.connection.readyState]}`);
        this.destroy({ emit: false });
      }

      const wsQuery = { v: client.options.ws.version };

      if (zlib) {
        this.inflate = new zlib.Inflate({
          chunkSize: 65535,
          flush: zlib.Z_SYNC_FLUSH,
          to: WebSocket.encoding === 'json' ? 'string' : '',
        });
        wsQuery.compress = 'zlib-stream';
      }

      this.debug(
        `[CONNECT] Gateway : ${gateway} Version : ${client.options.ws.version} Encoding : ${WebSocket.encoding} Compression: ${zlib ? 'zlib-stream' : 'none'}`,
      );

      this.status = this.status === Status.DISCONNECTED ? Status.RECONNECTING : Status.CONNECTING;
      this.setHelloTimeout();

      this.connectedAt = Date.now();

      const ws = (this.connection = WebSocket.create(gateway, wsQuery));
      ws.onopen = this.onOpen.bind(this);
      ws.onmessage = this.onMessage.bind(this);
      ws.onerror = this.onError.bind(this);
      ws.onclose = this.onClose.bind(this);
    });
  }

  /**
   * Called whenever a connection is opened to the gateway.
   * @private
   */
  onOpen() {
    this.debug(`[CONNECTED] Took ${Date.now() - this.connectedAt}ms`);
    this.status = Status.NEARLY;
  }

  /**
   * Called whenever a message is received.
   * @param {MessageEvent} event Event received
   * @private
   */
  onMessage({ data }) {
    let raw;
    if (data instanceof ArrayBuffer) data = new Uint8Array(data);
    if (zlib) {
      // A chunk ending in 00 00 FF FF completes a message; anything else is buffered
      const l = data.length;
      const flush =
        l >= 4 && data[l - 4] === 0x00 && data[l - 3] === 0x00 && data[l - 2] === 0xff && data[l - 1] === 0xff;

      this.inflate.push(data, flush && zlib.Z_SYNC_FLUSH);
      if (!flush) return;
      raw = this.inflate.result;
    } else {
      raw = data;
    }

    let packet;
    try {
      packet = WebSocket.unpack(raw);
    } catch (err) {
      this.manager.client.emit(Events.SHARD_ERROR, err, this.id);
      return;
    }

    this.manager.client.emit(Events.RAW, packet, this.id);
    if (packet.op === Opcodes.DISPATCH) this.manager.emit(packet.t, packet.d, this.id);
    this.onPacket(packet);
  }

  /**
   * Called whenever an error occurs with the WebSocket.
   * @param {ErrorEvent} event The error that occurred
   * @private
   */
  onError(event) {
    const error = event?.error ?? event;
    if (!error) return;

    /**
     * Emitted whenever a shard's WebSocket encounters a connection error.
     * @event Client#shardError
     * @param {Error} error The encountered error
     * @param {number} shardId The shard that encountered this error
     */
    this.manager.client.emit(Events.SHARD_ERROR, error, this.id);
  }

  /**
   * @external CloseEvent
   * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent}
   */

  /**
   * @external ErrorEvent
   * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/ErrorEvent}
   */

  /**
   * @external MessageEvent
   * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent}
   */

  /**
   * Called whenever a connection to the gateway is closed.
   * @param {CloseEvent} event Close event that was received
   * @private
   */
  onClose(event) {
    // Keep the last known sequence so that a later resume can use it
    if (this.sequence !== -1) this.closeSequence = this.sequence;
    this.sequence = -1;

    // `reason` is an empty string (not null/undefined) when no reason was given,
    // so `||` is required for the fallback text to ever be used.
    this.debug(`[CLOSE] Event Code: ${event.code} Clean : ${event.wasClean} Reason : ${event.reason || 'No reason received'}`);

    this.setHeartbeatTimer(-1);
    this.setHelloTimeout(-1);
    // If we still have a connection object, clean up its listeners
    if (this.connection) this._cleanupConnection();

    this.status = Status.DISCONNECTED;

    /**
     * Emitted when a shard's WebSocket closes.
     * @private
     * @event WebSocketShard#close
     * @param {CloseEvent} event The received event
     */
    this.emit(ShardEvents.CLOSE, event);
  }

  /**
   * Called whenever a packet is received.
   * @param {Object} packet The received packet
   * @private
   */
  onPacket(packet) {
    if (!packet) {
      this.debug(`Received broken packet: '${packet}'.`);
      return;
    }

    switch (packet.t) {
      case WSEvents.READY:
        /**
         * Emitted when the shard receives the READY payload and is now waiting for guilds
         * @event WebSocketShard#ready
         */
        this.emit(ShardEvents.READY);

        this.sessionId = packet.d.session_id;
        this.expectedGuilds = new Set(packet.d.guilds.map(d => d.id));
        this.status = Status.WAITING_FOR_GUILDS;
        this.debug(`[READY] Session ${this.sessionId}.`);
        this.lastHeartbeatAcked = true;
        this.sendHeartbeat('ReadyHeartbeat');
        break;
      case WSEvents.RESUMED: {
        /**
         * Emitted when the shard resumes successfully
         * @event WebSocketShard#resumed
         */
        this.emit(ShardEvents.RESUMED);

        this.status = Status.READY;
        const replayed = packet.s - this.closeSequence;
        this.debug(`[RESUMED] Session ${this.sessionId} | Replayed ${replayed} events.`);
        this.lastHeartbeatAcked = true;
        this.sendHeartbeat('ResumeHeartbeat');
        break;
      }
    }

    // Only ever move the sequence forward
    if (packet.s > this.sequence) this.sequence = packet.s;

    switch (packet.op) {
      case Opcodes.HELLO:
        this.setHelloTimeout(-1);
        this.setHeartbeatTimer(packet.d.heartbeat_interval);
        this.identify();
        break;
      case Opcodes.RECONNECT:
        this.debug('[RECONNECT] Discord asked us to reconnect');
        this.destroy({ closeCode: 4_000 });
        break;
      case Opcodes.INVALID_SESSION:
        this.debug(`[INVALID SESSION] Resumable: ${packet.d}.`);
        // If we can resume the session, do so immediately
        if (packet.d) {
          this.identifyResume();
          return;
        }
        // Reset the sequence
        this.sequence = -1;
        // Reset the session id as it's invalid
        this.sessionId = null;
        // Set the status to reconnecting
        this.status = Status.RECONNECTING;
        // Finally, emit the INVALID_SESSION event
        this.emit(ShardEvents.INVALID_SESSION);
        break;
      case Opcodes.HEARTBEAT_ACK:
        this.ackHeartbeat();
        break;
      case Opcodes.HEARTBEAT:
        this.sendHeartbeat('HeartbeatRequest', true);
        break;
      default:
        this.manager.handlePacket(packet, this);
        if (this.status === Status.WAITING_FOR_GUILDS && packet.t === WSEvents.GUILD_CREATE) {
          this.expectedGuilds.delete(packet.d.id);
          this.checkReady();
        }
    }
  }

  /**
   * Checks if the shard can be marked as ready
   * @private
   */
  checkReady() {
    // Step 0. Clear the ready timeout, if it exists
    if (this.readyTimeout) {
      clearTimeout(this.readyTimeout);
      this.readyTimeout = null;
    }
    // Step 1. If we don't have any other guilds pending, we are ready
    if (!this.expectedGuilds.size) {
      this.debug('Shard received all its guilds. Marking as fully ready.');
      this.status = Status.READY;

      /**
       * Emitted when the shard is fully ready.
       * This event is emitted if:
       * * all guilds were received by this shard
       * * the ready timeout expired, and some guilds are unavailable
       * @event WebSocketShard#allReady
       * @param {?Set} unavailableGuilds Set of unavailable guilds, if any
       */
      this.emit(ShardEvents.ALL_READY);
      return;
    }
    const hasGuildsIntent = new Intents(this.manager.client.options.intents).has(Intents.FLAGS.GUILDS);
    // Step 2. Create a timeout that will mark the shard as ready if there are still unavailable guilds
    // * The timeout is 15 seconds by default
    // * This can be optionally changed in the client options via the `waitGuildTimeout` option
    // * a timeout time of zero will skip this timeout, which potentially could cause the Client to miss guilds.

    const { waitGuildTimeout } = this.manager.client.options;

    this.readyTimeout = setTimeout(
      () => {
        this.debug(
          `Shard ${hasGuildsIntent ? 'did' : 'will'} not receive any more guild packets` +
            `${hasGuildsIntent ? ` in ${waitGuildTimeout} ms` : ''}.\nUnavailable guild count: ${
              this.expectedGuilds.size
            }`,
        );

        this.readyTimeout = null;

        this.status = Status.READY;

        this.emit(ShardEvents.ALL_READY, this.expectedGuilds);
      },
      hasGuildsIntent ? waitGuildTimeout : 0,
    ).unref();
  }

  /**
   * Sets the HELLO packet timeout.
   * @param {number} [time] If set to -1, it will clear the hello timeout
   * @private
   */
  setHelloTimeout(time) {
    if (time === -1) {
      if (this.helloTimeout) {
        this.debug('Clearing the HELLO timeout.');
        clearTimeout(this.helloTimeout);
        this.helloTimeout = null;
      }
      return;
    }
    this.debug('Setting a HELLO timeout for 20s.');
    this.helloTimeout = setTimeout(() => {
      this.debug('Did not receive HELLO in time. Destroying and connecting again.');
      this.destroy({ reset: true, closeCode: 4009 });
    }, 20_000).unref();
  }

  /**
   * Sets the heartbeat timer for this shard.
   * @param {number} time If -1, clears the interval, any other number sets an interval
   * @private
   */
  setHeartbeatTimer(time) {
    if (time === -1) {
      if (this.heartbeatInterval) {
        this.debug('Clearing the heartbeat interval.');
        clearInterval(this.heartbeatInterval);
        this.heartbeatInterval = null;
      }
      return;
    }
    this.debug(`Setting a heartbeat interval for ${time}ms.`);
    // Sanity checks
    if (this.heartbeatInterval) clearInterval(this.heartbeatInterval);
    this.heartbeatInterval = setInterval(() => this.sendHeartbeat(), time).unref();
  }

  /**
   * Sends a heartbeat to the WebSocket.
   * If this shard didn't receive a heartbeat last time, it will destroy it and reconnect
   * @param {string} [tag='HeartbeatTimer'] What caused this heartbeat to be sent
   * @param {boolean} [ignoreHeartbeatAck] If we should send the heartbeat forcefully.
   * Defaults to true while the shard is waiting for guilds, identifying or resuming.
   * @private
   */
  sendHeartbeat(
    tag = 'HeartbeatTimer',
    ignoreHeartbeatAck = [Status.WAITING_FOR_GUILDS, Status.IDENTIFYING, Status.RESUMING].includes(this.status),
  ) {
    if (ignoreHeartbeatAck && !this.lastHeartbeatAcked) {
      this.debug(`[${tag}] Didn't process heartbeat ack yet but we are still connected. Sending one now.`);
    } else if (!this.lastHeartbeatAcked) {
      this.debug(
        `[${tag}] Didn't receive a heartbeat ack last time, assuming zombie connection. Destroying and reconnecting. Status : ${STATUS_KEYS[this.status]} Sequence : ${this.sequence} Connection State: ${this.connection ? CONNECTION_STATE[this.connection.readyState] : 'No Connection??'}`,
      );

      this.destroy({ closeCode: 4009, reset: true });
      return;
    }

    this.debug(`[${tag}] Sending a heartbeat.`);
    this.lastHeartbeatAcked = false;
    this.lastPingTimestamp = Date.now();
    this.send({ op: Opcodes.HEARTBEAT, d: this.sequence }, true);
  }

  /**
   * Acknowledges a heartbeat.
   * @private
   */
  ackHeartbeat() {
    this.lastHeartbeatAcked = true;
    const latency = Date.now() - this.lastPingTimestamp;
    this.debug(`Heartbeat acknowledged, latency of ${latency}ms.`);
    this.ping = latency;
  }

  /**
   * Identifies the client on the connection.
   * Resumes when a session id is present, otherwise identifies as a new session.
   * @private
   * @returns {void}
   */
  identify() {
    return this.sessionId ? this.identifyResume() : this.identifyNew();
  }

  /**
   * Identifies as a new connection on the gateway.
   * @private
   */
  identifyNew() {
    const { client } = this.manager;
    if (!client.token) {
      this.debug('[IDENTIFY] No token available to identify a new session.');
      return;
    }

    this.status = Status.IDENTIFYING;

    // Clone the identify payload and assign the token and shard info
    const d = {
      ...client.options.ws,
      intents: Intents.resolve(client.options.intents),
      token: client.token,
      shard: [this.id, Number(client.options.shardCount)],
    };

    this.debug(`[IDENTIFY] Shard ${this.id}/${client.options.shardCount} with intents: ${d.intents}`);
    this.send({ op: Opcodes.IDENTIFY, d }, true);
  }

  /**
   * Resumes a session on the gateway.
   * @private
   */
  identifyResume() {
    if (!this.sessionId) {
      this.debug('[RESUME] No session id was present; identifying as a new session.');
      this.identifyNew();
      return;
    }

    this.status = Status.RESUMING;

    this.debug(`[RESUME] Session ${this.sessionId}, sequence ${this.closeSequence}`);

    const d = {
      token: this.manager.client.token,
      session_id: this.sessionId,
      seq: this.closeSequence,
    };

    this.send({ op: Opcodes.RESUME, d }, true);
  }

  /**
   * Adds a packet to the queue to be sent to the gateway.
   * If you use this method, make sure you understand that you need to provide
   * a full [Payload](https://discord.com/developers/docs/topics/gateway#commands-and-events-gateway-commands).
   * Do not use this method if you don't know what you're doing.
   * @param {Object} data The full packet to send
   * @param {boolean} [important=false] If this packet should be added first in queue
   */
  send(data, important = false) {
    this.ratelimit.queue[important ? 'unshift' : 'push'](data);
    this.processQueue();
  }

  /**
   * Sends data, bypassing the queue.
   * If no open WebSocket is available, the packet is dropped and the shard is destroyed.
   * @param {Object} data Packet to send
   * @returns {void}
   * @private
   */
  _send(data) {
    if (this.connection?.readyState !== WebSocket.OPEN) {
      // IDENTIFY and RESUME packets carry the client token; never write it to the debug output
      const printable = JSON.stringify(data, (key, value) => (key === 'token' ? '[REDACTED]' : value));
      this.debug(`Tried to send packet '${printable}' but no WebSocket is available!`);
      this.destroy({ closeCode: 4_000 });
      return;
    }

    this.connection.send(WebSocket.pack(data), err => {
      if (err) this.manager.client.emit(Events.SHARD_ERROR, err, this.id);
    });
  }

  /**
   * Processes the current WebSocket queue.
   * @returns {void}
   * @private
   */
  processQueue() {
    if (this.ratelimit.remaining === 0) return;
    if (this.ratelimit.queue.length === 0) return;
    // First send of a fresh window: schedule the reset that refills the budget
    if (this.ratelimit.remaining === this.ratelimit.total) {
      this.ratelimit.timer = setTimeout(() => {
        this.ratelimit.remaining = this.ratelimit.total;
        this.processQueue();
      }, this.ratelimit.time).unref();
    }
    while (this.ratelimit.remaining > 0) {
      const item = this.ratelimit.queue.shift();
      if (!item) return;
      this._send(item);
      this.ratelimit.remaining--;
    }
  }

  /**
   * Destroys this shard and closes its WebSocket connection.
   * @param {Object} [options={ closeCode: 1000, reset: false, emit: true, log: true }] Options for destroying the shard
   * @private
   */
  destroy({ closeCode = 1_000, reset = false, emit = true, log = true } = {}) {
    if (log) {
      this.debug(`[DESTROY] Close Code : ${closeCode} Reset : ${reset} Emit DESTROYED: ${emit}`);
    }

    // Step 0: Remove all timers
    this.setHeartbeatTimer(-1);
    this.setHelloTimeout(-1);

    // Step 1: Close the WebSocket connection, if any, otherwise, emit DESTROYED
    if (this.connection) {
      // If the connection is currently opened, we will (hopefully) receive close
      if (this.connection.readyState === WebSocket.OPEN) {
        this.connection.close(closeCode);
      } else {
        // Connection is not OPEN
        this.debug(`WS State: ${CONNECTION_STATE[this.connection.readyState]}`);
        // Remove listeners from the connection
        this._cleanupConnection();
        // Attempt to close the connection just in case
        try {
          this.connection.close(closeCode);
        } catch {
          // No-op
        }
        // Emit the destroyed event if needed
        if (emit) this._emitDestroyed();
      }
    } else if (emit) {
      // We requested a destroy, but we had no connection. Emit destroyed
      this._emitDestroyed();
    }

    // Step 2: Null the connection object
    this.connection = null;

    // Step 3: Set the shard status to DISCONNECTED
    this.status = Status.DISCONNECTED;

    // Step 4: Cache the old sequence (use to attempt a resume)
    if (this.sequence !== -1) this.closeSequence = this.sequence;

    // Step 5: Reset the sequence and session id if requested
    if (reset) {
      this.sequence = -1;
      this.sessionId = null;
    }

    // Step 6: reset the rate limit data
    this.ratelimit.remaining = this.ratelimit.total;
    this.ratelimit.queue.length = 0;
    if (this.ratelimit.timer) {
      clearTimeout(this.ratelimit.timer);
      this.ratelimit.timer = null;
    }
  }

  /**
   * Cleans up the WebSocket connection listeners.
   * @private
   */
  _cleanupConnection() {
    this.connection.onopen = this.connection.onclose = this.connection.onerror = this.connection.onmessage = null;
  }

  /**
   * Emits the DESTROYED event on the shard
   * @private
   */
  _emitDestroyed() {
    /**
     * Emitted when a shard is destroyed, but no WebSocket connection was present.
     * @private
     * @event WebSocketShard#destroyed
     */
    this.emit(ShardEvents.DESTROYED);
  }
}

module.exports = WebSocketShard;