![]() System : Linux absol.cf 5.4.0-198-generic #218-Ubuntu SMP Fri Sep 27 20:18:53 UTC 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.33 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, Directory : /proc/self/root/usr/local/lib/node_modules/awaitqueue/src/ |
Upload File : |
export type AwaitQueueOptions = { /** * Custom Error derived class that will be used to reject pending tasks after * close() method has been called. If not set, Error class is used. */ ClosedErrorClass?: any; /** * Custom Error derived class that will be used to reject pending tasks after * stop() method has been called. If not set, Error class is used. */ StoppedErrorClass?: any; }; export type AwaitQueueTask<T> = () => (Promise<T> | T); export type AwaitQueueDumpItem = { task: AwaitQueueTask<unknown>; name?: string; enqueuedTime: number; executingTime: number; }; type PendingTask = { task: AwaitQueueTask<unknown>; name?: string; resolve: (...args: any[]) => any; reject: (error: Error) => void; enqueuedAt: Date; executedAt?: Date; stopped: boolean; } export class AwaitQueue { // Closed flag. private closed = false; // Queue of pending tasks. private readonly pendingTasks: Array<PendingTask> = []; // Error class used when rejecting a task due to AwaitQueue being closed. private readonly ClosedErrorClass = Error; // Error class used when rejecting a task due to AwaitQueue being stopped. private readonly StoppedErrorClass = Error; constructor( { ClosedErrorClass = Error, StoppedErrorClass = Error }: AwaitQueueOptions = { ClosedErrorClass : Error, StoppedErrorClass : Error } ) { this.ClosedErrorClass = ClosedErrorClass; this.StoppedErrorClass = StoppedErrorClass; } /** * The number of ongoing enqueued tasks. */ get size(): number { return this.pendingTasks.length; } /** * Closes the AwaitQueue. Pending tasks will be rejected with ClosedErrorClass * error. */ close(): void { if (this.closed) return; this.closed = true; for (const pendingTask of this.pendingTasks) { pendingTask.stopped = true; pendingTask.reject(new this.ClosedErrorClass('AwaitQueue closed')); } // Enpty the pending tasks array. this.pendingTasks.length = 0; } /** * Accepts a task as argument (and an optional task name) and enqueues it after * pending tasks. Once processed, the push() method resolves (or rejects) with * the result returned by the given task. * * The given task must return a Promise or directly a value. */ async push<T>(task: AwaitQueueTask<T>, name?: string): Promise<T> { if (this.closed) throw new this.ClosedErrorClass('AwaitQueue closed'); if (typeof task !== 'function') throw new TypeError('given task is not a function'); if (!task.name && name) { try { Object.defineProperty(task, 'name', { value: name }); } catch (error) {} } return new Promise((resolve, reject) => { const pendingTask: PendingTask = { task, name, resolve, reject, stopped : false, enqueuedAt : new Date(), executedAt : undefined }; // Append task to the queue. this.pendingTasks.push(pendingTask); // And run it if this is the only task in the queue. if (this.pendingTasks.length === 1) this.next(); }); } /** * Make ongoing pending tasks reject with the given StoppedErrorClass error. * The AwaitQueue instance is still usable for future tasks added via push() * method. */ stop(): void { if (this.closed) return; for (const pendingTask of this.pendingTasks) { pendingTask.stopped = true; pendingTask.reject(new this.StoppedErrorClass('AwaitQueue stopped')); } // Enpty the pending tasks array. this.pendingTasks.length = 0; } dump(): AwaitQueueDumpItem[] { const now = new Date(); return this.pendingTasks.map((pendingTask) => { return { task : pendingTask.task, name : pendingTask.name, enqueuedTime : pendingTask.executedAt ? pendingTask.executedAt.getTime() - pendingTask.enqueuedAt.getTime() : now.getTime() - pendingTask.enqueuedAt.getTime(), executingTime : pendingTask.executedAt ? now.getTime() - pendingTask.executedAt.getTime() : 0 }; }); } private async next(): Promise<any> { // Take the first pending task. const pendingTask = this.pendingTasks[0]; if (!pendingTask) return; // Execute it. await this.executeTask(pendingTask); // Remove the first pending task (the completed one) from the queue. this.pendingTasks.shift(); // And continue. this.next(); } private async executeTask(pendingTask: PendingTask): Promise<any> { // If the task is stopped, ignore it. if (pendingTask.stopped) return; pendingTask.executedAt = new Date(); try { const result = await pendingTask.task(); // If the task is stopped, ignore it. if (pendingTask.stopped) return; // Resolve the task with the returned result (if any). pendingTask.resolve(result); } catch (error) { // If the task is stopped, ignore it. if (pendingTask.stopped) return; // Reject the task with its own error. pendingTask.reject(error); } } }