-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEventStoreSource.fs
More file actions
96 lines (82 loc) · 3.17 KB
/
EventStoreSource.fs
File metadata and controls
96 lines (82 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
namespace Propulsion.EventStoreDB
open System
open EventStore.ClientAPI
open Propulsion.EventStoreDB.Stats
open Propulsion.EventStoreDB.StreamReader
type StartPos =
| Start
| Continue
| Absolute of int64
type EventStoreSource =
/// Create and run an `IStreamReader`
static member Run
(
readerLogger: Serilog.ILogger,
statsLogger: Serilog.ILogger,
conn: IEventStoreConnection,
startPosition: StartPos,
checkpointer: ICheckpointer,
consumerGroup: string,
maxBatchSize: int,
sink: Propulsion.ProjectorPipeline<_>,
eventFilter: ResolvedEvent -> bool,
tailSleepInterval: TimeSpan,
statsInterval: TimeSpan,
streamType: StreamType,
?stats: IStats
)
: Async<unit>
= async {
let statsLogger =
let instanceId = Guid.NewGuid()
statsLogger
.ForContext("instanceId", string instanceId)
.ForContext("consumerGroup", consumerGroup)
let ingester : Propulsion.Ingestion.Ingester<_,_> =
sink.StartIngester (statsLogger, 0)
let stats = stats |> Option.defaultWith (fun () -> Stats (statsLogger, statsInterval) :> IStats)
let streamName, reader =
match streamType with
| StreamType.AllStream credentials ->
let reader =
AllStreamReader (
readerLogger,
conn,
checkpointer,
ingester.Submit,
eventFilter,
credentials,
consumerGroup,
maxBatchSize,
tailSleepInterval,
stats
) :> IStreamReader
AllStreamName, reader
| StreamType.NamedStream streamName ->
let reader =
NamedStreamReader (
readerLogger,
conn,
checkpointer,
ingester.Submit,
eventFilter,
streamName,
consumerGroup,
maxBatchSize,
tailSleepInterval,
stats
) :> IStreamReader
streamName, reader
try
let! position = async {
match startPosition with
| Absolute startPos -> return Nullable startPos
| Start -> return Nullable ()
| Continue -> return! checkpointer.GetPosition streamName consumerGroup
}
do! reader.Start position
with
| ex ->
readerLogger.Fatal (ex, "Exception encountered while running reader, exiting loop")
return! Async.Raise ex
}