-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpromise-queue.ts
More file actions
146 lines (130 loc) · 4.17 KB
/
promise-queue.ts
File metadata and controls
146 lines (130 loc) · 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/**
* @fileoverview Bounded concurrency promise queue.
* Exports the `PromiseQueue` class, which limits how many async tasks run
* simultaneously, supports an optional max queue length (new tasks beyond
* the cap are rejected with "Task dropped: queue length exceeded"), and
* exposes an idle-wait helper.
*/
import { ErrorCtor, PromiseCtor } from './primordials'
/**
 * One unit of work held by the queue, bundled with the settle callbacks of
 * the promise that was handed back to the submitter.
 */
interface QueuedTask<T> {
  /** Produces the task's result; invoked when the task is started. */
  fn: () => Promise<T>
  /** Settles the submitter's promise with the task's result. */
  resolve: (value: T) => void
  /** Settles the submitter's promise with a failure. */
  reject: (error: unknown) => void
}
/**
 * Bounded-concurrency task queue.
 *
 * At most `maxConcurrency` tasks run at once; further submissions wait in
 * FIFO order. When `maxQueueLength` is set, a submission that would have to
 * wait while that many tasks are already waiting is rejected instead of
 * being queued.
 */
export class PromiseQueue {
  /** Tasks that have been accepted but not started yet, oldest first. */
  private queue: Array<QueuedTask<unknown>> = []
  /** Number of tasks whose `fn` has been invoked and has not settled yet. */
  private running = 0
  /** Resolvers of pending `onIdle()` promises. */
  private idleResolvers: Array<() => void> = []
  private readonly maxConcurrency: number
  private readonly maxQueueLength: number | undefined

  /**
   * Creates a new PromiseQueue
   * @param maxConcurrency - Maximum number of promises that can run concurrently
   * @param maxQueueLength - Maximum number of tasks allowed to wait for a
   * free slot. A submission that would exceed the cap rejects with
   * "Task dropped: queue length exceeded" instead of evicting a caller that
   * has been waiting patiently. Callers must handle this rejection or
   * they'll see an unhandled rejection. Left `undefined`, no cap is applied.
   * @throws If `maxConcurrency` is NaN or less than 1.
   */
  constructor(maxConcurrency: number, maxQueueLength?: number | undefined) {
    // Written as a negated `>=` so that NaN, which fails every comparison,
    // is rejected as well. `NaN < 1` is false, so a `< 1` guard would let
    // NaN through and `running >= NaN` would then never limit anything.
    if (!(maxConcurrency >= 1)) {
      throw new ErrorCtor('maxConcurrency must be at least 1')
    }
    this.maxConcurrency = maxConcurrency
    this.maxQueueLength = maxQueueLength
  }

  /**
   * Add a task to the queue
   * @param fn - Async function to execute
   * @returns Promise that resolves with the function's result, rejects with
   * the function's error, or rejects with
   * "Task dropped: queue length exceeded" if the task would have to wait
   * and the waiting list is already full.
   */
  async add<T>(fn: () => Promise<T>): Promise<T> {
    return await new PromiseCtor<T>((resolve, reject) => {
      // Reject the newcomer rather than evicting an earlier-submitted task
      // (FIFO fairness: the caller who waited longest gets served).
      //
      // The cap counts waiting tasks only, so a task that can start right
      // away is never rejected. Without the free-slot check a cap of 0
      // would reject every submission, even on a completely idle queue.
      const noFreeSlot = this.running >= this.maxConcurrency
      if (
        this.maxQueueLength !== undefined &&
        noFreeSlot &&
        this.queue.length >= this.maxQueueLength
      ) {
        reject(new ErrorCtor('Task dropped: queue length exceeded'))
        return
      }
      const task: QueuedTask<T> = { fn, resolve, reject }
      this.queue.push(task as QueuedTask<unknown>)
      this.runNext()
    })
  }

  /**
   * Starts the oldest waiting task if a concurrency slot is free. When
   * nothing can be started, checks whether the queue has become idle.
   */
  private runNext(): void {
    if (this.running >= this.maxConcurrency || this.queue.length === 0) {
      this.notifyIdleIfNeeded()
      return
    }
    const task = this.queue.shift()
    if (!task) {
      return
    }
    this.running++
    // An async function body runs synchronously up to its first `await`,
    // so task.fn() is invoked right here, not on a later microtask. The
    // try/catch turns both a synchronous throw from task.fn() and a
    // rejection of its promise into task.reject, and `finally` frees the
    // slot and pulls the next task regardless of the outcome.
    // `void` marks the promise as intentionally not awaited.
    void (async () => {
      try {
        const result = await task.fn()
        task.resolve(result)
      } catch (err) {
        task.reject(err)
      } finally {
        this.running--
        this.runNext()
      }
    })()
  }

  /**
   * Resolves every pending `onIdle()` promise once nothing is running and
   * nothing is waiting, then forgets those resolvers.
   */
  private notifyIdleIfNeeded(): void {
    if (this.running === 0 && this.queue.length === 0) {
      for (const resolve of this.idleResolvers) {
        resolve()
      }
      this.idleResolvers = []
    }
  }

  /**
   * Wait for all queued and running tasks to complete
   * @returns Promise that resolves immediately when the queue is already
   * idle, otherwise the next time it becomes idle.
   */
  async onIdle(): Promise<void> {
    if (this.running === 0 && this.queue.length === 0) {
      return
    }
    return await new PromiseCtor<void>(resolve => {
      this.idleResolvers.push(resolve)
    })
  }

  /**
   * Get the number of tasks currently running
   */
  get activeCount(): number {
    return this.running
  }

  /**
   * Get the number of tasks waiting in the queue
   */
  get pendingCount(): number {
    return this.queue.length
  }

  /**
   * Clear all pending tasks from the queue (does not affect running tasks).
   * Each removed task's promise rejects with "Task cancelled: queue cleared".
   */
  clear(): void {
    // Swap the array out first so the queue is already empty while the
    // removed tasks are being rejected.
    const pending = this.queue
    this.queue = []
    for (const task of pending) {
      task.reject(new ErrorCtor('Task cancelled: queue cleared'))
    }
    this.notifyIdleIfNeeded()
  }
}