Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
### 4.16.0

* Performance: Replaced `ref` cells with `mutable` locals in the `ofSeq`, `tryWith`, and `tryFinally` enumerator state machines. Each call to `ofSeq` (or any async CE block using `try...with` / `try...finally` / `use`) previously heap-allocated a `Ref<T>` wrapper object per enumerator; it now uses a direct mutable field in the generated class, reducing GC pressure. The change is equivalent to the `mutable`-for-`ref` improvement introduced in 4.11.0 for other enumerators.
* Added `AsyncSeq.insertManyAt` — inserts multiple values before the element at the specified index; mirrors `Seq.insertManyAt` / `List.insertManyAt`.
* Added `AsyncSeq.removeManyAt` — removes a range of elements starting at the specified index; mirrors `Seq.removeManyAt` / `List.removeManyAt`.
* Added `AsyncSeq.splitInto` — splits the sequence into at most N chunks of as-equal-as-possible size; mirrors `Seq.splitInto` / `Array.splitInto`.
* Added `AsyncSeq.unzip` — splits an async sequence of pairs into two arrays. Mirrors `List.unzip`.
* Added `AsyncSeq.unzip3` — splits an async sequence of triples into three arrays. Mirrors `List.unzip3`.
* Added `AsyncSeq.map2` — applies a function to corresponding elements of two async sequences; stops when either is exhausted. Mirrors `Seq.map2`.
Expand Down
37 changes: 37 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,27 @@
elif i < index then
invalidArg "index" "The index is outside the range of elements in the collection." }

let insertManyAt (index : int) (values : seq<'T>) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
if index < 0 then invalidArg "index" "must be non-negative"
let mutable i = 0
for x in source do
if i = index then yield! values
yield x
i <- i + 1
if i = index then yield! values
elif i < index then
invalidArg "index" "The index is outside the range of elements in the collection." }

let removeManyAt (index : int) (count : int) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
if index < 0 then invalidArg "index" "must be non-negative"
if count < 0 then invalidArg "count" "must be non-negative"
let mutable i = 0
for x in source do
if i < index || i >= index + count then yield x
i <- i + 1
if count > 0 && i < index + count then
invalidArg "index" "The index or count is outside the range of elements in the collection." }

