diff --git a/decoder/csv.go b/decoder/csv.go index e941e6c29..2a3fe75c2 100644 --- a/decoder/csv.go +++ b/decoder/csv.go @@ -55,7 +55,7 @@ type CSVDecoder struct { buffersPool sync.Pool } -func NewCSVDecoder(params map[string]any) (Decoder, error) { +func NewCSVDecoder(params Params) (Decoder, error) { p, err := extractCSVParams(params) if err != nil { return nil, fmt.Errorf("can't extract params: %w", err) @@ -245,7 +245,7 @@ func (d *CSVDecoder) GenerateColumnName(i int) string { return d.params.prefix + strconv.Itoa(i) } -func extractCSVParams(params map[string]any) (CSVParams, error) { +func extractCSVParams(params Params) (CSVParams, error) { columnNames := make([]string, 0) if columnNamesRaw, ok := params[columnNamesParam]; ok { columnNamesRawSlice, ok := columnNamesRaw.([]any) diff --git a/decoder/csv_test.go b/decoder/csv_test.go index ee1afc365..e79d50817 100644 --- a/decoder/csv_test.go +++ b/decoder/csv_test.go @@ -12,7 +12,7 @@ func TestDecodeCSV(t *testing.T) { name string input string - params map[string]any + params Params want []string wantCreateErr bool @@ -31,7 +31,7 @@ func TestDecodeCSV(t *testing.T) { { name: "custom_delimiter", input: `a b "c"` + "\n", - params: map[string]any{ + params: Params{ delimiterParam: "\t", }, want: CSVRow{"a", "b", "c"}, @@ -39,7 +39,7 @@ func TestDecodeCSV(t *testing.T) { { name: "invalid_columns", input: "", - params: map[string]any{ + params: Params{ columnNamesParam: "name", }, wantCreateErr: true, @@ -47,7 +47,7 @@ func TestDecodeCSV(t *testing.T) { { name: "invalid_delimiter_1", input: "", - params: map[string]any{ + params: Params{ delimiterParam: ",,", }, wantCreateErr: true, @@ -55,7 +55,7 @@ func TestDecodeCSV(t *testing.T) { { name: "invalid_delimiter_2", input: "", - params: map[string]any{ + params: Params{ delimiterParam: "\n", }, wantCreateErr: true, @@ -105,7 +105,7 @@ func TestDecodeToJsonCSV(t *testing.T) { name string input string - params map[string]any + params Params want string wantDecodeErr 
bool @@ -118,7 +118,7 @@ func TestDecodeToJsonCSV(t *testing.T) { { name: "custom_columns", input: `"a","""b""","c"` + "\n", - params: map[string]any{ + params: Params{ columnNamesParam: []any{"service", "version", "info"}, }, want: `{"service":"a","version":"\"b\"","info":"c"}`, @@ -126,7 +126,7 @@ func TestDecodeToJsonCSV(t *testing.T) { { name: "custom_prefix", input: `"a";"""b""";"c"` + "\n", - params: map[string]any{ + params: Params{ prefixParam: "csv_", delimiterParam: ";", }, @@ -135,7 +135,7 @@ func TestDecodeToJsonCSV(t *testing.T) { { name: "wrong_number_of_fields_1", input: "a,b,c,d" + "\n", - params: map[string]any{ + params: Params{ columnNamesParam: []any{"column_a", "column_b", "column_c"}, }, wantDecodeErr: true, @@ -143,7 +143,7 @@ func TestDecodeToJsonCSV(t *testing.T) { { name: "wrong_number_of_fields_2", input: "a,b,c,d" + "\n", - params: map[string]any{ + params: Params{ columnNamesParam: []any{"column_a", "column_b", "column_c"}, invalidLineModeParam: "continue", prefixParam: "csv_", @@ -153,7 +153,7 @@ func TestDecodeToJsonCSV(t *testing.T) { { name: "wrong_number_of_fields_3", input: "a,b" + "\n", - params: map[string]any{ + params: Params{ columnNamesParam: []any{"column_a", "column_b", "column_c"}, invalidLineModeParam: "continue", }, diff --git a/decoder/decoder.go b/decoder/decoder.go index ca69b3795..c1e5f6538 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -1,6 +1,13 @@ package decoder -import insaneJSON "github.com/ozontech/insane-json" +import ( + "fmt" + + insaneJSON "github.com/ozontech/insane-json" +) + +// Params holds decoder options keyed by parameter name. +type Params map[string]any type Type int @@ -23,3 +30,56 @@ type Decoder interface { DecodeToJson(root *insaneJSON.Root, data []byte) error Decode(data []byte, args ...any) (any, error) } + +// TypeFromString converts a decoder name (e.g. "json", "csv", "auto") to its Type. +// It returns NO when the name is not recognized. +func TypeFromString(s string) Type { + switch s { + case "json": + return JSON + case "raw": + return RAW + case "cri": + return CRI + case "postgres": + return POSTGRES + case "nginx_error": + return NGINX_ERROR + case 
"protobuf": + return PROTOBUF + case "syslog_rfc3164": + return SYSLOG_RFC3164 + case "syslog_rfc5424": + return SYSLOG_RFC5424 + case "csv": + return CSV + case "auto": + return AUTO + default: + return NO + } +} + +// New creates the Decoder for type t configured with params. +// For RAW, CRI, POSTGRES and AUTO no Decoder is constructed and New returns (nil, nil). +// An error is returned for any other unknown type or when a constructor rejects params. +func New(t Type, params Params) (Decoder, error) { + switch t { + case JSON: + return NewJsonDecoder(params) + case NGINX_ERROR: + return NewNginxErrorDecoder(params) + case PROTOBUF: + return NewProtobufDecoder(params) + case SYSLOG_RFC3164: + return NewSyslogRFC3164Decoder(params) + case SYSLOG_RFC5424: + return NewSyslogRFC5424Decoder(params) + case CSV: + return NewCSVDecoder(params) + case RAW, CRI, POSTGRES, AUTO: + return nil, nil + default: + return nil, fmt.Errorf("unknown decoder type: %v", t) + } +} diff --git a/decoder/json.go b/decoder/json.go index 492cd2169..05771f489 100644 --- a/decoder/json.go +++ b/decoder/json.go @@ -30,7 +30,7 @@ type jsonDecoder struct { mu *sync.Mutex } -func NewJsonDecoder(params map[string]any) (Decoder, error) { +func NewJsonDecoder(params Params) (Decoder, error) { p, err := extractJsonParams(params) if err != nil { return nil, fmt.Errorf("can't extract params: %w", err) @@ -132,7 +132,7 @@ func (d *jsonDecoder) cutFieldsBySize(data []byte) []byte { return data } -func extractJsonParams(params map[string]any) (jsonParams, error) { +func extractJsonParams(params Params) (jsonParams, error) { maxFieldsSize := make(map[string]int) if maxFieldsSizeRaw, ok := params[jsonMaxFieldsSizeParam]; ok { maxFieldsSizeMap, ok := maxFieldsSizeRaw.(map[string]any) diff --git a/decoder/json_test.go b/decoder/json_test.go index a83b7b50e..999f4e7bf 100644 --- a/decoder/json_test.go +++ b/decoder/json_test.go @@ -18,7 +18,7 @@ func TestJson(t *testing.T) { name string input string - params map[string]any + params Params want map[string]string wantCreateErr bool @@ -39,7 +39,7 @@ func TestJson(t *testing.T) { { name: "valid_max_fields_size", input: inputJson, - params: map[string]any{ + params: Params{ jsonMaxFieldsSizeParam: 
map[string]any{ "": json.Number("1"), "not_exists": json.Number("100"), @@ -61,7 +61,7 @@ func TestJson(t *testing.T) { { name: "valid_max_fields_size_single", input: inputJson, - params: map[string]any{ + params: Params{ jsonMaxFieldsSizeParam: map[string]any{ "f2.f2_2.f2_2_2": json.Number("4"), }, @@ -77,14 +77,14 @@ func TestJson(t *testing.T) { }, { name: "invalid_create_1", - params: map[string]any{ + params: Params{ jsonMaxFieldsSizeParam: "not_map", }, wantCreateErr: true, }, { name: "invalid_create_2", - params: map[string]any{ + params: Params{ jsonMaxFieldsSizeParam: map[string]any{ "test": json.Number("not_num"), }, @@ -93,7 +93,7 @@ func TestJson(t *testing.T) { }, { name: "invalid_create_3", - params: map[string]any{ + params: Params{ jsonMaxFieldsSizeParam: map[string]any{ "test": json.Number("1.2"), }, diff --git a/decoder/nginx.go b/decoder/nginx.go index f0c0399e6..a2ce3b3a9 100644 --- a/decoder/nginx.go +++ b/decoder/nginx.go @@ -31,7 +31,7 @@ type nginxErrorDecoder struct { params nginxErrorParams } -func NewNginxErrorDecoder(params map[string]any) (Decoder, error) { +func NewNginxErrorDecoder(params Params) (Decoder, error) { p, err := extractNginxErrorParams(params) if err != nil { return nil, fmt.Errorf("can't extract params: %w", err) @@ -198,7 +198,7 @@ func (d *nginxErrorDecoder) extractCustomFields(data []byte) ([]byte, map[string return data, fields } -func extractNginxErrorParams(params map[string]any) (nginxErrorParams, error) { +func extractNginxErrorParams(params Params) (nginxErrorParams, error) { withCustomFields := false if withCustomFieldsRaw, ok := params[nginxWithCustomFieldsParam]; ok { withCustomFields, ok = withCustomFieldsRaw.(bool) diff --git a/decoder/nginx_test.go b/decoder/nginx_test.go index eb9d2b751..82d8301be 100644 --- a/decoder/nginx_test.go +++ b/decoder/nginx_test.go @@ -11,7 +11,7 @@ func TestNginxError(t *testing.T) { name string input string - params map[string]any + params Params want NginxErrorRow 
wantCreateErr bool @@ -64,7 +64,7 @@ func TestNginxError(t *testing.T) { { name: "valid_custom_fields", input: `2022/08/18 09:29:37 [error] 844935#844935: *44934601 upstream timed out (110: Operation timed out), while connecting to upstream, client: 10.125.172.251, server: , request: "POST /download HTTP/1.1", upstream: "http://10.117.246.15:84/download", host: "mpm-youtube-downloader-38.name.tldn:84", test:`, - params: map[string]any{ + params: Params{ nginxWithCustomFieldsParam: true, }, want: NginxErrorRow{ @@ -86,7 +86,7 @@ func TestNginxError(t *testing.T) { }, { name: "invalid_create", - params: map[string]any{ + params: Params{ nginxWithCustomFieldsParam: "not bool", }, wantCreateErr: true, diff --git a/decoder/protobuf.go b/decoder/protobuf.go index ea857937a..5c49b8ef6 100644 --- a/decoder/protobuf.go +++ b/decoder/protobuf.go @@ -32,7 +32,7 @@ type protobufDecoder struct { msgDesc protoreflect.MessageDescriptor } -func NewProtobufDecoder(params map[string]any) (Decoder, error) { +func NewProtobufDecoder(params Params) (Decoder, error) { p, err := extractProtobufParams(params) if err != nil { return nil, fmt.Errorf("can't extract params: %w", err) @@ -104,7 +104,7 @@ func (d *protobufDecoder) Decode(data []byte, _ ...any) (any, error) { return msgJson, nil } -func extractProtobufParams(params map[string]any) (protobufParams, error) { +func extractProtobufParams(params Params) (protobufParams, error) { fileRaw, ok := params[protoFileParam] if !ok { return protobufParams{}, fmt.Errorf("%q not set", protoFileParam) diff --git a/decoder/protobuf_test.go b/decoder/protobuf_test.go index 2a3e024a3..45c3e2259 100644 --- a/decoder/protobuf_test.go +++ b/decoder/protobuf_test.go @@ -69,7 +69,7 @@ func TestProtobuf(t *testing.T) { name string data []byte - params map[string]any + params Params want testMyMessage wantCreateErr bool @@ -78,7 +78,7 @@ func TestProtobuf(t *testing.T) { { name: "proto_file_path", data: inputData, - params: map[string]any{ + params: 
Params{ protoFileParam: "../testdata/proto/valid.proto", protoMessageParam: protoMessage, }, @@ -87,7 +87,7 @@ func TestProtobuf(t *testing.T) { { name: "proto_file_content", data: inputData, - params: map[string]any{ + params: Params{ protoFileParam: protoContent, protoMessageParam: protoMessage, }, @@ -96,7 +96,7 @@ func TestProtobuf(t *testing.T) { { name: "proto_file_with_imports", data: inputData, - params: map[string]any{ + params: Params{ protoFileParam: "with_imports.proto", protoMessageParam: protoMessage, protoImportPathsParam: []any{ @@ -107,14 +107,14 @@ func TestProtobuf(t *testing.T) { }, { name: "proto_file_param_not_exists", - params: map[string]any{ + params: Params{ protoMessageParam: "test", }, wantCreateErr: true, }, { name: "proto_file_param_invalid", - params: map[string]any{ + params: Params{ protoFileParam: 123, protoMessageParam: "test", }, @@ -122,14 +122,14 @@ func TestProtobuf(t *testing.T) { }, { name: "proto_message_param_not_exists", - params: map[string]any{ + params: Params{ protoFileParam: "test", }, wantCreateErr: true, }, { name: "proto_message_param_invalid", - params: map[string]any{ + params: Params{ protoFileParam: "test", protoMessageParam: 123, }, @@ -137,7 +137,7 @@ func TestProtobuf(t *testing.T) { }, { name: "compile_error", - params: map[string]any{ + params: Params{ protoFileParam: "../testdata/proto/invalid.proto", protoMessageParam: protoMessage, }, @@ -145,7 +145,7 @@ func TestProtobuf(t *testing.T) { }, { name: "message_not_found", - params: map[string]any{ + params: Params{ protoFileParam: "../testdata/proto/valid.proto", protoMessageParam: "test", }, @@ -154,7 +154,7 @@ func TestProtobuf(t *testing.T) { { name: "invalid_data", data: []byte{10, 13}, - params: map[string]any{ + params: Params{ protoFileParam: "../testdata/proto/valid.proto", protoMessageParam: protoMessage, }, diff --git a/decoder/syslog.go b/decoder/syslog.go index e88a5b3d2..594620e11 100644 --- a/decoder/syslog.go +++ b/decoder/syslog.go @@ 
-34,7 +34,7 @@ type syslogParams struct { severityFormat string // optional } -func extractSyslogParams(params map[string]any) (syslogParams, error) { +func extractSyslogParams(params Params) (syslogParams, error) { facilityFormat := spfNumber if facilityFormatRaw, ok := params[syslogFacilityFormatParam]; ok { facilityFormat, ok = facilityFormatRaw.(string) diff --git a/decoder/syslog_rfc3164.go b/decoder/syslog_rfc3164.go index 6389cd566..376f27322 100644 --- a/decoder/syslog_rfc3164.go +++ b/decoder/syslog_rfc3164.go @@ -23,7 +23,7 @@ type syslogRFC3164Decoder struct { params syslogParams } -func NewSyslogRFC3164Decoder(params map[string]any) (Decoder, error) { +func NewSyslogRFC3164Decoder(params Params) (Decoder, error) { p, err := extractSyslogParams(params) if err != nil { return nil, fmt.Errorf("can't extract params: %w", err) diff --git a/decoder/syslog_rfc3164_test.go b/decoder/syslog_rfc3164_test.go index ba9d29b4f..cd7d8fa8b 100644 --- a/decoder/syslog_rfc3164_test.go +++ b/decoder/syslog_rfc3164_test.go @@ -11,7 +11,7 @@ func TestSyslogRFC3164(t *testing.T) { name string input string - params map[string]any + params Params want SyslogRFC3164Row wantCreateErr bool @@ -47,7 +47,7 @@ func TestSyslogRFC3164(t *testing.T) { { name: "valid_priority_format", input: "<34>Oct 11 22:14:15 mymachine.example.com myproc[10]: 'myproc' failed on /dev/pts/8\n", - params: map[string]any{ + params: Params{ syslogFacilityFormatParam: spfString, syslogSeverityFormatParam: spfString, }, @@ -64,7 +64,7 @@ func TestSyslogRFC3164(t *testing.T) { }, { name: "invalid_create_1", - params: map[string]any{ + params: Params{ syslogFacilityFormatParam: spfString, syslogSeverityFormatParam: 123, }, @@ -72,7 +72,7 @@ func TestSyslogRFC3164(t *testing.T) { }, { name: "invalid_create_2", - params: map[string]any{ + params: Params{ syslogFacilityFormatParam: "test", }, wantCreateErr: true, diff --git a/decoder/syslog_rfc5424.go b/decoder/syslog_rfc5424.go index 2c47649a8..39cd1e1b5 100644 
--- a/decoder/syslog_rfc5424.go +++ b/decoder/syslog_rfc5424.go @@ -19,7 +19,7 @@ type syslogRFC5424Decoder struct { params syslogParams } -func NewSyslogRFC5424Decoder(params map[string]any) (Decoder, error) { +func NewSyslogRFC5424Decoder(params Params) (Decoder, error) { p, err := extractSyslogParams(params) if err != nil { return nil, fmt.Errorf("can't extract params: %w", err) diff --git a/decoder/syslog_rfc5424_test.go b/decoder/syslog_rfc5424_test.go index 96c68d0fe..4c4668d96 100644 --- a/decoder/syslog_rfc5424_test.go +++ b/decoder/syslog_rfc5424_test.go @@ -12,7 +12,7 @@ func TestSyslogRFC5424(t *testing.T) { name string input string - params map[string]any + params Params want SyslogRFC5424Row wantCreateErr bool @@ -71,7 +71,7 @@ func TestSyslogRFC5424(t *testing.T) { { name: "valid_full_priority_format", input: "<165>1 2003-10-11T22:14:15.003Z mymachine.example.com myproc 10 ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] An application event log", - params: map[string]any{ + params: Params{ syslogFacilityFormatParam: spfString, syslogSeverityFormatParam: spfString, }, @@ -302,7 +302,7 @@ func TestSyslogRFC5424(t *testing.T) { }, { name: "invalid_create_1", - params: map[string]any{ + params: Params{ syslogFacilityFormatParam: spfString, syslogSeverityFormatParam: 123, }, @@ -310,7 +310,7 @@ func TestSyslogRFC5424(t *testing.T) { }, { name: "invalid_create_2", - params: map[string]any{ + params: Params{ syslogFacilityFormatParam: "test", }, wantCreateErr: true, diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 2a8c63a71..9f6844913 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -88,10 +88,8 @@ type Pipeline struct { started bool settings *Settings - decoderType decoder.Type // decoder type set in the config - suggestedDecoderType decoder.Type // decoder type suggested by input plugin, it is used when config decoder is set to "auto" - decoder decoder.Decoder - initDecoderOnce *sync.Once + 
decoderType decoder.Type // decoder type set in the config + decoder decoder.Decoder eventPool pool streamer *streamer @@ -145,7 +143,7 @@ type Pipeline struct { type Settings struct { Decoder string - DecoderParams map[string]any + DecoderParams decoder.Params Capacity int MetaCacheSize int MaintenanceInterval time.Duration @@ -214,44 +212,18 @@ func New(name string, settings *Settings, registry *prometheus.Registry, lg *zap eventLog: make([]string, 0, 128), eventLogMu: &sync.Mutex{}, - - initDecoderOnce: &sync.Once{}, } pipeline.registerMetrics() pipeline.setDefaultMetrics() - var err error - switch settings.Decoder { - case "json": - pipeline.decoderType = decoder.JSON - pipeline.decoder, err = decoder.NewJsonDecoder(pipeline.settings.DecoderParams) - case "raw": - pipeline.decoderType = decoder.RAW - case "cri": - pipeline.decoderType = decoder.CRI - case "postgres": - pipeline.decoderType = decoder.POSTGRES - case "nginx_error": - pipeline.decoderType = decoder.NGINX_ERROR - pipeline.decoder, err = decoder.NewNginxErrorDecoder(pipeline.settings.DecoderParams) - case "protobuf": - pipeline.decoderType = decoder.PROTOBUF - pipeline.decoder, err = decoder.NewProtobufDecoder(pipeline.settings.DecoderParams) - case "syslog_rfc3164": - pipeline.decoderType = decoder.SYSLOG_RFC3164 - pipeline.decoder, err = decoder.NewSyslogRFC3164Decoder(pipeline.settings.DecoderParams) - case "syslog_rfc5424": - pipeline.decoderType = decoder.SYSLOG_RFC5424 - pipeline.decoder, err = decoder.NewSyslogRFC5424Decoder(pipeline.settings.DecoderParams) - case "csv": - pipeline.decoderType = decoder.CSV - pipeline.decoder, err = decoder.NewCSVDecoder(pipeline.settings.DecoderParams) - case "auto": - pipeline.decoderType = decoder.AUTO - default: + pipeline.decoderType = decoder.TypeFromString(settings.Decoder) + if pipeline.decoderType == decoder.NO { pipeline.logger.Fatal("unknown decoder", zap.String("decoder", settings.Decoder)) } + + var err error + pipeline.decoder, err = 
decoder.New(pipeline.decoderType, pipeline.settings.DecoderParams) if err != nil { pipeline.logger.Fatal("can't create decoder", zap.String("decoder", settings.Decoder), zap.Error(err)) } @@ -358,6 +330,14 @@ func (p *Pipeline) Start() { p.input.Start(p.inputInfo.Config, inputParams) + // If decoder is still set to AUTO after input plugin start, it means + // no plugin called SuggestDecoder to override it. + // In this case, the JSON decoder is used by default. + if p.decoderType == decoder.AUTO { + // Go through SuggestDecoder so the fallback honors DecoderParams and fails loudly on invalid params. + p.SuggestDecoder(decoder.JSON) + } + p.streamer.start() go p.maintenance() @@ -427,23 +407,8 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, byt row decoder.CRIRow err error ) - if p.decoderType == decoder.AUTO { - dec = p.suggestedDecoderType - if dec == decoder.NO { - dec = decoder.JSON - } - - // When config decoder is set to "auto", then we didn't create a decoder during pipeline initialization. - // It's necessary to initialize the decoder once. - if dec == decoder.JSON { - p.initDecoderOnce.Do(func() { - p.decoder, _ = decoder.NewJsonDecoder(p.settings.DecoderParams) - }) - } - } else { - dec = p.decoderType - } + dec = p.decoderType if dec == decoder.CRI { row, err = decoder.DecodeCRI(bytes) if err != nil { @@ -913,8 +878,20 @@ func (p *Pipeline) DisableStreams() { p.disableStreams = true } +// SuggestDecoder applies a decoder hint from input plugins (k8s only for now) +// when pipeline decoder is set to "auto". +// The first non-No suggestion wins and permanently replaces pipeline decoder. 
func (p *Pipeline) SuggestDecoder(t decoder.Type) { - p.suggestedDecoderType = t + if p.decoderType != decoder.AUTO || t == decoder.NO { + return + } + + p.decoderType = t + var err error + p.decoder, err = decoder.New(t, p.settings.DecoderParams) + if err != nil { + p.logger.Fatal("can't create decoder", zap.Int("decoder_type", int(t)), zap.Error(err)) + } } func (p *Pipeline) DisableParallelism() { diff --git a/pipeline/pipeline_whitebox_test.go b/pipeline/pipeline_whitebox_test.go index 41c9f3cbd..c10e9a016 100644 --- a/pipeline/pipeline_whitebox_test.go +++ b/pipeline/pipeline_whitebox_test.go @@ -3,6 +3,7 @@ package pipeline import ( "testing" + "github.com/ozontech/file.d/decoder" "github.com/ozontech/file.d/pipeline/metadata" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -224,3 +225,42 @@ func TestCheckInputBytesMetric(t *testing.T) { }) } } + +func TestSuggestDecoder(t *testing.T) { + tCases := []struct { + name string + settings *Settings + suggestTypes []decoder.Type + expectedType decoder.Type + }{ + { + name: "first non-no suggestion wins when decoder is auto", + settings: &Settings{ + Decoder: "auto", + MetricHoldDuration: DefaultMetricHoldDuration, + }, + // NO must be skipped, CRI must stick, and the later JSON hint must be ignored. + suggestTypes: []decoder.Type{decoder.NO, decoder.CRI, decoder.JSON}, + expectedType: decoder.CRI, + }, + { + name: "suggestion ignored when decoder is not auto", + settings: &Settings{ + Decoder: "json", + MetricHoldDuration: DefaultMetricHoldDuration, + }, + suggestTypes: []decoder.Type{decoder.CRI}, + expectedType: decoder.JSON, + }, + } + + for _, tCase := range tCases { + t.Run(tCase.name, func(t *testing.T) { + p := New("file_d", tCase.settings, prometheus.NewPedanticRegistry(), zap.NewNop()) + for _, suggestType := range tCase.suggestTypes { + p.SuggestDecoder(suggestType) + } + require.Equal(t, tCase.expectedType, p.decoderType) + }) + } +}