Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
### 4.17.0

* Added `AsyncSeq.exists2` β€” asynchronously tests whether any corresponding pair of elements in two async sequences satisfies the predicate. Evaluates pairwise up to the shorter sequence; short-circuits on first match. Mirrors `Seq.exists2`.
* Added `AsyncSeq.exists2Async` β€” asynchronous-predicate variant of `exists2`.
* Added `AsyncSeq.forall2` β€” asynchronously tests whether all corresponding pairs of elements in two async sequences satisfy the predicate. Evaluates pairwise up to the shorter sequence; short-circuits on first failure. Mirrors `Seq.forall2`.
* Added `AsyncSeq.forall2Async` β€” asynchronous-predicate variant of `forall2`.
* Performance: Optimised `AsyncSeq.pairwise` to use a `hasPrev` flag and a direct `mutable` field instead of wrapping the previous element in `Some`. Previously, each iteration allocated a new `'T option` object on the heap; the new implementation eliminates that allocation entirely, reducing GC pressure for long sequences.
* Bug fix: `AsyncSeq.splitAt` and `AsyncSeq.tryTail` now correctly dispose the underlying enumerator when an exception or cancellation occurs during the initial `MoveNext` call. Previously the enumerator could leak if the source sequence threw during the first few steps.

Expand Down
50 changes: 50 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,56 @@
let forallAsync f (source : AsyncSeq<'T>) =
source |> existsAsync (fun v -> async { let! b = f v in return not b }) |> Async.map not

let exists2Async (predicate: 'T1 -> 'T2 -> Async<bool>) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : Async<bool> = async {
use ie1 = source1.GetEnumerator()
use ie2 = source2.GetEnumerator()
let! m1 = ie1.MoveNext()
let! m2 = ie2.MoveNext()
let mutable b1 = m1
let mutable b2 = m2
let mutable result = false
let mutable isDone = false
while not isDone do
match b1, b2 with
| None, _ | _, None -> isDone <- true
| Some v1, Some v2 ->
let! ok = predicate v1 v2
if ok then result <- true; isDone <- true
else
let! n1 = ie1.MoveNext()
let! n2 = ie2.MoveNext()
b1 <- n1
b2 <- n2
return result }

let exists2 (predicate: 'T1 -> 'T2 -> bool) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : Async<bool> =
exists2Async (fun a b -> async.Return (predicate a b)) source1 source2

let forall2Async (predicate: 'T1 -> 'T2 -> Async<bool>) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : Async<bool> = async {
use ie1 = source1.GetEnumerator()
use ie2 = source2.GetEnumerator()
let! m1 = ie1.MoveNext()
let! m2 = ie2.MoveNext()
let mutable b1 = m1
let mutable b2 = m2
let mutable result = true
let mutable isDone = false
while not isDone do
match b1, b2 with
| None, _ | _, None -> isDone <- true
| Some v1, Some v2 ->
let! ok = predicate v1 v2
if not ok then result <- false; isDone <- true
else
let! n1 = ie1.MoveNext()
let! n2 = ie2.MoveNext()
b1 <- n1
b2 <- n2
return result }

let forall2 (predicate: 'T1 -> 'T2 -> bool) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : Async<bool> =
forall2Async (fun a b -> async.Return (predicate a b)) source1 source2

let compareWithAsync (comparer: 'T -> 'T -> Async<int>) (source1: AsyncSeq<'T>) (source2: AsyncSeq<'T>) : Async<int> = async {
use ie1 = source1.GetEnumerator()
use ie2 = source2.GetEnumerator()
Expand Down Expand Up @@ -2774,7 +2824,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2827 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2827 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
18 changes: 18 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,24 @@ module AsyncSeq =
/// Asynchronously determine if the async predicate returns true for all values in the sequence
val forallAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<bool>

/// Asynchronously determine if any corresponding pair of elements in two async sequences satisfies
/// the predicate. Evaluates pairwise up to the shorter of the two sequences; short-circuits on first match.
/// Mirrors <c>Seq.exists2</c>.
val exists2 : predicate:('T1 -> 'T2 -> bool) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> Async<bool>

/// Asynchronously determine if any corresponding pair of elements in two async sequences satisfies
/// the async predicate. Evaluates pairwise up to the shorter of the two sequences; short-circuits on first match.
val exists2Async : predicate:('T1 -> 'T2 -> Async<bool>) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> Async<bool>

/// Asynchronously determine if all corresponding pairs of elements in two async sequences satisfy
/// the predicate. Evaluates pairwise up to the shorter of the two sequences; short-circuits on first failure.
/// Mirrors <c>Seq.forall2</c>.
val forall2 : predicate:('T1 -> 'T2 -> bool) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> Async<bool>

/// Asynchronously determine if all corresponding pairs of elements in two async sequences satisfy
/// the async predicate. Evaluates pairwise up to the shorter of the two sequences; short-circuits on first failure.
val forall2Async : predicate:('T1 -> 'T2 -> Async<bool>) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> Async<bool>

/// Compares two async sequences lexicographically using the given synchronous comparison function.
/// Returns a negative integer if source1 < source2, 0 if equal, and a positive integer if source1 > source2.
val compareWith : comparer:('T -> 'T -> int) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'T> -> Async<int>
Expand Down
104 changes: 104 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2103,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -4596,7 +4596,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4599 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4609,7 +4609,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4612 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4618,7 +4618,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4621 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down Expand Up @@ -4728,3 +4728,107 @@
with _ -> yield 42 }
let result = s |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 42 |], result)

