-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreams.ts
More file actions
103 lines (99 loc) · 2.63 KB
/
streams.ts
File metadata and controls
103 lines (99 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* @fileoverview Stream processing utilities with streaming-iterables integration.
* Provides async stream handling and transformation functions.
*/
import {
parallelMap as siParallelMap,
transform as siTransform,
} from './external/streaming-iterables'
import type { IterationOptions } from './promises'
import { normalizeIterationOptions, pRetry } from './promises'
/**
 * Run an async callback once for every item of an iterable, discarding the
 * callback's results.
 *
 * The work is delegated to `parallelMap` with the same arguments; this
 * function simply drains the async iterable it returns. Any error raised
 * while iterating rejects the returned promise.
 *
 * @param iterable - Sync or async source of items.
 * @param func - Async callback invoked with each item; its resolved value is ignored.
 * @param options - A number or `IterationOptions` object, forwarded unchanged to `parallelMap`.
 * @returns A promise that resolves once the mapped iterable is exhausted.
 *
 * @example
 * ```typescript
 * const urls = ['https://a.io', 'https://b.io']
 * await parallelEach(urls, async (url) => {
 *   await fetch(url)
 * }, { concurrency: 4 })
 * ```
 */
/*@__NO_SIDE_EFFECTS__*/
export async function parallelEach<T>(
  iterable: Iterable<T> | AsyncIterable<T>,
  func: (item: T) => Promise<unknown>,
  options?: number | IterationOptions,
): Promise<void> {
  const mapped = parallelMap(iterable, func, options)
  for await (const _ignored of mapped) {
    // Nothing to do per item: consuming the iterable is what drives the work.
  }
}
/**
 * Map an async function over an iterable, yielding the results as an async
 * iterable.
 *
 * `options` is normalized with `normalizeIterationOptions`. The resulting
 * `concurrency` is handed to streaming-iterables' `parallelMap`, and the
 * resulting `retries` settings are spread into the options of the `pRetry`
 * call that wraps every invocation of `func`.
 *
 * @param iterable - Sync or async source of items.
 * @param func - Async mapper invoked (through `pRetry`) with each item.
 * @param options - A number or `IterationOptions` object describing concurrency and retries.
 * @returns An async iterable of the values produced by `func`.
 *
 * @example
 * ```typescript
 * const ids = [1, 2, 3]
 * for await (const result of parallelMap(ids, async (id) => {
 *   return await fetchData(id)
 * }, 4)) {
 *   console.log(result)
 * }
 * ```
 */
/*@__NO_SIDE_EFFECTS__*/
export function parallelMap<T, U>(
  iterable: Iterable<T> | AsyncIterable<T>,
  func: (item: T) => Promise<U>,
  options?: number | IterationOptions,
): AsyncIterable<U> {
  const settings = normalizeIterationOptions(options)

  // Per-item worker: the item travels through pRetry's `args` option and is
  // read back from the callback's argument list before being given to `func`.
  const mapWithRetry = async (item: T): Promise<U> => {
    const mapped = await pRetry(
      (...args: unknown[]) => func(args[0] as T),
      {
        ...settings.retries,
        args: [item],
      },
    )
    return mapped as U
  }

  /* c8 ignore next - External streaming-iterables call */
  return siParallelMap(settings.concurrency, mapWithRetry, iterable) as AsyncIterable<U>
}
/**
 * Transform every item of an iterable with an async function, yielding the
 * results as an async iterable.
 *
 * `options` is normalized with `normalizeIterationOptions`. The resulting
 * `concurrency` is handed to streaming-iterables' `transform`, and the
 * resulting `retries` settings are spread into the options of the `pRetry`
 * call that wraps every invocation of `func`.
 *
 * NOTE(review): upstream streaming-iterables documents `transform` as not
 * guaranteeing input order in its output - confirm against the vendored copy
 * before relying on ordering.
 *
 * @param iterable - Sync or async source of items.
 * @param func - Async transformer invoked (through `pRetry`) with each item.
 * @param options - A number or `IterationOptions` object describing concurrency and retries.
 * @returns An async iterable of the values produced by `func`.
 *
 * @example
 * ```typescript
 * const lines = ['hello', 'world']
 * for await (const upper of transform(lines, async (line) => {
 *   return line.toUpperCase()
 * })) {
 *   console.log(upper) // 'HELLO', 'WORLD'
 * }
 * ```
 */
/*@__NO_SIDE_EFFECTS__*/
export function transform<T, U>(
  iterable: Iterable<T> | AsyncIterable<T>,
  func: (item: T) => Promise<U>,
  options?: number | IterationOptions,
): AsyncIterable<U> {
  const settings = normalizeIterationOptions(options)

  // Per-item worker: the item travels through pRetry's `args` option and is
  // read back from the callback's argument list before being given to `func`.
  const transformWithRetry = async (item: T): Promise<U> => {
    const transformed = await pRetry(
      (...args: unknown[]) => func(args[0] as T),
      {
        ...settings.retries,
        args: [item],
      },
    )
    return transformed as U
  }

  /* c8 ignore next - External streaming-iterables call */
  return siTransform(settings.concurrency, transformWithRetry, iterable) as AsyncIterable<U>
}