Skip to content
Open
11 changes: 10 additions & 1 deletion apis/fluentbit/v1alpha2/plugins/output/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package output

import (
"fmt"
"strings"

"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params"
Expand All @@ -12,8 +13,11 @@ import (
// Kafka output plugin allows to ingest your records into an Apache Kafka service. <br />
// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/outputs/kafka**
type Kafka struct {
// Specify data format, options available: json, msgpack.
// Specify data format, options available: json, msgpack, raw.
Format string `json:"format,omitempty"`
Comment thread
Vaibhav-C-S marked this conversation as resolved.
// When using format: raw, the value of the record field specified by rawLogKey
// (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
RawLogKey string `json:"rawLogKey,omitempty"`
// Optional key to store the message
MessageKey string `json:"messageKey,omitempty"`
// If set, the value of Message_Key_Field in the record will indicate the message key.
Expand Down Expand Up @@ -60,6 +64,11 @@ func (k *Kafka) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if k.Format != "" {
kvs.Insert("Format", k.Format)
}
if k.RawLogKey != "" {
kvs.Insert("Raw_Log_Key", k.RawLogKey)
} else if strings.EqualFold(k.Format, "raw") {
return nil, fmt.Errorf("rawLogKey is required when format is raw")
}
Comment thread
Vaibhav-C-S marked this conversation as resolved.
Comment thread
Vaibhav-C-S marked this conversation as resolved.
if k.MessageKey != "" {
kvs.Insert("Message_Key", k.MessageKey)
}
Expand Down
46 changes: 46 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/kafka_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package output

import (
"testing"

"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params"
. "github.com/onsi/gomega"
)

func TestOutput_Kafka_Params(t *testing.T) {
g := NewWithT(t)

sl := plugins.NewSecretLoader(nil, "test namespace")

kafka := Kafka{
Format: "raw",
RawLogKey: "message",
Brokers: "kafka:9092",
Topics: "logs",
}

expected := params.NewKVs()
expected.Insert("Format", "raw")
expected.Insert("Raw_Log_Key", "message")
expected.Insert("Brokers", "kafka:9092")
expected.Insert("Topics", "logs")

kvs, err := kafka.Params(sl)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(kvs).To(Equal(expected))
}

func TestOutput_Kafka_ParamsRequiresRawLogKeyForRawFormat(t *testing.T) {
g := NewWithT(t)

sl := plugins.NewSecretLoader(nil, "test namespace")

kafka := Kafka{
Format: "RAW",
}

kvs, err := kafka.Params(sl)
g.Expect(err).To(MatchError("rawLogKey is required when format is raw"))
g.Expect(kvs).To(BeNil())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2258,7 +2258,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2277,6 +2278,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2258,7 +2258,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2277,6 +2278,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2256,7 +2256,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2275,6 +2276,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
8 changes: 7 additions & 1 deletion charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2256,7 +2256,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2275,6 +2276,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
8 changes: 7 additions & 1 deletion config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2257,7 +2257,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2276,6 +2277,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
8 changes: 7 additions & 1 deletion config/crd/bases/fluentbit.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2257,7 +2257,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2276,6 +2277,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
3 changes: 2 additions & 1 deletion docs/plugins/fluentbit/output/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ Kafka output plugin allows to ingest your records into an Apache Kafka service.

| Field | Description | Scheme |
| ----- | ----------- | ------ |
| format | Specify data format, options available: json, msgpack. | string |
| format | Specify data format, options available: json, msgpack, raw. | string |
| rawLogKey | When using format: raw, the value of the record field specified by rawLogKey (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. | string |
| messageKey | Optional key to store the message | string |
| messageKeyField | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | string |
| timestampKey | Set the key to store the record timestamp | string |
Expand Down
16 changes: 14 additions & 2 deletions manifests/setup/setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6457,7 +6457,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -6476,6 +6477,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down Expand Up @@ -37123,7 +37129,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -37142,6 +37149,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down