901 lines
27 KiB
JavaScript
901 lines
27 KiB
JavaScript
'use strict';
|
|
|
|
const EventEmitter = require('node:events');
|
|
const { setTimeout, setInterval, clearTimeout } = require('node:timers');
|
|
const proxy = require('proxy-agent');
|
|
const WebSocket = require('../../WebSocket');
|
|
const { Status, Events, ShardEvents, Opcodes, WSEvents, WSCodes } = require('../../util/Constants');
|
|
const Intents = require('../../util/Intents');
|
|
|
|
const STATUS_KEYS = Object.keys(Status);
|
|
const CONNECTION_STATE = Object.keys(WebSocket.WebSocket);
|
|
|
|
let zlib;
|
|
|
|
try {
|
|
zlib = require('zlib-sync');
|
|
} catch {} // eslint-disable-line no-empty
|
|
|
|
/**
|
|
* Represents a Shard's WebSocket connection
|
|
*/
|
|
class WebSocketShard extends EventEmitter {
|
|
constructor(manager, id) {
|
|
super();
|
|
|
|
/**
|
|
* The WebSocketManager of the shard
|
|
* @type {WebSocketManager}
|
|
*/
|
|
this.manager = manager;
|
|
|
|
/**
|
|
* The shard's id
|
|
* @type {number}
|
|
*/
|
|
this.id = id;
|
|
|
|
/**
|
|
* The current status of the shard
|
|
* @type {Status}
|
|
*/
|
|
this.status = Status.IDLE;
|
|
|
|
/**
|
|
* The current sequence of the shard
|
|
* @type {number}
|
|
* @private
|
|
*/
|
|
this.sequence = -1;
|
|
|
|
/**
|
|
* The sequence of the shard after close
|
|
* @type {number}
|
|
* @private
|
|
*/
|
|
this.closeSequence = 0;
|
|
|
|
/**
|
|
* The current session id of the shard
|
|
* @type {?string}
|
|
* @private
|
|
*/
|
|
this.sessionId = null;
|
|
|
|
/**
|
|
* The previous heartbeat ping of the shard
|
|
* @type {number}
|
|
*/
|
|
this.ping = -1;
|
|
|
|
/**
|
|
* The last time a ping was sent (a timestamp)
|
|
* @type {number}
|
|
* @private
|
|
*/
|
|
this.lastPingTimestamp = -1;
|
|
|
|
/**
|
|
* If we received a heartbeat ack back. Used to identify zombie connections
|
|
* @type {boolean}
|
|
* @private
|
|
*/
|
|
this.lastHeartbeatAcked = true;
|
|
|
|
/**
|
|
* Used to prevent calling {@link WebSocketShard#event:close} twice while closing or terminating the WebSocket.
|
|
* @type {boolean}
|
|
* @private
|
|
*/
|
|
this.closeEmitted = false;
|
|
|
|
/**
|
|
* Contains the rate limit queue and metadata
|
|
* @name WebSocketShard#ratelimit
|
|
* @type {Object}
|
|
* @private
|
|
*/
|
|
Object.defineProperty(this, 'ratelimit', {
|
|
value: {
|
|
queue: [],
|
|
total: 120,
|
|
remaining: 120,
|
|
time: 60e3,
|
|
timer: null,
|
|
},
|
|
});
|
|
|
|
/**
|
|
* The WebSocket connection for the current shard
|
|
* @name WebSocketShard#connection
|
|
* @type {?WebSocket}
|
|
* @private
|
|
*/
|
|
Object.defineProperty(this, 'connection', { value: null, writable: true });
|
|
|
|
/**
|
|
* @external Inflate
|
|
* @see {@link https://www.npmjs.com/package/zlib-sync}
|
|
*/
|
|
|
|
/**
|
|
* The compression to use
|
|
* @name WebSocketShard#inflate
|
|
* @type {?Inflate}
|
|
* @private
|
|
*/
|
|
Object.defineProperty(this, 'inflate', { value: null, writable: true });
|
|
|
|
/**
|
|
* The HELLO timeout
|
|
* @name WebSocketShard#helloTimeout
|
|
* @type {?NodeJS.Timeout}
|
|
* @private
|
|
*/
|
|
Object.defineProperty(this, 'helloTimeout', { value: null, writable: true });
|
|
|
|
/**
|
|
* The WebSocket timeout.
|
|
* @name WebSocketShard#wsCloseTimeout
|
|
* @type {?NodeJS.Timeout}
|
|
* @private
|
|
*/
|
|
Object.defineProperty(this, 'wsCloseTimeout', { value: null, writable: true });
|
|
|
|
/**
|
|
* If the manager attached its event handlers on the shard
|
|
* @name WebSocketShard#eventsAttached
|
|
* @type {boolean}
|
|
* @private
|
|
*/
|
|
Object.defineProperty(this, 'eventsAttached', { value: false, writable: true });
|
|
|
|
/**
|
|
* A set of guild ids this shard expects to receive
|
|
* @name WebSocketShard#expectedGuilds
|
|
* @type {?Set<string>}
|
|
* @private
|
|
*/
|
|
Object.defineProperty(this, 'expectedGuilds', { value: null, writable: true });
|
|
|
|
/**
|
|
* The ready timeout
|
|
* @name WebSocketShard#readyTimeout
|
|
* @type {?NodeJS.Timeout}
|
|
* @private
|
|
*/
|
|
Object.defineProperty(this, 'readyTimeout', { value: null, writable: true });
|
|
|
|
/**
|
|
* Time when the WebSocket connection was opened
|
|
* @name WebSocketShard#connectedAt
|
|
* @type {number}
|
|
* @private
|
|
*/
|
|
Object.defineProperty(this, 'connectedAt', { value: 0, writable: true });
|
|
}
|
|
|
|
/**
|
|
* Emits a debug event.
|
|
* @param {string} message The debug message
|
|
* @private
|
|
*/
|
|
debug(message) {
|
|
this.manager.debug(message, this);
|
|
}
|
|
|
|
/**
|
|
* Connects the shard to the gateway.
|
|
* @private
|
|
* @returns {Promise<void>} A promise that will resolve if the shard turns ready successfully,
|
|
* or reject if we couldn't connect
|
|
*/
|
|
connect() {
|
|
const { gateway, client } = this.manager;
|
|
|
|
if (this.connection?.readyState === WebSocket.OPEN && this.status === Status.READY) {
|
|
return Promise.resolve();
|
|
}
|
|
|
|
return new Promise((resolve, reject) => {
|
|
const cleanup = () => {
|
|
this.removeListener(ShardEvents.CLOSE, onClose);
|
|
this.removeListener(ShardEvents.READY, onReady);
|
|
this.removeListener(ShardEvents.RESUMED, onResumed);
|
|
this.removeListener(ShardEvents.INVALID_SESSION, onInvalidOrDestroyed);
|
|
this.removeListener(ShardEvents.DESTROYED, onInvalidOrDestroyed);
|
|
};
|
|
|
|
const onReady = () => {
|
|
cleanup();
|
|
resolve();
|
|
};
|
|
|
|
const onResumed = () => {
|
|
cleanup();
|
|
resolve();
|
|
};
|
|
|
|
const onClose = event => {
|
|
cleanup();
|
|
reject(event);
|
|
};
|
|
|
|
const onInvalidOrDestroyed = () => {
|
|
cleanup();
|
|
// eslint-disable-next-line prefer-promise-reject-errors
|
|
reject();
|
|
};
|
|
|
|
this.once(ShardEvents.READY, onReady);
|
|
this.once(ShardEvents.RESUMED, onResumed);
|
|
this.once(ShardEvents.CLOSE, onClose);
|
|
this.once(ShardEvents.INVALID_SESSION, onInvalidOrDestroyed);
|
|
this.once(ShardEvents.DESTROYED, onInvalidOrDestroyed);
|
|
|
|
if (this.connection?.readyState === WebSocket.OPEN) {
|
|
this.debug('An open connection was found, attempting an immediate identify.');
|
|
this.identify();
|
|
return;
|
|
}
|
|
|
|
if (this.connection) {
|
|
this.debug(`A connection object was found. Cleaning up before continuing.
|
|
State: ${CONNECTION_STATE[this.connection.readyState]}`);
|
|
this.destroy({ emit: false });
|
|
}
|
|
|
|
const wsQuery = { v: client.options.ws.version };
|
|
|
|
if (zlib) {
|
|
this.inflate = new zlib.Inflate({
|
|
chunkSize: 65535,
|
|
flush: zlib.Z_SYNC_FLUSH,
|
|
to: WebSocket.encoding === 'json' ? 'string' : '',
|
|
});
|
|
wsQuery.compress = 'zlib-stream';
|
|
}
|
|
|
|
this.debug(
|
|
`[CONNECT]
|
|
Gateway : ${gateway}
|
|
Version : ${client.options.ws.version}
|
|
Encoding : ${WebSocket.encoding}
|
|
Compression: ${zlib ? 'zlib-stream' : 'none'}
|
|
Proxy : ${client.options.ws.proxy || 'none'}`,
|
|
);
|
|
|
|
this.status = this.status === Status.DISCONNECTED ? Status.RECONNECTING : Status.CONNECTING;
|
|
this.setHelloTimeout();
|
|
this.setWsCloseTimeout(-1);
|
|
this.connectedAt = Date.now();
|
|
|
|
let args = { handshakeTimeout: 30_000 };
|
|
if (typeof client.options.proxy && client.options.proxy.length > 0) {
|
|
args.agent = new proxy(client.options.proxy);
|
|
this.debug(`Using proxy ${client.options.proxy}`, args);
|
|
}
|
|
// Adding a handshake timeout to just make sure no zombie connection appears.
|
|
const ws = (this.connection = WebSocket.create(gateway, wsQuery, args));
|
|
ws.onopen = this.onOpen.bind(this);
|
|
ws.onmessage = this.onMessage.bind(this);
|
|
ws.onerror = this.onError.bind(this);
|
|
ws.onclose = this.onClose.bind(this);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Called whenever a connection is opened to the gateway.
|
|
* @private
|
|
*/
|
|
onOpen() {
|
|
this.debug(`[CONNECTED] Took ${Date.now() - this.connectedAt}ms`);
|
|
this.status = Status.NEARLY;
|
|
}
|
|
|
|
/**
|
|
* Called whenever a message is received.
|
|
* @param {MessageEvent} event Event received
|
|
* @private
|
|
*/
|
|
onMessage({ data }) {
|
|
let raw;
|
|
if (data instanceof ArrayBuffer) data = new Uint8Array(data);
|
|
if (zlib) {
|
|
const l = data.length;
|
|
const flush =
|
|
l >= 4 && data[l - 4] === 0x00 && data[l - 3] === 0x00 && data[l - 2] === 0xff && data[l - 1] === 0xff;
|
|
|
|
this.inflate.push(data, flush && zlib.Z_SYNC_FLUSH);
|
|
if (!flush) return;
|
|
raw = this.inflate.result;
|
|
} else {
|
|
raw = data;
|
|
}
|
|
let packet;
|
|
try {
|
|
packet = WebSocket.unpack(raw);
|
|
} catch (err) {
|
|
this.manager.client.emit(Events.SHARD_ERROR, err, this.id);
|
|
return;
|
|
}
|
|
this.manager.client.emit(Events.RAW, packet, this.id);
|
|
if (packet.op === Opcodes.DISPATCH) this.manager.emit(packet.t, packet.d, this.id);
|
|
this.onPacket(packet);
|
|
}
|
|
|
|
/**
|
|
* Called whenever an error occurs with the WebSocket.
|
|
* @param {ErrorEvent} event The error that occurred
|
|
* @private
|
|
*/
|
|
onError(event) {
|
|
const error = event?.error ?? event;
|
|
if (!error) return;
|
|
|
|
/**
|
|
* Emitted whenever a shard's WebSocket encounters a connection error.
|
|
* @event Client#shardError
|
|
* @param {Error} error The encountered error
|
|
* @param {number} shardId The shard that encountered this error
|
|
*/
|
|
this.manager.client.emit(Events.SHARD_ERROR, error, this.id);
|
|
}
|
|
|
|
/**
|
|
* @external CloseEvent
|
|
* @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent}
|
|
*/
|
|
|
|
/**
|
|
* @external ErrorEvent
|
|
* @see {@link https://developer.mozilla.org/en-US/docs/Web/API/ErrorEvent}
|
|
*/
|
|
|
|
/**
|
|
* @external MessageEvent
|
|
* @see {@link https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent}
|
|
*/
|
|
|
|
/**
|
|
* Called whenever a connection to the gateway is closed.
|
|
* @param {CloseEvent} event Close event that was received
|
|
* @private
|
|
*/
|
|
onClose(event) {
|
|
this.closeEmitted = true;
|
|
if (this.sequence !== -1) this.closeSequence = this.sequence;
|
|
this.sequence = -1;
|
|
this.setHeartbeatTimer(-1);
|
|
this.setHelloTimeout(-1);
|
|
// Clearing the WebSocket close timeout as close was emitted.
|
|
this.setWsCloseTimeout(-1);
|
|
// If we still have a connection object, clean up its listeners
|
|
if (this.connection) {
|
|
this._cleanupConnection();
|
|
// Having this after _cleanupConnection to just clean up the connection and not listen to ws.onclose
|
|
this.destroy({ reset: !this.sessionId, emit: false, log: false });
|
|
}
|
|
this.status = Status.DISCONNECTED;
|
|
this.emitClose(event);
|
|
}
|
|
|
|
/**
|
|
* This method is responsible to emit close event for this shard.
|
|
* This method helps the shard reconnect.
|
|
* @param {CloseEvent} [event] Close event that was received
|
|
*/
|
|
emitClose(
|
|
event = {
|
|
code: 1011,
|
|
reason: WSCodes[1011],
|
|
wasClean: false,
|
|
},
|
|
) {
|
|
this.debug(`[CLOSE]
|
|
Event Code: ${event.code}
|
|
Clean : ${event.wasClean}
|
|
Reason : ${event.reason ?? 'No reason received'}`);
|
|
/**
|
|
* Emitted when a shard's WebSocket closes.
|
|
* @private
|
|
* @event WebSocketShard#close
|
|
* @param {CloseEvent} event The received event
|
|
*/
|
|
this.emit(ShardEvents.CLOSE, event);
|
|
}
|
|
|
|
/**
|
|
* Called whenever a packet is received.
|
|
* @param {Object} packet The received packet
|
|
* @private
|
|
*/
|
|
onPacket(packet) {
|
|
if (!packet) {
|
|
this.debug(`Received broken packet: '${packet}'.`);
|
|
return;
|
|
}
|
|
|
|
switch (packet.t) {
|
|
case WSEvents.READY:
|
|
/**
|
|
* Emitted when the shard receives the READY payload and is now waiting for guilds
|
|
* @event WebSocketShard#ready
|
|
*/
|
|
this.emit(ShardEvents.READY);
|
|
|
|
this.sessionId = packet.d.session_id;
|
|
this.expectedGuilds = new Set(packet.d.guilds.filter(d => d.unavailable).map(d => d.id));
|
|
this.status = Status.WAITING_FOR_GUILDS;
|
|
this.debug(`[READY] Session ${this.sessionId}.`);
|
|
this.lastHeartbeatAcked = true;
|
|
this.sendHeartbeat('ReadyHeartbeat');
|
|
break;
|
|
case WSEvents.RESUMED: {
|
|
/**
|
|
* Emitted when the shard resumes successfully
|
|
* @event WebSocketShard#resumed
|
|
*/
|
|
this.emit(ShardEvents.RESUMED);
|
|
|
|
this.status = Status.READY;
|
|
const replayed = packet.s - this.closeSequence;
|
|
this.debug(`[RESUMED] Session ${this.sessionId} | Replayed ${replayed} events.`);
|
|
this.lastHeartbeatAcked = true;
|
|
this.sendHeartbeat('ResumeHeartbeat');
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (packet.s > this.sequence) this.sequence = packet.s;
|
|
|
|
switch (packet.op) {
|
|
case Opcodes.HELLO:
|
|
this.setHelloTimeout(-1);
|
|
this.setHeartbeatTimer(packet.d.heartbeat_interval);
|
|
this.identify();
|
|
break;
|
|
case Opcodes.RECONNECT:
|
|
this.debug('[RECONNECT] Discord asked us to reconnect');
|
|
this.destroy({ closeCode: 4_000 });
|
|
break;
|
|
case Opcodes.INVALID_SESSION:
|
|
this.debug(`[INVALID SESSION] Resumable: ${packet.d}.`);
|
|
// If we can resume the session, do so immediately
|
|
if (packet.d) {
|
|
this.identifyResume();
|
|
return;
|
|
}
|
|
// Reset the sequence
|
|
this.sequence = -1;
|
|
// Reset the session id as it's invalid
|
|
this.sessionId = null;
|
|
// Set the status to reconnecting
|
|
this.status = Status.RECONNECTING;
|
|
// Finally, emit the INVALID_SESSION event
|
|
/**
|
|
* Emitted when the session has been invalidated.
|
|
* @event WebSocketShard#invalidSession
|
|
*/
|
|
this.emit(ShardEvents.INVALID_SESSION);
|
|
break;
|
|
case Opcodes.HEARTBEAT_ACK:
|
|
this.ackHeartbeat();
|
|
break;
|
|
case Opcodes.HEARTBEAT:
|
|
this.sendHeartbeat('HeartbeatRequest', true);
|
|
break;
|
|
default:
|
|
this.manager.handlePacket(packet, this);
|
|
if (this.status === Status.WAITING_FOR_GUILDS && packet.t === WSEvents.GUILD_CREATE) {
|
|
this.expectedGuilds.delete(packet.d.id);
|
|
this.checkReady();
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Checks if the shard can be marked as ready
|
|
* @private
|
|
*/
|
|
checkReady() {
|
|
// Step 0. Clear the ready timeout, if it exists
|
|
if (this.readyTimeout) {
|
|
clearTimeout(this.readyTimeout);
|
|
this.readyTimeout = null;
|
|
}
|
|
// Step 1. If we don't have any other guilds pending, we are ready
|
|
if (!this.expectedGuilds.size) {
|
|
this.debug('Shard received all its guilds. Marking as fully ready.');
|
|
this.status = Status.READY;
|
|
|
|
/**
|
|
* Emitted when the shard is fully ready.
|
|
* This event is emitted if:
|
|
* * all guilds were received by this shard
|
|
* * the ready timeout expired, and some guilds are unavailable
|
|
* @event WebSocketShard#allReady
|
|
* @param {?Set<string>} unavailableGuilds Set of unavailable guilds, if any
|
|
*/
|
|
this.emit(ShardEvents.ALL_READY);
|
|
return;
|
|
}
|
|
const hasGuildsIntent = new Intents(this.manager.client.options.intents).has(Intents.FLAGS.GUILDS);
|
|
// Step 2. Create a timeout that will mark the shard as ready if there are still unavailable guilds
|
|
// * The timeout is 15 seconds by default
|
|
// * This can be optionally changed in the client options via the `waitGuildTimeout` option
|
|
// * a timeout time of zero will skip this timeout, which potentially could cause the Client to miss guilds.
|
|
|
|
const { waitGuildTimeout } = this.manager.client.options;
|
|
|
|
this.readyTimeout = setTimeout(() => {
|
|
this.debug(
|
|
`Shard ${hasGuildsIntent ? 'did' : 'will'} not receive any more guild packets` +
|
|
`${hasGuildsIntent ? ` in ${waitGuildTimeout} ms` : ''}.\nUnavailable guild count: ${
|
|
this.expectedGuilds.size
|
|
}`,
|
|
);
|
|
|
|
this.readyTimeout = null;
|
|
|
|
this.status = Status.READY;
|
|
|
|
this.emit(ShardEvents.ALL_READY, this.expectedGuilds);
|
|
}, hasGuildsIntent && waitGuildTimeout).unref();
|
|
}
|
|
|
|
/**
|
|
* Sets the HELLO packet timeout.
|
|
* @param {number} [time] If set to -1, it will clear the hello timeout
|
|
* @private
|
|
*/
|
|
setHelloTimeout(time) {
|
|
if (time === -1) {
|
|
if (this.helloTimeout) {
|
|
this.debug('Clearing the HELLO timeout.');
|
|
clearTimeout(this.helloTimeout);
|
|
this.helloTimeout = null;
|
|
}
|
|
return;
|
|
}
|
|
this.debug('Setting a HELLO timeout for 20s.');
|
|
this.helloTimeout = setTimeout(() => {
|
|
this.debug('Did not receive HELLO in time. Destroying and connecting again.');
|
|
this.destroy({ reset: true, closeCode: 4009 });
|
|
}, 20_000).unref();
|
|
}
|
|
|
|
/**
|
|
* Sets the WebSocket Close timeout.
|
|
* This method is responsible for detecting any zombie connections if the WebSocket fails to close properly.
|
|
* @param {number} [time] If set to -1, it will clear the timeout
|
|
* @private
|
|
*/
|
|
setWsCloseTimeout(time) {
|
|
if (this.wsCloseTimeout) {
|
|
this.debug('[WebSocket] Clearing the close timeout.');
|
|
clearTimeout(this.wsCloseTimeout);
|
|
}
|
|
if (time === -1) {
|
|
this.wsCloseTimeout = null;
|
|
return;
|
|
}
|
|
this.wsCloseTimeout = setTimeout(() => {
|
|
this.setWsCloseTimeout(-1);
|
|
this.debug(`[WebSocket] Close Emitted: ${this.closeEmitted}`);
|
|
// Check if close event was emitted.
|
|
if (this.closeEmitted) {
|
|
this.debug(
|
|
`[WebSocket] was closed. | WS State: ${
|
|
CONNECTION_STATE[this.connection?.readyState ?? WebSocket.CLOSED]
|
|
} | Close Emitted: ${this.closeEmitted}`,
|
|
);
|
|
// Setting the variable false to check for zombie connections.
|
|
this.closeEmitted = false;
|
|
return;
|
|
}
|
|
|
|
this.debug(
|
|
// eslint-disable-next-line max-len
|
|
`[WebSocket] did not close properly, assuming a zombie connection.\nEmitting close and reconnecting again.`,
|
|
);
|
|
|
|
this.emitClose();
|
|
// Setting the variable false to check for zombie connections.
|
|
this.closeEmitted = false;
|
|
}, time).unref();
|
|
}
|
|
|
|
/**
|
|
* Sets the heartbeat timer for this shard.
|
|
* @param {number} time If -1, clears the interval, any other number sets an interval
|
|
* @private
|
|
*/
|
|
setHeartbeatTimer(time) {
|
|
if (time === -1) {
|
|
if (this.heartbeatInterval) {
|
|
this.debug('Clearing the heartbeat interval.');
|
|
clearInterval(this.heartbeatInterval);
|
|
this.heartbeatInterval = null;
|
|
}
|
|
return;
|
|
}
|
|
this.debug(`Setting a heartbeat interval for ${time}ms.`);
|
|
// Sanity checks
|
|
if (this.heartbeatInterval) clearInterval(this.heartbeatInterval);
|
|
this.heartbeatInterval = setInterval(() => this.sendHeartbeat(), time).unref();
|
|
}
|
|
|
|
/**
|
|
* Sends a heartbeat to the WebSocket.
|
|
* If this shard didn't receive a heartbeat last time, it will destroy it and reconnect
|
|
* @param {string} [tag='HeartbeatTimer'] What caused this heartbeat to be sent
|
|
* @param {boolean} [ignoreHeartbeatAck] If we should send the heartbeat forcefully.
|
|
* @private
|
|
*/
|
|
sendHeartbeat(
|
|
tag = 'HeartbeatTimer',
|
|
ignoreHeartbeatAck = [Status.WAITING_FOR_GUILDS, Status.IDENTIFYING, Status.RESUMING].includes(this.status),
|
|
) {
|
|
if (ignoreHeartbeatAck && !this.lastHeartbeatAcked) {
|
|
this.debug(`[${tag}] Didn't process heartbeat ack yet but we are still connected. Sending one now.`);
|
|
} else if (!this.lastHeartbeatAcked) {
|
|
this.debug(
|
|
`[${tag}] Didn't receive a heartbeat ack last time, assuming zombie connection. Destroying and reconnecting.
|
|
Status : ${STATUS_KEYS[this.status]}
|
|
Sequence : ${this.sequence}
|
|
Connection State: ${this.connection ? CONNECTION_STATE[this.connection.readyState] : 'No Connection??'}`,
|
|
);
|
|
|
|
this.destroy({ reset: true, closeCode: 4009 });
|
|
return;
|
|
}
|
|
|
|
this.debug(`[${tag}] Sending a heartbeat.`);
|
|
this.lastHeartbeatAcked = false;
|
|
this.lastPingTimestamp = Date.now();
|
|
this.send({ op: Opcodes.HEARTBEAT, d: this.sequence }, true);
|
|
}
|
|
|
|
/**
|
|
* Acknowledges a heartbeat.
|
|
* @private
|
|
*/
|
|
ackHeartbeat() {
|
|
this.lastHeartbeatAcked = true;
|
|
const latency = Date.now() - this.lastPingTimestamp;
|
|
this.debug(`Heartbeat acknowledged, latency of ${latency}ms.`);
|
|
this.ping = latency;
|
|
}
|
|
|
|
/**
|
|
* Identifies the client on the connection.
|
|
* @private
|
|
* @returns {void}
|
|
*/
|
|
identify() {
|
|
return this.sessionId ? this.identifyResume() : this.identifyNew();
|
|
}
|
|
|
|
/**
|
|
* Identifies as a new connection on the gateway.
|
|
* @private
|
|
*/
|
|
identifyNew() {
|
|
const { client } = this.manager;
|
|
if (!client.token) {
|
|
this.debug('[IDENTIFY] No token available to identify a new session.');
|
|
return;
|
|
}
|
|
|
|
this.status = Status.IDENTIFYING;
|
|
|
|
// Clone the identify payload and assign the token and shard info
|
|
client.options.ws.properties = Object.assign(client.options.ws.properties, {
|
|
browser_user_agent: client.options.http.headers['User-Agent'],
|
|
});
|
|
Object.keys(client.options.ws.properties)
|
|
.filter(k => k.startsWith('$'))
|
|
.forEach(k => {
|
|
client.options.ws.properties[k.slice(1)] = client.options.ws.properties[k];
|
|
delete client.options.ws.properties[k];
|
|
});
|
|
const d = {
|
|
...client.options.ws,
|
|
// Remove, Req by dolfies_person [Reddit]: intents: Intents.resolve(client.options.intents),
|
|
token: client.token,
|
|
// Remove: shard: [this.id, Number(client.options.shardCount)],
|
|
};
|
|
|
|
this.debug(
|
|
`[IDENTIFY] Shard ${this.id}/${client.options.shardCount} with intents: ${Intents.resolve(
|
|
client.options.intents,
|
|
)} :)`,
|
|
);
|
|
this.send({ op: Opcodes.IDENTIFY, d }, true);
|
|
}
|
|
|
|
/**
|
|
* Resumes a session on the gateway.
|
|
* @private
|
|
*/
|
|
identifyResume() {
|
|
if (!this.sessionId) {
|
|
this.debug('[RESUME] No session id was present; identifying as a new session.');
|
|
this.identifyNew();
|
|
return;
|
|
}
|
|
|
|
this.status = Status.RESUMING;
|
|
|
|
this.debug(`[RESUME] Session ${this.sessionId}, sequence ${this.closeSequence}`);
|
|
|
|
const d = {
|
|
token: this.manager.client.token,
|
|
session_id: this.sessionId,
|
|
seq: this.closeSequence,
|
|
};
|
|
|
|
this.send({ op: Opcodes.RESUME, d }, true);
|
|
}
|
|
|
|
/**
|
|
* Adds a packet to the queue to be sent to the gateway.
|
|
* <warn>If you use this method, make sure you understand that you need to provide
|
|
* a full [Payload](https://discord.com/developers/docs/topics/gateway#commands-and-events-gateway-commands).
|
|
* Do not use this method if you don't know what you're doing.</warn>
|
|
* @param {Object} data The full packet to send
|
|
* @param {boolean} [important=false] If this packet should be added first in queue
|
|
*/
|
|
send(data, important = false) {
|
|
this.ratelimit.queue[important ? 'unshift' : 'push'](data);
|
|
this.processQueue();
|
|
}
|
|
|
|
/**
|
|
* Sends data, bypassing the queue.
|
|
* @param {Object} data Packet to send
|
|
* @returns {void}
|
|
* @private
|
|
*/
|
|
_send(data) {
|
|
if (this.connection?.readyState !== WebSocket.OPEN) {
|
|
this.debug(`Tried to send packet '${JSON.stringify(data)}' but no WebSocket is available!`);
|
|
this.destroy({ closeCode: 4_000 });
|
|
return;
|
|
}
|
|
|
|
this.connection.send(WebSocket.pack(data), err => {
|
|
if (err) this.manager.client.emit(Events.SHARD_ERROR, err, this.id);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Processes the current WebSocket queue.
|
|
* @returns {void}
|
|
* @private
|
|
*/
|
|
processQueue() {
|
|
if (this.ratelimit.remaining === 0) return;
|
|
if (this.ratelimit.queue.length === 0) return;
|
|
if (this.ratelimit.remaining === this.ratelimit.total) {
|
|
this.ratelimit.timer = setTimeout(() => {
|
|
this.ratelimit.remaining = this.ratelimit.total;
|
|
this.processQueue();
|
|
}, this.ratelimit.time).unref();
|
|
}
|
|
while (this.ratelimit.remaining > 0) {
|
|
const item = this.ratelimit.queue.shift();
|
|
if (!item) return;
|
|
this._send(item);
|
|
this.ratelimit.remaining--;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Destroys this shard and closes its WebSocket connection.
|
|
* @param {Object} [options={ closeCode: 1000, reset: false, emit: true, log: true }] Options for destroying the shard
|
|
* @private
|
|
*/
|
|
destroy({ closeCode = 1_000, reset = false, emit = true, log = true } = {}) {
|
|
if (log) {
|
|
this.debug(`[DESTROY]
|
|
Close Code : ${closeCode}
|
|
Reset : ${reset}
|
|
Emit DESTROYED: ${emit}`);
|
|
}
|
|
|
|
// Step 0: Remove all timers
|
|
this.setHeartbeatTimer(-1);
|
|
this.setHelloTimeout(-1);
|
|
this.debug(
|
|
`[WebSocket] Destroy: Attempting to close the WebSocket. | WS State: ${
|
|
CONNECTION_STATE[this.connection?.readyState ?? WebSocket.CLOSED]
|
|
}`,
|
|
);
|
|
// Step 1: Close the WebSocket connection, if any, otherwise, emit DESTROYED
|
|
if (this.connection) {
|
|
// If the connection is currently opened, we will (hopefully) receive close
|
|
if (this.connection.readyState === WebSocket.OPEN) {
|
|
this.connection.close(closeCode);
|
|
this.debug(`[WebSocket] Close: Tried closing. | WS State: ${CONNECTION_STATE[this.connection.readyState]}`);
|
|
} else {
|
|
// Connection is not OPEN
|
|
this.debug(`WS State: ${CONNECTION_STATE[this.connection.readyState]}`);
|
|
// Attempt to close the connection just in case
|
|
try {
|
|
this.connection.close(closeCode);
|
|
} catch (err) {
|
|
this.debug(
|
|
`[WebSocket] Close: Something went wrong while closing the WebSocket: ${
|
|
err.message || err
|
|
}. Forcefully terminating the connection | WS State: ${CONNECTION_STATE[this.connection.readyState]}`,
|
|
);
|
|
this.connection.terminate();
|
|
}
|
|
// Emit the destroyed event if needed
|
|
if (emit) this._emitDestroyed();
|
|
}
|
|
} else if (emit) {
|
|
// We requested a destroy, but we had no connection. Emit destroyed
|
|
this._emitDestroyed();
|
|
}
|
|
|
|
if (this.connection?.readyState === WebSocket.CLOSING || this.connection?.readyState === WebSocket.CLOSED) {
|
|
this.closeEmitted = false;
|
|
this.debug(
|
|
`[WebSocket] Adding a WebSocket close timeout to ensure a correct WS reconnect.
|
|
Timeout: ${this.manager.client.options.closeTimeout}ms`,
|
|
);
|
|
this.setWsCloseTimeout(this.manager.client.options.closeTimeout);
|
|
}
|
|
|
|
// Step 2: Null the connection object
|
|
this.connection = null;
|
|
|
|
// Step 3: Set the shard status to DISCONNECTED
|
|
this.status = Status.DISCONNECTED;
|
|
|
|
// Step 4: Cache the old sequence (use to attempt a resume)
|
|
if (this.sequence !== -1) this.closeSequence = this.sequence;
|
|
|
|
// Step 5: Reset the sequence and session id if requested
|
|
if (reset) {
|
|
this.sequence = -1;
|
|
this.sessionId = null;
|
|
}
|
|
|
|
// Step 6: reset the rate limit data
|
|
this.ratelimit.remaining = this.ratelimit.total;
|
|
this.ratelimit.queue.length = 0;
|
|
if (this.ratelimit.timer) {
|
|
clearTimeout(this.ratelimit.timer);
|
|
this.ratelimit.timer = null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Cleans up the WebSocket connection listeners.
|
|
* @private
|
|
*/
|
|
_cleanupConnection() {
|
|
this.connection.onopen = this.connection.onclose = this.connection.onmessage = null;
|
|
this.connection.onerror = () => null;
|
|
}
|
|
|
|
/**
|
|
* Emits the DESTROYED event on the shard
|
|
* @private
|
|
*/
|
|
_emitDestroyed() {
|
|
/**
|
|
* Emitted when a shard is destroyed, but no WebSocket connection was present.
|
|
* @private
|
|
* @event WebSocketShard#destroyed
|
|
*/
|
|
this.emit(ShardEvents.DESTROYED);
|
|
}
|
|
}
|
|
|
|
module.exports = WebSocketShard;
|