
Conversation

@lvalerom
Contributor

Description

change me!

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

change me!

lvalerom and others added 7 commits January 30, 2026 19:58
SafeChannel[T] encapsulates mutex, waitable signal, and channel to prevent
races between writes and closure during concurrent shutdown scenarios.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
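For readers unfamiliar with the pattern, here is a minimal sketch of what such a wrapper could look like. This is not the pkg/channel implementation: the method names (Write, Close, Chan, Len, Cap) mirror how SafeChannel is used later in this conversation, but the internals and the constructor's second argument (simplified here to a plain done channel rather than a waitable) are assumptions.

```go
// Minimal sketch of a SafeChannel-style wrapper — NOT the pkg/channel
// implementation. The shutdown "waitable" is simplified to a done channel.
package channel

import (
	"errors"
	"sync"
)

// ErrClosed is returned when writing to a closed or shutting-down channel.
var ErrClosed = errors.New("channel is closed or shutting down")

// SafeChannel guards a channel with a mutex and a closed flag so that
// concurrent writers never race with Close during shutdown.
type SafeChannel[T any] struct {
	mu     sync.Mutex
	ch     chan T
	closed bool
	done   <-chan struct{} // fires once shutdown has been requested
}

// NewSafeChannel treats a negative size as 0 (unbuffered) so that
// make(chan T, size) can never panic.
func NewSafeChannel[T any](size int, done <-chan struct{}) *SafeChannel[T] {
	if size < 0 {
		size = 0
	}
	return &SafeChannel[T]{ch: make(chan T, size), done: done}
}

// Write sends v unless the channel was closed or shutdown was requested.
// Holding the mutex for the whole attempt rules out the classic
// "send on closed channel" race with Close.
func (s *SafeChannel[T]) Write(v T) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return ErrClosed
	}
	select {
	case s.ch <- v:
		return nil
	case <-s.done:
		return ErrClosed
	}
}

// Close waits for the shutdown signal, then closes the channel exactly once.
// Any writer blocked in Write is released via its done case before close runs.
func (s *SafeChannel[T]) Close() {
	<-s.done
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
}

// Chan exposes the receive side; Len and Cap report the buffer state.
func (s *SafeChannel[T]) Chan() <-chan T { return s.ch }
func (s *SafeChannel[T]) Len() int       { return len(s.ch) }
func (s *SafeChannel[T]) Cap() int       { return cap(s.ch) }
```

The trade-off in this sketch is that writers to the same channel are serialized by the mutex; in exchange, a write can never race with the close.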
Rename lane implementation from 'default' to 'blocking' to better reflect
its blocking behavior when handling events with slow consumers.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Add Cap() method to expose the channel capacity, needed for testing
lane implementations that use SafeChannel.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Replace manual mutex and channel handling in blockingLane with SafeChannel
from pkg/channel. This simplifies the implementation and ensures race-free
shutdown semantics.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Treat negative buffer size as 0 (unbuffered) in NewSafeChannel to prevent
panics from make(chan T, negative). Add test using synctest to verify
unbuffered channel behavior.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
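To illustrate the kind of check that commit describes, here is a sketch of a synctest-based test, assuming Go 1.25's testing/synctest and the hypothetical SafeChannel sketch above; the PR's actual test is not shown in this conversation and may differ.

```go
package channel

import (
	"testing"
	"testing/synctest"
)

// Hedged sketch: verify that a negative size yields an unbuffered channel,
// i.e. a Write blocks until a receiver is ready.
func TestNegativeSizeIsUnbuffered(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		done := make(chan struct{})
		sc := NewSafeChannel[int](-1, done) // negative size is clamped to 0

		if got := sc.Cap(); got != 0 {
			t.Fatalf("expected capacity 0, got %d", got)
		}

		wrote := false
		go func() {
			_ = sc.Write(42)
			wrote = true
		}()

		// Every other goroutine in the bubble is durably blocked here; with
		// no buffer, the Write cannot have completed yet.
		synctest.Wait()
		if wrote {
			t.Fatal("Write on an unbuffered channel returned without a reader")
		}

		// A receive unblocks the writer.
		if v := <-sc.Chan(); v != 42 {
			t.Fatalf("expected 42, got %d", v)
		}
		synctest.Wait()
		if !wrote {
			t.Fatal("Write did not complete after the receive")
		}
	})
}
```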
Use the SafeChannel to queue incoming events and handle Consume errors.
Spawns a goroutine per event per consumer to avoid creating back-pressure and blocking the entire pubsub.
Consume errors are piped and handled in a separate goroutine (for now just logging them).
Metrics are added in the Publish function; consumer metrics are handled by each consumer.
Removed the testify suite pattern in favor of standard Go testing.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
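The review below reproduces NewLane, handleEvent, and Stop for this lane but not its run loop. Purely as an illustration, a run loop that drains the SafeChannel might look like the sketch below; it uses only Stopper and SafeChannel calls that appear elsewhere in this thread, and the PR's actual run() may differ.

```go
// Hypothetical sketch of the lane's run loop (the real run() is not shown in
// this conversation): drain the event channel and dispatch each event until
// the stopper fires or the channel is closed.
func (l *concurrentLane) run() {
	for {
		select {
		case <-l.stopper.Flow().StopRequested():
			return
		case event, ok := <-l.ch.Chan():
			if !ok {
				return
			}
			l.handleEvent(event)
		}
	}
}
```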
@openshift-ci

openshift-ci bot commented Jan 30, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@lvalerom lvalerom changed the base branch from master to lvm/rox-32701-add-consumer-id-to-metrics January 30, 2026 21:17
Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 1 issue, and left some high level feedback:

  • In concurrentLane.getConsumersByTopic you’re taking the full consumerLock instead of an RLock like the blocking lane did, which unnecessarily serializes read access to the consumer map; consider switching to RLock/RUnlock here for better concurrency.
  • SafeChannel.Close blocks on waitable.Done() before closing the underlying channel; if a caller ever passes a waitable that is not guaranteed to complete, this will deadlock shutdown, so it might be worth either documenting this more strongly at call sites or adding a defensive timeout/fail-fast path (a hedged sketch of one such timeout path follows below).
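For illustration only, a fail-fast variant along the lines of the second point could look like the sketch below. It builds on the hypothetical SafeChannel sketch earlier in this thread (fields mu, ch, closed, done); CloseWithTimeout is an invented name, not part of the PR, and the snippet additionally needs the fmt and time imports.

```go
// Hypothetical fail-fast alternative to Close: give the waitable a bounded
// amount of time instead of blocking shutdown forever.
func (s *SafeChannel[T]) CloseWithTimeout(timeout time.Duration) error {
	select {
	case <-s.done:
		// Shutdown was requested; it is safe to close the channel now.
	case <-time.After(timeout):
		return fmt.Errorf("waitable not done after %v; leaving channel open", timeout)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.ch)
	}
	return nil
}
```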
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `concurrentLane.getConsumersByTopic` you’re taking the full `consumerLock` instead of an `RLock` like the blocking lane did, which unnecessarily serializes read access to the consumer map; consider switching to `RLock/RUnlock` here for better concurrency.
- SafeChannel.Close blocks on `waitable.Done()` before closing the underlying channel; if a caller ever passes a waitable that is not guaranteed to complete, this will deadlock shutdown, so it might be worth either documenting this more strongly at call sites or adding a defensive timeout/fail-fast path.

## Individual Comments

### Comment 1
<location> `sensor/common/pubsub/lane/concurrent.go:78` </location>
<code_context>
+	Lane
+	size                  int
+	ch                    *channel.SafeChannel[pubsub.Event]
+	errC                  *channel.SafeChannel[error]
+	stopper               concurrency.Stopper
+	errHandlingStopSignal concurrency.Signal
</code_context>

<issue_to_address>
**issue (complexity):** Consider simplifying the concurrent lane by inlining error handling instead of using a separate error channel/goroutine and by using a read lock when looking up consumers to reduce contention.

You can simplify this implementation without losing any functionality by:

1. **Inlining error handling instead of routing through `errC` and `runHandleErr`.**
2. **Using a read lock in `getConsumersByTopic` to match the blocking lane and reduce contention.**

### 1. Remove `errC` / `runHandleErr` / `writeToErrChannel` indirection

You currently have:

- Per-consumer goroutines sending errors to `errC`
- A separate goroutine `runHandleErr` draining `errC` and logging
- Extra fields: `errC`, `errHandlingStopSignal`
- Extra lifecycle handling in `Stop`

You can preserve non-blocking per-consumer error handling by logging directly in the per-consumer goroutine and when `getConsumersByTopic` fails.

**Before (core parts):**

```go
type concurrentLane struct {
    Lane
    size                  int
    ch                    *channel.SafeChannel[pubsub.Event]
    errC                  *channel.SafeChannel[error]
    stopper               concurrency.Stopper
    errHandlingStopSignal concurrency.Signal
}

func (c *ConcurrentConfig) NewLane() pubsub.Lane {
    lane := &concurrentLane{
        Lane: Lane{
            id:            c.LaneID(),
            newConsumerFn: c.newConsumer,
            consumers:     make(map[pubsub.Topic][]pubsub.Consumer),
        },
        stopper:               concurrency.NewStopper(),
        errHandlingStopSignal: concurrency.NewSignal(),
    }
    // ...
    lane.ch = channel.NewSafeChannel[pubsub.Event](lane.size, lane.stopper.LowLevel().GetStopRequestSignal())
    lane.errC = channel.NewSafeChannel[error](0, lane.stopper.LowLevel().GetStopRequestSignal())
    go lane.run()
    go lane.runHandleErr()
    return lane
}

func (l *concurrentLane) handleEvent(event pubsub.Event) {
    defer metrics.SetQueueSize(l.id, l.ch.Len())
    consumers, err := l.getConsumersByTopic(event.Topic())
    if err != nil {
        l.writeToErrChannel(err)
        metrics.RecordConsumerOperation(l.id, event.Topic(), pubsub.NoConsumers, metrics.NoConsumers)
        return
    }
    for _, c := range consumers {
        errC := c.Consume(l.stopper.Client().Stopped(), event)
        go func() {
            select {
            case err := <-errC:
                l.writeToErrChannel(err)
            case <-l.stopper.Flow().StopRequested():
            }
        }()
    }
}

func (l *concurrentLane) runHandleErr() {
    defer l.errHandlingStopSignal.Signal()
    for {
        select {
        case <-l.stopper.Flow().StopRequested():
            return
        case err, ok := <-l.errC.Chan():
            if !ok {
                return
            }
            log.Errorf("unable to handle event: %v", err)
        }
    }
}

func (l *concurrentLane) writeToErrChannel(err error) {
    if err == nil {
        return
    }
    if err := l.errC.Write(err); err != nil {
        log.Warn("unable to write consumer error to error channel")
    }
}

func (l *concurrentLane) Stop() {
    l.stopper.Client().Stop()
    l.ch.Close()
    l.errC.Close()
    <-l.errHandlingStopSignal.Done()
    l.Lane.Stop()
}
```

**Suggested simplification:**

Inline the error handling and remove the extra error channel and signal:

```go
type concurrentLane struct {
    Lane
    size    int
    ch      *channel.SafeChannel[pubsub.Event]
    stopper concurrency.Stopper
}
```

```go
func (c *ConcurrentConfig) NewLane() pubsub.Lane {
    lane := &concurrentLane{
        Lane: Lane{
            id:            c.LaneID(),
            newConsumerFn: c.newConsumer,
            consumers:     make(map[pubsub.Topic][]pubsub.Consumer),
        },
        stopper: concurrency.NewStopper(),
    }
    for _, opt := range c.opts {
        opt(lane)
    }
    lane.ch = channel.NewSafeChannel[pubsub.Event](lane.size, lane.stopper.LowLevel().GetStopRequestSignal())
    go lane.run()
    return lane
}
```

```go
func (l *concurrentLane) handleEvent(event pubsub.Event) {
    defer metrics.SetQueueSize(l.id, l.ch.Len())

    consumers, err := l.getConsumersByTopic(event.Topic())
    if err != nil {
        // Direct error handling instead of routing via errC.
        // TODO: consider adding a callback to inform of the error
        log.Errorf("unable to handle event: %v", err)
        metrics.RecordConsumerOperation(l.id, event.Topic(), pubsub.NoConsumers, metrics.NoConsumers)
        return
    }

    for _, c := range consumers {
        errC := c.Consume(l.stopper.Client().Stopped(), event)
        go func() {
            select {
            case err := <-errC:
                if err != nil {
                    // TODO: consider adding a callback to inform of the error
                    log.Errorf("unable to handle event: %v", err)
                }
            case <-l.stopper.Flow().StopRequested():
            }
        }()
    }
}
```

```go
func (l *concurrentLane) Stop() {
    l.stopper.Client().Stop()
    l.ch.Close()
    l.Lane.Stop()
}
```

This:

- Keeps per-consumer processing concurrent and non-blocking.
- Preserves error logging semantics (still logs all non-nil errors).
- Removes `errC`, `runHandleErr`, `writeToErrChannel`, and `errHandlingStopSignal`, reducing lifecycle/shutdown complexity.

If you need pluggable error handling later, you can add an optional callback field instead of an extra channel/goroutine.

### 2. Use read lock in `getConsumersByTopic`

`getConsumersByTopic` only reads `l.consumers` but currently takes the write lock. To align with the blocking lane and reduce contention:

**Before:**

```go
func (l *concurrentLane) getConsumersByTopic(topic pubsub.Topic) ([]pubsub.Consumer, error) {
    l.consumerLock.Lock()
    defer l.consumerLock.Unlock()

    consumers, ok := l.consumers[topic]
    if !ok {
        return nil, errors.Wrap(pubsubErrors.NewConsumersNotFoundForTopicErr(topic, l.id), "unable to handle event")
    }
    return consumers, nil
}
```

**After (assuming `consumerLock` is an `RWMutex` as in the blocking lane):**

```go
func (l *concurrentLane) getConsumersByTopic(topic pubsub.Topic) ([]pubsub.Consumer, error) {
    l.consumerLock.RLock()
    defer l.consumerLock.RUnlock()

    consumers, ok := l.consumers[topic]
    if !ok {
        return nil, errors.Wrap(pubsubErrors.NewConsumersNotFoundForTopicErr(topic, l.id), "unable to handle event")
    }
    return consumers, nil
}
```

This keeps the API and behavior identical but reduces lock contention and makes the concurrent lane’s locking model consistent with the existing one.
</issue_to_address>


@rhacs-bot
Contributor

Images are ready for the commit at 49f2dfc.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.11.x-14-g49f2dfc0e6.

@codecov

codecov bot commented Jan 30, 2026

Codecov Report

❌ Patch coverage is 96.98492% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 49.41%. Comparing base (3e09ff1) to head (49f2dfc).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| sensor/common/pubsub/lane/concurrent.go | 96.85% | 3 Missing and 1 partial ⚠️ |
| pkg/channel/safe.go | 96.07% | 2 Missing ⚠️ |
Additional details and impacted files
Additional details and impacted files
```diff
@@                             Coverage Diff                              @@
##           lvm/rox-32701-add-consumer-id-to-metrics   #18788      +/-   ##
============================================================================
+ Coverage                                     49.36%   49.41%   +0.04%     
============================================================================
  Files                                          2659     2661       +2     
  Lines                                        200656   200840     +184     
============================================================================
+ Hits                                          99063    99253     +190     
+ Misses                                        94154    94150       -4     
+ Partials                                       7439     7437       -2     
```
| Flag | Coverage Δ |
|---|---|
| go-unit-tests | 49.41% <96.98%> (+0.04%) ⬆️ |

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

