Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions pkg/ingest/raw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package ingest

import (
"context"
"time"

"github.com/go-errors/errors"
"github.com/strrl/lapp/pkg/event"
"github.com/strrl/lapp/pkg/logsource"
"github.com/strrl/lapp/pkg/store"
)

// ParseResult carries parser-derived event data plus optional storage labels.
type ParseResult struct {
Timestamp *time.Time
Attrs map[string]string
Inferred *event.Inferred
Labels map[string]string
}

// Parser extracts structured data from a raw log line.
type Parser interface {
Parse(ctx context.Context, raw string) (*ParseResult, error)
}

// Outcome captures the normalized event, the storage record, and any parser error.
type Outcome struct {
Event event.Event
LogEntry store.LogEntry
ParseErr error
}

// FromRawLine normalizes a raw log line and keeps the original text as the source of truth.
// Parser failures are surfaced on the Outcome but still return a fallback event/log entry.
func FromRawLine(ctx context.Context, line *logsource.LogLine, parser Parser) (Outcome, error) {
if line == nil {
return Outcome{}, errors.New("raw log line is required")
}

outcome := Outcome{
Event: event.Event{
Text: line.Content,
Attrs: map[string]string{},
Inferred: &event.Inferred{},
},
LogEntry: store.LogEntry{
LineNumber: line.LineNumber,
EndLineNumber: line.LineNumber,
Raw: line.Content,
Labels: map[string]string{},
},
}

if parser == nil {
return outcome, nil
}

parsed, parseErr := parser.Parse(ctx, line.Content)
if parseErr != nil {
outcome.ParseErr = parseErr
} else if parsed != nil {
if parsed.Timestamp != nil {
ts := *parsed.Timestamp
outcome.Event.Timestamp = &ts
outcome.LogEntry.Timestamp = ts
}
if parsed.Attrs != nil {
outcome.Event.Attrs = cloneMap(parsed.Attrs)
}
if parsed.Inferred != nil {
outcome.Event.Inferred = &event.Inferred{
Pattern: parsed.Inferred.Pattern,
Entity: parsed.Inferred.Entity,
}
}
if parsed.Labels != nil {
outcome.LogEntry.Labels = cloneMap(parsed.Labels)
}
}

return outcome, nil
}

// StoreRawLine persists the raw line outcome. Parser failures never prevent storage.
func StoreRawLine(ctx context.Context, dst store.Store, line *logsource.LogLine, parser Parser) (Outcome, error) {
if dst == nil {
return Outcome{}, errors.New("store is required")
}

outcome, err := FromRawLine(ctx, line, parser)
if err != nil {
return Outcome{}, err
}
if err := dst.InsertLog(ctx, outcome.LogEntry); err != nil {
return Outcome{}, errors.Errorf("insert raw log line: %w", err)
}
return outcome, nil
}

func cloneMap(src map[string]string) map[string]string {
if src == nil {
return nil
}
cloned := make(map[string]string, len(src))
for key, value := range src {
cloned[key] = value
}
return cloned
}
168 changes: 168 additions & 0 deletions pkg/ingest/raw_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package ingest

import (
"context"
stderrors "errors"
"testing"
"time"

"github.com/strrl/lapp/pkg/event"
"github.com/strrl/lapp/pkg/logsource"
"github.com/strrl/lapp/pkg/store"
)

type stubParser struct {
parse func(context.Context, string) (*ParseResult, error)
}

func (s stubParser) Parse(ctx context.Context, raw string) (*ParseResult, error) {
return s.parse(ctx, raw)
}

func newTestStore(t *testing.T) *store.DuckDBStore {
t.Helper()

s, err := store.NewDuckDBStore("")
if err != nil {
t.Fatalf("NewDuckDBStore: %v", err)
}
t.Cleanup(func() { _ = s.Close() })

if err := s.Init(context.Background()); err != nil {
t.Fatalf("Init: %v", err)
}

return s
}

func TestFromRawLineWithoutParserUsesFallbackEvent(t *testing.T) {
line := &logsource.LogLine{
LineNumber: 7,
Content: " WARN worker stalled for 30s ",
}

outcome, err := FromRawLine(context.Background(), line, nil)
if err != nil {
t.Fatalf("FromRawLine: %v", err)
}

if outcome.ParseErr != nil {
t.Fatalf("ParseErr: got %v, want nil", outcome.ParseErr)
}
if outcome.Event.Text != line.Content {
t.Fatalf("Event.Text: got %q, want %q", outcome.Event.Text, line.Content)
}
if len(outcome.Event.Attrs) != 0 {
t.Fatalf("Event.Attrs: got %v, want empty map", outcome.Event.Attrs)
}
if outcome.Event.Inferred == nil {
t.Fatal("Event.Inferred: got nil, want non-nil empty inferred")
}
if outcome.LogEntry.Raw != line.Content {
t.Fatalf("LogEntry.Raw: got %q, want %q", outcome.LogEntry.Raw, line.Content)
}
if outcome.LogEntry.LineNumber != line.LineNumber || outcome.LogEntry.EndLineNumber != line.LineNumber {
t.Fatalf("log entry line bounds: got (%d,%d), want (%d,%d)", outcome.LogEntry.LineNumber, outcome.LogEntry.EndLineNumber, line.LineNumber, line.LineNumber)
}
}

