-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNamedStreamReader.fs
More file actions
121 lines (99 loc) · 4.66 KB
/
NamedStreamReader.fs
File metadata and controls
121 lines (99 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
namespace Propulsion.EventStoreDB
open System
open EventStore.ClientAPI
open Microsoft.FSharp.Control
open Propulsion.EventStoreDB.Stats
open Propulsion.EventStoreDB.StreamReader
/// Reads a single named EventStoreDB stream forward in slices of at most `maxBatchSize`
/// events, passes the events accepted by `eventFilter` to `submitBatch`, and records
/// progress through `checkpointer` and `stats`.
/// TODO: This is currently untested
type NamedStreamReader
    (
        logger: Serilog.ILogger,
        conn: IEventStoreConnection,
        checkpointer: ICheckpointer,
        submitBatch: SubmitBatchHandler,
        eventFilter: ResolvedEvent -> bool,
        streamName,
        consumerGroup,
        maxBatchSize: int,
        tailSleepInterval: TimeSpan,
        stats: IStats
    ) =

    /// Commits `position` for (`streamName`, `consumerGroup`) through the checkpointer and
    /// reports it to `stats`. A failure is logged and then re-raised to the caller.
    member this.Commit position = async {
        try
            do! checkpointer.CommitPosition streamName consumerGroup position
            stats.UpdateCommitedPosition position
            // A successful commit is routine progress, not a warning condition.
            logger.Information ("Committed position {position}", position)
        with
        | ex ->
            logger.Warning (ex, "Exception while commiting position {position}", position)
            return! Async.Raise ex
    }

    /// Applies `eventFilter` to `events`. When nothing is left, only records an empty slice;
    /// otherwise builds a batch from the remaining events, submits it together with a commit
    /// callback for the batch's last position, and reports the pair returned by `submitBatch`
    /// to `stats.UpdateCurMax`.
    member this.ProcessEvents (events: ResolvedEvent []) isEndOfStream = async {
        match events |> Array.filter eventFilter with
        | [||] ->
            logger.Debug ("Empty batch retrieved")
            stats.UpdateEmptySlice ()
        | accepted ->
            let batch =
                {
                    events = accepted |> Array.map streamEventFromResolvedEvent
                    firstPosition = accepted |> Array.head |> getEventNumber
                    lastPosition = accepted |> Array.last |> getEventNumber
                    isEnd = isEndOfStream
                }
            logger.Debug ("Submitting a batch of {batchSize} events, position {firstPosition} through {lastPosition}",
                          batch.Length, batch.firstPosition, batch.lastPosition)
            do stats.UpdateBatch batch
            let! (cur, max) =
                submitBatch (batch.lastPosition, this.Commit batch.lastPosition, batch.events)
            do stats.UpdateCurMax cur max
    }

    interface IStreamReader with
        /// Starts stats reporting, then reads the stream in a loop until cancellation is
        /// requested. With a committed position the read resumes from that event number and
        /// the event at that number is dropped; without one the read starts at the beginning
        /// of the stream and every event is processed. Sleeps for `tailSleepInterval` whenever
        /// a slice reports the end of the stream.
        member this.Start (committedPosition: Nullable<int64>) = async {
            // Start reporting stats
            do! Async.StartChild (stats.Start ()) |> Async.Ignore

            let mutable work =
                if committedPosition.HasValue then
                    let position = committedPosition.Value
                    logger.Information (
                        "Continuing reading stream {streamName} from position {position}, maxBatchSize {maxBatchSize}",
                        streamName, position, maxBatchSize)
                    NamedStreamWork.TakeInitial position
                else
                    let position = int64 StreamPosition.Start
                    logger.Information (
                        "Starting reading stream {streamName} from the beginning, maxBatchSize {maxBatchSize}",
                        streamName, maxBatchSize)
                    // Nothing has been processed yet, so the first event must not be dropped:
                    // TakeNext reads from `position` without filtering.
                    NamedStreamWork.TakeNext position

            let! ct = Async.CancellationToken
            while not ct.IsCancellationRequested do
                // `alreadyProcessed` is the event number to drop from the slice, if any.
                let position, alreadyProcessed =
                    match work with
                    | NamedStreamWork.TakeInitial position -> position, Some position
                    | NamedStreamWork.TakeNext position -> position, None

                let! eventsSlice =
                    conn.ReadStreamEventsForwardAsync (streamName, position, maxBatchSize, true)
                    |> Async.AwaitTaskCorrect

                let events =
                    match alreadyProcessed with
                    | Some processed ->
                        // Filter the event with the same position as what we're continuing from.
                        // `position` is a stream event number, so it is compared against
                        // OriginalEventNumber; OriginalPosition refers to the $all log and is
                        // not expected to be populated for stream reads.
                        eventsSlice.Events
                        |> Array.filter (fun resolved -> resolved.OriginalEventNumber <> processed)
                    | None -> eventsSlice.Events

                do! this.ProcessEvents events eventsSlice.IsEndOfStream

                work <- NamedStreamWork.TakeNext eventsSlice.NextEventNumber
                if eventsSlice.IsEndOfStream then
                    do! Async.Sleep tailSleepInterval
        }