![]() System : Linux absol.cf 5.4.0-198-generic #218-Ubuntu SMP Fri Sep 27 20:18:53 UTC 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.33 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, Directory : /proc/thread-self/root/usr/local/lib/node_modules/mediasoup/src/ |
Upload File : |
import { Duplex } from 'stream'; // @ts-ignore import * as netstring from 'netstring'; import { Logger } from './Logger'; import { EnhancedEventEmitter } from './EnhancedEventEmitter'; import { InvalidStateError } from './errors'; const logger = new Logger('PayloadChannel'); type Sent = { id: number; method: string; resolve: (data?: any) => void; reject: (error: Error) => void; timer: NodeJS.Timer; close: () => void; } // netstring length for a 4194304 bytes payload. const NS_MESSAGE_MAX_LEN = 4194313; const NS_PAYLOAD_MAX_LEN = 4194304; export class PayloadChannel extends EnhancedEventEmitter { // Closed flag. private _closed = false; // Unix Socket instance for sending messages to the worker process. private readonly _producerSocket: Duplex; // Unix Socket instance for receiving messages to the worker process. private readonly _consumerSocket: Duplex; // Next id for messages sent to the worker process. private _nextId = 0; // Map of pending sent requests. private readonly _sents: Map<number, Sent> = new Map(); // Buffer for reading messages from the worker. private _recvBuffer?: Buffer; // Ongoing notification (waiting for its payload). private _ongoingNotification?: { targetId: string; event: string; data?: any }; /** * @private */ constructor( { producerSocket, consumerSocket }: { producerSocket: any; consumerSocket: any; }) { super(); logger.debug('constructor()'); this._producerSocket = producerSocket as Duplex; this._consumerSocket = consumerSocket as Duplex; // Read PayloadChannel notifications from the worker. this._consumerSocket.on('data', (buffer: Buffer) => { if (!this._recvBuffer) { this._recvBuffer = buffer; } else { this._recvBuffer = Buffer.concat( [ this._recvBuffer, buffer ], this._recvBuffer.length + buffer.length); } if (this._recvBuffer!.length > NS_PAYLOAD_MAX_LEN) { logger.error('receiving buffer is full, discarding all data into it'); // Reset the buffer and exit. this._recvBuffer = undefined; return; } while (true) // eslint-disable-line no-constant-condition { let nsPayload; try { nsPayload = netstring.nsPayload(this._recvBuffer); } catch (error) { logger.error( 'invalid netstring data received from the worker process: %s', String(error)); // Reset the buffer and exit. this._recvBuffer = undefined; return; } // Incomplete netstring message. if (nsPayload === -1) return; this._processData(nsPayload); // Remove the read payload from the buffer. this._recvBuffer = this._recvBuffer!.slice(netstring.nsLength(this._recvBuffer)); if (!this._recvBuffer.length) { this._recvBuffer = undefined; return; } } }); this._consumerSocket.on('end', () => ( logger.debug('Consumer PayloadChannel ended by the worker process') )); this._consumerSocket.on('error', (error) => ( logger.error('Consumer PayloadChannel error: %s', String(error)) )); this._producerSocket.on('end', () => ( logger.debug('Producer PayloadChannel ended by the worker process') )); this._producerSocket.on('error', (error) => ( logger.error('Producer PayloadChannel error: %s', String(error)) )); } /** * @private */ close(): void { if (this._closed) return; logger.debug('close()'); this._closed = true; // Remove event listeners but leave a fake 'error' hander to avoid // propagation. this._consumerSocket.removeAllListeners('end'); this._consumerSocket.removeAllListeners('error'); this._consumerSocket.on('error', () => {}); this._producerSocket.removeAllListeners('end'); this._producerSocket.removeAllListeners('error'); this._producerSocket.on('error', () => {}); // Destroy the socket after a while to allow pending incoming messages. setTimeout(() => { try { this._producerSocket.destroy(); } catch (error) {} try { this._consumerSocket.destroy(); } catch (error) {} }, 200); } /** * @private */ notify( event: string, internal: object, data: any | undefined, payload: string | Buffer ): void { logger.debug('notify() [event:%s]', event); if (this._closed) throw new InvalidStateError('PayloadChannel closed'); const notification = { event, internal, data }; const ns1 = netstring.nsWrite(JSON.stringify(notification)); const ns2 = netstring.nsWrite(payload); if (Buffer.byteLength(ns1) > NS_MESSAGE_MAX_LEN) throw new Error('PayloadChannel notification too big'); else if (Buffer.byteLength(ns2) > NS_MESSAGE_MAX_LEN) throw new Error('PayloadChannel payload too big'); try { // This may throw if closed or remote side ended. this._producerSocket.write(ns1); } catch (error) { logger.warn('notify() | sending notification failed: %s', String(error)); return; } try { // This may throw if closed or remote side ended. this._producerSocket.write(ns2); } catch (error) { logger.warn('notify() | sending payload failed: %s', String(error)); return; } } /** * @private */ async request( method: string, internal: object, data: any, payload: string | Buffer): Promise<any> { this._nextId < 4294967295 ? ++this._nextId : (this._nextId = 1); const id = this._nextId; logger.debug('request() [method:%s, id:%s]', method, id); if (this._closed) throw new InvalidStateError('Channel closed'); const request = { id, method, internal, data }; const ns1 = netstring.nsWrite(JSON.stringify(request)); const ns2 = netstring.nsWrite(payload); if (Buffer.byteLength(ns1) > NS_MESSAGE_MAX_LEN) throw new Error('Channel request too big'); else if (Buffer.byteLength(ns2) > NS_MESSAGE_MAX_LEN) throw new Error('PayloadChannel payload too big'); // This may throw if closed or remote side ended. this._producerSocket.write(ns1); this._producerSocket.write(ns2); return new Promise((pResolve, pReject) => { const timeout = 1000 * (15 + (0.1 * this._sents.size)); const sent: Sent = { id : id, method : method, resolve : (data2) => { if (!this._sents.delete(id)) return; clearTimeout(sent.timer); pResolve(data2); }, reject : (error) => { if (!this._sents.delete(id)) return; clearTimeout(sent.timer); pReject(error); }, timer : setTimeout(() => { if (!this._sents.delete(id)) return; pReject(new Error('Channel request timeout')); }, timeout), close : () => { clearTimeout(sent.timer); pReject(new InvalidStateError('Channel closed')); } }; // Add sent stuff to the map. this._sents.set(id, sent); }); } private _processData(data: Buffer): void { if (!this._ongoingNotification) { let msg; try { msg = JSON.parse(data.toString('utf8')); } catch (error) { logger.error( 'received invalid data from the worker process: %s', String(error)); return; } // If a response, retrieve its associated request. if (msg.id) { const sent = this._sents.get(msg.id); if (!sent) { logger.error( 'received response does not match any sent request [id:%s]', msg.id); return; } if (msg.accepted) { logger.debug( 'request succeeded [method:%s, id:%s]', sent.method, sent.id); sent.resolve(msg.data); } else if (msg.error) { logger.warn( 'request failed [method:%s, id:%s]: %s', sent.method, sent.id, msg.reason); switch (msg.error) { case 'TypeError': sent.reject(new TypeError(msg.reason)); break; default: sent.reject(new Error(msg.reason)); } } else { logger.error( 'received response is not accepted nor rejected [method:%s, id:%s]', sent.method, sent.id); } } // If a notification, create the ongoing notification instance. else if (msg.targetId && msg.event) { this._ongoingNotification = { targetId : msg.targetId, event : msg.event, data : msg.data }; } else { logger.error('received data is not a notification nor a response'); return; } } else { const payload = data as Buffer; // Emit the corresponding event. this.emit( this._ongoingNotification.targetId, this._ongoingNotification.event, this._ongoingNotification.data, payload); // Unset ongoing notification. this._ongoingNotification = undefined; } } }