diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 7290415..d86d158 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,6 +6,9 @@ ### 4.16.0 * Performance: Replaced `ref` cells with `mutable` locals in the `ofSeq`, `tryWith`, and `tryFinally` enumerator state machines. Each call to `ofSeq` (or any async CE block using `try...with` / `try...finally` / `use`) previously heap-allocated a `Ref` wrapper object per enumerator; it now uses a direct mutable field in the generated class, reducing GC pressure. The change is equivalent to the `mutable`-for-`ref` improvement introduced in 4.11.0 for other enumerators. +* Added `AsyncSeq.insertManyAt` — inserts multiple values before the element at the specified index; mirrors `Seq.insertManyAt` / `List.insertManyAt`. +* Added `AsyncSeq.removeManyAt` — removes a range of elements starting at the specified index; mirrors `Seq.removeManyAt` / `List.removeManyAt`. +* Added `AsyncSeq.splitInto` — splits the sequence into at most N chunks of as-equal-as-possible size; mirrors `Seq.splitInto` / `Array.splitInto`. * Added `AsyncSeq.unzip` — splits an async sequence of pairs into two arrays. Mirrors `List.unzip`. * Added `AsyncSeq.unzip3` — splits an async sequence of triples into three arrays. Mirrors `List.unzip3`. * Added `AsyncSeq.map2` — applies a function to corresponding elements of two async sequences; stops when either is exhausted. Mirrors `Seq.map2`. diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index ddc5e71..bcd90c3 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -1732,6 +1732,27 @@ module AsyncSeq = elif i < index then invalidArg "index" "The index is outside the range of elements in the collection." } + let insertManyAt (index : int) (values : seq<'T>) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq { + if index < 0 then invalidArg "index" "must be non-negative" + let mutable i = 0 + for x in source do + if i = index then yield! values + yield x + i <- i + 1 + if i = index then yield! values + elif i < index then + invalidArg "index" "The index is outside the range of elements in the collection." } + + let removeManyAt (index : int) (count : int) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq { + if index < 0 then invalidArg "index" "must be non-negative" + if count < 0 then invalidArg "count" "must be non-negative" + let mutable i = 0 + for x in source do + if i < index || i >= index + count then yield x + i <- i + 1 + if count > 0 && i < index + count then + invalidArg "index" "The index or count is outside the range of elements in the collection." } + #if !FABLE_COMPILER let iterAsyncParallel (f:'a -> Async) (s:AsyncSeq<'a>) : Async = async { use mb = MailboxProcessor.Start (ignore >> async.Return) @@ -2260,6 +2281,22 @@ module AsyncSeq = let toArraySynchronously (source:AsyncSeq<'T>) = toArrayAsync source |> Async.RunSynchronously #endif + let splitInto (count : int) (source : AsyncSeq<'T>) : Async<'T[] array> = async { + if count < 1 then invalidArg "count" "must be positive" + let! arr = toArrayAsync source + let total = arr.Length + let result = + if total = 0 then [||] + else + let n = Operators.min count total + let minSize = total / n + let extras = total % n + Array.init n (fun i -> + let chunkStart = i * minSize + Operators.min i extras + let chunkSize = minSize + (if i < extras then 1 else 0) + Array.sub arr chunkStart chunkSize) + return result } + let cycle (source: AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq { let! arr = source |> toArrayAsync diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index d8ffec4..b46b569 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -489,6 +489,20 @@ module AsyncSeq = /// Raises ArgumentException if index is negative or greater than the sequence length. Mirrors Seq.insertAt. val insertAt : index:int -> value:'T -> source:AsyncSeq<'T> -> AsyncSeq<'T> + /// Returns a new asynchronous sequence with the given values inserted before the element at the specified index. + /// An index equal to the length of the sequence appends the values at the end. + /// Raises ArgumentException if index is negative or greater than the sequence length. Mirrors Seq.insertManyAt. + val insertManyAt : index:int -> values:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T> + + /// Returns a new asynchronous sequence with the given number of elements removed starting at the specified index. + /// Raises ArgumentException if index is negative, count is negative, or index + count exceeds the sequence length. Mirrors Seq.removeManyAt. + val removeManyAt : index:int -> count:int -> source:AsyncSeq<'T> -> AsyncSeq<'T> + + /// Splits the input asynchronous sequence into at most count chunks of as-equal-as-possible size. + /// The first (length mod count) chunks have one extra element. Materialises the source sequence into memory. + /// Raises ArgumentException if count is not positive. Mirrors Seq.splitInto. + val splitInto : count:int -> source:AsyncSeq<'T> -> Async<'T[] array> + /// Creates an asynchronous sequence that lazily takes element from an /// input synchronous sequence and returns them one-by-one. val ofSeq : source:seq<'T> -> AsyncSeq<'T> diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index 24748af..1b5ea99 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -4006,6 +4006,196 @@ let ``AsyncSeq.insertAt raises ArgumentException when index exceeds length`` () |> Async.RunSynchronously |> ignore) |> ignore +// ===== insertManyAt ===== + +[] +let ``AsyncSeq.insertManyAt inserts multiple elements at specified index`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.insertManyAt 1 [ 10; 20 ] + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously + Assert.AreEqual([| 1; 10; 20; 2; 3 |], result) + +[] +let ``AsyncSeq.insertManyAt prepends when index is 0`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.insertManyAt 0 [ 10; 20 ] + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously + Assert.AreEqual([| 10; 20; 1; 2; 3 |], result) + +[] +let ``AsyncSeq.insertManyAt appends when index equals sequence length`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.insertManyAt 3 [ 10; 20 ] + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously + Assert.AreEqual([| 1; 2; 3; 10; 20 |], result) + +[] +let ``AsyncSeq.insertManyAt with empty values returns original sequence`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.insertManyAt 1 [] + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously + Assert.AreEqual([| 1; 2; 3 |], result) + +[] +let ``AsyncSeq.insertManyAt raises ArgumentException for negative index`` () = + Assert.Throws(fun () -> + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.insertManyAt -1 [ 10 ] + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously |> ignore) + |> ignore + +[] +let ``AsyncSeq.insertManyAt raises ArgumentException when index exceeds length`` () = + Assert.Throws(fun () -> + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.insertManyAt 5 [ 10 ] + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously |> ignore) + |> ignore + +// ===== removeManyAt ===== + +[] +let ``AsyncSeq.removeManyAt removes elements at specified index and count`` () = + let result = + AsyncSeq.ofSeq [ 0; 1; 2; 3; 4 ] + |> AsyncSeq.removeManyAt 1 2 + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously + Assert.AreEqual([| 0; 3; 4 |], result) + +[] +let ``AsyncSeq.removeManyAt removes from beginning`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3; 4 ] + |> AsyncSeq.removeManyAt 0 2 + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously + Assert.AreEqual([| 3; 4 |], result) + +[] +let ``AsyncSeq.removeManyAt removes from end`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3; 4 ] + |> AsyncSeq.removeManyAt 2 2 + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously + Assert.AreEqual([| 1; 2 |], result) + +[] +let ``AsyncSeq.removeManyAt with count 0 returns original sequence`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.removeManyAt 1 0 + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously + Assert.AreEqual([| 1; 2; 3 |], result) + +[] +let ``AsyncSeq.removeManyAt removes all elements`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.removeManyAt 0 3 + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously + Assert.AreEqual([||], result) + +[] +let ``AsyncSeq.removeManyAt raises ArgumentException for negative index`` () = + Assert.Throws(fun () -> + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.removeManyAt -1 1 + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously |> ignore) + |> ignore + +[] +let ``AsyncSeq.removeManyAt raises ArgumentException for negative count`` () = + Assert.Throws(fun () -> + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.removeManyAt 0 -1 + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously |> ignore) + |> ignore + +[] +let ``AsyncSeq.removeManyAt raises ArgumentException when range exceeds sequence`` () = + Assert.Throws(fun () -> + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.removeManyAt 2 2 + |> AsyncSeq.toArrayAsync + |> Async.RunSynchronously |> ignore) + |> ignore + +// ===== splitInto ===== + +[] +let ``AsyncSeq.splitInto splits sequence into equal chunks`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3; 4; 5; 6 ] + |> AsyncSeq.splitInto 3 + |> Async.RunSynchronously + Assert.AreEqual(3, result.Length) + Assert.AreEqual([| 1; 2 |], result.[0]) + Assert.AreEqual([| 3; 4 |], result.[1]) + Assert.AreEqual([| 5; 6 |], result.[2]) + +[] +let ``AsyncSeq.splitInto distributes remainder to first chunks`` () = + let result = + AsyncSeq.ofSeq [ 1 .. 7 ] + |> AsyncSeq.splitInto 3 + |> Async.RunSynchronously + Assert.AreEqual(3, result.Length) + Assert.AreEqual([| 1; 2; 3 |], result.[0]) + Assert.AreEqual([| 4; 5 |], result.[1]) + Assert.AreEqual([| 6; 7 |], result.[2]) + +[] +let ``AsyncSeq.splitInto with count 1 returns single chunk`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.splitInto 1 + |> Async.RunSynchronously + Assert.AreEqual(1, result.Length) + Assert.AreEqual([| 1; 2; 3 |], result.[0]) + +[] +let ``AsyncSeq.splitInto with count greater than length returns one chunk per element`` () = + let result = + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.splitInto 10 + |> Async.RunSynchronously + Assert.AreEqual(3, result.Length) + Assert.AreEqual([| 1 |], result.[0]) + Assert.AreEqual([| 2 |], result.[1]) + Assert.AreEqual([| 3 |], result.[2]) + +[] +let ``AsyncSeq.splitInto with empty sequence returns empty array`` () = + let result = + AsyncSeq.empty + |> AsyncSeq.splitInto 3 + |> Async.RunSynchronously + Assert.AreEqual([||], result) + +[] +let ``AsyncSeq.splitInto raises ArgumentException when count is zero`` () = + Assert.Throws(fun () -> + AsyncSeq.ofSeq [ 1; 2; 3 ] + |> AsyncSeq.splitInto 0 + |> Async.RunSynchronously |> ignore) + |> ignore + [] let ``AsyncSeq.take more than length returns all elements`` () = let result =