Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/smartcontractkit/chainlink-common

go 1.26.2

replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress

Comment on lines +5 to +6
require (
github.com/Masterminds/semver/v3 v3.4.0
github.com/XSAM/otelsql v0.37.0
Expand Down Expand Up @@ -41,7 +43,7 @@ require (
github.com/scylladb/go-reflectx v1.0.1
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chain-selectors v1.0.89
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260504142219-ceccf433a82e
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260420204255-a3f3bdd56877
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b
Expand Down
2 changes: 0 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions pkg/beholder/beholdertest/beholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

const (
Expand Down Expand Up @@ -192,3 +193,15 @@ func (e *assertMessageEmitter) EmitMessage(_ context.Context, msg beholder.Messa

return nil
}

func (e *assertMessageEmitter) BatchEmit(_ context.Context, messages []beholder.Message, _ ...beholder.BatchEmitOption) ([]*chipingress.PublishResult, error) {
e.t.Helper()

e.mu.Lock()
defer e.mu.Unlock()

e.msgs = append(e.msgs, messages...)

return nil, nil
}

54 changes: 40 additions & 14 deletions pkg/beholder/chip_ingress_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"maps"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"google.golang.org/protobuf/proto"
)

type ChipIngressEmitter struct {
client chipingress.Client
}

var _ Emitter = (*ChipIngressEmitter)(nil)

func NewChipIngressEmitter(client chipingress.Client) (Emitter, error) {
if client == nil {
return nil, errors.New("chip ingress client is nil")
Expand All @@ -26,33 +29,56 @@ func (c *ChipIngressEmitter) Close() error {
}

func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...)
if err != nil {
return err
_, err := c.BatchEmit(ctx, []Message{
NewMessage(body, attrKVs...),
})
return err
}

func (c *ChipIngressEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
emitOpts := DefaultBatchEmitOptions
for _, opt := range options {
opt(&emitOpts)
}

event, err := chipingress.NewEvent(sourceDomain, entityType, body, newAttributes(attrKVs...))
if err != nil {
return err
events := make([]chipingress.CloudEvent, len(messages))
for i, msg := range messages {
sourceDomain, entityType, err := ExtractSourceAndType(msg.Attrs)
if err != nil {
return nil, err
}

event, err := chipingress.NewEvent(sourceDomain, entityType, msg.Body, msg.Attrs)
if err != nil {
return nil, err
}

events[i] = event
}

eventPb, err := chipingress.EventToProto(event)
eventPb, err := chipingress.EventsToBatch(events)
if err != nil {
return fmt.Errorf("failed to convert event to proto: %w", err)
return nil, fmt.Errorf("failed to convert event to proto: %w", err)
}

eventPb.Options = &chipingress.PublishOptions{
AllOrNothing: proto.Bool(emitOpts.AllOrNothing),
}

_, err = c.client.Publish(ctx, eventPb)
response, err := c.client.PublishBatch(ctx, eventPb)
Comment on lines +64 to +68
if err != nil {
return err
return nil, err
}

return nil
if response == nil {
return nil, nil
}

return response.Results, nil
}

// ExtractSourceAndType extracts source domain and entity from the attributes
func ExtractSourceAndType(attrKVs ...any) (string, string, error) {
attributes := newAttributes(attrKVs...)

func ExtractSourceAndType(attributes Attributes) (string, string, error) {
Comment on lines 80 to +81
var sourceDomain string
var entityType string

Expand Down
6 changes: 3 additions & 3 deletions pkg/beholder/chip_ingress_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestChipIngressEmit(t *testing.T) {
clientMock := mocks.NewClient(t)

clientMock.
On("Publish", mock.Anything, mock.Anything).
On("PublishBatch", mock.Anything, mock.Anything).
Return(nil, nil)

emitter, err := beholder.NewChipIngressEmitter(clientMock)
Expand All @@ -66,7 +66,7 @@ func TestChipIngressEmit(t *testing.T) {
clientMock := mocks.NewClient(t)

clientMock.
On("Publish", mock.Anything, mock.Anything).
On("PublishBatch", mock.Anything, mock.Anything).
Return(nil, assert.AnError)

emitter, err := beholder.NewChipIngressEmitter(clientMock)
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestExtractSourceAndType(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
domain, entity, err := beholder.ExtractSourceAndType(tt.attrs...)
domain, entity, err := beholder.ExtractSourceAndType(beholder.ExtractAttributes(tt.attrs))

if tt.wantErr {
if err == nil {
Expand Down
19 changes: 18 additions & 1 deletion pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,26 @@ import (

const defaultGRPCCompressor = "gzip"

type BatchEmitOptions struct {
AllOrNothing bool
}

var DefaultBatchEmitOptions = BatchEmitOptions{
AllOrNothing: true,
}

type BatchEmitOption = func(*BatchEmitOptions)

func WithAllOrNothing(v bool) BatchEmitOption {
return func(o *BatchEmitOptions) {
o.AllOrNothing = v
}
}

type Emitter interface {
// Sends message with bytes and attributes to OTel Collector
// Emit Sends message with bytes and attributes to OTel Collector
Emit(ctx context.Context, body []byte, attrKVs ...any) error
BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error)
Comment thread
tarcisiozf marked this conversation as resolved.
io.Closer
}

Expand Down
18 changes: 13 additions & 5 deletions pkg/beholder/dual_source_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync/atomic"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)
Expand Down Expand Up @@ -55,27 +56,34 @@ func (d *DualSourceEmitter) Close() error {
}

func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
_, err := d.BatchEmit(ctx, []Message{
NewMessage(body, attrKVs...),
})
return err
}

func (d *DualSourceEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
// Emit via OTLP first
if err := d.otelCollectorEmitter.Emit(ctx, body, attrKVs...); err != nil {
return err
if _, err := d.otelCollectorEmitter.BatchEmit(ctx, messages, options...); err != nil {
return nil, err
}

// Emit via chip ingress async
if err := d.wg.TryAdd(1); err != nil {
return err
return nil, err
}
go func(ctx context.Context) {
defer d.wg.Done()
var cancel context.CancelFunc
ctx, cancel = d.stopCh.Ctx(ctx)
defer cancel()

if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil {
if _, err := d.chipIngressEmitter.BatchEmit(ctx, messages, options...); err != nil {
// If the chip ingress emitter fails, we ONLY log the error
// because we still want to send the data to the OTLP collector and not cause disruption
d.log.Infof("failed to emit to chip ingress: %v", err)
}
}(context.WithoutCancel(ctx))
Comment on lines 75 to 86

return nil
return nil, nil
Comment thread
tarcisiozf marked this conversation as resolved.
}
13 changes: 13 additions & 0 deletions pkg/beholder/dual_source_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

func TestNewDualSourceEmitter(t *testing.T) {
Expand Down Expand Up @@ -84,3 +85,15 @@ func (m *mockEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) err
}
return nil
}

func (m *mockEmitter) BatchEmit(ctx context.Context, messages []beholder.Message, _ ...beholder.BatchEmitOption) ([]*chipingress.PublishResult, error) {
if m.emitFunc != nil {
for _, msg := range messages {
if err := m.emitFunc(ctx, msg.Body); err != nil {
return nil, err
}
}
}
return nil, nil
}

4 changes: 4 additions & 0 deletions pkg/beholder/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func newAttributes(attrKVs ...any) Attributes {
case Attributes:
maps.Copy(a, t)
i++
case []any:
// Treat a []any element as if its contents were passed directly.
maps.Copy(a, newAttributes(t...))
i++
Comment on lines +87 to +90
case string:
if i+1 >= l {
break
Expand Down
19 changes: 14 additions & 5 deletions pkg/beholder/message_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package beholder
import (
"context"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
otellog "go.opentelemetry.io/otel/log"
)

Expand All @@ -22,10 +23,18 @@ func (e messageEmitter) Close() error { return nil }
// Emits logs the message, but does not wait for the message to be processed.
// Open question: what are pros/cons for using use map[]any vs use otellog.KeyValue
func (e messageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
message := NewMessage(body, attrKVs...)
if err := message.Validate(); err != nil {
return err
_, err := e.BatchEmit(ctx, []Message{
NewMessage(body, attrKVs...),
})
return err
}

func (e messageEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
for _, message := range messages {
if err := message.Validate(); err != nil {
return nil, err
}
e.messageLogger.Emit(ctx, message.OtelRecord())
}
e.messageLogger.Emit(ctx, message.OtelRecord())
return nil
return nil, nil
}
5 changes: 5 additions & 0 deletions pkg/beholder/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ func (e noopMessageEmitter) Close() error { return nil }
func (noopMessageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
return nil
}

func (noopMessageEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
return nil, nil
}

func (noopMessageEmitter) EmitMessage(ctx context.Context, message Message) error {
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/capabilities/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

type testEmitter struct {
Expand All @@ -25,6 +27,10 @@ func (t *testEmitter) Emit(ctx context.Context, payload []byte, attrKVs ...any)
return nil
}

func (t *testEmitter) BatchEmit(_ context.Context, _ []beholder.Message, _ ...beholder.BatchEmitOption) ([]*chipingress.PublishResult, error) {
return nil, nil
}

func TestEmitter(t *testing.T) {
client := &testEmitter{}
emitter := &Emitter{client: client}
Expand Down
2 changes: 1 addition & 1 deletion pkg/chipingress/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
go.opentelemetry.io/otel/metric v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.uber.org/zap v1.27.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217
google.golang.org/grpc v1.79.3
google.golang.org/protobuf v1.36.10
)
Expand All @@ -33,7 +34,6 @@ require (
golang.org/x/net v0.48.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.32.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
Loading
Loading