func TestStoreRawLinePersistsWhenParserFails(t *testing.T) {
s := newTestStore(t)
line := &logsource.LogLine{
LineNumber: 12,
Content: "ERROR checkout request req_123 timed out",
}

outcome, err := StoreRawLine(context.Background(), s, line, stubParser{
parse: func(context.Context, string) (*ParseResult, error) {
return nil, context.DeadlineExceeded
},
})
if err != nil {
t.Fatalf("StoreRawLine: %v", err)
}
if !stderrors.Is(outcome.ParseErr, context.DeadlineExceeded) {
t.Fatalf("ParseErr: got %v, want %v", outcome.ParseErr, context.DeadlineExceeded)
}
if outcome.Event.Text != line.Content {
t.Fatalf("Event.Text: got %q, want %q", outcome.Event.Text, line.Content)
}

stored, err := s.QueryLogs(context.Background(), store.QueryOpts{})
if err != nil {
t.Fatalf("QueryLogs: %v", err)
}
if len(stored) != 1 {
t.Fatalf("stored entries: got %d, want 1", len(stored))
}
if stored[0].Raw != line.Content {
t.Fatalf("stored raw: got %q, want %q", stored[0].Raw, line.Content)
}
}

func TestStoreRawLineUsesParsedFieldsButKeepsRawText(t *testing.T) {
s := newTestStore(t)
ts := time.Date(2026, 3, 10, 21, 0, 0, 0, time.UTC)
line := &logsource.LogLine{
LineNumber: 3,
Content: "level=info service=auth-api msg=\"user user_456 authenticated\"",
}

outcome, err := StoreRawLine(context.Background(), s, line, stubParser{
parse: func(context.Context, string) (*ParseResult, error) {
return &ParseResult{
Timestamp: &ts,
Attrs: map[string]string{
"level": "info",
"service": "auth-api",
},
Inferred: &event.Inferred{
Pattern: "user <*> authenticated",
Entity: "auth-api",
},
Labels: map[string]string{
"pattern": "user-authenticated",
},
}, nil
},
})
if err != nil {
t.Fatalf("StoreRawLine: %v", err)
}

if outcome.Event.Text != line.Content {
t.Fatalf("Event.Text: got %q, want %q", outcome.Event.Text, line.Content)
}
if outcome.Event.Timestamp == nil || !outcome.Event.Timestamp.Equal(ts) {
t.Fatalf("Event.Timestamp: got %v, want %v", outcome.Event.Timestamp, ts)
}
if outcome.Event.Attrs["service"] != "auth-api" {
t.Fatalf("Event.Attrs[service]: got %q, want %q", outcome.Event.Attrs["service"], "auth-api")
}
if outcome.Event.Inferred == nil || outcome.Event.Inferred.Pattern != "user <*> authenticated" {
t.Fatalf("Event.Inferred: got %+v", outcome.Event.Inferred)
}
if outcome.LogEntry.Raw != line.Content {
t.Fatalf("LogEntry.Raw: got %q, want %q", outcome.LogEntry.Raw, line.Content)
}
if outcome.LogEntry.Timestamp != ts {
t.Fatalf("LogEntry.Timestamp: got %v, want %v", outcome.LogEntry.Timestamp, ts)
}
if outcome.LogEntry.Labels["pattern"] != "user-authenticated" {
t.Fatalf("LogEntry.Labels[pattern]: got %q, want %q", outcome.LogEntry.Labels["pattern"], "user-authenticated")
}

stored, err := s.QueryLogs(context.Background(), store.QueryOpts{})
if err != nil {
t.Fatalf("QueryLogs: %v", err)
}
if len(stored) != 1 {
t.Fatalf("stored entries: got %d, want 1", len(stored))
}
if stored[0].Raw != line.Content {
t.Fatalf("stored raw: got %q, want %q", stored[0].Raw, line.Content)
}
if stored[0].Labels["pattern"] != "user-authenticated" {
t.Fatalf("stored labels[pattern]: got %q, want %q", stored[0].Labels["pattern"], "user-authenticated")
}
}
Loading