Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cmd/simrun/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,35 @@ func main() {
}
}()

// Retention sweepers: expire run logs and whole assessments by age. Both run
// once at startup and then hourly, re-reading AppConfig each tick so admin
// changes apply without a restart.
go func() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
sweep := func() {
// Use the main ctx so a shutdown signal cancels in-flight DB work
// (config load + the per-run deletes the assessment sweep loops over)
// instead of blocking shutdown on a slow Postgres.
cfg, err := configStore.GetAppConfig(ctx)
if err != nil {
log.Warnf("Retention sweep: failed to load config: %v", err)
return
}
web.SweepRunLogs(bootstrap.DataDir, cfg.AssessmentLogRetentionEnabled, cfg.AssessmentLogRetentionDays)
web.SweepAssessments(ctx, runStore, bootstrap.DataDir, cfg.AssessmentRetentionEnabled, cfg.AssessmentRetentionDays)
}
sweep()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
sweep()
}
}
}()

go func() {
if err := server.ListenAndServe(); err != nil {
log.Fatalf("Server error: %v", err)
Expand Down
24 changes: 16 additions & 8 deletions internal/config/appconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,27 @@ package config
// belongs in connectors, anything secret belongs in secret_groups, anything
// set at deploy belongs in Bootstrap.
type AppConfig struct {
Parallelism int `json:"parallelism"`
TerraformVersion string `json:"terraform_version"`
PackLogsEnabled bool `json:"pack_logs_enabled"`
SSHLoggingEnabled bool `json:"ssh_logging_enabled"`
Parallelism int `json:"parallelism"`
TerraformVersion string `json:"terraform_version"`
PackLogsEnabled bool `json:"pack_logs_enabled"`
SSHLoggingEnabled bool `json:"ssh_logging_enabled"`
AssessmentLogRetentionEnabled bool `json:"assessment_log_retention_enabled"`
AssessmentLogRetentionDays int `json:"assessment_log_retention_days"`
AssessmentRetentionEnabled bool `json:"assessment_retention_enabled"`
AssessmentRetentionDays int `json:"assessment_retention_days"`
}

// DefaultAppConfig returns the default values used when no row exists for
// a key. Keep these aligned with the migration that backfills app_config.
func DefaultAppConfig() AppConfig {
return AppConfig{
Parallelism: 5,
TerraformVersion: "",
PackLogsEnabled: true,
SSHLoggingEnabled: false,
Parallelism: 5,
TerraformVersion: "",
PackLogsEnabled: true,
SSHLoggingEnabled: false,
AssessmentLogRetentionEnabled: true,
AssessmentLogRetentionDays: 7,
AssessmentRetentionEnabled: false,
AssessmentRetentionDays: 30,
}
}
12 changes: 8 additions & 4 deletions internal/config/appconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ import (

func TestDefaultAppConfig(t *testing.T) {
assert.Equal(t, AppConfig{
Parallelism: 5,
TerraformVersion: "",
PackLogsEnabled: true,
SSHLoggingEnabled: false,
Parallelism: 5,
TerraformVersion: "",
PackLogsEnabled: true,
SSHLoggingEnabled: false,
AssessmentLogRetentionEnabled: true,
AssessmentLogRetentionDays: 7,
AssessmentRetentionEnabled: false,
AssessmentRetentionDays: 30,
}, DefaultAppConfig())
}
28 changes: 28 additions & 0 deletions internal/db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,30 @@ func parseAppConfig(all map[string]json.RawMessage) config.AppConfig {
cfg.SSHLoggingEnabled = b
}
}
if v, ok := all["assessment_log_retention_enabled"]; ok {
var b bool
if err := json.Unmarshal(v, &b); err == nil {
cfg.AssessmentLogRetentionEnabled = b
}
}
if v, ok := all["assessment_log_retention_days"]; ok {
var n int
if err := json.Unmarshal(v, &n); err == nil && n > 0 {
cfg.AssessmentLogRetentionDays = n
}
}
if v, ok := all["assessment_retention_enabled"]; ok {
var b bool
if err := json.Unmarshal(v, &b); err == nil {
cfg.AssessmentRetentionEnabled = b
}
}
if v, ok := all["assessment_retention_days"]; ok {
var n int
if err := json.Unmarshal(v, &n); err == nil && n > 0 {
cfg.AssessmentRetentionDays = n
}
}

return cfg
}
Expand All @@ -114,6 +138,10 @@ func appConfigKVs(c config.AppConfig) []appConfigKV {
{"terraform_version", c.TerraformVersion},
{"pack_logs_enabled", c.PackLogsEnabled},
{"ssh_logging_enabled", c.SSHLoggingEnabled},
{"assessment_log_retention_enabled", c.AssessmentLogRetentionEnabled},
{"assessment_log_retention_days", c.AssessmentLogRetentionDays},
{"assessment_retention_enabled", c.AssessmentRetentionEnabled},
{"assessment_retention_days", c.AssessmentRetentionDays},
}
}

