diff --git a/go.mod b/go.mod index 4a2b92cd5b..71e1defc7f 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1 github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 - github.com/confluentinc/cmf-sdk-go v0.0.5 + github.com/confluentinc/cmf-sdk-go v0.0.7 github.com/confluentinc/confluent-kafka-go/v2 v2.14.1 github.com/confluentinc/go-editor v0.11.0 github.com/confluentinc/go-prompt v0.2.40 diff --git a/go.sum b/go.sum index 69fbe81319..b53b710e92 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 h1:Wh3+AsUCncoxRPfs0zC github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0/go.mod h1:unZupel8OU3/o8MRcL9YiJo+56MalsCtHHCc/ZNi0BI= github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 h1:rF9cKecDCowq+oDWjf8rSpXXZHAnVXowIsT/OXF4MOI= github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0/go.mod h1:umhEDvQp/5h0ALKBpYTQOmFwaWrvilnbE8Rkzh6oJ4Q= -github.com/confluentinc/cmf-sdk-go v0.0.5 h1:TS6S3ClVsM1kanB00mlcmqXczozDTO2t4Du5blDSYvE= -github.com/confluentinc/cmf-sdk-go v0.0.5/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg= +github.com/confluentinc/cmf-sdk-go v0.0.7 h1:+BS3A0v3K4VvzDjo1WwJMKWvlkmK7KK6hZSEzGaYRF4= +github.com/confluentinc/cmf-sdk-go v0.0.7/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg= github.com/confluentinc/confluent-kafka-go/v2 v2.14.1 h1:DOm/3yIL7L8GOEa7TFDht6MiNa/FiOeb8kNjHr28S/4= github.com/confluentinc/confluent-kafka-go/v2 v2.14.1/go.mod h1:aR1aciwbULyLhKkv9eq88JhS4XmGOusEnHZx1R93XZI= github.com/confluentinc/go-editor v0.11.0 h1:fcEALYHj7xV/fRSp54/IHi2DS4GlZMJWVgrYvi/llvU= diff --git a/internal/flink/command_compute_pool.go b/internal/flink/command_compute_pool.go index 339e571485..2f6a4885e6 100644 --- a/internal/flink/command_compute_pool.go +++ b/internal/flink/command_compute_pool.go @@ -1,11 +1,14 @@ package flink import ( + "fmt" + "github.com/spf13/cobra" cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" "github.com/confluentinc/cli/v4/pkg/config" + "github.com/confluentinc/cli/v4/pkg/flink" ) type computePoolOut struct { @@ -77,11 +80,16 @@ func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool) }, } - if sdkComputePool.Status != nil { + if phase := extractComputePoolPhase(sdkComputePool); phase != "" { localPool.Status = &LocalComputePoolStatus{ - Phase: sdkComputePool.Status.Phase, + Phase: phase, } } return localPool } + +func extractComputePoolPhase(pool cmfsdk.ComputePool) string { + phase, _ := flink.GetMapField[string](pool.GetStatus(), "phase", fmt.Sprintf("compute pool %q", pool.GetMetadata().Name)) + return phase +} diff --git a/internal/flink/command_compute_pool_create_onprem.go b/internal/flink/command_compute_pool_create_onprem.go index d483594c45..2b5308fd4f 100644 --- a/internal/flink/command_compute_pool_create_onprem.go +++ b/internal/flink/command_compute_pool_create_onprem.go @@ -90,9 +90,9 @@ func (c *command) computePoolCreateOnPrem(cmd *cobra.Command, args []string) err } table.Add(&computePoolOutOnPrem{ CreationTime: creationTime, - Name: sdkComputePool.GetMetadata().Name, - Type: sdkComputePool.GetSpec().Type, - Phase: sdkOutputComputePool.GetStatus().Phase, + Name: sdkOutputComputePool.GetMetadata().Name, + Type: sdkOutputComputePool.GetSpec().Type, + Phase: extractComputePoolPhase(sdkOutputComputePool), }) return table.Print() } diff --git a/internal/flink/command_compute_pool_describe_onprem.go b/internal/flink/command_compute_pool_describe_onprem.go index 408345ae83..15d1596328 100644 --- a/internal/flink/command_compute_pool_describe_onprem.go +++ b/internal/flink/command_compute_pool_describe_onprem.go @@ -52,7 +52,7 @@ func (c *command) computePoolDescribeOnPrem(cmd *cobra.Command, args []string) e CreationTime: creationTime, Name: sdkComputePool.GetMetadata().Name, Type: sdkComputePool.GetSpec().Type, - Phase: sdkComputePool.GetStatus().Phase, + Phase: extractComputePoolPhase(sdkComputePool), }) return table.Print() } diff --git a/internal/flink/command_compute_pool_list_onprem.go b/internal/flink/command_compute_pool_list_onprem.go index 609dfe6cfe..c7a9ff4f71 100644 --- a/internal/flink/command_compute_pool_list_onprem.go +++ b/internal/flink/command_compute_pool_list_onprem.go @@ -53,7 +53,7 @@ func (c *command) computePoolListOnPrem(cmd *cobra.Command, _ []string) error { CreationTime: creationTime, Name: pool.GetMetadata().Name, Type: pool.GetSpec().Type, - Phase: pool.GetStatus().Phase, + Phase: extractComputePoolPhase(pool), }) } return list.Print() diff --git a/internal/flink/command_environment_create.go b/internal/flink/command_environment_create.go index bc1fccfe59..e7e0f82ec9 100644 --- a/internal/flink/command_environment_create.go +++ b/internal/flink/command_environment_create.go @@ -93,7 +93,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error { } var postEnvironment cmfsdk.PostEnvironment - postEnvironment.Name = environmentName + postEnvironment.Name = &environmentName postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed postEnvironment.KubernetesNamespace = &kubernetesNamespace postEnvironment.StatementDefaults = &defaultsStatementParsed diff --git a/internal/flink/command_environment_update.go b/internal/flink/command_environment_update.go index 008dad94fb..50721ec519 100644 --- a/internal/flink/command_environment_update.go +++ b/internal/flink/command_environment_update.go @@ -82,7 +82,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error { } var postEnvironment cmfsdk.PostEnvironment - postEnvironment.Name = environmentName + postEnvironment.Name = &environmentName postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed postEnvironment.StatementDefaults = &defaultsStatementParsed postEnvironment.ComputePoolDefaults = &defaultsComputePoolParsed diff --git a/pkg/flink/cmf_map.go b/pkg/flink/cmf_map.go new file mode 100644 index 0000000000..9704445e44 --- /dev/null +++ b/pkg/flink/cmf_map.go @@ -0,0 +1,23 @@ +package flink + +import "github.com/confluentinc/cli/v4/pkg/log" + +// GetMapField extracts a value of type T from an untyped map. cmf-sdk-go +// surfaces controller-driven fields (status, spec, etc.) as map[string]any. +// +// Returns (zero, false) if the key is absent, nil, or not of type T. A type +// mismatch is logged at debug level (contract violation, not a normal absence). +// contextLabel identifies the resource in those logs, e.g. `compute pool "foo"`. +func GetMapField[T any](m map[string]any, key, contextLabel string) (T, bool) { + var zero T + raw, ok := m[key] + if !ok || raw == nil { + return zero, false + } + v, ok := raw.(T) + if !ok { + log.CliLogger.Debugf("%s: %s has unexpected type %T, expected %T", contextLabel, key, raw, zero) + return zero, false + } + return v, true +} diff --git a/pkg/flink/cmf_map_test.go b/pkg/flink/cmf_map_test.go new file mode 100644 index 0000000000..7780c7c5c6 --- /dev/null +++ b/pkg/flink/cmf_map_test.go @@ -0,0 +1,64 @@ +package flink + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetMapField(t *testing.T) { + t.Run("string", func(t *testing.T) { + tests := []struct { + name string + m map[string]any + want string + wantOk bool + }{ + {name: "nil map", m: nil, want: "", wantOk: false}, + {name: "empty map", m: map[string]any{}, want: "", wantOk: false}, + {name: "key missing", m: map[string]any{"other": "x"}, want: "", wantOk: false}, + {name: "value nil", m: map[string]any{"phase": nil}, want: "", wantOk: false}, + {name: "string value", m: map[string]any{"phase": "RUNNING"}, want: "RUNNING", wantOk: true}, + {name: "empty string", m: map[string]any{"phase": ""}, want: "", wantOk: true}, + {name: "wrong type int", m: map[string]any{"phase": 42}, want: "", wantOk: false}, + {name: "wrong type bool", m: map[string]any{"phase": true}, want: "", wantOk: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, ok := GetMapField[string](tt.m, "phase", "test") + require.Equal(t, tt.wantOk, ok) + require.Equal(t, tt.want, got) + }) + } + }) + + t.Run("int64", func(t *testing.T) { + got, ok := GetMapField[int64](map[string]any{"observedGeneration": int64(7)}, "observedGeneration", "test") + require.True(t, ok) + require.Equal(t, int64(7), got) + + // JSON decodes numbers as float64, not int64. + _, ok = GetMapField[int64](map[string]any{"observedGeneration": float64(7)}, "observedGeneration", "test") + require.False(t, ok) + }) + + t.Run("nested map", func(t *testing.T) { + nested := map[string]any{"jobName": "foo", "state": "RUNNING"} + root := map[string]any{"jobStatus": nested} + + got, ok := GetMapField[map[string]any](root, "jobStatus", "test") + require.True(t, ok) + require.Equal(t, nested, got) + + state, ok := GetMapField[string](got, "state", "test") + require.True(t, ok) + require.Equal(t, "RUNNING", state) + }) + + t.Run("slice", func(t *testing.T) { + conditions := []any{"Ready", "Healthy"} + got, ok := GetMapField[[]any](map[string]any{"conditions": conditions}, "conditions", "test") + require.True(t, ok) + require.Equal(t, conditions, got) + }) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..a24b34b1e2 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -231,7 +231,7 @@ func (cmfClient *CmfRestClient) UpdateApplication(ctx context.Context, environme // CreateEnvironment Create an environment. // Internally, since the call for Create and Update is the same, we check if the environment exists before creation. func (cmfClient *CmfRestClient) CreateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) { - environmentName := postEnvironment.Name + environmentName := postEnvironment.GetName() _, httpResponse, _ := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute() // check if the environment exists by checking the status code if httpResponse != nil && httpResponse.StatusCode == http.StatusOK { @@ -281,10 +281,10 @@ func (cmfClient *CmfRestClient) ListEnvironments(ctx context.Context) ([]cmfsdk. return environments, nil } -// UpdateEnvironment Create an environment. +// UpdateEnvironment updates an existing environment. // Internally, since the call for Create and Update is the same, we check if the environment exists before updation. func (cmfClient *CmfRestClient) UpdateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) { - environmentName := postEnvironment.Name + environmentName := postEnvironment.GetName() _, httpResponse, err := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute() // check if the environment exists by checking the status code if httpResponse != nil && httpResponse.StatusCode == http.StatusNotFound { diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 3089876db8..ef179075d8 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -188,8 +188,8 @@ func createSavepoint(name string) cmfsdk.Savepoint { func createComputePool(poolName, phase string) cmfsdk.ComputePool { timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String() - status := cmfsdk.ComputePoolStatus{ - Phase: phase, + status := map[string]interface{}{ + "phase": phase, } return cmfsdk.ComputePool{ @@ -364,14 +364,15 @@ func handleCmfEnvironments(t *testing.T) http.HandlerFunc { err = json.Unmarshal(reqBody, &environment) require.NoError(t, err) - if strings.Contains(environment.Name, "failure") { + envName := environment.GetName() + if strings.Contains(envName, "failure") { http.Error(w, "", http.StatusUnprocessableEntity) return } // Already existing environment: update - if environment.Name == "default" || environment.Name == "test" { - outputEnvironment := createEnvironment(environment.Name, environment.Name+"-namespace") + if envName == "default" || envName == "test" { + outputEnvironment := createEnvironment(envName, envName+"-namespace") // This is a dummy update - only the defaults can be updated anyway. outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults @@ -382,7 +383,7 @@ func handleCmfEnvironments(t *testing.T) http.HandlerFunc { } // New environment: create - outputEnvironment := createEnvironment(environment.Name, environment.GetKubernetesNamespace()) + outputEnvironment := createEnvironment(envName, environment.GetKubernetesNamespace()) outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults outputEnvironment.StatementDefaults = environment.StatementDefaults