#if !FABLE_COMPILER
let iterAsyncParallel (f:'a -> Async<unit>) (s:AsyncSeq<'a>) : Async<unit> = async {
use mb = MailboxProcessor.Start (ignore >> async.Return)
Expand Down Expand Up @@ -2260,6 +2281,22 @@
let toArraySynchronously (source:AsyncSeq<'T>) = toArrayAsync source |> Async.RunSynchronously
#endif

let splitInto (count : int) (source : AsyncSeq<'T>) : Async<'T[] array> = async {
if count < 1 then invalidArg "count" "must be positive"
let! arr = toArrayAsync source
let total = arr.Length
let result =
if total = 0 then [||]
else
let n = Operators.min count total
let minSize = total / n
let extras = total % n
Array.init n (fun i ->
let chunkStart = i * minSize + Operators.min i extras
let chunkSize = minSize + (if i < extras then 1 else 0)
Array.sub arr chunkStart chunkSize)
return result }

let cycle (source: AsyncSeq<'T>) : AsyncSeq<'T> =
asyncSeq {
let! arr = source |> toArrayAsync
Expand Down Expand Up @@ -2774,7 +2811,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2814 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2814 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
14 changes: 14 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,20 @@ module AsyncSeq =
/// Raises ArgumentException if index is negative or greater than the sequence length. Mirrors Seq.insertAt.
val insertAt : index:int -> value:'T -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns a new asynchronous sequence with the given values inserted before the element at the specified index.
/// An index equal to the length of the sequence appends the values at the end.
/// Raises ArgumentException if index is negative or greater than the sequence length. Mirrors Seq.insertManyAt.
val insertManyAt : index:int -> values:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns a new asynchronous sequence with the given number of elements removed starting at the specified index.
/// Raises ArgumentException if index is negative, count is negative, or index + count exceeds the sequence length. Mirrors Seq.removeManyAt.
val removeManyAt : index:int -> count:int -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Splits the input asynchronous sequence into at most <c>count</c> chunks of as-equal-as-possible size.
/// The first (length mod count) chunks have one extra element. Materialises the source sequence into memory.
/// Raises ArgumentException if count is not positive. Mirrors Seq.splitInto.
val splitInto : count:int -> source:AsyncSeq<'T> -> Async<'T[] array>

/// Creates an asynchronous sequence that lazily takes element from an
/// input synchronous sequence and returns them one-by-one.
val ofSeq : source:seq<'T> -> AsyncSeq<'T>
Expand Down
190 changes: 190 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2103,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -4006,6 +4006,196 @@
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== insertManyAt =====

[<Test>]
let ``AsyncSeq.insertManyAt inserts multiple elements at specified index`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 1 [ 10; 20 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 10; 20; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.insertManyAt prepends when index is 0`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 0 [ 10; 20 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 10; 20; 1; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.insertManyAt appends when index equals sequence length`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 3 [ 10; 20 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3; 10; 20 |], result)

[<Test>]
let ``AsyncSeq.insertManyAt with empty values returns original sequence`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 1 []
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.insertManyAt raises ArgumentException for negative index`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt -1 [ 10 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.insertManyAt raises ArgumentException when index exceeds length`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 5 [ 10 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== removeManyAt =====

[<Test>]
let ``AsyncSeq.removeManyAt removes elements at specified index and count`` () =
let result =
AsyncSeq.ofSeq [ 0; 1; 2; 3; 4 ]
|> AsyncSeq.removeManyAt 1 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 0; 3; 4 |], result)

[<Test>]
let ``AsyncSeq.removeManyAt removes from beginning`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3; 4 ]
|> AsyncSeq.removeManyAt 0 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 3; 4 |], result)

[<Test>]
let ``AsyncSeq.removeManyAt removes from end`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3; 4 ]
|> AsyncSeq.removeManyAt 2 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2 |], result)

[<Test>]
let ``AsyncSeq.removeManyAt with count 0 returns original sequence`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt 1 0
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.removeManyAt removes all elements`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt 0 3
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.removeManyAt raises ArgumentException for negative index`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt -1 1
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.removeManyAt raises ArgumentException for negative count`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt 0 -1
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.removeManyAt raises ArgumentException when range exceeds sequence`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt 2 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== splitInto =====

[<Test>]
let ``AsyncSeq.splitInto splits sequence into equal chunks`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3; 4; 5; 6 ]
|> AsyncSeq.splitInto 3
|> Async.RunSynchronously
Assert.AreEqual(3, result.Length)
Assert.AreEqual([| 1; 2 |], result.[0])
Assert.AreEqual([| 3; 4 |], result.[1])
Assert.AreEqual([| 5; 6 |], result.[2])

[<Test>]
let ``AsyncSeq.splitInto distributes remainder to first chunks`` () =
let result =
AsyncSeq.ofSeq [ 1 .. 7 ]
|> AsyncSeq.splitInto 3
|> Async.RunSynchronously
Assert.AreEqual(3, result.Length)
Assert.AreEqual([| 1; 2; 3 |], result.[0])
Assert.AreEqual([| 4; 5 |], result.[1])
Assert.AreEqual([| 6; 7 |], result.[2])

[<Test>]
let ``AsyncSeq.splitInto with count 1 returns single chunk`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.splitInto 1
|> Async.RunSynchronously
Assert.AreEqual(1, result.Length)
Assert.AreEqual([| 1; 2; 3 |], result.[0])

[<Test>]
let ``AsyncSeq.splitInto with count greater than length returns one chunk per element`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.splitInto 10
|> Async.RunSynchronously
Assert.AreEqual(3, result.Length)
Assert.AreEqual([| 1 |], result.[0])
Assert.AreEqual([| 2 |], result.[1])
Assert.AreEqual([| 3 |], result.[2])

[<Test>]
let ``AsyncSeq.splitInto with empty sequence returns empty array`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.splitInto 3
|> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.splitInto raises ArgumentException when count is zero`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.splitInto 0
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.take more than length returns all elements`` () =
let result =
Expand Down Expand Up @@ -4596,7 +4786,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4789 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4609,7 +4799,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4802 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4618,7 +4808,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4811 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down
Loading