Skip to content

Commit a8b40ab

Browse files
committed
feat: add ingestion services and test helpers
1 parent c7feecf commit a8b40ab

File tree

3 files changed

+827
-0
lines changed

3 files changed

+827
-0
lines changed

internal/ingestion/service.go

Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
package ingestion
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"encoding/hex"
7+
"errors"
8+
"fmt"
9+
"strings"
10+
"sync"
11+
"sync/atomic"
12+
"time"
13+
14+
"github.com/sirrobot01/mcpulse/internal/config"
15+
"github.com/sirrobot01/mcpulse/internal/database"
16+
"github.com/sirrobot01/mcpulse/internal/metrics"
17+
"github.com/sirrobot01/mcpulse/internal/models"
18+
"go.uber.org/zap"
19+
"golang.org/x/time/rate"
20+
)
21+
22+
// Sentinel errors returned by the ingestion service. Compare with errors.Is.
var (
	// ErrRateLimited indicates that the ingestion request exceeded the configured rate limit.
	ErrRateLimited = errors.New("rate limit exceeded")

	// ErrBufferFull indicates that the ingestion buffer is full and cannot accept more work.
	ErrBufferFull = errors.New("ingestion buffer is full")
)

// eventTypeToolCall is the event type (and metrics label) used for tool-call ingestion.
const eventTypeToolCall = "tool_call"
31+
32+
// Event represents a real-time ingestion event that can be streamed to subscribers.
33+
type Event struct {
34+
Type string
35+
Metric models.ToolCallMetric
36+
ServerID string
37+
}
38+
39+
type subscriber struct {
40+
id int64
41+
serverID string
42+
topics map[string]struct{}
43+
ch chan Event
44+
}
45+
46+
// Service implements ingestion logic including validation, sanitization, persistence,
47+
// and optional real-time fan-out.
48+
type Service struct {
49+
cfg config.IngestionConfig
50+
privacy config.PrivacyConfig
51+
repo *database.Repository
52+
metrics *metrics.Metrics
53+
logger *zap.Logger
54+
55+
limiter *rate.Limiter
56+
57+
subsMu sync.RWMutex
58+
subscribers map[int64]*subscriber
59+
nextSubID atomic.Int64
60+
subscriberCh int
61+
}
62+
63+
// NewService constructs a new ingestion service.
64+
func NewService(
65+
cfg config.IngestionConfig,
66+
privacy config.PrivacyConfig,
67+
repo *database.Repository,
68+
metricsCollector *metrics.Metrics,
69+
logger *zap.Logger,
70+
) *Service {
71+
var limiter *rate.Limiter
72+
if cfg.RateLimit.Enabled && cfg.RateLimit.RequestsPerSecond > 0 {
73+
limiter = rate.NewLimiter(rate.Limit(cfg.RateLimit.RequestsPerSecond), cfg.RateLimit.Burst)
74+
}
75+
76+
s := &Service{
77+
cfg: cfg,
78+
privacy: privacy,
79+
repo: repo,
80+
metrics: metricsCollector,
81+
logger: logger,
82+
limiter: limiter,
83+
subscribers: make(map[int64]*subscriber),
84+
subscriberCh: max(cfg.BatchSize, 64),
85+
}
86+
87+
return s
88+
}
89+
90+
// Ingest validates, sanitizes, and persists the provided metrics batch.
91+
func (s *Service) Ingest(ctx context.Context, batch []models.ToolCallMetric) (models.IngestResponse, error) {
92+
resp := models.IngestResponse{}
93+
94+
if len(batch) == 0 {
95+
return resp, nil
96+
}
97+
98+
if s.limiter != nil && !s.limiter.AllowN(time.Now(), len(batch)) {
99+
s.metrics.RecordIngest(eventTypeToolCall, false)
100+
return resp, ErrRateLimited
101+
}
102+
103+
validMetrics := make([]models.ToolCallMetric, 0, len(batch))
104+
for idx := range batch {
105+
metric := &batch[idx]
106+
if err := s.prepareMetric(metric); err != nil {
107+
resp.Rejected++
108+
resp.Errors = append(resp.Errors, models.IngestError{
109+
Index: idx,
110+
Reason: err.Error(),
111+
})
112+
s.metrics.RecordIngestError("validation")
113+
continue
114+
}
115+
116+
validMetrics = append(validMetrics, *metric)
117+
s.metrics.RecordIngest(eventTypeToolCall, true)
118+
}
119+
120+
if len(validMetrics) == 0 {
121+
return resp, nil
122+
}
123+
124+
start := time.Now()
125+
if err := s.repo.InsertToolCallBatch(ctx, validMetrics); err != nil {
126+
s.metrics.RecordIngestError("storage")
127+
s.logger.Error("failed to insert tool call batch", zap.Error(err))
128+
return resp, fmt.Errorf("failed to persist metrics: %w", err)
129+
}
130+
s.metrics.IngestDuration.WithLabelValues("persist_tool_calls").Observe(time.Since(start).Seconds())
131+
132+
resp.Accepted = len(validMetrics)
133+
134+
// Update server metadata for each unique server ID in the batch.
135+
s.upsertServers(ctx, validMetrics)
136+
137+
// Broadcast to subscribers for real-time streaming.
138+
for _, metric := range validMetrics {
139+
s.broadcast(Event{
140+
Type: eventTypeToolCall,
141+
Metric: metric,
142+
ServerID: metric.ServerID,
143+
})
144+
}
145+
146+
return resp, nil
147+
}
148+
149+
// Subscribe registers a subscriber for real-time ingestion events. The caller MUST call the
150+
// returned cancel function when they are done consuming events.
151+
func (s *Service) Subscribe(serverID string, topics []string) (<-chan Event, func()) {
152+
ch := make(chan Event, s.subscriberCh)
153+
sub := &subscriber{
154+
id: s.nextSubID.Add(1),
155+
serverID: serverID,
156+
topics: make(map[string]struct{}),
157+
ch: ch,
158+
}
159+
160+
if len(topics) == 0 {
161+
sub.topics[eventTypeToolCall] = struct{}{}
162+
} else {
163+
for _, topic := range topics {
164+
sub.topics[strings.ToLower(strings.TrimSpace(topic))] = struct{}{}
165+
}
166+
}
167+
168+
s.subsMu.Lock()
169+
s.subscribers[sub.id] = sub
170+
s.subsMu.Unlock()
171+
172+
cancel := func() {
173+
s.subsMu.Lock()
174+
delete(s.subscribers, sub.id)
175+
s.subsMu.Unlock()
176+
close(ch)
177+
}
178+
179+
return ch, cancel
180+
}
181+
182+
// prepareMetric validates and sanitizes a single metric entry.
183+
func (s *Service) prepareMetric(metric *models.ToolCallMetric) error {
184+
if metric.ID == "" {
185+
return errors.New("missing required field: id")
186+
}
187+
if metric.ServerID == "" {
188+
return errors.New("missing required field: server_id")
189+
}
190+
if metric.ToolName == "" {
191+
return errors.New("missing required field: tool_name")
192+
}
193+
if metric.Status == "" {
194+
return errors.New("missing required field: status")
195+
}
196+
if metric.DurationMS < 0 {
197+
return errors.New("duration_ms must be >= 0")
198+
}
199+
200+
if metric.Timestamp.IsZero() {
201+
metric.Timestamp = time.Now().UTC()
202+
} else {
203+
metric.Timestamp = metric.Timestamp.UTC()
204+
}
205+
206+
status := strings.ToLower(metric.Status)
207+
switch status {
208+
case "success", "error", "timeout":
209+
default:
210+
return fmt.Errorf("invalid status: %s", metric.Status)
211+
}
212+
metric.Status = status
213+
214+
if metric.Status == "error" && (metric.ErrorMessage == nil || strings.TrimSpace(*metric.ErrorMessage) == "") {
215+
return errors.New("error_message required when status=error")
216+
}
217+
218+
if metric.SessionID != nil {
219+
sessionID := strings.TrimSpace(*metric.SessionID)
220+
if sessionID == "" {
221+
metric.SessionID = nil
222+
} else if s.privacy.HashSessionIDs {
223+
hashed := hashString(sessionID)
224+
metric.SessionID = &hashed
225+
} else {
226+
metric.SessionID = &sessionID
227+
}
228+
}
229+
230+
if len(metric.Parameters) > 0 {
231+
metric.Parameters = s.sanitizeParameters(metric.Parameters)
232+
}
233+
234+
return nil
235+
}
236+
237+
func (s *Service) sanitizeParameters(values map[string]interface{}) map[string]interface{} {
238+
if !s.privacy.SanitizeParameters || len(s.privacy.SensitiveKeys) == 0 {
239+
return values
240+
}
241+
242+
sanitized := make(map[string]interface{}, len(values))
243+
for key, value := range values {
244+
if s.isSensitiveKey(key) {
245+
sanitized[key] = "[REDACTED]"
246+
continue
247+
}
248+
249+
switch typed := value.(type) {
250+
case map[string]interface{}:
251+
sanitized[key] = s.sanitizeParameters(typed)
252+
case []interface{}:
253+
sanitized[key] = s.sanitizeSlice(typed)
254+
default:
255+
sanitized[key] = value
256+
}
257+
}
258+
return sanitized
259+
}
260+
261+
func (s *Service) sanitizeSlice(values []interface{}) []interface{} {
262+
sanitized := make([]interface{}, len(values))
263+
for idx, value := range values {
264+
switch typed := value.(type) {
265+
case map[string]interface{}:
266+
sanitized[idx] = s.sanitizeParameters(typed)
267+
case []interface{}:
268+
sanitized[idx] = s.sanitizeSlice(typed)
269+
default:
270+
sanitized[idx] = value
271+
}
272+
}
273+
return sanitized
274+
}
275+
276+
func (s *Service) isSensitiveKey(key string) bool {
277+
lower := strings.ToLower(key)
278+
for _, sensitive := range s.privacy.SensitiveKeys {
279+
if lower == strings.ToLower(sensitive) {
280+
return true
281+
}
282+
}
283+
return false
284+
}
285+
286+
func (s *Service) upsertServers(ctx context.Context, metrics []models.ToolCallMetric) {
287+
seen := make(map[string]models.ServerInfo)
288+
289+
for _, metric := range metrics {
290+
info, exists := seen[metric.ServerID]
291+
if !exists {
292+
info = models.ServerInfo{
293+
ID: metric.ServerID,
294+
Name: metric.ServerID,
295+
FirstSeen: metric.Timestamp,
296+
LastSeen: metric.Timestamp,
297+
Metadata: map[string]interface{}{},
298+
}
299+
}
300+
301+
if metric.Timestamp.Before(info.FirstSeen) {
302+
info.FirstSeen = metric.Timestamp
303+
}
304+
if metric.Timestamp.After(info.LastSeen) {
305+
info.LastSeen = metric.Timestamp
306+
}
307+
seen[metric.ServerID] = info
308+
}
309+
310+
for _, server := range seen {
311+
if err := s.repo.UpsertServer(ctx, server); err != nil {
312+
s.logger.Warn("failed to upsert server info during ingestion",
313+
zap.String("server_id", server.ID),
314+
zap.Error(err),
315+
)
316+
}
317+
}
318+
}
319+
320+
func (s *Service) broadcast(event Event) {
321+
s.subsMu.RLock()
322+
defer s.subsMu.RUnlock()
323+
324+
for _, sub := range s.subscribers {
325+
if sub.serverID != "" && sub.serverID != event.ServerID {
326+
continue
327+
}
328+
if _, ok := sub.topics[event.Type]; !ok {
329+
continue
330+
}
331+
332+
select {
333+
case sub.ch <- event:
334+
default:
335+
// Drop message if subscriber is too slow; avoid blocking ingestion.
336+
s.logger.Debug("dropping realtime event for slow subscriber",
337+
zap.String("server_id", event.ServerID),
338+
zap.String("type", event.Type),
339+
)
340+
}
341+
}
342+
}
343+
344+
// hashString returns the lower-case hexadecimal SHA-256 digest of input.
func hashString(input string) string {
	hasher := sha256.New()
	hasher.Write([]byte(input)) // hash.Hash's Write never returns an error.
	return hex.EncodeToString(hasher.Sum(nil))
}
348+
349+
// max returns the larger of a and b.
//
// Note: on Go 1.21 and later this package-level declaration takes precedence over the
// built-in max within this package.
func max(a, b int) int {
	if b >= a {
		return b
	}
	return a
}

0 commit comments

Comments
 (0)