Expand Down
78 changes: 58 additions & 20 deletions internal/db/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,64 @@ func TestParseAppConfig_NonPositiveParallelismKeepsDefault(t *testing.T) {

func TestParseAppConfig_AllSet(t *testing.T) {
got := parseAppConfig(map[string]json.RawMessage{
"parallelism": json.RawMessage(`12`),
"terraform_version": json.RawMessage(`"1.6.0"`),
"pack_logs_enabled": json.RawMessage(`false`),
"ssh_logging_enabled": json.RawMessage(`true`),
"parallelism": json.RawMessage(`12`),
"terraform_version": json.RawMessage(`"1.6.0"`),
"pack_logs_enabled": json.RawMessage(`false`),
"ssh_logging_enabled": json.RawMessage(`true`),
"assessment_log_retention_enabled": json.RawMessage(`false`),
"assessment_log_retention_days": json.RawMessage(`14`),
"assessment_retention_enabled": json.RawMessage(`true`),
"assessment_retention_days": json.RawMessage(`90`),
})
assert.Equal(t, config.AppConfig{
Parallelism: 12,
TerraformVersion: "1.6.0",
PackLogsEnabled: false,
SSHLoggingEnabled: true,
Parallelism: 12,
TerraformVersion: "1.6.0",
PackLogsEnabled: false,
SSHLoggingEnabled: true,
AssessmentLogRetentionEnabled: false,
AssessmentLogRetentionDays: 14,
AssessmentRetentionEnabled: true,
AssessmentRetentionDays: 90,
}, got)
}

// The retention day fields guard against non-positive values like parallelism,
// so a stored 0/-1 cannot silently configure immediate deletion.
func TestParseAppConfig_NonPositiveRetentionDaysKeepsDefault(t *testing.T) {
def := config.DefaultAppConfig()
for _, v := range []string{`0`, `-1`} {
t.Run(v, func(t *testing.T) {
got := parseAppConfig(map[string]json.RawMessage{
"assessment_log_retention_days": json.RawMessage(v),
"assessment_retention_days": json.RawMessage(v),
})
assert.Equal(t, def.AssessmentLogRetentionDays, got.AssessmentLogRetentionDays)
assert.Equal(t, def.AssessmentRetentionDays, got.AssessmentRetentionDays)
})
}
}

func TestAppConfigKVs_MarshalsToExpectedJSON(t *testing.T) {
c := config.AppConfig{
Parallelism: 7,
TerraformVersion: "1.5.7",
PackLogsEnabled: true,
SSHLoggingEnabled: false,
Parallelism: 7,
TerraformVersion: "1.5.7",
PackLogsEnabled: true,
SSHLoggingEnabled: false,
AssessmentLogRetentionEnabled: true,
AssessmentLogRetentionDays: 7,
AssessmentRetentionEnabled: false,
AssessmentRetentionDays: 30,
}

want := map[string]string{
"parallelism": `7`,
"terraform_version": `"1.5.7"`,
"pack_logs_enabled": `true`,
"ssh_logging_enabled": `false`,
"parallelism": `7`,
"terraform_version": `"1.5.7"`,
"pack_logs_enabled": `true`,
"ssh_logging_enabled": `false`,
"assessment_log_retention_enabled": `true`,
"assessment_log_retention_days": `7`,
"assessment_retention_enabled": `false`,
"assessment_retention_days": `30`,
}

kvs := appConfigKVs(c)
Expand All @@ -89,10 +121,14 @@ func TestFakeConfigStore_UpdateGetAppConfigRoundtrip(t *testing.T) {
ctx := context.Background()

want := config.AppConfig{
Parallelism: 12,
TerraformVersion: "1.6.0",
PackLogsEnabled: false,
SSHLoggingEnabled: true,
Parallelism: 12,
TerraformVersion: "1.6.0",
PackLogsEnabled: false,
SSHLoggingEnabled: true,
AssessmentLogRetentionEnabled: false,
AssessmentLogRetentionDays: 14,
AssessmentRetentionEnabled: true,
AssessmentRetentionDays: 90,
}
require.NoError(t, f.UpdateAppConfig(ctx, want))

Expand All @@ -102,6 +138,8 @@ func TestFakeConfigStore_UpdateGetAppConfigRoundtrip(t *testing.T) {

assert.Equal(t, []string{
"parallelism", "terraform_version", "pack_logs_enabled", "ssh_logging_enabled",
"assessment_log_retention_enabled", "assessment_log_retention_days",
"assessment_retention_enabled", "assessment_retention_days",
}, f.sets)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
DELETE FROM app_config WHERE key IN (
'assessment_log_retention_enabled',
'assessment_log_retention_days',
'assessment_retention_enabled',
'assessment_retention_days'
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Backfill app_config with default values for retention settings.
-- Idempotent — only inserts if the key is missing. Aligned with DefaultAppConfig().
INSERT INTO app_config (key, value) VALUES
('assessment_log_retention_enabled', 'true'::jsonb),
('assessment_log_retention_days', '7'::jsonb),
('assessment_retention_enabled', 'false'::jsonb),
('assessment_retention_days', '30'::jsonb)
ON CONFLICT (key) DO NOTHING;
23 changes: 23 additions & 0 deletions internal/db/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type RunStore interface {
Create(ctx context.Context, run *Run) error
Get(ctx context.Context, id uuid.UUID) (*Run, error)
List(ctx context.Context, filters ListRunsFilters, limit, offset int) (RunPage, error)
// ListExpired returns the IDs of runs created before cutoff whose status is
// not "running" — the assessment-retention sweeper's deletion candidates.
ListExpired(ctx context.Context, cutoff time.Time) ([]uuid.UUID, error)
Update(ctx context.Context, id uuid.UUID, status string, total, succeeded, failed int, endTime *time.Time) error
Delete(ctx context.Context, id uuid.UUID) error
AddScenarioResult(ctx context.Context, runID uuid.UUID, result *ScenarioResult) error
Expand Down Expand Up @@ -222,6 +225,26 @@ func buildRunsWhere(f ListRunsFilters) (string, []any) {
return "WHERE " + strings.Join(clauses, " AND "), args
}

func (s *runStore) ListExpired(ctx context.Context, cutoff time.Time) ([]uuid.UUID, error) {
rows, err := s.pool.Query(ctx,
`SELECT id FROM runs WHERE created_at < $1 AND status <> 'running'`, cutoff,
)
if err != nil {
return nil, err
}
defer rows.Close()

var ids []uuid.UUID
for rows.Next() {
var id uuid.UUID
if err := rows.Scan(&id); err != nil {
return nil, err
}
ids = append(ids, id)
}
return ids, rows.Err()
}

func (s *runStore) Update(ctx context.Context, id uuid.UUID, status string, total, succeeded, failed int, endTime *time.Time) error {
_, err := s.pool.Exec(ctx,
`UPDATE runs SET status = $2, total = $3, succeeded = $4, failed = $5, end_time = $6
Expand Down
48 changes: 44 additions & 4 deletions internal/testutil/fakes/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ func matchesRunFilters(r *db.Run, f db.ListRunsFilters) bool {
return true
}

func (s *RunStore) ListExpired(_ context.Context, cutoff time.Time) ([]uuid.UUID, error) {
s.mu.Lock()
defer s.mu.Unlock()
var ids []uuid.UUID
for id, r := range s.runs {
if r.CreatedAt.Before(cutoff) && r.Status != "running" {
ids = append(ids, id)
}
}
return ids, nil
}

func (s *RunStore) Update(_ context.Context, id uuid.UUID, status string, total, succeeded, failed int, endTime *time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -703,17 +715,45 @@ func (s *ConfigStore) GetAppConfig(_ context.Context) (config.AppConfig, error)
out.SSHLoggingEnabled = v
}
}
if raw, ok := s.data["assessment_log_retention_enabled"]; ok {
var v bool
if err := json.Unmarshal(raw, &v); err == nil {
out.AssessmentLogRetentionEnabled = v
}
}
if raw, ok := s.data["assessment_log_retention_days"]; ok {
var v int
if err := json.Unmarshal(raw, &v); err == nil && v > 0 {
out.AssessmentLogRetentionDays = v
}
}
if raw, ok := s.data["assessment_retention_enabled"]; ok {
var v bool
if err := json.Unmarshal(raw, &v); err == nil {
out.AssessmentRetentionEnabled = v
}
}
if raw, ok := s.data["assessment_retention_days"]; ok {
var v int
if err := json.Unmarshal(raw, &v); err == nil && v > 0 {
out.AssessmentRetentionDays = v
}
}
return out, nil
}

func (s *ConfigStore) UpdateAppConfig(_ context.Context, c config.AppConfig) error {
s.mu.Lock()
defer s.mu.Unlock()
for k, v := range map[string]any{
"parallelism": c.Parallelism,
"terraform_version": c.TerraformVersion,
"pack_logs_enabled": c.PackLogsEnabled,
"ssh_logging_enabled": c.SSHLoggingEnabled,
"parallelism": c.Parallelism,
"terraform_version": c.TerraformVersion,
"pack_logs_enabled": c.PackLogsEnabled,
"ssh_logging_enabled": c.SSHLoggingEnabled,
"assessment_log_retention_enabled": c.AssessmentLogRetentionEnabled,
"assessment_log_retention_days": c.AssessmentLogRetentionDays,
"assessment_retention_enabled": c.AssessmentRetentionEnabled,
"assessment_retention_days": c.AssessmentRetentionDays,
} {
raw, err := json.Marshal(v)
if err != nil {
Expand Down
Loading