// ===== exists2 / exists2Async =====

[<Test>]
let ``AsyncSeq.exists2 returns true when a matching pair exists`` () =
let result =
AsyncSeq.exists2 (=) (AsyncSeq.ofSeq [1;2;3]) (AsyncSeq.ofSeq [0;2;0])
|> Async.RunSynchronously
Assert.IsTrue(result)

[<Test>]
let ``AsyncSeq.exists2 returns false when no matching pair exists`` () =
let result =
AsyncSeq.exists2 (=) (AsyncSeq.ofSeq [1;2;3]) (AsyncSeq.ofSeq [4;5;6])
|> Async.RunSynchronously
Assert.IsFalse(result)

[<Test>]
let ``AsyncSeq.exists2 on empty sequences returns false`` () =
let result =
AsyncSeq.exists2 (=) AsyncSeq.empty<int> AsyncSeq.empty<int>
|> Async.RunSynchronously
Assert.IsFalse(result)

[<Test>]
let ``AsyncSeq.exists2 short-circuits on first match`` () =
let count = ref 0
let result =
AsyncSeq.exists2
(fun a b -> incr count; a = b)
(AsyncSeq.ofSeq [1;2;3;4;5])
(AsyncSeq.ofSeq [0;2;0;0;0])
|> Async.RunSynchronously
Assert.IsTrue(result)
Assert.AreEqual(2, count.Value) // stopped after second pair

[<Test>]
let ``AsyncSeq.exists2 stops at shorter sequence`` () =
let result =
AsyncSeq.exists2 (=) (AsyncSeq.ofSeq [1;2]) (AsyncSeq.ofSeq [3;4;1])
|> Async.RunSynchronously
Assert.IsFalse(result) // shorter seq ends before (1,1) can match

[<Test>]
let ``AsyncSeq.exists2Async returns true with async predicate`` () =
let result =
AsyncSeq.exists2Async
(fun a b -> async { return a = b })
(AsyncSeq.ofSeq [1;2;3])
(AsyncSeq.ofSeq [0;2;0])
|> Async.RunSynchronously
Assert.IsTrue(result)

// ===== forall2 / forall2Async =====

[<Test>]
let ``AsyncSeq.forall2 returns true when all pairs satisfy the predicate`` () =
let result =
AsyncSeq.forall2 (=) (AsyncSeq.ofSeq [1;2;3]) (AsyncSeq.ofSeq [1;2;3])
|> Async.RunSynchronously
Assert.IsTrue(result)

[<Test>]
let ``AsyncSeq.forall2 returns false when a pair fails`` () =
let result =
AsyncSeq.forall2 (=) (AsyncSeq.ofSeq [1;2;3]) (AsyncSeq.ofSeq [1;9;3])
|> Async.RunSynchronously
Assert.IsFalse(result)

[<Test>]
let ``AsyncSeq.forall2 on empty sequences returns true`` () =
let result =
AsyncSeq.forall2 (=) AsyncSeq.empty<int> AsyncSeq.empty<int>
|> Async.RunSynchronously
Assert.IsTrue(result)

[<Test>]
let ``AsyncSeq.forall2 short-circuits on first failure`` () =
let count = ref 0
let result =
AsyncSeq.forall2
(fun a b -> incr count; a = b)
(AsyncSeq.ofSeq [1;9;3;4;5])
(AsyncSeq.ofSeq [1;2;3;4;5])
|> Async.RunSynchronously
Assert.IsFalse(result)
Assert.AreEqual(2, count.Value) // stopped after second pair

[<Test>]
let ``AsyncSeq.forall2 stops at shorter sequence`` () =
let result =
AsyncSeq.forall2 (=) (AsyncSeq.ofSeq [1;2]) (AsyncSeq.ofSeq [1;2;99])
|> Async.RunSynchronously
Assert.IsTrue(result) // stops when shorter seq ends; all checked pairs passed

[<Test>]
let ``AsyncSeq.forall2Async returns true with async predicate`` () =
let result =
AsyncSeq.forall2Async
(fun a b -> async { return a = b })
(AsyncSeq.ofSeq [1;2;3])
(AsyncSeq.ofSeq [1;2;3])
|> Async.RunSynchronously
Assert.IsTrue(result)
Loading