From e2cf3faca4fb1a4590c8d3ad23043e5f8c5ca149 Mon Sep 17 00:00:00 2001 From: Anson Date: Mon, 18 May 2026 06:06:49 +0000 Subject: [PATCH] Support more settings for firehose output Signed-off-by: Anson --- .../v1alpha2/plugins/output/firehose_types.go | 27 +++++++++ .../plugins/output/firehose_types_test.go | 55 +++++++++++++++++++ .../plugins/output/zz_generated.deepcopy.go | 25 +++++++++ .../fluentbit.fluent.io_clusteroutputs.yaml | 27 +++++++++ .../fluentbit.fluent.io_outputs.yaml | 27 +++++++++ .../fluentbit.fluent.io_clusteroutputs.yaml | 27 +++++++++ .../crds/fluentbit.fluent.io_outputs.yaml | 27 +++++++++ .../fluentbit.fluent.io_clusteroutputs.yaml | 27 +++++++++ .../bases/fluentbit.fluent.io_outputs.yaml | 27 +++++++++ docs/plugins/fluentbit/output/firehose.md | 5 ++ manifests/setup/setup.yaml | 54 ++++++++++++++++++ 11 files changed, 328 insertions(+) create mode 100644 apis/fluentbit/v1alpha2/plugins/output/firehose_types_test.go diff --git a/apis/fluentbit/v1alpha2/plugins/output/firehose_types.go b/apis/fluentbit/v1alpha2/plugins/output/firehose_types.go index 0f42a1f22..9fca36c91 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/firehose_types.go +++ b/apis/fluentbit/v1alpha2/plugins/output/firehose_types.go @@ -36,6 +36,17 @@ type Firehose struct { STSEndpoint *string `json:"stsEndpoint,omitempty"` // Immediately retry failed requests to AWS services once. This option does not affect the normal Fluent Bit retry mechanism with backoff. Instead, it enables an immediate retry with no delay for networking errors, which may help improve throughput when there are transient/random networking issues. AutoRetryRequests *bool `json:"autoRetryRequests,omitempty"` + // Compression type to use when compressing the data. Valid values are: gzip, snappy, lz4, zstd. If you do not specify a compression type, the data will be sent uncompressed. + // +kubebuilder:validation:Enum=gzip;snappy;lz4;zstd + Compression *string `json:"compression,omitempty"` + // Specify an external ID for the STS API, can be used with the role_arn parameter if your role requires an external ID. + ExternalID *string `json:"externalID,omitempty"` + // Option to specify an AWS Profile for credentials. + Profile *string `json:"profile,omitempty"` + // Option to enable simple aggregation for the Firehose output plugin. + SimpleAggregation *bool `json:"simpleAggregation,omitempty"` + // Specify number of worker threads to use to output to Firehose + Workers *int32 `json:"workers,omitempty"` } // implement Section() method @@ -76,5 +87,21 @@ func (l *Firehose) Params(sl plugins.SecretLoader) (*params.KVs, error) { kvs.Insert("auto_retry_requests", strconv.FormatBool(*l.AutoRetryRequests)) } + if l.Compression != nil && *l.Compression != "" { + kvs.Insert("compression", *l.Compression) + } + if l.ExternalID != nil && *l.ExternalID != "" { + kvs.Insert("external_id", *l.ExternalID) + } + if l.SimpleAggregation != nil { + kvs.Insert("simple_aggregation", strconv.FormatBool(*l.SimpleAggregation)) + } + if l.Profile != nil && *l.Profile != "" { + kvs.Insert("profile", *l.Profile) + } + if l.Workers != nil { + kvs.Insert("workers", strconv.FormatInt(int64(*l.Workers), 10)) + } + return kvs, nil } diff --git a/apis/fluentbit/v1alpha2/plugins/output/firehose_types_test.go b/apis/fluentbit/v1alpha2/plugins/output/firehose_types_test.go new file mode 100644 index 000000000..9229d6c22 --- /dev/null +++ b/apis/fluentbit/v1alpha2/plugins/output/firehose_types_test.go @@ -0,0 +1,55 @@ +package output + +import ( + "testing" + + "github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins" + "github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params" + "github.com/fluent/fluent-operator/v3/pkg/utils" + "github.com/onsi/gomega" +) + +func TestOutput_Firehose_Params(t *testing.T) { + g := gomega.NewWithT(t) + + sl := plugins.NewSecretLoader(nil, "test namespace") + + fh := Firehose{ + Region: "us-east-1", + DeliveryStream: "test_stream", + TimeKey: utils.ToPtr("test_time_key"), + TimeKeyFormat: utils.ToPtr("%Y-%m-%dT%H:%M:%S.%3N"), + DataKeys: utils.ToPtr("test_data_keys"), + LogKey: utils.ToPtr("test_time_key"), + RoleARN: utils.ToPtr("arn:aws:iam:test"), + Endpoint: utils.ToPtr("test_endpoint"), + STSEndpoint: utils.ToPtr("test_sts_endpoint"), + AutoRetryRequests: utils.ToPtr(true), + ExternalID: utils.ToPtr("test_external_id"), + Compression: utils.ToPtr("gzip"), + SimpleAggregation: utils.ToPtr(true), + Profile: utils.ToPtr("my-profile"), + Workers: utils.ToPtr[int32](1), + } + + expected := params.NewKVs() + expected.Insert("region", "us-east-1") + expected.Insert("delivery_stream", "test_stream") + expected.Insert("data_keys", "test_data_keys") + expected.Insert("log_key", "test_time_key") + expected.Insert("role_arn", "arn:aws:iam:test") + expected.Insert("endpoint", "test_endpoint") + expected.Insert("sts_endpoint", "test_sts_endpoint") + expected.Insert("time_key", "test_time_key") + expected.Insert("time_key_format", "%Y-%m-%dT%H:%M:%S.%3N") + expected.Insert("auto_retry_requests", "true") + expected.Insert("compression", "gzip") + expected.Insert("external_id", "test_external_id") + expected.Insert("simple_aggregation", "true") + expected.Insert("profile", "my-profile") + expected.Insert("workers", "1") + + kvs, err := fh.Params(sl) + g.Expect(err).NotTo(gomega.HaveOccurred()) + g.Expect(kvs).To(gomega.Equal(expected)) +} diff --git a/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go index 1ee7a0927..8cefff824 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go +++ b/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go @@ -297,6 +297,31 @@ func (in *Firehose) DeepCopyInto(out *Firehose) { *out = new(bool) **out = **in } + if in.Compression != nil { + in, out := &in.Compression, &out.Compression + *out = new(string) + **out = **in + } + if in.ExternalID != nil { + in, out := &in.ExternalID, &out.ExternalID + *out = new(string) + **out = **in + } + if in.Profile != nil { + in, out := &in.Profile, &out.Profile + *out = new(string) + **out = **in + } + if in.SimpleAggregation != nil { + in, out := &in.SimpleAggregation, &out.SimpleAggregation + *out = new(bool) + **out = **in + } + if in.Workers != nil { + in, out := &in.Workers, &out.Workers + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Firehose. diff --git a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml index 38d97e179..6d33b8fd1 100644 --- a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml @@ -993,6 +993,16 @@ spec: with no delay for networking errors, which may help improve throughput when there are transient/random networking issues. type: boolean + compression: + description: 'Compression type to use when compressing the data. + Valid values are: gzip, snappy, lz4, zstd. If you do not specify + a compression type, the data will be sent uncompressed.' + enum: + - gzip + - snappy + - lz4 + - zstd + type: string dataKeys: description: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then @@ -1009,6 +1019,11 @@ spec: description: Specify a custom endpoint for the Kinesis Firehose API. type: string + externalID: + description: Specify an external ID for the STS API, can be used + with the role_arn parameter if your role requires an external + ID. + type: string logKey: description: By default, the whole log record will be sent to Firehose. If you specify a key name with this option, then only @@ -1016,12 +1031,19 @@ spec: if you are using the Fluentd Docker log driver, you can specify log_key log and only the log message will be sent to Firehose. type: string + profile: + description: Option to specify an AWS Profile for credentials. + type: string region: description: The AWS region. type: string roleARN: description: ARN of an IAM role to assume (for cross account access). type: string + simpleAggregation: + description: Option to enable simple aggregation for the Firehose + output plugin. + type: boolean stsEndpoint: description: Specify a custom endpoint for the STS API; used to assume your custom role provided with role_arn. @@ -1039,6 +1061,11 @@ spec: ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision. type: string + workers: + description: Specify number of worker threads to use to output + to Firehose + format: int32 + type: integer required: - deliveryStream - region diff --git a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml index db90f6cd7..332497d83 100644 --- a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml @@ -993,6 +993,16 @@ spec: with no delay for networking errors, which may help improve throughput when there are transient/random networking issues. type: boolean + compression: + description: 'Compression type to use when compressing the data. + Valid values are: gzip, snappy, lz4, zstd. If you do not specify + a compression type, the data will be sent uncompressed.' + enum: + - gzip + - snappy + - lz4 + - zstd + type: string dataKeys: description: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then @@ -1009,6 +1019,11 @@ spec: description: Specify a custom endpoint for the Kinesis Firehose API. type: string + externalID: + description: Specify an external ID for the STS API, can be used + with the role_arn parameter if your role requires an external + ID. + type: string logKey: description: By default, the whole log record will be sent to Firehose. If you specify a key name with this option, then only @@ -1016,12 +1031,19 @@ spec: if you are using the Fluentd Docker log driver, you can specify log_key log and only the log message will be sent to Firehose. type: string + profile: + description: Option to specify an AWS Profile for credentials. + type: string region: description: The AWS region. type: string roleARN: description: ARN of an IAM role to assume (for cross account access). type: string + simpleAggregation: + description: Option to enable simple aggregation for the Firehose + output plugin. + type: boolean stsEndpoint: description: Specify a custom endpoint for the STS API; used to assume your custom role provided with role_arn. @@ -1039,6 +1061,11 @@ spec: ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision. type: string + workers: + description: Specify number of worker threads to use to output + to Firehose + format: int32 + type: integer required: - deliveryStream - region diff --git a/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml index bc82bb19b..9a64315b3 100644 --- a/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml @@ -991,6 +991,16 @@ spec: with no delay for networking errors, which may help improve throughput when there are transient/random networking issues. type: boolean + compression: + description: 'Compression type to use when compressing the data. + Valid values are: gzip, snappy, lz4, zstd. If you do not specify + a compression type, the data will be sent uncompressed.' + enum: + - gzip + - snappy + - lz4 + - zstd + type: string dataKeys: description: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then @@ -1007,6 +1017,11 @@ spec: description: Specify a custom endpoint for the Kinesis Firehose API. type: string + externalID: + description: Specify an external ID for the STS API, can be used + with the role_arn parameter if your role requires an external + ID. + type: string logKey: description: By default, the whole log record will be sent to Firehose. If you specify a key name with this option, then only @@ -1014,12 +1029,19 @@ spec: if you are using the Fluentd Docker log driver, you can specify log_key log and only the log message will be sent to Firehose. type: string + profile: + description: Option to specify an AWS Profile for credentials. + type: string region: description: The AWS region. type: string roleARN: description: ARN of an IAM role to assume (for cross account access). type: string + simpleAggregation: + description: Option to enable simple aggregation for the Firehose + output plugin. + type: boolean stsEndpoint: description: Specify a custom endpoint for the STS API; used to assume your custom role provided with role_arn. @@ -1037,6 +1059,11 @@ spec: ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision. type: string + workers: + description: Specify number of worker threads to use to output + to Firehose + format: int32 + type: integer required: - deliveryStream - region diff --git a/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml index 01d1256f6..f688e8ee9 100644 --- a/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml @@ -991,6 +991,16 @@ spec: with no delay for networking errors, which may help improve throughput when there are transient/random networking issues. type: boolean + compression: + description: 'Compression type to use when compressing the data. + Valid values are: gzip, snappy, lz4, zstd. If you do not specify + a compression type, the data will be sent uncompressed.' + enum: + - gzip + - snappy + - lz4 + - zstd + type: string dataKeys: description: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then @@ -1007,6 +1017,11 @@ spec: description: Specify a custom endpoint for the Kinesis Firehose API. type: string + externalID: + description: Specify an external ID for the STS API, can be used + with the role_arn parameter if your role requires an external + ID. + type: string logKey: description: By default, the whole log record will be sent to Firehose. If you specify a key name with this option, then only @@ -1014,12 +1029,19 @@ spec: if you are using the Fluentd Docker log driver, you can specify log_key log and only the log message will be sent to Firehose. type: string + profile: + description: Option to specify an AWS Profile for credentials. + type: string region: description: The AWS region. type: string roleARN: description: ARN of an IAM role to assume (for cross account access). type: string + simpleAggregation: + description: Option to enable simple aggregation for the Firehose + output plugin. + type: boolean stsEndpoint: description: Specify a custom endpoint for the STS API; used to assume your custom role provided with role_arn. @@ -1037,6 +1059,11 @@ spec: ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision. type: string + workers: + description: Specify number of worker threads to use to output + to Firehose + format: int32 + type: integer required: - deliveryStream - region diff --git a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml index 789619883..7a0e4a229 100644 --- a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml @@ -992,6 +992,16 @@ spec: with no delay for networking errors, which may help improve throughput when there are transient/random networking issues. type: boolean + compression: + description: 'Compression type to use when compressing the data. + Valid values are: gzip, snappy, lz4, zstd. If you do not specify + a compression type, the data will be sent uncompressed.' + enum: + - gzip + - snappy + - lz4 + - zstd + type: string dataKeys: description: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then @@ -1008,6 +1018,11 @@ spec: description: Specify a custom endpoint for the Kinesis Firehose API. type: string + externalID: + description: Specify an external ID for the STS API, can be used + with the role_arn parameter if your role requires an external + ID. + type: string logKey: description: By default, the whole log record will be sent to Firehose. If you specify a key name with this option, then only @@ -1015,12 +1030,19 @@ spec: if you are using the Fluentd Docker log driver, you can specify log_key log and only the log message will be sent to Firehose. type: string + profile: + description: Option to specify an AWS Profile for credentials. + type: string region: description: The AWS region. type: string roleARN: description: ARN of an IAM role to assume (for cross account access). type: string + simpleAggregation: + description: Option to enable simple aggregation for the Firehose + output plugin. + type: boolean stsEndpoint: description: Specify a custom endpoint for the STS API; used to assume your custom role provided with role_arn. @@ -1038,6 +1060,11 @@ spec: ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision. type: string + workers: + description: Specify number of worker threads to use to output + to Firehose + format: int32 + type: integer required: - deliveryStream - region diff --git a/config/crd/bases/fluentbit.fluent.io_outputs.yaml b/config/crd/bases/fluentbit.fluent.io_outputs.yaml index b9e3f8d26..d680605bb 100644 --- a/config/crd/bases/fluentbit.fluent.io_outputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_outputs.yaml @@ -992,6 +992,16 @@ spec: with no delay for networking errors, which may help improve throughput when there are transient/random networking issues. type: boolean + compression: + description: 'Compression type to use when compressing the data. + Valid values are: gzip, snappy, lz4, zstd. If you do not specify + a compression type, the data will be sent uncompressed.' + enum: + - gzip + - snappy + - lz4 + - zstd + type: string dataKeys: description: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then @@ -1008,6 +1018,11 @@ spec: description: Specify a custom endpoint for the Kinesis Firehose API. type: string + externalID: + description: Specify an external ID for the STS API, can be used + with the role_arn parameter if your role requires an external + ID. + type: string logKey: description: By default, the whole log record will be sent to Firehose. If you specify a key name with this option, then only @@ -1015,12 +1030,19 @@ spec: if you are using the Fluentd Docker log driver, you can specify log_key log and only the log message will be sent to Firehose. type: string + profile: + description: Option to specify an AWS Profile for credentials. + type: string region: description: The AWS region. type: string roleARN: description: ARN of an IAM role to assume (for cross account access). type: string + simpleAggregation: + description: Option to enable simple aggregation for the Firehose + output plugin. + type: boolean stsEndpoint: description: Specify a custom endpoint for the STS API; used to assume your custom role provided with role_arn. @@ -1038,6 +1060,11 @@ spec: ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision. type: string + workers: + description: Specify number of worker threads to use to output + to Firehose + format: int32 + type: integer required: - deliveryStream - region diff --git a/docs/plugins/fluentbit/output/firehose.md b/docs/plugins/fluentbit/output/firehose.md index ec56ffe9d..f0ef824c3 100644 --- a/docs/plugins/fluentbit/output/firehose.md +++ b/docs/plugins/fluentbit/output/firehose.md @@ -15,3 +15,8 @@ The Firehose output plugin, allows to ingest your records into AWS Firehose.