-
Notifications
You must be signed in to change notification settings - Fork 124
for_each_task checks on databricks bundle validate #4104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
bafdc76
8e06a3e
1e4193a
77bc751
de3dbda
61ed0de
6292ae5
a7906c2
dddc68b
ddb6749
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| package validate | ||
|
|
||
import (
	"context"
	"fmt"
	"sort"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/libs/diag"
	"github.com/databricks/cli/libs/dyn"
	"github.com/databricks/databricks-sdk-go/service/jobs"
)
|
|
||
| // ForEachTask validates constraints for for_each_task configuration | ||
| func ForEachTask() bundle.ReadOnlyMutator { | ||
| return &forEachTask{} | ||
| } | ||
|
|
||
| type forEachTask struct{ bundle.RO } | ||
|
|
||
| func (v *forEachTask) Name() string { | ||
| return "validate:for_each_task" | ||
| } | ||
|
|
||
| func (v *forEachTask) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { | ||
| diags := diag.Diagnostics{} | ||
|
|
||
| jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs")) | ||
|
|
||
| for resourceName, job := range b.Config.Resources.Jobs { | ||
| resourcePath := jobsPath.Append(dyn.Key(resourceName)) | ||
|
|
||
| for taskIndex, task := range job.Tasks { | ||
| taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex)) | ||
|
|
||
| if task.ForEachTask != nil { | ||
| diags = diags.Extend(validateForEachTask(b, task, taskPath)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return diags | ||
| } | ||
|
|
||
| func validateForEachTask(b *bundle.Bundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics { | ||
| diags := diag.Diagnostics{} | ||
|
|
||
| if task.MaxRetries != 0 { | ||
| diags = diags.Append(invalidRetryFieldDiag(b, task, taskPath, "max_retries", diag.Error)) | ||
| } | ||
|
|
||
| if task.MinRetryIntervalMillis != 0 { | ||
| diags = diags.Append(invalidRetryFieldDiag(b, task, taskPath, "min_retry_interval_millis", diag.Warning)) | ||
| } | ||
|
|
||
| if task.RetryOnTimeout { | ||
| diags = diags.Append(invalidRetryFieldDiag(b, task, taskPath, "retry_on_timeout", diag.Warning)) | ||
| } | ||
|
|
||
| return diags | ||
| } | ||
|
|
||
| func invalidRetryFieldDiag(b *bundle.Bundle, task jobs.Task, taskPath dyn.Path, fieldName string, severity diag.Severity) diag.Diagnostic { | ||
| detail := fmt.Sprintf( | ||
| "Task %q has %s defined at the parent level, but it uses for_each_task.\n"+ | ||
| "When using for_each_task, %s must be defined on the nested task (for_each_task.task.%s), not on the parent task.", | ||
| task.TaskKey, fieldName, fieldName, fieldName, | ||
| ) | ||
|
|
||
| return diag.Diagnostic{ | ||
| Severity: severity, | ||
| Summary: fmt.Sprintf("Invalid %s configuration for for_each_task", fieldName), | ||
| Detail: detail, | ||
| Locations: b.Config.GetLocations(taskPath.String()), | ||
| Paths: []dyn.Path{taskPath}, | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,191 @@ | ||
| package validate | ||
|
|
||
| import ( | ||
| "context" | ||
| "testing" | ||
|
|
||
| "github.com/databricks/cli/bundle" | ||
| "github.com/databricks/cli/bundle/config" | ||
| "github.com/databricks/cli/bundle/config/resources" | ||
| "github.com/databricks/cli/bundle/internal/bundletest" | ||
| "github.com/databricks/cli/libs/diag" | ||
| "github.com/databricks/cli/libs/dyn" | ||
| "github.com/databricks/databricks-sdk-go/service/jobs" | ||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func createBundleWithForEachTask(parentTask jobs.Task) *bundle.Bundle { | ||
| if parentTask.ForEachTask == nil { | ||
| parentTask.ForEachTask = &jobs.ForEachTask{ | ||
| Inputs: "[1, 2, 3]", | ||
| Task: jobs.Task{ | ||
| TaskKey: "child_task", | ||
| NotebookTask: &jobs.NotebookTask{ | ||
| NotebookPath: "test.py", | ||
| }, | ||
| }, | ||
| } | ||
| } | ||
|
|
||
| b := &bundle.Bundle{ | ||
| Config: config.Root{ | ||
| Resources: config.Resources{ | ||
| Jobs: map[string]*resources.Job{ | ||
| "job1": { | ||
| JobSettings: jobs.JobSettings{ | ||
| Name: "My Job", | ||
| Tasks: []jobs.Task{parentTask}, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| bundletest.SetLocation(b, "resources.jobs.job1.tasks[0]", []dyn.Location{{File: "job.yml", Line: 1, Column: 1}}) | ||
| return b | ||
| } | ||
|
|
||
| func TestForEachTask_InvalidRetryFields(t *testing.T) { | ||
|
||
| tests := []struct { | ||
| name string | ||
| task jobs.Task | ||
| expectedSeverity diag.Severity | ||
| expectedSummary string | ||
| expectedDetail string | ||
| }{ | ||
| { | ||
| name: "max_retries on parent", | ||
| task: jobs.Task{ | ||
| TaskKey: "parent_task", | ||
| MaxRetries: 3, | ||
| }, | ||
| expectedSeverity: diag.Error, | ||
| expectedSummary: "Invalid max_retries configuration for for_each_task", | ||
| expectedDetail: "max_retries must be defined on the nested task", | ||
| }, | ||
| { | ||
| name: "min_retry_interval_millis on parent", | ||
| task: jobs.Task{ | ||
| TaskKey: "parent_task", | ||
| MinRetryIntervalMillis: 1000, | ||
| }, | ||
| expectedSeverity: diag.Warning, | ||
| expectedSummary: "Invalid min_retry_interval_millis configuration for for_each_task", | ||
| expectedDetail: "min_retry_interval_millis must be defined on the nested task", | ||
| }, | ||
| { | ||
| name: "retry_on_timeout on parent", | ||
| task: jobs.Task{ | ||
| TaskKey: "parent_task", | ||
| RetryOnTimeout: true, | ||
| }, | ||
| expectedSeverity: diag.Warning, | ||
| expectedSummary: "Invalid retry_on_timeout configuration for for_each_task", | ||
| expectedDetail: "retry_on_timeout must be defined on the nested task", | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| ctx := context.Background() | ||
| b := createBundleWithForEachTask(tt.task) | ||
|
|
||
| diags := ForEachTask().Apply(ctx, b) | ||
|
|
||
| require.Len(t, diags, 1) | ||
| assert.Equal(t, tt.expectedSeverity, diags[0].Severity) | ||
| assert.Equal(t, tt.expectedSummary, diags[0].Summary) | ||
| assert.Contains(t, diags[0].Detail, tt.expectedDetail) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestForEachTask_MultipleRetryFieldsOnParent(t *testing.T) { | ||
| ctx := context.Background() | ||
| b := createBundleWithForEachTask(jobs.Task{ | ||
| TaskKey: "parent_task", | ||
| MaxRetries: 3, | ||
| MinRetryIntervalMillis: 1000, | ||
| RetryOnTimeout: true, | ||
| }) | ||
|
|
||
| diags := ForEachTask().Apply(ctx, b) | ||
| require.Len(t, diags, 3) | ||
|
|
||
| errorCount := 0 | ||
| warningCount := 0 | ||
| for _, d := range diags { | ||
| switch d.Severity { | ||
| case diag.Error: | ||
| errorCount++ | ||
| case diag.Warning: | ||
| warningCount++ | ||
| } | ||
| } | ||
| assert.Equal(t, 1, errorCount) | ||
| assert.Equal(t, 2, warningCount) | ||
| } | ||
|
|
||
| func TestForEachTask_ValidConfigurationOnChild(t *testing.T) { | ||
| ctx := context.Background() | ||
| b := createBundleWithForEachTask(jobs.Task{ | ||
| TaskKey: "parent_task", | ||
| ForEachTask: &jobs.ForEachTask{ | ||
| Inputs: "[1, 2, 3]", | ||
| Task: jobs.Task{ | ||
| TaskKey: "child_task", | ||
| MaxRetries: 3, | ||
| NotebookTask: &jobs.NotebookTask{ | ||
| NotebookPath: "test.py", | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
|
|
||
| diags := ForEachTask().Apply(ctx, b) | ||
| assert.Empty(t, diags) | ||
| } | ||
|
|
||
| func TestForEachTask_NoForEachTask(t *testing.T) { | ||
| ctx := context.Background() | ||
| b := &bundle.Bundle{ | ||
| Config: config.Root{ | ||
| Resources: config.Resources{ | ||
| Jobs: map[string]*resources.Job{ | ||
| "job1": { | ||
| JobSettings: jobs.JobSettings{ | ||
| Name: "My Job", | ||
| Tasks: []jobs.Task{ | ||
| { | ||
| TaskKey: "simple_task", | ||
| MaxRetries: 3, | ||
| NotebookTask: &jobs.NotebookTask{ | ||
| NotebookPath: "test.py", | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| bundletest.SetLocation(b, "resources.jobs.job1.tasks[0]", []dyn.Location{{File: "job.yml", Line: 1, Column: 1}}) | ||
|
|
||
| diags := ForEachTask().Apply(ctx, b) | ||
| assert.Empty(t, diags) | ||
| } | ||
|
|
||
| func TestForEachTask_RetryOnTimeoutFalse(t *testing.T) { | ||
| ctx := context.Background() | ||
| b := createBundleWithForEachTask(jobs.Task{ | ||
| TaskKey: "parent_task", | ||
| RetryOnTimeout: false, | ||
| }) | ||
|
|
||
| diags := ForEachTask().Apply(ctx, b) | ||
| assert.Empty(t, diags) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.