diff --git a/.golangci.yaml b/.golangci.yaml index b8e7365..0c884f8 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -42,7 +42,7 @@ run: # Define the Go version limit. # Mainly related to generics support since go1.18. # Default: use Go version from the go.mod file, fallback on the env var `GOVERSION`, fallback on 1.17 - go: "1.25.7" + go: "1.26.0" linters: # Enable specific linter diff --git a/.pre-commit/golangci-lint-hook b/.pre-commit/golangci-lint-hook index 9477f8e..d963cf0 100755 --- a/.pre-commit/golangci-lint-hook +++ b/.pre-commit/golangci-lint-hook @@ -23,7 +23,7 @@ if [[ -f "${ROOT_DIR}/.project-settings.env" ]]; then # shellcheck disable=SC1090 source "${ROOT_DIR}/.project-settings.env" fi -GOLANGCI_LINT_VERSION="${GOLANGCI_LINT_VERSION:-v2.8.0}" +GOLANGCI_LINT_VERSION="${GOLANGCI_LINT_VERSION:-v2.9.0}" # ####################################### # Install dependencies to run the pre-commit hook diff --git a/.pre-commit/unit-test-hook b/.pre-commit/unit-test-hook index be4f0ea..56d91e1 100755 --- a/.pre-commit/unit-test-hook +++ b/.pre-commit/unit-test-hook @@ -21,7 +21,7 @@ hook() { local root_dir root_dir=$(git rev-parse --show-toplevel) - local toolchain_version="1.25.7" + local toolchain_version="1.26.0" if [[ -f "${root_dir}/.project-settings.env" ]]; then # shellcheck disable=SC1090 source "${root_dir}/.project-settings.env" diff --git a/.project-settings.env b/.project-settings.env index a2a195a..0100e48 100644 --- a/.project-settings.env +++ b/.project-settings.env @@ -1,5 +1,5 @@ -GOLANGCI_LINT_VERSION=v2.8.0 +GOLANGCI_LINT_VERSION=v2.9.0 BUF_VERSION=v1.65.0 -GO_VERSION=1.25.7 +GO_VERSION=1.26.0 GCI_PREFIX=github.com/hyp3rd/go-worker PROTO_ENABLED=true diff --git a/Dockerfile b/Dockerfile index c623102..ad4d7c4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # syntax=docker/dockerfile:1 -ARG GO_VERSION=1.25.7 +ARG GO_VERSION=1.26.0 FROM --platform=$BUILDPLATFORM golang:${GO_VERSION}-alpine AS builder WORKDIR /src diff --git a/Dockerfile.worker b/Dockerfile.worker index 8b6ad2e..51c00ae 100644 --- a/Dockerfile.worker +++ b/Dockerfile.worker @@ -1,5 +1,5 @@ # syntax=docker/dockerfile:1 -ARG GO_VERSION=1.25.7 +ARG GO_VERSION=1.26.0 FROM --platform=$BUILDPLATFORM golang:${GO_VERSION}-alpine AS builder WORKDIR /src diff --git a/Makefile b/Makefile index 6339abe..19553b3 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ include .project-settings.env REPO_ROOT = $(shell git rev-parse --show-toplevel) -GOLANGCI_LINT_VERSION ?= v2.8.0 +GOLANGCI_LINT_VERSION ?= v2.9.0 BUF_VERSION ?= v1.65.0 -GO_VERSION ?= 1.25.7 +GO_VERSION ?= 1.26.0 GCI_PREFIX ?= github.com/hyp3rd/go-worker PROTO_ENABLED ?= true BENCHTIME ?= 1s @@ -36,8 +36,7 @@ workerctl: go build -trimpath -o bin/workerctl ./cmd/workerctl update-deps: - go get -v -u ./... - go mod tidy + go get -u -t ./... && go mod tidy -v && go mod verify init: ./setup-project.sh --module $(shell grep "^module " go.mod | awk '{print $$2}') diff --git a/PRD-admin-service.md b/PRD-admin-service.md index 8d9f803..b224997 100644 --- a/PRD-admin-service.md +++ b/PRD-admin-service.md @@ -144,6 +144,9 @@ Service: `worker.v1.AdminService` - `WORKER_ADMIN_HTTP_ADDR` (default `127.0.0.1:8081`) - `WORKER_ADMIN_TLS_CERT`, `WORKER_ADMIN_TLS_KEY`, `WORKER_ADMIN_TLS_CA` - `WORKER_ADMIN_AUDIT_EVENT_LIMIT` (max audit events retained by service/backend list operations) +- `WORKER_ADMIN_AUDIT_RETENTION` (optional max age, e.g. `168h`; events older than cutoff are pruned/filtered) +- `WORKER_ADMIN_AUDIT_ARCHIVE_DIR` (optional directory for archived audit JSONL files) +- `WORKER_ADMIN_AUDIT_ARCHIVE_INTERVAL` (optional flush interval for archival writer) - `WORKER_ADMIN_AUDIT_EXPORT_LIMIT_MAX` (gateway cap for export query `limit`) - `WORKER_ADMIN_DEFAULT_QUEUE`, `WORKER_ADMIN_QUEUE_WEIGHTS` (backend defaults) @@ -183,16 +186,16 @@ Worker service job runner (containerized): - Overview, health, queues (list/get/weight/pause), schedules (list/factories/create/delete/pause/run/pause-all), jobs (list/get/upsert/delete/run), DLQ (list/detail/replay/replay-by-id), pause/resume dequeue, SSE events. - Containerized job runner supports `git_tag`, `tarball_url`, `tarball_path`, allowlists, SHA256 validation, output truncation. - Job event persistence is available via file-backed store (`WORKER_JOB_EVENT_DIR`). - - Admin observability collector is available with gateway middleware and gRPC unary interceptors; snapshot endpoint: `GET /admin/v1/metrics` (HTTP/gRPC latency, error counts, and job-runner outcome counters). + - Admin observability collector is available with gateway middleware and gRPC unary interceptors; snapshot endpoint: `GET /admin/v1/metrics` and Prometheus endpoint: `GET /admin/v1/metrics/prometheus` (also `GET /admin/v1/metrics?format=prometheus`). - Guardrails are configurable via env for replay caps, schedule run caps, and optional approval token checks (`WORKER_ADMIN_REPLAY_LIMIT_MAX`, `WORKER_ADMIN_REPLAY_IDS_MAX`, `WORKER_ADMIN_SCHEDULE_RUN_MAX`, `WORKER_ADMIN_SCHEDULE_RUN_WINDOW`, `WORKER_ADMIN_REQUIRE_APPROVAL`, `WORKER_ADMIN_APPROVAL_TOKEN`). - Artifact API parity: gateway now exposes job artifact metadata and download endpoints (`GET /admin/v1/jobs/{name}/artifact/meta`, `GET /admin/v1/jobs/{name}/artifact`) so UI no longer needs direct filesystem access for tarball-path jobs. - Audit export is available at `GET /admin/v1/audit/export` (`jsonl`, `json`, `csv`) with action/target filtering and bounded limits. - - Audit retention is configurable via `WORKER_ADMIN_AUDIT_EVENT_LIMIT`, and gateway export limits are capped by `WORKER_ADMIN_AUDIT_EXPORT_LIMIT_MAX`. + - Audit retention is configurable via count (`WORKER_ADMIN_AUDIT_EVENT_LIMIT`) and optional age cutoff (`WORKER_ADMIN_AUDIT_RETENTION`); archived JSONL files can be enabled with `WORKER_ADMIN_AUDIT_ARCHIVE_DIR` (+ optional flush interval), and gateway export limits are capped by `WORKER_ADMIN_AUDIT_EXPORT_LIMIT_MAX`. - Gateway artifact handling now runs behind an internal artifact-store abstraction (filesystem implementation), reducing direct coupling in handlers. - **Partial / gaps** - Event stream now supports `Last-Event-ID` with bounded in-memory replay; replay does not survive gateway restarts. - - Metrics are in-memory snapshots; no OpenTelemetry/Prometheus export yet for cluster-wide scraping and long-term retention. - - Central audit log supports count-based retention + export; time-window retention policy is still pending. + - Metrics are in-memory snapshots with JSON + Prometheus text export; OTel/remote aggregation is still pending for cluster-wide long-term retention. + - Central audit log supports count+age retention, bounded export, and file-based scheduled archival; external archive shipping/lifecycle policies are still pending. - No RBAC/authorization model beyond mTLS + perimeter controls. - Artifact download for `tarball_path` depends on gateway runtime mounts/config (`WORKER_ADMIN_JOB_TARBALL_DIR`) when worker and gateway are split. @@ -218,10 +221,10 @@ Worker service job runner (containerized): ### Must-have (service) -1. **Admin observability**: implemented as in-memory service metrics + gateway endpoint; pending OTel/Prometheus export and durable retention. +1. **Admin observability**: implemented as in-memory service metrics + gateway JSON/Prometheus endpoints; pending OTel export and durable retention backend. 1. **Policy enforcement**: implemented for DLQ replay caps, schedule run caps, and optional approval token checks in admin mutations. 1. **Artifact API parity**: implemented in gateway with an artifact-store abstraction; remaining work is non-filesystem providers (S3/object store) and signed URL policies. -1. **Audit retention/export**: implemented for count-based retention + export endpoints; remaining work is time-window retention policy and scheduled archival. +1. **Audit retention/export**: implemented for count+time-window retention + export endpoints, plus file-based scheduled archival; remaining work is external archival lifecycle (e.g., object storage + retention policies). ### Must-have (admin UI) diff --git a/admin-ui/README.md b/admin-ui/README.md index a424455..128af35 100644 --- a/admin-ui/README.md +++ b/admin-ui/README.md @@ -35,6 +35,15 @@ Gateway env vars (set on `worker-admin`): - `WORKER_ADMIN_JOB_TARBALL_DIR` (optional; enables local tarball download proxy) - `WORKER_ADMIN_AUDIT_EXPORT_LIMIT_MAX` (optional cap for `GET /admin/v1/audit/export`) +- `WORKER_ADMIN_AUDIT_RETENTION` (optional age cutoff, e.g. `168h`) +- `WORKER_ADMIN_AUDIT_ARCHIVE_DIR` (optional archive directory for aged-out audit events) +- `WORKER_ADMIN_AUDIT_ARCHIVE_INTERVAL` (optional archive flush interval, e.g. `30s`) + +Observability endpoints (gateway): + +- `GET /admin/v1/metrics` (JSON snapshot) +- `GET /admin/v1/metrics/prometheus` (Prometheus text) +- `GET /admin/v1/metrics?format=prometheus` (alternate Prometheus format) Worker-service job runner (for Jobs + events): diff --git a/admin_audit.go b/admin_audit.go index dd53636..85f95bb 100644 --- a/admin_audit.go +++ b/admin_audit.go @@ -68,12 +68,19 @@ func (tm *TaskManager) recordAdminAuditEvent(ctx context.Context, event AdminAud tm.auditEventsMu.Lock() tm.auditEvents = append(tm.auditEvents, event) + + archived := tm.pruneAuditEventsByAgeLocked(time.Now()) if tm.auditEventLimit > 0 && len(tm.auditEvents) > tm.auditEventLimit { - tm.auditEvents = tm.auditEvents[len(tm.auditEvents)-tm.auditEventLimit:] + dropCount := len(tm.auditEvents) - tm.auditEventLimit + dropped := make([]AdminAuditEvent, dropCount) + copy(dropped, tm.auditEvents[:dropCount]) + archived = append(archived, dropped...) + tm.auditEvents = tm.auditEvents[dropCount:] } tm.auditEventsMu.Unlock() + tm.archiveAuditEvents(archived) tm.persistAdminAuditEvent(ctx, event) } @@ -121,6 +128,8 @@ func (tm *TaskManager) auditEventsFromBackend( page, fetchErr := backend.AdminAuditEvents(ctx, filter) if fetchErr == nil { + page = tm.filterAuditEventsByAge(page) + return page, true, nil } @@ -140,10 +149,15 @@ func (tm *TaskManager) auditEventsFromMemory(filter AdminAuditEventFilter) Admin limit := normalizeAdminEventLimit(filter.Limit, tm.auditEventLimit) action := strings.TrimSpace(filter.Action) target := strings.TrimSpace(filter.Target) + cutoff, hasCutoff := tm.auditRetentionCutoff(time.Now()) filtered := make([]AdminAuditEvent, 0, min(limit, len(events))) for i := len(events) - 1; i >= 0 && len(filtered) < limit; i-- { event := events[i] + if hasCutoff && event.At.Before(cutoff) { + continue + } + if action != "" && event.Action != action { continue } @@ -157,3 +171,60 @@ func (tm *TaskManager) auditEventsFromMemory(filter AdminAuditEventFilter) Admin return AdminAuditEventPage{Events: filtered} } + +func (tm *TaskManager) auditRetentionCutoff(now time.Time) (time.Time, bool) { + if tm == nil || tm.auditRetention <= 0 { + return time.Time{}, false + } + + return now.Add(-tm.auditRetention), true +} + +func (tm *TaskManager) pruneAuditEventsByAgeLocked(now time.Time) []AdminAuditEvent { + cutoff, ok := tm.auditRetentionCutoff(now) + if !ok || len(tm.auditEvents) == 0 { + return nil + } + + firstKeep := 0 + for firstKeep < len(tm.auditEvents) && tm.auditEvents[firstKeep].At.Before(cutoff) { + firstKeep++ + } + + if firstKeep == 0 { + return nil + } + + archived := make([]AdminAuditEvent, firstKeep) + copy(archived, tm.auditEvents[:firstKeep]) + + if firstKeep >= len(tm.auditEvents) { + tm.auditEvents = tm.auditEvents[:0] + + return archived + } + + tm.auditEvents = tm.auditEvents[firstKeep:] + + return archived +} + +func (tm *TaskManager) filterAuditEventsByAge(page AdminAuditEventPage) AdminAuditEventPage { + cutoff, ok := tm.auditRetentionCutoff(time.Now()) + if !ok || len(page.Events) == 0 { + return page + } + + filtered := make([]AdminAuditEvent, 0, len(page.Events)) + for _, event := range page.Events { + if event.At.Before(cutoff) { + continue + } + + filtered = append(filtered, event) + } + + page.Events = filtered + + return page +} diff --git a/admin_audit_archive.go b/admin_audit_archive.go new file mode 100644 index 0000000..5819544 --- /dev/null +++ b/admin_audit_archive.go @@ -0,0 +1,219 @@ +package worker + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/goccy/go-json" + "github.com/google/uuid" + "github.com/hyp3rd/ewrap" + sectools "github.com/hyp3rd/sectools/pkg/io" +) + +const ( + defaultAdminAuditArchiveInterval = 30 * time.Second + adminAuditArchiveQueueSize = 2048 + adminAuditArchiveBatchSize = 256 + adminAuditArchiveEventBytes = 128 + adminAuditArchiveFileExt = ".jsonl" +) + +type adminAuditArchiver struct { + interval time.Duration + queue chan AdminAuditEvent + store *fileAdminAuditArchiveStore +} + +type fileAdminAuditArchiveStore struct { + mu sync.Mutex + baseDir string + ioClient *sectools.Client +} + +func newAdminAuditArchiver(dir string, interval time.Duration) *adminAuditArchiver { + dir = strings.TrimSpace(dir) + if dir == "" { + return nil + } + + store, err := newFileAdminAuditArchiveStore(dir) + if err != nil { + log.Printf("admin audit archive disabled: %v", err) + + return nil + } + + if interval <= 0 { + interval = defaultAdminAuditArchiveInterval + } + + return &adminAuditArchiver{ + interval: interval, + queue: make(chan AdminAuditEvent, adminAuditArchiveQueueSize), + store: store, + } +} + +func newFileAdminAuditArchiveStore(dir string) (*fileAdminAuditArchiveStore, error) { + absDir, err := filepath.Abs(dir) + if err != nil { + return nil, ewrap.Wrap(err, "resolve admin audit archive dir") + } + + err = os.MkdirAll(absDir, 0o750) + if err != nil { + return nil, ewrap.Wrap(err, "prepare admin audit archive dir") + } + + ioClient, err := sectools.NewWithOptions( + sectools.WithAllowAbsolute(true), + sectools.WithBaseDir(absDir), + sectools.WithAllowedRoots(absDir), + sectools.WithAllowSymlinks(true), + sectools.WithDirMode(0o750), + sectools.WithWriteFileMode(0o600), + ) + if err != nil { + return nil, ewrap.Wrap(err, "init admin audit archive io") + } + + return &fileAdminAuditArchiveStore{ + baseDir: absDir, + ioClient: ioClient, + }, nil +} + +func (store *fileAdminAuditArchiveStore) write(events []AdminAuditEvent) error { + if len(events) == 0 { + return nil + } + + store.mu.Lock() + defer store.mu.Unlock() + + payload, err := marshalAdminAuditJSONL(events) + if err != nil { + return err + } + + filename := fmt.Sprintf( + "audit-%013d-%s%s", + time.Now().UTC().UnixMilli(), + uuid.NewString(), + adminAuditArchiveFileExt, + ) + + err = store.ioClient.WriteFile(filename, payload) + if err != nil { + return ewrap.Wrap(err, "write admin audit archive") + } + + return nil +} + +func marshalAdminAuditJSONL(events []AdminAuditEvent) ([]byte, error) { + buffer := make([]byte, 0, len(events)*adminAuditArchiveEventBytes) + for _, event := range events { + line, err := json.Marshal(event) + if err != nil { + return nil, ewrap.Wrap(err, "encode admin audit archive event") + } + + buffer = append(buffer, line...) + buffer = append(buffer, '\n') + } + + return buffer, nil +} + +func (archiver *adminAuditArchiver) enqueue(events []AdminAuditEvent) { + if archiver == nil || len(events) == 0 { + return + } + + dropped := 0 + + for _, event := range events { + select { + case archiver.queue <- event: + default: + dropped++ + } + } + + if dropped > 0 { + log.Printf("admin audit archival queue full; dropped=%d", dropped) + } +} + +func (archiver *adminAuditArchiver) start(ctx context.Context, wg *sync.WaitGroup) { + if archiver == nil || archiver.store == nil || ctx == nil || wg == nil { + return + } + + wg.Go(func() { + archiver.loop(ctx) + }) +} + +func (archiver *adminAuditArchiver) loop(ctx context.Context) { + interval := archiver.interval + if interval <= 0 { + interval = defaultAdminAuditArchiveInterval + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + batch := make([]AdminAuditEvent, 0, adminAuditArchiveBatchSize) + flush := func() { + if len(batch) == 0 { + return + } + + err := archiver.store.write(batch) + if err != nil { + log.Printf("admin audit archival write: %v", err) + } + + batch = batch[:0] + } + + for { + select { + case <-ctx.Done(): + flush() + + return + case <-ticker.C: + flush() + case event := <-archiver.queue: + batch = append(batch, event) + if len(batch) >= adminAuditArchiveBatchSize { + flush() + } + } + } +} + +func (tm *TaskManager) startAuditArchival(ctx context.Context) { + if tm == nil || tm.auditArchiver == nil { + return + } + + tm.auditArchiver.start(ctx, &tm.workerWg) +} + +func (tm *TaskManager) archiveAuditEvents(events []AdminAuditEvent) { + if tm == nil || tm.auditArchiver == nil || len(events) == 0 { + return + } + + tm.auditArchiver.enqueue(events) +} diff --git a/admin_audit_archive_test.go b/admin_audit_archive_test.go new file mode 100644 index 0000000..a954b1f --- /dev/null +++ b/admin_audit_archive_test.go @@ -0,0 +1,121 @@ +package worker + +import ( + "context" + "os" + "strings" + "testing" + "time" + + "github.com/hyp3rd/ewrap" + sectools "github.com/hyp3rd/sectools/pkg/io" +) + +const ( + testAuditArchiveWait = 250 * time.Millisecond + testAuditArchiveBackoff = 10 * time.Millisecond + testAuditEventLimit = 100 +) + +func TestAdminAuditArchivalWritesExpiredEvents(t *testing.T) { + t.Parallel() + + archiveDir := t.TempDir() + tm := NewTaskManagerWithOptions( + context.Background(), + WithAdminAuditEventLimit(testAuditEventLimit), + WithAdminAuditRetention(25*time.Millisecond), + WithAdminAuditArchiveDir(archiveDir), + WithAdminAuditArchiveInterval(10*time.Millisecond), + ) + t.Cleanup(tm.StopNow) + + first := AdminAuditEvent{ + At: time.Now(), + Actor: "tester", + Action: "queue.pause", + Target: "default", + Status: "ok", + Detail: "pause", + RequestID: "req-1", + } + second := AdminAuditEvent{ + At: time.Now().Add(40 * time.Millisecond), + Actor: "tester", + Action: "queue.resume", + Target: "default", + Status: "ok", + Detail: "resume", + RequestID: "req-2", + } + + err := tm.AdminRecordAuditEvent(context.Background(), first, testAuditEventLimit) + if err != nil { + t.Fatalf("record first audit event: %v", err) + } + + time.Sleep(40 * time.Millisecond) + + err = tm.AdminRecordAuditEvent(context.Background(), second, testAuditEventLimit) + if err != nil { + t.Fatalf("record second audit event: %v", err) + } + + content, ok := waitArchiveContains(t, archiveDir, `"action":"queue.pause"`) + if !ok { + t.Fatalf("archived event not found; content=%q", content) + } + + page, err := tm.AdminAuditEvents(context.Background(), AdminAuditEventFilter{Limit: 10}) + if err != nil { + t.Fatalf("list audit events: %v", err) + } + + for _, event := range page.Events { + if event.Action == first.Action { + t.Fatalf("expired event should not remain in active set: %s", event.Action) + } + } +} + +func waitArchiveContains(t *testing.T, dir, needle string) (string, bool) { + t.Helper() + + reader, err := sectools.NewWithOptions( + sectools.WithAllowAbsolute(true), + sectools.WithBaseDir(dir), + sectools.WithAllowedRoots(dir), + sectools.WithAllowSymlinks(true), + ) + if err != nil { + t.Fatalf("init archive reader: %v", ewrap.Wrap(err, "init archive reader")) + } + + deadline := time.Now().Add(testAuditArchiveWait) + for time.Now().Before(deadline) { + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("read archive dir: %v", err) + } + + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".jsonl") { + continue + } + + raw, err := reader.ReadFile(entry.Name()) + if err != nil { + t.Fatalf("read archive file: %v", err) + } + + content := string(raw) + if strings.Contains(content, needle) { + return content, true + } + } + + time.Sleep(testAuditArchiveBackoff) + } + + return "", false +} diff --git a/admin_gateway.go b/admin_gateway.go index d92d090..a8ace87 100644 --- a/admin_gateway.go +++ b/admin_gateway.go @@ -71,6 +71,11 @@ const ( adminTarballMimeType = "application/gzip" adminErrNoArtifact = "This job source has no downloadable tarball" adminAuditExportMax = 5000 + adminMetricsFmtJSON = "json" + adminMetricsFmtProm = "prometheus" + adminRequestIDMaxLen = 128 + adminASCIIControlMin = 0x20 + adminASCIIDelete = 0x7f ) // AdminGatewayConfig configures the admin HTTP gateway. @@ -265,6 +270,7 @@ func newAdminGatewayMux(handler *adminGatewayHandler) *http.ServeMux { mux.HandleFunc("/admin/v1/audit/export", handler.handleAuditExport) mux.HandleFunc("/admin/v1/audit", handler.handleAudit) mux.HandleFunc("/admin/v1/metrics", handler.handleMetrics) + mux.HandleFunc("/admin/v1/metrics/prometheus", handler.handleMetricsPrometheus) mux.HandleFunc("/admin/v1/jobs", handler.handleJobs) mux.HandleFunc("/admin/v1/jobs/", handler.handleJob) mux.HandleFunc("/admin/v1/schedules", handler.handleSchedules) @@ -1401,21 +1407,61 @@ func writeAuditExportCSV(w http.ResponseWriter, events []adminAuditEventJSON) { } func (h *adminGatewayHandler) handleMetrics(w http.ResponseWriter, r *http.Request) { + h.handleMetricsWithFormat(w, r, strings.TrimSpace(r.URL.Query().Get("format"))) +} + +func (h *adminGatewayHandler) handleMetricsPrometheus(w http.ResponseWriter, r *http.Request) { + h.handleMetricsWithFormat(w, r, adminMetricsFmtProm) +} + +func (h *adminGatewayHandler) handleMetricsWithFormat( + w http.ResponseWriter, + r *http.Request, + format string, +) { if r.Method != http.MethodGet { writeAdminError(w, http.StatusMethodNotAllowed, adminErrMethodBlocked, adminErrMethodMessage, requestID(r, w)) return } + format = normalizeMetricsFormat(format) if h == nil || h.observability == nil { + if format == adminMetricsFmtProm { + w.Header().Set(adminHeaderType, adminMetricsPromContentType) + + return + } + writeAdminJSON(w, http.StatusOK, AdminObservabilitySnapshot{}) return } + if format == adminMetricsFmtProm { + w.Header().Set(adminHeaderType, adminMetricsPromContentType) + + // #nosec G705 -- payload is plain-text Prometheus exposition, not rendered as HTML + _, err := io.WriteString(w, h.observability.Prometheus()) + if err != nil { + log.Printf("admin gateway metrics write: %v", err) + } + + return + } + writeAdminJSON(w, http.StatusOK, h.observability.Snapshot()) } +func normalizeMetricsFormat(format string) string { + normalized := strings.ToLower(strings.TrimSpace(format)) + if normalized == "prom" || normalized == "prometheus" || normalized == "text" { + return adminMetricsFmtProm + } + + return adminMetricsFmtJSON +} + func (h *adminGatewayHandler) handleEvents(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeAdminError(w, http.StatusMethodNotAllowed, adminErrMethodBlocked, adminErrMethodMessage, requestID(r, w)) @@ -1655,6 +1701,7 @@ func (w *adminEventWriter) ReplayFrom(lastID int64) error { events := w.buffer.EventsSince(lastID) for _, event := range events { + // #nosec G705 -- SSE frames carry JSON payload bytes and are consumed as event-stream, not HTML _, err := fmt.Fprintf( w.writer, "id: %d\nevent: %s\ndata: %s\n\n", @@ -2429,6 +2476,7 @@ func writeAdminJSON(w http.ResponseWriter, statusCode int, payload any) { w.Header().Set(adminHeaderType, "application/json") w.WriteHeader(statusCode) + // #nosec G705 -- response body is JSON-encoded and content-type is application/json _, err = w.Write(data) if err != nil { _ = err @@ -2454,7 +2502,13 @@ func writeAdminGRPCError(w http.ResponseWriter, requestID string, err error) { httpStatus = grpcCodeToHTTP(code) } - log.Printf("admin gateway error request_id=%s code=%s err=%v", requestID, code.String(), err) + // #nosec G706 -- request_id is normalized and error text is sanitized for control chars before logging + log.Printf( + "admin gateway error request_id=%s code=%s err=%s", + sanitizeLogField(requestID), + code.String(), + sanitizeLogField(err.Error()), + ) writeAdminError(w, httpStatus, strings.ToLower(code.String()), err.Error(), requestID) } @@ -2529,6 +2583,11 @@ func requestID(r *http.Request, w http.ResponseWriter) string { header := r.Header.Get(adminRequestIDHeader) if header == "" { header = uuid.NewString() + } else { + header = sanitizeRequestID(header) + if header == "" { + header = uuid.NewString() + } } w.Header().Set(adminRequestIDHeader, header) @@ -2536,6 +2595,60 @@ func requestID(r *http.Request, w http.ResponseWriter) string { return header } +func sanitizeRequestID(raw string) string { + raw = strings.TrimSpace(raw) + if raw == "" { + return "" + } + + builder := strings.Builder{} + builder.Grow(len(raw)) + + for _, char := range raw { + if isRequestIDCharAllowed(char) { + builder.WriteRune(char) + } + } + + value := builder.String() + if len(value) > adminRequestIDMaxLen { + value = value[:adminRequestIDMaxLen] + } + + return value +} + +func isRequestIDCharAllowed(char rune) bool { + if (char >= 'a' && char <= 'z') || + (char >= 'A' && char <= 'Z') || + (char >= '0' && char <= '9') { + return true + } + + return char == '-' || char == '_' || char == '.' +} + +func sanitizeLogField(raw string) string { + builder := strings.Builder{} + builder.Grow(len(raw)) + + for _, char := range raw { + if char == '\n' || char == '\r' || char == '\t' { + builder.WriteRune(' ') + + continue + } + + if char < adminASCIIControlMin || char == adminASCIIDelete { + continue + } + + builder.WriteRune(char) + } + + return strings.TrimSpace(builder.String()) +} + func grpcCodeToHTTP(code codes.Code) int { if mapped, ok := grpcToHTTPStatus()[code]; ok { return mapped diff --git a/admin_observability_prometheus.go b/admin_observability_prometheus.go new file mode 100644 index 0000000..1087c50 --- /dev/null +++ b/admin_observability_prometheus.go @@ -0,0 +1,125 @@ +package worker + +import ( + "sort" + "strconv" + "strings" +) + +const ( + adminMetricsPromContentType = "text/plain; version=0.0.4; charset=utf-8" + adminMetricsInitialLines = 128 +) + +// Prometheus renders the current observability snapshot in Prometheus text format. +func (collector *AdminObservability) Prometheus() string { + snapshot := collector.Snapshot() + + lines := make([]string, 0, adminMetricsInitialLines) + lines = append(lines, + "# HELP worker_admin_uptime_seconds Admin service uptime in seconds.", + "# TYPE worker_admin_uptime_seconds gauge", + "worker_admin_uptime_seconds "+strconv.FormatInt(snapshot.UptimeSec, 10), + "# HELP worker_admin_http_calls_total Total HTTP requests per route.", + "# TYPE worker_admin_http_calls_total counter", + "# HELP worker_admin_http_errors_total Total HTTP error responses per route.", + "# TYPE worker_admin_http_errors_total counter", + "# HELP worker_admin_http_duration_milliseconds_sum Total HTTP latency in milliseconds per route.", + "# TYPE worker_admin_http_duration_milliseconds_sum counter", + "# HELP worker_admin_http_duration_milliseconds_max Maximum HTTP latency in milliseconds per route.", + "# TYPE worker_admin_http_duration_milliseconds_max gauge", + "# HELP worker_admin_grpc_calls_total Total gRPC requests per method.", + "# TYPE worker_admin_grpc_calls_total counter", + "# HELP worker_admin_grpc_errors_total Total gRPC errors per method.", + "# TYPE worker_admin_grpc_errors_total counter", + "# HELP worker_admin_grpc_duration_milliseconds_sum Total gRPC latency in milliseconds per method.", + "# TYPE worker_admin_grpc_duration_milliseconds_sum counter", + "# HELP worker_admin_grpc_duration_milliseconds_max Maximum gRPC latency in milliseconds per method.", + "# TYPE worker_admin_grpc_duration_milliseconds_max gauge", + "# HELP worker_admin_jobs_running Current number of running jobs.", + "# TYPE worker_admin_jobs_running gauge", + "# HELP worker_admin_jobs_completed_total Total completed jobs.", + "# TYPE worker_admin_jobs_completed_total counter", + "# HELP worker_admin_jobs_failed_total Total failed jobs.", + "# TYPE worker_admin_jobs_failed_total counter", + "# HELP worker_admin_jobs_duration_milliseconds_sum Total job runtime in milliseconds.", + "# TYPE worker_admin_jobs_duration_milliseconds_sum counter", + "# HELP worker_admin_jobs_duration_milliseconds_max Maximum job runtime in milliseconds.", + "# TYPE worker_admin_jobs_duration_milliseconds_max gauge", + ) + + httpKeys := sortedKeys(snapshot.HTTP) + for _, key := range httpKeys { + stat := snapshot.HTTP[key] + method, route := splitHTTPMetricKey(key) + + labels := `method="` + promEscape(method) + `",route="` + promEscape(route) + `",last_code="` + promEscape(stat.LastCode) + `"` + lines = append(lines, + `worker_admin_http_calls_total{`+labels+`} `+strconv.FormatInt(stat.Calls, 10), + `worker_admin_http_errors_total{`+labels+`} `+strconv.FormatInt(stat.Errors, 10), + `worker_admin_http_duration_milliseconds_sum{`+labels+`} `+strconv.FormatInt(stat.TotalMs, 10), + `worker_admin_http_duration_milliseconds_max{`+labels+`} `+strconv.FormatInt(stat.MaxMs, 10), + ) + } + + grpcKeys := sortedKeys(snapshot.GRPC) + for _, key := range grpcKeys { + stat := snapshot.GRPC[key] + labels := `method="` + promEscape(key) + `",last_code="` + promEscape(stat.LastCode) + `"` + lines = append(lines, + `worker_admin_grpc_calls_total{`+labels+`} `+strconv.FormatInt(stat.Calls, 10), + `worker_admin_grpc_errors_total{`+labels+`} `+strconv.FormatInt(stat.Errors, 10), + `worker_admin_grpc_duration_milliseconds_sum{`+labels+`} `+strconv.FormatInt(stat.TotalMs, 10), + `worker_admin_grpc_duration_milliseconds_max{`+labels+`} `+strconv.FormatInt(stat.MaxMs, 10), + ) + } + + lines = append(lines, + "worker_admin_jobs_running "+strconv.FormatInt(snapshot.Jobs.Running, 10), + "worker_admin_jobs_completed_total "+strconv.FormatInt(snapshot.Jobs.Completed, 10), + "worker_admin_jobs_failed_total "+strconv.FormatInt(snapshot.Jobs.Failed, 10), + "worker_admin_jobs_duration_milliseconds_sum "+strconv.FormatInt(snapshot.Jobs.TotalMs, 10), + "worker_admin_jobs_duration_milliseconds_max "+strconv.FormatInt(snapshot.Jobs.MaxMs, 10), + ) + + return strings.Join(lines, "\n") + "\n" +} + +func sortedKeys[T any](values map[string]T) []string { + keys := make([]string, 0, len(values)) + for key := range values { + keys = append(keys, key) + } + + sort.Strings(keys) + + return keys +} + +func splitHTTPMetricKey(key string) (method, route string) { + method, route, ok := strings.Cut(key, " ") + if !ok { + return "unknown", key + } + + method = strings.TrimSpace(method) + route = strings.TrimSpace(route) + + if method == "" { + method = "unknown" + } + + if route == "" { + route = "/" + } + + return method, route +} + +func promEscape(value string) string { + value = strings.ReplaceAll(value, `\`, `\\`) + value = strings.ReplaceAll(value, "\n", `\n`) + value = strings.ReplaceAll(value, `"`, `\"`) + + return value +} diff --git a/cmd/worker-admin/main.go b/cmd/worker-admin/main.go index 7c5d01e..2b3ff54 100644 --- a/cmd/worker-admin/main.go +++ b/cmd/worker-admin/main.go @@ -59,6 +59,9 @@ type config struct { globalBurst int leaderLease time.Duration auditEventLimit int + auditRetention time.Duration + auditArchiveDir string + auditArchiveInt time.Duration auditExportMax int adminGuardrails worker.AdminGuardrails } @@ -107,6 +110,9 @@ func run() error { worker.WithDurableBackend(backend), worker.WithDurableLease(defaultLeaseDuration), worker.WithAdminAuditEventLimit(cfg.auditEventLimit), + worker.WithAdminAuditRetention(cfg.auditRetention), + worker.WithAdminAuditArchiveDir(cfg.auditArchiveDir), + worker.WithAdminAuditArchiveInterval(cfg.auditArchiveInt), ) grpcServer, err = startAdminGRPCServer(cfg, tm, observability) @@ -214,6 +220,9 @@ func loadConfig() (config, error) { cfg.globalBurst = parseInt(os.Getenv("WORKER_ADMIN_GLOBAL_BURST")) cfg.leaderLease = parseDuration(os.Getenv("WORKER_ADMIN_LEADER_LEASE")) cfg.auditEventLimit = parseIntWithDefault(os.Getenv("WORKER_ADMIN_AUDIT_EVENT_LIMIT"), defaultAdminAuditMax) + cfg.auditRetention = parseDuration(os.Getenv("WORKER_ADMIN_AUDIT_RETENTION")) + cfg.auditArchiveDir = strings.TrimSpace(os.Getenv("WORKER_ADMIN_AUDIT_ARCHIVE_DIR")) + cfg.auditArchiveInt = parseDuration(os.Getenv("WORKER_ADMIN_AUDIT_ARCHIVE_INTERVAL")) cfg.auditExportMax = parseIntWithDefault(os.Getenv("WORKER_ADMIN_AUDIT_EXPORT_LIMIT_MAX"), defaultAuditExportMax) cfg.adminGuardrails = worker.AdminGuardrails{ ReplayLimitMax: parseIntWithDefault(os.Getenv("WORKER_ADMIN_REPLAY_LIMIT_MAX"), defaultAdminReplayCap), diff --git a/cmd/worker-service/job_runner.go b/cmd/worker-service/job_runner.go index f35417f..82cede4 100644 --- a/cmd/worker-service/job_runner.go +++ b/cmd/worker-service/job_runner.go @@ -20,6 +20,7 @@ import ( "github.com/hyp3rd/ewrap" sectools "github.com/hyp3rd/sectools/pkg/io" + sectvalidate "github.com/hyp3rd/sectools/pkg/validate" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" @@ -134,7 +135,7 @@ func (r *jobRunner) Run(ctx context.Context, job worker.AdminJob) (output string return "", ewrap.New("job runner not configured") } - err = r.validate(job) + err = r.validate(ctx, job) if err != nil { return "", err } @@ -202,7 +203,7 @@ func (r *jobRunner) startRunObservation() func(*error) { } } -func (r *jobRunner) validate(job worker.AdminJob) error { +func (r *jobRunner) validate(ctx context.Context, job worker.AdminJob) error { if r.dockerBin == "" { return ewrap.New("job runner docker binary is required") } @@ -213,7 +214,7 @@ func (r *jobRunner) validate(job worker.AdminJob) error { case worker.JobSourceGitTag: return r.validateGitJob(job) case worker.JobSourceTarballURL: - return r.validateTarballURLJob(job) + return r.validateTarballURLJob(ctx, job) case worker.JobSourceTarballPath: return r.validateTarballPathJob(job) default: @@ -256,12 +257,12 @@ func (r *jobRunner) validateGitJob(job worker.AdminJob) error { return nil } -func (r *jobRunner) validateTarballURLJob(job worker.AdminJob) error { +func (r *jobRunner) validateTarballURLJob(ctx context.Context, job worker.AdminJob) error { if strings.TrimSpace(job.TarballURL) == "" { return ewrap.New("job tarball url is required") } - return r.validateTarballURL(job.TarballURL) + return r.validateTarballURL(ctx, job.TarballURL) } func (r *jobRunner) validateTarballPathJob(job worker.AdminJob) error { @@ -336,7 +337,7 @@ func (r *jobRunner) cloneRepo(ctx context.Context, job worker.AdminJob, repoDir } func (r *jobRunner) extractTarballFromURL(ctx context.Context, job worker.AdminJob, repoDir string) (string, error) { - parsed, err := r.parseTarballURL(job.TarballURL) + parsed, err := r.parseTarballURL(ctx, job.TarballURL) if err != nil { return "", err } @@ -350,6 +351,7 @@ func (r *jobRunner) extractTarballFromURL(ctx context.Context, job worker.AdminJ req.Header.Set("User-Agent", "go-worker/job-runner") + // #nosec G704 -- URL is validated with sectools URL validator + host allowlist before request resp, err := client.Do(req) if err != nil { return "", ewrap.Wrap(err, "download tarball") @@ -412,24 +414,43 @@ func (r *jobRunner) extractTarballFromPath(ctx context.Context, job worker.Admin return "tarball loaded", nil } -func (r *jobRunner) parseTarballURL(raw string) (*url.URL, error) { +func (r *jobRunner) parseTarballURL(ctx context.Context, raw string) (*url.URL, error) { raw = strings.TrimSpace(raw) if raw == "" { return nil, ewrap.New("job tarball url is required") } - parsed, err := url.Parse(raw) + if ctx == nil { + return nil, ewrap.New("job tarball url context is required") + } + + allowedHosts := r.normalizedTarballAllowHosts() + + options := []sectvalidate.URLOption{ + sectvalidate.WithURLAllowedSchemes("https"), + sectvalidate.WithURLAllowIDN(true), + } + if len(allowedHosts) > 0 { + options = append(options, sectvalidate.WithURLAllowedHosts(allowedHosts...)) + } + + validator, err := sectvalidate.NewURLValidator(options...) + if err != nil { + return nil, ewrap.Wrap(err, "build tarball url validator") + } + + result, err := validator.Validate(ctx, raw) if err != nil { return nil, ewrap.New("job tarball url is invalid") } - if parsed.Host == "" { - return nil, ewrap.New("job tarball url host is required") + parsed, err := url.Parse(result.FinalURL) + if err != nil { + return nil, ewrap.New("job tarball url is invalid") } - scheme := strings.ToLower(parsed.Scheme) - if scheme != "https" && scheme != "http" { - return nil, ewrap.New("job tarball url must be http or https") + if parsed.Host == "" { + return nil, ewrap.New("job tarball url host is required") } if !r.isTarballHostAllowed(parsed.Host) { @@ -439,8 +460,41 @@ func (r *jobRunner) parseTarballURL(raw string) (*url.URL, error) { return parsed, nil } -func (r *jobRunner) validateTarballURL(raw string) error { - _, err := r.parseTarballURL(raw) +func (r *jobRunner) normalizedTarballAllowHosts() []string { + if r == nil || len(r.tarballAllowlist) == 0 { + return nil + } + + hosts := make([]string, 0, len(r.tarballAllowlist)) + seen := make(map[string]struct{}, len(r.tarballAllowlist)) + + for host := range r.tarballAllowlist { + normalized := strings.ToLower(strings.TrimSpace(host)) + if normalized == "" { + continue + } + + if base, _, ok := strings.Cut(normalized, ":"); ok { + normalized = base + } + + if normalized == "" { + continue + } + + if _, ok := seen[normalized]; ok { + continue + } + + seen[normalized] = struct{}{} + hosts = append(hosts, normalized) + } + + return hosts +} + +func (r *jobRunner) validateTarballURL(ctx context.Context, raw string) error { + _, err := r.parseTarballURL(ctx, raw) return err } @@ -884,7 +938,7 @@ func (r *jobRunner) runImage(ctx context.Context, job worker.AdminJob, imageTag runArgs = append(runArgs, job.Command...) } - // #nosec G204 -- docker binary/path is configured, inputs are normalized + // #nosec G204,G702 -- exec.CommandContext avoids shell interpolation; args are passed verbatim cmd := exec.CommandContext(ctx, r.dockerBin, runArgs...) return runCommand(cmd, r.outputBytes) diff --git a/cmd/worker-service/main.go b/cmd/worker-service/main.go index 7ab5263..2e80e01 100644 --- a/cmd/worker-service/main.go +++ b/cmd/worker-service/main.go @@ -95,6 +95,9 @@ type config struct { jobEventCacheTTL time.Duration jobEventMaxEntries int auditEventLimit int + auditRetention time.Duration + auditArchiveDir string + auditArchiveInt time.Duration adminGuardrails worker.AdminGuardrails } @@ -124,6 +127,9 @@ func run() error { worker.WithDurableLease(cfg.durableLease), worker.WithDurableHandlers(durableHandlers), worker.WithAdminAuditEventLimit(cfg.auditEventLimit), + worker.WithAdminAuditRetention(cfg.auditRetention), + worker.WithAdminAuditArchiveDir(cfg.auditArchiveDir), + worker.WithAdminAuditArchiveInterval(cfg.auditArchiveInt), ) tm.StartWorkers(context.Background()) @@ -236,6 +242,9 @@ func loadConfig() config { jobEventCacheTTL: defaultDuration(os.Getenv("WORKER_JOB_EVENT_CACHE_TTL"), defaultJobEventCache), jobEventMaxEntries: parseIntWithDefault(os.Getenv("WORKER_JOB_EVENT_MAX_ENTRIES"), defaultJobEventMax), auditEventLimit: parseIntWithDefault(os.Getenv("WORKER_ADMIN_AUDIT_EVENT_LIMIT"), defaultAuditEventMax), + auditRetention: defaultDuration(os.Getenv("WORKER_ADMIN_AUDIT_RETENTION"), 0), + auditArchiveDir: strings.TrimSpace(os.Getenv("WORKER_ADMIN_AUDIT_ARCHIVE_DIR")), + auditArchiveInt: defaultDuration(os.Getenv("WORKER_ADMIN_AUDIT_ARCHIVE_INTERVAL"), 0), } cfg.durableCronHandlers = append([]string{}, defaultDurableCronHandlers()...) diff --git a/cspell.json b/cspell.json index cb9e504..9731d8e 100644 --- a/cspell.json +++ b/cspell.json @@ -28,6 +28,7 @@ "alreadyexists", "anchore", "anypb", + "Archiver", "Atoi", "Autobuild", "backpressure", @@ -173,6 +174,7 @@ "sdkmetric", "sectconv", "sectools", + "sectvalidate", "securego", "shellcheck", "sigstore", diff --git a/go.mod b/go.mod index 517a77a..1ca0e9b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/hyp3rd/go-worker -go 1.25.7 +go 1.26.0 require ( github.com/goccy/go-json v0.10.5 @@ -11,9 +11,9 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cobra v1.10.2 go.opentelemetry.io/otel/metric v1.40.0 - go.opentelemetry.io/otel/sdk/metric v1.39.0 + go.opentelemetry.io/otel/sdk/metric v1.40.0 golang.org/x/time v0.14.0 - google.golang.org/grpc v1.78.0 + google.golang.org/grpc v1.79.1 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) @@ -27,7 +27,7 @@ require ( github.com/spf13/pflag v1.0.10 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel v1.40.0 // indirect - go.opentelemetry.io/otel/sdk v1.39.0 // indirect + go.opentelemetry.io/otel/sdk v1.40.0 // indirect go.opentelemetry.io/otel/trace v1.40.0 // indirect golang.org/x/net v0.50.0 // indirect golang.org/x/sys v0.41.0 // indirect diff --git a/go.sum b/go.sum index fc4b5e4..4351688 100644 --- a/go.sum +++ b/go.sum @@ -54,10 +54,10 @@ go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= -go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= -go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= -go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= -go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= +go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= @@ -74,8 +74,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac= google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= -google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= -google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= +google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/job_payload.go b/job_payload.go index 2e3807f..f0286e6 100644 --- a/job_payload.go +++ b/job_payload.go @@ -139,6 +139,8 @@ func getNumberValue(value any) float64 { if err == nil { return parsed } + default: + return 0 } return 0 diff --git a/options.go b/options.go index 10a3e93..7182f0b 100644 --- a/options.go +++ b/options.go @@ -20,6 +20,9 @@ type taskManagerConfig struct { defaultQueue string queueWeights map[string]int auditEventLimit int + auditRetention time.Duration + auditArchiveDir string + auditArchiveInt time.Duration durableBackend DurableBackend durableHandlers map[string]DurableHandlerSpec @@ -49,6 +52,9 @@ func defaultTaskManagerConfig() taskManagerConfig { durableCodec: ProtoDurableCodec{}, cronLocation: time.UTC, auditEventLimit: defaultAdminAuditEventLimit, + auditRetention: 0, + auditArchiveDir: "", + auditArchiveInt: 0, } } @@ -181,3 +187,24 @@ func WithAdminAuditEventLimit(limit int) TaskManagerOption { cfg.auditEventLimit = limit } } + +// WithAdminAuditRetention sets a max age for admin audit events. Set <= 0 to disable age pruning. +func WithAdminAuditRetention(ttl time.Duration) TaskManagerOption { + return func(cfg *taskManagerConfig) { + cfg.auditRetention = ttl + } +} + +// WithAdminAuditArchiveDir enables file archival for aged-out audit events. +func WithAdminAuditArchiveDir(dir string) TaskManagerOption { + return func(cfg *taskManagerConfig) { + cfg.auditArchiveDir = dir + } +} + +// WithAdminAuditArchiveInterval sets the flush interval for audit archival. +func WithAdminAuditArchiveInterval(interval time.Duration) TaskManagerOption { + return func(cfg *taskManagerConfig) { + cfg.auditArchiveInt = interval + } +} diff --git a/worker.go b/worker.go index ad16894..6de53bb 100644 --- a/worker.go +++ b/worker.go @@ -115,6 +115,8 @@ type TaskManager struct { auditEventsMu sync.RWMutex auditEvents []AdminAuditEvent auditEventLimit int + auditRetention time.Duration + auditArchiver *adminAuditArchiver } // NewTaskManagerWithDefaults creates a new task manager with default values. @@ -245,6 +247,8 @@ func newTaskManagerFromConfig(ctx context.Context, cfg taskManagerConfig) *TaskM cronRuns: map[uuid.UUID]cronRunInfo{}, jobEventLimit: defaultAdminJobEventLimit, auditEventLimit: normalizeAdminEventLimit(cfg.auditEventLimit, defaultAdminAuditEventLimit), + auditRetention: normalizeAdminAuditRetention(cfg.auditRetention), + auditArchiver: newAdminAuditArchiver(cfg.auditArchiveDir, cfg.auditArchiveInt), } tm.queueCond = sync.NewCond(&tm.queueMu) @@ -282,6 +286,14 @@ func normalizeTaskManagerConfig(cfg taskManagerConfig) taskManagerConfig { cfg.maxRetries = DefaultMaxRetries } + if cfg.auditRetention < 0 { + cfg.auditRetention = 0 + } + + if cfg.auditArchiveInt < 0 { + cfg.auditArchiveInt = 0 + } + if cfg.durableCodec == nil { cfg.durableCodec = ProtoDurableCodec{} } @@ -289,6 +301,14 @@ func normalizeTaskManagerConfig(cfg taskManagerConfig) taskManagerConfig { return cfg } +func normalizeAdminAuditRetention(value time.Duration) time.Duration { + if value <= 0 { + return 0 + } + + return value +} + // IsEmpty checks if the task scheduler queue is empty. func (tm *TaskManager) IsEmpty() bool { tm.queueMu.Lock() @@ -327,6 +347,7 @@ func (tm *TaskManager) StartWorkers(ctx context.Context) { } tm.startCron() + tm.startAuditArchival(workerCtx) }) }