Skip to content
4 changes: 2 additions & 2 deletions decoder/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type CSVDecoder struct {
buffersPool sync.Pool
}

func NewCSVDecoder(params map[string]any) (Decoder, error) {
func NewCSVDecoder(params Params) (Decoder, error) {
p, err := extractCSVParams(params)
if err != nil {
return nil, fmt.Errorf("can't extract params: %w", err)
Expand Down Expand Up @@ -245,7 +245,7 @@ func (d *CSVDecoder) GenerateColumnName(i int) string {
return d.params.prefix + strconv.Itoa(i)
}

func extractCSVParams(params map[string]any) (CSVParams, error) {
func extractCSVParams(params Params) (CSVParams, error) {
columnNames := make([]string, 0)
if columnNamesRaw, ok := params[columnNamesParam]; ok {
columnNamesRawSlice, ok := columnNamesRaw.([]any)
Expand Down
22 changes: 11 additions & 11 deletions decoder/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestDecodeCSV(t *testing.T) {
name string

input string
params map[string]any
params Params

want []string
wantCreateErr bool
Expand All @@ -31,31 +31,31 @@ func TestDecodeCSV(t *testing.T) {
{
name: "custom_delimiter",
input: `a b "c"` + "\n",
params: map[string]any{
params: Params{
delimiterParam: "\t",
},
want: CSVRow{"a", "b", "c"},
},
{
name: "invalid_columns",
input: "",
params: map[string]any{
params: Params{
columnNamesParam: "name",
},
wantCreateErr: true,
},
{
name: "invalid_delimiter_1",
input: "",
params: map[string]any{
params: Params{
delimiterParam: ",,",
},
wantCreateErr: true,
},
{
name: "invalid_delimiter_2",
input: "",
params: map[string]any{
params: Params{
delimiterParam: "\n",
},
wantCreateErr: true,
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestDecodeToJsonCSV(t *testing.T) {
name string

input string
params map[string]any
params Params

want string
wantDecodeErr bool
Expand All @@ -118,15 +118,15 @@ func TestDecodeToJsonCSV(t *testing.T) {
{
name: "custom_columns",
input: `"a","""b""","c"` + "\n",
params: map[string]any{
params: Params{
columnNamesParam: []any{"service", "version", "info"},
},
want: `{"service":"a","version":"\"b\"","info":"c"}`,
},
{
name: "custom_prefix",
input: `"a";"""b""";"c"` + "\n",
params: map[string]any{
params: Params{
prefixParam: "csv_",
delimiterParam: ";",
},
Expand All @@ -135,15 +135,15 @@ func TestDecodeToJsonCSV(t *testing.T) {
{
name: "wrong_number_of_fields_1",
input: "a,b,c,d" + "\n",
params: map[string]any{
params: Params{
columnNamesParam: []any{"column_a", "column_b", "column_c"},
},
wantDecodeErr: true,
},
{
name: "wrong_number_of_fields_2",
input: "a,b,c,d" + "\n",
params: map[string]any{
params: Params{
columnNamesParam: []any{"column_a", "column_b", "column_c"},
invalidLineModeParam: "continue",
prefixParam: "csv_",
Expand All @@ -153,7 +153,7 @@ func TestDecodeToJsonCSV(t *testing.T) {
{
name: "wrong_number_of_fields_3",
input: "a,b" + "\n",
params: map[string]any{
params: Params{
columnNamesParam: []any{"column_a", "column_b", "column_c"},
invalidLineModeParam: "continue",
},
Expand Down
56 changes: 55 additions & 1 deletion decoder/decoder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package decoder

import insaneJSON "github.com/ozontech/insane-json"
import (
"fmt"

insaneJSON "github.com/ozontech/insane-json"
)

type Params map[string]any

type Type int

Expand All @@ -23,3 +29,51 @@ type Decoder interface {
DecodeToJson(root *insaneJSON.Root, data []byte) error
Decode(data []byte, args ...any) (any, error)
}

func TypeFromString(s string) Type {
switch s {
case "json":
return JSON
case "raw":
return RAW
case "cri":
return CRI
case "postgres":
return POSTGRES
case "nginx_error":
return NGINX_ERROR
case "protobuf":
return PROTOBUF
case "syslog_rfc3164":
return SYSLOG_RFC3164
case "syslog_rfc5424":
return SYSLOG_RFC5424
case "csv":
return CSV
case "auto":
return AUTO
default:
return NO
}
}

func New(t Type, params Params) (Decoder, error) {
switch t {
case JSON:
return NewJsonDecoder(params)
case NGINX_ERROR:
return NewNginxErrorDecoder(params)
case PROTOBUF:
return NewProtobufDecoder(params)
case SYSLOG_RFC3164:
return NewSyslogRFC3164Decoder(params)
case SYSLOG_RFC5424:
return NewSyslogRFC5424Decoder(params)
case CSV:
return NewCSVDecoder(params)
case RAW, CRI, POSTGRES, AUTO:
return nil, nil
default:
return nil, fmt.Errorf("unknown decoder type: %v", t)
}
}
4 changes: 2 additions & 2 deletions decoder/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type jsonDecoder struct {
mu *sync.Mutex
}

func NewJsonDecoder(params map[string]any) (Decoder, error) {
func NewJsonDecoder(params Params) (Decoder, error) {
p, err := extractJsonParams(params)
if err != nil {
return nil, fmt.Errorf("can't extract params: %w", err)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (d *jsonDecoder) cutFieldsBySize(data []byte) []byte {
return data
}

func extractJsonParams(params map[string]any) (jsonParams, error) {
func extractJsonParams(params Params) (jsonParams, error) {
maxFieldsSize := make(map[string]int)
if maxFieldsSizeRaw, ok := params[jsonMaxFieldsSizeParam]; ok {
maxFieldsSizeMap, ok := maxFieldsSizeRaw.(map[string]any)
Expand Down
12 changes: 6 additions & 6 deletions decoder/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestJson(t *testing.T) {
name string

input string
params map[string]any
params Params

want map[string]string
wantCreateErr bool
Expand All @@ -39,7 +39,7 @@ func TestJson(t *testing.T) {
{
name: "valid_max_fields_size",
input: inputJson,
params: map[string]any{
params: Params{
jsonMaxFieldsSizeParam: map[string]any{
"": json.Number("1"),
"not_exists": json.Number("100"),
Expand All @@ -61,7 +61,7 @@ func TestJson(t *testing.T) {
{
name: "valid_max_fields_size_single",
input: inputJson,
params: map[string]any{
params: Params{
jsonMaxFieldsSizeParam: map[string]any{
"f2.f2_2.f2_2_2": json.Number("4"),
},
Expand All @@ -77,14 +77,14 @@ func TestJson(t *testing.T) {
},
{
name: "invalid_create_1",
params: map[string]any{
params: Params{
jsonMaxFieldsSizeParam: "not_map",
},
wantCreateErr: true,
},
{
name: "invalid_create_2",
params: map[string]any{
params: Params{
jsonMaxFieldsSizeParam: map[string]any{
"test": json.Number("not_num"),
},
Expand All @@ -93,7 +93,7 @@ func TestJson(t *testing.T) {
},
{
name: "invalid_create_3",
params: map[string]any{
params: Params{
jsonMaxFieldsSizeParam: map[string]any{
"test": json.Number("1.2"),
},
Expand Down
4 changes: 2 additions & 2 deletions decoder/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type nginxErrorDecoder struct {
params nginxErrorParams
}

func NewNginxErrorDecoder(params map[string]any) (Decoder, error) {
func NewNginxErrorDecoder(params Params) (Decoder, error) {
p, err := extractNginxErrorParams(params)
if err != nil {
return nil, fmt.Errorf("can't extract params: %w", err)
Expand Down Expand Up @@ -198,7 +198,7 @@ func (d *nginxErrorDecoder) extractCustomFields(data []byte) ([]byte, map[string
return data, fields
}

func extractNginxErrorParams(params map[string]any) (nginxErrorParams, error) {
func extractNginxErrorParams(params Params) (nginxErrorParams, error) {
withCustomFields := false
if withCustomFieldsRaw, ok := params[nginxWithCustomFieldsParam]; ok {
withCustomFields, ok = withCustomFieldsRaw.(bool)
Expand Down
6 changes: 3 additions & 3 deletions decoder/nginx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestNginxError(t *testing.T) {
name string

input string
params map[string]any
params Params

want NginxErrorRow
wantCreateErr bool
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestNginxError(t *testing.T) {
{
name: "valid_custom_fields",
input: `2022/08/18 09:29:37 [error] 844935#844935: *44934601 upstream timed out (110: Operation timed out), while connecting to upstream, client: 10.125.172.251, server: , request: "POST /download HTTP/1.1", upstream: "http://10.117.246.15:84/download", host: "mpm-youtube-downloader-38.name.tldn:84", test:`,
params: map[string]any{
params: Params{
nginxWithCustomFieldsParam: true,
},
want: NginxErrorRow{
Expand All @@ -86,7 +86,7 @@ func TestNginxError(t *testing.T) {
},
{
name: "invalid_create",
params: map[string]any{
params: Params{
nginxWithCustomFieldsParam: "not bool",
},
wantCreateErr: true,
Expand Down
4 changes: 2 additions & 2 deletions decoder/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type protobufDecoder struct {
msgDesc protoreflect.MessageDescriptor
}

func NewProtobufDecoder(params map[string]any) (Decoder, error) {
func NewProtobufDecoder(params Params) (Decoder, error) {
p, err := extractProtobufParams(params)
if err != nil {
return nil, fmt.Errorf("can't extract params: %w", err)
Expand Down Expand Up @@ -104,7 +104,7 @@ func (d *protobufDecoder) Decode(data []byte, _ ...any) (any, error) {
return msgJson, nil
}

func extractProtobufParams(params map[string]any) (protobufParams, error) {
func extractProtobufParams(params Params) (protobufParams, error) {
fileRaw, ok := params[protoFileParam]
if !ok {
return protobufParams{}, fmt.Errorf("%q not set", protoFileParam)
Expand Down
Loading