diff --git a/pkg/ingest/raw.go b/pkg/ingest/raw.go new file mode 100644 index 0000000..0e6300c --- /dev/null +++ b/pkg/ingest/raw.go @@ -0,0 +1,109 @@ +package ingest + +import ( + "context" + "time" + + "github.com/go-errors/errors" + "github.com/strrl/lapp/pkg/event" + "github.com/strrl/lapp/pkg/logsource" + "github.com/strrl/lapp/pkg/store" +) + +// ParseResult carries parser-derived event data plus optional storage labels. +type ParseResult struct { + Timestamp *time.Time + Attrs map[string]string + Inferred *event.Inferred + Labels map[string]string +} + +// Parser extracts structured data from a raw log line. +type Parser interface { + Parse(ctx context.Context, raw string) (*ParseResult, error) +} + +// Outcome captures the normalized event, the storage record, and any parser error. +type Outcome struct { + Event event.Event + LogEntry store.LogEntry + ParseErr error +} + +// FromRawLine normalizes a raw log line and keeps the original text as the source of truth. +// Parser failures are surfaced on the Outcome but still return a fallback event/log entry. +func FromRawLine(ctx context.Context, line *logsource.LogLine, parser Parser) (Outcome, error) { + if line == nil { + return Outcome{}, errors.New("raw log line is required") + } + + outcome := Outcome{ + Event: event.Event{ + Text: line.Content, + Attrs: map[string]string{}, + Inferred: &event.Inferred{}, + }, + LogEntry: store.LogEntry{ + LineNumber: line.LineNumber, + EndLineNumber: line.LineNumber, + Raw: line.Content, + Labels: map[string]string{}, + }, + } + + if parser == nil { + return outcome, nil + } + + parsed, parseErr := parser.Parse(ctx, line.Content) + if parseErr != nil { + outcome.ParseErr = parseErr + } else if parsed != nil { + if parsed.Timestamp != nil { + ts := *parsed.Timestamp + outcome.Event.Timestamp = &ts + outcome.LogEntry.Timestamp = ts + } + if parsed.Attrs != nil { + outcome.Event.Attrs = cloneMap(parsed.Attrs) + } + if parsed.Inferred != nil { + outcome.Event.Inferred = &event.Inferred{ + Pattern: parsed.Inferred.Pattern, + Entity: parsed.Inferred.Entity, + } + } + if parsed.Labels != nil { + outcome.LogEntry.Labels = cloneMap(parsed.Labels) + } + } + + return outcome, nil +} + +// StoreRawLine persists the raw line outcome. Parser failures never prevent storage. +func StoreRawLine(ctx context.Context, dst store.Store, line *logsource.LogLine, parser Parser) (Outcome, error) { + if dst == nil { + return Outcome{}, errors.New("store is required") + } + + outcome, err := FromRawLine(ctx, line, parser) + if err != nil { + return Outcome{}, err + } + if err := dst.InsertLog(ctx, outcome.LogEntry); err != nil { + return Outcome{}, errors.Errorf("insert raw log line: %w", err) + } + return outcome, nil +} + +func cloneMap(src map[string]string) map[string]string { + if src == nil { + return nil + } + cloned := make(map[string]string, len(src)) + for key, value := range src { + cloned[key] = value + } + return cloned +} diff --git a/pkg/ingest/raw_test.go b/pkg/ingest/raw_test.go new file mode 100644 index 0000000..2b1b3dc --- /dev/null +++ b/pkg/ingest/raw_test.go @@ -0,0 +1,168 @@ +package ingest + +import ( + "context" + stderrors "errors" + "testing" + "time" + + "github.com/strrl/lapp/pkg/event" + "github.com/strrl/lapp/pkg/logsource" + "github.com/strrl/lapp/pkg/store" +) + +type stubParser struct { + parse func(context.Context, string) (*ParseResult, error) +} + +func (s stubParser) Parse(ctx context.Context, raw string) (*ParseResult, error) { + return s.parse(ctx, raw) +} + +func newTestStore(t *testing.T) *store.DuckDBStore { + t.Helper() + + s, err := store.NewDuckDBStore("") + if err != nil { + t.Fatalf("NewDuckDBStore: %v", err) + } + t.Cleanup(func() { _ = s.Close() }) + + if err := s.Init(context.Background()); err != nil { + t.Fatalf("Init: %v", err) + } + + return s +} + +func TestFromRawLineWithoutParserUsesFallbackEvent(t *testing.T) { + line := &logsource.LogLine{ + LineNumber: 7, + Content: " WARN worker stalled for 30s ", + } + + outcome, err := FromRawLine(context.Background(), line, nil) + if err != nil { + t.Fatalf("FromRawLine: %v", err) + } + + if outcome.ParseErr != nil { + t.Fatalf("ParseErr: got %v, want nil", outcome.ParseErr) + } + if outcome.Event.Text != line.Content { + t.Fatalf("Event.Text: got %q, want %q", outcome.Event.Text, line.Content) + } + if len(outcome.Event.Attrs) != 0 { + t.Fatalf("Event.Attrs: got %v, want empty map", outcome.Event.Attrs) + } + if outcome.Event.Inferred == nil { + t.Fatal("Event.Inferred: got nil, want non-nil empty inferred") + } + if outcome.LogEntry.Raw != line.Content { + t.Fatalf("LogEntry.Raw: got %q, want %q", outcome.LogEntry.Raw, line.Content) + } + if outcome.LogEntry.LineNumber != line.LineNumber || outcome.LogEntry.EndLineNumber != line.LineNumber { + t.Fatalf("log entry line bounds: got (%d,%d), want (%d,%d)", outcome.LogEntry.LineNumber, outcome.LogEntry.EndLineNumber, line.LineNumber, line.LineNumber) + } +} + +func TestStoreRawLinePersistsWhenParserFails(t *testing.T) { + s := newTestStore(t) + line := &logsource.LogLine{ + LineNumber: 12, + Content: "ERROR checkout request req_123 timed out", + } + + outcome, err := StoreRawLine(context.Background(), s, line, stubParser{ + parse: func(context.Context, string) (*ParseResult, error) { + return nil, context.DeadlineExceeded + }, + }) + if err != nil { + t.Fatalf("StoreRawLine: %v", err) + } + if !stderrors.Is(outcome.ParseErr, context.DeadlineExceeded) { + t.Fatalf("ParseErr: got %v, want %v", outcome.ParseErr, context.DeadlineExceeded) + } + if outcome.Event.Text != line.Content { + t.Fatalf("Event.Text: got %q, want %q", outcome.Event.Text, line.Content) + } + + stored, err := s.QueryLogs(context.Background(), store.QueryOpts{}) + if err != nil { + t.Fatalf("QueryLogs: %v", err) + } + if len(stored) != 1 { + t.Fatalf("stored entries: got %d, want 1", len(stored)) + } + if stored[0].Raw != line.Content { + t.Fatalf("stored raw: got %q, want %q", stored[0].Raw, line.Content) + } +} + +func TestStoreRawLineUsesParsedFieldsButKeepsRawText(t *testing.T) { + s := newTestStore(t) + ts := time.Date(2026, 3, 10, 21, 0, 0, 0, time.UTC) + line := &logsource.LogLine{ + LineNumber: 3, + Content: "level=info service=auth-api msg=\"user user_456 authenticated\"", + } + + outcome, err := StoreRawLine(context.Background(), s, line, stubParser{ + parse: func(context.Context, string) (*ParseResult, error) { + return &ParseResult{ + Timestamp: &ts, + Attrs: map[string]string{ + "level": "info", + "service": "auth-api", + }, + Inferred: &event.Inferred{ + Pattern: "user <*> authenticated", + Entity: "auth-api", + }, + Labels: map[string]string{ + "pattern": "user-authenticated", + }, + }, nil + }, + }) + if err != nil { + t.Fatalf("StoreRawLine: %v", err) + } + + if outcome.Event.Text != line.Content { + t.Fatalf("Event.Text: got %q, want %q", outcome.Event.Text, line.Content) + } + if outcome.Event.Timestamp == nil || !outcome.Event.Timestamp.Equal(ts) { + t.Fatalf("Event.Timestamp: got %v, want %v", outcome.Event.Timestamp, ts) + } + if outcome.Event.Attrs["service"] != "auth-api" { + t.Fatalf("Event.Attrs[service]: got %q, want %q", outcome.Event.Attrs["service"], "auth-api") + } + if outcome.Event.Inferred == nil || outcome.Event.Inferred.Pattern != "user <*> authenticated" { + t.Fatalf("Event.Inferred: got %+v", outcome.Event.Inferred) + } + if outcome.LogEntry.Raw != line.Content { + t.Fatalf("LogEntry.Raw: got %q, want %q", outcome.LogEntry.Raw, line.Content) + } + if outcome.LogEntry.Timestamp != ts { + t.Fatalf("LogEntry.Timestamp: got %v, want %v", outcome.LogEntry.Timestamp, ts) + } + if outcome.LogEntry.Labels["pattern"] != "user-authenticated" { + t.Fatalf("LogEntry.Labels[pattern]: got %q, want %q", outcome.LogEntry.Labels["pattern"], "user-authenticated") + } + + stored, err := s.QueryLogs(context.Background(), store.QueryOpts{}) + if err != nil { + t.Fatalf("QueryLogs: %v", err) + } + if len(stored) != 1 { + t.Fatalf("stored entries: got %d, want 1", len(stored)) + } + if stored[0].Raw != line.Content { + t.Fatalf("stored raw: got %q, want %q", stored[0].Raw, line.Content) + } + if stored[0].Labels["pattern"] != "user-authenticated" { + t.Fatalf("stored labels[pattern]: got %q, want %q", stored[0].Labels["pattern"], "user-authenticated") + } +}