ROX-32883: Implement concurrent lane #18788
base: lvm/rox-32701-add-consumer-id-to-metrics
Conversation
SafeChannel[T] encapsulates mutex, waitable signal, and channel to prevent races between writes and closure during concurrent shutdown scenarios. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
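A minimal sketch of what such a wrapper might look like, assuming a `Waitable` with a `Done()` channel and inventing the internals; only `Write`, `Close`, `Chan`, `Len`, and `Cap` are visible in this PR, so everything else here is an assumption:

```go
package channel

import (
	"errors"
	"sync"
)

// Waitable is a stand-in for the concurrency.Waitable the PR uses.
type Waitable interface {
	Done() <-chan struct{}
}

// SafeChannel guards a channel with a mutex and a closed flag so that a
// concurrent Write and Close cannot race into a send on a closed channel.
type SafeChannel[T any] struct {
	mu       sync.Mutex
	ch       chan T
	closed   bool
	waitable Waitable
}

// Write sends v unless the channel is already closed or shutdown begins.
func (s *SafeChannel[T]) Write(v T) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errors.New("channel is closed")
	}
	select {
	case s.ch <- v:
		return nil
	case <-s.waitable.Done():
		return errors.New("shutting down")
	}
}

// Close waits for the waitable to complete, then closes the channel
// exactly once under the same mutex that guards writes.
func (s *SafeChannel[T]) Close() {
	<-s.waitable.Done()
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
}

// Chan exposes the receive side; Len and Cap expose occupancy and
// capacity (Cap is the accessor added for tests in this PR).
func (s *SafeChannel[T]) Chan() <-chan T { return s.ch }
func (s *SafeChannel[T]) Len() int       { return len(s.ch) }
func (s *SafeChannel[T]) Cap() int       { return cap(s.ch) }
```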
Rename lane implementation from 'default' to 'blocking' to better reflect its blocking behavior when handling events with slow consumers. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Add Cap() method to expose the channel capacity, needed for testing lane implementations that use SafeChannel. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Replace manual mutex and channel handling in blockingLane with SafeChannel from pkg/channel. This simplifies the implementation and ensures race-free shutdown semantics. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Treat negative buffer size as 0 (unbuffered) in NewSafeChannel to prevent panics from make(chan T, negative). Add test using synctest to verify unbuffered channel behavior. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
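A sketch of the constructor clamp described here, under the same assumptions as the wrapper above; the PR's actual test uses testing/synctest, while this simpler one only checks the resulting capacity:

```go
// NewSafeChannel treats a negative size as 0 (unbuffered), since
// make(chan T, n) panics when n < 0.
func NewSafeChannel[T any](size int, w Waitable) *SafeChannel[T] {
	if size < 0 {
		size = 0
	}
	return &SafeChannel[T]{ch: make(chan T, size), waitable: w}
}

// neverDone is a Waitable that never completes, for test purposes.
type neverDone struct{}

func (neverDone) Done() <-chan struct{} { return nil }

func TestNegativeSizeIsUnbuffered(t *testing.T) {
	ch := NewSafeChannel[int](-5, neverDone{})
	if got := ch.Cap(); got != 0 {
		t.Fatalf("expected unbuffered channel, got capacity %d", got)
	}
}
```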
Use the SafeChannel to queue incoming events and handle Consume errors. Spawns a goroutine per event per consumer to avoid creating back-pressure and blocking the entire pubsub. Consume errors are piped and handled in a separate goroutine (for now just logging them). Metrics are added in the Publish function; consumer metrics are handled by the consumer.
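A rough sketch of the publish path this commit describes; `RecordEventPublished` is a hypothetical stand-in (the message only says metrics are recorded in `Publish`), while `SetQueueSize` and `Len` appear in code later in this thread:

```go
// Publish queues the event on the SafeChannel; fan-out to consumers and
// per-consumer error handling happen asynchronously in handleEvent.
func (l *concurrentLane) Publish(event pubsub.Event) error {
	if err := l.ch.Write(event); err != nil {
		return errors.Wrap(err, "unable to queue event")
	}
	metrics.RecordEventPublished(l.id, event.Topic()) // hypothetical metric helper
	metrics.SetQueueSize(l.id, l.ch.Len())
	return nil
}
```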
Removed testify suite pattern in favor of standard Go testing. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Skipping CI for Draft Pull Request.
Hey - I've found 1 issue, and left some high-level feedback:
- In `concurrentLane.getConsumersByTopic` you’re taking the full `consumerLock` instead of an `RLock` like the blocking lane did, which unnecessarily serializes read access to the consumer map; consider switching to `RLock`/`RUnlock` here for better concurrency.
- `SafeChannel.Close` blocks on `waitable.Done()` before closing the underlying channel; if a caller ever passes a waitable that is not guaranteed to complete, this will deadlock shutdown, so it might be worth either documenting this more strongly at call sites or adding a defensive timeout/fail-fast path (a sketch of such a path follows below).
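One possible fail-fast variant of `Close`, sketched against the assumed internals from the `SafeChannel` sketch above; the timeout parameter and its behavior are not part of the PR:

```go
// CloseWithTimeout stops waiting on the waitable after d and closes the
// channel anyway; late writers then observe the closed flag and get an
// error from Write instead of panicking on a send to a closed channel.
func (s *SafeChannel[T]) CloseWithTimeout(d time.Duration) {
	select {
	case <-s.waitable.Done():
	case <-time.After(d): // defensive bound on shutdown
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
}
```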
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `concurrentLane.getConsumersByTopic` you’re taking the full `consumerLock` instead of an `RLock` like the blocking lane did, which unnecessarily serializes read access to the consumer map; consider switching to `RLock/RUnlock` here for better concurrency.
- SafeChannel.Close blocks on `waitable.Done()` before closing the underlying channel; if a caller ever passes a waitable that is not guaranteed to complete, this will deadlock shutdown, so it might be worth either documenting this more strongly at call sites or adding a defensive timeout/fail-fast path.
## Individual Comments
### Comment 1
<location> `sensor/common/pubsub/lane/concurrent.go:78` </location>
<code_context>
+ Lane
+ size int
+ ch *channel.SafeChannel[pubsub.Event]
+ errC *channel.SafeChannel[error]
+ stopper concurrency.Stopper
+ errHandlingStopSignal concurrency.Signal
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the concurrent lane by inlining error handling instead of using a separate error channel/goroutine and by using a read lock when looking up consumers to reduce contention.
You can simplify this implementation without losing any functionality by:
1. **Inlining error handling instead of routing through `errC` and `runHandleErr`.**
2. **Using a read lock in `getConsumersByTopic` to match the blocking lane and reduce contention.**
### 1. Remove `errC` / `runHandleErr` / `writeToErrChannel` indirection
You currently have:
- Per-consumer goroutines sending errors to `errC`
- A separate goroutine `runHandleErr` draining `errC` and logging
- Extra fields: `errC`, `errHandlingStopSignal`
- Extra lifecycle handling in `Stop`
You can preserve non-blocking per-consumer error handling by logging directly in the per-consumer goroutine and when `getConsumersByTopic` fails.
**Before (core parts):**
```go
type concurrentLane struct {
Lane
size int
ch *channel.SafeChannel[pubsub.Event]
errC *channel.SafeChannel[error]
stopper concurrency.Stopper
errHandlingStopSignal concurrency.Signal
}
func (c *ConcurrentConfig) NewLane() pubsub.Lane {
lane := &concurrentLane{
Lane: Lane{
id: c.LaneID(),
newConsumerFn: c.newConsumer,
consumers: make(map[pubsub.Topic][]pubsub.Consumer),
},
stopper: concurrency.NewStopper(),
errHandlingStopSignal: concurrency.NewSignal(),
}
// ...
lane.ch = channel.NewSafeChannel[pubsub.Event](lane.size, lane.stopper.LowLevel().GetStopRequestSignal())
lane.errC = channel.NewSafeChannel[error](0, lane.stopper.LowLevel().GetStopRequestSignal())
go lane.run()
go lane.runHandleErr()
return lane
}
func (l *concurrentLane) handleEvent(event pubsub.Event) {
defer metrics.SetQueueSize(l.id, l.ch.Len())
consumers, err := l.getConsumersByTopic(event.Topic())
if err != nil {
l.writeToErrChannel(err)
metrics.RecordConsumerOperation(l.id, event.Topic(), pubsub.NoConsumers, metrics.NoConsumers)
return
}
for _, c := range consumers {
errC := c.Consume(l.stopper.Client().Stopped(), event)
go func() {
select {
case err := <-errC:
l.writeToErrChannel(err)
case <-l.stopper.Flow().StopRequested():
}
}()
}
}
func (l *concurrentLane) runHandleErr() {
defer l.errHandlingStopSignal.Signal()
for {
select {
case <-l.stopper.Flow().StopRequested():
return
case err, ok := <-l.errC.Chan():
if !ok {
return
}
log.Errorf("unable to handle event: %v", err)
}
}
}
func (l *concurrentLane) writeToErrChannel(err error) {
if err == nil {
return
}
if err := l.errC.Write(err); err != nil {
log.Warn("unable to write consumer error to error channel")
}
}
func (l *concurrentLane) Stop() {
l.stopper.Client().Stop()
l.ch.Close()
l.errC.Close()
<-l.errHandlingStopSignal.Done()
l.Lane.Stop()
}
```
**Suggested simplification:**
Inline the error handling and remove the extra error channel and signal:
```go
type concurrentLane struct {
Lane
size int
ch *channel.SafeChannel[pubsub.Event]
stopper concurrency.Stopper
}
```
```go
func (c *ConcurrentConfig) NewLane() pubsub.Lane {
lane := &concurrentLane{
Lane: Lane{
id: c.LaneID(),
newConsumerFn: c.newConsumer,
consumers: make(map[pubsub.Topic][]pubsub.Consumer),
},
stopper: concurrency.NewStopper(),
}
for _, opt := range c.opts {
opt(lane)
}
lane.ch = channel.NewSafeChannel[pubsub.Event](lane.size, lane.stopper.LowLevel().GetStopRequestSignal())
go lane.run()
return lane
}
```
```go
func (l *concurrentLane) handleEvent(event pubsub.Event) {
defer metrics.SetQueueSize(l.id, l.ch.Len())
consumers, err := l.getConsumersByTopic(event.Topic())
if err != nil {
// Direct error handling instead of routing via errC.
// TODO: consider adding a callback to inform of the error
log.Errorf("unable to handle event: %v", err)
metrics.RecordConsumerOperation(l.id, event.Topic(), pubsub.NoConsumers, metrics.NoConsumers)
return
}
for _, c := range consumers {
errC := c.Consume(l.stopper.Client().Stopped(), event)
go func() {
select {
case err := <-errC:
if err != nil {
// TODO: consider adding a callback to inform of the error
log.Errorf("unable to handle event: %v", err)
}
case <-l.stopper.Flow().StopRequested():
}
}()
}
}
```
```go
func (l *concurrentLane) Stop() {
l.stopper.Client().Stop()
l.ch.Close()
l.Lane.Stop()
}
```
This:
- Keeps per-consumer processing concurrent and non-blocking.
- Preserves error logging semantics (still logs all non-nil errors).
- Removes `errC`, `runHandleErr`, `writeToErrChannel`, and `errHandlingStopSignal`, reducing lifecycle/shutdown complexity.
If you need pluggable error handling later, you can add an optional callback field instead of an extra channel/goroutine.
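For instance, a hypothetical callback hook could look like this, keeping logging as the default (field and method names here are invented):

```go
type concurrentLane struct {
	Lane
	size    int
	ch      *channel.SafeChannel[pubsub.Event]
	stopper concurrency.Stopper
	onError func(error) // optional; nil falls back to logging
}

// handleErr routes a consumer error to the callback if one is set.
func (l *concurrentLane) handleErr(err error) {
	if err == nil {
		return
	}
	if l.onError != nil {
		l.onError(err)
		return
	}
	log.Errorf("unable to handle event: %v", err)
}
```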
### 2. Use read lock in `getConsumersByTopic`
`getConsumersByTopic` only reads `l.consumers` but currently takes the write lock. To align with the blocking lane and reduce contention:
**Before:**
```go
func (l *concurrentLane) getConsumersByTopic(topic pubsub.Topic) ([]pubsub.Consumer, error) {
l.consumerLock.Lock()
defer l.consumerLock.Unlock()
consumers, ok := l.consumers[topic]
if !ok {
return nil, errors.Wrap(pubsubErrors.NewConsumersNotFoundForTopicErr(topic, l.id), "unable to handle event")
}
return consumers, nil
}
```
**After (assuming `consumerLock` is an `RWMutex` as in the blocking lane):**
```go
func (l *concurrentLane) getConsumersByTopic(topic pubsub.Topic) ([]pubsub.Consumer, error) {
l.consumerLock.RLock()
defer l.consumerLock.RUnlock()
consumers, ok := l.consumers[topic]
if !ok {
return nil, errors.Wrap(pubsubErrors.NewConsumersNotFoundForTopicErr(topic, l.id), "unable to handle event")
}
return consumers, nil
}
```
This keeps the API and behavior identical but reduces lock contention and makes the concurrent lane’s locking model consistent with the existing one.
</issue_to_address>
Images are ready for the commit at 49f2dfc. To use with deploy scripts, first …
Codecov Report
❌ Patch coverage is …
Additional details and impacted files
@@ Coverage Diff @@
## lvm/rox-32701-add-consumer-id-to-metrics #18788 +/- ##
============================================================================
+ Coverage 49.36% 49.41% +0.04%
============================================================================
Files 2659 2661 +2
Lines 200656 200840 +184
============================================================================
+ Hits 99063 99253 +190
+ Misses 94154 94150 -4
+ Partials 7439 7437 -2
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Description
change me!
User-facing documentation
Testing and quality
Automated testing
How I validated my change
change me!